[jira] [Updated] (FLINK-35357) Add "kubernetes.operator.plugins.listeners" parameter description to the Operator configuration document

2024-05-14 Thread Yang Zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Zhou updated FLINK-35357:
--
Description: 
In Flink Operator "Custom Flink Resource Listeners" in practice (doc: 
[https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/operations/plugins/#custom-flink-resource]
 -listeners)

It was found that the "Operator Configuration Reference" document did not 
explain the "Custom Flink Resource Listeners" configuration parameters.

So I wanted to come up with adding:

kubernetes.operator.plugins.listeners..class: 


, after all it is useful.

I want to submit a PR to optimize the document.

  was:
In Flink Operator "Custom Flink Resource Listeners" in practice (doc: 
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/operations/plugins/#custom-flink-resource
 -listeners)

It was found that the "Operator Configuration Reference" document did not 
explain the "Custom Flink Resource Listeners" configuration parameters.

So I wanted to come up with adding: 
kubernetes.operator.plugins.listeners..class: 
 , after all it is useful.


> Add "kubernetes.operator.plugins.listeners" parameter description to the 
> Operator configuration document
> 
>
> Key: FLINK-35357
> URL: https://issues.apache.org/jira/browse/FLINK-35357
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Yang Zhou
>Priority: Minor
>
> In Flink Operator "Custom Flink Resource Listeners" in practice (doc: 
> [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/operations/plugins/#custom-flink-resource]
>  -listeners)
> It was found that the "Operator Configuration Reference" document did not 
> explain the "Custom Flink Resource Listeners" configuration parameters.
> So I wanted to come up with adding:
> kubernetes.operator.plugins.listeners..class: 
> 
> , after all it is useful.
> I want to submit a PR to optimize the document.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-27741) Fix NPE when use dense_rank() and rank() in over aggregation

2024-05-14 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846452#comment-17846452
 ] 

Sergey Nuyanzin edited comment on FLINK-27741 at 5/15/24 5:38 AM:
--

Merged to master as 
[40fb49dd17b3e1b6c5aa0249514273730ebe9226|https://github.com/apache/flink/commit/40fb49dd17b3e1b6c5aa0249514273730ebe9226]
1.19: 
[190522c2c051e0ec05213be71fb7a59a517353b1|https://github.com/apache/flink/commit/190522c2c051e0ec05213be71fb7a59a517353b1]
1.18: 
[1e1a7f16b6f272334d9f9a1053b657148151a789|https://github.com/apache/flink/commit/1e1a7f16b6f272334d9f9a1053b657148151a789]


was (Author: sergey nuyanzin):
Merged to master as 
[40fb49dd17b3e1b6c5aa0249514273730ebe9226|https://github.com/apache/flink/commit/40fb49dd17b3e1b6c5aa0249514273730ebe9226]
1.19: 
[190522c2c051e0ec05213be71fb7a59a517353b1|https://github.com/apache/flink/commit/190522c2c051e0ec05213be71fb7a59a517353b1]

> Fix NPE when use dense_rank() and rank() in over aggregation
> 
>
> Key: FLINK-27741
> URL: https://issues.apache.org/jira/browse/FLINK-27741
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: chenzihao
>Assignee: chenzihao
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> pull-request-available
> Fix For: 1.18.2, 1.20.0, 1.19.1
>
>
> There has an 'NullPointException' when use RANK() and DENSE_RANK() in over 
> window.
> {code:java}
> @Test
>   def testDenseRankOnOver(): Unit = {
> val t = failingDataSource(TestData.tupleData5)
>   .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
> tEnv.registerTable("MyTable", t)
> val sqlQuery = "SELECT a, DENSE_RANK() OVER (PARTITION BY a ORDER BY 
> proctime) FROM MyTable"
> val sink = new TestingAppendSink
> tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
> env.execute()
>   }
> {code}
> {code:java}
> @Test
>   def testRankOnOver(): Unit = {
> val t = failingDataSource(TestData.tupleData5)
>   .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
> tEnv.registerTable("MyTable", t)
> val sqlQuery = "SELECT a, RANK() OVER (PARTITION BY a ORDER BY proctime) 
> FROM MyTable"
> val sink = new TestingAppendSink
> tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
> env.execute()
>   }
> {code}
> Exception Info:
> {code:java}
> java.lang.NullPointerException
>   at 
> scala.collection.mutable.ArrayOps$ofInt$.length$extension(ArrayOps.scala:248)
>   at scala.collection.mutable.ArrayOps$ofInt.length(ArrayOps.scala:248)
>   at scala.collection.SeqLike.size(SeqLike.scala:104)
>   at scala.collection.SeqLike.size$(SeqLike.scala:104)
>   at scala.collection.mutable.ArrayOps$ofInt.size(ArrayOps.scala:242)
>   at 
> scala.collection.IndexedSeqLike.sizeHintIfCheap(IndexedSeqLike.scala:95)
>   at 
> scala.collection.IndexedSeqLike.sizeHintIfCheap$(IndexedSeqLike.scala:95)
>   at 
> scala.collection.mutable.ArrayOps$ofInt.sizeHintIfCheap(ArrayOps.scala:242)
>   at scala.collection.mutable.Builder.sizeHint(Builder.scala:77)
>   at scala.collection.mutable.Builder.sizeHint$(Builder.scala:76)
>   at scala.collection.mutable.ArrayBuilder.sizeHint(ArrayBuilder.scala:21)
>   at scala.collection.TraversableLike.builder$1(TraversableLike.scala:229)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:232)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>   at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:242)
>   at 
> org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createDenseRankAggFunction(AggFunctionFactory.scala:454)
>   at 
> org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createAggFunction(AggFunctionFactory.scala:94)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.$anonfun$transformToAggregateInfoList$1(AggregateUtil.scala:445)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>   at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
>   at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToAggregateInfoList(AggregateUtil.scala:435)
>   at 
> 

Re: [PR] [FLINK-35344][cdc-base] Move same code from multiple subclasses to JdbcSourceChunkSplitter [flink-cdc]

2024-05-14 Thread via GitHub


loserwang1024 commented on PR #3319:
URL: https://github.com/apache/flink-cdc/pull/3319#issuecomment-2111625519

   > Could @loserwang1024 please take a look?
   I'd like to do it. But this PR do nothing with mysql cdc.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-27741) Fix NPE when use dense_rank() and rank() in over aggregation

2024-05-14 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin updated FLINK-27741:

Fix Version/s: 1.18.2

> Fix NPE when use dense_rank() and rank() in over aggregation
> 
>
> Key: FLINK-27741
> URL: https://issues.apache.org/jira/browse/FLINK-27741
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: chenzihao
>Assignee: chenzihao
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> pull-request-available
> Fix For: 1.18.2, 1.20.0, 1.19.1
>
>
> There has an 'NullPointException' when use RANK() and DENSE_RANK() in over 
> window.
> {code:java}
> @Test
>   def testDenseRankOnOver(): Unit = {
> val t = failingDataSource(TestData.tupleData5)
>   .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
> tEnv.registerTable("MyTable", t)
> val sqlQuery = "SELECT a, DENSE_RANK() OVER (PARTITION BY a ORDER BY 
> proctime) FROM MyTable"
> val sink = new TestingAppendSink
> tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
> env.execute()
>   }
> {code}
> {code:java}
> @Test
>   def testRankOnOver(): Unit = {
> val t = failingDataSource(TestData.tupleData5)
>   .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
> tEnv.registerTable("MyTable", t)
> val sqlQuery = "SELECT a, RANK() OVER (PARTITION BY a ORDER BY proctime) 
> FROM MyTable"
> val sink = new TestingAppendSink
> tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
> env.execute()
>   }
> {code}
> Exception Info:
> {code:java}
> java.lang.NullPointerException
>   at 
> scala.collection.mutable.ArrayOps$ofInt$.length$extension(ArrayOps.scala:248)
>   at scala.collection.mutable.ArrayOps$ofInt.length(ArrayOps.scala:248)
>   at scala.collection.SeqLike.size(SeqLike.scala:104)
>   at scala.collection.SeqLike.size$(SeqLike.scala:104)
>   at scala.collection.mutable.ArrayOps$ofInt.size(ArrayOps.scala:242)
>   at 
> scala.collection.IndexedSeqLike.sizeHintIfCheap(IndexedSeqLike.scala:95)
>   at 
> scala.collection.IndexedSeqLike.sizeHintIfCheap$(IndexedSeqLike.scala:95)
>   at 
> scala.collection.mutable.ArrayOps$ofInt.sizeHintIfCheap(ArrayOps.scala:242)
>   at scala.collection.mutable.Builder.sizeHint(Builder.scala:77)
>   at scala.collection.mutable.Builder.sizeHint$(Builder.scala:76)
>   at scala.collection.mutable.ArrayBuilder.sizeHint(ArrayBuilder.scala:21)
>   at scala.collection.TraversableLike.builder$1(TraversableLike.scala:229)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:232)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>   at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:242)
>   at 
> org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createDenseRankAggFunction(AggFunctionFactory.scala:454)
>   at 
> org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createAggFunction(AggFunctionFactory.scala:94)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.$anonfun$transformToAggregateInfoList$1(AggregateUtil.scala:445)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>   at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
>   at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToAggregateInfoList(AggregateUtil.scala:435)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToStreamAggregateInfoList(AggregateUtil.scala:381)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToStreamAggregateInfoList(AggregateUtil.scala:361)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil.transformToStreamAggregateInfoList(AggregateUtil.scala)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate.createUnboundedOverProcessFunction(StreamExecOverAggregate.java:279)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.java:198)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:148)
>   at 
> 

[jira] [Created] (FLINK-35357) Add "kubernetes.operator.plugins.listeners" parameter description to the Operator configuration document

2024-05-14 Thread Yang Zhou (Jira)
Yang Zhou created FLINK-35357:
-

 Summary: Add "kubernetes.operator.plugins.listeners" parameter 
description to the Operator configuration document
 Key: FLINK-35357
 URL: https://issues.apache.org/jira/browse/FLINK-35357
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Yang Zhou


In Flink Operator "Custom Flink Resource Listeners" in practice (doc: 
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/operations/plugins/#custom-flink-resource
 -listeners)

It was found that the "Operator Configuration Reference" document did not 
explain the "Custom Flink Resource Listeners" configuration parameters.

So I wanted to come up with adding: 
kubernetes.operator.plugins.listeners..class: 
 , after all it is useful.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-27741) Fix NPE when use dense_rank() and rank() in over aggregation

2024-05-14 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin updated FLINK-27741:

Fix Version/s: 1.19.1

> Fix NPE when use dense_rank() and rank() in over aggregation
> 
>
> Key: FLINK-27741
> URL: https://issues.apache.org/jira/browse/FLINK-27741
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: chenzihao
>Assignee: chenzihao
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> pull-request-available
> Fix For: 1.20.0, 1.19.1
>
>
> There has an 'NullPointException' when use RANK() and DENSE_RANK() in over 
> window.
> {code:java}
> @Test
>   def testDenseRankOnOver(): Unit = {
> val t = failingDataSource(TestData.tupleData5)
>   .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
> tEnv.registerTable("MyTable", t)
> val sqlQuery = "SELECT a, DENSE_RANK() OVER (PARTITION BY a ORDER BY 
> proctime) FROM MyTable"
> val sink = new TestingAppendSink
> tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
> env.execute()
>   }
> {code}
> {code:java}
> @Test
>   def testRankOnOver(): Unit = {
> val t = failingDataSource(TestData.tupleData5)
>   .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
> tEnv.registerTable("MyTable", t)
> val sqlQuery = "SELECT a, RANK() OVER (PARTITION BY a ORDER BY proctime) 
> FROM MyTable"
> val sink = new TestingAppendSink
> tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
> env.execute()
>   }
> {code}
> Exception Info:
> {code:java}
> java.lang.NullPointerException
>   at 
> scala.collection.mutable.ArrayOps$ofInt$.length$extension(ArrayOps.scala:248)
>   at scala.collection.mutable.ArrayOps$ofInt.length(ArrayOps.scala:248)
>   at scala.collection.SeqLike.size(SeqLike.scala:104)
>   at scala.collection.SeqLike.size$(SeqLike.scala:104)
>   at scala.collection.mutable.ArrayOps$ofInt.size(ArrayOps.scala:242)
>   at 
> scala.collection.IndexedSeqLike.sizeHintIfCheap(IndexedSeqLike.scala:95)
>   at 
> scala.collection.IndexedSeqLike.sizeHintIfCheap$(IndexedSeqLike.scala:95)
>   at 
> scala.collection.mutable.ArrayOps$ofInt.sizeHintIfCheap(ArrayOps.scala:242)
>   at scala.collection.mutable.Builder.sizeHint(Builder.scala:77)
>   at scala.collection.mutable.Builder.sizeHint$(Builder.scala:76)
>   at scala.collection.mutable.ArrayBuilder.sizeHint(ArrayBuilder.scala:21)
>   at scala.collection.TraversableLike.builder$1(TraversableLike.scala:229)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:232)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>   at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:242)
>   at 
> org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createDenseRankAggFunction(AggFunctionFactory.scala:454)
>   at 
> org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createAggFunction(AggFunctionFactory.scala:94)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.$anonfun$transformToAggregateInfoList$1(AggregateUtil.scala:445)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>   at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
>   at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToAggregateInfoList(AggregateUtil.scala:435)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToStreamAggregateInfoList(AggregateUtil.scala:381)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToStreamAggregateInfoList(AggregateUtil.scala:361)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil.transformToStreamAggregateInfoList(AggregateUtil.scala)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate.createUnboundedOverProcessFunction(StreamExecOverAggregate.java:279)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.java:198)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:148)
>   at 
> 

Re: [PR] [BP-1.18][FLINK-27741][table-planner] Fix NPE when use dense_rank() and rank() [flink]

2024-05-14 Thread via GitHub


snuyanzin merged PR #24785:
URL: https://github.com/apache/flink/pull/24785


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-27741) Fix NPE when use dense_rank() and rank() in over aggregation

2024-05-14 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846452#comment-17846452
 ] 

Sergey Nuyanzin edited comment on FLINK-27741 at 5/15/24 5:36 AM:
--

Merged to master as 
[40fb49dd17b3e1b6c5aa0249514273730ebe9226|https://github.com/apache/flink/commit/40fb49dd17b3e1b6c5aa0249514273730ebe9226]
1.19: 
[190522c2c051e0ec05213be71fb7a59a517353b1|https://github.com/apache/flink/commit/190522c2c051e0ec05213be71fb7a59a517353b1]


was (Author: sergey nuyanzin):
Merged to master as 
[40fb49dd17b3e1b6c5aa0249514273730ebe9226|https://github.com/apache/flink/commit/40fb49dd17b3e1b6c5aa0249514273730ebe9226]

> Fix NPE when use dense_rank() and rank() in over aggregation
> 
>
> Key: FLINK-27741
> URL: https://issues.apache.org/jira/browse/FLINK-27741
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: chenzihao
>Assignee: chenzihao
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> pull-request-available
> Fix For: 1.20.0
>
>
> There has an 'NullPointException' when use RANK() and DENSE_RANK() in over 
> window.
> {code:java}
> @Test
>   def testDenseRankOnOver(): Unit = {
> val t = failingDataSource(TestData.tupleData5)
>   .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
> tEnv.registerTable("MyTable", t)
> val sqlQuery = "SELECT a, DENSE_RANK() OVER (PARTITION BY a ORDER BY 
> proctime) FROM MyTable"
> val sink = new TestingAppendSink
> tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
> env.execute()
>   }
> {code}
> {code:java}
> @Test
>   def testRankOnOver(): Unit = {
> val t = failingDataSource(TestData.tupleData5)
>   .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
> tEnv.registerTable("MyTable", t)
> val sqlQuery = "SELECT a, RANK() OVER (PARTITION BY a ORDER BY proctime) 
> FROM MyTable"
> val sink = new TestingAppendSink
> tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
> env.execute()
>   }
> {code}
> Exception Info:
> {code:java}
> java.lang.NullPointerException
>   at 
> scala.collection.mutable.ArrayOps$ofInt$.length$extension(ArrayOps.scala:248)
>   at scala.collection.mutable.ArrayOps$ofInt.length(ArrayOps.scala:248)
>   at scala.collection.SeqLike.size(SeqLike.scala:104)
>   at scala.collection.SeqLike.size$(SeqLike.scala:104)
>   at scala.collection.mutable.ArrayOps$ofInt.size(ArrayOps.scala:242)
>   at 
> scala.collection.IndexedSeqLike.sizeHintIfCheap(IndexedSeqLike.scala:95)
>   at 
> scala.collection.IndexedSeqLike.sizeHintIfCheap$(IndexedSeqLike.scala:95)
>   at 
> scala.collection.mutable.ArrayOps$ofInt.sizeHintIfCheap(ArrayOps.scala:242)
>   at scala.collection.mutable.Builder.sizeHint(Builder.scala:77)
>   at scala.collection.mutable.Builder.sizeHint$(Builder.scala:76)
>   at scala.collection.mutable.ArrayBuilder.sizeHint(ArrayBuilder.scala:21)
>   at scala.collection.TraversableLike.builder$1(TraversableLike.scala:229)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:232)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>   at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:242)
>   at 
> org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createDenseRankAggFunction(AggFunctionFactory.scala:454)
>   at 
> org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createAggFunction(AggFunctionFactory.scala:94)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.$anonfun$transformToAggregateInfoList$1(AggregateUtil.scala:445)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>   at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
>   at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToAggregateInfoList(AggregateUtil.scala:435)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToStreamAggregateInfoList(AggregateUtil.scala:381)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToStreamAggregateInfoList(AggregateUtil.scala:361)
>   at 
> 

Re: [PR] [BP-1.19][FLINK-27741][table-planner] Fix NPE when use dense_rank() and rank() [flink]

2024-05-14 Thread via GitHub


snuyanzin merged PR #24786:
URL: https://github.com/apache/flink/pull/24786


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34503][table] Migrate JoinDeriveNullFilterRule to java. [flink]

2024-05-14 Thread via GitHub


liuyongvs commented on PR #24373:
URL: https://github.com/apache/flink/pull/24373#issuecomment-2111617766

   hi @snuyanzin will you help review it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35075][table] Migrate TwoStageOptimizedAggregateRule to java [flink]

2024-05-14 Thread via GitHub


liuyongvs commented on PR #24650:
URL: https://github.com/apache/flink/pull/24650#issuecomment-2111617324

   hi @snuyanzin will you help review it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34555][table] Migrate JoinConditionTypeCoerceRule to java. [flink]

2024-05-14 Thread via GitHub


liuyongvs commented on PR #24420:
URL: https://github.com/apache/flink/pull/24420#issuecomment-2111615952

   hi @snuyanzin will you help review it?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34109) FileSystem sink connector restore job from historical checkpoint failure

2024-05-14 Thread Sergey Paryshev (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846490#comment-17846490
 ] 

Sergey Paryshev commented on FLINK-34109:
-

bump

> FileSystem sink connector restore job from historical checkpoint failure
> 
>
> Key: FLINK-34109
> URL: https://issues.apache.org/jira/browse/FLINK-34109
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.12.7, 1.13.6, 1.14.6, 1.15.4, 1.18.0, 1.16.3, 1.17.2
>Reporter: Sergey Paryshev
>Priority: Minor
>  Labels: pull-request-available
>
> FileSystem connector sink with compaction setting is enabled can't restore 
> job from historical checkpoint (when MAX_RETAINED_CHECKPOINTS > 1 and 
> restroing checkpoint is not last)
> {code:java}
> java.io.UncheckedIOException: java.io.FileNotFoundException: File 
> file:/tmp/parquet-test/output/.uncompacted-part-81340e1d-9004-4ce2-a45c-628d17919bbf-0-1
>  does not exist or the user running Flink ('user') has insufficient 
> permissions to access it.
>     at 
> org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:165)
>  ~[classes/:?]
>     at 
> org.apache.flink.connector.file.table.BinPacking.pack(BinPacking.java:40) 
> ~[classes/:?]
>     at 
> org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.lambda$coordinate$2(CompactCoordinator.java:175)
>  ~[classes/:?]
>     at java.util.HashMap.forEach(HashMap.java:1290) ~[?:1.8.0_312]
>     at 
> org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.coordinate(CompactCoordinator.java:171)
>  ~[classes/:?]
>     at 
> org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.commitUpToCheckpoint(CompactCoordinator.java:153)
>  ~[classes/:?]
>     at 
> org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.processElement(CompactCoordinator.java:143)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:262)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:155)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:554)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:245)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:848)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:797)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:954)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:933) 
> ~[classes/:?]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:747) 
> ~[classes/:?]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) 
> ~[classes/:?]
>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_312]
> Caused by: java.io.FileNotFoundException: File 
> file:/tmp/parquet-test/output/.uncompacted-part-81340e1d-9004-4ce2-a45c-628d17919bbf-0-1
>  does not exist or the user running Flink ('user') has insufficient 
> permissions to access it.
>     at 
> org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:113)
>  ~[classes/:?]
>     at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.getFileStatus(SafetyNetWrapperFileSystem.java:65)
>  ~[classes/:?]
>     at 
> org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:163)
>  ~[classes/:?]
>     ... 19 more {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35344][cdc-base] Move same code from multiple subclasses to JdbcSourceChunkSplitter [flink-cdc]

2024-05-14 Thread via GitHub


yuxiqian commented on PR #3319:
URL: https://github.com/apache/flink-cdc/pull/3319#issuecomment-2111557242

   Seems MySQL CI is taking significantly more time (90min vs. 30min) and fails 
eventually. Could @loserwang1024 please take a look?
   
   Failed CI job link: 
https://github.com/apache/flink-cdc/actions/runs/9073569533/job/24931065832


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]

2024-05-14 Thread via GitHub


ldadima commented on code in PR #24784:
URL: https://github.com/apache/flink/pull/24784#discussion_r1600910971


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java:
##
@@ -421,7 +423,27 @@ public void 
reDistributeInputChannelStates(TaskStateAssignment stateAssignment)
 stateAssignment.oldState.get(stateAssignment.inputOperatorID);
 final List> inputOperatorState =
 splitBySubtasks(inputState, 
OperatorSubtaskState::getInputChannelState);
-if (inputState.getParallelism() == 
executionJobVertex.getParallelism()) {
+
+boolean noNeedRescale =
+
stateAssignment.executionJobVertex.getJobVertex().getInputs().stream()
+.map(JobEdge::getDownstreamSubtaskStateMapper)
+.anyMatch(m -> 
!m.equals(SubtaskStateMapper.FULL))
+&& 
stateAssignment.executionJobVertex.getInputs().stream()
+.map(IntermediateResult::getProducer)
+.map(vertexAssignments::get)
+.anyMatch(
+taskStateAssignment -> {
+final int oldParallelism =
+stateAssignment
+.oldState
+
.get(stateAssignment.inputOperatorID)
+.getParallelism();
+return oldParallelism
+== 
taskStateAssignment.executionJobVertex
+.getParallelism();
+});

Review Comment:
   Separation is a good idea, ok.
   The first part is to check if SubtaskStateMapper.FULL is used in any input 
(this mapper type always returns all I/O), the problem is reproduced only for 
this mapper type
   The second part is to check if the parallelism of any previous operator has 
changed. if it has changed, it means that the number of outputs has changed.
   But I made little mistake in condition. I will fix it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join

2024-05-14 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846485#comment-17846485
 ] 

xuyang edited comment on FLINK-34380 at 5/15/24 4:01 AM:
-

Sorry for this late reply. The results of this repair still don’t seem to meet 
expectations a little.

Still based on the above test, the result is following. However, the row kind 
of the first data should be `+I`, right?

 
{code:java}
++---++---++
| op | a | b  | a0| b0 |
++---++---++
| +U | 1 |  1 | 1 | 99 |
| -U | 1 |  1 | 1 | 99 |
| +U | 1 | 99 | 1 | 99 |
| -D | 1 | 99 | 1 | 99 |
++---++---++ {code}
 

 


was (Author: xuyangzhong):
Sorry for this late reply. This commit for fix seems great. [~xu_shuai_] Can 
you take a look to verify it again?

> Strange RowKind and records about intermediate output when using minibatch 
> join
> ---
>
> Key: FLINK-34380
> URL: https://issues.apache.org/jira/browse/FLINK-34380
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: xuyang
>Priority: Major
> Fix For: 1.20.0
>
>
> {code:java}
> // Add it in CalcItCase
> @Test
>   def test(): Unit = {
> env.setParallelism(1)
> val rows = Seq(
>   changelogRow("+I", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("-U", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("+U", java.lang.Integer.valueOf(1), "99"),
>   changelogRow("-D", java.lang.Integer.valueOf(1), "99")
> )
> val dataId = TestValuesTableFactory.registerData(rows)
> val ddl =
>   s"""
>  |CREATE TABLE t1 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl)
> val ddl2 =
>   s"""
>  |CREATE TABLE t2 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl2)
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
> Boolean.box(true))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
> Duration.ofSeconds(5))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L))
> println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = 
> t2.a").explain())
> tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
>   } {code}
> Output:
> {code:java}
> ++-+-+-+-+
> | op |           a |               b |          a0 |      b0 |
> ++-+-+-+-+
> | +U |           1 |               1 |           1 |      99 |
> | +U |           1 |              99 |           1 |      99 |
> | -U |           1 |               1 |           1 |      99 |
> | -D |           1 |              99 |           1 |      99 |
> ++-+-+-+-+{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]

2024-05-14 Thread via GitHub


ldadima commented on code in PR #24784:
URL: https://github.com/apache/flink/pull/24784#discussion_r1600910971


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java:
##
@@ -421,7 +423,27 @@ public void 
reDistributeInputChannelStates(TaskStateAssignment stateAssignment)
 stateAssignment.oldState.get(stateAssignment.inputOperatorID);
 final List> inputOperatorState =
 splitBySubtasks(inputState, 
OperatorSubtaskState::getInputChannelState);
-if (inputState.getParallelism() == 
executionJobVertex.getParallelism()) {
+
+boolean noNeedRescale =
+
stateAssignment.executionJobVertex.getJobVertex().getInputs().stream()
+.map(JobEdge::getDownstreamSubtaskStateMapper)
+.anyMatch(m -> 
!m.equals(SubtaskStateMapper.FULL))
+&& 
stateAssignment.executionJobVertex.getInputs().stream()
+.map(IntermediateResult::getProducer)
+.map(vertexAssignments::get)
+.anyMatch(
+taskStateAssignment -> {
+final int oldParallelism =
+stateAssignment
+.oldState
+
.get(stateAssignment.inputOperatorID)
+.getParallelism();
+return oldParallelism
+== 
taskStateAssignment.executionJobVertex
+.getParallelism();
+});

Review Comment:
   Separation is a good idea, ok.
   The first part is to check if SubtaskStateMapper.FULL is used in any input 
(this mapper type always returns all I/O), the problem is reproduced only for 
this mapper type
   The second part is to check if the parallelism of any previous operator has 
changed. if it has changed, it means that the number of outputs has changed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33790) Upsert statement filter unique key field colume in mysql dielact

2024-05-14 Thread SuDewei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846487#comment-17846487
 ] 

SuDewei commented on FLINK-33790:
-

Hi [~jeyhunkarimov] , i think what [~lijingwei.5018]  tring to say is that the 
MySqlDialect does not need to update the unique key in the UpsertStatement. So 
it is an improvement. After adding this feature, the upsert statement would not 
include the key fields just like the example shows.

> Upsert statement filter unique key field colume in mysql dielact 
> -
>
> Key: FLINK-33790
> URL: https://issues.apache.org/jira/browse/FLINK-33790
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / JDBC
>Reporter: JingWei Li
>Priority: Minor
>
> example: `col2` and `col4` is unique key in table `my_table`
>  
> {code:java}
> INSERT INTO `my_table`(`col1`, `col2`, `col3`, `col4`, `col5`) 
> VALUES (?, ?, ?, ?, ?)
> ON DUPLICATE KEY UPDATE 
> `col1`=VALUES(`col1`),
> `col2`=VALUES(`col2`),
> `col3`=VALUES(`col3`),
> `col4`=VALUES(`col4`),
> `col5`=VALUES(`col5`){code}
> result:
> {code:java}
> INSERT INTO `my_table`(`col1`, `col2`, `col3`, `col4`, `col5`) 
> VALUES (?, ?, ?, ?, ?)
> ON DUPLICATE KEY UPDATE 
> `col1`=VALUES(`col1`),
> `col3`=VALUES(`col3`),
> `col5`=VALUES(`col5`) {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join

2024-05-14 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846485#comment-17846485
 ] 

xuyang commented on FLINK-34380:


Sorry for this late reply. This commit for fix seems great. [~xu_shuai_] Can 
you take a look to verify it again?

> Strange RowKind and records about intermediate output when using minibatch 
> join
> ---
>
> Key: FLINK-34380
> URL: https://issues.apache.org/jira/browse/FLINK-34380
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: xuyang
>Priority: Major
> Fix For: 1.20.0
>
>
> {code:java}
> // Add it in CalcItCase
> @Test
>   def test(): Unit = {
> env.setParallelism(1)
> val rows = Seq(
>   changelogRow("+I", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("-U", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("+U", java.lang.Integer.valueOf(1), "99"),
>   changelogRow("-D", java.lang.Integer.valueOf(1), "99")
> )
> val dataId = TestValuesTableFactory.registerData(rows)
> val ddl =
>   s"""
>  |CREATE TABLE t1 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl)
> val ddl2 =
>   s"""
>  |CREATE TABLE t2 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl2)
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
> Boolean.box(true))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
> Duration.ofSeconds(5))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L))
> println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = 
> t2.a").explain())
> tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
>   } {code}
> Output:
> {code:java}
> ++-+-+-+-+
> | op |           a |               b |          a0 |      b0 |
> ++-+-+-+-+
> | +U |           1 |               1 |           1 |      99 |
> | +U |           1 |              99 |           1 |      99 |
> | -U |           1 |               1 |           1 |      99 |
> | -D |           1 |              99 |           1 |      99 |
> ++-+-+-+-+{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35030][runtime] Introduce Epoch Manager under async execution [flink]

2024-05-14 Thread via GitHub


yunfengzhou-hub commented on code in PR #24748:
URL: https://github.com/apache/flink/pull/24748#discussion_r1600886728


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java:
##
@@ -188,6 +190,35 @@ public  InternalTimerService 
getInternalTimerService(
 (AsyncExecutionController) asyncExecutionController);
 }
 
+@Override
+public void processWatermark(Watermark mark) throws Exception {
+if (!isAsyncStateProcessingEnabled()) {
+super.processWatermark(mark);
+return;
+}
+asyncExecutionController.processNonRecord(() -> 
super.processWatermark(mark));

Review Comment:
   We may need to override `processWatermark1` and `processWatermark2` as well, 
or we can override `processWatermark(Watermark, int)`, like that for 
`processWatermarkStatus`. I also understand it that this PR is mainly 
responsible for introducing the epoch mechanism, and that we would have another 
jira ticket and PR to apply epoch to all events and all cases. So it is also OK 
for me if you would like to make the change in the next PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34470][Connectors/Kafka] Fix indefinite blocking by adjusting stopping condition in split reader [flink-connector-kafka]

2024-05-14 Thread via GitHub


LinMingQiang commented on code in PR #100:
URL: 
https://github.com/apache/flink-connector-kafka/pull/100#discussion_r1600879571


##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java:
##
@@ -122,27 +122,21 @@ public RecordsWithSplitIds> fetch() throws IOExce
 KafkaPartitionSplitRecords recordsBySplits =
 new KafkaPartitionSplitRecords(consumerRecords, 
kafkaSourceReaderMetrics);
 List finishedPartitions = new ArrayList<>();
-for (TopicPartition tp : consumerRecords.partitions()) {
+for (TopicPartition tp : consumer.assignment()) {
 long stoppingOffset = getStoppingOffset(tp);
-final List> recordsFromPartition =
-consumerRecords.records(tp);
-
-if (recordsFromPartition.size() > 0) {
-final ConsumerRecord lastRecord =
-recordsFromPartition.get(recordsFromPartition.size() - 
1);
-
-// After processing a record with offset of "stoppingOffset - 
1", the split reader
-// should not continue fetching because the record with 
stoppingOffset may not
-// exist. Keep polling will just block forever.
-if (lastRecord.offset() >= stoppingOffset - 1) {
-recordsBySplits.setPartitionStoppingOffset(tp, 
stoppingOffset);
-finishSplitAtRecord(
-tp,
-stoppingOffset,
-lastRecord.offset(),
-finishedPartitions,
-recordsBySplits);
-}
+long consumerPosition = consumer.position(tp);
+// Stop fetching when the consumer's position reaches the 
stoppingOffset.
+// Control messages may follow the last record; therefore, using 
the last record's
+// offset as a stopping condition could result in indefinite 
blocking.
+if (consumerPosition >= stoppingOffset) {
+LOG.debug(
+"Position of {}: {}, has reached stopping offset: {}",
+tp,
+consumerPosition,
+stoppingOffset);
+recordsBySplits.setPartitionStoppingOffset(tp, stoppingOffset);
+finishSplitAtRecord(
+tp, stoppingOffset, consumerPosition, 
finishedPartitions, recordsBySplits);
 }
 // Track this partition's record lag if it never appears before
 kafkaSourceReaderMetrics.maybeAddRecordsLagMetric(consumer, tp);

Review Comment:
   we do not need to track tp when consumerRecords is empty.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34470][Connectors/Kafka] Fix indefinite blocking by adjusting stopping condition in split reader [flink-connector-kafka]

2024-05-14 Thread via GitHub


LinMingQiang commented on code in PR #100:
URL: 
https://github.com/apache/flink-connector-kafka/pull/100#discussion_r1600878564


##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java:
##
@@ -122,27 +122,21 @@ public RecordsWithSplitIds> fetch() throws IOExce
 KafkaPartitionSplitRecords recordsBySplits =
 new KafkaPartitionSplitRecords(consumerRecords, 
kafkaSourceReaderMetrics);
 List finishedPartitions = new ArrayList<>();
-for (TopicPartition tp : consumerRecords.partitions()) {
+for (TopicPartition tp : consumer.assignment()) {
 long stoppingOffset = getStoppingOffset(tp);
-final List> recordsFromPartition =
-consumerRecords.records(tp);
-
-if (recordsFromPartition.size() > 0) {
-final ConsumerRecord lastRecord =
-recordsFromPartition.get(recordsFromPartition.size() - 
1);
-
-// After processing a record with offset of "stoppingOffset - 
1", the split reader
-// should not continue fetching because the record with 
stoppingOffset may not
-// exist. Keep polling will just block forever.
-if (lastRecord.offset() >= stoppingOffset - 1) {
-recordsBySplits.setPartitionStoppingOffset(tp, 
stoppingOffset);
-finishSplitAtRecord(
-tp,
-stoppingOffset,
-lastRecord.offset(),
-finishedPartitions,
-recordsBySplits);
-}
+long consumerPosition = consumer.position(tp);
+// Stop fetching when the consumer's position reaches the 
stoppingOffset.
+// Control messages may follow the last record; therefore, using 
the last record's
+// offset as a stopping condition could result in indefinite 
blocking.
+if (consumerPosition >= stoppingOffset) {
+LOG.debug(
+"Position of {}: {}, has reached stopping offset: {}",
+tp,
+consumerPosition,
+stoppingOffset);
+recordsBySplits.setPartitionStoppingOffset(tp, stoppingOffset);
+finishSplitAtRecord(
+tp, stoppingOffset, consumerPosition, 
finishedPartitions, recordsBySplits);
 }
 // Track this partition's record lag if it never appears before
 kafkaSourceReaderMetrics.maybeAddRecordsLagMetric(consumer, tp);

Review Comment:
consumerRecords.partitions().forEach(trackTp -> {
   // Track this partition's record lag if it never appears 
before
   kafkaSourceReaderMetrics.maybeAddRecordsLagMetric(consumer, 
trackTp);
   });



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35109] Drop support for Flink 1.17 & 1.18 and fix tests for 1.20-SNAPSHOT [flink-connector-kafka]

2024-05-14 Thread via GitHub


ruanhang1993 commented on code in PR #102:
URL: 
https://github.com/apache/flink-connector-kafka/pull/102#discussion_r1600864186


##
.github/workflows/push_pr.yml:
##
@@ -28,21 +28,16 @@ jobs:
   compile_and_test:
 strategy:
   matrix:
-flink: [ 1.17.2 ]
-jdk: [ '8, 11' ]
-include:
-  - flink: 1.18.1
-jdk: '8, 11, 17'
-  - flink: 1.19.0
-jdk: '8, 11, 17, 21'
+flink: [ 1.19.0, 1.20-SNAPSHOT ]
+jdk: [ '8, 11, 17, 21' ]

Review Comment:
   Hi, @MartijnVisser .
   I check these files in both jdbc connector[1] and mongodb connector[2]. It 
seems like they all use the Java 21 for Flink 1.19. Do I need to change them 
too?
   
   [1] 
https://github.com/apache/flink-connector-jdbc/blob/main/.github/workflows/weekly.yml
   [2] 
https://github.com/apache/flink-connector-mongodb/blob/main/.github/workflows/weekly.yml



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35109] Drop support for Flink 1.17 & 1.18 and fix tests for 1.20-SNAPSHOT [flink-connector-kafka]

2024-05-14 Thread via GitHub


ruanhang1993 commented on code in PR #102:
URL: 
https://github.com/apache/flink-connector-kafka/pull/102#discussion_r1600860337


##
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/TypeSerializerUpgradeTestBase.java:
##
@@ -43,28 +40,19 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Set;
+import java.util.List;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
-import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assumptions.assumeThat;
-import static org.hamcrest.CoreMatchers.not;
 
-/**
- * A test base for testing {@link TypeSerializer} upgrades.
- *
- * You can run {@link #generateTestSetupFiles(TestSpecification)} on a 
Flink branch to
- * (re-)generate the test data files.
- */
+/** A test base for testing {@link TypeSerializer} upgrades. */
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public abstract class TypeSerializerUpgradeTestBase {
-
-public static final FlinkVersion CURRENT_VERSION = FlinkVersion.v1_17;
-
-public static final Set MIGRATION_VERSIONS =
-FlinkVersion.rangeOf(FlinkVersion.v1_11, CURRENT_VERSION);
+public abstract class TypeSerializerUpgradeTestBase

Review Comment:
   Hi, @MartijnVisser .
   The changes for `TypeSerializerUpgradeTestBase` in 
[Flink#24603](https://github.com/apache/flink/pull/24603/files#) and 
[Flink#23960](https://github.com/apache/flink/pull/23960/files) make the kafka 
connector not be able to compile with both 1.20-SNAPSHOT and 1.19.0.
   
   I think we still have to maintain `TypeSerializerUpgradeTestBase`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35344][cdc-base] Move same code from multiple subclasses to JdbcSourceChunkSplitter [flink-cdc]

2024-05-14 Thread via GitHub


loserwang1024 commented on PR #3319:
URL: https://github.com/apache/flink-cdc/pull/3319#issuecomment-2111475946

   @PatrickRen , @ruanhang1993 , @leonardBang , CC


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35346) Introduce pluggable workflow scheduler interface for materialized table

2024-05-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35346:
---
Labels: pull-request-available  (was: )

> Introduce pluggable workflow scheduler interface for materialized table
> ---
>
> Key: FLINK-35346
> URL: https://issues.apache.org/jira/browse/FLINK-35346
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.20.0
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35346][table-common] Introduce workflow scheduler interface for materialized table [flink]

2024-05-14 Thread via GitHub


hackergin commented on code in PR #24767:
URL: https://github.com/apache/flink/pull/24767#discussion_r1600850246


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/WorkflowScheduler.java:
##
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.workflow;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.refresh.RefreshHandler;
+import org.apache.flink.table.refresh.RefreshHandlerSerializer;
+
+/**
+ * This interface is used to interact with specific workflow scheduler 
services that support
+ * creating, modifying, and deleting refreshed workflow of Materialized Table.
+ *
+ * @param  The type of {@link RefreshHandler} used by specific {@link 
WorkflowScheduler} to
+ * locate the refresh workflow in scheduler service.
+ */
+@PublicEvolving
+public interface WorkflowScheduler {
+
+/**
+ * Open this workflow scheduler instance. Used for any required 
preparation in initialization
+ * phase.
+ *
+ * @throws WorkflowException if initializing workflow scheduler occur 
exception
+ */
+void open() throws WorkflowException;
+
+/**
+ * Close this workflow scheduler when it is no longer needed and release 
any resource that it
+ * might be holding.
+ *
+ * @throws WorkflowException if close the related resources of workflow 
scheduler failed
+ */
+void close() throws WorkflowException;
+
+/**
+ * Return a {@link RefreshHandlerSerializer} instance to serialize and 
deserialize {@link
+ * RefreshHandler} created by specific workflow scheduler service.
+ */
+RefreshHandlerSerializer getRefreshHandlerSerializer();
+
+/**
+ * Create a refresh workflow in specific scheduler service for the 
materialized table, return a
+ * {@link RefreshHandler} instance which can locate the refresh workflow 
detail information.
+ *
+ * This method supports creating workflow for periodic refresh, as well 
as workflow for a
+ * one-time refresh only.
+ *
+ * @param createRefreshWorkflow The detail info for create refresh 
workflow of materialized
+ * table.
+ * @return The meta info which points to the refresh workflow in scheduler 
service.
+ * @throws WorkflowException if create refresh workflow failed

Review Comment:
   ```suggestion
* @throws WorkflowException if creating refresh workflow failed
   ```



##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/WorkflowSchedulerFactoryUtil.java:
##
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.workflow.WorkflowScheduler;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
+import static 

Re: [PR] [FLINK-35318][table] use UTC timezone to handle TIMESTAMP_WITHOUT_TIM… [flink]

2024-05-14 Thread via GitHub


flinkbot commented on PR #24787:
URL: https://github.com/apache/flink/pull/24787#issuecomment-2111466404

   
   ## CI report:
   
   * 297b48d3db74056754600c1b01a5dfef0a0635f3 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35109] Drop support for Flink 1.17 & 1.18 and fix tests for 1.20-SNAPSHOT [flink-connector-kafka]

2024-05-14 Thread via GitHub


ruanhang1993 commented on code in PR #102:
URL: 
https://github.com/apache/flink-connector-kafka/pull/102#discussion_r1600864186


##
.github/workflows/push_pr.yml:
##
@@ -28,21 +28,16 @@ jobs:
   compile_and_test:
 strategy:
   matrix:
-flink: [ 1.17.2 ]
-jdk: [ '8, 11' ]
-include:
-  - flink: 1.18.1
-jdk: '8, 11, 17'
-  - flink: 1.19.0
-jdk: '8, 11, 17, 21'
+flink: [ 1.19.0, 1.20-SNAPSHOT ]
+jdk: [ '8, 11, 17, 21' ]

Review Comment:
   Hi, @MartijnVisser .
   I check these files in both jdbc connector and mongodb connector. It seems 
like they all use the Java 21 for Flink 1.19. Do I need to change them too?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35318) incorrect timezone handling for TIMESTAMP_WITH_LOCAL_TIME_ZONE type during predicate pushdown

2024-05-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35318:
---
Labels: pull-request-available  (was: )

> incorrect timezone handling for TIMESTAMP_WITH_LOCAL_TIME_ZONE type during 
> predicate pushdown
> -
>
> Key: FLINK-35318
> URL: https://issues.apache.org/jira/browse/FLINK-35318
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.18.1
> Environment: flink version 1.18.1
> iceberg version 1.15.1
>Reporter: linshangquan
>Assignee: linshangquan
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2024-05-09-14-06-58-007.png, 
> image-2024-05-09-14-09-38-453.png, image-2024-05-09-14-11-38-476.png, 
> image-2024-05-09-14-22-14-417.png, image-2024-05-09-14-22-59-370.png, 
> image-2024-05-09-18-52-03-741.png, image-2024-05-09-18-52-28-584.png
>
>
> In our scenario, we have an Iceberg table that contains a column named 'time' 
> of the {{timestamptz}} data type. This column has 10 rows of data where the 
> 'time' value is {{'2024-04-30 07:00:00'}} expressed in the "Asia/Shanghai" 
> timezone.
> !image-2024-05-09-14-06-58-007.png!
>  
> We encountered a strange phenomenon when accessing the table using 
> Iceberg-flink.
> When the {{WHERE}} clause includes the {{time}} column, the results are 
> incorrect.
> ZoneId.{_}systemDefault{_}() = "Asia/Shanghai" 
> !image-2024-05-09-18-52-03-741.png!
> When there is no {{WHERE}} clause, the results are correct.
> !image-2024-05-09-18-52-28-584.png!
> During debugging, we found that when a {{WHERE}} clause is present, a 
> {{FilterPushDownSpec}} is generated, and this {{FilterPushDownSpec}} utilizes 
> {{RexNodeToExpressionConverter}} for translation.
> !image-2024-05-09-14-11-38-476.png!
> !image-2024-05-09-14-22-59-370.png!
> When {{RexNodeToExpressionConverter#visitLiteral}} encounters a 
> {{TIMESTAMP_WITH_LOCAL_TIME_ZONE}} type, it uses the specified timezone 
> "Asia/Shanghai" to convert the {{TimestampString}} type to an {{Instant}} 
> type. However, the upstream {{TimestampString}} data has already been 
> processed in UTC timezone. By applying the local timezone processing here, an 
> error occurs due to the mismatch in timezones.
> Whether the handling of {{TIMESTAMP_WITH_LOCAL_TIME_ZONE}} type of data in 
> {{RexNodeToExpressionConverter#visitLiteral}} is a bug, and whether it should 
> process the data in UTC timezone.
>  
> Please help confirm if this is the issue, and if so, we can submit a patch to 
> fix it.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35318][table] use UTC timezone to handle TIMESTAMP_WITHOUT_TIM… [flink]

2024-05-14 Thread via GitHub


lshangq opened a new pull request, #24787:
URL: https://github.com/apache/flink/pull/24787

   …E_ZONE type in RexNodeToExpressionConverter
   
   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow [the 
conventions for tests defined in our code quality 
guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35109] Drop support for Flink 1.17 & 1.18 and fix tests for 1.20-SNAPSHOT [flink-connector-kafka]

2024-05-14 Thread via GitHub


ruanhang1993 commented on code in PR #102:
URL: 
https://github.com/apache/flink-connector-kafka/pull/102#discussion_r1600860337


##
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/TypeSerializerUpgradeTestBase.java:
##
@@ -43,28 +40,19 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Set;
+import java.util.List;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
-import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assumptions.assumeThat;
-import static org.hamcrest.CoreMatchers.not;
 
-/**
- * A test base for testing {@link TypeSerializer} upgrades.
- *
- * You can run {@link #generateTestSetupFiles(TestSpecification)} on a 
Flink branch to
- * (re-)generate the test data files.
- */
+/** A test base for testing {@link TypeSerializer} upgrades. */
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public abstract class TypeSerializerUpgradeTestBase {
-
-public static final FlinkVersion CURRENT_VERSION = FlinkVersion.v1_17;
-
-public static final Set MIGRATION_VERSIONS =
-FlinkVersion.rangeOf(FlinkVersion.v1_11, CURRENT_VERSION);
+public abstract class TypeSerializerUpgradeTestBase

Review Comment:
   Hi, @MartijnVisser .
   The changes for `TypeSerializerUpgradeTestBase` in #24603 and #23960 make 
the kafka connector not be able to compile with both 1.20-SNAPSHOT and 1.19.0.
   
   I think we still have to maintain `TypeSerializerUpgradeTestBase`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34470][Connectors/Kafka] Fix indefinite blocking by adjusting stopping condition in split reader [flink-connector-kafka]

2024-05-14 Thread via GitHub


LinMingQiang commented on PR #100:
URL: 
https://github.com/apache/flink-connector-kafka/pull/100#issuecomment-2111454911

   its work , i had try.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35293) FLIP-445: Support dynamic parallelism inference for HiveSource

2024-05-14 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846473#comment-17846473
 ] 

Rui Fan commented on FLINK-35293:
-

Hi [~xiasun] , I saw you added the release note for FLIP-445, would you mind 
recording FLIP-445 into the 1.20 release doc[1]? It's useful for release 
managers to follow it, thanks in advance.

[1]https://cwiki.apache.org/confluence/display/FLINK/1.20+Release

> FLIP-445: Support dynamic parallelism inference for HiveSource
> --
>
> Key: FLINK-35293
> URL: https://issues.apache.org/jira/browse/FLINK-35293
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Hive
>Affects Versions: 1.20.0
>Reporter: xingbe
>Assignee: xingbe
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> [FLIP-379|https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs]
>  introduces dynamic source parallelism inference, which, compared to static 
> inference, utilizes runtime information to more accurately determine the 
> source parallelism. The FileSource already possesses the capability for 
> dynamic parallelism inference. As a follow-up task to FLIP-379, this FLIP 
> plans to implement the dynamic parallelism inference interface for 
> HiveSource, and also switches the default static parallelism inference to 
> dynamic parallelism inference.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35353) Translate "Profiler" page into Chinese

2024-05-14 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-35353:
---

Assignee: Juan Zifeng

> Translate  "Profiler" page into Chinese
> ---
>
> Key: FLINK-35353
> URL: https://issues.apache.org/jira/browse/FLINK-35353
> Project: Flink
>  Issue Type: Improvement
>  Components: chinese-translation, Documentation
>Affects Versions: 1.19.0
>Reporter: Juan Zifeng
>Assignee: Juan Zifeng
>Priority: Major
> Fix For: 1.19.0
>
>
> The links are 
> https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/ops/debugging/profiler/



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34379][table] Fix OutOfMemoryError with large queries [flink]

2024-05-14 Thread via GitHub


mumuhhh commented on code in PR #24600:
URL: https://github.com/apache/flink/pull/24600#discussion_r1600843684


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java:
##
@@ -236,6 +238,9 @@ private void setTables(ContextResolvedTable catalogTable) {
 tables.add(catalogTable);
 } else {
 for (ContextResolvedTable thisTable : new ArrayList<>(tables)) 
{
+if (tables.contains(catalogTable)) {

Review Comment:
   > I think we can use a boolean flag to check here, then we don't need to 
call contains method every time, it is O(N) time complexity.
   > 
   > ```
   > boolean hasAdded = false;
   > for (ContextResolvedTable thisTable : new 
ArrayList<>(tables)) {
   > if (hasAdded) {
   > break;
   > }
   > if 
(!thisTable.getIdentifier().equals(catalogTable.getIdentifier())) {
   > tables.add(catalogTable);
   > hasAdded = true;
   > }
   > }
   > ```
   
   I think we should modify the traversal logic.
   ```
   boolean hasAdded = false;
   for (ContextResolvedTable thisTable : new 
ArrayList<>(tables)) {
   if 
(thisTable.getIdentifier().equals(catalogTable.getIdentifier())) {
   hasAdded = true;
   break;
   }
   }
   if (!hasAdded) {
   tables.add(catalogTable);
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34379][table] Fix OutOfMemoryError with large queries [flink]

2024-05-14 Thread via GitHub


mumuhhh commented on code in PR #24600:
URL: https://github.com/apache/flink/pull/24600#discussion_r1600843684


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java:
##
@@ -236,6 +238,9 @@ private void setTables(ContextResolvedTable catalogTable) {
 tables.add(catalogTable);
 } else {
 for (ContextResolvedTable thisTable : new ArrayList<>(tables)) 
{
+if (tables.contains(catalogTable)) {

Review Comment:
   > 我想我们可以在这里使用布尔标志来检查,那么我们不需要每次都调用方法,它是 O(N) 时间复杂度。`contains`
   > 
   > ```
   > boolean hasAdded = false;
   > for (ContextResolvedTable thisTable : new 
ArrayList<>(tables)) {
   > if (hasAdded) {
   > break;
   > }
   > if 
(!thisTable.getIdentifier().equals(catalogTable.getIdentifier())) {
   > tables.add(catalogTable);
   > hasAdded = true;
   > }
   > }
   > ```
   
   I think we should modify the traversal logic.
   ```
   boolean hasAdded = false;
   for (ContextResolvedTable thisTable : new 
ArrayList<>(tables)) {
   if 
(thisTable.getIdentifier().equals(catalogTable.getIdentifier())) {
   hasAdded = true;
   break;
   }
   }
   if (!hasAdded) {
   tables.add(catalogTable);
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34379][table] Fix OutOfMemoryError with large queries [flink]

2024-05-14 Thread via GitHub


mumuhhh commented on code in PR #24600:
URL: https://github.com/apache/flink/pull/24600#discussion_r1600841072


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java:
##
@@ -115,7 +117,7 @@ private static class DppDimSideChecker {
 private final RelNode relNode;
 private boolean hasFilter;
 private boolean hasPartitionedScan;
-private final List tables = new ArrayList<>();
+private final Set tables = new HashSet<>();

Review Comment:
   Why do we write traversal comparisons like that?
   ```
   boolean hasAdded = false;
   for (ContextResolvedTable thisTable : new 
ArrayList<>(tables)) {
   if 
(thisTable.getIdentifier().equals(catalogTable.getIdentifier())) {
   hasAdded = true;
   break;
   }
   }
   if (!hasAdded) {
   tables.add(catalogTable);
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Fsip module 2b [flink-training]

2024-05-14 Thread via GitHub


manoellins opened a new pull request, #82:
URL: https://github.com/apache/flink-training/pull/82

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] feat(module-1): Implementing ride-cleansing and rides-and-fares [flink-training]

2024-05-14 Thread via GitHub


gerson23 closed pull request #81: feat(module-1): Implementing ride-cleansing 
and rides-and-fares
URL: https://github.com/apache/flink-training/pull/81


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] feat(module-1): Implementing ride-cleansing and rides-and-fares [flink-training]

2024-05-14 Thread via GitHub


gerson23 opened a new pull request, #81:
URL: https://github.com/apache/flink-training/pull/81

   * Implemented ride-cleaning by filtering in rides start and ending within 
NYC coordinates
   * In rids-and-fares, TaxiRide and TaxiFare may come in different order, so 
we need to have a state value for each. State is cleaned as soon as it is 
paired.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35289) Incorrect timestamp of stream elements collected from onTimer in batch mode

2024-05-14 Thread Kanthi Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846454#comment-17846454
 ] 

Kanthi Vaidya commented on FLINK-35289:
---

Any updates on this issue ?

> Incorrect timestamp of stream elements collected from onTimer in batch mode
> ---
>
> Key: FLINK-35289
> URL: https://issues.apache.org/jira/browse/FLINK-35289
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.18.1
>Reporter: Kanthi Vaidya
>Priority: Major
>
> In batch mode  all registered timers will fire at the _end of time. Given 
> this, if a user registers a timer for Long.MAX_VALUE, the timestamp assigned 
> to the elements that are collected from the onTimer context ends up being 
> Long.MAX_VALUE. Ideally this should be the time when the batch actually 
> executed  the onTimer function._



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-27741) Fix NPE when use dense_rank() and rank() in over aggregation

2024-05-14 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin resolved FLINK-27741.
-
Fix Version/s: 1.20.0
   Resolution: Fixed

> Fix NPE when use dense_rank() and rank() in over aggregation
> 
>
> Key: FLINK-27741
> URL: https://issues.apache.org/jira/browse/FLINK-27741
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: chenzihao
>Assignee: chenzihao
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> pull-request-available
> Fix For: 1.20.0
>
>
> There has an 'NullPointException' when use RANK() and DENSE_RANK() in over 
> window.
> {code:java}
> @Test
>   def testDenseRankOnOver(): Unit = {
> val t = failingDataSource(TestData.tupleData5)
>   .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
> tEnv.registerTable("MyTable", t)
> val sqlQuery = "SELECT a, DENSE_RANK() OVER (PARTITION BY a ORDER BY 
> proctime) FROM MyTable"
> val sink = new TestingAppendSink
> tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
> env.execute()
>   }
> {code}
> {code:java}
> @Test
>   def testRankOnOver(): Unit = {
> val t = failingDataSource(TestData.tupleData5)
>   .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
> tEnv.registerTable("MyTable", t)
> val sqlQuery = "SELECT a, RANK() OVER (PARTITION BY a ORDER BY proctime) 
> FROM MyTable"
> val sink = new TestingAppendSink
> tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
> env.execute()
>   }
> {code}
> Exception Info:
> {code:java}
> java.lang.NullPointerException
>   at 
> scala.collection.mutable.ArrayOps$ofInt$.length$extension(ArrayOps.scala:248)
>   at scala.collection.mutable.ArrayOps$ofInt.length(ArrayOps.scala:248)
>   at scala.collection.SeqLike.size(SeqLike.scala:104)
>   at scala.collection.SeqLike.size$(SeqLike.scala:104)
>   at scala.collection.mutable.ArrayOps$ofInt.size(ArrayOps.scala:242)
>   at 
> scala.collection.IndexedSeqLike.sizeHintIfCheap(IndexedSeqLike.scala:95)
>   at 
> scala.collection.IndexedSeqLike.sizeHintIfCheap$(IndexedSeqLike.scala:95)
>   at 
> scala.collection.mutable.ArrayOps$ofInt.sizeHintIfCheap(ArrayOps.scala:242)
>   at scala.collection.mutable.Builder.sizeHint(Builder.scala:77)
>   at scala.collection.mutable.Builder.sizeHint$(Builder.scala:76)
>   at scala.collection.mutable.ArrayBuilder.sizeHint(ArrayBuilder.scala:21)
>   at scala.collection.TraversableLike.builder$1(TraversableLike.scala:229)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:232)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>   at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:242)
>   at 
> org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createDenseRankAggFunction(AggFunctionFactory.scala:454)
>   at 
> org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createAggFunction(AggFunctionFactory.scala:94)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.$anonfun$transformToAggregateInfoList$1(AggregateUtil.scala:445)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>   at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
>   at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToAggregateInfoList(AggregateUtil.scala:435)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToStreamAggregateInfoList(AggregateUtil.scala:381)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToStreamAggregateInfoList(AggregateUtil.scala:361)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil.transformToStreamAggregateInfoList(AggregateUtil.scala)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate.createUnboundedOverProcessFunction(StreamExecOverAggregate.java:279)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.java:198)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:148)
>   at 
> 

[jira] [Commented] (FLINK-27741) Fix NPE when use dense_rank() and rank() in over aggregation

2024-05-14 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846452#comment-17846452
 ] 

Sergey Nuyanzin commented on FLINK-27741:
-

Merged to master as 
[40fb49dd17b3e1b6c5aa0249514273730ebe9226|https://github.com/apache/flink/commit/40fb49dd17b3e1b6c5aa0249514273730ebe9226]

> Fix NPE when use dense_rank() and rank() in over aggregation
> 
>
> Key: FLINK-27741
> URL: https://issues.apache.org/jira/browse/FLINK-27741
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: chenzihao
>Assignee: chenzihao
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> pull-request-available
> Fix For: 1.20.0
>
>
> There has an 'NullPointException' when use RANK() and DENSE_RANK() in over 
> window.
> {code:java}
> @Test
>   def testDenseRankOnOver(): Unit = {
> val t = failingDataSource(TestData.tupleData5)
>   .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
> tEnv.registerTable("MyTable", t)
> val sqlQuery = "SELECT a, DENSE_RANK() OVER (PARTITION BY a ORDER BY 
> proctime) FROM MyTable"
> val sink = new TestingAppendSink
> tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
> env.execute()
>   }
> {code}
> {code:java}
> @Test
>   def testRankOnOver(): Unit = {
> val t = failingDataSource(TestData.tupleData5)
>   .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
> tEnv.registerTable("MyTable", t)
> val sqlQuery = "SELECT a, RANK() OVER (PARTITION BY a ORDER BY proctime) 
> FROM MyTable"
> val sink = new TestingAppendSink
> tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
> env.execute()
>   }
> {code}
> Exception Info:
> {code:java}
> java.lang.NullPointerException
>   at 
> scala.collection.mutable.ArrayOps$ofInt$.length$extension(ArrayOps.scala:248)
>   at scala.collection.mutable.ArrayOps$ofInt.length(ArrayOps.scala:248)
>   at scala.collection.SeqLike.size(SeqLike.scala:104)
>   at scala.collection.SeqLike.size$(SeqLike.scala:104)
>   at scala.collection.mutable.ArrayOps$ofInt.size(ArrayOps.scala:242)
>   at 
> scala.collection.IndexedSeqLike.sizeHintIfCheap(IndexedSeqLike.scala:95)
>   at 
> scala.collection.IndexedSeqLike.sizeHintIfCheap$(IndexedSeqLike.scala:95)
>   at 
> scala.collection.mutable.ArrayOps$ofInt.sizeHintIfCheap(ArrayOps.scala:242)
>   at scala.collection.mutable.Builder.sizeHint(Builder.scala:77)
>   at scala.collection.mutable.Builder.sizeHint$(Builder.scala:76)
>   at scala.collection.mutable.ArrayBuilder.sizeHint(ArrayBuilder.scala:21)
>   at scala.collection.TraversableLike.builder$1(TraversableLike.scala:229)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:232)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>   at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:242)
>   at 
> org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createDenseRankAggFunction(AggFunctionFactory.scala:454)
>   at 
> org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createAggFunction(AggFunctionFactory.scala:94)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.$anonfun$transformToAggregateInfoList$1(AggregateUtil.scala:445)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>   at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
>   at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToAggregateInfoList(AggregateUtil.scala:435)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToStreamAggregateInfoList(AggregateUtil.scala:381)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToStreamAggregateInfoList(AggregateUtil.scala:361)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil.transformToStreamAggregateInfoList(AggregateUtil.scala)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate.createUnboundedOverProcessFunction(StreamExecOverAggregate.java:279)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.java:198)
>   

[jira] [Assigned] (FLINK-27741) Fix NPE when use dense_rank() and rank() in over aggregation

2024-05-14 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin reassigned FLINK-27741:
---

Assignee: chenzihao

> Fix NPE when use dense_rank() and rank() in over aggregation
> 
>
> Key: FLINK-27741
> URL: https://issues.apache.org/jira/browse/FLINK-27741
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: chenzihao
>Assignee: chenzihao
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> pull-request-available
>
> There has an 'NullPointException' when use RANK() and DENSE_RANK() in over 
> window.
> {code:java}
> @Test
>   def testDenseRankOnOver(): Unit = {
> val t = failingDataSource(TestData.tupleData5)
>   .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
> tEnv.registerTable("MyTable", t)
> val sqlQuery = "SELECT a, DENSE_RANK() OVER (PARTITION BY a ORDER BY 
> proctime) FROM MyTable"
> val sink = new TestingAppendSink
> tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
> env.execute()
>   }
> {code}
> {code:java}
> @Test
>   def testRankOnOver(): Unit = {
> val t = failingDataSource(TestData.tupleData5)
>   .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
> tEnv.registerTable("MyTable", t)
> val sqlQuery = "SELECT a, RANK() OVER (PARTITION BY a ORDER BY proctime) 
> FROM MyTable"
> val sink = new TestingAppendSink
> tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
> env.execute()
>   }
> {code}
> Exception Info:
> {code:java}
> java.lang.NullPointerException
>   at 
> scala.collection.mutable.ArrayOps$ofInt$.length$extension(ArrayOps.scala:248)
>   at scala.collection.mutable.ArrayOps$ofInt.length(ArrayOps.scala:248)
>   at scala.collection.SeqLike.size(SeqLike.scala:104)
>   at scala.collection.SeqLike.size$(SeqLike.scala:104)
>   at scala.collection.mutable.ArrayOps$ofInt.size(ArrayOps.scala:242)
>   at 
> scala.collection.IndexedSeqLike.sizeHintIfCheap(IndexedSeqLike.scala:95)
>   at 
> scala.collection.IndexedSeqLike.sizeHintIfCheap$(IndexedSeqLike.scala:95)
>   at 
> scala.collection.mutable.ArrayOps$ofInt.sizeHintIfCheap(ArrayOps.scala:242)
>   at scala.collection.mutable.Builder.sizeHint(Builder.scala:77)
>   at scala.collection.mutable.Builder.sizeHint$(Builder.scala:76)
>   at scala.collection.mutable.ArrayBuilder.sizeHint(ArrayBuilder.scala:21)
>   at scala.collection.TraversableLike.builder$1(TraversableLike.scala:229)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:232)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>   at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:242)
>   at 
> org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createDenseRankAggFunction(AggFunctionFactory.scala:454)
>   at 
> org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createAggFunction(AggFunctionFactory.scala:94)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.$anonfun$transformToAggregateInfoList$1(AggregateUtil.scala:445)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>   at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
>   at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToAggregateInfoList(AggregateUtil.scala:435)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToStreamAggregateInfoList(AggregateUtil.scala:381)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToStreamAggregateInfoList(AggregateUtil.scala:361)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil.transformToStreamAggregateInfoList(AggregateUtil.scala)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate.createUnboundedOverProcessFunction(StreamExecOverAggregate.java:279)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.java:198)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:148)
>   at 
> 

Re: [PR] [BP-1.19][FLINK-27741][table-planner] Fix NPE when use dense_rank() and rank() [flink]

2024-05-14 Thread via GitHub


flinkbot commented on PR #24786:
URL: https://github.com/apache/flink/pull/24786#issuecomment-272825

   
   ## CI report:
   
   * c83d644fa3eb0bce139ecd651719570de94eadd5 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34896][table] Migrate CorrelateSortToRankRule to java [flink]

2024-05-14 Thread via GitHub


snuyanzin commented on code in PR #24545:
URL: https://github.com/apache/flink/pull/24545#discussion_r1600671064


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/CorrelateSortToRankRule.java:
##
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
+import org.apache.flink.table.planner.calcite.FlinkRelFactories;
+import org.apache.flink.table.runtime.operators.rank.ConstantRankRange;
+import org.apache.flink.table.runtime.operators.rank.RankType;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.plan.hep.HepPlanner;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.Correlate;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexCorrelVariable;
+import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.immutables.value.Value;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Planner rule that rewrites sort correlation to a Rank. Typically, the 
following plan
+ *
+ * {@code
+ * LogicalProject(state=[$0], name=[$1])
+ * +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0}])
+ *:- LogicalAggregate(group=[{0}])
+ *:  +- LogicalProject(state=[$1])
+ *: +- LogicalTableScan(table=[[default_catalog, default_database, 
cities]])
+ *+- LogicalSort(sort0=[$1], dir0=[DESC-nulls-last], fetch=[3])
+ *   +- LogicalProject(name=[$0], pop=[$2])
+ *  +- LogicalFilter(condition=[=($1, $cor0.state)])
+ * +- LogicalTableScan(table=[[default_catalog, default_database, 
cities]])
+ * }
+ *
+ * would be transformed to
+ *
+ * {@code
+ * LogicalProject(state=[$0], name=[$1])
+ *  +- LogicalProject(state=[$1], name=[$0], pop=[$2])
+ * +- LogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, 
rankEnd=3],
+ *  partitionBy=[$1], orderBy=[$2 DESC], select=[name=$0, state=$1, 
pop=$2])
+ *+- LogicalTableScan(table=[[default_catalog, default_database, 
cities]])
+ * }
+ *
+ * To match the Correlate, the LHS needs to be a global Aggregate on a 
scan, the RHS should be a
+ * Sort with an equal Filter predicate whose keys are same with the LHS 
grouping keys.
+ *
+ * This rule can only be used in {@link HepPlanner}.
+ */
+@Value.Enclosing
+public class CorrelateSortToRankRule
+extends RelRule 
{
+
+public static final CorrelateSortToRankRule INSTANCE =
+
CorrelateSortToRankRule.CorrelateSortToRankRuleConfig.DEFAULT.toRule();
+
+protected CorrelateSortToRankRule(CorrelateSortToRankRuleConfig config) {
+super(config);
+}
+
+@Override
+public boolean matches(RelOptRuleCall call) {
+Correlate correlate = call.rel(0);
+if (correlate.getJoinType() != JoinRelType.INNER) {
+return false;
+}
+Aggregate agg = call.rel(1);
+if (!agg.getAggCallList().isEmpty() || agg.getGroupSets().size() > 1) {
+return false;
+}
+Project aggInput = call.rel(2);
+if (!aggInput.isMapping()) {
+return false;
+}
+Sort sort = call.rel(3);
+if (sort.offset != null || sort.fetch == null) {
+// 1. we can not 

Re: [PR] [BP-1.18][FLINK-27741][table-planner] Fix NPE when use dense_rank() and rank() [flink]

2024-05-14 Thread via GitHub


flinkbot commented on PR #24785:
URL: https://github.com/apache/flink/pull/24785#issuecomment-267273

   
   ## CI report:
   
   * 7122ef0fcce7c7a5419450fb40ec1e773c223642 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-27741][table-planner] Fix NPE when use dense_rank() and rank()… [flink]

2024-05-14 Thread via GitHub


snuyanzin commented on PR #19797:
URL: https://github.com/apache/flink/pull/19797#issuecomment-253012

   Thanks for the review @xuyangzhong 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-27741][table-planner] Fix NPE when use dense_rank() and rank()… [flink]

2024-05-14 Thread via GitHub


snuyanzin closed pull request #19797: [FLINK-27741][table-planner] Fix NPE when 
use dense_rank() and rank()…
URL: https://github.com/apache/flink/pull/19797


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-24298) Refactor Google PubSub sink to use Unified Sink API

2024-05-14 Thread Ahmed Hamdy (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846426#comment-17846426
 ] 

Ahmed Hamdy commented on FLINK-24298:
-

[~martijnvisser] I am happy to work on this,  to complete migration of sinks 
using deprecated {{SinkFunction}}.
Could you please assign to me?

> Refactor Google PubSub sink to use Unified Sink API
> ---
>
> Key: FLINK-24298
> URL: https://issues.apache.org/jira/browse/FLINK-24298
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Martijn Visser
>Priority: Major
>  Labels: pull-request-available
>
> Refactor Google PubSub source to use Unified Sink API 
> [FLIP-143|https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]

2024-05-14 Thread via GitHub


JingGe commented on code in PR #24754:
URL: https://github.com/apache/flink/pull/24754#discussion_r1600605397


##
docs/content/docs/deployment/advanced/job_status_listener.md:
##
@@ -0,0 +1,81 @@
+
+---
+title: "Job Status Changed Listener"
+nav-title: job-status-listener
+nav-parent_id: advanced
+nav-pos: 3
+---
+
+
+## Job status changed listener
+Flink provides a pluggable interface for users to register their custom logic 
for handling with the job status changes in which lineage info about 
source/sink is provided.

Review Comment:
   If I am not mistaken, you are implementing the second part of FLIP-314. Does 
it make sense to update the Jira ticket and the PR description with the info 
and the content of this md? It will be easier for others to quickly understand 
the context and join the discussion/review. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]

2024-05-14 Thread via GitHub


vahmed-hamdy commented on PR #27:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/27#issuecomment-2111082697

   @snuyanzin  would you mind taking a look, We want to complete this migration 
since the `SinkFuction` is deprecated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35333) JdbcXaSinkTestBase fails in weekly Flink JDBC Connector tests

2024-05-14 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-35333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846380#comment-17846380
 ] 

João Boto commented on FLINK-35333:
---

on #120 I try to fix the error, but another error appears so I believe that 
remove the check for v3.1 is the best solution (done on #121)

I left the two PRs open so we can evaluate how to solve this.

> JdbcXaSinkTestBase fails in weekly Flink JDBC Connector tests
> -
>
> Key: FLINK-35333
> URL: https://issues.apache.org/jira/browse/FLINK-35333
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: jdbc-3.2.0
>Reporter: Martijn Visser
>Priority: Blocker
>  Labels: pull-request-available, test-stability
>
> https://github.com/apache/flink-connector-jdbc/actions/runs/9047366679/job/24859224407#step:15:147
> {code:java}
> Error:  Failed to execute goal 
> org.apache.maven.plugins:maven-compiler-plugin:3.8.0:testCompile 
> (default-testCompile) on project flink-connector-jdbc: Compilation failure
> Error:  
> /home/runner/work/flink-connector-jdbc/flink-connector-jdbc/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java:[164,37]
>   is not 
> abstract and does not override abstract method getTaskInfo() in 
> org.apache.flink.api.common.functions.RuntimeContext
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35333] Remove weekly check for v3.1 with 1.19 [flink-connector-jdbc]

2024-05-14 Thread via GitHub


eskabetxe commented on PR #121:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/121#issuecomment-2110737886

   @MartijnVisser I think this is the best solution


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-35333] Remove weekly check for v3.1 with 1.19 [flink-connector-jdbc]

2024-05-14 Thread via GitHub


eskabetxe opened a new pull request, #121:
URL: https://github.com/apache/flink-connector-jdbc/pull/121

   the branch cut for v3.1 was not prepared for 1.19 versions
   
   in another PR we introduce this check, but I think that should be avoided as 
the cut is not prepared for that..
   
   to prepare the branch to 1.19 we will have to bring some changes that are in 
master to solve some 1.19 problems


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-35333) JdbcXaSinkTestBase fails in weekly Flink JDBC Connector tests

2024-05-14 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-35333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846373#comment-17846373
 ] 

João Boto edited comment on FLINK-35333 at 5/14/24 4:42 PM:


I think I resolve the problem, but on v3.1 cut the branch was not prepared for 
1.19

I introduce this change on weekly workflow with the new Sink, I know I don't 
know if was ok..
{code:json}
{
flink: 1.19.0,
jdk: '8, 11, 17, 21',
branch: v3.1
}
{code}


Should we avoid add new version checks on weekly that the cut was not prepared 
for??

The simple solution is to remove that from weekly file from master as v3.1 cut 
was not prepared for 1.19 versions



was (Author: eskabetxe):
I think I resolve the problem, but on v3.1 cut the branch was not prepared for 
1.19

I introduce this change on weekly workflow with the new Sink, I know I don't 
know if was ok..
{code:json}
{
flink: 1.19.0,
jdk: '8, 11, 17, 21',
branch: v3.1
}
{code}


Should we avoid add new version checks on weekly that the cut was not prepared 
for??


> JdbcXaSinkTestBase fails in weekly Flink JDBC Connector tests
> -
>
> Key: FLINK-35333
> URL: https://issues.apache.org/jira/browse/FLINK-35333
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: jdbc-3.2.0
>Reporter: Martijn Visser
>Priority: Blocker
>  Labels: pull-request-available, test-stability
>
> https://github.com/apache/flink-connector-jdbc/actions/runs/9047366679/job/24859224407#step:15:147
> {code:java}
> Error:  Failed to execute goal 
> org.apache.maven.plugins:maven-compiler-plugin:3.8.0:testCompile 
> (default-testCompile) on project flink-connector-jdbc: Compilation failure
> Error:  
> /home/runner/work/flink-connector-jdbc/flink-connector-jdbc/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java:[164,37]
>   is not 
> abstract and does not override abstract method getTaskInfo() in 
> org.apache.flink.api.common.functions.RuntimeContext
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35333) JdbcXaSinkTestBase fails in weekly Flink JDBC Connector tests

2024-05-14 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-35333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846373#comment-17846373
 ] 

João Boto commented on FLINK-35333:
---

I think I resolve the problem, but on v3.1 cut the branch was not prepared for 
1.19

I introduce this change on weekly workflow with the new Sink, I know I don't 
know if was ok..
{code:json}
{
flink: 1.19.0,
jdk: '8, 11, 17, 21',
branch: v3.1
}
{code}


Should we avoid add new version checks on weekly that the cut was not prepared 
for??


> JdbcXaSinkTestBase fails in weekly Flink JDBC Connector tests
> -
>
> Key: FLINK-35333
> URL: https://issues.apache.org/jira/browse/FLINK-35333
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: jdbc-3.2.0
>Reporter: Martijn Visser
>Priority: Blocker
>  Labels: pull-request-available, test-stability
>
> https://github.com/apache/flink-connector-jdbc/actions/runs/9047366679/job/24859224407#step:15:147
> {code:java}
> Error:  Failed to execute goal 
> org.apache.maven.plugins:maven-compiler-plugin:3.8.0:testCompile 
> (default-testCompile) on project flink-connector-jdbc: Compilation failure
> Error:  
> /home/runner/work/flink-connector-jdbc/flink-connector-jdbc/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java:[164,37]
>   is not 
> abstract and does not override abstract method getTaskInfo() in 
> org.apache.flink.api.common.functions.RuntimeContext
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35333) JdbcXaSinkTestBase fails in weekly Flink JDBC Connector tests

2024-05-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35333:
---
Labels: pull-request-available test-stability  (was: test-stability)

> JdbcXaSinkTestBase fails in weekly Flink JDBC Connector tests
> -
>
> Key: FLINK-35333
> URL: https://issues.apache.org/jira/browse/FLINK-35333
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: jdbc-3.2.0
>Reporter: Martijn Visser
>Priority: Blocker
>  Labels: pull-request-available, test-stability
>
> https://github.com/apache/flink-connector-jdbc/actions/runs/9047366679/job/24859224407#step:15:147
> {code:java}
> Error:  Failed to execute goal 
> org.apache.maven.plugins:maven-compiler-plugin:3.8.0:testCompile 
> (default-testCompile) on project flink-connector-jdbc: Compilation failure
> Error:  
> /home/runner/work/flink-connector-jdbc/flink-connector-jdbc/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java:[164,37]
>   is not 
> abstract and does not override abstract method getTaskInfo() in 
> org.apache.flink.api.common.functions.RuntimeContext
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35333) JdbcXaSinkTestBase fails in weekly Flink JDBC Connector tests

2024-05-14 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-35333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846370#comment-17846370
 ] 

João Boto commented on FLINK-35333:
---

[~martijnvisser] could you assign this to me..

> JdbcXaSinkTestBase fails in weekly Flink JDBC Connector tests
> -
>
> Key: FLINK-35333
> URL: https://issues.apache.org/jira/browse/FLINK-35333
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: jdbc-3.2.0
>Reporter: Martijn Visser
>Priority: Blocker
>  Labels: pull-request-available, test-stability
>
> https://github.com/apache/flink-connector-jdbc/actions/runs/9047366679/job/24859224407#step:15:147
> {code:java}
> Error:  Failed to execute goal 
> org.apache.maven.plugins:maven-compiler-plugin:3.8.0:testCompile 
> (default-testCompile) on project flink-connector-jdbc: Compilation failure
> Error:  
> /home/runner/work/flink-connector-jdbc/flink-connector-jdbc/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java:[164,37]
>   is not 
> abstract and does not override abstract method getTaskInfo() in 
> org.apache.flink.api.common.functions.RuntimeContext
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35333] Fix JdbcXaSinkTestBase fails in weekly workflows [flink-connector-jdbc]

2024-05-14 Thread via GitHub


eskabetxe opened a new pull request, #120:
URL: https://github.com/apache/flink-connector-jdbc/pull/120

   Fix JdbcXaSinkTestBase fails in weekly workflows 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30284][flink-metrics] Modify configuration of DataDog http reporter url. [flink]

2024-05-14 Thread via GitHub


anleib commented on PR #23610:
URL: https://github.com/apache/flink/pull/23610#issuecomment-2110657281

   @Wosin can you please fix formatting. Would like to push this for a merge; 
my org recently migrated to US3 and broke all Flink metrics/dashboards/alerts.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-24298) Refactor Google PubSub sink to use Unified Sink API

2024-05-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-24298:
---
Labels: pull-request-available  (was: )

> Refactor Google PubSub sink to use Unified Sink API
> ---
>
> Key: FLINK-24298
> URL: https://issues.apache.org/jira/browse/FLINK-24298
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Martijn Visser
>Priority: Major
>  Labels: pull-request-available
>
> Refactor Google PubSub source to use Unified Sink API 
> [FLIP-143|https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]

2024-05-14 Thread via GitHub


vahmed-hamdy opened a new pull request, #27:
URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/27

   
   
   ## Purpose of the change
   
   Add implementation of `PubSubV2Sink` extending new `Sink` API.
   
   ## Verifying this change
   
   
   This change added tests and can be verified as follows:
   
   - Added unit tests
   - Added Integration tests
   
   ## Dependency Updates
   - Updated least compatible Flink version to 1.19
   
   ## Documentation
   - Added Documentation


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35328] AutoScale supports setting the maximum floating parallelism by the number of Pulsar partitions [flink-kubernetes-operator]

2024-05-14 Thread via GitHub


gyfora commented on code in PR #827:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/827#discussion_r1600288388


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java:
##
@@ -247,17 +249,36 @@ protected JobTopology getJobTopology(JobDetailsInfo 
jobDetailsInfo) {
 json, slotSharingGroupIdMap, maxParallelismMap, metrics, 
finished);
 }
 
-private void updateKafkaSourceMaxParallelisms(Context ctx, JobID jobId, 
JobTopology topology)
-throws Exception {
+private void updateKafkaPulsarSourceMaxParallelisms(
+Context ctx, JobID jobId, JobTopology topology) throws Exception {
 try (var restClient = ctx.getRestClusterClient()) {
-var partitionRegex = 
Pattern.compile("^.*\\.partition\\.\\d+\\.currentOffset$");
+Pattern partitionRegex =
+Pattern.compile(
+
"^.*\\.KafkaSourceReader\\.topic\\.(?.+)\\.partition\\.(?\\d+)\\.currentOffset$"
++ 
"|^.*\\.PulsarConsumer\\.(?.+)-partition-(?\\d+)\\..*\\.numMsgsReceived$");
 for (var vertexInfo : topology.getVertexInfos().values()) {
 if (vertexInfo.getInputs().isEmpty()) {
 var sourceVertex = vertexInfo.getId();
 var numPartitions =
 queryAggregatedMetricNames(restClient, jobId, 
sourceVertex).stream()
-.filter(partitionRegex.asMatchPredicate())
-.count();
+.map(
+v -> {
+Matcher matcher = 
partitionRegex.matcher(v);
+if (matcher.matches()) {
+String kafkaTopic = 
matcher.group("kafkaTopic");
+String kafkaId = 
matcher.group("kafkaId");
+String pulsarTopic =
+
matcher.group("pulsarTopic");
+String pulsarId = 
matcher.group("pulsarId");
+return kafkaTopic != null
+? kafkaTopic + "-" 
+ kafkaId
+: pulsarTopic + 
"-" + pulsarId;

Review Comment:
   here again I would prefer to completely separate the logic for the 
pulser/kafka counting as the logic doesn't really overlap and this is just more 
confusing



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35328] AutoScale supports setting the maximum floating parallelism by the number of Pulsar partitions [flink-kubernetes-operator]

2024-05-14 Thread via GitHub


gyfora commented on code in PR #827:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/827#discussion_r1600286605


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java:
##
@@ -247,17 +249,36 @@ protected JobTopology getJobTopology(JobDetailsInfo 
jobDetailsInfo) {
 json, slotSharingGroupIdMap, maxParallelismMap, metrics, 
finished);
 }
 
-private void updateKafkaSourceMaxParallelisms(Context ctx, JobID jobId, 
JobTopology topology)
-throws Exception {
+private void updateKafkaPulsarSourceMaxParallelisms(
+Context ctx, JobID jobId, JobTopology topology) throws Exception {
 try (var restClient = ctx.getRestClusterClient()) {
-var partitionRegex = 
Pattern.compile("^.*\\.partition\\.\\d+\\.currentOffset$");
+Pattern partitionRegex =
+Pattern.compile(
+
"^.*\\.KafkaSourceReader\\.topic\\.(?.+)\\.partition\\.(?\\d+)\\.currentOffset$"
++ 
"|^.*\\.PulsarConsumer\\.(?.+)-partition-(?\\d+)\\..*\\.numMsgsReceived$");
 for (var vertexInfo : topology.getVertexInfos().values()) {
 if (vertexInfo.getInputs().isEmpty()) {
 var sourceVertex = vertexInfo.getId();
 var numPartitions =
 queryAggregatedMetricNames(restClient, jobId, 
sourceVertex).stream()
-.filter(partitionRegex.asMatchPredicate())
-.count();

Review Comment:
   I wonder if we should simply have 2 separate regex / predicates and just 
`or(...)` them together to have a cleaner logic here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34931) Update Kudu DataStream connector to use Sink V2

2024-05-14 Thread Ferenc Csaky (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34931?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ferenc Csaky updated FLINK-34931:
-
Summary: Update Kudu DataStream connector to use Sink V2  (was: Update Kudu 
connector DataStream Sink implementation)

> Update Kudu DataStream connector to use Sink V2
> ---
>
> Key: FLINK-34931
> URL: https://issues.apache.org/jira/browse/FLINK-34931
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kudu
>Reporter: Ferenc Csaky
>Assignee: Ferenc Csaky
>Priority: Major
>  Labels: pull-request-available
>
> Update the DataSource API classes to use the current interfaces.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34931) Update Kudu connector DataStream Sink implementation

2024-05-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34931?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34931:
---
Labels: pull-request-available  (was: )

> Update Kudu connector DataStream Sink implementation
> 
>
> Key: FLINK-34931
> URL: https://issues.apache.org/jira/browse/FLINK-34931
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kudu
>Reporter: Ferenc Csaky
>Assignee: Ferenc Csaky
>Priority: Major
>  Labels: pull-request-available
>
> Update the DataSource API classes to use the current interfaces.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-27741][table-planner] Fix NPE when use dense_rank() and rank()… [flink]

2024-05-14 Thread via GitHub


xuyangzhong commented on code in PR #19797:
URL: https://github.com/apache/flink/pull/19797#discussion_r1600169931


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.scala:
##
@@ -47,6 +48,18 @@ class OverAggregateTest extends TableTestBase {
 util.verifyExecPlan("SELECT c, SUM(a) OVER (ORDER BY b) FROM MyTable")
   }
 
+  @Test
+  def testDenseRankOnOrder(): Unit = {

Review Comment:
   Oh, I missed it. Pardon me...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-27741][table-planner] Fix NPE when use dense_rank() and rank()… [flink]

2024-05-14 Thread via GitHub


snuyanzin commented on code in PR #19797:
URL: https://github.com/apache/flink/pull/19797#discussion_r1600155947


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.scala:
##
@@ -47,6 +48,18 @@ class OverAggregateTest extends TableTestBase {
 util.verifyExecPlan("SELECT c, SUM(a) OVER (ORDER BY b) FROM MyTable")
   }
 
+  @Test
+  def testDenseRankOnOrder(): Unit = {

Review Comment:
   Besides plans there are already a test in `OverAggregateITCase. scala` both 
for `RANK` and `DENSE_RANK`, that's way I have this question
   
https://github.com/apache/flink/pull/19797/files#diff-0fa625835219d2a3fcacb2fa8de5274ed6795f10b06035c6c9b68d11b700f9a8R198-R203



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-27741][table-planner] Fix NPE when use dense_rank() and rank()… [flink]

2024-05-14 Thread via GitHub


xuyangzhong commented on code in PR #19797:
URL: https://github.com/apache/flink/pull/19797#discussion_r1600130717


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.scala:
##
@@ -47,6 +48,18 @@ class OverAggregateTest extends TableTestBase {
 util.verifyExecPlan("SELECT c, SUM(a) OVER (ORDER BY b) FROM MyTable")
   }
 
+  @Test
+  def testDenseRankOnOrder(): Unit = {

Review Comment:
   What I mean is to add a test `testRankOnOver` in` OverAggregateITCase. 
scala` to test the function `Rank()`. Because the UT tests about plan here 
cannot test the modified part (when I reversed the changes in 
`AggFunctionFactory.scala`, these tests still passed).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35342) MaterializedTableStatementITCase test can check for wrong status

2024-05-14 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846319#comment-17846319
 ] 

Ryan Skraba commented on FLINK-35342:
-

Thank you for the fix!

Just to be complete, this failure occurred before the fix was merged:

* 1.20 AdaptiveScheduler / Test (module: table) 
https://github.com/apache/flink/actions/runs/9072668322/job/24928769693#step:10:12490

> MaterializedTableStatementITCase test can check for wrong status
> 
>
> Key: FLINK-35342
> URL: https://issues.apache.org/jira/browse/FLINK-35342
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.20.0
>Reporter: Ryan Skraba
>Priority: Critical
>  Labels: test-stability
>
> * 1.20 AdaptiveScheduler / Test (module: table) 
> https://github.com/apache/flink/actions/runs/9056197319/job/24879135605#step:10:12490
>  
> It looks like 
> {{MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume}}
>  can be flaky, where the expected status is not yet RUNNING:
> {code}
> Error: 03:24:03 03:24:03.902 [ERROR] Tests run: 6, Failures: 1, Errors: 0, 
> Skipped: 0, Time elapsed: 26.78 s <<< FAILURE! -- in 
> org.apache.flink.table.gateway.service.MaterializedTableStatementITCase
> Error: 03:24:03 03:24:03.902 [ERROR] 
> org.apache.flink.table.gateway.service.MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume(Path,
>  RestClusterClient) -- Time elapsed: 3.850 s <<< FAILURE!
> May 13 03:24:03 org.opentest4j.AssertionFailedError: 
> May 13 03:24:03 
> May 13 03:24:03 expected: "RUNNING"
> May 13 03:24:03  but was: "CREATED"
> May 13 03:24:03   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> May 13 03:24:03   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> May 13 03:24:03   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> May 13 03:24:03   at 
> org.apache.flink.table.gateway.service.MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume(MaterializedTableStatementITCase.java:650)
> May 13 03:24:03   at java.lang.reflect.Method.invoke(Method.java:498)
> May 13 03:24:03   at 
> java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
> May 13 03:24:03   at 
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> May 13 03:24:03   at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> May 13 03:24:03   at 
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> May 13 03:24:03   at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> May 13 03:24:03 
> May 13 03:24:04 03:24:04.270 [INFO] 
> May 13 03:24:04 03:24:04.270 [INFO] Results:
> May 13 03:24:04 03:24:04.270 [INFO] 
> Error: 03:24:04 03:24:04.270 [ERROR] Failures: 
> Error: 03:24:04 03:24:04.271 [ERROR]   
> MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume:650
>  
> May 13 03:24:04 expected: "RUNNING"
> May 13 03:24:04  but was: "CREATED"
> May 13 03:24:04 03:24:04.271 [INFO] 
> Error: 03:24:04 03:24:04.271 [ERROR] Tests run: 82, Failures: 1, Errors: 0, 
> Skipped: 0
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35002) GitHub action request timeout to ArtifactService

2024-05-14 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846318#comment-17846318
 ] 

Ryan Skraba commented on FLINK-35002:
-

* 1.18 Default (Java 8) / Test (module: tests) 
https://github.com/apache/flink/commit/1f604da2dfc831d04826a20b3cb272d2ad9dfb56/checks/24935906143/logs

> GitHub action request timeout  to ArtifactService
> -
>
> Key: FLINK-35002
> URL: https://issues.apache.org/jira/browse/FLINK-35002
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: Ryan Skraba
>Priority: Major
>  Labels: github-actions, test-stability
>
> A timeout can occur when uploading a successfully built artifact:
>  * [https://github.com/apache/flink/actions/runs/8516411871/job/23325392650]
> {code:java}
> 2024-04-02T02:20:15.6355368Z With the provided path, there will be 1 file 
> uploaded
> 2024-04-02T02:20:15.6360133Z Artifact name is valid!
> 2024-04-02T02:20:15.6362872Z Root directory input is valid!
> 2024-04-02T02:20:20.6975036Z Attempt 1 of 5 failed with error: Request 
> timeout: /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact. 
> Retrying request in 3000 ms...
> 2024-04-02T02:20:28.7084937Z Attempt 2 of 5 failed with error: Request 
> timeout: /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact. 
> Retrying request in 4785 ms...
> 2024-04-02T02:20:38.5015936Z Attempt 3 of 5 failed with error: Request 
> timeout: /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact. 
> Retrying request in 7375 ms...
> 2024-04-02T02:20:50.8901508Z Attempt 4 of 5 failed with error: Request 
> timeout: /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact. 
> Retrying request in 14988 ms...
> 2024-04-02T02:21:10.9028438Z ##[error]Failed to CreateArtifact: Failed to 
> make request after 5 attempts: Request timeout: 
> /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact
> 2024-04-02T02:22:59.9893296Z Post job cleanup.
> 2024-04-02T02:22:59.9958844Z Post job cleanup. {code}
> (This is unlikely to be something we can fix, but we can track it.)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-27741][table-planner] Fix NPE when use dense_rank() and rank()… [flink]

2024-05-14 Thread via GitHub


snuyanzin commented on code in PR #19797:
URL: https://github.com/apache/flink/pull/19797#discussion_r1600062134


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.scala:
##
@@ -47,6 +48,18 @@ class OverAggregateTest extends TableTestBase {
 util.verifyExecPlan("SELECT c, SUM(a) OVER (ORDER BY b) FROM MyTable")
   }
 
+  @Test
+  def testDenseRankOnOrder(): Unit = {

Review Comment:
   >I've attempted this case, and it seems only possible to test it within an 
integration test in stream mode. I propose we can add another testRankOnOver 
test specifically for testing the RANK() function.
   
   this I didn't get, there is already`testRankOnOver`, would you like to add 
another one?
   
   >Maybe we'd better address this as well
   yep, makes sense, I extracted the logic and reused it for 
§createPercentRankAggFunction` as well
   



##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.scala:
##
@@ -47,6 +48,18 @@ class OverAggregateTest extends TableTestBase {
 util.verifyExecPlan("SELECT c, SUM(a) OVER (ORDER BY b) FROM MyTable")
   }
 
+  @Test
+  def testDenseRankOnOrder(): Unit = {

Review Comment:
   >I've attempted this case, and it seems only possible to test it within an 
integration test in stream mode. I propose we can add another testRankOnOver 
test specifically for testing the RANK() function.
   
   this I didn't get, there is already`testRankOnOver`, would you like to add 
another one?
   
   >Maybe we'd better address this as well
   
   yep, makes sense, I extracted the logic and reused it for 
§createPercentRankAggFunction` as well
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35354) Support host mapping in Flink tikv cdc

2024-05-14 Thread ouyangwulin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ouyangwulin updated FLINK-35354:

Description: 
In tidb production environment deployment, there are usually two kinds of 
network: internal network and public network. When we use pd mode in tikv, we 
need to do network mapping, such as `spark.tispark.host_mapping` in 
[https://github.com/pingcap/tispark/blob/master/docs/userguide_3.0.md]. So I 
think we need support `host_mapping` in our Flink tikv cdc connector.

 

Add param:

 tikv.host_mapping:192.168.0.2:8.8.8.8;192.168.0.3:9.9.9.9

  was:
In tidb production environment deployment, there are usually two kinds of 
network: internal network and public network. When we use pd mode in tikv, we 
need to do network mapping, such as `spark.tispark.host_mapping` in 
[https://github.com/pingcap/tispark/blob/master/docs/userguide_3.0.md]. So I 
think we need support `host_mapping` in our Flink tikv cdc connector.

 

add param:

 tikv.host_mapping:192.168.0.2:8.8.8.8;192.168.0.3:9.9.9.9


> Support host mapping in Flink tikv cdc
> --
>
> Key: FLINK-35354
> URL: https://issues.apache.org/jira/browse/FLINK-35354
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0, cdc-3.2.0
>Reporter: ouyangwulin
>Priority: Major
> Fix For: cdc-3.1.0, cdc-3.2.0
>
>
> In tidb production environment deployment, there are usually two kinds of 
> network: internal network and public network. When we use pd mode in tikv, we 
> need to do network mapping, such as `spark.tispark.host_mapping` in 
> [https://github.com/pingcap/tispark/blob/master/docs/userguide_3.0.md]. So I 
> think we need support `host_mapping` in our Flink tikv cdc connector.
>  
> Add param:
>  tikv.host_mapping:192.168.0.2:8.8.8.8;192.168.0.3:9.9.9.9



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35354) Support host mapping in Flink tikv cdc

2024-05-14 Thread ouyangwulin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ouyangwulin updated FLINK-35354:

Description: 
In tidb production environment deployment, there are usually two kinds of 
network: internal network and public network. When we use pd mode in tikv, we 
need to do network mapping, such as `spark.tispark.host_mapping` in 
[https://github.com/pingcap/tispark/blob/master/docs/userguide_3.0.md]. So I 
think we need support `host_mapping` in our Flink tikv cdc connector.

 

add param:

 tikv.host_mapping:192.168.0.2:8.8.8.8;192.168.0.3:9.9.9.9

  was:
In tidb production environment deployment, there are usually two kinds of 
network: internal network and public network. When we use pd mode in tikv, we 
need to do network mapping, such as `spark.tispark.host_mapping` in 
[https://github.com/pingcap/tispark/blob/master/docs/userguide_3.0.md]. So I 
think we need support `host_mapping` in our Flink tikv cdc connector.

 

add param:

 tikv.host_mapping


> Support host mapping in Flink tikv cdc
> --
>
> Key: FLINK-35354
> URL: https://issues.apache.org/jira/browse/FLINK-35354
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0, cdc-3.2.0
>Reporter: ouyangwulin
>Priority: Major
> Fix For: cdc-3.1.0, cdc-3.2.0
>
>
> In tidb production environment deployment, there are usually two kinds of 
> network: internal network and public network. When we use pd mode in tikv, we 
> need to do network mapping, such as `spark.tispark.host_mapping` in 
> [https://github.com/pingcap/tispark/blob/master/docs/userguide_3.0.md]. So I 
> think we need support `host_mapping` in our Flink tikv cdc connector.
>  
> add param:
>  tikv.host_mapping:192.168.0.2:8.8.8.8;192.168.0.3:9.9.9.9



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35354) Support host mapping in Flink tikv cdc

2024-05-14 Thread ouyangwulin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ouyangwulin updated FLINK-35354:

Description: 
In tidb production environment deployment, there are usually two kinds of 
network: internal network and public network. When we use pd mode in tikv, we 
need to do network mapping, such as `spark.tispark.host_mapping` in 
[https://github.com/pingcap/tispark/blob/master/docs/userguide_3.0.md]. So I 
think we need support `host_mapping` in our Flink tikv cdc connector.

 

add param:

 tikv.host_mapping

  was:In tidb production environment deployment, there are usually two kinds of 
network: internal network and public network. When we use pd mode in tikv, we 
need to do network mapping, such as `spark.tispark.host_mapping` in 
[https://github.com/pingcap/tispark/blob/master/docs/userguide_3.0.md]. So I 
think we need support `host_mapping` in our Flink tikv cdc connector.


> Support host mapping in Flink tikv cdc
> --
>
> Key: FLINK-35354
> URL: https://issues.apache.org/jira/browse/FLINK-35354
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0, cdc-3.2.0
>Reporter: ouyangwulin
>Priority: Major
> Fix For: cdc-3.1.0, cdc-3.2.0
>
>
> In tidb production environment deployment, there are usually two kinds of 
> network: internal network and public network. When we use pd mode in tikv, we 
> need to do network mapping, such as `spark.tispark.host_mapping` in 
> [https://github.com/pingcap/tispark/blob/master/docs/userguide_3.0.md]. So I 
> think we need support `host_mapping` in our Flink tikv cdc connector.
>  
> add param:
>  tikv.host_mapping



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35354) Support host mapping in Flink tikv cdc

2024-05-14 Thread ouyangwulin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ouyangwulin updated FLINK-35354:

Description: In tidb production environment deployment, there are usually 
two kinds of network: internal network and public network. When we use pd mode 
in tikv, we need to do network mapping, such as `spark.tispark.host_mapping` in 
[https://github.com/pingcap/tispark/blob/master/docs/userguide_3.0.md]. So I 
think we need support `host_mapping` in our Flink tikv cdc connector.  (was: In 
tidb production environment deployment, there are usually two kinds of network: 
internal network and public network. When we use pd mode kv, we need to do 
network mapping, such as `spark.tispark.host_mapping` in 
https://github.com/pingcap/tispark/blob/master/docs/userguide_3.0.md. So I 
think we need support `host_mapping` in our Flink tikv cdc connector.)

> Support host mapping in Flink tikv cdc
> --
>
> Key: FLINK-35354
> URL: https://issues.apache.org/jira/browse/FLINK-35354
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0, cdc-3.2.0
>Reporter: ouyangwulin
>Priority: Major
> Fix For: cdc-3.1.0, cdc-3.2.0
>
>
> In tidb production environment deployment, there are usually two kinds of 
> network: internal network and public network. When we use pd mode in tikv, we 
> need to do network mapping, such as `spark.tispark.host_mapping` in 
> [https://github.com/pingcap/tispark/blob/master/docs/userguide_3.0.md]. So I 
> think we need support `host_mapping` in our Flink tikv cdc connector.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35356) Async reducing state

2024-05-14 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-35356:
---

 Summary: Async reducing state
 Key: FLINK-35356
 URL: https://issues.apache.org/jira/browse/FLINK-35356
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends
Reporter: Zakelly Lan






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35355) Async aggregating state

2024-05-14 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-35355:
---

 Summary: Async aggregating state
 Key: FLINK-35355
 URL: https://issues.apache.org/jira/browse/FLINK-35355
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends
Reporter: Zakelly Lan






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35354) Support host mapping in Flink tikv cdc

2024-05-14 Thread ouyangwulin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ouyangwulin updated FLINK-35354:

Summary: Support host mapping in Flink tikv cdc  (was: [discuss] Support 
host mapping in Flink tikv cdc)

> Support host mapping in Flink tikv cdc
> --
>
> Key: FLINK-35354
> URL: https://issues.apache.org/jira/browse/FLINK-35354
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0, cdc-3.2.0
>Reporter: ouyangwulin
>Priority: Major
> Fix For: cdc-3.1.0, cdc-3.2.0
>
>
> In tidb production environment deployment, there are usually two kinds of 
> network: internal network and public network. When we use pd mode kv, we need 
> to do network mapping, such as `spark.tispark.host_mapping` in 
> https://github.com/pingcap/tispark/blob/master/docs/userguide_3.0.md. So I 
> think we need support `host_mapping` in our Flink tikv cdc connector.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35354) [discuss] Support host mapping in Flink tikv cdc

2024-05-14 Thread ouyangwulin (Jira)
ouyangwulin created FLINK-35354:
---

 Summary: [discuss] Support host mapping in Flink tikv cdc
 Key: FLINK-35354
 URL: https://issues.apache.org/jira/browse/FLINK-35354
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: cdc-3.1.0, cdc-3.2.0
Reporter: ouyangwulin
 Fix For: cdc-3.1.0, cdc-3.2.0


In tidb production environment deployment, there are usually two kinds of 
network: internal network and public network. When we use pd mode kv, we need 
to do network mapping, such as `spark.tispark.host_mapping` in 
https://github.com/pingcap/tispark/blob/master/docs/userguide_3.0.md. So I 
think we need support `host_mapping` in our Flink tikv cdc connector.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35342) MaterializedTableStatementITCase test can check for wrong status

2024-05-14 Thread Feng Jin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846312#comment-17846312
 ] 

Feng Jin commented on FLINK-35342:
--

fixed by:   94d861b08fef1e350d80a3f5f0f63168d327bc64

> MaterializedTableStatementITCase test can check for wrong status
> 
>
> Key: FLINK-35342
> URL: https://issues.apache.org/jira/browse/FLINK-35342
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.20.0
>Reporter: Ryan Skraba
>Priority: Critical
>  Labels: test-stability
>
> * 1.20 AdaptiveScheduler / Test (module: table) 
> https://github.com/apache/flink/actions/runs/9056197319/job/24879135605#step:10:12490
>  
> It looks like 
> {{MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume}}
>  can be flaky, where the expected status is not yet RUNNING:
> {code}
> Error: 03:24:03 03:24:03.902 [ERROR] Tests run: 6, Failures: 1, Errors: 0, 
> Skipped: 0, Time elapsed: 26.78 s <<< FAILURE! -- in 
> org.apache.flink.table.gateway.service.MaterializedTableStatementITCase
> Error: 03:24:03 03:24:03.902 [ERROR] 
> org.apache.flink.table.gateway.service.MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume(Path,
>  RestClusterClient) -- Time elapsed: 3.850 s <<< FAILURE!
> May 13 03:24:03 org.opentest4j.AssertionFailedError: 
> May 13 03:24:03 
> May 13 03:24:03 expected: "RUNNING"
> May 13 03:24:03  but was: "CREATED"
> May 13 03:24:03   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> May 13 03:24:03   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> May 13 03:24:03   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> May 13 03:24:03   at 
> org.apache.flink.table.gateway.service.MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume(MaterializedTableStatementITCase.java:650)
> May 13 03:24:03   at java.lang.reflect.Method.invoke(Method.java:498)
> May 13 03:24:03   at 
> java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
> May 13 03:24:03   at 
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> May 13 03:24:03   at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> May 13 03:24:03   at 
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> May 13 03:24:03   at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> May 13 03:24:03 
> May 13 03:24:04 03:24:04.270 [INFO] 
> May 13 03:24:04 03:24:04.270 [INFO] Results:
> May 13 03:24:04 03:24:04.270 [INFO] 
> Error: 03:24:04 03:24:04.270 [ERROR] Failures: 
> Error: 03:24:04 03:24:04.271 [ERROR]   
> MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume:650
>  
> May 13 03:24:04 expected: "RUNNING"
> May 13 03:24:04  but was: "CREATED"
> May 13 03:24:04 03:24:04.271 [INFO] 
> Error: 03:24:04 03:24:04.271 [ERROR] Tests run: 82, Failures: 1, Errors: 0, 
> Skipped: 0
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-35193) Support drop materialized table syntax and execution in continuous refresh mode

2024-05-14 Thread dalongliu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35193?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dalongliu resolved FLINK-35193.
---
Resolution: Fixed

> Support drop materialized table syntax and execution in continuous refresh 
> mode
> ---
>
> Key: FLINK-35193
> URL: https://issues.apache.org/jira/browse/FLINK-35193
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.20.0
>Reporter: dalongliu
>Assignee: Feng Jin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> In continuous refresh mode, support drop materialized table and the 
> background refresh job.
> {code:SQL}
> DROP MATERIALIZED TABLE [ IF EXISTS ] [catalog_name.][db_name.]table_name
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35193) Support drop materialized table syntax and execution in continuous refresh mode

2024-05-14 Thread dalongliu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846309#comment-17846309
 ] 

dalongliu commented on FLINK-35193:
---

Merged in master: 94d861b08fef1e350d80a3f5f0f63168d327bc64

> Support drop materialized table syntax and execution in continuous refresh 
> mode
> ---
>
> Key: FLINK-35193
> URL: https://issues.apache.org/jira/browse/FLINK-35193
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.20.0
>Reporter: dalongliu
>Assignee: Feng Jin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> In continuous refresh mode, support drop materialized table and the 
> background refresh job.
> {code:SQL}
> DROP MATERIALIZED TABLE [ IF EXISTS ] [catalog_name.][db_name.]table_name
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35193][table] Support drop materialized table syntax and execution in continuous refresh mode [flink]

2024-05-14 Thread via GitHub


lsyldliu merged PR #24777:
URL: https://github.com/apache/flink/pull/24777


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33986) Extend shuffleMaster to support batch snapshot.

2024-05-14 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-33986:

Component/s: Runtime / Coordination

> Extend shuffleMaster to support batch snapshot.
> ---
>
> Key: FLINK-33986
> URL: https://issues.apache.org/jira/browse/FLINK-33986
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Junrui Li
>Assignee: Junrui Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> Extend shuffleMaster to support batch snapshot as follows:
>  # Add method supportsBatchSnapshot to identify whether the shuffle master 
> supports taking snapshot in batch scenarios
>  # Add method snapshotState and restoreState to snapshot and restore the 
> shuffle master's state.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-33986) Extend shuffleMaster to support batch snapshot.

2024-05-14 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-33986.
---
Fix Version/s: 1.20.0
   Resolution: Done

65d31e26534836909f6b8139c6bd6cd45b91bba4

> Extend shuffleMaster to support batch snapshot.
> ---
>
> Key: FLINK-33986
> URL: https://issues.apache.org/jira/browse/FLINK-33986
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Junrui Li
>Assignee: Junrui Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> Extend shuffleMaster to support batch snapshot as follows:
>  # Add method supportsBatchSnapshot to identify whether the shuffle master 
> supports taking snapshot in batch scenarios
>  # Add method snapshotState and restoreState to snapshot and restore the 
> shuffle master's state.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35353) Translate "Profiler" page into Chinese

2024-05-14 Thread Juan Zifeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846301#comment-17846301
 ] 

Juan Zifeng commented on FLINK-35353:
-

Hi [~jark], could you assign this to me?

> Translate  "Profiler" page into Chinese
> ---
>
> Key: FLINK-35353
> URL: https://issues.apache.org/jira/browse/FLINK-35353
> Project: Flink
>  Issue Type: Improvement
>  Components: chinese-translation, Documentation
>Affects Versions: 1.19.0
>Reporter: Juan Zifeng
>Priority: Major
> Fix For: 1.19.0
>
>
> The links are 
> https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/ops/debugging/profiler/



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33986][runtime] Extend ShuffleMaster to support snapshot and restore state. [flink]

2024-05-14 Thread via GitHub


zhuzhurk merged PR #24774:
URL: https://github.com/apache/flink/pull/24774


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35353) Translate "Profiler" page into Chinese

2024-05-14 Thread Juan Zifeng (Jira)
Juan Zifeng created FLINK-35353:
---

 Summary: Translate  "Profiler" page into Chinese
 Key: FLINK-35353
 URL: https://issues.apache.org/jira/browse/FLINK-35353
 Project: Flink
  Issue Type: Improvement
  Components: chinese-translation, Documentation
Affects Versions: 1.19.0
Reporter: Juan Zifeng
 Fix For: 1.19.0


The links are 
https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/ops/debugging/profiler/



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35342) MaterializedTableStatementITCase test can check for wrong status

2024-05-14 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846289#comment-17846289
 ] 

Weijie Guo commented on FLINK-35342:


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59528=logs=f2c100be-250b-5e85-7bbe-176f68fcddc5=05efd11e-5400-54a4-0d27-a4663be008a9=12763

> MaterializedTableStatementITCase test can check for wrong status
> 
>
> Key: FLINK-35342
> URL: https://issues.apache.org/jira/browse/FLINK-35342
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.20.0
>Reporter: Ryan Skraba
>Priority: Critical
>  Labels: test-stability
>
> * 1.20 AdaptiveScheduler / Test (module: table) 
> https://github.com/apache/flink/actions/runs/9056197319/job/24879135605#step:10:12490
>  
> It looks like 
> {{MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume}}
>  can be flaky, where the expected status is not yet RUNNING:
> {code}
> Error: 03:24:03 03:24:03.902 [ERROR] Tests run: 6, Failures: 1, Errors: 0, 
> Skipped: 0, Time elapsed: 26.78 s <<< FAILURE! -- in 
> org.apache.flink.table.gateway.service.MaterializedTableStatementITCase
> Error: 03:24:03 03:24:03.902 [ERROR] 
> org.apache.flink.table.gateway.service.MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume(Path,
>  RestClusterClient) -- Time elapsed: 3.850 s <<< FAILURE!
> May 13 03:24:03 org.opentest4j.AssertionFailedError: 
> May 13 03:24:03 
> May 13 03:24:03 expected: "RUNNING"
> May 13 03:24:03  but was: "CREATED"
> May 13 03:24:03   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> May 13 03:24:03   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> May 13 03:24:03   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> May 13 03:24:03   at 
> org.apache.flink.table.gateway.service.MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume(MaterializedTableStatementITCase.java:650)
> May 13 03:24:03   at java.lang.reflect.Method.invoke(Method.java:498)
> May 13 03:24:03   at 
> java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
> May 13 03:24:03   at 
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> May 13 03:24:03   at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> May 13 03:24:03   at 
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> May 13 03:24:03   at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> May 13 03:24:03 
> May 13 03:24:04 03:24:04.270 [INFO] 
> May 13 03:24:04 03:24:04.270 [INFO] Results:
> May 13 03:24:04 03:24:04.270 [INFO] 
> Error: 03:24:04 03:24:04.270 [ERROR] Failures: 
> Error: 03:24:04 03:24:04.271 [ERROR]   
> MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume:650
>  
> May 13 03:24:04 expected: "RUNNING"
> May 13 03:24:04  but was: "CREATED"
> May 13 03:24:04 03:24:04.271 [INFO] 
> Error: 03:24:04 03:24:04.271 [ERROR] Tests run: 82, Failures: 1, Errors: 0, 
> Skipped: 0
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-27741][table-planner] Fix NPE when use dense_rank() and rank()… [flink]

2024-05-14 Thread via GitHub


xuyangzhong commented on code in PR #19797:
URL: https://github.com/apache/flink/pull/19797#discussion_r1599910377


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.scala:
##
@@ -47,6 +48,18 @@ class OverAggregateTest extends TableTestBase {
 util.verifyExecPlan("SELECT c, SUM(a) OVER (ORDER BY b) FROM MyTable")
   }
 
+  @Test
+  def testDenseRankOnOrder(): Unit = {

Review Comment:
   The plan tests in batch mode appear not to be able to capture the 
modifications introduced by this PR, and I apologize for mentioning the unit 
test for the plan earlier. 
   
   I've attempted this case, and it seems only possible to test it within an 
integration test in stream mode. I propose we can add another testRankOnOver 
test specifically for testing the RANK() function. 
   
   Additionally, there seems to be a potential risk of encountering a NPE in 
`AggFunctionFactory#createPercentRankAggFunction` (even though `PercentRank` is 
not currently supported in streaming mode, the code suggests such a risk). 
Maybe we'd better address this as well? 
   
   // -- 
   // Broadening the discussion a bit, perhaps we should consider fundamentally 
preventing the `orderKeyIndexes`, which isn't marked as Nullable, from being 
null, although this would require substantial changes 樂. I agree that we can 
just avoid huge changes as demonstrated by this PR. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-27741][table-planner] Fix NPE when use dense_rank() and rank()… [flink]

2024-05-14 Thread via GitHub


xuyangzhong commented on code in PR #19797:
URL: https://github.com/apache/flink/pull/19797#discussion_r1599910377


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.scala:
##
@@ -47,6 +48,18 @@ class OverAggregateTest extends TableTestBase {
 util.verifyExecPlan("SELECT c, SUM(a) OVER (ORDER BY b) FROM MyTable")
   }
 
+  @Test
+  def testDenseRankOnOrder(): Unit = {

Review Comment:
   The plan tests in batch mode appear not to be able to capture the 
modifications introduced by this PR, and I apologize for mentioning the unit 
test for the plan earlier. 
   
   I've attempted this case, and it seems only possible to test it within an 
integration test in stream mode. I propose we add another testRankOnOver test 
specifically for testing the RANK() function. 
   
   Additionally, there seems to be a potential risk of encountering a NPE in 
`AggFunctionFactory#createPercentRankAggFunction` (even though `PercentRank` is 
not currently supported in streaming mode, the code suggests such a risk). 
Maybe we'd better address this as well? 
   
   // -- 
   // Broadening the discussion a bit, perhaps we should consider fundamentally 
preventing the `orderKeyIndexes`, which isn't marked as Nullable, from being 
null, although this would require substantial changes 樂. I agree that we can 
just avoid huge changes as demonstrated by this PR. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-31223][sqlgateway] Introduce getFlinkConfigurationOptions to [flink]

2024-05-14 Thread via GitHub


reswqa commented on PR #24741:
URL: https://github.com/apache/flink/pull/24741#issuecomment-2110044178

   Sorry for the delay. The ci build seems also have some problem, would you 
mind taking a look? Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33186) CheckpointAfterAllTasksFinishedITCase.testRestoreAfterSomeTasksFinished fails on AZP

2024-05-14 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846284#comment-17846284
 ] 

Weijie Guo commented on FLINK-33186:


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59529=logs=39d5b1d5-3b41-54dc-6458-1e2ddd1cdcf3=0c010d0c-3dec-5bf1-d408-7b18988b1b2b=8036

>  CheckpointAfterAllTasksFinishedITCase.testRestoreAfterSomeTasksFinished 
> fails on AZP
> -
>
> Key: FLINK-33186
> URL: https://issues.apache.org/jira/browse/FLINK-33186
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.19.0, 1.18.1
>Reporter: Sergey Nuyanzin
>Assignee: Jiang Xin
>Priority: Critical
>  Labels: test-stability
>
> This build 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=53509=logs=baf26b34-3c6a-54e8-f93f-cf269b32f802=8c9d126d-57d2-5a9e-a8c8-ff53f7b35cd9=8762
> fails as
> {noformat}
> Sep 28 01:23:43 Caused by: 
> org.apache.flink.runtime.checkpoint.CheckpointException: Task local 
> checkpoint failure.
> Sep 28 01:23:43   at 
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.abort(PendingCheckpoint.java:550)
> Sep 28 01:23:43   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2248)
> Sep 28 01:23:43   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2235)
> Sep 28 01:23:43   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$null$9(CheckpointCoordinator.java:817)
> Sep 28 01:23:43   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> Sep 28 01:23:43   at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> Sep 28 01:23:43   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> Sep 28 01:23:43   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> Sep 28 01:23:43   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> Sep 28 01:23:43   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> Sep 28 01:23:43   at java.lang.Thread.run(Thread.java:748)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34108][table] Add built-in URL_ENCODE and URL_DECODE function. [flink]

2024-05-14 Thread via GitHub


superdiaodiao commented on code in PR #24773:
URL: https://github.com/apache/flink/pull/24773#discussion_r1599601719


##
docs/data/sql_functions.yml:
##
@@ -350,6 +350,17 @@ string:
   - sql: LOCATE(string1, string2[, integer])
 table: STRING1.locate(STRING2[, INTEGER])
 description: Returns the position of the first occurrence of string1 in 
string2 after position integer. Returns 0 if not found. Returns NULL if any of 
arguments is NULL.
+  - sql: URL_DECODE(string)
+table: STRING.urlDecode()
+description:
+  Decodes a given string in 'application/x-www-form-urlencoded' format 
using the UTF-8 encoding scheme, will be null if input is null.
+  If there is an issue with the decoding process, such as encountering an 
illegal escape pattern, the function returns the original input value.
+  If the encoding scheme is not supported, an error will be reported.

Review Comment:
   So should I construct a string encoded with UTF_16 to test? Actually I have 
no idea of this, could you give me some advice? After all, I noticed that the 
Calcite has no test on non UTF8.
   
   Maybe we just keep it like Calcite does.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35293) FLIP-445: Support dynamic parallelism inference for HiveSource

2024-05-14 Thread xingbe (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846276#comment-17846276
 ] 

xingbe commented on FLINK-35293:


Thank you [~zhuzh]  for helping to review the PR! I will add a release note and 
close the ticket.

> FLIP-445: Support dynamic parallelism inference for HiveSource
> --
>
> Key: FLINK-35293
> URL: https://issues.apache.org/jira/browse/FLINK-35293
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Hive
>Affects Versions: 1.20.0
>Reporter: xingbe
>Assignee: xingbe
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> [FLIP-379|https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs]
>  introduces dynamic source parallelism inference, which, compared to static 
> inference, utilizes runtime information to more accurately determine the 
> source parallelism. The FileSource already possesses the capability for 
> dynamic parallelism inference. As a follow-up task to FLIP-379, this FLIP 
> plans to implement the dynamic parallelism inference interface for 
> HiveSource, and also switches the default static parallelism inference to 
> dynamic parallelism inference.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-35293) FLIP-445: Support dynamic parallelism inference for HiveSource

2024-05-14 Thread xingbe (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xingbe resolved FLINK-35293.

Fix Version/s: 1.20.0
 Release Note: 
In Flink 1.20, we have introduced support for dynamic source parallelism 
inference in batch jobs for the Hive source connector. This allows the 
connector to dynamically determine parallelism based on the actual partitions 
with dynamic partition pruning.
Additionally, we have introduced a new configuration option, 
'table.exec.hive.infer-source-parallelism.mode,' to enable users to choose 
between static and dynamic inference modes for source parallelism. By default, 
the mode is set to 'dynamic'. Users may configure it to 'static' for static 
inference, 'dynamic' for dynamic inference, or 'none' to disable automatic 
parallelism inference altogether. It should be noted that in Flink 1.20, the 
previous configration option 'table.exec.hive.infer-source-parallelism' has 
been marked as deprecated, but it will continue to serve as a switch for 
automatic parallelism inference until it is fully phased out.
   Resolution: Fixed

> FLIP-445: Support dynamic parallelism inference for HiveSource
> --
>
> Key: FLINK-35293
> URL: https://issues.apache.org/jira/browse/FLINK-35293
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Hive
>Affects Versions: 1.20.0
>Reporter: xingbe
>Assignee: xingbe
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> [FLIP-379|https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs]
>  introduces dynamic source parallelism inference, which, compared to static 
> inference, utilizes runtime information to more accurately determine the 
> source parallelism. The FileSource already possesses the capability for 
> dynamic parallelism inference. As a follow-up task to FLIP-379, this FLIP 
> plans to implement the dynamic parallelism inference interface for 
> HiveSource, and also switches the default static parallelism inference to 
> dynamic parallelism inference.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35109] Drop support for Flink 1.17 & 1.18 and fix tests for 1.20-SNAPSHOT [flink-connector-kafka]

2024-05-14 Thread via GitHub


MartijnVisser commented on code in PR #102:
URL: 
https://github.com/apache/flink-connector-kafka/pull/102#discussion_r1599848408


##
.github/workflows/push_pr.yml:
##
@@ -28,21 +28,16 @@ jobs:
   compile_and_test:
 strategy:
   matrix:
-flink: [ 1.17.2 ]
-jdk: [ '8, 11' ]
-include:
-  - flink: 1.18.1
-jdk: '8, 11, 17'
-  - flink: 1.19.0
-jdk: '8, 11, 17, 21'
+flink: [ 1.19.0, 1.20-SNAPSHOT ]
+jdk: [ '8, 11, 17, 21' ]

Review Comment:
   Flink 1.19 doesn't support Java 21, so you'll probably have to make the 
correct changes here. 



##
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/TypeSerializerUpgradeTestBase.java:
##
@@ -43,28 +40,19 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Set;
+import java.util.List;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
-import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assumptions.assumeThat;
-import static org.hamcrest.CoreMatchers.not;
 
-/**
- * A test base for testing {@link TypeSerializer} upgrades.
- *
- * You can run {@link #generateTestSetupFiles(TestSpecification)} on a 
Flink branch to
- * (re-)generate the test data files.
- */
+/** A test base for testing {@link TypeSerializer} upgrades. */
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public abstract class TypeSerializerUpgradeTestBase {
-
-public static final FlinkVersion CURRENT_VERSION = FlinkVersion.v1_17;
-
-public static final Set MIGRATION_VERSIONS =
-FlinkVersion.rangeOf(FlinkVersion.v1_11, CURRENT_VERSION);
+public abstract class TypeSerializerUpgradeTestBase

Review Comment:
   Can't we now get rid of this one completely? I thought we copied this one 
over as part of 
https://issues.apache.org/jira/browse/FLINK-32455?focusedCommentId=17739785=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17739785
 as a temporary solution?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



  1   2   >