[GitHub] [flink] salvalcantara commented on a diff in pull request #21186: [FLINK-29480][Connectors/Kafka] Skip null records when writing
salvalcantara commented on code in PR #21186: URL: https://github.com/apache/flink/pull/21186#discussion_r1009072959 ## flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java: ## @@ -148,6 +148,15 @@ public void testIncreasingRecordBasedCounters() throws Exception { assertThat(numRecordsOutErrors.getCount()).isEqualTo(0); assertThat(numRecordsSendErrors.getCount()).isEqualTo(0); +// elements that cannot be serialized should be silently skipped +writer.write(null, SINK_WRITER_CONTEXT); +timeService.trigger(); +assertThat(numBytesOut.getCount()).isEqualTo(0L); +assertThat(numRecordsOut.getCount()).isEqualTo(0); +assertThat(numRecordsOutErrors.getCount()).isEqualTo(0); +assertThat(numRecordsSendErrors.getCount()).isEqualTo(0); + +// but properly serialized elements should count just normally Review Comment: I considered that too, indeed that was my initial plan: - Add another dummy serializer, e.g., `NullRecordSerializer`, which simply returns `null` regardless of the input element - Add a separate test, e.g., `testWriteNullProducerRecord`, along the lines of your suggestion In the end, I didn't do so because it seems simpler and at the same time more realistic to me to have the current test (`testIncreasingRecordBasedCounters`) check that we get the counting right when interleaving both valid and invalid elements (using the same `DummyRecordSerializer`), as opposed to checking those cases in isolation. Also, the new, separate test would essentially be a clone of the existing one, plus require yet another `createWriterWithConfiguration` method/override that considers the new dummy serializer instead. In summary, I think `testIncreasingRecordBasedCounters` is actually a good spot for checking the new functionality. Having said that, we can of course create a separate test if my arguments are not convincing enough. Whatever we decide! -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
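The behavior under discussion (a serializer returning `null` and the writer silently skipping the element without touching any counters) can be modeled with a small self-contained sketch. Note this is a simplified illustration, not the actual Flink `KafkaWriter` API; the class and field names below are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Simplified model of the FLINK-29480 behavior: write() skips elements whose
// serializer returns null, leaving the send list and all counters untouched.
class NullSkippingWriter<T> {
    private final Function<T, byte[]> serializer;
    final List<byte[]> sent = new ArrayList<>();
    long numRecordsOut = 0;
    long numBytesOut = 0;

    NullSkippingWriter(Function<T, byte[]> serializer) {
        this.serializer = serializer;
    }

    void write(T element) {
        byte[] record = serializer.apply(element);
        if (record == null) {
            return; // silently skip the element, as proposed in the PR
        }
        sent.add(record);
        numRecordsOut++;
        numBytesOut += record.length;
    }
}
```

Interleaving valid and invalid elements against the same writer instance, as `testIncreasingRecordBasedCounters` does, then checks that only the valid elements advance the counters.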
[GitHub] [flink] leonardBang commented on a diff in pull request #21186: [FLINK-29480][Connectors/Kafka] Skip null records when writing
leonardBang commented on code in PR #21186: URL: https://github.com/apache/flink/pull/21186#discussion_r1009076348 ## flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java: ## @@ -148,6 +148,15 @@ public void testIncreasingRecordBasedCounters() throws Exception { assertThat(numRecordsOutErrors.getCount()).isEqualTo(0); assertThat(numRecordsSendErrors.getCount()).isEqualTo(0); +// elements that cannot be serialized should be silently skipped +writer.write(null, SINK_WRITER_CONTEXT); +timeService.trigger(); +assertThat(numBytesOut.getCount()).isEqualTo(0L); +assertThat(numRecordsOut.getCount()).isEqualTo(0); +assertThat(numRecordsOutErrors.getCount()).isEqualTo(0); +assertThat(numRecordsSendErrors.getCount()).isEqualTo(0); + +// but properly serialized elements should count just normally Review Comment: Thanks @salvalcantara for the detailed explanation. Adding to the current test checks the metrics as well as the writing of a null ProducerRecord; it makes sense to me. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-29788) StatefulJobWBroadcastStateMigrationITCase failed in native savepoints
[ https://issues.apache.org/jira/browse/FLINK-29788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626426#comment-17626426 ] Hangxiang Yu commented on FLINK-29788: -- I have committed a PR to fix it in the test. I think the release process could be improved here. We can still update the FlinkVersion of these classes at the end, but the migration tests should be executed when we prepare for the release. That would help us find migration exceptions early. (Some exceptions may not be so simple.) WDYT? [~hxbks2ks] > StatefulJobWBroadcastStateMigrationITCase failed in native savepoints > - > > Key: FLINK-29788 > URL: https://issues.apache.org/jira/browse/FLINK-29788 > Project: Flink > Issue Type: Bug > Components: Release System, Runtime / State Backends >Affects Versions: 1.16.0 >Reporter: Huang Xingbo >Priority: Critical > Labels: pull-request-available > Attachments: image-2022-10-28-11-18-45-471.png > > > !image-2022-10-28-11-18-45-471.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-29801) OperatorCoordinator need open the way to operate metricGroup interface
[ https://issues.apache.org/jira/browse/FLINK-29801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626425#comment-17626425 ] Jark Wu commented on FLINK-29801: - I think this is a nice feature. [~zhuzh] what do you think? > OperatorCoordinator need open the way to operate metricGroup interface > -- > > Key: FLINK-29801 > URL: https://issues.apache.org/jira/browse/FLINK-29801 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Reporter: yuemeng >Priority: Major > Labels: pull-request-available > > Currently, We have no way to get metric group instances in OperatorCoordinator > In some cases, we may report some metric in OperatorCoordinator such as Flink > hudi integrate scene, some meta will send to operator coordinator to commit > to hdfs or hms > but we also need to report some metrics in operator coordinator for monitor > purpose -- This message was sent by Atlassian Jira (v8.20.10#820010)
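What the ticket asks for can be sketched with a small self-contained model: a metric group handle exposed to a coordinator that commits metadata and wants to count commits. All names below (`CoordinatorMetricGroup`, `SimpleCoordinatorMetricGroup`, `CommitCoordinator`) are hypothetical illustrations of the requested capability, not Flink's actual `OperatorCoordinator` API:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical shape of a metric group handed to an OperatorCoordinator,
// so it can register/update counters (e.g. commits to HDFS or HMS).
interface CoordinatorMetricGroup {
    void inc(String counterName);
    long get(String counterName);
}

class SimpleCoordinatorMetricGroup implements CoordinatorMetricGroup {
    private final Map<String, Long> counters = new HashMap<>();
    public void inc(String name) { counters.merge(name, 1L, Long::sum); }
    public long get(String name) { return counters.getOrDefault(name, 0L); }
}

// A coordinator in the style of Hudi's stream-write coordinator: it commits
// metadata and reports a metric for each successful commit.
class CommitCoordinator {
    private final CoordinatorMetricGroup metrics;
    CommitCoordinator(CoordinatorMetricGroup metrics) { this.metrics = metrics; }

    void handleCommit() {
        // ... commit metadata to the external system here ...
        metrics.inc("numCommits");
    }
}
```

The point of the proposal is that the metric group would come from the coordinator context (created by the runtime), rather than being constructed by user code as in this sketch.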
[jira] [Closed] (FLINK-29458) When two tables have the same field, do not specify the table name,Exception will be thrown:SqlValidatorException :Column 'currency' is ambiguous
[ https://issues.apache.org/jira/browse/FLINK-29458?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu closed FLINK-29458. --- Fix Version/s: 1.17.0 Resolution: Fixed Fixed in master: 97fbb701314205fd1d51d7edb1f6ef7a27f880c7 > When two tables have the same field, do not specify the table name,Exception > will be thrown:SqlValidatorException :Column 'currency' is ambiguous > - > > Key: FLINK-29458 > URL: https://issues.apache.org/jira/browse/FLINK-29458 > Project: Flink > Issue Type: Improvement > Components: Documentation, Table SQL / API >Affects Versions: 1.14.4 >Reporter: ZuoYan >Assignee: ZuoYan >Priority: Major > Labels: pull-request-available > Fix For: 1.17.0 > > Attachments: image-2022-09-28-21-00-01-302.png, > image-2022-09-28-21-00-09-054.png, image-2022-09-28-21-00-22-733.png > > > When two tables are joined and both have the same field, querying with > SELECT will throw an exception if the table name is not specified. > Exception content: > Column 'currency' is ambiguous. > !image-2022-09-28-21-00-22-733.png! > > !image-2022-09-28-21-00-01-302.png! > !image-2022-09-28-21-00-09-054.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] wuchong merged pull request #20983: [FLINK-29458][docs] When two tables have the same field, do not speci…
wuchong merged PR #20983: URL: https://github.com/apache/flink/pull/20983 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] masteryhx commented on pull request #21196: [FLINK-29788][state] Disable changelog temporarily when SnapshotMigrationTestBase triggers native savepoint
masteryhx commented on PR #21196: URL: https://github.com/apache/flink/pull/21196#issuecomment-1296601937 @HuangXingBo Could you also help to take a look? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #21197: [FLINK-29801] OperatorCoordinator need open the way to operate metric…
flinkbot commented on PR #21197: URL: https://github.com/apache/flink/pull/21197#issuecomment-1296601625 ## CI report: * beeb3741a5b0f156aa2ae30f595e7355ea2b5e4d UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-29801) OperatorCoordinator need open the way to operate metricGroup interface
[ https://issues.apache.org/jira/browse/FLINK-29801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626423#comment-17626423 ] yuemeng commented on FLINK-29801: - [~danny0405] Can you review this improvement? We need it to report some metrics in the stream write coordinator. > OperatorCoordinator need open the way to operate metricGroup interface > -- > > Key: FLINK-29801 > URL: https://issues.apache.org/jira/browse/FLINK-29801 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Reporter: yuemeng >Priority: Major > Labels: pull-request-available > > Currently, We have no way to get metric group instances in OperatorCoordinator > In some cases, we may report some metric in OperatorCoordinator such as Flink > hudi integrate scene, some meta will send to operator coordinator to commit > to hdfs or hms > but we also need to report some metrics in operator coordinator for monitor > purpose -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] flinkbot commented on pull request #21196: [FLINK-29788][state] Disable changelog temporarily when SnapshotMigrationTestBase triggers native savepoint
flinkbot commented on PR #21196: URL: https://github.com/apache/flink/pull/21196#issuecomment-1296596494 ## CI report: * 72aee69358c17d1e00e921b36a0119716a17b556 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-29801) OperatorCoordinator need open the way to operate metricGroup interface
[ https://issues.apache.org/jira/browse/FLINK-29801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626419#comment-17626419 ] yuemeng commented on FLINK-29801: - [~jark] [~rmetzger] Can you review this improvement? We really need it in Hudi to report some metrics. > OperatorCoordinator need open the way to operate metricGroup interface > -- > > Key: FLINK-29801 > URL: https://issues.apache.org/jira/browse/FLINK-29801 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Reporter: yuemeng >Priority: Major > Labels: pull-request-available > > Currently, We have no way to get metric group instances in OperatorCoordinator > In some cases, we may report some metric in OperatorCoordinator such as Flink > hudi integrate scene, some meta will send to operator coordinator to commit > to hdfs or hms > but we also need to report some metrics in operator coordinator for monitor > purpose -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] masteryhx commented on pull request #21196: [FLINK-29788][state] Disable changelog temporarily when SnapshotMigrationTestBase triggers native savepoint
masteryhx commented on PR #21196: URL: https://github.com/apache/flink/pull/21196#issuecomment-1296593973 @klion26 @Myasuka Could you help to take a look? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-29801) OperatorCoordinator need open the way to operate metricGroup interface
[ https://issues.apache.org/jira/browse/FLINK-29801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-29801: --- Labels: pull-request-available (was: ) > OperatorCoordinator need open the way to operate metricGroup interface > -- > > Key: FLINK-29801 > URL: https://issues.apache.org/jira/browse/FLINK-29801 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Reporter: yuemeng >Priority: Major > Labels: pull-request-available > > Currently, We have no way to get metric group instances in OperatorCoordinator > In some cases, we may report some metric in OperatorCoordinator such as Flink > hudi integrate scene, some meta will send to operator coordinator to commit > to hdfs or hms > but we also need to report some metrics in operator coordinator for monitor > purpose -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] JerryYue-M opened a new pull request, #21197: [FLINK-29801] OperatorCoordinator need open the way to operate metric…
JerryYue-M opened a new pull request, #21197: URL: https://github.com/apache/flink/pull/21197 …Group interface ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change Please make sure both new and modified tests in this PR follows the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. 
*(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-ml] zhipeng93 closed pull request #168: [hotfix] Remove redundant Internal annotations from BroadcastUtils
zhipeng93 closed pull request #168: [hotfix] Remove redundant Internal annotations from BroadcastUtils URL: https://github.com/apache/flink-ml/pull/168 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-ml] zhipeng93 commented on pull request #168: [hotfix] Remove redundant Internal annotations from BroadcastUtils
zhipeng93 commented on PR #168: URL: https://github.com/apache/flink-ml/pull/168#issuecomment-1296590671 Thanks for the PR. As we discussed in [1], and since @jiangxin369 has already resolved it in #162, I am closing this one. [1] https://github.com/apache/flink-ml/pull/162#discussion_r1007628054 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-29802) ChangelogStateBackend supports native savepoint
Hangxiang Yu created FLINK-29802: Summary: ChangelogStateBackend supports native savepoint Key: FLINK-29802 URL: https://issues.apache.org/jira/browse/FLINK-29802 Project: Flink Issue Type: Improvement Components: Runtime / State Backends Reporter: Hangxiang Yu Fix For: 1.17.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29801) OperatorCoordinator need open the way to operate metricGroup interface
yuemeng created FLINK-29801: --- Summary: OperatorCoordinator need open the way to operate metricGroup interface Key: FLINK-29801 URL: https://issues.apache.org/jira/browse/FLINK-29801 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Reporter: yuemeng Currently, we have no way to get a metric group instance in OperatorCoordinator. In some cases, we may want to report metrics in OperatorCoordinator, such as in the Flink/Hudi integration scenario, where some metadata is sent to the operator coordinator to commit to HDFS or HMS, but we also need to report some metrics in the operator coordinator for monitoring purposes. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-29788) StatefulJobWBroadcastStateMigrationITCase failed in native savepoints
[ https://issues.apache.org/jira/browse/FLINK-29788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-29788: --- Labels: pull-request-available (was: ) > StatefulJobWBroadcastStateMigrationITCase failed in native savepoints > - > > Key: FLINK-29788 > URL: https://issues.apache.org/jira/browse/FLINK-29788 > Project: Flink > Issue Type: Bug > Components: Release System, Runtime / State Backends >Affects Versions: 1.16.0 >Reporter: Huang Xingbo >Priority: Critical > Labels: pull-request-available > Attachments: image-2022-10-28-11-18-45-471.png > > > !image-2022-10-28-11-18-45-471.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] masteryhx opened a new pull request, #21196: [FLINK-29788][state] Disable changelog temporarily when SnapshotMigrationTestBase triggers native savepoint
masteryhx opened a new pull request, #21196: URL: https://github.com/apache/flink/pull/21196 ## What is the purpose of the change ChangelogStateBackend doesn't currently support native savepoints, which is known, so we just disable changelog temporarily when SnapshotMigrationTestBase triggers a native savepoint. ## Brief change log - Make SnapshotMigrationTestBase#executeAndSnapshot disable changelog when triggering native savepoint. ## Verifying this change This change is already covered by existing tests, such as TypeSerializerSnapshotMigrationITCase/StatefulJobWBroadcastStateMigrationITCase. (need to modify FlinkVersion and ExecutionMode which will be modified by release manager later). ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? no -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
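For context, the changelog feature the PR disables programmatically in the test base is also controlled by a regular configuration option. A minimal sketch of turning it off via `flink-conf.yaml` (assuming a Flink version that ships the changelog state backend):

```yaml
# flink-conf.yaml — disable the state changelog; the test base applies the
# equivalent setting programmatically before triggering a native savepoint
state.backend.changelog.enabled: false
```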
[jira] [Comment Edited] (FLINK-29800) Continuous failover will leak the inprogress output file
[ https://issues.apache.org/jira/browse/FLINK-29800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626415#comment-17626415 ] luoyuxia edited comment on FLINK-29800 at 10/31/22 6:12 AM: What's your idea for cleaning up these files? From my side, it may not be trivial work. was (Author: luoyuxia): What's you idea for cleaning up these files? > Continuous failover will leak the inprogress output file > > > Key: FLINK-29800 > URL: https://issues.apache.org/jira/browse/FLINK-29800 > Project: Flink > Issue Type: Bug > Components: Connectors / FileSystem >Reporter: Aitozi >Priority: Major > > When running job which sink to the file system, the inprogress files will > keep growing when job keeps failover, it will do harm to the filesystem. I > think the clean up to the file which is currently written to should be > performed when job failing -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-ml] zhipeng93 merged pull request #162: [FLINK-29593] Add QuantileSummary to help calculate approximate quantiles
zhipeng93 merged PR #162: URL: https://github.com/apache/flink-ml/pull/162
[GitHub] [flink-ml] zhipeng93 commented on pull request #162: [FLINK-29593] Add QuantileSummary to help calculate approximate quantiles
zhipeng93 commented on PR #162: URL: https://github.com/apache/flink-ml/pull/162#issuecomment-1296585869 Thanks for the update. Merging.
[jira] [Commented] (FLINK-29800) Continuous failover will leak the inprogress output file
[ https://issues.apache.org/jira/browse/FLINK-29800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626415#comment-17626415 ] luoyuxia commented on FLINK-29800: -- What's your idea for cleaning up these files? > Continuous failover will leak the inprogress output file > > > Key: FLINK-29800 > URL: https://issues.apache.org/jira/browse/FLINK-29800 > Project: Flink > Issue Type: Bug > Components: Connectors / FileSystem >Reporter: Aitozi >Priority: Major > > When running a job which sinks to the file system, the in-progress files will > keep growing as the job keeps failing over, which will do harm to the > filesystem. I think cleanup of the file currently being written should be > performed when the job fails
[GitHub] [flink] salvalcantara commented on a diff in pull request #21186: [FLINK-29480][Connectors/Kafka] Skip null records when writing
salvalcantara commented on code in PR #21186: URL: https://github.com/apache/flink/pull/21186#discussion_r1009057372 ## flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java: ## @@ -148,6 +148,15 @@ public void testIncreasingRecordBasedCounters() throws Exception { assertThat(numRecordsOutErrors.getCount()).isEqualTo(0); assertThat(numRecordsSendErrors.getCount()).isEqualTo(0); +// elements that cannot be serialized should be silently skipped +writer.write(null, SINK_WRITER_CONTEXT); +timeService.trigger(); +assertThat(numBytesOut.getCount()).isEqualTo(0L); +assertThat(numRecordsOut.getCount()).isEqualTo(0); +assertThat(numRecordsOutErrors.getCount()).isEqualTo(0); +assertThat(numRecordsSendErrors.getCount()).isEqualTo(0); + +// but properly serialized elements should count just normally Review Comment: @leonardBang I would also update this companion note... ```suggestion // but elements for which a non-null producer record is returned should count ```
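The contract under review (elements whose serializer returns a null record are silently skipped and no counters move) can be sketched in isolation. The class and field names below are hypothetical stand-ins, not Flink's actual `KafkaWriter` API:

```java
import java.util.function.Function;

/**
 * Hypothetical sketch of the skip-null behavior discussed in FLINK-29480.
 * This is not Flink's KafkaWriter; it only mimics the counter semantics.
 */
public class SkippingWriter<T> {
    private final Function<T, byte[]> serializer; // may return null for "unserializable" elements
    public long numRecordsOut = 0;
    public long numBytesOut = 0;

    public SkippingWriter(Function<T, byte[]> serializer) {
        this.serializer = serializer;
    }

    public void write(T element) {
        byte[] record = serializer.apply(element);
        if (record == null) {
            return; // silently skip: no record sent, no counter incremented
        }
        numRecordsOut++;
        numBytesOut += record.length;
        // a real writer would hand the record to the producer here
    }

    public static void main(String[] args) {
        SkippingWriter<String> writer =
                new SkippingWriter<>(s -> s == null ? null : s.getBytes());
        writer.write(null); // skipped, counters stay at zero
        writer.write("ab"); // counted normally
        System.out.println(writer.numRecordsOut + " " + writer.numBytesOut); // prints "1 2"
    }
}
```

This mirrors the interleaving checked by `testIncreasingRecordBasedCounters`: a skipped element followed by a valid one leaves exactly one record and its byte count on the counters.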
[GitHub] [flink] reswqa commented on a diff in pull request #21185: [FLINK-28643][runtime-web] HistoryServer support lazy unzip
reswqa commented on code in PR #21185: URL: https://github.com/apache/flink/pull/21185#discussion_r1008996802 ## flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java: ## @@ -143,5 +143,16 @@ public class HistoryServerOptions { code("IllegalConfigurationException")) .build()); +public static final ConfigOption HISTORY_SERVER_CACHED_JOBS = +key("historyserver.archive.cached-jobs") +.intType() +.defaultValue(500) +.withDescription( Review Comment: Why not use `withDescription(String description)` directly? ## flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java: ## @@ -30,12 +31,33 @@ import java.nio.file.Files; import java.nio.file.Path; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import static org.assertj.core.api.Assertions.assertThat; /** Tests for the HistoryServerStaticFileServerHandler. */ class HistoryServerStaticFileServerHandlerTest { +@Test +void testExtractJobId() { Review Comment: We should migrate the tests involved in this PR to JUnit 5 and AssertJ.
[GitHub] [flink] leonardBang commented on a diff in pull request #21186: [FLINK-29480][Connectors/Kafka] Skip null records when writing
leonardBang commented on code in PR #21186: URL: https://github.com/apache/flink/pull/21186#discussion_r1009056433 ## flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java: ## @@ -148,6 +148,15 @@ public void testIncreasingRecordBasedCounters() throws Exception { assertThat(numRecordsOutErrors.getCount()).isEqualTo(0); assertThat(numRecordsSendErrors.getCount()).isEqualTo(0); +// elements that cannot be serialized should be silently skipped Review Comment: > Agreed. Maybe something along these lines? Yes, this is better.
[GitHub] [flink] salvalcantara commented on a diff in pull request #21186: [FLINK-29480][Connectors/Kafka] Skip null records when writing
salvalcantara commented on code in PR #21186: URL: https://github.com/apache/flink/pull/21186#discussion_r1009054195 ## flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java: ## @@ -148,6 +148,15 @@ public void testIncreasingRecordBasedCounters() throws Exception { assertThat(numRecordsOutErrors.getCount()).isEqualTo(0); assertThat(numRecordsSendErrors.getCount()).isEqualTo(0); +// elements that cannot be serialized should be silently skipped Review Comment: Agreed. Maybe something along these lines @leonardBang ? ```suggestion // elements for which the serializer returns null should be silently skipped ```
[GitHub] [flink] libenchao commented on a diff in pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown
libenchao commented on code in PR #20140: URL: https://github.com/apache/flink/pull/20140#discussion_r1009034445 ## flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java: ## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.jdbc.table; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.table.expressions.CallExpression; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.ExpressionDefaultVisitor; +import org.apache.flink.table.expressions.FieldReferenceExpression; +import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.expressions.ValueLiteralExpression; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.types.logical.LogicalType; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Timestamp; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; + +/** + * Visitor that convert Expression to ParameterizedPredicate. Return Optional.empty() if we cannot + * push down the filter. 
+ */ +@Experimental +public class JdbcFilterPushdownPreparedStatementVisitor +extends ExpressionDefaultVisitor> { + +private Function quoteIdentifierFunction; + +public JdbcFilterPushdownPreparedStatementVisitor( +Function quoteIdentifierFunction) { +this.quoteIdentifierFunction = quoteIdentifierFunction; +} + +@Override +public Optional visit(CallExpression call) { +if (BuiltInFunctionDefinitions.EQUALS.equals(call.getFunctionDefinition())) { +return renderBinaryOperator("=", call.getResolvedChildren()); +} +if (BuiltInFunctionDefinitions.LESS_THAN.equals(call.getFunctionDefinition())) { +return renderBinaryOperator("<", call.getResolvedChildren()); +} +if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) { +return renderBinaryOperator("<=", call.getResolvedChildren()); +} +if (BuiltInFunctionDefinitions.GREATER_THAN.equals(call.getFunctionDefinition())) { +return renderBinaryOperator(">", call.getResolvedChildren()); +} +if (BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) { +return renderBinaryOperator(">=", call.getResolvedChildren()); +} +if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(call.getFunctionDefinition())) { +return renderBinaryOperator("<>", call.getResolvedChildren()); +} +if (BuiltInFunctionDefinitions.OR.equals(call.getFunctionDefinition())) { +return renderBinaryOperator("OR", call.getResolvedChildren()); +} +if (BuiltInFunctionDefinitions.AND.equals(call.getFunctionDefinition())) { +return renderBinaryOperator("AND", call.getResolvedChildren()); +} + +return Optional.empty(); +} + +private Optional renderBinaryOperator( +String operator, List allOperands) { +Optional leftOperandString = allOperands.get(0).accept(this); + +Optional rightOperandString = allOperands.get(1).accept(this); + +return leftOperandString.flatMap( +left -> rightOperandString.map(right -> left.combine(operator, right))); +} + +@Override +public Optional visit(ValueLiteralExpression litExp) { 
+LogicalType tpe = litExp.getOutputDataType().getLogicalType(); +Serializable[] params = new Serializable[1]; + +ParameterizedPredicate predicate = new ParameterizedPredicate("?"); +switch (tpe.getTypeRoot()) { Review Comment: 1. Could you sort out the case branches, similar to `LogicalTypeRoot`? 2. There are still some remaining types, such as `CHAR`, `SMALLINT`, `TINYINT`; I would suggest we add as many of these types as possible, unless we cannot for now. ##
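The second review point (covering more literal type roots such as `CHAR`, `SMALLINT`, `TINYINT`) can be illustrated with a simplified, self-contained version of the switch. The enum and method names below are assumptions for illustration, not Flink's actual `LogicalTypeRoot` or visitor API:

```java
import java.io.Serializable;
import java.util.Optional;

/** Simplified stand-in for the literal-to-JDBC-parameter mapping discussed above. */
public class LiteralParam {
    // Hypothetical subset of type roots, ordered roughly as in a LogicalTypeRoot-style enum.
    public enum TypeRoot { CHAR, VARCHAR, TINYINT, SMALLINT, INTEGER, BIGINT, DOUBLE }

    /** Returns the JDBC parameter for a literal, or empty if the type cannot be pushed down. */
    public static Optional<Serializable> render(TypeRoot root, Object value) {
        switch (root) {
            case CHAR: // fall through: CHAR and VARCHAR both bind as a string parameter
            case VARCHAR:
                return Optional.<Serializable>of(value.toString());
            case TINYINT:
                return Optional.<Serializable>of(((Number) value).byteValue());
            case SMALLINT:
                return Optional.<Serializable>of(((Number) value).shortValue());
            case INTEGER:
                return Optional.<Serializable>of(((Number) value).intValue());
            case BIGINT:
                return Optional.<Serializable>of(((Number) value).longValue());
            case DOUBLE:
                return Optional.<Serializable>of(((Number) value).doubleValue());
            default:
                return Optional.empty(); // unsupported: skip filter pushdown for this predicate
        }
    }

    public static void main(String[] args) {
        System.out.println(render(TypeRoot.SMALLINT, 7)); // prints "Optional[7]"
        System.out.println(render(TypeRoot.CHAR, "abc")); // prints "Optional[abc]"
    }
}
```

Returning `Optional.empty()` for unhandled roots preserves the visitor's overall contract: a predicate that cannot be rendered is simply not pushed down.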
[jira] [Created] (FLINK-29800) Continuous failover will leak the inprogress output file
Aitozi created FLINK-29800: -- Summary: Continuous failover will leak the inprogress output file Key: FLINK-29800 URL: https://issues.apache.org/jira/browse/FLINK-29800 Project: Flink Issue Type: Bug Components: Connectors / FileSystem Reporter: Aitozi When running a job which sinks to the file system, the in-progress files will keep growing as the job keeps failing over, which will do harm to the filesystem. I think cleanup of the file currently being written should be performed when the job fails
[jira] [Commented] (FLINK-29609) Clean up jobmanager deployment on suspend after recording savepoint info
[ https://issues.apache.org/jira/browse/FLINK-29609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626372#comment-17626372 ] Hector Miuler Malpica Gallegos commented on FLINK-29609: [~sriramgr] In my opinion, this should only happen in application mode; in session mode it should continue to exist, waiting for a new job. > Clean up jobmanager deployment on suspend after recording savepoint info > > > Key: FLINK-29609 > URL: https://issues.apache.org/jira/browse/FLINK-29609 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Gyula Fora >Assignee: Sriram Ganesh >Priority: Major > Fix For: kubernetes-operator-1.3.0 > > > Currently, in case of suspending with savepoint, the jobmanager pod will > linger there forever after cancelling the job. > This is currently used to ensure consistency in case the > operator/cancel-with-savepoint operation fails. > Once we are sure however that the savepoint has been recorded and the job is > shut down, we should clean up all the resources. Optionally we can make this > configurable.
[jira] [Commented] (FLINK-28539) Enable CompactionDynamicLevelBytes in FLASH_SSD_OPTIMIZED
[ https://issues.apache.org/jira/browse/FLINK-28539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626370#comment-17626370 ] Yanfei Lei commented on FLINK-28539: [~usamj] Do you want to fix this? (I'd like to open a PR to address this.) > Enable CompactionDynamicLevelBytes in FLASH_SSD_OPTIMIZED > - > > Key: FLINK-28539 > URL: https://issues.apache.org/jira/browse/FLINK-28539 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Reporter: Usamah Jassat >Priority: Minor > > Investigating the RocksDB predefined options I see that > `setLevelCompactionDynamicLevelBytes` is set for SPINNING_DISK options but > not FLASH_SSD_OPTIMIZED. > > From my research it looks like this change would improve the Space > Amplification of RocksDB [1] (which can also lead to a trade-off from > read/write amplification [2]). It makes sense to me that this feature should > be enabled for SSD's as they tend to have less space compared to their HDD > counterparts. > There is also an argument to be made to also disable it for SPINNING_DISK > options as it could give increased read/write performance [2] > [1] [http://rocksdb.org/blog/2015/07/23/dynamic-level.html] > [2] > [https://github.com/EighteenZi/rocksdb_wiki/blob/master/RocksDB-Tuning-Guide.md#amplification-factors]
[jira] [Comment Edited] (FLINK-28539) Enable CompactionDynamicLevelBytes in FLASH_SSD_OPTIMIZED
[ https://issues.apache.org/jira/browse/FLINK-28539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17566618#comment-17566618 ] Yanfei Lei edited comment on FLINK-28539 at 10/31/22 4:20 AM: -- Hi [~usamj], +1 for enabling {{LevelCompactionDynamicLevelBytes}} for FLASH_SSD_OPTIMIZED. And now it's also possible to use {{LevelCompactionDynamicLevelBytes}} by configuring "RocksDBConfigurableOptions.USE_DYNAMIC_LEVEL_SIZE: true" manually. {quote}There is also an argument to be made to also disable it for SPINNING_DISK options as it could give increased read/write performance [2] {quote} As the doc says, "Spinning disks usually provide much lower random read throughput than flash. If you use level-based compaction, use options.level_compaction_dynamic_level_bytes=true." I don't see it suggesting disabling {{LevelCompactionDynamicLevelBytes}}? was (Author: yanfei lei): hi [~usamj] , you can use `setLevelCompactionDynamicLevelBytes` for FLASH_SSD_OPTIMIZED by configuring "RocksDBConfigurableOptions.USE_DYNAMIC_LEVEL_SIZE: true" manually. {quote}There is also an argument to be made to also disable it for SPINNING_DISK options as it could give increased read/write performance [2] {quote} Currently, the default value of PREDEFINED_OPTIONS is [empty in Flink|https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java#L77-L90], users can choose whether to open according to their own situation. > Enable CompactionDynamicLevelBytes in FLASH_SSD_OPTIMIZED > - > > Key: FLINK-28539 > URL: https://issues.apache.org/jira/browse/FLINK-28539 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Reporter: Usamah Jassat >Priority: Minor > > Investigating the RocksDB predefined options I see that > `setLevelCompactionDynamicLevelBytes` is set for SPINNING_DISK options but > not FLASH_SSD_OPTIMIZED. 
> > From my research it looks like this change would improve the Space > Amplification of RocksDB [1] (which can also lead to a trade-off from > read/write amplification [2]). It makes sense to me that this feature should > be enabled for SSD's as they tend to have less space compared to their HDD > counterparts. > There is also an argument to be made to also disable it for SPINNING_DISK > options as it could give increased read/write performance [2] > [1] [http://rocksdb.org/blog/2015/07/23/dynamic-level.html] > [2] > [https://github.com/EighteenZi/rocksdb_wiki/blob/master/RocksDB-Tuning-Guide.md#amplification-factors]
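For reference, the manual configuration mentioned in the comment can be set in the Flink configuration file. The key below is assumed to correspond to `RocksDBConfigurableOptions.USE_DYNAMIC_LEVEL_SIZE` and should be verified against the Flink version in use:

```yaml
# flink-conf.yaml: enable dynamic level sizing for the RocksDB state backend
# (key name assumed from RocksDBConfigurableOptions.USE_DYNAMIC_LEVEL_SIZE)
state.backend.rocksdb.compaction.level.use-dynamic-size: true
```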
[GitHub] [flink] reswqa commented on pull request #21137: [FLINK-29234][runtime] JobMasterServiceLeadershipRunner handle leader event in a separate executor to avoid dead lock
reswqa commented on PR #21137: URL: https://github.com/apache/flink/pull/21137#issuecomment-1296507911 > We might also want to check the other `LeaderContender` implementations (as you already mentioned, `ResourceManagerImpl` does not suffer from this bug but the others might because they don't use this `handleLeaderEventHandler` pattern). @XComp I totally agree that we should also check the other `LeaderContender` implementations to ensure they do not suffer from this. However, from my point of view, in consideration of limiting the scope of this pull request, I suggest that only the `JobMasterServiceLeadershipRunner` be tested in this PR. For the other `LeaderContender` implementations, I will open a separate ticket for tracking. WDYT?
[jira] [Commented] (FLINK-29639) Add ResourceId in TransportException for debugging
[ https://issues.apache.org/jira/browse/FLINK-29639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626366#comment-17626366 ] Xintong Song commented on FLINK-29639: -- Hi [~Jiangang], are there any updates on this issue? > Add ResourceId in TransportException for debugging > --- > > Key: FLINK-29639 > URL: https://issues.apache.org/jira/browse/FLINK-29639 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Reporter: Liu >Assignee: Liu >Priority: Major > > When the taskmanager is lost, only the host and port are shown in the > exception. It is hard to find the exact taskmanager by resourceId. Adding > ResourceId info will help a lot in debugging the job.
[jira] [Assigned] (FLINK-28863) Snapshot result of RocksDB native savepoint should have empty shared-state
[ https://issues.apache.org/jira/browse/FLINK-28863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang reassigned FLINK-28863: Assignee: Jinzhong Li > Snapshot result of RocksDB native savepoint should have empty shared-state > -- > > Key: FLINK-28863 > URL: https://issues.apache.org/jira/browse/FLINK-28863 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing, Runtime / State Backends >Reporter: Yun Tang >Assignee: Jinzhong Li >Priority: Major > Fix For: 1.17.0, 1.15.3, 1.16.1 > > > The current snapshot result of RocksDB native savepoint has non-empty shared > state, which is obviously not correct as all snapshot artifacts already stay > in the exclusive checkpoint scope folder. > This does not bring a real harmful result, because we do not register the > snapshot results of RocksDB native savepoints.
[jira] [Commented] (FLINK-28863) Snapshot result of RocksDB native savepoint should have empty shared-state
[ https://issues.apache.org/jira/browse/FLINK-28863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626365#comment-17626365 ] Yun Tang commented on FLINK-28863: -- [~lijinzhong] I think this idea should be correct; I have already assigned it to you. > Snapshot result of RocksDB native savepoint should have empty shared-state > -- > > Key: FLINK-28863 > URL: https://issues.apache.org/jira/browse/FLINK-28863 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing, Runtime / State Backends >Reporter: Yun Tang >Priority: Major > Fix For: 1.17.0, 1.15.3, 1.16.1 > > > The current snapshot result of RocksDB native savepoint has non-empty shared > state, which is obviously not correct as all snapshot artifacts already stay > in the exclusive checkpoint scope folder. > This does not bring a real harmful result, because we do not register the > snapshot results of RocksDB native savepoints.
[jira] [Commented] (FLINK-28390) Allows RocksDB to configure FIFO Compaction to reduce CPU overhead.
[ https://issues.apache.org/jira/browse/FLINK-28390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626363#comment-17626363 ] Yanfei Lei commented on FLINK-28390: [~Ming Li] As the bug has been fixed and the changes are minor, I don't think it will conflict with the [fork version of flink.|https://github.com/ververica/frocksdb] I'd like to cherry-pick this to frocksdb. And I'm +1 to [~yunta]'s comments: the RocksDB configuration documentation is already complicated, and introducing a new TTL setting may increase the burden on users. > Allows RocksDB to configure FIFO Compaction to reduce CPU overhead. > --- > > Key: FLINK-28390 > URL: https://issues.apache.org/jira/browse/FLINK-28390 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Reporter: ming li >Priority: Major > > We know that the FIFO compaction strategy may silently delete data and may > lose data for the business. But in some scenarios, FIFO compaction can be a > very effective way to reduce CPU usage. > > Flink's TaskManager is usually a small-scale process, for example one > allocated 4 CPUs and 16 GB of memory. When the state size is small, the CPU > overhead occupied by RocksDB is not high, but as the state grows, RocksDB may > frequently be in compaction, which will occupy a large amount of CPU and > affect the computing operation. > > We usually configure a TTL for the state, so when using FIFO we can configure > it to be slightly longer than the TTL, so that the upper layer behaves the > same as before. > > Although the FIFO compaction strategy may bring space amplification, the disk > is cheaper than the CPU after all, so the overall cost is reduced. > > 
[jira] [Closed] (FLINK-28889) Hybrid shuffle should supports multiple consumer
[ https://issues.apache.org/jira/browse/FLINK-28889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xintong Song closed FLINK-28889. Resolution: Done master (1.17): d11940c4a78c71548b5a06af50da2e5f9cb68918 > Hybrid shuffle should supports multiple consumer > > > Key: FLINK-28889 > URL: https://issues.apache.org/jira/browse/FLINK-28889 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Affects Versions: 1.16.0 >Reporter: Weijie Guo >Assignee: Weijie Guo >Priority: Critical > Labels: pull-request-available > Fix For: 1.17.0 > > > Hybrid shuffle does not support multiple consumers for a single > subpartition's data. This brings some defects, such as the inability to > support partition reuse and speculative execution. In particular, it cannot > support broadcast optimization; that is, hybrid shuffle writes multiple > copies of broadcast data. This causes a waste of memory and disk space and > affects the performance of the shuffle write phase. Ideally, for the full > spilling strategy, any broadcast data (record or event) should only be > written once in memory, and the same is true for the disk.
[GitHub] [flink] xintongsong closed pull request #21122: [FLINK-28889] Hybrid shuffle supports multiple consumer and broadcast optimization
xintongsong closed pull request #21122: [FLINK-28889] Hybrid shuffle supports multiple consumer and broadcast optimization URL: https://github.com/apache/flink/pull/21122
[GitHub] [flink-ml] jiangxin369 commented on a diff in pull request #162: [FLINK-29593] Add QuantileSummary to help calculate approximate quantiles
jiangxin369 commented on code in PR #162: URL: https://github.com/apache/flink-ml/pull/162#discussion_r100968 ## flink-ml-lib/src/main/java/org/apache/flink/ml/common/util/QuantileSummary.java: ## @@ -0,0 +1,400 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.common.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Helper class to compute an approximate quantile summary. This implementation is based on the + * algorithm proposed in the paper: "Space-efficient Online Computation of Quantile Summaries" by + * Greenwald, Michael and Khanna, Sanjeev. (https://doi.org/10.1145/375663.375670) + */ +@Internal Review Comment: Updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [flink-ml] jiangxin369 commented on a diff in pull request #162: [FLINK-29593] Add QuantileSummary to help calculate approximate quantiles
jiangxin369 commented on code in PR #162: URL: https://github.com/apache/flink-ml/pull/162#discussion_r1008999126 ## flink-ml-lib/src/main/java/org/apache/flink/ml/common/util/QuantileSummary.java: ## @@ -0,0 +1,400 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.common.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Helper class to compute an approximate quantile summary. This implementation is based on the + * algorithm proposed in the paper: "Space-efficient Online Computation of Quantile Summaries" by + * Greenwald, Michael and Khanna, Sanjeev. (https://doi.org/10.1145/375663.375670) + */ +@Internal +public class QuantileSummary implements Serializable { + +/** The target relative error. */ +private final double relativeError; + +/** + * The compression threshold. 
After the internal buffer of statistics crosses this size, it + * attempts to compress the statistics together. + */ +private final int compressThreshold; + +/** The count of all the elements inserted so far. */ +private final long count; + +/** A buffer of quantile statistics. */ +private final List sampled; + +/** The default size of head buffer. */ +private static final int DEFAULT_HEAD_SIZE = 5; + +/** The default compression threshold. */ +private static final int DEFAULT_COMPRESS_THRESHOLD = 1; + +/** A buffer of the latest samples seen so far. */ +private List headBuffer = new ArrayList<>(DEFAULT_HEAD_SIZE); + +/** + * QuantileSummary Constructor. + * + * @param relativeError The target relative error. + */ +public QuantileSummary(double relativeError) { +this(relativeError, DEFAULT_COMPRESS_THRESHOLD); +} + +/** + * QuantileSummary Constructor. + * + * @param relativeError The target relative error. + * @param compressThreshold the compression threshold. After the internal buffer of statistics + * crosses this size, it attempts to compress the statistics together. + */ +public QuantileSummary(double relativeError, int compressThreshold) { +this(relativeError, compressThreshold, Collections.EMPTY_LIST, 0); +} + +/** + * QuantileSummary Constructor. + * + * @param relativeError The target relative error. + * @param compressThreshold the compression threshold. + * @param sampled A buffer of quantile statistics. See the G-K article for more details. + * @param count The count of all the elements inserted in the sampled buffer. 
+ */ +private QuantileSummary( +double relativeError, int compressThreshold, List sampled, long count) { +Preconditions.checkArgument( +relativeError > 0 && relativeError < 1, +"The relative error must lie between 0 and 1."); +Preconditions.checkArgument( +compressThreshold > 0, "The compress threshold must be greater than 0."); +this.relativeError = relativeError; +this.compressThreshold = compressThreshold; +this.sampled = sampled; +this.count = count; +} + +/** + * Insert a new observation to the summary. + * + * @param item The new observation to insert into the summary. + * @return A summary with the given observation inserted into the summary. + */ +public QuantileSummary insert(Double item) { +headBuffer.add(item); +if (headBuffer.size() >= DEFAULT_HEAD_SIZE) { +QuantileSummary result = insertHeadBuffer(); +if (result.sampled.size() >= compressThreshold) { +return result.compress(); +} else { +return result; +} +} else { +return this; +} +} + +/** + * Return
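For context on the Greenwald-Khanna technique the PR above implements, here is a much-simplified, self-contained sketch (not the Flink ML code, and without the head-buffer batching shown in the diff): the summary keeps `(value, g, delta)` tuples, merges adjacent tuples whose combined weight stays under `2 * eps * n`, and answers rank queries within `eps * n` of the true rank. All class and method names here are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

/** Minimal Greenwald-Khanna quantile sketch (illustrative, not Flink's QuantileSummary). */
public class GkSketch {
    private static class Tuple {
        final double value;
        long g;          // number of ranks covered by this tuple
        final long delta; // uncertainty in this tuple's maximum rank
        Tuple(double value, long g, long delta) { this.value = value; this.g = g; this.delta = delta; }
    }

    private final double eps;
    private final List<Tuple> tuples = new ArrayList<>();
    private long count = 0;

    public GkSketch(double eps) { this.eps = eps; }

    public void insert(double v) {
        // insert into sorted position; min/max get delta = 0 so they are exact
        int i = 0;
        while (i < tuples.size() && tuples.get(i).value < v) i++;
        long delta = (i == 0 || i == tuples.size()) ? 0 : (long) Math.floor(2 * eps * count);
        tuples.add(i, new Tuple(v, 1, delta));
        count++;
        if (count % (long) (1.0 / (2 * eps)) == 0) compress();
    }

    private void compress() {
        long threshold = (long) Math.floor(2 * eps * count);
        // merge a tuple into its right neighbor while the error bound allows it
        for (int i = tuples.size() - 2; i >= 1; i--) {
            Tuple a = tuples.get(i), b = tuples.get(i + 1);
            if (a.g + b.g + b.delta <= threshold) {
                b.g += a.g;
                tuples.remove(i);
            }
        }
    }

    public double query(double q) {
        long rank = (long) Math.ceil(q * count);
        long margin = (long) Math.ceil(eps * count);
        long rmin = 0;
        for (Tuple t : tuples) {
            rmin += t.g;
            long rmax = rmin + t.delta;
            if (rank - rmin <= margin && rmax - rank <= margin) return t.value;
        }
        return tuples.get(tuples.size() - 1).value;
    }

    public static void main(String[] args) {
        GkSketch s = new GkSketch(0.01);
        for (int i = 1; i <= 10000; i++) s.insert(i);
        double median = s.query(0.5);
        // with eps = 0.01 the answer's rank is within 1% of the true median's rank
        if (Math.abs(median - 5000) > 100) throw new AssertionError(median);
        System.out.println("median ~ " + median);
    }
}
```

The space saving comes from `compress()`: the number of retained tuples depends on `eps`, not on `count`, which is why the real implementation exposes `relativeError` and a `compressThreshold` to control when compression kicks in.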
[jira] [Updated] (FLINK-29795) The source.file.stream.io-fetch-size can not be set by table properties
[ https://issues.apache.org/jira/browse/FLINK-29795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aitozi updated FLINK-29795: --- Description: The {{source.file.stream.io-fetch-size}} is used in the bulk format mode, but it is not exposed to the filesystem connector options. If I try to use it in the WITH properties, it fails with {{Unsupported options}}. It can currently only be set by adding it to {{flink-conf.yaml}}, and all jobs in the same session cluster share the same config value. It's not convenient to adjust, and I think it should be scoped to the table's properties. (was: The {{source.file.stream.io-fetch-size}} is used in the bulk format mode, but it is not exposed to the filesystem connector options. If I try to use it in the with property, it will fails with \{{Unsupported options}}. It can only be set by add it to the {{flink-conf.yaml}} now. It's not convenient and it should be scoped to the table's property.) > The source.file.stream.io-fetch-size can not be set by table properties > --- > > Key: FLINK-29795 > URL: https://issues.apache.org/jira/browse/FLINK-29795 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem >Affects Versions: 1.17.0 >Reporter: Aitozi >Priority: Major > > The {{source.file.stream.io-fetch-size}} is used in the bulk format mode, but > it is not exposed to the filesystem connector options. If I try to use it in > the WITH properties, it fails with {{Unsupported options}}. It can currently > only be set by adding it to {{flink-conf.yaml}}, and all jobs in the same > session cluster share the same config value. It's not convenient to adjust, > and I think it should be scoped to the table's properties. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-29639) Add ResourceId in TransportException for debugging
[ https://issues.apache.org/jira/browse/FLINK-29639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xintong Song updated FLINK-29639: - Component/s: Runtime / Coordination > Add ResourceId in TransportException for debugging > --- > > Key: FLINK-29639 > URL: https://issues.apache.org/jira/browse/FLINK-29639 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Reporter: Liu >Assignee: Liu >Priority: Major > > When the taskmanager is lost, only the host and port are shown in the > exception. It is hard to find the exact taskmanager by its resourceId. Adding > the ResourceId info will help a lot in debugging the job.
[GitHub] [flink] leonardBang commented on a diff in pull request #21186: [FLINK-29480][Connector/Kafka] Skip null records when writing
leonardBang commented on code in PR #21186: URL: https://github.com/apache/flink/pull/21186#discussion_r1008991930 ## flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java: ## @@ -194,8 +194,10 @@ public void write(IN element, Context context) throws IOException { final ProducerRecord record = recordSerializer.serialize(element, kafkaSinkContext, context.timestamp()); Review Comment: ```suggestion public void write(@Nullable IN element, Context context) throws IOException { final ProducerRecord record = recordSerializer.serialize(element, kafkaSinkContext, context.timestamp()); ``` ## flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchema.java: ## @@ -54,7 +54,7 @@ default void open( * @param element element to be serialized * @param context context to possibly determine target partition * @param timestamp timestamp - * @return Kafka {@link ProducerRecord} + * @return Kafka {@link ProducerRecord} (null if the element cannot be serialized) */ ProducerRecord serialize(T element, KafkaSinkContext context, Long timestamp); Review Comment: ```suggestion * @return Kafka {@link ProducerRecord} or null if the given element cannot be serialized */ @Nullable ProducerRecord serialize(T element, KafkaSinkContext context, Long timestamp); ``` ## flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java: ## @@ -148,6 +148,15 @@ public void testIncreasingRecordBasedCounters() throws Exception { assertThat(numRecordsOutErrors.getCount()).isEqualTo(0); assertThat(numRecordsSendErrors.getCount()).isEqualTo(0); +// elements that cannot be serialized should be silently skipped Review Comment: minor: we can improve the note, because not all serializers will return `null` when the element cannot be serialized, not all `null` are due to cannot be serialized . 
All we can ensure is that the `KafkaSinkWriter` will silently skip `null` records returned by serializers ## flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java: ## @@ -148,6 +148,15 @@ public void testIncreasingRecordBasedCounters() throws Exception { assertThat(numRecordsOutErrors.getCount()).isEqualTo(0); assertThat(numRecordsSendErrors.getCount()).isEqualTo(0); +// elements that cannot be serialized should be silently skipped +writer.write(null, SINK_WRITER_CONTEXT); +timeService.trigger(); +assertThat(numBytesOut.getCount()).isEqualTo(0L); +assertThat(numRecordsOut.getCount()).isEqualTo(0); +assertThat(numRecordsOutErrors.getCount()).isEqualTo(0); +assertThat(numRecordsSendErrors.getCount()).isEqualTo(0); + +// but properly serialized elements should count just normally Review Comment: Could we add a new test like `testWriteNullElement` instead of modifying an existing test? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
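The behavior under review can be illustrated with a toy model (this is not Flink's actual `KafkaWriter`; the serializer is stood in for by a plain `Function`, and the counter is a simple field): when the serializer returns `null`, the writer drops the element without sending it and without touching any counters.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/**
 * Toy model of the null-skipping behavior discussed in the PR: records for
 * which the serializer returns null are silently dropped, and counters only
 * advance for records that are actually sent.
 */
public class NullSkippingWriter<IN> {
    private final Function<IN, String> serializer; // stands in for KafkaRecordSerializationSchema
    private final List<String> sent = new ArrayList<>();
    private long numRecordsOut = 0;

    public NullSkippingWriter(Function<IN, String> serializer) {
        this.serializer = serializer;
    }

    public void write(IN element) {
        String record = serializer.apply(element); // may return null
        if (record != null) { // null records are silently skipped
            sent.add(record);
            numRecordsOut++;
        }
    }

    public long getNumRecordsOut() { return numRecordsOut; }

    public static void main(String[] args) {
        // serializer returns null for elements it cannot serialize (here: negative ints)
        NullSkippingWriter<Integer> w = new NullSkippingWriter<>(i -> i < 0 ? null : "v" + i);
        w.write(-1); // skipped, no counter change
        w.write(7);  // sent, counter advances
        if (w.getNumRecordsOut() != 1) throw new AssertionError(w.getNumRecordsOut());
        System.out.println("records out: " + w.getNumRecordsOut());
    }
}
```

As leonardBang's note points out, the only contract the writer can enforce is "skip `null`"; whether `null` means "could not serialize" is up to each serializer.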
[GitHub] [flink-web] godfreyhe merged pull request #578: [hotfix] Fix some typos in 1.16 announcement
godfreyhe merged PR #578: URL: https://github.com/apache/flink-web/pull/578
[jira] [Commented] (FLINK-28553) The serializer in StateMap has not been updated when metaInfo of StateTable updated
[ https://issues.apache.org/jira/browse/FLINK-28553?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626350#comment-17626350 ] Yanfei Lei commented on FLINK-28553: Thanks [~roman] for merging in master [8df50536ef913b63620d896423c39cdd01941c55|https://github.com/apache/flink/commit/8df50536ef913b63620d896423c39cdd01941c55] > The serializer in StateMap has not been updated when metaInfo of StateTable > updated > --- > > Key: FLINK-28553 > URL: https://issues.apache.org/jira/browse/FLINK-28553 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.14.5, 1.15.1 >Reporter: Hangxiang Yu >Assignee: Hangxiang Yu >Priority: Minor > Fix For: 1.17.0 > > > When the meta info in StateTable is updated, the serializer in StateMap has > not been updated. > (See StateTable#setMetaInfo) > The value may be serialized/deserialized/copied incorrectly after triggering > state migration of HashMapStateBackend.
[jira] [Closed] (FLINK-28581) Test Changelog StateBackend V2 Manually
[ https://issues.apache.org/jira/browse/FLINK-28581?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yanfei Lei closed FLINK-28581. -- Fix Version/s: 1.16.0 Resolution: Resolved > Test Changelog StateBackend V2 Manually > --- > > Key: FLINK-28581 > URL: https://issues.apache.org/jira/browse/FLINK-28581 > Project: Flink > Issue Type: Sub-task > Components: Runtime / State Backends >Affects Versions: 1.16.0 >Reporter: Hangxiang Yu >Priority: Major > Fix For: 1.16.0 > >
[jira] [Commented] (FLINK-29748) Expose the optimize phase in the connector context
[ https://issues.apache.org/jira/browse/FLINK-29748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626349#comment-17626349 ] Aitozi commented on FLINK-29748: {noformat} The rule will only apply once during the optimization because the rule uses hep optimizer which is not based on the cost. {noformat} Yes, you are right. By {{optimization}} I mainly mean the whole planner phase, so the connector does not know whether the optimization is finished or not. As a workaround, I think we can add a callback interface, e.g. {{onOptimizeFinished}} or {{onTranslateExecNodeStart}}, on the {{ScanTableSource}}. All the external validation work on the final results could then be performed at that stage. What do you think? > Expose the optimize phase in the connector context > -- > > Key: FLINK-29748 > URL: https://issues.apache.org/jira/browse/FLINK-29748 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner, Table SQL / Runtime >Reporter: Aitozi >Priority: Minor > > Currently, the connector cannot know whether the whole optimization is > finished. Once the optimization finishes, all the information is static, e.g. > the partitions being read. If I want to validate the final optimized result > (like whether there are too many or no partitions to read), I need the > context of the current phase. I think the {{ScanContext}} is a good place to > expose this information.
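The callback idea floated in the comment above could look roughly like the following. This is purely a hypothetical sketch of the proposal, not an existing Flink API: the names `OptimizeAware` and `onOptimizeFinished` are illustrative, and the planner-side wiring is omitted.

```java
import java.util.List;

/** Hypothetical sketch of the proposed post-optimization callback (not a real Flink interface). */
public class OptimizeCallbackSketch {

    /** Mixin a ScanTableSource-like class could implement; the planner would call it once optimization is done. */
    interface OptimizeAware {
        void onOptimizeFinished();
    }

    /** Example connector-side use: validate the final pruned partition list. */
    static class PartitionCheckingSource implements OptimizeAware {
        private final List<String> remainingPartitions;

        PartitionCheckingSource(List<String> remainingPartitions) {
            this.remainingPartitions = remainingPartitions;
        }

        @Override
        public void onOptimizeFinished() {
            // at this point partition pruning is final, so the check is safe
            if (remainingPartitions.isEmpty()) {
                throw new IllegalStateException("No partitions left to read after optimization");
            }
        }
    }

    public static void main(String[] args) {
        // a source with partitions passes validation
        new PartitionCheckingSource(List.of("p1", "p2")).onOptimizeFinished();

        // a source whose plan ended up with no partitions fails validation
        boolean failed = false;
        try {
            new PartitionCheckingSource(List.of()).onOptimizeFinished();
        } catch (IllegalStateException e) {
            failed = true;
        }
        if (!failed) throw new AssertionError("expected validation failure");
        System.out.println("validation behaves as expected");
    }
}
```

The point of the hook is timing: during rule application the partition list may still shrink, so validation is only meaningful after the planner signals that the whole optimization phase has finished.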
[jira] [Comment Edited] (FLINK-27246) Code of method "processElement(Lorg/apache/flink/streaming/runtime/streamrecord/StreamRecord;)V" of class "HashAggregateWithKeys$9211" grows beyond 64 KB
[ https://issues.apache.org/jira/browse/FLINK-27246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626347#comment-17626347 ] Caizhi Weng edited comment on FLINK-27246 at 10/31/22 2:18 AM: --- Hi [~KristoffSC]! Thanks for your interest in solving this issue. I'm the author of {{JavaCodeSplitter}} and I'd like to offer my thoughts. *1. if my understanding and proposed high level solution for splitting the problematic code block into smaller chunks is correct?* Your understanding is quite correct. However it might not be that easy to solve this issue due to some corner cases. For example we may have {{continue}}'s and {{break}}'s in a {{while}} loop, and we also need to deal with the loop variable in a {{for}} loop. Due to these cases I choose not to recursively split most of the looping code blocks when implementing the first version of {{JavaCodeSplitter}}. However for {{if}} code blocks I implement a special {{IfStatementRewriter}} to split them because they are quite common in generated code and they don't suffer from those corner cases. I'm not sure if the generated code in this particular ticket contains {{continue}}'s or {{break}}'s. If not we may choose to split code blocks without these keywords. *2. should this be done by FunctionSplitter or this should be implemented in Scala code for code generation?* It depends. If this issue is only caused by a single operator I'd like to change the code generation Scala code in that operator. However if other operators may also cause this issue it would be better to fix the {{FunctionSplitter}}. To me this issue seems like a common one. I'd prefer fixing the {{FunctionSplitter}} if possible. was (Author: tsreaper): Hi [~KristoffSC]! Thanks for your interest in solving this issue. I'm the author of {{JavaCodeSplitter}} and I'd like to offer my thoughts. *1. 
if my understanding and proposed high level solution for splitting the problematic code block into smaller chunks is correct?* Your understanding is quite correct. However it might not be that easy to solve this issue due to some corner cases. For example we may have {{continue}}'s and {{break}}'s in a {{while}} loop, and we also need to deal with the loop variable in a {{for}} loop. Due to these cases I choose not to recursively split most of the looping code blocks when implementing the first version of {{JavaCodeSplitter}}. However for {{if}} code blocks I implement a special {{IfStatementRewriter}} to split them because they are quite common in generated code and they don't suffer from those corner cases. I'm not sure if the generated code in this particular ticket contains {{continue}}'s or {{break}}'s. If not we may choose to split code blocks without these keywords. *2. should this be done by FunctionSplitter or this should be implemented in Scala code for code generation?* It depends. If this issue is only caused by a single operator I'd like to change the code generation Scala code in that operator. However if other operators may also cause this issue it would be better to fix the {{FunctionSplitter}}. To me this issue seems like a common one. I'd prefer fixing the {{FunctionSplitter}} if possible. 
> Code of method > "processElement(Lorg/apache/flink/streaming/runtime/streamrecord/StreamRecord;)V" > of class "HashAggregateWithKeys$9211" grows beyond 64 KB > - > > Key: FLINK-27246 > URL: https://issues.apache.org/jira/browse/FLINK-27246 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.14.3 >Reporter: Maciej Bryński >Priority: Major > > I think this bug should get fixed in > https://issues.apache.org/jira/browse/FLINK-23007 > Unfortunately I spotted it on Flink 1.14.3 > {code} > java.lang.RuntimeException: Could not instantiate generated class > 'HashAggregateWithKeys$9211' > at > org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:85) > ~[flink-table_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:40) > ~[flink-table_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:81) > ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:198) > ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.(RegularOperatorChain.java:63) > ~[fl
[jira] [Comment Edited] (FLINK-27246) Code of method "processElement(Lorg/apache/flink/streaming/runtime/streamrecord/StreamRecord;)V" of class "HashAggregateWithKeys$9211" grows beyond 64 KB
[ https://issues.apache.org/jira/browse/FLINK-27246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626347#comment-17626347 ] Caizhi Weng edited comment on FLINK-27246 at 10/31/22 2:17 AM: --- Hi [~KristoffSC]! Thanks for your interest in solving this issue. I'm the author of {{JavaCodeSplitter}} and I'd like to offer my thoughts. *1. if my understanding and proposed high level solution for splitting the problematic code block into smaller chunks is correct?* Your understanding is quite correct. However it might not be that easy to solve this issue due to some corner cases. For example we may have {{continue}}'s and {{break}}'s in a {{while}} loop, and we also need to deal with the loop variable in a {{for}} loop. Due to these cases I choose not to recursively split most of the looping code blocks when implementing the first version of {{JavaCodeSplitter}}. However for {{if}} code blocks I implement a special {{IfStatementRewriter}} to split them because they are quite common in generated code and they don't suffer from those corner cases. I'm not sure if the generated code in this particular ticket contains {{continue}}'s or {{break}}'s. If not we may choose to split code blocks without these keywords. *2. should this be done by FunctionSplitter or this should be implemented in Scala code for code generation?* It depends. If this issue is only caused by a single operator I'd like to change the code generation Scala code in that operator. However if other operators may also cause this issue it would be better to fix the {{FunctionSplitter}}. To me this issue seems like a common one. I'd prefer fixing the {{FunctionSplitter}} if possible. was (Author: tsreaper): Hi [~KristoffSC]! Thanks for your interest in solving this issue. I'm the author of {{JavaCodeSplitter}} and I'd like to offer my thoughts. *1. 
if my understanding and proposed high level solution for splitting the problematic code block into smaller chunks is correct?* Your understanding is quite correct. However it might not be that easy to solve this issue due to some corner cases. For example we may have {{continue}}'s and {{break}}'s in a {{while}} loop, and we also need to deal with the loop variable in a {{for}} loop. Due to these cases I choose not to recursively split the code block when implementing the first version of {{JavaCodeSplitter}}. I'm not sure if the generated code in this particular ticket contains {{continue}}'s or {{break}}'s. If not we may choose to split code blocks without these keywords. *2. should this be done by FunctionSplitter or this should be implemented in Scala code for code generation?* It depends. If this issue is only caused by a single operator I'd like to change the code generation Scala code in that operator. However if other operators may also cause this issue it would be better to fix the {{FunctionSplitter}}. To me this issue seems like a common one. I'd prefer fixing the {{FunctionSplitter}} if possible. 
> Code of method > "processElement(Lorg/apache/flink/streaming/runtime/streamrecord/StreamRecord;)V" > of class "HashAggregateWithKeys$9211" grows beyond 64 KB > - > > Key: FLINK-27246 > URL: https://issues.apache.org/jira/browse/FLINK-27246 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.14.3 >Reporter: Maciej Bryński >Priority: Major > > I think this bug should get fixed in > https://issues.apache.org/jira/browse/FLINK-23007 > Unfortunately I spotted it on Flink 1.14.3 > {code} > java.lang.RuntimeException: Could not instantiate generated class > 'HashAggregateWithKeys$9211' > at > org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:85) > ~[flink-table_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:40) > ~[flink-table_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:81) > ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:198) > ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.(RegularOperatorChain.java:63) > ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:666) > ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1
[jira] [Comment Edited] (FLINK-27246) Code of method "processElement(Lorg/apache/flink/streaming/runtime/streamrecord/StreamRecord;)V" of class "HashAggregateWithKeys$9211" grows beyond 64 KB
[ https://issues.apache.org/jira/browse/FLINK-27246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626347#comment-17626347 ] Caizhi Weng edited comment on FLINK-27246 at 10/31/22 2:15 AM: --- Hi [~KristoffSC]! Thanks for your interest in solving this issue. I'm the author of {{JavaCodeSplitter}} and I'd like to offer my thoughts. *1. if my understanding and proposed high level solution for splitting the problematic code block into smaller chunks is correct?* Your understanding is quite correct. However it might not be that easy to solve this issue due to some corner cases. For example we may have {{continue}}'s and {{break}}'s in a {{while}} loop, and we also need to deal with the loop variable in a {{for}} loop. Due to these cases I choose not to recursively split the code block when implementing the first version of {{JavaCodeSplitter}}. I'm not sure if the generated code in this particular ticket contains {{continue}}'s or {{break}}'s. If not we may choose to split code blocks without these keywords. *2. should this be done by FunctionSplitter or this should be implemented in Scala code for code generation?* It depends. If this issue is only caused by a single operator I'd like to change the code generation Scala code in that operator. However if other operators may also cause this issue it would be better to fix the {{FunctionSplitter}}. To me this issue seems like a common one. I'd prefer fixing the {{FunctionSplitter}} if possible. was (Author: tsreaper): Hi [~KristoffSC]! Thanks for your interest in solving this issue. I'm the author of {{JavaCodeSplitter}} and I'd like to offer my thoughts. *1. if my understanding and proposed high level solution for splitting the problematic code block into smaller chunks is correct?* Your understanding is quite correct. However it might not be that easy to solve this issue due to some corner cases. 
For example we may have {{continue}}'s and {{break}}'s in a {{while}} loop, and we also need to deal with the loop variable in a {{for}} loop. Due to these cases I choose not to recursively split the code block when implementing the first version of {{JavaCodeSplitter}}. I'm not sure if the generated code in this particular ticket contains {{continue}}'s or {{break}}'s. If not we may choose to split code blocks without these keywords. *2. should this be done by FunctionSplitter or this should be implemented in Scala code for code generation?* It depends. If this issue is only caused by a single operator I'd like to change the code generation Scala code in that operator (if possible). However if other operators may also cause this issue it would be better to fix the {{FunctionSplitter}}. > Code of method > "processElement(Lorg/apache/flink/streaming/runtime/streamrecord/StreamRecord;)V" > of class "HashAggregateWithKeys$9211" grows beyond 64 KB > - > > Key: FLINK-27246 > URL: https://issues.apache.org/jira/browse/FLINK-27246 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.14.3 >Reporter: Maciej Bryński >Priority: Major > > I think this bug should get fixed in > https://issues.apache.org/jira/browse/FLINK-23007 > Unfortunately I spotted it on Flink 1.14.3 > {code} > java.lang.RuntimeException: Could not instantiate generated class > 'HashAggregateWithKeys$9211' > at > org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:85) > ~[flink-table_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:40) > ~[flink-table_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:81) > ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > 
org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:198) > ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.(RegularOperatorChain.java:63) > ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:666) > ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654) > ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) > ~[flink-dist_2.12-1.14.3-stream1.jar
[jira] [Comment Edited] (FLINK-27246) Code of method "processElement(Lorg/apache/flink/streaming/runtime/streamrecord/StreamRecord;)V" of class "HashAggregateWithKeys$9211" grows beyond 64 KB
[ https://issues.apache.org/jira/browse/FLINK-27246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626347#comment-17626347 ] Caizhi Weng edited comment on FLINK-27246 at 10/31/22 2:14 AM: --- Hi [~KristoffSC]! Thanks for your interest in solving this issue. I'm the author of {{JavaCodeSplitter}} and I'd like to offer my thoughts. *1. if my understanding and proposed high level solution for splitting the problematic code block into smaller chunks is correct?* Your understanding is quite correct. However it might not be that easy to solve this issue due to some corner cases. For example we may have {{continue}}'s and {{break}}'s in a {{while}} loop, and we also need to deal with the loop variable in a {{for}} loop. Due to these cases I choose not to recursively split the code block when implementing the first version of {{JavaCodeSplitter}}. I'm not sure if the generated code in this particular ticket contains {{continue}}'s or {{break}}'s. If not we may choose to split code blocks without these keywords. *2. should this be done by FunctionSplitter or this should be implemented in Scala code for code generation?* It depends. If this issue is only caused by a single operator I'd like to change the code generation Scala code in that operator (if possible). However if other operators may also cause this issue it would be better to fix the {{FunctionSplitter}}. was (Author: tsreaper): Hi [~KristoffSC]! Thanks for your interest in solving this issue. I'm the author of {{JavaCodeSplitter}} and I'd like to offer my thoughts. *1. if my understanding and proposed high level solution for splitting the problematic code block into smaller chunks is correct?* Your understanding is quite correct. However it might not be that easy to solve this issue due to some corner cases. For example we may have {{continue}}s and {{break}}s in a {{while}} loop, and we also need to deal with the loop variable in a {{for}} loop. 
Due to these cases I choose not to recursively split the code block when implementing the first version of {{JavaCodeSplitter}}. I'm not sure if the generated code in this particular ticket contains {{continue}}s or {{break}}s. If not we may choose to split code blocks without these keywords. *2. should this be done by FunctionSplitter or this should be implemented in Scala code for code generation?* It depends. If this issue is only caused by a single operator I'd like to change the code generation Scala code in that operator (if possible). However if other operators may also cause this issue it would be better to fix the {{FunctionSplitter}}. > Code of method > "processElement(Lorg/apache/flink/streaming/runtime/streamrecord/StreamRecord;)V" > of class "HashAggregateWithKeys$9211" grows beyond 64 KB > - > > Key: FLINK-27246 > URL: https://issues.apache.org/jira/browse/FLINK-27246 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.14.3 >Reporter: Maciej Bryński >Priority: Major > > I think this bug should get fixed in > https://issues.apache.org/jira/browse/FLINK-23007 > Unfortunately I spotted it on Flink 1.14.3 > {code} > java.lang.RuntimeException: Could not instantiate generated class > 'HashAggregateWithKeys$9211' > at > org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:85) > ~[flink-table_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:40) > ~[flink-table_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:81) > ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:198) > ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.(RegularOperatorChain.java:63) > ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:666) > ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654) > ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) > ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(T
[jira] [Commented] (FLINK-27246) Code of method "processElement(Lorg/apache/flink/streaming/runtime/streamrecord/StreamRecord;)V" of class "HashAggregateWithKeys$9211" grows beyond 64 KB
[ https://issues.apache.org/jira/browse/FLINK-27246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626347#comment-17626347 ] Caizhi Weng commented on FLINK-27246: - Hi [~KristoffSC]! Thanks for your interest in solving this issue. I'm the author of {{JavaCodeSplitter}} and I'd like to offer some insight about this. *1. if my understanding and proposed high level solution for splitting the problematic code block into smaller chunks is correct?* Your understanding is quite correct. However it might not be that easy to solve this issue due to some corner cases. For example we may have {{continue}}s and {{break}}s in a {{while}} loop, and we also need to deal with the loop variable in a {{for}} loop. Due to these cases I choose not to recursively split the code block when implementing the first version of {{JavaCodeSplitter}}. I'm not sure if the generated code in this particular ticket contains {{continue}}s or {{break}}s. If not we may choose to split code blocks without these keywords. *2. should this be done by FunctionSplitter or this should be implemented in Scala code for code generation?* It depends. If this issue is only caused by a single operator I'd like to change the code generation Scala code in that operator (if possible). However if other operators may also cause this issue it would be better to fix the {{FunctionSplitter}}. 
> Code of method > "processElement(Lorg/apache/flink/streaming/runtime/streamrecord/StreamRecord;)V" > of class "HashAggregateWithKeys$9211" grows beyond 64 KB > - > > Key: FLINK-27246 > URL: https://issues.apache.org/jira/browse/FLINK-27246 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.14.3 >Reporter: Maciej Bryński >Priority: Major > > I think this bug should get fixed in > https://issues.apache.org/jira/browse/FLINK-23007 > Unfortunately I spotted it on Flink 1.14.3 > {code} > java.lang.RuntimeException: Could not instantiate generated class > 'HashAggregateWithKeys$9211' > at > org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:85) > ~[flink-table_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:40) > ~[flink-table_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:81) > ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:198) > ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.(RegularOperatorChain.java:63) > ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:666) > ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654) > ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) > ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) > 
~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) > ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) > ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at java.lang.Thread.run(Unknown Source) ~[?:?] > Caused by: org.apache.flink.util.FlinkRuntimeException: > org.apache.flink.api.common.InvalidProgramException: Table program cannot be > compiled. This is a bug. Please file an issue. > at > org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:76) > ~[flink-table_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:102) > ~[flink-table_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:83) > ~[flink-table_2.12-1.14.3-stream1.jar:1.14.3-stream1] > ... 11 more > Caused by: > org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException: > org.apache.flink.api.common.InvalidProgramException: Table program cannot be > compile
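The {{continue}}/{{break}} corner case described in the comment above can be made concrete with a small sketch (illustrative plain Java, not Flink's actual splitter code; all names here are made up): a loop body containing `break` cannot simply be moved into a `void` helper method, because `break` refers to the enclosing loop. A common workaround is to have the extracted method return a "should break" flag and let the caller translate it back into a `break`:

```java
// Illustrative sketch of why `break` blocks naive method extraction,
// and the flag-return workaround a code splitter could apply.
public class SplitSketch {

    // Original shape: imagine this loop body is too large for one method,
    // but it contains `break`, so it cannot be moved out verbatim.
    static int sumUntilNegative(int[] values) {
        int sum = 0;
        for (int v : values) {
            if (v < 0) {
                break; // `break` targets the enclosing loop
            }
            sum += v;
        }
        return sum;
    }

    // Split version: the extracted body reports "should we break?" via its
    // return value; loop-local state moves into a mutable holder so the
    // helper can update it.
    static int sumUntilNegativeSplit(int[] values) {
        int[] sumHolder = new int[1];
        for (int v : values) {
            if (bodyShouldBreak(v, sumHolder)) {
                break;
            }
        }
        return sumHolder[0];
    }

    private static boolean bodyShouldBreak(int v, int[] sumHolder) {
        if (v < 0) {
            return true; // tell the caller to break out of its loop
        }
        sumHolder[0] += v;
        return false;
    }
}
```

Both variants behave identically; the extra holder and flag plumbing is exactly the bookkeeping that makes recursive splitting non-trivial.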
[jira] [Comment Edited] (FLINK-27246) Code of method "processElement(Lorg/apache/flink/streaming/runtime/streamrecord/StreamRecord;)V" of class "HashAggregateWithKeys$9211" grows beyond 64 KB
[ https://issues.apache.org/jira/browse/FLINK-27246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626347#comment-17626347 ] Caizhi Weng edited comment on FLINK-27246 at 10/31/22 2:13 AM: --- Hi [~KristoffSC]! Thanks for your interest in solving this issue. I'm the author of {{JavaCodeSplitter}} and I'd like to offer my thoughts. *1. Is my understanding, and the proposed high-level solution of splitting the problematic code block into smaller chunks, correct?* Your understanding is quite correct. However, it might not be that easy to solve this issue due to some corner cases. For example, we may have {{continue}}s and {{break}}s in a {{while}} loop, and we also need to deal with the loop variable in a {{for}} loop. Because of these cases, I chose not to split code blocks recursively when implementing the first version of {{JavaCodeSplitter}}. I'm not sure whether the generated code in this particular ticket contains {{continue}}s or {{break}}s; if not, we may choose to split only code blocks without these keywords. *2. Should this be done by FunctionSplitter, or should it be implemented in the Scala code for code generation?* It depends. If this issue is caused by only a single operator, I'd rather change the code-generation Scala code in that operator (if possible). However, if other operators can also cause this issue, it would be better to fix the {{FunctionSplitter}}.
[GitHub] [flink-ml] jiangxin369 commented on a diff in pull request #166: [FLINK-29598] Add Estimator and Transformer for Imputer
jiangxin369 commented on code in PR #166: URL: https://github.com/apache/flink-ml/pull/166#discussion_r1008977825 ## flink-ml-lib/src/main/java/org/apache/flink/ml/feature/imputer/Imputer.java: ## @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.feature.imputer; + +import org.apache.flink.api.common.functions.AggregateFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.ml.api.Estimator; +import org.apache.flink.ml.common.datastream.DataStreamUtils; +import org.apache.flink.ml.common.util.QuantileSummary; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.util.ParamUtils; +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.internal.TableImpl; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +/** + * The Imputer estimator completes missing values in a dataset. 
Missing values can be imputed using Review Comment: How about `The Imputer estimator completes missing values of the input columns. `, because the model can also transform an unbounded data stream. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
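For readers unfamiliar with the Imputer under review, here is a minimal plain-Java sketch of one imputation strategy (mean imputation): fit computes the mean of the non-missing values, transform fills the gaps with it. The class and method names are illustrative only; this is not the Flink ML API.

```java
import java.util.Arrays;

// Minimal sketch of mean imputation: nulls stand in for missing values.
public class MeanImputerSketch {

    // "Fit" step: mean of the non-missing (non-null) entries of a column.
    static double fitMean(Double[] column) {
        return Arrays.stream(column)
                .filter(v -> v != null)
                .mapToDouble(Double::doubleValue)
                .average()
                .orElse(Double.NaN); // column with no observed values
    }

    // "Transform" step: replace each missing value with the fitted mean.
    static double[] transform(Double[] column, double mean) {
        return Arrays.stream(column)
                .mapToDouble(v -> v == null ? mean : v)
                .toArray();
    }
}
```

The reviewer's point about unbounded streams is that the fitted statistic, once computed on the training data, can be applied record by record forever; the transform step above is stateless in exactly that way.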
[jira] [Commented] (FLINK-29748) Expose the optimize phase in the connector context
[ https://issues.apache.org/jira/browse/FLINK-29748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626343#comment-17626343 ] Shengkai Fang commented on FLINK-29748: --- Yes, I think you are right. But I am confused about > If I want to validate that the source does not consume too many partitions, > from the connector's perspective, it does not know whether the optimization > is finished, so it does not know when to apply the validation to the final > optimization results. The rule will only be applied once during the optimization, because it uses the HEP optimizer, which is not cost-based. > Expose the optimize phase in the connector context > -- > > Key: FLINK-29748 > URL: https://issues.apache.org/jira/browse/FLINK-29748 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner, Table SQL / Runtime >Reporter: Aitozi >Priority: Minor > > Currently, a connector cannot know whether the whole optimization is > finished. When the optimization is finished, all the information is static, > e.g. the partitions being read. If I want to validate the final optimized > result (such as whether too many or no partitions are read), I need the > context of the current phase. I think {{ScanContext}} is a reasonable place > to expose this information. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-29572) Flink Task Manager skip loopback interface for resource manager registration
[ https://issues.apache.org/jira/browse/FLINK-29572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626342#comment-17626342 ] Xintong Song commented on FLINK-29572: -- I'm still not convinced that this is a bug in Flink. Maybe we can live with the disagreement for now and focus on solving your problem first. bq. You don't have to configure different ports for each task manager. You just need to remove `taskmanager.rpc.port` from your configuration, and Flink by default should use random ports. Have you tried the random-port approach? One more question: when not binding to the loopback address, does all the traffic still go through the proxy? > Flink Task Manager skip loopback interface for resource manager registration > > > Key: FLINK-29572 > URL: https://issues.apache.org/jira/browse/FLINK-29572 > Project: Flink > Issue Type: Bug > Components: API / Core >Affects Versions: 1.15.2 > Environment: Flink 1.15.2 > Kubernetes with Istio Proxy >Reporter: Kevin Li >Priority: Major > > Currently the Flink Task Manager tries different local interfaces to bind to > when connecting to the Resource Manager. The first one is the loopback > interface. Normally, if the Job Manager is running on a remote > host/container, connecting via the loopback interface fails and the correct > IP address is picked up instead. > However, if the Task Manager is running behind some proxy, the loopback > interface can reach the remote host as well. This results in 127.0.0.1 being > reported to the Resource Manager during registration, even though the Job > Manager/Resource Manager runs on a remote host, and problems follow. For us, > only one Task Manager can register in this case. > I suggest adding a configuration option to skip the loopback interface check > when we know the Job/Resource Manager is running on a remote host/container. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
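The "skip the loopback interface" idea the reporter proposes can be sketched with plain `java.net` calls. This is illustrative only, assuming a simple interface scan; it is not Flink's actual address-resolution strategy:

```java
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Collect local candidate addresses while skipping loopback interfaces,
// so 127.0.0.1 can never be the address reported during registration.
public class NonLoopbackAddresses {

    static List<InetAddress> find() throws SocketException {
        List<InetAddress> result = new ArrayList<>();
        for (NetworkInterface nic : Collections.list(NetworkInterface.getNetworkInterfaces())) {
            if (nic.isLoopback() || !nic.isUp()) {
                continue; // skip 127.0.0.1 and interfaces that are down
            }
            result.addAll(Collections.list(nic.getInetAddresses()));
        }
        return result;
    }
}
```

With a sidecar proxy in front of every socket, a loopback bind can still reach the remote peer, which is why filtering candidates up front (rather than relying on a failed connect) is what the reporter is asking for.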
[jira] [Updated] (FLINK-29792) FileStoreCommitTest is unstable and may get stuck
[ https://issues.apache.org/jira/browse/FLINK-29792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-29792: --- Labels: pull-request-available (was: ) > FileStoreCommitTest is unstable and may get stuck > - > > Key: FLINK-29792 > URL: https://issues.apache.org/jira/browse/FLINK-29792 > Project: Flink > Issue Type: Bug > Components: Table Store >Affects Versions: table-store-0.3.0 >Reporter: Caizhi Weng >Assignee: Caizhi Weng >Priority: Major > Labels: pull-request-available > > {{FileStoreCommitTest}} may get stuck because the {{FileStoreCommit}} in > {{TestCommitThread}} does not commit an APPEND snapshot when no new files are > produced. In this case, if the following COMPACT snapshot conflicts with the > current merge tree, the test will get stuck. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-table-store] tsreaper opened a new pull request, #341: [FLINK-29792] Fix unstable test FileStoreCommitTest which may get stuck
tsreaper opened a new pull request, #341: URL: https://github.com/apache/flink-table-store/pull/341 `FileStoreCommitTest` may get stuck because the `FileStoreCommit` in `TestCommitThread` does not commit an APPEND snapshot when no new files are produced. In this case, if the following COMPACT snapshot conflicts with the current merge tree, the test will get stuck. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-29792) FileStoreCommitTest is unstable and may get stuck
[ https://issues.apache.org/jira/browse/FLINK-29792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Caizhi Weng reassigned FLINK-29792: --- Assignee: Caizhi Weng > FileStoreCommitTest is unstable and may get stuck > - > > Key: FLINK-29792 > URL: https://issues.apache.org/jira/browse/FLINK-29792 > Project: Flink > Issue Type: Bug > Components: Table Store >Affects Versions: table-store-0.3.0 >Reporter: Caizhi Weng >Assignee: Caizhi Weng >Priority: Major > > {{FileStoreCommitTest}} may get stuck because the {{FileStoreCommit}} in > {{TestCommitThread}} does not commit an APPEND snapshot when no new files are > produced. In this case, if the following COMPACT snapshot conflicts with the > current merge tree, the test will get stuck. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-29728) TablePlanner prevents Flink from starting if the working directory is a symbolic link
[ https://issues.apache.org/jira/browse/FLINK-29728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xintong Song closed FLINK-29728. Fix Version/s: 1.17.0 1.16.1 Resolution: Fixed - master (1.17): e8e9db37e17110ff04175d2720484b34f5c4d5ba - release-1.16: 8fd9aa63a30a6037fcad752ab74fbdd6649ca3f0 > TablePlanner prevents Flink from starting is working directory is a symbolic > link > - > > Key: FLINK-29728 > URL: https://issues.apache.org/jira/browse/FLINK-29728 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.15.2 >Reporter: Angelo Kastroulis >Assignee: Weijie Guo >Priority: Major > Fix For: 1.17.0, 1.16.1 > > > The Flink runtime throws an exception when using the table API if the working > directory is a symbolic link. This is the case when run on AWS EMR with Yarn. > There is a similar issue > [here|https://issues.apache.org/jira/browse/FLINK-20267] and I believe the > same fix applied there would work. > > > {code:java} > Caused by: org.apache.flink.table.api.TableException: Could not initialize > the table planner components loader. 
> at > org.apache.flink.table.planner.loader.PlannerModule.(PlannerModule.java:123) > ~[flink-table-planner-loader-1.15.1.jar:1.15.1] > at > org.apache.flink.table.planner.loader.PlannerModule.(PlannerModule.java:52) > ~[flink-table-planner-loader-1.15.1.jar:1.15.1] > at > org.apache.flink.table.planner.loader.PlannerModule$PlannerComponentsHolder.(PlannerModule.java:131) > ~[flink-table-planner-loader-1.15.1.jar:1.15.1] > at > org.apache.flink.table.planner.loader.PlannerModule.getInstance(PlannerModule.java:135) > ~[flink-table-planner-loader-1.15.1.jar:1.15.1] > at > org.apache.flink.table.planner.loader.DelegateExecutorFactory.(DelegateExecutorFactory.java:34) > ~[flink-table-planner-loader-1.15.1.jar:1.15.1] > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > ~[?:1.8.0_342] > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > ~[?:1.8.0_342] > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > ~[?:1.8.0_342] > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > ~[?:1.8.0_342] > at java.lang.Class.newInstance(Class.java:442) ~[?:1.8.0_342] > at > java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380) > ~[?:1.8.0_342] > at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404) > ~[?:1.8.0_342] > at java.util.ServiceLoader$1.next(ServiceLoader.java:480) ~[?:1.8.0_342] > at > org.apache.flink.table.factories.ServiceLoaderUtil.load(ServiceLoaderUtil.java:42) > ~[flink-table-api-java-uber-1.15.1.jar:1.15.1] > at > org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:798) > ~[flink-table-api-java-uber-1.15.1.jar:1.15.1] > at > org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:517) > ~[flink-table-api-java-uber-1.15.1.jar:1.15.1] > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:276) > 
~[flink-table-api-java-uber-1.15.1.jar:1.15.1] > at > org.apache.flink.table.api.TableEnvironment.create(TableEnvironment.java:93) > ~[flink-table-api-java-uber-1.15.1.jar:1.15.1] > at com.ballista.Hermes.BCSE$.useLocalCatalog(BCSE.scala:210) ~[?:?] > at com.ballista.Hermes.BCSE$.main(BCSE.scala:114) ~[?:?] > at com.ballista.Hermes.BCSE.main(BCSE.scala) ~[?:?] > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > ~[?:1.8.0_342] > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > ~[?:1.8.0_342] > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > ~[?:1.8.0_342] > at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_342] > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) > ~[flink-dist-1.15.1.jar:1.15.1] > ... 7 more > Caused by: java.nio.file.FileAlreadyExistsException: /tmp > at sun.nio.fs.UnixException.translateToIOException(UnixException.java:88) > ~[?:1.8.0_342] > at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) > ~[?:1.8.0_342] > at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107) > ~[?:1.8.0_342] > at > sun.nio.fs.UnixFileSystemProvider.createDirectory(UnixFileSystemProvider.java:384) > ~[?:1.8.0_342] > at java.nio.file.Files.createDirectory(Files.java:674) ~[?:1.8.0_342] > at java.nio.file.Files.createAndCheckIsDirectory(Files.java
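The `FileAlreadyExistsException: /tmp` at the bottom of the trace above has a well-known root cause: `Files.createDirectories()` rejects a path that is a symbolic link to an existing directory, because its internal "already exists" check uses `NOFOLLOW_LINKS` and therefore does not treat the symlink as a directory. A minimal sketch of the workaround applied in FLINK-20267 (illustrative names, not the actual Flink patch) is to check for an existing, symlink-resolved directory before creating it:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Workaround sketch for createDirectories() failing on directory symlinks
// such as /tmp -> /mnt/tmp.
public class SymlinkDirSketch {

    static Path ensureDirectory(Path dir) throws IOException {
        // Files.isDirectory follows symlinks by default, so a symlink to an
        // existing directory counts as present and creation is skipped;
        // createDirectories alone would throw FileAlreadyExistsException.
        if (!Files.isDirectory(dir)) {
            Files.createDirectories(dir);
        }
        return dir;
    }
}
```

This matches the repro in FLINK-28102 below: on hosts where `/tmp` is a symlink (common on AWS EMR with Yarn), the plain `createDirectories` call fails while the guarded version succeeds.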
[jira] [Updated] (FLINK-29728) TablePlanner prevents Flink from starting if the working directory is a symbolic link
[ https://issues.apache.org/jira/browse/FLINK-29728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xintong Song updated FLINK-29728: - Component/s: Table SQL / Planner (was: Runtime / Coordination) > TablePlanner prevents Flink from starting is working directory is a symbolic > link > - > > Key: FLINK-29728 > URL: https://issues.apache.org/jira/browse/FLINK-29728 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.15.2 >Reporter: Angelo Kastroulis >Assignee: Weijie Guo >Priority: Major > > The Flink runtime throws an exception when using the table API if the working > directory is a symbolic link. This is the case when run on AWS EMR with Yarn. > There is a similar issue > [here|https://issues.apache.org/jira/browse/FLINK-20267] and I believe the > same fix applied there would work. > > > {code:java} > Caused by: org.apache.flink.table.api.TableException: Could not initialize > the table planner components loader. > at > org.apache.flink.table.planner.loader.PlannerModule.(PlannerModule.java:123) > ~[flink-table-planner-loader-1.15.1.jar:1.15.1] > at > org.apache.flink.table.planner.loader.PlannerModule.(PlannerModule.java:52) > ~[flink-table-planner-loader-1.15.1.jar:1.15.1] > at > org.apache.flink.table.planner.loader.PlannerModule$PlannerComponentsHolder.(PlannerModule.java:131) > ~[flink-table-planner-loader-1.15.1.jar:1.15.1] > at > org.apache.flink.table.planner.loader.PlannerModule.getInstance(PlannerModule.java:135) > ~[flink-table-planner-loader-1.15.1.jar:1.15.1] > at > org.apache.flink.table.planner.loader.DelegateExecutorFactory.(DelegateExecutorFactory.java:34) > ~[flink-table-planner-loader-1.15.1.jar:1.15.1] > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > ~[?:1.8.0_342] > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > ~[?:1.8.0_342] > at > 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > ~[?:1.8.0_342] > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > ~[?:1.8.0_342] > at java.lang.Class.newInstance(Class.java:442) ~[?:1.8.0_342] > at > java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380) > ~[?:1.8.0_342] > at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404) > ~[?:1.8.0_342] > at java.util.ServiceLoader$1.next(ServiceLoader.java:480) ~[?:1.8.0_342] > at > org.apache.flink.table.factories.ServiceLoaderUtil.load(ServiceLoaderUtil.java:42) > ~[flink-table-api-java-uber-1.15.1.jar:1.15.1] > at > org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:798) > ~[flink-table-api-java-uber-1.15.1.jar:1.15.1] > at > org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:517) > ~[flink-table-api-java-uber-1.15.1.jar:1.15.1] > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:276) > ~[flink-table-api-java-uber-1.15.1.jar:1.15.1] > at > org.apache.flink.table.api.TableEnvironment.create(TableEnvironment.java:93) > ~[flink-table-api-java-uber-1.15.1.jar:1.15.1] > at com.ballista.Hermes.BCSE$.useLocalCatalog(BCSE.scala:210) ~[?:?] > at com.ballista.Hermes.BCSE$.main(BCSE.scala:114) ~[?:?] > at com.ballista.Hermes.BCSE.main(BCSE.scala) ~[?:?] > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > ~[?:1.8.0_342] > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > ~[?:1.8.0_342] > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > ~[?:1.8.0_342] > at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_342] > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) > ~[flink-dist-1.15.1.jar:1.15.1] > ... 
7 more > Caused by: java.nio.file.FileAlreadyExistsException: /tmp > at sun.nio.fs.UnixException.translateToIOException(UnixException.java:88) > ~[?:1.8.0_342] > at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) > ~[?:1.8.0_342] > at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107) > ~[?:1.8.0_342] > at > sun.nio.fs.UnixFileSystemProvider.createDirectory(UnixFileSystemProvider.java:384) > ~[?:1.8.0_342] > at java.nio.file.Files.createDirectory(Files.java:674) ~[?:1.8.0_342] > at java.nio.file.Files.createAndCheckIsDirectory(Files.java:781) > ~[?:1.8.0_342] > at java.nio.file.Files.createDirectories(Files.java:727) ~[?:1.8.0_342] > at > org.apache.flink.table.planne
[jira] [Assigned] (FLINK-29728) TablePlanner prevents Flink from starting if the working directory is a symbolic link
[ https://issues.apache.org/jira/browse/FLINK-29728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xintong Song reassigned FLINK-29728: Assignee: Weijie Guo > TablePlanner prevents Flink from starting is working directory is a symbolic > link > - > > Key: FLINK-29728 > URL: https://issues.apache.org/jira/browse/FLINK-29728 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.15.2 >Reporter: Angelo Kastroulis >Assignee: Weijie Guo >Priority: Major > > The Flink runtime throws an exception when using the table API if the working > directory is a symbolic link. This is the case when run on AWS EMR with Yarn. > There is a similar issue > [here|https://issues.apache.org/jira/browse/FLINK-20267] and I believe the > same fix applied there would work. > > > {code:java} > Caused by: org.apache.flink.table.api.TableException: Could not initialize > the table planner components loader. > at > org.apache.flink.table.planner.loader.PlannerModule.(PlannerModule.java:123) > ~[flink-table-planner-loader-1.15.1.jar:1.15.1] > at > org.apache.flink.table.planner.loader.PlannerModule.(PlannerModule.java:52) > ~[flink-table-planner-loader-1.15.1.jar:1.15.1] > at > org.apache.flink.table.planner.loader.PlannerModule$PlannerComponentsHolder.(PlannerModule.java:131) > ~[flink-table-planner-loader-1.15.1.jar:1.15.1] > at > org.apache.flink.table.planner.loader.PlannerModule.getInstance(PlannerModule.java:135) > ~[flink-table-planner-loader-1.15.1.jar:1.15.1] > at > org.apache.flink.table.planner.loader.DelegateExecutorFactory.(DelegateExecutorFactory.java:34) > ~[flink-table-planner-loader-1.15.1.jar:1.15.1] > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > ~[?:1.8.0_342] > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > ~[?:1.8.0_342] > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > 
~[?:1.8.0_342] > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > ~[?:1.8.0_342] > at java.lang.Class.newInstance(Class.java:442) ~[?:1.8.0_342] > at > java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380) > ~[?:1.8.0_342] > at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404) > ~[?:1.8.0_342] > at java.util.ServiceLoader$1.next(ServiceLoader.java:480) ~[?:1.8.0_342] > at > org.apache.flink.table.factories.ServiceLoaderUtil.load(ServiceLoaderUtil.java:42) > ~[flink-table-api-java-uber-1.15.1.jar:1.15.1] > at > org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:798) > ~[flink-table-api-java-uber-1.15.1.jar:1.15.1] > at > org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:517) > ~[flink-table-api-java-uber-1.15.1.jar:1.15.1] > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:276) > ~[flink-table-api-java-uber-1.15.1.jar:1.15.1] > at > org.apache.flink.table.api.TableEnvironment.create(TableEnvironment.java:93) > ~[flink-table-api-java-uber-1.15.1.jar:1.15.1] > at com.ballista.Hermes.BCSE$.useLocalCatalog(BCSE.scala:210) ~[?:?] > at com.ballista.Hermes.BCSE$.main(BCSE.scala:114) ~[?:?] > at com.ballista.Hermes.BCSE.main(BCSE.scala) ~[?:?] > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > ~[?:1.8.0_342] > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > ~[?:1.8.0_342] > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > ~[?:1.8.0_342] > at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_342] > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) > ~[flink-dist-1.15.1.jar:1.15.1] > ... 
7 more > Caused by: java.nio.file.FileAlreadyExistsException: /tmp > at sun.nio.fs.UnixException.translateToIOException(UnixException.java:88) > ~[?:1.8.0_342] > at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) > ~[?:1.8.0_342] > at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107) > ~[?:1.8.0_342] > at > sun.nio.fs.UnixFileSystemProvider.createDirectory(UnixFileSystemProvider.java:384) > ~[?:1.8.0_342] > at java.nio.file.Files.createDirectory(Files.java:674) ~[?:1.8.0_342] > at java.nio.file.Files.createAndCheckIsDirectory(Files.java:781) > ~[?:1.8.0_342] > at java.nio.file.Files.createDirectories(Files.java:727) ~[?:1.8.0_342] > at > org.apache.flink.table.planner.loader.PlannerModule.(PlannerModule.java:96) > ~[f
[jira] [Closed] (FLINK-28102) Flink AkkaRpcSystemLoader fails when temporary directory is a symlink
[ https://issues.apache.org/jira/browse/FLINK-28102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xintong Song closed FLINK-28102.
Fix Version/s: 1.17.0, 1.16.1
Resolution: Fixed
- master (1.17): 2859196f9ab1d86a3d90e47a89cbd13be74741b9
- release-1.16: 3ae578e2233abd42f770d1bf395792c85698fd89

> Flink AkkaRpcSystemLoader fails when temporary directory is a symlink
>
> Key: FLINK-28102
> URL: https://issues.apache.org/jira/browse/FLINK-28102
> Project: Flink
> Issue Type: Bug
> Components: Runtime / RPC
> Affects Versions: 1.16.0, 1.15.2
> Reporter: Prabhu Joseph
> Assignee: Weijie Guo
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.17.0, 1.16.1
>
> Flink AkkaRpcSystemLoader fails when temporary directory is a symlink
>
> *Error Message:*
> {code}
> Caused by: java.nio.file.FileAlreadyExistsException: /tmp
>     at sun.nio.fs.UnixException.translateToIOException(UnixException.java:88) ~[?:1.8.0_332]
>     at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) ~[?:1.8.0_332]
>     at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107) ~[?:1.8.0_332]
>     at sun.nio.fs.UnixFileSystemProvider.createDirectory(UnixFileSystemProvider.java:384) ~[?:1.8.0_332]
>     at java.nio.file.Files.createDirectory(Files.java:674) ~[?:1.8.0_332]
>     at java.nio.file.Files.createAndCheckIsDirectory(Files.java:781) ~[?:1.8.0_332]
>     at java.nio.file.Files.createDirectories(Files.java:727) ~[?:1.8.0_332]
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcSystemLoader.loadRpcSystem(AkkaRpcSystemLoader.java:58) ~[flink-dist-1.15.0.jar:1.15.0]
>     at org.apache.flink.runtime.rpc.RpcSystem.load(RpcSystem.java:101) ~[flink-dist-1.15.0.jar:1.15.0]
>     at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.startTaskManagerRunnerServices(TaskManagerRunner.java:186) ~[flink-dist-1.15.0.jar:1.15.0]
>     at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.start(TaskManagerRunner.java:288) ~[flink-dist-1.15.0.jar:1.15.0]
>     at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:481) ~[flink-dist-1.15.0.jar:1.15.0]
> {code}
>
> *Repro:*
> {code}
> 1. /tmp is a symlink pointing to the actual directory /mnt/tmp
> [root@prabhuHost log]# ls -lrt /tmp
> lrwxrwxrwx 1 root root 8 Jun 15 07:51 /tmp -> /mnt/tmp
> 2. Start Cluster
> ./bin/start-cluster.sh
> {code}

-- This message was sent by Atlassian Jira (v8.20.10#820010)
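The stack trace explains the failure: on common JDKs, `Files.createDirectories` catches the `FileAlreadyExistsException` and re-checks the path with `LinkOption.NOFOLLOW_LINKS`, so a symlink that points at a perfectly valid directory is rejected. A minimal sketch of a symlink-tolerant variant (an illustration only, not the actual Flink patch; `ensureDirectories` is a made-up helper name):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class SymlinkSafeDirs {

    /**
     * Creates the directory (and any missing parents) unless the path already
     * resolves to a directory. This accepts the case where the path is a
     * symlink pointing at a directory, which plain Files.createDirectories()
     * rejects with FileAlreadyExistsException because its final existence
     * check uses LinkOption.NOFOLLOW_LINKS.
     */
    public static void ensureDirectories(Path path) throws IOException {
        // Files.isDirectory follows symlinks by default, so a link such as
        // /tmp -> /mnt/tmp passes this check.
        if (Files.isDirectory(path)) {
            return;
        }
        Files.createDirectories(path);
    }

    public static void main(String[] args) throws IOException {
        Path real = Files.createTempDirectory("real-tmp");
        Path link = real.getParent().resolve("link-tmp-" + System.nanoTime());
        Files.createSymbolicLink(link, real);

        // Plain Files.createDirectories(link) would throw here.
        ensureDirectories(link);
        System.out.println(Files.isDirectory(link)); // prints "true" on a POSIX system
    }
}
```

Resolving the path first (e.g. via `toRealPath()`) is an alternative design with the same effect: the directory check is done against what the link points to, not the link itself.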
[jira] [Updated] (FLINK-28102) Flink AkkaRpcSystemLoader fails when temporary directory is a symlink
[ https://issues.apache.org/jira/browse/FLINK-28102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xintong Song updated FLINK-28102:
Affects Version/s: 1.15.2 (was: 1.15.0) (was: 1.17.0)
[GitHub] [flink] xintongsong closed pull request #21125: [FLINK-28102] Flink AkkaRpcSystemLoader fails when temporary directory is a symlink
xintongsong closed pull request #21125: [FLINK-28102] Flink AkkaRpcSystemLoader fails when temporary directory is a symlink URL: https://github.com/apache/flink/pull/21125 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-ml] lindong28 commented on a diff in pull request #162: [FLINK-29593] Add QuantileSummary to help calculate approximate quantiles
lindong28 commented on code in PR #162: URL: https://github.com/apache/flink-ml/pull/162#discussion_r1008964931 ## flink-ml-lib/src/main/java/org/apache/flink/ml/common/util/QuantileSummary.java: ## @@ -0,0 +1,400 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.common.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Helper class to compute an approximate quantile summary. This implementation is based on the + * algorithm proposed in the paper: "Space-efficient Online Computation of Quantile Summaries" by + * Greenwald, Michael and Khanna, Sanjeev. (https://doi.org/10.1145/375663.375670) + */ +@Internal Review Comment: @jiangxin369 If you need to update this PR, it might be simpler to also update `withBroadcast()` to follow the best practice of using `@Internal`. -- This is an automated message from the Apache Git Service. 
[GitHub] [flink-ml] lindong28 commented on a diff in pull request #162: [FLINK-29593] Add QuantileSummary to help calculate approximate quantiles
lindong28 commented on code in PR #162: URL: https://github.com/apache/flink-ml/pull/162#discussion_r1008964672 ## flink-ml-lib/src/main/java/org/apache/flink/ml/common/util/QuantileSummary.java: ## @@ -0,0 +1,400 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.common.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Helper class to compute an approximate quantile summary. This implementation is based on the + * algorithm proposed in the paper: "Space-efficient Online Computation of Quantile Summaries" by + * Greenwald, Michael and Khanna, Sanjeev. (https://doi.org/10.1145/375663.375670) + */ +@Internal +public class QuantileSummary implements Serializable { + +/** The target relative error. */ +private final double relativeError; + +/** + * The compression threshold. 
After the internal buffer of statistics crosses this size, it + * attempts to compress the statistics together. + */ +private final int compressThreshold; + +/** The count of all the elements inserted to be calculated. */ +private final long count; + +/** A buffer of quantile statistics. */ +private final List sampled; + +/** The default size of head buffer. */ +private static final int DEFAULT_HEAD_SIZE = 5; + +/** The default compression threshold. */ +private static final int DEFAULT_COMPRESS_THRESHOLD = 1; + +/** A buffer of the latest samples seen so far. */ +private List headBuffer = new ArrayList<>(DEFAULT_HEAD_SIZE); + +/** + * QuantileSummary Constructor. + * + * @param relativeError The target relative error. + */ +public QuantileSummary(double relativeError) { +this(relativeError, DEFAULT_COMPRESS_THRESHOLD); +} + +/** + * QuantileSummary Constructor. + * + * @param relativeError The target relative error. + * @param compressThreshold the compression threshold. After the internal buffer of statistics + * crosses this size, it attempts to compress the statistics together. + */ +public QuantileSummary(double relativeError, int compressThreshold) { +this(relativeError, compressThreshold, Collections.EMPTY_LIST, 0); +} + +/** + * QuantileSummary Constructor. + * + * @param relativeError The target relative error. + * @param compressThreshold the compression threshold. + * @param sampled A buffer of quantile statistics. See the G-K article for more details. + * @param count The count of all the elements inserted in the sampled buffer. 
+ */ +private QuantileSummary( +double relativeError, int compressThreshold, List sampled, long count) { +Preconditions.checkArgument( +relativeError > 0 && relativeError < 1, +"An appropriate relative error must lie between 0 and 1."); +Preconditions.checkArgument( +compressThreshold > 0, "A compress threshold must be greater than 0."); +this.relativeError = relativeError; +this.compressThreshold = compressThreshold; +this.sampled = sampled; +this.count = count; +} + +/** + * Insert a new observation into the summary. + * + * @param item The new observation to insert into the summary. + * @return A summary with the given observation inserted into the summary. + */ +public QuantileSummary insert(Double item) { +headBuffer.add(item); +if (headBuffer.size() >= DEFAULT_HEAD_SIZE) { +QuantileSummary result = insertHeadBuffer(); +if (result.sampled.size() >= compressThreshold) { +return result.compress(); +} else { +return result; +} +} else { +return this; +} +} + +/** + * Returns
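The insert path quoted above batches observations through a head buffer and compresses `sampled` once it crosses `compressThreshold`. To make the Greenwald-Khanna mechanics concrete, here is a self-contained toy version of the same insert/compress/query cycle. This is my own simplified sketch, not the Flink ML implementation: it inserts one element at a time instead of buffering, compresses periodically, and uses a simple "first covering tuple" query with error at most 2 * eps * n ranks.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy Greenwald-Khanna quantile summary (simplified illustration). */
public class GkSketch {

    /** (value, g, delta): g is the rank gap to the previous tuple,
     *  delta is the rank uncertainty of this tuple. */
    private static final class Tuple {
        final double value;
        long g;
        final long delta;
        Tuple(double value, long g, long delta) {
            this.value = value;
            this.g = g;
            this.delta = delta;
        }
    }

    private final double relativeError;
    private final List<Tuple> sampled = new ArrayList<>();
    private long count;

    public GkSketch(double relativeError) {
        this.relativeError = relativeError;
    }

    public void insert(double v) {
        // Linear scan keeps `sampled` sorted by value; a real implementation
        // batches inserts through a sorted head buffer instead.
        int i = 0;
        while (i < sampled.size() && sampled.get(i).value < v) {
            i++;
        }
        // Extremes are known exactly (delta = 0); interior tuples may be off
        // by up to floor(2 * eps * n) ranks.
        long delta = (i == 0 || i == sampled.size())
                ? 0 : (long) Math.floor(2 * relativeError * count);
        sampled.add(i, new Tuple(v, 1, delta));
        count++;
        if (count % 100 == 0) {
            compress();
        }
    }

    /** Merges a tuple into its right neighbour while the combined rank
     *  uncertainty stays below the 2 * eps * n budget. */
    private void compress() {
        long budget = (long) Math.floor(2 * relativeError * count);
        for (int i = sampled.size() - 2; i >= 1; i--) {
            Tuple left = sampled.get(i);
            Tuple right = sampled.get(i + 1);
            if (left.g + right.g + right.delta < budget) {
                right.g += left.g;
                sampled.remove(i);
            }
        }
    }

    /** Approximate q-quantile: first tuple whose max rank covers the target. */
    public double query(double q) {
        if (sampled.isEmpty()) {
            throw new IllegalStateException("empty summary");
        }
        long rank = (long) Math.ceil(q * count);
        long minRank = 0;
        for (Tuple t : sampled) {
            minRank += t.g;
            long maxRank = minRank + t.delta;
            if (maxRank >= rank) {
                return t.value; // true rank within [minRank, maxRank]
            }
        }
        return sampled.get(sampled.size() - 1).value;
    }

    public static void main(String[] args) {
        GkSketch s = new GkSketch(0.01);
        // (i * 37) % 1000 is a pseudo-shuffled permutation of 0..999.
        for (int i = 0; i < 1000; i++) {
            s.insert((i * 37) % 1000);
        }
        System.out.println(s.query(0.5)); // close to 500, within ~2% of ranks
    }
}
```

The point of the structure is the space bound: the summary retains O((1/eps) * log(eps * n)) tuples rather than all n observations, while any quantile query stays within eps * n ranks of the truth (2 * eps * n for this simplified query).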
[GitHub] [flink] xintongsong commented on pull request #21122: [FLINK-28889] Hybrid shuffle supports multiple consumer and broadcast optimization
xintongsong commented on PR #21122: URL: https://github.com/apache/flink/pull/21122#issuecomment-1296419534 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #17649: [FLINK-24742][table][docs] Add info about SQL client key strokes to docs
snuyanzin commented on code in PR #17649: URL: https://github.com/apache/flink/pull/17649#discussion_r1008952683

## docs/content/docs/dev/table/sqlClient.md:

@@ -91,6 +91,40 @@ The `SET` command allows you to tune the job execution and the sql client behavi

After a query is defined, it can be submitted to the cluster as a long-running, detached Flink job. The [configuration section](#configuration) explains how to declare table sources for reading data, how to declare table sinks for writing data, and how to configure other table program properties.

+### Key-strokes
+
+There is a list of available key-strokes in SQL client
+
+| Key-Stroke (Linux, Windows(WSL)) | Key-Stroke (Mac) | Description |
+|:-|--|:---|
+| `alt-b` | `Esc-b` | Backward word |
+| `alt-f` | `Esc-f` | Forward word |
+| `alt-c` | `Esc-c` | Capitalize word |
+| `alt-l` | `Esc-l` | Lowercase word |
+| `alt-u` | `Esc-u` | Uppercase word |
+| `alt-d` | `Esc-d` | Kill word |
+| `alt-n` | `Esc-n` | History search forward |
+| `alt-p` | `Esc-p` | History search backward |

Review Comment: I added this clarification.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #17649: [FLINK-24742][table][docs] Add info about SQL client key strokes to docs
snuyanzin commented on code in PR #17649: URL: https://github.com/apache/flink/pull/17649#discussion_r1008952635

## docs/content/docs/dev/table/sqlClient.md:

@@ -91,6 +91,40 @@ The `SET` command allows you to tune the job execution and the sql client behavi

After a query is defined, it can be submitted to the cluster as a long-running, detached Flink job. The [configuration section](#configuration) explains how to declare table sources for reading data, how to declare table sinks for writing data, and how to configure other table program properties.

+### Key-strokes
+
+There is a list of available key-strokes in SQL client
+
+| Key-Stroke (Linux, Windows(WSL)) | Key-Stroke (Mac) | Description |
+|:-|--|:---|
+| `alt-b` | `Esc-b` | Backward word |
+| `alt-f` | `Esc-f` | Forward word |

Review Comment: Thanks, added this as well.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #17649: [FLINK-24742][table][docs] Add info about SQL client key strokes to docs
snuyanzin commented on code in PR #17649: URL: https://github.com/apache/flink/pull/17649#discussion_r1008951197

## docs/content/docs/dev/table/sqlClient.md:

@@ -91,6 +91,40 @@ The `SET` command allows you to tune the job execution and the sql client behavi

After a query is defined, it can be submitted to the cluster as a long-running, detached Flink job. The [configuration section](#configuration) explains how to declare table sources for reading data, how to declare table sinks for writing data, and how to configure other table program properties.

+### Key-strokes
+
+There is a list of available key-strokes in SQL client
+
+| Key-Stroke (Linux, Windows(WSL)) | Key-Stroke (Mac) | Description |
+|:-|--|:---|
+| `alt-b` | `Esc-b` | Backward word |
+| `alt-f` | `Esc-f` | Forward word |
+| `alt-c` | `Esc-c` | Capitalize word |
+| `alt-l` | `Esc-l` | Lowercase word |
+| `alt-u` | `Esc-u` | Uppercase word |
+| `alt-d` | `Esc-d` | Kill word |
+| `alt-n` | `Esc-n` | History search forward |
+| `alt-p` | `Esc-p` | History search backward |

Review Comment: It's not 100% the same: `history search forward/backward` behaves the same as `Up/line from history` when no input is provided. However, if there is a non-empty input, it navigates to the first occurrence forward/backward containing that input. That's the difference.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] qingwei91 commented on pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown
qingwei91 commented on PR #20140: URL: https://github.com/apache/flink/pull/20140#issuecomment-1296359376 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] salvalcantara commented on pull request #21186: [FLINK-29480][Connector/Kafka] Skip null records when writing
salvalcantara commented on PR #21186: URL: https://github.com/apache/flink/pull/21186#issuecomment-1296333613 Initially, the CI pipeline passed successfully; after I made some cosmetic changes I got a failure that really seems unrelated to my code changes. Can you confirm @MartijnVisser @mas-chen @leonardBang @PatrickRen? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
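For context, FLINK-29480 is about having the Kafka writer tolerate serializers that return `null`: such elements should be dropped silently, with no record sent and no metric incremented, exactly what the `testIncreasingRecordBasedCounters` additions discussed earlier assert. A condensed model of that behavior (all names hypothetical; this is not the actual `KafkaWriter` code):

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Minimal model of "skip null records when writing": a serializer returning
 * null means "drop this element silently", so nothing is sent and no counter
 * moves. Valid elements are counted normally.
 */
public class NullSkippingWriter<T> {

    /** Stand-in for a record serialization schema; may return null. */
    public interface RecordSerializer<T> {
        byte[] serialize(T element);
    }

    private final RecordSerializer<T> serializer;
    public final AtomicLong numRecordsOut = new AtomicLong();
    public final AtomicLong numBytesOut = new AtomicLong();

    public NullSkippingWriter(RecordSerializer<T> serializer) {
        this.serializer = serializer;
    }

    public void write(T element) {
        byte[] record = serializer.serialize(element);
        if (record == null) {
            return; // silently skipped: no send, no metric updates
        }
        // Stand-in for producer.send(record):
        numRecordsOut.incrementAndGet();
        numBytesOut.addAndGet(record.length);
    }

    public static void main(String[] args) {
        NullSkippingWriter<String> writer = new NullSkippingWriter<>(
                el -> el == null ? null : el.getBytes(StandardCharsets.UTF_8));
        writer.write(null); // skipped, counters unchanged
        writer.write("ab"); // sent, counters move
        System.out.println(writer.numRecordsOut.get() + " " + writer.numBytesOut.get()); // prints "1 2"
    }
}
```

This is also why the review thread argues for interleaving valid and invalid elements in one test: the interesting property is that the counters resume normally after a skipped element, not just that they stay at zero.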
[GitHub] [flink] qingwei91 commented on a diff in pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown
qingwei91 commented on code in PR #20140: URL: https://github.com/apache/flink/pull/20140#discussion_r1008835739 ## flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java: ## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.jdbc.table; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.table.expressions.CallExpression; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.ExpressionDefaultVisitor; +import org.apache.flink.table.expressions.FieldReferenceExpression; +import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.expressions.ValueLiteralExpression; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.DateType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.FloatType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.SmallIntType; +import org.apache.flink.table.types.logical.TimeType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.VarCharType; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Timestamp; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; + +/** + * Visitor that convert Expression to ParameterizedPredicate. Return Optional.empty() if we cannot + * push down the filter. 
+ */ +@Experimental +public class JdbcFilterPushdownPreparedStatementVisitor +extends ExpressionDefaultVisitor> { + +private Function quoteIdentifierFunction; + +private static final Set> SUPPORTED_DATA_TYPES; + +static { +SUPPORTED_DATA_TYPES = new HashSet<>(); +SUPPORTED_DATA_TYPES.add(IntType.class); +SUPPORTED_DATA_TYPES.add(BigIntType.class); +SUPPORTED_DATA_TYPES.add(BooleanType.class); +SUPPORTED_DATA_TYPES.add(DecimalType.class); +SUPPORTED_DATA_TYPES.add(DoubleType.class); +SUPPORTED_DATA_TYPES.add(FloatType.class); +SUPPORTED_DATA_TYPES.add(SmallIntType.class); +SUPPORTED_DATA_TYPES.add(VarCharType.class); +SUPPORTED_DATA_TYPES.add(TimestampType.class); +SUPPORTED_DATA_TYPES.add(DateType.class); +SUPPORTED_DATA_TYPES.add(TimeType.class); +} + +public JdbcFilterPushdownPreparedStatementVisitor( +Function quoteIdentifierFunction) { +this.quoteIdentifierFunction = quoteIdentifierFunction; +} + +@Override +public Optional visit(CallExpression call) { +if (BuiltInFunctionDefinitions.EQUALS.equals(call.getFunctionDefinition())) { +return renderBinaryOperator("=", call.getResolvedChildren()); +} +if (BuiltInFunctionDefinitions.LESS_THAN.equals(call.getFunctionDefinition())) { +return renderBinaryOperator("<", call.getResolvedChildren()); +} +if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) { +return renderBinaryOperator("<=", call.getResolvedChildren()); +} +if (BuiltInFunctionDefinitions.GREATER_THAN.equals(call.getFunctionDefinition())) { +return renderBinaryOperator(">", call.getResolvedChildren()); +} +if (BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) { +return renderBinaryOperator(">=", call.getResolvedChildren()); +} +if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(call.getFunctionDefinition())) { +return renderBinaryOperator("<>", call.getResolvedCh
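Stripped to its essence, the visitor quoted (and truncated) above is a lookup from supported comparison functions to SQL operators, with `Optional.empty()` as the "cannot push down" escape hatch that lets the planner keep the filter inside Flink. A toy model of that pattern (all names hypothetical; not the Flink API):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Toy model of JDBC filter pushdown: supported comparisons become
 * parameterized SQL fragments; anything else yields Optional.empty().
 */
public class FilterPushdownSketch {

    /** Simplified stand-in for a resolved call of (field op literal). */
    public static final class Call {
        final String function;
        final String field;
        final Object literal;
        public Call(String function, String field, Object literal) {
            this.function = function;
            this.field = field;
            this.literal = literal;
        }
    }

    /** SQL text with a '?' placeholder plus the value to bind. */
    public static final class ParameterizedPredicate {
        public final String predicate;
        public final Object parameter;
        ParameterizedPredicate(String predicate, Object parameter) {
            this.predicate = predicate;
            this.parameter = parameter;
        }
    }

    private static final Map<String, String> OPERATORS = new HashMap<>();

    static {
        OPERATORS.put("equals", "=");
        OPERATORS.put("notEquals", "<>");
        OPERATORS.put("lessThan", "<");
        OPERATORS.put("lessThanOrEqual", "<=");
        OPERATORS.put("greaterThan", ">");
        OPERATORS.put("greaterThanOrEqual", ">=");
    }

    public static Optional<ParameterizedPredicate> visit(Call call) {
        String op = OPERATORS.get(call.function);
        if (op == null) {
            return Optional.empty(); // not pushable: evaluated in Flink instead
        }
        // Quote the identifier the way the target dialect would.
        return Optional.of(new ParameterizedPredicate(
                "\"" + call.field + "\" " + op + " ?", call.literal));
    }

    public static void main(String[] args) {
        System.out.println(visit(new Call("greaterThan", "id", 10))
                .map(p -> p.predicate).orElse("<not pushed down>")); // prints "id" > ?
        System.out.println(visit(new Call("like", "name", "a%"))
                .map(p -> p.predicate).orElse("<not pushed down>")); // prints <not pushed down>
    }
}
```

Binding literals as prepared-statement parameters rather than splicing them into the SQL string is the reason the real visitor also restricts the supported literal types: every pushed-down value must map cleanly onto a JDBC `setObject` type.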
[GitHub] [flink] salvalcantara commented on pull request #21186: [FLINK-29480][Connectors / Kafka] Skip null records when writing
salvalcantara commented on PR #21186: URL: https://github.com/apache/flink/pull/21186#issuecomment-1296303288 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-29799) How to override ConfigMap values while deploying the Operator via OLM?
Cansu Kavili created FLINK-29799: Summary: How to override ConfigMap values while deploying the Operator via OLM? Key: FLINK-29799 URL: https://issues.apache.org/jira/browse/FLINK-29799 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Reporter: Cansu Kavili Hi, Flink Kubernetes Operator is great - thank you! I deployed it via OLM on OpenShift 4.10 and would like to override some config from `flink-operator-config` CM. When I override a value manually, it persists (which is strange - it's a resource managed via Operator) so when I change a value in CM and restart Operator, it works. How can I give the parameters I want while installing the operator? Some operators support setting environment variables in `Subscription` object but I couldn't find such a thing in the documentation. I can do it easily with Helm installation but I want to use OLM. It'd be appreciated if you can guide me. Many many thanks! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] flinkbot commented on pull request #21195: [IN_PROGRESS] Remove "flink-sql-parser-hive" from "flink-table"
flinkbot commented on PR #21195: URL: https://github.com/apache/flink/pull/21195#issuecomment-1296283103 ## CI report: * bbfcc159bebcec8ad634c48ee8f39e975640696a UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] ferenc-csaky opened a new pull request, #21195: [IN_PROGRESS] Remove "flink-sql-parser-hive" from "flink-table"
ferenc-csaky opened a new pull request, #21195: URL: https://github.com/apache/flink/pull/21195 ## What is the purpose of the change This is the actualized version of https://github.com/apache/flink/pull/19077. Removes the `flink-sql-parser-hive` module from `flink-table` to decouple Hive from the table planner module, which is required to externalize the connector. ## Brief change log ... ## Verifying this change ... ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] mbalassi commented on pull request #21189: [FLINK-24119][tests] Add random to Kafka tests topic name
mbalassi commented on PR #21189: URL: https://github.com/apache/flink/pull/21189#issuecomment-1296277519 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] mbalassi commented on pull request #21180: [FLINK-29783][tests] Add some random to KafkaShuffleExactlyOnceITCase topic names
mbalassi commented on PR #21180: URL: https://github.com/apache/flink/pull/21180#issuecomment-1296277420 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-29798) Rename K8s operator client code module
Márton Balassi created FLINK-29798: -- Summary: Rename K8s operator client code module Key: FLINK-29798 URL: https://issues.apache.org/jira/browse/FLINK-29798 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Affects Versions: kubernetes-operator-1.2.0 Reporter: Márton Balassi Assignee: Márton Balassi Fix For: kubernetes-operator-1.3.0 The example code module in the k8s operator is named simply kubernetes-client-examples, and thus is published like so: [https://repo1.maven.org/maven2/org/apache/flink/kubernetes-client-examples/1.2.0/] We should make this more specific. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] qingwei91 commented on pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown
qingwei91 commented on PR #20140: URL: https://github.com/apache/flink/pull/20140#issuecomment-1296273034 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] qingwei91 commented on pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown
qingwei91 commented on PR #20140: URL: https://github.com/apache/flink/pull/20140#issuecomment-1296238300 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] qingwei91 commented on a diff in pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown
qingwei91 commented on code in PR #20140: URL: https://github.com/apache/flink/pull/20140#discussion_r1008835739 ## flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java: ## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.jdbc.table; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.table.expressions.CallExpression; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.ExpressionDefaultVisitor; +import org.apache.flink.table.expressions.FieldReferenceExpression; +import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.expressions.ValueLiteralExpression; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.DateType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.FloatType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.SmallIntType; +import org.apache.flink.table.types.logical.TimeType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.VarCharType; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Timestamp; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; + +/** + * Visitor that convert Expression to ParameterizedPredicate. Return Optional.empty() if we cannot + * push down the filter. 
+ */ +@Experimental +public class JdbcFilterPushdownPreparedStatementVisitor +extends ExpressionDefaultVisitor> { + +private Function quoteIdentifierFunction; + +private static final Set> SUPPORTED_DATA_TYPES; + +static { +SUPPORTED_DATA_TYPES = new HashSet<>(); +SUPPORTED_DATA_TYPES.add(IntType.class); +SUPPORTED_DATA_TYPES.add(BigIntType.class); +SUPPORTED_DATA_TYPES.add(BooleanType.class); +SUPPORTED_DATA_TYPES.add(DecimalType.class); +SUPPORTED_DATA_TYPES.add(DoubleType.class); +SUPPORTED_DATA_TYPES.add(FloatType.class); +SUPPORTED_DATA_TYPES.add(SmallIntType.class); +SUPPORTED_DATA_TYPES.add(VarCharType.class); +SUPPORTED_DATA_TYPES.add(TimestampType.class); +SUPPORTED_DATA_TYPES.add(DateType.class); +SUPPORTED_DATA_TYPES.add(TimeType.class); +} + +public JdbcFilterPushdownPreparedStatementVisitor( +Function quoteIdentifierFunction) { +this.quoteIdentifierFunction = quoteIdentifierFunction; +} + +@Override +public Optional visit(CallExpression call) { +if (BuiltInFunctionDefinitions.EQUALS.equals(call.getFunctionDefinition())) { +return renderBinaryOperator("=", call.getResolvedChildren()); +} +if (BuiltInFunctionDefinitions.LESS_THAN.equals(call.getFunctionDefinition())) { +return renderBinaryOperator("<", call.getResolvedChildren()); +} +if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) { +return renderBinaryOperator("<=", call.getResolvedChildren()); +} +if (BuiltInFunctionDefinitions.GREATER_THAN.equals(call.getFunctionDefinition())) { +return renderBinaryOperator(">", call.getResolvedChildren()); +} +if (BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) { +return renderBinaryOperator(">=", call.getResolvedChildren()); +} +if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(call.getFunctionDefinition())) { +return renderBinaryOperator("<>", call.getResolvedCh
[GitHub] [flink] qingwei91 commented on a diff in pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown
qingwei91 commented on code in PR #20140:
URL: https://github.com/apache/flink/pull/20140#discussion_r1008835739

## flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java: ##

@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Visitor that convert Expression to ParameterizedPredicate. Return Optional.empty() if we cannot
+ * push down the filter.
+ */
+@Experimental
+public class JdbcFilterPushdownPreparedStatementVisitor
+        extends ExpressionDefaultVisitor<Optional<ParameterizedPredicate>> {
+
+    private Function<String, String> quoteIdentifierFunction;
+
+    private static final Set<Class<? extends LogicalType>> SUPPORTED_DATA_TYPES;
+
+    static {
+        SUPPORTED_DATA_TYPES = new HashSet<>();
+        SUPPORTED_DATA_TYPES.add(IntType.class);
+        SUPPORTED_DATA_TYPES.add(BigIntType.class);
+        SUPPORTED_DATA_TYPES.add(BooleanType.class);
+        SUPPORTED_DATA_TYPES.add(DecimalType.class);
+        SUPPORTED_DATA_TYPES.add(DoubleType.class);
+        SUPPORTED_DATA_TYPES.add(FloatType.class);
+        SUPPORTED_DATA_TYPES.add(SmallIntType.class);
+        SUPPORTED_DATA_TYPES.add(VarCharType.class);
+        SUPPORTED_DATA_TYPES.add(TimestampType.class);
+        SUPPORTED_DATA_TYPES.add(DateType.class);
+        SUPPORTED_DATA_TYPES.add(TimeType.class);
+    }
+
+    public JdbcFilterPushdownPreparedStatementVisitor(
+            Function<String, String> quoteIdentifierFunction) {
+        this.quoteIdentifierFunction = quoteIdentifierFunction;
+    }
+
+    @Override
+    public Optional<ParameterizedPredicate> visit(CallExpression call) {
+        if (BuiltInFunctionDefinitions.EQUALS.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.LESS_THAN.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("<", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("<=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.GREATER_THAN.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator(">", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator(">=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("<>", call.getResolvedCh
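The visitor quoted above renders each supported comparison into a SQL fragment with a `?` placeholder, collecting the literal as a bind parameter for the eventual `PreparedStatement`. A minimal, self-contained sketch of that idea (the `ParameterizedPredicate` shape and `renderBinary` helper here are illustrative stand-ins, not Flink's actual classes):

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class PredicateSketch {

    // Stand-in for the ParameterizedPredicate mentioned in the javadoc above:
    // a SQL fragment plus the bind parameters that fill its '?' placeholders.
    static final class ParameterizedPredicate implements Serializable {
        final String predicate;
        final List<Serializable> parameters;

        ParameterizedPredicate(String predicate, List<Serializable> parameters) {
            this.predicate = predicate;
            this.parameters = parameters;
        }
    }

    // Render "column <op> ?" and collect the literal as a bind parameter,
    // mirroring what renderBinaryOperator("=", ...) etc. produce above.
    static ParameterizedPredicate renderBinary(String op, String column, Serializable literal) {
        List<Serializable> params = new ArrayList<>();
        params.add(literal);
        return new ParameterizedPredicate(column + " " + op + " ?", params);
    }

    public static void main(String[] args) {
        ParameterizedPredicate p = renderBinary(">=", "\"age\"", 18);
        System.out.println(p.predicate);   // "age" >= ?
        System.out.println(p.parameters);  // [18]
    }
}
```

Keeping the literal out of the SQL string (instead of inlining it) lets the connector hand it to `PreparedStatement.setObject`, which avoids both SQL injection and dialect-specific literal formatting.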
[GitHub] [flink] qingwei91 commented on pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown
qingwei91 commented on PR #20140: URL: https://github.com/apache/flink/pull/20140#issuecomment-1296222402 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #21194: Update fault_tolerance.md
flinkbot commented on PR #21194:
URL: https://github.com/apache/flink/pull/21194#issuecomment-1296219332

## CI report:

* ff2dc1314335fa68e043980a5a4f1827803ab96b UNKNOWN

Bot commands
  The @flinkbot bot supports the following commands:
  - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] chandan1602 opened a new pull request, #21194: Update fault_tolerance.md
chandan1602 opened a new pull request, #21194:
URL: https://github.com/apache/flink/pull/21194

## What is the purpose of the change

*(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)*

## Brief change log

*(for example:)*
- *The TaskInfo is stored in the blob store on job creation time as a persistent artifact*
- *Deployments RPC transmits only the blob storage reference*
- *TaskManagers retrieve the TaskInfo from the blob cache*

## Verifying this change

Please make sure both new and modified tests in this PR follows the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)* This change is already covered by existing tests, such as *(please describe tests)*.
*(or)* This change added tests and can be verified as follows:

*(example:)*
- *Added integration tests for end-to-end deployment with large payloads (100MB)*
- *Extended integration test for recovery after master (JobManager) failure*
- *Added test that validates that TaskInfo is transferred only once across recoveries*
- *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.*

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no)
- The serializers: (yes / no / don't know)
- The runtime per-record code paths (performance sensitive): (yes / no / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
- The S3 file system connector: (yes / no / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
[GitHub] [flink] libenchao commented on a diff in pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown
libenchao commented on code in PR #20140:
URL: https://github.com/apache/flink/pull/20140#discussion_r1008821587

## flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java: ##

@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Visitor that convert Expression to ParameterizedPredicate. Return Optional.empty() if we cannot
+ * push down the filter.
+ */
+@Experimental
+public class JdbcFilterPushdownPreparedStatementVisitor
+        extends ExpressionDefaultVisitor<Optional<ParameterizedPredicate>> {
+
+    private Function<String, String> quoteIdentifierFunction;
+
+    private static final Set<Class<? extends LogicalType>> SUPPORTED_DATA_TYPES;
+
+    static {
+        SUPPORTED_DATA_TYPES = new HashSet<>();
+        SUPPORTED_DATA_TYPES.add(IntType.class);
+        SUPPORTED_DATA_TYPES.add(BigIntType.class);
+        SUPPORTED_DATA_TYPES.add(BooleanType.class);
+        SUPPORTED_DATA_TYPES.add(DecimalType.class);
+        SUPPORTED_DATA_TYPES.add(DoubleType.class);
+        SUPPORTED_DATA_TYPES.add(FloatType.class);
+        SUPPORTED_DATA_TYPES.add(SmallIntType.class);
+        SUPPORTED_DATA_TYPES.add(VarCharType.class);
+        SUPPORTED_DATA_TYPES.add(TimestampType.class);
+        SUPPORTED_DATA_TYPES.add(DateType.class);
+        SUPPORTED_DATA_TYPES.add(TimeType.class);
+    }
+
+    public JdbcFilterPushdownPreparedStatementVisitor(
+            Function<String, String> quoteIdentifierFunction) {
+        this.quoteIdentifierFunction = quoteIdentifierFunction;
+    }
+
+    @Override
+    public Optional<ParameterizedPredicate> visit(CallExpression call) {
+        if (BuiltInFunctionDefinitions.EQUALS.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.LESS_THAN.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("<", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("<=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.GREATER_THAN.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator(">", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator(">=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("<>", call.getResolvedCh
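The `SUPPORTED_DATA_TYPES` whitelist in the diff above gates which filters get pushed down at all: when a type is not in the set, the visitor returns `Optional.empty()` and Flink keeps evaluating that filter itself. A hypothetical sketch of such a gate, using plain JDK classes as stand-ins for Flink's `LogicalType` subclasses:

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

public class TypeGateSketch {

    // Stand-ins for the whitelisted LogicalType subclasses above
    // (IntType, BigIntType, VarCharType, ...).
    static final Set<Class<?>> SUPPORTED = new HashSet<>();

    static {
        SUPPORTED.add(Integer.class); // stands in for IntType
        SUPPORTED.add(Long.class);    // stands in for BigIntType
        SUPPORTED.add(String.class);  // stands in for VarCharType
    }

    // Push the predicate down only when the literal's type is supported;
    // Optional.empty() means "evaluate this filter in Flink instead".
    static Optional<String> tryPushdown(String column, String op, Object literal) {
        if (!SUPPORTED.contains(literal.getClass())) {
            return Optional.empty();
        }
        return Optional.of(column + " " + op + " ?");
    }

    public static void main(String[] args) {
        System.out.println(tryPushdown("id", "=", 42));               // Optional[id = ?]
        System.out.println(tryPushdown("payload", "=", new byte[0])); // Optional.empty
    }
}
```

Returning `Optional.empty()` rather than throwing keeps the pushdown best-effort: an unsupported filter degrades to post-scan filtering instead of failing the query.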
[GitHub] [flink-kubernetes-operator] mbalassi commented on pull request #417: [FLINK-29655] Split Flink CRD from flink-kubernates-operator module
mbalassi commented on PR #417:
URL: https://github.com/apache/flink-kubernetes-operator/pull/417#issuecomment-1296153291

@tweise has pointed out to me that we actually do not guarantee Java API compatibility at this point, and consequently it is best to make the changes now necessary to be able to provide said guarantee later. https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/compatibility/#java-api-compatibility
[GitHub] [flink] xintongsong commented on pull request #21122: [FLINK-28889] Hybrid shuffle supports multiple consumer and broadcast optimization
xintongsong commented on PR #21122:
URL: https://github.com/apache/flink/pull/21122#issuecomment-1296144735

@flinkbot run azure
[jira] [Comment Edited] (FLINK-29480) Skip invalid messages when writing
[ https://issues.apache.org/jira/browse/FLINK-29480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626176#comment-17626176 ] Salva edited comment on FLINK-29480 at 10/30/22 7:08 AM: - [~mason6345] [~leonard] [~renqs] [~martijnvisser] I think the PR is ready for review. Can you please take a look? was (Author: JIRAUSER287051): [~mason6345] [~leonard] [~renqs] [~martijnvisser] From my perspective, the PR is ready for review. Can you please take a look? > Skip invalid messages when writing > -- > > Key: FLINK-29480 > URL: https://issues.apache.org/jira/browse/FLINK-29480 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Reporter: Salva >Assignee: Salva >Priority: Minor > Labels: pull-request-available > Attachments: Screenshot 2022-10-28 at 13.48.12.png > > > As reported in [1], it seems that it's not possible to skip invalid messages > when writing. More specifically, if there is an error serializing messages, > there is no option for skipping them and then Flink job enters a crash loop. 
> In particular, the `write` method of the `KafkaWriter` looks like this:
> {code:java}
> @Override
> public void write(IN element, Context context) throws IOException {
>     final ProducerRecord<byte[], byte[]> record =
>             recordSerializer.serialize(element, ...);
>     currentProducer.send(record, deliveryCallback); // line 200
>     numRecordsSendCounter.inc();
> } {code}
> So, if you make your `serialize` method return `null`, this is what you get at runtime:
> {code:java}
> java.lang.NullPointerException
>     at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:906)
>     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
>     at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:200)
>     at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
> {code}
> What I propose is to modify the KafkaWriter [2, 3] like this:
> {code:java}
> @Override
> public void write(IN element, Context context) throws IOException {
>     final ProducerRecord<byte[], byte[]> record =
>             recordSerializer.serialize(element, ...);
>     if (record != null) { // skip null records (check to be added)
>         currentProducer.send(record, deliveryCallback);
>         numRecordsSendCounter.inc();
>     }
> } {code}
> In order to at least give a chance of skipping those messages and move on to the next ones.
> Obviously, one could prepend the sink with a flatMap operator for filtering out invalid messages, but
> # It looks weird that one has to prepend an operator for "making sure" that the serializer will not fail right after. Wouldn't it be simpler to skip the null records directly in order to avoid this pre-check? [4]
> # It's such a simple change (apparently)
> # Brings consistency/symmetry with the reading case [4, 5]
> To expand on point 3, by looking at `KafkaDeserializationSchema`:
> {code:java}
> T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;
>
> default void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out) throws Exception {
>     T deserialized = deserialize(message);
>     if (deserialized != null) { // skip null records (check already exists)
>         out.collect(deserialized);
>     }
> } {code}
> one can simply return `null` in the overridden `deserialize` method in order to skip any message that fails to be deserialized. Similarly, if one uses the `KafkaRecordDeserializationSchema` interface instead:
> {code:java}
> void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<T> out) throws IOException {code}
> then it's also possible not to invoke `out.collect(...)` on null records. To me, it looks strange that the same flexibility is not given in the writing case.
>
> *References*
> [1] [https://lists.apache.org/thread/ykmy4llovrrrzlvz0ng3x5yosskjg70h]
> [2] [https://nightlies.apache.org/flink/flink-docs-release-1.14/release-notes/flink-1.14/#port-kafkasink-to-new-unified-sink-api-flip-143]
> [3] [https://github.com/apache/flink/blob/f0fe85a50920da2b7d7da815db0a924940522e28/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L197]
> [4] [https://lists.apache.org/thread/pllv5dqq27xkvj6p3lj91vcz409pw38d]
> [5] [https://stackoverflow.com/questions/55538736/how-to-skip-corrupted-messages-in-flink]

-- This message was sent by Atlassian Jira (v8.20.10#820010)
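The write-path change proposed in the ticket can be exercised without Kafka at all. Below is a minimal, self-contained sketch of the null-skip behaviour; the writer, serializer, and counter here are simplified stand-ins for Flink's `KafkaWriter`, its record serializer, and its metrics, not the real API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class NullSkippingWriterSketch {

    final Function<String, byte[]> serializer; // stand-in for recordSerializer.serialize(...)
    final List<byte[]> sent = new ArrayList<>(); // stand-in for currentProducer.send(...)
    long numRecordsSend = 0;                     // stand-in for numRecordsSendCounter

    NullSkippingWriterSketch(Function<String, byte[]> serializer) {
        this.serializer = serializer;
    }

    void write(String element) {
        byte[] record = serializer.apply(element);
        if (record != null) { // skip null records, as proposed for KafkaWriter.write
            sent.add(record);
            numRecordsSend++;
        }
    }

    public static void main(String[] args) {
        // Serializer that "fails" (returns null) for one marker value.
        NullSkippingWriterSketch writer =
                new NullSkippingWriterSketch(s -> "bad".equals(s) ? null : s.getBytes());
        writer.write("ok-1");
        writer.write("bad");  // silently skipped instead of a NullPointerException in send()
        writer.write("ok-2");
        System.out.println(writer.numRecordsSend); // 2
    }
}
```

This mirrors the symmetry argument in the ticket: just as returning `null` from `deserialize` drops a record on the read path, a `null` from the serializer would drop it on the write path, and counters only advance for records actually sent.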