[jira] [Commented] (FLINK-7789) Add handler for Async IO operator timeouts
[ https://issues.apache.org/jira/browse/FLINK-7789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16496174#comment-16496174 ] ASF GitHub Bot commented on FLINK-7789: --- Github user kisimple commented on a diff in the pull request: https://github.com/apache/flink/pull/6091#discussion_r192001515 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java --- @@ -648,6 +662,52 @@ public void testAsyncTimeout() throws Exception { ExceptionUtils.findThrowable(mockEnvironment.getActualExternalFailureCause().get(), TimeoutException.class); } + @Test + public void testAsyncTimeoutAware() throws Exception { --- End diff -- Updated as suggested, please have a look :) > Add handler for Async IO operator timeouts > --- > > Key: FLINK-7789 > URL: https://issues.apache.org/jira/browse/FLINK-7789 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Karthik Deivasigamani >Assignee: blues zheng >Priority: Major > > Currently the Async IO operator does not provide a mechanism to handle timeouts. > When a request times out, an exception is thrown and the job is restarted. It > would be good to pass an AsyncIOTimeoutHandler which can be implemented by the > user and passed in the constructor. > Here is the discussion from the Apache Flink users mailing list > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/async-io-operator-timeouts-tt16068.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
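To make the proposal concrete, here is a minimal sketch of a timeout-aware function, assuming the approach taken in PR #6091 of adding an overridable `timeout` method to the async function; the class name and the lookup logic below are illustrative, not the PR's actual code:

{code:java}
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// Sketch: emit a fallback value on timeout instead of failing the job.
public class TimeoutAwareEnrichment extends RichAsyncFunction<String, String> {

	@Override
	public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
		CompletableFuture
			.supplyAsync(() -> lookup(input)) // hypothetical external call
			.thenAccept(v -> resultFuture.complete(Collections.singleton(v)));
	}

	// Overriding timeout() lets the user decide what happens instead of the
	// default behavior of failing the job with a TimeoutException.
	@Override
	public void timeout(String input, ResultFuture<String> resultFuture) {
		resultFuture.complete(Collections.singleton(input + ":timed-out"));
	}

	private String lookup(String key) {
		return key; // placeholder for a real asynchronous lookup
	}
}
{code}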
[jira] [Comment Edited] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16496136#comment-16496136 ] yanxiaobin edited comment on FLINK-8500 at 5/31/18 5:42 AM: [~tzulitai] thanks. That is great. was (Author: backlight): [~tzulitai] thanks, greate. > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Fix For: 1.6.0 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord). In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9476) Lost sideOutPut Late Elements in CEP Operator
[ https://issues.apache.org/jira/browse/FLINK-9476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16496092#comment-16496092 ] ASF GitHub Bot commented on FLINK-9476: --- Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/6104 @bowenli86 thanks for the review, I have fixed the error according to the comments and added the unit test in CEPITCase; please help review it again. cc @kl0u > Lost sideOutPut Late Elements in CEP Operator > - > > Key: FLINK-9476 > URL: https://issues.apache.org/jira/browse/FLINK-9476 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.4.2 >Reporter: aitozi >Assignee: aitozi >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16496068#comment-16496068 ] Tzu-Li (Gordon) Tai commented on FLINK-8500: [~Backlight] Yes, we're currently going for a minimal incremental fix for now, so that at least we have a way around the problem. In the long run, the ideal approach is discussed in the comments of this PR: [https://github.com/apache/flink/pull/5958]. Given that [~FredTing]'s latest PR is a "short-term solution", I think it is definitely ok to target that for a merge to the 1.5.x series. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16496006#comment-16496006 ] yanxiaobin commented on FLINK-8500: --- So far, it can solve the current problems, but in the long run, there will still be some limitations on the support of future Kafka versions. By the way, can we fix this problem in the 1.5.x series? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-7525) Add config option to disable Cancel functionality on UI
[ https://issues.apache.org/jira/browse/FLINK-7525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16441630#comment-16441630 ] Ted Yu edited comment on FLINK-7525 at 5/31/18 1:41 AM: Hopefully FLIP-6 would be released soon . was (Author: yuzhih...@gmail.com): Hopefully FLIP-6 would be released soon. > Add config option to disable Cancel functionality on UI > --- > > Key: FLINK-7525 > URL: https://issues.apache.org/jira/browse/FLINK-7525 > Project: Flink > Issue Type: Improvement > Components: Web Client, Webfrontend >Reporter: Ted Yu >Priority: Major > > In this email thread > http://search-hadoop.com/m/Flink/VkLeQlf0QOnc7YA?subj=Security+Control+of+running+Flink+Jobs+on+Flink+UI > , Raja was asking for a way to control how users cancel Job(s). > Robert proposed adding a config option which disables the Cancel > functionality. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9150) Prepare for Java 10
[ https://issues.apache.org/jira/browse/FLINK-9150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-9150: -- Component/s: Build System > Prepare for Java 10 > --- > > Key: FLINK-9150 > URL: https://issues.apache.org/jira/browse/FLINK-9150 > Project: Flink > Issue Type: Task > Components: Build System >Reporter: Ted Yu >Priority: Major > > Java 9 is not an LTS release. > When compiling with Java 10, I see the following compilation error: > {code} > [ERROR] Failed to execute goal on project flink-shaded-hadoop2: Could not > resolve dependencies for project > org.apache.flink:flink-shaded-hadoop2:jar:1.6-SNAPSHOT: Could not find > artifact jdk.tools:jdk.tools:jar:1.6 at specified path > /a/jdk-10/../lib/tools.jar -> [Help 1] > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7689) Instrument the Flink JDBC sink
[ https://issues.apache.org/jira/browse/FLINK-7689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495654#comment-16495654 ] ASF GitHub Bot commented on FLINK-7689: --- Github user pabloem commented on the issue: https://github.com/apache/flink/pull/4725 @fhueske @asicoe is this PR still current / ongoing? I'm willing to drive it to the end if there's anything left to do :) - Or perhaps it's almost ready to merge? > Instrument the Flink JDBC sink > -- > > Key: FLINK-7689 > URL: https://issues.apache.org/jira/browse/FLINK-7689 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Affects Versions: 1.4.0 >Reporter: Martin Eden >Priority: Minor > Labels: jdbc, metrics > Original Estimate: 24h > Remaining Estimate: 24h > > As confirmed by the Flink community in the following mailing list > [message|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/metrics-for-Flink-sinks-td15200.html] > using off-the-shelf Flink sinks like the JDBC sink, Redis sink or Cassandra > sink etc. does not expose any sink-specific metrics. > The purpose of this ticket is to add some relevant metrics to the > JDBCOutputFormat: > - Meters for when a flush is made. > - Histograms for the jdbc batch count and batch execution latency. > These would allow deeper understanding of the runtime behaviour of > performance critical jobs writing to external databases using this generic > interface. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
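For context on the metrics being discussed, here is a minimal sketch of how a flush meter and a batch-latency histogram could be registered in an output format; the metric names, the Dropwizard histogram wrapper and the surrounding class are illustrative choices, not necessarily what PR #4725 does:

{code:java}
import java.io.IOException;

import com.codahale.metrics.SlidingWindowReservoir;

import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.types.Row;

// Illustrative only: shows where the metrics would hook into an output format.
public class InstrumentedJdbcOutputFormat extends RichOutputFormat<Row> {

	private transient Meter flushMeter;
	private transient Histogram batchLatencyMs;

	@Override
	public void configure(Configuration parameters) {}

	@Override
	public void open(int taskNumber, int numTasks) throws IOException {
		// Meter for flushes, rate averaged over the last 60 seconds.
		flushMeter = getRuntimeContext().getMetricGroup()
			.meter("flushRate", new MeterView(60));
		// Histogram over the last 500 batch execution latencies (ms).
		batchLatencyMs = getRuntimeContext().getMetricGroup()
			.histogram("batchExecutionLatencyMs", new DropwizardHistogramWrapper(
				new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500))));
	}

	@Override
	public void writeRecord(Row record) throws IOException {
		long start = System.currentTimeMillis();
		// ... addBatch / executeBatch against the JDBC connection goes here ...
		batchLatencyMs.update(System.currentTimeMillis() - start);
		flushMeter.markEvent();
	}

	@Override
	public void close() throws IOException {}
}
{code}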
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495589#comment-16495589 ] ASF GitHub Bot commented on FLINK-8500: --- GitHub user FredTing opened a pull request: https://github.com/apache/flink/pull/6105 [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer ## What is the purpose of the change This pull request makes the Kafka timestamp and timestampType available during message deserialization so that they can be used in business logic processing. ## Brief change log - added a new default method with timestamp/TimestampType parameters to the interface `KeyedDeserializationSchema` ## Verifying this change This change is already covered by existing tests, such as the Kafka Consumer tests and JSONKeyValueDeserializationSchemaTest. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**yes** / no) - The serializers: (yes / no / **don't know**) - The runtime per-record code paths (performance sensitive): (**yes** / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (**yes** / no) - If yes, how is the feature documented? (not applicable / **docs** / **JavaDocs** / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/FredTing/flink FLINK-8500 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6105.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6105 commit a214481f6722e75a298d87333e4981cb87b9c2b9 Author: Fred Teunissen Date: 2018-05-20T20:45:29Z [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer -- This message was sent by Atlassian JIRA (v7.6.3#76005)
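As a sketch of what such a backwards-compatible extension could look like, the snippet below adds a delegating default method to the interface; the parameter list follows the signature discussed later in this thread, while the delegating default body and the trimmed-down interface are assumptions for illustration, not the PR's exact code:

{code:java}
import java.io.IOException;
import java.io.Serializable;

import org.apache.kafka.common.record.TimestampType;

public interface KeyedDeserializationSchema<T> extends Serializable {

	// Existing method: older implementations only see these fields.
	T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
		throws IOException;

	// New default method: delegates so that existing implementations keep
	// compiling, while new ones can override it to read the timestamp.
	default T deserialize(byte[] messageKey, byte[] message, String topic, int partition,
			long offset, long timestamp, TimestampType timestampType) throws IOException {
		return deserialize(messageKey, message, topic, partition, offset);
	}

	boolean isEndOfStream(T nextElement);
}
{code}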
[jira] [Commented] (FLINK-9430) Support Casting of Object to Primitive types for Flink SQL UDF
[ https://issues.apache.org/jira/browse/FLINK-9430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495510#comment-16495510 ] Rong Rong commented on FLINK-9430: -- Hi [~suez1224], I think [~twalthr] and I had this discussion in a separate doc regarding generic type inference in UDFs: https://docs.google.com/document/d/1zKSY1z0lvtQdfOgwcLnCMSRHew3weeJ6QfQjSD0zWas/edit#heading=h.64s92ad5mb1. I am not exactly sure, but maybe we can add an ITCase to this JIRA? I think at some point the Object is serialized using Kryo, which will significantly reduce performance. > Support Casting of Object to Primitive types for Flink SQL UDF > -- > > Key: FLINK-9430 > URL: https://issues.apache.org/jira/browse/FLINK-9430 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > > We want to add a SQL UDF to access a specific element in a JSON string using > a JSON path. However, the JSON element can be of different types, e.g. Int, > Float, Double, String, Boolean, etc. Since the return type is not part of the > method signature, we cannot use overloading. So we will end up writing a UDF > for each type, e.g. GetFloatFromJSON, GetIntFromJSON, etc., which leads to a > lot of duplication. > One way to unify all these UDF functions is to implement one UDF and return > java.lang.Object, and in the SQL statement, use CAST AS to cast the returned > Object into the correct type. Below is an example: > > {code:java} > object JsonPathUDF extends ScalarFunction { > def eval(jsonStr: String, path: String): Object = { >JSONParser.parse(jsonStr).read(path) > } > }{code} > {code:java} > SELECT CAST(jsonpath(json, "$.store.book.title") AS VARCHAR(32)) as > bookTitle FROM table1{code} > The current Flink SQL cast implementation does not support casting from > GenericTypeInfo to another type; I already have a local > branch to fix this. Please comment if there are alternatives to the problem > above. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9476) Lost sideOutPut Late Elements in CEP Operator
[ https://issues.apache.org/jira/browse/FLINK-9476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495490#comment-16495490 ] ASF GitHub Bot commented on FLINK-9476: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/6104#discussion_r191857843 --- Diff: docs/dev/libs/cep.md --- @@ -1524,7 +1524,52 @@ In `CEP` the order in which elements are processed matters. To guarantee that el To guarantee that elements across watermarks are processed in event-time order, Flink's CEP library assumes *correctness of the watermark*, and considers as *late* elements whose timestamp is smaller than that of the last -seen watermark. Late elements are not further processed. +seen watermark. Late elements are not further processed. Also, you can specify a sideOutput tag to collect the late elements come after the last seen watermark, you can use it like this. + + + + +{% highlight java %} +PatternStream patternStream = CEP.pattern(input, pattern); + +OutputTag lataDataOutputTag = new OutputTag("lata-data""){}; --- End diff -- typo: "lateDataOutputTag" and "late-data" -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9476) Lost sideOutPut Late Elements in CEP Operator
[ https://issues.apache.org/jira/browse/FLINK-9476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495491#comment-16495491 ] ASF GitHub Bot commented on FLINK-9476: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/6104#discussion_r191858415 --- Diff: docs/dev/libs/cep.md --- @@ -1524,7 +1524,52 @@ In `CEP` the order in which elements are processed matters. To guarantee that el To guarantee that elements across watermarks are processed in event-time order, Flink's CEP library assumes *correctness of the watermark*, and considers as *late* elements whose timestamp is smaller than that of the last -seen watermark. Late elements are not further processed. +seen watermark. Late elements are not further processed. Also, you can specify a sideOutput tag to collect the late elements come after the last seen watermark, you can use it like this. + + + + +{% highlight java %} +PatternStream patternStream = CEP.pattern(input, pattern); + +OutputTag lataDataOutputTag = new OutputTag("lata-data""){}; + +OutputTag outputTag = new OutputTag("side-output""){}; + +SingleOutputStreamOperator result = patternStream +.sideOutputLateData(lataDataOutputTag) +.select( +new PatternTimeoutFunction() {...}, +outputTag, +new PatternSelectFunction() {...} +); + +DataStream lataData = result.getSideOutput(lataDataOutputTag); --- End diff -- typo: lateData -- This message was sent by Atlassian JIRA (v7.6.3#76005)
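For reference, here is the documentation snippet from the diff with the typos above corrected and the generic parameters restored; the element types `Event`, `TimeoutEvent` and `ComplexEvent` are illustrative placeholders, and the `{...}` bodies are elided as in the original:

{code:java}
PatternStream<Event> patternStream = CEP.pattern(input, pattern);

OutputTag<Event> lateDataOutputTag = new OutputTag<Event>("late-data"){};

OutputTag<TimeoutEvent> outputTag = new OutputTag<TimeoutEvent>("side-output"){};

SingleOutputStreamOperator<ComplexEvent> result = patternStream
	.sideOutputLateData(lateDataOutputTag)
	.select(
		new PatternTimeoutFunction<Event, TimeoutEvent>() {...},
		outputTag,
		new PatternSelectFunction<Event, ComplexEvent>() {...}
	);

DataStream<Event> lateData = result.getSideOutput(lateDataOutputTag);
{code}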
[jira] [Commented] (FLINK-6469) Configure Memory Sizes with units
[ https://issues.apache.org/jira/browse/FLINK-6469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495470#comment-16495470 ] ASF GitHub Bot commented on FLINK-6469: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5448 Will try and take a look at this soon... Sorry for the delay. What I would consider very important is that users who don't change their configuration do not get different behavior all of a sudden. Meaning that in the absence of a unit, we do not interpret the value as bytes but as whatever the config value was measured in before (such as MBs, ...). > Configure Memory Sizes with units > - > > Key: FLINK-6469 > URL: https://issues.apache.org/jira/browse/FLINK-6469 > Project: Flink > Issue Type: Improvement > Components: Core >Reporter: Stephan Ewen >Assignee: vinoyang >Priority: Major > > Currently, memory sizes are configured by pure numbers; the interpretation differs > from configuration parameter to parameter. > For example, heap sizes are configured in megabytes, network buffer memory is > configured in bytes, alignment thresholds are configured in bytes. > I propose to configure all memory parameters the same way, with units similar > to time. The JVM itself configures heap size similarly: {{Xmx5g}} or > {{Xmx2000m}}. > {code} > 1 -> bytes > 10 kb > 64 mb > 1 gb > ... > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
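To make the unit scheme concrete, here is a minimal, self-contained sketch of the kind of parsing this implies; the helper name and the accepted suffixes are illustrative assumptions, not Flink's actual configuration API:

{code:java}
import java.util.Locale;

// Illustrative parser: "64 mb" -> 67108864, "1" -> 1 (plain bytes).
public final class MemorySizeSketch {

	public static long parseBytes(String text) {
		String s = text.trim().toLowerCase(Locale.ROOT);
		long multiplier = 1L;
		if (s.endsWith("gb")) { multiplier = 1L << 30; s = s.substring(0, s.length() - 2); }
		else if (s.endsWith("mb")) { multiplier = 1L << 20; s = s.substring(0, s.length() - 2); }
		else if (s.endsWith("kb")) { multiplier = 1L << 10; s = s.substring(0, s.length() - 2); }
		// No unit given: interpreted as bytes here. As noted in the comment
		// above, a real implementation must instead fall back to whatever unit
		// each config option used historically, to avoid behavior changes.
		return Long.parseLong(s.trim()) * multiplier;
	}

	public static void main(String[] args) {
		System.out.println(parseBytes("64 mb")); // prints 67108864
	}
}
{code}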
[jira] [Commented] (FLINK-8873) move unit tests of KeyedStream from DataStreamTest to KeyedStreamTest
[ https://issues.apache.org/jira/browse/FLINK-8873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495448#comment-16495448 ] ASF GitHub Bot commented on FLINK-8873: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/5649 When I was developing KeyedProcessFunction, I initially wondered why there were no tests for KeyedStream, and researched and realized that they were actually mixed in with the DataStream tests. I think having that clarity by separating those tests would be great. Well, I also agree it doesn't hurt that much to keep them as-is. If you feel strongly against it, I can close this PR. > move unit tests of KeyedStream from DataStreamTest to KeyedStreamTest > - > > Key: FLINK-8873 > URL: https://issues.apache.org/jira/browse/FLINK-8873 > Project: Flink > Issue Type: Improvement > Components: DataStream API, Tests >Affects Versions: 1.5.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Minor > Fix For: 1.6.0 > > > move unit tests of KeyedStream.scala from DataStreamTest.scala to > KeyedStreamTest.scala, in order to have a clearer separation -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7789) Add handler for Async IO operator timeouts
[ https://issues.apache.org/jira/browse/FLINK-7789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495400#comment-16495400 ] ASF GitHub Bot commented on FLINK-7789: --- Github user kisimple commented on a diff in the pull request: https://github.com/apache/flink/pull/6091#discussion_r191835482 --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala --- @@ -71,6 +71,9 @@ object AsyncDataStream { override def asyncInvoke(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = { asyncFunction.asyncInvoke(input, new JavaResultFutureWrapper(resultFuture)) } + override def timeout(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = { --- End diff -- I haven't found any tests for `AsyncDataStream.scala` or `AsyncFunction.scala`; I am not sure whether they are missing or unnecessary. What do you think? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7789) Add handler for Async IO operator timeouts
[ https://issues.apache.org/jira/browse/FLINK-7789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495395#comment-16495395 ] ASF GitHub Bot commented on FLINK-7789: --- Github user kisimple commented on a diff in the pull request: https://github.com/apache/flink/pull/6091#discussion_r191834614 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java --- @@ -648,6 +662,52 @@ public void testAsyncTimeout() throws Exception { ExceptionUtils.findThrowable(mockEnvironment.getActualExternalFailureCause().get(), TimeoutException.class); } + @Test + public void testAsyncTimeoutAware() throws Exception { --- End diff -- Good point :) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9476) Lost sideOutPut Late Elements in CEP Operator
[ https://issues.apache.org/jira/browse/FLINK-9476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495355#comment-16495355 ] ASF GitHub Bot commented on FLINK-9476: --- GitHub user Aitozi opened a pull request: https://github.com/apache/flink/pull/6104 [FLINK-9476]Emit late elements in CEP as sideOutPut Currently, when used with event time in the CEP library, elements that arrive later than the watermark are dropped; with this change they can be put into a side output with an OutputTag. You can merge this pull request into a Git repository by running: $ git pull https://github.com/Aitozi/flink FLINK-9476 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6104.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6104 commit c0b04fadacd3e9f3f403b3adfdbadf8d4aac79e4 Author: minwenjun Date: 2018-05-30T15:32:15Z Add loseDataOutputTag in cep deal with event time dropped data commit 373e376fc182c32fe69765aa564e93057954ff44 Author: minwenjun Date: 2018-05-30T15:50:01Z add scala api -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7836) specifying node label for flink job to run on yarn
[ https://issues.apache.org/jira/browse/FLINK-7836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495353#comment-16495353 ] ASF GitHub Bot commented on FLINK-7836: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5593 > specifying node label for flink job to run on yarn > -- > > Key: FLINK-7836 > URL: https://issues.apache.org/jira/browse/FLINK-7836 > Project: Flink > Issue Type: New Feature > Components: Client >Affects Versions: 1.3.2 >Reporter: zhaibaba >Assignee: vinoyang >Priority: Major > Fix For: 1.6.0 > > > flink client cannot specify node label for flink job to run on yarn -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-7836) specifying node label for flink job to run on yarn
[ https://issues.apache.org/jira/browse/FLINK-7836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-7836. -- Resolution: Fixed Fix Version/s: 1.6.0 Fixed via b6448015f485b5051e5e920b82b4b3ee71c55974 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-5789) Make Bucketing Sink independent of Hadoop's FileSystem
[ https://issues.apache.org/jira/browse/FLINK-5789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas reassigned FLINK-5789: - Assignee: Kostas Kloudas > Make Bucketing Sink independent of Hadoop's FileSystem > -- > > Key: FLINK-5789 > URL: https://issues.apache.org/jira/browse/FLINK-5789 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.2.0, 1.1.4 >Reporter: Stephan Ewen >Assignee: Kostas Kloudas >Priority: Major > Fix For: 1.6.0 > > > The {{BucketingSink}} is hard wired to Hadoop's FileSystem, bypassing Flink's > file system abstraction. > This causes several issues: > - The bucketing sink will behave different than other file sinks with > respect to configuration > - Directly supported file systems (not through hadoop) like the MapR File > System does not work in the same way with the BuketingSink as other file > systems > - The previous point is all the more problematic in the effort to make > Hadoop an optional dependency and with in other stacks (Mesos, Kubernetes, > AWS, GCE, Azure) with ideally no Hadoop dependency. > We should port the {{BucketingSink}} to use Flink's FileSystem classes. > To support the *truncate* functionality that is needed for the exactly-once > semantics of the Bucketing Sink, we should extend Flink's FileSystem > abstraction to have the methods > - {{boolean supportsTruncate()}} > - {{void truncate(Path, long)}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
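Since the issue proposes a concrete API extension, here is a minimal sketch of how a sink could use the two proposed methods; the method names and signatures come from the description above, while the interface wrapper and the recovery logic around them are illustrative assumptions:

{code:java}
import java.io.IOException;

import org.apache.flink.core.fs.Path;

// Hypothetical view of a FileSystem extended with the two proposed methods.
interface TruncatableFileSystem {
	boolean supportsTruncate();
	void truncate(Path file, long newLength) throws IOException;
}

// Illustrative recovery step: cut an in-progress file back to the length
// recorded in the last successful checkpoint.
final class TruncateOnRestore {
	static void truncateToValidLength(TruncatableFileSystem fs, Path file, long validLength)
			throws IOException {
		if (fs.supportsTruncate()) {
			fs.truncate(file, validLength);
		} else {
			// Fallback: record the valid length in a side file and have readers
			// ignore trailing bytes, as the BucketingSink's ".valid-length"
			// files do today.
		}
	}
}
{code}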
[jira] [Commented] (FLINK-9413) Tasks can fail with PartitionNotFoundException if consumer deployment takes too long
[ https://issues.apache.org/jira/browse/FLINK-9413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495303#comment-16495303 ] ASF GitHub Bot commented on FLINK-9413: --- Github user zhangminglei commented on a diff in the pull request: https://github.com/apache/flink/pull/6103#discussion_r191814673 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java --- @@ -103,9 +103,7 @@ public String toString() { // The producing task needs to be RUNNING or already FINISHED if (consumedPartition.isConsumable() && producerSlot != null && (producerState == ExecutionState.RUNNING || - producerState == ExecutionState.FINISHED || - producerState == ExecutionState.SCHEDULED || - producerState == ExecutionState.DEPLOYING)) { --- End diff -- Thank you, Till. But where is the legacy code? I found a lot of code that belongs to legacy mode. I do not think I should remove all of the legacy-mode code... Could you tell me, please? Lol. > Tasks can fail with PartitionNotFoundException if consumer deployment takes > too long > > > Key: FLINK-9413 > URL: https://issues.apache.org/jira/browse/FLINK-9413 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.4.0, 1.5.0, 1.6.0 >Reporter: Till Rohrmann >Assignee: mingleizhang >Priority: Critical > > {{Tasks}} can fail with a {{PartitionNotFoundException}} if the deployment of > the producer takes too long. More specifically, if it takes longer than the > {{taskmanager.network.request-backoff.max}}, then the {{Task}} will give up > and fail. > The problem is that we calculate the {{InputGateDeploymentDescriptor}} for a > consuming task once the producer has been assigned a slot but we do not wait > until it is actually running. The problem should be fixed if we wait until > the task is in state {{RUNNING}} before assigning the result partition to the > consumer. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9444) KafkaAvroTableSource failed to work for map and array fields
[ https://issues.apache.org/jira/browse/FLINK-9444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495255#comment-16495255 ] ASF GitHub Bot commented on FLINK-9444: --- Github user tragicjun commented on the issue: https://github.com/apache/flink/pull/6082 I encountered another exception working with the string type in Avro map/array; any advice on whether I should open a separate issue or just reuse this one? > KafkaAvroTableSource failed to work for map and array fields > > > Key: FLINK-9444 > URL: https://issues.apache.org/jira/browse/FLINK-9444 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Table API & SQL >Affects Versions: 1.6.0 >Reporter: Jun Zhang >Priority: Blocker > Labels: patch > Fix For: 1.6.0 > > Attachments: flink-9444.patch > > > When some Avro schema has map/array fields and the corresponding TableSchema > declares *MapTypeInfo/ListTypeInfo* for these fields, an exception will be > thrown when registering the *KafkaAvroTableSource*, complaining like: > Exception in thread "main" org.apache.flink.table.api.ValidationException: > Type Map of table field 'event' does not match with type > GenericType of the field 'event' of the TableSource return > type. > at org.apache.flink.table.api.ValidationException$.apply(exceptions.scala:74) > at > org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:92) > at > org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:71) > at > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) > at > org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:71) > at > org.apache.flink.table.plan.schema.StreamTableSourceTable.(StreamTableSourceTable.scala:33) > at > org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:124) > at > org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:438) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9464) Clean up pom files
[ https://issues.apache.org/jira/browse/FLINK-9464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495252#comment-16495252 ] ASF GitHub Bot commented on FLINK-9464: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6093#discussion_r191794378 --- Diff: flink-connectors/flink-connector-filesystem/pom.xml --- @@ -67,13 +67,6 @@ under the License. - - org.apache.flink - flink-test-utils-junit --- End diff -- I think so. But let's wait and see what Travis says. > Clean up pom files > -- > > Key: FLINK-9464 > URL: https://issues.apache.org/jira/browse/FLINK-9464 > Project: Flink > Issue Type: Improvement > Components: Build System >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Fix For: 1.6.0 > > > Some Flink modules' {{pom.xml}} files contain unnecessary or redundant > information. For example, the {{flink-clients}} {{pom.xml}} specifies the > {{maven-jar-plugin}} twice in the build section. Other modules explicitly > specify the version and scope of the {{flink-test-utils-junit}} module which > is managed by the parent's dependency management section. I propose to clean > these things up. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9461) Disentangle flink-connector-kafka from flink-table and flink-json
[ https://issues.apache.org/jira/browse/FLINK-9461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495248#comment-16495248 ] Till Rohrmann commented on FLINK-9461: -- If {{flink-table}} contained only some interfaces defining the sources, sinks and operators, then it would not be a big problem. But this is obviously not the case. The point I'm trying to make is that it feels a bit odd that a Flink connector depends on a Flink library while it would be so simple to get rid of it by introducing a dedicated module. Assume that we might get rid of the Scala dependency in {{flink-runtime}}; then we could still not remove the Scala suffix from the connectors because they depend on {{flink-table}}. > Disentangle flink-connector-kafka from flink-table and flink-json > - > > Key: FLINK-9461 > URL: https://issues.apache.org/jira/browse/FLINK-9461 > Project: Flink > Issue Type: Improvement > Components: Build System >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Priority: Major > Fix For: 1.6.0 > > > Currently, the {{flink-connector-kafka}} module has a dependency on > {{flink-table}} and {{flink-json}}. The reason seems to be that the module > contains the {{KafkaJsonTableSource}} and {{KafkaJsonTableSink}}. Even though > the {{flink-table}} and {{flink-json}} dependencies are marked as optional, the > {{flink-connector-kafka}} will still contain the table sources and sinks. I > think this is not a clean design. > I would propose to move the table sources and sinks into a dedicated module > which depends on {{flink-connector-kafka}}. That way we would better separate > dependencies and could remove {{flink-table}} and {{flink-json}} from > {{flink-connector-kafka}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9464) Clean up pom files
[ https://issues.apache.org/jira/browse/FLINK-9464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495241#comment-16495241 ] ASF GitHub Bot commented on FLINK-9464: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6093#discussion_r191792861 --- Diff: flink-connectors/flink-connector-filesystem/pom.xml --- @@ -67,13 +67,6 @@ under the License. - - org.apache.flink - flink-test-utils-junit --- End diff -- I assume this is unused? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9461) Disentangle flink-connector-kafka from flink-table and flink-json
[ https://issues.apache.org/jira/browse/FLINK-9461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495233#comment-16495233 ] Fabian Hueske commented on FLINK-9461: -- Why should the API depend on the connector? All our connectors are tied to an API and the Table API / SQL is an API just like the DataStream or DataSet API. Should the DataStream API depend on Kafka (and Kinesis, Cassandra, ES, ...)? The API provides an interface and the connectors implement the interface. We followed that pattern for all APIs and connectors. Moving all connector-related API classes to {{flink-core}} as suggested by [~Zentol] would require a larger refactoring because some API classes reference core classes of the Table API (implemented in Scala). I'm not opposed to moving connectors into individual modules but IMO, the question is whether a few additional classes in a JAR file justify fragmenting the connectors into API-specific modules. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8707) Excessive amount of files opened by flink task manager
[ https://issues.apache.org/jira/browse/FLINK-8707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495219#comment-16495219 ] Alexander Gardner commented on FLINK-8707: -- Hi Piotr Apologies again; all my time has been on prioritized DEV work for PROD. Integrating my state into the Source checkpoints has worked a treat :) Some new Flink jobs are nearly live, so I will reproduce data for you for this JIRA for standalone & cluster. We never break our ulimit in PROD anyway now. Just want to get to the bottom of why so many handles! Alex > Excessive amount of files opened by flink task manager > -- > > Key: FLINK-8707 > URL: https://issues.apache.org/jira/browse/FLINK-8707 > Project: Flink > Issue Type: Bug > Components: TaskManager >Affects Versions: 1.3.2 > Environment: NAME="Red Hat Enterprise Linux Server" > VERSION="7.3 (Maipo)" > Two boxes, each with a Job Manager & Task Manager, using Zookeeper for HA. > flink.yaml below with some settings (removed exact box names) etc: > env.log.dir: ...some dir...residing on the same box > env.pid.dir: some dir...residing on the same box > metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter > metrics.reporters: jmx > state.backend: filesystem > state.backend.fs.checkpointdir: file:///some_nfs_mount > state.checkpoints.dir: file:///some_nfs_mount > state.checkpoints.num-retained: 3 > high-availability.cluster-id: /tst > high-availability.storageDir: file:///some_nfs_mount/ha > high-availability: zookeeper > high-availability.zookeeper.path.root: /flink > high-availability.zookeeper.quorum: ...list of zookeeper boxes > env.java.opts.jobmanager: ...some extra jar args > jobmanager.archive.fs.dir: some dir...residing on the same box > jobmanager.web.submit.enable: true > jobmanager.web.tmpdir: some dir...residing on the same box > env.java.opts.taskmanager: some extra jar args > taskmanager.tmp.dirs: some dir...residing on the same box/var/tmp > taskmanager.network.memory.min: 1024MB > taskmanager.network.memory.max: 2048MB > blob.storage.directory: some dir...residing on the same box >Reporter: Alexander Gardner >Priority: Critical > Attachments: AfterRunning-3-jobs-Box2-TM-JCONSOLE.png, > AfterRunning-3-jobs-TM-FDs-BOX2.jpg, AfterRunning-3-jobs-lsof-p.box2-TM, > AfterRunning-3-jobs-lsof.box2-TM, AterRunning-3-jobs-Box1-TM-JCONSOLE.png, > box1-jobmgr-lsof, box1-taskmgr-lsof, box2-jobmgr-lsof, box2-taskmgr-lsof, > ll.txt, ll.txt, lsof.txt, lsof.txt, lsofp.txt, lsofp.txt > > > The job manager has fewer FDs than the task manager. > > Hi > A support alert indicated that there were a lot of open files for the boxes > running Flink. > There were 4 flink jobs that were dormant but had consumed a number of msgs > from Kafka using the FlinkKafkaConsumer010. > A simple general lsof: > $ lsof | wc -l -> returned 153114 open file descriptors. > Focusing on the TaskManager process (process ID = 12154): > $ lsof | grep 12154 | wc -l -> returned 129322 open FDs > $ lsof -p 12154 | wc -l -> returned 531 FDs > There were 228 threads running for the task manager. > > Drilling down a bit further, looking at a_inode and FIFO entries: > $ lsof -p 12154 | grep a_inode | wc -l = 100 FDs > $ lsof -p 12154 | grep FIFO | wc -l = 200 FDs > $ /proc/12154/maps = 920 entries. > Apart from lsof identifying lots of JARs and SOs being referenced there were > also 244 child processes for the task manager process. > Noticed that in each environment, a creep of file descriptors...are the above > figures deemed excessive for the number of FDs in use?
I know Flink uses Netty - is it using a separate Selector for reads & writes? > Additionally, Flink uses memory-mapped files or direct ByteBuffers - are these > skewing the numbers of FDs shown? > Example of one child process ID 6633: > java 12154 6633 dfdev 387u a_inode 0,9 0 5869 [eventpoll] > java 12154 6633 dfdev 388r FIFO 0,8 0t0 459758080 pipe > java 12154 6633 dfdev 389w FIFO 0,8 0t0 459758080 pipe > Lastly, I cannot yet identify the reason for the creep in FDs even if Flink is > pretty dormant or has dormant jobs. Production nodes are not experiencing > excessive amounts of throughput yet either. > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9464) Clean up pom files
[ https://issues.apache.org/jira/browse/FLINK-9464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495208#comment-16495208 ] ASF GitHub Bot commented on FLINK-9464: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/6093 I've updated the PR @zentol and removed the version and scope tags from all `flink-test-utils-junit` dependencies in all modules. The scope and version are now defined in the dependency management section of the parent `pom.xml`. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6093: [FLINK-9464] Various pom.xml file clean ups
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/6093 I've updated the PR @zentol and removed the version and scope tags from all `flink-test-utils-junit` dependencies in all modules. The scope and version are now defined in the dependency management section of the parent `pom.xml`. ---
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495198#comment-16495198 ] Tzu-Li (Gordon) Tai commented on FLINK-8500: +1 to proceed with at least an incremental fix to the issue, for now. The discussion of breaking up the concerns of deserialization / meta info enrichment into two interfaces can go into a separate thread. For a minimal incremental fix, I would prefer this approach: {code} default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset, long timestamp, TimestampType timestampType) throws IOException {code} I think in the long run, we'll still very likely break this up into two interfaces. > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Fix For: 1.6.0 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord). In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
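To make the proposal concrete: with the default method above, a user schema that wants the record timestamp would override only the extended variant, while existing implementations keep compiling against the old five-argument method. A hedged sketch of such a schema (the class is illustrative and assumes the proposed overload has been added to KeyedDeserializationSchema; it will not compile against the current interface):
{code:java}
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.kafka.common.record.TimestampType;

// Illustrative schema pairing each Kafka message with its record timestamp.
// The 7-argument deserialize() is the overload proposed above, not yet in the API.
public class TimestampedStringSchema implements KeyedDeserializationSchema<Tuple2<Long, String>> {

    @Override
    public Tuple2<Long, String> deserialize(
            byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
        // Legacy entry point: no timestamp available, use a sentinel value.
        return Tuple2.of(-1L, new String(message, StandardCharsets.UTF_8));
    }

    @Override
    public Tuple2<Long, String> deserialize(
            byte[] messageKey, byte[] message, String topic, int partition,
            long offset, long timestamp, TimestampType timestampType) throws IOException {
        // Proposed default method: the fetcher would call this variant with the
        // timestamp taken from the Kafka ConsumerRecord.
        return Tuple2.of(timestamp, new String(message, StandardCharsets.UTF_8));
    }

    @Override
    public boolean isEndOfStream(Tuple2<Long, String> nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Tuple2<Long, String>> getProducedType() {
        return TypeInformation.of(new TypeHint<Tuple2<Long, String>>() {});
    }
}
{code}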
[jira] [Resolved] (FLINK-9215) TaskManager Releasing - org.apache.flink.util.FlinkException
[ https://issues.apache.org/jira/browse/FLINK-9215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dawid Wysakowicz resolved FLINK-9215. - Resolution: Fixed Fix Version/s: 1.6.0 > TaskManager Releasing - org.apache.flink.util.FlinkException > - > > Key: FLINK-9215 > URL: https://issues.apache.org/jira/browse/FLINK-9215 > Project: Flink > Issue Type: Bug > Components: Cluster Management, ResourceManager, Streaming >Affects Versions: 1.5.0 >Reporter: Bob Lau >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.6.0, 1.5.1 > > > The exception stack is as follows: > {code:java} > // code placeholder > {"root-exception":" > org.apache.flink.util.FlinkException: Releasing TaskManager > 0d87aa6fa99a6c12e36775b1d6bceb19. > at > org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManagerInternal(SlotPool.java:1067) > at > org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManager(SlotPool.java:1050) > at sun.reflect.GeneratedMethodAccessor110.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132) > at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544) > at akka.actor.Actor$class.aroundReceive(Actor.scala:502) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) > at akka.actor.ActorCell.invoke(ActorCell.scala:495) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) > at akka.dispatch.Mailbox.run(Mailbox.scala:224) > at akka.dispatch.Mailbox.exec(Mailbox.scala:234) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > ","timestamp":1524106438997, > "all-exceptions":[{"exception":" > org.apache.flink.util.FlinkException: Releasing TaskManager > 0d87aa6fa99a6c12e36775b1d6bceb19. 
> at > org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManagerInternal(SlotPool.java:1067) > at > org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManager(SlotPool.java:1050) > at sun.reflect.GeneratedMethodAccessor110.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132) > at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544) > at akka.actor.Actor$class.aroundReceive(Actor.scala:502) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) > at akka.actor.ActorCell.invoke(ActorCell.scala:495) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) > at akka.dispatch.Mailbox.run(Mailbox.scala:224) > at akka.dispatch.Mailbox.exec(Mailbox.scala:234) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > ","task":"async wait operator > (14/20)","location":"slave1:60199","timestamp":1524106438996 > }],"truncated":false} > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9215) TaskManager Releasing - org.apache.flink.util.FlinkException
[ https://issues.apache.org/jira/browse/FLINK-9215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495193#comment-16495193 ] Dawid Wysakowicz commented on FLINK-9215: - Closed in 1.5.1 via: e1cce3634e09726d7cadd81bab25da288dc0ba49 Closed in 1.6 via: 69135e933f6ac575ff12ef0390b9754a87c5bca2 > TaskManager Releasing - org.apache.flink.util.FlinkException > - > > Key: FLINK-9215 > URL: https://issues.apache.org/jira/browse/FLINK-9215 > Project: Flink > Issue Type: Bug > Components: Cluster Management, ResourceManager, Streaming >Affects Versions: 1.5.0 >Reporter: Bob Lau >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.5.1 > > > The exception stack is as follows: > {code:java} > // code placeholder > {"root-exception":" > org.apache.flink.util.FlinkException: Releasing TaskManager > 0d87aa6fa99a6c12e36775b1d6bceb19. > at > org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManagerInternal(SlotPool.java:1067) > at > org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManager(SlotPool.java:1050) > at sun.reflect.GeneratedMethodAccessor110.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132) > at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544) > at akka.actor.Actor$class.aroundReceive(Actor.scala:502) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) > at akka.actor.ActorCell.invoke(ActorCell.scala:495) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) > at akka.dispatch.Mailbox.run(Mailbox.scala:224) > at akka.dispatch.Mailbox.exec(Mailbox.scala:234) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > ","timestamp":1524106438997, > "all-exceptions":[{"exception":" > org.apache.flink.util.FlinkException: Releasing TaskManager > 0d87aa6fa99a6c12e36775b1d6bceb19. 
> at > org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManagerInternal(SlotPool.java:1067) > at > org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManager(SlotPool.java:1050) > at sun.reflect.GeneratedMethodAccessor110.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132) > at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544) > at akka.actor.Actor$class.aroundReceive(Actor.scala:502) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) > at akka.actor.ActorCell.invoke(ActorCell.scala:495) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) > at akka.dispatch.Mailbox.run(Mailbox.scala:224) > at akka.dispatch.Mailbox.exec(Mailbox.scala:234) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > ","task":"async wait operator > (14/20)","location":"slave1:60199","timestamp":1524106438996 > }],"truncated":false} > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7386) Flink Elasticsearch 5 connector is not compatible with Elasticsearch 5.2+ client
[ https://issues.apache.org/jira/browse/FLINK-7386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495187#comment-16495187 ] ASF GitHub Bot commented on FLINK-7386: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6043 @cjolif do you think it would be possible that, with a clean cut using a REST implementation, we would no longer need separate modules for ES 6.x, 7.x, 8.x and so on? I.e., it would only be a matter of the user recompiling that REST-based implementation with a different ES client version. If not, then I would still prefer that we continue with the current approach this PR is proposing, since we need this fix merged to have Elasticsearch 5.3+ working anyway. > Flink Elasticsearch 5 connector is not compatible with Elasticsearch 5.2+ > client > > > Key: FLINK-7386 > URL: https://issues.apache.org/jira/browse/FLINK-7386 > Project: Flink > Issue Type: Improvement > Components: ElasticSearch Connector >Reporter: Dawid Wysakowicz >Assignee: Fang Yong >Priority: Critical > Fix For: 1.6.0 > > > In the Elasticsearch 5.2.0 client, the class {{BulkProcessor}} was refactored and > no longer has the method {{add(ActionRequest)}}. > For more info see: https://github.com/elastic/elasticsearch/pull/20109 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6043: [FLINK-7386] evolve RequestIndexer API to make it working...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6043 @cjolif do you think it would be possible that, with a clean cut using a REST implementation, we would no longer need separate modules for ES 6.x, 7.x, 8.x and so on? I.e., it would only be a matter of the user recompiling that REST-based implementation with a different ES client version. If not, then I would still prefer that we continue with the current approach this PR is proposing, since we need this fix merged to have Elasticsearch 5.3+ working anyway. ---
[jira] [Commented] (FLINK-9215) TaskManager Releasing - org.apache.flink.util.FlinkException
[ https://issues.apache.org/jira/browse/FLINK-9215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495183#comment-16495183 ] ASF GitHub Bot commented on FLINK-9215: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5879 > TaskManager Releasing - org.apache.flink.util.FlinkException > - > > Key: FLINK-9215 > URL: https://issues.apache.org/jira/browse/FLINK-9215 > Project: Flink > Issue Type: Bug > Components: Cluster Management, ResourceManager, Streaming >Affects Versions: 1.5.0 >Reporter: Bob Lau >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.5.1 > > > The exception stack is as follows: > {code:java} > // code placeholder > {"root-exception":" > org.apache.flink.util.FlinkException: Releasing TaskManager > 0d87aa6fa99a6c12e36775b1d6bceb19. > at > org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManagerInternal(SlotPool.java:1067) > at > org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManager(SlotPool.java:1050) > at sun.reflect.GeneratedMethodAccessor110.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132) > at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544) > at akka.actor.Actor$class.aroundReceive(Actor.scala:502) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) > at akka.actor.ActorCell.invoke(ActorCell.scala:495) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) > at akka.dispatch.Mailbox.run(Mailbox.scala:224) > at akka.dispatch.Mailbox.exec(Mailbox.scala:234) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > ","timestamp":1524106438997, > "all-exceptions":[{"exception":" > org.apache.flink.util.FlinkException: Releasing TaskManager > 0d87aa6fa99a6c12e36775b1d6bceb19. 
> at > org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManagerInternal(SlotPool.java:1067) > at > org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManager(SlotPool.java:1050) > at sun.reflect.GeneratedMethodAccessor110.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132) > at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544) > at akka.actor.Actor$class.aroundReceive(Actor.scala:502) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) > at akka.actor.ActorCell.invoke(ActorCell.scala:495) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) > at akka.dispatch.Mailbox.run(Mailbox.scala:224) > at akka.dispatch.Mailbox.exec(Mailbox.scala:234) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > ","task":"async wait operator > (14/20)","location":"slave1:60199","timestamp":1524106438996 > }],"truncated":false} > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5879: [FLINK-9215][resoucemanager] Reduce noise in SlotP...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5879 ---
[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service
[ https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495178#comment-16495178 ] ASF GitHub Bot commented on FLINK-9423: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6062#discussion_r191772629 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java --- @@ -0,0 +1,511 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Queue; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * A heap-based priority queue for internal timers. This heap is supported by hash sets for fast contains + * (de-duplication) and deletes. The heap implementation is a simple binary tree stored inside an array. Element indexes + * in the heap array start at 1 instead of 0 to make array index computations a bit simpler in the hot methods. + * + * Possible future improvements: + * + * We could also implement shrinking for the heap and the deduplication maps. + * We could replace the deduplication maps with more efficient custom implementations. In particular, a hash set + * would be enough if it could return existing elements on unsuccessful adding, etc.. + * + * + * @param <K> type of the key of the internal timers managed by this priority queue. + * @param <N> type of the namespace of the internal timers managed by this priority queue. + */ +public class InternalTimerHeap<K, N> implements Queue<TimerHeapInternalTimer<K, N>>, Set<TimerHeapInternalTimer<K, N>> { + + /** +* A safe maximum size for arrays in the JVM. +*/ + private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; + + /** +* Comparator for {@link TimerHeapInternalTimer}, based on the timestamp in ascending order. +*/ + private static final Comparator<TimerHeapInternalTimer<?, ?>> COMPARATOR = + (o1, o2) -> Long.compare(o1.getTimestamp(), o2.getTimestamp()); + + /** +* This array contains one hash set per key-group. The sets are used for fast de-duplication and deletes of timers. +*/ + private final HashMap<TimerHeapInternalTimer<K, N>, TimerHeapInternalTimer<K, N>>[] deduplicationMapsByKeyGroup; + + /** +* The array that represents the heap-organized priority queue. 
+*/ + private TimerHeapInternalTimer<K, N>[] queue; + + /** +* The current size of the priority queue. +*/ + private int size; + + /** +* The key-group range of timers that are managed by this queue. +*/ + private final KeyGroupRange keyGroupRange; + + /** +* The total number of key-groups of the job. +*/ + private final int totalNumberOfKeyGroups; + + + /** +* Creates an empty {@link InternalTimerHeap} with the requested initial capacity. +* +* @param minimumCapacity the minimum and initial capacity of this priority queue. +*/ + @SuppressWarnings("unchecked") + InternalTimerHeap( + @Nonnegative int minimumCapacity, + @Nonnull KeyGroupRange keyGroupRange, + @Nonnegative int totalNumberOfKeyGroups) { + + this.totalNumberOfKeyGroups = totalNumberOfKeyGroups; +
[GitHub] flink pull request #6062: [FLINK-9423][state] Implement efficient deletes fo...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6062#discussion_r191772629 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java --- @@ -0,0 +1,511 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Queue; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * A heap-based priority queue for internal timers. This heap is supported by hash sets for fast contains + * (de-duplication) and deletes. The heap implementation is a simple binary tree stored inside an array. Element indexes + * in the heap array start at 1 instead of 0 to make array index computations a bit simpler in the hot methods. + * + * Possible future improvements: + * + * We could also implement shrinking for the heap and the deduplication maps. + * We could replace the deduplication maps with more efficient custom implementations. In particular, a hash set + * would be enough if it could return existing elements on unsuccessful adding, etc.. + * + * + * @param <K> type of the key of the internal timers managed by this priority queue. + * @param <N> type of the namespace of the internal timers managed by this priority queue. + */ +public class InternalTimerHeap<K, N> implements Queue<TimerHeapInternalTimer<K, N>>, Set<TimerHeapInternalTimer<K, N>> { + + /** +* A safe maximum size for arrays in the JVM. +*/ + private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; + + /** +* Comparator for {@link TimerHeapInternalTimer}, based on the timestamp in ascending order. +*/ + private static final Comparator<TimerHeapInternalTimer<?, ?>> COMPARATOR = + (o1, o2) -> Long.compare(o1.getTimestamp(), o2.getTimestamp()); + + /** +* This array contains one hash set per key-group. The sets are used for fast de-duplication and deletes of timers. +*/ + private final HashMap<TimerHeapInternalTimer<K, N>, TimerHeapInternalTimer<K, N>>[] deduplicationMapsByKeyGroup; + + /** +* The array that represents the heap-organized priority queue. +*/ + private TimerHeapInternalTimer<K, N>[] queue; + + /** +* The current size of the priority queue. +*/ + private int size; + + /** +* The key-group range of timers that are managed by this queue. 
+*/ + private final KeyGroupRange keyGroupRange; + + /** +* The total number of key-groups of the job. +*/ + private final int totalNumberOfKeyGroups; + + + /** +* Creates an empty {@link InternalTimerHeap} with the requested initial capacity. +* +* @param minimumCapacity the minimum and initial capacity of this priority queue. +*/ + @SuppressWarnings("unchecked") + InternalTimerHeap( + @Nonnegative int minimumCapacity, + @Nonnull KeyGroupRange keyGroupRange, + @Nonnegative int totalNumberOfKeyGroups) { + + this.totalNumberOfKeyGroups = totalNumberOfKeyGroups; + this.keyGroupRange = keyGroupRange; + + final int keyGroupsInLocalRange = keyGroupRange.getNumberOfKeyGroups(); + final int deduplicationSetSize = 1 + minimumCapacity / keyGroupsInLocalRange; + this.deduplicat
[jira] [Commented] (FLINK-7836) specifying node label for flink job to run on yarn
[ https://issues.apache.org/jira/browse/FLINK-7836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495172#comment-16495172 ] ASF GitHub Bot commented on FLINK-7836: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5593 Thanks for the reminder @yanghua. I forgot about it and will merge it now. Thanks again for your contributions. Flink wouldn't be what it is without you! > specifying node label for flink job to run on yarn > -- > > Key: FLINK-7836 > URL: https://issues.apache.org/jira/browse/FLINK-7836 > Project: Flink > Issue Type: New Feature > Components: Client >Affects Versions: 1.3.2 >Reporter: zhaibaba >Assignee: vinoyang >Priority: Major > > flink client cannot specify node label for flink job to run on yarn -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5593: [FLINK-7836][Client] specifying node label for flink job ...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5593 Thanks for the reminder @yanghua. I forgot about it and will merge it now. Thanks again for your contributions. Flink wouldn't be what it is without you! ---
[jira] [Commented] (FLINK-9461) Disentangle flink-connector-kafka from flink-table and flink-json
[ https://issues.apache.org/jira/browse/FLINK-9461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495170#comment-16495170 ] Till Rohrmann commented on FLINK-9461: -- Logically, it does not make sense to couple a connector with its user (here {{flink-table}}). If at all, the user should depend on the connector, or ideally we have a common abstraction/interface living for example in {{flink-table-connectors}} which can be implemented by a concrete connector and dropped in dynamically. Not sure whether I buy the argument about too many Maven modules. What would be the problem with that? It rather looks to me that making {{flink-connector-kafka}} depend on {{flink-table}} was a kind of shortcut. > Disentangle flink-connector-kafka from flink-table and flink-json > - > > Key: FLINK-9461 > URL: https://issues.apache.org/jira/browse/FLINK-9461 > Project: Flink > Issue Type: Improvement > Components: Build System >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Priority: Major > Fix For: 1.6.0 > > > Currently, the {{flink-connector-kafka}} module has a dependency on > {{flink-table}} and {{flink-json}}. The reason seems to be that the module > contains the {{KafkaJsonTableSource}} and {{KafkaJsonTableSink}}. Even though > the {{flink-table}} and {{flink-json}} dependencies are marked as optional, the > {{flink-connector-kafka}} jar will still contain the table sources and sinks. I > think this is not a clean design. > I would propose to move the table sources and sinks into a dedicated module > which depends on {{flink-connector-kafka}}. That way we would better separate > dependencies and could remove {{flink-table}} and {{flink-json}} from > {{flink-connector-kafka}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-9461) Disentangle flink-connector-kafka from flink-table and flink-json
[ https://issues.apache.org/jira/browse/FLINK-9461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495170#comment-16495170 ] Till Rohrmann edited comment on FLINK-9461 at 5/30/18 1:38 PM: --- Logically, it does not make sense to couple a connector with its user (here {{flink-table}}). If at all, the user should depend on the connector, or ideally we have a common abstraction/interface living for example in {{flink-table-connectors}} which can be implemented by a concrete connector and dropped in dynamically. Not sure whether I buy the argument about too many Maven modules. What would be the problem with that? It rather looks to me that making {{flink-connector-kafka}} depend on {{flink-table}} was a kind of shortcut. was (Author: till.rohrmann): Logically, it does not make sense to couple a connector with its user (here {{flink-table}}). If it all, then it the user should depend on it or ideally we have a common abstraction/interface living for example in {{flink-table-connectors}} which can be implemented by a concrete connector and dropped in dynamically. Not sure whether I buy the argument with too many maven modules. What would be the problem with that? It rather looks to me that making {{flink-connector-kafka}} depend on {{flink-table}} was a kind of short cut. > Disentangle flink-connector-kafka from flink-table and flink-json > - > > Key: FLINK-9461 > URL: https://issues.apache.org/jira/browse/FLINK-9461 > Project: Flink > Issue Type: Improvement > Components: Build System >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Priority: Major > Fix For: 1.6.0 > > > Currently, the {{flink-connector-kafka}} module has a dependency on > {{flink-table}} and {{flink-json}}. The reason seems to be that the module > contains the {{KafkaJsonTableSource}} and {{KafkaJsonTableSink}}. Even though > the {{flink-table}} and {{flink-json}} dependencies are marked as optional, the > {{flink-connector-kafka}} jar will still contain the table sources and sinks. I > think this is not a clean design. > I would propose to move the table sources and sinks into a dedicated module > which depends on {{flink-connector-kafka}}. That way we would better separate > dependencies and could remove {{flink-table}} and {{flink-json}} from > {{flink-connector-kafka}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7789) Add handler for Async IO operator timeouts
[ https://issues.apache.org/jira/browse/FLINK-7789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495164#comment-16495164 ] ASF GitHub Bot commented on FLINK-7789: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6091#discussion_r191768245 --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala --- @@ -71,6 +71,9 @@ object AsyncDataStream { override def asyncInvoke(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = { asyncFunction.asyncInvoke(input, new JavaResultFutureWrapper(resultFuture)) } + override def timeout(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = { --- End diff -- Are those changes in `AsyncDataStream.scala` covered anywhere by the tests? If you comment out their method bodies, does at least one test fail? > Add handler for Async IO operator timeouts > --- > > Key: FLINK-7789 > URL: https://issues.apache.org/jira/browse/FLINK-7789 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Karthik Deivasigamani >Assignee: blues zheng >Priority: Major > > Currently the Async IO operator does not provide a mechanism to handle timeouts. > When a request times out, an exception is thrown and the job is restarted. It > would be good to pass an AsyncIOTimeoutHandler which can be implemented by the > user and passed in the constructor. > Here is the discussion from the Apache Flink users mailing list > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/async-io-operator-timeouts-tt16068.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7789) Add handler for Async IO operator timeouts
[ https://issues.apache.org/jira/browse/FLINK-7789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495167#comment-16495167 ] ASF GitHub Bot commented on FLINK-7789: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6091#discussion_r191760364 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java --- @@ -212,6 +212,20 @@ public static void countDown() { } } + private static class TimeoutAwareLazyAsyncFunction extends LazyAsyncFunction { + private static final long serialVersionUID = 1428714561365346128L; + + @Override + public void timeout(Integer input, ResultFuture<Integer> resultFuture) throws Exception { + if (input != null && input % 2 == 0) { + resultFuture.complete(Collections.singletonList(input * 3)); + } else { + // ignore odd input number when it times out --- End diff -- Move this comment to the top of this static class? > Add handler for Async IO operator timeouts > --- > > Key: FLINK-7789 > URL: https://issues.apache.org/jira/browse/FLINK-7789 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Karthik Deivasigamani >Assignee: blues zheng >Priority: Major > > Currently the Async IO operator does not provide a mechanism to handle timeouts. > When a request times out, an exception is thrown and the job is restarted. It > would be good to pass an AsyncIOTimeoutHandler which can be implemented by the > user and passed in the constructor. > Here is the discussion from the Apache Flink users mailing list > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/async-io-operator-timeouts-tt16068.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7789) Add handler for Async IO operator timeouts
[ https://issues.apache.org/jira/browse/FLINK-7789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495165#comment-16495165 ] ASF GitHub Bot commented on FLINK-7789: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6091#discussion_r191768672 --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncFunction.scala --- @@ -46,4 +48,16 @@ trait AsyncFunction[IN, OUT] extends Function { * @param resultFuture to be completed with the result data */ def asyncInvoke(input: IN, resultFuture: ResultFuture[OUT]): Unit + + /** +* Called when [[AsyncFunction.asyncInvoke]] times out. +* By default, the result future is exceptionally completed with a timeout exception. +* +* @param input element coming from an upstream task +* @param resultFuture to be completed with the result data +*/ + def timeout(input: IN, resultFuture: ResultFuture[OUT]): Unit = { +resultFuture.completeExceptionally(new TimeoutException("Async function call has timed out.")) --- End diff -- same question about the tests. > Add handler for Async IO operator timeouts > --- > > Key: FLINK-7789 > URL: https://issues.apache.org/jira/browse/FLINK-7789 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Karthik Deivasigamani >Assignee: blues zheng >Priority: Major > > Currently the Async IO operator does not provide a mechanism to handle timeouts. > When a request times out, an exception is thrown and the job is restarted. It > would be good to pass an AsyncIOTimeoutHandler which can be implemented by the > user and passed in the constructor. > Here is the discussion from the Apache Flink users mailing list > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/async-io-operator-timeouts-tt16068.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7789) Add handler for Async IO operator timeouts
[ https://issues.apache.org/jira/browse/FLINK-7789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495168#comment-16495168 ] ASF GitHub Bot commented on FLINK-7789: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6091#discussion_r191757079 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java --- @@ -212,6 +212,20 @@ public static void countDown() { } } + private static class TimeoutAwareLazyAsyncFunction extends LazyAsyncFunction { --- End diff -- Rename to `IgnoreTimeoutLazyAsyncFunction`? > Add handler for Async IO operator timeouts > --- > > Key: FLINK-7789 > URL: https://issues.apache.org/jira/browse/FLINK-7789 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Karthik Deivasigamani >Assignee: blues zheng >Priority: Major > > Currently the Async IO operator does not provide a mechanism to handle timeouts. > When a request times out, an exception is thrown and the job is restarted. It > would be good to pass an AsyncIOTimeoutHandler which can be implemented by the > user and passed in the constructor. > Here is the discussion from the Apache Flink users mailing list > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/async-io-operator-timeouts-tt16068.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7789) Add handler for Async IO operator timeouts
[ https://issues.apache.org/jira/browse/FLINK-7789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495166#comment-16495166 ] ASF GitHub Bot commented on FLINK-7789: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6091#discussion_r191767070 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java --- @@ -648,6 +662,52 @@ public void testAsyncTimeout() throws Exception { ExceptionUtils.findThrowable(mockEnvironment.getActualExternalFailureCause().get(), TimeoutException.class); } + @Test + public void testAsyncTimeoutAware() throws Exception { --- End diff -- Please deduplicate the code of this method with `testAsyncTimeout()` to sth like that: ``` @Test public void testAsyncTimeoutFailure() throws Exception { testAsyncTimeout( new LazyAsyncFunction(), Optional.of(TimeoutException.class), new StreamRecord<>(2, 5L)); } @Test public void testAsyncTimeoutIgnore() throws Exception { testAsyncTimeout( new IgnoreTimeoutLazyAsyncFunction(), Optional.empty(), new StreamRecord<>(6, 0L), new StreamRecord<>(4, 5L)); } private void testAsyncTimeout( LazyAsyncFunction lazyAsyncFunction, Optional<Class<? extends Throwable>> expectedException, StreamRecord<Integer>... expectedRecords) throws Exception { // your current testAsyncTimeoutAware method body adjusted to above parameters } ``` or sth similar. > Add handler for Async IO operator timeouts > --- > > Key: FLINK-7789 > URL: https://issues.apache.org/jira/browse/FLINK-7789 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Karthik Deivasigamani >Assignee: blues zheng >Priority: Major > > Currently the Async IO operator does not provide a mechanism to handle timeouts. > When a request times out, an exception is thrown and the job is restarted. It > would be good to pass an AsyncIOTimeoutHandler which can be implemented by the > user and passed in the constructor. > Here is the discussion from the Apache Flink users mailing list > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/async-io-operator-timeouts-tt16068.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6091#discussion_r191760364 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java --- @@ -212,6 +212,20 @@ public static void countDown() { } } + private static class TimeoutAwareLazyAsyncFunction extends LazyAsyncFunction { + private static final long serialVersionUID = 1428714561365346128L; + + @Override + public void timeout(Integer input, ResultFuture<Integer> resultFuture) throws Exception { + if (input != null && input % 2 == 0) { + resultFuture.complete(Collections.singletonList(input * 3)); + } else { + // ignore odd input number when it times out --- End diff -- Move this comment to the top of this static class? ---
[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6091#discussion_r191768245 --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala --- @@ -71,6 +71,9 @@ object AsyncDataStream { override def asyncInvoke(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = { asyncFunction.asyncInvoke(input, new JavaResultFutureWrapper(resultFuture)) } + override def timeout(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = { --- End diff -- Are those changes in `AsyncDataStream.scala` covered anywhere by the tests? If you comment out their method bodies, does at least one test fail? ---
[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6091#discussion_r191757079 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java --- @@ -212,6 +212,20 @@ public static void countDown() { } } + private static class TimeoutAwareLazyAsyncFunction extends LazyAsyncFunction { --- End diff -- Rename to `IgnoreTimeoutLazyAsyncFunction`? ---
[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6091#discussion_r191768672 --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncFunction.scala --- @@ -46,4 +48,16 @@ trait AsyncFunction[IN, OUT] extends Function { * @param resultFuture to be completed with the result data */ def asyncInvoke(input: IN, resultFuture: ResultFuture[OUT]): Unit + + /** +* Called when [[AsyncFunction.asyncInvoke]] times out. +* By default, the result future is exceptionally completed with a timeout exception. +* +* @param input element coming from an upstream task +* @param resultFuture to be completed with the result data +*/ + def timeout(input: IN, resultFuture: ResultFuture[OUT]): Unit = { +resultFuture.completeExceptionally(new TimeoutException("Async function call has timed out.")) --- End diff -- same question about the tests. ---
[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6091#discussion_r191767070 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java --- @@ -648,6 +662,52 @@ public void testAsyncTimeout() throws Exception { ExceptionUtils.findThrowable(mockEnvironment.getActualExternalFailureCause().get(), TimeoutException.class); } + @Test + public void testAsyncTimeoutAware() throws Exception { --- End diff -- Please deduplicate the code of this method with `testAsyncTimeout()` to sth like that: ``` @Test public void testAsyncTimeoutFailure() throws Exception { testAsyncTimeout( new LazyAsyncFunction(), Optional.of(TimeoutException.class), new StreamRecord<>(2, 5L)); } @Test public void testAsyncTimeoutIgnore() throws Exception { testAsyncTimeout( new IgnoreTimeoutLazyAsyncFunction(), Optional.empty(), new StreamRecord<>(6, 0L), new StreamRecord<>(4, 5L)); } private void testAsyncTimeout( LazyAsyncFunction lazyAsyncFunction, Optional<Class<? extends Throwable>> expectedException, StreamRecord<Integer>... expectedRecords) throws Exception { // your current testAsyncTimeoutAware method body adjusted to above parameters } ``` or sth similar. ---
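For context on what the new hook enables for users: once merged, an AsyncFunction can complete a timed-out request with a fallback value instead of letting the default timeout() implementation fail the job with a TimeoutException. A sketch in Java (the element type and the -1 sentinel are illustrative only):
```java
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;

// Illustrative AsyncFunction that emits a fallback value when a request times
// out, using the timeout() hook added by this PR, instead of letting the
// default implementation fail the job exceptionally.
public class FallbackOnTimeoutFunction implements AsyncFunction<Integer, Integer> {

    @Override
    public void asyncInvoke(Integer input, ResultFuture<Integer> resultFuture) {
        CompletableFuture
            .supplyAsync(() -> input * 2) // stand-in for a real asynchronous lookup
            .thenAccept(result -> resultFuture.complete(Collections.singletonList(result)));
    }

    @Override
    public void timeout(Integer input, ResultFuture<Integer> resultFuture) {
        // Swallow the timeout and emit a sentinel instead of an exception.
        resultFuture.complete(Collections.singletonList(-1));
    }
}
```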
[jira] [Assigned] (FLINK-9366) Distribute Cache only works for client-accessible files
[ https://issues.apache.org/jira/browse/FLINK-9366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dawid Wysakowicz reassigned FLINK-9366: --- Assignee: Dawid Wysakowicz > Distribute Cache only works for client-accessible files > --- > > Key: FLINK-9366 > URL: https://issues.apache.org/jira/browse/FLINK-9366 > Project: Flink > Issue Type: Bug > Components: Client, Local Runtime >Affects Versions: 1.6.0 >Reporter: Chesnay Schepler >Assignee: Dawid Wysakowicz >Priority: Blocker > Fix For: 1.6.0 > > > In FLINK-8620 the distributed cache was modified to distribute files via > the blob store, instead of downloading them from a distributed filesystem. > Previously, taskmanagers would download requested files from the DFS. Now, > they retrieve them from the blob store. This requires the client to > preemptively upload all files used with the distributed cache. > As a result it is no longer possible to use the distributed cache for files > that reside in a cluster-internal DFS, as the client cannot download them. This > is a regression from the previous behavior and may break existing setups. > [~aljoscha] [~dawidwys] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service
[ https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495159#comment-16495159 ] ASF GitHub Bot commented on FLINK-9423: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6062#discussion_r191768328 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java --- @@ -0,0 +1,511 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Queue; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * A heap-based priority queue for internal timers. This heap is supported by hash sets for fast contains + * (de-duplication) and deletes. The heap implementation is a simple binary tree stored inside an array. Element indexes + * in the heap array start at 1 instead of 0 to make array index computations a bit simpler in the hot methods. + * + * Possible future improvements: + * + * We could also implement shrinking for the heap and the deduplication maps. + * We could replace the deduplication maps with more efficient custom implementations. In particular, a hash set + * would be enough if it could return existing elements on unsuccessful adding, etc.. + * + * + * @param <K> type of the key of the internal timers managed by this priority queue. + * @param <N> type of the namespace of the internal timers managed by this priority queue. + */ +public class InternalTimerHeap<K, N> implements Queue<TimerHeapInternalTimer<K, N>>, Set<TimerHeapInternalTimer<K, N>> { + + /** +* A safe maximum size for arrays in the JVM. +*/ + private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; + + /** +* Comparator for {@link TimerHeapInternalTimer}, based on the timestamp in ascending order. +*/ + private static final Comparator<TimerHeapInternalTimer<?, ?>> COMPARATOR = + (o1, o2) -> Long.compare(o1.getTimestamp(), o2.getTimestamp()); + + /** +* This array contains one hash set per key-group. The sets are used for fast de-duplication and deletes of timers. +*/ + private final HashMap<TimerHeapInternalTimer<K, N>, TimerHeapInternalTimer<K, N>>[] deduplicationMapsByKeyGroup; + + /** +* The array that represents the heap-organized priority queue. 
+*/ + private TimerHeapInternalTimer<K, N>[] queue; + + /** +* The current size of the priority queue. +*/ + private int size; + + /** +* The key-group range of timers that are managed by this queue. +*/ + private final KeyGroupRange keyGroupRange; + + /** +* The total number of key-groups of the job. +*/ + private final int totalNumberOfKeyGroups; + + + /** +* Creates an empty {@link InternalTimerHeap} with the requested initial capacity. +* +* @param minimumCapacity the minimum and initial capacity of this priority queue. +*/ + @SuppressWarnings("unchecked") + InternalTimerHeap( + @Nonnegative int minimumCapacity, + @Nonnull KeyGroupRange keyGroupRange, + @Nonnegative int totalNumberOfKeyGroups) { + + this.totalNumberOfKeyGroups = totalNumberOfKeyGroups; +
[GitHub] flink pull request #6062: [FLINK-9423][state] Implement efficient deletes fo...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6062#discussion_r191768328 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java --- @@ -0,0 +1,511 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Queue; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * A heap-based priority queue for internal timers. This heap is supported by hash sets for fast contains + * (de-duplication) and deletes. The heap implementation is a simple binary tree stored inside an array. Element indexes + * in the heap array start at 1 instead of 0 to make array index computations a bit simpler in the hot methods. + * + * Possible future improvements: + * + * We could also implement shrinking for the heap and the deduplication maps. + * We could replace the deduplication maps with more efficient custom implementations. In particular, a hash set + * would be enough if it could return existing elements on unsuccessful adding, etc.. + * + * + * @param <K> type of the key of the internal timers managed by this priority queue. + * @param <N> type of the namespace of the internal timers managed by this priority queue. + */ +public class InternalTimerHeap<K, N> implements Queue<TimerHeapInternalTimer<K, N>>, Set<TimerHeapInternalTimer<K, N>> { + + /** +* A safe maximum size for arrays in the JVM. +*/ + private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; + + /** +* Comparator for {@link TimerHeapInternalTimer}, based on the timestamp in ascending order. +*/ + private static final Comparator<TimerHeapInternalTimer<?, ?>> COMPARATOR = + (o1, o2) -> Long.compare(o1.getTimestamp(), o2.getTimestamp()); + + /** +* This array contains one hash set per key-group. The sets are used for fast de-duplication and deletes of timers. +*/ + private final HashMap<TimerHeapInternalTimer<K, N>, TimerHeapInternalTimer<K, N>>[] deduplicationMapsByKeyGroup; + + /** +* The array that represents the heap-organized priority queue. +*/ + private TimerHeapInternalTimer<K, N>[] queue; + + /** +* The current size of the priority queue. +*/ + private int size; + + /** +* The key-group range of timers that are managed by this queue. 
+*/ + private final KeyGroupRange keyGroupRange; + + /** +* The total number of key-groups of the job. +*/ + private final int totalNumberOfKeyGroups; + + + /** +* Creates an empty {@link InternalTimerHeap} with the requested initial capacity. +* +* @param minimumCapacity the minimum and initial capacity of this priority queue. +*/ + @SuppressWarnings("unchecked") + InternalTimerHeap( + @Nonnegative int minimumCapacity, + @Nonnull KeyGroupRange keyGroupRange, + @Nonnegative int totalNumberOfKeyGroups) { + + this.totalNumberOfKeyGroups = totalNumberOfKeyGroups; + this.keyGroupRange = keyGroupRange; + + final int keyGroupsInLocalRange = keyGroupRange.getNumberOfKeyGroups(); + final int deduplicationSetSize = 1 + minimumCapacity / keyGroupsInLocalRange; + this.deduplic
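As background to the diff under review: the 1-based array layout called out in the class javadoc makes the parent/child index arithmetic plain shifts (parent is idx >>> 1, children are idx << 1 and (idx << 1) + 1) with no -1/+1 corrections. A minimal standalone sketch of that index math (simplified to bare long timestamps; the real class additionally maintains the per-key-group de-duplication maps shown above):
```java
import java.util.Arrays;

// Minimal 1-based binary min-heap illustrating the index math referenced in
// the InternalTimerHeap javadoc; simplified to long timestamps only.
public class OneBasedHeap {
    private long[] queue = new long[8]; // slot 0 is intentionally unused
    private int size;

    public void add(long value) {
        if (++size == queue.length) {
            queue = Arrays.copyOf(queue, queue.length * 2);
        }
        queue[size] = value;
        siftUp(size);
    }

    /** Removes and returns the smallest element; caller must ensure size > 0 (sketch omits checks). */
    public long poll() {
        long head = queue[1];
        queue[1] = queue[size--];
        siftDown(1);
        return head;
    }

    private void siftUp(int idx) {
        // With 1-based indexing the parent is simply idx >>> 1, not (idx - 1) / 2.
        while (idx > 1 && queue[idx] < queue[idx >>> 1]) {
            swap(idx, idx >>> 1);
            idx >>>= 1;
        }
    }

    private void siftDown(int idx) {
        while (true) {
            int smallest = idx;
            int left = idx << 1;  // children are idx * 2 and idx * 2 + 1
            int right = left + 1;
            if (left <= size && queue[left] < queue[smallest]) smallest = left;
            if (right <= size && queue[right] < queue[smallest]) smallest = right;
            if (smallest == idx) return;
            swap(idx, smallest);
            idx = smallest;
        }
    }

    private void swap(int a, int b) {
        long tmp = queue[a]; queue[a] = queue[b]; queue[b] = tmp;
    }
}
```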
[jira] [Commented] (FLINK-9410) Replace NMClient with NMClientAsync in YarnResourceManager
[ https://issues.apache.org/jira/browse/FLINK-9410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495157#comment-16495157 ] ASF GitHub Bot commented on FLINK-9410: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/6087 Thanks for opening the PR @zhangminglei. I agree with @sihuazhou that it's not as easy as replacing `x` with `xAsync`. As @sihuazhou pointed out, we have to react to asynchronous Yarn messages depending on our internal state. So for example, when calling `startContainerAsync` we should store somewhere that we tried to start a container. If this request later fails, we should not fail the complete ResourceManager but rather send a new container request (as it is done in the `YarnResourceManager#onContainersAllocated` method). Additionally, while touching this code, we should check whether we can improve the unit tests for this component. Especially with the asynchronous node manager client, it should be rather simple to write good tests when creating a mock implementation and manually calling the callbacks. > Replace NMClient with NMClientAsync in YarnResourceManager > -- > > Key: FLINK-9410 > URL: https://issues.apache.org/jira/browse/FLINK-9410 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0, 1.6.0 >Reporter: Till Rohrmann >Assignee: mingleizhang >Priority: Critical > Fix For: 1.6.0 > > > Currently, the {{YarnResourceManager}} uses the synchronous {{NMClient}} > which is called from within the main thread of the {{ResourceManager}}. Since > these operations are blocking, we should replace the client with the > {{NMClientAsync}} and make the calls non-blocking. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6087: [FLINK-9410] [yarn] Replace NMClient with NMClientAsync i...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/6087 Thanks for opening the PR @zhangminglei. I agree with @sihuazhou that it's not as easy as replacing `x` with `xAsync`. As @sihuazhou pointed out, we have to react to asynchronous Yarn messages depending on our internal state. So for example, when calling `startContainerAsync` we should store somewhere that we tried to start a container. If this request later fails, we should not fail the complete ResourceManager but rather send a new container request (as it is done in the `YarnResourceManager#onContainersAllocated` method). Additionally, while touching this code, we should check whether we can improve the unit tests for this component. Especially with the asynchronous node manager client, it should be rather simple to write good tests when creating a mock implementation and manually calling the callbacks. ---
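For illustration, the pattern described here, remembering each in-flight start request and compensating on failure instead of failing the whole ResourceManager, could look roughly like the sketch below. The Hadoop `NMClientAsync.CallbackHandler` interface is real; the Flink-side method and the compensation `Runnable` are made-up names for the sketch:

{code:java}
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;

// Sketch of a callback handler that tracks containers we asked YARN to start
// but whose start has not been confirmed yet. registerPendingStart() is
// illustrative, not actual YarnResourceManager code.
class ContainerStartTracker implements NMClientAsync.CallbackHandler {

	private final Map<ContainerId, Runnable> pendingStarts = new ConcurrentHashMap<>();

	void registerPendingStart(ContainerId id, Runnable requestReplacementContainer) {
		pendingStarts.put(id, requestReplacementContainer);
	}

	@Override
	public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> allServiceResponse) {
		pendingStarts.remove(containerId); // start confirmed, nothing to compensate
	}

	@Override
	public void onStartContainerError(ContainerId containerId, Throwable t) {
		// Do not fail the ResourceManager; request a replacement container instead,
		// analogous to what YarnResourceManager#onContainersAllocated does today.
		Runnable compensation = pendingStarts.remove(containerId);
		if (compensation != null) {
			compensation.run();
		}
	}

	@Override
	public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {}

	@Override
	public void onContainerStopped(ContainerId containerId) {
		pendingStarts.remove(containerId);
	}

	@Override
	public void onGetContainerStatusError(ContainerId containerId, Throwable t) {}

	@Override
	public void onStopContainerError(ContainerId containerId, Throwable t) {}
}
{code}

A mock implementation for tests would simply invoke these callbacks directly, which is what makes the asynchronous client easier to unit-test than the blocking one.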
[jira] [Commented] (FLINK-9215) TaskManager Releasing - org.apache.flink.util.FlinkException
[ https://issues.apache.org/jira/browse/FLINK-9215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495150#comment-16495150 ] ASF GitHub Bot commented on FLINK-9215: --- Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/5879 @dawidwys It's ok with me; the unrelated change should be a minor Javadoc hotfix. > TaskManager Releasing - org.apache.flink.util.FlinkException > - > > Key: FLINK-9215 > URL: https://issues.apache.org/jira/browse/FLINK-9215 > Project: Flink > Issue Type: Bug > Components: Cluster Management, ResourceManager, Streaming >Affects Versions: 1.5.0 >Reporter: Bob Lau >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.5.1 > > > The exception stack is as follows: > {code:java} > // code placeholder > {"root-exception":" > org.apache.flink.util.FlinkException: Releasing TaskManager > 0d87aa6fa99a6c12e36775b1d6bceb19. > at > org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManagerInternal(SlotPool.java:1067) > at > org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManager(SlotPool.java:1050) > at sun.reflect.GeneratedMethodAccessor110.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132) > at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544) > at akka.actor.Actor$class.aroundReceive(Actor.scala:502) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) > at akka.actor.ActorCell.invoke(ActorCell.scala:495) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) > at akka.dispatch.Mailbox.run(Mailbox.scala:224) > at akka.dispatch.Mailbox.exec(Mailbox.scala:234) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > ","timestamp":1524106438997, > "all-exceptions":[{"exception":" > org.apache.flink.util.FlinkException: Releasing TaskManager > 0d87aa6fa99a6c12e36775b1d6bceb19. 
> at > org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManagerInternal(SlotPool.java:1067) > at > org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManager(SlotPool.java:1050) > at sun.reflect.GeneratedMethodAccessor110.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132) > at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544) > at akka.actor.Actor$class.aroundReceive(Actor.scala:502) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) > at akka.actor.ActorCell.invoke(ActorCell.scala:495) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) > at akka.dispatch.Mailbox.run(Mailbox.scala:224) > at akka.dispatch.Mailbox.exec(Mailbox.scala:234) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > ","task":"async wait operator > (14/20)","location":"slave1:60199","timestamp":1524106438996 > }],"truncated":false} > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5879: [FLINK-9215][resoucemanager] Reduce noise in SlotPool's l...
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/5879 @dawidwys It's ok with me; the unrelated change should be a minor Javadoc hotfix. ---
[jira] [Commented] (FLINK-9215) TaskManager Releasing - org.apache.flink.util.FlinkException
[ https://issues.apache.org/jira/browse/FLINK-9215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495146#comment-16495146 ] ASF GitHub Bot commented on FLINK-9215: --- Github user dawidwys commented on the issue: https://github.com/apache/flink/pull/5879 Thanks @sihuazhou for the contribution. LGTM, though I would remove the unrelated change if it is ok with you. > TaskManager Releasing - org.apache.flink.util.FlinkException > - > > Key: FLINK-9215 > URL: https://issues.apache.org/jira/browse/FLINK-9215 > Project: Flink > Issue Type: Bug > Components: Cluster Management, ResourceManager, Streaming >Affects Versions: 1.5.0 >Reporter: Bob Lau >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.5.1 > > > The exception stack is as follows: > {code:java} > // code placeholder > {"root-exception":" > org.apache.flink.util.FlinkException: Releasing TaskManager > 0d87aa6fa99a6c12e36775b1d6bceb19. > at > org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManagerInternal(SlotPool.java:1067) > at > org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManager(SlotPool.java:1050) > at sun.reflect.GeneratedMethodAccessor110.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132) > at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544) > at akka.actor.Actor$class.aroundReceive(Actor.scala:502) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) > at akka.actor.ActorCell.invoke(ActorCell.scala:495) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) > at akka.dispatch.Mailbox.run(Mailbox.scala:224) > at akka.dispatch.Mailbox.exec(Mailbox.scala:234) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > ","timestamp":1524106438997, > "all-exceptions":[{"exception":" > org.apache.flink.util.FlinkException: Releasing TaskManager > 0d87aa6fa99a6c12e36775b1d6bceb19. 
> at > org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManagerInternal(SlotPool.java:1067) > at > org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManager(SlotPool.java:1050) > at sun.reflect.GeneratedMethodAccessor110.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132) > at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544) > at akka.actor.Actor$class.aroundReceive(Actor.scala:502) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) > at akka.actor.ActorCell.invoke(ActorCell.scala:495) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) > at akka.dispatch.Mailbox.run(Mailbox.scala:224) > at akka.dispatch.Mailbox.exec(Mailbox.scala:234) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > ","task":"async wait operator > (14/20)","location":"slave1:60199","timestamp":1524106438996 > }],"truncated":false} > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9215) TaskManager Releasing - org.apache.flink.util.FlinkException
[ https://issues.apache.org/jira/browse/FLINK-9215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495143#comment-16495143 ] ASF GitHub Bot commented on FLINK-9215: --- Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/5879#discussion_r191764153 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java --- @@ -304,7 +305,8 @@ public int getMaxParallelism() { /** * Sets the maximum parallelism for the task. * -* @param maxParallelism The maximum parallelism to be set. must be between 1 and Short.MAX_VALUE. +* @param maxParallelism The maximum parallelism to be set. +* Must be between 1 and {@link KeyGroupRangeAssignment#UPPER_BOUND_MAX_PARALLELISM}. --- End diff -- Unrelated change. Will remove while merging, if you are ok with it @sihuazhou . > TaskManager Releasing - org.apache.flink.util.FlinkException > - > > Key: FLINK-9215 > URL: https://issues.apache.org/jira/browse/FLINK-9215 > Project: Flink > Issue Type: Bug > Components: Cluster Management, ResourceManager, Streaming >Affects Versions: 1.5.0 >Reporter: Bob Lau >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.5.1 > > > The exception stack is as follows: > {code:java} > // code placeholder > {"root-exception":" > org.apache.flink.util.FlinkException: Releasing TaskManager > 0d87aa6fa99a6c12e36775b1d6bceb19. 
> at > org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManagerInternal(SlotPool.java:1067) > at > org.apache.flink.runtime.jobmaster.slotpool.SlotPool.releaseTaskManager(SlotPool.java:1050) > at sun.reflect.GeneratedMethodAccessor110.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132) > at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544) > at akka.actor.Actor$class.aroundReceive(Actor.scala:502) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) > at akka.actor.ActorCell.invoke(ActorCell.scala:495) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) > at akka.dispatch.Mailbox.run(Mailbox.scala:224) > at akka.dispatch.Mailbox.exec(Mailbox.scala:234) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > ","task":"async wait operator > (14/20)","location":"slave1:601
[GitHub] flink issue #5879: [FLINK-9215][resoucemanager] Reduce noise in SlotPool's l...
Github user dawidwys commented on the issue: https://github.com/apache/flink/pull/5879 Thanks @sihuazhou for the contribution. LGTM, though I would remove the unrelated change if it is ok with you. ---
[GitHub] flink pull request #5879: [FLINK-9215][resoucemanager] Reduce noise in SlotP...
Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/5879#discussion_r191764153 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java --- @@ -304,7 +305,8 @@ public int getMaxParallelism() { /** * Sets the maximum parallelism for the task. * -* @param maxParallelism The maximum parallelism to be set. must be between 1 and Short.MAX_VALUE. +* @param maxParallelism The maximum parallelism to be set. +* Must be between 1 and {@link KeyGroupRangeAssignment#UPPER_BOUND_MAX_PARALLELISM}. --- End diff -- Unrelated change. Will remove while merging, if you are ok with it @sihuazhou . ---
[jira] [Commented] (FLINK-9480) Let local recovery support rescaling
[ https://issues.apache.org/jira/browse/FLINK-9480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495140#comment-16495140 ] Till Rohrmann commented on FLINK-9480: -- I can totally see the benefits of speeding up rescaling operations by first trying to read local state and then falling back to remote state. In the first iteration, it could be a best-effort approach as suggested by [~sihuazhou]. Next, we could try to make the scheduling a bit smarter, and eventually we could even load the required state onto a TM before deploying tasks. I also agree with you two about the priorities w.r.t. rescalable timers and state TTL. > Let local recovery support rescaling > > > Key: FLINK-9480 > URL: https://issues.apache.org/jira/browse/FLINK-9480 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Priority: Major > > Currently, local recovery only supports restoring from a checkpoint without > rescaling. Maybe we should enable it to support rescaling. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
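The best-effort approach discussed here boils down to a local-first read with a remote fallback per key-group. A minimal sketch under assumed interfaces; none of these names are Flink's actual local-recovery API:

{code:java}
import java.io.IOException;
import java.io.InputStream;

// Illustrative only: prefer the TaskManager's local copy of a key-group's
// state, fall back to the remote checkpoint copy when the local one is
// missing (e.g. because rescaling moved the key-group to another TM).
interface StateReader {
	InputStream open(int keyGroup) throws IOException;
}

final class BestEffortStateReader implements StateReader {

	private final StateReader local;  // on-disk copy kept for local recovery
	private final StateReader remote; // checkpoint copy in the DFS

	BestEffortStateReader(StateReader local, StateReader remote) {
		this.local = local;
		this.remote = remote;
	}

	@Override
	public InputStream open(int keyGroup) throws IOException {
		try {
			return local.open(keyGroup); // fast path: no network I/O
		} catch (IOException localMiss) {
			// Best effort: fall back to remote state instead of failing the restore.
			return remote.open(keyGroup);
		}
	}
}
{code}

The smarter scheduling mentioned in the comment would then be about maximizing how often the fast path hits.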
[GitHub] flink pull request #5721: Update kubernetes.md
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5721 ---
[GitHub] flink issue #5721: Update kubernetes.md
Github user dawidwys commented on the issue: https://github.com/apache/flink/pull/5721 merging ---
[jira] [Commented] (FLINK-9413) Tasks can fail with PartitionNotFoundException if consumer deployment takes too long
[ https://issues.apache.org/jira/browse/FLINK-9413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495115#comment-16495115 ] ASF GitHub Bot commented on FLINK-9413: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6103#discussion_r191753857 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java --- @@ -103,9 +103,7 @@ public String toString() { // The producing task needs to be RUNNING or already FINISHED if (consumedPartition.isConsumable() && producerSlot != null && (producerState == ExecutionState.RUNNING || - producerState == ExecutionState.FINISHED || - producerState == ExecutionState.SCHEDULED || - producerState == ExecutionState.DEPLOYING)) { --- End diff -- This change will break deployments where `allowLazyDeployment` is set to `false`. In the Flip-6 code, this is per default set to `true`, but for the legacy mode, the value is `false`. Thus, we would have to first remove the legacy mode and `allowLazyDeployment`. > Tasks can fail with PartitionNotFoundException if consumer deployment takes > too long > > > Key: FLINK-9413 > URL: https://issues.apache.org/jira/browse/FLINK-9413 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.4.0, 1.5.0, 1.6.0 >Reporter: Till Rohrmann >Assignee: mingleizhang >Priority: Critical > > {{Tasks}} can fail with a {{PartitionNotFoundException}} if the deployment of > the producer takes too long. More specifically, if it takes longer than the > {{taskmanager.network.request-backoff.max}}, then the {{Task}} will give up > and fail. > The problem is that we calculate the {{InputGateDeploymentDescriptor}} for a > consuming task once the producer has been assigned a slot but we do not wait > until it is actually running. The problem should be fixed if we wait until > the task is in state {{RUNNING}} before assigning the result partition to the > consumer. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6103: [FLINK-9413] [distributed coordination] Tasks can ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6103#discussion_r191753857 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java --- @@ -103,9 +103,7 @@ public String toString() { // The producing task needs to be RUNNING or already FINISHED if (consumedPartition.isConsumable() && producerSlot != null && (producerState == ExecutionState.RUNNING || - producerState == ExecutionState.FINISHED || - producerState == ExecutionState.SCHEDULED || - producerState == ExecutionState.DEPLOYING)) { --- End diff -- This change will break deployments where `allowLazyDeployment` is set to `false`. In the Flip-6 code, this is per default set to `true`, but for the legacy mode, the value is `false`. Thus, we would have to first remove the legacy mode and `allowLazyDeployment`. ---
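To make the constraint concrete: a mode-aware version of the producer-state check would have to keep accepting `SCHEDULED`/`DEPLOYING` producers as long as the legacy, non-lazy deployment mode exists. A hypothetical sketch; the helper and its boolean parameter are illustrative, not the actual Flink code:

{code:java}
import org.apache.flink.runtime.execution.ExecutionState;

final class ProducerReadiness {

	// Hypothetical mode-aware check: lazy deployments may wait for RUNNING,
	// while eager (legacy) deployments deploy all tasks at once and therefore
	// must accept producers that are still scheduled or deploying.
	static boolean isProducerReady(ExecutionState producerState, boolean allowLazyDeployment) {
		switch (producerState) {
			case RUNNING:
			case FINISHED:
				return true;
			case SCHEDULED:
			case DEPLOYING:
				return !allowLazyDeployment;
			default:
				return false;
		}
	}
}
{code}

This is why the review comment concludes that the legacy mode, and with it `allowLazyDeployment`, would have to be removed before the simplification in the diff is safe.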
[GitHub] flink issue #6103: [FLINK-9413] [distributed coordination] Tasks can fail wi...
Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/6103 @tillrohrmann Could you take a look at this PR? Thank you. ---
[jira] [Commented] (FLINK-9413) Tasks can fail with PartitionNotFoundException if consumer deployment takes too long
[ https://issues.apache.org/jira/browse/FLINK-9413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495113#comment-16495113 ] ASF GitHub Bot commented on FLINK-9413: --- Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/6103 @tillrohrmann Could you take a look at this PR? Thank you. > Tasks can fail with PartitionNotFoundException if consumer deployment takes > too long > > > Key: FLINK-9413 > URL: https://issues.apache.org/jira/browse/FLINK-9413 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.4.0, 1.5.0, 1.6.0 >Reporter: Till Rohrmann >Assignee: mingleizhang >Priority: Critical > > {{Tasks}} can fail with a {{PartitionNotFoundException}} if the deployment of > the producer takes too long. More specifically, if it takes longer than the > {{taskmanager.network.request-backoff.max}}, then the {{Task}} will give up > and fail. > The problem is that we calculate the {{InputGateDeploymentDescriptor}} for a > consuming task once the producer has been assigned a slot but we do not wait > until it is actually running. The problem should be fixed if we wait until > the task is in state {{RUNNING}} before assigning the result partition to the > consumer. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9451) End-to-end test: Scala Quickstarts
[ https://issues.apache.org/jira/browse/FLINK-9451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495093#comment-16495093 ] ASF GitHub Bot commented on FLINK-9451: --- Github user medcv commented on the issue: https://github.com/apache/flink/pull/6089 @zentol PR is updated! > End-to-end test: Scala Quickstarts > -- > > Key: FLINK-9451 > URL: https://issues.apache.org/jira/browse/FLINK-9451 > Project: Flink > Issue Type: Sub-task > Components: Quickstarts >Affects Versions: 1.5.0, 1.4.1, 1.4.2 >Reporter: Yazdan Shirvany >Assignee: Yazdan Shirvany >Priority: Blocker > > We could add an end-to-end test which verifies Flink's quickstarts scala. It > should do the following: > # create a new Flink project using the quickstarts archetype > # add a new Flink dependency to the {{pom.xml}} (e.g. Flink connector or > library) > # run {{mvn clean package -Pbuild-jar}} > # verify that no core dependencies are contained in the jar file > # Run the program -- This message was sent by Atlassian JIRA (v7.6.3#76005)
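Step 4 of the checklist above (verify that no core dependencies are contained in the jar file) is easy to express programmatically. A hedged sketch follows; the package prefixes to reject are an assumption for illustration, and the real end-to-end script may check a different set:

{code:java}
import java.io.IOException;
import java.util.Enumeration;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

// Scans a built quickstart jar and fails if classes from core Flink packages
// leaked in. Core classes must be provided by the cluster, not the user jar.
final class JarLeakCheck {

	public static void main(String[] args) throws IOException {
		try (JarFile jar = new JarFile(args[0])) {
			Enumeration<JarEntry> entries = jar.entries();
			while (entries.hasMoreElements()) {
				String name = entries.nextElement().getName();
				if (name.startsWith("org/apache/flink/core/") || name.startsWith("org/apache/flink/runtime/")) {
					throw new AssertionError("Core dependency leaked into user jar: " + name);
				}
			}
		}
		System.out.println("OK: no core Flink classes found in " + args[0]);
	}
}
{code}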
[jira] [Commented] (FLINK-8873) move unit tests of KeyedStream from DataStreamTest to KeyedStreamTest
[ https://issues.apache.org/jira/browse/FLINK-8873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495092#comment-16495092 ] ASF GitHub Bot commented on FLINK-8873: --- Github user dawidwys commented on the issue: https://github.com/apache/flink/pull/5649 @bowenli86 What's your opinion? If you are ok with not merging it, could you close this PR? > move unit tests of KeyedStream from DataStreamTest to KeyedStreamTest > - > > Key: FLINK-8873 > URL: https://issues.apache.org/jira/browse/FLINK-8873 > Project: Flink > Issue Type: Improvement > Components: DataStream API, Tests >Affects Versions: 1.5.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Minor > Fix For: 1.6.0 > > > move unit tests of KeyedStream.scala from DataStreamTest.scala to > KeyedStreamTest.scala, in order to have clearer separation -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6089: [FLINK-9451]End-to-end test: Scala Quickstarts
Github user medcv commented on the issue: https://github.com/apache/flink/pull/6089 @zentol PR is updated! ---
[GitHub] flink issue #5649: [FLINK-8873] [DataStream API] [Tests] move unit tests of ...
Github user dawidwys commented on the issue: https://github.com/apache/flink/pull/5649 @bowenli86 What's your opinion? If you are ok with not merging it, could you close this PR? ---
[jira] [Commented] (FLINK-9413) Tasks can fail with PartitionNotFoundException if consumer deployment takes too long
[ https://issues.apache.org/jira/browse/FLINK-9413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495089#comment-16495089 ] ASF GitHub Bot commented on FLINK-9413: --- GitHub user zhangminglei opened a pull request: https://github.com/apache/flink/pull/6103 [FLINK-9413] [distributed coordination] Tasks can fail with Partition… …NotFoundException if consumer deployment takes too long ## What is the purpose of the change Tasks can fail with PartitionNotFoundException if consumer deployment takes too long. And the producer has been assigned a slot but we do not wait until it is actually running. ## Brief change log Change the condition to make the producer wait until it is actually running. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zhangminglei/flink flink-9413 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6103.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6103 commit a52434d14117fde4e911f9a8f81a2e10fdd9ba77 Author: zhangminglei Date: 2018-05-30T12:17:17Z [FLINK-9413] [distributed coordination] Tasks can fail with PartitionNotFoundException if consumer deployment takes too long > Tasks can fail with PartitionNotFoundException if consumer deployment takes > too long > > > Key: FLINK-9413 > URL: https://issues.apache.org/jira/browse/FLINK-9413 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.4.0, 1.5.0, 1.6.0 >Reporter: Till Rohrmann >Assignee: mingleizhang >Priority: Critical > > {{Tasks}} can fail with a {{PartitionNotFoundException}} if the deployment of > the producer takes too long. More specifically, if it takes longer than the > {{taskmanager.network.request-backoff.max}}, then the {{Task}} will give up > and fail. > The problem is that we calculate the {{InputGateDeploymentDescriptor}} for a > consuming task once the producer has been assigned a slot but we do not wait > until it is actually running. The problem should be fixed if we wait until > the task is in state {{RUNNING}} before assigning the result partition to the > consumer. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6103: [FLINK-9413] [distributed coordination] Tasks can ...
GitHub user zhangminglei opened a pull request: https://github.com/apache/flink/pull/6103 [FLINK-9413] [distributed coordination] Tasks can fail with Partition… …NotFoundException if consumer deployment takes too long ## What is the purpose of the change Tasks can fail with PartitionNotFoundException if consumer deployment takes too long. And the producer has been assigned a slot but we do not wait until it is actually running. ## Brief change log Change the condition to make the producer wait until it is actually running. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zhangminglei/flink flink-9413 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6103.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6103 commit a52434d14117fde4e911f9a8f81a2e10fdd9ba77 Author: zhangminglei Date: 2018-05-30T12:17:17Z [FLINK-9413] [distributed coordination] Tasks can fail with PartitionNotFoundException if consumer deployment takes too long ---
[jira] [Commented] (FLINK-9451) End-to-end test: Scala Quickstarts
[ https://issues.apache.org/jira/browse/FLINK-9451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495082#comment-16495082 ] ASF GitHub Bot commented on FLINK-9451: --- Github user medcv commented on a diff in the pull request: https://github.com/apache/flink/pull/6089#discussion_r191740715 --- Diff: flink-end-to-end-tests/test-scripts/elasticsearch-common.sh --- @@ -75,6 +76,8 @@ function verify_result { } function shutdown_elasticsearch_cluster { + local index=$1 --- End diff -- +1 > End-to-end test: Scala Quickstarts > -- > > Key: FLINK-9451 > URL: https://issues.apache.org/jira/browse/FLINK-9451 > Project: Flink > Issue Type: Sub-task > Components: Quickstarts >Affects Versions: 1.5.0, 1.4.1, 1.4.2 >Reporter: Yazdan Shirvany >Assignee: Yazdan Shirvany >Priority: Blocker > > We could add an end-to-end test which verifies Flink's quickstarts scala. It > should do the following: > # create a new Flink project using the quickstarts archetype > # add a new Flink dependency to the {{pom.xml}} (e.g. Flink connector or > library) > # run {{mvn clean package -Pbuild-jar}} > # verify that no core dependencies are contained in the jar file > # Run the program -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9451) End-to-end test: Scala Quickstarts
[ https://issues.apache.org/jira/browse/FLINK-9451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495083#comment-16495083 ] ASF GitHub Bot commented on FLINK-9451: --- Github user medcv commented on a diff in the pull request: https://github.com/apache/flink/pull/6089#discussion_r191740903 --- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh --- @@ -18,29 +18,38 @@ # End to end test for quick starts test. +# Usage: +# FLINK_DIR= flink-end-to-end-tests/test-scripts/test_quickstarts.sh --- End diff -- will change the name > End-to-end test: Scala Quickstarts > -- > > Key: FLINK-9451 > URL: https://issues.apache.org/jira/browse/FLINK-9451 > Project: Flink > Issue Type: Sub-task > Components: Quickstarts >Affects Versions: 1.5.0, 1.4.1, 1.4.2 >Reporter: Yazdan Shirvany >Assignee: Yazdan Shirvany >Priority: Blocker > > We could add an end-to-end test which verifies Flink's quickstarts scala. It > should do the following: > # create a new Flink project using the quickstarts archetype > # add a new Flink dependency to the {{pom.xml}} (e.g. Flink connector or > library) > # run {{mvn clean package -Pbuild-jar}} > # verify that no core dependencies are contained in the jar file > # Run the program -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9451) End-to-end test: Scala Quickstarts
[ https://issues.apache.org/jira/browse/FLINK-9451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495084#comment-16495084 ] ASF GitHub Bot commented on FLINK-9451: --- Github user medcv commented on a diff in the pull request: https://github.com/apache/flink/pull/6089#discussion_r191740667 --- Diff: flink-end-to-end-tests/test-scripts/elasticsearch-common.sh --- @@ -56,13 +56,14 @@ function verify_elasticsearch_process_exist { function verify_result { local numRecords=$1 +local index=$2 --- End diff -- +1 > End-to-end test: Scala Quickstarts > -- > > Key: FLINK-9451 > URL: https://issues.apache.org/jira/browse/FLINK-9451 > Project: Flink > Issue Type: Sub-task > Components: Quickstarts >Affects Versions: 1.5.0, 1.4.1, 1.4.2 >Reporter: Yazdan Shirvany >Assignee: Yazdan Shirvany >Priority: Blocker > > We could add an end-to-end test which verifies Flink's quickstarts scala. It > should do the following: > # create a new Flink project using the quickstarts archetype > # add a new Flink dependency to the {{pom.xml}} (e.g. Flink connector or > library) > # run {{mvn clean package -Pbuild-jar}} > # verify that no core dependencies are contained in the jar file > # Run the program -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6089: [FLINK-9451]End-to-end test: Scala Quickstarts
Github user medcv commented on a diff in the pull request: https://github.com/apache/flink/pull/6089#discussion_r191740715 --- Diff: flink-end-to-end-tests/test-scripts/elasticsearch-common.sh --- @@ -75,6 +76,8 @@ function verify_result { } function shutdown_elasticsearch_cluster { + local index=$1 --- End diff -- +1 ---
[GitHub] flink pull request #6089: [FLINK-9451]End-to-end test: Scala Quickstarts
Github user medcv commented on a diff in the pull request: https://github.com/apache/flink/pull/6089#discussion_r191740903 --- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh --- @@ -18,29 +18,38 @@ # End to end test for quick starts test. +# Usage: +# FLINK_DIR= flink-end-to-end-tests/test-scripts/test_quickstarts.sh --- End diff -- will change the name ---
[GitHub] flink pull request #6089: [FLINK-9451]End-to-end test: Scala Quickstarts
Github user medcv commented on a diff in the pull request: https://github.com/apache/flink/pull/6089#discussion_r191740667 --- Diff: flink-end-to-end-tests/test-scripts/elasticsearch-common.sh --- @@ -56,13 +56,14 @@ function verify_elasticsearch_process_exist { function verify_result { local numRecords=$1 +local index=$2 --- End diff -- +1 ---
[jira] [Commented] (FLINK-9451) End-to-end test: Scala Quickstarts
[ https://issues.apache.org/jira/browse/FLINK-9451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495079#comment-16495079 ] ASF GitHub Bot commented on FLINK-9451: --- Github user medcv commented on the issue: https://github.com/apache/flink/pull/6089 @zentol Thanks! found them :) > End-to-end test: Scala Quickstarts > -- > > Key: FLINK-9451 > URL: https://issues.apache.org/jira/browse/FLINK-9451 > Project: Flink > Issue Type: Sub-task > Components: Quickstarts >Affects Versions: 1.5.0, 1.4.1, 1.4.2 >Reporter: Yazdan Shirvany >Assignee: Yazdan Shirvany >Priority: Blocker > > We could add an end-to-end test which verifies Flink's quickstarts scala. It > should do the following: > # create a new Flink project using the quickstarts archetype > # add a new Flink dependency to the {{pom.xml}} (e.g. Flink connector or > library) > # run {{mvn clean package -Pbuild-jar}} > # verify that no core dependencies are contained in the jar file > # Run the program -- This message was sent by Atlassian JIRA (v7.6.3#76005)