[jira] [Updated] (FLINK-11257) FlinkKafkaConsumer should support assigning partitions
[ https://issues.apache.org/jira/browse/FLINK-11257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shengjk1 updated FLINK-11257: - Priority: Major (was: Minor) > FlinkKafkaConsumer should support assigning partitions > --- > > Key: FLINK-11257 > URL: https://issues.apache.org/jira/browse/FLINK-11257 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector >Affects Versions: 1.7.1 >Reporter: shengjk1 >Priority: Major > > I see that Flink 1.7 also has a universal Kafka connector. If the Kafka connector supported assigning specific partitions, it would be more complete. For example, when a Kafka topic has 3 partitions and I only need 1 of them, I currently have to read all partitions and then filter, which wastes resources and is relatively inefficient. So I suggest that FlinkKafkaConsumer support assigning partitions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11257) FlinkKafkaConsumer should support assigning partitions
shengjk1 created FLINK-11257: Summary: FlinkKafkaConsumer should support assigning partitions Key: FLINK-11257 URL: https://issues.apache.org/jira/browse/FLINK-11257 Project: Flink Issue Type: New Feature Components: Kafka Connector Affects Versions: 1.7.1 Reporter: shengjk1 I see that Flink 1.7 also has a universal Kafka connector. If the Kafka connector supported assigning specific partitions, it would be more complete. For example, when a Kafka topic has 3 partitions and I only need 1 of them, I currently have to read all partitions and then filter, which wastes resources and is relatively inefficient. So I suggest that FlinkKafkaConsumer support assigning partitions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-11226) Lack of getKeySelector in Scala KeyedStream API unlike Java KeyedStream
[ https://issues.apache.org/jira/browse/FLINK-11226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16732728#comment-16732728 ] vinoyang edited comment on FLINK-11226 at 1/3/19 7:38 AM: -- [~Zentol] Before implementing, I'd like to make sure you approve of the solution of adding the {{getKeySelector}} method to {{KeyedStream.scala}}. Do you? cc [~till.rohrmann] was (Author: yanghua): [~Zentol] Before implementing, make sure that you approve the solution to add the {{getKeySelector}} method to {{KeyedStream.scala}}? > Lack of getKeySelector in Scala KeyedStream API unlike Java KeyedStream > --- > > Key: FLINK-11226 > URL: https://issues.apache.org/jira/browse/FLINK-11226 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.7.1 >Reporter: Albert Bikeev >Assignee: vinoyang >Priority: Major > > There is no simple way to access the key via the Scala KeyedStream API because there is no > getKeySelector method, unlike in the Java KeyedStream. > Temporary workarounds are appreciated. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11226) Lack of getKeySelector in Scala KeyedStream API unlike Java KeyedStream
[ https://issues.apache.org/jira/browse/FLINK-11226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16732728#comment-16732728 ] vinoyang commented on FLINK-11226: -- [~Zentol] Before implementing, I'd like to make sure you approve of the solution of adding the {{getKeySelector}} method to {{KeyedStream.scala}}. Do you? > Lack of getKeySelector in Scala KeyedStream API unlike Java KeyedStream > --- > > Key: FLINK-11226 > URL: https://issues.apache.org/jira/browse/FLINK-11226 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.7.1 >Reporter: Albert Bikeev >Assignee: vinoyang >Priority: Major > > There is no simple way to access the key via the Scala KeyedStream API because there is no > getKeySelector method, unlike in the Java KeyedStream. > Temporary workarounds are appreciated. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
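As a temporary workaround of the kind the issue asks for, one can simply keep one's own reference to the selector passed to {{keyBy}} instead of asking the KeyedStream for it afterwards. A minimal plain-Java model of that idea, with {{java.util.function.Function}} standing in for Flink's {{KeySelector}} (an assumption for illustration only, not Flink's API):

```java
import java.util.function.Function;

public class KeySelectorWorkaround {
    // Stand-in for a Flink KeySelector; here the key of a string is its length.
    static final Function<String, Integer> KEY_SELECTOR = s -> s.length();

    public static void main(String[] args) {
        // Workaround: keep your own reference to the selector you pass to
        // keyBy(...). In a real Flink job, stream.keyBy(KEY_SELECTOR) would
        // happen here; the same reference later recovers the key of any element.
        System.out.println(KEY_SELECTOR.apply("flink")); // 5
    }
}
```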
[GitHub] xueyumusic commented on a change in pull request #6445: [FLINK-8302] [table] Add SHIFT_LEFT and SHIFT_RIGHT
xueyumusic commented on a change in pull request #6445: [FLINK-8302] [table] Add SHIFT_LEFT and SHIFT_RIGHT URL: https://github.com/apache/flink/pull/6445#discussion_r244925100 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala ## @@ -74,6 +74,238 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase { "true") } + @Test + def testShiftLeft(): Unit = { +testAllApis( + 3.shiftLeft(3), + "3.shiftLeft(3)", + "SHIFTLEFT(3,3)", + "24" +) + +testAllApis( + 2147483647.shiftLeft(-2147483648), + "2147483647.shiftLeft(-2147483648)", + "SHIFTLEFT(2147483647,-2147483648)", + "2147483647" +) + +testAllApis( + -2147483648.shiftLeft(2147483647), + "-2147483648.shiftLeft(2147483647)", + "SHIFTLEFT(-2147483648,2147483647)", + "0" +) + +testAllApis( + 9223372036854775807L.shiftLeft(-2147483648), + "9223372036854775807L.shiftLeft(-2147483648)", + "SHIFTLEFT(9223372036854775807,-2147483648)", + "9223372036854775807" +) + +testAllApis( + 'f3.shiftLeft(5), + "f3.shiftLeft(5)", + "SHIFTLEFT(f3,5)", + "32" +) + +testAllApis( + 1.shiftLeft(Null(Types.INT)), + "1.shiftLeft(Null(INT))", + "SHIFTLEFT(1, CAST(NULL AS INT))", + "null" +) + +testAllApis( // test tinyint + 'f0.shiftLeft(20), + "f0.shiftLeft(20)", + "SHIFTLEFT(CAST(1 AS TINYINT), 20)", + "1048576" Review comment: Thank you very much for the review and suggestions, @pnowojski. I previously treated `TINYINT` as `INT`, following the behavior of [Hive](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-MathematicalFunctions). After looking into the pg9.6 behavior you pointed to, I think pg9.6's way is understandable and reasonable. I updated the code according to your feedback, thanks~ This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
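The expected values in the test diff above follow Java's shift semantics, where the shift distance is masked to the lower 5 bits for int shifts and the lower 6 bits for long shifts. A small self-contained sketch (plain Java, independent of Flink) reproducing the edge cases from the test:

```java
public class ShiftSemantics {
    public static void main(String[] args) {
        // Java masks the shift distance: n & 31 for int shifts, n & 63 for long shifts.
        System.out.println(3 << 3);                              // 24
        System.out.println(2147483647 << -2147483648);           // distance masks to 0, value unchanged: 2147483647
        System.out.println(-2147483648 << 2147483647);           // distance masks to 31, sign bit shifted out: 0
        System.out.println(9223372036854775807L << -2147483648); // distance masks to 0: 9223372036854775807
    }
}
```

This is why `SHIFTLEFT(2147483647, -2147483648)` is expected to return the input unchanged rather than zero.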
[jira] [Commented] (FLINK-11249) FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer
[ https://issues.apache.org/jira/browse/FLINK-11249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16732725#comment-16732725 ] vinoyang commented on FLINK-11249: -- [~pnowojski] What do you think about defining a new class in the "flink-connector-kafka-base" module, named {{UniversalNextTransactionalIdHint}}, that all versions of the producer depend on? Would this be feasible? cc [~tzulitai] > FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer > --- > > Key: FLINK-11249 > URL: https://issues.apache.org/jira/browse/FLINK-11249 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.7.0, 1.7.1 >Reporter: Piotr Nowojski >Assignee: vinoyang >Priority: Blocker > Fix For: 1.7.2, 1.8.0 > > > As reported by a user on the mailing list "How to migrate Kafka Producer ?" > (on 18th December 2018), {{FlinkKafkaProducer011}} can not be migrated to > {{FlinkKafkaProducer}}, and the same problem can occur in future Kafka > producer versions/refactorings. > The issue is that the {{ListState > FlinkKafkaProducer#nextTransactionalIdHintState}} field is serialized using > Java serializers, and this causes problems/collisions between > {{FlinkKafkaProducer011.NextTransactionalIdHint}} and > {{FlinkKafkaProducer.NextTransactionalIdHint}}. > To fix that, we probably need to release new versions of those classes that > will rewrite/upgrade this state field to a new one that doesn't rely on > Java serialization. After this, we could drop support for the old field, > which in turn will allow users to upgrade from the 0.11 connector to the > universal one. > One bright side is that, technically speaking, our {{FlinkKafkaProducer011}} > has the same compatibility matrix as the universal one (it's also forward- and > backward-compatible with the same Kafka versions), so for the time being > users can stick to {{FlinkKafkaProducer011}}. > FYI [~tzulitai] [~yanghua] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
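The collision described above stems from the fact that Java serialization embeds the fully qualified class name of every serialized object, so state written as {{FlinkKafkaProducer011.NextTransactionalIdHint}} cannot be read back as {{FlinkKafkaProducer.NextTransactionalIdHint}}. A minimal sketch of that mechanism; the {{NextTransactionalIdHint}} stand-in class and its fields here are hypothetical, not Flink's actual class:

```java
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class SerializedClassNameDemo {
    // Hypothetical stand-in for a producer's NextTransactionalIdHint state class.
    static class NextTransactionalIdHint implements Serializable {
        int lastParallelism = 4;
        long nextFreeTransactionalId = 42L;
    }

    static String serializedForm() throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(new NextTransactionalIdHint());
        oos.flush();
        // Decode the bytes as Latin-1 so the embedded class-name string is visible.
        return new String(bos.toByteArray(), StandardCharsets.ISO_8859_1);
    }

    public static void main(String[] args) throws Exception {
        // The serialized bytes literally contain the owning class's name, so
        // renaming or moving the class breaks restores of old state.
        System.out.println(serializedForm().contains("NextTransactionalIdHint"));
    }
}
```

This is why rewriting the field into a format that does not rely on Java serialization (as the ticket proposes) decouples the state from the class identity.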
[GitHub] eaglewatcherwb commented on issue #7378: [FLINK-11232][Webfrontend] Fix empty Start Time of sub-task on web da…
eaglewatcherwb commented on issue #7378: [FLINK-11232][Webfrontend] Fix empty Start Time of sub-task on web da… URL: https://github.com/apache/flink/pull/7378#issuecomment-451069521 > We will have to duplicate the field instead under the new name, in case anyone wrote code against the bugged version. Thanks for the comments, both correct and bugged field names are now kept in the new PR to be compatible with the old version. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-11230) Sum of FlinkSql after two table union all.The value is too large.
[ https://issues.apache.org/jira/browse/FLINK-11230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16732711#comment-16732711 ] jiwei commented on FLINK-11230: --- [~hequn8128], I want to get real-time results, and ContinuousEventTimeTrigger seems suitable. However, using ContinuousEventTimeTrigger changes the result. Is there a good way to resolve this problem? > Sum of FlinkSql after two table union all.The value is too large. > - > > Key: FLINK-11230 > URL: https://issues.apache.org/jira/browse/FLINK-11230 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.7.0 >Reporter: jiwei >Priority: Blocker > Labels: test > Attachments: image-2019-01-02-14-18-33-890.png, > image-2019-01-02-14-18-43-710.png, screenshot-1.png > > > SELECT k AS KEY, SUM(p) AS pv > FROM ( > SELECT tumble_start(stime, INTERVAL '1' minute) AS k > , COUNT(*) AS p > FROM flink_test1 > GROUP BY tumble(stime, INTERVAL '1' minute) > UNION ALL > SELECT tumble_start(stime, INTERVAL '1' minute) AS k > , COUNT(*) AS p > FROM flink_test2 > GROUP BY tumble(stime, INTERVAL '1' minute) > ) t > GROUP BY k > The result of executing this SQL is about 7000 per minute and keeps increasing, but the result is 60 per minute per table. Is there an error in my SQL statement? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11256) Referencing StreamNode objects directly in StreamEdge causes the sizes of JobGraph and TDD to become unnecessarily large
Haibo Suen created FLINK-11256: -- Summary: Referencing StreamNode objects directly in StreamEdge causes the sizes of JobGraph and TDD to become unnecessarily large Key: FLINK-11256 URL: https://issues.apache.org/jira/browse/FLINK-11256 Project: Flink Issue Type: Bug Components: Streaming Affects Versions: 1.7.1, 1.7.0 Reporter: Haibo Suen Assignee: Haibo Suen When a job graph is generated from a StreamGraph, the StreamEdge(s) of the stream graph are serialized into StreamConfig and stored in the job graph. After that, the serialized bytes are included in the TDD and distributed to the TMs. Because StreamEdge directly references StreamNode objects, including sourceVertex and targetVertex, those objects are also written transitively when a StreamEdge is serialized. But these StreamNode objects are not needed at runtime. For a large topology, this causes the JobGraph/TDD to become much larger than actually needed, and makes RPC timeouts during transmission more likely. StreamEdge should store only the IDs of the StreamNodes to avoid this situation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
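The size effect described above can be sketched with made-up stand-ins: an edge that references node objects directly drags the whole nodes into its serialized form, while an edge that stores only IDs stays tiny. None of the classes below are Flink's, and the 64 KB payload is an arbitrary assumption standing in for per-node operator config:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class EdgeSizeDemo {
    // Stand-in for StreamNode; the payload models per-node operator config.
    static class Node implements Serializable {
        final int id;
        final byte[] payload = new byte[64 * 1024];
        Node(int id) { this.id = id; }
    }

    // Edge referencing node objects directly: serializing it drags both nodes along.
    static class EdgeWithRefs implements Serializable {
        final Node source, target;
        EdgeWithRefs(Node s, Node t) { source = s; target = t; }
    }

    // Edge storing only node IDs, as the ticket proposes.
    static class EdgeWithIds implements Serializable {
        final int sourceId, targetId;
        EdgeWithIds(int s, int t) { sourceId = s; targetId = t; }
    }

    static int serializedSize(Object o) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(bos);
            oos.writeObject(o);
            oos.flush();
            return bos.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Node a = new Node(1), b = new Node(2);
        int withRefs = serializedSize(new EdgeWithRefs(a, b));
        int withIds = serializedSize(new EdgeWithIds(a.id, b.id));
        // The ID-only form stays small no matter how heavy the nodes are.
        System.out.println(withRefs > 100 * withIds);
    }
}
```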
[jira] [Commented] (FLINK-11230) Sum of FlinkSql after two table union all.The value is too large.
[ https://issues.apache.org/jira/browse/FLINK-11230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16732702#comment-16732702 ] jiwei commented on FLINK-11230: --- [~hequn8128], I have changed EventTimeTrigger to ContinuousEventTimeTrigger. Through testing, we found that it produces the correct result for a single stream. Why does it change the result when using union all and sum? > Sum of FlinkSql after two table union all.The value is too large. > - > > Key: FLINK-11230 > URL: https://issues.apache.org/jira/browse/FLINK-11230 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.7.0 >Reporter: jiwei >Priority: Blocker > Labels: test > Attachments: image-2019-01-02-14-18-33-890.png, > image-2019-01-02-14-18-43-710.png, screenshot-1.png > > > SELECT k AS KEY, SUM(p) AS pv > FROM ( > SELECT tumble_start(stime, INTERVAL '1' minute) AS k > , COUNT(*) AS p > FROM flink_test1 > GROUP BY tumble(stime, INTERVAL '1' minute) > UNION ALL > SELECT tumble_start(stime, INTERVAL '1' minute) AS k > , COUNT(*) AS p > FROM flink_test2 > GROUP BY tumble(stime, INTERVAL '1' minute) > ) t > GROUP BY k > The result of executing this SQL is about 7000 per minute and keeps increasing, but the result is 60 per minute per table. Is there an error in my SQL statement? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-11230) Sum of FlinkSql after two table union all.The value is too large.
[ https://issues.apache.org/jira/browse/FLINK-11230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16732702#comment-16732702 ] jiwei edited comment on FLINK-11230 at 1/3/19 6:30 AM: --- [~hequn8128], I have changed EventTimeTrigger to ContinuousEventTimeTrigger; I think it just changes the fire time. Through testing, we found that it produces the correct result for a single stream. Why does it change the result when using union all and sum? was (Author: jiweiautohome): [~hequn8128],I have changed EventTimeTrigger to ContinuousEventTimeTrigger.I think it just changed the fire time. Through testing, we found that it's useful to getting the correct result for a single stream.Why it's change the result by using union all and sum? > Sum of FlinkSql after two table union all.The value is too large. > - > > Key: FLINK-11230 > URL: https://issues.apache.org/jira/browse/FLINK-11230 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.7.0 >Reporter: jiwei >Priority: Blocker > Labels: test > Attachments: image-2019-01-02-14-18-33-890.png, > image-2019-01-02-14-18-43-710.png, screenshot-1.png > > > SELECT k AS KEY, SUM(p) AS pv > FROM ( > SELECT tumble_start(stime, INTERVAL '1' minute) AS k > , COUNT(*) AS p > FROM flink_test1 > GROUP BY tumble(stime, INTERVAL '1' minute) > UNION ALL > SELECT tumble_start(stime, INTERVAL '1' minute) AS k > , COUNT(*) AS p > FROM flink_test2 > GROUP BY tumble(stime, INTERVAL '1' minute) > ) t > GROUP BY k > The result of executing this SQL is about 7000 per minute and keeps increasing, but the result is 60 per minute per table. Is there an error in my SQL statement? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-11230) Sum of FlinkSql after two table union all.The value is too large.
[ https://issues.apache.org/jira/browse/FLINK-11230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16732702#comment-16732702 ] jiwei edited comment on FLINK-11230 at 1/3/19 6:29 AM: --- [~hequn8128], I have changed EventTimeTrigger to ContinuousEventTimeTrigger; I think it just changed the fire time. Through testing, we found that it produces the correct result for a single stream. Why does it change the result when using union all and sum? was (Author: jiweiautohome): [~hequn8128],I have changed EventTimeTrigger to ContinuousEventTimeTrigger. Through testing, we found that it's useful to getting the correct result for a single stream.Why it's change the result by using union all and sum? > Sum of FlinkSql after two table union all.The value is too large. > - > > Key: FLINK-11230 > URL: https://issues.apache.org/jira/browse/FLINK-11230 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.7.0 >Reporter: jiwei >Priority: Blocker > Labels: test > Attachments: image-2019-01-02-14-18-33-890.png, > image-2019-01-02-14-18-43-710.png, screenshot-1.png > > > SELECT k AS KEY, SUM(p) AS pv > FROM ( > SELECT tumble_start(stime, INTERVAL '1' minute) AS k > , COUNT(*) AS p > FROM flink_test1 > GROUP BY tumble(stime, INTERVAL '1' minute) > UNION ALL > SELECT tumble_start(stime, INTERVAL '1' minute) AS k > , COUNT(*) AS p > FROM flink_test2 > GROUP BY tumble(stime, INTERVAL '1' minute) > ) t > GROUP BY k > The result of executing this SQL is about 7000 per minute and keeps increasing, but the result is 60 per minute per table. Is there an error in my SQL statement? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11255) RemoteStreamEnvironment should be able to execute in detached mode
[ https://issues.apache.org/jira/browse/FLINK-11255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16732691#comment-16732691 ] Benjamin Lee commented on FLINK-11255: -- [~zjffdu] great! will have a look. > RemoteStreamEnvironment should be able to execute in detached mode > -- > > Key: FLINK-11255 > URL: https://issues.apache.org/jira/browse/FLINK-11255 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.7.0 >Reporter: Benjamin Lee >Priority: Minor > > h1. Overview > Currently RemoteStreamEnvironment is not capable of submitting a job via > detached mode. > h1. Proposed Changes > * Modify the signature of > {code:java} > StreamExecutionEnvironment#createRemoteEnvironment(...) : > StreamExecutionEnvironment{code} > to > {code:java} > StreamExecutionEnvironment#createRemoteEnvironment(...) : > RemoteStreamEnvironment{code} > * Add an public overloaded _execute_ method in _RemoteStreamEnvironment_ > that allows detached executions. > There was a related Jira FLINK-6224 opened a while ago, but I don't think it > makes sense to change the abstract class _StreamExecutionEnvironment_ since > detached submission only applies to _RemoteStreamEnvironment_? > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11253) Incorrect way to stop yarn session described in yarn_setup document
[ https://issues.apache.org/jira/browse/FLINK-11253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16732688#comment-16732688 ] Jeff Zhang commented on FLINK-11253: I think this is a bug that needs to be fixed if ctrl+c doesn't kill the yarn app. > Incorrect way to stop yarn session described in yarn_setup document > --- > > Key: FLINK-11253 > URL: https://issues.apache.org/jira/browse/FLINK-11253 > Project: Flink > Issue Type: Bug > Components: Documentation >Reporter: Tao Yang >Assignee: leesf >Priority: Minor > > There are two ways to stop yarn session described in yarn_setup document: > {noformat} > Stop the YARN session by stopping the unix process (using CTRL+C) or by > entering ‘stop’ into the client. > {noformat} > But in fact, yarn session application still can run after stopping the unix > process (using CTRL+C). > We can either update the yarn_setup document to remove this incorrect way or > add ShutdownHook to stop yarn session in FlinkYarnSessionCli to make it > correct. > Looking forward to the feedbacks and would like to work on this ticket. > Thanks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11255) RemoteStreamEnvironment should be able to execute in detached mode
[ https://issues.apache.org/jira/browse/FLINK-11255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16732687#comment-16732687 ] Jeff Zhang commented on FLINK-11255: [~btleedev] Thanks for reporting this issue, community has some discussion on the flink client api enhancement recently. The detached execution mode is also covered. Welcome to discuss in this mail thread. [http://mail-archives.apache.org/mod_mbox/flink-dev/201812.mbox/%3ccaady7x5xym5qdb_tymz0c-aj9tv+93ebyhnjfj_-5fdmdqz...@mail.gmail.com%3E] > RemoteStreamEnvironment should be able to execute in detached mode > -- > > Key: FLINK-11255 > URL: https://issues.apache.org/jira/browse/FLINK-11255 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.7.0 >Reporter: Benjamin Lee >Priority: Minor > > h1. Overview > Currently RemoteStreamEnvironment is not capable of submitting a job via > detached mode. > h1. Proposed Changes > * Modify the signature of > {code:java} > StreamExecutionEnvironment#createRemoteEnvironment(...) : > StreamExecutionEnvironment{code} > to > {code:java} > StreamExecutionEnvironment#createRemoteEnvironment(...) : > RemoteStreamEnvironment{code} > * Add an public overloaded _execute_ method in _RemoteStreamEnvironment_ > that allows detached executions. > There was a related Jira FLINK-6224 opened a while ago, but I don't think it > makes sense to change the abstract class _StreamExecutionEnvironment_ since > detached submission only applies to _RemoteStreamEnvironment_? > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11255) RemoteStreamEnvironment should be able to execute in detached mode
[ https://issues.apache.org/jira/browse/FLINK-11255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Benjamin Lee updated FLINK-11255: - Summary: RemoteStreamEnvironment should be able to execute in detached mode (was: RemoteStreamEnvironment) > RemoteStreamEnvironment should be able to execute in detached mode > -- > > Key: FLINK-11255 > URL: https://issues.apache.org/jira/browse/FLINK-11255 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.7.0 >Reporter: Benjamin Lee >Priority: Minor > > h1. Overview > Currently RemoteStreamEnvironment is not capable of submitting a job via > detached mode. > h1. Proposed Changes > * Modify the signature of > {code:java} > StreamExecutionEnvironment#createRemoteEnvironment(...) : > StreamExecutionEnvironment{code} > to > {code:java} > StreamExecutionEnvironment#createRemoteEnvironment(...) : > RemoteStreamEnvironment{code} > * Add an public overloaded _execute_ method in _RemoteStreamEnvironment_ > that allows detached executions. > There was a related Jira FLINK-6224 opened a while ago, but I don't think it > makes sense to change the abstract class _StreamExecutionEnvironment_ since > detached submission only applies to _RemoteStreamEnvironment_? > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11255) RemoteStreamEnvironment
Benjamin Lee created FLINK-11255: Summary: RemoteStreamEnvironment Key: FLINK-11255 URL: https://issues.apache.org/jira/browse/FLINK-11255 Project: Flink Issue Type: Improvement Components: Streaming Affects Versions: 1.7.0 Reporter: Benjamin Lee h1. Overview Currently RemoteStreamEnvironment is not capable of submitting a job in detached mode. h1. Proposed Changes * Modify the signature of {code:java} StreamExecutionEnvironment#createRemoteEnvironment(...) : StreamExecutionEnvironment{code} to {code:java} StreamExecutionEnvironment#createRemoteEnvironment(...) : RemoteStreamEnvironment{code} * Add a public overloaded _execute_ method in _RemoteStreamEnvironment_ that allows detached execution. There was a related Jira FLINK-6224 opened a while ago, but I don't think it makes sense to change the abstract class _StreamExecutionEnvironment_ since detached submission only applies to _RemoteStreamEnvironment_? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-11181) SimpleRecoveryITCaseBase test error
[ https://issues.apache.org/jira/browse/FLINK-11181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng closed FLINK-11181. --- Resolution: Fixed Fix Version/s: 1.8.0 1.7.2 Fixed in master : f6995f01ef748e4992a37defd60197d1ab4bd00e Fixed in release-1.7 : ee1ff4b85645b7616c73d3d84fc81741028ad770 > SimpleRecoveryITCaseBase test error > --- > > Key: FLINK-11181 > URL: https://issues.apache.org/jira/browse/FLINK-11181 > Project: Flink > Issue Type: Sub-task >Reporter: sunjincheng >Assignee: Hequn Cheng >Priority: Major > Labels: pull-request-available > Fix For: 1.7.2, 1.8.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Run many times always fail. > at > org.apache.flink.test.recovery.SimpleRecoveryITCaseBase.executeAndRunAssertions(SimpleRecoveryITCaseBase.java:124) > at > org.apache.flink.test.recovery.SimpleRecoveryITCaseBase.testRestart(SimpleRecoveryITCaseBase.java:150) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at org.junit.runners.Suite.runChild(Suite.java:128) > at org.junit.runners.Suite.runChild(Suite.java:27) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at > com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) > at > com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47) > at > com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242) > at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-11253) Incorrect way to stop yarn session described in yarn_setup document
[ https://issues.apache.org/jira/browse/FLINK-11253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-11253: Assignee: leesf > Incorrect way to stop yarn session described in yarn_setup document > --- > > Key: FLINK-11253 > URL: https://issues.apache.org/jira/browse/FLINK-11253 > Project: Flink > Issue Type: Bug > Components: Documentation >Reporter: Tao Yang >Assignee: leesf >Priority: Minor > > There are two ways to stop yarn session described in yarn_setup document: > {noformat} > Stop the YARN session by stopping the unix process (using CTRL+C) or by > entering ‘stop’ into the client. > {noformat} > But in fact, yarn session application still can run after stopping the unix > process (using CTRL+C). > We can either update the yarn_setup document to remove this incorrect way or > add ShutdownHook to stop yarn session in FlinkYarnSessionCli to make it > correct. > Looking forward to the feedbacks and would like to work on this ticket. > Thanks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-11178) Check flink-test module for 1.7.1-rc2
[ https://issues.apache.org/jira/browse/FLINK-11178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng closed FLINK-11178. --- Resolution: Fixed All sub-JIRAs are closed. > Check flink-test module for 1.7.1-rc2 > - > > Key: FLINK-11178 > URL: https://issues.apache.org/jira/browse/FLINK-11178 > Project: Flink > Issue Type: Test > Components: Tests >Reporter: sunjincheng >Assignee: sunjincheng >Priority: Major > > Will create some sub-JIRAs for flink-test test errors. (branch tag: > release-1.7.1-rc2) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11254) Unify serialization format of savepoint for switching state backends
Congxian Qiu created FLINK-11254: Summary: Unify serialization format of savepoint for switching state backends Key: FLINK-11254 URL: https://issues.apache.org/jira/browse/FLINK-11254 Project: Flink Issue Type: Improvement Affects Versions: 1.7.1 Reporter: Congxian Qiu Assignee: Congxian Qiu In the current version, the serialization formats of savepoints differ between HeapKeyedStateBackend and RocksDBStateBackend, so we cannot switch state backends via a savepoint. We should unify the serialization formats of savepoints to support switching state backends. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11253) Incorrect way to stop yarn session described in yarn_setup document
Tao Yang created FLINK-11253: Summary: Incorrect way to stop yarn session described in yarn_setup document Key: FLINK-11253 URL: https://issues.apache.org/jira/browse/FLINK-11253 Project: Flink Issue Type: Bug Components: Documentation Reporter: Tao Yang The yarn_setup document describes two ways to stop a YARN session: {noformat} Stop the YARN session by stopping the unix process (using CTRL+C) or by entering ‘stop’ into the client. {noformat} But in fact, the YARN session application can still run after the unix process is stopped (using CTRL+C). We can either update the yarn_setup document to remove this incorrect way, or add a ShutdownHook in FlinkYarnSessionCli that stops the YARN session, making it correct. Looking forward to feedback; I would like to work on this ticket. Thanks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
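A rough sketch of what the proposed ShutdownHook could look like. The cleanup body is hypothetical (in FlinkYarnSessionCli it would kill the YARN application, e.g. via the YARN client), and the hook is deregistered and invoked directly here only so the demo can show the cleanup logic running; in reality the JVM runs registered hooks on CTRL+C (SIGINT) or normal exit:

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownHookDemo {
    static final AtomicBoolean sessionStopped = new AtomicBoolean(false);

    static boolean simulateStop() {
        // Hypothetical cleanup a session CLI could register so that CTRL+C
        // also stops the YARN session, not just the local client process.
        Thread hook = new Thread(() -> {
            // ... kill the YARN application here ...
            sessionStopped.set(true);
        });
        Runtime.getRuntime().addShutdownHook(hook);

        // Normally the JVM invokes the hook on SIGINT or exit. For the demo,
        // deregister it (to avoid a double run at exit) and run it directly.
        Runtime.getRuntime().removeShutdownHook(hook);
        hook.run();
        return sessionStopped.get();
    }

    public static void main(String[] args) {
        System.out.println(simulateStop());
    }
}
```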
[GitHub] Matrix42 closed pull request #7287: [FLINK-11140][streaming] Fix empty child path check in Buckets
Matrix42 closed pull request #7287: [FLINK-11140][streaming] Fix empty child path check in Buckets URL: https://github.com/apache/flink/pull/7287 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java index d08bc2ac0c3..736fd412d55 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java @@ -304,7 +304,11 @@ void close() { } private Path assembleBucketPath(BucketID bucketId) { - return new Path(basePath, bucketId.toString()); + String child = bucketId.toString(); + if ("".equals(child)) { + return basePath; + } + return new Path(basePath, child); } /** diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java index aee362178a7..81bf3fccf56 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java @@ -23,6 +23,7 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.streaming.api.functions.sink.filesystem.TestUtils.MockListState; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner; import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy; @@ -396,4 +397,28 @@ public String getBucketId(String element, BucketAssigner.Context context) { restoredBuckets.initializeState(bucketState, partCounterState); return restoredBuckets; } + + @Test + public void testAssembleBucketPath() throws Exception { + final File outDir = TEMP_FOLDER.newFolder(); + final Path basePath = new Path(outDir.toURI()); + + final RollingPolicy rollingPolicy = + DefaultRollingPolicy + .create() + .withMaxPartSize(7L) + .build(); + + Buckets buckets = new Buckets<>( + basePath, + new BasePathBucketAssigner<>(), + new DefaultBucketFactoryImpl<>(), + new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()), + rollingPolicy, + 0 + ); + long time = System.currentTimeMillis(); + //onElement will invoke Buckets#AssembleBucketPath + buckets.onElement("abc", new TestUtils.MockSinkContext(time, time, time)); + } } This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
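For illustration, the behaviour guarded by the patch above can be sketched outside Flink. This is a minimal, hypothetical example using `java.nio.file.Path` in place of Flink's `Path` class; only the method name mirrors `assembleBucketPath`, nothing else here is Flink code:

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class BucketPathSketch {
    // Mirrors the patched logic: when the bucket assigner returns an empty
    // bucket id (as a base-path assigner does), resolve to the base path
    // itself instead of appending an empty child segment.
    static Path assembleBucketPath(Path basePath, String bucketId) {
        if (bucketId.isEmpty()) {
            return basePath;
        }
        return basePath.resolve(bucketId);
    }

    public static void main(String[] args) {
        Path base = Paths.get("/tmp/out");
        System.out.println(assembleBucketPath(base, ""));           // base path unchanged
        System.out.println(assembleBucketPath(base, "2019-01-03")); // child appended
    }
}
```

The empty-string check is the whole fix: presumably Flink's own `Path` constructor rejects an empty child component, which is why the empty bucket id needs to be special-cased rather than passed through.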
[jira] [Comment Edited] (FLINK-11230) Sum of FlinkSql after two table union all.The value is too large.
[ https://issues.apache.org/jira/browse/FLINK-11230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16732637#comment-16732637 ] Hequn Cheng edited comment on FLINK-11230 at 1/3/19 3:52 AM: - [~jiweiautohome] Have you changed the code of Flink? It should be an EventTimeTrigger instead of a ContinuousEventTimeTrigger. Since you changed EventTimeTrigger to ContinuousEventTimeTrigger, you changed the behavior of the window, which goes against the window semantics. The result was changed by you, i.e., the result contains more data than it should. was (Author: hequn8128): [~jiweiautohome] Have you changed the code of Flink? It should be an EventTimeTrigger instead of a ContinuousEventTimeTrigger. It is not a bug. Since you changed EventTimeTrigger to ContinuousEventTimeTrigger, you changed the behavior of the window, which goes against the window semantics. The result was changed by you, i.e., the result contains more data than it should. > Sum of FlinkSql after two table union all.The value is too large. > - > > Key: FLINK-11230 > URL: https://issues.apache.org/jira/browse/FLINK-11230 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.7.0 >Reporter: jiwei >Priority: Blocker > Labels: test > Attachments: image-2019-01-02-14-18-33-890.png, > image-2019-01-02-14-18-43-710.png, screenshot-1.png > > > SELECT k AS KEY, SUM(p) AS pv > FROM ( > SELECT tumble_start(stime, INTERVAL '1' minute) AS k > , COUNT(*) AS p > FROM flink_test1 > GROUP BY tumble(stime, INTERVAL '1' minute) > UNION ALL > SELECT tumble_start(stime, INTERVAL '1' minute) AS k > , COUNT(*) AS p > FROM flink_test2 > GROUP BY tumble(stime, INTERVAL '1' minute) > ) t > GROUP BY k > The result of executing this SQL is about 7000 per minute and keeps > increasing, but the result is 60 per minute for each table. Is there an error in > my SQL statement? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11230) Sum of FlinkSql after two table union all.The value is too large.
[ https://issues.apache.org/jira/browse/FLINK-11230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16732637#comment-16732637 ] Hequn Cheng commented on FLINK-11230: - [~jiweiautohome] Have you changed the code of Flink? It should be an EventTimeTrigger instead of a ContinuousEventTimeTrigger. It is not a bug. Since you changed EventTimeTrigger to ContinuousEventTimeTrigger, you changed the behavior of the window, which goes against the window semantics. The result was changed by you, i.e., the result contains more data than it should. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-11229) Support ScalarFunction for the where/filter clause.
[ https://issues.apache.org/jira/browse/FLINK-11229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang closed FLINK-11229. Resolution: Won't Do > Support ScalarFunction for the where/filter clause. > --- > > Key: FLINK-11229 > URL: https://issues.apache.org/jira/browse/FLINK-11229 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > > Currently, the where/filter clause only supports expressions, and we can > enhance its extensibility by letting it support scalar functions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11229) Support ScalarFunction for the where/filter clause.
[ https://issues.apache.org/jira/browse/FLINK-11229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16732631#comment-16732631 ] vinoyang commented on FLINK-11229: -- [~hequn8128] yes, you are right. > Support ScalarFunction for the where/filter clause. > --- > > Key: FLINK-11229 > URL: https://issues.apache.org/jira/browse/FLINK-11229 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > > Currently, the where/filter clause only supports expressions, and we can > enhance its extensibility by letting it support scalar functions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11230) Sum of FlinkSql after two table union all.The value is too large.
[ https://issues.apache.org/jira/browse/FLINK-11230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16732607#comment-16732607 ] jiwei commented on FLINK-11230: --- Hi [~hequn8128], I didn't group by other fields along with the window. I use ContinuousEventTimeTrigger by default, so the result has multiple rows, but that shouldn't change the result. The result is not what I want. I don't know if it is a bug. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] sunjincheng121 commented on issue #7316: [FLINK-11181][tests] Fix SimpleRecoveryFailureRateStrategyITBase test error
sunjincheng121 commented on issue #7316: [FLINK-11181][tests] Fix SimpleRecoveryFailureRateStrategyITBase test error URL: https://github.com/apache/flink/pull/7316#issuecomment-451041718 Thanks for the quick update! @hequn8128 Will be merged. Thanks, Jincheng This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] hequn8128 commented on issue #7316: [FLINK-11181][tests] Fix SimpleRecoveryFailureRateStrategyITBase test error
hequn8128 commented on issue #7316: [FLINK-11181][tests] Fix SimpleRecoveryFailureRateStrategyITBase test error URL: https://github.com/apache/flink/pull/7316#issuecomment-451040746 @sunjincheng121 Thanks a lot for your review and suggestions. I have updated the code. Best, Hequn This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] hequn8128 commented on a change in pull request #7316: [FLINK-11181][tests] Fix SimpleRecoveryFailureRateStrategyITBase test error
hequn8128 commented on a change in pull request #7316: [FLINK-11181][tests] Fix SimpleRecoveryFailureRateStrategyITBase test error URL: https://github.com/apache/flink/pull/7316#discussion_r244902055 ## File path: flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCaseBase.java ## @@ -59,7 +60,7 @@ public void testFailedRunThenSuccessfulRun() throws Exception { env.generateSequence(1, 10) .rebalance() - .map(new FailingMapper1()) Review comment: Good suggestion! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Closed] (FLINK-11243) GroupBy udf() can not be select in TableAPI
[ https://issues.apache.org/jira/browse/FLINK-11243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu closed FLINK-11243. --- Resolution: Duplicate > GroupBy udf() can not be select in TableAPI > --- > > Key: FLINK-11243 > URL: https://issues.apache.org/jira/browse/FLINK-11243 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.8.0 >Reporter: sunjincheng >Assignee: Dian Fu >Priority: Major > > The group key cannot be selected in the following Table API program: > {code:java} > val result = order > .groupBy('o_time.substring(1, 16)) > .select('o_time.substring(1, 16), 'o_id.count){code} > Exception: > {code:java} > org.apache.flink.table.api.ValidationException: Cannot resolve field [o_time] > given input [('o_time).substring(1, 16), TMP_0]. > at > org.apache.flink.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:156){code} > BTW, we can only use the following workaround to avoid the above problem: > {code:java} > val result = order > .select('o_id, 'c_id, 'o_time.substring(1, 16) as 'key) > .groupBy('key) > .select('key, 'o_id.count) > {code} > But I think it would be better to make a `groupby udf()` key selectable in the Table API. > What do you think? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] sunjincheng121 commented on a change in pull request #7316: [FLINK-11181][tests] Fix SimpleRecoveryFailureRateStrategyITBase test error
sunjincheng121 commented on a change in pull request #7316: [FLINK-11181][tests] Fix SimpleRecoveryFailureRateStrategyITBase test error URL: https://github.com/apache/flink/pull/7316#discussion_r244901140 ## File path: flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCaseBase.java ## @@ -59,7 +60,7 @@ public void testFailedRunThenSuccessfulRun() throws Exception { env.generateSequence(1, 10) .rebalance() - .map(new FailingMapper1()) Review comment: Adding a finally reset will work well. ``` finally { FailingMapper1.failuresBeforeSuccess = 1; } ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
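The suggested `finally` reset guards against static test state leaking between runs in the same JVM. A minimal, self-contained sketch of the pattern (the failing mapper's counter is imitated here by a plain static field; all names are illustrative, not the test's real code):

```java
public class StaticResetSketch {
    // Stand-in for a test mapper's static counter: fail once, then succeed.
    static int failuresBeforeSuccess = 1;

    static String runOnce() {
        if (failuresBeforeSuccess > 0) {
            failuresBeforeSuccess--;
            throw new RuntimeException("planned failure");
        }
        return "success";
    }

    // The suggested pattern: reset the static state in finally, so a later
    // test executed in the same JVM sees the same initial conditions.
    static String runWithRetryAndReset() {
        try {
            while (true) {
                try {
                    return runOnce();
                } catch (RuntimeException e) {
                    // retry, mimicking a restart strategy recovering the job
                }
            }
        } finally {
            failuresBeforeSuccess = 1;
        }
    }

    public static void main(String[] args) {
        // Succeeds twice in a row only because finally restored the counter.
        System.out.println(runWithRetryAndReset());
        System.out.println(runWithRetryAndReset());
    }
}
```

Without the `finally` block, the second invocation would skip the planned failure, which is exactly the cross-test interference the review comment addresses.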
[jira] [Assigned] (FLINK-11239) Enhance SQL-Client to recursively list UDFs
[ https://issues.apache.org/jira/browse/FLINK-11239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-11239: Assignee: vinoyang > Enhance SQL-Client to recursively list UDFs > --- > > Key: FLINK-11239 > URL: https://issues.apache.org/jira/browse/FLINK-11239 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Eron Wright >Assignee: vinoyang >Priority: Major > Fix For: 1.8.0 > > > The SQL Client provides a "SHOW FUNCTIONS" to show all registered functions. > Enhance it to show functions produced by an external catalog. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-11249) FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer
[ https://issues.apache.org/jira/browse/FLINK-11249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-11249: Assignee: vinoyang > FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer > --- > > Key: FLINK-11249 > URL: https://issues.apache.org/jira/browse/FLINK-11249 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.7.0, 1.7.1 >Reporter: Piotr Nowojski >Assignee: vinoyang >Priority: Blocker > Fix For: 1.7.2, 1.8.0 > > > As reported by a user on the mailing list "How to migrate Kafka Producer ?" > (on 18th December 2018), {{FlinkKafkaProducer011}} can not be migrated to > {{FlinkKafkaProducer}} and the same problem can occur in future Kafka > producer versions/refactorings. > The issue is that the {{ListState > FlinkKafkaProducer#nextTransactionalIdHintState}} field is serialized using > java serializers and this is causing problems/collisions on > {{FlinkKafkaProducer011.NextTransactionalIdHint}} vs > {{FlinkKafkaProducer.NextTransactionalIdHint}}. > To fix that we probably need to release new versions of those classes, that > will rewrite/upgrade this state field to a new one that doesn't rely on > java serialization. After this, we could drop the support for the old field > and that in turn will allow users to upgrade from the 0.11 connector to the > universal one. > One bright side is that technically speaking our {{FlinkKafkaProducer011}} > has the same compatibility matrix as the universal one (it's also forward & > backward compatible with the same Kafka versions), so for the time being > users can stick to {{FlinkKafkaProducer011}}. > FYI [~tzulitai] [~yanghua] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9700) Document FlinkKafkaProducer behaviour for Kafka versions > 0.11
[ https://issues.apache.org/jira/browse/FLINK-9700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16732558#comment-16732558 ] leesf commented on FLINK-9700: -- [~pnowojski], I will add it to the documentation soon. > Document FlinkKafkaProducer behaviour for Kafka versions > 0.11 > --- > > Key: FLINK-9700 > URL: https://issues.apache.org/jira/browse/FLINK-9700 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.5.0 >Reporter: Ufuk Celebi >Assignee: leesf >Priority: Minor > > FlinkKafkaProducer for Kafka 0.11 uses reflection to work around API > limitations of the Kafka client. Using reflection breaks with newer versions > of the Kafka client (due to internal changes of the client). > The documentation does not mention newer Kafka versions. We should add the > following notes: > - Only package Kafka connector with kafka.version property set to 0.11.*.* > - Mention that it is possible to use the 0.11 connector with newer versions > of Kafka as the protocol seems to be backwards compatible (double check that > this is correct) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] lamber-ken edited a comment on issue #7396: [FLINK-11250][streaming] fix thread leak when StreamTask switched from DEPLOYING to CANCELING
lamber-ken edited a comment on issue #7396: [FLINK-11250][streaming] fix thread leak when StreamTask switched from DEPLOYING to CANCELING URL: https://github.com/apache/flink/pull/7396#issuecomment-451015990 Hi @zentol, I thought that just lazily initializing streamRecordWriters would solve the bug, but the `StreamTaskCancellationBarrierTest#testEmitCancellationBarrierWhenNotReady` test failure caused the Travis failure. So I checked what went wrong. I found it is because of the rework of Flink’s Network Stack in flink-1.5.0: before flink-1.5.0, the `CancelCheckpointMarker` message was sent by the `ResultPartitionWriter`; since flink-1.5.0, the `CancelCheckpointMarker` message is uniformly broadcast by the `StreamRecordWriter`, which is initialized in StreamTask's constructor. So I need to rethink how to solve this problem; it would be nice if you could give me some advice. Best, lamber-ken This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] lamber-ken commented on issue #7396: [FLINK-11250][streaming] fix thread leak when StreamTask switched from DEPLOYING to CANCELING
lamber-ken commented on issue #7396: [FLINK-11250][streaming] fix thread leak when StreamTask switched from DEPLOYING to CANCELING URL: https://github.com/apache/flink/pull/7396#issuecomment-451015990 Hi @zentol, I thought that just lazily initializing streamRecordWriters would solve the bug, but the `StreamTaskCancellationBarrierTest#testEmitCancellationBarrierWhenNotReady` test failure caused the Travis failure. So I checked what went wrong. I found it is because of the rework of Flink’s Network Stack in flink-1.5.0: before flink-1.5.0, the `CancelCheckpointMarker` message was sent by the `ResultPartitionWriter`; since flink-1.5.0, the `CancelCheckpointMarker` message is uniformly broadcast by the `StreamRecordWriter`, which is initialized in StreamTask's constructor. So I need to rethink how to solve this problem; it would be nice if you could give me some advice. Best, lamber-ken This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] lamber-ken opened a new pull request #7396: [FLINK-11250][streaming] fix thread leak when StreamTask switched from DEPLOYING to CANCELING
lamber-ken opened a new pull request #7396: [FLINK-11250][streaming] fix thread leak when StreamTask switched from DEPLOYING to CANCELING URL: https://github.com/apache/flink/pull/7396 ## What is the purpose of the change Since flink-1.5.x, streamRecordWriters has been created in StreamTask's constructor, which starts the OutputFlusher daemon thread. So when a task switches from the DEPLOYING to the CANCELING state, the daemon thread is leaked. ## Brief change log Lazily initialize streamRecordWriters: they are created in the invoke method. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] lamber-ken closed pull request #7396: [FLINK-11250][streaming] fix thread leak when StreamTask switched from DEPLOYING to CANCELING
lamber-ken closed pull request #7396: [FLINK-11250][streaming] fix thread leak when StreamTask switched from DEPLOYING to CANCELING URL: https://github.com/apache/flink/pull/7396 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 9ee8892cf95..ab6ad19137c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -177,7 +177,7 @@ /** Wrapper for synchronousCheckpointExceptionHandler to deal with rethrown exceptions. Used in the async part. */ private AsyncCheckpointExceptionHandler asynchronousCheckpointExceptionHandler; - private final List>>> streamRecordWriters; + private List>>> streamRecordWriters; // @@ -209,7 +209,6 @@ protected StreamTask( this.timerService = timeProvider; this.configuration = new StreamConfig(getTaskConfiguration()); this.accumulatorMap = getEnvironment().getAccumulatorRegistry().getUserMap(); - this.streamRecordWriters = createStreamRecordWriters(configuration, environment); } // @@ -264,6 +263,7 @@ public final void invoke() throws Exception { timerService = new SystemProcessingTimeService(this, getCheckpointLock(), timerThreadFactory); } + streamRecordWriters = createStreamRecordWriters(configuration, getEnvironment()); operatorChain = new OperatorChain<>(this, streamRecordWriters); headOperator = operatorChain.getHeadOperator(); This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
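The diff above moves creation of the record writers (and with them the output-flusher daemon thread) from the constructor into invoke(). The lifecycle idea can be sketched as follows; this is a hypothetical stand-in where `Flusher` merely imitates such a daemon thread, and none of these names are Flink's API:

```java
public class LazyTaskSketch {
    // Hypothetical flusher daemon standing in for the thread started by a
    // record writer. Illustrative only.
    static final class Flusher extends Thread {
        private volatile boolean running = true;
        Flusher() { setDaemon(true); }
        @Override public void run() {
            while (running) {
                try { Thread.sleep(10); } catch (InterruptedException e) { return; }
            }
        }
        void shutdown() { running = false; interrupt(); }
    }

    private Flusher flusher; // no longer created in the constructor

    // The fix: create (and start) the flusher only when the task actually
    // runs. A task cancelled while still DEPLOYING never calls invoke(),
    // so no thread is started and none can be leaked.
    void invoke() {
        flusher = new Flusher();
        flusher.start();
    }

    void cancel() {
        if (flusher != null) {
            flusher.shutdown();
        }
    }

    boolean flusherStarted() {
        return flusher != null;
    }
}
```

With eager construction, cancel-before-invoke would have left a started daemon thread with nobody responsible for stopping it; with lazy construction, cancel simply finds nothing to shut down.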
[GitHub] pnowojski commented on a change in pull request #6445: [FLINK-8302] [table] Add SHIFT_LEFT and SHIFT_RIGHT
pnowojski commented on a change in pull request #6445: [FLINK-8302] [table] Add SHIFT_LEFT and SHIFT_RIGHT URL: https://github.com/apache/flink/pull/6445#discussion_r244787482 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala ## @@ -74,6 +74,238 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase { "true") } + @Test + def testShiftLeft(): Unit = { +testAllApis( + 3.shiftLeft(3), + "3.shiftLeft(3)", + "SHIFTLEFT(3,3)", + "24" +) + +testAllApis( + 2147483647.shiftLeft(-2147483648), + "2147483647.shiftLeft(-2147483648)", + "SHIFTLEFT(2147483647,-2147483648)", + "2147483647" +) + +testAllApis( + -2147483648.shiftLeft(2147483647), + "-2147483648.shiftLeft(2147483647)", + "SHIFTLEFT(-2147483648,2147483647)", + "0" +) + +testAllApis( + 9223372036854775807L.shiftLeft(-2147483648), + "9223372036854775807L.shiftLeft(-2147483648)", + "SHIFTLEFT(9223372036854775807,-2147483648)", + "9223372036854775807" +) + +testAllApis( + 'f3.shiftLeft(5), + "f3.shiftLeft(5)", + "SHIFTLEFT(f3,5)", + "32" +) + +testAllApis( + 1.shiftLeft(Null(Types.INT)), + "1.shiftLeft(Null(INT))", + "SHIFTLEFT(1, CAST(NULL AS INT))", + "null" +) + +testAllApis( // test tinyint + 'f0.shiftLeft(20), + "f0.shiftLeft(20)", + "SHIFTLEFT(CAST(1 AS TINYINT), 20)", + "1048576" Review comment: This doesn't seem right or at least it seems weird: http://sqlfiddle.com/#!17/d9601/2/1 Why is this `TINYINT` treated as some larger data type? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
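The reviewed expectation (`SHIFTLEFT(CAST(1 AS TINYINT), 20)` = 1048576) matches plain Java shift semantics, which the generated code presumably follows. A small sketch of the two effects visible in the tests above:

```java
public class ShiftPromotionSketch {
    public static void main(String[] args) {
        byte one = 1;
        // Java promotes byte operands to int before shifting, so shifting a
        // TINYINT-sized value left by 20 bits is not truncated to byte range.
        System.out.println(one << 20);               // 1048576
        // For int shifts, only the low 5 bits of the shift distance are
        // used; -2147483648 has low bits all zero, so it behaves like 0 --
        // which is why SHIFTLEFT(2147483647, -2147483648) yields 2147483647.
        System.out.println(2147483647 << -2147483648);
    }
}
```

Whether the SQL function *should* follow Java's promotion and distance-masking rules (rather than, say, PostgreSQL's behaviour linked in the comment) is exactly the open question in the review.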
[jira] [Updated] (FLINK-11249) FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer
[ https://issues.apache.org/jira/browse/FLINK-11249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-11249: -- Fix Version/s: 1.7.2 > FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer > --- > > Key: FLINK-11249 > URL: https://issues.apache.org/jira/browse/FLINK-11249 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.7.0, 1.7.1 >Reporter: Piotr Nowojski >Priority: Blocker > Fix For: 1.7.2, 1.8.0 > > > As reported by a user on the mailing list "How to migrate Kafka Producer ?" > (on 18th December 2018), {{FlinkKafkaProducer011}} can not be migrated to > {{FlinkKafkaProducer}} and the same problem can occur in future Kafka > producer versions/refactorings. > The issue is that the {{ListState > FlinkKafkaProducer#nextTransactionalIdHintState}} field is serialized using > java serializers and this is causing problems/collisions on > {{FlinkKafkaProducer011.NextTransactionalIdHint}} vs > {{FlinkKafkaProducer.NextTransactionalIdHint}}. > To fix that we probably need to release new versions of those classes, that > will rewrite/upgrade this state field to a new one that doesn't rely on > java serialization. After this, we could drop the support for the old field > and that in turn will allow users to upgrade from the 0.11 connector to the > universal one. > One bright side is that technically speaking our {{FlinkKafkaProducer011}} > has the same compatibility matrix as the universal one (it's also forward & > backward compatible with the same Kafka versions), so for the time being > users can stick to {{FlinkKafkaProducer011}}. > FYI [~tzulitai] [~yanghua] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9717) Flush state of one side of the join if other side is bounded
[ https://issues.apache.org/jira/browse/FLINK-9717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16732106#comment-16732106 ] Piotr Nowojski commented on FLINK-9717: --- What do you [~kisimple] mean by: > When considering about joins with bounded data, I would prefer the side input > solution in which there is no `MAX_WATERMARK` if I understand correctly. ? > Flush state of one side of the join if other side is bounded > > > Key: FLINK-9717 > URL: https://issues.apache.org/jira/browse/FLINK-9717 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.5.0 >Reporter: Piotr Nowojski >Priority: Major > > Whenever one side of join receives {{MAX_WATERMARK}}, other side in joins > (both normal and versioned joins) could flush the state from other side. > This highly useful optimisation that would speed up versioned joins and would > allow normal joins of large unbounded streams with bounded tables (for > example some static data). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9717) Flush state of one side of the join if other side is bounded
[ https://issues.apache.org/jira/browse/FLINK-9717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-9717: -- Description: Whenever one side of a join receives {{MAX_WATERMARK}}, joins (both normal and versioned) could flush the state of the other side. This is a highly useful optimisation that would speed up versioned joins and would allow normal joins of large unbounded streams with bounded tables (for example some static data). edit: Currently the problem is that Flink doesn't keep & restore the last previous watermark after restoring from a checkpoint, and this is hard to work around. In other words, we can now easily "flush" one side of the join when we receive MAX_WATERMARK, but what should happen after restoring from a checkpoint? There is no easy way to store the information that MAX_WATERMARK was previously reached. As far as I have thought about this, it cannot be stored in the state of the Join operator, and even if it could be done this way, it's probably not the proper/elegant solution. Probably the correct solution is to store MAX_WATERMARK in the state around the watermark emitter/source operator, and the last previously emitted watermark should be re-emitted when the job is restored. was: Whenever one side of a join receives {{MAX_WATERMARK}}, joins (both normal and versioned) could flush the state of the other side. This is a highly useful optimisation that would speed up versioned joins and would allow normal joins of large unbounded streams with bounded tables (for example some static data). 
> Flush state of one side of the join if other side is bounded > > > Key: FLINK-9717 > URL: https://issues.apache.org/jira/browse/FLINK-9717 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.5.0 >Reporter: Piotr Nowojski >Priority: Major > > Whenever one side of a join receives {{MAX_WATERMARK}}, joins (both normal > and versioned) could flush the state of the other side. > This is a highly useful optimisation that would speed up versioned joins and > would allow normal joins of large unbounded streams with bounded tables (for > example some static data). > edit: > Currently the problem is that Flink doesn't keep & restore the last previous > watermark after restoring from a checkpoint, and this is hard to work around. > In other words, we can now easily "flush" one side of the join when we > receive MAX_WATERMARK, but what should happen after restoring from > checkpoint? There is no easy way to store the information that MAX_WATERMARK > was previously reached. As far as I have thought about this, it cannot be > stored in the state of the Join operator, and even if it could be done this > way, it's probably not the proper/elegant solution. Probably the correct > solution is to store MAX_WATERMARK in the state around the watermark > emitter/source operator, and the last previously emitted watermark should be > re-emitted when the job is restored. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
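One way to picture the proposed fix (persisting the last emitted watermark near the source and re-emitting it on restore) is the following toy model. Everything here is hypothetical: it deliberately ignores Flink's real operator and state APIs and only demonstrates why re-emitting the checkpointed watermark lets the "flushed" decision survive a restore:

```java
public class WatermarkRestoreSketch {
    // MAX_WATERMARK marks a bounded input as finished (Long.MAX_VALUE in Flink).
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    private long lastEmitted = Long.MIN_VALUE;

    // Watermarks are monotonic; remember the highest one emitted so far.
    long emit(long watermark) {
        lastEmitted = Math.max(lastEmitted, watermark);
        return lastEmitted;
    }

    // Stand-ins for checkpointing: snapshot the last emitted watermark...
    long snapshot() { return lastEmitted; }

    // ...and on restore, re-emit it so downstream operators learn again
    // that the bounded side had already finished.
    static WatermarkRestoreSketch restore(long checkpointedWatermark) {
        WatermarkRestoreSketch s = new WatermarkRestoreSketch();
        s.emit(checkpointedWatermark);
        return s;
    }

    // The join could flush the other side's state once this is true.
    boolean boundedSideFinished() { return lastEmitted == MAX_WATERMARK; }
}
```

Without the snapshot/re-emit step, a restored job would have no record that MAX_WATERMARK was reached, which is exactly the gap the edited description points out.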
[jira] [Closed] (FLINK-10629) FlinkKafkaProducerITCase.testFlinkKafkaProducerFailTransactionCoordinatorBeforeNotify failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-10629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski closed FLINK-10629. -- Resolution: Won't Fix Fix Version/s: (was: 1.7.2) This test is about to be removed for other reasons: https://issues.apache.org/jira/browse/FLINK-11042 so probably there is no point in investigating this failure. > FlinkKafkaProducerITCase.testFlinkKafkaProducerFailTransactionCoordinatorBeforeNotify > failed on Travis > -- > > Key: FLINK-10629 > URL: https://issues.apache.org/jira/browse/FLINK-10629 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Tests >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Piotr Nowojski >Priority: Critical > Labels: test-stability > > The > {{FlinkKafkaProducerITCase.testFlinkKafkaProducer10FailTransactionCoordinatorBeforeNotify}} > failed on Travis. > https://api.travis-ci.org/v3/job/443777257/log.txt -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] tony810430 edited a comment on issue #7398: [FLINK-11251] [metrics] GenericValueMetricGroup should always ignore its group name in logical scope
tony810430 edited a comment on issue #7398: [FLINK-11251] [metrics] GenericValueMetricGroup should always ignore its group name in logical scope URL: https://github.com/apache/flink/pull/7398#issuecomment-450877310 @zentol Have already addressed the comments. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-11035) Notify data available to network stack immediately after finishing BufferBuilder
[ https://issues.apache.org/jira/browse/FLINK-11035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16732072#comment-16732072 ] Piotr Nowojski commented on FLINK-11035: I would be in favour of postponing this discussion until we confirm that this indeed is a problem. As I wrote before, adding {{notifyBufferFinished}} would increase the number of notifications sent between the network threads, and this can be quite costly. > Notify data available to network stack immediately after finishing > BufferBuilder > > > Key: FLINK-11035 > URL: https://issues.apache.org/jira/browse/FLINK-11035 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.8.0 >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > > The data availability notification for the network relies on whether there is a > finished _BufferBuilder_ or a flush was triggered. If no flush is triggered and > the first _BufferBuilder_ is enqueued into the subpartition, then although this > _BufferBuilder_ is finished on the _RecordWriter_ side, it has to rely on > enqueuing the second _BufferBuilder_ to trigger the availability notification. This > may add some delay to transporting the finished _BufferBuilder_ over the > network, especially when there is a blocking operation for requesting the second > _BufferBuilder_ from the pool. > Supposing there is only one available buffer in the LocalBufferPool in extreme > scenarios, if the first _BufferBuilder_ is not transported and recycled, the > request for the second _BufferBuilder_ will be blocked all the time. > I propose to add a _notifyBufferFinished_ method to the _ResultPartitionWriter_ > interface, so that _RecordWriter_ can notify via it after the _BufferBuilder_ is > finished. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
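[Editor's note] A toy model of the proposal in FLINK-11035, with illustrative names only (not the real network-stack classes): under the current rule a subpartition only becomes "available" when a second buffer is enqueued behind a finished one; the proposed `notifyBufferFinished` signals availability as soon as a buffer is finished, at the cost of more cross-thread notifications.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model contrasting enqueue-triggered vs. finish-triggered availability. */
public class NotifyOnFinishSketch {
    static class BufferBuilder { boolean finished; }

    private final Deque<BufferBuilder> queue = new ArrayDeque<>();
    int availabilityNotifications = 0;

    /** Current rule (simplified): enqueueing makes the previous finished buffer available. */
    public void enqueue(BufferBuilder b) {
        if (!queue.isEmpty() && queue.peekLast().finished) {
            availabilityNotifications++;
        }
        queue.addLast(b);
    }

    /** Proposed notifyBufferFinished: signal availability immediately on finish. */
    public void finishAndNotify(BufferBuilder b) {
        b.finished = true;
        availabilityNotifications++;
    }
}
```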
[GitHub] pnowojski commented on a change in pull request #7199: [FLINK-10662][network] Refactor the ChannelSelector interface for single selected channel
pnowojski commented on a change in pull request #7199: [FLINK-10662][network] Refactor the ChannelSelector interface for single selected channel URL: https://github.com/apache/flink/pull/7199#discussion_r244741323 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ##
```
@@ -68,20 +68,29 @@
 	private final boolean flushAlways;

+	private final boolean isBroadcastSelector;
+
 	private Counter numBytesOut = new SimpleCounter();

 	private Counter numBuffersOut = new SimpleCounter();

 	public RecordWriter(ResultPartitionWriter writer) {
-		this(writer, new RoundRobinChannelSelector());
+		this(writer, new RoundRobinChannelSelector(), false);
 	}

-	@SuppressWarnings("unchecked")
-	public RecordWriter(ResultPartitionWriter writer, ChannelSelector channelSelector) {
-		this(writer, channelSelector, false);
+	public RecordWriter(
```
Review comment: Oops, right, the 2nd option was pretty stupid on my part 😳 Isn't having a separate optimised `BroadcastRecordWriter` our goal here and the whole point of your interest in this area? From this perspective, won't this if check:
```
public void emit(T record) throws IOException, InterruptedException {
	if (isBroadcastSelector) {
		broadcastEmit(record);
	} else {
		emit(record, channelSelector.selectChannels(record));
	}
}
```
become dead code/disappear shortly? In that case, maybe the best way to solve this issue is to actually introduce a simple `BroadcastRecordWriter` (which would extend `RecordWriter` and overwrite only one method):
```
public class BroadcastRecordWriter extends RecordWriter {
	// some constructors...

	@Override
	public void emit(T record) throws IOException, InterruptedException {
		broadcastEmit(record, broadcastChannels);
	}
}
```
The only tricky part is that this would require us to create either some builder method (or even a `RecordWriterBuilder` class), something like:
```
public static RecordWriter createRecordWriter(ResultPartitionWriter writer, ChannelSelector channelSelector) {
	// might return either BroadcastRecordWriter or RecordWriter based on the passed channelSelector
}
```
? After that, the next step would be to optimize `BroadcastRecordWriter`. What do you think? Does it make sense or have I missed something again 😳? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
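[Editor's note] The factory idea discussed above can be sketched in a Flink-free, self-contained form (all names are hypothetical stand-ins for the real Flink classes): a builder method picks the specialised writer when the selector broadcasts, so the per-record `if (isBroadcastSelector)` branch disappears.

```java
/** Flink-free sketch of choosing a specialised writer from the channel selector. */
public class WriterFactorySketch {
    interface ChannelSelector {
        int select(String record);
        boolean isBroadcast();
    }

    static class RecordWriter {
        final ChannelSelector selector;
        RecordWriter(ChannelSelector selector) { this.selector = selector; }
        // Routes each record to one channel chosen by the selector.
        public String emit(String record) { return "channel-" + selector.select(record); }
    }

    static class BroadcastRecordWriter extends RecordWriter {
        BroadcastRecordWriter(ChannelSelector selector) { super(selector); }
        // No per-record branch: always broadcast.
        @Override public String emit(String record) { return "all-channels"; }
    }

    /** The suggested builder: return the subclass based on the passed selector. */
    static RecordWriter createRecordWriter(ChannelSelector selector) {
        return selector.isBroadcast() ? new BroadcastRecordWriter(selector) : new RecordWriter(selector);
    }
}
```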
[jira] [Updated] (FLINK-11252) Download page contains irrelevant "Scala 2.11" column
[ https://issues.apache.org/jira/browse/FLINK-11252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11252: --- Labels: pull-request-available (was: ) > Download page contains irrelevant "Scala 2.11" column > - > > Key: FLINK-11252 > URL: https://issues.apache.org/jira/browse/FLINK-11252 > Project: Flink > Issue Type: Bug > Components: Project Website >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Attachments: downloads.png > > > The download page has a "Scala 2.11" column, that was used in the past to > provide distinct download links for different scala versions. > We currently however list releases separately for each scala version. > We should either remove the column title or refactor the download page to > also have a "Scala 2.12" column. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-9700) Document FlinkKafkaProducer behaviour for Kafka versions > 0.11
[ https://issues.apache.org/jira/browse/FLINK-9700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16732057#comment-16732057 ] Piotr Nowojski edited comment on FLINK-9700 at 1/2/19 1:36 PM: --- Yes, it would be good to add this note to our compatibility matrix/table. [~xleesf] or [~yanghua] will one of you add it? Btw, it might also be a good idea to explicitly state in the documentation that our 0.11 connector is also compatible with Kafka versions >= 0.11, and not just with 0.11. was (Author: pnowojski): Yes, it would be good to add this note to our compatibility matrix/table. [~xleesf] or [~yanghua] will one of you add it? > Document FlinkKafkaProducer behaviour for Kafka versions > 0.11 > --- > > Key: FLINK-9700 > URL: https://issues.apache.org/jira/browse/FLINK-9700 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.5.0 >Reporter: Ufuk Celebi >Assignee: leesf >Priority: Minor > > FlinkKafkaProducer for Kafka 0.11 uses reflection to work around API > limitations of the Kafka client. Using reflection breaks with newer versions > of the Kafka client (due to internal changes of the client). > The documentation does not mention newer Kafka versions. We should add the > following notes: > - Only package Kafka connector with kafka.version property set to 0.11.*.* > - Mention that it is possible to use the 0.11 connector with newer versions > of Kafka as the protocol seems to be backwards compatible (double check that > this is correct) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9700) Document FlinkKafkaProducer behaviour for Kafka versions > 0.11
[ https://issues.apache.org/jira/browse/FLINK-9700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16732057#comment-16732057 ] Piotr Nowojski commented on FLINK-9700: --- Yes, it would be good to add this note to our compatibility matrix/table. [~xleesf] or [~yanghua] will one of you add it? > Document FlinkKafkaProducer behaviour for Kafka versions > 0.11 > --- > > Key: FLINK-9700 > URL: https://issues.apache.org/jira/browse/FLINK-9700 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.5.0 >Reporter: Ufuk Celebi >Assignee: leesf >Priority: Minor > > FlinkKafkaProducer for Kafka 0.11 uses reflection to work around API > limitations of the Kafka client. Using reflection breaks with newer versions > of the Kafka client (due to internal changes of the client). > The documentation does not mention newer Kafka versions. We should add the > following notes: > - Only package Kafka connector with kafka.version property set to 0.11.*.* > - Mention that it is possible to use the 0.11 connector with newer versions > of Kafka as the protocol seems to be backwards compatible (double check that > this is correct) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8823) Add network profiling/diagnosing metrics.
[ https://issues.apache.org/jira/browse/FLINK-8823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16732054#comment-16732054 ] Piotr Nowojski commented on FLINK-8823: --- [~kisimple] it's great that you are interested in this ticket! Before implementing, could you describe what metrics you would like to add? > Add network profiling/diagnosing metrics. > - > > Key: FLINK-8823 > URL: https://issues.apache.org/jira/browse/FLINK-8823 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.5.0 >Reporter: Piotr Nowojski >Assignee: boshu Zheng >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11082) Increase backlog only if it is available for consumption
[ https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16732053#comment-16732053 ] Piotr Nowojski commented on FLINK-11082: > Furthermore, the backlog would affect requesting floating buffers on > downstream side. That means some floating buffers are fetched in advance but > not be used for long time, so the floating buffers are not made use of > efficiently. I think that wouldn't be an issue if floating vs exclusive were an abstract concept. For example, if we assign a floating credit to a channel (bumping the number of assigned buffers from 2 up to 3), then once we release ANY buffer from this channel and the number goes back down to 2, it could be interpreted as/be an equivalent of the channel having released the floating buffer and currently holding only two exclusive buffers. As far as I remember, currently that's not the case, right? If we have 2 exclusive buffers assigned, and then we assign one floating buffer, the floating buffer is only released once we process all three of those buffers, right? Maybe that's the root problem that we should fix? I vaguely remember this same behaviour causing some other problems. > Increase backlog only if it is available for consumption > > > Key: FLINK-11082 > URL: https://issues.apache.org/jira/browse/FLINK-11082 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.8.0 >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > > The backlog should indicate how many buffers are available in the subpartition > for downstream consumption. The availability is determined by two > factors: one is the {{BufferConsumer}} being finished, and the other is a flush being triggered. > In the current implementation, when a {{BufferConsumer}} is added to the > subpartition, the backlog is increased as a result, even though this > {{BufferConsumer}} is not yet available for network transport.
> Furthermore, the backlog would affect requesting floating buffers on the > downstream side. That means some floating buffers are fetched in advance but > not used for a long time, so the floating buffers are not used > efficiently. > We found this scenario to be extreme for the rebalance selector on the upstream side, so > we want to change when the backlog is increased: on finishing a {{BufferConsumer}} or on a > flush being triggered. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
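[Editor's note] A toy model of the backlog rule proposed in FLINK-11082, with illustrative names only (the real classes live in Flink's network stack): the backlog counts only buffers that are actually consumable, i.e. finished ones plus, after a flush, the unfinished tail buffer, instead of simply counting every enqueued buffer.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model: backlog = finished buffers, plus the unfinished tail if a flush was requested. */
public class BacklogSketch {
    static class BufferConsumer { boolean finished; }

    private final List<BufferConsumer> buffers = new ArrayList<>();
    private boolean flushRequested;

    public void add(BufferConsumer b) { buffers.add(b); }
    public void finish(BufferConsumer b) { b.finished = true; }
    public void flush() { flushRequested = true; }

    /** Proposed rule, instead of the current buffers.size(). */
    public int backlog() {
        int finished = 0;
        for (BufferConsumer b : buffers) {
            if (b.finished) finished++;
        }
        boolean unfinishedTail = !buffers.isEmpty() && !buffers.get(buffers.size() - 1).finished;
        return finished + ((flushRequested && unfinishedTail) ? 1 : 0);
    }
}
```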
[GitHub] DreamsZM opened a new pull request #7399: Update IterateExample.java
DreamsZM opened a new pull request #7399: Update IterateExample.java URL: https://github.com/apache/flink/pull/7399 ## What is the purpose of the change Fix typos ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with @Public(Evolving): (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation Does this pull request introduce a new feature? (no) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] KarmaGYZ opened a new pull request #7400: [hotfix][docs] Update the conditions when Make state checkpointing As…
KarmaGYZ opened a new pull request #7400: [hotfix][docs] Update the conditions when Make state checkpointing As… URL: https://github.com/apache/flink/pull/7400 …ynchronous ## What is the purpose of the change According to [FLINK-6048](https://issues.apache.org/jira/browse/FLINK-6048), it seems that operator state can be snapshotted asynchronously. This PR updates the conditions for when we want to make state checkpointing asynchronous. ## Brief change log - This PR updates the conditions for when we want to make state checkpointing asynchronous. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts (all no): - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? no cc @StefanRRichter This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] DreamsZM closed pull request #7399: Update IterateExample.java
DreamsZM closed pull request #7399: Update IterateExample.java URL: https://github.com/apache/flink/pull/7399 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java index d123615c7fc..13db58ae187 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java @@ -115,7 +115,7 @@ public static void main(String[] args) throws Exception { // * /** -* Generate BOUND number of random integer pairs from the range from 0 to BOUND/2. +* Generate BOUND number of random integer pairs from the range from 1 to BOUND/2. */ private static class RandomFibonacciSource implements SourceFunction> { private static final long serialVersionUID = 1L; This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-11252) Download page contains irrelevant "Scala 2.11" column
Chesnay Schepler created FLINK-11252: Summary: Download page contains irrelevant "Scala 2.11" column Key: FLINK-11252 URL: https://issues.apache.org/jira/browse/FLINK-11252 Project: Flink Issue Type: Bug Components: Project Website Reporter: Chesnay Schepler Assignee: Chesnay Schepler Attachments: downloads.png The download page has a "Scala 2.11" column, that was used in the past to provide distinct download links for different scala versions. We currently however list releases separately for each scala version. We should either remove the column title or refactor the download page to also have a "Scala 2.12" column. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11230) Sum of FlinkSql after two table union all.The value is too large.
[ https://issues.apache.org/jira/browse/FLINK-11230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16731981#comment-16731981 ] Hequn Cheng commented on FLINK-11230: - [~jiweiautohome] It is weird that the global window outputs multiple rows per minute. For example, there should be only one output for 201901021305 according to your sql. Have you grouped by other fields along with the window? > Sum of FlinkSql after two table union all.The value is too large. > - > > Key: FLINK-11230 > URL: https://issues.apache.org/jira/browse/FLINK-11230 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.7.0 >Reporter: jiwei >Priority: Blocker > Labels: test > Attachments: image-2019-01-02-14-18-33-890.png, > image-2019-01-02-14-18-43-710.png, screenshot-1.png > > > SELECT k AS KEY, SUM(p) AS pv > FROM ( > SELECT tumble_start(stime, INTERVAL '1' minute) AS k > , COUNT(*) AS p > FROM flink_test1 > GROUP BY tumble(stime, INTERVAL '1' minute) > UNION ALL > SELECT tumble_start(stime, INTERVAL '1' minute) AS k > , COUNT(*) AS p > FROM flink_test2 > GROUP BY tumble(stime, INTERVAL '1' minute) > ) t > GROUP BY k > The result of executing this SQL is about 7000 per minute and keeps > increasing. But the result is 60 per minute per table. Is there an error in > my SQL statement? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11235) Elasticsearch connector leaks threads if no connection could be established
[ https://issues.apache.org/jira/browse/FLINK-11235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-11235: - Affects Version/s: 1.8.0 1.6.3 > Elasticsearch connector leaks threads if no connection could be established > --- > > Key: FLINK-11235 > URL: https://issues.apache.org/jira/browse/FLINK-11235 > Project: Flink > Issue Type: Bug > Components: ElasticSearch Connector >Affects Versions: 1.6.3, 1.7.1, 1.8.0 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Major > Labels: pull-request-available > Fix For: 1.7.2 > > Attachments: image-2018-12-31-22-31-19-081.png, > image-2018-12-31-22-31-53-635.png > > Time Spent: 20m > Remaining Estimate: 0h > > h2. Elasticsearch transport sink init steps > 1. create the client thread > 2. try to check every host:port > 3. if every host:port is unreachable, a RuntimeException is thrown > But because the RuntimeException is thrown, the client is never closed, causing a thread leak. > h2. Transport client code
```
TransportClient transportClient = new PreBuiltTransportClient(settings);
for (TransportAddress transport : ElasticsearchUtils.convertInetSocketAddresses(transportAddresses)) {
	transportClient.addTransportAddress(transport);
}
// verify that we actually are connected to a cluster
if (transportClient.connectedNodes().isEmpty()) {
	throw new RuntimeException("Elasticsearch client is not connected to any Elasticsearch nodes!");
}
return transportClient;
```
> h2. Thread leak > !image-2018-12-31-22-31-19-081.png! > h2. Thread dump > !image-2018-12-31-22-31-53-635.png! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
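[Editor's note] The leak pattern described above (and the fix pattern in the PR closed further below) can be sketched without Elasticsearch: if validation fails, close the client before rethrowing. A generic `AutoCloseable` and the hypothetical `validateOrClose` helper stand in for the real `TransportClient` and connector code.

```java
/** Sketch of the close-before-throw fix pattern; names are illustrative only. */
public class CloseBeforeThrowSketch {
    public static <C extends AutoCloseable> C validateOrClose(C client, boolean connected) {
        if (!connected) {
            try {
                client.close(); // without this, the client's internal threads leak
            } catch (Exception ignored) {
                // closeQuietly semantics: suppress secondary failures during cleanup
            }
            throw new RuntimeException("client is not connected to any nodes!");
        }
        return client;
    }
}
```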
[jira] [Updated] (FLINK-11235) Elasticsearch connector leaks threads if no connection could be established
[ https://issues.apache.org/jira/browse/FLINK-11235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-11235: - Summary: Elasticsearch connector leaks threads if no connection could be established (was: fix thread lack when elasticsearch transport client init failed) > Elasticsearch connector leaks threads if no connection could be established > --- > > Key: FLINK-11235 > URL: https://issues.apache.org/jira/browse/FLINK-11235 > Project: Flink > Issue Type: Bug > Components: ElasticSearch Connector >Affects Versions: 1.7.1 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Major > Labels: pull-request-available > Fix For: 1.7.2 > > Attachments: image-2018-12-31-22-31-19-081.png, > image-2018-12-31-22-31-53-635.png > > Time Spent: 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-11235) Elasticsearch connector leaks threads if no connection could be established
[ https://issues.apache.org/jira/browse/FLINK-11235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-11235. Resolution: Fixed master: 4febcdc289bc0dce1090d831c7f99848a26f6465 1.7: 5ac632cf63098bb5f1c3692fec59a89a48534ea5 1.6: 1702c9902c9e03e98473035bf82f4b07fffa3bb6 > Elasticsearch connector leaks threads if no connection could be established > --- > > Key: FLINK-11235 > URL: https://issues.apache.org/jira/browse/FLINK-11235 > Project: Flink > Issue Type: Bug > Components: ElasticSearch Connector >Affects Versions: 1.6.3, 1.7.1, 1.8.0 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Major > Labels: pull-request-available > Fix For: 1.6.4, 1.7.2, 1.8.0 > > Attachments: image-2018-12-31-22-31-19-081.png, > image-2018-12-31-22-31-53-635.png > > Time Spent: 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11235) Elasticsearch connector leaks threads if no connection could be established
[ https://issues.apache.org/jira/browse/FLINK-11235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-11235: - Fix Version/s: 1.8.0 1.6.4
> Elasticsearch connector leaks threads if no connection could be established
> ---
>
> Key: FLINK-11235
> URL: https://issues.apache.org/jira/browse/FLINK-11235
> Project: Flink
> Issue Type: Bug
> Components: ElasticSearch Connector
> Affects Versions: 1.6.3, 1.7.1, 1.8.0
> Reporter: lamber-ken
> Assignee: lamber-ken
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.6.4, 1.7.2, 1.8.0
--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] zentol closed pull request #7387: [FLINK-11235] fix thread lack when elasticsearch transport client init failed
zentol closed pull request #7387: [FLINK-11235] fix thread lack when elasticsearch transport client init failed URL: https://github.com/apache/flink/pull/7387

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

```
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
index 73a69ebde34..34f2ad3a49e 100644
--- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
@@ -21,6 +21,7 @@
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
 import org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchUtils;
+import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;

 import org.elasticsearch.action.bulk.BackoffPolicy;
@@ -72,6 +73,10 @@ public TransportClient createClient(Map clientConfig) {
 		// verify that we actually are connected to a cluster
 		if (transportClient.connectedNodes().isEmpty()) {
+
+			// close the transportClient here
+			IOUtils.closeQuietly(transportClient);
+
 			throw new RuntimeException("Elasticsearch client is not connected to any Elasticsearch nodes!");
 		}
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
index a3453ec4445..d59b8e9f912 100644
--- a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
@@ -21,6 +21,7 @@
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
 import org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchUtils;
+import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;

 import org.elasticsearch.action.bulk.BackoffPolicy;
@@ -78,6 +79,10 @@ public TransportClient createClient(Map clientConfig) {
 		// verify that we actually are connected to a cluster
 		if (transportClient.connectedNodes().isEmpty()) {
+
+			// close the transportClient here
+			IOUtils.closeQuietly(transportClient);
+
 			throw new RuntimeException("Elasticsearch client is not connected to any Elasticsearch nodes!");
 		}
```

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Closed] (FLINK-10780) Update Java / Scala StatefulJobWBroadcastStateMigrationITCase for 1.7
[ https://issues.apache.org/jira/browse/FLINK-10780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-10780. Resolution: Fixed master: 7bb63f68e29ac5cb5fbed8d33949e3302faf869c > Update Java / Scala StatefulJobWBroadcastStateMigrationITCase for 1.7 > - > > Key: FLINK-10780 > URL: https://issues.apache.org/jira/browse/FLINK-10780 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.8.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Hequn Cheng >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Update Java / Scala {{StatefulJobWBroadcastStateMigrationITCase}} so that it > covers restoring from Flink 1.7. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] zentol closed pull request #7241: [FLINK-10780][tests] Update Java / Scala StatefulJobWBroadcastStateMigrationITCase for 1.7
zentol closed pull request #7241: [FLINK-10780][tests] Update Java / Scala StatefulJobWBroadcastStateMigrationITCase for 1.7 URL: https://github.com/apache/flink/pull/7241

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

```
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
index 3f49d84d595..da63b9b3096 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
@@ -65,7 +65,9 @@
 	Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
 	Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
 	Tuple2.of(MigrationVersion.v1_6, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-	Tuple2.of(MigrationVersion.v1_6, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+	Tuple2.of(MigrationVersion.v1_6, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
+	Tuple2.of(MigrationVersion.v1_7, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+	Tuple2.of(MigrationVersion.v1_7, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
 }

 private final MigrationVersion testMigrateVersion;
```

New binary savepoint resources (shown as "Binary files differ" in the diff):
- flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.7-rocksdb-savepoint/_metadata
- flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.7-savepoint/_metadata
- flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.7-jobmanager-savepoint/_metadata
- flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.7-rocksdb-savepoint/_metadata

```
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala
index 4f236e7c8ca..4d9af6fca3f 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala
@@ -56,13 +56,15 @@ object StatefulJobWBroadcastStateMigrationITCase {
 	(MigrationVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
 	(MigrationVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
 	(MigrationVersion.v1_6, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-	(MigrationVersion.v1_6, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME))
+	(MigrationVersion.v1_6, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
+	(MigrationVersion.v1_7, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+	(MigrationVersion.v1_7, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME))
 }

 // TODO to generate savepoints for a specific Flink version / backend type,
 // TODO change these values accordingly, e.g. to generate for 1.3 with RocksDB,
 // TODO set as (Mi
```
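The migration diffs above grow a hand-maintained (version, backend) tuple list by two entries per release. As a sketch of an alternative, a cross-product helper makes it impossible to add a version and forget one of the backends; the enum values and backend names below are illustrative assumptions, not Flink's actual `MigrationVersion`/`StateBackendLoader` constants:

```java
import java.util.ArrayList;
import java.util.List;

public class MigrationMatrix {

    // Illustrative stand-in for Flink's MigrationVersion enum.
    enum Version { v1_5, v1_6, v1_7 }

    // Illustrative stand-ins for the state-backend name constants.
    static final String MEMORY_BACKEND = "jobmanager";
    static final String ROCKSDB_BACKEND = "rocksdb";

    // Builds the full version x backend cross product, so adding a value
    // to Version automatically covers both state backends in the test matrix.
    static List<String[]> parameters() {
        List<String[]> params = new ArrayList<>();
        for (Version v : Version.values()) {
            params.add(new String[] {v.name(), MEMORY_BACKEND});
            params.add(new String[] {v.name(), ROCKSDB_BACKEND});
        }
        return params;
    }
}
```

Such a method could feed a JUnit `@Parameters` data source directly, replacing the explicit tuple list that each release currently extends by hand.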
[jira] [Closed] (FLINK-10781) Update BucketingSinkMigrationTest for Flink 1.7
[ https://issues.apache.org/jira/browse/FLINK-10781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-10781. Resolution: Fixed master: 50f41965800f90fed5a6399b6172161c75b8c764 > Update BucketingSinkMigrationTest for Flink 1.7 > --- > > Key: FLINK-10781 > URL: https://issues.apache.org/jira/browse/FLINK-10781 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.8.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Hequn Cheng >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Update {{BucketingSinkMigrationTest}} so that it covers restoring from 1.7. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-10779) Update Java / Scala StatefulJobSavepointMigrationITCase for 1.7
[ https://issues.apache.org/jira/browse/FLINK-10779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-10779. Resolution: Fixed master: a4c09d337a0699f650740cc483e6ed30179ac7df > Update Java / Scala StatefulJobSavepointMigrationITCase for 1.7 > --- > > Key: FLINK-10779 > URL: https://issues.apache.org/jira/browse/FLINK-10779 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.8.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Hequn Cheng >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Update Java / Scala {{StatefulJobSavepointMigrationITCase}} so that it covers > recovering from Flink 1.7. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] zentol closed pull request #7240: [FLINK-10779][tests] Update Java / Scala StatefulJobSavepointMigrationITCase for 1.7
zentol closed pull request #7240: [FLINK-10779][tests] Update Java / Scala StatefulJobSavepointMigrationITCase for 1.7 URL: https://github.com/apache/flink/pull/7240

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

```
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java
index fd107058b8a..4f3d274a381 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java
@@ -82,7 +82,9 @@
 	Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
 	Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
 	Tuple2.of(MigrationVersion.v1_6, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-	Tuple2.of(MigrationVersion.v1_6, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+	Tuple2.of(MigrationVersion.v1_6, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
+	Tuple2.of(MigrationVersion.v1_7, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+	Tuple2.of(MigrationVersion.v1_7, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
 }

 private final MigrationVersion testMigrateVersion;
```

New binary savepoint resources (shown as "Binary files differ" in the diff):
- flink-tests/src/test/resources/new-stateful-udf-migration-itcase-flink1.7-rocksdb-savepoint/_metadata
- flink-tests/src/test/resources/new-stateful-udf-migration-itcase-flink1.7-savepoint/_metadata
- flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.7-jobmanager-savepoint/_metadata
- flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.7-rocksdb-savepoint/_metadata

```
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
index db9d5b40981..42db4e47264 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
@@ -58,7 +58,9 @@ object StatefulJobSavepointMigrationITCase {
 	(MigrationVersion.v1_4, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
 	(MigrationVersion.v1_4, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
 	(MigrationVersion.v1_6, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-	(MigrationVersion.v1_6, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME))
+	(MigrationVersion.v1_6, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
+	(MigrationVersion.v1_7, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+	(MigrationVersion.v1_7, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME))
 }

 // TODO to generate savepoints for a specific Flink version / backend type,
```

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zentol closed pull request #7242: [FLINK-10781][fs] Update BucketingSinkMigrationTest for Flink 1.7
zentol closed pull request #7242: [FLINK-10781][fs] Update BucketingSinkMigrationTest for Flink 1.7 URL: https://github.com/apache/flink/pull/7242

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

```
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java
index 981a529ddd2..fb0796aec6c 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java
@@ -90,7 +90,8 @@ public static void verifyOS() {
 	Tuple2.of(MigrationVersion.v1_3, "/var/folders/tv/b_1d8fvx23dgk1_xs8db_95hgn/T/junit4273542175898623023/junit3801102997056424640/1970-01-01--01/part-0-"),
 	Tuple2.of(MigrationVersion.v1_4, "/var/folders/tv/b_1d8fvx23dgk1_xs8db_95hgn/T/junit3198043255809479705/junit8947526563966405708/1970-01-01--01/part-0-"),
 	Tuple2.of(MigrationVersion.v1_5, "/tmp/junit4927100426019463155/junit2465610012100182280/1970-01-01--00/part-0-"),
-	Tuple2.of(MigrationVersion.v1_6, "/tmp/junit3459711376354834545/junit5114611885650086135/1970-01-01--00/part-0-"));
+	Tuple2.of(MigrationVersion.v1_6, "/tmp/junit3459711376354834545/junit5114611885650086135/1970-01-01--00/part-0-"),
+	Tuple2.of(MigrationVersion.v1_7, "/var/folders/r2/tdhx810x7yxb7q9_brnp49x4gp/T/junit4288325607215628863/junit8132783417241536320/1970-01-01--08/part-0-"));
 }

 private final MigrationVersion testMigrateVersion;
```

A new binary resource flink-connectors/flink-connector-filesystem/src/test/resources/bucketing-sink-migration-test-flink1.7-snapshot is also added (shown as "Binary files differ" in the diff).

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] lamber-ken commented on issue #7396: [FLINK-11250] fix thread lack when StreamTask switched from DEPLOYING to CANCELING
lamber-ken commented on issue #7396: [FLINK-11250] fix thread lack when StreamTask switched from DEPLOYING to CANCELING URL: https://github.com/apache/flink/pull/7396#issuecomment-450834240 @zentol, thanks for the review. Sorry, I may have made a mistake; I'll check. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zentol commented on a change in pull request #7398: [FLINK-11251] [metrics] GenericValueMetricGroup should always ignore its group name in logical scope
zentol commented on a change in pull request #7398: [FLINK-11251] [metrics] GenericValueMetricGroup should always ignore its group name in logical scope URL: https://github.com/apache/flink/pull/7398#discussion_r244706046 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java ## @@ -216,6 +219,28 @@ public void testNameCollisionAfterKeyValueGroup() { assertFalse("Value is present in logical scope.", logicalScope.contains(value)); } + /** +* Verifies that calling {@link AbstractMetricGroup#getLogicalScope(CharacterFilter, char, int)} on {@link GenericValueMetricGroup} +* should ignore value as well. +*/ + @Test + public void testLogicalScopeShouldIgnoreValueGroupName() { + Configuration config = new Configuration(); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName()); + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config)); Review comment: correct This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] tony810430 commented on a change in pull request #7398: [FLINK-11251] [metrics] GenericValueMetricGroup should always ignore its group name in logical scope
tony810430 commented on a change in pull request #7398: [FLINK-11251] [metrics] GenericValueMetricGroup should always ignore its group name in logical scope URL: https://github.com/apache/flink/pull/7398#discussion_r244705142 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java ## @@ -216,6 +219,28 @@ public void testNameCollisionAfterKeyValueGroup() { assertFalse("Value is present in logical scope.", logicalScope.contains(value)); } + /** +* Verifies that calling {@link AbstractMetricGroup#getLogicalScope(CharacterFilter, char, int)} on {@link GenericValueMetricGroup} +* should ignore value as well. +*/ + @Test + public void testLogicalScopeShouldIgnoreValueGroupName() { + Configuration config = new Configuration(); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName()); + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config)); Review comment: Do you mean `registry` should be shutdown in `finally` clause? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] fatjoshua commented on a change in pull request #7078: [FLINK-10848][YARN] properly remove YARN ContainerRequest upon container allocation success
fatjoshua commented on a change in pull request #7078: [FLINK-10848][YARN] properly remove YARN ContainerRequest upon container allocation success URL: https://github.com/apache/flink/pull/7078#discussion_r244704879 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ## @@ -361,7 +361,8 @@ public void onContainersAllocated(List containers) { "Received new container: {} - Remaining pending container requests: {}", container.getId(), numPendingContainerRequests); - + resourceManagerClient.removeContainerRequest(new AMRMClient.ContainerRequest( Review comment: Maybe we can move this removeContainerRequest call before the for loop, and use an independent for loop to do the removal. ```suggestion resourceManagerClient.removeContainerRequest(new AMRMClient.ContainerRequest( ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
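The reviewer's suggestion above — retire pending container requests in their own loop before processing the allocated containers — can be sketched as follows. The types are simplified stand-ins for YARN's `AMRMClient.ContainerRequest` and the resource manager's book-keeping; this is not the actual `YarnResourceManager` code:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

public class ContainerAllocationSketch {

    // Simplified stand-in for the pending ContainerRequest book-keeping.
    final Deque<String> pendingRequests = new ArrayDeque<>();
    int startedContainers = 0;

    void onContainersAllocated(List<String> containers) {
        // First loop: remove one pending request per allocated container,
        // so the client does not keep re-requesting already-satisfied slots.
        for (int i = 0; i < containers.size() && !pendingRequests.isEmpty(); i++) {
            pendingRequests.pop();
        }
        // Second loop: process the containers themselves.
        for (String container : containers) {
            startedContainers++; // placeholder for launching a task executor
        }
    }
}
```

Separating the two loops keeps the request book-keeping correct even if starting an individual container fails partway through the second loop.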
[GitHub] wuxizao removed a comment on issue #7345: [FLINK-11163][tests] Use random port in RestClusterClientTest
wuxizao removed a comment on issue #7345: [FLINK-11163][tests] Use random port in RestClusterClientTest URL: https://github.com/apache/flink/pull/7345#issuecomment-449330409 Hi, in Flink on YARN mode, how can I specify the JobManager HTTP port range (rest.port)? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zentol commented on issue #7378: [FLINK-11232][Webfrontend] Fix empty Start Time of sub-task on web da…
zentol commented on issue #7378: [FLINK-11232][Webfrontend] Fix empty Start Time of sub-task on web da… URL: https://github.com/apache/flink/pull/7378#issuecomment-450832313 We will have to duplicate the field under the new name instead, in case anyone wrote code against the bugged version. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
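zentol's remark describes a common REST back-compat pattern: keep the bugged field so existing clients keep working, and add the corrected field alongside it. A minimal sketch with hypothetical field names (the thread does not show the actual REST response field names):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class BackCompatField {

    // Hypothetical response builder: the old (bugged) field name is kept so
    // clients written against it don't break, and the corrected name is
    // added alongside with the same value.
    static Map<String, Object> subtaskJson(long startTime) {
        Map<String, Object> json = new LinkedHashMap<>();
        json.put("start_time", startTime); // old name, kept for compatibility
        json.put("start-time", startTime); // new, corrected name
        return json;
    }
}
```

The old field can then be deprecated in the API docs and dropped in a later major version, once clients have had time to migrate.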
[jira] [Updated] (FLINK-11251) Incompatible metric name on prometheus reporter
[ https://issues.apache.org/jira/browse/FLINK-11251?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-11251: - Affects Version/s: (was: 1.7.0) > Incompatible metric name on prometheus reporter > --- > > Key: FLINK-11251 > URL: https://issues.apache.org/jira/browse/FLINK-11251 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.5.6, 1.6.3 >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei >Priority: Major > > {code} > # HELP > flink_taskmanager_job_task_operator_KafkaConsumer_topic_partition_4_currentOffsets > currentOffsets (scope: > taskmanager_job_task_operator_KafkaConsumer_topic_partition_4) > # TYPE > flink_taskmanager_job_task_operator_KafkaConsumer_topic_partition_4_currentOffsets > gauge > flink_taskmanager_job_task_operator_KafkaConsumer_topic_partition_4_currentOffsets{task_attempt_id="5137e35cf7319787f6cd627621fd2ea7",host="localhost",task_attempt_num="0",tm_id="e72a527652f5af1358bdbc0f5bf6f49d",partition="4",topic="rt_lookback_state",job_id="546cf6f0d1f0b818afd9697c612f715c",task_id="d7b1ad914351f9ee527267f51160",operator_id="d7b1ad914351f9ee527267f51160",operator_name="Source:_kafka_lookback_state_source",task_name="Source:_kafka_lookback_state_source",job_name="FlinkRuleMatchPipeline",subtask_index="7",} > 1.456090927E9 > # HELP > flink_taskmanager_job_task_operator_KafkaConsumer_topic_partition_24_committedOffsets > committedOffsets (scope: > taskmanager_job_task_operator_KafkaConsumer_topic_partition_24) > # TYPE > flink_taskmanager_job_task_operator_KafkaConsumer_topic_partition_24_committedOffsets > gauge > 
flink_taskmanager_job_task_operator_KafkaConsumer_topic_partition_24_committedOffsets{task_attempt_id="9b666af68ec4734b25937b8b94cc5c84",host="localhost",task_attempt_num="0",tm_id="e72a527652f5af1358bdbc0f5bf6f49d",partition="24",topic="rt_event",job_id="546cf6f0d1f0b818afd9697c612f715c",task_id="61252f73469d3ffba207c548d29a0267",operator_id="61252f73469d3ffba207c548d29a0267",operator_name="Source:_kafka_source",task_name="Source:_kafka_sourcesamplingparse_and_filter",job_name="FlinkRuleMatchPipeline",subtask_index="27",}
> 3.001186523E9
> {code}
> This is a snippet from my Prometheus reporter output. It shows that the Kafka current-offsets and committed-offsets metric names changed after I migrated my Flink job from 1.6.0 to 1.6.3.
> The metric name should not contain the partition index, i.e. it should be
> {{flink_taskmanager_job_task_operator_KafkaConsumer_topic_partition_currentOffsets}} and
> {{flink_taskmanager_job_task_operator_KafkaConsumer_topic_partition_committedOffsets}}.
> After digging into the source code, I found that the incompatibility started with this
> [PR|https://github.com/apache/flink/pull/7095], because it added a new
> {{getLogicalScope(CharacterFilter, char, int)}} overload without overriding it in the
> {{GenericValueMetricGroup}} class.
> When the tail metric group of a metric is a {{GenericValueMetricGroup}} and this new
> {{getLogicalScope}} is called, i.e. via {{FrontMetricGroup#getLogicalScope}}, the value
> group name is not ignored, although it was in previously released versions.
--
This message was sent by Atlassian JIRA (v7.6.3#76005)
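The root cause described in this issue is a general Java pitfall: adding a new overload to a base class silently bypasses subclasses that only override the old signature, because overload resolution happens at compile time against the declared signatures. A minimal, self-contained illustration (these classes are illustrative, not Flink's actual metric group classes):

```java
public class OverloadPitfall {

    static class BaseGroup {
        String logicalScope(char sep) {
            return "base" + sep;
        }

        // Newly added overload, analogous to the new
        // getLogicalScope(CharacterFilter, char, int) in the PR.
        String logicalScope(char sep, int depth) {
            return "base" + sep + depth;
        }
    }

    // Only overrides the old signature, mirroring how
    // GenericValueMetricGroup missed the new overload.
    static class ValueGroup extends BaseGroup {
        @Override
        String logicalScope(char sep) {
            return "value-ignored" + sep;
        }
    }
}
```

Calls through the old signature get the subclass behavior, while calls through the new overload silently fall back to the base implementation, which is exactly how the partition index leaked into the reported metric names.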
[GitHub] zentol commented on a change in pull request #7396: [FLINK-11250] fix thread lack when StreamTask switched from DEPLOYING to CANCELING
zentol commented on a change in pull request #7396: [FLINK-11250] fix thread lack when StreamTask switched from DEPLOYING to CANCELING URL: https://github.com/apache/flink/pull/7396#discussion_r244703662 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ## @@ -748,10 +753,10 @@ private StateBackend createStateBackend() throws Exception { final StateBackend fromApplication = configuration.getStateBackend(getUserCodeClassLoader()); return StateBackendLoader.fromApplicationOrConfigOrDefault( - fromApplication, - getEnvironment().getTaskManagerInfo().getConfiguration(), - getUserCodeClassLoader(), Review comment: revert This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zentol commented on a change in pull request #7396: [FLINK-11250] fix thread lack when StreamTask switched from DEPLOYING to CANCELING
zentol commented on a change in pull request #7396: [FLINK-11250] fix thread lack when StreamTask switched from DEPLOYING to CANCELING URL: https://github.com/apache/flink/pull/7396#discussion_r244703698 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ## @@ -1178,11 +1183,11 @@ public void tryHandleCheckpointException(CheckpointMetaData checkpointMetaData, } private static StreamRecordWriter>> createStreamRecordWriter( - StreamEdge edge, - int outputIndex, - Environment environment, - String taskName, - long bufferTimeout) { + StreamEdge edge, Review comment: revert This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zentol commented on a change in pull request #7396: [FLINK-11250] fix thread lack when StreamTask switched from DEPLOYING to CANCELING
zentol commented on a change in pull request #7396: [FLINK-11250] fix thread lack when StreamTask switched from DEPLOYING to CANCELING URL: https://github.com/apache/flink/pull/7396#discussion_r244703609 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ## @@ -632,9 +637,9 @@ private boolean performCheckpoint( // Step (2): Send the checkpoint barrier downstream operatorChain.broadcastCheckpointBarrier( - checkpointMetaData.getCheckpointId(), - checkpointMetaData.getTimestamp(), - checkpointOptions); + checkpointMetaData.getCheckpointId(), Review comment: revert
[GitHub] zentol commented on a change in pull request #7396: [FLINK-11250] fix thread lack when StreamTask switched from DEPLOYING to CANCELING
zentol commented on a change in pull request #7396: [FLINK-11250] fix thread lack when StreamTask switched from DEPLOYING to CANCELING URL: https://github.com/apache/flink/pull/7396#discussion_r244703535 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ## @@ -114,8 +114,8 @@ */ @Internal public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> - extends AbstractInvokable - implements AsyncExceptionHandler { Review comment: revert
[GitHub] zentol commented on a change in pull request #7396: [FLINK-11250] fix thread lack when StreamTask switched from DEPLOYING to CANCELING
zentol commented on a change in pull request #7396: [FLINK-11250] fix thread lack when StreamTask switched from DEPLOYING to CANCELING URL: https://github.com/apache/flink/pull/7396#discussion_r244703571 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ## @@ -558,8 +563,8 @@ public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, Checkpoi try { // No alignment if we inject a checkpoint CheckpointMetrics checkpointMetrics = new CheckpointMetrics() - .setBytesBufferedInAlignment(0L) - .setAlignmentDurationNanos(0L); + .setBytesBufferedInAlignment(0L) Review comment: revert
[GitHub] zentol commented on a change in pull request #7396: [FLINK-11250] fix thread lack when StreamTask switched from DEPLOYING to CANCELING
zentol commented on a change in pull request #7396: [FLINK-11250] fix thread lack when StreamTask switched from DEPLOYING to CANCELING URL: https://github.com/apache/flink/pull/7396#discussion_r244703693 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ## @@ -1158,8 +1163,8 @@ public void tryHandleCheckpointException(CheckpointMetaData checkpointMetaData, @VisibleForTesting public static <OUT> List<StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createStreamRecordWriters( - StreamConfig configuration, - Environment environment) { + StreamConfig configuration, Review comment: revert
[GitHub] zentol commented on a change in pull request #7396: [FLINK-11250] fix thread lack when StreamTask switched from DEPLOYING to CANCELING
zentol commented on a change in pull request #7396: [FLINK-11250] fix thread lack when StreamTask switched from DEPLOYING to CANCELING URL: https://github.com/apache/flink/pull/7396#discussion_r244703673 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ## @@ -1032,11 +1037,11 @@ public CloseableRegistry getCancelables() { private final Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress; public CheckpointingOperation( - StreamTask<?, ?> owner, - CheckpointMetaData checkpointMetaData, - CheckpointOptions checkpointOptions, - CheckpointStreamFactory checkpointStorageLocation, - CheckpointMetrics checkpointMetrics) { + StreamTask<?, ?> owner, + CheckpointMetaData checkpointMetaData, + CheckpointOptions checkpointOptions, + CheckpointStreamFactory checkpointStorageLocation, Review comment: revert
[GitHub] zentol commented on a change in pull request #7396: [FLINK-11250] fix thread lack when StreamTask switched from DEPLOYING to CANCELING
zentol commented on a change in pull request #7396: [FLINK-11250] fix thread lack when StreamTask switched from DEPLOYING to CANCELING URL: https://github.com/apache/flink/pull/7396#discussion_r244703650 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ## @@ -711,13 +716,13 @@ private void tryShutdownTimerService() { } private void checkpointState( - CheckpointMetaData checkpointMetaData, - CheckpointOptions checkpointOptions, - CheckpointMetrics checkpointMetrics) throws Exception { + CheckpointMetaData checkpointMetaData, + CheckpointOptions checkpointOptions, Review comment: revert
[GitHub] zentol commented on a change in pull request #7396: [FLINK-11250] fix thread lack when StreamTask switched from DEPLOYING to CANCELING
zentol commented on a change in pull request #7396: [FLINK-11250] fix thread lack when StreamTask switched from DEPLOYING to CANCELING URL: https://github.com/apache/flink/pull/7396#discussion_r244703600 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ## @@ -610,9 +615,9 @@ public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) throws } private boolean performCheckpoint( - CheckpointMetaData checkpointMetaData, - CheckpointOptions checkpointOptions, - CheckpointMetrics checkpointMetrics) throws Exception { Review comment: revert
[GitHub] zentol commented on a change in pull request #7396: [FLINK-11250] fix thread lack when StreamTask switched from DEPLOYING to CANCELING
zentol commented on a change in pull request #7396: [FLINK-11250] fix thread lack when StreamTask switched from DEPLOYING to CANCELING URL: https://github.com/apache/flink/pull/7396#discussion_r244703682 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ## @@ -,10 +1116,10 @@ private void checkpointStreamOperator(StreamOperator<?> op) throws Exception { if (null != op) { OperatorSnapshotFutures snapshotInProgress = op.snapshotState( - checkpointMetaData.getCheckpointId(), - checkpointMetaData.getTimestamp(), - checkpointOptions, - storageLocation); + checkpointMetaData.getCheckpointId(), + checkpointMetaData.getTimestamp(), Review comment: revert
[GitHub] zentol commented on a change in pull request #7396: [FLINK-11250] fix thread lack when StreamTask switched from DEPLOYING to CANCELING
zentol commented on a change in pull request #7396: [FLINK-11250] fix thread lack when StreamTask switched from DEPLOYING to CANCELING URL: https://github.com/apache/flink/pull/7396#discussion_r244703587 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ## @@ -578,16 +583,16 @@ public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, Checkpoi @Override public void triggerCheckpointOnBarrier( - CheckpointMetaData checkpointMetaData, - CheckpointOptions checkpointOptions, - CheckpointMetrics checkpointMetrics) throws Exception { + CheckpointMetaData checkpointMetaData, + CheckpointOptions checkpointOptions, Review comment: revert
[GitHub] zentol commented on a change in pull request #7398: [FLINK-11251] [metrics] GenericValueMetricGroup should always ignore its group name in logical scope
zentol commented on a change in pull request #7398: [FLINK-11251] [metrics] GenericValueMetricGroup should always ignore its group name in logical scope URL: https://github.com/apache/flink/pull/7398#discussion_r244702785 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java ## @@ -216,6 +219,28 @@ public void testNameCollisionAfterKeyValueGroup() { assertFalse("Value is present in logical scope.", logicalScope.contains(value)); } + /** +* Verifies that calling {@link AbstractMetricGroup#getLogicalScope(CharacterFilter, char, int)} on {@link GenericValueMetricGroup} +* should ignore value as well. +*/ + @Test + public void testLogicalScopeShouldIgnoreValueGroupName() { + Configuration config = new Configuration(); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName()); + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config)); Review comment: should be wrapped in a try-catch block
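The reviewer's point is that the registry created in the quoted test must be shut down even if a later assertion throws. The usual shape of that fix is a try/finally block; the sketch below illustrates the pattern with a hypothetical stand-in class (not Flink's real `MetricRegistryImpl`):

```java
// Hypothetical stand-in for a registry that must be shut down even
// when the body of a test throws (e.g. a failed assertion).
class Registry {
    private boolean open = true;
    boolean isOpen() { return open; }
    void shutdown() { open = false; }
}

public class TryFinallySketch {

    /** Runs a test body against a fresh registry, guaranteeing shutdown. */
    static Registry runWithRegistry(Runnable testBody) {
        Registry registry = new Registry();
        try {
            testBody.run();          // may throw; exception propagates
        } finally {
            registry.shutdown();     // always executed, pass or fail
        }
        return registry;
    }

    public static void main(String[] args) {
        Registry registry = runWithRegistry(() -> { /* assertions here */ });
        System.out.println("open after test: " + registry.isOpen());
    }
}
```

With this shape, a failing assertion still propagates to JUnit, but the registry never leaks across tests.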
[GitHub] zentol commented on a change in pull request #7398: [FLINK-11251] [metrics] GenericValueMetricGroup should always ignore its group name in logical scope
zentol commented on a change in pull request #7398: [FLINK-11251] [metrics] GenericValueMetricGroup should always ignore its group name in logical scope URL: https://github.com/apache/flink/pull/7398#discussion_r244703027 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java ## @@ -216,6 +219,28 @@ public void testNameCollisionAfterKeyValueGroup() { assertFalse("Value is present in logical scope.", logicalScope.contains(value)); } + /** +* Verifies that calling {@link AbstractMetricGroup#getLogicalScope(CharacterFilter, char, int)} on {@link GenericValueMetricGroup} +* should ignore value as well. +*/ + @Test + public void testLogicalScopeShouldIgnoreValueGroupName() { + Configuration config = new Configuration(); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName()); + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config)); + GenericMetricGroup root = new GenericMetricGroup(registry, new DummyAbstractMetricGroup(registry), "root"); + + String key = "key"; + String value = "value"; + + MetricGroup group = root.addGroup(key, value); + + String logicalScope = ((AbstractMetricGroup) group) + .getLogicalScope(new DummyCharacterFilter(), registry.getDelimiter(), 0); + assertTrue("Key is missing from logical scope.", logicalScope.contains(key)); Review comment: use hamcrest matchers instead: ``` assertThat(logicalScope, containsString(key)); assertThat(logicalScope, not(containsString(value))); ```
[GitHub] tony810430 opened a new pull request #7398: [FLINK-11251] [metrics] GenericValueMetricGroup should always ignore its group name in logical scope
tony810430 opened a new pull request #7398: [FLINK-11251] [metrics] GenericValueMetricGroup should always ignore its group name in logical scope URL: https://github.com/apache/flink/pull/7398 ## What is the purpose of the change This PR fixes an issue in the metric system where the value from a user-defined variable wasn't ignored in the logical scope in some situations. As a result, reporters that use the logical scope as the metric name, such as the Prometheus reporter, get an incompatible metric name. ## Brief change log - override createLogicalScope in GenericValueMetricGroup ## Verifying this change This change added tests and can be verified as follows: - run `MetricGroupTest#testLogicalScopeShouldIgnoreValueGroupName` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable
[jira] [Updated] (FLINK-11251) Incompatible metric name on prometheus reporter
[ https://issues.apache.org/jira/browse/FLINK-11251?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11251: --- Labels: pull-request-available (was: ) > Incompatible metric name on prometheus reporter > --- > > Key: FLINK-11251 > URL: https://issues.apache.org/jira/browse/FLINK-11251 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.5.6, 1.6.3, 1.7.0 >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei >Priority: Major > Labels: pull-request-available > > {code} > # HELP > flink_taskmanager_job_task_operator_KafkaConsumer_topic_partition_4_currentOffsets > currentOffsets (scope: > taskmanager_job_task_operator_KafkaConsumer_topic_partition_4) > # TYPE > flink_taskmanager_job_task_operator_KafkaConsumer_topic_partition_4_currentOffsets > gauge > flink_taskmanager_job_task_operator_KafkaConsumer_topic_partition_4_currentOffsets{task_attempt_id="5137e35cf7319787f6cd627621fd2ea7",host="localhost",task_attempt_num="0",tm_id="e72a527652f5af1358bdbc0f5bf6f49d",partition="4",topic="rt_lookback_state",job_id="546cf6f0d1f0b818afd9697c612f715c",task_id="d7b1ad914351f9ee527267f51160",operator_id="d7b1ad914351f9ee527267f51160",operator_name="Source:_kafka_lookback_state_source",task_name="Source:_kafka_lookback_state_source",job_name="FlinkRuleMatchPipeline",subtask_index="7",} > 1.456090927E9 > # HELP > flink_taskmanager_job_task_operator_KafkaConsumer_topic_partition_24_committedOffsets > committedOffsets (scope: > taskmanager_job_task_operator_KafkaConsumer_topic_partition_24) > # TYPE > flink_taskmanager_job_task_operator_KafkaConsumer_topic_partition_24_committedOffsets > gauge > 
flink_taskmanager_job_task_operator_KafkaConsumer_topic_partition_24_committedOffsets{task_attempt_id="9b666af68ec4734b25937b8b94cc5c84",host="localhost",task_attempt_num="0",tm_id="e72a527652f5af1358bdbc0f5bf6f49d",partition="24",topic="rt_event",job_id="546cf6f0d1f0b818afd9697c612f715c",task_id="61252f73469d3ffba207c548d29a0267",operator_id="61252f73469d3ffba207c548d29a0267",operator_name="Source:_kafka_source",task_name="Source:_kafka_sourcesamplingparse_and_filter",job_name="FlinkRuleMatchPipeline",subtask_index="27",} > 3.001186523E9 > {code} > This is a snippet from my Flink Prometheus reporter. It shows that the Kafka current-offsets and committed-offsets metric names changed after I migrated my Flink job from 1.6.0 to 1.6.3. The original metric names should not contain the {{partition index}}, i.e. the metric names should be {{flink_taskmanager_job_task_operator_KafkaConsumer_topic_partition_currentOffsets}} and {{flink_taskmanager_job_task_operator_KafkaConsumer_topic_partition_committedOffsets}}. After digging into the source code, I found that the incompatibility was introduced by this [PR|https://github.com/apache/flink/pull/7095], because it overloaded a new {{getLogicalScope(CharacterFilter, char, int)}} but did not override it in the {{GenericValueMetricGroup}} class. When the tail metric group of a metric is a {{GenericValueMetricGroup}} and this new {{getLogicalScope}} is called, i.e. via {{FrontMetricGroup#getLogicalScope}}, the value group name is not ignored, although it was in previously released versions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11251) Incompatible metric name on prometheus reporter
[ https://issues.apache.org/jira/browse/FLINK-11251?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-11251: - Affects Version/s: 1.7.0 > Incompatible metric name on prometheus reporter > --- > > Key: FLINK-11251 > URL: https://issues.apache.org/jira/browse/FLINK-11251 > Project: Flink > Issue Type: Bug > Components: Metrics > Affects Versions: 1.5.6, 1.6.3, 1.7.0 > Reporter: Wei-Che Wei > Assignee: Wei-Che Wei > Priority: Major
[GitHub] tony810430 closed pull request #7397: [FLINK-10857] [metrics] GenericValueMetricGroup should always ignore its group name in logical scope
tony810430 closed pull request #7397: [FLINK-10857] [metrics] GenericValueMetricGroup should always ignore its group name in logical scope URL: https://github.com/apache/flink/pull/7397 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java index 4400b14a10d..4b70986a00f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java @@ -176,7 +176,7 @@ String getLogicalScope(CharacterFilter filter, char delimiter, int reporterIndex } } - private String createLogicalScope(CharacterFilter filter, char delimiter) { + protected String createLogicalScope(CharacterFilter filter, char delimiter) { final String groupName = getGroupName(filter); return parent == null ?
groupName diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericValueMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericValueMetricGroup.java index ef8e6e8bee8..41e9a91d386 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericValueMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericValueMetricGroup.java @@ -51,7 +51,7 @@ protected void putVariables(Map variables) { } @Override - public String getLogicalScope(CharacterFilter filter, char delimiter) { + protected String createLogicalScope(CharacterFilter filter, char delimiter) { return parent.getLogicalScope(filter, delimiter); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java index 71ae7f18ecb..9daa72d3610 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java @@ -19,6 +19,8 @@ package org.apache.flink.runtime.metrics.groups; import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.CharacterFilter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Metric; @@ -31,6 +33,7 @@ import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; import org.apache.flink.runtime.metrics.scope.ScopeFormat; import org.apache.flink.runtime.metrics.util.DummyCharacterFilter; +import org.apache.flink.runtime.metrics.util.TestReporter; import org.apache.flink.util.AbstractID; import org.apache.flink.util.TestLogger; @@ -216,6 +219,28 @@ public void testNameCollisionAfterKeyValueGroup() { assertFalse("Value is present in logical scope.", 
logicalScope.contains(value)); } + /** +* Verifies that calling {@link AbstractMetricGroup#getLogicalScope(CharacterFilter, char, int)} on {@link GenericValueMetricGroup} +* should ignore value as well. +*/ + @Test + public void testLogicalScopeShouldIgnoreValueGroupName() { + Configuration config = new Configuration(); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName()); + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config)); + GenericMetricGroup root = new GenericMetricGroup(registry, new DummyAbstractMetricGroup(registry), "root"); + + String key = "key"; + String value = "value"; + + MetricGroup group = root.addGroup(key, value); + + String logicalScope = ((AbstractMetricGroup) group) + .getLogicalScope(new DummyCharacterFilter(), registry.getDelimiter(), 0); + assertTrue("Key is missing from logical scope.", logicalScope.contains(key)); + assertFalse("Value is present in logical scope.", logicalScope.contains(value)); + } + @Test public void closedGroupDoesNotRegisterMetrics() { GenericMetricGroup group = new GenericMetricGroup(
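The mechanics of the fix can be sketched with a stripped-down model of the metric group hierarchy (hypothetical classes, not Flink's real API): a key group contributes its name to the logical scope, while the value group overrides scope construction itself, so the user-supplied value is skipped on every code path rather than on just one overload.

```java
// Hypothetical, simplified model of the group hierarchy. A KeyGroup
// contributes its name to the logical scope; a ValueGroup delegates to
// its parent so the user-supplied value never appears in the scope.
abstract class Group {
    final Group parent;
    final String name;
    Group(Group parent, String name) { this.parent = parent; this.name = name; }

    // Analogue of createLogicalScope: parent scope + own name.
    String logicalScope(char delimiter) {
        return parent == null
            ? name
            : parent.logicalScope(delimiter) + delimiter + name;
    }
}

class KeyGroup extends Group {
    KeyGroup(Group parent, String name) { super(parent, name); }
}

class ValueGroup extends Group {
    ValueGroup(Group parent, String name) { super(parent, name); }

    // The essence of the fix: override scope *construction*, so the
    // value name is ignored no matter which entry point is used.
    @Override
    String logicalScope(char delimiter) {
        return parent.logicalScope(delimiter);
    }
}

public class LogicalScopeSketch {
    public static void main(String[] args) {
        Group root = new KeyGroup(null, "root");
        Group key = new KeyGroup(root, "partition");
        Group value = new ValueGroup(key, "4");
        // Prints "root.partition" -- the value "4" is not part of the scope.
        System.out.println(value.logicalScope('.'));
    }
}
```

The pre-fix bug corresponds to overriding only one of two scope methods: any caller reaching the non-overridden one would still see the value name in the scope.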
[jira] [Updated] (FLINK-11250) fix thread lack when StreamTask switched from DEPLOYING to CANCELING
[ https://issues.apache.org/jira/browse/FLINK-11250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11250: --- Labels: pull-request-available (was: ) > fix thread lack when StreamTask switched from DEPLOYING to CANCELING > > > Key: FLINK-11250 > URL: https://issues.apache.org/jira/browse/FLINK-11250 > Project: Flink > Issue Type: Bug > Components: Local Runtime, Streaming > Affects Versions: 1.5.6, 1.6.3, 1.7.1 > Reporter: lamber-ken > Assignee: lamber-ken > Priority: Major > Labels: pull-request-available > Fix For: 1.7.2 > > > Since flink-1.5.x, streamRecordWriters have been created in StreamTask's constructor, which starts the OutputFlusher daemon thread. So when a task is switched from DEPLOYING to CANCELING, the daemon thread is leaked. > > *reproducible example* > {code:java} > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.enableCheckpointing(5000); > env > .addSource(new SourceFunction<String>() { > @Override > public void run(SourceContext<String> ctx) throws Exception { > for (int i = 0; i < 1; i++) { > Thread.sleep(100); > ctx.collect("data " + i); > } > } > @Override > public void cancel() { > } > }) > .addSink(new RichSinkFunction<String>() { > @Override > public void open(Configuration parameters) throws Exception { > System.out.println(1 / 0); > } > @Override > public void invoke(String value, Context context) throws > Exception { > } > }).setParallelism(2); > env.execute(); > }{code} > *some useful log* > {code:java} > 2019-01-02 03:03:47.525 [thread==> jobmanager-future-thread-2] > executiongraph.Execution#transitionState:1316 Source: Custom Source (1/1) > (74a4ed4bb2f80aa2b98e11bd09ea64ef) switched from CREATED to SCHEDULED.
> 2019-01-02 03:03:47.526 [thread==> flink-akka.actor.default-dispatcher-5] > slotpool.SlotPool#allocateSlot:326 Received slot request > [SlotRequestId{12bfcf1674f5b96567a076086dbbfd1b}] for task: Attempt #1 > (Source: Custom Source (1/1)) @ (unassigned) - [SCHEDULED] > 2019-01-02 03:03:47.527 [thread==> flink-akka.actor.default-dispatcher-5] > slotpool.SlotSharingManager#createRootSlot:151 Create multi task slot > [SlotRequestId{494e47eb8318e2c0a1db91dda6b8}] in slot > [SlotRequestId{6d7f0173c1d48e5559f6a14b080ee817}]. > 2019-01-02 03:03:47.527 [thread==> flink-akka.actor.default-dispatcher-5] > slotpool.SlotSharingManager$MultiTaskSlot#allocateSingleTaskSlot:426 Create > single task slot [SlotRequestId{12bfcf1674f5b96567a076086dbbfd1b}] in multi > task slot [SlotRequestId{494e47eb8318e2c0a1db91dda6b8}] for group > bc764cd8ddf7a0cff126f51c16239658. > 2019-01-02 03:03:47.528 [thread==> flink-akka.actor.default-dispatcher-2] > slotpool.SlotSharingManager$MultiTaskSlot#allocateSingleTaskSlot:426 Create > single task slot [SlotRequestId{8a877431375df8aeadb2fd845cae15fc}] in multi > task slot [SlotRequestId{494e47eb8318e2c0a1db91dda6b8}] for group > 0a448493b4782967b150582570326227. > 2019-01-02 03:03:47.528 [thread==> flink-akka.actor.default-dispatcher-2] > slotpool.SlotSharingManager#createRootSlot:151 Create multi task slot > [SlotRequestId{56a36d3902ee1a7d0e2e84f50039c1ca}] in slot > [SlotRequestId{dbf5c9fa39f1e5a0b34a4a8c10699ee5}]. > 2019-01-02 03:03:47.528 [thread==> flink-akka.actor.default-dispatcher-2] > slotpool.SlotSharingManager$MultiTaskSlot#allocateSingleTaskSlot:426 Create > single task slot [SlotRequestId{5929c12b52dccee682f86afbe1cff5cf}] in multi > task slot [SlotRequestId{56a36d3902ee1a7d0e2e84f50039c1ca}] for group > 0a448493b4782967b150582570326227. 
> 2019-01-02 03:03:47.529 [thread==> flink-akka.actor.default-dispatcher-5] > executiongraph.Execution#transitionState:1316 Source: Custom Source (1/1) > (74a4ed4bb2f80aa2b98e11bd09ea64ef) switched from SCHEDULED to DEPLOYING. > 2019-01-02 03:03:47.529 [thread==> flink-akka.actor.default-dispatcher-5] > executiongraph.Execution#deploy:576 Deploying Source: Custom Source (1/1) > (attempt #1) to localhost > 2019-01-02 03:03:47.530 [thread==> flink-akka.actor.default-dispatcher-2] > state.TaskExecutorLocalStateStoresManager#localStateStoreForSubtask:162 > Registered new local state store with configuration > LocalRecoveryConfig{localRecoveryMode=false, > localStateDirectories=LocalRecoveryDirectoryProvider{rootDirectories
[GitHub] tony810430 opened a new pull request #7397: [FLINK-10857] [metrics] GenericValueMetricGroup should always ignore its group name in logical scope
tony810430 opened a new pull request #7397: [FLINK-10857] [metrics] GenericValueMetricGroup should always ignore its group name in logical scope URL: https://github.com/apache/flink/pull/7397 ## What is the purpose of the change This PR fixes an issue in the metric system where the value from a user-defined variable wasn't ignored in the logical scope in some situations. As a result, reporters that use the logical scope as the metric name, such as the Prometheus reporter, get an incompatible metric name. ## Brief change log - override createLogicalScope in GenericValueMetricGroup ## Verifying this change This change added tests and can be verified as follows: - run `MetricGroupTest#testLogicalScopeShouldIgnoreValueGroupName` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable
[GitHub] lamber-ken opened a new pull request #7396: [FLINK-11250] fix thread lack when StreamTask switched from DEPLOYING to CANCELING
lamber-ken opened a new pull request #7396: [FLINK-11250] fix thread lack when StreamTask switched from DEPLOYING to CANCELING URL: https://github.com/apache/flink/pull/7396 ## What is the purpose of the change Since flink-1.5.x, streamRecordWriters have been created in StreamTask's constructor, which starts the OutputFlusher daemon thread. So when a task is switched from DEPLOYING to CANCELING, the daemon thread is leaked. ## Brief change log Lazily initialize streamRecordWriters: they are now created in the invoke method instead of the constructor. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not documented)
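The bug and the fix described in this PR can be illustrated with a self-contained sketch (hypothetical classes, not Flink's actual StreamTask or StreamRecordWriter): building the writer in the constructor starts its flusher thread before the task ever runs, so cancelling a task that never reaches invoke() leaves that thread behind; creating the writer lazily inside invoke() avoids starting it at all.

```java
// Minimal sketch of the thread-leak pattern. These classes only model
// the lifecycle; they are not Flink's real API.
public class RecordWriterLeakSketch {

    /** Writer that eagerly starts a daemon "output flusher" thread. */
    static class Writer {
        final Thread flusher;
        Writer() {
            flusher = new Thread(() -> {
                try {
                    while (true) Thread.sleep(100); // periodic flush loop
                } catch (InterruptedException e) {
                    // interrupted -> exit the flush loop
                }
            }, "OutputFlusher");
            flusher.setDaemon(true);
            flusher.start(); // thread is running before the task does
        }
        void close() { flusher.interrupt(); }
    }

    /** Mimics the pre-fix StreamTask: writer built in the constructor. */
    static class EagerTask {
        final Writer writer = new Writer(); // leaked if cancelled before invoke()
        void invoke() { /* normal processing would eventually close the writer */ }
    }

    /** Mimics the fix: the writer is only built inside invoke(). */
    static class LazyTask {
        Writer writer; // null until invoke() runs
        void invoke() { writer = new Writer(); }
        void cancel() { if (writer != null) writer.close(); }
    }

    public static void main(String[] args) {
        // Cancel an eager task before invoke(): its flusher thread is
        // already alive and nothing will ever stop it -> leaked.
        EagerTask eager = new EagerTask();
        System.out.println("eager flusher alive: " + eager.writer.flusher.isAlive());
        eager.writer.close(); // clean up for the demo

        // Cancel a lazy task before invoke(): no thread was ever created.
        LazyTask lazy = new LazyTask();
        lazy.cancel();
        System.out.println("lazy writer created: " + (lazy.writer != null));
    }
}
```

With lazy initialization, the DEPLOYING-to-CANCELING transition simply never constructs the writer, so there is no flusher thread to leak.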