Re: [PR] [fix-issue-2676] repair a snapshot-split bug: [flink-cdc]
yuxiqian commented on PR #2968: URL: https://github.com/apache/flink-cdc/pull/2968#issuecomment-2224947947 Hi @AidenPerce, are there any updates on this PR? Feel free to comment here if you need any help. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-35826) [SQL] Sliding window may produce unstable calculations when processing changelog data.
Yuan Kui created FLINK-35826: Summary: [SQL] Sliding window may produce unstable calculations when processing changelog data. Key: FLINK-35826 URL: https://issues.apache.org/jira/browse/FLINK-35826 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.20.0 Environment: flink with release-1.20 Reporter: Yuan Kui Attachments: image-2024-07-12-14-27-58-061.png Calculation results may be unstable when using a sliding window to process changelog data. The test results are partial success and partial failure: !image-2024-07-12-14-27-58-061.png! See the documentation and code for more details. [https://docs.google.com/document/d/1JmwSLs4SJvZKe7kqALqVBZ-1F1OyPmiWw8J6Ug6vqW0/edit?usp=sharing] code: [[BUG] Reproduce the issue of unstable sliding window calculation results · yuchengxin/flink@c003e45 (github.com)|https://github.com/yuchengxin/flink/commit/c003e45082e0d1464111c286ac9c7abb79527492] -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35816][table] Non-mergeable proctime tvf window aggregate needs to fallback to group aggregate [flink]
xuyangzhong commented on code in PR #25075: URL: https://github.com/apache/flink/pull/25075#discussion_r1675358942 ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala: ## @@ -385,35 +384,97 @@ object WindowUtil { } } - private def existNeighbourWindowTableFunc(rel: RelNode): Boolean = { + private def isValidRowtimeWindow(windowProperties: RelWindowProperties): Boolean = { +// rowtime tvf window can support calculation on window columns even before aggregation +windowProperties.isRowtime + } -@tailrec -def find(rel: RelNode): Unit = { - rel match { -case rss: RelSubset => - val innerRel = Option.apply(rss.getBest).getOrElse(rss.getOriginal) - find(innerRel) + /** + * If the middle Calc(s) contains call(s) on window columns, we should not convert the Aggregate + * into WindowAggregate but GroupAggregate instead. + * + * The valid plan structure is like: + * + * {{{ + * Aggregate + * | + * Calc (should not contain call on window columns) + * | + * WindowTableFunctionScan + * }}} + * + * and unlike: + * + * {{{ + * Aggregate + * | + * Calc + * | + * Aggregate + * | + * Calc + * | + * WindowTableFunctionScan + * }}} + */ + private def isValidProcTimeWindow( + windowProperties: RelWindowProperties, + fmq: FlinkRelMetadataQuery, + agg: FlinkLogicalAggregate): Boolean = { +var existNeighbourWindowTableFunc = false +val calcMatcher = new CalcWindowFunctionScanMatcher +try { + calcMatcher.go(agg.getInput(0)) +} catch { + case r: Util.FoundOne => +r.getNode match { + case _: Some[_] => +existNeighbourWindowTableFunc = true + case _ => // do nothing +} +} +var existCallOnWindowColumns = calcMatcher.calcNodes.nonEmpty && + calcMatcher.calcNodes.exists(calc => calcContainsCallsOnWindowColumns(calc, fmq)) + +// aggregate call shouldn't be on window columns +val aggInputWindowProps = windowProperties.getWindowColumns +existCallOnWindowColumns = existCallOnWindowColumns || !agg.getAggCallList.forall { + call => 
aggInputWindowProps.intersect(ImmutableBitSet.of(call.getArgList)).isEmpty +} +// proctime window can's support calculation on window columns before aggregation, +// and need to check if there is a neighbour windowTableFunctionCall +!existCallOnWindowColumns && existNeighbourWindowTableFunc + } + + private class CalcWindowFunctionScanMatcher extends RelVisitor { +val calcNodes: ListBuffer[Calc] = ListBuffer[Calc]() + +override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = { + node match { +case calc: Calc => + calcNodes += calc + // continue to visit children + super.visit(calc, 0, parent) case scan: FlinkLogicalTableFunctionScan => if (WindowUtil.isWindowTableFunctionCall(scan.getCall)) { -throw new Util.FoundOne +throw new Util.FoundOne(Some(0)) } - find(scan.getInput(0)) - -// proctime attribute comes from these operators can not be used directly for proctime -// window aggregate, so further traversal of child nodes is unnecessary -case _: FlinkLogicalAggregate | _: FlinkLogicalRank | _: FlinkLogicalJoin => - -case sr: SingleRel => find(sr.getInput) +case rss: RelSubset => + val innerRel = Option.apply(rss.getBest).getOrElse(rss.getOriginal) + // special case doesn't call super.visit for RelSubSet because it has no children + visit(innerRel, 0, rss) +case _: FlinkLogicalAggregate | _: FlinkLogicalCorrelate | _: FlinkLogicalIntersect | +_: FlinkLogicalJoin | _: FlinkLogicalMatch | _: FlinkLogicalMinus | +_: FlinkLogicalOverAggregate | _: FlinkLogicalRank | _: FlinkLogicalUnion => + // proctime attribute comes from these operators can not be used directly for proctime + // window aggregate, so further traversal of child nodes is unnecessary + throw new Util.FoundOne(Option.empty) Review Comment: We can return directly here to stop the further traversal. 
## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala: ## @@ -385,35 +384,97 @@ object WindowUtil { } } - private def existNeighbourWindowTableFunc(rel: RelNode): Boolean = { + private def isValidRowtimeWindow(windowProperties: RelWindowProperties): Boolean = { +// rowtime tvf window can support calculation on window columns even before aggregation +windowProperties.isRowtime + } -@tailrec -def find(rel: RelNode): Unit = { - rel match { -case rss: RelSubset => - val innerRel = Option.apply(rss.getBest).getOrElse(rss.getOriginal) -
[jira] [Commented] (FLINK-21436) Speed up the restore of UnionListState
[ https://issues.apache.org/jira/browse/FLINK-21436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865323#comment-17865323 ] Yue Ma commented on FLINK-21436: [~fanrui] Yes, I think you are right. Most of the operators using UnionState are legacy sources and sinks. But there are still some users who also use unionState themselves. My question is if we plan to deprecate unionState in the future, can we add some notice in the document (https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/state/)? If we don't have this intention, is it necessary to optimize the recovery time of union State? > Speed up the restore of UnionListState > > > Key: FLINK-21436 > URL: https://issues.apache.org/jira/browse/FLINK-21436 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Affects Versions: 1.13.0 >Reporter: Rui Fan >Priority: Minor > Labels: auto-deprioritized-major > Attachments: JM 启动火焰图.svg, akka timeout Exception.png > > > h1. 1. Problem introduction and cause analysis > Problem description: The duration of UnionListState restore under large > concurrency is more than 2 minutes. > h2. the reason: > 2000 subtasks write 2000 files during checkpoint, and each subtask needs to > read 2000 files during restore. > 2000*2000 = 4 million, so 4 million small files need to be read to hdfs > during restore. HDFS has become a bottleneck, causing restore to be > particularly time-consuming. > h1. 2. Optimize ideas > Under normal circumstances, the UnionListState state is relatively small. > Typical usage scenario: Kafka offset information. > When restoring, JM can directly read all 2000 small files, merge > UnionListState into a byte array and send it to all TMs to avoid frequent > access to hdfs by TMs. > h1. 3. Benefits after optimization > Before optimization: 2000 concurrent, Kafka offset restore takes 90~130 s. 
> After optimization: 2000 concurrent, Kafka offset restore takes less than 1s. > h1. 4. Risk points > Too big UnionListState leads to too much pressure on JM. > Solution 1: > Add configuration and decide whether to enable this feature. The default is > false, which means the old plan is used. When the user is set to true, JM > will merge. > Solution 2: > The above configuration is not required, which is equivalent to enabling > merge by default. > JM detects the size of the state before merge, and if it is less than the > threshold, the state is considered to be relatively small, and the state is > sent to all TMs through ByteStreamStateHandle. > If the threshold is exceeded, the state is considered to be greater. At this > time, write an hdfs file, and send FileStateHandle to all TMs, and TM can > read this file. > > Note: Most of the scenarios where Flink uses UnionListState are Kafka offset > (small state). In theory, most jobs are risk-free. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35804][table-planner] Fix incorrect calc merge to avoid wrong plans about udtf+join+udf [flink]
lincoln-lil commented on code in PR #25068: URL: https://github.com/apache/flink/pull/25068#discussion_r167536 ## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CalcTest.scala: ## @@ -207,4 +210,19 @@ class CalcTest extends TableTestBase { val sqlQuery = "SELECT a FROM (SELECT a, b FROM MyTable) t WHERE random_udf(b) > 10" util.verifyRelPlan(sqlQuery) } + + @Test + def testCalcMergeWithNonDeterministicExpr3(): Unit = { Review Comment: We can use a minimal case to reproduce the error, e.g., ```scala @Test def testCalcMergeWithCorrelate(): Unit = { util.addTemporarySystemFunction("str_split", new StringSplit()) val sqlQuery = """ | |SELECT a, r FROM ( | SELECT a, random_udf(b) r FROM ( | select a, b, c1 FROM MyTable, LATERAL TABLE(str_split(c)) AS T(c1) | ) t |) |WHERE r > 10 |""".stripMargin util.verifyRelPlan(sqlQuery) } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix][runtime] Avoid duplicating broadcast records redundantly for hybrid shuffle [flink]
reswqa merged PR #25025: URL: https://github.com/apache/flink/pull/25025 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35624) Release Testing: Verify FLIP-306 Unified File Merging Mechanism for Checkpoints
[ https://issues.apache.org/jira/browse/FLINK-35624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865319#comment-17865319 ] Zakelly Lan commented on FLINK-35624: - Hi [~fanrui], would you please re-run your testing? I think we have fix all the remaining issues. And thanks [~Yanfei Lei] for your manual test! > Release Testing: Verify FLIP-306 Unified File Merging Mechanism for > Checkpoints > --- > > Key: FLINK-35624 > URL: https://issues.apache.org/jira/browse/FLINK-35624 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Checkpointing >Reporter: Zakelly Lan >Assignee: Rui Fan >Priority: Blocker > Labels: release-testing > Fix For: 1.20.0 > > Attachments: image-2024-07-07-14-04-47-065.png, > image-2024-07-08-17-05-40-546.png > > > Follow up the test for https://issues.apache.org/jira/browse/FLINK-32070 > > 1.20 is the MVP version for FLIP-306. It is a little bit complex and should > be tested carefully. The main idea of FLIP-306 is to merge checkpoint files > in TM side, and provide new {{{}StateHandle{}}}s to the JM. There will be a > TM-managed directory under the 'shared' checkpoint directory for each > subtask, and a TM-managed directory under the 'taskowned' checkpoint > directory for each Task Manager. Under those new introduced directories, the > checkpoint files will be merged into smaller file set. The following > scenarios need to be tested, including but not limited to: > # With the file merging enabled, periodic checkpoints perform properly, and > the failover, restore and rescale would also work well. > # Switch the file merging on and off across jobs, checkpoints and recovery > also work properly. > # There will be no left-over TM-managed directory, especially when there is > no cp complete before the job cancellation. > # File merging takes no effect in (native) savepoints. > Besides the behaviors above, it is better to validate the function of space > amplification control and metrics. 
All the config options can be found under > 'execution.checkpointing.file-merging'. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35810) AsyncWaitOperatorTest.testProcessingTimeRepeatedCompleteUnorderedWithRetry fails
[ https://issues.apache.org/jira/browse/FLINK-35810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865318#comment-17865318 ] Weijie Guo commented on FLINK-35810: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60871&view=logs&j=77a9d8e1-d610-59b3-fc2a-4766541e0e33&t=125e07e7-8de0-5c6c-a541-a567415af3ef&l=10509 > AsyncWaitOperatorTest.testProcessingTimeRepeatedCompleteUnorderedWithRetry > fails > > > Key: FLINK-35810 > URL: https://issues.apache.org/jira/browse/FLINK-35810 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 2.0.0 >Reporter: Rui Fan >Priority: Major > > AsyncWaitOperatorTest.testProcessingTimeRepeatedCompleteUnorderedWithRetry > fails > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60837&view=logs&j=77a9d8e1-d610-59b3-fc2a-4766541e0e33&t=125e07e7-8de0-5c6c-a541-a567415af3ef&l=10159 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34513) GroupAggregateRestoreTest.testRestore fails
[ https://issues.apache.org/jira/browse/FLINK-34513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865317#comment-17865317 ] Weijie Guo commented on FLINK-34513: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60838&view=logs&j=de826397-1924-5900-0034-51895f69d4b7&t=f311e913-93a2-5a37-acab-4a63e1328f94&l=11746 > GroupAggregateRestoreTest.testRestore fails > --- > > Key: FLINK-34513 > URL: https://issues.apache.org/jira/browse/FLINK-34513 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.20.0, 1.19.1 >Reporter: Matthias Pohl >Assignee: Bonnie Varghese >Priority: Critical > Labels: pull-request-available, test-stability > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57828&view=logs&j=26b84117-e436-5720-913e-3e280ce55cae&t=77cc7e77-39a0-5007-6d65-4137ac13a471&l=10881 > {code} > Feb 24 01:12:01 01:12:01.384 [ERROR] Tests run: 10, Failures: 1, Errors: 0, > Skipped: 1, Time elapsed: 2.957 s <<< FAILURE! -- in > org.apache.flink.table.planner.plan.nodes.exec.stream.GroupAggregateRestoreTest > Feb 24 01:12:01 01:12:01.384 [ERROR] > org.apache.flink.table.planner.plan.nodes.exec.stream.GroupAggregateRestoreTest.testRestore(TableTestProgram, > ExecNodeMetadata)[4] -- Time elapsed: 0.653 s <<< FAILURE! 
> Feb 24 01:12:01 java.lang.AssertionError: > Feb 24 01:12:01 > Feb 24 01:12:01 Expecting actual: > Feb 24 01:12:01 ["+I[3, 1, 2, 8, 31, 10.0, 3]", > Feb 24 01:12:01 "+I[2, 1, 4, 14, 42, 7.0, 6]", > Feb 24 01:12:01 "+I[1, 1, 4, 12, 24, 6.0, 4]", > Feb 24 01:12:01 "+U[2, 1, 4, 14, 57, 8.0, 7]", > Feb 24 01:12:01 "+U[1, 1, 4, 12, 32, 6.0, 5]", > Feb 24 01:12:01 "+I[7, 0, 1, 7, 7, 7.0, 1]", > Feb 24 01:12:01 "+U[2, 1, 4, 14, 57, 7.0, 7]", > Feb 24 01:12:01 "+U[1, 1, 4, 12, 32, 5.0, 5]", > Feb 24 01:12:01 "+U[3, 1, 2, 8, 31, 9.0, 3]", > Feb 24 01:12:01 "+U[7, 0, 1, 7, 7, 7.0, 2]"] > Feb 24 01:12:01 to contain exactly in any order: > Feb 24 01:12:01 ["+I[3, 1, 2, 8, 31, 10.0, 3]", > Feb 24 01:12:01 "+I[2, 1, 4, 14, 42, 7.0, 6]", > Feb 24 01:12:01 "+I[1, 1, 4, 12, 24, 6.0, 4]", > Feb 24 01:12:01 "+U[2, 1, 4, 14, 57, 8.0, 7]", > Feb 24 01:12:01 "+U[1, 1, 4, 12, 32, 6.0, 5]", > Feb 24 01:12:01 "+U[3, 1, 2, 8, 31, 9.0, 3]", > Feb 24 01:12:01 "+U[2, 1, 4, 14, 57, 7.0, 7]", > Feb 24 01:12:01 "+I[7, 0, 1, 7, 7, 7.0, 2]", > Feb 24 01:12:01 "+U[1, 1, 4, 12, 32, 5.0, 5]"] > Feb 24 01:12:01 elements not found: > Feb 24 01:12:01 ["+I[7, 0, 1, 7, 7, 7.0, 2]"] > Feb 24 01:12:01 and elements not expected: > Feb 24 01:12:01 ["+I[7, 0, 1, 7, 7, 7.0, 1]", "+U[7, 0, 1, 7, 7, 7.0, 2]"] > Feb 24 01:12:01 > Feb 24 01:12:01 at > org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase.testRestore(RestoreTestBase.java:313) > Feb 24 01:12:01 at > java.base/java.lang.reflect.Method.invoke(Method.java:580) > [...] > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35825][hive] HiveTableSource supports report statistics for text file [flink]
flinkbot commented on PR #25078: URL: https://github.com/apache/flink/pull/25078#issuecomment-2224721039 ## CI report: * 922576ae0ae4cd3cf36ceeb0f53f81fa82f0fa0d UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35825) HiveTableSource supports report statistics for text file
[ https://issues.apache.org/jira/browse/FLINK-35825?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35825: --- Labels: pull-request-available (was: ) > HiveTableSource supports report statistics for text file > > > Key: FLINK-35825 > URL: https://issues.apache.org/jira/browse/FLINK-35825 > Project: Flink > Issue Type: Improvement > Components: Connectors / Hive >Reporter: Weijie Guo >Assignee: Weijie Guo >Priority: Major > Labels: pull-request-available > Fix For: 2.0.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-35825][hive] HiveTableSource supports report statistics for text file [flink]
reswqa opened a new pull request, #25078: URL: https://github.com/apache/flink/pull/25078 ## What is the purpose of the change *HiveTableSource supports report statistics for text file* ## Brief change log - *Introduce a `TextFormatStatisticsReportUtil` to estimate statistics for text file.* ## Verifying this change covered by ut. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35740][mysql] Allow column as chunk key even if not in Primary Keys [flink-cdc]
yuxiqian commented on code in PR #3448: URL: https://github.com/apache/flink-cdc/pull/3448#discussion_r1675329046 ## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java: ## @@ -416,6 +416,28 @@ public void testTableWithoutPrimaryKey() { } } +@Test +public void testAssignTableWithoutPrimaryKeyWithChunkKeyColumn() { +String tableWithoutPrimaryKey = "customers_no_pk"; +List expected = +Arrays.asList( +"customers_no_pk null [462]", +"customers_no_pk [462] [823]", +"customers_no_pk [823] [1184]", +"customers_no_pk [1184] [1545]", +"customers_no_pk [1545] [1906]", +"customers_no_pk [1906] null"); +List splits = +getTestAssignSnapshotSplits( +customerDatabase, +4, + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), +new String[] {tableWithoutPrimaryKey}, +"id"); +assertEquals(expected, splits); +} Review Comment: It would be nice if we can also test using non-primary key columns as chunk keys for table with primary keys. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-35825) HiveTableSource supports report statistics for text file
Weijie Guo created FLINK-35825: -- Summary: HiveTableSource supports report statistics for text file Key: FLINK-35825 URL: https://issues.apache.org/jira/browse/FLINK-35825 Project: Flink Issue Type: Improvement Components: Connectors / Hive Reporter: Weijie Guo Assignee: Weijie Guo Fix For: 2.0.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35740][mysql] Allow column as chunk key even if not in Primary Keys [flink-cdc]
SML0127 commented on PR #3448: URL: https://github.com/apache/flink-cdc/pull/3448#issuecomment-2224625883 > @leonardBang PTAL @leonardBang @yuxiqian PTAL -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35154] Javadoc generating fix [flink]
ldadima commented on PR #24684: URL: https://github.com/apache/flink/pull/24684#issuecomment-2224569550 Thanks for review -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] pipeline add param scan.newly-added-table.enabled [flink-cdc]
yuxiqian commented on PR #3470: URL: https://github.com/apache/flink-cdc/pull/3470#issuecomment-2224424493 Thanks for @yangup's contribution, but it seems it's a duplicate of #3411 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1675080346 ## flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkBuilder.java: ## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.sqs.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder; + +import software.amazon.awssdk.http.Protocol; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; + +import java.util.Optional; +import java.util.Properties; + +import static org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROTOCOL_VERSION; +import static software.amazon.awssdk.http.Protocol.HTTP1_1; + +/** + * Builder to construct {@link SqsSink}. + * + * The following example shows the minimum setup to create a {@link SqsSink} that writes String + * values to a SQS named sqsUrl. 
+ * + * {@code + * Properties sinkProperties = new Properties(); + * sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1"); + * + * SqsSink sqsSink = + * SqsSink.builder() + * .setSqsUrl("sqsUrl") + * .setSqsClientProperties(sinkProperties) + * .setSerializationSchema(new SimpleStringSchema()) + * .build(); + * } + * + * If the following parameters are not set in this builder, the following defaults will be used: + * + * + * {@code maxBatchSize} will be 10 + * {@code maxInFlightRequests} will be 50 + * {@code maxBufferedRequests} will be 5000 + * {@code maxBatchSizeInBytes} will be 256 KB i.e. {@code 256 * 1000} + * {@code maxTimeInBufferMs} will be 5000ms + * {@code maxRecordSizeInBytes} will be 256 KB i.e. {@code 256 * 1000} + * {@code failOnError} will be false + * + * + * @param type of elements that should be persisted in the destination + */ +@PublicEvolving +public class SqsSinkBuilder +extends AsyncSinkBaseBuilder> { + +private static final int DEFAULT_MAX_BATCH_SIZE = 10; +private static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS = 50; +private static final int DEFAULT_MAX_BUFFERED_REQUESTS = 5_000; +private static final long DEFAULT_MAX_BATCH_SIZE_IN_B = 256 * 1000; +private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = 5000; +private static final long DEFAULT_MAX_RECORD_SIZE_IN_B = 256 * 1000; +private static final boolean DEFAULT_FAIL_ON_ERROR = false; +private static final Protocol DEFAULT_HTTP_PROTOCOL = HTTP1_1; + +private Boolean failOnError; +private String sqsUrl; +private Properties sqsClientProperties; +private SerializationSchema serializationSchema; + +SqsSinkBuilder() {} + +/** + * Sets the url of the SQS that the sink will connect to. There is no default for this + * parameter, therefore, this must be provided at sink creation time otherwise the build will + * fail. 
+ * + * @param sqsUrl the url of the Sqs + * @return {@link SqsSinkBuilder} itself + */ +public SqsSinkBuilder setSqsUrl(String sqsUrl) { +this.sqsUrl = sqsUrl; +return this; +} + +/** + * Allows the user to specify a serialization schema to serialize each record to persist to SQS. + * + * @param schema serialization schema to use + * @return {@link SqsSinkBuilder} itself + */ +public SqsSinkBuilder setSerializationSchema(final SerializationSchema schema) { +serializationSchema = schema; +return this; +} Review Comment: I tried to update the same, please let me know if that looks good -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674832829 ## flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/src/test/java/org/apache/flink/connector/sqs/sink/test/SqsSinkITTest.java: ## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.sqs.sink.test; + +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils; +import org.apache.flink.connector.aws.testutils.LocalstackContainer; +import org.apache.flink.connector.sqs.sink.SqsSink; +import org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils; +import org.apache.flink.connector.testframe.container.FlinkContainers; +import org.apache.flink.connector.testframe.container.TestcontainersSettings; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.DockerImageVersions; +import org.apache.flink.util.TestLogger; + +import org.assertj.core.api.Assertions; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Network; +import org.testcontainers.utility.DockerImageName; +import software.amazon.awssdk.core.SdkSystemSetting; +import software.amazon.awssdk.http.SdkHttpClient; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.Message; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; + +import java.util.ArrayList; +import java.util.Base64; +import java.util.List; + +import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createConfig; +import static org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils.createSqsClient; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** End to End test for SQS sink API. 
*/ +public class SqsSinkITTest extends TestLogger { + +private static final Logger LOG = LoggerFactory.getLogger(SqsSinkITTest.class); + +private static final int NUMBER_OF_ELEMENTS = 50; +private StreamExecutionEnvironment env; +private SdkHttpClient httpClient; +private SqsClient sqsClient; +private static final Network network = Network.newNetwork(); + +@ClassRule +public static LocalstackContainer mockSqsContainer = +new LocalstackContainer(DockerImageName.parse(DockerImageVersions.LOCALSTACK)) +.withNetwork(network) +.withNetworkAliases("localstack"); + +public static final TestcontainersSettings TESTCONTAINERS_SETTINGS = +TestcontainersSettings.builder() +.environmentVariable("AWS_CBOR_DISABLE", "1") +.environmentVariable( +"FLINK_ENV_JAVA_OPTS", + "-Dorg.apache.flink.sqs.shaded.com.amazonaws.sdk.disableCertChecking -Daws.cborEnabled=false") +.network(network) +.logger(LOG) +.dependsOn(mockSqsContainer) +.build(); + +public static final FlinkContainers FLINK = + FlinkContainers.builder().withTestcontainersSettings(TESTCONTAINERS_SETTINGS).build(); + +@Before +public void setup() throws Exception { +System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false"); Review Comment: Test case passed without this, hence removed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1675069242 ## flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/pom.xml: ## @@ -0,0 +1,105 @@ + + + +http://maven.apache.org/POM/4.0.0"; + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> + + +flink-connector-aws-e2e-tests-parent +org.apache.flink +4.4-SNAPSHOT + + +4.0.0 + +flink-connector-aws-sqs-e2e-tests Review Comment: Above command didn't work for me but I tried to run the test case by putting the test case file in same sqs-sink/test case folder and test passed successfully there. Please let me know if that still fails at your end. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-35824) Polish JDBC documentation : fix link and add back to top button
Zhongqiang Gong created FLINK-35824: --- Summary: Polish JDBC documentation : fix link and add back to top button Key: FLINK-35824 URL: https://issues.apache.org/jira/browse/FLINK-35824 Project: Flink Issue Type: Improvement Components: Connectors / JDBC Reporter: Zhongqiang Gong Assignee: Zhongqiang Gong -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35823) Introduce parameters to control the upper limit of rescale to avoid unlimited expansion due to server-side bottlenecks or data skew.
[ https://issues.apache.org/jira/browse/FLINK-35823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Fan updated FLINK-35823: Fix Version/s: (was: 2.0.0) > Introduce parameters to control the upper limit of rescale to avoid unlimited > expansion due to server-side bottlenecks or data skew. > > > Key: FLINK-35823 > URL: https://issues.apache.org/jira/browse/FLINK-35823 > Project: Flink > Issue Type: Improvement > Components: Autoscaler >Reporter: yuanfenghu >Priority: Major > > 1. If a Flink application writes data to other external storage systems, such > as HDFS, Kafka, etc., when the external server becomes the bottleneck of the > entire task, such as the throughput of HDFS decreases, the writing IO time > will increase, and the corresponding Flink The metric busy will also > increase. At this time, the autoscaler will determine that the parallelism > needs to be increased to increase the write rate. However, in the above case, > due to the bottleneck of the external server, this will not work. This will > cause the next determination cycle to continue to increase the parallelism > until parallelism = max-parallelism. > 2. If some tasks have data skew, it will also cause the same problem. > > Therefore, we should introduce a new parameter judgment. If the degree of > parallelism continues to increase, the throughput will basically remain the > same. There is no need to expand anymore. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35804][table-planner] Fix incorrect calc merge to avoid wrong plans about udtf+join+udf [flink]
zhaorongsheng commented on PR #25068: URL: https://github.com/apache/flink/pull/25068#issuecomment-2224329818 @flinkbot Could someone review this pr? Thanks~ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (FLINK-35240) Disable FLUSH_AFTER_WRITE_VALUE to avoid flush per record
[ https://issues.apache.org/jira/browse/FLINK-35240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhongqiang Gong resolved FLINK-35240. - Fix Version/s: 2.0.0 Resolution: Fixed Merged into master via: 4154b8d08c1a2b901b058b020d546de59ba6989d > Disable FLUSH_AFTER_WRITE_VALUE to avoid flush per record > - > > Key: FLINK-35240 > URL: https://issues.apache.org/jira/browse/FLINK-35240 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem >Reporter: Zhongqiang Gong >Assignee: Zhongqiang Gong >Priority: Minor > Labels: pull-request-available > Fix For: 2.0.0 > > Attachments: image-2024-04-26-00-23-29-975.png, > image-2024-04-26-17-16-07-925.png, image-2024-04-26-17-16-20-647.png, > image-2024-04-26-17-16-30-293.png, screenshot-1.png > > > *Reproduce:* > * According to user email: > https://lists.apache.org/thread/9j5z8hv4vjkd54dkzqy1ryyvm0l5rxhc > * !image-2024-04-26-00-23-29-975.png! > *Analysis:* > * `org.apache.flink.formats.csv.CsvBulkWriter#addElement` will flush per > record. > *Solution:* > * I think maybe we can disable `FLUSH_AFTER_WRITE_VALUE` to avoid flush when > a record added. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-35823) Introduce parameters to control the upper limit of rescale to avoid unlimited expansion due to server-side bottlenecks or data skew.
[ https://issues.apache.org/jira/browse/FLINK-35823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Fan closed FLINK-35823. --- Resolution: Fixed It's duplicated with https://issues.apache.org/jira/browse/FLINK-35814, so close this. > Introduce parameters to control the upper limit of rescale to avoid unlimited > expansion due to server-side bottlenecks or data skew. > > > Key: FLINK-35823 > URL: https://issues.apache.org/jira/browse/FLINK-35823 > Project: Flink > Issue Type: Improvement > Components: Autoscaler >Reporter: yuanfenghu >Priority: Major > Fix For: 2.0.0 > > > 1. If a Flink application writes data to other external storage systems, such > as HDFS, Kafka, etc., when the external server becomes the bottleneck of the > entire task, such as the throughput of HDFS decreases, the writing IO time > will increase, and the corresponding Flink The metric busy will also > increase. At this time, the autoscaler will determine that the parallelism > needs to be increased to increase the write rate. However, in the above case, > due to the bottleneck of the external server, this will not work. This will > cause the next determination cycle to continue to increase the parallelism > until parallelism = max-parallelism. > 2. If some tasks have data skew, it will also cause the same problem. > > Therefore, we should introduce a new parameter judgment. If the degree of > parallelism continues to increase, the throughput will basically remain the > same. There is no need to expand anymore. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35823) Introduce parameters to control the upper limit of rescale to avoid unlimited expansion due to server-side bottlenecks or data skew.
[ https://issues.apache.org/jira/browse/FLINK-35823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865280#comment-17865280 ] yuanfenghu commented on FLINK-35823: I have discussed this issue with [~fanrui] . I wonder if other people in the community have any suggestions on this? > Introduce parameters to control the upper limit of rescale to avoid unlimited > expansion due to server-side bottlenecks or data skew. > > > Key: FLINK-35823 > URL: https://issues.apache.org/jira/browse/FLINK-35823 > Project: Flink > Issue Type: Improvement > Components: Autoscaler >Reporter: yuanfenghu >Priority: Major > Fix For: 2.0.0 > > > 1. If a Flink application writes data to other external storage systems, such > as HDFS, Kafka, etc., when the external server becomes the bottleneck of the > entire task, such as the throughput of HDFS decreases, the writing IO time > will increase, and the corresponding Flink The metric busy will also > increase. At this time, the autoscaler will determine that the parallelism > needs to be increased to increase the write rate. However, in the above case, > due to the bottleneck of the external server, this will not work. This will > cause the next determination cycle to continue to increase the parallelism > until parallelism = max-parallelism. > 2. If some tasks have data skew, it will also cause the same problem. > > Therefore, we should introduce a new parameter judgment. If the degree of > parallelism continues to increase, the throughput will basically remain the > same. There is no need to expand anymore. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35823) Introduce parameters to control the upper limit of rescale to avoid unlimited expansion due to server-side bottlenecks or data skew.
[ https://issues.apache.org/jira/browse/FLINK-35823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yuanfenghu updated FLINK-35823: --- Summary: Introduce parameters to control the upper limit of rescale to avoid unlimited expansion due to server-side bottlenecks or data skew. (was: Introduce parameters to control the upper limit of rescale to avoid unlimited 扩容 due to server-side bottlenecks or data skew.) > Introduce parameters to control the upper limit of rescale to avoid unlimited > expansion due to server-side bottlenecks or data skew. > > > Key: FLINK-35823 > URL: https://issues.apache.org/jira/browse/FLINK-35823 > Project: Flink > Issue Type: Improvement > Components: Autoscaler >Reporter: yuanfenghu >Priority: Major > Fix For: 2.0.0 > > > 1. If a Flink application writes data to other external storage systems, such > as HDFS, Kafka, etc., when the external server becomes the bottleneck of the > entire task, such as the throughput of HDFS decreases, the writing IO time > will increase, and the corresponding Flink The metric busy will also > increase. At this time, the autoscaler will determine that the parallelism > needs to be increased to increase the write rate. However, in the above case, > due to the bottleneck of the external server, this will not work. This will > cause the next determination cycle to continue to increase the parallelism > until parallelism = max-parallelism. > 2. If some tasks have data skew, it will also cause the same problem. > > Therefore, we should introduce a new parameter judgment. If the degree of > parallelism continues to increase, the throughput will basically remain the > same. There is no need to expand anymore. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35823) Introduce parameters to control the upper limit of rescale to avoid unlimited 扩容 due to server-side bottlenecks or data skew.
[ https://issues.apache.org/jira/browse/FLINK-35823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yuanfenghu updated FLINK-35823: --- Summary: Introduce parameters to control the upper limit of rescale to avoid unlimited 扩容 due to server-side bottlenecks or data skew. (was: Introduce parameters to control the upper limit of rescale to avoid unlimited shrinkage due to server-side bottlenecks or data skew.) > Introduce parameters to control the upper limit of rescale to avoid unlimited > 扩容 due to server-side bottlenecks or data skew. > - > > Key: FLINK-35823 > URL: https://issues.apache.org/jira/browse/FLINK-35823 > Project: Flink > Issue Type: Improvement > Components: Autoscaler >Reporter: yuanfenghu >Priority: Major > Fix For: 2.0.0 > > > 1. If a Flink application writes data to other external storage systems, such > as HDFS, Kafka, etc., when the external server becomes the bottleneck of the > entire task, such as the throughput of HDFS decreases, the writing IO time > will increase, and the corresponding Flink The metric busy will also > increase. At this time, the autoscaler will determine that the parallelism > needs to be increased to increase the write rate. However, in the above case, > due to the bottleneck of the external server, this will not work. This will > cause the next determination cycle to continue to increase the parallelism > until parallelism = max-parallelism. > 2. If some tasks have data skew, it will also cause the same problem. > > Therefore, we should introduce a new parameter judgment. If the degree of > parallelism continues to increase, the throughput will basically remain the > same. There is no need to expand anymore. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35823) Introduce parameters to control the upper limit of rescale to avoid unlimited shrinkage due to server-side bottlenecks or data skew.
yuanfenghu created FLINK-35823: -- Summary: Introduce parameters to control the upper limit of rescale to avoid unlimited shrinkage due to server-side bottlenecks or data skew. Key: FLINK-35823 URL: https://issues.apache.org/jira/browse/FLINK-35823 Project: Flink Issue Type: Improvement Components: Autoscaler Reporter: yuanfenghu Fix For: 2.0.0 1. If a Flink application writes data to other external storage systems, such as HDFS, Kafka, etc., when the external server becomes the bottleneck of the entire task, such as the throughput of HDFS decreases, the writing IO time will increase, and the corresponding Flink The metric busy will also increase. At this time, the autoscaler will determine that the parallelism needs to be increased to increase the write rate. However, in the above case, due to the bottleneck of the external server, this will not work. This will cause the next determination cycle to continue to increase the parallelism until parallelism = max-parallelism. 2. If some tasks have data skew, it will also cause the same problem. Therefore, we should introduce a new parameter judgment. If the degree of parallelism continues to increase, the throughput will basically remain the same. There is no need to expand anymore. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-35240) Disable FLUSH_AFTER_WRITE_VALUE to avoid flush per record
[ https://issues.apache.org/jira/browse/FLINK-35240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhongqiang Gong reassigned FLINK-35240: --- Assignee: Zhongqiang Gong > Disable FLUSH_AFTER_WRITE_VALUE to avoid flush per record > - > > Key: FLINK-35240 > URL: https://issues.apache.org/jira/browse/FLINK-35240 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem >Reporter: Zhongqiang Gong >Assignee: Zhongqiang Gong >Priority: Minor > Labels: pull-request-available > Attachments: image-2024-04-26-00-23-29-975.png, > image-2024-04-26-17-16-07-925.png, image-2024-04-26-17-16-20-647.png, > image-2024-04-26-17-16-30-293.png, screenshot-1.png > > > *Reproduce:* > * According to user email: > https://lists.apache.org/thread/9j5z8hv4vjkd54dkzqy1ryyvm0l5rxhc > * !image-2024-04-26-00-23-29-975.png! > *Analysis:* > * `org.apache.flink.formats.csv.CsvBulkWriter#addElement` will flush per > record. > *Solution:* > * I think maybe we can disable `FLUSH_AFTER_WRITE_VALUE` to avoid flush when > a record added. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-25537] [JUnit5 Migration] Module: flink-core [flink]
GOODBOY008 commented on PR #24939: URL: https://github.com/apache/flink/pull/24939#issuecomment-2224309638 > #24945 reverts the InitOutputPathTest to junit4. > > We can research why InitOutputPathTest fails after migrating junit5, and fix it later in this PR as well. @1996fanrui The class failed because of the JDK version, and I fixed it in this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35363] FLIP-449: Reorganization of flink-connector-jdbc [flink-connector-jdbc]
RocMarshal commented on code in PR #123: URL: https://github.com/apache/flink-connector-jdbc/pull/123#discussion_r1675007647 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java: ## @@ -18,252 +18,17 @@ package org.apache.flink.connector.jdbc.converter; -import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatement; -import org.apache.flink.connector.jdbc.utils.JdbcTypeUtil; -import org.apache.flink.table.data.DecimalData; -import org.apache.flink.table.data.GenericRowData; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.data.StringData; -import org.apache.flink.table.data.TimestampData; -import org.apache.flink.table.types.logical.DecimalType; -import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.connector.jdbc.core.database.dialect.AbstractDialectConverter; import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.table.types.logical.TimestampType; - -import java.io.Serializable; -import java.math.BigDecimal; -import java.math.BigInteger; -import java.sql.Date; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Time; -import java.sql.Timestamp; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.LocalTime; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** Base class for all converters that convert between JDBC object and Flink internal object. 
*/ -public abstract class AbstractJdbcRowConverter implements JdbcRowConverter { - -protected final RowType rowType; -protected final JdbcDeserializationConverter[] toInternalConverters; -protected final JdbcSerializationConverter[] toExternalConverters; -protected final LogicalType[] fieldTypes; - -public abstract String converterName(); +/** + * Base class for all converters that convert between JDBC object and Flink internal object. + * + * @deprecated use AbstractDialectConverter Review Comment: ```suggestion * @deprecated Use {@link AbstractDialectConverter}. ``` ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java: ## @@ -18,551 +18,22 @@ package org.apache.flink.connector.jdbc.catalog; -import org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory; -import org.apache.flink.table.api.Schema; -import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.catalog.AbstractCatalog; -import org.apache.flink.table.catalog.CatalogBaseTable; -import org.apache.flink.table.catalog.CatalogDatabase; -import org.apache.flink.table.catalog.CatalogDatabaseImpl; -import org.apache.flink.table.catalog.CatalogFunction; -import org.apache.flink.table.catalog.CatalogPartition; -import org.apache.flink.table.catalog.CatalogPartitionSpec; -import org.apache.flink.table.catalog.CatalogTable; -import org.apache.flink.table.catalog.ObjectPath; -import org.apache.flink.table.catalog.UniqueConstraint; -import org.apache.flink.table.catalog.exceptions.CatalogException; -import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; -import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; -import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; -import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; -import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; -import 
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; -import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; -import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; -import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; -import org.apache.flink.table.catalog.exceptions.TableNotExistException; -import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; -import org.apache.flink.table.catalog.exceptions.TablePartitionedException; -import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; -import org.apache.flink.table.catalog.stats.CatalogTableStatistics; -import org.apache.flink.table.expressions.Expression; -import org.apache.flink.table.factories.Factory; -import org.apache.flink.table.types.DataType; -import org.apache.flink.util.Preconditions; -import org.apache.flink.util.StringUtils; -import org.apache.flink.util.TemporaryClassLoaderContext; - -import org.apache.commons.compress.utils.Lists; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.sql.Connection; -import java.sql.DatabaseMetaData; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; -import java.sql.SQLException; -import java.util.Arrays; -import java.util.Co
[jira] [Commented] (FLINK-21436) Speed up the restore of UnionListState
[ https://issues.apache.org/jira/browse/FLINK-21436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865277#comment-17865277 ] Rui Fan commented on FLINK-21436: - > When does Union State plan to deprecated? I didn't notice any discusstion about this topic after [https://lists.apache.org/thread/sxbmyoko01h568qtb1wk3ot2s2rb72nz] > Since some source and sink operator still rely on UnionState, so it may still > makes sense to speed up UnionState restore time currently ? Do you mean Legacy source and sink or new source and sink? I saw most of new Sources and new Sinks doesn't use Union List State. Would you mind listing some new sources and sinks that using Union List State? > Speed up the restore of UnionListState > > > Key: FLINK-21436 > URL: https://issues.apache.org/jira/browse/FLINK-21436 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Affects Versions: 1.13.0 >Reporter: Rui Fan >Priority: Minor > Labels: auto-deprioritized-major > Attachments: JM 启动火焰图.svg, akka timeout Exception.png > > > h1. 1. Problem introduction and cause analysis > Problem description: The duration of UnionListState restore under large > concurrency is more than 2 minutes. > h2. the reason: > 2000 subtasks write 2000 files during checkpoint, and each subtask needs to > read 2000 files during restore. > 2000*2000 = 4 million, so 4 million small files need to be read to hdfs > during restore. HDFS has become a bottleneck, causing restore to be > particularly time-consuming. > h1. 2. Optimize ideas > Under normal circumstances, the UnionListState state is relatively small. > Typical usage scenario: Kafka offset information. > When restoring, JM can directly read all 2000 small files, merge > UnionListState into a byte array and send it to all TMs to avoid frequent > access to hdfs by TMs. > h1. 3. Benefits after optimization > Before optimization: 2000 concurrent, Kafka offset restore takes 90~130 s. 
> After optimization: 2000 concurrent, Kafka offset restore takes less than 1s. > h1. 4. Risk points > Too big UnionListState leads to too much pressure on JM. > Solution 1: > Add configuration and decide whether to enable this feature. The default is > false, which means the old plan is used. When the user is set to true, JM > will merge. > Solution 2: > The above configuration is not required, which is equivalent to enabling > merge by default. > JM detects the size of the state before merge, and if it is less than the > threshold, the state is considered to be relatively small, and the state is > sent to all TMs through ByteStreamStateHandle. > If the threshold is exceeded, the state is considered to be greater. At this > time, write an hdfs file, and send FileStateHandle to all TMs, and TM can > read this file. > > Note: Most of the scenarios where Flink uses UnionListState are Kafka offset > (small state). In theory, most jobs are risk-free. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-35754) SqlGatewayE2ECase.testMaterializedTableInFullMode failed due to Internal Server Error
[ https://issues.apache.org/jira/browse/FLINK-35754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dalongliu resolved FLINK-35754. --- Resolution: Fixed > SqlGatewayE2ECase.testMaterializedTableInFullMode failed due to Internal > Server Error > - > > Key: FLINK-35754 > URL: https://issues.apache.org/jira/browse/FLINK-35754 > Project: Flink > Issue Type: Bug >Affects Versions: 1.20.0 >Reporter: Weijie Guo >Priority: Major > Labels: pull-request-available > Attachments: image-2024-07-05-11-15-52-731.png > > > {code:java} > Jul 03 03:14:31 java.lang.RuntimeException: Execute statement failed > Jul 03 03:14:31 at > org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.executeStatementWithResult(FlinkDistribution.java:592) > Jul 03 03:14:31 at > org.apache.flink.table.gateway.SqlGatewayE2ECase.lambda$testMaterializedTableInFullMode$3(SqlGatewayE2ECase.java:353) > Jul 03 03:14:31 at > org.apache.flink.core.testutils.CommonTestUtils.waitUtil(CommonTestUtils.java:210) > Jul 03 03:14:31 at > org.apache.flink.table.gateway.SqlGatewayE2ECase.testMaterializedTableInFullMode(SqlGatewayE2ECase.java:350) > Jul 03 03:14:31 at > java.base/java.lang.reflect.Method.invoke(Method.java:568) > Jul 03 03:14:31 at > org.apache.flink.util.ExternalResource$1.evaluate(ExternalResource.java:48) > Jul 03 03:14:31 at > org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) > Jul 03 03:14:31 at > org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:29) > Jul 03 03:14:31 Caused by: java.lang.RuntimeException: The rest request is > not successful: Internal Server Error > Jul 03 03:14:31 at > org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.sendRequest(FlinkDistribution.java:601) > Jul 03 03:14:31 at > org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.getOperationResult(FlinkDistribution.java:570) > Jul 03 03:14:31 at > 
org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.executeStatementWithResult(FlinkDistribution.java:590) > Jul 03 03:14:31 ... 7 more > Jul 03 03:14:31 > {code} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60620&view=logs&j=e8e46ef5-75cc-564f-c2bd-1797c35cbebe&t=60c49903-2505-5c25-7e46-de91b1737bea&l=14602 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35240][Connectors][format]Disable FLUSH_AFTER_WRITE_VALUE to avoid flush per record for csv format [flink]
GOODBOY008 merged PR #24730: URL: https://github.com/apache/flink/pull/24730 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35754) SqlGatewayE2ECase.testMaterializedTableInFullMode failed due to Internal Server Error
[ https://issues.apache.org/jira/browse/FLINK-35754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865275#comment-17865275 ] dalongliu commented on FLINK-35754: --- Hi [~pnowojski], I think they are two different problems. > SqlGatewayE2ECase.testMaterializedTableInFullMode failed due to Internal > Server Error > - > > Key: FLINK-35754 > URL: https://issues.apache.org/jira/browse/FLINK-35754 > Project: Flink > Issue Type: Bug >Affects Versions: 1.20.0 >Reporter: Weijie Guo >Priority: Major > Labels: pull-request-available > Attachments: image-2024-07-05-11-15-52-731.png > > > {code:java} > Jul 03 03:14:31 java.lang.RuntimeException: Execute statement failed > Jul 03 03:14:31 at > org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.executeStatementWithResult(FlinkDistribution.java:592) > Jul 03 03:14:31 at > org.apache.flink.table.gateway.SqlGatewayE2ECase.lambda$testMaterializedTableInFullMode$3(SqlGatewayE2ECase.java:353) > Jul 03 03:14:31 at > org.apache.flink.core.testutils.CommonTestUtils.waitUtil(CommonTestUtils.java:210) > Jul 03 03:14:31 at > org.apache.flink.table.gateway.SqlGatewayE2ECase.testMaterializedTableInFullMode(SqlGatewayE2ECase.java:350) > Jul 03 03:14:31 at > java.base/java.lang.reflect.Method.invoke(Method.java:568) > Jul 03 03:14:31 at > org.apache.flink.util.ExternalResource$1.evaluate(ExternalResource.java:48) > Jul 03 03:14:31 at > org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) > Jul 03 03:14:31 at > org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:29) > Jul 03 03:14:31 Caused by: java.lang.RuntimeException: The rest request is > not successful: Internal Server Error > Jul 03 03:14:31 at > org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.sendRequest(FlinkDistribution.java:601) > Jul 03 03:14:31 at > 
org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.getOperationResult(FlinkDistribution.java:570) > Jul 03 03:14:31 at > org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.executeStatementWithResult(FlinkDistribution.java:590) > Jul 03 03:14:31 ... 7 more > Jul 03 03:14:31 > {code} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60620&view=logs&j=e8e46ef5-75cc-564f-c2bd-1797c35cbebe&t=60c49903-2505-5c25-7e46-de91b1737bea&l=14602 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-35754) SqlGatewayE2ECase.testMaterializedTableInFullMode failed due to Internal Server Error
[ https://issues.apache.org/jira/browse/FLINK-35754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17864853#comment-17864853 ] dalongliu edited comment on FLINK-35754 at 7/12/24 1:56 AM: Merged in master: d04c70fae033bb04ed7a3bb69832be55b4425700 Merged in release-1.20: 8244941ed18c9c063449759927b58b629110d89e was (Author: lsy): Merged in master: d04c70fae033bb04ed7a3bb69832be55b4425700 > SqlGatewayE2ECase.testMaterializedTableInFullMode failed due to Internal > Server Error > - > > Key: FLINK-35754 > URL: https://issues.apache.org/jira/browse/FLINK-35754 > Project: Flink > Issue Type: Bug >Affects Versions: 1.20.0 >Reporter: Weijie Guo >Priority: Major > Labels: pull-request-available > Attachments: image-2024-07-05-11-15-52-731.png > > > {code:java} > Jul 03 03:14:31 java.lang.RuntimeException: Execute statement failed > Jul 03 03:14:31 at > org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.executeStatementWithResult(FlinkDistribution.java:592) > Jul 03 03:14:31 at > org.apache.flink.table.gateway.SqlGatewayE2ECase.lambda$testMaterializedTableInFullMode$3(SqlGatewayE2ECase.java:353) > Jul 03 03:14:31 at > org.apache.flink.core.testutils.CommonTestUtils.waitUtil(CommonTestUtils.java:210) > Jul 03 03:14:31 at > org.apache.flink.table.gateway.SqlGatewayE2ECase.testMaterializedTableInFullMode(SqlGatewayE2ECase.java:350) > Jul 03 03:14:31 at > java.base/java.lang.reflect.Method.invoke(Method.java:568) > Jul 03 03:14:31 at > org.apache.flink.util.ExternalResource$1.evaluate(ExternalResource.java:48) > Jul 03 03:14:31 at > org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) > Jul 03 03:14:31 at > org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:29) > Jul 03 03:14:31 Caused by: java.lang.RuntimeException: The rest request is > not successful: Internal Server Error > Jul 03 03:14:31 at > 
org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.sendRequest(FlinkDistribution.java:601) > Jul 03 03:14:31 at > org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.getOperationResult(FlinkDistribution.java:570) > Jul 03 03:14:31 at > org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.executeStatementWithResult(FlinkDistribution.java:590) > Jul 03 03:14:31 ... 7 more > Jul 03 03:14:31 > {code} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60620&view=logs&j=e8e46ef5-75cc-564f-c2bd-1797c35cbebe&t=60c49903-2505-5c25-7e46-de91b1737bea&l=14602 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [BP-1.20][FLINK-35754][e2e] Fix SqlGatewayE2ECase.testMaterializedTableInFullMode failed due to Internal Server Error [flink]
lsyldliu merged PR #25073: URL: https://github.com/apache/flink/pull/25073 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-35354) Support host mapping in Flink tikv cdc
[ https://issues.apache.org/jira/browse/FLINK-35354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ouyangwulin closed FLINK-35354. --- Resolution: Fixed master:302a69122538fdb76b98e73ebb3c83ee733a0c02 > Support host mapping in Flink tikv cdc > -- > > Key: FLINK-35354 > URL: https://issues.apache.org/jira/browse/FLINK-35354 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Affects Versions: cdc-3.1.0, cdc-3.2.0 >Reporter: ouyangwulin >Priority: Major > Labels: pull-request-available > Fix For: cdc-3.2.0 > > > In tidb production environment deployment, there are usually two kinds of > network: internal network and public network. When we use pd mode in tikv, we > need to do network mapping, such as `spark.tispark.host_mapping` in > [https://github.com/pingcap/tispark/blob/master/docs/userguide_3.0.md]. So I > think we need to support `host_mapping` in our Flink tikv cdc connector. > > Add param: > tikv.host_mapping:192.168.0.2:8.8.8.8;192.168.0.3:9.9.9.9 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-31520] [table-planner] Move SqlXXXModel conversion logic to SqlNodeConverter [flink]
nateab commented on PR #22218: URL: https://github.com/apache/flink/pull/22218#issuecomment-2224189971 Hi @xuzhiwen1255 there is a typo in the title of the PR, it should be `module` not `model` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674832829 ## flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/src/test/java/org/apache/flink/connector/sqs/sink/test/SqsSinkITTest.java: ## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.sqs.sink.test; + +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils; +import org.apache.flink.connector.aws.testutils.LocalstackContainer; +import org.apache.flink.connector.sqs.sink.SqsSink; +import org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils; +import org.apache.flink.connector.testframe.container.FlinkContainers; +import org.apache.flink.connector.testframe.container.TestcontainersSettings; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.DockerImageVersions; +import org.apache.flink.util.TestLogger; + +import org.assertj.core.api.Assertions; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Network; +import org.testcontainers.utility.DockerImageName; +import software.amazon.awssdk.core.SdkSystemSetting; +import software.amazon.awssdk.http.SdkHttpClient; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.Message; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; + +import java.util.ArrayList; +import java.util.Base64; +import java.util.List; + +import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createConfig; +import static org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils.createSqsClient; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** End to End test for SQS sink API. 
*/ +public class SqsSinkITTest extends TestLogger { + +private static final Logger LOG = LoggerFactory.getLogger(SqsSinkITTest.class); + +private static final int NUMBER_OF_ELEMENTS = 50; +private StreamExecutionEnvironment env; +private SdkHttpClient httpClient; +private SqsClient sqsClient; +private static final Network network = Network.newNetwork(); + +@ClassRule +public static LocalstackContainer mockSqsContainer = +new LocalstackContainer(DockerImageName.parse(DockerImageVersions.LOCALSTACK)) +.withNetwork(network) +.withNetworkAliases("localstack"); + +public static final TestcontainersSettings TESTCONTAINERS_SETTINGS = +TestcontainersSettings.builder() +.environmentVariable("AWS_CBOR_DISABLE", "1") +.environmentVariable( +"FLINK_ENV_JAVA_OPTS", + "-Dorg.apache.flink.sqs.shaded.com.amazonaws.sdk.disableCertChecking -Daws.cborEnabled=false") +.network(network) +.logger(LOG) +.dependsOn(mockSqsContainer) +.build(); + +public static final FlinkContainers FLINK = + FlinkContainers.builder().withTestcontainersSettings(TESTCONTAINERS_SETTINGS).build(); + +@Before +public void setup() throws Exception { +System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false"); Review Comment: I will validate that once i will be able to run this test locally -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674832829 ## flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/src/test/java/org/apache/flink/connector/sqs/sink/test/SqsSinkITTest.java: ## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.sqs.sink.test; + +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils; +import org.apache.flink.connector.aws.testutils.LocalstackContainer; +import org.apache.flink.connector.sqs.sink.SqsSink; +import org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils; +import org.apache.flink.connector.testframe.container.FlinkContainers; +import org.apache.flink.connector.testframe.container.TestcontainersSettings; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.DockerImageVersions; +import org.apache.flink.util.TestLogger; + +import org.assertj.core.api.Assertions; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Network; +import org.testcontainers.utility.DockerImageName; +import software.amazon.awssdk.core.SdkSystemSetting; +import software.amazon.awssdk.http.SdkHttpClient; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.Message; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; + +import java.util.ArrayList; +import java.util.Base64; +import java.util.List; + +import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createConfig; +import static org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils.createSqsClient; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** End to End test for SQS sink API. 
*/ +public class SqsSinkITTest extends TestLogger { + +private static final Logger LOG = LoggerFactory.getLogger(SqsSinkITTest.class); + +private static final int NUMBER_OF_ELEMENTS = 50; +private StreamExecutionEnvironment env; +private SdkHttpClient httpClient; +private SqsClient sqsClient; +private static final Network network = Network.newNetwork(); + +@ClassRule +public static LocalstackContainer mockSqsContainer = +new LocalstackContainer(DockerImageName.parse(DockerImageVersions.LOCALSTACK)) +.withNetwork(network) +.withNetworkAliases("localstack"); + +public static final TestcontainersSettings TESTCONTAINERS_SETTINGS = +TestcontainersSettings.builder() +.environmentVariable("AWS_CBOR_DISABLE", "1") +.environmentVariable( +"FLINK_ENV_JAVA_OPTS", + "-Dorg.apache.flink.sqs.shaded.com.amazonaws.sdk.disableCertChecking -Daws.cborEnabled=false") +.network(network) +.logger(LOG) +.dependsOn(mockSqsContainer) +.build(); + +public static final FlinkContainers FLINK = + FlinkContainers.builder().withTestcontainersSettings(TESTCONTAINERS_SETTINGS).build(); + +@Before +public void setup() throws Exception { +System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false"); Review Comment: This need to be validated once i will able to run this test locally -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674832262 ## flink-connector-aws/flink-connector-sqs/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension: ## @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.util.TestLoggerExtension Review Comment: This got copied from another sink package; it doesn't look like it is required, so I'm deleting it now -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34111][table] Add support for json_quote, json_unquote, address PR feedback #24156 [flink]
snuyanzin commented on PR #24967: URL: https://github.com/apache/flink/pull/24967#issuecomment-2224133227 One more diff between MySQL and Flink behavior; however, I'm not sure this needs to be fixed since, at least from my point of view, MySQL's behavior is questionable MySQL ```sql SELECT JSON_UNQUOTE('"\\u0022\\u005c\\u005c\\u0075\\u0030\\u0030\\u0061\\u0061\\u0022"'); SELECT JSON_UNQUOTE(JSON_UNQUOTE('"\\u0022\\u005c\\u005c\\u0075\\u0030\\u0030\\u0061\\u0061\\u0022"')); ``` returns ``` "\\u00aa" \u00aa ``` Flink ```sql SELECT JSON_UNQUOTE('"\u0022\u005c\u005c\u0075\u0030\u0030\u0061\u0061\u0022"'); SELECT JSON_UNQUOTE(JSON_UNQUOTE('"\u0022\u005c\u005c\u0075\u0030\u0030\u0061\u0061\u0022"')); ``` returns ``` "\\u00aa" ª ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34111][table] Add support for json_quote, json_unquote, address PR feedback #24156 [flink]
snuyanzin commented on code in PR #24967: URL: https://github.com/apache/flink/pull/24967#discussion_r1674812992 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/JsonUnquoteFunction.java: ## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.binary.BinaryStringData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext; +import org.apache.flink.table.runtime.functions.SqlJsonUtils; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nullable; + +/** Implementation of {@link BuiltInFunctionDefinitions#JSON_UNQUOTE}. 
*/ +@Internal +public class JsonUnquoteFunction extends BuiltInScalarFunction { + +public JsonUnquoteFunction(SpecializedContext context) { +super(BuiltInFunctionDefinitions.JSON_UNQUOTE, context); +} + +public @Nullable Object eval(Object input) { +if (input == null) { +return null; +} +BinaryStringData bs = (BinaryStringData) input; +String inputStr = bs.toString(); +try { +if (isValidJsonVal(inputStr)) { +return new BinaryStringData(unescapeValidJson(inputStr)); +} +} catch (FlinkRuntimeException | IllegalArgumentException t) { Review Comment: Ooops, seems I was wrong about `FlinkRuntimeException` if we want to rethrow it further then it would make sense however of we use it only within this class then `IllegalArgumentException` is enough sorry about that I should have checked the whole cycle of it here -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674703677 ## flink-connector-aws/flink-connector-sqs/src/main/resources/log4j2.properties: ## @@ -0,0 +1,25 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +rootLogger.level = OFF Review Comment: Why do we want to do that? I can see other sinks also have their independent log4j configuration. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35808) Let ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG be overridable by user in KafkaSourceBuilder
[ https://issues.apache.org/jira/browse/FLINK-35808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35808: --- Labels: pull-request-available (was: ) > Let ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG be overridable by user in > KafkaSourceBuilder > --- > > Key: FLINK-35808 > URL: https://issues.apache.org/jira/browse/FLINK-35808 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: kafka-3.2.0 >Reporter: Kevin Lam >Assignee: Kevin Lam >Priority: Minor > Labels: pull-request-available > > This issue is a follow-up to [this mailing list > discussion|https://lists.apache.org/thread/spl88o63sjm2dv4l5no0ym632d2yt2o6]. > I'd like to propose letting the > ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG be overridable by user in > KafkaSourceBuilder, as shown in this DRAFT PR: > > [https://github.com/apache/flink-connector-kafka/pull/108] > > From the PR description: > {quote}This allows users to easily implement the [{{claim check}} large > message > pattern|https://developer.confluent.io/patterns/event-processing/claim-check/] > without bringing any concerns into the Flink codebase otherwise, by > specifying a {{value.deserializer}} that handles it, but otherwise passes > through the bytes. > Note: overriding value.serializer is already supported on the Producer side: > |[https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java#L82-L83]| > > Other Reading: > [https://www.kai-waehner.de/blog/2020/08/07/apache-kafka-handling-large-messages-and-files-for-image-video-audio-processing/] > [https://www.conduktor.io/kafka/how-to-send-large-messages-in-apache-kafka/#Option-1:-using-an-external-store-(GB-size-messages)-0] > {quote} > > What do folks think? 
If it seems reasonable I can follow the steps in the > [contribution guide|https://flink.apache.org/how-to-contribute/overview/]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35808] Let `ConsumerConfig.(KEY|VALUE)_DESERIALIZER_CLASS_CONFIG` be overridable by user in `KafkaSourceBuilder` [flink-connector-kafka]
klam-shop commented on PR #108: URL: https://github.com/apache/flink-connector-kafka/pull/108#issuecomment-2223875792 Thanks for your review @fapaul ! I did another pass on trying to incorporate your comments, and also updated some of the documentation that mentions the deserializer configuration. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix][table] Apply uppercase for URL_ENCODE/URL_DECODE remove logging from function class [flink]
snuyanzin merged PR #25076: URL: https://github.com/apache/flink/pull/25076 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35819) Update to AWS SDKv2 2.26.19
[ https://issues.apache.org/jira/browse/FLINK-35819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35819: --- Labels: pull-request-available (was: ) > Update to AWS SDKv2 2.26.19 > --- > > Key: FLINK-35819 > URL: https://issues.apache.org/jira/browse/FLINK-35819 > Project: Flink > Issue Type: Sub-task > Components: Connectors / AWS >Affects Versions: aws-connector-4.4.0 >Reporter: Burak Ozakinci >Assignee: Burak Ozakinci >Priority: Blocker > Labels: pull-request-available > Fix For: aws-connector-4.4.0 > > > As part of the work in FLINK-31922, AWS SDK needs to be updated to use > RetryStrategy constructs instead of deprecated RetryPolicy classes. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-35819][Connectors/AWS] Update to AWS SDKv2 2.26.19 [flink-connector-aws]
karubian opened a new pull request, #149: URL: https://github.com/apache/flink-connector-aws/pull/149 ## Purpose of the change Update to AWS SDKv2 2.26.19 ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Significant changes - [X] Dependencies have been added or upgraded - [ ] Public API has been changed (Public API is any class annotated with `@Public(Evolving)`) - [ ] Serializers have been changed - [ ] New feature has been introduced - If yes, how is this documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35819) Update to AWS SDKv2 2.26.19
[ https://issues.apache.org/jira/browse/FLINK-35819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Burak Ozakinci updated FLINK-35819: --- Summary: Update to AWS SDKv2 2.26.19 (was: Update to AWS SDK 2.26.19) > Update to AWS SDKv2 2.26.19 > --- > > Key: FLINK-35819 > URL: https://issues.apache.org/jira/browse/FLINK-35819 > Project: Flink > Issue Type: Sub-task > Components: Connectors / AWS >Affects Versions: aws-connector-4.4.0 >Reporter: Burak Ozakinci >Assignee: Burak Ozakinci >Priority: Blocker > Fix For: aws-connector-4.4.0 > > > As part of the work in FLINK-31922, AWS SDK needs to be updated to use > RetryStrategy constructs instead of deprecated RetryPolicy classes. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35819) Update to AWS SDK 2.26.19
[ https://issues.apache.org/jira/browse/FLINK-35819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Burak Ozakinci updated FLINK-35819: --- Summary: Update to AWS SDK 2.26.19 (was: Upgrade to AWS SDK 2.26.19) > Update to AWS SDK 2.26.19 > - > > Key: FLINK-35819 > URL: https://issues.apache.org/jira/browse/FLINK-35819 > Project: Flink > Issue Type: Sub-task > Components: Connectors / AWS >Affects Versions: aws-connector-4.4.0 >Reporter: Burak Ozakinci >Assignee: Burak Ozakinci >Priority: Blocker > Fix For: aws-connector-4.4.0 > > > As part of the work in FLINK-31922, AWS SDK needs to be updated to use > RetryStrategy constructs instead of deprecated RetryPolicy classes. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35822) FLIP-465: Introduce DESCRIBE FUNCTION
Natea Eshetu Beshada created FLINK-35822: Summary: FLIP-465: Introduce DESCRIBE FUNCTION Key: FLINK-35822 URL: https://issues.apache.org/jira/browse/FLINK-35822 Project: Flink Issue Type: Improvement Components: Table SQL / API, Table SQL / Planner, Table SQL / Runtime Reporter: Natea Eshetu Beshada Assignee: Natea Eshetu Beshada https://cwiki.apache.org/confluence/display/FLINK/FLIP-465%3A+Introduce+DESCRIBE+FUNCTION -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-25920) Allow receiving updates of CommittableSummary
[ https://issues.apache.org/jira/browse/FLINK-25920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865154#comment-17865154 ] Alexis Sarda-Espinosa edited comment on FLINK-25920 at 7/11/24 4:53 PM: [^trace_again.zip] This time it happened very close to the start of the job after a restart with savepoint; I see the first checkpoint succeeded and the second one failed. Thinking aloud, could checkpoint mode EXACTLY_ONCE mitigate this? was (Author: asardaes): [^trace_again.zip] This time it happened very close to the start of the job after a restart with savepoint; I see the first checkpoint succeeded and the second one failed. > Allow receiving updates of CommittableSummary > - > > Key: FLINK-25920 > URL: https://issues.apache.org/jira/browse/FLINK-25920 > Project: Flink > Issue Type: Sub-task > Components: API / DataStream, Connectors / Common >Affects Versions: 1.15.0, 1.16.0 >Reporter: Fabian Paul >Assignee: Arvid Heise >Priority: Major > Attachments: trace.zip, trace_again.zip > > > In the case of unaligned checkpoints, it might happen that the checkpoint > barrier overtakes the records and an empty committable summary is emitted > that needs to be corrected at a later point when the records arrive. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35821) ResumeCheckpointManuallyITCase failed with File X does not exist or the user running Flink C has insufficient permissions to access it
[ https://issues.apache.org/jira/browse/FLINK-35821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865155#comment-17865155 ] Zakelly Lan commented on FLINK-35821: - [~pnowojski] I think this is the same problem as https://issues.apache.org/jira/browse/FLINK-35803 . Would you please rebase and try again? > ResumeCheckpointManuallyITCase failed with File X does not exist or the user > running Flink C has insufficient permissions to access it > -- > > Key: FLINK-35821 > URL: https://issues.apache.org/jira/browse/FLINK-35821 > Project: Flink > Issue Type: Bug > Components: Test Infrastructure >Affects Versions: 2.0.0, 1.20.0 >Reporter: Piotr Nowojski >Priority: Major > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60857&view=logs&j=5c8e7682-d68f-54d1-16a2-a09310218a49&t=86f654fa-ab48-5c1a-25f4-7e7f6afb9bba > primary failure: > {noformat} > Caused by: java.io.FileNotFoundException: File > file:/tmp/junit5368809541399217009/junit9107863722486384012/5a045e6c0cd0297faf5a2bf6fff27465/shared/job_5a045e6c0cd0297faf5a2bf6fff27465_op_90bea66de1c231edf33913ecd54406c1_1_2/0effb888-aa59-4bc4-b3e6-02622c831863 > does not exist or the user running Flink ('agent01_azpcontainer') has > insufficient permissions to access it. > {noformat} > Full stack trace > {noformat} > 2024-07-11T13:49:46.4137693Z Jul 11 13:49:46 13:49:46.412 [ERROR] Tests run: > 48, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 309.7 s <<< FAILURE! -- > in org.apache.flink.test.checkpointing.ResumeCheckpointManuallyITCase > 2024-07-11T13:49:46.4139710Z Jul 11 13:49:46 13:49:46.412 [ERROR] > org.apache.flink.test.checkpointing.ResumeCheckpointManuallyITCase.testExternalizedIncrementalRocksDBCheckpointsWithLocalRecoveryZookeeper[RestoreMode > = CLAIM] -- Time elapsed: 2.722 s <<< ERROR! 
> 2024-07-11T13:49:46.4140928Z Jul 11 13:49:46 > org.apache.flink.runtime.JobException: Recovery is suppressed by > NoRestartBackoffTimeStrategy > 2024-07-11T13:49:46.4142766Z Jul 11 13:49:46 at > org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:219) > 2024-07-11T13:49:46.4144185Z Jul 11 13:49:46 at > org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailureAndReport(ExecutionFailureHandler.java:166) > 2024-07-11T13:49:46.4145249Z Jul 11 13:49:46 at > org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:121) > 2024-07-11T13:49:46.4146510Z Jul 11 13:49:46 at > org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:281) > 2024-07-11T13:49:46.4147599Z Jul 11 13:49:46 at > org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:272) > 2024-07-11T13:49:46.4148975Z Jul 11 13:49:46 at > org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:265) > 2024-07-11T13:49:46.4150467Z Jul 11 13:49:46 at > org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:800) > 2024-07-11T13:49:46.4151977Z Jul 11 13:49:46 at > org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:777) > 2024-07-11T13:49:46.4153308Z Jul 11 13:49:46 at > org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83) > 2024-07-11T13:49:46.4154713Z Jul 11 13:49:46 at > org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:515) > 2024-07-11T13:49:46.4155416Z Jul 11 13:49:46 at > sun.reflect.GeneratedMethodAccessor29.invoke(Unknown Source) > 2024-07-11T13:49:46.4156342Z Jul 11 13:49:46 at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > 2024-07-11T13:49:46.4157291Z 
Jul 11 13:49:46 at > java.lang.reflect.Method.invoke(Method.java:498) > 2024-07-11T13:49:46.4158065Z Jul 11 13:49:46 at > org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:318) > 2024-07-11T13:49:46.4159387Z Jul 11 13:49:46 at > org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) > 2024-07-11T13:49:46.4160469Z Jul 11 13:49:46 at > org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:316) > 2024-07-11T13:49:46.4161819Z Jul 11 13:49:46 at > org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:229) > 2024-07-11T13:49:46.4163253Z Jul 11 13:49:46 at > org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:88) > 2024-07-11T13:49:46.41
[jira] [Commented] (FLINK-25920) Allow receiving updates of CommittableSummary
[ https://issues.apache.org/jira/browse/FLINK-25920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865154#comment-17865154 ] Alexis Sarda-Espinosa commented on FLINK-25920: --- [^trace_again.zip] This time it happened very close to the start of the job after a restart with savepoint, I see the first checkpoint succeeded and the second one failed. > Allow receiving updates of CommittableSummary > - > > Key: FLINK-25920 > URL: https://issues.apache.org/jira/browse/FLINK-25920 > Project: Flink > Issue Type: Sub-task > Components: API / DataStream, Connectors / Common >Affects Versions: 1.15.0, 1.16.0 >Reporter: Fabian Paul >Assignee: Arvid Heise >Priority: Major > Attachments: trace.zip, trace_again.zip > > > In the case of unaligned checkpoints, it might happen that the checkpoint > barrier overtakes the records and an empty committable summary is emitted > that needs to be corrected at a later point when the records arrive. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-25920) Allow receiving updates of CommittableSummary
[ https://issues.apache.org/jira/browse/FLINK-25920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexis Sarda-Espinosa updated FLINK-25920: -- Attachment: trace_again.zip > Allow receiving updates of CommittableSummary > - > > Key: FLINK-25920 > URL: https://issues.apache.org/jira/browse/FLINK-25920 > Project: Flink > Issue Type: Sub-task > Components: API / DataStream, Connectors / Common >Affects Versions: 1.15.0, 1.16.0 >Reporter: Fabian Paul >Assignee: Arvid Heise >Priority: Major > Attachments: trace.zip, trace_again.zip > > > In the case of unaligned checkpoints, it might happen that the checkpoint > barrier overtakes the records and an empty committable summary is emitted > that needs to be corrected at a later point when the records arrive. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-21373] Add RabbitMQ SinkV2 Implementation, Port Flink version to Flink 1.19 [flink-connector-rabbitmq]
vahmed-hamdy commented on PR #29: URL: https://github.com/apache/flink-connector-rabbitmq/pull/29#issuecomment-2223412688 @paulh86 Blocked on review from the community, will reach out on dev slack channel for a reviewer -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-35819) Upgrade to AWS SDK 2.26.19
[ https://issues.apache.org/jira/browse/FLINK-35819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hong Liang Teoh reassigned FLINK-35819: --- Assignee: Burak Ozakinci > Upgrade to AWS SDK 2.26.19 > -- > > Key: FLINK-35819 > URL: https://issues.apache.org/jira/browse/FLINK-35819 > Project: Flink > Issue Type: Sub-task > Components: Connectors / AWS >Affects Versions: aws-connector-4.4.0 >Reporter: Burak Ozakinci >Assignee: Burak Ozakinci >Priority: Blocker > Fix For: aws-connector-4.4.0 > > > As part of the work in FLINK-31922, AWS SDK needs to be updated to use > RetryStrategy constructs instead of deprecated RetryPolicy classes. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32324) Implement watermark alignment on KDS and DDB source
[ https://issues.apache.org/jira/browse/FLINK-32324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hong Liang Teoh updated FLINK-32324: Summary: Implement watermark alignment on KDS and DDB source (was: Implement watermark alignment on KDS source) > Implement watermark alignment on KDS and DDB source > --- > > Key: FLINK-32324 > URL: https://issues.apache.org/jira/browse/FLINK-32324 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Kinesis >Reporter: Hong Liang Teoh >Assignee: Aleksandr Pilipenko >Priority: Major > Labels: pull-request-available > > Implement watermark alignment interfaces suggested by this FLIP in the KDS > Source. > https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-32324) Implement watermark alignment on KDS and DDB source
[ https://issues.apache.org/jira/browse/FLINK-32324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hong Liang Teoh resolved FLINK-32324. - Resolution: Fixed > Implement watermark alignment on KDS and DDB source > --- > > Key: FLINK-32324 > URL: https://issues.apache.org/jira/browse/FLINK-32324 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Kinesis >Reporter: Hong Liang Teoh >Assignee: Aleksandr Pilipenko >Priority: Major > Labels: pull-request-available > > Implement watermark alignment interfaces suggested by this FLIP in the KDS > and DDB Source. > [https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32324) Implement watermark alignment on KDS and DDB source
[ https://issues.apache.org/jira/browse/FLINK-32324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hong Liang Teoh updated FLINK-32324: Description: Implement watermark alignment interfaces suggested by this FLIP in the KDS and DDB Source. [https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources] (was: Implement watermark alignment interfaces suggested by this FLIP in the KDS Source. https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources) > Implement watermark alignment on KDS and DDB source > --- > > Key: FLINK-32324 > URL: https://issues.apache.org/jira/browse/FLINK-32324 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Kinesis >Reporter: Hong Liang Teoh >Assignee: Aleksandr Pilipenko >Priority: Major > Labels: pull-request-available > > Implement watermark alignment interfaces suggested by this FLIP in the KDS > and DDB Source. > [https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources] -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35799] Add CompiledPlan annotations to BatchExecCalc [flink]
jnh5y commented on PR #25064: URL: https://github.com/apache/flink/pull/25064#issuecomment-2223289135 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-32324) Implement watermark alignment on KDS source
[ https://issues.apache.org/jira/browse/FLINK-32324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865140#comment-17865140 ] Hong Liang Teoh commented on FLINK-32324: - merged commit [{{f81473e}}|https://github.com/apache/flink-connector-aws/commit/f81473e20d7df91b668faf2ca4d2d00576007862] into apache:main > Implement watermark alignment on KDS source > --- > > Key: FLINK-32324 > URL: https://issues.apache.org/jira/browse/FLINK-32324 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Kinesis >Reporter: Hong Liang Teoh >Assignee: Aleksandr Pilipenko >Priority: Major > Labels: pull-request-available > > Implement watermark alignment interfaces suggested by this FLIP in the KDS > Source. > https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-32324][Connectors/AWS] Implement support for watermark alignment in FLIP-27 sources for KDS and DDB streams [flink-connector-aws]
hlteoh37 merged PR #148: URL: https://github.com/apache/flink-connector-aws/pull/148 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34111][table] Add support for json_quote, json_unquote, address PR feedback #24156 [flink]
anupamaggarwal commented on code in PR #24967: URL: https://github.com/apache/flink/pull/24967#discussion_r1674222067 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/JsonUnquoteFunction.java: ## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.binary.BinaryStringData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext; +import org.apache.flink.table.runtime.functions.SqlJsonUtils; + +import javax.annotation.Nullable; + +/** Implementation of {@link BuiltInFunctionDefinitions#JSON_UNQUOTE}. 
*/ +@Internal +public class JsonUnquoteFunction extends BuiltInScalarFunction { + +public JsonUnquoteFunction(SpecializedContext context) { +super(BuiltInFunctionDefinitions.JSON_UNQUOTE, context); +} + +public @Nullable Object eval(Object input) { +if (input == null) { +return null; +} +BinaryStringData bs = (BinaryStringData) input; +String inputStr = bs.toString(); +try { +if (isValidJsonVal(inputStr)) { +return new BinaryStringData(unescapeValidJson(inputStr)); +} +} catch (Throwable t) { +// ignore +} Review Comment: thanks @fhueske, @snuyanzin makes sense, fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
hlteoh37 commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674213420 ## flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/pom.xml: ## @@ -0,0 +1,105 @@ + + + +http://maven.apache.org/POM/4.0.0"; + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> + + +flink-connector-aws-e2e-tests-parent +org.apache.flink +4.4-SNAPSHOT + + +4.0.0 + +flink-connector-aws-sqs-e2e-tests Review Comment: This test doesn't seem to work. Please make sure we have all the dependencies. ``` Caused by: java.lang.NoClassDefFoundError: software/amazon/awssdk/services/s3/S3Client at org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createS3Client(AWSServicesTestUtils.java:65) at org.apache.flink.connector.aws.testutils.LocalstackContainer$ListBucketObjectsWaitStrategy.list(LocalstackContainer.java:79) at org.rnorth.ducttape.ratelimits.RateLimiter.getWhenReady(RateLimiter.java:51) at org.apache.flink.connector.aws.testutils.LocalstackContainer$ListBucketObjectsWaitStrategy.lambda$waitUntilReady$0(LocalstackContainer.java:72) at org.rnorth.ducttape.unreliables.Unreliables.lambda$retryUntilSuccess$0(Unreliables.java:43) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: java.lang.ClassNotFoundException: software.amazon.awssdk.services.s3.S3Client ``` I ran using the below command ``` mvn clean verify -Prun-end-to-end-tests -DdistDir=/Users/liangtl/workplace/flink_os/flink-1.19.0/lib/flink-dist-1.19.0.jar -pl flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests -am ``` -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-35819) Upgrade to AWS SDK 2.26.19
[ https://issues.apache.org/jira/browse/FLINK-35819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865126#comment-17865126 ] Burak Ozakinci edited comment on FLINK-35819 at 7/11/24 3:20 PM: - [~Hong Teoh] Please assign this to me, thank you was (Author: JIRAUSER303336): [~Hong Teoh] Please assign this to me, thank you [|https://issues.apache.org/jira/secure/AddComment!default.jspa?id=13533827] > Upgrade to AWS SDK 2.26.19 > -- > > Key: FLINK-35819 > URL: https://issues.apache.org/jira/browse/FLINK-35819 > Project: Flink > Issue Type: Sub-task > Components: Connectors / AWS >Affects Versions: aws-connector-4.4.0 >Reporter: Burak Ozakinci >Priority: Blocker > Fix For: aws-connector-4.4.0 > > > As part of the work in FLINK-31922, AWS SDK needs to be updated to use > RetryStrategy constructs instead of deprecated RetryPolicy classes. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35799] Add CompiledPlan annotations to BatchExecCalc [flink]
jnh5y commented on code in PR #25064: URL: https://github.com/apache/flink/pull/25064#discussion_r1674211997 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/CalcCompiledBatchTest.java: ## @@ -19,16 +19,16 @@ package org.apache.flink.table.planner.plan.nodes.exec.batch; import org.apache.flink.table.planner.plan.nodes.exec.common.CalcTestPrograms; -import org.apache.flink.table.planner.plan.nodes.exec.testutils.BatchCompiledPlanTestBase; +import org.apache.flink.table.planner.plan.nodes.exec.testutils.CompiledBatchTestBase; import org.apache.flink.table.test.program.TableTestProgram; import java.util.Arrays; import java.util.List; /** Restore tests for {@link BatchExecCalc}. */ -public class CalcBatchCompiledPlanTest extends BatchCompiledPlanTestBase { +public class CalcCompiledBatchTest extends CompiledBatchTestBase { Review Comment: Ok, I think I got the names updated as suggested. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35819) Upgrade to AWS SDK 2.26.19
[ https://issues.apache.org/jira/browse/FLINK-35819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865126#comment-17865126 ] Burak Ozakinci commented on FLINK-35819: [~Hong Teoh] Please assign this to me, thank you [|https://issues.apache.org/jira/secure/AddComment!default.jspa?id=13533827] > Upgrade to AWS SDK 2.26.19 > -- > > Key: FLINK-35819 > URL: https://issues.apache.org/jira/browse/FLINK-35819 > Project: Flink > Issue Type: Sub-task > Components: Connectors / AWS >Affects Versions: aws-connector-4.4.0 >Reporter: Burak Ozakinci >Priority: Blocker > Fix For: aws-connector-4.4.0 > > > As part of the work in FLINK-31922, AWS SDK needs to be updated to use > RetryStrategy constructs instead of deprecated RetryPolicy classes. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
hlteoh37 commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674205703 ## flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/pom.xml: ## @@ -0,0 +1,105 @@ + + + +http://maven.apache.org/POM/4.0.0"; + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> + + +flink-connector-aws-e2e-tests-parent +org.apache.flink +4.4-SNAPSHOT + + +4.0.0 + +flink-connector-aws-sqs-e2e-tests +Flink : Connectors : AWS : E2E Tests : Amazon SQS +jar + + + +org.apache.flink +flink-streaming-java +${flink.version} +test + + + +org.apache.flink +flink-connector-sqs +${project.version} +test + + + +org.apache.flink +flink-connector-aws-base +${project.version} +test +test-jar + + + +org.apache.flink +flink-connector-sqs +${project.version} +test +test-jar + + + + +com.google.guava +guava +test + + + +com.fasterxml.jackson.core +jackson-databind +test + + + +com.fasterxml.jackson.datatype +jackson-datatype-jsr310 +test + + + + + + + +org.apache.maven.plugins +maven-dependency-plugin + + +copy +pre-integration-test + +copy + + + + + + Review Comment: This should be removed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35285) Autoscaler key group optimization can interfere with scale-down.max-factor
[ https://issues.apache.org/jira/browse/FLINK-35285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865121#comment-17865121 ] Trystan commented on FLINK-35285: - Any suggestions on this? We are consistently finding a 50%-60% scale down max factor to be too aggressive, leading to flapping. But we can't set a safer 20-30% factor because of this since it never scales at all. > Autoscaler key group optimization can interfere with scale-down.max-factor > -- > > Key: FLINK-35285 > URL: https://issues.apache.org/jira/browse/FLINK-35285 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Reporter: Trystan >Priority: Minor > > When setting a less aggressive scale down limit, the key group optimization > can prevent a vertex from scaling down at all. It will hunt from target > upwards to maxParallelism/2, and will always find currentParallelism again. > > A simple test trying to scale down from a parallelism of 60 with a > scale-down.max-factor of 0.2: > {code:java} > assertEquals(48, JobVertexScaler.scale(60, inputShipStrategies, 360, .8, 8, > 360)); {code} > > It seems reasonable to make a good attempt to spread data across subtasks, > but not at the expense of total deadlock. The problem is that during scale > down it doesn't actually ensure that newParallelism will be < > currentParallelism. The only workaround is to set a scale down factor large > enough such that it finds the next lowest divisor of the maxParallelism. > > Clunky, but something to ensure it can make at least some progress. 
There is > another test that now fails, but just to illustrate the point: > {code:java} > for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) > { > if ((scaleFactor < 1 && p < currentParallelism) || (scaleFactor > 1 && p > > currentParallelism)) { > if (maxParallelism % p == 0) { > return p; > } > } > } {code} > > Perhaps this is by design and not a bug, but total failure to scale down in > order to keep optimized key groups does not seem ideal. > > Key group optimization block: > [https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L296C1-L303C10] -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-32324][Connectors/AWS] Implement support for watermark alignment in FLIP-27 sources for KDS and DDB streams [flink-connector-aws]
z3d1k commented on code in PR #148: URL: https://github.com/apache/flink-connector-aws/pull/148#discussion_r1674196316 ## flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java: ## @@ -228,6 +231,97 @@ void testWakeUpIsNoOp() { assertThatNoException().isThrownBy(splitReader::wakeUp); } +@Test +void testPauseOrResumeSplits() throws Exception { +testStreamProxy.addShards(TEST_SHARD_ID); +KinesisShardSplit testSplit = getTestSplit(TEST_SHARD_ID); + +List expectedRecords = +Stream.of(getTestRecord("data-1"), getTestRecord("data-2")) +.collect(Collectors.toList()); +testStreamProxy.addRecords( +TestUtil.STREAM_ARN, +TEST_SHARD_ID, +Collections.singletonList(expectedRecords.get(0))); +testStreamProxy.addRecords( +TestUtil.STREAM_ARN, +TEST_SHARD_ID, +Collections.singletonList(expectedRecords.get(1))); +splitReader.handleSplitsChanges(new SplitsAddition<>(Collections.singletonList(testSplit))); + +// read data from split +RecordsWithSplitIds records = splitReader.fetch(); + assertThat(readAllRecords(records)).containsExactlyInAnyOrder(expectedRecords.get(0)); + +// pause split +splitReader.pauseOrResumeSplits( +Collections.singletonList(testSplit), Collections.emptyList()); +records = splitReader.fetch(); +// returns incomplete split with no records +assertThat(records.finishedSplits()).isEmpty(); +assertThat(records.nextSplit()).isNull(); +assertThat(records.nextRecordFromSplit()).isNull(); + +// resume split +splitReader.pauseOrResumeSplits( +Collections.emptyList(), Collections.singletonList(testSplit)); +records = splitReader.fetch(); + assertThat(readAllRecords(records)).containsExactlyInAnyOrder(expectedRecords.get(1)); +} + +@Test +void testPauseOrResumeSplitsOnlyPauseReadsFromSpecifiedSplits() throws Exception { +KinesisShardSplit testSplit1 = getTestSplit(generateShardId(1)); +KinesisShardSplit testSplit2 = getTestSplit(generateShardId(2)); + 
+shardMetricGroupMap.put( +testSplit1.splitId(), +new KinesisShardMetrics(testSplit1, metricListener.getMetricGroup())); +shardMetricGroupMap.put( +testSplit2.splitId(), +new KinesisShardMetrics(testSplit2, metricListener.getMetricGroup())); + +testStreamProxy.addShards(testSplit1.splitId(), testSplit2.splitId()); + +List recordsFromSplit1 = +Arrays.asList(getTestRecord("split-1-data-1"), getTestRecord("split-1-data-2")); +List recordsFromSplit2 = +Arrays.asList( +getTestRecord("split-2-data-1"), +getTestRecord("split-2-data-2"), +getTestRecord("split-2-data-3")); + +recordsFromSplit1.forEach( +record -> +testStreamProxy.addRecords( +STREAM_ARN, +testSplit1.getShardId(), +Collections.singletonList(record))); +recordsFromSplit2.forEach( +record -> +testStreamProxy.addRecords( +STREAM_ARN, +testSplit2.getShardId(), +Collections.singletonList(record))); + +splitReader.handleSplitsChanges( +new SplitsAddition<>(Arrays.asList(testSplit1, testSplit2))); + +// pause split 1 +splitReader.pauseOrResumeSplits( +Collections.singletonList(testSplit1), Collections.emptyList()); + +// read data from splits +List fetchedRecords = new ArrayList<>(); +for (int i = 0; i < 10; i++) { +RecordsWithSplitIds records = splitReader.fetch(); +fetchedRecords.addAll(readAllRecords(records)); +} + +// verify that only records from split 2 were fetched by reader + assertThat(fetchedRecords).containsExactly(recordsFromSplit2.toArray(new Record[0])); Review Comment: Updated test case to use 3 splits, pause 2 and then resume 1. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
hlteoh37 commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674182875 ## flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/src/test/java/org/apache/flink/connector/sqs/sink/test/SqsSinkITTest.java: ## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.sqs.sink.test; + +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils; +import org.apache.flink.connector.aws.testutils.LocalstackContainer; +import org.apache.flink.connector.sqs.sink.SqsSink; +import org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils; +import org.apache.flink.connector.testframe.container.FlinkContainers; +import org.apache.flink.connector.testframe.container.TestcontainersSettings; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.DockerImageVersions; +import org.apache.flink.util.TestLogger; + +import org.assertj.core.api.Assertions; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Network; +import org.testcontainers.utility.DockerImageName; +import software.amazon.awssdk.core.SdkSystemSetting; +import software.amazon.awssdk.http.SdkHttpClient; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.Message; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; + +import java.util.ArrayList; +import java.util.Base64; +import java.util.List; + +import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createConfig; +import static org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils.createSqsClient; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** End to End test for SQS sink API. 
*/ +public class SqsSinkITTest extends TestLogger { + +private static final Logger LOG = LoggerFactory.getLogger(SqsSinkITTest.class); + +private static final int NUMBER_OF_ELEMENTS = 50; +private StreamExecutionEnvironment env; +private SdkHttpClient httpClient; +private SqsClient sqsClient; +private static final Network network = Network.newNetwork(); + +@ClassRule +public static LocalstackContainer mockSqsContainer = +new LocalstackContainer(DockerImageName.parse(DockerImageVersions.LOCALSTACK)) +.withNetwork(network) +.withNetworkAliases("localstack"); + +public static final TestcontainersSettings TESTCONTAINERS_SETTINGS = +TestcontainersSettings.builder() +.environmentVariable("AWS_CBOR_DISABLE", "1") +.environmentVariable( +"FLINK_ENV_JAVA_OPTS", + "-Dorg.apache.flink.sqs.shaded.com.amazonaws.sdk.disableCertChecking -Daws.cborEnabled=false") +.network(network) +.logger(LOG) +.dependsOn(mockSqsContainer) +.build(); + +public static final FlinkContainers FLINK = + FlinkContainers.builder().withTestcontainersSettings(TESTCONTAINERS_SETTINGS).build(); + +@Before +public void setup() throws Exception { +System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false"); Review Comment: Do we need this for SQS? I'm aware we need it for KDS, but just checking -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-35754) SqlGatewayE2ECase.testMaterializedTableInFullMode failed due to Internal Server Error
[ https://issues.apache.org/jira/browse/FLINK-35754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865115#comment-17865115 ] Piotr Nowojski edited comment on FLINK-35754 at 7/11/24 3:08 PM: - [~hackergin], can you elaborate what was the problem here? Is this the same problem as: https://issues.apache.org/jira/browse/FLINK-35821 ? was (Author: pnowojski): [~hackergin], can you elaborate what's the problem? Is this the same problem as: https://issues.apache.org/jira/browse/FLINK-35821 ? > SqlGatewayE2ECase.testMaterializedTableInFullMode failed due to Internal > Server Error > - > > Key: FLINK-35754 > URL: https://issues.apache.org/jira/browse/FLINK-35754 > Project: Flink > Issue Type: Bug >Affects Versions: 1.20.0 >Reporter: Weijie Guo >Priority: Major > Labels: pull-request-available > Attachments: image-2024-07-05-11-15-52-731.png > > > {code:java} > Jul 03 03:14:31 java.lang.RuntimeException: Execute statement failed > Jul 03 03:14:31 at > org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.executeStatementWithResult(FlinkDistribution.java:592) > Jul 03 03:14:31 at > org.apache.flink.table.gateway.SqlGatewayE2ECase.lambda$testMaterializedTableInFullMode$3(SqlGatewayE2ECase.java:353) > Jul 03 03:14:31 at > org.apache.flink.core.testutils.CommonTestUtils.waitUtil(CommonTestUtils.java:210) > Jul 03 03:14:31 at > org.apache.flink.table.gateway.SqlGatewayE2ECase.testMaterializedTableInFullMode(SqlGatewayE2ECase.java:350) > Jul 03 03:14:31 at > java.base/java.lang.reflect.Method.invoke(Method.java:568) > Jul 03 03:14:31 at > org.apache.flink.util.ExternalResource$1.evaluate(ExternalResource.java:48) > Jul 03 03:14:31 at > org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) > Jul 03 03:14:31 at > org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:29) > Jul 03 03:14:31 Caused by: java.lang.RuntimeException: The rest 
request is > not successful: Internal Server Error > Jul 03 03:14:31 at > org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.sendRequest(FlinkDistribution.java:601) > Jul 03 03:14:31 at > org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.getOperationResult(FlinkDistribution.java:570) > Jul 03 03:14:31 at > org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.executeStatementWithResult(FlinkDistribution.java:590) > Jul 03 03:14:31 ... 7 more > Jul 03 03:14:31 > {code} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60620&view=logs&j=e8e46ef5-75cc-564f-c2bd-1797c35cbebe&t=60c49903-2505-5c25-7e46-de91b1737bea&l=14602 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35754) SqlGatewayE2ECase.testMaterializedTableInFullMode failed due to Internal Server Error
[ https://issues.apache.org/jira/browse/FLINK-35754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865115#comment-17865115 ] Piotr Nowojski commented on FLINK-35754: [~hackergin], can you elaborate what's the problem? Is this the same problem as: https://issues.apache.org/jira/browse/FLINK-35821 ? > SqlGatewayE2ECase.testMaterializedTableInFullMode failed due to Internal > Server Error > - > > Key: FLINK-35754 > URL: https://issues.apache.org/jira/browse/FLINK-35754 > Project: Flink > Issue Type: Bug >Affects Versions: 1.20.0 >Reporter: Weijie Guo >Priority: Major > Labels: pull-request-available > Attachments: image-2024-07-05-11-15-52-731.png > > > {code:java} > Jul 03 03:14:31 java.lang.RuntimeException: Execute statement failed > Jul 03 03:14:31 at > org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.executeStatementWithResult(FlinkDistribution.java:592) > Jul 03 03:14:31 at > org.apache.flink.table.gateway.SqlGatewayE2ECase.lambda$testMaterializedTableInFullMode$3(SqlGatewayE2ECase.java:353) > Jul 03 03:14:31 at > org.apache.flink.core.testutils.CommonTestUtils.waitUtil(CommonTestUtils.java:210) > Jul 03 03:14:31 at > org.apache.flink.table.gateway.SqlGatewayE2ECase.testMaterializedTableInFullMode(SqlGatewayE2ECase.java:350) > Jul 03 03:14:31 at > java.base/java.lang.reflect.Method.invoke(Method.java:568) > Jul 03 03:14:31 at > org.apache.flink.util.ExternalResource$1.evaluate(ExternalResource.java:48) > Jul 03 03:14:31 at > org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) > Jul 03 03:14:31 at > org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:29) > Jul 03 03:14:31 Caused by: java.lang.RuntimeException: The rest request is > not successful: Internal Server Error > Jul 03 03:14:31 at > org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.sendRequest(FlinkDistribution.java:601) > Jul 
03 03:14:31 at > org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.getOperationResult(FlinkDistribution.java:570) > Jul 03 03:14:31 at > org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.executeStatementWithResult(FlinkDistribution.java:590) > Jul 03 03:14:31 ... 7 more > Jul 03 03:14:31 > {code} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60620&view=logs&j=e8e46ef5-75cc-564f-c2bd-1797c35cbebe&t=60c49903-2505-5c25-7e46-de91b1737bea&l=14602 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [hotfix][table] Apply uppercase for URL_ENCODE/URL_DECODE remove logging from function class [flink]
snuyanzin commented on code in PR #25076: URL: https://github.com/apache/flink/pull/25076#discussion_r1674178015 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java: ## @@ -433,7 +433,7 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) public static final BuiltInFunctionDefinition URL_DECODE = BuiltInFunctionDefinition.newBuilder() -.name("url_decode") +.name("URL_DECODE") .kind(SCALAR) .inputTypeStrategy(sequence(logical(LogicalTypeFamily.CHARACTER_STRING))) .outputTypeStrategy(nullableIfArgs(explicit(DataTypes.STRING().nullable( Review Comment: @lincoln-lil thanks for the feedback — now I see what you mean. Yep, it makes sense, thanks for the clarification. I updated the PR to address this; could you please approve/reject? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
hlteoh37 commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674177861 ## flink-connector-aws/flink-connector-sqs/pom.xml: ## @@ -0,0 +1,130 @@ + + +http://maven.apache.org/POM/4.0.0"; + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd";> + +4.0.0 + + +org.apache.flink +flink-connector-aws-parent +4.4-SNAPSHOT + + +flink-connector-sqs +Flink : Connectors : AWS : Amazon SQS +jar + + + +org.apache.flink +flink-streaming-java +${flink.version} +provided + + + +org.apache.flink +flink-connector-aws-base +${project.version} + + + +software.amazon.awssdk +sqs + + + +software.amazon.awssdk +netty-nio-client + + + + +org.apache.flink +flink-test-utils +${flink.version} +test + + +org.apache.flink +flink-connector-test-utils +${flink.version} +test + + + +org.apache.flink +flink-connector-aws-base +${project.version} +test-jar +test + + + +org.apache.flink +flink-connector-base +${flink.version} +test-jar +test + + + +org.testcontainers +testcontainers +test + + + +com.fasterxml.jackson.core +jackson-core + + + +com.fasterxml.jackson.datatype +jackson-datatype-jsr310 + + + Review Comment: nit `ArchUnit` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35821) ResumeCheckpointManuallyITCase failed with File X does not exist or the user running Flink C has insufficient permissions to access it
[ https://issues.apache.org/jira/browse/FLINK-35821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-35821: --- Description: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60857&view=logs&j=5c8e7682-d68f-54d1-16a2-a09310218a49&t=86f654fa-ab48-5c1a-25f4-7e7f6afb9bba {noformat} 2024-07-11T13:49:46.4137693Z Jul 11 13:49:46 13:49:46.412 [ERROR] Tests run: 48, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 309.7 s <<< FAILURE! -- in org.apache.flink.test.checkpointing.ResumeCheckpointManuallyITCase 2024-07-11T13:49:46.4139710Z Jul 11 13:49:46 13:49:46.412 [ERROR] org.apache.flink.test.checkpointing.ResumeCheckpointManuallyITCase.testExternalizedIncrementalRocksDBCheckpointsWithLocalRecoveryZookeeper[RestoreMode = CLAIM] -- Time elapsed: 2.722 s <<< ERROR! 2024-07-11T13:49:46.4140928Z Jul 11 13:49:46 org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy 2024-07-11T13:49:46.4142766Z Jul 11 13:49:46at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:219) 2024-07-11T13:49:46.4144185Z Jul 11 13:49:46at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailureAndReport(ExecutionFailureHandler.java:166) 2024-07-11T13:49:46.4145249Z Jul 11 13:49:46at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:121) 2024-07-11T13:49:46.4146510Z Jul 11 13:49:46at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:281) 2024-07-11T13:49:46.4147599Z Jul 11 13:49:46at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:272) 2024-07-11T13:49:46.4148975Z Jul 11 13:49:46at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:265) 2024-07-11T13:49:46.4150467Z Jul 11 13:49:46at 
org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:800) 2024-07-11T13:49:46.4151977Z Jul 11 13:49:46at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:777) 2024-07-11T13:49:46.4153308Z Jul 11 13:49:46at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83) 2024-07-11T13:49:46.4154713Z Jul 11 13:49:46at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:515) 2024-07-11T13:49:46.4155416Z Jul 11 13:49:46at sun.reflect.GeneratedMethodAccessor29.invoke(Unknown Source) 2024-07-11T13:49:46.4156342Z Jul 11 13:49:46at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 2024-07-11T13:49:46.4157291Z Jul 11 13:49:46at java.lang.reflect.Method.invoke(Method.java:498) 2024-07-11T13:49:46.4158065Z Jul 11 13:49:46at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:318) 2024-07-11T13:49:46.4159387Z Jul 11 13:49:46at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) 2024-07-11T13:49:46.4160469Z Jul 11 13:49:46at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:316) 2024-07-11T13:49:46.4161819Z Jul 11 13:49:46at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:229) 2024-07-11T13:49:46.4163253Z Jul 11 13:49:46at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:88) 2024-07-11T13:49:46.4164717Z Jul 11 13:49:46at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:174) 2024-07-11T13:49:46.4165948Z Jul 11 13:49:46at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) 2024-07-11T13:49:46.4167080Z Jul 11 13:49:46at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) 2024-07-11T13:49:46.4168228Z Jul 11 
13:49:46at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) 2024-07-11T13:49:46.4169380Z Jul 11 13:49:46at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) 2024-07-11T13:49:46.4170327Z Jul 11 13:49:46at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) 2024-07-11T13:49:46.4171192Z Jul 11 13:49:46at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) 2024-07-11T13:49:46.4171814Z Jul 11 13:49:46at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) 2024-07-11T13:49:46.4172433Z Jul 11 13:49:46at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) 2024-07-11T13:49:46.4173029Z Jul 11 13:49:46at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) 2024-07-11T13:49:46.4173622Z Jul 11 13:49:46at org.apache.pekko.act
[jira] [Updated] (FLINK-35821) ResumeCheckpointManuallyITCase failed with File X does not exist or the user running Flink C has insufficient permissions to access it
[ https://issues.apache.org/jira/browse/FLINK-35821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-35821: --- Description: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60857&view=logs&j=5c8e7682-d68f-54d1-16a2-a09310218a49&t=86f654fa-ab48-5c1a-25f4-7e7f6afb9bba primary failure: {noformat} Caused by: java.io.FileNotFoundException: File file:/tmp/junit5368809541399217009/junit9107863722486384012/5a045e6c0cd0297faf5a2bf6fff27465/shared/job_5a045e6c0cd0297faf5a2bf6fff27465_op_90bea66de1c231edf33913ecd54406c1_1_2/0effb888-aa59-4bc4-b3e6-02622c831863 does not exist or the user running Flink ('agent01_azpcontainer') has insufficient permissions to access it. {noformat} Full stack trace {noformat} 2024-07-11T13:49:46.4137693Z Jul 11 13:49:46 13:49:46.412 [ERROR] Tests run: 48, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 309.7 s <<< FAILURE! -- in org.apache.flink.test.checkpointing.ResumeCheckpointManuallyITCase 2024-07-11T13:49:46.4139710Z Jul 11 13:49:46 13:49:46.412 [ERROR] org.apache.flink.test.checkpointing.ResumeCheckpointManuallyITCase.testExternalizedIncrementalRocksDBCheckpointsWithLocalRecoveryZookeeper[RestoreMode = CLAIM] -- Time elapsed: 2.722 s <<< ERROR! 
2024-07-11T13:49:46.4140928Z Jul 11 13:49:46 org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy 2024-07-11T13:49:46.4142766Z Jul 11 13:49:46at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:219) 2024-07-11T13:49:46.4144185Z Jul 11 13:49:46at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailureAndReport(ExecutionFailureHandler.java:166) 2024-07-11T13:49:46.4145249Z Jul 11 13:49:46at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:121) 2024-07-11T13:49:46.4146510Z Jul 11 13:49:46at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:281) 2024-07-11T13:49:46.4147599Z Jul 11 13:49:46at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:272) 2024-07-11T13:49:46.4148975Z Jul 11 13:49:46at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:265) 2024-07-11T13:49:46.4150467Z Jul 11 13:49:46at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:800) 2024-07-11T13:49:46.4151977Z Jul 11 13:49:46at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:777) 2024-07-11T13:49:46.4153308Z Jul 11 13:49:46at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83) 2024-07-11T13:49:46.4154713Z Jul 11 13:49:46at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:515) 2024-07-11T13:49:46.4155416Z Jul 11 13:49:46at sun.reflect.GeneratedMethodAccessor29.invoke(Unknown Source) 2024-07-11T13:49:46.4156342Z Jul 11 13:49:46at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 2024-07-11T13:49:46.4157291Z Jul 11 13:49:46at java.lang.reflect.Method.invoke(Method.java:498) 
2024-07-11T13:49:46.4158065Z Jul 11 13:49:46at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:318) 2024-07-11T13:49:46.4159387Z Jul 11 13:49:46at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) 2024-07-11T13:49:46.4160469Z Jul 11 13:49:46at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:316) 2024-07-11T13:49:46.4161819Z Jul 11 13:49:46at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:229) 2024-07-11T13:49:46.4163253Z Jul 11 13:49:46at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:88) 2024-07-11T13:49:46.4164717Z Jul 11 13:49:46at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:174) 2024-07-11T13:49:46.4165948Z Jul 11 13:49:46at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) 2024-07-11T13:49:46.4167080Z Jul 11 13:49:46at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) 2024-07-11T13:49:46.4168228Z Jul 11 13:49:46at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) 2024-07-11T13:49:46.4169380Z Jul 11 13:49:46at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) 2024-07-11T13:49:46.4170327Z Jul 11 13:49:46at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) 2024-07-11T13:49:46.4171192Z Jul 11 13:49:46at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scal
[jira] [Created] (FLINK-35821) ResumeCheckpointManuallyITCase failed with File X does not exist or the user running Flink C has insufficient permissions to access it
Piotr Nowojski created FLINK-35821: -- Summary: ResumeCheckpointManuallyITCase failed with File X does not exist or the user running Flink C has insufficient permissions to access it Key: FLINK-35821 URL: https://issues.apache.org/jira/browse/FLINK-35821 Project: Flink Issue Type: Bug Components: Test Infrastructure Affects Versions: 2.0.0, 1.20.0 Reporter: Piotr Nowojski https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60857&view=logs&j=5c8e7682-d68f-54d1-16a2-a09310218a49&t=86f654fa-ab48-5c1a-25f4-7e7f6afb9bba {noformat} Jul 11 13:49:46 13:49:46.412 [ERROR] Tests run: 48, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 309.7 s <<< FAILURE! -- in org.apache.flink.test.checkpointing.ResumeCheckpointManuallyITCase Jul 11 13:49:46 13:49:46.412 [ERROR] org.apache.flink.test.checkpointing.ResumeCheckpointManuallyITCase.testExternalizedIncrementalRocksDBCheckpointsWithLocalRecoveryZookeeper[RestoreMode = CLAIM] -- Time elapsed: 2.722 s <<< ERROR! Jul 11 13:49:46 org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy Jul 11 13:49:46 at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:219) Jul 11 13:49:46 at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailureAndReport(ExecutionFailureHandler.java:166) Jul 11 13:49:46 at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:121) Jul 11 13:49:46 at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:281) Jul 11 13:49:46 at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:272) Jul 11 13:49:46 at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:265) Jul 11 13:49:46 at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:800) Jul 11 
13:49:46 at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:777) Jul 11 13:49:46 at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83) Jul 11 13:49:46 at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:515) Jul 11 13:49:46 at sun.reflect.GeneratedMethodAccessor29.invoke(Unknown Source) Jul 11 13:49:46 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) Jul 11 13:49:46 at java.lang.reflect.Method.invoke(Method.java:498) Jul 11 13:49:46 at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:318) Jul 11 13:49:46 at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) Jul 11 13:49:46 at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:316) Jul 11 13:49:46 at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:229) Jul 11 13:49:46 at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:88) Jul 11 13:49:46 at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:174) Jul 11 13:49:46 at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) Jul 11 13:49:46 at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) Jul 11 13:49:46 at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) Jul 11 13:49:46 at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) Jul 11 13:49:46 at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) Jul 11 13:49:46 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) Jul 11 13:49:46 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) Jul 11 13:49:46 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) Jul 11 
13:49:46 at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) Jul 11 13:49:46 at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) Jul 11 13:49:46 at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) Jul 11 13:49:46 at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) Jul 11 13:49:46 at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) Jul 11 13:49:46 at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) {noformat} -- This message was sent by Atlassian Jira (v8.20.1
Re: [PR] [FLINK-35799] Add CompiledPlan annotations to BatchExecCalc [flink]
twalthr commented on code in PR #25064: URL: https://github.com/apache/flink/pull/25064#discussion_r1674171526 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/CalcCompiledBatchTest.java: ## @@ -19,16 +19,16 @@ package org.apache.flink.table.planner.plan.nodes.exec.batch; import org.apache.flink.table.planner.plan.nodes.exec.common.CalcTestPrograms; -import org.apache.flink.table.planner.plan.nodes.exec.testutils.BatchCompiledPlanTestBase; +import org.apache.flink.table.planner.plan.nodes.exec.testutils.CompiledBatchTestBase; import org.apache.flink.table.test.program.TableTestProgram; import java.util.Arrays; import java.util.List; /** Restore tests for {@link BatchExecCalc}. */ -public class CalcBatchCompiledPlanTest extends BatchCompiledPlanTestBase { +public class CalcCompiledBatchTest extends CompiledBatchTestBase { Review Comment: Similar architecture than the streaming ones. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35799] Add CompiledPlan annotations to BatchExecCalc [flink]
twalthr commented on code in PR #25064: URL: https://github.com/apache/flink/pull/25064#discussion_r1674170591 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/CalcCompiledBatchTest.java: ## @@ -19,16 +19,16 @@ package org.apache.flink.table.planner.plan.nodes.exec.batch; import org.apache.flink.table.planner.plan.nodes.exec.common.CalcTestPrograms; -import org.apache.flink.table.planner.plan.nodes.exec.testutils.BatchCompiledPlanTestBase; +import org.apache.flink.table.planner.plan.nodes.exec.testutils.CompiledBatchTestBase; import org.apache.flink.table.test.program.TableTestProgram; import java.util.Arrays; import java.util.List; /** Restore tests for {@link BatchExecCalc}. */ -public class CalcBatchCompiledPlanTest extends BatchCompiledPlanTestBase { +public class CalcCompiledBatchTest extends CompiledBatchTestBase { Review Comment: Sorry for the confusion. I thought we agreed on `BatchRestoreTestBase` and thus `CalcBatchRestoreTest`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32324][Connectors/AWS] Implement support for watermark alignment in FLIP-27 sources for KDS and DDB streams [flink-connector-aws]
hlteoh37 commented on code in PR #148: URL: https://github.com/apache/flink-connector-aws/pull/148#discussion_r1674154510 ## flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java: ## @@ -228,6 +231,97 @@ void testWakeUpIsNoOp() { assertThatNoException().isThrownBy(splitReader::wakeUp); } +@Test +void testPauseOrResumeSplits() throws Exception { +testStreamProxy.addShards(TEST_SHARD_ID); +KinesisShardSplit testSplit = getTestSplit(TEST_SHARD_ID); + +List expectedRecords = +Stream.of(getTestRecord("data-1"), getTestRecord("data-2")) +.collect(Collectors.toList()); +testStreamProxy.addRecords( +TestUtil.STREAM_ARN, +TEST_SHARD_ID, +Collections.singletonList(expectedRecords.get(0))); +testStreamProxy.addRecords( +TestUtil.STREAM_ARN, +TEST_SHARD_ID, +Collections.singletonList(expectedRecords.get(1))); +splitReader.handleSplitsChanges(new SplitsAddition<>(Collections.singletonList(testSplit))); + +// read data from split +RecordsWithSplitIds records = splitReader.fetch(); + assertThat(readAllRecords(records)).containsExactlyInAnyOrder(expectedRecords.get(0)); + +// pause split +splitReader.pauseOrResumeSplits( +Collections.singletonList(testSplit), Collections.emptyList()); +records = splitReader.fetch(); +// returns incomplete split with no records +assertThat(records.finishedSplits()).isEmpty(); +assertThat(records.nextSplit()).isNull(); +assertThat(records.nextRecordFromSplit()).isNull(); + +// resume split +splitReader.pauseOrResumeSplits( +Collections.emptyList(), Collections.singletonList(testSplit)); +records = splitReader.fetch(); + assertThat(readAllRecords(records)).containsExactlyInAnyOrder(expectedRecords.get(1)); +} + +@Test +void testPauseOrResumeSplitsOnlyPauseReadsFromSpecifiedSplits() throws Exception { +KinesisShardSplit testSplit1 = getTestSplit(generateShardId(1)); +KinesisShardSplit testSplit2 = getTestSplit(generateShardId(2)); + 
+shardMetricGroupMap.put( +testSplit1.splitId(), +new KinesisShardMetrics(testSplit1, metricListener.getMetricGroup())); +shardMetricGroupMap.put( +testSplit2.splitId(), +new KinesisShardMetrics(testSplit2, metricListener.getMetricGroup())); + +testStreamProxy.addShards(testSplit1.splitId(), testSplit2.splitId()); + +List recordsFromSplit1 = +Arrays.asList(getTestRecord("split-1-data-1"), getTestRecord("split-1-data-2")); +List recordsFromSplit2 = +Arrays.asList( +getTestRecord("split-2-data-1"), +getTestRecord("split-2-data-2"), +getTestRecord("split-2-data-3")); + +recordsFromSplit1.forEach( +record -> +testStreamProxy.addRecords( +STREAM_ARN, +testSplit1.getShardId(), +Collections.singletonList(record))); +recordsFromSplit2.forEach( +record -> +testStreamProxy.addRecords( +STREAM_ARN, +testSplit2.getShardId(), +Collections.singletonList(record))); + +splitReader.handleSplitsChanges( +new SplitsAddition<>(Arrays.asList(testSplit1, testSplit2))); + +// pause split 1 +splitReader.pauseOrResumeSplits( +Collections.singletonList(testSplit1), Collections.emptyList()); + +// read data from splits +List fetchedRecords = new ArrayList<>(); +for (int i = 0; i < 10; i++) { +RecordsWithSplitIds records = splitReader.fetch(); +fetchedRecords.addAll(readAllRecords(records)); +} + +// verify that only records from split 2 were fetched by reader + assertThat(fetchedRecords).containsExactly(recordsFromSplit2.toArray(new Record[0])); Review Comment: Should we test resuming from the paused split? Or 3x split, 2x pause and only resume 1 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix][table] Apply uppercase for URL_ENCODE/URL_DECODE remove logging from function class [flink]
lincoln-lil commented on code in PR #25076: URL: https://github.com/apache/flink/pull/25076#discussion_r1674153859 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java: ## @@ -433,7 +433,7 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) public static final BuiltInFunctionDefinition URL_DECODE = BuiltInFunctionDefinition.newBuilder() -.name("url_decode") +.name("URL_DECODE") .kind(SCALAR) .inputTypeStrategy(sequence(logical(LogicalTypeFamily.CHARACTER_STRING))) .outputTypeStrategy(nullableIfArgs(explicit(DataTypes.STRING().nullable( Review Comment: @snuyanzin As I saw this pr and realized I missed your message on the previous one(#24773). For the output type strategy, I think it should be 'outputTypeStrategy(explicit(DataTypes.STRING().nullable()))' because the return value may be null even if the input is non-null (e.g., decoding failure), otherwise the downstream operator may trust the non-null property and encounter a runtime exception (e.g., NPE) if skip the null check for field accessing. Does this make sense to you? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35799] Add CompiledPlan annotations to BatchExecCalc [flink]
jnh5y commented on code in PR #25064: URL: https://github.com/apache/flink/pull/25064#discussion_r1674149685 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/CalcCompiledBatchTest.java: ## @@ -19,16 +19,16 @@ package org.apache.flink.table.planner.plan.nodes.exec.batch; import org.apache.flink.table.planner.plan.nodes.exec.common.CalcTestPrograms; -import org.apache.flink.table.planner.plan.nodes.exec.testutils.BatchCompiledPlanTestBase; +import org.apache.flink.table.planner.plan.nodes.exec.testutils.CompiledBatchTestBase; import org.apache.flink.table.test.program.TableTestProgram; import java.util.Arrays; import java.util.List; /** Restore tests for {@link BatchExecCalc}. */ -public class CalcBatchCompiledPlanTest extends BatchCompiledPlanTestBase { +public class CalcCompiledBatchTest extends CompiledBatchTestBase { Review Comment: Are you looking for `CalcCompiledBatchTest` to change to `CalcBatchRestoreTest`? Or are you suggesting that `CompiledBatchTestBase ` should change to `BatchRestoreTestBase `? Lemme know which of those makes sense and I'll make the change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32324][Connectors/AWS] Implement support for watermark alignment in FLIP-27 sources for KDS and DDB streams [flink-connector-aws]
hlteoh37 commented on code in PR #148: URL: https://github.com/apache/flink-connector-aws/pull/148#discussion_r1674144962 ## flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReader.java: ## @@ -127,6 +137,14 @@ public void handleSplitsChanges(SplitsChange splitsChanges) { } } +@Override +public void pauseOrResumeSplits( +Collection splitsToPause, +Collection splitsToResume) { +splitsToPause.forEach(split -> pausedSplitIds.add(split.splitId())); +splitsToResume.forEach(split -> pausedSplitIds.remove(split.splitId())); Review Comment: Do we need to check and log if we resume a split that is not paused / pause a split that doesn't exist? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34108][table] Add built-in URL_ENCODE and URL_DECODE function. [flink]
lincoln-lil commented on code in PR #24773: URL: https://github.com/apache/flink/pull/24773#discussion_r1674142139 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java: ## @@ -431,6 +431,26 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) .runtimeClass("org.apache.flink.table.runtime.functions.scalar.SplitFunction") .build(); +public static final BuiltInFunctionDefinition URL_DECODE = +BuiltInFunctionDefinition.newBuilder() +.name("url_decode") +.kind(SCALAR) + .inputTypeStrategy(sequence(logical(LogicalTypeFamily.CHARACTER_STRING))) + .outputTypeStrategy(nullableIfArgs(explicit(DataTypes.STRING( Review Comment: @snuyanzin Sorry I missed the ping here! IIUC, the `AbstractDataType`#`nullable` doc describes the default behavior: if a datatype doesn't explicitly declare its nullability, then it is nullable by default. For the output type strategy, I think I should point it out more clearly: it should be 'outputTypeStrategy(explicit(DataTypes.STRING().nullable()))' because the return value may be null even if the input is non-null (e.g., decoding failure), WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35799] Add CompiledPlan annotations to BatchExecCalc [flink]
twalthr commented on code in PR #25064: URL: https://github.com/apache/flink/pull/25064#discussion_r1674135779 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/CalcCompiledBatchTest.java: ## @@ -19,16 +19,16 @@ package org.apache.flink.table.planner.plan.nodes.exec.batch; import org.apache.flink.table.planner.plan.nodes.exec.common.CalcTestPrograms; -import org.apache.flink.table.planner.plan.nodes.exec.testutils.BatchCompiledPlanTestBase; +import org.apache.flink.table.planner.plan.nodes.exec.testutils.CompiledBatchTestBase; import org.apache.flink.table.test.program.TableTestProgram; import java.util.Arrays; import java.util.List; /** Restore tests for {@link BatchExecCalc}. */ -public class CalcBatchCompiledPlanTest extends BatchCompiledPlanTestBase { +public class CalcCompiledBatchTest extends CompiledBatchTestBase { Review Comment: I would still call it `BatchRestoreTest` it will make discussions easier and it also performs kind of a restore but from plan only. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34111][table] Add support for json_quote, json_unquote, address PR feedback #24156 [flink]
snuyanzin commented on code in PR #24967: URL: https://github.com/apache/flink/pull/24967#discussion_r1674127275 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/JsonUnquoteFunction.java: ## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.binary.BinaryStringData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext; +import org.apache.flink.table.runtime.functions.SqlJsonUtils; + +import javax.annotation.Nullable; + +/** Implementation of {@link BuiltInFunctionDefinitions#JSON_UNQUOTE}. 
*/ +@Internal +public class JsonUnquoteFunction extends BuiltInScalarFunction { + +public JsonUnquoteFunction(SpecializedContext context) { +super(BuiltInFunctionDefinitions.JSON_UNQUOTE, context); +} + +private static boolean isValidJsonVal(String jsonInString) { +// See also BuiltInMethods.scala, IS_JSON_VALUE +return SqlJsonUtils.isJsonValue(jsonInString); +} + +private String unescapeStr(String inputStr) { +StringBuilder result = new StringBuilder(); +int i = 0; +while (i < inputStr.length()) { +if (inputStr.charAt(i) == '\\' && i + 1 < inputStr.length()) { +i++; // move to the next char +char ch = inputStr.charAt(i++); + +switch (ch) { +case '"': +result.append(ch); +break; +case '\\': +result.append(ch); +break; +case '/': +result.append(ch); +break; +case 'b': +result.append('\b'); +break; +case 'f': +result.append('\f'); +break; +case 'n': +result.append('\n'); +break; +case 'r': +result.append('\r'); +break; +case 't': +result.append('\t'); +break; +case 'u': +result.append(fromUnicodeLiteral(inputStr, i)); +i = i + 4; +break; +default: +throw new RuntimeException("Illegal escape sequence: \\" + ch); Review Comment: Great you spotted this IMHO: since it is already runtime shouldn't it be `FlinkRuntimeException` here? https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/util/FlinkRuntimeException.java which is already used within builtin functions at https://github.com/apache/flink/tree/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34111][table] Add support for json_quote, json_unquote, address PR feedback #24156 [flink]
snuyanzin commented on code in PR #24967: URL: https://github.com/apache/flink/pull/24967#discussion_r1674129926 ## docs/data/sql_functions.yml: ## @@ -377,6 +377,12 @@ string: - sql: SUBSTR(string, integer1[, integer2]) table: STRING.substr(INTEGER1[, INTEGER2]) description: Returns a substring of string starting from position integer1 with length integer2 (to the end by default). + - sql: JSON_QUOTE(string) Review Comment: Thanks for creating the Jira. I would still vote for filling the Chinese doc with the English description, which could then be translated within the Jira issue you've created -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34111][table] Add support for json_quote, json_unquote, address PR feedback #24156 [flink]
snuyanzin commented on code in PR #24967: URL: https://github.com/apache/flink/pull/24967#discussion_r1674127275 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/JsonUnquoteFunction.java: ## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.binary.BinaryStringData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext; +import org.apache.flink.table.runtime.functions.SqlJsonUtils; + +import javax.annotation.Nullable; + +/** Implementation of {@link BuiltInFunctionDefinitions#JSON_UNQUOTE}. 
*/ +@Internal +public class JsonUnquoteFunction extends BuiltInScalarFunction { + +public JsonUnquoteFunction(SpecializedContext context) { +super(BuiltInFunctionDefinitions.JSON_UNQUOTE, context); +} + +private static boolean isValidJsonVal(String jsonInString) { +// See also BuiltInMethods.scala, IS_JSON_VALUE +return SqlJsonUtils.isJsonValue(jsonInString); +} + +private String unescapeStr(String inputStr) { +StringBuilder result = new StringBuilder(); +int i = 0; +while (i < inputStr.length()) { +if (inputStr.charAt(i) == '\\' && i + 1 < inputStr.length()) { +i++; // move to the next char +char ch = inputStr.charAt(i++); + +switch (ch) { +case '"': +result.append(ch); +break; +case '\\': +result.append(ch); +break; +case '/': +result.append(ch); +break; +case 'b': +result.append('\b'); +break; +case 'f': +result.append('\f'); +break; +case 'n': +result.append('\n'); +break; +case 'r': +result.append('\r'); +break; +case 't': +result.append('\t'); +break; +case 'u': +result.append(fromUnicodeLiteral(inputStr, i)); +i = i + 4; +break; +default: +throw new RuntimeException("Illegal escape sequence: \\" + ch); Review Comment: IMHO: since it is already runtime shouldn't it be `FlinkRuntimeException` here? https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/util/FlinkRuntimeException.java which is already used within builtin functions at https://github.com/apache/flink/tree/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35820] Converting Duration to String fails for big values [flink]
twalthr commented on code in PR #25077: URL: https://github.com/apache/flink/pull/25077#discussion_r1674118355 ## flink-core/src/main/java/org/apache/flink/util/TimeUtils.java: ## @@ -39,6 +41,10 @@ public class TimeUtils { private static final Map LABEL_TO_UNIT_MAP = Collections.unmodifiableMap(initMap()); +private static final BigInteger NANOS_PER_SECOND = BigInteger.valueOf(1000_000_000L); Review Comment: ```suggestion private static final BigInteger NANOS_PER_SECOND = BigInteger.valueOf(1_000_000_000L); ``` ## flink-core/src/main/java/org/apache/flink/util/TimeUtils.java: ## @@ -79,30 +85,45 @@ public static Duration parseDuration(String text) { throw new NumberFormatException("text does not start with a number"); } -final long value; +final BigInteger value; try { -value = Long.parseLong(number); // this throws a NumberFormatException on overflow +value = new BigInteger(number); // this throws a NumberFormatException } catch (NumberFormatException e) { throw new IllegalArgumentException( -"The value '" -+ number -+ "' cannot be re represented as 64bit number (numeric overflow)."); +"The value '" + number + "' cannot be re represented as an integer number.", e); Review Comment: ```suggestion "The value '" + number + "' cannot be represented as an integer number.", e); ``` ## flink-core/src/main/java/org/apache/flink/util/TimeUtils.java: ## @@ -79,30 +85,45 @@ public static Duration parseDuration(String text) { throw new NumberFormatException("text does not start with a number"); } -final long value; +final BigInteger value; try { -value = Long.parseLong(number); // this throws a NumberFormatException on overflow +value = new BigInteger(number); // this throws a NumberFormatException } catch (NumberFormatException e) { throw new IllegalArgumentException( -"The value '" -+ number -+ "' cannot be re represented as 64bit number (numeric overflow)."); +"The value '" + number + "' cannot be re represented as an integer number.", e); } +final ChronoUnit unit; if 
(unitLabel.isEmpty()) { -return Duration.of(value, ChronoUnit.MILLIS); -} - -ChronoUnit unit = LABEL_TO_UNIT_MAP.get(unitLabel); -if (unit != null) { -return Duration.of(value, unit); +unit = ChronoUnit.MILLIS; } else { +unit = LABEL_TO_UNIT_MAP.get(unitLabel); +} +if (unit == null) { throw new IllegalArgumentException( "Time interval unit label '" + unitLabel + "' does not match any of the recognized units: " + TimeUnit.getAllUnits()); } + +try { +return convertBigIntToDuration(value, unit); +} catch (ArithmeticException e) { +throw new IllegalArgumentException( +"The value '" ++ number ++ "' cannot be re represented as java.time.Duration (numeric overflow).", Review Comment: ```suggestion + "' cannot be represented as java.time.Duration (numeric overflow).", ``` ## flink-core/src/main/java/org/apache/flink/util/TimeUtils.java: ## @@ -136,17 +157,35 @@ public static String getStringInMillis(final Duration duration) { * NOTE: It supports only durations that fit into long. */ public static String formatWithHighestUnit(Duration duration) { -long nanos = duration.toNanos(); +BigInteger nanos = toNanos(duration); TimeUnit highestIntegerUnit = getHighestIntegerUnit(nanos); return String.format( -"%d %s", -nanos / highestIntegerUnit.unit.getDuration().toNanos(), +"%s %s", +nanos.divide(highestIntegerUnit.getUnitAsNanos()), highestIntegerUnit.getLabels().get(0)); } -private static TimeUnit getHighestIntegerUnit(long nanos) { -if (nanos == 0) { +/** + * Converted from {@link Duration#toNanos()}, but produces {@link BigDecimal} and does not throw Review Comment: ```suggestion * Converted from {@link Duration#toNanos()}, but produces {@link BigInteger} and does not throw ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscr
Re: [PR] [FLINK-35820] Converting Duration to String fails for big values [flink]
flinkbot commented on PR #25077: URL: https://github.com/apache/flink/pull/25077#issuecomment-2223086708 ## CI report: * ff92364b08368175671a0813e8873f46cb602c2c UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34111][table] Add support for json_quote, json_unquote, address PR feedback #24156 [flink]
fhueske commented on code in PR #24967: URL: https://github.com/apache/flink/pull/24967#discussion_r1674108789 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/JsonUnquoteFunction.java: ## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.binary.BinaryStringData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext; +import org.apache.flink.table.runtime.functions.SqlJsonUtils; + +import javax.annotation.Nullable; + +/** Implementation of {@link BuiltInFunctionDefinitions#JSON_UNQUOTE}. 
*/ +@Internal +public class JsonUnquoteFunction extends BuiltInScalarFunction { + +public JsonUnquoteFunction(SpecializedContext context) { +super(BuiltInFunctionDefinitions.JSON_UNQUOTE, context); +} + +public @Nullable Object eval(Object input) { +if (input == null) { +return null; +} +BinaryStringData bs = (BinaryStringData) input; +String inputStr = bs.toString(); +try { +if (isValidJsonVal(inputStr)) { +return new BinaryStringData(unescapeValidJson(inputStr)); +} +} catch (Throwable t) { +// ignore +} Review Comment: I don't think we should swallow any kind of exception or error. Let's catch the errors that we expect (IllegalArgumentException, etc.) and handle those by returning the original input. Unexpected exceptions should be forwarded because they might be related to other issues. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34111][table] Add support for json_quote, json_unquote, address PR feedback #24156 [flink]
fhueske commented on code in PR #24967: URL: https://github.com/apache/flink/pull/24967#discussion_r1674107753 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/JsonUnquoteFunction.java: ## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.binary.BinaryStringData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext; +import org.apache.flink.table.runtime.functions.SqlJsonUtils; + +import javax.annotation.Nullable; + +/** Implementation of {@link BuiltInFunctionDefinitions#JSON_UNQUOTE}. 
*/ +@Internal +public class JsonUnquoteFunction extends BuiltInScalarFunction { + +public JsonUnquoteFunction(SpecializedContext context) { +super(BuiltInFunctionDefinitions.JSON_UNQUOTE, context); +} + +private static boolean isValidJsonVal(String jsonInString) { +// See also BuiltInMethods.scala, IS_JSON_VALUE +return SqlJsonUtils.isJsonValue(jsonInString); +} + +private String unescapeStr(String inputStr) { +StringBuilder result = new StringBuilder(); +int i = 0; +while (i < inputStr.length()) { +if (inputStr.charAt(i) == '\\' && i + 1 < inputStr.length()) { +i++; // move to the next char +char ch = inputStr.charAt(i++); + +switch (ch) { +case '"': +result.append(ch); +break; +case '\\': +result.append(ch); +break; +case '/': +result.append(ch); +break; +case 'b': +result.append('\b'); +break; +case 'f': +result.append('\f'); +break; +case 'n': +result.append('\n'); +break; +case 'r': +result.append('\r'); +break; +case 't': +result.append('\t'); +break; +case 'u': +result.append(fromUnicodeLiteral(inputStr, i)); +i = i + 4; +break; +default: +throw new RuntimeException("Illegal escape sequence: \\" + ch); +} +} else { +result.append(inputStr.charAt(i)); +i++; +} +} +return result.toString(); +} + +private String unescapeValidJson(String inputStr) { +// check for a quoted json string val and unescape +if (inputStr.charAt(0) == '"' && inputStr.charAt(inputStr.length() - 1) == '"') { +// remove quotes, string len is atleast 2 here +return unescapeStr(inputStr.substring(1, inputStr.length() - 1)); +} else { +// string representing Json - array, object or unquoted scalar val, return as-is +return inputStr; +} +} + +private static String fromUnicodeLiteral(String input, int curPos) { + +StringBuilder number = new StringBuilder(); +// isValidJsonVal will already check for unicode literal validity +for (char ch : input.substring(curPos, curPos + 4).toCharArray()) { +number.append(Character.toLowerCase(ch)); +} +int code = Integer.parseInt(number.toString(), 16); 
+return String.valueOf((char) code); +} + +public @Nullable Object eval(Object input) { +if (input == null) { +return null; +} +BinaryStringData bs = (BinaryStringData) input; +String inputStr = bs.toString(); +try { +if (isValidJsonVal(inputStr)) { +return new BinaryStri
[jira] [Updated] (FLINK-35820) Converting Duration to String fails for big values
[ https://issues.apache.org/jira/browse/FLINK-35820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35820: --- Labels: pull-request-available (was: ) > Converting Duration to String fails for big values > -- > > Key: FLINK-35820 > URL: https://issues.apache.org/jira/browse/FLINK-35820 > Project: Flink > Issue Type: Bug > Components: API / Core >Affects Versions: 1.19.1 >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 2.0.0 > > > We support {{ConfigOption}} with {{Duration}} type. The {{Configuration}} > class can read/write {{Duration}} from/to {{String}}. Unfortunately the > conversion fails for values that do not fit into {{long}} after conversion to > nanos. > E.g. > {code} > final ConfigOption option = ConfigOptions > .key("duration") > .durationType() > .noDefaultValue(); > final Configuration config = new Configuration(); > config.set(option, Duration.ofMillis(Long.MAX_VALUE)); > config.toFileWritableMap(); > {code} > fails with {{java.lang.ArithmeticException: long overflow}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-35820] Converting Duration to String fails for big values [flink]
dawidwys opened a new pull request, #25077: URL: https://github.com/apache/flink/pull/25077 ## What is the purpose of the change Supports converting all values of Duration to/from String. ## Verifying this change Added tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34111][table] Add support for json_quote, json_unquote, address PR feedback #24156 [flink]
fhueske commented on code in PR #24967: URL: https://github.com/apache/flink/pull/24967#discussion_r1674098744 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/JsonUnquoteFunction.java: ## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.binary.BinaryStringData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext; +import org.apache.flink.table.runtime.functions.SqlJsonUtils; + +import javax.annotation.Nullable; + +/** Implementation of {@link BuiltInFunctionDefinitions#JSON_UNQUOTE}. 
*/ +@Internal +public class JsonUnquoteFunction extends BuiltInScalarFunction { + +public JsonUnquoteFunction(SpecializedContext context) { +super(BuiltInFunctionDefinitions.JSON_UNQUOTE, context); +} + +private static boolean isValidJsonVal(String jsonInString) { +// See also BuiltInMethods.scala, IS_JSON_VALUE +return SqlJsonUtils.isJsonValue(jsonInString); +} + +private String unescapeStr(String inputStr) { +StringBuilder result = new StringBuilder(); +int i = 0; +while (i < inputStr.length()) { +if (inputStr.charAt(i) == '\\' && i + 1 < inputStr.length()) { +i++; // move to the next char +char ch = inputStr.charAt(i++); + +switch (ch) { +case '"': +result.append(ch); +break; +case '\\': +result.append(ch); +break; +case '/': +result.append(ch); +break; +case 'b': +result.append('\b'); +break; +case 'f': +result.append('\f'); +break; +case 'n': +result.append('\n'); +break; +case 'r': +result.append('\r'); +break; +case 't': +result.append('\t'); +break; +case 'u': +result.append(fromUnicodeLiteral(inputStr, i)); +i = i + 4; +break; +default: +throw new RuntimeException("Illegal escape sequence: \\" + ch); +} +} else { +result.append(inputStr.charAt(i)); +i++; +} +} +return result.toString(); +} + +private String unescapeValidJson(String inputStr) { +// check for a quoted json string val and unescape +if (inputStr.charAt(0) == '"' && inputStr.charAt(inputStr.length() - 1) == '"') { +// remove quotes, string len is atleast 2 here +return unescapeStr(inputStr.substring(1, inputStr.length() - 1)); +} else { +// string representing Json - array, object or unquoted scalar val, return as-is +return inputStr; +} +} + +private static String fromUnicodeLiteral(String input, int curPos) { + +StringBuilder number = new StringBuilder(); +// isValidJsonVal will already check for unicode literal validity +for (char ch : input.substring(curPos, curPos + 4).toCharArray()) { +number.append(Character.toLowerCase(ch)); +} +int code = Integer.parseInt(number.toString(), 16); 
+return String.valueOf((char) code); +} + +public @Nullable Object eval(Object input) { +if (input == null) { +return null; +} +BinaryStringData bs = (BinaryStringData) input; +String inputStr = bs.toString(); +try { +if (isValidJsonVal(inputStr)) { +return new BinaryStri