[jira] [Commented] (FLINK-30844) TaskTest.testInterruptibleSharedLockInInvokeAndCancel causes a JVM shutdown with exit code 239
[ https://issues.apache.org/jira/browse/FLINK-30844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17707155#comment-17707155 ] Matthias Pohl commented on FLINK-30844: --- https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47748&view=logs&j=a549b384-c55a-52c0-c451-00e0477ab6db&t=eef5922c-08d9-5ba3-7299-8393476594e7&l=8807 > TaskTest.testInterruptibleSharedLockInInvokeAndCancel causes a JVM shutdown > with exit code 239 > -- > > Key: FLINK-30844 > URL: https://issues.apache.org/jira/browse/FLINK-30844 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination, Runtime / Task >Affects Versions: 1.17.0 >Reporter: Matthias Pohl >Assignee: Anton Kalashnikov >Priority: Major > Labels: test-stability > > We're experiencing a fatal crash in {{TaskTest}}: > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=45440&view=logs&j=0da23115-68bb-5dcd-192c-bd4c8adebde1&t=24c3384f-1bcb-57b3-224f-51bf973bbee8&l=8334 > {code} > [...] > Jan 31 01:03:12 [ERROR] Process Exit Code: 239 > Jan 31 01:03:12 [ERROR] Crashed tests: > Jan 31 01:03:12 [ERROR] org.apache.flink.runtime.taskmanager.TaskTest > Jan 31 01:03:12 [ERROR] at > org.apache.maven.plugin.surefire.booterclient.ForkStarter.fork(ForkStarter.java:748) > Jan 31 01:03:12 [ERROR] at > org.apache.maven.plugin.surefire.booterclient.ForkStarter.access$700(ForkStarter.java:121) > Jan 31 01:03:12 [ERROR] at > org.apache.maven.plugin.surefire.booterclient.ForkStarter$1.call(ForkStarter.java:393) > Jan 31 01:03:12 [ERROR] at > org.apache.maven.plugin.surefire.booterclient.ForkStarter$1.call(ForkStarter.java:370) > Jan 31 01:03:12 [ERROR] at > java.util.concurrent.FutureTask.run(FutureTask.java:266) > Jan 31 01:03:12 [ERROR] at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > Jan 31 01:03:12 [ERROR] at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > Jan 31 01:03:12 [ERROR] at java.lang.Thread.run(Thread.java:748) > Jan 31 01:03:12 [ERROR] -> [Help 1] > Jan 31 01:03:12 [ERROR] > Jan 31 01:03:12 [ERROR] To see the full stack trace of the errors, re-run > Maven with the -e switch. > Jan 31 01:03:12 [ERROR] Re-run Maven using the -X switch to enable full debug > logging. > Jan 31 01:03:12 [ERROR] > Jan 31 01:03:12 [ERROR] For more information about the errors and possible > solutions, please read the following articles: > Jan 31 01:03:12 [ERROR] [Help 1] > http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException > Jan 31 01:03:12 [ERROR] > Jan 31 01:03:12 [ERROR] After correcting the problems, you can resume the > build with the command > Jan 31 01:03:12 [ERROR] mvn -rf :flink-runtime > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31678) NonHAQueryableStateFsBackendITCase.testAggregatingState: Query did no succeed
[ https://issues.apache.org/jira/browse/FLINK-31678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl updated FLINK-31678: -- Priority: Critical (was: Major) > NonHAQueryableStateFsBackendITCase.testAggregatingState: Query did no succeed > - > > Key: FLINK-31678 > URL: https://issues.apache.org/jira/browse/FLINK-31678 > Project: Flink > Issue Type: Bug > Components: Runtime / Queryable State, Tests >Affects Versions: 1.18.0 >Reporter: Matthias Pohl >Priority: Critical > Labels: test-stability > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47748&view=logs&j=d89de3df-4600-5585-dadc-9bbc9a5e661c&t=be5a4b15-4b23-56b1-7582-795f58a645a2&l=40484 > {code} > ava.lang.AssertionError: Did not succeed query > Mar 31 01:24:32 at org.junit.Assert.fail(Assert.java:89) > Mar 31 01:24:32 at org.junit.Assert.assertTrue(Assert.java:42) > Mar 31 01:24:32 at > org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase.testAggregatingState(AbstractQueryableStateTestBase.java:1094) > [...] > Mar 31 01:24:32 Suppressed: java.util.concurrent.TimeoutException > Mar 31 01:24:32 at > java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1769) > Mar 31 01:24:32 at > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) > Mar 31 01:24:32 at > org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase$AutoCancellableJob.close(AbstractQueryableStateTestBase.java:1351) > Mar 31 01:24:32 at > org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase.testAggregatingState(AbstractQueryableStateTestBase.java:1096) > Mar 31 01:24:32 ... 52 more > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31676) Pulsar connector should not rely on Flink Shaded
[ https://issues.apache.org/jira/browse/FLINK-31676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-31676: --- Labels: pull-request-available (was: ) > Pulsar connector should not rely on Flink Shaded > > > Key: FLINK-31676 > URL: https://issues.apache.org/jira/browse/FLINK-31676 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / Pulsar >Reporter: Martijn Visser >Assignee: Martijn Visser >Priority: Major > Labels: pull-request-available > > The Pulsar connector currently depends on Flink Shaded for Guava. However, > externalized connectors must not rely on flink-shaded. This will just not be > possible if we want them to work against different Flink versions. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-connector-pulsar] MartijnVisser opened a new pull request, #37: [FLINK-31676][Connector/Pulsar] Replace Shaded Guava from Flink with Shaded Guava from Pulsar
MartijnVisser opened a new pull request, #37: URL: https://github.com/apache/flink-connector-pulsar/pull/37 ## Purpose of the change * Replace dependency on Flink-Shaded for connectors, since externalized connectors shouldn't rely on Flink-Shaded ## Brief change log - Removed all occurrences of `org.apache.flink.shaded.guava30.com` with `org.apache.pulsar.shade.com` ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Significant changes *(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)* - [X] Dependencies have been added or upgraded - [ ] Public API has been changed (Public API is any class annotated with `@Public(Evolving)`) - [ ] Serializers have been changed - [ ] New feature has been introduced - If yes, how is this documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
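A minimal sketch of the kind of import rewrite the change log above describes; the Pulsar shaded package prefix is taken from the PR title and should be treated as an assumption rather than a verified package path.

```java
// Minimal sketch of the import rewrite described above; the Pulsar shaded
// package prefix (org.apache.pulsar.shade.*) is an assumption taken from the PR.
// Before: import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
import org.apache.pulsar.shade.com.google.common.collect.ImmutableList;

public class ShadedGuavaImportSketch {
    public static void main(String[] args) {
        // Same Guava API, now resolved from Pulsar's shaded jar instead of flink-shaded.
        ImmutableList<String> topics = ImmutableList.of("persistent://public/default/in");
        System.out.println(topics);
    }
}
```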
[jira] [Created] (FLINK-31678) NonHAQueryableStateFsBackendITCase.testAggregatingState: Query did no succeed
Matthias Pohl created FLINK-31678: - Summary: NonHAQueryableStateFsBackendITCase.testAggregatingState: Query did no succeed Key: FLINK-31678 URL: https://issues.apache.org/jira/browse/FLINK-31678 Project: Flink Issue Type: Bug Components: Runtime / Queryable State, Tests Affects Versions: 1.18.0 Reporter: Matthias Pohl https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47748&view=logs&j=d89de3df-4600-5585-dadc-9bbc9a5e661c&t=be5a4b15-4b23-56b1-7582-795f58a645a2&l=40484 {code} ava.lang.AssertionError: Did not succeed query Mar 31 01:24:32 at org.junit.Assert.fail(Assert.java:89) Mar 31 01:24:32 at org.junit.Assert.assertTrue(Assert.java:42) Mar 31 01:24:32 at org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase.testAggregatingState(AbstractQueryableStateTestBase.java:1094) [...] Mar 31 01:24:32 Suppressed: java.util.concurrent.TimeoutException Mar 31 01:24:32 at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1769) Mar 31 01:24:32 at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) Mar 31 01:24:32 at org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase$AutoCancellableJob.close(AbstractQueryableStateTestBase.java:1351) Mar 31 01:24:32 at org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase.testAggregatingState(AbstractQueryableStateTestBase.java:1096) Mar 31 01:24:32 ... 52 more {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-31676) Pulsar connector should not rely on Flink Shaded
[ https://issues.apache.org/jira/browse/FLINK-31676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser reassigned FLINK-31676: -- Assignee: Martijn Visser > Pulsar connector should not rely on Flink Shaded > > > Key: FLINK-31676 > URL: https://issues.apache.org/jira/browse/FLINK-31676 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / Pulsar >Reporter: Martijn Visser >Assignee: Martijn Visser >Priority: Major > > The Pulsar connector currently depends on Flink Shaded for Guava. However, > externalized connectors must not rely on flink-shaded. This will just not be > possible if we want them to work against different Flink versions. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30252) Publish flink-shaded pom
[ https://issues.apache.org/jira/browse/FLINK-30252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser updated FLINK-30252: --- Fix Version/s: shaded-18.0 (was: shaded-17.0) > Publish flink-shaded pom > > > Key: FLINK-30252 > URL: https://issues.apache.org/jira/browse/FLINK-30252 > Project: Flink > Issue Type: Technical Debt > Components: Build System >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: shaded-18.0 > > > Publish a bom for flink-shaded, such that downtream projects just select the > flink-shaded version, with all other dependency versions being selected > automatically. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-26974) Python EmbeddedThreadDependencyTests.test_add_python_file failed on azure
[ https://issues.apache.org/jira/browse/FLINK-26974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17707152#comment-17707152 ] Matthias Pohl commented on FLINK-26974: --- https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47749&view=logs&j=e92ecf6d-e207-5a42-7ff7-528ff0c5b259&t=40fc352e-9b4c-5fd8-363f-628f24b01ec2&l=29150 > Python EmbeddedThreadDependencyTests.test_add_python_file failed on azure > - > > Key: FLINK-26974 > URL: https://issues.apache.org/jira/browse/FLINK-26974 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.15.0, 1.16.0, 1.17.0 >Reporter: Yun Gao >Assignee: Huang Xingbo >Priority: Critical > Labels: auto-deprioritized-major, test-stability > > {code:java} > Mar 31 10:49:17 === FAILURES > === > Mar 31 10:49:17 __ > EmbeddedThreadDependencyTests.test_add_python_file __ > Mar 31 10:49:17 > Mar 31 10:49:17 self = > testMethod=test_add_python_file> > Mar 31 10:49:17 > Mar 31 10:49:17 def test_add_python_file(self): > Mar 31 10:49:17 python_file_dir = os.path.join(self.tempdir, > "python_file_dir_" + str(uuid.uuid4())) > Mar 31 10:49:17 os.mkdir(python_file_dir) > Mar 31 10:49:17 python_file_path = os.path.join(python_file_dir, > "test_dependency_manage_lib.py") > Mar 31 10:49:17 with open(python_file_path, 'w') as f: > Mar 31 10:49:17 f.write("def add_two(a):\nraise > Exception('This function should not be called!')") > Mar 31 10:49:17 self.t_env.add_python_file(python_file_path) > Mar 31 10:49:17 > Mar 31 10:49:17 python_file_dir_with_higher_priority = os.path.join( > Mar 31 10:49:17 self.tempdir, "python_file_dir_" + > str(uuid.uuid4())) > Mar 31 10:49:17 os.mkdir(python_file_dir_with_higher_priority) > Mar 31 10:49:17 python_file_path_higher_priority = > os.path.join(python_file_dir_with_higher_priority, > Mar 31 10:49:17 > "test_dependency_manage_lib.py") > Mar 31 10:49:17 with open(python_file_path_higher_priority, 'w') as f: > Mar 31 10:49:17 f.write("def add_two(a):\nreturn a + 2") > Mar 31 10:49:17 > self.t_env.add_python_file(python_file_path_higher_priority) > Mar 31 10:49:17 > Mar 31 10:49:17 def plus_two(i): > Mar 31 10:49:17 from test_dependency_manage_lib import add_two > Mar 31 10:49:17 return add_two(i) > Mar 31 10:49:17 > Mar 31 10:49:17 self.t_env.create_temporary_system_function( > Mar 31 10:49:17 "add_two", udf(plus_two, DataTypes.BIGINT(), > DataTypes.BIGINT())) > Mar 31 10:49:17 table_sink = source_sink_utils.TestAppendSink( > Mar 31 10:49:17 ['a', 'b'], [DataTypes.BIGINT(), > DataTypes.BIGINT()]) > Mar 31 10:49:17 self.t_env.register_table_sink("Results", table_sink) > Mar 31 10:49:17 t = self.t_env.from_elements([(1, 2), (2, 5), (3, > 1)], ['a', 'b']) > Mar 31 10:49:17 > t.select(expr.call("add_two", t.a), > t.a).execute_insert("Results").wait() > Mar 31 10:49:17 > Mar 31 10:49:17 pyflink/table/tests/test_dependency.py:63: > Mar 31 10:49:17 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ _ _ _ _ _ _ _ _ > Mar 31 10:49:17 pyflink/table/table_result.py:76: in wait > Mar 31 10:49:17 get_method(self._j_table_result, "await")() > Mar 31 10:49:17 > .tox/py38-cython/lib/python3.8/site-packages/py4j/java_gateway.py:1321: in > __call__ > {code} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=34001&view=logs&j=821b528f-1eed-5598-a3b4-7f748b13f261&t=6bb545dd-772d-5d8c-f258-f5085fba3295&l=27239 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-30772) Update Flink Shaded dependencies to the latest versions
[ https://issues.apache.org/jira/browse/FLINK-30772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser closed FLINK-30772. -- Resolution: Fixed > Update Flink Shaded dependencies to the latest versions > --- > > Key: FLINK-30772 > URL: https://issues.apache.org/jira/browse/FLINK-30772 > Project: Flink > Issue Type: Technical Debt > Components: BuildSystem / Shaded >Affects Versions: 1.18.0 >Reporter: Martijn Visser >Assignee: Martijn Visser >Priority: Major > Fix For: shaded-17.0 > > > It would be smart to update the Flink Shaded components to their latest > versions before we release Flink 1.18 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-30772) Update Flink Shaded dependencies to the latest versions
[ https://issues.apache.org/jira/browse/FLINK-30772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17707151#comment-17707151 ] Martijn Visser commented on FLINK-30772: Fixed in master: [FLINK-30772][Build/Shaded] Update Netty to 4.1.90.Final d00429a35fdd5b781bcf26d99f2c7cf8ff80f8f7 [FLINK-30772][Build/Shaded] Update Jackson to 2.14.2 b09266f1befca067b8090177fbc1b0f46cf21d0d [FLINK-30772][Build/Shaded] Update Netty TCNative to 2.0.59.Final b5c59db3b6c2bdefb0bcb4a7495880d5ef9ab3b8 [FLINK-30772][Build/Shaded] Update Swagger to 2.2.8 9b1265fdcf0a80aca134a4e0ed381d897fe805bc [FLINK-30772][Build/Shaded] Update Zookeeper 3.6 to 3.6.4 14142c380007836d68247471ddb73e053d15895c [FLINK-30772][Build/Shaded] Update Snakeyaml to 2.0 1da20d4df35af41b7042fbf8e5e23bc570897603 [FLINK-30772][Build/Shaded] Update Guava to 31.1-jre 2822733762a49b535279d6c2bdb9e083fbc2480c [FLINK-30772][Build/Shaded] Update Zookeeper 3.8 to 3.8.1 d68885c61387f19b4407e67afd21edccd932e623 [FLINK-30772][Build/Shaded] Update Curator to 5.4.0 c567aa7d8f183e7264c1a08c675f511e8f8b075a [FLINK-30772][Build/Shaded] Update ASM to 9.5 b7ac6ad55c110ebba48417522d4f2defca10ac95 [FLINK-30772][Build/Shaded] Rename Guava module from flink-shaded-guava-30 to flink-shaded-guava-31 c37343c80e8f82ff9473fc978597a40314b9b864 > Update Flink Shaded dependencies to the latest versions > --- > > Key: FLINK-30772 > URL: https://issues.apache.org/jira/browse/FLINK-30772 > Project: Flink > Issue Type: Technical Debt > Components: BuildSystem / Shaded >Affects Versions: 1.18.0 >Reporter: Martijn Visser >Assignee: Martijn Visser >Priority: Major > Fix For: shaded-17.0 > > > It would be smart to update the Flink Shaded components to their latest > versions before we release Flink 1.18 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30772) Update Flink Shaded dependencies to the latest versions
[ https://issues.apache.org/jira/browse/FLINK-30772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser updated FLINK-30772: --- Fix Version/s: shaded-17.0 > Update Flink Shaded dependencies to the latest versions > --- > > Key: FLINK-30772 > URL: https://issues.apache.org/jira/browse/FLINK-30772 > Project: Flink > Issue Type: Technical Debt > Components: BuildSystem / Shaded >Affects Versions: 1.18.0 >Reporter: Martijn Visser >Assignee: Martijn Visser >Priority: Major > Fix For: shaded-17.0 > > > It would be smart to update the Flink Shaded components to their latest > versions before we release Flink 1.18 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-31631) Upgrade GCS connector to 2.2.11.
[ https://issues.apache.org/jira/browse/FLINK-31631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser closed FLINK-31631. -- Fix Version/s: 1.18.0 Resolution: Fixed Fixed in master: e3f9df66e404d80faa9926f3fef43827a7d28683 > Upgrade GCS connector to 2.2.11. > > > Key: FLINK-31631 > URL: https://issues.apache.org/jira/browse/FLINK-31631 > Project: Flink > Issue Type: Improvement > Components: FileSystems >Affects Versions: 1.17.0 >Reporter: Chris Nauroth >Assignee: Chris Nauroth >Priority: Minor > Labels: pull-request-available > Fix For: 1.18.0 > > > Upgrade the [GCS > Connector|https://github.com/GoogleCloudDataproc/hadoop-connectors/tree/v2.2.11/gcs] > bundled in the Flink distro from version 2.2.3 to 2.2.11. The new release > contains multiple bug fixes and enhancements discussed in the [Release > Notes|https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/v2.2.11/gcs/CHANGES.md]. > Notable changes include: > * Improved socket timeout handling. > * Trace logging capabilities. > * Fix bug that prevented usage of GCS as a [Hadoop Credential > Provider|https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/CredentialProviderAPI.html]. > * Dependency upgrades. > * Support OAuth2 based client authentication. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] MartijnVisser merged pull request #22281: [FLINK-31631][FileSystems] Upgrade GCS connector to 2.2.11.
MartijnVisser merged PR #22281: URL: https://github.com/apache/flink/pull/22281 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] MartijnVisser commented on pull request #22281: [FLINK-31631][FileSystems] Upgrade GCS connector to 2.2.11.
MartijnVisser commented on PR #22281: URL: https://github.com/apache/flink/pull/22281#issuecomment-1491385479 @cnauroth Thanks for the PR, LGTM -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-6757) Investigate Apache Atlas integration
[ https://issues.apache.org/jira/browse/FLINK-6757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17707145#comment-17707145 ] Martijn Visser commented on FLINK-6757: --- [~litiliu] This requires someone creating a FLIP and driving this effort. I don't think there's currently any volunteers who want/have the bandwidth to work on this. > Investigate Apache Atlas integration > > > Key: FLINK-6757 > URL: https://issues.apache.org/jira/browse/FLINK-6757 > Project: Flink > Issue Type: New Feature > Components: Connectors / Common >Reporter: Till Rohrmann >Priority: Minor > Labels: auto-deprioritized-major, auto-unassigned, stale-minor > > Users asked for an integration of Apache Flink with Apache Atlas. It might be > worthwhile to investigate what is necessary to achieve this task. > References: > http://atlas.incubator.apache.org/StormAtlasHook.html -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31655) Adaptive Channel selection for partitioner
[ https://issues.apache.org/jira/browse/FLINK-31655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17707138#comment-17707138 ] tartarus commented on FLINK-31655: -- Hi [~gaoyunhaii] [~fanrui] Thanks for your suggestions, I will produce a design doc as soon. > Adaptive Channel selection for partitioner > -- > > Key: FLINK-31655 > URL: https://issues.apache.org/jira/browse/FLINK-31655 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Reporter: tartarus >Assignee: tartarus >Priority: Major > > In Flink, if the upstream and downstream operator parallelism is not the > same, then by default the RebalancePartitioner will be used to select the > target channel. > In our company, users often use flink to access redis, hbase or other rpc > services, If some of the Operators are slow to return requests (for external > service reasons), then because Rebalance/Rescale are Round-Robin the Channel > selection policy, so the job is easy to backpressure. > Because the Rebalance/Rescale policy does not care which subtask the data is > sent to downstream, so we expect Rebalance/Rescale to refer to the processing > power of the downstream subtask when choosing a Channel. > Send more data to the free subtask, this ensures the best possible throughput > of job! > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
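A conceptual sketch of the idea described in the ticket (choosing the channel with the least pending work instead of pure round-robin); this is not Flink's internal ChannelSelector API, and all names here are illustrative assumptions.

{code:java}
// Conceptual sketch only (not Flink's internal ChannelSelector API): pick the
// downstream channel with the fewest in-flight records instead of pure round-robin.
import java.util.concurrent.atomic.AtomicIntegerArray;

public class LoadAwareChannelSelectorSketch {
    private final AtomicIntegerArray inFlightPerChannel;

    public LoadAwareChannelSelectorSketch(int numberOfChannels) {
        this.inFlightPerChannel = new AtomicIntegerArray(numberOfChannels);
    }

    /** Choose the channel whose downstream subtask currently has the least pending work. */
    public int selectChannel() {
        int best = 0;
        for (int i = 1; i < inFlightPerChannel.length(); i++) {
            if (inFlightPerChannel.get(i) < inFlightPerChannel.get(best)) {
                best = i;
            }
        }
        inFlightPerChannel.incrementAndGet(best);
        return best;
    }

    /** Called once the downstream subtask has consumed a record from this channel. */
    public void onRecordConsumed(int channel) {
        inFlightPerChannel.decrementAndGet(channel);
    }
}
{code}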
[GitHub] [flink] flinkbot commented on pull request #22312: [FLINK-31677][table] Add built-in MAP_ENTRIES function.
flinkbot commented on PR #22312: URL: https://github.com/apache/flink/pull/22312#issuecomment-1491329647 ## CI report: * 13959f8e4ede00ec672129170259b32761ffe6e4 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-31677) Add MAP_ENTRIES supported in SQL & Table API
[ https://issues.apache.org/jira/browse/FLINK-31677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-31677: --- Labels: pull-request-available (was: ) > Add MAP_ENTRIES supported in SQL & Table API > > > Key: FLINK-31677 > URL: https://issues.apache.org/jira/browse/FLINK-31677 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.18.0 >Reporter: jackylau >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > > > map_entries(map) - Returns an unordered array of all entries in the given map. > Syntax: > map_entries(map) > Arguments: > map: An MAP to be handled. > Returns: > Returns an unordered array of all entries in the given map. > Returns null if the argument is null > {code:sql} > > SELECT map_entries(map[1, 'a', 2, 'b']); > [(1,"a"),(2,"b")]{code} > See also > presto [https://prestodb.io/docs/current/functions/map.html] > spark https://spark.apache.org/docs/latest/api/sql/index.html#map_entries -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] liuyongvs opened a new pull request, #22312: [FLINK-31677][table] Add built-in MAP_ENTRIES function.
liuyongvs opened a new pull request, #22312: URL: https://github.com/apache/flink/pull/22312 - What is the purpose of the change This is an implementation of MAP_ENTRIES - Brief change log MAP_ENTRIES for Table API and SQL ``` map_entries(map) - Returns an unordered array of all entries in the given map. Syntax: map_entries(map) Arguments: map: An MAP to be handled. Returns: Returns an unordered array of all entries in the given map. Returns null if the argument is null > SELECT map_entries(map[1, 'a', 2, 'b']); [(1,"a"),(2,"b")] ``` See also presto https://prestodb.io/docs/current/functions/map.html spark https://spark.apache.org/docs/latest/api/sql/index.html#map_entries - Verifying this change This change added tests in MapFunctionITCase. - Does this pull request potentially affect one of the following parts: Dependencies (does it add or upgrade a dependency): ( no) The public API, i.e., is any changed class annotated with @Public(Evolving): (yes ) The serializers: (no) The runtime per-record code paths (performance sensitive): ( no) Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: ( no) The S3 file system connector: ( no) - Documentation Does this pull request introduce a new feature? (yes) If yes, how is the feature documented? (docs) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
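A hedged usage sketch of the new function from a Java program; it assumes MAP_ENTRIES is available as a built-in SQL function once the PR above lands, and the MAP[...] literal follows the example given in FLINK-31677.

```java
// Hedged usage sketch: assumes MAP_ENTRIES is available as a built-in SQL function
// once the PR above is merged; the MAP[...] literal mirrors the example in FLINK-31677.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MapEntriesSqlSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Expected result (per the ticket): an unordered array of (key, value) entries.
        tEnv.sqlQuery("SELECT MAP_ENTRIES(MAP[1, 'a', 2, 'b'])").execute().print();
    }
}
```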
[GitHub] [flink] SinBex commented on pull request #22311: [FLINK-31652][k8s] Handle the deleted event in case pod is deleted during the pending phase
SinBex commented on PR #22311: URL: https://github.com/apache/flink/pull/22311#issuecomment-1491322025 Thank you so much for your review @huwh , I have updated the code, PTAL~ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] mbalassi commented on a diff in pull request #558: [FLINK-31303] Expose Flink application resource usage via metrics and status
mbalassi commented on code in PR #558: URL: https://github.com/apache/flink-kubernetes-operator/pull/558#discussion_r1154032017 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java: ## @@ -627,14 +637,42 @@ public Map getClusterInfo(Configuration conf) throws Exception { .toSeconds(), TimeUnit.SECONDS); -runtimeVersion.put( +clusterInfo.put( DashboardConfiguration.FIELD_NAME_FLINK_VERSION, dashboardConfiguration.getFlinkVersion()); -runtimeVersion.put( +clusterInfo.put( DashboardConfiguration.FIELD_NAME_FLINK_REVISION, dashboardConfiguration.getFlinkRevision()); } -return runtimeVersion; + +// JobManager resource usage can be deduced from the CR +var jmParameters = +new KubernetesJobManagerParameters( +conf, new KubernetesClusterClientFactory().getClusterSpecification(conf)); +var jmTotalCpu = +jmParameters.getJobManagerCPU() +* jmParameters.getJobManagerCPULimitFactor() +* jmParameters.getReplicas(); +var jmTotalMemory = +Math.round( +jmParameters.getJobManagerMemoryMB() +* Math.pow(1024, 2) +* jmParameters.getJobManagerMemoryLimitFactor() +* jmParameters.getReplicas()); + +// TaskManager resource usage is best gathered from the REST API to get current replicas Review Comment: Good catch @mateczagany. I had this suspicion in the back of my mind, that the CPU consumption might be overreported, but the way we pass the values to the taskmanagers via `flink-kubernetes` (which does have proper fractional values) convinced me that it should be ok. I will dive a bit deeper into this and come back. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
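A small numeric illustration (made-up values) of the concern raised in this thread: multiplying the requested CPU by the limit factor reports the pod's CPU limit rather than its request.

```java
// Made-up numbers only, to illustrate the review comment above: multiplying the
// requested CPU by the limit factor reports the pod's CPU *limit*, not its request.
public class CpuReportingSketch {
    public static void main(String[] args) {
        double jobManagerCpu = 1.0;   // requested CPU per JobManager pod (assumed)
        double cpuLimitFactor = 1.5;  // limit = request * factor (assumed)
        int replicas = 2;

        double reportedWithLimitFactor = jobManagerCpu * cpuLimitFactor * replicas; // 3.0
        double requestedOnly = jobManagerCpu * replicas;                            // 2.0
        System.out.println("with limit factor: " + reportedWithLimitFactor
                + ", requests only: " + requestedOnly);
    }
}
```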
[GitHub] [flink] SinBex commented on a diff in pull request #22311: [FLINK-31652][k8s] Handle the deleted event in case pod is deleted during the pending phase
SinBex commented on code in PR #22311: URL: https://github.com/apache/flink/pull/22311#discussion_r1154031487 ## flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java: ## @@ -65,6 +67,11 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; +import static io.fabric8.kubernetes.client.Watcher.Action.ADDED; Review Comment: Thanks for your suggestion. It is indeed better not to introduce an external class, I will use an internal enum to replace it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] huwh commented on a diff in pull request #22311: [FLINK-31652][k8s] Handle the deleted event in case pod is deleted during the pending phase
huwh commented on code in PR #22311: URL: https://github.com/apache/flink/pull/22311#discussion_r1153988699 ## flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java: ## @@ -65,6 +67,11 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; +import static io.fabric8.kubernetes.client.Watcher.Action.ADDED; Review Comment: It's better not use fabric classes in KubernetesResourceManagerDriver since these are Flink internal logic. ## flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java: ## @@ -340,12 +347,12 @@ private KubernetesTaskManagerParameters createKubernetesTaskManagerParameters( blockedNodes); } -private void handlePodEventsInMainThread(List pods) { +private void handlePodEventsInMainThread(List pods, Action action) { getMainThreadExecutor() .execute( () -> { for (KubernetesPod pod : pods) { -if (pod.isTerminated()) { +if (action == DELETED || pod.isTerminated()) { Review Comment: We need some annotations to explain why we need this logic. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-31677) Add MAP_ENTRIES supported in SQL & Table API
[ https://issues.apache.org/jira/browse/FLINK-31677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] jackylau updated FLINK-31677: - Description: map_entries(map) - Returns an unordered array of all entries in the given map. Syntax: map_entries(map) Arguments: map: An MAP to be handled. Returns: Returns an unordered array of all entries in the given map. Returns null if the argument is null {code:sql} > SELECT map_entries(map[1, 'a', 2, 'b']); [(1,"a"),(2,"b")]{code} See also presto [https://prestodb.io/docs/current/functions/map.html] spark https://spark.apache.org/docs/latest/api/sql/index.html#map_entries > Add MAP_ENTRIES supported in SQL & Table API > > > Key: FLINK-31677 > URL: https://issues.apache.org/jira/browse/FLINK-31677 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.18.0 >Reporter: jackylau >Priority: Major > Fix For: 1.18.0 > > > map_entries(map) - Returns an unordered array of all entries in the given map. > Syntax: > map_entries(map) > Arguments: > map: An MAP to be handled. > Returns: > Returns an unordered array of all entries in the given map. > Returns null if the argument is null > {code:sql} > > SELECT map_entries(map[1, 'a', 2, 'b']); > [(1,"a"),(2,"b")]{code} > See also > presto [https://prestodb.io/docs/current/functions/map.html] > spark https://spark.apache.org/docs/latest/api/sql/index.html#map_entries -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31655) Adaptive Channel selection for partitioner
[ https://issues.apache.org/jira/browse/FLINK-31655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17707102#comment-17707102 ] Yun Gao commented on FLINK-31655: - Hi [~tartarus] thanks for the proposal, it also looks useful from my side. There are also some discussion about it in the community previously , and some points are mentioned, like we might not introduce new locks and we might design carefully about the structure to maintain the active channels to avoid additional overhead, thus I also think if convenient you may propose a design doc thus we could first get consensus on the overall design. Also cc [~pltbkd] , who has also implemented the functionality previously in the internal version. > Adaptive Channel selection for partitioner > -- > > Key: FLINK-31655 > URL: https://issues.apache.org/jira/browse/FLINK-31655 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Reporter: tartarus >Assignee: tartarus >Priority: Major > > In Flink, if the upstream and downstream operator parallelism is not the > same, then by default the RebalancePartitioner will be used to select the > target channel. > In our company, users often use flink to access redis, hbase or other rpc > services, If some of the Operators are slow to return requests (for external > service reasons), then because Rebalance/Rescale are Round-Robin the Channel > selection policy, so the job is easy to backpressure. > Because the Rebalance/Rescale policy does not care which subtask the data is > sent to downstream, so we expect Rebalance/Rescale to refer to the processing > power of the downstream subtask when choosing a Channel. > Send more data to the free subtask, this ensures the best possible throughput > of job! > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31677) Add MAP_ENTRIES supported in SQL & Table API
jackylau created FLINK-31677: Summary: Add MAP_ENTRIES supported in SQL & Table API Key: FLINK-31677 URL: https://issues.apache.org/jira/browse/FLINK-31677 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Affects Versions: 1.18.0 Reporter: jackylau Fix For: 1.18.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] SinBex commented on pull request #22311: [FLINK-31652][k8s] Handle the deleted event in case pod is deleted during the pending phase
SinBex commented on PR #22311: URL: https://github.com/apache/flink/pull/22311#issuecomment-1491245093 @xintongsong Could you please help to review this PR in your free time? Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #22311: [FLINK-31652][k8s] Handle the deleted event in case pod is deleted during the pending phase
flinkbot commented on PR #22311: URL: https://github.com/apache/flink/pull/22311#issuecomment-1491237551 ## CI report: * 2cba48fe3f868381707aa949f6f2132f6981ebf9 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-31652) Flink should handle the delete event if the pod was deleted while pending
[ https://issues.apache.org/jira/browse/FLINK-31652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-31652: --- Labels: pull-request-available (was: ) > Flink should handle the delete event if the pod was deleted while pending > - > > Key: FLINK-31652 > URL: https://issues.apache.org/jira/browse/FLINK-31652 > Project: Flink > Issue Type: Bug > Components: Deployment / Kubernetes >Affects Versions: 1.17.0, 1.16.1 >Reporter: xingbe >Assignee: xingbe >Priority: Major > Labels: pull-request-available > > I found that in kubernetes deployment, if the taskmanager pod is deleted in > 'Pending' phase, the flink job will get stuck and keep waiting for the pod > scheduled. We can reproduce this issue with the 'kubectl delete pod' command > to delete the pod when it is in the pending phase. > > The cause reason is that the pod status will not be updated in time in this > case, so the KubernetesResourceManagerDriver won't detect the pod is > terminated, and I also verified this by logging the pod status in > KubernetesPod#isTerminated(), and it shows as follows. > {code:java} > public boolean isTerminated() { > log.info("pod status: " + getInternalResource().getStatus()); > if (getInternalResource().getStatus() != null) { > final boolean podFailed = > > PodPhase.Failed.name().equals(getInternalResource().getStatus().getPhase()); > final boolean containersFailed = > > getInternalResource().getStatus().getContainerStatuses().stream() > .anyMatch( > e -> > e.getState() != null > && > e.getState().getTerminated() != null); > return containersFailed || podFailed; > } > return false; > } {code} > In the case, this function will return false because `containersFailed` and > `podFailed` are both false. > {code:java} > PodStatus(conditions=[PodCondition(lastProbeTime=null, > lastTransitionTime=2023-03-28T12:35:10Z, reason=Unschedulable, status=False, > type=PodScheduled, additionalProperties={})], containerStatuses=[], > ephemeralContainerStatuses=[], hostIP=null, initContainerStatuses=[], > message=null, nominatedNodeName=null, phase=Pending, podIP=null, podIPs=[], > qosClass=Guaranteed, reason=null, startTime=null, > additionalProperties={}){code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] SinBex opened a new pull request, #22311: [FLINK-31652][k8s] Handle the deleted event in case pod is deleted during the pending phase
SinBex opened a new pull request, #22311: URL: https://github.com/apache/flink/pull/22311 ## What is the purpose of the change Currently, in kubernetes deployment, if the taskmanager pod is deleted in 'Pending' phase, the flink job will get stuck and keep waiting for the pod scheduled. The cause reason is that the pod status will not be updated in time, so the KubernetesResourceManagerDriver won't detect the pod is terminated, but should handle the deleted event. ## Brief change log - The method KubernetesResourceManagerDriver#handlePodEventsInMainThread handle the deleted event ## Verifying this change - Add UT in `KubernetesResourceManagerDriverTest#testOnPodDeletedWithDeletedEvent` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes) - The S3 file system connector: ( no) ## Documentation - Does this pull request introduce a new feature? (no) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
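A conceptual sketch of the gap the PR addresses: a DELETED watch event must be treated as terminal even though a pod deleted while still Pending never reports a Failed status. The names below are illustrative, not the actual KubernetesResourceManagerDriver code.

```java
// Conceptual sketch (not the actual KubernetesResourceManagerDriver code): a DELETED
// watch event has to be treated as terminal even though a Pending pod's status never
// reaches Failed, which is exactly the gap described in FLINK-31652.
public class PendingPodDeletionSketch {
    enum WatchAction { ADDED, MODIFIED, DELETED }

    interface Pod {
        String name();
        boolean isTerminated(); // stays false for a pod deleted while still Pending
    }

    static void handlePodEvent(Pod pod, WatchAction action) {
        if (action == WatchAction.DELETED || pod.isTerminated()) {
            // Release the pending resource request so the job does not wait forever.
            System.out.println("Requesting replacement for pod " + pod.name());
        }
    }
}
```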
[jira] [Commented] (FLINK-31573) Nexmark performance drops in 1.17 compared to 1.13
[ https://issues.apache.org/jira/browse/FLINK-31573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17707095#comment-17707095 ] Yun Tang commented on FLINK-31573: -- I think https://github.com/nexmark/nexmark/pull/43 could help mitigate this problem. > Nexmark performance drops in 1.17 compared to 1.13 > -- > > Key: FLINK-31573 > URL: https://issues.apache.org/jira/browse/FLINK-31573 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.17.0 >Reporter: Qingsheng Ren >Priority: Critical > > The case was originally > [reported|https://lists.apache.org/thread/n4x2xgcr2kb8vcbby7c6gwb22n0thwhz] > in the voting thread of 1.17.0 RC3. > Compared to Flink 1.13, the performance of Nexmark in 1.17.0 RC3 drops ~8% in > query 18. Some details could be found in the [mailing > list|https://lists.apache.org/thread/n4x2xgcr2kb8vcbby7c6gwb22n0thwhz]. > A further investigation showed that with configuration > {{execution.checkpointing.checkpoints-after-tasks-finish.enabled}} set to > false, the performance of 1.17 is better than 1.16. > A fully comparison of Nexmark result between 1.16 and 1.17 is ongoing. -- This message was sent by Atlassian Jira (v8.20.10#820010)
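A minimal sketch of toggling the configuration mentioned in the ticket when re-running the comparison; setting it in flink-conf.yaml is equivalent, and the key is quoted from the issue description.

{code:java}
// Minimal sketch of toggling the configuration mentioned above when re-running the
// Nexmark comparison; setting it in flink-conf.yaml is equivalent.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointsAfterTasksFinishToggle {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString("execution.checkpointing.checkpoints-after-tasks-finish.enabled", "false");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.fromElements(1, 2, 3).print();
        env.execute("config-toggle-sketch");
    }
}
{code}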
[GitHub] [flink] WencongLiu commented on pull request #22307: [FLINK-31643][core] Introduce a temporary configuration to enable the tiered store architecture for hybrid shuffle
WencongLiu commented on PR #22307: URL: https://github.com/apache/flink/pull/22307#issuecomment-1491214003 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] liuyongvs commented on pull request #22277: [FLINK-31621][table] Add built-in ARRAY_REVERSE function.
liuyongvs commented on PR #22277: URL: https://github.com/apache/flink/pull/22277#issuecomment-1491199038 rebase to fix conflicts with array_position which merged just now -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-6757) Investigate Apache Atlas integration
[ https://issues.apache.org/jira/browse/FLINK-6757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17707087#comment-17707087 ] liting liu commented on FLINK-6757: --- Are we going to do the changes suggested by [~gyfora]? It will help a lot to do lineage integration with other systems (DataHub, Atlas, and so on). > Investigate Apache Atlas integration > > > Key: FLINK-6757 > URL: https://issues.apache.org/jira/browse/FLINK-6757 > Project: Flink > Issue Type: New Feature > Components: Connectors / Common >Reporter: Till Rohrmann >Priority: Minor > Labels: auto-deprioritized-major, auto-unassigned, stale-minor > > Users asked for an integration of Apache Flink with Apache Atlas. It might be > worthwhile to investigate what is necessary to achieve this task. > References: > http://atlas.incubator.apache.org/StormAtlasHook.html -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-24785) Relocate RocksDB's log under flink log directory by default
[ https://issues.apache.org/jira/browse/FLINK-24785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17706853#comment-17706853 ] jinghaihang edited comment on FLINK-24785 at 3/31/23 2:04 AM: -- Hi ,Yun Tang , thanks for your reply. To be honest,I didn't understand what you mentioned ‘previous location’ mean. --- When my flink version is 1.12,the Rocksdb LOG file can store in my environment,path is: /data1/hadoop/yarn/nm-local-dir/usercache/hadoop-xx/appcache/application_1671244879475_67638619 /flink-io-9c156d8a-3df5-4d17-b838-40540cd5f10a/job_7ebf314124fad7b7f930921b126e5dde_op_StreamFilter_9dd63673dd41ea021b896d5203f3ba7c_{_}42_100{_}_uuid_3a1d05e9-cb45-4b77-baa8-99d1d1cc77b9/db/LOG When my flink version is 1.16, due to this issue feature, i can not find this LOG file neither in flink log dir nor in rocksdb db path. And i find filename exceeds 255 characters problem, so i changed the frocksdb code, retained the useful part, and the file can appear. was (Author: assassinj): Hi ,Yun Tang , thanks for your reply. To be honest,I didn't understand what you mentioned ‘previous location’ mean. --- When my flink version is 1.12,the Rocksdb LOG file can store in my environment,path is: /data1/hadoop/yarn/nm-local-dir/usercache/hadoop-xx/appcache/application_1671244879475_67638619 /job_7ebf314124fad7b7f930921b126e5dde_op_StreamFilter_9dd63673dd41ea021b896d5203f3ba7c_{_}42_100{_}_uuid_3a1d05e9-cb45-4b77-baa8-99d1d1cc77b9/db/LOG When my flink version is 1.16, due to this issue feature, i can not find this LOG file neither in flink log dir nor in rocksdb db path. And i find filename exceeds 255 characters problem, so i changed the frocksdb code, retained the useful part, and the file can appear. > Relocate RocksDB's log under flink log directory by default > --- > > Key: FLINK-24785 > URL: https://issues.apache.org/jira/browse/FLINK-24785 > Project: Flink > Issue Type: Sub-task > Components: Runtime / State Backends >Reporter: Yun Tang >Assignee: Nicholas Jiang >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > Previously, RocksDB's log locates at its own DB folder, which makes the > debuging RocksDB not so easy. We could let RocksDB's log stay in Flink's log > directory by default. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-24785) Relocate RocksDB's log under flink log directory by default
[ https://issues.apache.org/jira/browse/FLINK-24785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17706853#comment-17706853 ] jinghaihang edited comment on FLINK-24785 at 3/31/23 1:59 AM: -- Hi ,Yun Tang , thanks for your reply. To be honest,I didn't understand what you mentioned ‘previous location’ mean. --- When my flink version is 1.12,the Rocksdb LOG file can store in my environment,path is: /data1/hadoop/yarn/nm-local-dir/usercache/hadoop-xx/appcache/application_1671244879475_67638619 /job_7ebf314124fad7b7f930921b126e5dde_op_StreamFilter_9dd63673dd41ea021b896d5203f3ba7c_{_}42_100{_}_uuid_3a1d05e9-cb45-4b77-baa8-99d1d1cc77b9/db/LOG When my flink version is 1.16, due to this issue feature, i can not find this LOG file neither in flink log dir nor in rocksdb db path. And i find filename exceeds 255 characters problem, so i changed the frocksdb code, retained the useful part, and the file can appear. was (Author: assassinj): Hi ,Yun Tang , thanks for your reply. To be honest,I didn't understand what you mentioned ‘previous location’ mean. --- When my flink version is 1.12,the Rocksdb LOG file can store in my environment,path is: /data1/hadoop/yarn/nm-local-dir/usercache/hadoop-xx/appcache/application_1671244879475_67638619 /tm_container_e76_1671244879475_67638619_01_29/tmp /job_7ebf314124fad7b7f930921b126e5dde_op_StreamFilter_9dd63673dd41ea021b896d5203f3ba7c_{_}42_100{_}_uuid_3a1d05e9-cb45-4b77-baa8-99d1d1cc77b9/LOG When my flink version is 1.16, due to this issue feature, i can not find this LOG file neither in flink log dir nor in above path. And i find filename exceeds 255 characters problem, so i changed the frocksdb code, retained the useful part, and the file can appear. > Relocate RocksDB's log under flink log directory by default > --- > > Key: FLINK-24785 > URL: https://issues.apache.org/jira/browse/FLINK-24785 > Project: Flink > Issue Type: Sub-task > Components: Runtime / State Backends >Reporter: Yun Tang >Assignee: Nicholas Jiang >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > Previously, RocksDB's log locates at its own DB folder, which makes the > debuging RocksDB not so easy. We could let RocksDB's log stay in Flink's log > directory by default. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-24785) Relocate RocksDB's log under flink log directory by default
[ https://issues.apache.org/jira/browse/FLINK-24785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17707083#comment-17707083 ] jinghaihang commented on FLINK-24785: - !https://km.sankuai.com/api/file/cdn/1613529928/31761470454?contentType=1&isNewContent=false|width=812,height=350! > Relocate RocksDB's log under flink log directory by default > --- > > Key: FLINK-24785 > URL: https://issues.apache.org/jira/browse/FLINK-24785 > Project: Flink > Issue Type: Sub-task > Components: Runtime / State Backends >Reporter: Yun Tang >Assignee: Nicholas Jiang >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > Previously, RocksDB's log locates at its own DB folder, which makes the > debuging RocksDB not so easy. We could let RocksDB's log stay in Flink's log > directory by default. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-24785) Relocate RocksDB's log under flink log directory by default
[ https://issues.apache.org/jira/browse/FLINK-24785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17706853#comment-17706853 ] jinghaihang edited comment on FLINK-24785 at 3/31/23 1:52 AM: -- Hi ,Yun Tang , thanks for your reply. To be honest,I didn't understand what you mentioned ‘previous location’ mean. --- When my flink version is 1.12,the Rocksdb LOG file can store in my environment,path is: /data1/hadoop/yarn/nm-local-dir/usercache/hadoop-xx/appcache/application_1671244879475_67638619 /tm_container_e76_1671244879475_67638619_01_29/tmp /job_7ebf314124fad7b7f930921b126e5dde_op_StreamFilter_9dd63673dd41ea021b896d5203f3ba7c_{_}42_100{_}_uuid_3a1d05e9-cb45-4b77-baa8-99d1d1cc77b9/LOG When my flink version is 1.16, due to this issue feature, i can not find this LOG file neither in flink log dir nor in above path. And i find filename exceeds 255 characters problem, so i changed the frocksdb code, retained the useful part, and the file can appear. was (Author: assassinj): Hi ,Yun Tang , thanks for your reply. To be honest,I didn't understand what you mentioned ‘previous location’ mean. --- When my flink version is 1.12,the Rocksdb LOG file can store in my environment,path is: /data1/hadoop/yarn/nm-local-dir/usercache/hadoop-xx/appcache/application_1671244879475_67638619 /tm_container_e76_1671244879475_67638619_01_29/tmp /job_7ebf314124fad7b7f930921b126e5dde_op_StreamFilter_9dd63673dd41ea021b896d5203f3ba7c_{_}42_100{_}_uuid_3a1d05e9-cb45-4b77-baa8-99d1d1cc77b9/LOG When my flink version is 1.16, due to this issue feature, i can not find this LOG file neither in flink log dir nor in above path. And i find filename exceeds 255 characters problem, so i changed the frocksdb code, retained the useful part, and the file can appear. > Relocate RocksDB's log under flink log directory by default > --- > > Key: FLINK-24785 > URL: https://issues.apache.org/jira/browse/FLINK-24785 > Project: Flink > Issue Type: Sub-task > Components: Runtime / State Backends >Reporter: Yun Tang >Assignee: Nicholas Jiang >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > Previously, RocksDB's log locates at its own DB folder, which makes the > debuging RocksDB not so easy. We could let RocksDB's log stay in Flink's log > directory by default. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-24785) Relocate RocksDB's log under flink log directory by default
[ https://issues.apache.org/jira/browse/FLINK-24785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17706853#comment-17706853 ] jinghaihang edited comment on FLINK-24785 at 3/31/23 1:51 AM: -- Hi ,Yun Tang , thanks for your reply. To be honest,I didn't understand what you mentioned ‘previous location’ mean. --- When my flink version is 1.12,the Rocksdb LOG file can store in my environment,path is: /data1/hadoop/yarn/nm-local-dir/usercache/hadoop-xx/appcache/application_1671244879475_67638619 /tm_container_e76_1671244879475_67638619_01_29/tmp /job_7ebf314124fad7b7f930921b126e5dde_op_StreamFilter_9dd63673dd41ea021b896d5203f3ba7c_{_}42_100{_}_uuid_3a1d05e9-cb45-4b77-baa8-99d1d1cc77b9/LOG When my flink version is 1.16, due to this issue feature, i can not find this LOG file neither in flink log dir nor in above path. And i find filename exceeds 255 characters problem, so i changed the frocksdb code, retained the useful part, and the file can appear. was (Author: assassinj): Hi ,Yun Tang , thanks for your reply. To be honest,I didn't understand what you mentioned ‘previous location’ mean. --- When my flink version is 1.12,the Rocksdb LOG file can store in my environment,path is: /data1/hadoop/yarn/nm-local-dir/usercache/hadoop-xx/appcache/application_1671244879475_67638619/job_7ebf314124fad7b7f930921b126e5dde_op_StreamFilter_9dd63673dd41ea021b896d5203f3ba7c__42_100__uuid_3a1d05e9-cb45-4b77-baa8-99d1d1cc77b9/LOG When my flink version is 1.16, due to this issue feature, i can not find this LOG file neither in flink log dir nor in above path. And i find filename exceeds 255 characters problem, so i changed the frocksdb code, retained the useful part, and the file can appear. > Relocate RocksDB's log under flink log directory by default > --- > > Key: FLINK-24785 > URL: https://issues.apache.org/jira/browse/FLINK-24785 > Project: Flink > Issue Type: Sub-task > Components: Runtime / State Backends >Reporter: Yun Tang >Assignee: Nicholas Jiang >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > Previously, RocksDB's log locates at its own DB folder, which makes the > debuging RocksDB not so easy. We could let RocksDB's log stay in Flink's log > directory by default. -- This message was sent by Atlassian Jira (v8.20.10#820010)
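A rough illustration of the 255-character problem discussed in this thread: when the RocksDB info log is relocated, its file name is derived from the full DB path, so deeply nested YARN cache paths can exceed common file-name limits. The exact derivation used by FRocksDB/FLINK-24785 is an assumption here; the sample path is taken from the comment above.

{code:java}
// Rough illustration of the 255-character file-name problem discussed above.
// The derivation (db path with '/' replaced by '_' plus "_LOG") is an assumption.
public class RocksDbLogFileNameSketch {
    public static void main(String[] args) {
        String dbPath = "/data1/hadoop/yarn/nm-local-dir/usercache/hadoop-xx/appcache/"
                + "application_1671244879475_67638619/flink-io-9c156d8a-3df5-4d17-b838-40540cd5f10a/"
                + "job_7ebf314124fad7b7f930921b126e5dde_op_StreamFilter_"
                + "9dd63673dd41ea021b896d5203f3ba7c__42_100__uuid_"
                + "3a1d05e9-cb45-4b77-baa8-99d1d1cc77b9/db";
        String derivedLogName = dbPath.replace('/', '_') + "_LOG";
        System.out.println(derivedLogName.length() + " chars, over 255: "
                + (derivedLogName.length() > 255));
    }
}
{code}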
[GitHub] [flink] luoyuxia merged pull request #22267: [FLINK-31602][table] Add built-in ARRAY_POSITION function.
luoyuxia merged PR #22267: URL: https://github.com/apache/flink/pull/22267 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-kafka] RamanVerma commented on a diff in pull request #18: [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink
RamanVerma commented on code in PR #18: URL: https://github.com/apache/flink-connector-kafka/pull/18#discussion_r1153921860 ## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/HeaderProducer.java: ## @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.sink; + +import org.apache.flink.annotation.PublicEvolving; + +import org.apache.kafka.common.header.Header; + +import java.io.Serializable; +import java.util.ArrayList; + +/** Creates an {@link Iterable} of {@link Header}s from the input element. */ +@PublicEvolving +public interface HeaderProducer extends Serializable { +default Iterable produceHeaders(IN input) { Review Comment: I think you can use `Headers` instead of `Iterable` from `org.apache.kafka.common.header.Headers` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
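A hedged sketch of what a user-supplied header producer might look like, mirroring the interface quoted in this diff; the generic parameters are inferred from the surrounding review and the final FLINK-31049 API may differ.

```java
// Hedged sketch of a user-supplied header producer mirroring the interface quoted in
// this diff; generic parameters are inferred and the final FLINK-31049 API may differ.
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Collections;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;

public class TraceIdHeaderProducer implements Serializable {
    public Iterable<Header> produceHeaders(String element) {
        // A real implementation could derive the header value from the element itself.
        return Collections.singletonList(
                new RecordHeader("trace-id", element.getBytes(StandardCharsets.UTF_8)));
    }
}
```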
[jira] [Resolved] (FLINK-31547) Introduce FlinkResultSetMetaData for jdbc driver
[ https://issues.apache.org/jira/browse/FLINK-31547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Benchao Li resolved FLINK-31547. Fix Version/s: 1.18.0 Resolution: Fixed Fixed via https://github.com/apache/flink/commit/b81291864e69899191a3ef01599a67f3b08b384b (1.18.0) [~zjureel] Thanks for your contribution! > Introduce FlinkResultSetMetaData for jdbc driver > > > Key: FLINK-31547 > URL: https://issues.apache.org/jira/browse/FLINK-31547 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / JDBC >Affects Versions: 1.18.0 >Reporter: Fang Yong >Assignee: Fang Yong >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
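To illustrate what this metadata class is used for on the client side, here is a plain-JDBC sketch. The connection URL and port are assumptions (a running SQL Gateway and the flink-sql-jdbc-driver jar on the classpath are required); only the standard java.sql calls are given as-is.
{code}
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;

public class FlinkJdbcMetaDataSketch {
    public static void main(String[] args) throws Exception {
        // Assumed URL format and port; adjust to your SQL Gateway endpoint.
        try (Connection conn = DriverManager.getConnection("jdbc:flink://localhost:8083");
                Statement stmt = conn.createStatement();
                ResultSet rs = stmt.executeQuery("SELECT 1 AS id, 'flink' AS name")) {
            // getMetaData() is what FlinkResultSetMetaData backs in the Flink JDBC driver.
            ResultSetMetaData meta = rs.getMetaData();
            for (int i = 1; i <= meta.getColumnCount(); i++) {
                System.out.println(meta.getColumnName(i) + ": " + meta.getColumnTypeName(i));
            }
        }
    }
}
{code}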
[GitHub] [flink] libenchao closed pull request #22284: [FLINK-31547][jdbc-driver] Introduce FlinkResultSetMetaData for jdbc driver
libenchao closed pull request #22284: [FLINK-31547][jdbc-driver] Introduce FlinkResultSetMetaData for jdbc driver URL: https://github.com/apache/flink/pull/22284 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-30321) Upgrade ZLIB of FRocksDB to 1.2.13
[ https://issues.apache.org/jira/browse/FLINK-30321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17707079#comment-17707079 ] songwenbin commented on FLINK-30321: Thank you very much for your answer. In Flink 1.16.1, is it OK to upgrade the frocksdbjni dependency in the pom.xml of flink-statebackend-rocksdb to 6.20.3-ververica-2.0? [~Yanfei Lei] > Upgrade ZLIB of FRocksDB to 1.2.13 > -- > > Key: FLINK-30321 > URL: https://issues.apache.org/jira/browse/FLINK-30321 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.17.0 >Reporter: Yanfei Lei >Assignee: Yanfei Lei >Priority: Major > Fix For: 1.17.0 > > > In FRocksDB, the ZLIB version is 1.2.11, which may result in memory > corruption, see > [cve-2018-25032|https://nvd.nist.gov/vuln/detail/cve-2018-25032#vulnCurrentDescriptionTitle] > https://lists.apache.org/thread/rm40f45qfw6rls70k35o2dt0k4tz9bsr -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] FangYongs commented on pull request #22284: [FLINK-31547][jdbc-driver] Introduce FlinkResultSetMetaData for jdbc driver
FangYongs commented on PR #22284: URL: https://github.com/apache/flink/pull/22284#issuecomment-1491152645 Thanks @libenchao DONE -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] FangYongs commented on a diff in pull request #22284: [FLINK-31547][jdbc-driver] Introduce FlinkResultSetMetaData for jdbc driver
FangYongs commented on code in PR #22284: URL: https://github.com/apache/flink/pull/22284#discussion_r1153919508 ## flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/ColumnInfo.java: ## @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.jdbc; + +import org.apache.flink.table.types.logical.ArrayType; Review Comment: Good idea, I'll consider how to create this e2e test -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] mas-chen commented on a diff in pull request #22303: [FLINK-31305] fix error propagation bug in WriterCallback and use Tes…
mas-chen commented on code in PR #22303: URL: https://github.com/apache/flink/pull/22303#discussion_r1153914893 ## flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java: ## @@ -198,39 +197,135 @@ public void testCurrentSendTimeMetric() throws Exception { } @Test -void testNumRecordsOutErrorsCounterMetric() throws Exception { +void testFlushAsyncErrorPropagationAndErrorCounter() throws Exception { Properties properties = getKafkaClientConfiguration(); -final InternalSinkWriterMetricGroup metricGroup = - InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()); -try (final KafkaWriter writer = +SinkInitContext sinkInitContext = +new SinkInitContext( + InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()), +timeService, +null); +final KafkaWriter writer = createWriterWithConfiguration( -properties, DeliveryGuarantee.EXACTLY_ONCE, metricGroup)) { -final Counter numRecordsOutErrors = metricGroup.getNumRecordsOutErrorsCounter(); -assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L); +properties, DeliveryGuarantee.EXACTLY_ONCE, sinkInitContext); +final Counter numRecordsOutErrors = +sinkInitContext.metricGroup.getNumRecordsOutErrorsCounter(); +assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L); + +triggerProducerException(writer, properties); + +// test flush +assertThatCode(() -> writer.flush(false)) +.hasRootCauseExactlyInstanceOf(ProducerFencedException.class); +assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L); + +assertThatCode(() -> writer.write(1, SINK_WRITER_CONTEXT)) +.as("the exception is not thrown again") +.doesNotThrowAnyException(); +assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L); +} -writer.write(1, SINK_WRITER_CONTEXT); -assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L); +@Test +void testWriteAsyncErrorPropagationAndErrorCounter() throws Exception { +Properties properties = getKafkaClientConfiguration(); -final String transactionalId = writer.getCurrentProducer().getTransactionalId(); +SinkInitContext sinkInitContext = +new SinkInitContext( + InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()), +timeService, +null); +final KafkaWriter writer = +createWriterWithConfiguration( +properties, DeliveryGuarantee.EXACTLY_ONCE, sinkInitContext); +final Counter numRecordsOutErrors = +sinkInitContext.metricGroup.getNumRecordsOutErrorsCounter(); +assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L); + +triggerProducerException(writer, properties); +// to ensure that the exceptional send request has completed +writer.getCurrentProducer().flush(); + +assertThatCode(() -> writer.write(1, SINK_WRITER_CONTEXT)) +.hasRootCauseExactlyInstanceOf(ProducerFencedException.class); +assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L); + +assertThatCode(() -> writer.write(1, SINK_WRITER_CONTEXT)) +.as("the exception is not thrown again") +.doesNotThrowAnyException(); +assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L); +} -try (FlinkKafkaInternalProducer producer = -new FlinkKafkaInternalProducer<>(properties, transactionalId)) { +@Test +void testMailboxAsyncErrorPropagationAndErrorCounter() throws Exception { +Properties properties = getKafkaClientConfiguration(); -producer.initTransactions(); -producer.beginTransaction(); -producer.send(new ProducerRecord(topic, "2".getBytes())); -producer.commitTransaction(); -} +SinkInitContext sinkInitContext = +new SinkInitContext( + InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()), +timeService, 
+null); +final KafkaWriter writer = +createWriterWithConfiguration( +properties, DeliveryGuarantee.EXACTLY_ONCE, sinkInitContext); +final Counter numRecordsOutErrors = +sinkInitContext.metricGroup.getNumRecordsOutErrorsCounter(); +assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L); -writer.write(3, SINK_WRITER_CONTEXT); -// this doesn't throw exception b
[GitHub] [flink] mas-chen commented on a diff in pull request #22303: [FLINK-31305] fix error propagation bug in WriterCallback and use Tes…
mas-chen commented on code in PR #22303: URL: https://github.com/apache/flink/pull/22303#discussion_r1153914459 ## flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java: ## @@ -198,39 +197,135 @@ public void testCurrentSendTimeMetric() throws Exception { } @Test -void testNumRecordsOutErrorsCounterMetric() throws Exception { +void testFlushAsyncErrorPropagationAndErrorCounter() throws Exception { Properties properties = getKafkaClientConfiguration(); -final InternalSinkWriterMetricGroup metricGroup = - InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()); -try (final KafkaWriter writer = +SinkInitContext sinkInitContext = +new SinkInitContext( + InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()), +timeService, +null); +final KafkaWriter writer = createWriterWithConfiguration( -properties, DeliveryGuarantee.EXACTLY_ONCE, metricGroup)) { -final Counter numRecordsOutErrors = metricGroup.getNumRecordsOutErrorsCounter(); -assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L); +properties, DeliveryGuarantee.EXACTLY_ONCE, sinkInitContext); +final Counter numRecordsOutErrors = +sinkInitContext.metricGroup.getNumRecordsOutErrorsCounter(); +assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L); + +triggerProducerException(writer, properties); + +// test flush +assertThatCode(() -> writer.flush(false)) +.hasRootCauseExactlyInstanceOf(ProducerFencedException.class); +assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L); + +assertThatCode(() -> writer.write(1, SINK_WRITER_CONTEXT)) +.as("the exception is not thrown again") +.doesNotThrowAnyException(); +assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L); +} -writer.write(1, SINK_WRITER_CONTEXT); -assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L); +@Test +void testWriteAsyncErrorPropagationAndErrorCounter() throws Exception { +Properties properties = getKafkaClientConfiguration(); -final String transactionalId = writer.getCurrentProducer().getTransactionalId(); +SinkInitContext sinkInitContext = +new SinkInitContext( + InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()), +timeService, +null); +final KafkaWriter writer = +createWriterWithConfiguration( +properties, DeliveryGuarantee.EXACTLY_ONCE, sinkInitContext); +final Counter numRecordsOutErrors = +sinkInitContext.metricGroup.getNumRecordsOutErrorsCounter(); +assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L); + +triggerProducerException(writer, properties); +// to ensure that the exceptional send request has completed +writer.getCurrentProducer().flush(); + +assertThatCode(() -> writer.write(1, SINK_WRITER_CONTEXT)) +.hasRootCauseExactlyInstanceOf(ProducerFencedException.class); +assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L); + +assertThatCode(() -> writer.write(1, SINK_WRITER_CONTEXT)) +.as("the exception is not thrown again") +.doesNotThrowAnyException(); +assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L); +} -try (FlinkKafkaInternalProducer producer = -new FlinkKafkaInternalProducer<>(properties, transactionalId)) { +@Test +void testMailboxAsyncErrorPropagationAndErrorCounter() throws Exception { +Properties properties = getKafkaClientConfiguration(); -producer.initTransactions(); -producer.beginTransaction(); -producer.send(new ProducerRecord(topic, "2".getBytes())); -producer.commitTransaction(); -} +SinkInitContext sinkInitContext = +new SinkInitContext( + InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()), +timeService, 
+null); +final KafkaWriter writer = +createWriterWithConfiguration( +properties, DeliveryGuarantee.EXACTLY_ONCE, sinkInitContext); +final Counter numRecordsOutErrors = +sinkInitContext.metricGroup.getNumRecordsOutErrorsCounter(); +assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L); -writer.write(3, SINK_WRITER_CONTEXT); -// this doesn't throw exception b
[GitHub] [flink] cnauroth commented on pull request #22281: [FLINK-31631][FileSystems] Upgrade GCS connector to 2.2.11.
cnauroth commented on PR #22281: URL: https://github.com/apache/flink/pull/22281#issuecomment-1491091594 I pushed up the documentation changes. Thank you! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #19873: [FLINK-27891][Table] Add ARRAY_APPEND and ARRAY_PREPEND functions
snuyanzin commented on code in PR #19873: URL: https://github.com/apache/flink/pull/19873#discussion_r1153853228 ## docs/data/sql_functions.yml: ## @@ -617,6 +617,12 @@ collection: - sql: ARRAY_DISTINCT(haystack) table: haystack.arrayDistinct() description: Returns an array with unique elements. If the array itself is null, the function will return null. Keeps ordering of elements. + - sql: ARRAY_APPEND(haystack, element) +table: haystack.arrayAppend(needle) +description: Appends an element to the end of the array and returns the result. If the array itself is null, the function will return null. If an element to add is null, the null element will be added to the end of the array. The given element is cast implicitly to the array's element type if necessary. + - sql: ARRAY_PREPEND(element, haystack) +table: haystack.arrayPrepend(needle) +description: Appends an element to the beginning of the array and returns the result. If the array itself is null, the function will return null. If an element to add is null, the null element will be added to the beginning of the array. The given element is cast implicitly to the array's element type if necessary. Review Comment: It depends on what you call obvious. Just look at the functions, e.g. ```sql SELECT ARRAY_PREPEND(1, ARRAY[2, 3, 4]); ``` In this case it's obvious that the result will be `ARRAY[1, 2, 3, 4]`. In the case of Spark ```sql SELECT ARRAY_PREPEND(ARRAY[2, 3, 4], 1); ``` it is not that obvious... Well, I see that more engines follow the second option; we will probably do the same to follow the majority. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #19873: [FLINK-27891][Table] Add ARRAY_APPEND and ARRAY_PREPEND functions
snuyanzin commented on code in PR #19873: URL: https://github.com/apache/flink/pull/19873#discussion_r1153851661 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java: ## @@ -178,6 +179,35 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) .runtimeClass( "org.apache.flink.table.runtime.functions.scalar.ArrayDistinctFunction") .build(); + +public static final BuiltInFunctionDefinition ARRAY_APPEND = +BuiltInFunctionDefinition.newBuilder() +.name("ARRAY_APPEND") +.kind(SCALAR) +.inputTypeStrategy( +sequence( +Arrays.asList("haystack", "element"), +Arrays.asList( +logical(LogicalTypeRoot.ARRAY), ARRAY_ELEMENT_ARG))) +.outputTypeStrategy(nullableIfArgs(argument(0))) +.runtimeClass( + "org.apache.flink.table.runtime.functions.scalar.ArrayAppendFunction") +.build(); Review Comment: ok, postgres and spark does this, then could be done in Flink as well ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java: ## @@ -178,6 +179,35 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) .runtimeClass( "org.apache.flink.table.runtime.functions.scalar.ArrayDistinctFunction") .build(); + +public static final BuiltInFunctionDefinition ARRAY_APPEND = +BuiltInFunctionDefinition.newBuilder() +.name("ARRAY_APPEND") +.kind(SCALAR) +.inputTypeStrategy( +sequence( +Arrays.asList("haystack", "element"), +Arrays.asList( +logical(LogicalTypeRoot.ARRAY), ARRAY_ELEMENT_ARG))) +.outputTypeStrategy(nullableIfArgs(argument(0))) +.runtimeClass( + "org.apache.flink.table.runtime.functions.scalar.ArrayAppendFunction") +.build(); Review Comment: ok, postgres and spark do this, then could be done in Flink as well -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #22063: [FLINK-24909][Table SQL/Client] Add syntax highlighting
snuyanzin commented on code in PR #22063: URL: https://github.com/apache/flink/pull/22063#discussion_r1153849899 ## flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/parser/SqlClientSyntaxHighlighter.java: ## @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.cli.parser; + +import org.apache.flink.table.api.SqlDialect; +import org.apache.flink.table.api.config.TableConfigOptions; +import org.apache.flink.table.client.config.SqlClientOptions; +import org.apache.flink.table.client.gateway.Executor; + +import org.jline.reader.LineReader; +import org.jline.reader.impl.DefaultHighlighter; +import org.jline.utils.AttributedString; +import org.jline.utils.AttributedStringBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Locale; +import java.util.Properties; +import java.util.Set; +import java.util.function.BiConsumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** Sql Client syntax highlighter. */ +public class SqlClientSyntaxHighlighter extends DefaultHighlighter { +private static final Logger LOG = LoggerFactory.getLogger(SqlClientSyntaxHighlighter.class); +private static Set flinkKeywordSet; +private static Set flinkKeywordCharacterSet; + +static { +try (InputStream is = + SqlClientSyntaxHighlighter.class.getResourceAsStream("/keywords.properties")) { +Properties props = new Properties(); +props.load(is); +flinkKeywordSet = +Collections.unmodifiableSet( + Arrays.stream(props.get("default").toString().split(";")) +.collect(Collectors.toSet())); +flinkKeywordCharacterSet = +flinkKeywordSet.stream() +.flatMap(t -> t.chars().mapToObj(c -> (char) c)) +.collect(Collectors.toSet()); +} catch (IOException e) { +LOG.error("Exception: ", e); +flinkKeywordSet = Collections.emptySet(); +} +} + +private final Executor executor; + +public SqlClientSyntaxHighlighter(Executor executor) { +this.executor = executor; +} + +@Override +public AttributedString highlight(LineReader reader, String buffer) { +final SyntaxHighlightStyle.BuiltInStyle style = +SyntaxHighlightStyle.BuiltInStyle.fromString( +executor.getSessionConfig() + .get(SqlClientOptions.DISPLAY_DEFAULT_COLOR_SCHEMA)); + +if (style == SyntaxHighlightStyle.BuiltInStyle.DEFAULT) { +return super.highlight(reader, buffer); +} +final String dialectName = + executor.getSessionConfig().get(TableConfigOptions.TABLE_SQL_DIALECT); +final SqlDialect dialect = +SqlDialect.HIVE.name().equalsIgnoreCase(dialectName) +? 
SqlDialect.HIVE +: SqlDialect.DEFAULT; +return getHighlightedOutput(buffer, style.getHighlightStyle(), dialect); +} + +static AttributedString getHighlightedOutput( +String buffer, SyntaxHighlightStyle style, SqlDialect dialect) { +final AttributedStringBuilder highlightedOutput = new AttributedStringBuilder(); +State currentParseState = null; +StringBuilder word = new StringBuilder(); +for (int i = 0; i < buffer.length(); i++) { +final char currentChar = buffer.charAt(i); +if (currentParseState == null) { +currentParseState = State.computeStateAt(buffer, i, dialect); +if (currentParseState == null) { +if (!flinkKeywordCharacterSet.contains(Character.toUpperCase(currentChar))) { +handleWord(word, highlightedOutput, currentParseState, style, true); +highlightedOutput.appen
[GitHub] [flink] snuyanzin commented on a diff in pull request #22063: [FLINK-24909][Table SQL/Client] Add syntax highlighting
snuyanzin commented on code in PR #22063: URL: https://github.com/apache/flink/pull/22063#discussion_r1153849663 ## flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/parser/SqlClientSyntaxHighlighter.java: ## @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.cli.parser; + +import org.apache.flink.table.api.SqlDialect; +import org.apache.flink.table.api.config.TableConfigOptions; +import org.apache.flink.table.client.config.SqlClientOptions; +import org.apache.flink.table.client.gateway.Executor; + +import org.jline.reader.LineReader; +import org.jline.reader.impl.DefaultHighlighter; +import org.jline.utils.AttributedString; +import org.jline.utils.AttributedStringBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Locale; +import java.util.Properties; +import java.util.Set; +import java.util.function.BiConsumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** Sql Client syntax highlighter. */ +public class SqlClientSyntaxHighlighter extends DefaultHighlighter { +private static final Logger LOG = LoggerFactory.getLogger(SqlClientSyntaxHighlighter.class); +private static Set flinkKeywordSet; +private static Set flinkKeywordCharacterSet; Review Comment: done ## flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/parser/SqlClientSyntaxHighlighter.java: ## @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.client.cli.parser; + +import org.apache.flink.table.api.SqlDialect; +import org.apache.flink.table.api.config.TableConfigOptions; +import org.apache.flink.table.client.config.SqlClientOptions; +import org.apache.flink.table.client.gateway.Executor; + +import org.jline.reader.LineReader; +import org.jline.reader.impl.DefaultHighlighter; +import org.jline.utils.AttributedString; +import org.jline.utils.AttributedStringBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Locale; +import java.util.Properties; +import java.util.Set; +import java.util.function.BiConsumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** Sql Client syntax highlighter. */ +public class SqlClientSyntaxHighlighter extends DefaultHighlighter { +private static final Logger LOG = LoggerFactory.getLogger(SqlClientSyntaxHighlighter.class); +private static Set flinkKeywordSet; +private static Set flinkKeywordCharacterSet; + +static { +try (InputStream is = + SqlClientSyntaxHighlighter.class.getResourceAsStream("/keywords.properties")) { +Properties props = new Properties(); +props.load(is); +flinkKeywordSet = +Collections.unmodifiableSet( + Arrays.stream(props.get("default").toString().split(";")) +.collect(Collectors.toSet())); +flinkKeywordCharacterSet = +
[GitHub] [flink] snuyanzin commented on a diff in pull request #22063: [FLINK-24909][Table SQL/Client] Add syntax highlighting
snuyanzin commented on code in PR #22063: URL: https://github.com/apache/flink/pull/22063#discussion_r1153849499 ## flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/parser/SqlClientHighlighterTest.java: ## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.cli.parser; + +import org.apache.flink.table.api.SqlDialect; + +import org.jline.utils.AttributedStringBuilder; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.stream.Stream; + +import static org.apache.flink.table.client.cli.parser.SyntaxHighlightStyle.BuiltInStyle.DARK; +import static org.apache.flink.table.client.cli.parser.SyntaxHighlightStyle.BuiltInStyle.LIGHT; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link SqlClientSyntaxHighlighter}. */ +class SqlClientHighlighterTest { +@ParameterizedTest +@MethodSource("specProvider") +void test(SqlClientHighlighterTestSpec spec) { +assertThat( +SqlClientSyntaxHighlighter.getHighlightedOutput( +spec.sql, spec.style, spec.dialect) +.toAnsi()) +.isEqualTo(spec.getExpected()); +} + +static Stream specProvider() { +return Stream.of( +new SqlClientHighlighterTestSpec( +"select", +new AttributedStringTestSpecBuilder(DARK.getHighlightStyle()) +.appendKeyword("select")), +new SqlClientHighlighterTestSpec( +"default_style", +new AttributedStringTestSpecBuilder(DARK.getHighlightStyle()) +.append("default_style")), +new SqlClientHighlighterTestSpec( +"'quoted'", +new AttributedStringTestSpecBuilder(LIGHT.getHighlightStyle()) +.appendQuoted("'quoted'")), +new SqlClientHighlighterTestSpec( +"`sqlQuoteIdentifier`", +new AttributedStringTestSpecBuilder(LIGHT.getHighlightStyle()) +.appendSqlIdentifier("`sqlQuoteIdentifier`")), +new SqlClientHighlighterTestSpec( +"/*\nmultiline\n comment\n*/", +new AttributedStringTestSpecBuilder(LIGHT.getHighlightStyle()) +.appendComment("/*\nmultiline\n comment\n*/")), +new SqlClientHighlighterTestSpec( +"/*\nnot finished\nmultiline\n comment\n", +new AttributedStringTestSpecBuilder(LIGHT.getHighlightStyle()) +.appendComment("/*\nnot finished\nmultiline\n comment\n")), +new SqlClientHighlighterTestSpec( +"/*+hint*/", +new AttributedStringTestSpecBuilder(LIGHT.getHighlightStyle()) +.appendHint("/*+hint*/")), +new SqlClientHighlighterTestSpec( +"'`not a sql quote`''/*not a comment*/''--not a comment'", +new AttributedStringTestSpecBuilder(LIGHT.getHighlightStyle()) +.appendQuoted( +"'`not a sql quote`''/*not a comment*/''--not a comment'")), +new SqlClientHighlighterTestSpec( +"`'not a quote'``/*not a comment*/``--not a comment`", +new AttributedStringTestSpecBuilder(LIGHT.getHighlightStyle()) +.appendSqlIdentifier( +"`'not a 
quote'``/*not a comment*/``--not a comment`")), +new SqlClientHighlighterTestSpec( +"/*'not a quote'`not a sql quote``` /*+ not a hint*/", +new AttributedStringTestSpecBuilder(LIGHT.getHighlightStyle()) +.appendCommen
[GitHub] [flink] snuyanzin commented on a diff in pull request #22063: [FLINK-24909][Table SQL/Client] Add syntax highlighting
snuyanzin commented on code in PR #22063: URL: https://github.com/apache/flink/pull/22063#discussion_r1153840515 ## flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/parser/SqlClientSyntaxHighlighter.java: ## @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.cli.parser; + +import org.apache.flink.table.api.SqlDialect; +import org.apache.flink.table.api.config.TableConfigOptions; +import org.apache.flink.table.client.config.SqlClientOptions; +import org.apache.flink.table.client.gateway.Executor; + +import org.jline.reader.LineReader; +import org.jline.reader.impl.DefaultHighlighter; +import org.jline.utils.AttributedString; +import org.jline.utils.AttributedStringBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Locale; +import java.util.Properties; +import java.util.Set; +import java.util.function.BiConsumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** Sql Client syntax highlighter. */ +public class SqlClientSyntaxHighlighter extends DefaultHighlighter { +private static final Logger LOG = LoggerFactory.getLogger(SqlClientSyntaxHighlighter.class); +private static Set flinkKeywordSet; +private static Set flinkKeywordCharacterSet; + +static { +try (InputStream is = + SqlClientSyntaxHighlighter.class.getResourceAsStream("/keywords.properties")) { +Properties props = new Properties(); +props.load(is); +flinkKeywordSet = +Collections.unmodifiableSet( + Arrays.stream(props.get("default").toString().split(";")) +.collect(Collectors.toSet())); +flinkKeywordCharacterSet = +flinkKeywordSet.stream() +.flatMap(t -> t.chars().mapToObj(c -> (char) c)) +.collect(Collectors.toSet()); +} catch (IOException e) { +LOG.error("Exception: ", e); +flinkKeywordSet = Collections.emptySet(); +} +} + +private final Executor executor; + +public SqlClientSyntaxHighlighter(Executor executor) { +this.executor = executor; +} + +@Override +public AttributedString highlight(LineReader reader, String buffer) { +final SyntaxHighlightStyle.BuiltInStyle style = +SyntaxHighlightStyle.BuiltInStyle.fromString( +executor.getSessionConfig() + .get(SqlClientOptions.DISPLAY_DEFAULT_COLOR_SCHEMA)); Review Comment: yes, it's handled at org.apache.flink.table.client.cli.parser.SyntaxHighlightStyle.BuiltInStyle#fromString. 
In case of a wrong schema name, the default one (no highlighting) will be used: https://github.com/apache/flink/pull/22063/files#diff-2be1e22afaf63582ff481966e09b1c43fc35a6176faee2f495b97f9b54562c8aR86-R95 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-31604) Reduce usage of CatalogTableImpl in planner
[ https://issues.apache.org/jira/browse/FLINK-31604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17707050#comment-17707050 ] Sergey Nuyanzin commented on FLINK-31604: - Merged to master at [cfb213040a08021da3a954a1c0e7f94f22a80f1e|https://github.com/apache/flink/commit/cfb213040a08021da3a954a1c0e7f94f22a80f1e] > Reduce usage of CatalogTableImpl in planner > --- > > Key: FLINK-31604 > URL: https://issues.apache.org/jira/browse/FLINK-31604 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available > > The task is similar to https://issues.apache.org/jira/browse/FLINK-30896 > however about CatalogTableImpl which is deprecated -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-31604) Reduce usage of CatalogTableImpl in planner
[ https://issues.apache.org/jira/browse/FLINK-31604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin closed FLINK-31604. --- Fix Version/s: 1.18.0 Resolution: Fixed > Reduce usage of CatalogTableImpl in planner > --- > > Key: FLINK-31604 > URL: https://issues.apache.org/jira/browse/FLINK-31604 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > > > The task is similar to https://issues.apache.org/jira/browse/FLINK-30896 > however about CatalogTableImpl which is deprecated -- This message was sent by Atlassian Jira (v8.20.10#820010)
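For anyone doing a similar cleanup, the non-deprecated replacement is the CatalogTable.of factory together with the Schema builder. A minimal sketch (the column names and options are made up):
{code}
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogTable;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class CatalogTableOfSketch {
    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("connector", "datagen");

        // Replaces the deprecated `new CatalogTableImpl(tableSchema, options, comment)` pattern.
        CatalogTable table =
                CatalogTable.of(
                        Schema.newBuilder()
                                .column("id", "BIGINT")
                                .column("name", "STRING")
                                .build(),
                        "an example table",
                        Collections.emptyList(), // no partition keys
                        options);

        System.out.println(table.getUnresolvedSchema());
    }
}
{code}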
[GitHub] [flink] snuyanzin merged pull request #22263: [FLINK-31604][table] Reduce usage of CatalogTableImpl in table-planner
snuyanzin merged PR #22263: URL: https://github.com/apache/flink/pull/22263 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] snuyanzin commented on pull request #22263: [FLINK-31604][table] Reduce usage of CatalogTableImpl in table-planner
snuyanzin commented on PR #22263: URL: https://github.com/apache/flink/pull/22263#issuecomment-1491001684 Thanks for having a look -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-31676) Pulsar connector should not rely on Flink Shaded
[ https://issues.apache.org/jira/browse/FLINK-31676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17707003#comment-17707003 ] Mason Chen edited comment on FLINK-31676 at 3/30/23 8:48 PM: - Yeah, [~stevenz3wu], who works on Iceberg, and I discussed this a long time back since it was a little cumbersome to change that number repeatedly. It could map to a fixed string if we don't want to use/expose Guava directly and keep the package name from breaking (Iceberg and other libs do this too). was (Author: mason6345): Yeah, [~stevenz3wu] who works on Icebergg and I discussed this long time back since it was a little cumbersome to change that number repeatedly. It could map to a fixed string, if we don't want to use guava directly and keep the package name from breaking (Iceberg and other libs do this too). > Pulsar connector should not rely on Flink Shaded > > > Key: FLINK-31676 > URL: https://issues.apache.org/jira/browse/FLINK-31676 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / Pulsar >Reporter: Martijn Visser >Priority: Major > > The Pulsar connector currently depends on Flink Shaded for Guava. However, > externalized connectors must not rely on flink-shaded. This will just not be > possible if we want them to work against different Flink versions. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] snuyanzin commented on pull request #21947: [FLINK-31098][table] Add ARRAY_SIZE function.
snuyanzin commented on PR #21947: URL: https://github.com/apache/flink/pull/21947#issuecomment-1490890380 Closing the issue since https://github.com/apache/flink/pull/21947 is closed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #557: [FLINK-31630] Limit max checkpoint age for last-state upgrade
gyfora commented on PR #557: URL: https://github.com/apache/flink-kubernetes-operator/pull/557#issuecomment-1490835955 @morhidi the checkpoint interval often has a bound based on state size etc. In some cases it’s not viable to go below a few minutes, but you don’t want to introduce a few minutes of lag if you don’t have to. Or, in the case of Iceberg, users may prefer to keep it higher. It would be strange to force a lower checkpoint interval on the user just because the operator is not aware of it. This feature tries to address that limitation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-31660) flink-connectors-kafka ITCases are not runnable in the IDE
[ https://issues.apache.org/jira/browse/FLINK-31660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17707009#comment-17707009 ] Natea Eshetu Beshada commented on FLINK-31660: -- [~pnowojski] was able to get a rough working pom by looking at {{flink-connector-hive}} and added a few dependencies and a new maven plugin, and then [~pgaref] helped me by telling me to have intellij use the maven wrapper, and we fiddled with the pom a bit more. I will try to polish things up and put out a bug fix > flink-connectors-kafka ITCases are not runnable in the IDE > -- > > Key: FLINK-31660 > URL: https://issues.apache.org/jira/browse/FLINK-31660 > Project: Flink > Issue Type: Bug > Components: Table SQL / Ecosystem >Affects Versions: 1.18.0 >Reporter: Natea Eshetu Beshada >Assignee: Natea Eshetu Beshada >Priority: Major > > The following exception is thrown when trying to run > {{KafkaChangelogTableITCase}} or {{KafkaTableITCase}} > {code:java} > java.lang.NoClassDefFoundError: > org/apache/flink/table/shaded/com/jayway/jsonpath/spi/json/JsonProvider at > java.base/java.lang.Class.getDeclaredMethods0(Native Method) > at java.base/java.lang.Class.privateGetDeclaredMethods(Class.java:3166) > at java.base/java.lang.Class.getMethodsRecursive(Class.java:3307) > at java.base/java.lang.Class.getMethod0(Class.java:3293) > at java.base/java.lang.Class.getMethod(Class.java:2106) > at org.apache.calcite.linq4j.tree.Types.lookupMethod(Types.java:309) > at org.apache.calcite.util.BuiltInMethod.(BuiltInMethod.java:670) > at org.apache.calcite.util.BuiltInMethod.(BuiltInMethod.java:357) > at > org.apache.calcite.rel.metadata.BuiltInMetadata$PercentageOriginalRows.(BuiltInMetadata.java:344) > at > org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows$RelMdPercentageOriginalRowsHandler.getDef(RelMdPercentageOriginalRows.java:231) > at > org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider.reflectiveSource(ReflectiveRelMetadataProvider.java:134) > at > org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.(RelMdPercentageOriginalRows.java:42) > at > org.apache.calcite.rel.metadata.DefaultRelMetadataProvider.(DefaultRelMetadataProvider.java:42) > at > org.apache.calcite.rel.metadata.DefaultRelMetadataProvider.(DefaultRelMetadataProvider.java:28) > at org.apache.calcite.plan.RelOptCluster.(RelOptCluster.java:97) > at org.apache.calcite.plan.RelOptCluster.create(RelOptCluster.java:106) > at > org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$.create(FlinkRelOptClusterFactory.scala:36) > at > org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory.create(FlinkRelOptClusterFactory.scala) > at > org.apache.flink.table.planner.delegation.PlannerContext.(PlannerContext.java:132) > at > org.apache.flink.table.planner.delegation.PlannerBase.(PlannerBase.scala:121) > at > org.apache.flink.table.planner.delegation.StreamPlanner.(StreamPlanner.scala:65) > at > org.apache.flink.table.planner.delegation.DefaultPlannerFactory.create(DefaultPlannerFactory.java:65) > at > org.apache.flink.table.planner.loader.DelegatePlannerFactory.create(DelegatePlannerFactory.java:36) > at > org.apache.flink.table.factories.PlannerFactoryUtil.createPlanner(PlannerFactoryUtil.java:58) > at > org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:127) > at > org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:122) > at > 
org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:94) > at > org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestBase.setup(KafkaTableTestBase.java:93) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.RunBefores.invokeMethod(RunBefores.java:33) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters
[GitHub] [flink-connector-mongodb] Jiabao-Sun commented on pull request #4: [hotfix] Fix unstable test of MongoSinkITCase.testRecovery
Jiabao-Sun commented on PR #4: URL: https://github.com/apache/flink-connector-mongodb/pull/4#issuecomment-1490804554 Hi @dannycranmer. Could you help review it when you have time? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-31676) Pulsar connector should not rely on Flink Shaded
[ https://issues.apache.org/jira/browse/FLINK-31676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17707003#comment-17707003 ] Mason Chen commented on FLINK-31676: Yeah, [~stevenz3wu] who works on Iceberg and I discussed this long time back since it was a little cumbersome to change that number repeatedly. It could map to a fixed string, if we don't want to use guava directly and keep the package name from breaking (Iceberg and other libs do this too). > Pulsar connector should not rely on Flink Shaded > > > Key: FLINK-31676 > URL: https://issues.apache.org/jira/browse/FLINK-31676 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / Pulsar >Reporter: Martijn Visser >Priority: Major > > The Pulsar connector currently depends on Flink Shaded for Guava. However, > externalized connectors must not rely on flink-shaded. This will just not be > possible if we want them to work against different Flink versions. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-connector-mongodb] Jiabao-Sun opened a new pull request, #4: [hotfix] Fix unstable test of MongoSinkITCase.testRecovery
Jiabao-Sun opened a new pull request, #4: URL: https://github.com/apache/flink-connector-mongodb/pull/4 Fix the CI failure: https://github.com/apache/flink-connector-mongodb/actions/runs/4435527099/jobs/7782784066. > 2023-03-16T09:46:59.8097110Z 09:46:52,687 [Source: Sequence Source -> Map -> Map -> Sink: Writer (1/1)#7] ERROR org.apache.flink.connector.mongodb.sink.writer.MongoWriter [] - Bulk Write to MongoDB failed 2023-03-16T09:46:59.8098540Z com.mongodb.MongoBulkWriteException: Bulk write operation error on server localhost:32771. Write errors: [BulkWriteError{index=0, code=11000, message='E11000 duplicate key error collection: test_sink.test-recovery-mongo-sink index: _id_ dup key: { : 1 }', details={}}]. We use non-idempotent writes in this test case and may write some data before the checkpoint completes. In that case we'll hit a duplicate key error. Set `batchIntervalMs` and `batchSize` to -1 to force writes only at checkpoints and make the test stable. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
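For context, a rough sketch of a MongoSink wired the way the fix describes, flushing only on checkpoints; the builder method names follow the connector's documented API as I recall it, so treat the exact names and the serialization lambda as assumptions and check MongoSinkBuilder before copying.

```java
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.mongodb.sink.MongoSink;

import com.mongodb.client.model.InsertOneModel;
import org.bson.BsonDocument;

public class MongoCheckpointOnlySinkSketch {
    public static MongoSink<String> buildSink() {
        return MongoSink.<String>builder()
                .setUri("mongodb://127.0.0.1:27017")
                .setDatabase("test_sink")
                .setCollection("test-recovery-mongo-sink")
                // -1 disables size- and time-based flushing, so buffered writes
                // are only sent when a checkpoint triggers a flush.
                .setBatchSize(-1)
                .setBatchIntervalMs(-1)
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .setSerializationSchema(
                        (input, context) -> new InsertOneModel<>(BsonDocument.parse(input)))
                .build();
    }
}
```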
[jira] [Commented] (FLINK-31676) Pulsar connector should not rely on Flink Shaded
[ https://issues.apache.org/jira/browse/FLINK-31676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17707000#comment-17707000 ] Martijn Visser commented on FLINK-31676: [~mason6345] Yeah I don’t see why that wouldn’t be possible. The problem that I ran into is that we were updating Guava in Flink Shaded and then seeing some tests fail in Flink, because the module changes from guava30 to guava31 with the version update. I think there are already other connectors that are using Guava directly too > Pulsar connector should not rely on Flink Shaded > > > Key: FLINK-31676 > URL: https://issues.apache.org/jira/browse/FLINK-31676 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / Pulsar >Reporter: Martijn Visser >Priority: Major > > The Pulsar connector currently depends on Flink Shaded for Guava. However, > externalized connectors must not rely on flink-shaded. This will just not be > possible if we want them to work against different Flink versions. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31676) Pulsar connector should not rely on Flink Shaded
[ https://issues.apache.org/jira/browse/FLINK-31676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17706999#comment-17706999 ] Mason Chen commented on FLINK-31676: [~martijnvisser] I happened to see this. Would we still be able to use Guava in external connectors? Ideally yes, and maybe we need to change the shading process to make it possible to work against different Flink versions > Pulsar connector should not rely on Flink Shaded > > > Key: FLINK-31676 > URL: https://issues.apache.org/jira/browse/FLINK-31676 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / Pulsar >Reporter: Martijn Visser >Priority: Major > > The Pulsar connector currently depends on Flink Shaded for Guava. However, > externalized connectors must not rely on flink-shaded. This will just not be > possible if we want them to work against different Flink versions. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-kubernetes-operator] morhidi commented on pull request #557: [FLINK-31630] Limit max checkpoint age for last-state upgrade
morhidi commented on PR #557: URL: https://github.com/apache/flink-kubernetes-operator/pull/557#issuecomment-1490779906 > > @gyfora could you please shed some light on the benefits of this feature? > > If the checkpoint interval is high (lets say 1 hour) it can happen that we trigger a last state upgrade close to the end of the interval simply falling back to the last checkpoint. This would mean a very large backlog to process, probably unintentionally. > > This feature would allow us to automatically take a savepoint instead (or in Flink 1.17+ trigger a checkpoint) Thanks for the clarification. I guess users are able to tune their checkpoint interval to avoid the scenario we're trying to address with this feature. I like the idea of checkpoint upgrade mode though. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-31676) Pulsar connector should not rely on Flink Shaded
Martijn Visser created FLINK-31676: -- Summary: Pulsar connector should not rely on Flink Shaded Key: FLINK-31676 URL: https://issues.apache.org/jira/browse/FLINK-31676 Project: Flink Issue Type: Technical Debt Components: Connectors / Pulsar Reporter: Martijn Visser The Pulsar connector currently depends on Flink Shaded for Guava. However, externalized connectors must not rely on flink-shaded. This will just not be possible if we want them to work against different Flink versions. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-shaded] MartijnVisser merged pull request #119: Update Flink Shaded dependencies to the latest versions
MartijnVisser merged PR #119: URL: https://github.com/apache/flink-shaded/pull/119 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #22310: [hotfix][doc] Fix the incorrect description for TO_TIMESTAMP function
flinkbot commented on PR #22310: URL: https://github.com/apache/flink/pull/22310#issuecomment-1490712975 ## CI report: * c79a916f50ffad4a75998675c1bb0c6a4cf85795 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] hackergin opened a new pull request, #22310: [hotfix][doc] Fix the incorrect syntax description for TO_TIMESTAMP function
hackergin opened a new pull request, #22310: URL: https://github.com/apache/flink/pull/22310 ## What is the purpose of the change The function TO_TIMESTAMP is independent of timezone. Therefore, we need to remove the misleading description that refers to it being 'under the UTC+0'. ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change Please make sure both new and modified tests in this PR follows the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
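A small sketch of the behaviour the corrected description should convey: TO_TIMESTAMP returns a plain TIMESTAMP (no time zone attached), so changing the session time zone does not shift the parsed value. This is an illustrative snippet, not part of the PR.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

import java.time.ZoneId;

public class ToTimestampSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Same literal parsed under two different session time zones:
        // both queries print the same TIMESTAMP(3) value.
        tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
        tEnv.executeSql("SELECT TO_TIMESTAMP('2023-03-30 12:00:00') AS ts").print();

        tEnv.getConfig().setLocalTimeZone(ZoneId.of("UTC"));
        tEnv.executeSql("SELECT TO_TIMESTAMP('2023-03-30 12:00:00') AS ts").print();
    }
}
```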
[jira] [Updated] (FLINK-31675) Deadlock in AWS Connectors following content-length AWS SDK exception
[ https://issues.apache.org/jira/browse/FLINK-31675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer updated FLINK-31675: -- Fix Version/s: aws-connector-3.1.0 aws-connector-4.2.0 (was: 1.16.2) (was: 1.17.1) > Deadlock in AWS Connectors following content-length AWS SDK exception > - > > Key: FLINK-31675 > URL: https://issues.apache.org/jira/browse/FLINK-31675 > Project: Flink > Issue Type: Bug > Components: Connectors / AWS >Affects Versions: 1.17.0, 1.16.1, 1.15.4 >Reporter: Antonio Vespoli >Priority: Major > Fix For: aws-connector-3.1.0, 1.15.5, aws-connector-4.2.0 > > > Connector calls to AWS services can hang on a canceled future following a > content-length mismatch that isn't handled gracefully by the SDK: > > {code:java} > org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.FutureCancelledException: > java.io.IOException: Response had content-length of 31 bytes, but only > received 0 bytes before the connection was closed. > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.lambda$null$3(NettyRequestExecutor.java:136) > at > org.apache.flink.kinesis.shaded.io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98) > at > org.apache.flink.kinesis.shaded.io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:106) > at > org.apache.flink.kinesis.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) > at > org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469) > at > org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500) > at > org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) > at > org.apache.flink.kinesis.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) > at java.base/java.lang.Thread.run(Thread.java:829) > Caused by: java.io.IOException: Response had content-length of 31 bytes, but > only received 0 bytes before the connection was closed. 
> at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.validateResponseContentLength(ResponseHandler.java:163) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.access$700(ResponseHandler.java:75) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter$1.onComplete(ResponseHandler.java:369) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.nrs.HandlerPublisher.complete(HandlerPublisher.java:447) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.nrs.HandlerPublisher.channelInactive(HandlerPublisher.java:430) > at > org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) > at > org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) > at > org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) > at > org.apache.flink.kinesis.shaded.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81) > at > org.apache.flink.kinesis.shaded.io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:277) > at > org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) > at > org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) > at > org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) > at > org.apache.flink.kinesis.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405) > at > org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) > at > org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) > at > org.apache.flink.kinesis.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901) > at > org.apache.flink.kinesis.shaded.io.netty.handler.
[GitHub] [flink] jeremy-degroot closed pull request #22309: FLINK-29398 [Kafka Source] Provide rack ID to Kafka Source to take advantage of Rack Awareness
jeremy-degroot closed pull request #22309: FLINK-29398 [Kafka Source] Provide rack ID to Kafka Source to take advantage of Rack Awareness URL: https://github.com/apache/flink/pull/22309 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] jeremy-degroot commented on pull request #22309: FLINK-29398 [Kafka Source] Provide rack ID to Kafka Source to take advantage of Rack Awareness
jeremy-degroot commented on PR #22309: URL: https://github.com/apache/flink/pull/22309#issuecomment-1490659748 PR moved to https://github.com/apache/flink-connector-kafka/pull/20 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-kafka] boring-cyborg[bot] commented on pull request #20: FLINK-29398 Provide rack ID to Kafka Source to take advantage of Rack Awareness
boring-cyborg[bot] commented on PR #20: URL: https://github.com/apache/flink-connector-kafka/pull/20#issuecomment-1490659193 Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-kafka] jeremy-degroot opened a new pull request, #20: FLINK-29398 Provide rack ID to Kafka Source to take advantage of Rack Awareness
jeremy-degroot opened a new pull request, #20: URL: https://github.com/apache/flink-connector-kafka/pull/20 ## What is the purpose of the change This PR adds a new method to KafkaSourceBuilder that sets the `client.rack` config for the KafkaSource to the value returned by the provided Supplier. It needs to be a Supplier because it needs to run on the TaskManager, and can't be determined at Job submit time like other configs. ## Brief change log - *Add setRackId to KafkaSourceBuilder* - *Plumb rackId into KafkaPartitionSplitReader* - *Add rack id tests* - *Document RackId feature* ## Verifying this change - *Added tests for the KafkaSplitReader that verify behaviors for null rackId Supplier, null and empty return values, and provided values.* - *Manually verified the change by running a 3-node cluster that covered two "racks" (AWS Availability Zones) against an Amazon MSK cluster.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? docs/Javadocs -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
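A sketch of how the feature described in this PR might be wired up, assuming the builder method is the `setRackId` named in the change log and that the rack is read from an environment variable populated per availability zone (both are assumptions for illustration; broker addresses and topic names are placeholders):

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;

public class RackAwareKafkaSourceSketch {
    public static KafkaSource<String> build() {
        return KafkaSource.<String>builder()
                .setBootstrapServers("broker-1:9092,broker-2:9092") // placeholder brokers
                .setTopics("input-topic")                           // placeholder topic
                .setGroupId("rack-aware-group")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // The supplier is evaluated on the TaskManager, so each reader reports
                // the rack/AZ of the host it actually runs on (hypothetical env var name).
                .setRackId(() -> System.getenv("AWS_AVAILABILITY_ZONE"))
                .build();
    }
}
```

Because the value is resolved only once the reader is running, each subtask can surface its own rack to Kafka via the consumer's `client.rack` property, which is what makes rack-aware reads possible even though the rack cannot be known at job submission time.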
[jira] [Updated] (FLINK-31675) Deadlock in AWS Connectors following content-length AWS SDK exception
[ https://issues.apache.org/jira/browse/FLINK-31675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Antonio Vespoli updated FLINK-31675: Description: Connector calls to AWS services can hang on a canceled future following a content-length mismatch that isn't handled gracefully by the SDK: {code:java} org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.FutureCancelledException: java.io.IOException: Response had content-length of 31 bytes, but only received 0 bytes before the connection was closed. at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.lambda$null$3(NettyRequestExecutor.java:136) at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98) at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:106) at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469) at org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500) at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) at org.apache.flink.kinesis.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: java.io.IOException: Response had content-length of 31 bytes, but only received 0 bytes before the connection was closed. at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.validateResponseContentLength(ResponseHandler.java:163) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.access$700(ResponseHandler.java:75) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter$1.onComplete(ResponseHandler.java:369) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.nrs.HandlerPublisher.complete(HandlerPublisher.java:447) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.nrs.HandlerPublisher.channelInactive(HandlerPublisher.java:430) at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) at org.apache.flink.kinesis.shaded.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81) at org.apache.flink.kinesis.shaded.io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:277) at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) at 
org.apache.flink.kinesis.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405) at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) at org.apache.flink.kinesis.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901) at org.apache.flink.kinesis.shaded.io.netty.handler.codec.http2.AbstractHttp2StreamChannel$Http2ChannelUnsafe$2.run(AbstractHttp2StreamChannel.java:737) ... 6 more {code} Related AWS SDK issue: [https://github.com/aws/aws-sdk-java-v2/issues/3335] AWS SDK fix: [https://github.com/aws/aws-sdk-java-v2/pull/3855/files] This mishandled exception creates a deadlock situation that prevents the connectors from making any progress. We should update the AWS SDK v2 to 2.20.32: [https://github.com/aws/aws-sdk-java-v2/commit/eb5619e24e4eaca6f80effa1c43c0cd409cdd53e] was: Connector calls to AWS services can hang on a canceled future following a content-length mismatch that isn't handled gracefully by the SDK: {code:java} org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.FutureCancelledException: java.io.
[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #557: [FLINK-31630] Limit max checkpoint age for last-state upgrade
gyfora commented on PR #557: URL: https://github.com/apache/flink-kubernetes-operator/pull/557#issuecomment-1490595467 > @gyfora could you please shed some light on the benefits of this feature? If the checkpoint interval is high (let's say 1 hour), it can happen that we trigger a last-state upgrade close to the end of the interval, simply falling back to the last checkpoint. This would mean a very large backlog to process, probably unintentionally. This feature would allow us to automatically take a savepoint instead (or, in Flink 1.17+, trigger a checkpoint). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-27429) Implement Vertica JDBC Dialect
[ https://issues.apache.org/jira/browse/FLINK-27429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-27429: --- Labels: pull-request-available (was: ) > Implement Vertica JDBC Dialect > -- > > Key: FLINK-27429 > URL: https://issues.apache.org/jira/browse/FLINK-27429 > Project: Flink > Issue Type: Sub-task > Components: Connectors / JDBC >Affects Versions: 1.16.0 >Reporter: Jasmin Redzepovic >Assignee: Jasmin Redzepovic >Priority: Minor > Labels: pull-request-available > > In order to use Vertica database as a JDBC source or sink, corresponding > dialect has to be implemented. -- This message was sent by Atlassian Jira (v8.20.10#820010)
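For orientation, a sketch of how such a dialect is typically exercised once available: a JDBC table whose URL points at Vertica. The connection details, table, and columns below are placeholders, and resolving the dialect from the `jdbc:vertica:` URL is exactly what the linked pull request adds.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class VerticaJdbcSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Hypothetical table definition; it only works once the JDBC connector
        // recognizes the Vertica dialect from the URL.
        tEnv.executeSql(
                "CREATE TABLE orders_sink (\n"
                        + "  order_id BIGINT,\n"
                        + "  amount DOUBLE,\n"
                        + "  PRIMARY KEY (order_id) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'connector' = 'jdbc',\n"
                        + "  'url' = 'jdbc:vertica://vertica-host:5433/db',\n"
                        + "  'table-name' = 'orders',\n"
                        + "  'username' = 'user',\n"
                        + "  'password' = 'secret'\n"
                        + ")");
    }
}
```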
[GitHub] [flink-connector-jdbc] boring-cyborg[bot] commented on pull request #33: [FLINK-27429] Add Vertica JDBC dialect
boring-cyborg[bot] commented on PR #33: URL: https://github.com/apache/flink-connector-jdbc/pull/33#issuecomment-1490594577 Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] mateczagany commented on a diff in pull request #558: [FLINK-31303] Expose Flink application resource usage via metrics and status
mateczagany commented on code in PR #558: URL: https://github.com/apache/flink-kubernetes-operator/pull/558#discussion_r1153504166 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java: ## @@ -627,14 +637,42 @@ public Map getClusterInfo(Configuration conf) throws Exception { .toSeconds(), TimeUnit.SECONDS); -runtimeVersion.put( +clusterInfo.put( DashboardConfiguration.FIELD_NAME_FLINK_VERSION, dashboardConfiguration.getFlinkVersion()); -runtimeVersion.put( +clusterInfo.put( DashboardConfiguration.FIELD_NAME_FLINK_REVISION, dashboardConfiguration.getFlinkRevision()); } -return runtimeVersion; + +// JobManager resource usage can be deduced from the CR +var jmParameters = +new KubernetesJobManagerParameters( +conf, new KubernetesClusterClientFactory().getClusterSpecification(conf)); +var jmTotalCpu = +jmParameters.getJobManagerCPU() +* jmParameters.getJobManagerCPULimitFactor() +* jmParameters.getReplicas(); +var jmTotalMemory = +Math.round( +jmParameters.getJobManagerMemoryMB() +* Math.pow(1024, 2) +* jmParameters.getJobManagerMemoryLimitFactor() +* jmParameters.getReplicas()); + +// TaskManager resource usage is best gathered from the REST API to get current replicas Review Comment: If fractional values are used for the CPU, there will be a difference between retrieving it from Flink REST and Kubernetes CR. Flink uses `Hardware.getNumberCPUCores()` under the hood to retrieve this value, not sure exactly how that works, but it's definitely an integer in the end :D This will lead to weird scenarios where if you have 3 JM and 3 TM replicas, all with `.5` CPU shares, the result will be `4.5` as total CPUs. An easy solution might be to just retrieve the number of TMs and multiply it with the CPU defined in the CR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
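To make the concern concrete, a small worked sketch using the reviewer's numbers (3 JobManager and 3 TaskManager replicas, each declaring 0.5 CPU in the CR); the variable names are hypothetical:

```java
public class CpuAccountingSketch {
    public static void main(String[] args) {
        int jmReplicas = 3;
        int tmReplicas = 3;
        double cpuPerPodFromCr = 0.5; // fractional CPU share declared in the CR

        // Deriving everything from the CR keeps fractional shares intact.
        double totalFromCr = (jmReplicas + tmReplicas) * cpuPerPodFromCr; // 3.0

        // If TaskManager CPU is instead read back through the REST API, it is
        // reported as a whole number of cores (assumed to come back as 1 here),
        // which inflates the total.
        double tmCpuFromRest = 1.0;
        double totalMixed = jmReplicas * cpuPerPodFromCr + tmReplicas * tmCpuFromRest; // 4.5

        System.out.println("total CPUs from CR only:       " + totalFromCr);
        System.out.println("total CPUs mixing CR and REST: " + totalMixed);
    }
}
```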
[GitHub] [flink-kubernetes-operator] mxm commented on a diff in pull request #556: [FLINK-30575] Set processing capacity to infinite if task is idle
mxm commented on code in PR #556: URL: https://github.com/apache/flink-kubernetes-operator/pull/556#discussion_r1153487988 ## flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java: ## @@ -251,12 +251,20 @@ private static double getAverage( ScalingMetric metric, JobVertexID jobVertexId, SortedMap>> metricsHistory) { -return StatUtils.mean( +double[] metricsValues = metricsHistory.values().stream() .map(m -> m.get(jobVertexId)) .filter(m -> m.containsKey(metric)) .mapToDouble(m -> m.get(metric)) .filter(d -> !Double.isNaN(d)) -.toArray()); +.toArray(); +for (double metricsValue : metricsValues) { +if (Double.isInfinite(metricsValue)) { +// As long as infinite values are present, we can't properly average. We need to +// wait until they are evicted. +return metricsValue; Review Comment: Actually, reverted. I'm not sure whether we should return positive infinity here. We could potentially have negative infinity values which would be removed here. Would just keep things as-is. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
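A minimal sketch of the guard being discussed, reduced to averaging a plain array of per-window metric values (the surrounding metric-history plumbing is omitted):

```java
import java.util.Arrays;

public class GuardedAverageSketch {
    /** Returns an infinite sample if one is present; otherwise the arithmetic mean. */
    static double guardedAverage(double[] values) {
        for (double v : values) {
            if (Double.isInfinite(v)) {
                // While an infinite sample (positive or negative) is still in the
                // history, a mean is meaningless; surface it and wait for eviction.
                return v;
            }
        }
        return Arrays.stream(values).average().orElse(Double.NaN);
    }

    public static void main(String[] args) {
        System.out.println(guardedAverage(new double[] {1.0, 2.0, 3.0}));                 // 2.0
        System.out.println(guardedAverage(new double[] {1.0, Double.POSITIVE_INFINITY})); // Infinity
    }
}
```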
[GitHub] [flink] snuyanzin commented on a diff in pull request #19897: [FLINK-27885][tests][JUnit5 migration] flink-csv
snuyanzin commented on code in PR #19897: URL: https://github.com/apache/flink/pull/19897#discussion_r1153486137 ## flink-clients/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension: ## @@ -14,3 +14,4 @@ # limitations under the License. org.apache.flink.util.TestLoggerExtension +org.junit.jupiter.api.extension.Extension Review Comment: This one I didn't get. Should it be changed within `flink-csv` changes? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] mxm commented on a diff in pull request #556: [FLINK-30575] Set processing capacity to infinite if task is idle
mxm commented on code in PR #556: URL: https://github.com/apache/flink-kubernetes-operator/pull/556#discussion_r1153480322 ## flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java: ## @@ -251,12 +251,20 @@ private static double getAverage( ScalingMetric metric, JobVertexID jobVertexId, SortedMap>> metricsHistory) { -return StatUtils.mean( +double[] metricsValues = metricsHistory.values().stream() .map(m -> m.get(jobVertexId)) .filter(m -> m.containsKey(metric)) .mapToDouble(m -> m.get(metric)) .filter(d -> !Double.isNaN(d)) -.toArray()); +.toArray(); +for (double metricsValue : metricsValues) { +if (Double.isInfinite(metricsValue)) { +// As long as infinite values are present, we can't properly average. We need to +// wait until they are evicted. +return metricsValue; Review Comment: Changed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] zentol commented on a diff in pull request #22296: [FLINK-31468] Add mechanics to the Dispatcher for handling job resource requirements
zentol commented on code in PR #22296: URL: https://github.com/apache/flink/pull/22296#discussion_r1153479180 ## flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java: ## @@ -1115,6 +1127,66 @@ private void checkJobClientAliveness() { } } +@Override +public CompletableFuture requestJobResourceRequirements(JobID jobId) { +return performOperationOnJobMasterGateway( +jobId, JobMasterGateway::requestJobResourceRequirements); +} + +@Override +public CompletableFuture updateJobResourceRequirements( +JobID jobId, JobResourceRequirements jobResourceRequirements) { +if (!pendingJobResourceRequirementsUpdates.add(jobId)) { +return FutureUtils.completedExceptionally( +new ConcurrentModificationException( +"Another update to the job [%s] resource requirements is in progress.")); Review Comment: > How about throwing RestHandlerException here, maybe with code 409 Conflict? good idea -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
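A self-contained sketch of the suggested direction: complete the returned future exceptionally with an exception that carries an HTTP 409 status rather than a plain ConcurrentModificationException. Flink's actual RestHandlerException takes a shaded-Netty HttpResponseStatus; the nested placeholder class below only stands in for it so the sketch compiles on its own.

```java
import java.util.concurrent.CompletableFuture;

public class ConflictResponseSketch {
    /** Placeholder standing in for Flink's RestHandlerException, which carries an HTTP status. */
    static class RestHandlerException extends Exception {
        final int httpStatus;

        RestHandlerException(String message, int httpStatus) {
            super(message);
            this.httpStatus = httpStatus;
        }
    }

    /** Rejects a concurrent requirements update with 409 Conflict instead of a generic 500. */
    static CompletableFuture<Void> rejectConcurrentUpdate(String jobId) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        result.completeExceptionally(
                new RestHandlerException(
                        "Another update to the job [" + jobId
                                + "] resource requirements is in progress.",
                        409));
        return result;
    }
}
```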
[GitHub] [flink-shaded] zentol commented on a diff in pull request #119: Update Flink Shaded dependencies to the latest versions
zentol commented on code in PR #119: URL: https://github.com/apache/flink-shaded/pull/119#discussion_r1153476454 ## flink-shaded-netty-4/src/main/resources/META-INF/NOTICE: ## @@ -6,36 +6,37 @@ The Apache Software Foundation (http://www.apache.org/). This project bundles the following dependencies under the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt) -- io.netty:netty-all:4.1.82.Final -- io.netty:netty-buffer:4.1.82.Final -- io.netty:netty-codec:4.1.82.Final -- io.netty:netty-codec-dns:4.1.82.Final -- io.netty:netty-codec-haproxy:4.1.82.Final -- io.netty:netty-codec-http:4.1.82.Final -- io.netty:netty-codec-http2:4.1.82.Final -- io.netty:netty-codec-memcache:4.1.82.Final -- io.netty:netty-codec-mqtt:4.1.82.Final -- io.netty:netty-codec-redis:4.1.82.Final -- io.netty:netty-codec-smtp:4.1.82.Final -- io.netty:netty-codec-socks:4.1.82.Final -- io.netty:netty-codec-stomp:4.1.82.Final -- io.netty:netty-codec-xml:4.1.82.Final -- io.netty:netty-common:4.1.82.Final -- io.netty:netty-handler:4.1.82.Final -- io.netty:netty-handler-proxy:4.1.82.Final -- io.netty:netty-resolver:4.1.82.Final -- io.netty:netty-resolver-dns:4.1.82.Final -- io.netty:netty-resolver-dns-classes-macos:4.1.82.Final -- io.netty:netty-resolver-dns-native-macos:osx-x86_64:4.1.82.Final -- io.netty:netty-resolver-dns-native-macos:osx-aarch_64:4.1.82.Final -- io.netty:netty-transport:4.1.82.Final -- io.netty:netty-transport-classes-epoll:4.1.82.Final -- io.netty:netty-transport-classes-kqueue:4.1.82.Final -- io.netty:netty-transport-native-epoll:linux-x86_64:4.1.82.Final -- io.netty:netty-transport-native-epoll:linux-aarch_64:4.1.82.Final -- io.netty:netty-transport-native-kqueue:osx-x86_64:4.1.82.Final -- io.netty:netty-transport-native-kqueue:osx-aarch_64:4.1.82.Final -- io.netty:netty-transport-native-unix-common:4.1.82.Final -- io.netty:netty-transport-rxtx:4.1.82.Final -- io.netty:netty-transport-sctp:4.1.82.Final -- io.netty:netty-transport-udt:4.1.82.Final \ No newline at end of file +- io.netty:netty-all:4.1.90.Final +- io.netty:netty-buffer:4.1.90.Final +- io.netty:netty-codec-dns:4.1.90.Final Review Comment: > The Python failure is caused by Pulsar, which uses flink-shaded but that still points to guava30 instead of guava31. I don't think that's a blocker for this PR, but a necessary update that needs to happen in Pulsar and then in PyFlink to point to the new Pulsar artifact. The externalized connectors must not rely on flink-shaded. This will just not be possible if we want them to work against different Flink versions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #19897: [FLINK-27885][tests][JUnit5 migration] flink-csv
snuyanzin commented on code in PR #19897: URL: https://github.com/apache/flink/pull/19897#discussion_r1153475324 ## flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatStatisticsReportTest.java: ## @@ -75,7 +76,7 @@ public void testCsvFormatStatsReportWithSingleFile() throws Exception { } @Test -public void testCsvFormatStatsReportWithMultiFile() throws Exception { +void testCsvFormatStatsReportWithMultiFile() throws Exception { // insert data and get statistics. DataType dataType = tEnv.from("sourceTable").getResolvedSchema().toPhysicalRowDataType(); tEnv.fromValues(dataType, getData()).executeInsert("sourceTable").await(); Review Comment: this is not related to your changes however it relates to assertj. In the next line ```java assertThat(folder.listFiles()).isNotNull().hasSize(1); ``` `isNotNull()` is redundant check since same check is done inside `hasSize` Also it's better to replace `assert files != null;` with assertj's version -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #19897: [FLINK-27885][tests][JUnit5 migration] flink-csv
snuyanzin commented on code in PR #19897: URL: https://github.com/apache/flink/pull/19897#discussion_r1153473868 ## flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatStatisticsReportTest.java: ## @@ -61,7 +62,7 @@ protected String[] properties() { } @Test -public void testCsvFormatStatsReportWithSingleFile() throws Exception { +void testCsvFormatStatsReportWithSingleFile() throws Exception { // insert data and get statistics. DataType dataType = tEnv.from("sourceTable").getResolvedSchema().toPhysicalRowDataType(); tEnv.fromValues(dataType, getData()).executeInsert("sourceTable").await(); Review Comment: this is not related to your changes however it relates to assertj. In the next line ```java assertThat(folder.listFiles()).isNotNull().hasSize(1); ``` `isNotNull()` is redundant check since same check is done inside `hasSize` Also it's better to replace `assert files != null;` with assertj's version -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
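A small sketch of the assertion style suggested in these two comments, assuming a plain `java.io.File` directory handle as in the test under review:

```java
import static org.assertj.core.api.Assertions.assertThat;

import java.io.File;

class ListFilesAssertionSketch {
    static void verifySingleFile(File folder) {
        File[] files = folder.listFiles();
        // hasSize(1) already fails with a descriptive message when the array is null,
        // so chaining isNotNull() in front of it adds nothing.
        assertThat(files).hasSize(1);
        // AssertJ equivalent of the plain `assert files != null;` statement, for cases
        // where only the null check is wanted:
        assertThat(files).isNotNull();
    }
}
```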
[GitHub] [flink-shaded] zentol commented on a diff in pull request #119: Update Flink Shaded dependencies to the latest versions
zentol commented on code in PR #119: URL: https://github.com/apache/flink-shaded/pull/119#discussion_r1153473283 ## flink-shaded-netty-4/src/main/resources/META-INF/NOTICE: ## @@ -6,36 +6,37 @@ The Apache Software Foundation (http://www.apache.org/). This project bundles the following dependencies under the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt) -- io.netty:netty-all:4.1.82.Final -- io.netty:netty-buffer:4.1.82.Final -- io.netty:netty-codec:4.1.82.Final -- io.netty:netty-codec-dns:4.1.82.Final -- io.netty:netty-codec-haproxy:4.1.82.Final -- io.netty:netty-codec-http:4.1.82.Final -- io.netty:netty-codec-http2:4.1.82.Final -- io.netty:netty-codec-memcache:4.1.82.Final -- io.netty:netty-codec-mqtt:4.1.82.Final -- io.netty:netty-codec-redis:4.1.82.Final -- io.netty:netty-codec-smtp:4.1.82.Final -- io.netty:netty-codec-socks:4.1.82.Final -- io.netty:netty-codec-stomp:4.1.82.Final -- io.netty:netty-codec-xml:4.1.82.Final -- io.netty:netty-common:4.1.82.Final -- io.netty:netty-handler:4.1.82.Final -- io.netty:netty-handler-proxy:4.1.82.Final -- io.netty:netty-resolver:4.1.82.Final -- io.netty:netty-resolver-dns:4.1.82.Final -- io.netty:netty-resolver-dns-classes-macos:4.1.82.Final -- io.netty:netty-resolver-dns-native-macos:osx-x86_64:4.1.82.Final -- io.netty:netty-resolver-dns-native-macos:osx-aarch_64:4.1.82.Final -- io.netty:netty-transport:4.1.82.Final -- io.netty:netty-transport-classes-epoll:4.1.82.Final -- io.netty:netty-transport-classes-kqueue:4.1.82.Final -- io.netty:netty-transport-native-epoll:linux-x86_64:4.1.82.Final -- io.netty:netty-transport-native-epoll:linux-aarch_64:4.1.82.Final -- io.netty:netty-transport-native-kqueue:osx-x86_64:4.1.82.Final -- io.netty:netty-transport-native-kqueue:osx-aarch_64:4.1.82.Final -- io.netty:netty-transport-native-unix-common:4.1.82.Final -- io.netty:netty-transport-rxtx:4.1.82.Final -- io.netty:netty-transport-sctp:4.1.82.Final -- io.netty:netty-transport-udt:4.1.82.Final \ No newline at end of file +- io.netty:netty-all:4.1.90.Final +- io.netty:netty-buffer:4.1.90.Final +- io.netty:netty-codec-dns:4.1.90.Final Review Comment: > On that note: Why do we keep a separate curator dependency for tests? Is it because we don't necessarily need zookeeper in certain modules and want to keep it more light-weight by relying on curator's TestingZooKeeperServer implementation? Zookeeper is guaranteed to be available in any module that use curator. If you think these tests can be easier/better tested by working directly against Zookeeper feel free to open a PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rkhachatryan commented on a diff in pull request #22296: [FLINK-31468] Add mechanics to the Dispatcher for handling job resource requirements
rkhachatryan commented on code in PR #22296: URL: https://github.com/apache/flink/pull/22296#discussion_r1153443702 ## flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java: ## @@ -1115,6 +1127,66 @@ private void checkJobClientAliveness() { } } +@Override +public CompletableFuture requestJobResourceRequirements(JobID jobId) { +return performOperationOnJobMasterGateway( +jobId, JobMasterGateway::requestJobResourceRequirements); +} + +@Override +public CompletableFuture updateJobResourceRequirements( +JobID jobId, JobResourceRequirements jobResourceRequirements) { +if (!pendingJobResourceRequirementsUpdates.add(jobId)) { +return FutureUtils.completedExceptionally( +new ConcurrentModificationException( +"Another update to the job [%s] resource requirements is in progress.")); Review Comment: The error handling now is a bit inconsistent: - here we throw a regular `ConcurrentModificationException` (so http client will get HTTP/500?) - in `validateMaxParallelism`, we throw `RestHandlerException` How about throwing `RestHandlerException` here, maybe with code `409 Conflict`? ## flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java: ## @@ -1115,6 +1127,66 @@ private void checkJobClientAliveness() { } } +@Override +public CompletableFuture requestJobResourceRequirements(JobID jobId) { +return performOperationOnJobMasterGateway( +jobId, JobMasterGateway::requestJobResourceRequirements); +} + +@Override +public CompletableFuture updateJobResourceRequirements( +JobID jobId, JobResourceRequirements jobResourceRequirements) { +if (!pendingJobResourceRequirementsUpdates.add(jobId)) { +return FutureUtils.completedExceptionally( +new ConcurrentModificationException( +"Another update to the job [%s] resource requirements is in progress.")); +} +return performOperationOnJobMasterGateway( +jobId, JobMasterGateway::getMaxParallelismPerVertex) +.thenAccept( +maxParallelismPerJobVertex -> +validateMaxParallelism( +jobResourceRequirements, maxParallelismPerJobVertex)) +.thenRunAsync( +() -> { +try { +jobGraphWriter.putJobResourceRequirements( +jobId, jobResourceRequirements); +} catch (Exception e) { +throw new CompletionException( +"The resource requirements could not be persisted and have not been applied. Please retry.", +e); +} +}, +ioExecutor) +.thenComposeAsync( +ignored -> +performOperationOnJobMasterGateway( +jobId, +jobMasterGateway -> + jobMasterGateway.updateJobResourceRequirements( + jobResourceRequirements)), +getMainThreadExecutor()) +.whenComplete((ack, error) -> pendingJobResourceRequirementsUpdates.remove(jobId)); Review Comment: **nit**: log error on debug level? ## flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java: ## @@ -1164,6 +1153,189 @@ public void testRetrieveJobResultAfterSubmissionOfFailedJob() throws Exception { .hasMessage("Test exception.")); } +@Test +public void testInvalidResourceRequirementsUpdate() throws Exception { +dispatcher = +createAndStartDispatcher( +heartbeatServices, +haServices, +JobMasterServiceLeadershipRunnerFactory.INSTANCE); +final DispatcherGateway dispatcherGateway = +dispatcher.getSelfGateway(DispatcherGateway.class); +jobMasterLeaderElectionService.isLeader(UUID.randomUUID()); + +dispatcherGateway.submitJob(jobGraph, TIMEOUT).get(); + +// We can try updating the JRR once the scheduler has been started. +CommonTestUtils.waitUntilCondition( +() -> { +final JobStatus status = +dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get(); +// need to check for C
[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #557: [FLINK-31630] Limit max checkpoint age for last-state upgrade
morhidi commented on code in PR #557: URL: https://github.com/apache/flink-kubernetes-operator/pull/557#discussion_r1153435509 ## docs/layouts/shortcodes/generated/dynamic_section.html: ## @@ -80,6 +80,12 @@ Boolean Enables last-state fallback for savepoint upgrade mode. When the job is not running thus savepoint cannot be triggered but HA metadata is available for last state restore the operator can initiate the upgrade process when the flag is enabled. + + kubernetes.operator.job.upgrade.last-state.checkpoint.max.age Review Comment: `kubernetes.operator.job.upgrade.last-state.max.allowed.checkpoint.age` reflects better the intention -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] morhidi commented on pull request #557: [FLINK-31630] Limit max checkpoint age for last-state upgrade
morhidi commented on PR #557: URL: https://github.com/apache/flink-kubernetes-operator/pull/557#issuecomment-1490500643 @gyfora could you please shed some light on the benefits of this feature? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] MartijnVisser commented on pull request #22309: FLINK-29398 [Kafka Source] Provide rack ID to Kafka Source to take advantage of Rack Awareness
MartijnVisser commented on PR #22309: URL: https://github.com/apache/flink/pull/22309#issuecomment-1490467770 > @MartijnVisser just to be clear, you're recommending I close this PR, and implement my change against https://github.com/apache/flink-connector-kafka/pulls? Yes, given that we won't merge this anymore into `master` since the code will be removed from that branch shortly. All development for the Flink Kafka connector will continue in that dedicated repo -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #22309: FLINK-29398 [Kafka Source] Provide rack ID to Kafka Source to take advantage of Rack Awareness
flinkbot commented on PR #22309: URL: https://github.com/apache/flink/pull/22309#issuecomment-1490466273 ## CI report: * 879e0a66a7fda5c5a50c38568945f315340217f0 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] jeremy-degroot commented on pull request #22309: FLINK-29398 [Kafka Source] Provide rack ID to Kafka Source to take advantage of Rack Awareness
jeremy-degroot commented on PR #22309: URL: https://github.com/apache/flink/pull/22309#issuecomment-1490464873 @MartijnVisser just to be clear, you're recommending I close this PR, and implement my change against https://github.com/apache/flink-connector-kafka/pulls? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] MartijnVisser commented on pull request #22309: FLINK-29398 [Kafka Source] Provide rack ID to Kafka Source to take advantage of Rack Awareness
MartijnVisser commented on PR #22309: URL: https://github.com/apache/flink/pull/22309#issuecomment-1490451791 @jeremy-degroot Thanks for the PR, it will be good to move this to apache/flink-connector-kafka:main - See https://lists.apache.org/thread/s85o6ym545tbl2cwv228wxbhyynp6g14 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-29398) Utilize Rack Awareness in Flink Consumer
[ https://issues.apache.org/jira/browse/FLINK-29398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-29398: --- Labels: pull-request-available (was: ) > Utilize Rack Awareness in Flink Consumer > > > Key: FLINK-29398 > URL: https://issues.apache.org/jira/browse/FLINK-29398 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Reporter: Jeremy DeGroot >Assignee: Jeremy DeGroot >Priority: Major > Labels: pull-request-available > > [KIP-36|https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment] > was implemented some time ago in Kafka. This allows brokers and consumers to > communicate about the rack (or AWS Availability Zone) they're located in. > Reading from a local broker can save money in bandwidth and improve latency > for your consumers. > Flink Kafka consumers currently cannot easily use rack awareness if they're > deployed across multiple racks or availability zones, because they have no > control over which rack the Task Manager they'll be assigned to may be in. > This improvement proposes that a Kafka Consumer could be configured with a > callback or Future that could be run when it's being configured on the task > manager, that will set the appropriate value at runtime if a value is > provided. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] jeremy-degroot opened a new pull request, #22309: FLINK-29398 [Kafka Source] Provide rack ID to Kafka Source to take advantage of Rack Awareness
jeremy-degroot opened a new pull request, #22309: URL: https://github.com/apache/flink/pull/22309 ## What is the purpose of the change This PR adds a new method to KafkaSourceBuilder that sets the `client.rack` config for the KafkaSource to the value returned by the provided Supplier. It needs to be a Supplier because it needs to run on the TaskManager, and can't be determined at Job submit time like other configs. ## Brief change log - *Add setRackId to KafkaSourceBuilder* - *Plumb rackId into KafkaPartitionSplitReader* - *Add rack id tests* - *Document RackId feature* ## Verifying this change - *Added tests for the KafkaSplitReader that verify behaviors for null rackId Supplier, null and empty return values, and provided values.* - *Manually verified the change by running a 3-node cluster that covered two "racks" (AWS Availability Zones) against an Amazon MSK cluster.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? docs/Javadocs -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org