[jira] [Updated] (FLINK-33925) Extended failure handling for bulk requests (elasticsearch back port)
[ https://issues.apache.org/jira/browse/FLINK-33925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin updated FLINK-33925: Fix Version/s: opensearch-2.0.0 > Extended failure handling for bulk requests (elasticsearch back port) > - > > Key: FLINK-33925 > URL: https://issues.apache.org/jira/browse/FLINK-33925 > Project: Flink > Issue Type: Improvement > Components: Connectors / Opensearch >Affects Versions: opensearch-1.0.1 >Reporter: Peter Schulz >Assignee: Peter Schulz >Priority: Major > Labels: pull-request-available > Fix For: opensearch-1.2.0, opensearch-2.0.0 > > > This is a back port of the implementation for the elasticsearch connector, > see FLINK-32028, to achieve consistent APIs. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34470][Connectors/Kafka] Fix indefinite blocking by adjusting stopping condition in split reader [flink-connector-kafka]
dongwoo6kim commented on code in PR #100: URL: https://github.com/apache/flink-connector-kafka/pull/100#discussion_r1601015487 ## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java: ## @@ -122,27 +122,21 @@ public RecordsWithSplitIds> fetch() throws IOExce KafkaPartitionSplitRecords recordsBySplits = new KafkaPartitionSplitRecords(consumerRecords, kafkaSourceReaderMetrics); List finishedPartitions = new ArrayList<>(); -for (TopicPartition tp : consumerRecords.partitions()) { +for (TopicPartition tp : consumer.assignment()) { long stoppingOffset = getStoppingOffset(tp); -final List> recordsFromPartition = -consumerRecords.records(tp); - -if (recordsFromPartition.size() > 0) { -final ConsumerRecord lastRecord = -recordsFromPartition.get(recordsFromPartition.size() - 1); - -// After processing a record with offset of "stoppingOffset - 1", the split reader -// should not continue fetching because the record with stoppingOffset may not -// exist. Keep polling will just block forever. -if (lastRecord.offset() >= stoppingOffset - 1) { -recordsBySplits.setPartitionStoppingOffset(tp, stoppingOffset); -finishSplitAtRecord( -tp, -stoppingOffset, -lastRecord.offset(), -finishedPartitions, -recordsBySplits); -} +long consumerPosition = consumer.position(tp); +// Stop fetching when the consumer's position reaches the stoppingOffset. +// Control messages may follow the last record; therefore, using the last record's +// offset as a stopping condition could result in indefinite blocking. +if (consumerPosition >= stoppingOffset) { +LOG.debug( +"Position of {}: {}, has reached stopping offset: {}", +tp, +consumerPosition, +stoppingOffset); +recordsBySplits.setPartitionStoppingOffset(tp, stoppingOffset); +finishSplitAtRecord( +tp, stoppingOffset, consumerPosition, finishedPartitions, recordsBySplits); } // Track this partition's record lag if it never appears before kafkaSourceReaderMetrics.maybeAddRecordsLagMetric(consumer, tp); Review Comment: @LinMingQiang, thanks for the review. I've changed to track tp, only when there is record for that tp. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-30537) Add support for OpenSearch 2.3
[ https://issues.apache.org/jira/browse/FLINK-30537?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin closed FLINK-30537. --- Resolution: Fixed Closed in favor of FLINK-33859 > Add support for OpenSearch 2.3 > -- > > Key: FLINK-30537 > URL: https://issues.apache.org/jira/browse/FLINK-30537 > Project: Flink > Issue Type: Improvement > Components: Connectors / Opensearch >Reporter: Martijn Visser >Priority: Minor > Labels: auto-deprioritized-major, pull-request-available > > Create a version for Flink’s Opensearch connector that supports version 2.3. > From the ASF Flink Slack: > https://apache-flink.slack.com/archives/C03GV7L3G2C/p1672339157102319 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-34942) Support Flink 1.19, 1.20-SNAPSHOT for OpenSearch connector
[ https://issues.apache.org/jira/browse/FLINK-34942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin resolved FLINK-34942. - Fix Version/s: opensearch-1.2.0 opensearch-2.0.0 Resolution: Fixed > Support Flink 1.19, 1.20-SNAPSHOT for OpenSearch connector > -- > > Key: FLINK-34942 > URL: https://issues.apache.org/jira/browse/FLINK-34942 > Project: Flink > Issue Type: Bug > Components: Connectors / Opensearch >Affects Versions: 3.1.0 >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available > Fix For: opensearch-1.2.0, opensearch-2.0.0 > > > Currently it fails with similar issue as FLINK-33493 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34942) Support Flink 1.19, 1.20-SNAPSHOT for OpenSearch connector
[ https://issues.apache.org/jira/browse/FLINK-34942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846511#comment-17846511 ] Sergey Nuyanzin commented on FLINK-34942: - Merged as [00f1a5b13bfbadcb8efce8e16fb06ddea0d8e48e|https://github.com/apache/flink-connector-opensearch/commit/00f1a5b13bfbadcb8efce8e16fb06ddea0d8e48e] > Support Flink 1.19, 1.20-SNAPSHOT for OpenSearch connector > -- > > Key: FLINK-34942 > URL: https://issues.apache.org/jira/browse/FLINK-34942 > Project: Flink > Issue Type: Bug > Components: Connectors / Opensearch >Affects Versions: 3.1.0 >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available > Fix For: opensearch-1.2.0, opensearch-2.0.0 > > > Currently it fails with similar issue as FLINK-33493 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34470][Connectors/Kafka] Fix indefinite blocking by adjusting stopping condition in split reader [flink-connector-kafka]
dongwoo6kim commented on code in PR #100: URL: https://github.com/apache/flink-connector-kafka/pull/100#discussion_r1601015487 ## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java: ## @@ -122,27 +122,21 @@ public RecordsWithSplitIds> fetch() throws IOExce KafkaPartitionSplitRecords recordsBySplits = new KafkaPartitionSplitRecords(consumerRecords, kafkaSourceReaderMetrics); List finishedPartitions = new ArrayList<>(); -for (TopicPartition tp : consumerRecords.partitions()) { +for (TopicPartition tp : consumer.assignment()) { long stoppingOffset = getStoppingOffset(tp); -final List> recordsFromPartition = -consumerRecords.records(tp); - -if (recordsFromPartition.size() > 0) { -final ConsumerRecord lastRecord = -recordsFromPartition.get(recordsFromPartition.size() - 1); - -// After processing a record with offset of "stoppingOffset - 1", the split reader -// should not continue fetching because the record with stoppingOffset may not -// exist. Keep polling will just block forever. -if (lastRecord.offset() >= stoppingOffset - 1) { -recordsBySplits.setPartitionStoppingOffset(tp, stoppingOffset); -finishSplitAtRecord( -tp, -stoppingOffset, -lastRecord.offset(), -finishedPartitions, -recordsBySplits); -} +long consumerPosition = consumer.position(tp); +// Stop fetching when the consumer's position reaches the stoppingOffset. +// Control messages may follow the last record; therefore, using the last record's +// offset as a stopping condition could result in indefinite blocking. +if (consumerPosition >= stoppingOffset) { +LOG.debug( +"Position of {}: {}, has reached stopping offset: {}", +tp, +consumerPosition, +stoppingOffset); +recordsBySplits.setPartitionStoppingOffset(tp, stoppingOffset); +finishSplitAtRecord( +tp, stoppingOffset, consumerPosition, finishedPartitions, recordsBySplits); } // Track this partition's record lag if it never appears before kafkaSourceReaderMetrics.maybeAddRecordsLagMetric(consumer, tp); Review Comment: @LinMingQiang, thanks for the review. I've changed to track tp, only when records for that tp is not empty. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35109] Drop support for Flink 1.17 & 1.18 and fix tests for 1.20-SNAPSHOT [flink-connector-kafka]
snuyanzin commented on code in PR #102: URL: https://github.com/apache/flink-connector-kafka/pull/102#discussion_r1601012883 ## .github/workflows/push_pr.yml: ## @@ -28,21 +28,16 @@ jobs: compile_and_test: strategy: matrix: -flink: [ 1.17.2 ] -jdk: [ '8, 11' ] -include: - - flink: 1.18.1 -jdk: '8, 11, 17' - - flink: 1.19.0 -jdk: '8, 11, 17, 21' +flink: [ 1.19.0, 1.20-SNAPSHOT ] +jdk: [ '8, 11, 17, 21' ] Review Comment: May be I didn't get your message about java 21 support, however it is mentioned in 1.19 release notes [1] Also we have nightly with jdk21 tests in Flink main repo (same as for jdk17) starting 1.19 e.g. [2] [1] https://flink.apache.org/2024/03/18/announcing-the-release-of-apache-flink-1.19/#beta-support-for-java-21 [2] https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59560&view=results -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35109] Drop support for Flink 1.17 & 1.18 and fix tests for 1.20-SNAPSHOT [flink-connector-kafka]
snuyanzin commented on code in PR #102: URL: https://github.com/apache/flink-connector-kafka/pull/102#discussion_r1601012883 ## .github/workflows/push_pr.yml: ## @@ -28,21 +28,16 @@ jobs: compile_and_test: strategy: matrix: -flink: [ 1.17.2 ] -jdk: [ '8, 11' ] -include: - - flink: 1.18.1 -jdk: '8, 11, 17' - - flink: 1.19.0 -jdk: '8, 11, 17, 21' +flink: [ 1.19.0, 1.20-SNAPSHOT ] +jdk: [ '8, 11, 17, 21' ] Review Comment: May be I didn't get your message about java 21 support, however it is mentioned in 1.19 release notes [1] Also we have nightly with jdk21 tests (same as for jdk17) starting 1.19 e.g. [2] [1] https://flink.apache.org/2024/03/18/announcing-the-release-of-apache-flink-1.19/#beta-support-for-java-21 [2] https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59560&view=results -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35357) Add "kubernetes.operator.plugins.listeners" parameter description to the Operator configuration document
[ https://issues.apache.org/jira/browse/FLINK-35357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yang Zhou updated FLINK-35357: -- Description: In Flink Operator "Custom Flink Resource Listeners" in practice (doc: [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/operations/plugins/#custom-flink-resource] -listeners) It was found that the "Operator Configuration Reference" document did not explain the "Custom Flink Resource Listeners" configuration parameters. So I wanted to come up with adding: kubernetes.operator.plugins.listeners..class: , after all it is useful. I want to submit a PR to optimize the document. was: In Flink Operator "Custom Flink Resource Listeners" in practice (doc: https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/operations/plugins/#custom-flink-resource -listeners) It was found that the "Operator Configuration Reference" document did not explain the "Custom Flink Resource Listeners" configuration parameters. So I wanted to come up with adding: kubernetes.operator.plugins.listeners..class: , after all it is useful. > Add "kubernetes.operator.plugins.listeners" parameter description to the > Operator configuration document > > > Key: FLINK-35357 > URL: https://issues.apache.org/jira/browse/FLINK-35357 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Yang Zhou >Priority: Minor > > In Flink Operator "Custom Flink Resource Listeners" in practice (doc: > [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/operations/plugins/#custom-flink-resource] > -listeners) > It was found that the "Operator Configuration Reference" document did not > explain the "Custom Flink Resource Listeners" configuration parameters. > So I wanted to come up with adding: > kubernetes.operator.plugins.listeners..class: > > , after all it is useful. > I want to submit a PR to optimize the document. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-27741) Fix NPE when use dense_rank() and rank() in over aggregation
[ https://issues.apache.org/jira/browse/FLINK-27741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846452#comment-17846452 ] Sergey Nuyanzin edited comment on FLINK-27741 at 5/15/24 5:38 AM: -- Merged to master as [40fb49dd17b3e1b6c5aa0249514273730ebe9226|https://github.com/apache/flink/commit/40fb49dd17b3e1b6c5aa0249514273730ebe9226] 1.19: [190522c2c051e0ec05213be71fb7a59a517353b1|https://github.com/apache/flink/commit/190522c2c051e0ec05213be71fb7a59a517353b1] 1.18: [1e1a7f16b6f272334d9f9a1053b657148151a789|https://github.com/apache/flink/commit/1e1a7f16b6f272334d9f9a1053b657148151a789] was (Author: sergey nuyanzin): Merged to master as [40fb49dd17b3e1b6c5aa0249514273730ebe9226|https://github.com/apache/flink/commit/40fb49dd17b3e1b6c5aa0249514273730ebe9226] 1.19: [190522c2c051e0ec05213be71fb7a59a517353b1|https://github.com/apache/flink/commit/190522c2c051e0ec05213be71fb7a59a517353b1] > Fix NPE when use dense_rank() and rank() in over aggregation > > > Key: FLINK-27741 > URL: https://issues.apache.org/jira/browse/FLINK-27741 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.16.0 >Reporter: chenzihao >Assignee: chenzihao >Priority: Not a Priority > Labels: auto-deprioritized-major, auto-deprioritized-minor, > pull-request-available > Fix For: 1.18.2, 1.20.0, 1.19.1 > > > There has an 'NullPointException' when use RANK() and DENSE_RANK() in over > window. > {code:java} > @Test > def testDenseRankOnOver(): Unit = { > val t = failingDataSource(TestData.tupleData5) > .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) > tEnv.registerTable("MyTable", t) > val sqlQuery = "SELECT a, DENSE_RANK() OVER (PARTITION BY a ORDER BY > proctime) FROM MyTable" > val sink = new TestingAppendSink > tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink) > env.execute() > } > {code} > {code:java} > @Test > def testRankOnOver(): Unit = { > val t = failingDataSource(TestData.tupleData5) > .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) > tEnv.registerTable("MyTable", t) > val sqlQuery = "SELECT a, RANK() OVER (PARTITION BY a ORDER BY proctime) > FROM MyTable" > val sink = new TestingAppendSink > tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink) > env.execute() > } > {code} > Exception Info: > {code:java} > java.lang.NullPointerException > at > scala.collection.mutable.ArrayOps$ofInt$.length$extension(ArrayOps.scala:248) > at scala.collection.mutable.ArrayOps$ofInt.length(ArrayOps.scala:248) > at scala.collection.SeqLike.size(SeqLike.scala:104) > at scala.collection.SeqLike.size$(SeqLike.scala:104) > at scala.collection.mutable.ArrayOps$ofInt.size(ArrayOps.scala:242) > at > scala.collection.IndexedSeqLike.sizeHintIfCheap(IndexedSeqLike.scala:95) > at > scala.collection.IndexedSeqLike.sizeHintIfCheap$(IndexedSeqLike.scala:95) > at > scala.collection.mutable.ArrayOps$ofInt.sizeHintIfCheap(ArrayOps.scala:242) > at scala.collection.mutable.Builder.sizeHint(Builder.scala:77) > at scala.collection.mutable.Builder.sizeHint$(Builder.scala:76) > at scala.collection.mutable.ArrayBuilder.sizeHint(ArrayBuilder.scala:21) > at scala.collection.TraversableLike.builder$1(TraversableLike.scala:229) > at scala.collection.TraversableLike.map(TraversableLike.scala:232) > at scala.collection.TraversableLike.map$(TraversableLike.scala:226) > at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:242) > at > org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createDenseRankAggFunction(AggFunctionFactory.scala:454) > at > org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createAggFunction(AggFunctionFactory.scala:94) > at > org.apache.flink.table.planner.plan.utils.AggregateUtil$.$anonfun$transformToAggregateInfoList$1(AggregateUtil.scala:445) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) > at > scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58) > at > scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at scala.collection.TraversableLike.map(TraversableLike.scala:233) > at scala.collection.TraversableLike.map$(TraversableLike.scala:226) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToAggregateInfoList(AggregateUtil.scala:435) > at > org.apache.flink.table.planner.plan.util
Re: [PR] [FLINK-35344][cdc-base] Move same code from multiple subclasses to JdbcSourceChunkSplitter [flink-cdc]
loserwang1024 commented on PR #3319: URL: https://github.com/apache/flink-cdc/pull/3319#issuecomment-2111625519 > Could @loserwang1024 please take a look? I'd like to do it. But this PR do nothing with mysql cdc. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-27741) Fix NPE when use dense_rank() and rank() in over aggregation
[ https://issues.apache.org/jira/browse/FLINK-27741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin updated FLINK-27741: Fix Version/s: 1.18.2 > Fix NPE when use dense_rank() and rank() in over aggregation > > > Key: FLINK-27741 > URL: https://issues.apache.org/jira/browse/FLINK-27741 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.16.0 >Reporter: chenzihao >Assignee: chenzihao >Priority: Not a Priority > Labels: auto-deprioritized-major, auto-deprioritized-minor, > pull-request-available > Fix For: 1.18.2, 1.20.0, 1.19.1 > > > There has an 'NullPointException' when use RANK() and DENSE_RANK() in over > window. > {code:java} > @Test > def testDenseRankOnOver(): Unit = { > val t = failingDataSource(TestData.tupleData5) > .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) > tEnv.registerTable("MyTable", t) > val sqlQuery = "SELECT a, DENSE_RANK() OVER (PARTITION BY a ORDER BY > proctime) FROM MyTable" > val sink = new TestingAppendSink > tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink) > env.execute() > } > {code} > {code:java} > @Test > def testRankOnOver(): Unit = { > val t = failingDataSource(TestData.tupleData5) > .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) > tEnv.registerTable("MyTable", t) > val sqlQuery = "SELECT a, RANK() OVER (PARTITION BY a ORDER BY proctime) > FROM MyTable" > val sink = new TestingAppendSink > tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink) > env.execute() > } > {code} > Exception Info: > {code:java} > java.lang.NullPointerException > at > scala.collection.mutable.ArrayOps$ofInt$.length$extension(ArrayOps.scala:248) > at scala.collection.mutable.ArrayOps$ofInt.length(ArrayOps.scala:248) > at scala.collection.SeqLike.size(SeqLike.scala:104) > at scala.collection.SeqLike.size$(SeqLike.scala:104) > at scala.collection.mutable.ArrayOps$ofInt.size(ArrayOps.scala:242) > at > scala.collection.IndexedSeqLike.sizeHintIfCheap(IndexedSeqLike.scala:95) > at > scala.collection.IndexedSeqLike.sizeHintIfCheap$(IndexedSeqLike.scala:95) > at > scala.collection.mutable.ArrayOps$ofInt.sizeHintIfCheap(ArrayOps.scala:242) > at scala.collection.mutable.Builder.sizeHint(Builder.scala:77) > at scala.collection.mutable.Builder.sizeHint$(Builder.scala:76) > at scala.collection.mutable.ArrayBuilder.sizeHint(ArrayBuilder.scala:21) > at scala.collection.TraversableLike.builder$1(TraversableLike.scala:229) > at scala.collection.TraversableLike.map(TraversableLike.scala:232) > at scala.collection.TraversableLike.map$(TraversableLike.scala:226) > at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:242) > at > org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createDenseRankAggFunction(AggFunctionFactory.scala:454) > at > org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createAggFunction(AggFunctionFactory.scala:94) > at > org.apache.flink.table.planner.plan.utils.AggregateUtil$.$anonfun$transformToAggregateInfoList$1(AggregateUtil.scala:445) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) > at > scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58) > at > scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at scala.collection.TraversableLike.map(TraversableLike.scala:233) > at scala.collection.TraversableLike.map$(TraversableLike.scala:226) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToAggregateInfoList(AggregateUtil.scala:435) > at > org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToStreamAggregateInfoList(AggregateUtil.scala:381) > at > org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToStreamAggregateInfoList(AggregateUtil.scala:361) > at > org.apache.flink.table.planner.plan.utils.AggregateUtil.transformToStreamAggregateInfoList(AggregateUtil.scala) > at > org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate.createUnboundedOverProcessFunction(StreamExecOverAggregate.java:279) > at > org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.java:198) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:148) > at > org.apache.flink.tab
[jira] [Created] (FLINK-35357) Add "kubernetes.operator.plugins.listeners" parameter description to the Operator configuration document
Yang Zhou created FLINK-35357: - Summary: Add "kubernetes.operator.plugins.listeners" parameter description to the Operator configuration document Key: FLINK-35357 URL: https://issues.apache.org/jira/browse/FLINK-35357 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Reporter: Yang Zhou In Flink Operator "Custom Flink Resource Listeners" in practice (doc: https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/operations/plugins/#custom-flink-resource -listeners) It was found that the "Operator Configuration Reference" document did not explain the "Custom Flink Resource Listeners" configuration parameters. So I wanted to come up with adding: kubernetes.operator.plugins.listeners..class: , after all it is useful. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-27741) Fix NPE when use dense_rank() and rank() in over aggregation
[ https://issues.apache.org/jira/browse/FLINK-27741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin updated FLINK-27741: Fix Version/s: 1.19.1 > Fix NPE when use dense_rank() and rank() in over aggregation > > > Key: FLINK-27741 > URL: https://issues.apache.org/jira/browse/FLINK-27741 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.16.0 >Reporter: chenzihao >Assignee: chenzihao >Priority: Not a Priority > Labels: auto-deprioritized-major, auto-deprioritized-minor, > pull-request-available > Fix For: 1.20.0, 1.19.1 > > > There has an 'NullPointException' when use RANK() and DENSE_RANK() in over > window. > {code:java} > @Test > def testDenseRankOnOver(): Unit = { > val t = failingDataSource(TestData.tupleData5) > .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) > tEnv.registerTable("MyTable", t) > val sqlQuery = "SELECT a, DENSE_RANK() OVER (PARTITION BY a ORDER BY > proctime) FROM MyTable" > val sink = new TestingAppendSink > tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink) > env.execute() > } > {code} > {code:java} > @Test > def testRankOnOver(): Unit = { > val t = failingDataSource(TestData.tupleData5) > .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) > tEnv.registerTable("MyTable", t) > val sqlQuery = "SELECT a, RANK() OVER (PARTITION BY a ORDER BY proctime) > FROM MyTable" > val sink = new TestingAppendSink > tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink) > env.execute() > } > {code} > Exception Info: > {code:java} > java.lang.NullPointerException > at > scala.collection.mutable.ArrayOps$ofInt$.length$extension(ArrayOps.scala:248) > at scala.collection.mutable.ArrayOps$ofInt.length(ArrayOps.scala:248) > at scala.collection.SeqLike.size(SeqLike.scala:104) > at scala.collection.SeqLike.size$(SeqLike.scala:104) > at scala.collection.mutable.ArrayOps$ofInt.size(ArrayOps.scala:242) > at > scala.collection.IndexedSeqLike.sizeHintIfCheap(IndexedSeqLike.scala:95) > at > scala.collection.IndexedSeqLike.sizeHintIfCheap$(IndexedSeqLike.scala:95) > at > scala.collection.mutable.ArrayOps$ofInt.sizeHintIfCheap(ArrayOps.scala:242) > at scala.collection.mutable.Builder.sizeHint(Builder.scala:77) > at scala.collection.mutable.Builder.sizeHint$(Builder.scala:76) > at scala.collection.mutable.ArrayBuilder.sizeHint(ArrayBuilder.scala:21) > at scala.collection.TraversableLike.builder$1(TraversableLike.scala:229) > at scala.collection.TraversableLike.map(TraversableLike.scala:232) > at scala.collection.TraversableLike.map$(TraversableLike.scala:226) > at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:242) > at > org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createDenseRankAggFunction(AggFunctionFactory.scala:454) > at > org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createAggFunction(AggFunctionFactory.scala:94) > at > org.apache.flink.table.planner.plan.utils.AggregateUtil$.$anonfun$transformToAggregateInfoList$1(AggregateUtil.scala:445) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) > at > scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58) > at > scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at scala.collection.TraversableLike.map(TraversableLike.scala:233) > at scala.collection.TraversableLike.map$(TraversableLike.scala:226) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToAggregateInfoList(AggregateUtil.scala:435) > at > org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToStreamAggregateInfoList(AggregateUtil.scala:381) > at > org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToStreamAggregateInfoList(AggregateUtil.scala:361) > at > org.apache.flink.table.planner.plan.utils.AggregateUtil.transformToStreamAggregateInfoList(AggregateUtil.scala) > at > org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate.createUnboundedOverProcessFunction(StreamExecOverAggregate.java:279) > at > org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.java:198) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:148) > at > org.apache.flink.table.plann
Re: [PR] [BP-1.18][FLINK-27741][table-planner] Fix NPE when use dense_rank() and rank() [flink]
snuyanzin merged PR #24785: URL: https://github.com/apache/flink/pull/24785 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-27741) Fix NPE when use dense_rank() and rank() in over aggregation
[ https://issues.apache.org/jira/browse/FLINK-27741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846452#comment-17846452 ] Sergey Nuyanzin edited comment on FLINK-27741 at 5/15/24 5:36 AM: -- Merged to master as [40fb49dd17b3e1b6c5aa0249514273730ebe9226|https://github.com/apache/flink/commit/40fb49dd17b3e1b6c5aa0249514273730ebe9226] 1.19: [190522c2c051e0ec05213be71fb7a59a517353b1|https://github.com/apache/flink/commit/190522c2c051e0ec05213be71fb7a59a517353b1] was (Author: sergey nuyanzin): Merged to master as [40fb49dd17b3e1b6c5aa0249514273730ebe9226|https://github.com/apache/flink/commit/40fb49dd17b3e1b6c5aa0249514273730ebe9226] > Fix NPE when use dense_rank() and rank() in over aggregation > > > Key: FLINK-27741 > URL: https://issues.apache.org/jira/browse/FLINK-27741 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.16.0 >Reporter: chenzihao >Assignee: chenzihao >Priority: Not a Priority > Labels: auto-deprioritized-major, auto-deprioritized-minor, > pull-request-available > Fix For: 1.20.0 > > > There has an 'NullPointException' when use RANK() and DENSE_RANK() in over > window. > {code:java} > @Test > def testDenseRankOnOver(): Unit = { > val t = failingDataSource(TestData.tupleData5) > .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) > tEnv.registerTable("MyTable", t) > val sqlQuery = "SELECT a, DENSE_RANK() OVER (PARTITION BY a ORDER BY > proctime) FROM MyTable" > val sink = new TestingAppendSink > tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink) > env.execute() > } > {code} > {code:java} > @Test > def testRankOnOver(): Unit = { > val t = failingDataSource(TestData.tupleData5) > .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) > tEnv.registerTable("MyTable", t) > val sqlQuery = "SELECT a, RANK() OVER (PARTITION BY a ORDER BY proctime) > FROM MyTable" > val sink = new TestingAppendSink > tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink) > env.execute() > } > {code} > Exception Info: > {code:java} > java.lang.NullPointerException > at > scala.collection.mutable.ArrayOps$ofInt$.length$extension(ArrayOps.scala:248) > at scala.collection.mutable.ArrayOps$ofInt.length(ArrayOps.scala:248) > at scala.collection.SeqLike.size(SeqLike.scala:104) > at scala.collection.SeqLike.size$(SeqLike.scala:104) > at scala.collection.mutable.ArrayOps$ofInt.size(ArrayOps.scala:242) > at > scala.collection.IndexedSeqLike.sizeHintIfCheap(IndexedSeqLike.scala:95) > at > scala.collection.IndexedSeqLike.sizeHintIfCheap$(IndexedSeqLike.scala:95) > at > scala.collection.mutable.ArrayOps$ofInt.sizeHintIfCheap(ArrayOps.scala:242) > at scala.collection.mutable.Builder.sizeHint(Builder.scala:77) > at scala.collection.mutable.Builder.sizeHint$(Builder.scala:76) > at scala.collection.mutable.ArrayBuilder.sizeHint(ArrayBuilder.scala:21) > at scala.collection.TraversableLike.builder$1(TraversableLike.scala:229) > at scala.collection.TraversableLike.map(TraversableLike.scala:232) > at scala.collection.TraversableLike.map$(TraversableLike.scala:226) > at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:242) > at > org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createDenseRankAggFunction(AggFunctionFactory.scala:454) > at > org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createAggFunction(AggFunctionFactory.scala:94) > at > org.apache.flink.table.planner.plan.utils.AggregateUtil$.$anonfun$transformToAggregateInfoList$1(AggregateUtil.scala:445) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) > at > scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58) > at > scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at scala.collection.TraversableLike.map(TraversableLike.scala:233) > at scala.collection.TraversableLike.map$(TraversableLike.scala:226) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToAggregateInfoList(AggregateUtil.scala:435) > at > org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToStreamAggregateInfoList(AggregateUtil.scala:381) > at > org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToStreamAggregateInfoList(AggregateUtil.scala:361) > at > org.apache.flink.table.planner.plan.utils.AggregateUti
Re: [PR] [BP-1.19][FLINK-27741][table-planner] Fix NPE when use dense_rank() and rank() [flink]
snuyanzin merged PR #24786: URL: https://github.com/apache/flink/pull/24786 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34503][table] Migrate JoinDeriveNullFilterRule to java. [flink]
liuyongvs commented on PR #24373: URL: https://github.com/apache/flink/pull/24373#issuecomment-2111617766 hi @snuyanzin will you help review it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35075][table] Migrate TwoStageOptimizedAggregateRule to java [flink]
liuyongvs commented on PR #24650: URL: https://github.com/apache/flink/pull/24650#issuecomment-2111617324 hi @snuyanzin will you help review it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34555][table] Migrate JoinConditionTypeCoerceRule to java. [flink]
liuyongvs commented on PR #24420: URL: https://github.com/apache/flink/pull/24420#issuecomment-2111615952 hi @snuyanzin will you help review it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-34109) FileSystem sink connector restore job from historical checkpoint failure
[ https://issues.apache.org/jira/browse/FLINK-34109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846490#comment-17846490 ] Sergey Paryshev commented on FLINK-34109: - bump > FileSystem sink connector restore job from historical checkpoint failure > > > Key: FLINK-34109 > URL: https://issues.apache.org/jira/browse/FLINK-34109 > Project: Flink > Issue Type: Bug > Components: Connectors / FileSystem >Affects Versions: 1.12.7, 1.13.6, 1.14.6, 1.15.4, 1.18.0, 1.16.3, 1.17.2 >Reporter: Sergey Paryshev >Priority: Minor > Labels: pull-request-available > > FileSystem connector sink with compaction setting is enabled can't restore > job from historical checkpoint (when MAX_RETAINED_CHECKPOINTS > 1 and > restroing checkpoint is not last) > {code:java} > java.io.UncheckedIOException: java.io.FileNotFoundException: File > file:/tmp/parquet-test/output/.uncompacted-part-81340e1d-9004-4ce2-a45c-628d17919bbf-0-1 > does not exist or the user running Flink ('user') has insufficient > permissions to access it. > at > org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:165) > ~[classes/:?] > at > org.apache.flink.connector.file.table.BinPacking.pack(BinPacking.java:40) > ~[classes/:?] > at > org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.lambda$coordinate$2(CompactCoordinator.java:175) > ~[classes/:?] > at java.util.HashMap.forEach(HashMap.java:1290) ~[?:1.8.0_312] > at > org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.coordinate(CompactCoordinator.java:171) > ~[classes/:?] > at > org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.commitUpToCheckpoint(CompactCoordinator.java:153) > ~[classes/:?] > at > org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.processElement(CompactCoordinator.java:143) > ~[classes/:?] > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:262) > ~[classes/:?] > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:155) > ~[classes/:?] > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114) > ~[classes/:?] > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > ~[classes/:?] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:554) > ~[classes/:?] > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:245) > ~[classes/:?] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:848) > ~[classes/:?] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:797) > ~[classes/:?] > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:954) > ~[classes/:?] > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:933) > ~[classes/:?] > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:747) > ~[classes/:?] > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) > ~[classes/:?] > at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_312] > Caused by: java.io.FileNotFoundException: File > file:/tmp/parquet-test/output/.uncompacted-part-81340e1d-9004-4ce2-a45c-628d17919bbf-0-1 > does not exist or the user running Flink ('user') has insufficient > permissions to access it. > at > org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:113) > ~[classes/:?] > at > org.apache.flink.core.fs.SafetyNetWrapperFileSystem.getFileStatus(SafetyNetWrapperFileSystem.java:65) > ~[classes/:?] > at > org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:163) > ~[classes/:?] > ... 19 more {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35344][cdc-base] Move same code from multiple subclasses to JdbcSourceChunkSplitter [flink-cdc]
yuxiqian commented on PR #3319: URL: https://github.com/apache/flink-cdc/pull/3319#issuecomment-2111557242 Seems MySQL CI is taking significantly more time (90min vs. 30min) and fails eventually. Could @loserwang1024 please take a look? Failed CI job link: https://github.com/apache/flink-cdc/actions/runs/9073569533/job/24931065832 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]
ldadima commented on code in PR #24784: URL: https://github.com/apache/flink/pull/24784#discussion_r1600910971 ## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java: ## @@ -421,7 +423,27 @@ public void reDistributeInputChannelStates(TaskStateAssignment stateAssignment) stateAssignment.oldState.get(stateAssignment.inputOperatorID); final List> inputOperatorState = splitBySubtasks(inputState, OperatorSubtaskState::getInputChannelState); -if (inputState.getParallelism() == executionJobVertex.getParallelism()) { + +boolean noNeedRescale = + stateAssignment.executionJobVertex.getJobVertex().getInputs().stream() +.map(JobEdge::getDownstreamSubtaskStateMapper) +.anyMatch(m -> !m.equals(SubtaskStateMapper.FULL)) +&& stateAssignment.executionJobVertex.getInputs().stream() +.map(IntermediateResult::getProducer) +.map(vertexAssignments::get) +.anyMatch( +taskStateAssignment -> { +final int oldParallelism = +stateAssignment +.oldState + .get(stateAssignment.inputOperatorID) +.getParallelism(); +return oldParallelism +== taskStateAssignment.executionJobVertex +.getParallelism(); +}); Review Comment: Separation is a good idea, ok. The first part is to check if SubtaskStateMapper.FULL is used in any input (this mapper type always returns all I/O), the problem is reproduced only for this mapper type The second part is to check if the parallelism of any previous operator has changed. if it has changed, it means that the number of outputs has changed. But I made little mistake in condition. I will fix it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join
[ https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846485#comment-17846485 ] xuyang edited comment on FLINK-34380 at 5/15/24 4:01 AM: - Sorry for this late reply. The results of this repair still don’t seem to meet expectations a little. Still based on the above test, the result is following. However, the row kind of the first data should be `+I`, right? {code:java} ++---++---++ | op | a | b | a0| b0 | ++---++---++ | +U | 1 | 1 | 1 | 99 | | -U | 1 | 1 | 1 | 99 | | +U | 1 | 99 | 1 | 99 | | -D | 1 | 99 | 1 | 99 | ++---++---++ {code} was (Author: xuyangzhong): Sorry for this late reply. This commit for fix seems great. [~xu_shuai_] Can you take a look to verify it again? > Strange RowKind and records about intermediate output when using minibatch > join > --- > > Key: FLINK-34380 > URL: https://issues.apache.org/jira/browse/FLINK-34380 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.19.0 >Reporter: xuyang >Priority: Major > Fix For: 1.20.0 > > > {code:java} > // Add it in CalcItCase > @Test > def test(): Unit = { > env.setParallelism(1) > val rows = Seq( > changelogRow("+I", java.lang.Integer.valueOf(1), "1"), > changelogRow("-U", java.lang.Integer.valueOf(1), "1"), > changelogRow("+U", java.lang.Integer.valueOf(1), "99"), > changelogRow("-D", java.lang.Integer.valueOf(1), "99") > ) > val dataId = TestValuesTableFactory.registerData(rows) > val ddl = > s""" > |CREATE TABLE t1 ( > | a int, > | b string > |) WITH ( > | 'connector' = 'values', > | 'data-id' = '$dataId', > | 'bounded' = 'false' > |) >""".stripMargin > tEnv.executeSql(ddl) > val ddl2 = > s""" > |CREATE TABLE t2 ( > | a int, > | b string > |) WITH ( > | 'connector' = 'values', > | 'data-id' = '$dataId', > | 'bounded' = 'false' > |) >""".stripMargin > tEnv.executeSql(ddl2) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, > Boolean.box(true)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, > Duration.ofSeconds(5)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L)) > println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = > t2.a").explain()) > tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print() > } {code} > Output: > {code:java} > ++-+-+-+-+ > | op | a | b | a0 | b0 | > ++-+-+-+-+ > | +U | 1 | 1 | 1 | 99 | > | +U | 1 | 99 | 1 | 99 | > | -U | 1 | 1 | 1 | 99 | > | -D | 1 | 99 | 1 | 99 | > ++-+-+-+-+{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]
ldadima commented on code in PR #24784: URL: https://github.com/apache/flink/pull/24784#discussion_r1600910971 ## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java: ## @@ -421,7 +423,27 @@ public void reDistributeInputChannelStates(TaskStateAssignment stateAssignment) stateAssignment.oldState.get(stateAssignment.inputOperatorID); final List> inputOperatorState = splitBySubtasks(inputState, OperatorSubtaskState::getInputChannelState); -if (inputState.getParallelism() == executionJobVertex.getParallelism()) { + +boolean noNeedRescale = + stateAssignment.executionJobVertex.getJobVertex().getInputs().stream() +.map(JobEdge::getDownstreamSubtaskStateMapper) +.anyMatch(m -> !m.equals(SubtaskStateMapper.FULL)) +&& stateAssignment.executionJobVertex.getInputs().stream() +.map(IntermediateResult::getProducer) +.map(vertexAssignments::get) +.anyMatch( +taskStateAssignment -> { +final int oldParallelism = +stateAssignment +.oldState + .get(stateAssignment.inputOperatorID) +.getParallelism(); +return oldParallelism +== taskStateAssignment.executionJobVertex +.getParallelism(); +}); Review Comment: Separation is a good idea, ok. The first part is to check if SubtaskStateMapper.FULL is used in any input (this mapper type always returns all I/O), the problem is reproduced only for this mapper type The second part is to check if the parallelism of any previous operator has changed. if it has changed, it means that the number of outputs has changed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-33790) Upsert statement filter unique key field colume in mysql dielact
[ https://issues.apache.org/jira/browse/FLINK-33790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846487#comment-17846487 ] SuDewei commented on FLINK-33790: - Hi [~jeyhunkarimov] , i think what [~lijingwei.5018] tring to say is that the MySqlDialect does not need to update the unique key in the UpsertStatement. So it is an improvement. After adding this feature, the upsert statement would not include the key fields just like the example shows. > Upsert statement filter unique key field colume in mysql dielact > - > > Key: FLINK-33790 > URL: https://issues.apache.org/jira/browse/FLINK-33790 > Project: Flink > Issue Type: Improvement > Components: Connectors / JDBC >Reporter: JingWei Li >Priority: Minor > > example: `col2` and `col4` is unique key in table `my_table` > > {code:java} > INSERT INTO `my_table`(`col1`, `col2`, `col3`, `col4`, `col5`) > VALUES (?, ?, ?, ?, ?) > ON DUPLICATE KEY UPDATE > `col1`=VALUES(`col1`), > `col2`=VALUES(`col2`), > `col3`=VALUES(`col3`), > `col4`=VALUES(`col4`), > `col5`=VALUES(`col5`){code} > result: > {code:java} > INSERT INTO `my_table`(`col1`, `col2`, `col3`, `col4`, `col5`) > VALUES (?, ?, ?, ?, ?) > ON DUPLICATE KEY UPDATE > `col1`=VALUES(`col1`), > `col3`=VALUES(`col3`), > `col5`=VALUES(`col5`) {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join
[ https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846485#comment-17846485 ] xuyang commented on FLINK-34380: Sorry for this late reply. This commit for fix seems great. [~xu_shuai_] Can you take a look to verify it again? > Strange RowKind and records about intermediate output when using minibatch > join > --- > > Key: FLINK-34380 > URL: https://issues.apache.org/jira/browse/FLINK-34380 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.19.0 >Reporter: xuyang >Priority: Major > Fix For: 1.20.0 > > > {code:java} > // Add it in CalcItCase > @Test > def test(): Unit = { > env.setParallelism(1) > val rows = Seq( > changelogRow("+I", java.lang.Integer.valueOf(1), "1"), > changelogRow("-U", java.lang.Integer.valueOf(1), "1"), > changelogRow("+U", java.lang.Integer.valueOf(1), "99"), > changelogRow("-D", java.lang.Integer.valueOf(1), "99") > ) > val dataId = TestValuesTableFactory.registerData(rows) > val ddl = > s""" > |CREATE TABLE t1 ( > | a int, > | b string > |) WITH ( > | 'connector' = 'values', > | 'data-id' = '$dataId', > | 'bounded' = 'false' > |) >""".stripMargin > tEnv.executeSql(ddl) > val ddl2 = > s""" > |CREATE TABLE t2 ( > | a int, > | b string > |) WITH ( > | 'connector' = 'values', > | 'data-id' = '$dataId', > | 'bounded' = 'false' > |) >""".stripMargin > tEnv.executeSql(ddl2) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, > Boolean.box(true)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, > Duration.ofSeconds(5)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L)) > println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = > t2.a").explain()) > tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print() > } {code} > Output: > {code:java} > ++-+-+-+-+ > | op | a | b | a0 | b0 | > ++-+-+-+-+ > | +U | 1 | 1 | 1 | 99 | > | +U | 1 | 99 | 1 | 99 | > | -U | 1 | 1 | 1 | 99 | > | -D | 1 | 99 | 1 | 99 | > ++-+-+-+-+{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35030][runtime] Introduce Epoch Manager under async execution [flink]
yunfengzhou-hub commented on code in PR #24748: URL: https://github.com/apache/flink/pull/24748#discussion_r1600886728 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java: ## @@ -188,6 +190,35 @@ public InternalTimerService getInternalTimerService( (AsyncExecutionController) asyncExecutionController); } +@Override +public void processWatermark(Watermark mark) throws Exception { +if (!isAsyncStateProcessingEnabled()) { +super.processWatermark(mark); +return; +} +asyncExecutionController.processNonRecord(() -> super.processWatermark(mark)); Review Comment: We may need to override `processWatermark1` and `processWatermark2` as well, or we can override `processWatermark(Watermark, int)`, like that for `processWatermarkStatus`. I also understand it that this PR is mainly responsible for introducing the epoch mechanism, and that we would have another jira ticket and PR to apply epoch to all events and all cases. So it is also OK for me if you would like to make the change in the next PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34470][Connectors/Kafka] Fix indefinite blocking by adjusting stopping condition in split reader [flink-connector-kafka]
LinMingQiang commented on code in PR #100: URL: https://github.com/apache/flink-connector-kafka/pull/100#discussion_r1600879571 ## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java: ## @@ -122,27 +122,21 @@ public RecordsWithSplitIds> fetch() throws IOExce KafkaPartitionSplitRecords recordsBySplits = new KafkaPartitionSplitRecords(consumerRecords, kafkaSourceReaderMetrics); List finishedPartitions = new ArrayList<>(); -for (TopicPartition tp : consumerRecords.partitions()) { +for (TopicPartition tp : consumer.assignment()) { long stoppingOffset = getStoppingOffset(tp); -final List> recordsFromPartition = -consumerRecords.records(tp); - -if (recordsFromPartition.size() > 0) { -final ConsumerRecord lastRecord = -recordsFromPartition.get(recordsFromPartition.size() - 1); - -// After processing a record with offset of "stoppingOffset - 1", the split reader -// should not continue fetching because the record with stoppingOffset may not -// exist. Keep polling will just block forever. -if (lastRecord.offset() >= stoppingOffset - 1) { -recordsBySplits.setPartitionStoppingOffset(tp, stoppingOffset); -finishSplitAtRecord( -tp, -stoppingOffset, -lastRecord.offset(), -finishedPartitions, -recordsBySplits); -} +long consumerPosition = consumer.position(tp); +// Stop fetching when the consumer's position reaches the stoppingOffset. +// Control messages may follow the last record; therefore, using the last record's +// offset as a stopping condition could result in indefinite blocking. +if (consumerPosition >= stoppingOffset) { +LOG.debug( +"Position of {}: {}, has reached stopping offset: {}", +tp, +consumerPosition, +stoppingOffset); +recordsBySplits.setPartitionStoppingOffset(tp, stoppingOffset); +finishSplitAtRecord( +tp, stoppingOffset, consumerPosition, finishedPartitions, recordsBySplits); } // Track this partition's record lag if it never appears before kafkaSourceReaderMetrics.maybeAddRecordsLagMetric(consumer, tp); Review Comment: we do not need to track tp when consumerRecords is empty. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34470][Connectors/Kafka] Fix indefinite blocking by adjusting stopping condition in split reader [flink-connector-kafka]
LinMingQiang commented on code in PR #100: URL: https://github.com/apache/flink-connector-kafka/pull/100#discussion_r1600878564 ## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java: ## @@ -122,27 +122,21 @@ public RecordsWithSplitIds> fetch() throws IOExce KafkaPartitionSplitRecords recordsBySplits = new KafkaPartitionSplitRecords(consumerRecords, kafkaSourceReaderMetrics); List finishedPartitions = new ArrayList<>(); -for (TopicPartition tp : consumerRecords.partitions()) { +for (TopicPartition tp : consumer.assignment()) { long stoppingOffset = getStoppingOffset(tp); -final List> recordsFromPartition = -consumerRecords.records(tp); - -if (recordsFromPartition.size() > 0) { -final ConsumerRecord lastRecord = -recordsFromPartition.get(recordsFromPartition.size() - 1); - -// After processing a record with offset of "stoppingOffset - 1", the split reader -// should not continue fetching because the record with stoppingOffset may not -// exist. Keep polling will just block forever. -if (lastRecord.offset() >= stoppingOffset - 1) { -recordsBySplits.setPartitionStoppingOffset(tp, stoppingOffset); -finishSplitAtRecord( -tp, -stoppingOffset, -lastRecord.offset(), -finishedPartitions, -recordsBySplits); -} +long consumerPosition = consumer.position(tp); +// Stop fetching when the consumer's position reaches the stoppingOffset. +// Control messages may follow the last record; therefore, using the last record's +// offset as a stopping condition could result in indefinite blocking. +if (consumerPosition >= stoppingOffset) { +LOG.debug( +"Position of {}: {}, has reached stopping offset: {}", +tp, +consumerPosition, +stoppingOffset); +recordsBySplits.setPartitionStoppingOffset(tp, stoppingOffset); +finishSplitAtRecord( +tp, stoppingOffset, consumerPosition, finishedPartitions, recordsBySplits); } // Track this partition's record lag if it never appears before kafkaSourceReaderMetrics.maybeAddRecordsLagMetric(consumer, tp); Review Comment: consumerRecords.partitions().forEach(trackTp -> { // Track this partition's record lag if it never appears before kafkaSourceReaderMetrics.maybeAddRecordsLagMetric(consumer, trackTp); }); -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35109] Drop support for Flink 1.17 & 1.18 and fix tests for 1.20-SNAPSHOT [flink-connector-kafka]
ruanhang1993 commented on code in PR #102: URL: https://github.com/apache/flink-connector-kafka/pull/102#discussion_r1600864186 ## .github/workflows/push_pr.yml: ## @@ -28,21 +28,16 @@ jobs: compile_and_test: strategy: matrix: -flink: [ 1.17.2 ] -jdk: [ '8, 11' ] -include: - - flink: 1.18.1 -jdk: '8, 11, 17' - - flink: 1.19.0 -jdk: '8, 11, 17, 21' +flink: [ 1.19.0, 1.20-SNAPSHOT ] +jdk: [ '8, 11, 17, 21' ] Review Comment: Hi, @MartijnVisser . I check these files in both jdbc connector[1] and mongodb connector[2]. It seems like they all use the Java 21 for Flink 1.19. Do I need to change them too? [1] https://github.com/apache/flink-connector-jdbc/blob/main/.github/workflows/weekly.yml [2] https://github.com/apache/flink-connector-mongodb/blob/main/.github/workflows/weekly.yml -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35109] Drop support for Flink 1.17 & 1.18 and fix tests for 1.20-SNAPSHOT [flink-connector-kafka]
ruanhang1993 commented on code in PR #102: URL: https://github.com/apache/flink-connector-kafka/pull/102#discussion_r1600860337 ## flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/TypeSerializerUpgradeTestBase.java: ## @@ -43,28 +40,19 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.ArrayList; import java.util.Collection; -import java.util.Set; +import java.util.List; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; -import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assumptions.assumeThat; -import static org.hamcrest.CoreMatchers.not; -/** - * A test base for testing {@link TypeSerializer} upgrades. - * - * You can run {@link #generateTestSetupFiles(TestSpecification)} on a Flink branch to - * (re-)generate the test data files. - */ +/** A test base for testing {@link TypeSerializer} upgrades. */ @TestInstance(TestInstance.Lifecycle.PER_CLASS) -public abstract class TypeSerializerUpgradeTestBase { - -public static final FlinkVersion CURRENT_VERSION = FlinkVersion.v1_17; - -public static final Set MIGRATION_VERSIONS = -FlinkVersion.rangeOf(FlinkVersion.v1_11, CURRENT_VERSION); +public abstract class TypeSerializerUpgradeTestBase Review Comment: Hi, @MartijnVisser . The changes for `TypeSerializerUpgradeTestBase` in [Flink#24603](https://github.com/apache/flink/pull/24603/files#) and [Flink#23960](https://github.com/apache/flink/pull/23960/files) make the kafka connector not be able to compile with both 1.20-SNAPSHOT and 1.19.0. I think we still have to maintain `TypeSerializerUpgradeTestBase`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35344][cdc-base] Move same code from multiple subclasses to JdbcSourceChunkSplitter [flink-cdc]
loserwang1024 commented on PR #3319: URL: https://github.com/apache/flink-cdc/pull/3319#issuecomment-2111475946 @PatrickRen , @ruanhang1993 , @leonardBang , CC -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35346) Introduce pluggable workflow scheduler interface for materialized table
[ https://issues.apache.org/jira/browse/FLINK-35346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35346: --- Labels: pull-request-available (was: ) > Introduce pluggable workflow scheduler interface for materialized table > --- > > Key: FLINK-35346 > URL: https://issues.apache.org/jira/browse/FLINK-35346 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.20.0 >Reporter: dalongliu >Assignee: dalongliu >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35346][table-common] Introduce workflow scheduler interface for materialized table [flink]
hackergin commented on code in PR #24767: URL: https://github.com/apache/flink/pull/24767#discussion_r1600850246 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/WorkflowScheduler.java: ## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.workflow; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.refresh.RefreshHandler; +import org.apache.flink.table.refresh.RefreshHandlerSerializer; + +/** + * This interface is used to interact with specific workflow scheduler services that support + * creating, modifying, and deleting refreshed workflow of Materialized Table. + * + * @param The type of {@link RefreshHandler} used by specific {@link WorkflowScheduler} to + * locate the refresh workflow in scheduler service. + */ +@PublicEvolving +public interface WorkflowScheduler { + +/** + * Open this workflow scheduler instance. Used for any required preparation in initialization + * phase. + * + * @throws WorkflowException if initializing workflow scheduler occur exception + */ +void open() throws WorkflowException; + +/** + * Close this workflow scheduler when it is no longer needed and release any resource that it + * might be holding. + * + * @throws WorkflowException if close the related resources of workflow scheduler failed + */ +void close() throws WorkflowException; + +/** + * Return a {@link RefreshHandlerSerializer} instance to serialize and deserialize {@link + * RefreshHandler} created by specific workflow scheduler service. + */ +RefreshHandlerSerializer getRefreshHandlerSerializer(); + +/** + * Create a refresh workflow in specific scheduler service for the materialized table, return a + * {@link RefreshHandler} instance which can locate the refresh workflow detail information. + * + * This method supports creating workflow for periodic refresh, as well as workflow for a + * one-time refresh only. + * + * @param createRefreshWorkflow The detail info for create refresh workflow of materialized + * table. + * @return The meta info which points to the refresh workflow in scheduler service. + * @throws WorkflowException if create refresh workflow failed Review Comment: ```suggestion * @throws WorkflowException if creating refresh workflow failed ``` ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/WorkflowSchedulerFactoryUtil.java: ## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DelegatingConfiguration; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.workflow.WorkflowScheduler; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION; +import static org.apache.flink.table.factories.Fa
Re: [PR] [FLINK-35318][table] use UTC timezone to handle TIMESTAMP_WITHOUT_TIM… [flink]
flinkbot commented on PR #24787: URL: https://github.com/apache/flink/pull/24787#issuecomment-2111466404 ## CI report: * 297b48d3db74056754600c1b01a5dfef0a0635f3 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35109] Drop support for Flink 1.17 & 1.18 and fix tests for 1.20-SNAPSHOT [flink-connector-kafka]
ruanhang1993 commented on code in PR #102: URL: https://github.com/apache/flink-connector-kafka/pull/102#discussion_r1600864186 ## .github/workflows/push_pr.yml: ## @@ -28,21 +28,16 @@ jobs: compile_and_test: strategy: matrix: -flink: [ 1.17.2 ] -jdk: [ '8, 11' ] -include: - - flink: 1.18.1 -jdk: '8, 11, 17' - - flink: 1.19.0 -jdk: '8, 11, 17, 21' +flink: [ 1.19.0, 1.20-SNAPSHOT ] +jdk: [ '8, 11, 17, 21' ] Review Comment: Hi, @MartijnVisser . I check these files in both jdbc connector and mongodb connector. It seems like they all use the Java 21 for Flink 1.19. Do I need to change them too? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35318) incorrect timezone handling for TIMESTAMP_WITH_LOCAL_TIME_ZONE type during predicate pushdown
[ https://issues.apache.org/jira/browse/FLINK-35318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35318: --- Labels: pull-request-available (was: ) > incorrect timezone handling for TIMESTAMP_WITH_LOCAL_TIME_ZONE type during > predicate pushdown > - > > Key: FLINK-35318 > URL: https://issues.apache.org/jira/browse/FLINK-35318 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.18.1 > Environment: flink version 1.18.1 > iceberg version 1.15.1 >Reporter: linshangquan >Assignee: linshangquan >Priority: Major > Labels: pull-request-available > Attachments: image-2024-05-09-14-06-58-007.png, > image-2024-05-09-14-09-38-453.png, image-2024-05-09-14-11-38-476.png, > image-2024-05-09-14-22-14-417.png, image-2024-05-09-14-22-59-370.png, > image-2024-05-09-18-52-03-741.png, image-2024-05-09-18-52-28-584.png > > > In our scenario, we have an Iceberg table that contains a column named 'time' > of the {{timestamptz}} data type. This column has 10 rows of data where the > 'time' value is {{'2024-04-30 07:00:00'}} expressed in the "Asia/Shanghai" > timezone. > !image-2024-05-09-14-06-58-007.png! > > We encountered a strange phenomenon when accessing the table using > Iceberg-flink. > When the {{WHERE}} clause includes the {{time}} column, the results are > incorrect. > ZoneId.{_}systemDefault{_}() = "Asia/Shanghai" > !image-2024-05-09-18-52-03-741.png! > When there is no {{WHERE}} clause, the results are correct. > !image-2024-05-09-18-52-28-584.png! > During debugging, we found that when a {{WHERE}} clause is present, a > {{FilterPushDownSpec}} is generated, and this {{FilterPushDownSpec}} utilizes > {{RexNodeToExpressionConverter}} for translation. > !image-2024-05-09-14-11-38-476.png! > !image-2024-05-09-14-22-59-370.png! > When {{RexNodeToExpressionConverter#visitLiteral}} encounters a > {{TIMESTAMP_WITH_LOCAL_TIME_ZONE}} type, it uses the specified timezone > "Asia/Shanghai" to convert the {{TimestampString}} type to an {{Instant}} > type. However, the upstream {{TimestampString}} data has already been > processed in UTC timezone. By applying the local timezone processing here, an > error occurs due to the mismatch in timezones. > Whether the handling of {{TIMESTAMP_WITH_LOCAL_TIME_ZONE}} type of data in > {{RexNodeToExpressionConverter#visitLiteral}} is a bug, and whether it should > process the data in UTC timezone. > > Please help confirm if this is the issue, and if so, we can submit a patch to > fix it. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-35318][table] use UTC timezone to handle TIMESTAMP_WITHOUT_TIM… [flink]
lshangq opened a new pull request, #24787: URL: https://github.com/apache/flink/pull/24787 …E_ZONE type in RexNodeToExpressionConverter ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change Please make sure both new and modified tests in this PR follow [the conventions for tests defined in our code quality guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing). *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35109] Drop support for Flink 1.17 & 1.18 and fix tests for 1.20-SNAPSHOT [flink-connector-kafka]
ruanhang1993 commented on code in PR #102: URL: https://github.com/apache/flink-connector-kafka/pull/102#discussion_r1600860337 ## flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/TypeSerializerUpgradeTestBase.java: ## @@ -43,28 +40,19 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.ArrayList; import java.util.Collection; -import java.util.Set; +import java.util.List; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; -import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assumptions.assumeThat; -import static org.hamcrest.CoreMatchers.not; -/** - * A test base for testing {@link TypeSerializer} upgrades. - * - * You can run {@link #generateTestSetupFiles(TestSpecification)} on a Flink branch to - * (re-)generate the test data files. - */ +/** A test base for testing {@link TypeSerializer} upgrades. */ @TestInstance(TestInstance.Lifecycle.PER_CLASS) -public abstract class TypeSerializerUpgradeTestBase { - -public static final FlinkVersion CURRENT_VERSION = FlinkVersion.v1_17; - -public static final Set MIGRATION_VERSIONS = -FlinkVersion.rangeOf(FlinkVersion.v1_11, CURRENT_VERSION); +public abstract class TypeSerializerUpgradeTestBase Review Comment: Hi, @MartijnVisser . The changes for `TypeSerializerUpgradeTestBase` in #24603 and #23960 make the kafka connector not be able to compile with both 1.20-SNAPSHOT and 1.19.0. I think we still have to maintain `TypeSerializerUpgradeTestBase`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34470][Connectors/Kafka] Fix indefinite blocking by adjusting stopping condition in split reader [flink-connector-kafka]
LinMingQiang commented on PR #100: URL: https://github.com/apache/flink-connector-kafka/pull/100#issuecomment-2111454911 its work , i had try. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35293) FLIP-445: Support dynamic parallelism inference for HiveSource
[ https://issues.apache.org/jira/browse/FLINK-35293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846473#comment-17846473 ] Rui Fan commented on FLINK-35293: - Hi [~xiasun] , I saw you added the release note for FLIP-445, would you mind recording FLIP-445 into the 1.20 release doc[1]? It's useful for release managers to follow it, thanks in advance. [1]https://cwiki.apache.org/confluence/display/FLINK/1.20+Release > FLIP-445: Support dynamic parallelism inference for HiveSource > -- > > Key: FLINK-35293 > URL: https://issues.apache.org/jira/browse/FLINK-35293 > Project: Flink > Issue Type: Improvement > Components: Connectors / Hive >Affects Versions: 1.20.0 >Reporter: xingbe >Assignee: xingbe >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > > [FLIP-379|https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs] > introduces dynamic source parallelism inference, which, compared to static > inference, utilizes runtime information to more accurately determine the > source parallelism. The FileSource already possesses the capability for > dynamic parallelism inference. As a follow-up task to FLIP-379, this FLIP > plans to implement the dynamic parallelism inference interface for > HiveSource, and also switches the default static parallelism inference to > dynamic parallelism inference. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-35353) Translate "Profiler" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-35353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-35353: --- Assignee: Juan Zifeng > Translate "Profiler" page into Chinese > --- > > Key: FLINK-35353 > URL: https://issues.apache.org/jira/browse/FLINK-35353 > Project: Flink > Issue Type: Improvement > Components: chinese-translation, Documentation >Affects Versions: 1.19.0 >Reporter: Juan Zifeng >Assignee: Juan Zifeng >Priority: Major > Fix For: 1.19.0 > > > The links are > https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/ops/debugging/profiler/ -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34379][table] Fix OutOfMemoryError with large queries [flink]
mumuhhh commented on code in PR #24600: URL: https://github.com/apache/flink/pull/24600#discussion_r1600843684 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java: ## @@ -236,6 +238,9 @@ private void setTables(ContextResolvedTable catalogTable) { tables.add(catalogTable); } else { for (ContextResolvedTable thisTable : new ArrayList<>(tables)) { +if (tables.contains(catalogTable)) { Review Comment: > I think we can use a boolean flag to check here, then we don't need to call contains method every time, it is O(N) time complexity. > > ``` > boolean hasAdded = false; > for (ContextResolvedTable thisTable : new ArrayList<>(tables)) { > if (hasAdded) { > break; > } > if (!thisTable.getIdentifier().equals(catalogTable.getIdentifier())) { > tables.add(catalogTable); > hasAdded = true; > } > } > ``` I think we should modify the traversal logic. ``` boolean hasAdded = false; for (ContextResolvedTable thisTable : new ArrayList<>(tables)) { if (thisTable.getIdentifier().equals(catalogTable.getIdentifier())) { hasAdded = true; break; } } if (!hasAdded) { tables.add(catalogTable); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34379][table] Fix OutOfMemoryError with large queries [flink]
mumuhhh commented on code in PR #24600: URL: https://github.com/apache/flink/pull/24600#discussion_r1600843684 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java: ## @@ -236,6 +238,9 @@ private void setTables(ContextResolvedTable catalogTable) { tables.add(catalogTable); } else { for (ContextResolvedTable thisTable : new ArrayList<>(tables)) { +if (tables.contains(catalogTable)) { Review Comment: > 我想我们可以在这里使用布尔标志来检查,那么我们不需要每次都调用方法,它是 O(N) 时间复杂度。`contains` > > ``` > boolean hasAdded = false; > for (ContextResolvedTable thisTable : new ArrayList<>(tables)) { > if (hasAdded) { > break; > } > if (!thisTable.getIdentifier().equals(catalogTable.getIdentifier())) { > tables.add(catalogTable); > hasAdded = true; > } > } > ``` I think we should modify the traversal logic. ``` boolean hasAdded = false; for (ContextResolvedTable thisTable : new ArrayList<>(tables)) { if (thisTable.getIdentifier().equals(catalogTable.getIdentifier())) { hasAdded = true; break; } } if (!hasAdded) { tables.add(catalogTable); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34379][table] Fix OutOfMemoryError with large queries [flink]
mumuhhh commented on code in PR #24600: URL: https://github.com/apache/flink/pull/24600#discussion_r1600841072 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java: ## @@ -115,7 +117,7 @@ private static class DppDimSideChecker { private final RelNode relNode; private boolean hasFilter; private boolean hasPartitionedScan; -private final List tables = new ArrayList<>(); +private final Set tables = new HashSet<>(); Review Comment: Why do we write traversal comparisons like that? ``` boolean hasAdded = false; for (ContextResolvedTable thisTable : new ArrayList<>(tables)) { if (thisTable.getIdentifier().equals(catalogTable.getIdentifier())) { hasAdded = true; break; } } if (!hasAdded) { tables.add(catalogTable); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] Fsip module 2b [flink-training]
manoellins opened a new pull request, #82: URL: https://github.com/apache/flink-training/pull/82 (no comment) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] feat(module-1): Implementing ride-cleansing and rides-and-fares [flink-training]
gerson23 closed pull request #81: feat(module-1): Implementing ride-cleansing and rides-and-fares URL: https://github.com/apache/flink-training/pull/81 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] feat(module-1): Implementing ride-cleansing and rides-and-fares [flink-training]
gerson23 opened a new pull request, #81: URL: https://github.com/apache/flink-training/pull/81 * Implemented ride-cleaning by filtering in rides start and ending within NYC coordinates * In rids-and-fares, TaxiRide and TaxiFare may come in different order, so we need to have a state value for each. State is cleaned as soon as it is paired. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35289) Incorrect timestamp of stream elements collected from onTimer in batch mode
[ https://issues.apache.org/jira/browse/FLINK-35289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846454#comment-17846454 ] Kanthi Vaidya commented on FLINK-35289: --- Any updates on this issue ? > Incorrect timestamp of stream elements collected from onTimer in batch mode > --- > > Key: FLINK-35289 > URL: https://issues.apache.org/jira/browse/FLINK-35289 > Project: Flink > Issue Type: Bug > Components: API / Core >Affects Versions: 1.18.1 >Reporter: Kanthi Vaidya >Priority: Major > > In batch mode all registered timers will fire at the _end of time. Given > this, if a user registers a timer for Long.MAX_VALUE, the timestamp assigned > to the elements that are collected from the onTimer context ends up being > Long.MAX_VALUE. Ideally this should be the time when the batch actually > executed the onTimer function._ -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-27741) Fix NPE when use dense_rank() and rank() in over aggregation
[ https://issues.apache.org/jira/browse/FLINK-27741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin resolved FLINK-27741. - Fix Version/s: 1.20.0 Resolution: Fixed > Fix NPE when use dense_rank() and rank() in over aggregation > > > Key: FLINK-27741 > URL: https://issues.apache.org/jira/browse/FLINK-27741 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.16.0 >Reporter: chenzihao >Assignee: chenzihao >Priority: Not a Priority > Labels: auto-deprioritized-major, auto-deprioritized-minor, > pull-request-available > Fix For: 1.20.0 > > > There has an 'NullPointException' when use RANK() and DENSE_RANK() in over > window. > {code:java} > @Test > def testDenseRankOnOver(): Unit = { > val t = failingDataSource(TestData.tupleData5) > .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) > tEnv.registerTable("MyTable", t) > val sqlQuery = "SELECT a, DENSE_RANK() OVER (PARTITION BY a ORDER BY > proctime) FROM MyTable" > val sink = new TestingAppendSink > tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink) > env.execute() > } > {code} > {code:java} > @Test > def testRankOnOver(): Unit = { > val t = failingDataSource(TestData.tupleData5) > .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) > tEnv.registerTable("MyTable", t) > val sqlQuery = "SELECT a, RANK() OVER (PARTITION BY a ORDER BY proctime) > FROM MyTable" > val sink = new TestingAppendSink > tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink) > env.execute() > } > {code} > Exception Info: > {code:java} > java.lang.NullPointerException > at > scala.collection.mutable.ArrayOps$ofInt$.length$extension(ArrayOps.scala:248) > at scala.collection.mutable.ArrayOps$ofInt.length(ArrayOps.scala:248) > at scala.collection.SeqLike.size(SeqLike.scala:104) > at scala.collection.SeqLike.size$(SeqLike.scala:104) > at scala.collection.mutable.ArrayOps$ofInt.size(ArrayOps.scala:242) > at > scala.collection.IndexedSeqLike.sizeHintIfCheap(IndexedSeqLike.scala:95) > at > scala.collection.IndexedSeqLike.sizeHintIfCheap$(IndexedSeqLike.scala:95) > at > scala.collection.mutable.ArrayOps$ofInt.sizeHintIfCheap(ArrayOps.scala:242) > at scala.collection.mutable.Builder.sizeHint(Builder.scala:77) > at scala.collection.mutable.Builder.sizeHint$(Builder.scala:76) > at scala.collection.mutable.ArrayBuilder.sizeHint(ArrayBuilder.scala:21) > at scala.collection.TraversableLike.builder$1(TraversableLike.scala:229) > at scala.collection.TraversableLike.map(TraversableLike.scala:232) > at scala.collection.TraversableLike.map$(TraversableLike.scala:226) > at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:242) > at > org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createDenseRankAggFunction(AggFunctionFactory.scala:454) > at > org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createAggFunction(AggFunctionFactory.scala:94) > at > org.apache.flink.table.planner.plan.utils.AggregateUtil$.$anonfun$transformToAggregateInfoList$1(AggregateUtil.scala:445) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) > at > scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58) > at > scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at scala.collection.TraversableLike.map(TraversableLike.scala:233) > at scala.collection.TraversableLike.map$(TraversableLike.scala:226) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToAggregateInfoList(AggregateUtil.scala:435) > at > org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToStreamAggregateInfoList(AggregateUtil.scala:381) > at > org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToStreamAggregateInfoList(AggregateUtil.scala:361) > at > org.apache.flink.table.planner.plan.utils.AggregateUtil.transformToStreamAggregateInfoList(AggregateUtil.scala) > at > org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate.createUnboundedOverProcessFunction(StreamExecOverAggregate.java:279) > at > org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.java:198) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:148) > at > org.apach
[jira] [Commented] (FLINK-27741) Fix NPE when use dense_rank() and rank() in over aggregation
[ https://issues.apache.org/jira/browse/FLINK-27741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846452#comment-17846452 ] Sergey Nuyanzin commented on FLINK-27741: - Merged to master as [40fb49dd17b3e1b6c5aa0249514273730ebe9226|https://github.com/apache/flink/commit/40fb49dd17b3e1b6c5aa0249514273730ebe9226] > Fix NPE when use dense_rank() and rank() in over aggregation > > > Key: FLINK-27741 > URL: https://issues.apache.org/jira/browse/FLINK-27741 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.16.0 >Reporter: chenzihao >Assignee: chenzihao >Priority: Not a Priority > Labels: auto-deprioritized-major, auto-deprioritized-minor, > pull-request-available > Fix For: 1.20.0 > > > There has an 'NullPointException' when use RANK() and DENSE_RANK() in over > window. > {code:java} > @Test > def testDenseRankOnOver(): Unit = { > val t = failingDataSource(TestData.tupleData5) > .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) > tEnv.registerTable("MyTable", t) > val sqlQuery = "SELECT a, DENSE_RANK() OVER (PARTITION BY a ORDER BY > proctime) FROM MyTable" > val sink = new TestingAppendSink > tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink) > env.execute() > } > {code} > {code:java} > @Test > def testRankOnOver(): Unit = { > val t = failingDataSource(TestData.tupleData5) > .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) > tEnv.registerTable("MyTable", t) > val sqlQuery = "SELECT a, RANK() OVER (PARTITION BY a ORDER BY proctime) > FROM MyTable" > val sink = new TestingAppendSink > tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink) > env.execute() > } > {code} > Exception Info: > {code:java} > java.lang.NullPointerException > at > scala.collection.mutable.ArrayOps$ofInt$.length$extension(ArrayOps.scala:248) > at scala.collection.mutable.ArrayOps$ofInt.length(ArrayOps.scala:248) > at scala.collection.SeqLike.size(SeqLike.scala:104) > at scala.collection.SeqLike.size$(SeqLike.scala:104) > at scala.collection.mutable.ArrayOps$ofInt.size(ArrayOps.scala:242) > at > scala.collection.IndexedSeqLike.sizeHintIfCheap(IndexedSeqLike.scala:95) > at > scala.collection.IndexedSeqLike.sizeHintIfCheap$(IndexedSeqLike.scala:95) > at > scala.collection.mutable.ArrayOps$ofInt.sizeHintIfCheap(ArrayOps.scala:242) > at scala.collection.mutable.Builder.sizeHint(Builder.scala:77) > at scala.collection.mutable.Builder.sizeHint$(Builder.scala:76) > at scala.collection.mutable.ArrayBuilder.sizeHint(ArrayBuilder.scala:21) > at scala.collection.TraversableLike.builder$1(TraversableLike.scala:229) > at scala.collection.TraversableLike.map(TraversableLike.scala:232) > at scala.collection.TraversableLike.map$(TraversableLike.scala:226) > at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:242) > at > org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createDenseRankAggFunction(AggFunctionFactory.scala:454) > at > org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createAggFunction(AggFunctionFactory.scala:94) > at > org.apache.flink.table.planner.plan.utils.AggregateUtil$.$anonfun$transformToAggregateInfoList$1(AggregateUtil.scala:445) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) > at > scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58) > at > scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at scala.collection.TraversableLike.map(TraversableLike.scala:233) > at scala.collection.TraversableLike.map$(TraversableLike.scala:226) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToAggregateInfoList(AggregateUtil.scala:435) > at > org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToStreamAggregateInfoList(AggregateUtil.scala:381) > at > org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToStreamAggregateInfoList(AggregateUtil.scala:361) > at > org.apache.flink.table.planner.plan.utils.AggregateUtil.transformToStreamAggregateInfoList(AggregateUtil.scala) > at > org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate.createUnboundedOverProcessFunction(StreamExecOverAggregate.java:279) > at > org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggrega
[jira] [Assigned] (FLINK-27741) Fix NPE when use dense_rank() and rank() in over aggregation
[ https://issues.apache.org/jira/browse/FLINK-27741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin reassigned FLINK-27741: --- Assignee: chenzihao > Fix NPE when use dense_rank() and rank() in over aggregation > > > Key: FLINK-27741 > URL: https://issues.apache.org/jira/browse/FLINK-27741 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.16.0 >Reporter: chenzihao >Assignee: chenzihao >Priority: Not a Priority > Labels: auto-deprioritized-major, auto-deprioritized-minor, > pull-request-available > > There has an 'NullPointException' when use RANK() and DENSE_RANK() in over > window. > {code:java} > @Test > def testDenseRankOnOver(): Unit = { > val t = failingDataSource(TestData.tupleData5) > .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) > tEnv.registerTable("MyTable", t) > val sqlQuery = "SELECT a, DENSE_RANK() OVER (PARTITION BY a ORDER BY > proctime) FROM MyTable" > val sink = new TestingAppendSink > tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink) > env.execute() > } > {code} > {code:java} > @Test > def testRankOnOver(): Unit = { > val t = failingDataSource(TestData.tupleData5) > .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) > tEnv.registerTable("MyTable", t) > val sqlQuery = "SELECT a, RANK() OVER (PARTITION BY a ORDER BY proctime) > FROM MyTable" > val sink = new TestingAppendSink > tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink) > env.execute() > } > {code} > Exception Info: > {code:java} > java.lang.NullPointerException > at > scala.collection.mutable.ArrayOps$ofInt$.length$extension(ArrayOps.scala:248) > at scala.collection.mutable.ArrayOps$ofInt.length(ArrayOps.scala:248) > at scala.collection.SeqLike.size(SeqLike.scala:104) > at scala.collection.SeqLike.size$(SeqLike.scala:104) > at scala.collection.mutable.ArrayOps$ofInt.size(ArrayOps.scala:242) > at > scala.collection.IndexedSeqLike.sizeHintIfCheap(IndexedSeqLike.scala:95) > at > scala.collection.IndexedSeqLike.sizeHintIfCheap$(IndexedSeqLike.scala:95) > at > scala.collection.mutable.ArrayOps$ofInt.sizeHintIfCheap(ArrayOps.scala:242) > at scala.collection.mutable.Builder.sizeHint(Builder.scala:77) > at scala.collection.mutable.Builder.sizeHint$(Builder.scala:76) > at scala.collection.mutable.ArrayBuilder.sizeHint(ArrayBuilder.scala:21) > at scala.collection.TraversableLike.builder$1(TraversableLike.scala:229) > at scala.collection.TraversableLike.map(TraversableLike.scala:232) > at scala.collection.TraversableLike.map$(TraversableLike.scala:226) > at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:242) > at > org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createDenseRankAggFunction(AggFunctionFactory.scala:454) > at > org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createAggFunction(AggFunctionFactory.scala:94) > at > org.apache.flink.table.planner.plan.utils.AggregateUtil$.$anonfun$transformToAggregateInfoList$1(AggregateUtil.scala:445) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) > at > scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58) > at > scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at scala.collection.TraversableLike.map(TraversableLike.scala:233) > at scala.collection.TraversableLike.map$(TraversableLike.scala:226) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToAggregateInfoList(AggregateUtil.scala:435) > at > org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToStreamAggregateInfoList(AggregateUtil.scala:381) > at > org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToStreamAggregateInfoList(AggregateUtil.scala:361) > at > org.apache.flink.table.planner.plan.utils.AggregateUtil.transformToStreamAggregateInfoList(AggregateUtil.scala) > at > org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate.createUnboundedOverProcessFunction(StreamExecOverAggregate.java:279) > at > org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.java:198) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:148) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.transla
Re: [PR] [BP-1.19][FLINK-27741][table-planner] Fix NPE when use dense_rank() and rank() [flink]
flinkbot commented on PR #24786: URL: https://github.com/apache/flink/pull/24786#issuecomment-272825 ## CI report: * c83d644fa3eb0bce139ecd651719570de94eadd5 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34896][table] Migrate CorrelateSortToRankRule to java [flink]
snuyanzin commented on code in PR #24545: URL: https://github.com/apache/flink/pull/24545#discussion_r1600671064 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/CorrelateSortToRankRule.java: ## @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.table.planner.calcite.FlinkRelBuilder; +import org.apache.flink.table.planner.calcite.FlinkRelFactories; +import org.apache.flink.table.runtime.operators.rank.ConstantRankRange; +import org.apache.flink.table.runtime.operators.rank.RankType; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.plan.hep.HepPlanner; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelCollations; +import org.apache.calcite.rel.RelFieldCollation; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.core.Correlate; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.core.Sort; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexCorrelVariable; +import org.apache.calcite.rex.RexFieldAccess; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.util.ImmutableBitSet; +import org.immutables.value.Value; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Planner rule that rewrites sort correlation to a Rank. Typically, the following plan + * + * {@code + * LogicalProject(state=[$0], name=[$1]) + * +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0}]) + *:- LogicalAggregate(group=[{0}]) + *: +- LogicalProject(state=[$1]) + *: +- LogicalTableScan(table=[[default_catalog, default_database, cities]]) + *+- LogicalSort(sort0=[$1], dir0=[DESC-nulls-last], fetch=[3]) + * +- LogicalProject(name=[$0], pop=[$2]) + * +- LogicalFilter(condition=[=($1, $cor0.state)]) + * +- LogicalTableScan(table=[[default_catalog, default_database, cities]]) + * } + * + * would be transformed to + * + * {@code + * LogicalProject(state=[$0], name=[$1]) + * +- LogicalProject(state=[$1], name=[$0], pop=[$2]) + * +- LogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=3], + * partitionBy=[$1], orderBy=[$2 DESC], select=[name=$0, state=$1, pop=$2]) + *+- LogicalTableScan(table=[[default_catalog, default_database, cities]]) + * } + * + * To match the Correlate, the LHS needs to be a global Aggregate on a scan, the RHS should be a + * Sort with an equal Filter predicate whose keys are same with the LHS grouping keys. + * + * This rule can only be used in {@link HepPlanner}. + */ +@Value.Enclosing +public class CorrelateSortToRankRule +extends RelRule { + +public static final CorrelateSortToRankRule INSTANCE = + CorrelateSortToRankRule.CorrelateSortToRankRuleConfig.DEFAULT.toRule(); + +protected CorrelateSortToRankRule(CorrelateSortToRankRuleConfig config) { +super(config); +} + +@Override +public boolean matches(RelOptRuleCall call) { +Correlate correlate = call.rel(0); +if (correlate.getJoinType() != JoinRelType.INNER) { +return false; +} +Aggregate agg = call.rel(1); +if (!agg.getAggCallList().isEmpty() || agg.getGroupSets().size() > 1) { +return false; +} +Project aggInput = call.rel(2); +if (!aggInput.isMapping()) { +return false; +} +Sort sort = call.rel(3); +if (sort.offset != null || sort.fetch == null) { +// 1. we can not descr
Re: [PR] [BP-1.18][FLINK-27741][table-planner] Fix NPE when use dense_rank() and rank() [flink]
flinkbot commented on PR #24785: URL: https://github.com/apache/flink/pull/24785#issuecomment-267273 ## CI report: * 7122ef0fcce7c7a5419450fb40ec1e773c223642 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-27741][table-planner] Fix NPE when use dense_rank() and rank()… [flink]
snuyanzin commented on PR #19797: URL: https://github.com/apache/flink/pull/19797#issuecomment-253012 Thanks for the review @xuyangzhong -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-27741][table-planner] Fix NPE when use dense_rank() and rank()… [flink]
snuyanzin closed pull request #19797: [FLINK-27741][table-planner] Fix NPE when use dense_rank() and rank()… URL: https://github.com/apache/flink/pull/19797 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-24298) Refactor Google PubSub sink to use Unified Sink API
[ https://issues.apache.org/jira/browse/FLINK-24298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846426#comment-17846426 ] Ahmed Hamdy commented on FLINK-24298: - [~martijnvisser] I am happy to work on this, to complete migration of sinks using deprecated {{SinkFunction}}. Could you please assign to me? > Refactor Google PubSub sink to use Unified Sink API > --- > > Key: FLINK-24298 > URL: https://issues.apache.org/jira/browse/FLINK-24298 > Project: Flink > Issue Type: Sub-task >Reporter: Martijn Visser >Priority: Major > Labels: pull-request-available > > Refactor Google PubSub source to use Unified Sink API > [FLIP-143|https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API] -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]
JingGe commented on code in PR #24754: URL: https://github.com/apache/flink/pull/24754#discussion_r1600605397 ## docs/content/docs/deployment/advanced/job_status_listener.md: ## @@ -0,0 +1,81 @@ + +--- +title: "Job Status Changed Listener" +nav-title: job-status-listener +nav-parent_id: advanced +nav-pos: 3 +--- + + +## Job status changed listener +Flink provides a pluggable interface for users to register their custom logic for handling with the job status changes in which lineage info about source/sink is provided. Review Comment: If I am not mistaken, you are implementing the second part of FLIP-314. Does it make sense to update the Jira ticket and the PR description with the info and the content of this md? It will be easier for others to quickly understand the context and join the discussion/review. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]
vahmed-hamdy commented on PR #27: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/27#issuecomment-2111082697 @snuyanzin would you mind taking a look, We want to complete this migration since the `SinkFuction` is deprecated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35333) JdbcXaSinkTestBase fails in weekly Flink JDBC Connector tests
[ https://issues.apache.org/jira/browse/FLINK-35333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846380#comment-17846380 ] João Boto commented on FLINK-35333: --- on #120 I try to fix the error, but another error appears so I believe that remove the check for v3.1 is the best solution (done on #121) I left the two PRs open so we can evaluate how to solve this. > JdbcXaSinkTestBase fails in weekly Flink JDBC Connector tests > - > > Key: FLINK-35333 > URL: https://issues.apache.org/jira/browse/FLINK-35333 > Project: Flink > Issue Type: Bug > Components: Connectors / JDBC >Affects Versions: jdbc-3.2.0 >Reporter: Martijn Visser >Priority: Blocker > Labels: pull-request-available, test-stability > > https://github.com/apache/flink-connector-jdbc/actions/runs/9047366679/job/24859224407#step:15:147 > {code:java} > Error: Failed to execute goal > org.apache.maven.plugins:maven-compiler-plugin:3.8.0:testCompile > (default-testCompile) on project flink-connector-jdbc: Compilation failure > Error: > /home/runner/work/flink-connector-jdbc/flink-connector-jdbc/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java:[164,37] > is not > abstract and does not override abstract method getTaskInfo() in > org.apache.flink.api.common.functions.RuntimeContext > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35333] Remove weekly check for v3.1 with 1.19 [flink-connector-jdbc]
eskabetxe commented on PR #121: URL: https://github.com/apache/flink-connector-jdbc/pull/121#issuecomment-2110737886 @MartijnVisser I think this is the best solution -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-35333] Remove weekly check for v3.1 with 1.19 [flink-connector-jdbc]
eskabetxe opened a new pull request, #121: URL: https://github.com/apache/flink-connector-jdbc/pull/121 the branch cut for v3.1 was not prepared for 1.19 versions in another PR we introduce this check, but I think that should be avoided as the cut is not prepared for that.. to prepare the branch to 1.19 we will have to bring some changes that are in master to solve some 1.19 problems -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-35333) JdbcXaSinkTestBase fails in weekly Flink JDBC Connector tests
[ https://issues.apache.org/jira/browse/FLINK-35333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846373#comment-17846373 ] João Boto edited comment on FLINK-35333 at 5/14/24 4:42 PM: I think I resolve the problem, but on v3.1 cut the branch was not prepared for 1.19 I introduce this change on weekly workflow with the new Sink, I know I don't know if was ok.. {code:json} { flink: 1.19.0, jdk: '8, 11, 17, 21', branch: v3.1 } {code} Should we avoid add new version checks on weekly that the cut was not prepared for?? The simple solution is to remove that from weekly file from master as v3.1 cut was not prepared for 1.19 versions was (Author: eskabetxe): I think I resolve the problem, but on v3.1 cut the branch was not prepared for 1.19 I introduce this change on weekly workflow with the new Sink, I know I don't know if was ok.. {code:json} { flink: 1.19.0, jdk: '8, 11, 17, 21', branch: v3.1 } {code} Should we avoid add new version checks on weekly that the cut was not prepared for?? > JdbcXaSinkTestBase fails in weekly Flink JDBC Connector tests > - > > Key: FLINK-35333 > URL: https://issues.apache.org/jira/browse/FLINK-35333 > Project: Flink > Issue Type: Bug > Components: Connectors / JDBC >Affects Versions: jdbc-3.2.0 >Reporter: Martijn Visser >Priority: Blocker > Labels: pull-request-available, test-stability > > https://github.com/apache/flink-connector-jdbc/actions/runs/9047366679/job/24859224407#step:15:147 > {code:java} > Error: Failed to execute goal > org.apache.maven.plugins:maven-compiler-plugin:3.8.0:testCompile > (default-testCompile) on project flink-connector-jdbc: Compilation failure > Error: > /home/runner/work/flink-connector-jdbc/flink-connector-jdbc/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java:[164,37] > is not > abstract and does not override abstract method getTaskInfo() in > org.apache.flink.api.common.functions.RuntimeContext > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35333) JdbcXaSinkTestBase fails in weekly Flink JDBC Connector tests
[ https://issues.apache.org/jira/browse/FLINK-35333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846373#comment-17846373 ] João Boto commented on FLINK-35333: --- I think I resolve the problem, but on v3.1 cut the branch was not prepared for 1.19 I introduce this change on weekly workflow with the new Sink, I know I don't know if was ok.. {code:json} { flink: 1.19.0, jdk: '8, 11, 17, 21', branch: v3.1 } {code} Should we avoid add new version checks on weekly that the cut was not prepared for?? > JdbcXaSinkTestBase fails in weekly Flink JDBC Connector tests > - > > Key: FLINK-35333 > URL: https://issues.apache.org/jira/browse/FLINK-35333 > Project: Flink > Issue Type: Bug > Components: Connectors / JDBC >Affects Versions: jdbc-3.2.0 >Reporter: Martijn Visser >Priority: Blocker > Labels: pull-request-available, test-stability > > https://github.com/apache/flink-connector-jdbc/actions/runs/9047366679/job/24859224407#step:15:147 > {code:java} > Error: Failed to execute goal > org.apache.maven.plugins:maven-compiler-plugin:3.8.0:testCompile > (default-testCompile) on project flink-connector-jdbc: Compilation failure > Error: > /home/runner/work/flink-connector-jdbc/flink-connector-jdbc/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java:[164,37] > is not > abstract and does not override abstract method getTaskInfo() in > org.apache.flink.api.common.functions.RuntimeContext > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35333) JdbcXaSinkTestBase fails in weekly Flink JDBC Connector tests
[ https://issues.apache.org/jira/browse/FLINK-35333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35333: --- Labels: pull-request-available test-stability (was: test-stability) > JdbcXaSinkTestBase fails in weekly Flink JDBC Connector tests > - > > Key: FLINK-35333 > URL: https://issues.apache.org/jira/browse/FLINK-35333 > Project: Flink > Issue Type: Bug > Components: Connectors / JDBC >Affects Versions: jdbc-3.2.0 >Reporter: Martijn Visser >Priority: Blocker > Labels: pull-request-available, test-stability > > https://github.com/apache/flink-connector-jdbc/actions/runs/9047366679/job/24859224407#step:15:147 > {code:java} > Error: Failed to execute goal > org.apache.maven.plugins:maven-compiler-plugin:3.8.0:testCompile > (default-testCompile) on project flink-connector-jdbc: Compilation failure > Error: > /home/runner/work/flink-connector-jdbc/flink-connector-jdbc/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java:[164,37] > is not > abstract and does not override abstract method getTaskInfo() in > org.apache.flink.api.common.functions.RuntimeContext > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35333) JdbcXaSinkTestBase fails in weekly Flink JDBC Connector tests
[ https://issues.apache.org/jira/browse/FLINK-35333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846370#comment-17846370 ] João Boto commented on FLINK-35333: --- [~martijnvisser] could you assign this to me.. > JdbcXaSinkTestBase fails in weekly Flink JDBC Connector tests > - > > Key: FLINK-35333 > URL: https://issues.apache.org/jira/browse/FLINK-35333 > Project: Flink > Issue Type: Bug > Components: Connectors / JDBC >Affects Versions: jdbc-3.2.0 >Reporter: Martijn Visser >Priority: Blocker > Labels: pull-request-available, test-stability > > https://github.com/apache/flink-connector-jdbc/actions/runs/9047366679/job/24859224407#step:15:147 > {code:java} > Error: Failed to execute goal > org.apache.maven.plugins:maven-compiler-plugin:3.8.0:testCompile > (default-testCompile) on project flink-connector-jdbc: Compilation failure > Error: > /home/runner/work/flink-connector-jdbc/flink-connector-jdbc/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java:[164,37] > is not > abstract and does not override abstract method getTaskInfo() in > org.apache.flink.api.common.functions.RuntimeContext > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-35333] Fix JdbcXaSinkTestBase fails in weekly workflows [flink-connector-jdbc]
eskabetxe opened a new pull request, #120: URL: https://github.com/apache/flink-connector-jdbc/pull/120 Fix JdbcXaSinkTestBase fails in weekly workflows -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-30284][flink-metrics] Modify configuration of DataDog http reporter url. [flink]
anleib commented on PR #23610: URL: https://github.com/apache/flink/pull/23610#issuecomment-2110657281 @Wosin can you please fix formatting. Would like to push this for a merge; my org recently migrated to US3 and broke all Flink metrics/dashboards/alerts. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-24298) Refactor Google PubSub sink to use Unified Sink API
[ https://issues.apache.org/jira/browse/FLINK-24298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-24298: --- Labels: pull-request-available (was: ) > Refactor Google PubSub sink to use Unified Sink API > --- > > Key: FLINK-24298 > URL: https://issues.apache.org/jira/browse/FLINK-24298 > Project: Flink > Issue Type: Sub-task >Reporter: Martijn Visser >Priority: Major > Labels: pull-request-available > > Refactor Google PubSub source to use Unified Sink API > [FLIP-143|https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]
vahmed-hamdy opened a new pull request, #27: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/27 ## Purpose of the change Add implementation of `PubSubV2Sink` extending new `Sink` API. ## Verifying this change This change added tests and can be verified as follows: - Added unit tests - Added Integration tests ## Dependency Updates - Updated least compatible Flink version to 1.19 ## Documentation - Added Documentation -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35328] AutoScale supports setting the maximum floating parallelism by the number of Pulsar partitions [flink-kubernetes-operator]
gyfora commented on code in PR #827: URL: https://github.com/apache/flink-kubernetes-operator/pull/827#discussion_r1600288388 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java: ## @@ -247,17 +249,36 @@ protected JobTopology getJobTopology(JobDetailsInfo jobDetailsInfo) { json, slotSharingGroupIdMap, maxParallelismMap, metrics, finished); } -private void updateKafkaSourceMaxParallelisms(Context ctx, JobID jobId, JobTopology topology) -throws Exception { +private void updateKafkaPulsarSourceMaxParallelisms( +Context ctx, JobID jobId, JobTopology topology) throws Exception { try (var restClient = ctx.getRestClusterClient()) { -var partitionRegex = Pattern.compile("^.*\\.partition\\.\\d+\\.currentOffset$"); +Pattern partitionRegex = +Pattern.compile( + "^.*\\.KafkaSourceReader\\.topic\\.(?.+)\\.partition\\.(?\\d+)\\.currentOffset$" ++ "|^.*\\.PulsarConsumer\\.(?.+)-partition-(?\\d+)\\..*\\.numMsgsReceived$"); for (var vertexInfo : topology.getVertexInfos().values()) { if (vertexInfo.getInputs().isEmpty()) { var sourceVertex = vertexInfo.getId(); var numPartitions = queryAggregatedMetricNames(restClient, jobId, sourceVertex).stream() -.filter(partitionRegex.asMatchPredicate()) -.count(); +.map( +v -> { +Matcher matcher = partitionRegex.matcher(v); +if (matcher.matches()) { +String kafkaTopic = matcher.group("kafkaTopic"); +String kafkaId = matcher.group("kafkaId"); +String pulsarTopic = + matcher.group("pulsarTopic"); +String pulsarId = matcher.group("pulsarId"); +return kafkaTopic != null +? kafkaTopic + "-" + kafkaId +: pulsarTopic + "-" + pulsarId; Review Comment: here again I would prefer to completely separate the logic for the pulser/kafka counting as the logic doesn't really overlap and this is just more confusing -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35328] AutoScale supports setting the maximum floating parallelism by the number of Pulsar partitions [flink-kubernetes-operator]
gyfora commented on code in PR #827: URL: https://github.com/apache/flink-kubernetes-operator/pull/827#discussion_r1600286605 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java: ## @@ -247,17 +249,36 @@ protected JobTopology getJobTopology(JobDetailsInfo jobDetailsInfo) { json, slotSharingGroupIdMap, maxParallelismMap, metrics, finished); } -private void updateKafkaSourceMaxParallelisms(Context ctx, JobID jobId, JobTopology topology) -throws Exception { +private void updateKafkaPulsarSourceMaxParallelisms( +Context ctx, JobID jobId, JobTopology topology) throws Exception { try (var restClient = ctx.getRestClusterClient()) { -var partitionRegex = Pattern.compile("^.*\\.partition\\.\\d+\\.currentOffset$"); +Pattern partitionRegex = +Pattern.compile( + "^.*\\.KafkaSourceReader\\.topic\\.(?.+)\\.partition\\.(?\\d+)\\.currentOffset$" ++ "|^.*\\.PulsarConsumer\\.(?.+)-partition-(?\\d+)\\..*\\.numMsgsReceived$"); for (var vertexInfo : topology.getVertexInfos().values()) { if (vertexInfo.getInputs().isEmpty()) { var sourceVertex = vertexInfo.getId(); var numPartitions = queryAggregatedMetricNames(restClient, jobId, sourceVertex).stream() -.filter(partitionRegex.asMatchPredicate()) -.count(); Review Comment: I wonder if we should simply have 2 separate regex / predicates and just `or(...)` them together to have a cleaner logic here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-34931) Update Kudu DataStream connector to use Sink V2
[ https://issues.apache.org/jira/browse/FLINK-34931?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ferenc Csaky updated FLINK-34931: - Summary: Update Kudu DataStream connector to use Sink V2 (was: Update Kudu connector DataStream Sink implementation) > Update Kudu DataStream connector to use Sink V2 > --- > > Key: FLINK-34931 > URL: https://issues.apache.org/jira/browse/FLINK-34931 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Kudu >Reporter: Ferenc Csaky >Assignee: Ferenc Csaky >Priority: Major > Labels: pull-request-available > > Update the DataSource API classes to use the current interfaces. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34931) Update Kudu connector DataStream Sink implementation
[ https://issues.apache.org/jira/browse/FLINK-34931?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-34931: --- Labels: pull-request-available (was: ) > Update Kudu connector DataStream Sink implementation > > > Key: FLINK-34931 > URL: https://issues.apache.org/jira/browse/FLINK-34931 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Kudu >Reporter: Ferenc Csaky >Assignee: Ferenc Csaky >Priority: Major > Labels: pull-request-available > > Update the DataSource API classes to use the current interfaces. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-27741][table-planner] Fix NPE when use dense_rank() and rank()… [flink]
xuyangzhong commented on code in PR #19797: URL: https://github.com/apache/flink/pull/19797#discussion_r1600169931 ## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.scala: ## @@ -47,6 +48,18 @@ class OverAggregateTest extends TableTestBase { util.verifyExecPlan("SELECT c, SUM(a) OVER (ORDER BY b) FROM MyTable") } + @Test + def testDenseRankOnOrder(): Unit = { Review Comment: Oh, I missed it. Pardon me... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-27741][table-planner] Fix NPE when use dense_rank() and rank()… [flink]
snuyanzin commented on code in PR #19797: URL: https://github.com/apache/flink/pull/19797#discussion_r1600155947 ## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.scala: ## @@ -47,6 +48,18 @@ class OverAggregateTest extends TableTestBase { util.verifyExecPlan("SELECT c, SUM(a) OVER (ORDER BY b) FROM MyTable") } + @Test + def testDenseRankOnOrder(): Unit = { Review Comment: Besides plans there are already a test in `OverAggregateITCase. scala` both for `RANK` and `DENSE_RANK`, that's way I have this question https://github.com/apache/flink/pull/19797/files#diff-0fa625835219d2a3fcacb2fa8de5274ed6795f10b06035c6c9b68d11b700f9a8R198-R203 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-27741][table-planner] Fix NPE when use dense_rank() and rank()… [flink]
xuyangzhong commented on code in PR #19797: URL: https://github.com/apache/flink/pull/19797#discussion_r1600130717 ## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.scala: ## @@ -47,6 +48,18 @@ class OverAggregateTest extends TableTestBase { util.verifyExecPlan("SELECT c, SUM(a) OVER (ORDER BY b) FROM MyTable") } + @Test + def testDenseRankOnOrder(): Unit = { Review Comment: What I mean is to add a test `testRankOnOver` in` OverAggregateITCase. scala` to test the function `Rank()`. Because the UT tests about plan here cannot test the modified part (when I reversed the changes in `AggFunctionFactory.scala`, these tests still passed). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35342) MaterializedTableStatementITCase test can check for wrong status
[ https://issues.apache.org/jira/browse/FLINK-35342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846319#comment-17846319 ] Ryan Skraba commented on FLINK-35342: - Thank you for the fix! Just to be complete, this failure occurred before the fix was merged: * 1.20 AdaptiveScheduler / Test (module: table) https://github.com/apache/flink/actions/runs/9072668322/job/24928769693#step:10:12490 > MaterializedTableStatementITCase test can check for wrong status > > > Key: FLINK-35342 > URL: https://issues.apache.org/jira/browse/FLINK-35342 > Project: Flink > Issue Type: Bug >Affects Versions: 1.20.0 >Reporter: Ryan Skraba >Priority: Critical > Labels: test-stability > > * 1.20 AdaptiveScheduler / Test (module: table) > https://github.com/apache/flink/actions/runs/9056197319/job/24879135605#step:10:12490 > > It looks like > {{MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume}} > can be flaky, where the expected status is not yet RUNNING: > {code} > Error: 03:24:03 03:24:03.902 [ERROR] Tests run: 6, Failures: 1, Errors: 0, > Skipped: 0, Time elapsed: 26.78 s <<< FAILURE! -- in > org.apache.flink.table.gateway.service.MaterializedTableStatementITCase > Error: 03:24:03 03:24:03.902 [ERROR] > org.apache.flink.table.gateway.service.MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume(Path, > RestClusterClient) -- Time elapsed: 3.850 s <<< FAILURE! > May 13 03:24:03 org.opentest4j.AssertionFailedError: > May 13 03:24:03 > May 13 03:24:03 expected: "RUNNING" > May 13 03:24:03 but was: "CREATED" > May 13 03:24:03 at > sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > May 13 03:24:03 at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > May 13 03:24:03 at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > May 13 03:24:03 at > org.apache.flink.table.gateway.service.MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume(MaterializedTableStatementITCase.java:650) > May 13 03:24:03 at java.lang.reflect.Method.invoke(Method.java:498) > May 13 03:24:03 at > java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189) > May 13 03:24:03 at > java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) > May 13 03:24:03 at > java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) > May 13 03:24:03 at > java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) > May 13 03:24:03 at > java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) > May 13 03:24:03 > May 13 03:24:04 03:24:04.270 [INFO] > May 13 03:24:04 03:24:04.270 [INFO] Results: > May 13 03:24:04 03:24:04.270 [INFO] > Error: 03:24:04 03:24:04.270 [ERROR] Failures: > Error: 03:24:04 03:24:04.271 [ERROR] > MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume:650 > > May 13 03:24:04 expected: "RUNNING" > May 13 03:24:04 but was: "CREATED" > May 13 03:24:04 03:24:04.271 [INFO] > Error: 03:24:04 03:24:04.271 [ERROR] Tests run: 82, Failures: 1, Errors: 0, > Skipped: 0 > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35002) GitHub action request timeout to ArtifactService
[ https://issues.apache.org/jira/browse/FLINK-35002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846318#comment-17846318 ] Ryan Skraba commented on FLINK-35002: - * 1.18 Default (Java 8) / Test (module: tests) https://github.com/apache/flink/commit/1f604da2dfc831d04826a20b3cb272d2ad9dfb56/checks/24935906143/logs > GitHub action request timeout to ArtifactService > - > > Key: FLINK-35002 > URL: https://issues.apache.org/jira/browse/FLINK-35002 > Project: Flink > Issue Type: Bug > Components: Build System >Reporter: Ryan Skraba >Priority: Major > Labels: github-actions, test-stability > > A timeout can occur when uploading a successfully built artifact: > * [https://github.com/apache/flink/actions/runs/8516411871/job/23325392650] > {code:java} > 2024-04-02T02:20:15.6355368Z With the provided path, there will be 1 file > uploaded > 2024-04-02T02:20:15.6360133Z Artifact name is valid! > 2024-04-02T02:20:15.6362872Z Root directory input is valid! > 2024-04-02T02:20:20.6975036Z Attempt 1 of 5 failed with error: Request > timeout: /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact. > Retrying request in 3000 ms... > 2024-04-02T02:20:28.7084937Z Attempt 2 of 5 failed with error: Request > timeout: /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact. > Retrying request in 4785 ms... > 2024-04-02T02:20:38.5015936Z Attempt 3 of 5 failed with error: Request > timeout: /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact. > Retrying request in 7375 ms... > 2024-04-02T02:20:50.8901508Z Attempt 4 of 5 failed with error: Request > timeout: /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact. > Retrying request in 14988 ms... > 2024-04-02T02:21:10.9028438Z ##[error]Failed to CreateArtifact: Failed to > make request after 5 attempts: Request timeout: > /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact > 2024-04-02T02:22:59.9893296Z Post job cleanup. > 2024-04-02T02:22:59.9958844Z Post job cleanup. {code} > (This is unlikely to be something we can fix, but we can track it.) -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-27741][table-planner] Fix NPE when use dense_rank() and rank()… [flink]
snuyanzin commented on code in PR #19797: URL: https://github.com/apache/flink/pull/19797#discussion_r1600062134 ## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.scala: ## @@ -47,6 +48,18 @@ class OverAggregateTest extends TableTestBase { util.verifyExecPlan("SELECT c, SUM(a) OVER (ORDER BY b) FROM MyTable") } + @Test + def testDenseRankOnOrder(): Unit = { Review Comment: >I've attempted this case, and it seems only possible to test it within an integration test in stream mode. I propose we can add another testRankOnOver test specifically for testing the RANK() function. this I didn't get, there is already`testRankOnOver`, would you like to add another one? >Maybe we'd better address this as well yep, makes sense, I extracted the logic and reused it for §createPercentRankAggFunction` as well ## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.scala: ## @@ -47,6 +48,18 @@ class OverAggregateTest extends TableTestBase { util.verifyExecPlan("SELECT c, SUM(a) OVER (ORDER BY b) FROM MyTable") } + @Test + def testDenseRankOnOrder(): Unit = { Review Comment: >I've attempted this case, and it seems only possible to test it within an integration test in stream mode. I propose we can add another testRankOnOver test specifically for testing the RANK() function. this I didn't get, there is already`testRankOnOver`, would you like to add another one? >Maybe we'd better address this as well yep, makes sense, I extracted the logic and reused it for §createPercentRankAggFunction` as well -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35354) Support host mapping in Flink tikv cdc
[ https://issues.apache.org/jira/browse/FLINK-35354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ouyangwulin updated FLINK-35354: Description: In tidb production environment deployment, there are usually two kinds of network: internal network and public network. When we use pd mode in tikv, we need to do network mapping, such as `spark.tispark.host_mapping` in [https://github.com/pingcap/tispark/blob/master/docs/userguide_3.0.md]. So I think we need support `host_mapping` in our Flink tikv cdc connector. Add param: tikv.host_mapping:192.168.0.2:8.8.8.8;192.168.0.3:9.9.9.9 was: In tidb production environment deployment, there are usually two kinds of network: internal network and public network. When we use pd mode in tikv, we need to do network mapping, such as `spark.tispark.host_mapping` in [https://github.com/pingcap/tispark/blob/master/docs/userguide_3.0.md]. So I think we need support `host_mapping` in our Flink tikv cdc connector. add param: tikv.host_mapping:192.168.0.2:8.8.8.8;192.168.0.3:9.9.9.9 > Support host mapping in Flink tikv cdc > -- > > Key: FLINK-35354 > URL: https://issues.apache.org/jira/browse/FLINK-35354 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Affects Versions: cdc-3.1.0, cdc-3.2.0 >Reporter: ouyangwulin >Priority: Major > Fix For: cdc-3.1.0, cdc-3.2.0 > > > In tidb production environment deployment, there are usually two kinds of > network: internal network and public network. When we use pd mode in tikv, we > need to do network mapping, such as `spark.tispark.host_mapping` in > [https://github.com/pingcap/tispark/blob/master/docs/userguide_3.0.md]. So I > think we need support `host_mapping` in our Flink tikv cdc connector. > > Add param: > tikv.host_mapping:192.168.0.2:8.8.8.8;192.168.0.3:9.9.9.9 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35354) Support host mapping in Flink tikv cdc
[ https://issues.apache.org/jira/browse/FLINK-35354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ouyangwulin updated FLINK-35354: Description: In tidb production environment deployment, there are usually two kinds of network: internal network and public network. When we use pd mode in tikv, we need to do network mapping, such as `spark.tispark.host_mapping` in [https://github.com/pingcap/tispark/blob/master/docs/userguide_3.0.md]. So I think we need support `host_mapping` in our Flink tikv cdc connector. add param: tikv.host_mapping:192.168.0.2:8.8.8.8;192.168.0.3:9.9.9.9 was: In tidb production environment deployment, there are usually two kinds of network: internal network and public network. When we use pd mode in tikv, we need to do network mapping, such as `spark.tispark.host_mapping` in [https://github.com/pingcap/tispark/blob/master/docs/userguide_3.0.md]. So I think we need support `host_mapping` in our Flink tikv cdc connector. add param: tikv.host_mapping > Support host mapping in Flink tikv cdc > -- > > Key: FLINK-35354 > URL: https://issues.apache.org/jira/browse/FLINK-35354 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Affects Versions: cdc-3.1.0, cdc-3.2.0 >Reporter: ouyangwulin >Priority: Major > Fix For: cdc-3.1.0, cdc-3.2.0 > > > In tidb production environment deployment, there are usually two kinds of > network: internal network and public network. When we use pd mode in tikv, we > need to do network mapping, such as `spark.tispark.host_mapping` in > [https://github.com/pingcap/tispark/blob/master/docs/userguide_3.0.md]. So I > think we need support `host_mapping` in our Flink tikv cdc connector. > > add param: > tikv.host_mapping:192.168.0.2:8.8.8.8;192.168.0.3:9.9.9.9 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35354) Support host mapping in Flink tikv cdc
[ https://issues.apache.org/jira/browse/FLINK-35354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ouyangwulin updated FLINK-35354: Description: In tidb production environment deployment, there are usually two kinds of network: internal network and public network. When we use pd mode in tikv, we need to do network mapping, such as `spark.tispark.host_mapping` in [https://github.com/pingcap/tispark/blob/master/docs/userguide_3.0.md]. So I think we need support `host_mapping` in our Flink tikv cdc connector. add param: tikv.host_mapping was:In tidb production environment deployment, there are usually two kinds of network: internal network and public network. When we use pd mode in tikv, we need to do network mapping, such as `spark.tispark.host_mapping` in [https://github.com/pingcap/tispark/blob/master/docs/userguide_3.0.md]. So I think we need support `host_mapping` in our Flink tikv cdc connector. > Support host mapping in Flink tikv cdc > -- > > Key: FLINK-35354 > URL: https://issues.apache.org/jira/browse/FLINK-35354 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Affects Versions: cdc-3.1.0, cdc-3.2.0 >Reporter: ouyangwulin >Priority: Major > Fix For: cdc-3.1.0, cdc-3.2.0 > > > In tidb production environment deployment, there are usually two kinds of > network: internal network and public network. When we use pd mode in tikv, we > need to do network mapping, such as `spark.tispark.host_mapping` in > [https://github.com/pingcap/tispark/blob/master/docs/userguide_3.0.md]. So I > think we need support `host_mapping` in our Flink tikv cdc connector. > > add param: > tikv.host_mapping -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35354) Support host mapping in Flink tikv cdc
[ https://issues.apache.org/jira/browse/FLINK-35354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ouyangwulin updated FLINK-35354: Description: In tidb production environment deployment, there are usually two kinds of network: internal network and public network. When we use pd mode in tikv, we need to do network mapping, such as `spark.tispark.host_mapping` in [https://github.com/pingcap/tispark/blob/master/docs/userguide_3.0.md]. So I think we need support `host_mapping` in our Flink tikv cdc connector. (was: In tidb production environment deployment, there are usually two kinds of network: internal network and public network. When we use pd mode kv, we need to do network mapping, such as `spark.tispark.host_mapping` in https://github.com/pingcap/tispark/blob/master/docs/userguide_3.0.md. So I think we need support `host_mapping` in our Flink tikv cdc connector.) > Support host mapping in Flink tikv cdc > -- > > Key: FLINK-35354 > URL: https://issues.apache.org/jira/browse/FLINK-35354 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Affects Versions: cdc-3.1.0, cdc-3.2.0 >Reporter: ouyangwulin >Priority: Major > Fix For: cdc-3.1.0, cdc-3.2.0 > > > In tidb production environment deployment, there are usually two kinds of > network: internal network and public network. When we use pd mode in tikv, we > need to do network mapping, such as `spark.tispark.host_mapping` in > [https://github.com/pingcap/tispark/blob/master/docs/userguide_3.0.md]. So I > think we need support `host_mapping` in our Flink tikv cdc connector. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35356) Async reducing state
Zakelly Lan created FLINK-35356: --- Summary: Async reducing state Key: FLINK-35356 URL: https://issues.apache.org/jira/browse/FLINK-35356 Project: Flink Issue Type: Sub-task Components: Runtime / State Backends Reporter: Zakelly Lan -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35355) Async aggregating state
Zakelly Lan created FLINK-35355: --- Summary: Async aggregating state Key: FLINK-35355 URL: https://issues.apache.org/jira/browse/FLINK-35355 Project: Flink Issue Type: Sub-task Components: Runtime / State Backends Reporter: Zakelly Lan -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35354) Support host mapping in Flink tikv cdc
[ https://issues.apache.org/jira/browse/FLINK-35354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ouyangwulin updated FLINK-35354: Summary: Support host mapping in Flink tikv cdc (was: [discuss] Support host mapping in Flink tikv cdc) > Support host mapping in Flink tikv cdc > -- > > Key: FLINK-35354 > URL: https://issues.apache.org/jira/browse/FLINK-35354 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Affects Versions: cdc-3.1.0, cdc-3.2.0 >Reporter: ouyangwulin >Priority: Major > Fix For: cdc-3.1.0, cdc-3.2.0 > > > In tidb production environment deployment, there are usually two kinds of > network: internal network and public network. When we use pd mode kv, we need > to do network mapping, such as `spark.tispark.host_mapping` in > https://github.com/pingcap/tispark/blob/master/docs/userguide_3.0.md. So I > think we need support `host_mapping` in our Flink tikv cdc connector. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35354) [discuss] Support host mapping in Flink tikv cdc
ouyangwulin created FLINK-35354: --- Summary: [discuss] Support host mapping in Flink tikv cdc Key: FLINK-35354 URL: https://issues.apache.org/jira/browse/FLINK-35354 Project: Flink Issue Type: Improvement Components: Flink CDC Affects Versions: cdc-3.1.0, cdc-3.2.0 Reporter: ouyangwulin Fix For: cdc-3.1.0, cdc-3.2.0 In tidb production environment deployment, there are usually two kinds of network: internal network and public network. When we use pd mode kv, we need to do network mapping, such as `spark.tispark.host_mapping` in https://github.com/pingcap/tispark/blob/master/docs/userguide_3.0.md. So I think we need support `host_mapping` in our Flink tikv cdc connector. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35342) MaterializedTableStatementITCase test can check for wrong status
[ https://issues.apache.org/jira/browse/FLINK-35342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846312#comment-17846312 ] Feng Jin commented on FLINK-35342: -- fixed by: 94d861b08fef1e350d80a3f5f0f63168d327bc64 > MaterializedTableStatementITCase test can check for wrong status > > > Key: FLINK-35342 > URL: https://issues.apache.org/jira/browse/FLINK-35342 > Project: Flink > Issue Type: Bug >Affects Versions: 1.20.0 >Reporter: Ryan Skraba >Priority: Critical > Labels: test-stability > > * 1.20 AdaptiveScheduler / Test (module: table) > https://github.com/apache/flink/actions/runs/9056197319/job/24879135605#step:10:12490 > > It looks like > {{MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume}} > can be flaky, where the expected status is not yet RUNNING: > {code} > Error: 03:24:03 03:24:03.902 [ERROR] Tests run: 6, Failures: 1, Errors: 0, > Skipped: 0, Time elapsed: 26.78 s <<< FAILURE! -- in > org.apache.flink.table.gateway.service.MaterializedTableStatementITCase > Error: 03:24:03 03:24:03.902 [ERROR] > org.apache.flink.table.gateway.service.MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume(Path, > RestClusterClient) -- Time elapsed: 3.850 s <<< FAILURE! > May 13 03:24:03 org.opentest4j.AssertionFailedError: > May 13 03:24:03 > May 13 03:24:03 expected: "RUNNING" > May 13 03:24:03 but was: "CREATED" > May 13 03:24:03 at > sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > May 13 03:24:03 at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > May 13 03:24:03 at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > May 13 03:24:03 at > org.apache.flink.table.gateway.service.MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume(MaterializedTableStatementITCase.java:650) > May 13 03:24:03 at java.lang.reflect.Method.invoke(Method.java:498) > May 13 03:24:03 at > java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189) > May 13 03:24:03 at > java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) > May 13 03:24:03 at > java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) > May 13 03:24:03 at > java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) > May 13 03:24:03 at > java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) > May 13 03:24:03 > May 13 03:24:04 03:24:04.270 [INFO] > May 13 03:24:04 03:24:04.270 [INFO] Results: > May 13 03:24:04 03:24:04.270 [INFO] > Error: 03:24:04 03:24:04.270 [ERROR] Failures: > Error: 03:24:04 03:24:04.271 [ERROR] > MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume:650 > > May 13 03:24:04 expected: "RUNNING" > May 13 03:24:04 but was: "CREATED" > May 13 03:24:04 03:24:04.271 [INFO] > Error: 03:24:04 03:24:04.271 [ERROR] Tests run: 82, Failures: 1, Errors: 0, > Skipped: 0 > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-35193) Support drop materialized table syntax and execution in continuous refresh mode
[ https://issues.apache.org/jira/browse/FLINK-35193?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dalongliu resolved FLINK-35193. --- Resolution: Fixed > Support drop materialized table syntax and execution in continuous refresh > mode > --- > > Key: FLINK-35193 > URL: https://issues.apache.org/jira/browse/FLINK-35193 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.20.0 >Reporter: dalongliu >Assignee: Feng Jin >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > > In continuous refresh mode, support drop materialized table and the > background refresh job. > {code:SQL} > DROP MATERIALIZED TABLE [ IF EXISTS ] [catalog_name.][db_name.]table_name > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35193) Support drop materialized table syntax and execution in continuous refresh mode
[ https://issues.apache.org/jira/browse/FLINK-35193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846309#comment-17846309 ] dalongliu commented on FLINK-35193: --- Merged in master: 94d861b08fef1e350d80a3f5f0f63168d327bc64 > Support drop materialized table syntax and execution in continuous refresh > mode > --- > > Key: FLINK-35193 > URL: https://issues.apache.org/jira/browse/FLINK-35193 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.20.0 >Reporter: dalongliu >Assignee: Feng Jin >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > > In continuous refresh mode, support drop materialized table and the > background refresh job. > {code:SQL} > DROP MATERIALIZED TABLE [ IF EXISTS ] [catalog_name.][db_name.]table_name > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35193][table] Support drop materialized table syntax and execution in continuous refresh mode [flink]
lsyldliu merged PR #24777: URL: https://github.com/apache/flink/pull/24777 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-33986) Extend shuffleMaster to support batch snapshot.
[ https://issues.apache.org/jira/browse/FLINK-33986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu updated FLINK-33986: Component/s: Runtime / Coordination > Extend shuffleMaster to support batch snapshot. > --- > > Key: FLINK-33986 > URL: https://issues.apache.org/jira/browse/FLINK-33986 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Junrui Li >Assignee: Junrui Li >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > > Extend shuffleMaster to support batch snapshot as follows: > # Add method supportsBatchSnapshot to identify whether the shuffle master > supports taking snapshot in batch scenarios > # Add method snapshotState and restoreState to snapshot and restore the > shuffle master's state. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-33986) Extend shuffleMaster to support batch snapshot.
[ https://issues.apache.org/jira/browse/FLINK-33986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu closed FLINK-33986. --- Fix Version/s: 1.20.0 Resolution: Done 65d31e26534836909f6b8139c6bd6cd45b91bba4 > Extend shuffleMaster to support batch snapshot. > --- > > Key: FLINK-33986 > URL: https://issues.apache.org/jira/browse/FLINK-33986 > Project: Flink > Issue Type: Sub-task >Reporter: Junrui Li >Assignee: Junrui Li >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > > Extend shuffleMaster to support batch snapshot as follows: > # Add method supportsBatchSnapshot to identify whether the shuffle master > supports taking snapshot in batch scenarios > # Add method snapshotState and restoreState to snapshot and restore the > shuffle master's state. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35353) Translate "Profiler" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-35353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846301#comment-17846301 ] Juan Zifeng commented on FLINK-35353: - Hi [~jark], could you assign this to me? > Translate "Profiler" page into Chinese > --- > > Key: FLINK-35353 > URL: https://issues.apache.org/jira/browse/FLINK-35353 > Project: Flink > Issue Type: Improvement > Components: chinese-translation, Documentation >Affects Versions: 1.19.0 >Reporter: Juan Zifeng >Priority: Major > Fix For: 1.19.0 > > > The links are > https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/ops/debugging/profiler/ -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33986][runtime] Extend ShuffleMaster to support snapshot and restore state. [flink]
zhuzhurk merged PR #24774: URL: https://github.com/apache/flink/pull/24774 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-35353) Translate "Profiler" page into Chinese
Juan Zifeng created FLINK-35353: --- Summary: Translate "Profiler" page into Chinese Key: FLINK-35353 URL: https://issues.apache.org/jira/browse/FLINK-35353 Project: Flink Issue Type: Improvement Components: chinese-translation, Documentation Affects Versions: 1.19.0 Reporter: Juan Zifeng Fix For: 1.19.0 The links are https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/ops/debugging/profiler/ -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35342) MaterializedTableStatementITCase test can check for wrong status
[ https://issues.apache.org/jira/browse/FLINK-35342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846289#comment-17846289 ] Weijie Guo commented on FLINK-35342: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59528&view=logs&j=f2c100be-250b-5e85-7bbe-176f68fcddc5&t=05efd11e-5400-54a4-0d27-a4663be008a9&l=12763 > MaterializedTableStatementITCase test can check for wrong status > > > Key: FLINK-35342 > URL: https://issues.apache.org/jira/browse/FLINK-35342 > Project: Flink > Issue Type: Bug >Affects Versions: 1.20.0 >Reporter: Ryan Skraba >Priority: Critical > Labels: test-stability > > * 1.20 AdaptiveScheduler / Test (module: table) > https://github.com/apache/flink/actions/runs/9056197319/job/24879135605#step:10:12490 > > It looks like > {{MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume}} > can be flaky, where the expected status is not yet RUNNING: > {code} > Error: 03:24:03 03:24:03.902 [ERROR] Tests run: 6, Failures: 1, Errors: 0, > Skipped: 0, Time elapsed: 26.78 s <<< FAILURE! -- in > org.apache.flink.table.gateway.service.MaterializedTableStatementITCase > Error: 03:24:03 03:24:03.902 [ERROR] > org.apache.flink.table.gateway.service.MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume(Path, > RestClusterClient) -- Time elapsed: 3.850 s <<< FAILURE! > May 13 03:24:03 org.opentest4j.AssertionFailedError: > May 13 03:24:03 > May 13 03:24:03 expected: "RUNNING" > May 13 03:24:03 but was: "CREATED" > May 13 03:24:03 at > sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > May 13 03:24:03 at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > May 13 03:24:03 at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > May 13 03:24:03 at > org.apache.flink.table.gateway.service.MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume(MaterializedTableStatementITCase.java:650) > May 13 03:24:03 at java.lang.reflect.Method.invoke(Method.java:498) > May 13 03:24:03 at > java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189) > May 13 03:24:03 at > java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) > May 13 03:24:03 at > java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) > May 13 03:24:03 at > java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) > May 13 03:24:03 at > java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) > May 13 03:24:03 > May 13 03:24:04 03:24:04.270 [INFO] > May 13 03:24:04 03:24:04.270 [INFO] Results: > May 13 03:24:04 03:24:04.270 [INFO] > Error: 03:24:04 03:24:04.270 [ERROR] Failures: > Error: 03:24:04 03:24:04.271 [ERROR] > MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume:650 > > May 13 03:24:04 expected: "RUNNING" > May 13 03:24:04 but was: "CREATED" > May 13 03:24:04 03:24:04.271 [INFO] > Error: 03:24:04 03:24:04.271 [ERROR] Tests run: 82, Failures: 1, Errors: 0, > Skipped: 0 > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)