[GitHub] [flink-table-store] FangYongs closed pull request #313: [FLINK-27958] Compare batch maxKey to reduce comparisons in SortMergeReader
FangYongs closed pull request #313: [FLINK-27958] Compare batch maxKey to reduce comparisons in SortMergeReader URL: https://github.com/apache/flink-table-store/pull/313 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-30979) The buckets of the secondary partition should fall on different tasks
[ https://issues.apache.org/jira/browse/FLINK-30979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-30979: --- Labels: pull-request-available (was: ) > The buckets of the secondary partition should fall on different tasks > - > > Key: FLINK-30979 > URL: https://issues.apache.org/jira/browse/FLINK-30979 > Project: Flink > Issue Type: Improvement > Components: Table Store >Reporter: Jingsong Lee >Assignee: Shammon >Priority: Major > Labels: pull-request-available > Fix For: table-store-0.4.0 > > > In Flink Streaming Job, sink to table store. > Considering that I only set one bucket now, but there are many secondary > partitions, I expect to use multiple parallelism tasks. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-table-store] FangYongs opened a new pull request, #522: [FLINK-30979] Support shuffling data by partition
FangYongs opened a new pull request, #522: URL: https://github.com/apache/flink-table-store/pull/522 Currently the sink operator in Flink shuffles data by bucket id, which causes data skew when the table has only one bucket but multiple partitions. This PR adds support for shuffling data by bucket id and partition when `sink.shuffle-by-partition.enable` is set. The main changes are 1. Added config `sink.shuffle-by-partition.enable` to support shuffling data by partition 2. Added `PartitionComputer` to get the partition from row data 3. Added shuffling data by partition in `BucketStreamPartitioner` The main tests are 1. Added `FileStoreShuffleBucketTest` to shuffle data by bucket and partition
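As a rough illustration of the routing change described in the PR summary: with one bucket, hashing only the bucket id sends every record to the same task, while hashing (partition, bucket) spreads secondary partitions across tasks. This is a hedged sketch with made-up method names, not the actual `BucketStreamPartitioner` code:

```java
import java.util.Objects;

// Hypothetical sketch of channel selection; names are illustrative only.
class BucketPartitionShuffle {

    // Old behavior: route by bucket id only.
    static int selectByBucket(int bucket, int numChannels) {
        return Math.floorMod(Integer.hashCode(bucket), numChannels);
    }

    // Behavior when 'sink.shuffle-by-partition.enable' is set:
    // route by partition and bucket together.
    static int selectByPartitionAndBucket(String partition, int bucket, int numChannels) {
        return Math.floorMod(Objects.hash(partition, bucket), numChannels);
    }

    public static void main(String[] args) {
        int channels = 4;
        // Bucket-only routing with a single bucket: always channel 0.
        System.out.println(selectByBucket(0, channels));
        // Partition-aware routing can place different partitions on different channels.
        System.out.println(selectByPartitionAndBucket("dt=2023-02-10", 0, channels));
        System.out.println(selectByPartitionAndBucket("dt=2023-02-11", 0, channels));
    }
}
```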
[jira] [Commented] (FLINK-31021) JavaCodeSplitter doesn't split static method properly
[ https://issues.apache.org/jira/browse/FLINK-31021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687368#comment-17687368 ] Krzysztof Chmielewski commented on FLINK-31021: --- Hi, I have a few questions. 1. Could you provide the full body of the original decode method? 2. Do you have a SQL query that reproduces the problem? 3. You marked the affected versions as 1.16.1 and below. Did you in fact hit this on those versions, or on current master? I'm asking because a change to the code splitter was recently merged to master (1.17) and the 1.16 release branch that is not included in 1.16.1, so I'm wondering whether this is a regression or something new. Let me know, Cheers. > JavaCodeSplitter doesn't split static method properly > - > > Key: FLINK-31021 > URL: https://issues.apache.org/jira/browse/FLINK-31021 > Project: Flink > Issue Type: Bug >Affects Versions: 1.14.4, 1.15.3, 1.16.1 >Reporter: Xingcan Cui >Priority: Minor > > The exception while compiling the generated source > {code:java} > cause=org.codehaus.commons.compiler.CompileException: Line 3383, Column 90: > Instance method "default void > org.apache.flink.formats.protobuf.deserialize.GeneratedProtoToRow_655d75db1cf943838f5500013edfba82.decodeImpl(foo.bar.LogData)" > cannot be invoked in static context,{code} > The original method header > {code:java} > public static RowData decode(foo.bar.LogData message){{code} > The code after split > > {code:java} > Line 3383: public static RowData decode(foo.bar.LogData message){ > decodeImpl(message); return decodeReturnValue$0; } > Line 3384: > Line 3385: void decodeImpl(foo.bar.LogData message) {{code} > >
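For context on the report above: the compile error arises because the splitter extracts the body of a static `decode` method into a helper that is not itself static. A simplified, hypothetical sketch of what a correct split should produce (the real generated class and field names differ):

```java
// Simplified stand-in for the generated code discussed above.
class SplitExample {
    static String decodeReturnValue$0;

    // The split entry point stays static...
    public static String decode(String message) {
        decodeImpl(message);
        return decodeReturnValue$0;
    }

    // ...so the extracted helper must be static too; otherwise it
    // "cannot be invoked in static context".
    static void decodeImpl(String message) {
        decodeReturnValue$0 = "decoded:" + message;
    }
}
```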
[GitHub] [flink-connector-pulsar] syhily commented on a diff in pull request #24: [FLINK-30109][Connector/Pulsar] Drop the use of sneaky exception.
syhily commented on code in PR #24: URL: https://github.com/apache/flink-connector-pulsar/pull/24#discussion_r1103520528 ## flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java: ## @@ -231,7 +232,11 @@ public void handleSplitsChanges(SplitsChange splitsChanges } // Create pulsar consumer. -this.pulsarConsumer = createPulsarConsumer(registeredSplit.getPartition()); +try { +this.pulsarConsumer = createPulsarConsumer(registeredSplit.getPartition()); +} catch (PulsarClientException e) { Review Comment: This comes down to a difference between the Kafka client and the Pulsar client. Kafka uses a polling API, and the client is created before handling the split. Pulsar shares the consumers within a single client instance, and every consumer serves only one split, so we have to create the consumer here, and the exception has to be wrapped into a runtime exception. I think we should expose exceptions in `SplitReader.handleSplitsChanges` on the Flink side.
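The wrapping pattern mentioned in the comment above (a checked client exception rethrown as a runtime exception because the handler's interface declares no checked exceptions) can be sketched like this; the names are illustrative, not the actual Pulsar connector code:

```java
import java.io.IOException;

// Hypothetical sketch of wrapping a checked exception inside a handler
// whose signature (like SplitReader#handleSplitsChanges) has no throws clause.
class SplitChangeWrapping {

    // Stand-in for a client call that throws a checked exception.
    static String createConsumer(String partition) throws IOException {
        if (partition == null) {
            throw new IOException("no partition assigned");
        }
        return "consumer-for-" + partition;
    }

    // Stand-in for the no-throws handler: the checked exception must be
    // wrapped into a runtime exception to escape.
    static String handleSplitsChanges(String partition) {
        try {
            return createConsumer(partition);
        } catch (IOException e) {
            throw new IllegalStateException("Failed to create consumer", e);
        }
    }
}
```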
[GitHub] [flink-connector-pulsar] syhily commented on a diff in pull request #24: [FLINK-30109][Connector/Pulsar] Drop the use of sneaky exception.
syhily commented on code in PR #24: URL: https://github.com/apache/flink-connector-pulsar/pull/24#discussion_r1103523033 ## flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java: ## @@ -220,11 +223,14 @@ private void createSubscription(List newPartitions) { CursorPosition position = startCursor.position(partition.getTopic(), partition.getPartitionId()); -if (sourceConfiguration.isResetSubscriptionCursor()) { -sneakyAdmin(() -> position.seekPosition(pulsarAdmin, topic, subscriptionName)); -} else { -sneakyAdmin( -() -> position.createInitialPosition(pulsarAdmin, topic, subscriptionName)); +try { +if (sourceConfiguration.isResetSubscriptionCursor()) { +position.seekPosition(pulsarAdmin, topic, subscriptionName); +} else { +position.createInitialPosition(pulsarAdmin, topic, subscriptionName); +} +} catch (PulsarAdminException e) { +throw new FlinkRuntimeException(e); Review Comment: My bad, we can still throw this exception with a little more work on code refactoring.
[GitHub] [flink-connector-pulsar] syhily commented on a diff in pull request #24: [FLINK-30109][Connector/Pulsar] Drop the use of sneaky exception.
syhily commented on code in PR #24: URL: https://github.com/apache/flink-connector-pulsar/pull/24#discussion_r1103522744 ## flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java: ## @@ -220,11 +223,14 @@ private void createSubscription(List newPartitions) { CursorPosition position = startCursor.position(partition.getTopic(), partition.getPartitionId()); -if (sourceConfiguration.isResetSubscriptionCursor()) { -sneakyAdmin(() -> position.seekPosition(pulsarAdmin, topic, subscriptionName)); -} else { -sneakyAdmin( -() -> position.createInitialPosition(pulsarAdmin, topic, subscriptionName)); +try { +if (sourceConfiguration.isResetSubscriptionCursor()) { +position.seekPosition(pulsarAdmin, topic, subscriptionName); +} else { +position.createInitialPosition(pulsarAdmin, topic, subscriptionName); +} +} catch (PulsarAdminException e) { +throw new FlinkRuntimeException(e); Review Comment: This method is called in `context.callAsync(this::getSubscribedTopicPartitions, this::checkPartitionChanges);`, so it's hard to throw the exception here.
[GitHub] [flink-connector-pulsar] syhily commented on a diff in pull request #24: [FLINK-30109][Connector/Pulsar] Drop the use of sneaky exception.
syhily commented on code in PR #24: URL: https://github.com/apache/flink-connector-pulsar/pull/24#discussion_r1103522426 ## flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java: ## @@ -619,7 +619,7 @@ private void ensureSubscriberIsNull(String attemptingSubscribeMode) { private void ensureSchemaTypeIsValid(Schema schema) { SchemaInfo info = schema.getSchemaInfo(); -if (info.getType() == SchemaType.AUTO_CONSUME || info.getType() == SchemaType.AUTO) { +if (info.getType() == SchemaType.AUTO_CONSUME) { Review Comment: I just noticed that this is a call to a deprecated Pulsar API and can never happen. Since this is a code-refactoring PR, I added the change here instead of submitting a new PR.
[jira] [Closed] (FLINK-30993) Introduce FloatSerializer for Table Store
[ https://issues.apache.org/jira/browse/FLINK-30993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee closed FLINK-30993. Assignee: Feng Wang Resolution: Fixed master: c62992cef568274ec777c2d6e264a12f077b0925 > Introduce FloatSerializer for Table Store > - > > Key: FLINK-30993 > URL: https://issues.apache.org/jira/browse/FLINK-30993 > Project: Flink > Issue Type: New Feature > Components: Table Store >Reporter: Feng Wang >Assignee: Feng Wang >Priority: Major > Fix For: table-store-0.4.0 > > > Introduce FloatSerializer for Table Store
[jira] [Closed] (FLINK-30992) Introduce ShortSerializer for Table Store
[ https://issues.apache.org/jira/browse/FLINK-30992?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee closed FLINK-30992. Assignee: Feng Wang Resolution: Fixed master: f285db56b8a440ef4ef08d15b5a48d58b7c10e01 > Introduce ShortSerializer for Table Store > - > > Key: FLINK-30992 > URL: https://issues.apache.org/jira/browse/FLINK-30992 > Project: Flink > Issue Type: New Feature > Components: Table Store >Reporter: Feng Wang >Assignee: Feng Wang >Priority: Major > Fix For: table-store-0.4.0 > > > Introduce ShortSerializer for Table Store
[GitHub] [flink-table-store] JingsongLi merged pull request #511: [Flink-30993] Introduce FloatSerializer for Table Store
JingsongLi merged PR #511: URL: https://github.com/apache/flink-table-store/pull/511
[jira] [Closed] (FLINK-30991) Introduce LongSerializer for Table Store
[ https://issues.apache.org/jira/browse/FLINK-30991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee closed FLINK-30991. Resolution: Fixed master: 430df6c595c229a7128a43c5a1dc831d3f91f905 > Introduce LongSerializer for Table Store > > > Key: FLINK-30991 > URL: https://issues.apache.org/jira/browse/FLINK-30991 > Project: Flink > Issue Type: New Feature > Components: Table Store >Reporter: Feng Wang >Assignee: Feng Wang >Priority: Major > Fix For: table-store-0.4.0 > > > Introduce LongSerializer for Table Store
[GitHub] [flink-table-store] JingsongLi merged pull request #510: [Flink-30992] Introduce ShortSerializer for Table Store
JingsongLi merged PR #510: URL: https://github.com/apache/flink-table-store/pull/510
[GitHub] [flink-table-store] JingsongLi merged pull request #509: [Flink-30991] Introduce LongSerializer for Table Store
JingsongLi merged PR #509: URL: https://github.com/apache/flink-table-store/pull/509
[jira] [Updated] (FLINK-31021) JavaCodeSplitter doesn't split static method properly
[ https://issues.apache.org/jira/browse/FLINK-31021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xingcan Cui updated FLINK-31021: Affects Version/s: 1.15.3 > JavaCodeSplitter doesn't split static method properly > - > > Key: FLINK-31021 > URL: https://issues.apache.org/jira/browse/FLINK-31021 > Project: Flink > Issue Type: Bug >Affects Versions: 1.14.4, 1.15.3, 1.16.1 >Reporter: Xingcan Cui >Priority: Minor > > The exception while compiling the generated source > {code:java} > cause=org.codehaus.commons.compiler.CompileException: Line 3383, Column 90: > Instance method "default void > org.apache.flink.formats.protobuf.deserialize.GeneratedProtoToRow_655d75db1cf943838f5500013edfba82.decodeImpl(foo.bar.LogData)" > cannot be invoked in static context,{code} > The original method header > {code:java} > public static RowData decode(foo.bar.LogData message){{code} > The code after split > > {code:java} > Line 3383: public static RowData decode(foo.bar.LogData message){ > decodeImpl(message); return decodeReturnValue$0; } > Line 3384: > Line 3385: void decodeImpl(foo.bar.LogData message) {{code} > >
[jira] [Created] (FLINK-31021) JavaCodeSplitter doesn't split static method properly
Xingcan Cui created FLINK-31021: --- Summary: JavaCodeSplitter doesn't split static method properly Key: FLINK-31021 URL: https://issues.apache.org/jira/browse/FLINK-31021 Project: Flink Issue Type: Bug Affects Versions: 1.16.1, 1.14.4 Reporter: Xingcan Cui The exception while compiling the generated source {code:java} cause=org.codehaus.commons.compiler.CompileException: Line 3383, Column 90: Instance method "default void org.apache.flink.formats.protobuf.deserialize.GeneratedProtoToRow_655d75db1cf943838f5500013edfba82.decodeImpl(foo.bar.LogData)" cannot be invoked in static context,{code} The original method header {code:java} public static RowData decode(foo.bar.LogData message){{code} The code after split {code:java} Line 3383: public static RowData decode(foo.bar.LogData message){ decodeImpl(message); return decodeReturnValue$0; } Line 3384: Line 3385: void decodeImpl(foo.bar.LogData message) {{code}
[GitHub] [flink] shuiqiangchen commented on pull request #21897: [FLINK-30922][table-planner] Apply persisted columns when doing appendPartitionAndNu…
shuiqiangchen commented on PR #21897: URL: https://github.com/apache/flink/pull/21897#issuecomment-1426584980 @MartijnVisser Thank you for the reminder. It worked after rebasing.
[jira] [Comment Edited] (FLINK-31003) Flink SQL IF / CASE WHEN Funcation incorrect
[ https://issues.apache.org/jira/browse/FLINK-31003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687347#comment-17687347 ] Shuiqiang Chen edited comment on FLINK-31003 at 2/11/23 2:51 AM: - Hi [~martijnvisser] [~weiqinpan], I think it is the same issue as [FLINK-30966|https://issues.apache.org/jira/browse/FLINK-30966]: when normalizing arguments in IfCallGen, the result is always aligned to the type of ARG1, so for IF(1 > 2, 'true', 'false') the result will be the string 'fals', whose length is the same as 'true'. was (Author: csq): Hi [~martijnvisser][~weiqinpan], I think it is the same issue as [FLINK-30966 title|https://issues.apache.org/jira/browse/FLINK-30966], that when normalizing arguments in IfCallGen, it always align to the type of ARG1, like IF(1 > 2, 'true', 'false') the result will be string 'fals' which length is the same as 'true'. > Flink SQL IF / CASE WHEN Funcation incorrect > > > Key: FLINK-31003 > URL: https://issues.apache.org/jira/browse/FLINK-31003 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.15.0, 1.15.1, 1.16.0, 1.15.2, 1.15.3, 1.16.1 >Reporter: weiqinpan >Priority: Major > > When I execute the below sql using sql-client,i found something wrong. > > {code:java} > CREATE TEMPORARY TABLE source ( > mktgmsg_biz_type STRING, > marketing_flow_id STRING, > mktgmsg_campaign_id STRING > ) > WITH > ( > 'connector' = 'filesystem', > 'path' = 'file:///Users/xxx/Desktop/demo.json', > 'format' = 'json' > ); > -- return correct value('marketing_flow_id') > SELECT IF(`marketing_flow_id` IS NOT NULL, `marketing_flow_id`, '') FROM > source; > -- return incorrect value('') > SELECT IF(`marketing_flow_id` IS NULL, '', `marketing_flow_id`) FROM > source;{code} > The demo.json data is > > {code:java} > {"mktgmsg_biz_type": "marketing_flow", "marketing_flow_id": > "marketing_flow_id", "mktgmsg_campaign_id": "mktgmsg_campaign_id"} {code} > > > BTW, use case when + if / ifnull also have something wrong.
> > {code:java} > -- return wrong value(''), expect return marketing_flow_id > select CASE > WHEN `mktgmsg_biz_type` = 'marketing_flow' THEN IF(`marketing_flow_id` > IS NULL, `marketing_flow_id`, '') > WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign' THEN > IF(`mktgmsg_campaign_id` IS NULL, '', `mktgmsg_campaign_id`) > ELSE '' > END AS `message_campaign_instance_id` FROM source; > -- return wrong value('') > select CASE > WHEN `mktgmsg_biz_type` = 'marketing_flow' THEN > IFNULL(`marketing_flow_id`, '') > WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign' THEN > IFNULL(`mktgmsg_campaign_id`, '') > ELSE '' > END AS `message_campaign_instance_id` FROM source; > -- return correct value, the difference is [else return ' '] > select CASE > WHEN `mktgmsg_biz_type` = 'marketing_flow' THEN > IFNULL(`marketing_flow_id`, '') > WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign' THEN > IFNULL(`mktgmsg_campaign_id`, '') > ELSE ' ' > END AS `message_campaign_instance_id` FROM source; > {code} > > >
[jira] [Commented] (FLINK-31003) Flink SQL IF / CASE WHEN Funcation incorrect
[ https://issues.apache.org/jira/browse/FLINK-31003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687347#comment-17687347 ] Shuiqiang Chen commented on FLINK-31003: Hi [~martijnvisser] [~weiqinpan], I think it is the same issue as [FLINK-30966|https://issues.apache.org/jira/browse/FLINK-30966]: when normalizing arguments in IfCallGen, the result is always aligned to the type of ARG1, so for IF(1 > 2, 'true', 'false') the result will be the string 'fals', whose length is the same as 'true'. > Flink SQL IF / CASE WHEN Funcation incorrect > > > Key: FLINK-31003 > URL: https://issues.apache.org/jira/browse/FLINK-31003 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.15.0, 1.15.1, 1.16.0, 1.15.2, 1.15.3, 1.16.1 >Reporter: weiqinpan >Priority: Major > > When I execute the below sql using sql-client,i found something wrong. > > {code:java} > CREATE TEMPORARY TABLE source ( > mktgmsg_biz_type STRING, > marketing_flow_id STRING, > mktgmsg_campaign_id STRING > ) > WITH > ( > 'connector' = 'filesystem', > 'path' = 'file:///Users/xxx/Desktop/demo.json', > 'format' = 'json' > ); > -- return correct value('marketing_flow_id') > SELECT IF(`marketing_flow_id` IS NOT NULL, `marketing_flow_id`, '') FROM > source; > -- return incorrect value('') > SELECT IF(`marketing_flow_id` IS NULL, '', `marketing_flow_id`) FROM > source;{code} > The demo.json data is > > {code:java} > {"mktgmsg_biz_type": "marketing_flow", "marketing_flow_id": > "marketing_flow_id", "mktgmsg_campaign_id": "mktgmsg_campaign_id"} {code} > > > BTW, use case when + if / ifnull also have something wrong.
> > {code:java} > -- return wrong value(''), expect return marketing_flow_id > select CASE > WHEN `mktgmsg_biz_type` = 'marketing_flow' THEN IF(`marketing_flow_id` > IS NULL, `marketing_flow_id`, '') > WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign' THEN > IF(`mktgmsg_campaign_id` IS NULL, '', `mktgmsg_campaign_id`) > ELSE '' > END AS `message_campaign_instance_id` FROM source; > -- return wrong value('') > select CASE > WHEN `mktgmsg_biz_type` = 'marketing_flow' THEN > IFNULL(`marketing_flow_id`, '') > WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign' THEN > IFNULL(`mktgmsg_campaign_id`, '') > ELSE '' > END AS `message_campaign_instance_id` FROM source; > -- return correct value, the difference is [else return ' '] > select CASE > WHEN `mktgmsg_biz_type` = 'marketing_flow' THEN > IFNULL(`marketing_flow_id`, '') > WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign' THEN > IFNULL(`mktgmsg_campaign_id`, '') > ELSE ' ' > END AS `message_campaign_instance_id` FROM source; > {code} > > >
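The truncation described in the comments above (both branches normalized to the type of the first branch) can be reproduced in miniature. This is an illustrative model of the behavior, not Flink's actual IfCallGen code:

```java
// Hypothetical model of the bug: both branches of IF are truncated to the
// CHAR length of the first branch, so 'false' becomes 'fals'.
class IfTruncation {

    // Stand-in for normalization to a fixed-length character type.
    static String normalizeToLength(String value, int length) {
        return value.length() > length ? value.substring(0, length) : value;
    }

    // Stand-in for the generated IF: the result type is aligned to ARG1.
    static String sqlIf(boolean condition, String thenValue, String elseValue) {
        int length = thenValue.length(); // CHAR length of the first branch
        return condition ? thenValue : normalizeToLength(elseValue, length);
    }

    public static void main(String[] args) {
        System.out.println(sqlIf(1 > 2, "true", "false")); // prints "fals"
    }
}
```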
[jira] [Commented] (FLINK-30998) Add optional exception handler to flink-connector-opensearch
[ https://issues.apache.org/jira/browse/FLINK-30998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687346#comment-17687346 ] Andriy Redko commented on FLINK-30998: -- [~lilyevsky] thanks, you could send a pull request to [https://github.com/apache/flink-connector-opensearch/] and we could work through it there. Or is another option more comfortable for you? Thanks! > Add optional exception handler to flink-connector-opensearch > > > Key: FLINK-30998 > URL: https://issues.apache.org/jira/browse/FLINK-30998 > Project: Flink > Issue Type: Improvement > Components: Connectors / Opensearch >Affects Versions: 1.16.1 >Reporter: Leonid Ilyevsky >Priority: Major > > Currently, when there is a failure coming from Opensearch, the > FlinkRuntimeException is thrown from OpensearchWriter.java code (line 346). > This makes the Flink pipeline fail. There is no way to handle the exception > in the client code. > I suggest to add an option to set a failure handler, similar to the way it is > done in elasticsearch connector. This way the client code has a chance to > examine the failure and handle it. > Here is the use case example when it will be very useful. We are using > streams on Opensearch side, and we are setting our own document IDs. > Sometimes these IDs are duplicated; we need to ignore this situation and > continue (this way it works for us with Elastisearch). > However, with opensearch connector, the error comes back, saying that the > batch failed (even though most of the documents were indexed, only the ones > with duplicated IDs were rejected), and the whole flink job fails.
[jira] [Commented] (FLINK-30998) Add optional exception handler to flink-connector-opensearch
[ https://issues.apache.org/jira/browse/FLINK-30998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687343#comment-17687343 ] Leonid Ilyevsky commented on FLINK-30998: - [~reta] Thanks, please let me know how to submit my change. I can do it on Monday. > Add optional exception handler to flink-connector-opensearch > > > Key: FLINK-30998 > URL: https://issues.apache.org/jira/browse/FLINK-30998 > Project: Flink > Issue Type: Improvement > Components: Connectors / Opensearch >Affects Versions: 1.16.1 >Reporter: Leonid Ilyevsky >Priority: Major > > Currently, when there is a failure coming from Opensearch, the > FlinkRuntimeException is thrown from OpensearchWriter.java code (line 346). > This makes the Flink pipeline fail. There is no way to handle the exception > in the client code. > I suggest to add an option to set a failure handler, similar to the way it is > done in elasticsearch connector. This way the client code has a chance to > examine the failure and handle it. > Here is the use case example when it will be very useful. We are using > streams on Opensearch side, and we are setting our own document IDs. > Sometimes these IDs are duplicated; we need to ignore this situation and > continue (this way it works for us with Elastisearch). > However, with opensearch connector, the error comes back, saying that the > batch failed (even though most of the documents were indexed, only the ones > with duplicated IDs were rejected), and the whole flink job fails.
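The failure-handler option proposed in the FLINK-30998 thread above, mirroring the shape of the Elasticsearch connector's handler, could look roughly like this; every name here is hypothetical and the real API would be settled in the pull request:

```java
// Hypothetical failure-handler hook: the writer consults the handler instead
// of unconditionally throwing, so e.g. duplicate-id rejections can be ignored.
@FunctionalInterface
interface BulkFailureHandler {
    // Return true to swallow the failure and continue, false to fail the job.
    boolean onFailure(Throwable failure);
}

class FailureHandlerDemo {

    // Stand-in for the writer's error path that currently always throws.
    static boolean handleBulkFailure(Throwable failure, BulkFailureHandler handler) {
        if (handler.onFailure(failure)) {
            return true; // swallowed, pipeline keeps running
        }
        throw new RuntimeException("Bulk request failed", failure);
    }

    public static void main(String[] args) {
        // Ignore version conflicts (duplicate document ids), fail on anything else.
        BulkFailureHandler ignoreDuplicates =
                f -> f.getMessage().contains("version_conflict");
        System.out.println(handleBulkFailure(
                new IllegalStateException("version_conflict_engine_exception"),
                ignoreDuplicates)); // prints "true"
    }
}
```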
[GitHub] [flink-connector-pulsar] syhily commented on a diff in pull request #24: [FLINK-30109][Connector/Pulsar] Drop the use of sneaky exception.
syhily commented on code in PR #24: URL: https://github.com/apache/flink-connector-pulsar/pull/24#discussion_r1103481149 ## flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarTransactionUtils.java: ## @@ -46,17 +45,18 @@ private PulsarTransactionUtils() { /** Create transaction with given timeout millis. */ public static Transaction createTransaction(PulsarClient pulsarClient, long timeoutMs) { try { -CompletableFuture future = -sneakyClient(pulsarClient::newTransaction) -.withTransactionTimeout(timeoutMs, TimeUnit.MILLISECONDS) -.build(); - -return future.get(); +return pulsarClient +.newTransaction() +.withTransactionTimeout(timeoutMs, TimeUnit.MILLISECONDS) +.build() +.get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IllegalStateException(e); } catch (ExecutionException e) { throw new FlinkRuntimeException(unwrap(e)); +} catch (PulsarClientException e) { Review Comment: You are right. I changed it to Pulsar's built-in exception handling: ```java public static Transaction createTransaction(PulsarClient pulsarClient, long timeoutMs) throws PulsarClientException { try { return pulsarClient .newTransaction() .withTransactionTimeout(timeoutMs, TimeUnit.MILLISECONDS) .build() .get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new PulsarClientException(e); } catch (Exception e) { throw PulsarClientException.unwrap(e); } } ```
[jira] [Updated] (FLINK-31020) Read-only mode for Rest API
[ https://issues.apache.org/jira/browse/FLINK-31020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Omkar Deshpande updated FLINK-31020: Summary: Read-only mode for Rest API (was: Provide read-only mode for flink web UI) > Read-only mode for Rest API > --- > > Key: FLINK-31020 > URL: https://issues.apache.org/jira/browse/FLINK-31020 > Project: Flink > Issue Type: New Feature > Components: Runtime / REST >Affects Versions: 1.16.1 >Reporter: Omkar Deshpande >Priority: Major > > We run Flink jobs on application cluster on Kubernetes. We don't > submit/cancel or modify jobs from rest API or web UI. If there was an option > to enable only GET operations on the rest service, it would greatly solve the > problem of configuring access control and reduce the attack surface.
[jira] [Created] (FLINK-31020) Provide read-only mode for flink web UI
Omkar Deshpande created FLINK-31020: --- Summary: Provide read-only mode for flink web UI Key: FLINK-31020 URL: https://issues.apache.org/jira/browse/FLINK-31020 Project: Flink Issue Type: New Feature Components: Runtime / REST Affects Versions: 1.16.1 Reporter: Omkar Deshpande We run Flink jobs on application cluster on Kubernetes. We don't submit/cancel or modify jobs from rest API or web UI. If there was an option to enable only GET operations on the rest service, it would greatly solve the problem of configuring access control and reduce the attack surface.
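The behavior requested in FLINK-31020 above — allow only read operations through the REST service — amounts to a method-based gate in front of the API. The sketch below is illustrative only: Flink has no such built-in option (that is exactly what the ticket asks for), and the class and method names are assumptions.

```java
// Illustrative sketch of a read-only gate for an HTTP API: permit only
// safe (read) methods. This is NOT an existing Flink option; it just
// demonstrates the behavior FLINK-31020 requests.
import java.util.Set;

public class ReadOnlyGate {
    private static final Set<String> SAFE_METHODS = Set.of("GET", "HEAD", "OPTIONS");

    /** Returns true if the request may proceed when the API is read-only. */
    public static boolean allow(String httpMethod) {
        return SAFE_METHODS.contains(httpMethod.toUpperCase());
    }

    public static void main(String[] args) {
        System.out.println(allow("GET"));    // true: job overview, metrics, etc.
        System.out.println(allow("PATCH"));  // false: e.g. job cancellation
        System.out.println(allow("POST"));   // false: e.g. jar upload / run
    }
}
```

A filter like this in front of the REST endpoint would reduce the attack surface the reporter describes without touching access-control configuration.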
[GitHub] [flink] leeoo commented on pull request #13081: [FLINK-18590][json] Support json array explode to multi messages
leeoo commented on PR #13081: URL: https://github.com/apache/flink/pull/13081#issuecomment-1426566305

@libenchao Okay, but I don't have experience improving the Flink framework, so it would be better for Flink framework contributors to review it. I will keep an eye on it and test it with my Kafka JSON array message data.
[jira] [Closed] (FLINK-28283) Improving the log of flink when job start and deploy
[ https://issues.apache.org/jira/browse/FLINK-28283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xintong Song closed FLINK-28283. Resolution: Won't Do

[~zlzhang0122], thanks for your reply. Closing this as won't do.

> Improving the log of flink when job start and deploy
> Key: FLINK-28283
> URL: https://issues.apache.org/jira/browse/FLINK-28283
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Task
> Affects Versions: 1.14.2
> Reporter: zlzhang0122
> Priority: Major
>
> When running a large job with many operators and subtasks on Flink, the JobManager and TaskManager produce a huge volume of logs about subtask execution, such as "XXX switched from CREATED to SCHEDULED", "XXX switched from SCHEDULED to DEPLOYING", "XXX switched from DEPLOYING to RUNNING", "XXX switched from RUNNING to CANCELING", "XXX switched from CANCELING to CANCELED", etc.
> Maybe we can improve this, for example by aggregating these messages to reduce the log volume, or by changing the log level and only logging the failure messages and subtasks. Not so sure about the solution, but these messages are really too much.
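One of the options floated in FLINK-28283 above — aggregating the per-subtask transition messages — could look like the sketch below: count transitions per (from, to) pair and emit one summary line instead of one line per subtask. The class and its structure are illustrative, not Flink internals.

```java
// Illustrative aggregation of subtask state-transition log messages:
// one summary line per distinct (from -> to) transition instead of one
// line per subtask. A sketch of the idea in FLINK-28283, not Flink code.
import java.util.LinkedHashMap;
import java.util.Map;

public class TransitionLogAggregator {
    private final Map<String, Integer> counts = new LinkedHashMap<>();

    /** Record one subtask's transition instead of logging it immediately. */
    public void record(String from, String to) {
        counts.merge(from + " -> " + to, 1, Integer::sum);
    }

    /** One line per distinct transition, e.g. "1024 subtasks: CREATED -> SCHEDULED". */
    public String summary() {
        StringBuilder sb = new StringBuilder();
        counts.forEach((transition, n) ->
                sb.append(n).append(" subtasks: ").append(transition).append('\n'));
        return sb.toString();
    }

    public static void main(String[] args) {
        TransitionLogAggregator agg = new TransitionLogAggregator();
        for (int i = 0; i < 3; i++) {
            agg.record("CREATED", "SCHEDULED");
            agg.record("SCHEDULED", "DEPLOYING");
        }
        System.out.print(agg.summary());
        // prints:
        // 3 subtasks: CREATED -> SCHEDULED
        // 3 subtasks: SCHEDULED -> DEPLOYING
    }
}
```

For a job with thousands of subtasks this collapses thousands of near-identical lines into one line per transition, while individual failure transitions could still be logged eagerly.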
[GitHub] [flink-connector-pulsar] syhily commented on a diff in pull request #24: [FLINK-30109][Connector/Pulsar] Drop the use of sneaky exception.
syhily commented on code in PR #24: URL: https://github.com/apache/flink-connector-pulsar/pull/24#discussion_r1103451369

flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/MultipleTopicsConsumingContext.java:

```
@@ -56,7 +57,11 @@ protected String subscriptionName() {
     @Override
     protected String generatePartitionName() {
         String topic = topicPrefix + index;
-        operator.createTopic(topic, 1);
+        try {
+            operator.createTopic(topic, 1);
+        } catch (Exception e) {
```

Review Comment: I'll keep it unfixed in this PR until it gets fixed in [FLINK-31014](https://issues.apache.org/jira/browse/FLINK-31014).

flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestContext.java:

```
@@ -55,7 +56,11 @@ public Sink createSink(TestingSinkSettings sinkSettings) {
         // Create the topic if it needs.
         if (creatTopic()) {
             for (String topic : topics) {
-                operator.createTopic(topic, 4);
+                try {
+                    operator.createTopic(topic, 4);
+                } catch (Exception e) {
```

Review Comment: I'll keep it unfixed in this PR until it gets fixed in [FLINK-31014](https://issues.apache.org/jira/browse/FLINK-31014).
[GitHub] [flink-connector-pulsar] syhily commented on a diff in pull request #24: [FLINK-30109][Connector/Pulsar] Drop the use of sneaky exception.
syhily commented on code in PR #24: URL: https://github.com/apache/flink-connector-pulsar/pull/24#discussion_r1103449916

flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java:

```
@@ -80,7 +72,7 @@ import static org.apache.pulsar.common.partition.PartitionedTopicMetadata.NON_PARTITIONED;

 /** A pulsar cluster operator used for operating pulsar instance. */
-public class PulsarRuntimeOperator implements Closeable {
+public class PulsarRuntimeOperator {
```

Review Comment: Yep, this is a mistake from my side.
[GitHub] [flink] libenchao commented on pull request #13081: [FLINK-18590][json] Support json array explode to multi messages
libenchao commented on PR #13081: URL: https://github.com/apache/flink/pull/13081#issuecomment-1426536898

@leeoo Thanks for your interest. I'll rebase the PR onto the latest master branch shortly; would you please help review it?
[jira] [Comment Edited] (FLINK-30757) Upgrade busybox version to a pinned version for operator
[ https://issues.apache.org/jira/browse/FLINK-30757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687321#comment-17687321 ] Shipeng Xie edited comment on FLINK-30757 at 2/11/23 12:16 AM:

The e2e [error|https://github.com/apache/flink-kubernetes-operator/actions/runs/4147827390/jobs/7175460340#step:9:1340] in the busybox init container is {_}wget: error getting response: Connection reset by peer{_}. It looks like a similar error has already been reported as an [issue|https://github.com/docker-library/busybox/issues/162]. I pushed another commit to downgrade from 1.36.0 to 1.35.0, but the workflow run still needs approval.

> Upgrade busybox version to a pinned version for operator
> Key: FLINK-30757
> URL: https://issues.apache.org/jira/browse/FLINK-30757
> Project: Flink
> Issue Type: Improvement
> Components: Kubernetes Operator
> Reporter: Gabor Somogyi
> Assignee: Shipeng Xie
> Priority: Minor
> Labels: pull-request-available, starter
>
> It has been seen that the operator e2e tests were flaky when using the latest version of busybox, so we've pinned it to a relatively old version: https://github.com/apache/flink-kubernetes-operator/commit/8e41ed5ec1adda9ae06bc0b6203abf42939fbf2b
> It would be good to do two things:
> * Upgrade the busybox version to the latest in a pinned way
> * Add debug logs of the busybox pod in case of failure (to see why it failed)
[GitHub] [flink-kubernetes-operator] xshipeng commented on pull request #530: [FLINK-30757] Upgrade busybox version to a pinned version for operator
xshipeng commented on PR #530: URL: https://github.com/apache/flink-kubernetes-operator/pull/530#issuecomment-1426506352 It looks like busybox latest (1.36.0) is unstable -> https://github.com/docker-library/busybox/issues/162
[jira] [Commented] (FLINK-30998) Add optional exception handler to flink-connector-opensearch
[ https://issues.apache.org/jira/browse/FLINK-30998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687289#comment-17687289 ] Andriy Redko commented on FLINK-30998: -- [~martijnvisser] [~lilyevsky] sure, happy to help here
[jira] [Commented] (FLINK-30627) Refactor FileSystemTableSink to use FileSink instead of StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-30627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687287#comment-17687287 ] Martijn Visser commented on FLINK-30627: [~tanyuxin] Could you elaborate on how you would like to solve this problem? So that some of the other maintainers can see if that's a good approach to solving this. > Refactor FileSystemTableSink to use FileSink instead of StreamingFileSink > - > > Key: FLINK-30627 > URL: https://issues.apache.org/jira/browse/FLINK-30627 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / FileSystem >Reporter: Martijn Visser >Priority: Major > > {{FileSystemTableSink}} currently depends on most of the capabilities from > {{StreamingFileSink}}, for example > https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java#L223-L243 > This is necessary to complete FLINK-28641
[jira] [Commented] (FLINK-30757) Upgrade busybox version to a pinned version for operator
[ https://issues.apache.org/jira/browse/FLINK-30757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687284#comment-17687284 ] Shipeng Xie commented on FLINK-30757:

Hi [~gyfora] [~gaborgsomogyi], I added the code to print the init container log, tested it locally, and opened a draft MR to trigger the GitHub CI. However, it says that {{First-time contributors need a maintainer to approve running workflows}}. Could you please help approve? Thanks!
[jira] [Updated] (FLINK-30757) Upgrade busybox version to a pinned version for operator
[ https://issues.apache.org/jira/browse/FLINK-30757?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-30757: --- Labels: pull-request-available starter (was: starter)
[GitHub] [flink-kubernetes-operator] xshipeng opened a new pull request, #530: [FLINK-30757] Upgrade busybox version to a pinned version for operator
xshipeng opened a new pull request, #530: URL: https://github.com/apache/flink-kubernetes-operator/pull/530

## What is the purpose of the change

Since the [e2e test](https://github.com/apache/flink-kubernetes-operator/actions/runs/389690/jobs/6662337363) is flaky with the latest busybox image in the init container, we [pinned](https://github.com/apache/flink-kubernetes-operator/commit/8e41ed5ec1adda9ae06bc0b6203abf42939fbf2b) the image version to a relatively old version.

## Brief change log

- Add log printing for the init container if the e2e test fails.
- Upgrade the busybox version to the latest in a pinned way.

## Verifying this change

This change is already covered by existing tests, such as `e2e-tests/test_application_kubernetes_ha.sh`. To reproduce the failed tests locally and test the modified init container log printing code:

- Change the init container command in `e2e-tests/data/flinkdep-cr.yaml` to `command: [ 'wget', 'https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.14.4/flink-examples-streaming_2.12-1.14.4.jar', '-O', '/flink-artifact/myjob.jar' ]`.
- Run `./e2e-tests/test_application_kubernetes_ha.sh`.
- Part of the printed log will be:

```
...
Flink logs:
Current logs for flink-example-statemachine-dddb4d664-m6n5b:artifacts-fetcher:
Connecting to repo1.maven.org (198.18.3.208:443)
wget: note: TLS certificate validation not implemented
saving to '/flink-artifact/myjob.jar'
myjob.jar  100%  267k  0:00:00 ETA
'/flink-artifact/myjob.jar' saved
test error msg
Previous logs for flink-example-statemachine-dddb4d664-m6n5b:artifacts-fetcher:
Connecting to repo1.maven.org (198.18.3.208:443)
wget: note: TLS certificate validation not implemented
saving to '/flink-artifact/myjob.jar'
myjob.jar  100%  267k  0:00:00 ETA
'/flink-artifact/myjob.jar' saved
test error msg
Current logs for flink-example-statemachine-dddb4d664-m6n5b:flink-main-container:
Current logs for flink-kubernetes-operator-9744c66bd-27q2r:flink-kubernetes-operator:
Starting Operator
...
```

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changes to the `CustomResourceDescriptors`: no
- Core observer or reconciler logic that is regularly executed: no

## Documentation

- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable
[GitHub] [flink] MartijnVisser commented on pull request #21897: [FLINK-30922][table-planner] Apply persisted columns when doing appendPartitionAndNu…
MartijnVisser commented on PR #21897: URL: https://github.com/apache/flink/pull/21897#issuecomment-1426325867 @shuiqiangchen You need to rebase your PR on the latest changes in `master`
[jira] [Commented] (FLINK-30998) Add optional exception handler to flink-connector-opensearch
[ https://issues.apache.org/jira/browse/FLINK-30998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687276#comment-17687276 ] Martijn Visser commented on FLINK-30998: [~reta] WDYT?
[jira] [Assigned] (FLINK-30971) Modify the default value of parameter 'table.exec.local-hash-agg.adaptive.sampling-threshold'
[ https://issues.apache.org/jira/browse/FLINK-30971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser reassigned FLINK-30971: -- Assignee: Yunhong Zheng

> Modify the default value of parameter 'table.exec.local-hash-agg.adaptive.sampling-threshold'
> Key: FLINK-30971
> URL: https://issues.apache.org/jira/browse/FLINK-30971
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.17.0
> Reporter: Yunhong Zheng
> Assignee: Yunhong Zheng
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.17.0
>
> In our test environment, we set the default parallelism to 1 and found that the most appropriate default value of 'table.exec.local-hash-agg.adaptive.sampling-threshold' is 500. However, for batch jobs with high parallelism in production environments, the amount of data in a single parallel instance is almost always less than 500. Therefore, after testing, we found that setting it to 50 gives better results.
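Whatever default the FLINK-30971 discussion above settles on, the option can be overridden per job. A hedged sketch of the override in the Flink SQL client (the value shown is only illustrative, taken from the ticket's discussion, not a recommendation):

```sql
-- Illustrative only: overriding the adaptive local hash-agg sampling
-- threshold for one session instead of relying on the default.
SET 'table.exec.local-hash-agg.adaptive.sampling-threshold' = '500';
```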
[jira] [Updated] (FLINK-30971) Modify the default value of parameter 'table.exec.local-hash-agg.adaptive.sampling-threshold'
[ https://issues.apache.org/jira/browse/FLINK-30971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser updated FLINK-30971: --- Fix Version/s: 1.17.0
[jira] [Updated] (FLINK-30971) Modify the default value of parameter 'table.exec.local-hash-agg.adaptive.sampling-threshold'
[ https://issues.apache.org/jira/browse/FLINK-30971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser updated FLINK-30971: --- Fix Version/s: (was: 1.17.0)
[jira] [Commented] (FLINK-31006) job is not finished when using pipeline mode to run bounded source like kafka/pulsar
[ https://issues.apache.org/jira/browse/FLINK-31006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687275#comment-17687275 ] Martijn Visser commented on FLINK-31006: [~renqs] WDYT?

> job is not finished when using pipeline mode to run bounded source like kafka/pulsar
> Key: FLINK-31006
> URL: https://issues.apache.org/jira/browse/FLINK-31006
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka, Connectors / Pulsar
> Affects Versions: 1.17.0
> Reporter: jackylau
> Assignee: jackylau
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.17.0
> Attachments: image-2023-02-10-13-20-52-890.png, image-2023-02-10-13-23-38-430.png, image-2023-02-10-13-24-46-929.png
>
> When I do failover tests, such as killing the JM/TM while using pipeline mode to run a bounded source like Kafka, I found the job does not finish even when every partition's data has been consumed.
> After digging into the code, I found this logic does not run when the JM recovers: the partition infos are not changed, so noMoreNewPartitionSplits is not set to true, and then this will not run.
> !image-2023-02-10-13-23-38-430.png!
> !image-2023-02-10-13-24-46-929.png!
[jira] [Commented] (FLINK-31007) The code generated by the IF function throws NullPointerException
[ https://issues.apache.org/jira/browse/FLINK-31007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687274#comment-17687274 ] Martijn Visser commented on FLINK-31007: [~lincoln.86xy] WDYT?

> The code generated by the IF function throws NullPointerException
> Key: FLINK-31007
> URL: https://issues.apache.org/jira/browse/FLINK-31007
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner, Table SQL / Runtime
> Affects Versions: 1.15.2, 1.15.3
> Environment:
> {code:java}
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
> final DataStream<Tuple2<Long, String>> tab =
>         env.fromCollection(Arrays.asList(
>                 new Tuple2<>(1L, "a_b_c"),
>                 new Tuple2<>(-1L, "a_b_c")));
> final Table tableA = tableEnv.fromDataStream(tab);
> tableEnv.executeSql("SELECT if(f0 = -1, '', split_index(f1, '_', 0)) as id FROM " + tableA)
>         .print();
> {code}
> Reporter: tivanli
> Priority: Major
> Attachments: StreamExecCalc$19.java, image-2023-02-10-17-20-51-619.png
>
> Caused by: java.lang.NullPointerException
>     at StreamExecCalc$19.processElement_split1(Unknown Source)
>     at StreamExecCalc$19.processElement(Unknown Source)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
>     at org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:128)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:418)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:513)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:103)
>     at org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:231)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:332)
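One plausible reading of the NPE in FLINK-31007 above — offered purely as an illustration, not as the confirmed root cause — is a conditional whose branches are evaluated eagerly, so a null intermediate from `split_index` is dereferenced even when the condition selects the other branch. The sketch below contrasts eager and lazy branch evaluation in plain Java; every name in it is hypothetical and unrelated to the generated `StreamExecCalc` code.

```java
// Hypothetical illustration of eager vs. lazy branch evaluation for a
// conditional; NOT the actual Flink-generated code from FLINK-31007.
import java.util.function.Supplier;

public class BranchEval {
    /** split_index stand-in: returns null when the index is out of range. */
    static String splitIndex(String s, char sep, int idx) {
        String[] parts = s.split(String.valueOf(sep));
        return idx < parts.length ? parts[idx] : null;
    }

    /** Eager: the caller has already computed both branch values. */
    static int eagerIf(boolean cond, String thenVal, String elseVal) {
        return (cond ? thenVal : elseVal).length();
    }

    /** Lazy: only the selected branch is computed. */
    static int lazyIf(boolean cond, Supplier<String> thenVal, Supplier<String> elseVal) {
        return (cond ? thenVal.get() : elseVal.get()).length();
    }

    public static void main(String[] args) {
        // Lazy evaluation never touches the null-producing branch:
        int len = lazyIf(true, () -> "", () -> splitIndex("a_b_c", '_', 9).trim());
        System.out.println(len); // prints 0
        // Eager evaluation computes the unused branch too and throws here,
        // because splitIndex(...) returns null and .trim() dereferences it
        // while the arguments are being evaluated:
        try {
            eagerIf(true, "", splitIndex("a_b_c", '_', 9).trim());
        } catch (NullPointerException e) {
            System.out.println("NPE from the unselected branch");
        }
    }
}
```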
[GitHub] [flink] MartijnVisser commented on pull request #21630: [FLINK-30166][Connector/FileSystem] Refactor tests that use the deprecated StreamingFileSink instead of FileSink
MartijnVisser commented on PR #21630: URL: https://github.com/apache/flink/pull/21630#issuecomment-1426319887 @gaoyunhaii Can you take one more look? You've already reviewed the original PR, which missed one thing that is now addressed in this PR.
[jira] [Updated] (FLINK-30627) Refactor FileSystemTableSink to use FileSink instead of StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-30627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser updated FLINK-30627: --- Description: {{FileSystemTableSink}} currently depends on most of the capabilities from {{StreamingFileSink}}, for example https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java#L223-L243 This is necessary to complete FLINK-28641 was: In order to be able to remove the StreamingFileSink, the FileSystemTableSink needs to be refactored to no longer depend on StreamingFileSink but on FileSink > Refactor FileSystemTableSink to use FileSink instead of StreamingFileSink > - > > Key: FLINK-30627 > URL: https://issues.apache.org/jira/browse/FLINK-30627 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / FileSystem >Reporter: Martijn Visser >Priority: Major > > {{FileSystemTableSink}} currently depends on most of the capabilities from > {{StreamingFileSink}}, for example > https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java#L223-L243 > This is necessary to complete FLINK-28641
[jira] [Closed] (FLINK-31019) Migrate FileSystemTableSink to FileSink
[ https://issues.apache.org/jira/browse/FLINK-31019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser closed FLINK-31019. -- Resolution: Duplicate > Migrate FileSystemTableSink to FileSink > --- > > Key: FLINK-31019 > URL: https://issues.apache.org/jira/browse/FLINK-31019 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem >Reporter: Martijn Visser >Priority: Major > > {{FileSystemTableSink}} currently depends on most of the capabilities from > {{StreamingFileSink}}, for example > https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java#L223-L243 > This is necessary to complete FLINK-28641
[jira] [Created] (FLINK-31019) Migrate FileSystemTableSink to FileSink
Martijn Visser created FLINK-31019: -- Summary: Migrate FileSystemTableSink to FileSink Key: FLINK-31019 URL: https://issues.apache.org/jira/browse/FLINK-31019 Project: Flink Issue Type: Improvement Components: Connectors / FileSystem Reporter: Martijn Visser {{FileSystemTableSink}} currently depends on most of the capabilities from {{StreamingFileSink}}, for example https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java#L223-L243 This is necessary to complete FLINK-28641
[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #5: [FLINK-30488] OpenSearch implementation of Async Sink
reta commented on code in PR #5: URL: https://github.com/apache/flink-connector-opensearch/pull/5#discussion_r1103191251 ## .github/workflows/push_pr.yml: ## @@ -25,6 +25,6 @@ jobs: compile_and_test: uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils with: - flink_version: 1.16.0 - flink_url: https://dist.apache.org/repos/dist/release/flink/flink-1.16.0/flink-1.16.0-bin-scala_2.12.tgz + flink_version: 1.16.1 + flink_url: https://dist.apache.org/repos/dist/release/flink/flink-1.16.1/flink-1.16.1-bin-scala_2.12.tgz Review Comment: (addressed in https://github.com/apache/flink-connector-opensearch/pull/7)
[GitHub] [flink-connector-opensearch] reta commented on pull request #5: [FLINK-30488] OpenSearch implementation of Async Sink
reta commented on PR #5: URL: https://github.com/apache/flink-connector-opensearch/pull/5#issuecomment-1426283511 Thanks @dannycranmer , I think I went through all your comments, thanks a lot, really appreciate it.
[GitHub] [flink-connector-opensearch] dannycranmer commented on pull request #7: [hotfix] Update Apache Flink to 1.16.1
dannycranmer commented on PR #7: URL: https://github.com/apache/flink-connector-opensearch/pull/7#issuecomment-1426268450 Thanks @reta
[GitHub] [flink-connector-opensearch] dannycranmer merged pull request #7: [hotfix] Update Apache Flink to 1.16.1
dannycranmer merged PR #7: URL: https://github.com/apache/flink-connector-opensearch/pull/7
[GitHub] [flink] flinkbot commented on pull request #21911: [FLINK-19065] [runtime] Generate unique and consistent UID for internal Map operators used in CoGroupedStreams
flinkbot commented on PR #21911: URL: https://github.com/apache/flink/pull/21911#issuecomment-1426267913 ## CI report: * cee862b208cfd3674f6cbcd64d07e7f7c871253b UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-19065) java.lang.IllegalStateException: Auto generated UIDs have been disabled on join
[ https://issues.apache.org/jira/browse/FLINK-19065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-19065: --- Labels: auto-deprioritized-major auto-deprioritized-minor pull-request-available (was: auto-deprioritized-major auto-deprioritized-minor) > java.lang.IllegalStateException: Auto generated UIDs have been disabled on > join > --- > > Key: FLINK-19065 > URL: https://issues.apache.org/jira/browse/FLINK-19065 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.11.0, 1.11.1 >Reporter: Maris >Priority: Not a Priority > Labels: auto-deprioritized-major, auto-deprioritized-minor, > pull-request-available > > Join operation with AutoGeneratedUID disabled leads to > {code:java} > java.lang.IllegalStateException: Auto generated UIDs have been disabled but > no UID or hash has been assigned to operator Map > {code} > code to reproduce > {code:java} > class JoinSpec extends AnyFlatSpec with Matchers with Serializable { > it should "be able to join streams" in { > val env = StreamExecutionEnvironment.getExecutionEnvironment > env.getConfig.disableAutoGeneratedUIDs() > val a = env.fromCollection(List("1", "2", > "3")).name("a").uid("source-uid") > val b = env.fromCollection(List("1", "2", > "3")).name("b").uid("source-uid2") > val c = a > .join(b) > .where(identity) > .equalTo(identity) > .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))((a, b) => > a+b) > .uid("joined").name("joined") > c.addSink(s => println(s)) > .name("ab") > .uid("ab") > println(env.getExecutionPlan) > env.execute > succeed > } > } > {code}
[GitHub] [flink] hmit opened a new pull request, #21911: [FLINK-19065] [runtime] Generate unique and consistent UID for internal Map operators used in CoGroupedStreams
hmit opened a new pull request, #21911: URL: https://github.com/apache/flink/pull/21911 ## What is the purpose of the change This fixes an issue when a Flink job using join/cogroup is run with autoGenerateUid disabled. With uid generation disabled, the input -> map-for-union throws a missing Uid exception from StreamGraphUtils. This PR fixes the issue by assigning a unique and consistent uid based on the input operator's uid. ## Brief change log - Generate unique and consistent UID for internal Map operators used in CoGroupedStreams ## Verifying this change Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing This change is already covered by existing tests, such as *(please describe tests)*. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable
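The fix described above boils down to deriving a deterministic UID for the internal Map operator from the UID of its input, instead of relying on auto-generation. A minimal, self-contained sketch of that idea (the helper name and suffix scheme are assumptions for illustration, not the actual patch):

```java
public class UidDerivation {
    // Sketch: derive a stable, unique UID for an internal operator (e.g. the
    // union Map inside CoGroupedStreams) from the upstream operator's UID, so
    // jobs running with disableAutoGeneratedUIDs() still pass validation.
    static String deriveInternalUid(String inputUid, String role) {
        if (inputUid == null) {
            // No explicit UID on the input: nothing to derive from.
            return null;
        }
        return inputUid + "-" + role;
    }

    public static void main(String[] args) {
        // e.g. the "a" source from the reproducer above, uid("source-uid")
        System.out.println(deriveInternalUid("source-uid", "tagged-union-map"));
    }
}
```

Because the derived UID is a pure function of the input's UID, it stays the same across job submissions, which is what makes savepoint restores keep working.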
[GitHub] [flink] mxm commented on a diff in pull request #21908: [FLINK-30895][coordination] Dynamically adjust slot distribution
mxm commented on code in PR #21908: URL: https://github.com/apache/flink/pull/21908#discussion_r1103096757 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java: ## @@ -133,7 +138,56 @@ public Optional determineParallelism( return Optional.of(new VertexParallelismWithSlotSharing(allVertexParallelism, assignments)); } -private static Map determineParallelism( +/** + * Distributes free slots across the slot-sharing groups of the job. Slots are distributed as + * evenly as possible while taking the minimum parallelism of contained vertices into account. + */ +private static Map determineSlotsPerSharingGroup( +JobInformation jobInformation, int freeSlots) { +int numUnassignedSlots = freeSlots; +int numUnassignedSlotSharingGroups = jobInformation.getSlotSharingGroups().size(); + +final Map slotSharingGroupParallelism = new HashMap<>(); + +for (Tuple2 slotSharingGroup : +sortSlotSharingGroupsByUpperParallelism(jobInformation)) { +final int groupParallelism = +Math.min( +slotSharingGroup.f1, +numUnassignedSlots / numUnassignedSlotSharingGroups); Review Comment: That said, all of this is not set in stone. We can probably add a mode to the adaptive scheduler which disables downscaling in combination with pre-requesting the right amount of resources.
[GitHub] [flink] mxm commented on a diff in pull request #21908: [FLINK-30895][coordination] Dynamically adjust slot distribution
mxm commented on code in PR #21908: URL: https://github.com/apache/flink/pull/21908#discussion_r1103092025 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java: ## @@ -133,7 +138,56 @@ public Optional determineParallelism( return Optional.of(new VertexParallelismWithSlotSharing(allVertexParallelism, assignments)); } -private static Map determineParallelism( +/** + * Distributes free slots across the slot-sharing groups of the job. Slots are distributed as + * evenly as possible while taking the minimum parallelism of contained vertices into account. + */ +private static Map determineSlotsPerSharingGroup( +JobInformation jobInformation, int freeSlots) { +int numUnassignedSlots = freeSlots; +int numUnassignedSlotSharingGroups = jobInformation.getSlotSharingGroups().size(); + +final Map slotSharingGroupParallelism = new HashMap<>(); + +for (Tuple2 slotSharingGroup : +sortSlotSharingGroupsByUpperParallelism(jobInformation)) { +final int groupParallelism = +Math.min( +slotSharingGroup.f1, +numUnassignedSlots / numUnassignedSlotSharingGroups); Review Comment: Sorry for the meta comment but I'm looking at this from the perspective of a future Rescale API.
[GitHub] [flink] mxm commented on a diff in pull request #21908: [FLINK-30895][coordination] Dynamically adjust slot distribution
mxm commented on code in PR #21908: URL: https://github.com/apache/flink/pull/21908#discussion_r1103091640 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java: ## @@ -133,7 +138,56 @@ public Optional determineParallelism( return Optional.of(new VertexParallelismWithSlotSharing(allVertexParallelism, assignments)); } -private static Map determineParallelism( +/** + * Distributes free slots across the slot-sharing groups of the job. Slots are distributed as + * evenly as possible while taking the minimum parallelism of contained vertices into account. + */ +private static Map determineSlotsPerSharingGroup( +JobInformation jobInformation, int freeSlots) { +int numUnassignedSlots = freeSlots; +int numUnassignedSlotSharingGroups = jobInformation.getSlotSharingGroups().size(); + +final Map slotSharingGroupParallelism = new HashMap<>(); + +for (Tuple2 slotSharingGroup : +sortSlotSharingGroupsByUpperParallelism(jobInformation)) { +final int groupParallelism = +Math.min( +slotSharingGroup.f1, +numUnassignedSlots / numUnassignedSlotSharingGroups); Review Comment: That is terrible from an autoscaler perspective, as we want full control over the scaling. We never want the adaptive scheduler to reduce the parallelism in any way. Instead, we provide the spec and the scheduler has to rescale the job safely. It either fulfils the rescaling request, or it does nothing. We do not have unlimited retries for rescaling. Rescaling is costly especially for stateful workloads.
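The capping logic under discussion in the hunk above — each group receives `min(its max parallelism, remaining slots / remaining groups)` — can be sketched in isolation as follows. This is a simplified model of `determineSlotsPerSharingGroup`, not the actual Flink code; the ascending sort order and the plain string/int map shape are assumptions of the sketch:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SlotDistribution {
    /**
     * Distribute freeSlots across slot-sharing groups (keyed by name, valued
     * by their max parallelism) as evenly as possible, capping each group at
     * its max. Groups are processed in ascending order of max parallelism so
     * slots a small group cannot use flow to the larger groups after it.
     */
    static Map<String, Integer> distribute(Map<String, Integer> maxParallelism, int freeSlots) {
        int unassignedSlots = freeSlots;
        int unassignedGroups = maxParallelism.size();
        Map<String, Integer> result = new HashMap<>();

        List<Map.Entry<String, Integer>> sorted = new ArrayList<>(maxParallelism.entrySet());
        sorted.sort(Map.Entry.comparingByValue());

        for (Map.Entry<String, Integer> group : sorted) {
            // Even share of what is left, capped at the group's max parallelism.
            int share = Math.min(group.getValue(), unassignedSlots / unassignedGroups);
            result.put(group.getKey(), share);
            unassignedSlots -= share;
            unassignedGroups--;
        }
        return result;
    }

    public static void main(String[] args) {
        // Group "small" can only use 2 of its even share of 4; "big" picks up the rest.
        System.out.println(distribute(Map.of("small", 2, "big", 10), 8));
    }
}
```

Processing groups in ascending order of their cap is what makes the leftover redistribution work: a group that cannot use its even share releases those slots to every group still unassigned.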
[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #5: [FLINK-30488] OpenSearch implementation of Async Sink
reta commented on code in PR #5: URL: https://github.com/apache/flink-connector-opensearch/pull/5#discussion_r1103088811 ## flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncWriter.java: ## @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.opensearch.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier; +import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter; +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.groups.SinkWriterMetricGroup; + +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.conn.ssl.TrustAllStrategy; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.ssl.SSLContexts; +import org.opensearch.OpenSearchException; +import org.opensearch.action.ActionListener; +import org.opensearch.action.bulk.BulkItemResponse; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.client.RequestOptions; +import org.opensearch.client.RestClient; +import org.opensearch.client.RestClientBuilder; +import org.opensearch.client.RestHighLevelClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.ConnectException; +import java.net.NoRouteToHostException; +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Apache 
Flink's Async Sink Writer to insert or update data in an Opensearch index (see please + * {@link OpensearchAsyncSink}). + * + * @param type of the records converted to Opensearch actions (instances of {@link + * DocSerdeRequest}) + */ +@Internal +class OpensearchAsyncWriter extends AsyncSinkWriter> { +private static final Logger LOG = LoggerFactory.getLogger(OpensearchAsyncWriter.class); + +private final RestHighLevelClient client; +private final Counter numRecordsOutErrorsCounter; +private volatile boolean closed = false; + +private static final FatalExceptionClassifier OPENSEARCH_FATAL_EXCEPTION_CLASSIFIER = +FatalExceptionClassifier.createChain( +new FatalExceptionClassifier( +err -> +err instanceof NoRouteToHostException +|| err instanceof ConnectException, +err -> +new OpenSearchException( +"Could not connect to Opensearch cluster using provided hosts", +err))); + +/** + * Constructor creating an Opensearch async writer. + * + * @param context the initialization context + * @param elementConverter converting incoming records to Opensearch write document requests + * @param maxBatchSize the maximum size of a batch of entries that may be sent + * @param maxInFlightRequests he maximum number of in flight requests that may exist, if any + * more in flight requests need to be initiated once the maximum has been reached, then it + * will be blocked until some have completed + * @param maxBufferedRequests the maximum number of elements held in the buffer, requests to add + * elements will be
[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #5: [FLINK-30488] OpenSearch implementation of Async Sink
reta commented on code in PR #5: URL: https://github.com/apache/flink-connector-opensearch/pull/5#discussion_r1103068913 ## flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/DocSerdeRequest.java: ## @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.opensearch.sink; + +import org.apache.flink.annotation.PublicEvolving; + +import org.opensearch.action.DocWriteRequest; +import org.opensearch.action.delete.DeleteRequest; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.update.UpdateRequest; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.io.stream.InputStreamStreamInput; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.Serializable; + +/** + * Wrapper class around {@link DocWriteRequest} since it does not implement {@link Serializable}, + * required by AsyncSink scaffolding. 
+ * + * @param type of the write request + */ +@PublicEvolving +public class DocSerdeRequest implements Serializable { Review Comment: ~This is my bad, the `T` must be constrained, I will fix it~ Removed `T`, not necessary indeed
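The pattern under review — wrapping a non-`Serializable` request type and hand-writing its byte-level serde via `writeObject`/`readObject` — can be illustrated with plain JDK streams. `Payload` below is an invented stand-in for OpenSearch's `DocWriteRequest`, and its field layout is an assumption of the sketch:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class DocSerdeSketch implements Serializable {
    // Stand-in for a third-party request type that is NOT Serializable.
    static final class Payload {
        final String index;
        final String doc;
        Payload(String index, String doc) { this.index = index; this.doc = doc; }
    }

    // transient: skipped by default serialization, encoded manually below.
    private transient Payload request;

    DocSerdeSketch(Payload request) { this.request = request; }

    Payload getRequest() { return request; }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        out.writeUTF(request.index); // hand-rolled field-by-field encoding
        out.writeUTF(request.doc);
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        request = new Payload(in.readUTF(), in.readUTF()); // rebuild the payload
    }

    static DocSerdeSketch roundTrip(DocSerdeSketch original) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(original);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (DocSerdeSketch) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        DocSerdeSketch copy = roundTrip(new DocSerdeSketch(new Payload("my-index", "{\"k\":1}")));
        System.out.println(copy.getRequest().index);
    }
}
```

The real connector delegates to OpenSearch's own `StreamOutput`/`StreamInput` machinery instead of `writeUTF`, but the shape of the wrapper is the same: the wrapped object is `transient` and reconstructed on deserialization.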
[GitHub] [flink-kubernetes-operator] mxm commented on a diff in pull request #529: [FLINK-30776] Move autoscaler code into a separate Maven module
mxm commented on code in PR #529: URL: https://github.com/apache/flink-kubernetes-operator/pull/529#discussion_r1103083946 ## flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoscalerFactoryImpl.java: ## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.autoscaler; + +import org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScaler; +import org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScalerFactory; +import org.apache.flink.kubernetes.operator.utils.EventRecorder; + +import com.google.auto.service.AutoService; +import io.fabric8.kubernetes.client.KubernetesClient; + +/** + * Factory for loading JobAutoScalerImpl included in this module. This class will be dynamically + * instantiated by the main operator module. + */ +@AutoService(JobAutoScalerFactory.class) Review Comment: This is still required. All plugins have to declare a META-INF/service file which this annotation and the auto-service plugin does automatically. 
## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java: ## @@ -468,6 +468,16 @@ protected void setOwnerReference(CR owner, Configuration deployConfig) { KubernetesConfigOptions.JOB_MANAGER_OWNER_REFERENCE, List.of(ownerReference)); } +private JobAutoScaler loadJobAutoscaler() { +for (JobAutoScalerFactory factory : ServiceLoader.load(JobAutoScalerFactory.class)) { +LOG.info("Loading JobAutoScaler implementation: {}", factory.getClass().getName()); +return factory.create(kubernetesClient, eventRecorder); +} +LOG.info("No JobAutoscaler implementation found. Autoscaling is disabled."); +return new NoopJobAutoscaler(); Review Comment: See the latest commit.
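The `loadJobAutoscaler` snippet above is the standard `java.util.ServiceLoader` pattern: take the first provider registered on the classpath, otherwise fall back to a no-op. A stripped-down, self-contained sketch (the interface and names here are invented for illustration):

```java
import java.util.ServiceLoader;

public class PluginLoading {
    // The SPI that plugin jars would implement and register under
    // META-INF/services — the file that @AutoService generates at compile time.
    public interface AutoScalerFactory {
        String name();
    }

    static String loadAutoscalerName() {
        for (AutoScalerFactory factory : ServiceLoader.load(AutoScalerFactory.class)) {
            // First provider discovered on the classpath wins.
            return factory.name();
        }
        // No provider registered: autoscaling is disabled.
        return "noop";
    }

    public static void main(String[] args) {
        System.out.println(loadAutoscalerName());
    }
}
```

Without a generated `META-INF/services` entry on the classpath, the loop body never runs and the fallback is used — which is exactly why the `@AutoService` annotation discussed in the review must stay.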
[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #5: [FLINK-30488] OpenSearch implementation of Async Sink
reta commented on code in PR #5: URL: https://github.com/apache/flink-connector-opensearch/pull/5#discussion_r1103066160 ## .github/workflows/push_pr.yml: ## @@ -25,4 +25,4 @@ jobs: compile_and_test: uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils with: - flink_version: 1.16.0 + flink_version: 1.16.1 Review Comment: @dannycranmer https://github.com/apache/flink-connector-opensearch/pull/7
[GitHub] [flink-connector-cassandra] echauchot commented on a diff in pull request #3: [FLINK-26822] Add Cassandra Source
echauchot commented on code in PR #3: URL: https://github.com/apache/flink-connector-cassandra/pull/3#discussion_r1101701294 ## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitState.java: ## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.split; + +import com.datastax.driver.core.ResultSet; + +import javax.annotation.Nullable; + +import java.math.BigInteger; + +/** + * Mutable {@link CassandraSplit} that keeps track of the reading process of the associated split. + */ +public class CassandraSplitState { +private final CassandraSplit cassandraSplit; +// Cassandra ResultSet is paginated, a new page is read only if all the records of the previous +// one were consumed. fetch() can be interrupted so we use the resultSet to keep track of the +// reading process. +// It is null when reading has not started (before fetch is called on the split). +@Nullable private ResultSet resultSet; Review Comment: As discussed in the other comment ResultSet is just a handle so the status of the read will not be part of the checkpoint leading to a re-read of the already output data indeed. 
The only way is to manage the memory size of the split at the enumerator level and either output the whole split or nothing at all. That way, in case of an interrupted fetch, nothing will be output and the split can be read again from the beginning after recovery, leading to no duplicates.
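The all-or-nothing behavior described above can be sketched in isolation (hypothetical names and types, not the actual connector code): the fetch buffers a whole split and emits either everything or nothing, so a retried split cannot produce duplicate output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.BooleanSupplier;

/**
 * Hypothetical sketch: buffer all rows of a split and emit them atomically.
 * An interrupted fetch emits nothing, so re-reading the split after recovery
 * cannot duplicate already-emitted records.
 */
public class AllOrNothingFetch {
    public static List<String> fetchSplit(Iterator<String> rows, BooleanSupplier interrupted) {
        List<String> buffer = new ArrayList<>();
        while (rows.hasNext()) {
            if (interrupted.getAsBoolean()) {
                // Nothing was emitted yet; the whole split is re-read after recovery.
                return Collections.emptyList();
            }
            buffer.add(rows.next());
        }
        return buffer; // emitted as one unit
    }

    public static void main(String[] args) {
        System.out.println(fetchSplit(Arrays.asList("a", "b").iterator(), () -> false)); // [a, b]
        System.out.println(fetchSplit(Arrays.asList("a", "b").iterator(), () -> true));  // []
    }
}
```

The cost of this design, as noted above, is that the enumerator must bound the memory size of each split.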
[GitHub] [flink-connector-cassandra] echauchot commented on a diff in pull request #3: [FLINK-26822] Add Cassandra Source
echauchot commented on code in PR #3: URL: https://github.com/apache/flink-connector-cassandra/pull/3#discussion_r1101601871 ## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java: ## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.split; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * This class generates {@link CassandraSplit}s based on Cassandra cluster partitioner and Flink + * source parallelism. + */ +public final class SplitsGenerator { + +private static final Logger LOG = LoggerFactory.getLogger(SplitsGenerator.class); + +private final CassandraPartitioner partitioner; + +public SplitsGenerator(CassandraPartitioner partitioner) { +this.partitioner = partitioner; +} + +/** + * Split Cassandra tokens ring into {@link CassandraSplit}s containing each a range of the ring. + * + * @param numSplits requested number of splits + * @return list containing {@code numSplits} CassandraSplits. 
+ */ +public List<CassandraSplit> generateSplits(long numSplits) { +if (numSplits == 1) { +return Collections.singletonList( +new CassandraSplit(partitioner.minToken(), partitioner.maxToken())); +} +List<CassandraSplit> splits = new ArrayList<>(); +BigInteger splitSize = +(partitioner.ringSize()).divide(new BigInteger(String.valueOf(numSplits))); + +BigInteger startToken, endToken = partitioner.minToken(); +for (int splitCount = 1; splitCount <= numSplits; splitCount++) { +startToken = endToken; +endToken = startToken.add(splitSize); +if (splitCount == numSplits) { +endToken = partitioner.maxToken(); +} +splits.add(new CassandraSplit(startToken, endToken)); +} Review Comment: :+1: for the general change, but regarding `splitCount == numSplits`: it is there to make sure the last split in the list covers the max token despite division rounding. If I did this outside of the loop and added a final split, that split would be very small. I think it is better to extend the previous one.
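The rounding behavior discussed above can be shown with a stripped-down sketch (hypothetical class and method names, not the actual `SplitsGenerator`): integer division of the ring leaves a remainder, and the last split is extended to the max token so the whole ring is covered without a tiny extra split.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

/** Simplified sketch of ring-range splitting where the last split absorbs the rounding remainder. */
public class TokenRangeSketch {
    /** Returns numSplits [start, end) token pairs covering [minToken, maxToken). */
    public static List<BigInteger[]> split(BigInteger minToken, BigInteger maxToken, long numSplits) {
        BigInteger splitSize = maxToken.subtract(minToken).divide(BigInteger.valueOf(numSplits));
        List<BigInteger[]> splits = new ArrayList<>();
        BigInteger start;
        BigInteger end = minToken;
        for (long i = 1; i <= numSplits; i++) {
            start = end;
            // Integer division truncates; extend the last split to maxToken
            // instead of emitting a tiny extra split for the remainder.
            end = (i == numSplits) ? maxToken : start.add(splitSize);
            splits.add(new BigInteger[] {start, end});
        }
        return splits;
    }

    public static void main(String[] args) {
        List<BigInteger[]> s = split(BigInteger.ZERO, BigInteger.TEN, 3);
        // Splits are [0,3), [3,6), [6,10): the last one absorbs the remainder of 10/3.
        System.out.println(s.size());      // 3
        System.out.println(s.get(2)[1]);   // 10
    }
}
```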
[GitHub] [flink-connector-cassandra] echauchot commented on a diff in pull request #3: [FLINK-26822] Add Cassandra Source
echauchot commented on code in PR #3: URL: https://github.com/apache/flink-connector-cassandra/pull/3#discussion_r1100252874 ## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSplitReader.java: ## @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.cassandra.source.reader; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.connector.base.source.reader.RecordsBySplits; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; +import org.apache.flink.connector.cassandra.source.split.CassandraSplitState; +import org.apache.flink.connector.cassandra.source.split.RingRange; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ColumnMetadata; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.Token; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * {@link SplitReader} for Cassandra source. This class is responsible for fetching the records as + * {@link CassandraRow}. For that, it executes a range query (query that outputs records belonging + * to a {@link RingRange}) based on the user specified query. This class manages the Cassandra + * cluster and session. 
+ */ +public class CassandraSplitReader implements SplitReader<CassandraRow, CassandraSplit> { + +private static final Logger LOG = LoggerFactory.getLogger(CassandraSplitReader.class); +public static final String SELECT_REGEXP = "(?i)select .+ from (\\w+)\\.(\\w+).*;$"; + +private final Cluster cluster; +private final Session session; +private final Set<CassandraSplit> unprocessedSplits; +private final AtomicBoolean wakeup = new AtomicBoolean(false); +private final String query; + +public CassandraSplitReader(ClusterBuilder clusterBuilder, String query) { +this.unprocessedSplits = new HashSet<>(); +this.query = query; +cluster = clusterBuilder.getCluster(); +session = cluster.connect(); Review Comment: Yes, I thought about that, but the problem I had was when to close the session/cluster. If you prefer this solution, I can override `SourceReaderBase#close()` and close them there, plus create a factory to initialize the cluster, session, and mapper at the creation of the SourceReader, before the super(...) call. Also, I find the design of passing a map function to the emitter from the source reader elegant. I'll do so
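The lifecycle pattern discussed above can be sketched in miniature (hypothetical stand-in types, not the actual connector code): a factory owns the expensive resources in place of the Cassandra Cluster/Session, they are created before the reader is built, and the reader releases them in `close()`, mirroring an overridden `SourceReaderBase#close()`.

```java
/**
 * Hypothetical sketch of the resource lifecycle discussed above: a factory owns
 * the expensive resources (standing in for the Cassandra Cluster/Session), and
 * the reader releases them in close().
 */
public class ReaderLifecycleSketch {
    static class SessionFactory implements AutoCloseable {
        private boolean open = true;
        String connect() { return "session"; } // stands in for cluster.connect()
        boolean isOpen() { return open; }
        @Override public void close() { open = false; }
    }

    static class Reader implements AutoCloseable {
        private final SessionFactory factory;
        Reader(SessionFactory factory) {
            this.factory = factory; // factory created before the reader, as in the super(...) ordering
        }
        @Override public void close() { factory.close(); } // SourceReaderBase#close analogue
    }

    public static void main(String[] args) throws Exception {
        SessionFactory factory = new SessionFactory();
        try (Reader reader = new Reader(factory)) {
            // ... reader fetches splits using factory-owned resources ...
        }
        System.out.println(factory.isOpen()); // false: resources released exactly once
    }
}
```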
[GitHub] [flink-connector-opensearch] reta opened a new pull request, #7: [hotfix] Update Apache Flink to 1.16.1
reta opened a new pull request, #7: URL: https://github.com/apache/flink-connector-opensearch/pull/7 Update Apache Flink to 1.16.1
[jira] [Commented] (FLINK-31018) SQL Client -j option does not load user jars to classpath.
[ https://issues.apache.org/jira/browse/FLINK-31018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687220#comment-17687220 ] Martijn Visser commented on FLINK-31018: Great to hear that fixes it, thanks for updating it! > SQL Client -j option does not load user jars to classpath. > -- > > Key: FLINK-31018 > URL: https://issues.apache.org/jira/browse/FLINK-31018 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.17.0, 1.16.1 >Reporter: Krzysztof Chmielewski >Priority: Minor > Attachments: image-2023-02-10-15-53-39-330.png, > image-2023-02-10-15-54-32-537.png, image-2023-02-10-16-05-12-407.png > > > SQL Client '-j' option does not load custom jars to classpath as it was for > example in Flink 1.15 > As a result Flink 1.16 SQL Client is not able to discover classes through > Flink's Factory discovery mechanism throwing an error like: > {code:java} > [ERROR] Could not execute SQL statement. Reason: > org.apache.flink.table.api.ValidationException: Could not find any factories > that implement 'com.getindata.connectors.http.LookupQueryCreatorFactory' in > the classpath. > {code} > The same Jar and sample job are working fine with Flink 1.15. > Flink 1.15.2 > ./bin/sql-client.sh -j flink-http-connector-0.9.0.jar > !image-2023-02-10-15-53-39-330.png! > Flink 1.16.1 > ./bin/sql-client.sh -j flink-http-connector-0.9.0.jar > !image-2023-02-10-15-54-32-537.png! > ADD JAR command does not solve " Could not find any factories" issue although > jar seems to be added: > !image-2023-02-10-16-05-12-407.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-31018) SQL Client -j option does not load user jars to classpath.
[ https://issues.apache.org/jira/browse/FLINK-31018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687218#comment-17687218 ] Krzysztof Chmielewski edited comment on FLINK-31018 at 2/10/23 5:39 PM: [~martijnvisser] yes, it seems that this is the case. I've used DynamicTableFactory.Context#getClassLoader instead of Thread.currentThread().getContextClassLoader() as suggested in one of the comments, and the problem disappeared. Thanks, the ticket can be closed. was (Author: kristoffsc): [~martijnvisser] yes, it seems that this is the case. I've used `DynamicTableFactory.Context#getClassLoader` instead of `Thread.currentThread().getContextClassLoader()` as suggested in one of the comments, and the problem disappeared. Thanks, the ticket can be closed.
[jira] [Closed] (FLINK-31018) SQL Client -j option does not load user jars to classpath.
[ https://issues.apache.org/jira/browse/FLINK-31018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Krzysztof Chmielewski closed FLINK-31018. - Resolution: Not A Bug
[jira] [Commented] (FLINK-31018) SQL Client -j option does not load user jars to classpath.
[ https://issues.apache.org/jira/browse/FLINK-31018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687218#comment-17687218 ] Krzysztof Chmielewski commented on FLINK-31018: --- [~martijnvisser] yes, it seems that this is the case. I've used `DynamicTableFactory.Context#getClassLoader` instead of `Thread.currentThread().getContextClassLoader()` as suggested in one of the comments, and the problem disappeared. Thanks, the ticket can be closed.
[GitHub] [flink-connector-opensearch] dannycranmer commented on a diff in pull request #5: [FLINK-30488] OpenSearch implementation of Async Sink
dannycranmer commented on code in PR #5: URL: https://github.com/apache/flink-connector-opensearch/pull/5#discussion_r1102989699 ## flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/DocSerdeRequest.java: ## @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.opensearch.sink; + +import org.apache.flink.annotation.PublicEvolving; + +import org.opensearch.action.DocWriteRequest; +import org.opensearch.action.delete.DeleteRequest; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.update.UpdateRequest; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.io.stream.InputStreamStreamInput; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.Serializable; + +/** + * Wrapper class around {@link DocWriteRequest} since it does not implement {@link Serializable}, + * required by AsyncSink scaffolding. 
+ * + * @param &lt;T&gt; type of the write request + */ +@PublicEvolving +public class DocSerdeRequest<T> implements Serializable { Review Comment: I think the class-level generics are redundant here. We are using `<?>` throughout. Consider changing `private final DocWriteRequest<T> request;` to `private final DocWriteRequest<?> request;` and removing the class generics. This makes the Sink interface a bit messy: `extends AsyncSinkBase<InputT, DocSerdeRequest<T>>` ## flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSinkBuilder.java: ## @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.connector.opensearch.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder; +import org.apache.flink.connector.base.sink.writer.ElementConverter; + +import org.apache.http.HttpHost; +import org.opensearch.action.DocWriteRequest; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Builder to construct an Opensearch compatible {@link OpensearchAsyncSink}. + * + * The following example shows the minimal setup to create an OpensearchAsyncSink that submits + * actions with the default number of actions to buffer (1000). + * + * {@code + * OpensearchAsyncSink<Tuple2<String, String>> sink = OpensearchAsyncSink + * .<Tuple2<String, String>>builder() + * .setHosts(new HttpHost("localhost:9200")) + * .setElementConverter((element, context) -> + * new IndexRequest("my-index").id(element.f0.toString()).source(element.f1)) + * .build(); + * } + * + * @param &lt;InputT&gt; type of the records converted to Opensearch actions + */ +@PublicEvolving +public class OpensearchAsyncSinkBuilder<InputT> +extends AsyncSinkBaseBuilder< +InputT, DocSerdeRequest<?>, OpensearchAsyncSinkBuilder<InputT>> { +private List<HttpHost> hosts; +private String username; +private String password; +private String connectionPathPrefix; +private Integer connectionTimeout; +private Integer connectionRequestTimeout; +private Integer socketTimeout; +private Boolean
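The wildcard alternative suggested in the review can be sketched as follows (hypothetical stand-in types, not the actual connector classes): the wrapper keeps a `DocWriteRequest<?>` field, so the wrapper class itself needs no type parameter and the sink's generic signature stays simpler.

```java
import java.io.Serializable;

/**
 * Hypothetical sketch of the wildcard-based wrapper: the field is DocWriteRequest<?>,
 * so the wrapper class carries no type parameter of its own. DocWriteRequest here is
 * a stand-in for the OpenSearch interface of the same name.
 */
public class WildcardWrapperSketch {
    interface DocWriteRequest<T> {} // stand-in for org.opensearch.action.DocWriteRequest<T>

    static final class DocSerdeRequest implements Serializable {
        private static final long serialVersionUID = 1L;
        // transient: the real class (de)serializes the request manually via OpenSearch streams.
        private final transient DocWriteRequest<?> request;

        private DocSerdeRequest(DocWriteRequest<?> request) { this.request = request; }

        static DocSerdeRequest from(DocWriteRequest<?> request) { return new DocSerdeRequest(request); }

        DocWriteRequest<?> getRequest() { return request; }
    }

    public static void main(String[] args) {
        DocWriteRequest<String> index = new DocWriteRequest<String>() {};
        DocSerdeRequest wrapped = DocSerdeRequest.from(index);
        System.out.println(wrapped.getRequest() == index); // true
    }
}
```

With this shape, callers that only hold the wrapper never need to name `T`, which is exactly what makes the `AsyncSinkBase` signature cleaner.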
[GitHub] [flink] wanglijie95 commented on pull request #21813: [hotfix] Fix typo in elastic_scaling.md
wanglijie95 commented on PR #21813: URL: https://github.com/apache/flink/pull/21813#issuecomment-1426056540 Thanks @noahshpak @reswqa, merged.
[jira] [Comment Edited] (FLINK-31003) Flink SQL IF / CASE WHEN Funcation incorrect
[ https://issues.apache.org/jira/browse/FLINK-31003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687149#comment-17687149 ] weiqinpan edited comment on FLINK-31003 at 2/10/23 4:20 PM: The value of marketing_flow_id is not null, but the result of below sql is empty. So unbelievable. {code:java} SELECT IF(`marketing_flow_id` IS NULL, '', `marketing_flow_id`) FROM source; {code} was (Author: JIRAUSER298918): The field of marketing_flow_id is not null, but the result of below sql is empty. So unbelievable. {code:java} SELECT IF(`marketing_flow_id` IS NULL, '', `marketing_flow_id`) FROM source; {code} > Flink SQL IF / CASE WHEN Funcation incorrect > > > Key: FLINK-31003 > URL: https://issues.apache.org/jira/browse/FLINK-31003 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.15.0, 1.15.1, 1.16.0, 1.15.2, 1.15.3, 1.16.1 >Reporter: weiqinpan >Priority: Major > > When I execute the below sql using sql-client,i found something wrong. > > {code:java} > CREATE TEMPORARY TABLE source ( > mktgmsg_biz_type STRING, > marketing_flow_id STRING, > mktgmsg_campaign_id STRING > ) > WITH > ( > 'connector' = 'filesystem', > 'path' = 'file:///Users/xxx/Desktop/demo.json', > 'format' = 'json' > ); > -- return correct value('marketing_flow_id') > SELECT IF(`marketing_flow_id` IS NOT NULL, `marketing_flow_id`, '') FROM > source; > -- return incorrect value('') > SELECT IF(`marketing_flow_id` IS NULL, '', `marketing_flow_id`) FROM > source;{code} > The demo.json data is > > {code:java} > {"mktgmsg_biz_type": "marketing_flow", "marketing_flow_id": > "marketing_flow_id", "mktgmsg_campaign_id": "mktgmsg_campaign_id"} {code} > > > BTW, use case when + if / ifnull also have something wrong. 
> > {code:java} > -- return wrong value(''), expect return marketing_flow_id > select CASE > WHEN `mktgmsg_biz_type` = 'marketing_flow' THEN IF(`marketing_flow_id` > IS NULL, `marketing_flow_id`, '') > WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign' THEN > IF(`mktgmsg_campaign_id` IS NULL, '', `mktgmsg_campaign_id`) > ELSE '' > END AS `message_campaign_instance_id` FROM source; > -- return wrong value('') > select CASE > WHEN `mktgmsg_biz_type` = 'marketing_flow' THEN > IFNULL(`marketing_flow_id`, '') > WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign' THEN > IFNULL(`mktgmsg_campaign_id`, '') > ELSE '' > END AS `message_campaign_instance_id` FROM source; > -- return correct value, the difference is [else return ' '] > select CASE > WHEN `mktgmsg_biz_type` = 'marketing_flow' THEN > IFNULL(`marketing_flow_id`, '') > WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign' THEN > IFNULL(`mktgmsg_campaign_id`, '') > ELSE ' ' > END AS `message_campaign_instance_id` FROM source; > {code}
[GitHub] [flink] wanglijie95 merged pull request #21813: [hotfix] Fix typo in elastic_scaling.md
wanglijie95 merged PR #21813: URL: https://github.com/apache/flink/pull/21813
[jira] [Commented] (FLINK-31003) Flink SQL IF / CASE WHEN Funcation incorrect
[ https://issues.apache.org/jira/browse/FLINK-31003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687149#comment-17687149 ] weiqinpan commented on FLINK-31003: --- The field marketing_flow_id is not null, but the result of the SQL below is empty. So unbelievable. {code:java} SELECT IF(`marketing_flow_id` IS NULL, '', `marketing_flow_id`) FROM source; {code}
[GitHub] [flink] zekai-li commented on pull request #21675: [FLINK-30690][javadocs][spelling] Fix java documentation and some wor…
zekai-li commented on PR #21675: URL: https://github.com/apache/flink/pull/21675#issuecomment-1426034122 ![image](https://user-images.githubusercontent.com/58294989/218140853-182a708b-408a-497f-8765-9e1b8fa09d2b.png)
[GitHub] [flink] zekai-li commented on pull request #21675: [FLINK-30690][javadocs][spelling] Fix java documentation and some wor…
zekai-li commented on PR #21675: URL: https://github.com/apache/flink/pull/21675#issuecomment-1426032058 > The compile build failed. Could you fix it first? The compilation error has been fixed.
[jira] [Commented] (FLINK-29825) Improve benchmark stability
[ https://issues.apache.org/jira/browse/FLINK-29825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687143#comment-17687143 ] Piotr Nowojski commented on FLINK-29825: Yes, that's a good idea :) > Improve benchmark stability > --- > > Key: FLINK-29825 > URL: https://issues.apache.org/jira/browse/FLINK-29825 > Project: Flink > Issue Type: Improvement > Components: Benchmarks >Affects Versions: 1.17.0 >Reporter: Yanfei Lei >Assignee: Yanfei Lei >Priority: Minor > > Currently, regressions are detected by a simple script which may have false > positives and false negatives, especially for benchmarks with small absolute > values, small value changes would cause large percentage changes. see > [here|https://github.com/apache/flink-benchmarks/blob/master/regression_report.py#L132-L136] > for details. > And all benchmarks are executed on one physical machine, it might happen that > hardware issues affect performance, like "[FLINK-18614] Performance > regression 2020.07.13". > > This ticket aims to improve the precision and recall of the regression-check > script.
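The false-positive mode described in the ticket — small absolute values turning tiny absolute changes into large percentage swings — can be shown with a toy check (hypothetical 10% threshold, not the actual regression_report.py logic):

```java
/** Toy percentage-based regression check, illustrating why small absolute values are noisy. */
public class RegressionCheckSketch {
    /** Flags a regression when the score drops by more than 10% (hypothetical threshold). */
    public static boolean isRegression(double baseline, double current) {
        return (baseline - current) / baseline > 0.10;
    }

    public static void main(String[] args) {
        // Large benchmark: a 50-unit drop on 10000 is only 0.5% of baseline -> not flagged.
        System.out.println(isRegression(10000, 9950)); // false
        // Small benchmark: the same 50-unit absolute noise on 100 is 50% -> flagged.
        System.out.println(isRegression(100, 50));     // true
    }
}
```

The identical absolute fluctuation is invisible on a large benchmark but reads as a 50% regression on a small one, which is why the ticket asks for better precision and recall than a plain percentage threshold.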
[jira] [Commented] (FLINK-31003) Flink SQL IF / CASE WHEN function incorrect
[ https://issues.apache.org/jira/browse/FLINK-31003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687120#comment-17687120 ]

Martijn Visser commented on FLINK-31003:
----------------------------------------

[~lincoln.86xy] WDYT?

> Flink SQL IF / CASE WHEN function incorrect
> -------------------------------------------
>
>                 Key: FLINK-31003
>                 URL: https://issues.apache.org/jira/browse/FLINK-31003
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.15.0, 1.15.1, 1.16.0, 1.15.2, 1.15.3, 1.16.1
>            Reporter: weiqinpan
>            Priority: Major
>
> When I execute the SQL below using sql-client, I found something wrong.
>
> {code:java}
> CREATE TEMPORARY TABLE source (
>     mktgmsg_biz_type STRING,
>     marketing_flow_id STRING,
>     mktgmsg_campaign_id STRING
> )
> WITH (
>     'connector' = 'filesystem',
>     'path' = 'file:///Users/xxx/Desktop/demo.json',
>     'format' = 'json'
> );
>
> -- returns the correct value ('marketing_flow_id')
> SELECT IF(`marketing_flow_id` IS NOT NULL, `marketing_flow_id`, '') FROM source;
>
> -- returns an incorrect value ('')
> SELECT IF(`marketing_flow_id` IS NULL, '', `marketing_flow_id`) FROM source;
> {code}
>
> The demo.json data is:
>
> {code:java}
> {"mktgmsg_biz_type": "marketing_flow", "marketing_flow_id": "marketing_flow_id", "mktgmsg_campaign_id": "mktgmsg_campaign_id"}
> {code}
>
> BTW, CASE WHEN combined with IF / IFNULL also misbehaves:
>
> {code:java}
> -- returns the wrong value (''), expected marketing_flow_id
> select CASE
>     WHEN `mktgmsg_biz_type` = 'marketing_flow' THEN IF(`marketing_flow_id` IS NULL, `marketing_flow_id`, '')
>     WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign' THEN IF(`mktgmsg_campaign_id` IS NULL, '', `mktgmsg_campaign_id`)
>     ELSE ''
> END AS `message_campaign_instance_id` FROM source;
>
> -- returns the wrong value ('')
> select CASE
>     WHEN `mktgmsg_biz_type` = 'marketing_flow' THEN IFNULL(`marketing_flow_id`, '')
>     WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign' THEN IFNULL(`mktgmsg_campaign_id`, '')
>     ELSE ''
> END AS `message_campaign_instance_id` FROM source;
>
> -- returns the correct value; the only difference is [ELSE returns ' ']
> select CASE
>     WHEN `mktgmsg_biz_type` = 'marketing_flow' THEN IFNULL(`marketing_flow_id`, '')
>     WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign' THEN IFNULL(`mktgmsg_campaign_id`, '')
>     ELSE ' '
> END AS `message_campaign_instance_id` FROM source;
> {code}
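For reference, the first two SELECTs in the report should be equivalent: `IF(x IS NOT NULL, x, '')` and `IF(x IS NULL, '', x)` express the same value. A plain Java ternary sketch of the expected semantics (names are illustrative; the reported bug is that Flink SQL returns `''` for the second form):

```java
// Expected semantics of SQL IF(cond, thenVal, elseVal), modeled as a ternary.
public class IfSemantics {
    static String sqlIf(boolean cond, String thenVal, String elseVal) {
        return cond ? thenVal : elseVal;
    }

    public static void main(String[] args) {
        String marketingFlowId = "marketing_flow_id"; // value from demo.json
        // IF(marketing_flow_id IS NOT NULL, marketing_flow_id, '')
        String a = sqlIf(marketingFlowId != null, marketingFlowId, "");
        // IF(marketing_flow_id IS NULL, '', marketing_flow_id)
        String b = sqlIf(marketingFlowId == null, "", marketingFlowId);
        // Both forms should yield the same non-empty id.
        System.out.println(a.equals(b) && a.equals("marketing_flow_id")); // prints true
    }
}
```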
[jira] [Commented] (FLINK-31017) Early-started partial match timeout does not yield completed matches
[ https://issues.apache.org/jira/browse/FLINK-31017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687118#comment-17687118 ]

Martijn Visser commented on FLINK-31017:
----------------------------------------

[~Juntao Hu] Are you committing to fixing this in 1.18? Otherwise I'll remove the fixVersion for now, since it should only be set if the fix is actually expected to make that version.

> Early-started partial match timeout does not yield completed matches
> --------------------------------------------------------------------
>
>                 Key: FLINK-31017
>                 URL: https://issues.apache.org/jira/browse/FLINK-31017
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / CEP
>    Affects Versions: 1.17.0, 1.15.3, 1.16.1
>            Reporter: Juntao Hu
>            Priority: Major
>             Fix For: 1.18.0
>
> Pattern example:
> {code:java}
> Pattern.begin("A").where(startsWith("a")).oneOrMore().consecutive().greedy()
>     .followedBy("B")
>     .where(count("A") > 2 ? startsWith("b") : startsWith("c"))
>     .within(Time.seconds(3));
> {code}
> Sequence example, currently without any output: a1 a2 a3 a4 c1
> When match [a3, a4, c1] completes, partial match [a1, a2, a3, a4] started
> earlier, so NFA#processMatchesAccordingToSkipStrategy() won't emit any result,
> which is the expected behavior. However, when partial match [a1, a2, a3, a4]
> times out, completed match [a3, a4, c1] should be "freed" from NFAState and
> output.
[jira] [Commented] (FLINK-31018) SQL Client -j option does not load user jars to classpath.
[ https://issues.apache.org/jira/browse/FLINK-31018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687117#comment-17687117 ]

Martijn Visser commented on FLINK-31018:
----------------------------------------

[~KristoffSC] Could this be related to FLINK-15635?

> SQL Client -j option does not load user jars to classpath.
> ----------------------------------------------------------
>
>                 Key: FLINK-31018
>                 URL: https://issues.apache.org/jira/browse/FLINK-31018
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Client
>    Affects Versions: 1.17.0, 1.16.1
>            Reporter: Krzysztof Chmielewski
>            Priority: Minor
>         Attachments: image-2023-02-10-15-53-39-330.png, image-2023-02-10-15-54-32-537.png, image-2023-02-10-16-05-12-407.png
>
> The SQL Client '-j' option does not load custom jars to the classpath, as it
> did, for example, in Flink 1.15.
> As a result, the Flink 1.16 SQL Client cannot discover classes through Flink's
> factory discovery mechanism, throwing an error like:
> {code:java}
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.ValidationException: Could not find any factories
> that implement 'com.getindata.connectors.http.LookupQueryCreatorFactory' in
> the classpath.
> {code}
> The same jar and sample job work fine with Flink 1.15.
>
> Flink 1.15.2: ./bin/sql-client.sh -j flink-http-connector-0.9.0.jar
> !image-2023-02-10-15-53-39-330.png!
> Flink 1.16.1: ./bin/sql-client.sh -j flink-http-connector-0.9.0.jar
> !image-2023-02-10-15-54-32-537.png!
> The ADD JAR command does not solve the "Could not find any factories" issue,
> although the jar seems to be added:
> !image-2023-02-10-16-05-12-407.png!
[jira] [Commented] (FLINK-29825) Improve benchmark stability
[ https://issues.apache.org/jira/browse/FLINK-29825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687116#comment-17687116 ]

Dong Lin commented on FLINK-29825:
----------------------------------

Thanks [~Yanfei Lei] for the detailed evaluation results! Maybe we can write a blog together based on your evaluation results.
[jira] [Created] (FLINK-31018) SQL Client -j option does not load user jars to classpath.
Krzysztof Chmielewski created FLINK-31018:
---------------------------------------------

             Summary: SQL Client -j option does not load user jars to classpath.
                 Key: FLINK-31018
                 URL: https://issues.apache.org/jira/browse/FLINK-31018
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Client
    Affects Versions: 1.16.1, 1.17.0
            Reporter: Krzysztof Chmielewski
         Attachments: image-2023-02-10-15-53-39-330.png, image-2023-02-10-15-54-32-537.png, image-2023-02-10-16-05-12-407.png
[jira] [Commented] (FLINK-30998) Add optional exception handler to flink-connector-opensearch
[ https://issues.apache.org/jira/browse/FLINK-30998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687111#comment-17687111 ]

Leonid Ilyevsky commented on FLINK-30998:
-----------------------------------------

This morning I successfully tested my solution. The changes are overall pretty minor. Please let's discuss so we can implement it.

> Add optional exception handler to flink-connector-opensearch
> ------------------------------------------------------------
>
>                 Key: FLINK-30998
>                 URL: https://issues.apache.org/jira/browse/FLINK-30998
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Opensearch
>    Affects Versions: 1.16.1
>            Reporter: Leonid Ilyevsky
>            Priority: Major
>
> Currently, when a failure comes back from Opensearch, a FlinkRuntimeException
> is thrown from the OpensearchWriter.java code (line 346). This makes the Flink
> pipeline fail, and there is no way to handle the exception in the client code.
> I suggest adding an option to set a failure handler, similar to the way it is
> done in the Elasticsearch connector. This way the client code has a chance to
> examine the failure and handle it.
> Here is a use case where this would be very useful: we use streams on the
> Opensearch side and set our own document IDs. Sometimes these IDs are
> duplicated; we need to ignore this situation and continue (this is how it
> works for us with Elasticsearch).
> With the Opensearch connector, however, the error comes back saying that the
> batch failed (even though most of the documents were indexed and only the ones
> with duplicated IDs were rejected), and the whole Flink job fails.
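A minimal sketch of the proposed opt-in handler. The `BulkFailureHandler` interface and all names below are hypothetical, modeled on the Elasticsearch-connector failure-handler idea the report mentions; they are not the actual Opensearch connector API:

```java
// Sketch: instead of always throwing, the writer could delegate each bulk-item
// failure to a user-supplied handler and only fail the job when unhandled.
public class FailureHandlerSketch {
    /** Hypothetical user-pluggable handler; true means the failure is handled. */
    interface BulkFailureHandler {
        boolean onFailure(String documentId, Exception failure);
    }

    /** The reporter's use case: ignore rejects caused by duplicate document IDs. */
    static final BulkFailureHandler IGNORE_DUPLICATE_IDS = (id, failure) ->
            failure.getMessage() != null && failure.getMessage().contains("duplicate");

    /** What the writer could do at its current throw site. */
    static boolean handleOrThrow(BulkFailureHandler handler, String id, Exception e) {
        if (handler != null && handler.onFailure(id, e)) {
            return true; // handled: the pipeline keeps running
        }
        throw new RuntimeException("bulk indexing failed for document " + id, e);
    }

    public static void main(String[] args) {
        boolean handled = handleOrThrow(IGNORE_DUPLICATE_IDS, "doc-1",
                new Exception("duplicate document id"));
        System.out.println(handled); // prints true
    }
}
```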
[jira] [Closed] (FLINK-28911) Elasticsearch connector fails build
[ https://issues.apache.org/jira/browse/FLINK-28911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Niels Basjes closed FLINK-28911.
--------------------------------
    Resolution: Won't Fix

> Elasticsearch connector fails build
> -----------------------------------
>
>                 Key: FLINK-28911
>                 URL: https://issues.apache.org/jira/browse/FLINK-28911
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / ElasticSearch
>    Affects Versions: 1.15.1
>            Reporter: Niels Basjes
>            Assignee: Niels Basjes
>            Priority: Major
>              Labels: pull-request-available
>
> When I run `mvn clean verify` for the ES connector, some of the integration
> tests fail.
> Assessment so far: the SerializationSchema is not opened, triggering an NPE
> later on.
[jira] [Assigned] (FLINK-18235) Improve the checkpoint strategy for Python UDF execution
[ https://issues.apache.org/jira/browse/FLINK-18235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Piotr Nowojski reassigned FLINK-18235:
--------------------------------------
    Assignee: (was: Dian Fu)

> Improve the checkpoint strategy for Python UDF execution
> --------------------------------------------------------
>
>                 Key: FLINK-18235
>                 URL: https://issues.apache.org/jira/browse/FLINK-18235
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / Python
>            Reporter: Dian Fu
>            Priority: Not a Priority
>              Labels: auto-deprioritized-major, stale-assigned
>
> Currently, when a checkpoint is triggered for the Python operator, all the
> buffered data is flushed to the Python worker to be processed. This increases
> the overall checkpoint time when many elements are buffered and the Python UDF
> is slow. We should improve the checkpoint strategy. One way to implement this
> is to control the amount of data buffered in the pipeline between the Java and
> Python processes, similar to what
> [FLIP-183|https://cwiki.apache.org/confluence/display/FLINK/FLIP-183%3A+Dynamic+buffer+size+adjustment]
> does to control the amount of data buffered in the network. We could also let
> users configure the checkpoint strategy if needed.
[jira] [Updated] (FLINK-18235) Improve the checkpoint strategy for Python UDF execution
[ https://issues.apache.org/jira/browse/FLINK-18235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Piotr Nowojski updated FLINK-18235:
-----------------------------------
    Priority: Not a Priority  (was: Major)
[jira] [Commented] (FLINK-18235) Improve the checkpoint strategy for Python UDF execution
[ https://issues.apache.org/jira/browse/FLINK-18235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687102#comment-17687102 ]

Piotr Nowojski commented on FLINK-18235:
----------------------------------------

[~dianfu], may I ask if you have considered implementing a snapshot strategy similar to the one in the {{AsyncWaitOperator}}? Namely:
# After serializing incoming records, keep them buffered in the Flink operator's memory (not in state yet!)
# Once a record has been successfully processed, remove it from the buffer.
# When a checkpoint happens ({{snapshotState}} call), just copy the in-flight records from the buffer to a {{ListState}}; there is no need to flush or wait for the in-flight records to finish processing.
# During recovery, reprocess the records from the recovered {{ListState}}.
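The four steps above can be sketched with a plain in-memory buffer standing in for the operator's state handling. The class and method names are illustrative only, not Flink's actual operator API:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Sketch of the AsyncWaitOperator-style strategy: buffer in-flight records in
// operator memory, and on checkpoint copy them to state instead of flushing.
public class InFlightBuffer<T> {
    private final Deque<T> inFlight = new ArrayDeque<>();

    // Step 1: buffer each serialized record in operator memory, not in state.
    void onRecord(T record) { inFlight.addLast(record); }

    // Step 2: drop a record once the worker has finished processing it.
    void onProcessed() { inFlight.removeFirst(); }

    // Step 3: a checkpoint only copies the in-flight records (the ListState
    // stand-in here is a plain List); no flush, no waiting.
    List<T> snapshotState() { return new ArrayList<>(inFlight); }

    // Step 4: on recovery, re-buffer the recovered records for reprocessing.
    void restoreState(List<T> recovered) {
        inFlight.clear();
        inFlight.addAll(recovered);
    }

    public static void main(String[] args) {
        InFlightBuffer<String> buf = new InFlightBuffer<>();
        buf.onRecord("r1");
        buf.onRecord("r2");
        buf.onProcessed(); // r1 finished before the checkpoint
        System.out.println(buf.snapshotState()); // prints [r2]
    }
}
```

The point of the design is that checkpoint duration no longer depends on how fast the Python worker drains the buffer.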
[jira] [Commented] (FLINK-29825) Improve benchmark stability
[ https://issues.apache.org/jira/browse/FLINK-29825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687098#comment-17687098 ]

Piotr Nowojski commented on FLINK-29825:
----------------------------------------

Thanks a lot for the very detailed comparison, [~Yanfei Lei]. Let's go with [~lindong]'s proposal!
[GitHub] [flink] XComp commented on a diff in pull request #19970: [FLINK-27970][tests][JUnit5 migration] flink-hadoop-bulk
XComp commented on code in PR #19970:
URL: https://github.com/apache/flink/pull/19970#discussion_r1102841851

## flink-formats/flink-hadoop-bulk/archunit-violations/db4de53e-d09e-4fb0-bdbc-429c1b64686f:
@@ -0,0 +1,12 @@
+org.apache.flink.formats.hadoop.bulk.committer.HadoopRenameCommitterHDFSITCase does not satisfy: only one of the following predicates match:\

Review Comment:
   Why is this added? It looks like there is some problem with the archunit tests and subclasses? :thinking:
[jira] [Created] (FLINK-31017) Early-started partial match timeout does not yield completed matches
Juntao Hu created FLINK-31017:
---------------------------------

             Summary: Early-started partial match timeout does not yield completed matches
                 Key: FLINK-31017
                 URL: https://issues.apache.org/jira/browse/FLINK-31017
             Project: Flink
          Issue Type: Bug
          Components: Library / CEP
    Affects Versions: 1.16.1, 1.15.3, 1.17.0
            Reporter: Juntao Hu
             Fix For: 1.18.0
[jira] [Commented] (FLINK-23016) Job client must be a Coordination Request Gateway when submit a job on web ui
[ https://issues.apache.org/jira/browse/FLINK-23016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687090#comment-17687090 ]

Krzysztof Chmielewski commented on FLINK-23016:
-----------------------------------------------

FYI, I got the same error on Flink 1.16.1.

> Job client must be a Coordination Request Gateway when submit a job on web ui
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-23016
>                 URL: https://issues.apache.org/jira/browse/FLINK-23016
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Web Frontend
>    Affects Versions: 1.13.1
>        Environment: flink: 1.13.1
>                     flink-cdc: com.alibaba.ververica:flink-connector-postgres-cdc:1.4.0
>                     jdk: 1.8
>            Reporter: wen qi
>            Priority: Not a Priority
>              Labels: auto-deprioritized-critical, auto-deprioritized-major, auto-deprioritized-minor
>         Attachments: WechatIMG10.png, WechatIMG11.png, WechatIMG8.png
>
> I used Flink CDC to collect data and the Table API to transform the data and
> write it to another table.
> Everything works when I run the code in the IDE or submit the job jar via the
> CLI, but it fails on the web UI.
> When I use StreamTableEnvironment.from('table-path').execute(), it fails!
> Please check my attachments; it seems to be a web UI bug?
[GitHub] [flink] wanglijie95 commented on pull request #21813: [hotfix] Fix typo in elastic_scaling.md
wanglijie95 commented on PR #21813:
URL: https://github.com/apache/flink/pull/21813#issuecomment-1425854545

@flinkbot run azure
[jira] [Commented] (FLINK-29825) Improve benchmark stability
[ https://issues.apache.org/jira/browse/FLINK-29825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687082#comment-17687082 ]

Yanfei Lei commented on FLINK-29825:
------------------------------------

[~pnowojski] I tried using hunter to detect regressions, and [here|https://docs.google.com/document/d/1coI4eJsauBtrlS1Z77bhGf-hNtDEXbzuwacG5ZPCMc8/edit?usp=sharing] are some evaluation results for the three algorithms. I'm not sure I fully understand how hunter is meant to be used; it looks like hunter can only detect regressions within the history sequence, so I modified it a little to detect regressions in the latest commit. Correct me if something is wrong in the document :D