Re: [PR] [fix-issue-2676] repair a snapshot-split bug: [flink-cdc]

2024-07-11 Thread via GitHub


yuxiqian commented on PR #2968:
URL: https://github.com/apache/flink-cdc/pull/2968#issuecomment-2224947947

   Hi @AidenPerce, is there any updates on this PR? Feel free to comment here 
if you need any help.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35826) [SQL] Sliding window may produce unstable calculations when processing changelog data.

2024-07-11 Thread Yuan Kui (Jira)
Yuan Kui created FLINK-35826:


 Summary: [SQL] Sliding window may produce unstable calculations 
when processing changelog data.
 Key: FLINK-35826
 URL: https://issues.apache.org/jira/browse/FLINK-35826
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.20.0
 Environment: flink with release-1.20
Reporter: Yuan Kui
 Attachments: image-2024-07-12-14-27-58-061.png

Calculation results may be unstable when using a sliding window to process 
changelog data. The test results are partial success and partial failure:

!image-2024-07-12-14-27-58-061.png!

See the documentation and code for more details.

[https://docs.google.com/document/d/1JmwSLs4SJvZKe7kqALqVBZ-1F1OyPmiWw8J6Ug6vqW0/edit?usp=sharing]

code:

[[BUG] Reproduce the issue of unstable sliding window calculation results · 
yuchengxin/flink@c003e45 
(github.com)|https://github.com/yuchengxin/flink/commit/c003e45082e0d1464111c286ac9c7abb79527492]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35816][table] Non-mergeable proctime tvf window aggregate needs to fallback to group aggregate [flink]

2024-07-11 Thread via GitHub


xuyangzhong commented on code in PR #25075:
URL: https://github.com/apache/flink/pull/25075#discussion_r1675358942


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala:
##
@@ -385,35 +384,97 @@ object WindowUtil {
 }
   }
 
-  private def existNeighbourWindowTableFunc(rel: RelNode): Boolean = {
+  private def isValidRowtimeWindow(windowProperties: RelWindowProperties): 
Boolean = {
+// rowtime tvf window can support calculation on window columns even 
before aggregation
+windowProperties.isRowtime
+  }
 
-@tailrec
-def find(rel: RelNode): Unit = {
-  rel match {
-case rss: RelSubset =>
-  val innerRel = Option.apply(rss.getBest).getOrElse(rss.getOriginal)
-  find(innerRel)
+  /**
+   * If the middle Calc(s) contains call(s) on window columns, we should not 
convert the Aggregate
+   * into WindowAggregate but GroupAggregate instead.
+   *
+   * The valid plan structure is like:
+   *
+   * {{{
+   * Aggregate
+   *  |
+   * Calc (should not contain call on window columns)
+   *  |
+   * WindowTableFunctionScan
+   * }}}
+   *
+   * and unlike:
+   *
+   * {{{
+   * Aggregate
+   *  |
+   * Calc
+   *  |
+   * Aggregate
+   *  |
+   * Calc
+   *  |
+   * WindowTableFunctionScan
+   * }}}
+   */
+  private def isValidProcTimeWindow(
+  windowProperties: RelWindowProperties,
+  fmq: FlinkRelMetadataQuery,
+  agg: FlinkLogicalAggregate): Boolean = {
+var existNeighbourWindowTableFunc = false
+val calcMatcher = new CalcWindowFunctionScanMatcher
+try {
+  calcMatcher.go(agg.getInput(0))
+} catch {
+  case r: Util.FoundOne =>
+r.getNode match {
+  case _: Some[_] =>
+existNeighbourWindowTableFunc = true
+  case _ => // do nothing
+}
+}
+var existCallOnWindowColumns = calcMatcher.calcNodes.nonEmpty &&
+  calcMatcher.calcNodes.exists(calc => 
calcContainsCallsOnWindowColumns(calc, fmq))
+
+// aggregate call shouldn't be on window columns
+val aggInputWindowProps = windowProperties.getWindowColumns
+existCallOnWindowColumns = existCallOnWindowColumns || 
!agg.getAggCallList.forall {
+  call => 
aggInputWindowProps.intersect(ImmutableBitSet.of(call.getArgList)).isEmpty
+}
 
+// proctime window can's support calculation on window columns before 
aggregation,
+// and need to check if there is a neighbour windowTableFunctionCall
+!existCallOnWindowColumns && existNeighbourWindowTableFunc
+  }
+
+  private class CalcWindowFunctionScanMatcher extends RelVisitor {
+val calcNodes: ListBuffer[Calc] = ListBuffer[Calc]()
+
+override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
+  node match {
+case calc: Calc =>
+  calcNodes += calc
+  // continue to visit children
+  super.visit(calc, 0, parent)
 case scan: FlinkLogicalTableFunctionScan =>
   if (WindowUtil.isWindowTableFunctionCall(scan.getCall)) {
-throw new Util.FoundOne
+throw new Util.FoundOne(Some(0))
   }
-  find(scan.getInput(0))
-
-// proctime attribute comes from these operators can not be used 
directly for proctime
-// window aggregate, so further traversal of child nodes is unnecessary
-case _: FlinkLogicalAggregate | _: FlinkLogicalRank | _: 
FlinkLogicalJoin =>
-
-case sr: SingleRel => find(sr.getInput)
+case rss: RelSubset =>
+  val innerRel = Option.apply(rss.getBest).getOrElse(rss.getOriginal)
+  // special case doesn't call super.visit for RelSubSet because it 
has no children
+  visit(innerRel, 0, rss)
+case _: FlinkLogicalAggregate | _: FlinkLogicalCorrelate | _: 
FlinkLogicalIntersect |
+_: FlinkLogicalJoin | _: FlinkLogicalMatch | _: FlinkLogicalMinus |
+_: FlinkLogicalOverAggregate | _: FlinkLogicalRank | _: 
FlinkLogicalUnion =>
+  // proctime attribute comes from these operators can not be used 
directly for proctime
+  // window aggregate, so further traversal of child nodes is 
unnecessary
+  throw new Util.FoundOne(Option.empty)

Review Comment:
   We can return directly here to stop the further traversal.



##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala:
##
@@ -385,35 +384,97 @@ object WindowUtil {
 }
   }
 
-  private def existNeighbourWindowTableFunc(rel: RelNode): Boolean = {
+  private def isValidRowtimeWindow(windowProperties: RelWindowProperties): 
Boolean = {
+// rowtime tvf window can support calculation on window columns even 
before aggregation
+windowProperties.isRowtime
+  }
 
-@tailrec
-def find(rel: RelNode): Unit = {
-  rel match {
-case rss: RelSubset =>
-  val innerRel = Option.apply(rss.getBest).getOrElse(rss.getOriginal)
- 

[jira] [Commented] (FLINK-21436) Speed ​​up the restore of UnionListState

2024-07-11 Thread Yue Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865323#comment-17865323
 ] 

Yue Ma commented on FLINK-21436:


[~fanrui] Yes, i think you are right , Most of the operators using UnionState 
are legacy sources and sinks. But there are still some users who also use 
unionState themselves. My question is if we plan to deprecate unionState in the 
feature, can we add some notice in the document 
(https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/state/)?
 If we don't have this intention, it's necessary to optimize the recovery time 
of union State?

> Speed ​​up the restore of UnionListState
> 
>
> Key: FLINK-21436
> URL: https://issues.apache.org/jira/browse/FLINK-21436
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.13.0
>Reporter: Rui Fan
>Priority: Minor
>  Labels: auto-deprioritized-major
> Attachments: JM 启动火焰图.svg, akka timeout Exception.png
>
>
> h1. 1. Problem introduction and cause analysis
> Problem description: The duration of UnionListState restore under large 
> concurrency is more than 2 minutes.
> h2. the reason:
> 2000 subtasks write 2000 files during checkpoint, and each subtask needs to 
> read 2000 files during restore.
>  2000*2000 = 4 million, so 4 million small files need to be read to hdfs 
> during restore. HDFS has become a bottleneck, causing restore to be 
> particularly time-consuming.
> h1. 2. Optimize ideas
> Under normal circumstances, the UnionListState state is relatively small. 
> Typical usage scenario: Kafka offset information.
>  When restoring, JM can directly read all 2000 small files, merge 
> UnionListState into a byte array and send it to all TMs to avoid frequent 
> access to hdfs by TMs.
> h1. 3. Benefits after optimization
> Before optimization: 2000 concurrent, Kafka offset restore takes 90~130 s.
>  After optimization: 2000 concurrent, Kafka offset restore takes less than 1s.
> h1.  4. Risk points
> Too big UnionListState leads to too much pressure on JM.
> Solution 1:
>  Add configuration and decide whether to enable this feature. The default is 
> false, which means the old plan is used. When the user is set to true, JM 
> will merge.
> Solution 2:
> The above configuration is not required, which is equivalent to enabling 
> merge by default.
> JM detects the size of the state before merge, and if it is less than the 
> threshold, the state is considered to be relatively small, and the state is 
> sent to all TMs through ByteStreamStateHandle.
> If the threshold is exceeded, the state is considered to be greater. At this 
> time, write an hdfs file, and send FileStateHandle to all TMs, and TM can 
> read this file.
>  
> Note: Most of the scenarios where Flink uses UnionListState are Kafka offset 
> (small state). In theory, most jobs are risk-free.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35804][table-planner] Fix incorrect calc merge to avoid wrong plans about udtf+join+udf [flink]

2024-07-11 Thread via GitHub


lincoln-lil commented on code in PR #25068:
URL: https://github.com/apache/flink/pull/25068#discussion_r167536


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CalcTest.scala:
##
@@ -207,4 +210,19 @@ class CalcTest extends TableTestBase {
 val sqlQuery = "SELECT a FROM (SELECT a, b FROM MyTable) t WHERE 
random_udf(b) > 10"
 util.verifyRelPlan(sqlQuery)
   }
+
+  @Test
+  def testCalcMergeWithNonDeterministicExpr3(): Unit = {

Review Comment:
   We can use a minimal case to reproduce the error, e.g.,
   ```scala
 @Test
 def testCalcMergeWithCorrelate(): Unit = {
   util.addTemporarySystemFunction("str_split", new StringSplit())
   val sqlQuery =
 """
   |
   |SELECT a, r FROM (
   | SELECT a, random_udf(b) r FROM (
   |  select a, b, c1 FROM MyTable, LATERAL TABLE(str_split(c)) AS T(c1)
   | ) t
   |)
   |WHERE r > 10
   |""".stripMargin
   util.verifyRelPlan(sqlQuery)
 }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix][runtime] Avoid duplicating broadcast records redundantly for hybrid shuffle [flink]

2024-07-11 Thread via GitHub


reswqa merged PR #25025:
URL: https://github.com/apache/flink/pull/25025


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35624) Release Testing: Verify FLIP-306 Unified File Merging Mechanism for Checkpoints

2024-07-11 Thread Zakelly Lan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865319#comment-17865319
 ] 

Zakelly Lan commented on FLINK-35624:
-

Hi [~fanrui], would you please re-run your testing? I think we have fix all the 
remaining issues. And thanks [~Yanfei Lei] for your manual test!

> Release Testing: Verify FLIP-306 Unified File Merging Mechanism for 
> Checkpoints
> ---
>
> Key: FLINK-35624
> URL: https://issues.apache.org/jira/browse/FLINK-35624
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Zakelly Lan
>Assignee: Rui Fan
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.20.0
>
> Attachments: image-2024-07-07-14-04-47-065.png, 
> image-2024-07-08-17-05-40-546.png
>
>
> Follow up the test for https://issues.apache.org/jira/browse/FLINK-32070
>  
> 1.20 is the MVP version for FLIP-306. It is a little bit complex and should 
> be tested carefully. The main idea of FLIP-306 is to merge checkpoint files 
> in TM side, and provide new {{{}StateHandle{}}}s to the JM. There will be a 
> TM-managed directory under the 'shared' checkpoint directory for each 
> subtask, and a TM-managed directory under the 'taskowned' checkpoint 
> directory for each Task Manager. Under those new introduced directories, the 
> checkpoint files will be merged into smaller file set. The following 
> scenarios need to be tested, including but not limited to:
>  # With the file merging enabled, periodic checkpoints perform properly, and 
> the failover, restore and rescale would also work well.
>  # Switch the file merging on and off across jobs, checkpoints and recovery 
> also work properly.
>  # There will be no left-over TM-managed directory, especially when there is 
> no cp complete before the job cancellation.
>  # File merging takes no effect in (native) savepoints.
> Besides the behaviors above, it is better to validate the function of space 
> amplification control and metrics. All the config options can be found under 
> 'execution.checkpointing.file-merging'.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35810) AsyncWaitOperatorTest.testProcessingTimeRepeatedCompleteUnorderedWithRetry fails

2024-07-11 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865318#comment-17865318
 ] 

Weijie Guo commented on FLINK-35810:


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60871&view=logs&j=77a9d8e1-d610-59b3-fc2a-4766541e0e33&t=125e07e7-8de0-5c6c-a541-a567415af3ef&l=10509

> AsyncWaitOperatorTest.testProcessingTimeRepeatedCompleteUnorderedWithRetry 
> fails
> 
>
> Key: FLINK-35810
> URL: https://issues.apache.org/jira/browse/FLINK-35810
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 2.0.0
>Reporter: Rui Fan
>Priority: Major
>
> AsyncWaitOperatorTest.testProcessingTimeRepeatedCompleteUnorderedWithRetry 
> fails
>  
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60837&view=logs&j=77a9d8e1-d610-59b3-fc2a-4766541e0e33&t=125e07e7-8de0-5c6c-a541-a567415af3ef&l=10159



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34513) GroupAggregateRestoreTest.testRestore fails

2024-07-11 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865317#comment-17865317
 ] 

Weijie Guo commented on FLINK-34513:


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60838&view=logs&j=de826397-1924-5900-0034-51895f69d4b7&t=f311e913-93a2-5a37-acab-4a63e1328f94&l=11746

> GroupAggregateRestoreTest.testRestore fails
> ---
>
> Key: FLINK-34513
> URL: https://issues.apache.org/jira/browse/FLINK-34513
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.20.0, 1.19.1
>Reporter: Matthias Pohl
>Assignee: Bonnie Varghese
>Priority: Critical
>  Labels: pull-request-available, test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57828&view=logs&j=26b84117-e436-5720-913e-3e280ce55cae&t=77cc7e77-39a0-5007-6d65-4137ac13a471&l=10881
> {code}
> Feb 24 01:12:01 01:12:01.384 [ERROR] Tests run: 10, Failures: 1, Errors: 0, 
> Skipped: 1, Time elapsed: 2.957 s <<< FAILURE! -- in 
> org.apache.flink.table.planner.plan.nodes.exec.stream.GroupAggregateRestoreTest
> Feb 24 01:12:01 01:12:01.384 [ERROR] 
> org.apache.flink.table.planner.plan.nodes.exec.stream.GroupAggregateRestoreTest.testRestore(TableTestProgram,
>  ExecNodeMetadata)[4] -- Time elapsed: 0.653 s <<< FAILURE!
> Feb 24 01:12:01 java.lang.AssertionError: 
> Feb 24 01:12:01 
> Feb 24 01:12:01 Expecting actual:
> Feb 24 01:12:01   ["+I[3, 1, 2, 8, 31, 10.0, 3]",
> Feb 24 01:12:01 "+I[2, 1, 4, 14, 42, 7.0, 6]",
> Feb 24 01:12:01 "+I[1, 1, 4, 12, 24, 6.0, 4]",
> Feb 24 01:12:01 "+U[2, 1, 4, 14, 57, 8.0, 7]",
> Feb 24 01:12:01 "+U[1, 1, 4, 12, 32, 6.0, 5]",
> Feb 24 01:12:01 "+I[7, 0, 1, 7, 7, 7.0, 1]",
> Feb 24 01:12:01 "+U[2, 1, 4, 14, 57, 7.0, 7]",
> Feb 24 01:12:01 "+U[1, 1, 4, 12, 32, 5.0, 5]",
> Feb 24 01:12:01 "+U[3, 1, 2, 8, 31, 9.0, 3]",
> Feb 24 01:12:01 "+U[7, 0, 1, 7, 7, 7.0, 2]"]
> Feb 24 01:12:01 to contain exactly in any order:
> Feb 24 01:12:01   ["+I[3, 1, 2, 8, 31, 10.0, 3]",
> Feb 24 01:12:01 "+I[2, 1, 4, 14, 42, 7.0, 6]",
> Feb 24 01:12:01 "+I[1, 1, 4, 12, 24, 6.0, 4]",
> Feb 24 01:12:01 "+U[2, 1, 4, 14, 57, 8.0, 7]",
> Feb 24 01:12:01 "+U[1, 1, 4, 12, 32, 6.0, 5]",
> Feb 24 01:12:01 "+U[3, 1, 2, 8, 31, 9.0, 3]",
> Feb 24 01:12:01 "+U[2, 1, 4, 14, 57, 7.0, 7]",
> Feb 24 01:12:01 "+I[7, 0, 1, 7, 7, 7.0, 2]",
> Feb 24 01:12:01 "+U[1, 1, 4, 12, 32, 5.0, 5]"]
> Feb 24 01:12:01 elements not found:
> Feb 24 01:12:01   ["+I[7, 0, 1, 7, 7, 7.0, 2]"]
> Feb 24 01:12:01 and elements not expected:
> Feb 24 01:12:01   ["+I[7, 0, 1, 7, 7, 7.0, 1]", "+U[7, 0, 1, 7, 7, 7.0, 2]"]
> Feb 24 01:12:01 
> Feb 24 01:12:01   at 
> org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase.testRestore(RestoreTestBase.java:313)
> Feb 24 01:12:01   at 
> java.base/java.lang.reflect.Method.invoke(Method.java:580)
> [...]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35825][hive] HiveTableSource supports report statistics for text file [flink]

2024-07-11 Thread via GitHub


flinkbot commented on PR #25078:
URL: https://github.com/apache/flink/pull/25078#issuecomment-2224721039

   
   ## CI report:
   
   * 922576ae0ae4cd3cf36ceeb0f53f81fa82f0fa0d UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35825) HiveTableSource supports report statistics for text file

2024-07-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35825?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35825:
---
Labels: pull-request-available  (was: )

> HiveTableSource supports report statistics for text file
> 
>
> Key: FLINK-35825
> URL: https://issues.apache.org/jira/browse/FLINK-35825
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Hive
>Reporter: Weijie Guo
>Assignee: Weijie Guo
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35825][hive] HiveTableSource supports report statistics for text file [flink]

2024-07-11 Thread via GitHub


reswqa opened a new pull request, #25078:
URL: https://github.com/apache/flink/pull/25078

   ## What is the purpose of the change
   
   *HiveTableSource supports report statistics for text file*
   
   
   ## Brief change log
   
 - *Introduce a `TextFormatStatisticsReportUtil` to estimate statistics for 
text fie.*
   
   
   ## Verifying this change
   
   covered by ut.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35740][mysql] Allow column as chunk key even if not in Primary Keys [flink-cdc]

2024-07-11 Thread via GitHub


yuxiqian commented on code in PR #3448:
URL: https://github.com/apache/flink-cdc/pull/3448#discussion_r1675329046


##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java:
##
@@ -416,6 +416,28 @@ public void testTableWithoutPrimaryKey() {
 }
 }
 
+@Test
+public void testAssignTableWithoutPrimaryKeyWithChunkKeyColumn() {
+String tableWithoutPrimaryKey = "customers_no_pk";
+List expected =
+Arrays.asList(
+"customers_no_pk null [462]",
+"customers_no_pk [462] [823]",
+"customers_no_pk [823] [1184]",
+"customers_no_pk [1184] [1545]",
+"customers_no_pk [1545] [1906]",
+"customers_no_pk [1906] null");
+List splits =
+getTestAssignSnapshotSplits(
+customerDatabase,
+4,
+
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
+
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
+new String[] {tableWithoutPrimaryKey},
+"id");
+assertEquals(expected, splits);
+}

Review Comment:
   It would be nice if we can also test using non-primary key columns as chunk 
keys for table with primary keys.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35825) HiveTableSource supports report statistics for text file

2024-07-11 Thread Weijie Guo (Jira)
Weijie Guo created FLINK-35825:
--

 Summary: HiveTableSource supports report statistics for text file
 Key: FLINK-35825
 URL: https://issues.apache.org/jira/browse/FLINK-35825
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Hive
Reporter: Weijie Guo
Assignee: Weijie Guo
 Fix For: 2.0.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35740][mysql] Allow column as chunk key even if not in Primary Keys [flink-cdc]

2024-07-11 Thread via GitHub


SML0127 commented on PR #3448:
URL: https://github.com/apache/flink-cdc/pull/3448#issuecomment-2224625883

   > @leonardBang PTAL
   
   @leonardBang @yuxiqian PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35154] Javadoc generating fix [flink]

2024-07-11 Thread via GitHub


ldadima commented on PR #24684:
URL: https://github.com/apache/flink/pull/24684#issuecomment-2224569550

   Thanks for review


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] pipeline add param scan.newly-added-table.enabled [flink-cdc]

2024-07-11 Thread via GitHub


yuxiqian commented on PR #3470:
URL: https://github.com/apache/flink-cdc/pull/3470#issuecomment-2224424493

   Thanks for @yangup's contribution, but seems it's a duplicate of #3411


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-11 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1675080346


##
flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkBuilder.java:
##
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
+
+import software.amazon.awssdk.http.Protocol;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+
+import java.util.Optional;
+import java.util.Properties;
+
+import static 
org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROTOCOL_VERSION;
+import static software.amazon.awssdk.http.Protocol.HTTP1_1;
+
+/**
+ * Builder to construct {@link SqsSink}.
+ *
+ * The following example shows the minimum setup to create a {@link 
SqsSink} that writes String
+ * values to a SQS named sqsUrl.
+ *
+ * {@code
+ * Properties sinkProperties = new Properties();
+ * sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+ *
+ * SqsSink sqsSink =
+ * SqsSink.builder()
+ * .setSqsUrl("sqsUrl")
+ * .setSqsClientProperties(sinkProperties)
+ * .setSerializationSchema(new SimpleStringSchema())
+ * .build();
+ * }
+ *
+ * If the following parameters are not set in this builder, the following 
defaults will be used:
+ *
+ * 
+ *   {@code maxBatchSize} will be 10
+ *   {@code maxInFlightRequests} will be 50
+ *   {@code maxBufferedRequests} will be 5000
+ *   {@code maxBatchSizeInBytes} will be 256 KB i.e. {@code 256 * 1000}
+ *   {@code maxTimeInBufferMs} will be 5000ms
+ *   {@code maxRecordSizeInBytes} will be 256 KB i.e. {@code 256 * 1000}
+ *   {@code failOnError} will be false
+ * 
+ *
+ * @param  type of elements that should be persisted in the destination
+ */
+@PublicEvolving
+public class SqsSinkBuilder
+extends AsyncSinkBaseBuilder> {
+
+private static final int DEFAULT_MAX_BATCH_SIZE = 10;
+private static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS = 50;
+private static final int DEFAULT_MAX_BUFFERED_REQUESTS = 5_000;
+private static final long DEFAULT_MAX_BATCH_SIZE_IN_B = 256 * 1000;
+private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = 5000;
+private static final long DEFAULT_MAX_RECORD_SIZE_IN_B = 256 * 1000;
+private static final boolean DEFAULT_FAIL_ON_ERROR = false;
+private static final Protocol DEFAULT_HTTP_PROTOCOL = HTTP1_1;
+
+private Boolean failOnError;
+private String sqsUrl;
+private Properties sqsClientProperties;
+private SerializationSchema serializationSchema;
+
+SqsSinkBuilder() {}
+
+/**
+ * Sets the url of the SQS that the sink will connect to. There is no 
default for this
+ * parameter, therefore, this must be provided at sink creation time 
otherwise the build will
+ * fail.
+ *
+ * @param sqsUrl the url of the Sqs
+ * @return {@link SqsSinkBuilder} itself
+ */
+public SqsSinkBuilder setSqsUrl(String sqsUrl) {
+this.sqsUrl = sqsUrl;
+return this;
+}
+
+/**
+ * Allows the user to specify a serialization schema to serialize each 
record to persist to SQS.
+ *
+ * @param schema serialization schema to use
+ * @return {@link SqsSinkBuilder} itself
+ */
+public SqsSinkBuilder setSerializationSchema(final 
SerializationSchema schema) {
+serializationSchema = schema;
+return this;
+}

Review Comment:
   I tried to update the same, please let me know if that looks good



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-11 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674832829


##
flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/src/test/java/org/apache/flink/connector/sqs/sink/test/SqsSinkITTest.java:
##
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink.test;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.connector.aws.testutils.LocalstackContainer;
+import org.apache.flink.connector.sqs.sink.SqsSink;
+import org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils;
+import org.apache.flink.connector.testframe.container.FlinkContainers;
+import org.apache.flink.connector.testframe.container.TestcontainersSettings;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.DockerImageVersions;
+import org.apache.flink.util.TestLogger;
+
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Network;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.core.SdkSystemSetting;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.Message;
+import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static 
org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createConfig;
+import static 
org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils.createSqsClient;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** End to End test for SQS sink API. */
+public class SqsSinkITTest extends TestLogger {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(SqsSinkITTest.class);
+
+private static final int NUMBER_OF_ELEMENTS = 50;
+private StreamExecutionEnvironment env;
+private SdkHttpClient httpClient;
+private SqsClient sqsClient;
+private static final Network network = Network.newNetwork();
+
+@ClassRule
+public static LocalstackContainer mockSqsContainer =
+new 
LocalstackContainer(DockerImageName.parse(DockerImageVersions.LOCALSTACK))
+.withNetwork(network)
+.withNetworkAliases("localstack");
+
+public static final TestcontainersSettings TESTCONTAINERS_SETTINGS =
+TestcontainersSettings.builder()
+.environmentVariable("AWS_CBOR_DISABLE", "1")
+.environmentVariable(
+"FLINK_ENV_JAVA_OPTS",
+
"-Dorg.apache.flink.sqs.shaded.com.amazonaws.sdk.disableCertChecking 
-Daws.cborEnabled=false")
+.network(network)
+.logger(LOG)
+.dependsOn(mockSqsContainer)
+.build();
+
+public static final FlinkContainers FLINK =
+
FlinkContainers.builder().withTestcontainersSettings(TESTCONTAINERS_SETTINGS).build();
+
+@Before
+public void setup() throws Exception {
+System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");

Review Comment:
   Test case passed without this, hence removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-11 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1675069242


##
flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/pom.xml:
##
@@ -0,0 +1,105 @@
+
+
+
+http://maven.apache.org/POM/4.0.0";
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+
+
+flink-connector-aws-e2e-tests-parent
+org.apache.flink
+4.4-SNAPSHOT
+
+
+4.0.0
+
+flink-connector-aws-sqs-e2e-tests

Review Comment:
   Above command didn't work for me but I tried to run the test case by putting 
the test case file in same sqs-sink/test case folder and test passed 
successfully there. Please let me know if that still fails at your end.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35824) Polish JDBC documentation : fix link and add back to top button

2024-07-11 Thread Zhongqiang Gong (Jira)
Zhongqiang Gong created FLINK-35824:
---

 Summary: Polish JDBC documentation : fix link and add back to top 
button
 Key: FLINK-35824
 URL: https://issues.apache.org/jira/browse/FLINK-35824
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / JDBC
Reporter: Zhongqiang Gong
Assignee: Zhongqiang Gong






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35823) Introduce parameters to control the upper limit of rescale to avoid unlimited expansion due to server-side bottlenecks or data skew.

2024-07-11 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-35823:

Fix Version/s: (was: 2.0.0)

> Introduce parameters to control the upper limit of rescale to avoid unlimited 
> expansion due to server-side bottlenecks or data skew.
> 
>
> Key: FLINK-35823
> URL: https://issues.apache.org/jira/browse/FLINK-35823
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: yuanfenghu
>Priority: Major
>
> 1. If a Flink application writes data to other external storage systems, such 
> as HDFS, Kafka, etc., when the external server becomes the bottleneck of the 
> entire task, such as the throughput of HDFS decreases, the writing IO time 
> will increase, and the corresponding Flink The metric busy will also 
> increase. At this time, the autoscaler will determine that the parallelism 
> needs to be increased to increase the write rate. However, in the above case, 
> due to the bottleneck of the external server, this will not work. This will 
> cause the next determination cycle to continue to increase the parallelism 
> until parallelism = max-parallelism.
> 2. If some tasks have data skew, it will also cause the same problem.
>  
> Therefore, we should introduce a new parameter judgment. If the degree of 
> parallelism continues to increase, the throughput will basically remain the 
> same. There is no need to expand  anymore.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35804][table-planner] Fix incorrect calc merge to avoid wrong plans about udtf+join+udf [flink]

2024-07-11 Thread via GitHub


zhaorongsheng commented on PR #25068:
URL: https://github.com/apache/flink/pull/25068#issuecomment-2224329818

   @flinkbot Could someone review this pr? Thanks~


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-35240) Disable FLUSH_AFTER_WRITE_VALUE to avoid flush per record

2024-07-11 Thread Zhongqiang Gong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhongqiang Gong resolved FLINK-35240.
-
Fix Version/s: 2.0.0
   Resolution: Fixed

Merged into master via: 4154b8d08c1a2b901b058b020d546de59ba6989d

> Disable FLUSH_AFTER_WRITE_VALUE to avoid flush per record
> -
>
> Key: FLINK-35240
> URL: https://issues.apache.org/jira/browse/FLINK-35240
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Reporter: Zhongqiang Gong
>Assignee: Zhongqiang Gong
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 2.0.0
>
> Attachments: image-2024-04-26-00-23-29-975.png, 
> image-2024-04-26-17-16-07-925.png, image-2024-04-26-17-16-20-647.png, 
> image-2024-04-26-17-16-30-293.png, screenshot-1.png
>
>
> *Reproduce:*
> * According to user email: 
> https://lists.apache.org/thread/9j5z8hv4vjkd54dkzqy1ryyvm0l5rxhc
> *  !image-2024-04-26-00-23-29-975.png! 
> *Analysis:*
> * `org.apache.flink.formats.csv.CsvBulkWriter#addElement` will flush per 
> record.
> *Solution:*
> * I think maybe we can disable `FLUSH_AFTER_WRITE_VALUE` to avoid flush when 
> a record added.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-35823) Introduce parameters to control the upper limit of rescale to avoid unlimited expansion due to server-side bottlenecks or data skew.

2024-07-11 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan closed FLINK-35823.
---
Resolution: Fixed

It's duplicated with https://issues.apache.org/jira/browse/FLINK-35814, so 
close this.

> Introduce parameters to control the upper limit of rescale to avoid unlimited 
> expansion due to server-side bottlenecks or data skew.
> 
>
> Key: FLINK-35823
> URL: https://issues.apache.org/jira/browse/FLINK-35823
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: yuanfenghu
>Priority: Major
> Fix For: 2.0.0
>
>
> 1. If a Flink application writes data to other external storage systems, such 
> as HDFS, Kafka, etc., when the external server becomes the bottleneck of the 
> entire task, such as the throughput of HDFS decreases, the writing IO time 
> will increase, and the corresponding Flink The metric busy will also 
> increase. At this time, the autoscaler will determine that the parallelism 
> needs to be increased to increase the write rate. However, in the above case, 
> due to the bottleneck of the external server, this will not work. This will 
> cause the next determination cycle to continue to increase the parallelism 
> until parallelism = max-parallelism.
> 2. If some tasks have data skew, it will also cause the same problem.
>  
> Therefore, we should introduce a new parameter judgment. If the degree of 
> parallelism continues to increase, the throughput will basically remain the 
> same. There is no need to expand  anymore.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35823) Introduce parameters to control the upper limit of rescale to avoid unlimited expansion due to server-side bottlenecks or data skew.

2024-07-11 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865280#comment-17865280
 ] 

yuanfenghu commented on FLINK-35823:


I have discussed this issue with [~fanrui] . I wonder if other people in the 
community have any suggestions on this?
 

> Introduce parameters to control the upper limit of rescale to avoid unlimited 
> expansion due to server-side bottlenecks or data skew.
> 
>
> Key: FLINK-35823
> URL: https://issues.apache.org/jira/browse/FLINK-35823
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: yuanfenghu
>Priority: Major
> Fix For: 2.0.0
>
>
> 1. If a Flink application writes data to other external storage systems, such 
> as HDFS, Kafka, etc., when the external server becomes the bottleneck of the 
> entire task, such as the throughput of HDFS decreases, the writing IO time 
> will increase, and the corresponding Flink The metric busy will also 
> increase. At this time, the autoscaler will determine that the parallelism 
> needs to be increased to increase the write rate. However, in the above case, 
> due to the bottleneck of the external server, this will not work. This will 
> cause the next determination cycle to continue to increase the parallelism 
> until parallelism = max-parallelism.
> 2. If some tasks have data skew, it will also cause the same problem.
>  
> Therefore, we should introduce a new parameter judgment. If the degree of 
> parallelism continues to increase, the throughput will basically remain the 
> same. There is no need to expand  anymore.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35823) Introduce parameters to control the upper limit of rescale to avoid unlimited expansion due to server-side bottlenecks or data skew.

2024-07-11 Thread yuanfenghu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuanfenghu updated FLINK-35823:
---
Summary: Introduce parameters to control the upper limit of rescale to 
avoid unlimited expansion due to server-side bottlenecks or data skew.  (was: 
Introduce parameters to control the upper limit of rescale to avoid unlimited 
扩容 due to server-side bottlenecks or data skew.)

> Introduce parameters to control the upper limit of rescale to avoid unlimited 
> expansion due to server-side bottlenecks or data skew.
> 
>
> Key: FLINK-35823
> URL: https://issues.apache.org/jira/browse/FLINK-35823
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: yuanfenghu
>Priority: Major
> Fix For: 2.0.0
>
>
> 1. If a Flink application writes data to other external storage systems, such 
> as HDFS, Kafka, etc., when the external server becomes the bottleneck of the 
> entire task, such as the throughput of HDFS decreases, the writing IO time 
> will increase, and the corresponding Flink The metric busy will also 
> increase. At this time, the autoscaler will determine that the parallelism 
> needs to be increased to increase the write rate. However, in the above case, 
> due to the bottleneck of the external server, this will not work. This will 
> cause the next determination cycle to continue to increase the parallelism 
> until parallelism = max-parallelism.
> 2. If some tasks have data skew, it will also cause the same problem.
>  
> Therefore, we should introduce a new parameter judgment. If the degree of 
> parallelism continues to increase, the throughput will basically remain the 
> same. There is no need to expand  anymore.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35823) Introduce parameters to control the upper limit of rescale to avoid unlimited 扩容 due to server-side bottlenecks or data skew.

2024-07-11 Thread yuanfenghu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuanfenghu updated FLINK-35823:
---
Summary: Introduce parameters to control the upper limit of rescale to 
avoid unlimited 扩容 due to server-side bottlenecks or data skew.  (was: 
Introduce parameters to control the upper limit of rescale to avoid unlimited 
shrinkage due to server-side bottlenecks or data skew.)

> Introduce parameters to control the upper limit of rescale to avoid unlimited 
> 扩容 due to server-side bottlenecks or data skew.
> -
>
> Key: FLINK-35823
> URL: https://issues.apache.org/jira/browse/FLINK-35823
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: yuanfenghu
>Priority: Major
> Fix For: 2.0.0
>
>
> 1. If a Flink application writes data to other external storage systems, such 
> as HDFS, Kafka, etc., when the external server becomes the bottleneck of the 
> entire task, such as the throughput of HDFS decreases, the writing IO time 
> will increase, and the corresponding Flink The metric busy will also 
> increase. At this time, the autoscaler will determine that the parallelism 
> needs to be increased to increase the write rate. However, in the above case, 
> due to the bottleneck of the external server, this will not work. This will 
> cause the next determination cycle to continue to increase the parallelism 
> until parallelism = max-parallelism.
> 2. If some tasks have data skew, it will also cause the same problem.
>  
> Therefore, we should introduce a new parameter judgment. If the degree of 
> parallelism continues to increase, the throughput will basically remain the 
> same. There is no need to expand  anymore.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35823) Introduce parameters to control the upper limit of rescale to avoid unlimited shrinkage due to server-side bottlenecks or data skew.

2024-07-11 Thread yuanfenghu (Jira)
yuanfenghu created FLINK-35823:
--

 Summary: Introduce parameters to control the upper limit of 
rescale to avoid unlimited shrinkage due to server-side bottlenecks or data 
skew.
 Key: FLINK-35823
 URL: https://issues.apache.org/jira/browse/FLINK-35823
 Project: Flink
  Issue Type: Improvement
  Components: Autoscaler
Reporter: yuanfenghu
 Fix For: 2.0.0


1. If a Flink application writes data to other external storage systems, such 
as HDFS, Kafka, etc., when the external server becomes the bottleneck of the 
entire task, such as the throughput of HDFS decreases, the writing IO time will 
increase, and the corresponding Flink The metric busy will also increase. At 
this time, the autoscaler will determine that the parallelism needs to be 
increased to increase the write rate. However, in the above case, due to the 
bottleneck of the external server, this will not work. This will cause the next 
determination cycle to continue to increase the parallelism until parallelism = 
max-parallelism.

2. If some tasks have data skew, it will also cause the same problem.

 
Therefore, we should introduce a new parameter judgment. If the degree of 
parallelism continues to increase, the throughput will basically remain the 
same. There is no need to expand  anymore.
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35240) Disable FLUSH_AFTER_WRITE_VALUE to avoid flush per record

2024-07-11 Thread Zhongqiang Gong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhongqiang Gong reassigned FLINK-35240:
---

Assignee: Zhongqiang Gong

> Disable FLUSH_AFTER_WRITE_VALUE to avoid flush per record
> -
>
> Key: FLINK-35240
> URL: https://issues.apache.org/jira/browse/FLINK-35240
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Reporter: Zhongqiang Gong
>Assignee: Zhongqiang Gong
>Priority: Minor
>  Labels: pull-request-available
> Attachments: image-2024-04-26-00-23-29-975.png, 
> image-2024-04-26-17-16-07-925.png, image-2024-04-26-17-16-20-647.png, 
> image-2024-04-26-17-16-30-293.png, screenshot-1.png
>
>
> *Reproduce:*
> * According to user email: 
> https://lists.apache.org/thread/9j5z8hv4vjkd54dkzqy1ryyvm0l5rxhc
> *  !image-2024-04-26-00-23-29-975.png! 
> *Analysis:*
> * `org.apache.flink.formats.csv.CsvBulkWriter#addElement` will flush per 
> record.
> *Solution:*
> * I think maybe we can disable `FLUSH_AFTER_WRITE_VALUE` to avoid flush when 
> a record added.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-25537] [JUnit5 Migration] Module: flink-core [flink]

2024-07-11 Thread via GitHub


GOODBOY008 commented on PR #24939:
URL: https://github.com/apache/flink/pull/24939#issuecomment-2224309638

   > #24945 reverts the InitOutputPathTest to junit4.
   > 
   > We can research why InitOutputPathTest fails after migrating juint5, and 
fix it later in this PR as well.
   
   @1996fanrui The class failed because of the jdk verison, and I fixed in this 
pr.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35363] FLIP-449: Reorganization of flink-connector-jdbc [flink-connector-jdbc]

2024-07-11 Thread via GitHub


RocMarshal commented on code in PR #123:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/123#discussion_r1675007647


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java:
##
@@ -18,252 +18,17 @@
 
 package org.apache.flink.connector.jdbc.converter;
 
-import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatement;
-import org.apache.flink.connector.jdbc.utils.JdbcTypeUtil;
-import org.apache.flink.table.data.DecimalData;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.types.logical.DecimalType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import 
org.apache.flink.connector.jdbc.core.database.dialect.AbstractDialectConverter;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.TimestampType;
-
-import java.io.Serializable;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.sql.Date;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/** Base class for all converters that convert between JDBC object and Flink 
internal object. */
-public abstract class AbstractJdbcRowConverter implements JdbcRowConverter {
-
-protected final RowType rowType;
-protected final JdbcDeserializationConverter[] toInternalConverters;
-protected final JdbcSerializationConverter[] toExternalConverters;
-protected final LogicalType[] fieldTypes;
-
-public abstract String converterName();
 
+/**
+ * Base class for all converters that convert between JDBC object and Flink 
internal object.
+ *
+ * @deprecated use AbstractDialectConverter

Review Comment:
   ```suggestion
* @deprecated Use {@link AbstractDialectConverter}.
   ```



##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java:
##
@@ -18,551 +18,22 @@
 
 package org.apache.flink.connector.jdbc.catalog;
 
-import org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory;
-import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.catalog.AbstractCatalog;
-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.CatalogDatabase;
-import org.apache.flink.table.catalog.CatalogDatabaseImpl;
-import org.apache.flink.table.catalog.CatalogFunction;
-import org.apache.flink.table.catalog.CatalogPartition;
-import org.apache.flink.table.catalog.CatalogPartitionSpec;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.UniqueConstraint;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
-import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
-import 
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
-import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
-import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
-import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
-import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
-import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
-import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
-import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.factories.Factory;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.StringUtils;
-import org.apache.flink.util.TemporaryClassLoaderContext;
-
-import org.apache.commons.compress.utils.Lists;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.util.Arrays;
-import java.util.Co

[jira] [Commented] (FLINK-21436) Speed ​​up the restore of UnionListState

2024-07-11 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865277#comment-17865277
 ] 

Rui Fan commented on FLINK-21436:
-

> When does Union State plan to deprecated?

I didn't notice any discusstion about this topic after 
[https://lists.apache.org/thread/sxbmyoko01h568qtb1wk3ot2s2rb72nz]

> Since some source and sink operator still rely on UnionState, so it may still 
> makes sense to speed up UnionState restore time currently  ? 

Do you mean Legacy source and sink or new source and sink? I saw most of new 
Sources and new Sinks doesn't use Union List State.

Would you mind listing some new sources and sinks that using Union List State?

 

> Speed ​​up the restore of UnionListState
> 
>
> Key: FLINK-21436
> URL: https://issues.apache.org/jira/browse/FLINK-21436
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.13.0
>Reporter: Rui Fan
>Priority: Minor
>  Labels: auto-deprioritized-major
> Attachments: JM 启动火焰图.svg, akka timeout Exception.png
>
>
> h1. 1. Problem introduction and cause analysis
> Problem description: The duration of UnionListState restore under large 
> concurrency is more than 2 minutes.
> h2. the reason:
> 2000 subtasks write 2000 files during checkpoint, and each subtask needs to 
> read 2000 files during restore.
>  2000*2000 = 4 million, so 4 million small files need to be read to hdfs 
> during restore. HDFS has become a bottleneck, causing restore to be 
> particularly time-consuming.
> h1. 2. Optimize ideas
> Under normal circumstances, the UnionListState state is relatively small. 
> Typical usage scenario: Kafka offset information.
>  When restoring, JM can directly read all 2000 small files, merge 
> UnionListState into a byte array and send it to all TMs to avoid frequent 
> access to hdfs by TMs.
> h1. 3. Benefits after optimization
> Before optimization: 2000 concurrent, Kafka offset restore takes 90~130 s.
>  After optimization: 2000 concurrent, Kafka offset restore takes less than 1s.
> h1.  4. Risk points
> Too big UnionListState leads to too much pressure on JM.
> Solution 1:
>  Add configuration and decide whether to enable this feature. The default is 
> false, which means the old plan is used. When the user is set to true, JM 
> will merge.
> Solution 2:
> The above configuration is not required, which is equivalent to enabling 
> merge by default.
> JM detects the size of the state before merge, and if it is less than the 
> threshold, the state is considered to be relatively small, and the state is 
> sent to all TMs through ByteStreamStateHandle.
> If the threshold is exceeded, the state is considered to be greater. At this 
> time, write an hdfs file, and send FileStateHandle to all TMs, and TM can 
> read this file.
>  
> Note: Most of the scenarios where Flink uses UnionListState are Kafka offset 
> (small state). In theory, most jobs are risk-free.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-35754) SqlGatewayE2ECase.testMaterializedTableInFullMode failed due to Internal Server Error

2024-07-11 Thread dalongliu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dalongliu resolved FLINK-35754.
---
Resolution: Fixed

> SqlGatewayE2ECase.testMaterializedTableInFullMode failed due to Internal 
> Server Error
> -
>
> Key: FLINK-35754
> URL: https://issues.apache.org/jira/browse/FLINK-35754
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.20.0
>Reporter: Weijie Guo
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2024-07-05-11-15-52-731.png
>
>
> {code:java}
> Jul 03 03:14:31 java.lang.RuntimeException: Execute statement failed
> Jul 03 03:14:31   at 
> org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.executeStatementWithResult(FlinkDistribution.java:592)
> Jul 03 03:14:31   at 
> org.apache.flink.table.gateway.SqlGatewayE2ECase.lambda$testMaterializedTableInFullMode$3(SqlGatewayE2ECase.java:353)
> Jul 03 03:14:31   at 
> org.apache.flink.core.testutils.CommonTestUtils.waitUtil(CommonTestUtils.java:210)
> Jul 03 03:14:31   at 
> org.apache.flink.table.gateway.SqlGatewayE2ECase.testMaterializedTableInFullMode(SqlGatewayE2ECase.java:350)
> Jul 03 03:14:31   at 
> java.base/java.lang.reflect.Method.invoke(Method.java:568)
> Jul 03 03:14:31   at 
> org.apache.flink.util.ExternalResource$1.evaluate(ExternalResource.java:48)
> Jul 03 03:14:31   at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> Jul 03 03:14:31   at 
> org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:29)
> Jul 03 03:14:31 Caused by: java.lang.RuntimeException: The rest request is 
> not successful: Internal Server Error
> Jul 03 03:14:31   at 
> org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.sendRequest(FlinkDistribution.java:601)
> Jul 03 03:14:31   at 
> org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.getOperationResult(FlinkDistribution.java:570)
> Jul 03 03:14:31   at 
> org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.executeStatementWithResult(FlinkDistribution.java:590)
> Jul 03 03:14:31   ... 7 more
> Jul 03 03:14:31 
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60620&view=logs&j=e8e46ef5-75cc-564f-c2bd-1797c35cbebe&t=60c49903-2505-5c25-7e46-de91b1737bea&l=14602



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35240][Connectors][format]Disable FLUSH_AFTER_WRITE_VALUE to avoid flush per record for csv format [flink]

2024-07-11 Thread via GitHub


GOODBOY008 merged PR #24730:
URL: https://github.com/apache/flink/pull/24730


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35754) SqlGatewayE2ECase.testMaterializedTableInFullMode failed due to Internal Server Error

2024-07-11 Thread dalongliu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865275#comment-17865275
 ] 

dalongliu commented on FLINK-35754:
---

Hi [~pnowojski], I think they are two different problems.

> SqlGatewayE2ECase.testMaterializedTableInFullMode failed due to Internal 
> Server Error
> -
>
> Key: FLINK-35754
> URL: https://issues.apache.org/jira/browse/FLINK-35754
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.20.0
>Reporter: Weijie Guo
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2024-07-05-11-15-52-731.png
>
>
> {code:java}
> Jul 03 03:14:31 java.lang.RuntimeException: Execute statement failed
> Jul 03 03:14:31   at 
> org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.executeStatementWithResult(FlinkDistribution.java:592)
> Jul 03 03:14:31   at 
> org.apache.flink.table.gateway.SqlGatewayE2ECase.lambda$testMaterializedTableInFullMode$3(SqlGatewayE2ECase.java:353)
> Jul 03 03:14:31   at 
> org.apache.flink.core.testutils.CommonTestUtils.waitUtil(CommonTestUtils.java:210)
> Jul 03 03:14:31   at 
> org.apache.flink.table.gateway.SqlGatewayE2ECase.testMaterializedTableInFullMode(SqlGatewayE2ECase.java:350)
> Jul 03 03:14:31   at 
> java.base/java.lang.reflect.Method.invoke(Method.java:568)
> Jul 03 03:14:31   at 
> org.apache.flink.util.ExternalResource$1.evaluate(ExternalResource.java:48)
> Jul 03 03:14:31   at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> Jul 03 03:14:31   at 
> org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:29)
> Jul 03 03:14:31 Caused by: java.lang.RuntimeException: The rest request is 
> not successful: Internal Server Error
> Jul 03 03:14:31   at 
> org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.sendRequest(FlinkDistribution.java:601)
> Jul 03 03:14:31   at 
> org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.getOperationResult(FlinkDistribution.java:570)
> Jul 03 03:14:31   at 
> org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.executeStatementWithResult(FlinkDistribution.java:590)
> Jul 03 03:14:31   ... 7 more
> Jul 03 03:14:31 
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60620&view=logs&j=e8e46ef5-75cc-564f-c2bd-1797c35cbebe&t=60c49903-2505-5c25-7e46-de91b1737bea&l=14602



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35754) SqlGatewayE2ECase.testMaterializedTableInFullMode failed due to Internal Server Error

2024-07-11 Thread dalongliu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17864853#comment-17864853
 ] 

dalongliu edited comment on FLINK-35754 at 7/12/24 1:56 AM:


Merged in master: d04c70fae033bb04ed7a3bb69832be55b4425700
Merged in release-1.20: 8244941ed18c9c063449759927b58b629110d89e


was (Author: lsy):
Merged in master: d04c70fae033bb04ed7a3bb69832be55b4425700

> SqlGatewayE2ECase.testMaterializedTableInFullMode failed due to Internal 
> Server Error
> -
>
> Key: FLINK-35754
> URL: https://issues.apache.org/jira/browse/FLINK-35754
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.20.0
>Reporter: Weijie Guo
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2024-07-05-11-15-52-731.png
>
>
> {code:java}
> Jul 03 03:14:31 java.lang.RuntimeException: Execute statement failed
> Jul 03 03:14:31   at 
> org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.executeStatementWithResult(FlinkDistribution.java:592)
> Jul 03 03:14:31   at 
> org.apache.flink.table.gateway.SqlGatewayE2ECase.lambda$testMaterializedTableInFullMode$3(SqlGatewayE2ECase.java:353)
> Jul 03 03:14:31   at 
> org.apache.flink.core.testutils.CommonTestUtils.waitUtil(CommonTestUtils.java:210)
> Jul 03 03:14:31   at 
> org.apache.flink.table.gateway.SqlGatewayE2ECase.testMaterializedTableInFullMode(SqlGatewayE2ECase.java:350)
> Jul 03 03:14:31   at 
> java.base/java.lang.reflect.Method.invoke(Method.java:568)
> Jul 03 03:14:31   at 
> org.apache.flink.util.ExternalResource$1.evaluate(ExternalResource.java:48)
> Jul 03 03:14:31   at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> Jul 03 03:14:31   at 
> org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:29)
> Jul 03 03:14:31 Caused by: java.lang.RuntimeException: The rest request is 
> not successful: Internal Server Error
> Jul 03 03:14:31   at 
> org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.sendRequest(FlinkDistribution.java:601)
> Jul 03 03:14:31   at 
> org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.getOperationResult(FlinkDistribution.java:570)
> Jul 03 03:14:31   at 
> org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.executeStatementWithResult(FlinkDistribution.java:590)
> Jul 03 03:14:31   ... 7 more
> Jul 03 03:14:31 
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60620&view=logs&j=e8e46ef5-75cc-564f-c2bd-1797c35cbebe&t=60c49903-2505-5c25-7e46-de91b1737bea&l=14602



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [BP-1.20][FLINK-35754][e2e] Fix SqlGatewayE2ECase.testMaterializedTableInFullMode failed due to Internal Server Error [flink]

2024-07-11 Thread via GitHub


lsyldliu merged PR #25073:
URL: https://github.com/apache/flink/pull/25073


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-35354) Support host mapping in Flink tikv cdc

2024-07-11 Thread ouyangwulin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ouyangwulin closed FLINK-35354.
---
Resolution: Fixed

master:302a69122538fdb76b98e73ebb3c83ee733a0c02

> Support host mapping in Flink tikv cdc
> --
>
> Key: FLINK-35354
> URL: https://issues.apache.org/jira/browse/FLINK-35354
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0, cdc-3.2.0
>Reporter: ouyangwulin
>Priority: Major
>  Labels: pull-request-available
> Fix For: cdc-3.2.0
>
>
> In tidb production environment deployment, there are usually two kinds of 
> network: internal network and public network. When we use pd mode in tikv, we 
> need to do network mapping, such as `spark.tispark.host_mapping` in 
> [https://github.com/pingcap/tispark/blob/master/docs/userguide_3.0.md]. So I 
> think we need support `host_mapping` in our Flink tikv cdc connector.
>  
> Add param:
>  tikv.host_mapping:192.168.0.2:8.8.8.8;192.168.0.3:9.9.9.9



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-31520] [table-planner] Move SqlXXXModel conversion logic to SqlNodeConverter [flink]

2024-07-11 Thread via GitHub


nateab commented on PR #22218:
URL: https://github.com/apache/flink/pull/22218#issuecomment-2224189971

   Hi @xuzhiwen1255 there is a typo in the title of the PR, it should be 
`module` not `model`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-11 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674832829


##
flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/src/test/java/org/apache/flink/connector/sqs/sink/test/SqsSinkITTest.java:
##
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink.test;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.connector.aws.testutils.LocalstackContainer;
+import org.apache.flink.connector.sqs.sink.SqsSink;
+import org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils;
+import org.apache.flink.connector.testframe.container.FlinkContainers;
+import org.apache.flink.connector.testframe.container.TestcontainersSettings;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.DockerImageVersions;
+import org.apache.flink.util.TestLogger;
+
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Network;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.core.SdkSystemSetting;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.Message;
+import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static 
org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createConfig;
+import static 
org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils.createSqsClient;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** End to End test for SQS sink API. */
+public class SqsSinkITTest extends TestLogger {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(SqsSinkITTest.class);
+
+private static final int NUMBER_OF_ELEMENTS = 50;
+private StreamExecutionEnvironment env;
+private SdkHttpClient httpClient;
+private SqsClient sqsClient;
+private static final Network network = Network.newNetwork();
+
+@ClassRule
+public static LocalstackContainer mockSqsContainer =
+new 
LocalstackContainer(DockerImageName.parse(DockerImageVersions.LOCALSTACK))
+.withNetwork(network)
+.withNetworkAliases("localstack");
+
+public static final TestcontainersSettings TESTCONTAINERS_SETTINGS =
+TestcontainersSettings.builder()
+.environmentVariable("AWS_CBOR_DISABLE", "1")
+.environmentVariable(
+"FLINK_ENV_JAVA_OPTS",
+
"-Dorg.apache.flink.sqs.shaded.com.amazonaws.sdk.disableCertChecking 
-Daws.cborEnabled=false")
+.network(network)
+.logger(LOG)
+.dependsOn(mockSqsContainer)
+.build();
+
+public static final FlinkContainers FLINK =
+
FlinkContainers.builder().withTestcontainersSettings(TESTCONTAINERS_SETTINGS).build();
+
+@Before
+public void setup() throws Exception {
+System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");

Review Comment:
   I will validate that once i will be able to run this test locally



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-11 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674832829


##
flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/src/test/java/org/apache/flink/connector/sqs/sink/test/SqsSinkITTest.java:
##
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink.test;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.connector.aws.testutils.LocalstackContainer;
+import org.apache.flink.connector.sqs.sink.SqsSink;
+import org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils;
+import org.apache.flink.connector.testframe.container.FlinkContainers;
+import org.apache.flink.connector.testframe.container.TestcontainersSettings;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.DockerImageVersions;
+import org.apache.flink.util.TestLogger;
+
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Network;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.core.SdkSystemSetting;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.Message;
+import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static 
org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createConfig;
+import static 
org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils.createSqsClient;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** End to End test for SQS sink API. */
+public class SqsSinkITTest extends TestLogger {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(SqsSinkITTest.class);
+
+private static final int NUMBER_OF_ELEMENTS = 50;
+private StreamExecutionEnvironment env;
+private SdkHttpClient httpClient;
+private SqsClient sqsClient;
+private static final Network network = Network.newNetwork();
+
+@ClassRule
+public static LocalstackContainer mockSqsContainer =
+new 
LocalstackContainer(DockerImageName.parse(DockerImageVersions.LOCALSTACK))
+.withNetwork(network)
+.withNetworkAliases("localstack");
+
+public static final TestcontainersSettings TESTCONTAINERS_SETTINGS =
+TestcontainersSettings.builder()
+.environmentVariable("AWS_CBOR_DISABLE", "1")
+.environmentVariable(
+"FLINK_ENV_JAVA_OPTS",
+
"-Dorg.apache.flink.sqs.shaded.com.amazonaws.sdk.disableCertChecking 
-Daws.cborEnabled=false")
+.network(network)
+.logger(LOG)
+.dependsOn(mockSqsContainer)
+.build();
+
+public static final FlinkContainers FLINK =
+
FlinkContainers.builder().withTestcontainersSettings(TESTCONTAINERS_SETTINGS).build();
+
+@Before
+public void setup() throws Exception {
+System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");

Review Comment:
   This need to be validated once i will able to run this test locally



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-11 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674832262


##
flink-connector-aws/flink-connector-sqs/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension:
##
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.util.TestLoggerExtension

Review Comment:
   This got copied from other sink package, doesn't look like it is required, 
deleting it now



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34111][table] Add support for json_quote, json_unquote, address PR feedback #24156 [flink]

2024-07-11 Thread via GitHub


snuyanzin commented on PR #24967:
URL: https://github.com/apache/flink/pull/24967#issuecomment-2224133227

   One more diff between MySQL and Flink behavior, however this I'm not sure 
need to fix since at least from my point of view MySQL's behavior is 
questionable
   MySQL
   ```sql
   SELECT  
JSON_UNQUOTE('"\\u0022\\u005c\\u005c\\u0075\\u0030\\u0030\\u0061\\u0061\\u0022"');
   SELECT  
JSON_UNQUOTE(JSON_UNQUOTE('"\\u0022\\u005c\\u005c\\u0075\\u0030\\u0030\\u0061\\u0061\\u0022"'));
   ```
   returns
   ```
   "\\u00aa"
   \u00aa  
   ```
   Flink
   ```sql
   SELECT  
JSON_UNQUOTE('"\u0022\u005c\u005c\u0075\u0030\u0030\u0061\u0061\u0022"');
   SELECT  
JSON_UNQUOTE(JSON_UNQUOTE('"\u0022\u005c\u005c\u0075\u0030\u0030\u0061\u0061\u0022"'));
   ```
   returns 
   ```
   "\\u00aa"
   ª 
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34111][table] Add support for json_quote, json_unquote, address PR feedback #24156 [flink]

2024-07-11 Thread via GitHub


snuyanzin commented on code in PR #24967:
URL: https://github.com/apache/flink/pull/24967#discussion_r1674812992


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/JsonUnquoteFunction.java:
##
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
+import org.apache.flink.table.runtime.functions.SqlJsonUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#JSON_UNQUOTE}. */
+@Internal
+public class JsonUnquoteFunction extends BuiltInScalarFunction {
+
+public JsonUnquoteFunction(SpecializedContext context) {
+super(BuiltInFunctionDefinitions.JSON_UNQUOTE, context);
+}
+
+public @Nullable Object eval(Object input) {
+if (input == null) {
+return null;
+}
+BinaryStringData bs = (BinaryStringData) input;
+String inputStr = bs.toString();
+try {
+if (isValidJsonVal(inputStr)) {
+return new BinaryStringData(unescapeValidJson(inputStr));
+}
+} catch (FlinkRuntimeException | IllegalArgumentException t) {

Review Comment:
   Ooops, seems I was wrong about `FlinkRuntimeException`
   if we want to rethrow it further then it would make sense
   
   however of we use it only within this class then `IllegalArgumentException` 
is enough
   sorry about that
   I should have checked the whole cycle of it here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-11 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674703677


##
flink-connector-aws/flink-connector-sqs/src/main/resources/log4j2.properties:
##
@@ -0,0 +1,25 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+rootLogger.level = OFF

Review Comment:
   Why do we want to do that? I can see other sinks also have their independent 
log4j configuration.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35808) Let ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG be overridable by user in KafkaSourceBuilder

2024-07-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35808:
---
Labels: pull-request-available  (was: )

> Let ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG be overridable by user in 
> KafkaSourceBuilder
> ---
>
> Key: FLINK-35808
> URL: https://issues.apache.org/jira/browse/FLINK-35808
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.2.0
>Reporter: Kevin Lam
>Assignee: Kevin Lam
>Priority: Minor
>  Labels: pull-request-available
>
> This issue is a follow-up to [this mailing list 
> discussion|https://lists.apache.org/thread/spl88o63sjm2dv4l5no0ym632d2yt2o6]. 
> I'd like to propose letting the 
> ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG be overridable by user in 
> KafkaSourceBuilder, as shown in this DRAFT PR:
>  
> [https://github.com/apache/flink-connector-kafka/pull/108]
>  
> From the PR description: 
> {quote}This allows users to easily implement the [{{claim check}} large 
> message 
> pattern|https://developer.confluent.io/patterns/event-processing/claim-check/]
>  without bringing any concerns into the Flink codebase otherwise, by 
> specifying a {{value.deserializer}} that handles it, but otherwise passes 
> through the bytes.
> Note: overriding value.serializer is already supported on the Producer side: 
> |[https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java#L82-L83]|
>  
> Other Reading:
> [https://www.kai-waehner.de/blog/2020/08/07/apache-kafka-handling-large-messages-and-files-for-image-video-audio-processing/]
> [https://www.conduktor.io/kafka/how-to-send-large-messages-in-apache-kafka/#Option-1:-using-an-external-store-(GB-size-messages)-0]
> {quote}
>  
> What do folks think? If it seems reasonable I can follow the steps in the 
> [contribution guide|https://flink.apache.org/how-to-contribute/overview/].  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35808] Let `ConsumerConfig.(KEY|VALUE)_DESERIALIZER_CLASS_CONFIG` be overridable by user in `KafkaSourceBuilder` [flink-connector-kafka]

2024-07-11 Thread via GitHub


klam-shop commented on PR #108:
URL: 
https://github.com/apache/flink-connector-kafka/pull/108#issuecomment-2223875792

   Thanks for your review @fapaul ! I did another pass on trying to incorporate 
your comments, and also updated some of the documentation that mentions the 
deserializer configuration.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix][table] Apply uppercase for URL_ENCODE/URL_DECODE remove logging from function class [flink]

2024-07-11 Thread via GitHub


snuyanzin merged PR #25076:
URL: https://github.com/apache/flink/pull/25076


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35819) Update to AWS SDKv2 2.26.19

2024-07-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35819:
---
Labels: pull-request-available  (was: )

> Update to AWS SDKv2 2.26.19
> ---
>
> Key: FLINK-35819
> URL: https://issues.apache.org/jira/browse/FLINK-35819
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / AWS
>Affects Versions: aws-connector-4.4.0
>Reporter: Burak Ozakinci
>Assignee: Burak Ozakinci
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: aws-connector-4.4.0
>
>
> As part of the work in FLINK-31922, AWS SDK needs to be updated to use 
> RetryStrategy constructs instead of deprecated RetryPolicy classes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35819][Connectors/AWS] Update to AWS SDKv2 2.26.19 [flink-connector-aws]

2024-07-11 Thread via GitHub


karubian opened a new pull request, #149:
URL: https://github.com/apache/flink-connector-aws/pull/149

   ## Purpose of the change
   
   Update to AWS SDKv2 2.26.19
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Significant changes
   - [X] Dependencies have been added or upgraded
   - [ ] Public API has been changed (Public API is any class annotated with 
`@Public(Evolving)`)
   - [ ] Serializers have been changed
   - [ ] New feature has been introduced
 - If yes, how is this documented? (not applicable / docs / JavaDocs / not 
documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35819) Update to AWS SDKv2 2.26.19

2024-07-11 Thread Burak Ozakinci (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Burak Ozakinci updated FLINK-35819:
---
Summary: Update to AWS SDKv2 2.26.19  (was: Update to AWS SDK 2.26.19)

> Update to AWS SDKv2 2.26.19
> ---
>
> Key: FLINK-35819
> URL: https://issues.apache.org/jira/browse/FLINK-35819
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / AWS
>Affects Versions: aws-connector-4.4.0
>Reporter: Burak Ozakinci
>Assignee: Burak Ozakinci
>Priority: Blocker
> Fix For: aws-connector-4.4.0
>
>
> As part of the work in FLINK-31922, AWS SDK needs to be updated to use 
> RetryStrategy constructs instead of deprecated RetryPolicy classes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35819) Update to AWS SDK 2.26.19

2024-07-11 Thread Burak Ozakinci (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Burak Ozakinci updated FLINK-35819:
---
Summary: Update to AWS SDK 2.26.19  (was: Upgrade to AWS SDK 2.26.19)

> Update to AWS SDK 2.26.19
> -
>
> Key: FLINK-35819
> URL: https://issues.apache.org/jira/browse/FLINK-35819
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / AWS
>Affects Versions: aws-connector-4.4.0
>Reporter: Burak Ozakinci
>Assignee: Burak Ozakinci
>Priority: Blocker
> Fix For: aws-connector-4.4.0
>
>
> As part of the work in FLINK-31922, AWS SDK needs to be updated to use 
> RetryStrategy constructs instead of deprecated RetryPolicy classes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35822) FLIP-465: Introduce DESCRIBE FUNCTION

2024-07-11 Thread Natea Eshetu Beshada (Jira)
Natea Eshetu Beshada created FLINK-35822:


 Summary: FLIP-465: Introduce DESCRIBE FUNCTION
 Key: FLINK-35822
 URL: https://issues.apache.org/jira/browse/FLINK-35822
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API, Table SQL / Planner, Table SQL / Runtime
Reporter: Natea Eshetu Beshada
Assignee: Natea Eshetu Beshada


https://cwiki.apache.org/confluence/display/FLINK/FLIP-465%3A+Introduce+DESCRIBE+FUNCTION



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-25920) Allow receiving updates of CommittableSummary

2024-07-11 Thread Alexis Sarda-Espinosa (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865154#comment-17865154
 ] 

Alexis Sarda-Espinosa edited comment on FLINK-25920 at 7/11/24 4:53 PM:


[^trace_again.zip]

This time it happened very close to the start of the job after a restart with 
savepoint, I see the first checkpoint succeeded and the second one failed.

Thinking aloud, could checkpoint mode EXACTLY_ONCE mitigate this?


was (Author: asardaes):
[^trace_again.zip]

This time it happened very close to the start of the job after a restart with 
savepoint, I see the first checkpoint succeeded and the second one failed.

> Allow receiving updates of CommittableSummary
> -
>
> Key: FLINK-25920
> URL: https://issues.apache.org/jira/browse/FLINK-25920
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataStream, Connectors / Common
>Affects Versions: 1.15.0, 1.16.0
>Reporter: Fabian Paul
>Assignee: Arvid Heise
>Priority: Major
> Attachments: trace.zip, trace_again.zip
>
>
> In the case of unaligned checkpoints, it might happen that the checkpoint 
> barrier overtakes the records and an empty committable summary is emitted 
> that needs to be correct at a later point when the records arrive.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35821) ResumeCheckpointManuallyITCase failed with File X does not exist or the user running Flink C has insufficient permissions to access it

2024-07-11 Thread Zakelly Lan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865155#comment-17865155
 ] 

Zakelly Lan commented on FLINK-35821:
-

[~pnowojski] I think this is the same problem as 
https://issues.apache.org/jira/browse/FLINK-35803 . Would you please rebase and 
try again?

> ResumeCheckpointManuallyITCase failed with File X does not exist or the user 
> running Flink C has insufficient permissions to access it
> --
>
> Key: FLINK-35821
> URL: https://issues.apache.org/jira/browse/FLINK-35821
> Project: Flink
>  Issue Type: Bug
>  Components: Test Infrastructure
>Affects Versions: 2.0.0, 1.20.0
>Reporter: Piotr Nowojski
>Priority: Major
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60857&view=logs&j=5c8e7682-d68f-54d1-16a2-a09310218a49&t=86f654fa-ab48-5c1a-25f4-7e7f6afb9bba
> primary failure:
> {noformat}
> Caused by: java.io.FileNotFoundException: File 
> file:/tmp/junit5368809541399217009/junit9107863722486384012/5a045e6c0cd0297faf5a2bf6fff27465/shared/job_5a045e6c0cd0297faf5a2bf6fff27465_op_90bea66de1c231edf33913ecd54406c1_1_2/0effb888-aa59-4bc4-b3e6-02622c831863
>  does not exist or the user running Flink ('agent01_azpcontainer') has 
> insufficient permissions to access it.
> {noformat}
> Full stack trace
> {noformat}
> 2024-07-11T13:49:46.4137693Z Jul 11 13:49:46 13:49:46.412 [ERROR] Tests run: 
> 48, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 309.7 s <<< FAILURE! -- 
> in org.apache.flink.test.checkpointing.ResumeCheckpointManuallyITCase
> 2024-07-11T13:49:46.4139710Z Jul 11 13:49:46 13:49:46.412 [ERROR] 
> org.apache.flink.test.checkpointing.ResumeCheckpointManuallyITCase.testExternalizedIncrementalRocksDBCheckpointsWithLocalRecoveryZookeeper[RestoreMode
>  = CLAIM] -- Time elapsed: 2.722 s <<< ERROR!
> 2024-07-11T13:49:46.4140928Z Jul 11 13:49:46 
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
> 2024-07-11T13:49:46.4142766Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:219)
> 2024-07-11T13:49:46.4144185Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailureAndReport(ExecutionFailureHandler.java:166)
> 2024-07-11T13:49:46.4145249Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:121)
> 2024-07-11T13:49:46.4146510Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:281)
> 2024-07-11T13:49:46.4147599Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:272)
> 2024-07-11T13:49:46.4148975Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:265)
> 2024-07-11T13:49:46.4150467Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:800)
> 2024-07-11T13:49:46.4151977Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:777)
> 2024-07-11T13:49:46.4153308Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
> 2024-07-11T13:49:46.4154713Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:515)
> 2024-07-11T13:49:46.4155416Z Jul 11 13:49:46  at 
> sun.reflect.GeneratedMethodAccessor29.invoke(Unknown Source)
> 2024-07-11T13:49:46.4156342Z Jul 11 13:49:46  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2024-07-11T13:49:46.4157291Z Jul 11 13:49:46  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2024-07-11T13:49:46.4158065Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:318)
> 2024-07-11T13:49:46.4159387Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
> 2024-07-11T13:49:46.4160469Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:316)
> 2024-07-11T13:49:46.4161819Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:229)
> 2024-07-11T13:49:46.4163253Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:88)
> 2024-07-11T13:49:46.41

[jira] [Commented] (FLINK-25920) Allow receiving updates of CommittableSummary

2024-07-11 Thread Alexis Sarda-Espinosa (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865154#comment-17865154
 ] 

Alexis Sarda-Espinosa commented on FLINK-25920:
---

[^trace_again.zip]

This time it happened very close to the start of the job after a restart with 
savepoint, I see the first checkpoint succeeded and the second one failed.

> Allow receiving updates of CommittableSummary
> -
>
> Key: FLINK-25920
> URL: https://issues.apache.org/jira/browse/FLINK-25920
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataStream, Connectors / Common
>Affects Versions: 1.15.0, 1.16.0
>Reporter: Fabian Paul
>Assignee: Arvid Heise
>Priority: Major
> Attachments: trace.zip, trace_again.zip
>
>
> In the case of unaligned checkpoints, it might happen that the checkpoint 
> barrier overtakes the records and an empty committable summary is emitted 
> that needs to be correct at a later point when the records arrive.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-25920) Allow receiving updates of CommittableSummary

2024-07-11 Thread Alexis Sarda-Espinosa (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexis Sarda-Espinosa updated FLINK-25920:
--
Attachment: trace_again.zip

> Allow receiving updates of CommittableSummary
> -
>
> Key: FLINK-25920
> URL: https://issues.apache.org/jira/browse/FLINK-25920
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataStream, Connectors / Common
>Affects Versions: 1.15.0, 1.16.0
>Reporter: Fabian Paul
>Assignee: Arvid Heise
>Priority: Major
> Attachments: trace.zip, trace_again.zip
>
>
> In the case of unaligned checkpoints, it might happen that the checkpoint 
> barrier overtakes the records and an empty committable summary is emitted 
> that needs to be correct at a later point when the records arrive.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-21373] Add RabbitMQ SinkV2 Implementation, Port Flink version to Flink 1.19 [flink-connector-rabbitmq]

2024-07-11 Thread via GitHub


vahmed-hamdy commented on PR #29:
URL: 
https://github.com/apache/flink-connector-rabbitmq/pull/29#issuecomment-2223412688

   @paulh86 Blocked on review from the community, will reach out on dev slack 
channel for a reviewer


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-35819) Upgrade to AWS SDK 2.26.19

2024-07-11 Thread Hong Liang Teoh (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hong Liang Teoh reassigned FLINK-35819:
---

Assignee: Burak Ozakinci

> Upgrade to AWS SDK 2.26.19
> --
>
> Key: FLINK-35819
> URL: https://issues.apache.org/jira/browse/FLINK-35819
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / AWS
>Affects Versions: aws-connector-4.4.0
>Reporter: Burak Ozakinci
>Assignee: Burak Ozakinci
>Priority: Blocker
> Fix For: aws-connector-4.4.0
>
>
> As part of the work in FLINK-31922, AWS SDK needs to be updated to use 
> RetryStrategy constructs instead of deprecated RetryPolicy classes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32324) Implement watermark alignment on KDS and DDB source

2024-07-11 Thread Hong Liang Teoh (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hong Liang Teoh updated FLINK-32324:

Summary: Implement watermark alignment on KDS and DDB source  (was: 
Implement watermark alignment on KDS source)

> Implement watermark alignment on KDS and DDB source
> ---
>
> Key: FLINK-32324
> URL: https://issues.apache.org/jira/browse/FLINK-32324
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kinesis
>Reporter: Hong Liang Teoh
>Assignee: Aleksandr Pilipenko
>Priority: Major
>  Labels: pull-request-available
>
> Implement watermark alignment interfaces suggested by this FLIP in the KDS 
> Source. 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-32324) Implement watermark alignment on KDS and DDB source

2024-07-11 Thread Hong Liang Teoh (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hong Liang Teoh resolved FLINK-32324.
-
Resolution: Fixed

> Implement watermark alignment on KDS and DDB source
> ---
>
> Key: FLINK-32324
> URL: https://issues.apache.org/jira/browse/FLINK-32324
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kinesis
>Reporter: Hong Liang Teoh
>Assignee: Aleksandr Pilipenko
>Priority: Major
>  Labels: pull-request-available
>
> Implement watermark alignment interfaces suggested by this FLIP in the KDS 
> and DDB Source. 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32324) Implement watermark alignment on KDS and DDB source

2024-07-11 Thread Hong Liang Teoh (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hong Liang Teoh updated FLINK-32324:

Description: Implement watermark alignment interfaces suggested by this 
FLIP in the KDS and DDB Source. 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources]
  (was: Implement watermark alignment interfaces suggested by this FLIP in the 
KDS Source. 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources)

> Implement watermark alignment on KDS and DDB source
> ---
>
> Key: FLINK-32324
> URL: https://issues.apache.org/jira/browse/FLINK-32324
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kinesis
>Reporter: Hong Liang Teoh
>Assignee: Aleksandr Pilipenko
>Priority: Major
>  Labels: pull-request-available
>
> Implement watermark alignment interfaces suggested by this FLIP in the KDS 
> and DDB Source. 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35799] Add CompiledPlan annotations to BatchExecCalc [flink]

2024-07-11 Thread via GitHub


jnh5y commented on PR #25064:
URL: https://github.com/apache/flink/pull/25064#issuecomment-2223289135

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32324) Implement watermark alignment on KDS source

2024-07-11 Thread Hong Liang Teoh (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865140#comment-17865140
 ] 

Hong Liang Teoh commented on FLINK-32324:
-

 merged commit 
[{{f81473e}}|https://github.com/apache/flink-connector-aws/commit/f81473e20d7df91b668faf2ca4d2d00576007862]
 into   apache:main

> Implement watermark alignment on KDS source
> ---
>
> Key: FLINK-32324
> URL: https://issues.apache.org/jira/browse/FLINK-32324
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kinesis
>Reporter: Hong Liang Teoh
>Assignee: Aleksandr Pilipenko
>Priority: Major
>  Labels: pull-request-available
>
> Implement watermark alignment interfaces suggested by this FLIP in the KDS 
> Source. 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-32324][Connectors/AWS] Implement support for watermark alignment in FLIP-27 sources for KDS and DDB streams [flink-connector-aws]

2024-07-11 Thread via GitHub


hlteoh37 merged PR #148:
URL: https://github.com/apache/flink-connector-aws/pull/148


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34111][table] Add support for json_quote, json_unquote, address PR feedback #24156 [flink]

2024-07-11 Thread via GitHub


anupamaggarwal commented on code in PR #24967:
URL: https://github.com/apache/flink/pull/24967#discussion_r1674222067


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/JsonUnquoteFunction.java:
##
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
+import org.apache.flink.table.runtime.functions.SqlJsonUtils;
+
+import javax.annotation.Nullable;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#JSON_UNQUOTE}. */
+@Internal
+public class JsonUnquoteFunction extends BuiltInScalarFunction {
+
+public JsonUnquoteFunction(SpecializedContext context) {
+super(BuiltInFunctionDefinitions.JSON_UNQUOTE, context);
+}
+
+public @Nullable Object eval(Object input) {
+if (input == null) {
+return null;
+}
+BinaryStringData bs = (BinaryStringData) input;
+String inputStr = bs.toString();
+try {
+if (isValidJsonVal(inputStr)) {
+return new BinaryStringData(unescapeValidJson(inputStr));
+}
+} catch (Throwable t) {
+// ignore
+}

Review Comment:
   thanks @fhueske, @snuyanzin  makes sense, fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-11 Thread via GitHub


hlteoh37 commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674213420


##
flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/pom.xml:
##
@@ -0,0 +1,105 @@
+
+
+
+http://maven.apache.org/POM/4.0.0";
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+
+
+flink-connector-aws-e2e-tests-parent
+org.apache.flink
+4.4-SNAPSHOT
+
+
+4.0.0
+
+flink-connector-aws-sqs-e2e-tests

Review Comment:
   This test doesn't seem to work. Please make sure we have all the 
dependencies.
   
   
   ```
   Caused by: java.lang.NoClassDefFoundError: 
software/amazon/awssdk/services/s3/S3Client
   at 
org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createS3Client(AWSServicesTestUtils.java:65)
   at 
org.apache.flink.connector.aws.testutils.LocalstackContainer$ListBucketObjectsWaitStrategy.list(LocalstackContainer.java:79)
   at 
org.rnorth.ducttape.ratelimits.RateLimiter.getWhenReady(RateLimiter.java:51)
   at 
org.apache.flink.connector.aws.testutils.LocalstackContainer$ListBucketObjectsWaitStrategy.lambda$waitUntilReady$0(LocalstackContainer.java:72)
   at 
org.rnorth.ducttape.unreliables.Unreliables.lambda$retryUntilSuccess$0(Unreliables.java:43)
   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
   at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
   at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
   at java.base/java.lang.Thread.run(Thread.java:829)
   Caused by: java.lang.ClassNotFoundException: 
software.amazon.awssdk.services.s3.S3Client
   ```
   
   I ran using the below command
   
   ```
   mvn clean verify -Prun-end-to-end-tests 
-DdistDir=/Users/liangtl/workplace/flink_os/flink-1.19.0/lib/flink-dist-1.19.0.jar
 -pl flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests -am
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-35819) Upgrade to AWS SDK 2.26.19

2024-07-11 Thread Burak Ozakinci (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865126#comment-17865126
 ] 

Burak Ozakinci edited comment on FLINK-35819 at 7/11/24 3:20 PM:
-

[~Hong Teoh]  Please assign this to me, thank you


was (Author: JIRAUSER303336):
[~Hong Teoh]  Please assign this to me, thank you
[|https://issues.apache.org/jira/secure/AddComment!default.jspa?id=13533827]

> Upgrade to AWS SDK 2.26.19
> --
>
> Key: FLINK-35819
> URL: https://issues.apache.org/jira/browse/FLINK-35819
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / AWS
>Affects Versions: aws-connector-4.4.0
>Reporter: Burak Ozakinci
>Priority: Blocker
> Fix For: aws-connector-4.4.0
>
>
> As part of the work in FLINK-31922, AWS SDK needs to be updated to use 
> RetryStrategy constructs instead of deprecated RetryPolicy classes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35799] Add CompiledPlan annotations to BatchExecCalc [flink]

2024-07-11 Thread via GitHub


jnh5y commented on code in PR #25064:
URL: https://github.com/apache/flink/pull/25064#discussion_r1674211997


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/CalcCompiledBatchTest.java:
##
@@ -19,16 +19,16 @@
 package org.apache.flink.table.planner.plan.nodes.exec.batch;
 
 import org.apache.flink.table.planner.plan.nodes.exec.common.CalcTestPrograms;
-import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.BatchCompiledPlanTestBase;
+import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.CompiledBatchTestBase;
 import org.apache.flink.table.test.program.TableTestProgram;
 
 import java.util.Arrays;
 import java.util.List;
 
 /** Restore tests for {@link BatchExecCalc}. */
-public class CalcBatchCompiledPlanTest extends BatchCompiledPlanTestBase {
+public class CalcCompiledBatchTest extends CompiledBatchTestBase {

Review Comment:
   Ok, I think I got the names updated as suggested.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35819) Upgrade to AWS SDK 2.26.19

2024-07-11 Thread Burak Ozakinci (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865126#comment-17865126
 ] 

Burak Ozakinci commented on FLINK-35819:


[~Hong Teoh]  Please assign this to me, thank you
[|https://issues.apache.org/jira/secure/AddComment!default.jspa?id=13533827]

> Upgrade to AWS SDK 2.26.19
> --
>
> Key: FLINK-35819
> URL: https://issues.apache.org/jira/browse/FLINK-35819
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / AWS
>Affects Versions: aws-connector-4.4.0
>Reporter: Burak Ozakinci
>Priority: Blocker
> Fix For: aws-connector-4.4.0
>
>
> As part of the work in FLINK-31922, AWS SDK needs to be updated to use 
> RetryStrategy constructs instead of deprecated RetryPolicy classes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-11 Thread via GitHub


hlteoh37 commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674205703


##
flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/pom.xml:
##
@@ -0,0 +1,105 @@
+
+
+
+http://maven.apache.org/POM/4.0.0";
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+
+
+flink-connector-aws-e2e-tests-parent
+org.apache.flink
+4.4-SNAPSHOT
+
+
+4.0.0
+
+flink-connector-aws-sqs-e2e-tests
+Flink : Connectors : AWS : E2E Tests : Amazon SQS
+jar
+
+
+
+org.apache.flink
+flink-streaming-java
+${flink.version}
+test
+
+
+
+org.apache.flink
+flink-connector-sqs
+${project.version}
+test
+
+
+
+org.apache.flink
+flink-connector-aws-base
+${project.version}
+test
+test-jar
+
+
+
+org.apache.flink
+flink-connector-sqs
+${project.version}
+test
+test-jar
+
+
+
+
+com.google.guava
+guava
+test
+
+
+
+com.fasterxml.jackson.core
+jackson-databind
+test
+
+
+
+com.fasterxml.jackson.datatype
+jackson-datatype-jsr310
+test
+
+
+
+
+
+
+
+org.apache.maven.plugins
+maven-dependency-plugin
+
+
+copy
+pre-integration-test
+
+copy
+
+
+
+
+
+

Review Comment:
   This should be removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35285) Autoscaler key group optimization can interfere with scale-down.max-factor

2024-07-11 Thread Trystan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865121#comment-17865121
 ] 

Trystan commented on FLINK-35285:
-

Any suggestions on this? We are consistently finding a 50%-60% scale down max 
factor to be too aggressive, leading to flapping. But we can't set a safer 
20-30% factor because of this since it never scales at all.

> Autoscaler key group optimization can interfere with scale-down.max-factor
> --
>
> Key: FLINK-35285
> URL: https://issues.apache.org/jira/browse/FLINK-35285
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Trystan
>Priority: Minor
>
> When setting a less aggressive scale down limit, the key group optimization 
> can prevent a vertex from scaling down at all. It will hunt from target 
> upwards to maxParallelism/2, and will always find currentParallelism again.
>  
> A simple test trying to scale down from a parallelism of 60 with a 
> scale-down.max-factor of 0.2:
> {code:java}
> assertEquals(48, JobVertexScaler.scale(60, inputShipStrategies, 360, .8, 8, 
> 360)); {code}
>  
> It seems reasonable to make a good attempt to spread data across subtasks, 
> but not at the expense of total deadlock. The problem is that during scale 
> down it doesn't actually ensure that newParallelism will be < 
> currentParallelism. The only workaround is to set a scale down factor large 
> enough such that it finds the next lowest divisor of the maxParallelism.
>  
> Clunky, but something to ensure it can make at least some progress. There is 
> another test that now fails, but just to illustrate the point:
> {code:java}
> for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) 
> {
> if ((scaleFactor < 1 && p < currentParallelism) || (scaleFactor > 1 && p 
> > currentParallelism)) {
> if (maxParallelism % p == 0) {
> return p;
> }
> }
> } {code}
>  
> Perhaps this is by design and not a bug, but total failure to scale down in 
> order to keep optimized key groups does not seem ideal.
>  
> Key group optimization block:
> [https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L296C1-L303C10]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-32324][Connectors/AWS] Implement support for watermark alignment in FLIP-27 sources for KDS and DDB streams [flink-connector-aws]

2024-07-11 Thread via GitHub


z3d1k commented on code in PR #148:
URL: 
https://github.com/apache/flink-connector-aws/pull/148#discussion_r1674196316


##
flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java:
##
@@ -228,6 +231,97 @@ void testWakeUpIsNoOp() {
 assertThatNoException().isThrownBy(splitReader::wakeUp);
 }
 
+@Test
+void testPauseOrResumeSplits() throws Exception {
+testStreamProxy.addShards(TEST_SHARD_ID);
+KinesisShardSplit testSplit = getTestSplit(TEST_SHARD_ID);
+
+List expectedRecords =
+Stream.of(getTestRecord("data-1"), getTestRecord("data-2"))
+.collect(Collectors.toList());
+testStreamProxy.addRecords(
+TestUtil.STREAM_ARN,
+TEST_SHARD_ID,
+Collections.singletonList(expectedRecords.get(0)));
+testStreamProxy.addRecords(
+TestUtil.STREAM_ARN,
+TEST_SHARD_ID,
+Collections.singletonList(expectedRecords.get(1)));
+splitReader.handleSplitsChanges(new 
SplitsAddition<>(Collections.singletonList(testSplit)));
+
+// read data from split
+RecordsWithSplitIds records = splitReader.fetch();
+
assertThat(readAllRecords(records)).containsExactlyInAnyOrder(expectedRecords.get(0));
+
+// pause split
+splitReader.pauseOrResumeSplits(
+Collections.singletonList(testSplit), Collections.emptyList());
+records = splitReader.fetch();
+// returns incomplete split with no records
+assertThat(records.finishedSplits()).isEmpty();
+assertThat(records.nextSplit()).isNull();
+assertThat(records.nextRecordFromSplit()).isNull();
+
+// resume split
+splitReader.pauseOrResumeSplits(
+Collections.emptyList(), Collections.singletonList(testSplit));
+records = splitReader.fetch();
+
assertThat(readAllRecords(records)).containsExactlyInAnyOrder(expectedRecords.get(1));
+}
+
+@Test
+void testPauseOrResumeSplitsOnlyPauseReadsFromSpecifiedSplits() throws 
Exception {
+KinesisShardSplit testSplit1 = getTestSplit(generateShardId(1));
+KinesisShardSplit testSplit2 = getTestSplit(generateShardId(2));
+
+shardMetricGroupMap.put(
+testSplit1.splitId(),
+new KinesisShardMetrics(testSplit1, 
metricListener.getMetricGroup()));
+shardMetricGroupMap.put(
+testSplit2.splitId(),
+new KinesisShardMetrics(testSplit2, 
metricListener.getMetricGroup()));
+
+testStreamProxy.addShards(testSplit1.splitId(), testSplit2.splitId());
+
+List recordsFromSplit1 =
+Arrays.asList(getTestRecord("split-1-data-1"), 
getTestRecord("split-1-data-2"));
+List recordsFromSplit2 =
+Arrays.asList(
+getTestRecord("split-2-data-1"),
+getTestRecord("split-2-data-2"),
+getTestRecord("split-2-data-3"));
+
+recordsFromSplit1.forEach(
+record ->
+testStreamProxy.addRecords(
+STREAM_ARN,
+testSplit1.getShardId(),
+Collections.singletonList(record)));
+recordsFromSplit2.forEach(
+record ->
+testStreamProxy.addRecords(
+STREAM_ARN,
+testSplit2.getShardId(),
+Collections.singletonList(record)));
+
+splitReader.handleSplitsChanges(
+new SplitsAddition<>(Arrays.asList(testSplit1, testSplit2)));
+
+// pause split 1
+splitReader.pauseOrResumeSplits(
+Collections.singletonList(testSplit1), 
Collections.emptyList());
+
+// read data from splits
+List fetchedRecords = new ArrayList<>();
+for (int i = 0; i < 10; i++) {
+RecordsWithSplitIds records = splitReader.fetch();
+fetchedRecords.addAll(readAllRecords(records));
+}
+
+// verify that only records from split 2 were fetched by reader
+
assertThat(fetchedRecords).containsExactly(recordsFromSplit2.toArray(new 
Record[0]));

Review Comment:
   Updated test case to use 3 splits, pause 2 and then resume 1.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-11 Thread via GitHub


hlteoh37 commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674182875


##
flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/src/test/java/org/apache/flink/connector/sqs/sink/test/SqsSinkITTest.java:
##
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink.test;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.connector.aws.testutils.LocalstackContainer;
+import org.apache.flink.connector.sqs.sink.SqsSink;
+import org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils;
+import org.apache.flink.connector.testframe.container.FlinkContainers;
+import org.apache.flink.connector.testframe.container.TestcontainersSettings;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.DockerImageVersions;
+import org.apache.flink.util.TestLogger;
+
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Network;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.core.SdkSystemSetting;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.Message;
+import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static 
org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createConfig;
+import static 
org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils.createSqsClient;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** End to End test for SQS sink API. */
+public class SqsSinkITTest extends TestLogger {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(SqsSinkITTest.class);
+
+private static final int NUMBER_OF_ELEMENTS = 50;
+private StreamExecutionEnvironment env;
+private SdkHttpClient httpClient;
+private SqsClient sqsClient;
+private static final Network network = Network.newNetwork();
+
+@ClassRule
+public static LocalstackContainer mockSqsContainer =
+new 
LocalstackContainer(DockerImageName.parse(DockerImageVersions.LOCALSTACK))
+.withNetwork(network)
+.withNetworkAliases("localstack");
+
+public static final TestcontainersSettings TESTCONTAINERS_SETTINGS =
+TestcontainersSettings.builder()
+.environmentVariable("AWS_CBOR_DISABLE", "1")
+.environmentVariable(
+"FLINK_ENV_JAVA_OPTS",
+
"-Dorg.apache.flink.sqs.shaded.com.amazonaws.sdk.disableCertChecking 
-Daws.cborEnabled=false")
+.network(network)
+.logger(LOG)
+.dependsOn(mockSqsContainer)
+.build();
+
+public static final FlinkContainers FLINK =
+
FlinkContainers.builder().withTestcontainersSettings(TESTCONTAINERS_SETTINGS).build();
+
+@Before
+public void setup() throws Exception {
+System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");

Review Comment:
   Do we need this for SQS? I'm aware we need it for KDS, but just checking



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-35754) SqlGatewayE2ECase.testMaterializedTableInFullMode failed due to Internal Server Error

2024-07-11 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865115#comment-17865115
 ] 

Piotr Nowojski edited comment on FLINK-35754 at 7/11/24 3:08 PM:
-

[~hackergin], can you elaborate what was the problem here? Is this the same 
problem as: https://issues.apache.org/jira/browse/FLINK-35821 ?


was (Author: pnowojski):
[~hackergin], can you elaborate what's the problem? Is this the same problem 
as: https://issues.apache.org/jira/browse/FLINK-35821 ?

> SqlGatewayE2ECase.testMaterializedTableInFullMode failed due to Internal 
> Server Error
> -
>
> Key: FLINK-35754
> URL: https://issues.apache.org/jira/browse/FLINK-35754
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.20.0
>Reporter: Weijie Guo
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2024-07-05-11-15-52-731.png
>
>
> {code:java}
> Jul 03 03:14:31 java.lang.RuntimeException: Execute statement failed
> Jul 03 03:14:31   at 
> org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.executeStatementWithResult(FlinkDistribution.java:592)
> Jul 03 03:14:31   at 
> org.apache.flink.table.gateway.SqlGatewayE2ECase.lambda$testMaterializedTableInFullMode$3(SqlGatewayE2ECase.java:353)
> Jul 03 03:14:31   at 
> org.apache.flink.core.testutils.CommonTestUtils.waitUtil(CommonTestUtils.java:210)
> Jul 03 03:14:31   at 
> org.apache.flink.table.gateway.SqlGatewayE2ECase.testMaterializedTableInFullMode(SqlGatewayE2ECase.java:350)
> Jul 03 03:14:31   at 
> java.base/java.lang.reflect.Method.invoke(Method.java:568)
> Jul 03 03:14:31   at 
> org.apache.flink.util.ExternalResource$1.evaluate(ExternalResource.java:48)
> Jul 03 03:14:31   at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> Jul 03 03:14:31   at 
> org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:29)
> Jul 03 03:14:31 Caused by: java.lang.RuntimeException: The rest request is 
> not successful: Internal Server Error
> Jul 03 03:14:31   at 
> org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.sendRequest(FlinkDistribution.java:601)
> Jul 03 03:14:31   at 
> org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.getOperationResult(FlinkDistribution.java:570)
> Jul 03 03:14:31   at 
> org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.executeStatementWithResult(FlinkDistribution.java:590)
> Jul 03 03:14:31   ... 7 more
> Jul 03 03:14:31 
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60620&view=logs&j=e8e46ef5-75cc-564f-c2bd-1797c35cbebe&t=60c49903-2505-5c25-7e46-de91b1737bea&l=14602



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35754) SqlGatewayE2ECase.testMaterializedTableInFullMode failed due to Internal Server Error

2024-07-11 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865115#comment-17865115
 ] 

Piotr Nowojski commented on FLINK-35754:


[~hackergin], can you elaborate what's the problem? Is this the same problem 
as: https://issues.apache.org/jira/browse/FLINK-35821 ?

> SqlGatewayE2ECase.testMaterializedTableInFullMode failed due to Internal 
> Server Error
> -
>
> Key: FLINK-35754
> URL: https://issues.apache.org/jira/browse/FLINK-35754
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.20.0
>Reporter: Weijie Guo
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2024-07-05-11-15-52-731.png
>
>
> {code:java}
> Jul 03 03:14:31 java.lang.RuntimeException: Execute statement failed
> Jul 03 03:14:31   at 
> org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.executeStatementWithResult(FlinkDistribution.java:592)
> Jul 03 03:14:31   at 
> org.apache.flink.table.gateway.SqlGatewayE2ECase.lambda$testMaterializedTableInFullMode$3(SqlGatewayE2ECase.java:353)
> Jul 03 03:14:31   at 
> org.apache.flink.core.testutils.CommonTestUtils.waitUtil(CommonTestUtils.java:210)
> Jul 03 03:14:31   at 
> org.apache.flink.table.gateway.SqlGatewayE2ECase.testMaterializedTableInFullMode(SqlGatewayE2ECase.java:350)
> Jul 03 03:14:31   at 
> java.base/java.lang.reflect.Method.invoke(Method.java:568)
> Jul 03 03:14:31   at 
> org.apache.flink.util.ExternalResource$1.evaluate(ExternalResource.java:48)
> Jul 03 03:14:31   at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> Jul 03 03:14:31   at 
> org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:29)
> Jul 03 03:14:31 Caused by: java.lang.RuntimeException: The rest request is 
> not successful: Internal Server Error
> Jul 03 03:14:31   at 
> org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.sendRequest(FlinkDistribution.java:601)
> Jul 03 03:14:31   at 
> org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.getOperationResult(FlinkDistribution.java:570)
> Jul 03 03:14:31   at 
> org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.executeStatementWithResult(FlinkDistribution.java:590)
> Jul 03 03:14:31   ... 7 more
> Jul 03 03:14:31 
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60620&view=logs&j=e8e46ef5-75cc-564f-c2bd-1797c35cbebe&t=60c49903-2505-5c25-7e46-de91b1737bea&l=14602



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [hotfix][table] Apply uppercase for URL_ENCODE/URL_DECODE remove logging from function class [flink]

2024-07-11 Thread via GitHub


snuyanzin commented on code in PR #25076:
URL: https://github.com/apache/flink/pull/25076#discussion_r1674178015


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -433,7 +433,7 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
 public static final BuiltInFunctionDefinition URL_DECODE =
 BuiltInFunctionDefinition.newBuilder()
-.name("url_decode")
+.name("URL_DECODE")
 .kind(SCALAR)
 
.inputTypeStrategy(sequence(logical(LogicalTypeFamily.CHARACTER_STRING)))
 
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.STRING().nullable(

Review Comment:
   @lincoln-lil thanks for the feedback
   
   now I see what you mean
   Yep, it makes, thanks for clarification
   
   I updated the PR to address this, could you please approve/reject?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-11 Thread via GitHub


hlteoh37 commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674177861


##
flink-connector-aws/flink-connector-sqs/pom.xml:
##
@@ -0,0 +1,130 @@
+
+
+http://maven.apache.org/POM/4.0.0";
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+
+4.0.0
+
+
+org.apache.flink
+flink-connector-aws-parent
+4.4-SNAPSHOT
+
+
+flink-connector-sqs
+Flink : Connectors : AWS : Amazon SQS
+jar
+
+
+
+org.apache.flink
+flink-streaming-java
+${flink.version}
+provided
+
+
+
+org.apache.flink
+flink-connector-aws-base
+${project.version}
+
+
+
+software.amazon.awssdk
+sqs
+
+
+
+software.amazon.awssdk
+netty-nio-client
+
+
+
+
+org.apache.flink
+flink-test-utils
+${flink.version}
+test
+
+
+org.apache.flink
+flink-connector-test-utils
+${flink.version}
+test
+
+
+
+org.apache.flink
+flink-connector-aws-base
+${project.version}
+test-jar
+test
+
+
+
+org.apache.flink
+flink-connector-base
+${flink.version}
+test-jar
+test
+
+
+
+org.testcontainers
+testcontainers
+test
+
+
+
+com.fasterxml.jackson.core
+jackson-core
+
+
+
+com.fasterxml.jackson.datatype
+jackson-datatype-jsr310
+
+
+

Review Comment:
   nit `ArchUnit`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35821) ResumeCheckpointManuallyITCase failed with File X does not exist or the user running Flink C has insufficient permissions to access it

2024-07-11 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-35821:
---
Description: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60857&view=logs&j=5c8e7682-d68f-54d1-16a2-a09310218a49&t=86f654fa-ab48-5c1a-25f4-7e7f6afb9bba


{noformat}
2024-07-11T13:49:46.4137693Z Jul 11 13:49:46 13:49:46.412 [ERROR] Tests run: 
48, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 309.7 s <<< FAILURE! -- 
in org.apache.flink.test.checkpointing.ResumeCheckpointManuallyITCase
2024-07-11T13:49:46.4139710Z Jul 11 13:49:46 13:49:46.412 [ERROR] 
org.apache.flink.test.checkpointing.ResumeCheckpointManuallyITCase.testExternalizedIncrementalRocksDBCheckpointsWithLocalRecoveryZookeeper[RestoreMode
 = CLAIM] -- Time elapsed: 2.722 s <<< ERROR!
2024-07-11T13:49:46.4140928Z Jul 11 13:49:46 
org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
2024-07-11T13:49:46.4142766Z Jul 11 13:49:46at 
org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:219)
2024-07-11T13:49:46.4144185Z Jul 11 13:49:46at 
org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailureAndReport(ExecutionFailureHandler.java:166)
2024-07-11T13:49:46.4145249Z Jul 11 13:49:46at 
org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:121)
2024-07-11T13:49:46.4146510Z Jul 11 13:49:46at 
org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:281)
2024-07-11T13:49:46.4147599Z Jul 11 13:49:46at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:272)
2024-07-11T13:49:46.4148975Z Jul 11 13:49:46at 
org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:265)
2024-07-11T13:49:46.4150467Z Jul 11 13:49:46at 
org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:800)
2024-07-11T13:49:46.4151977Z Jul 11 13:49:46at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:777)
2024-07-11T13:49:46.4153308Z Jul 11 13:49:46at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
2024-07-11T13:49:46.4154713Z Jul 11 13:49:46at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:515)
2024-07-11T13:49:46.4155416Z Jul 11 13:49:46at 
sun.reflect.GeneratedMethodAccessor29.invoke(Unknown Source)
2024-07-11T13:49:46.4156342Z Jul 11 13:49:46at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2024-07-11T13:49:46.4157291Z Jul 11 13:49:46at 
java.lang.reflect.Method.invoke(Method.java:498)
2024-07-11T13:49:46.4158065Z Jul 11 13:49:46at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:318)
2024-07-11T13:49:46.4159387Z Jul 11 13:49:46at 
org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
2024-07-11T13:49:46.4160469Z Jul 11 13:49:46at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:316)
2024-07-11T13:49:46.4161819Z Jul 11 13:49:46at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:229)
2024-07-11T13:49:46.4163253Z Jul 11 13:49:46at 
org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:88)
2024-07-11T13:49:46.4164717Z Jul 11 13:49:46at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:174)
2024-07-11T13:49:46.4165948Z Jul 11 13:49:46at 
org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
2024-07-11T13:49:46.4167080Z Jul 11 13:49:46at 
org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
2024-07-11T13:49:46.4168228Z Jul 11 13:49:46at 
scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
2024-07-11T13:49:46.4169380Z Jul 11 13:49:46at 
scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
2024-07-11T13:49:46.4170327Z Jul 11 13:49:46at 
org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
2024-07-11T13:49:46.4171192Z Jul 11 13:49:46at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
2024-07-11T13:49:46.4171814Z Jul 11 13:49:46at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
2024-07-11T13:49:46.4172433Z Jul 11 13:49:46at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
2024-07-11T13:49:46.4173029Z Jul 11 13:49:46at 
org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
2024-07-11T13:49:46.4173622Z Jul 11 13:49:46at 
org.apache.pekko.act

[jira] [Updated] (FLINK-35821) ResumeCheckpointManuallyITCase failed with File X does not exist or the user running Flink C has insufficient permissions to access it

2024-07-11 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-35821:
---
Description: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60857&view=logs&j=5c8e7682-d68f-54d1-16a2-a09310218a49&t=86f654fa-ab48-5c1a-25f4-7e7f6afb9bba

primary failure:

{noformat}
Caused by: java.io.FileNotFoundException: File 
file:/tmp/junit5368809541399217009/junit9107863722486384012/5a045e6c0cd0297faf5a2bf6fff27465/shared/job_5a045e6c0cd0297faf5a2bf6fff27465_op_90bea66de1c231edf33913ecd54406c1_1_2/0effb888-aa59-4bc4-b3e6-02622c831863
 does not exist or the user running Flink ('agent01_azpcontainer') has 
insufficient permissions to access it.
{noformat}

Full stack trace

{noformat}
2024-07-11T13:49:46.4137693Z Jul 11 13:49:46 13:49:46.412 [ERROR] Tests run: 
48, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 309.7 s <<< FAILURE! -- 
in org.apache.flink.test.checkpointing.ResumeCheckpointManuallyITCase
2024-07-11T13:49:46.4139710Z Jul 11 13:49:46 13:49:46.412 [ERROR] 
org.apache.flink.test.checkpointing.ResumeCheckpointManuallyITCase.testExternalizedIncrementalRocksDBCheckpointsWithLocalRecoveryZookeeper[RestoreMode
 = CLAIM] -- Time elapsed: 2.722 s <<< ERROR!
2024-07-11T13:49:46.4140928Z Jul 11 13:49:46 
org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
2024-07-11T13:49:46.4142766Z Jul 11 13:49:46at 
org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:219)
2024-07-11T13:49:46.4144185Z Jul 11 13:49:46at 
org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailureAndReport(ExecutionFailureHandler.java:166)
2024-07-11T13:49:46.4145249Z Jul 11 13:49:46at 
org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:121)
2024-07-11T13:49:46.4146510Z Jul 11 13:49:46at 
org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:281)
2024-07-11T13:49:46.4147599Z Jul 11 13:49:46at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:272)
2024-07-11T13:49:46.4148975Z Jul 11 13:49:46at 
org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:265)
2024-07-11T13:49:46.4150467Z Jul 11 13:49:46at 
org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:800)
2024-07-11T13:49:46.4151977Z Jul 11 13:49:46at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:777)
2024-07-11T13:49:46.4153308Z Jul 11 13:49:46at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
2024-07-11T13:49:46.4154713Z Jul 11 13:49:46at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:515)
2024-07-11T13:49:46.4155416Z Jul 11 13:49:46at 
sun.reflect.GeneratedMethodAccessor29.invoke(Unknown Source)
2024-07-11T13:49:46.4156342Z Jul 11 13:49:46at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2024-07-11T13:49:46.4157291Z Jul 11 13:49:46at 
java.lang.reflect.Method.invoke(Method.java:498)
2024-07-11T13:49:46.4158065Z Jul 11 13:49:46at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:318)
2024-07-11T13:49:46.4159387Z Jul 11 13:49:46at 
org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
2024-07-11T13:49:46.4160469Z Jul 11 13:49:46at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:316)
2024-07-11T13:49:46.4161819Z Jul 11 13:49:46at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:229)
2024-07-11T13:49:46.4163253Z Jul 11 13:49:46at 
org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:88)
2024-07-11T13:49:46.4164717Z Jul 11 13:49:46at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:174)
2024-07-11T13:49:46.4165948Z Jul 11 13:49:46at 
org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
2024-07-11T13:49:46.4167080Z Jul 11 13:49:46at 
org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
2024-07-11T13:49:46.4168228Z Jul 11 13:49:46at 
scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
2024-07-11T13:49:46.4169380Z Jul 11 13:49:46at 
scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
2024-07-11T13:49:46.4170327Z Jul 11 13:49:46at 
org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
2024-07-11T13:49:46.4171192Z Jul 11 13:49:46at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scal

[jira] [Created] (FLINK-35821) ResumeCheckpointManuallyITCase failed with File X does not exist or the user running Flink C has insufficient permissions to access it

2024-07-11 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-35821:
--

 Summary: ResumeCheckpointManuallyITCase failed with File X does 
not exist or the user running Flink C has insufficient permissions to access it
 Key: FLINK-35821
 URL: https://issues.apache.org/jira/browse/FLINK-35821
 Project: Flink
  Issue Type: Bug
  Components: Test Infrastructure
Affects Versions: 2.0.0, 1.20.0
Reporter: Piotr Nowojski


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60857&view=logs&j=5c8e7682-d68f-54d1-16a2-a09310218a49&t=86f654fa-ab48-5c1a-25f4-7e7f6afb9bba


{noformat}
Jul 11 13:49:46 13:49:46.412 [ERROR] Tests run: 48, Failures: 0, Errors: 1, 
Skipped: 0, Time elapsed: 309.7 s <<< FAILURE! -- in 
org.apache.flink.test.checkpointing.ResumeCheckpointManuallyITCase
Jul 11 13:49:46 13:49:46.412 [ERROR] 
org.apache.flink.test.checkpointing.ResumeCheckpointManuallyITCase.testExternalizedIncrementalRocksDBCheckpointsWithLocalRecoveryZookeeper[RestoreMode
 = CLAIM] -- Time elapsed: 2.722 s <<< ERROR!
Jul 11 13:49:46 org.apache.flink.runtime.JobException: Recovery is suppressed 
by NoRestartBackoffTimeStrategy
Jul 11 13:49:46 at 
org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:219)
Jul 11 13:49:46 at 
org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailureAndReport(ExecutionFailureHandler.java:166)
Jul 11 13:49:46 at 
org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:121)
Jul 11 13:49:46 at 
org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:281)
Jul 11 13:49:46 at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:272)
Jul 11 13:49:46 at 
org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:265)
Jul 11 13:49:46 at 
org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:800)
Jul 11 13:49:46 at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:777)
Jul 11 13:49:46 at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
Jul 11 13:49:46 at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:515)
Jul 11 13:49:46 at sun.reflect.GeneratedMethodAccessor29.invoke(Unknown 
Source)
Jul 11 13:49:46 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
Jul 11 13:49:46 at java.lang.reflect.Method.invoke(Method.java:498)
Jul 11 13:49:46 at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:318)
Jul 11 13:49:46 at 
org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
Jul 11 13:49:46 at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:316)
Jul 11 13:49:46 at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:229)
Jul 11 13:49:46 at 
org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:88)
Jul 11 13:49:46 at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:174)
Jul 11 13:49:46 at 
org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
Jul 11 13:49:46 at 
org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
Jul 11 13:49:46 at 
scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
Jul 11 13:49:46 at 
scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
Jul 11 13:49:46 at 
org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
Jul 11 13:49:46 at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
Jul 11 13:49:46 at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
Jul 11 13:49:46 at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
Jul 11 13:49:46 at 
org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
Jul 11 13:49:46 at 
org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
Jul 11 13:49:46 at 
org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
Jul 11 13:49:46 at 
org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
Jul 11 13:49:46 at 
org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
Jul 11 13:49:46 at 
org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)

{noformat}




--
This message was sent by Atlassian Jira
(v8.20.1

Re: [PR] [FLINK-35799] Add CompiledPlan annotations to BatchExecCalc [flink]

2024-07-11 Thread via GitHub


twalthr commented on code in PR #25064:
URL: https://github.com/apache/flink/pull/25064#discussion_r1674171526


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/CalcCompiledBatchTest.java:
##
@@ -19,16 +19,16 @@
 package org.apache.flink.table.planner.plan.nodes.exec.batch;
 
 import org.apache.flink.table.planner.plan.nodes.exec.common.CalcTestPrograms;
-import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.BatchCompiledPlanTestBase;
+import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.CompiledBatchTestBase;
 import org.apache.flink.table.test.program.TableTestProgram;
 
 import java.util.Arrays;
 import java.util.List;
 
 /** Restore tests for {@link BatchExecCalc}. */
-public class CalcBatchCompiledPlanTest extends BatchCompiledPlanTestBase {
+public class CalcCompiledBatchTest extends CompiledBatchTestBase {

Review Comment:
   Similar architecture than the streaming ones.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35799] Add CompiledPlan annotations to BatchExecCalc [flink]

2024-07-11 Thread via GitHub


twalthr commented on code in PR #25064:
URL: https://github.com/apache/flink/pull/25064#discussion_r1674170591


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/CalcCompiledBatchTest.java:
##
@@ -19,16 +19,16 @@
 package org.apache.flink.table.planner.plan.nodes.exec.batch;
 
 import org.apache.flink.table.planner.plan.nodes.exec.common.CalcTestPrograms;
-import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.BatchCompiledPlanTestBase;
+import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.CompiledBatchTestBase;
 import org.apache.flink.table.test.program.TableTestProgram;
 
 import java.util.Arrays;
 import java.util.List;
 
 /** Restore tests for {@link BatchExecCalc}. */
-public class CalcBatchCompiledPlanTest extends BatchCompiledPlanTestBase {
+public class CalcCompiledBatchTest extends CompiledBatchTestBase {

Review Comment:
   Sorry for the confusion. I thought we agreed on `BatchRestoreTestBase` and 
thus `CalcBatchRestoreTest`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32324][Connectors/AWS] Implement support for watermark alignment in FLIP-27 sources for KDS and DDB streams [flink-connector-aws]

2024-07-11 Thread via GitHub


hlteoh37 commented on code in PR #148:
URL: 
https://github.com/apache/flink-connector-aws/pull/148#discussion_r1674154510


##
flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java:
##
@@ -228,6 +231,97 @@ void testWakeUpIsNoOp() {
 assertThatNoException().isThrownBy(splitReader::wakeUp);
 }
 
+@Test
+void testPauseOrResumeSplits() throws Exception {
+testStreamProxy.addShards(TEST_SHARD_ID);
+KinesisShardSplit testSplit = getTestSplit(TEST_SHARD_ID);
+
+List expectedRecords =
+Stream.of(getTestRecord("data-1"), getTestRecord("data-2"))
+.collect(Collectors.toList());
+testStreamProxy.addRecords(
+TestUtil.STREAM_ARN,
+TEST_SHARD_ID,
+Collections.singletonList(expectedRecords.get(0)));
+testStreamProxy.addRecords(
+TestUtil.STREAM_ARN,
+TEST_SHARD_ID,
+Collections.singletonList(expectedRecords.get(1)));
+splitReader.handleSplitsChanges(new 
SplitsAddition<>(Collections.singletonList(testSplit)));
+
+// read data from split
+RecordsWithSplitIds records = splitReader.fetch();
+
assertThat(readAllRecords(records)).containsExactlyInAnyOrder(expectedRecords.get(0));
+
+// pause split
+splitReader.pauseOrResumeSplits(
+Collections.singletonList(testSplit), Collections.emptyList());
+records = splitReader.fetch();
+// returns incomplete split with no records
+assertThat(records.finishedSplits()).isEmpty();
+assertThat(records.nextSplit()).isNull();
+assertThat(records.nextRecordFromSplit()).isNull();
+
+// resume split
+splitReader.pauseOrResumeSplits(
+Collections.emptyList(), Collections.singletonList(testSplit));
+records = splitReader.fetch();
+
assertThat(readAllRecords(records)).containsExactlyInAnyOrder(expectedRecords.get(1));
+}
+
+@Test
+void testPauseOrResumeSplitsOnlyPauseReadsFromSpecifiedSplits() throws 
Exception {
+KinesisShardSplit testSplit1 = getTestSplit(generateShardId(1));
+KinesisShardSplit testSplit2 = getTestSplit(generateShardId(2));
+
+shardMetricGroupMap.put(
+testSplit1.splitId(),
+new KinesisShardMetrics(testSplit1, 
metricListener.getMetricGroup()));
+shardMetricGroupMap.put(
+testSplit2.splitId(),
+new KinesisShardMetrics(testSplit2, 
metricListener.getMetricGroup()));
+
+testStreamProxy.addShards(testSplit1.splitId(), testSplit2.splitId());
+
+List recordsFromSplit1 =
+Arrays.asList(getTestRecord("split-1-data-1"), 
getTestRecord("split-1-data-2"));
+List recordsFromSplit2 =
+Arrays.asList(
+getTestRecord("split-2-data-1"),
+getTestRecord("split-2-data-2"),
+getTestRecord("split-2-data-3"));
+
+recordsFromSplit1.forEach(
+record ->
+testStreamProxy.addRecords(
+STREAM_ARN,
+testSplit1.getShardId(),
+Collections.singletonList(record)));
+recordsFromSplit2.forEach(
+record ->
+testStreamProxy.addRecords(
+STREAM_ARN,
+testSplit2.getShardId(),
+Collections.singletonList(record)));
+
+splitReader.handleSplitsChanges(
+new SplitsAddition<>(Arrays.asList(testSplit1, testSplit2)));
+
+// pause split 1
+splitReader.pauseOrResumeSplits(
+Collections.singletonList(testSplit1), 
Collections.emptyList());
+
+// read data from splits
+List fetchedRecords = new ArrayList<>();
+for (int i = 0; i < 10; i++) {
+RecordsWithSplitIds records = splitReader.fetch();
+fetchedRecords.addAll(readAllRecords(records));
+}
+
+// verify that only records from split 2 were fetched by reader
+
assertThat(fetchedRecords).containsExactly(recordsFromSplit2.toArray(new 
Record[0]));

Review Comment:
   Should we test resuming from the paused split?
   
   Or 3x split, 2x pause and only resume 1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix][table] Apply uppercase for URL_ENCODE/URL_DECODE remove logging from function class [flink]

2024-07-11 Thread via GitHub


lincoln-lil commented on code in PR #25076:
URL: https://github.com/apache/flink/pull/25076#discussion_r1674153859


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -433,7 +433,7 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
 public static final BuiltInFunctionDefinition URL_DECODE =
 BuiltInFunctionDefinition.newBuilder()
-.name("url_decode")
+.name("URL_DECODE")
 .kind(SCALAR)
 
.inputTypeStrategy(sequence(logical(LogicalTypeFamily.CHARACTER_STRING)))
 
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.STRING().nullable(

Review Comment:
   @snuyanzin As I saw this pr and realized I missed your message on the 
previous one(#24773).
   For the output type strategy, I think it should be 
'outputTypeStrategy(explicit(DataTypes.STRING().nullable()))' because the 
return value may be null even if the input is non-null (e.g., decoding 
failure), otherwise the downstream operator may trust the non-null property and 
encounter a runtime exception (e.g., NPE) if skip the null check for field 
accessing. 
   Does this make sense to you?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35799] Add CompiledPlan annotations to BatchExecCalc [flink]

2024-07-11 Thread via GitHub


jnh5y commented on code in PR #25064:
URL: https://github.com/apache/flink/pull/25064#discussion_r1674149685


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/CalcCompiledBatchTest.java:
##
@@ -19,16 +19,16 @@
 package org.apache.flink.table.planner.plan.nodes.exec.batch;
 
 import org.apache.flink.table.planner.plan.nodes.exec.common.CalcTestPrograms;
-import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.BatchCompiledPlanTestBase;
+import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.CompiledBatchTestBase;
 import org.apache.flink.table.test.program.TableTestProgram;
 
 import java.util.Arrays;
 import java.util.List;
 
 /** Restore tests for {@link BatchExecCalc}. */
-public class CalcBatchCompiledPlanTest extends BatchCompiledPlanTestBase {
+public class CalcCompiledBatchTest extends CompiledBatchTestBase {

Review Comment:
   Are you looking for `CalcCompiledBatchTest` to change to 
`CalcBatchRestoreTest`?
   
   Or are you suggesting that `CompiledBatchTestBase ` should change to 
`BatchRestoreTestBase `?
   
   Lemme know which of those makes sense and I'll make the change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32324][Connectors/AWS] Implement support for watermark alignment in FLIP-27 sources for KDS and DDB streams [flink-connector-aws]

2024-07-11 Thread via GitHub


hlteoh37 commented on code in PR #148:
URL: 
https://github.com/apache/flink-connector-aws/pull/148#discussion_r1674144962


##
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReader.java:
##
@@ -127,6 +137,14 @@ public void 
handleSplitsChanges(SplitsChange splitsChanges) {
 }
 }
 
+@Override
+public void pauseOrResumeSplits(
+Collection splitsToPause,
+Collection splitsToResume) {
+splitsToPause.forEach(split -> pausedSplitIds.add(split.splitId()));
+splitsToResume.forEach(split -> 
pausedSplitIds.remove(split.splitId()));

Review Comment:
   Do we need to check and log if we resume a split that is not paused / pause 
a split that doesn't exist?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34108][table] Add built-in URL_ENCODE and URL_DECODE function. [flink]

2024-07-11 Thread via GitHub


lincoln-lil commented on code in PR #24773:
URL: https://github.com/apache/flink/pull/24773#discussion_r1674142139


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -431,6 +431,26 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
.runtimeClass("org.apache.flink.table.runtime.functions.scalar.SplitFunction")
 .build();
 
+public static final BuiltInFunctionDefinition URL_DECODE =
+BuiltInFunctionDefinition.newBuilder()
+.name("url_decode")
+.kind(SCALAR)
+
.inputTypeStrategy(sequence(logical(LogicalTypeFamily.CHARACTER_STRING)))
+
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.STRING(

Review Comment:
   @snuyanzin Sorry I missed the ping here! IIUC the 
`AbstractDataType`#`nullable` says it's the default behavior means a datatype 
which doesn't explictly declares its nullabilty then the default behavior is 
nullable.
   For the output type strategy, I think I should point it out more clearly: it 
should be 'outputTypeStrategy(explicit(DataTypes.STRING().nullable()))' because 
the return value may be null even if the input is non-null (e.g., decoding 
failure), WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35799] Add CompiledPlan annotations to BatchExecCalc [flink]

2024-07-11 Thread via GitHub


twalthr commented on code in PR #25064:
URL: https://github.com/apache/flink/pull/25064#discussion_r1674135779


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/CalcCompiledBatchTest.java:
##
@@ -19,16 +19,16 @@
 package org.apache.flink.table.planner.plan.nodes.exec.batch;
 
 import org.apache.flink.table.planner.plan.nodes.exec.common.CalcTestPrograms;
-import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.BatchCompiledPlanTestBase;
+import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.CompiledBatchTestBase;
 import org.apache.flink.table.test.program.TableTestProgram;
 
 import java.util.Arrays;
 import java.util.List;
 
 /** Restore tests for {@link BatchExecCalc}. */
-public class CalcBatchCompiledPlanTest extends BatchCompiledPlanTestBase {
+public class CalcCompiledBatchTest extends CompiledBatchTestBase {

Review Comment:
   I would still call it `BatchRestoreTest` it will make discussions easier and 
it also performs kind of a restore but from plan only.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34111][table] Add support for json_quote, json_unquote, address PR feedback #24156 [flink]

2024-07-11 Thread via GitHub


snuyanzin commented on code in PR #24967:
URL: https://github.com/apache/flink/pull/24967#discussion_r1674127275


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/JsonUnquoteFunction.java:
##
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
+import org.apache.flink.table.runtime.functions.SqlJsonUtils;
+
+import javax.annotation.Nullable;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#JSON_UNQUOTE}. */
+@Internal
+public class JsonUnquoteFunction extends BuiltInScalarFunction {
+
+public JsonUnquoteFunction(SpecializedContext context) {
+super(BuiltInFunctionDefinitions.JSON_UNQUOTE, context);
+}
+
+private static boolean isValidJsonVal(String jsonInString) {
+// See also BuiltInMethods.scala, IS_JSON_VALUE
+return SqlJsonUtils.isJsonValue(jsonInString);
+}
+
+private String unescapeStr(String inputStr) {
+StringBuilder result = new StringBuilder();
+int i = 0;
+while (i < inputStr.length()) {
+if (inputStr.charAt(i) == '\\' && i + 1 < inputStr.length()) {
+i++; // move to the next char
+char ch = inputStr.charAt(i++);
+
+switch (ch) {
+case '"':
+result.append(ch);
+break;
+case '\\':
+result.append(ch);
+break;
+case '/':
+result.append(ch);
+break;
+case 'b':
+result.append('\b');
+break;
+case 'f':
+result.append('\f');
+break;
+case 'n':
+result.append('\n');
+break;
+case 'r':
+result.append('\r');
+break;
+case 't':
+result.append('\t');
+break;
+case 'u':
+result.append(fromUnicodeLiteral(inputStr, i));
+i = i + 4;
+break;
+default:
+throw new RuntimeException("Illegal escape sequence: 
\\" + ch);

Review Comment:
   Great you spotted this
   
   IMHO: since it is already runtime shouldn't it be `FlinkRuntimeException` 
here?
   
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/util/FlinkRuntimeException.java
   
   which is already used within builtin functions at 
https://github.com/apache/flink/tree/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34111][table] Add support for json_quote, json_unquote, address PR feedback #24156 [flink]

2024-07-11 Thread via GitHub


snuyanzin commented on code in PR #24967:
URL: https://github.com/apache/flink/pull/24967#discussion_r1674129926


##
docs/data/sql_functions.yml:
##
@@ -377,6 +377,12 @@ string:
   - sql: SUBSTR(string, integer1[, integer2])
 table: STRING.substr(INTEGER1[, INTEGER2])
 description: Returns a substring of string starting from position integer1 
with length integer2 (to the end by default).
+  - sql: JSON_QUOTE(string)

Review Comment:
   Thanks for creation of jira
   I would still vote for having filled chinese doc with english description 
which then could be translated within the jira issue you've created



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34111][table] Add support for json_quote, json_unquote, address PR feedback #24156 [flink]

2024-07-11 Thread via GitHub


snuyanzin commented on code in PR #24967:
URL: https://github.com/apache/flink/pull/24967#discussion_r1674127275


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/JsonUnquoteFunction.java:
##
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
+import org.apache.flink.table.runtime.functions.SqlJsonUtils;
+
+import javax.annotation.Nullable;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#JSON_UNQUOTE}. */
+@Internal
+public class JsonUnquoteFunction extends BuiltInScalarFunction {
+
+public JsonUnquoteFunction(SpecializedContext context) {
+super(BuiltInFunctionDefinitions.JSON_UNQUOTE, context);
+}
+
+private static boolean isValidJsonVal(String jsonInString) {
+// See also BuiltInMethods.scala, IS_JSON_VALUE
+return SqlJsonUtils.isJsonValue(jsonInString);
+}
+
+private String unescapeStr(String inputStr) {
+StringBuilder result = new StringBuilder();
+int i = 0;
+while (i < inputStr.length()) {
+if (inputStr.charAt(i) == '\\' && i + 1 < inputStr.length()) {
+i++; // move to the next char
+char ch = inputStr.charAt(i++);
+
+switch (ch) {
+case '"':
+result.append(ch);
+break;
+case '\\':
+result.append(ch);
+break;
+case '/':
+result.append(ch);
+break;
+case 'b':
+result.append('\b');
+break;
+case 'f':
+result.append('\f');
+break;
+case 'n':
+result.append('\n');
+break;
+case 'r':
+result.append('\r');
+break;
+case 't':
+result.append('\t');
+break;
+case 'u':
+result.append(fromUnicodeLiteral(inputStr, i));
+i = i + 4;
+break;
+default:
+throw new RuntimeException("Illegal escape sequence: 
\\" + ch);

Review Comment:
   IMHO: since it is already runtime shouldn't it be `FlinkRuntimeException` 
here?
   
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/util/FlinkRuntimeException.java
   
   which is already used within builtin functions at 
https://github.com/apache/flink/tree/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35820] Converting Duration to String fails for big values [flink]

2024-07-11 Thread via GitHub


twalthr commented on code in PR #25077:
URL: https://github.com/apache/flink/pull/25077#discussion_r1674118355


##
flink-core/src/main/java/org/apache/flink/util/TimeUtils.java:
##
@@ -39,6 +41,10 @@ public class TimeUtils {
 private static final Map LABEL_TO_UNIT_MAP =
 Collections.unmodifiableMap(initMap());
 
+private static final BigInteger NANOS_PER_SECOND = 
BigInteger.valueOf(1000_000_000L);

Review Comment:
   ```suggestion
   private static final BigInteger NANOS_PER_SECOND = 
BigInteger.valueOf(1_000_000_000L);
   ```



##
flink-core/src/main/java/org/apache/flink/util/TimeUtils.java:
##
@@ -79,30 +85,45 @@ public static Duration parseDuration(String text) {
 throw new NumberFormatException("text does not start with a 
number");
 }
 
-final long value;
+final BigInteger value;
 try {
-value = Long.parseLong(number); // this throws a 
NumberFormatException on overflow
+value = new BigInteger(number); // this throws a 
NumberFormatException
 } catch (NumberFormatException e) {
 throw new IllegalArgumentException(
-"The value '"
-+ number
-+ "' cannot be re represented as 64bit number 
(numeric overflow).");
+"The value '" + number + "' cannot be re represented as an 
integer number.", e);

Review Comment:
   ```suggestion
   "The value '" + number + "' cannot be represented as an 
integer number.", e);
   ```



##
flink-core/src/main/java/org/apache/flink/util/TimeUtils.java:
##
@@ -79,30 +85,45 @@ public static Duration parseDuration(String text) {
 throw new NumberFormatException("text does not start with a 
number");
 }
 
-final long value;
+final BigInteger value;
 try {
-value = Long.parseLong(number); // this throws a 
NumberFormatException on overflow
+value = new BigInteger(number); // this throws a 
NumberFormatException
 } catch (NumberFormatException e) {
 throw new IllegalArgumentException(
-"The value '"
-+ number
-+ "' cannot be re represented as 64bit number 
(numeric overflow).");
+"The value '" + number + "' cannot be re represented as an 
integer number.", e);
 }
 
+final ChronoUnit unit;
 if (unitLabel.isEmpty()) {
-return Duration.of(value, ChronoUnit.MILLIS);
-}
-
-ChronoUnit unit = LABEL_TO_UNIT_MAP.get(unitLabel);
-if (unit != null) {
-return Duration.of(value, unit);
+unit = ChronoUnit.MILLIS;
 } else {
+unit = LABEL_TO_UNIT_MAP.get(unitLabel);
+}
+if (unit == null) {
 throw new IllegalArgumentException(
 "Time interval unit label '"
 + unitLabel
 + "' does not match any of the recognized units: "
 + TimeUnit.getAllUnits());
 }
+
+try {
+return convertBigIntToDuration(value, unit);
+} catch (ArithmeticException e) {
+throw new IllegalArgumentException(
+"The value '"
++ number
++ "' cannot be re represented as 
java.time.Duration (numeric overflow).",

Review Comment:
   ```suggestion
   + "' cannot be represented as java.time.Duration 
(numeric overflow).",
   ```



##
flink-core/src/main/java/org/apache/flink/util/TimeUtils.java:
##
@@ -136,17 +157,35 @@ public static String getStringInMillis(final Duration 
duration) {
  * NOTE: It supports only durations that fit into long.
  */
 public static String formatWithHighestUnit(Duration duration) {
-long nanos = duration.toNanos();
+BigInteger nanos = toNanos(duration);
 
 TimeUnit highestIntegerUnit = getHighestIntegerUnit(nanos);
 return String.format(
-"%d %s",
-nanos / highestIntegerUnit.unit.getDuration().toNanos(),
+"%s %s",
+nanos.divide(highestIntegerUnit.getUnitAsNanos()),
 highestIntegerUnit.getLabels().get(0));
 }
 
-private static TimeUnit getHighestIntegerUnit(long nanos) {
-if (nanos == 0) {
+/**
+ * Converted from {@link Duration#toNanos()}, but produces {@link 
BigDecimal} and does not throw

Review Comment:
   ```suggestion
* Converted from {@link Duration#toNanos()}, but produces {@link 
BigInteger} and does not throw
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscr

Re: [PR] [FLINK-35820] Converting Duration to String fails for big values [flink]

2024-07-11 Thread via GitHub


flinkbot commented on PR #25077:
URL: https://github.com/apache/flink/pull/25077#issuecomment-2223086708

   
   ## CI report:
   
   * ff92364b08368175671a0813e8873f46cb602c2c UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34111][table] Add support for json_quote, json_unquote, address PR feedback #24156 [flink]

2024-07-11 Thread via GitHub


fhueske commented on code in PR #24967:
URL: https://github.com/apache/flink/pull/24967#discussion_r1674108789


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/JsonUnquoteFunction.java:
##
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
+import org.apache.flink.table.runtime.functions.SqlJsonUtils;
+
+import javax.annotation.Nullable;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#JSON_UNQUOTE}. */
+@Internal
+public class JsonUnquoteFunction extends BuiltInScalarFunction {
+
+public JsonUnquoteFunction(SpecializedContext context) {
+super(BuiltInFunctionDefinitions.JSON_UNQUOTE, context);
+}
+
+public @Nullable Object eval(Object input) {
+if (input == null) {
+return null;
+}
+BinaryStringData bs = (BinaryStringData) input;
+String inputStr = bs.toString();
+try {
+if (isValidJsonVal(inputStr)) {
+return new BinaryStringData(unescapeValidJson(inputStr));
+}
+} catch (Throwable t) {
+// ignore
+}

Review Comment:
   I don't think we should swallow any kind of exception or error.
   Let's catch the errors that we expect (IllegalArgumentException, etc.) and 
handle those by returning the original input.
   Unexpected exceptions should be forwarded because they might be related to 
other issues.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34111][table] Add support for json_quote, json_unquote, address PR feedback #24156 [flink]

2024-07-11 Thread via GitHub


fhueske commented on code in PR #24967:
URL: https://github.com/apache/flink/pull/24967#discussion_r1674107753


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/JsonUnquoteFunction.java:
##
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
+import org.apache.flink.table.runtime.functions.SqlJsonUtils;
+
+import javax.annotation.Nullable;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#JSON_UNQUOTE}. */
+@Internal
+public class JsonUnquoteFunction extends BuiltInScalarFunction {
+
+public JsonUnquoteFunction(SpecializedContext context) {
+super(BuiltInFunctionDefinitions.JSON_UNQUOTE, context);
+}
+
+private static boolean isValidJsonVal(String jsonInString) {
+// See also BuiltInMethods.scala, IS_JSON_VALUE
+return SqlJsonUtils.isJsonValue(jsonInString);
+}
+
+private String unescapeStr(String inputStr) {
+StringBuilder result = new StringBuilder();
+int i = 0;
+while (i < inputStr.length()) {
+if (inputStr.charAt(i) == '\\' && i + 1 < inputStr.length()) {
+i++; // move to the next char
+char ch = inputStr.charAt(i++);
+
+switch (ch) {
+case '"':
+result.append(ch);
+break;
+case '\\':
+result.append(ch);
+break;
+case '/':
+result.append(ch);
+break;
+case 'b':
+result.append('\b');
+break;
+case 'f':
+result.append('\f');
+break;
+case 'n':
+result.append('\n');
+break;
+case 'r':
+result.append('\r');
+break;
+case 't':
+result.append('\t');
+break;
+case 'u':
+result.append(fromUnicodeLiteral(inputStr, i));
+i = i + 4;
+break;
+default:
+throw new RuntimeException("Illegal escape sequence: 
\\" + ch);
+}
+} else {
+result.append(inputStr.charAt(i));
+i++;
+}
+}
+return result.toString();
+}
+
+private String unescapeValidJson(String inputStr) {
+// check for a quoted json string val and unescape
+if (inputStr.charAt(0) == '"' && inputStr.charAt(inputStr.length() - 
1) == '"') {
+// remove quotes, string len is atleast 2 here
+return unescapeStr(inputStr.substring(1, inputStr.length() - 1));
+} else {
+// string representing Json - array, object or unquoted scalar 
val, return as-is
+return inputStr;
+}
+}
+
+private static String fromUnicodeLiteral(String input, int curPos) {
+
+StringBuilder number = new StringBuilder();
+// isValidJsonVal will already check for unicode literal validity
+for (char ch : input.substring(curPos, curPos + 4).toCharArray()) {
+number.append(Character.toLowerCase(ch));
+}
+int code = Integer.parseInt(number.toString(), 16);
+return String.valueOf((char) code);
+}
+
+public @Nullable Object eval(Object input) {
+if (input == null) {
+return null;
+}
+BinaryStringData bs = (BinaryStringData) input;
+String inputStr = bs.toString();
+try {
+if (isValidJsonVal(inputStr)) {
+return new BinaryStri

[jira] [Updated] (FLINK-35820) Converting Duration to String fails for big values

2024-07-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35820:
---
Labels: pull-request-available  (was: )

> Converting Duration to String fails for big values
> --
>
> Key: FLINK-35820
> URL: https://issues.apache.org/jira/browse/FLINK-35820
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.19.1
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> We support {{ConfigOption}} with {{Duration}} type. The {{Configuration}} 
> class can read/write {{Duration}} from/to {{String}}. Unfortunately the 
> conversion fails for values that do not fit into {{long}} after conversion to 
> nanos.
> E.g.
> {code}
> final ConfigOption option = ConfigOptions
> .key("duration")
> .durationType()
> .noDefaultValue();
> final Configuration config = new Configuration();
> config.set(option, Duration.ofMillis(Long.MAX_VALUE));
> config.toFileWritableMap();
> {code}
> fails with {{java.lang.ArithmeticException: long overflow}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35820] Converting Duration to String fails for big values [flink]

2024-07-11 Thread via GitHub


dawidwys opened a new pull request, #25077:
URL: https://github.com/apache/flink/pull/25077

   ## What is the purpose of the change
   
   Supports converting all values of Duration to/from String.
   
   ## Verifying this change
   
   Added tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't 
know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34111][table] Add support for json_quote, json_unquote, address PR feedback #24156 [flink]

2024-07-11 Thread via GitHub


fhueske commented on code in PR #24967:
URL: https://github.com/apache/flink/pull/24967#discussion_r1674098744


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/JsonUnquoteFunction.java:
##
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
+import org.apache.flink.table.runtime.functions.SqlJsonUtils;
+
+import javax.annotation.Nullable;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#JSON_UNQUOTE}. */
+@Internal
+public class JsonUnquoteFunction extends BuiltInScalarFunction {
+
+public JsonUnquoteFunction(SpecializedContext context) {
+super(BuiltInFunctionDefinitions.JSON_UNQUOTE, context);
+}
+
+private static boolean isValidJsonVal(String jsonInString) {
+// See also BuiltInMethods.scala, IS_JSON_VALUE
+return SqlJsonUtils.isJsonValue(jsonInString);
+}
+
+private String unescapeStr(String inputStr) {
+StringBuilder result = new StringBuilder();
+int i = 0;
+while (i < inputStr.length()) {
+if (inputStr.charAt(i) == '\\' && i + 1 < inputStr.length()) {
+i++; // move to the next char
+char ch = inputStr.charAt(i++);
+
+switch (ch) {
+case '"':
+result.append(ch);
+break;
+case '\\':
+result.append(ch);
+break;
+case '/':
+result.append(ch);
+break;
+case 'b':
+result.append('\b');
+break;
+case 'f':
+result.append('\f');
+break;
+case 'n':
+result.append('\n');
+break;
+case 'r':
+result.append('\r');
+break;
+case 't':
+result.append('\t');
+break;
+case 'u':
+result.append(fromUnicodeLiteral(inputStr, i));
+i = i + 4;
+break;
+default:
+throw new RuntimeException("Illegal escape sequence: 
\\" + ch);
+}
+} else {
+result.append(inputStr.charAt(i));
+i++;
+}
+}
+return result.toString();
+}
+
+private String unescapeValidJson(String inputStr) {
+// check for a quoted json string val and unescape
+if (inputStr.charAt(0) == '"' && inputStr.charAt(inputStr.length() - 
1) == '"') {
+// remove quotes, string len is atleast 2 here
+return unescapeStr(inputStr.substring(1, inputStr.length() - 1));
+} else {
+// string representing Json - array, object or unquoted scalar 
val, return as-is
+return inputStr;
+}
+}
+
+private static String fromUnicodeLiteral(String input, int curPos) {
+
+StringBuilder number = new StringBuilder();
+// isValidJsonVal will already check for unicode literal validity
+for (char ch : input.substring(curPos, curPos + 4).toCharArray()) {
+number.append(Character.toLowerCase(ch));
+}
+int code = Integer.parseInt(number.toString(), 16);
+return String.valueOf((char) code);
+}
+
+public @Nullable Object eval(Object input) {
+if (input == null) {
+return null;
+}
+BinaryStringData bs = (BinaryStringData) input;
+String inputStr = bs.toString();
+try {
+if (isValidJsonVal(inputStr)) {
+return new BinaryStri

  1   2   >