[GitHub] [flink] zjuwangg commented on issue #10092: [FLINK-14580][hive] add HiveModuleFactory, HiveModuleDescriptor, and HiveModuleDescriptorValidator
zjuwangg commented on issue #10092: [FLINK-14580][hive] add HiveModuleFactory, HiveModuleDescriptor, and HiveModuleDescriptorValidator URL: https://github.com/apache/flink/pull/10092#issuecomment-550192726 LGTM! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhuzhurk commented on a change in pull request #10079: [FLINK-14594] Fix matching logics of ResourceSpec/ResourceProfile/Resource considering double values
zhuzhurk commented on a change in pull request #10079: [FLINK-14594] Fix matching logics of ResourceSpec/ResourceProfile/Resource considering double values URL: https://github.com/apache/flink/pull/10079#discussion_r342958883 ## File path: flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java ## @@ -145,13 +148,15 @@ private ResourceSpec() { * @param other Reference to resource to merge in. * @return The new resource with merged values. */ - public ResourceSpec merge(ResourceSpec other) { + public ResourceSpec merge(final ResourceSpec other) { + checkNotNull(other, "Cannot merge with null resources"); + if (this.equals(UNKNOWN) || other.equals(UNKNOWN)) { return UNKNOWN; } ResourceSpec target = new ResourceSpec( - this.cpuCores + other.cpuCores, + this.cpuCores.merge(other.cpuCores).getValue(), Review comment: It's for backward compatibility. But I think you are right we can change the constructor to directly accept ResourceValue. The change is made in 9627a892cd7076abf01174376dbcdfc9baf41c13.
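The diff above merges two specs by merging their resource values pairwise and guarding against null and UNKNOWN inputs. A minimal self-contained sketch of that pattern follows; `CpuResource` and `SimpleResourceSpec` are illustrative stand-ins, not Flink's actual classes.

```java
// Hypothetical sketch of the merge pattern discussed in the review above:
// each resource kind knows how to merge with itself, and the spec merges
// field by field. Names are illustrative, not Flink's real API.
import java.util.Objects;

final class CpuResource {
    private final double value;
    CpuResource(double value) { this.value = value; }
    double getValue() { return value; }
    // Merging two resources of the same kind adds their values.
    CpuResource merge(CpuResource other) {
        return new CpuResource(this.value + other.value);
    }
}

final class SimpleResourceSpec {
    // Singleton sentinel: merging anything with UNKNOWN stays UNKNOWN.
    static final SimpleResourceSpec UNKNOWN = new SimpleResourceSpec(-1);
    private final CpuResource cpuCores;
    SimpleResourceSpec(double cpuCores) { this.cpuCores = new CpuResource(cpuCores); }
    double getCpuCores() { return cpuCores.getValue(); }

    SimpleResourceSpec merge(SimpleResourceSpec other) {
        Objects.requireNonNull(other, "Cannot merge with null resources");
        if (this == UNKNOWN || other == UNKNOWN) {
            return UNKNOWN;
        }
        return new SimpleResourceSpec(this.cpuCores.merge(other.cpuCores).getValue());
    }
}
```

Passing the merged `double` back through the constructor keeps the old signature, which matches the backward-compatibility point made in the review; letting the constructor accept the resource object directly removes the round-trip.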
[jira] [Created] (FLINK-14627) Refactor ExecutionGraph creation in tests as TestingExecutionGraphBuilder
Zili Chen created FLINK-14627: - Summary: Refactor ExecutionGraph creation in tests as TestingExecutionGraphBuilder Key: FLINK-14627 URL: https://issues.apache.org/jira/browse/FLINK-14627 Project: Flink Issue Type: Improvement Components: Tests Reporter: Zili Chen Assignee: Zili Chen Fix For: 1.10.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-14627) Refactor ExecutionGraph creation in tests as TestingExecutionGraphBuilder
[ https://issues.apache.org/jira/browse/FLINK-14627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-14627: --- Labels: pull-request-available (was: ) > Refactor ExecutionGraph creation in tests as TestingExecutionGraphBuilder > - > > Key: FLINK-14627 > URL: https://issues.apache.org/jira/browse/FLINK-14627 > Project: Flink > Issue Type: Improvement > Components: Tests >Reporter: Zili Chen >Assignee: Zili Chen >Priority: Major > Labels: pull-request-available > Fix For: 1.10.0 >
[GitHub] [flink] TisonKun opened a new pull request #10100: [FLINK-14627][tests] Refactor ExecutionGraph creation in tests as TestingExecutionGraphBuilder
TisonKun opened a new pull request #10100: [FLINK-14627][tests] Refactor ExecutionGraph creation in tests as TestingExecutionGraphBuilder URL: https://github.com/apache/flink/pull/10100 ## What is the purpose of the change Refactor ExecutionGraph creation in tests as TestingExecutionGraphBuilder ## Brief change log ## Verifying this change This change is a code refactor without functionality changes, so it is covered by existing tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable)
[GitHub] [flink] pnowojski commented on a change in pull request #10009: [FLINK-14304] Avoid task starvation with mailbox
pnowojski commented on a change in pull request #10009: [FLINK-14304] Avoid task starvation with mailbox URL: https://github.com/apache/flink/pull/10009#discussion_r342962212 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java ## @@ -211,26 +215,27 @@ private void checkTakeStateConditions() { @Override public void quiesce() { + final ReentrantLock lock = this.lock; Review comment: Ok, let's keep it as a black magic. I've read the explanation why it actually may help (`final` fields are useless in JVM, but `final` local variables can actually work as they are truly immutable).
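The idiom under discussion — copying a lock field into a `final` local before using it — is the same pattern used throughout `java.util.concurrent` (for example in `ArrayBlockingQueue`). A minimal sketch, with a hypothetical mailbox-like class (the method and field names are illustrative, not Flink's actual `TaskMailboxImpl`):

```java
// Sketch of the final-local-copy idiom from the review thread above: the JIT
// can treat a final local as truly immutable within the method, whereas a
// final field offers no such guarantee to the optimizer.
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class MailboxSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private volatile boolean quiesced;

    void quiesce() {
        final ReentrantLock lock = this.lock; // local copy, as in the diff above
        lock.lock();
        try {
            quiesced = true;
            notEmpty.signalAll(); // wake up any threads blocked on take()
        } finally {
            lock.unlock();
        }
    }

    boolean isQuiesced() { return quiesced; }
}
```

Whether the copy measurably helps depends on the JIT; the thread's conclusion ("keep it as black magic") reflects that it is a conventional micro-optimization rather than a correctness requirement.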
[GitHub] [flink] flinkbot commented on issue #10100: [FLINK-14627][tests] Refactor ExecutionGraph creation in tests as TestingExecutionGraphBuilder
flinkbot commented on issue #10100: [FLINK-14627][tests] Refactor ExecutionGraph creation in tests as TestingExecutionGraphBuilder URL: https://github.com/apache/flink/pull/10100#issuecomment-550197124 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 2ffe594419c91e32291507339f4fc4844dbb6c57 (Wed Nov 06 08:16:49 UTC 2019) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[jira] [Commented] (FLINK-14602) Degenerate the current ConcurrentHashMap type of tasks to a normal HashMap type.
[ https://issues.apache.org/jira/browse/FLINK-14602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16968156#comment-16968156 ] vinoyang commented on FLINK-14602: -- [~gjy] Any opinion? If you have no objections, I will open a PR to fix it. > Degenerate the current ConcurrentHashMap type of tasks to a normal HashMap > type. > > > Key: FLINK-14602 > URL: https://issues.apache.org/jira/browse/FLINK-14602 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: vinoyang >Priority: Major > > After FLINK-11417, the ExecutionGraph became single-threaded. It will no > longer be plagued by concurrency issues, so we can change the current > ConcurrentHashMap type of tasks to a plain HashMap.
[jira] [Commented] (FLINK-14596) Update MapView and ListView interface to new type system
[ https://issues.apache.org/jira/browse/FLINK-14596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16968158#comment-16968158 ] hailong wang commented on FLINK-14596: -- [~jark] Thank you all the same. > Update MapView and ListView interface to new type system > > > Key: FLINK-14596 > URL: https://issues.apache.org/jira/browse/FLINK-14596 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Jark Wu >Assignee: Jark Wu >Priority: Major > Fix For: 1.10.0 > > > MapView and ListView are still using TypeInformation as constructor > parameters. We should deprecate it and expose the new type system through the > constructor. > We can also make the data type member field private and use > reflection to access it, making it invisible to users.
[GitHub] [flink] flinkbot edited a comment on issue #10074: [FLINK-14499][Runtime / Metrics]MetricRegistry#getMetricQueryServiceGatewayRpcAddress is Nonnull
flinkbot edited a comment on issue #10074: [FLINK-14499][Runtime / Metrics]MetricRegistry#getMetricQueryServiceGatewayRpcAddress is Nonnull URL: https://github.com/apache/flink/pull/10074#issuecomment-549322411 ## CI report: * 4807c59ae20273d5b4611395ca5467217d7d0ca3 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/134842985) * fedd3f2abd43a67d6461b895145964fcdf98af0d : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/134961727) * 5c077aaf9166fbbd068f80eb61294d9cfea2eddc : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build
[GitHub] [flink] flinkbot edited a comment on issue #10079: [FLINK-14594] Fix matching logics of ResourceSpec/ResourceProfile/Resource considering double values
flinkbot edited a comment on issue #10079: [FLINK-14594] Fix matching logics of ResourceSpec/ResourceProfile/Resource considering double values URL: https://github.com/apache/flink/pull/10079#issuecomment-549401980 ## CI report: * 5b7e9e832ed26f476d0149663b2f0dec9f1bc427 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/134874614) * 6915be3cf16f83c6850eb18115a333a4a1206080 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/135161948) * 9627a892cd7076abf01174376dbcdfc9baf41c13 : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build
[jira] [Updated] (FLINK-14613) Add validation check when applying UDF to temporal table key in Temporal Table Join condition
[ https://issues.apache.org/jira/browse/FLINK-14613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] hailong wang updated FLINK-14613: - Description: In a Temporal Table Join, we don't support using a UDF on the temporal table join key, because we can't analyze lookup keys when the call is an expression. When users write SQL like this, the program runs normally, but the result will be wrong. So we should add validation to prevent it. The SQL is as follows: {code:java} INSERT INTO A SELECT B.amount, B.currency, C.amount, C.product FROM B join C FOR SYSTEM_TIME AS OF B.proctime on B.amount = cancat(C.amount, 'r') and C.product = '1' {code} was: In a Temporal Table Join, we don't support using a UDF on the temporal table join key, because we can't analyze lookup keys when the call is an expression. When users write SQL like this, the program runs normally, but the result will be wrong. So we should add validation to prevent it. The SQL is as follows: {code:java} INSERT INTO A SELECT B.amount, B.currency, C.amount, C.product FROM B join C FOR SYSTEM_TIME AS OF B.proctime on B.amount = C.amount and C.product = '1' {code} > Add validation check when applying UDF to temporal table key in Temporal > Table Join condition > --- > > Key: FLINK-14613 > URL: https://issues.apache.org/jira/browse/FLINK-14613 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.9.1 >Reporter: hailong wang >Priority: Major > Fix For: 1.10.0 > > > In a Temporal Table Join, we don't support using a UDF on the temporal table join > key, because we can't analyze lookup keys when the call is an expression. When users > write SQL like this, the program runs normally, but the result will be wrong. So we > should add validation to prevent it. > The SQL is as follows: > {code:java} > INSERT INTO A > SELECT B.amount, B.currency, C.amount, C.product > FROM B join C FOR SYSTEM_TIME AS OF B.proctime > on B.amount = cancat(C.amount, 'r') and C.product = '1' > {code}
[GitHub] [flink] flinkbot edited a comment on issue #10094: [FLINK-14611][runtime] Move allVerticesInSameSlotSharingGroupByDefault setting from ExecutionConfig to StreamGraph
flinkbot edited a comment on issue #10094: [FLINK-14611][runtime] Move allVerticesInSameSlotSharingGroupByDefault setting from ExecutionConfig to StreamGraph URL: https://github.com/apache/flink/pull/10094#issuecomment-550114409 ## CI report: * 8d20db4cdea3e4e2a69590362675e7bf7ec1a913 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/135157539) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build
[GitHub] [flink] flinkbot edited a comment on issue #10097: [FLINK-14603][runtime]Notify the potential buffer consumers if the size of LocalBufferPool has been expanded.
flinkbot edited a comment on issue #10097: [FLINK-14603][runtime]Notify the potential buffer consumers if the size of LocalBufferPool has been expanded. URL: https://github.com/apache/flink/pull/10097#issuecomment-550166959 ## CI report: * f72b6f5096ae65beb1647edc0e1db84ae9d320a9 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/135180539) * d4ee0f8a5f87b5837ed2bcdaa70f5d64f6e675b4 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/135191058) * c973b6a83812e8bd548d0fcd2247b87efa1edf83 : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build
[GitHub] [flink] flinkbot commented on issue #10099: [FLINK-14589] [Runtime/Coordination] Redundant slot requests with the same AllocationID lead…
flinkbot commented on issue #10099: [FLINK-14589] [Runtime/Coordination] Redundant slot requests with the same AllocationID lead… URL: https://github.com/apache/flink/pull/10099#issuecomment-550199907 ## CI report: * 6bde76855e2ac3583581b28115db01901a87bc65 : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build
[jira] [Commented] (FLINK-14613) Add validation check when applying UDF to temporal table key in Temporal Table Join condition
[ https://issues.apache.org/jira/browse/FLINK-14613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16968164#comment-16968164 ] hailong wang commented on FLINK-14613: -- Hi [~jark], I am sorry for giving a wrong example; I have updated it. In short, when using a UDF on the temporal table join key, the result is wrong, because the join condition actually applied is still B.amount = C.amount, not B.amount = cancat(C.amount, 'r'). So when the SQL is written like this, should we throw an error during validation? > Add validation check when applying UDF to temporal table key in Temporal > Table Join condition > --- > > Key: FLINK-14613 > URL: https://issues.apache.org/jira/browse/FLINK-14613 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.9.1 >Reporter: hailong wang >Priority: Major > Fix For: 1.10.0 > > > In a Temporal Table Join, we don't support using a UDF on the temporal table join > key, because we can't analyze lookup keys when the call is an expression. When users > write SQL like this, the program runs normally, but the result will be wrong. So we > should add validation to prevent it. > The SQL is as follows: > {code:java} > INSERT INTO A > SELECT B.amount, B.currency, C.amount, C.product > FROM B join C FOR SYSTEM_TIME AS OF B.proctime > on B.amount = cancat(C.amount, 'r') and C.product = '1' > {code}
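The validation being proposed boils down to: lookup keys can only be derived from plain field-equality predicates, so a join key wrapped in a function call should be rejected up front instead of being silently dropped from the lookup. A toy Java sketch of that idea, with entirely hypothetical names (Flink's planner works on RexNode expression trees, not strings):

```java
// Illustrative sketch of the validation idea in the comment above: accept
// conditions like "B.amount = C.amount", reject a UDF-wrapped key such as
// "B.amount = cancat(C.amount, 'r')". All names are hypothetical.
import java.util.ArrayList;
import java.util.List;

class LookupKeyValidator {
    /** Extracts lookup keys from equality conjuncts, rejecting expressions. */
    static List<String> extractLookupKeys(List<String> conjuncts) {
        List<String> keys = new ArrayList<>();
        for (String c : conjuncts) {
            String[] sides = c.split("=", 2);
            if (sides.length != 2) {
                continue; // not an equality predicate, not a lookup key
            }
            String right = sides[1].trim();
            if (right.contains("(")) { // a call such as cancat(C.amount, 'r')
                throw new IllegalArgumentException(
                        "UDF on temporal table key is not supported: " + c);
            }
            keys.add(right);
        }
        return keys;
    }
}
```

Failing fast here matches the comment's point: without the check, the planner quietly uses the bare column as the join key and the query returns wrong results with no error.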
[GitHub] [flink] flinkbot edited a comment on issue #9703: [FLINK-14038]Add default GC options for flink on yarn to facilitate debugging
flinkbot edited a comment on issue #9703: [FLINK-14038]Add default GC options for flink on yarn to facilitate debugging URL: https://github.com/apache/flink/pull/9703#issuecomment-532581942 ## CI report: * 1b930d19f27909ad5e2759eb6c5471c2ce07e8b4 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/128133485) * 977ccb5d91869e37027069d8b2b490bf850253ed : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/129659424) * 8347093d4cb32ed752bc01f5cd98abb2d803df94 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/130842273) * 796de65585c861a67c46ba8c578e08302ade2cdc : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/133371242) * 5817aa535fb834889eebb96478b7a40f936fb3c3 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/134200223) * 040b9878337aa7b919f16d2cfb1c9bc590b31a7e : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/134418855) * 32120e401687204ef737ebe01875e293be71d720 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/134550254) * bb787cffdb56629b880c50edbf368fa81f11db58 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/134797060) * 0cd39929e456b8ab0a1d1c20dc0b05b29d92d8b0 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/135018920) * 560c6f43ae1b26dd81b360d5f32207c459824def : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/135182722) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build
[jira] [Assigned] (FLINK-14602) Degenerate the current ConcurrentHashMap type of tasks to a normal HashMap type.
[ https://issues.apache.org/jira/browse/FLINK-14602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao reassigned FLINK-14602: Assignee: vinoyang > Degenerate the current ConcurrentHashMap type of tasks to a normal HashMap > type. > > > Key: FLINK-14602 > URL: https://issues.apache.org/jira/browse/FLINK-14602 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > > After FLINK-11417, the ExecutionGraph became single-threaded. It will no > longer be plagued by concurrency issues, so we can change the current > ConcurrentHashMap type of tasks to a plain HashMap.
[jira] [Commented] (FLINK-14602) Degenerate the current ConcurrentHashMap type of tasks to a normal HashMap type.
[ https://issues.apache.org/jira/browse/FLINK-14602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16968166#comment-16968166 ] Gary Yao commented on FLINK-14602: -- I assigned the ticket to you, [~yanghua] > Degenerate the current ConcurrentHashMap type of tasks to a normal HashMap > type. > > > Key: FLINK-14602 > URL: https://issues.apache.org/jira/browse/FLINK-14602 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > > After FLINK-11417, the ExecutionGraph became single-threaded. It will no > longer be plagued by concurrency issues, so we can change the current > ConcurrentHashMap type of tasks to a plain HashMap.
[jira] [Commented] (FLINK-14243) flink hiveudf needs some check when it is using cache
[ https://issues.apache.org/jira/browse/FLINK-14243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16968168#comment-16968168 ] jackylau commented on FLINK-14243: -- [~lzljs3620320] [~jark] yep, I read HIVE-16196 and the related issues. It is a Hive bug, and my Hive is hive-1.1.0-cdh5.7.0-src, which is affected by the issue. Thanks! > flink hiveudf needs some check when it is using cache > - > > Key: FLINK-14243 > URL: https://issues.apache.org/jira/browse/FLINK-14243 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive, Table SQL / Planner >Affects Versions: 1.9.0 >Reporter: jackylau >Priority: Major > Fix For: 1.10.0 > > Attachments: Snipaste_2019-10-30_15-34-09.png > > > Flink 1.9 brings in the Hive connector, but there can be problems when the > original Hive UDF uses a cache. We know that Hive is process-level parallel > based on the JVM, while Flink/Spark is task-level parallel. If Flink just calls > the Hive UDF, thread-safety problems can occur when using a cache. > So we may need to check the Hive UDF code and, if it is not thread-safe, set > the Flink parallelism to 1
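The hazard the issue describes is easy to illustrate: a UDF that memoizes results in a plain `HashMap` is safe under Hive's process-per-task model, but not when one Flink TaskManager JVM calls it from several task threads. A minimal hypothetical sketch (not a real Hive UDF class):

```java
// Sketch of the thread-safety concern discussed above: a cache shared across
// task threads must be a concurrent collection. A plain HashMap here could
// corrupt its internal structure under parallel eval() calls; the workaround
// mentioned in the issue is to set parallelism to 1 instead.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class CachingUdf {
    // ConcurrentHashMap makes concurrent lookups and inserts safe.
    private final Map<String, String> cache = new ConcurrentHashMap<>();

    String eval(String input) {
        // computeIfAbsent runs the (stand-in) expensive computation at most
        // once per key, even when multiple threads race on the same key.
        return cache.computeIfAbsent(input, s -> s.toUpperCase());
    }
}
```

This is why the issue was closed as a Hive bug rather than a Flink one: the fix belongs in the UDF's own caching code, not in the connector.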
[GitHub] [flink] GJL commented on a change in pull request #10082: [FLINK-14164][runtime] Add a meter ‘numberOfRestarts’ to show number of restarts as well as its rate
GJL commented on a change in pull request #10082: [FLINK-14164][runtime] Add a meter ‘numberOfRestarts’ to show number of restarts as well as its rate URL: https://github.com/apache/flink/pull/10082#discussion_r342643086 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ## @@ -193,6 +197,11 @@ public SchedulerBase( this.failoverTopology = executionGraph.getFailoverTopology(); this.inputsLocationsRetriever = new ExecutionGraphToInputsLocationsRetrieverAdapter(executionGraph); + + // Use the counter from execution graph to avoid modifying execution graph interfaces + // Can be a new SimpleCounter created here after the legacy scheduler is removed. + this.numberOfRestartsCounter = executionGraph.getNumberOfRestartsCounter(); + jobManagerJobMetricGroup.meter(NUMBER_OF_RESTARTS, new MeterView(numberOfRestartsCounter)); Review comment: Now that I think about it, I find _restarts per second_ to be an awkward unit because: 1. It will be normally very small (by default < 1/60) 1. It is hard to come up with reasonable alerting thresholds other than _"> 0"_. For example, alerting on _number of restarts > 10 in the past hour_ is impossible. If a user had a time series database such as InfluxDB in place, the total number of restarts would suffice because the database can calculate the difference. I know that the requirement to introduce a meter [comes from the user mailing list](http://mail-archives.apache.org/mod_mbox/flink-dev/201909.mbox/%3cCAOmjRb2ti9MXOD2jFy0XzWViwoNM6tvU4DB5hSnG_=zbvec...@mail.gmail.com%3e). I don't see a good solution at the moment.
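The counter-versus-meter trade-off in this review can be sketched in a few lines: the scheduler only needs to expose a monotonically increasing restart count, and any rate is derivable from two samples of that count. The class below is a hypothetical simplification, not Flink's `Counter`/`MeterView` API:

```java
// A minimal sketch of the trade-off discussed above: expose the raw,
// monotonically increasing restart count; derive a rate externally from two
// samples. A time-series database (e.g. InfluxDB) can compute the same
// difference over any window, which is why the raw counter alone suffices.
class RestartMetrics {
    private long numberOfRestarts;

    void onRestart() {
        numberOfRestarts++;
    }

    long getCount() {
        return numberOfRestarts;
    }

    /** Restarts per second over an externally measured interval. */
    double rate(long countBefore, long countAfter, double intervalSeconds) {
        return (countAfter - countBefore) / intervalSeconds;
    }
}
```

The review's objection follows directly: a per-second rate is almost always a tiny fraction (a restart delay of 60s caps it below 1/60), so thresholds like "more than 10 restarts in the past hour" are natural on the counter but awkward on the rate.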
[jira] [Commented] (FLINK-14243) flink hiveudf needs some check when it is using cache
[ https://issues.apache.org/jira/browse/FLINK-14243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16968170#comment-16968170 ] Jark Wu commented on FLINK-14243: - Thanks for verifying this. I will close this issue then. > flink hiveudf needs some check when it is using cache > - > > Key: FLINK-14243 > URL: https://issues.apache.org/jira/browse/FLINK-14243 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive, Table SQL / Planner >Affects Versions: 1.9.0 >Reporter: jackylau >Priority: Major > Fix For: 1.10.0 > > Attachments: Snipaste_2019-10-30_15-34-09.png > > > Flink 1.9 brings in the Hive connector, but there can be problems when the > original Hive UDF uses a cache. We know that Hive is process-level parallel > based on the JVM, while Flink/Spark is task-level parallel. If Flink just calls > the Hive UDF, thread-safety problems can occur when using a cache. > So we may need to check the Hive UDF code and, if it is not thread-safe, set > the Flink parallelism to 1
[jira] [Closed] (FLINK-14243) flink hiveudf needs some check when it is using cache
[ https://issues.apache.org/jira/browse/FLINK-14243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu closed FLINK-14243. --- Fix Version/s: (was: 1.10.0) Resolution: Not A Bug This is a Hive bug. > flink hiveudf needs some check when it is using cache > - > > Key: FLINK-14243 > URL: https://issues.apache.org/jira/browse/FLINK-14243 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive, Table SQL / Planner >Affects Versions: 1.9.0 >Reporter: jackylau >Priority: Major > Attachments: Snipaste_2019-10-30_15-34-09.png > > > Flink 1.9 brings in the Hive connector, but there can be problems when the > original Hive UDF uses a cache. We know that Hive is process-level parallel > based on the JVM, while Flink/Spark is task-level parallel. If Flink just calls > the Hive UDF, thread-safety problems can occur when using a cache. > So we may need to check the Hive UDF code and, if it is not thread-safe, set > the Flink parallelism to 1
[jira] [Issue Comment Deleted] (FLINK-14243) flink hiveudf needs some check when it is using cache
[ https://issues.apache.org/jira/browse/FLINK-14243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-14243: Comment: was deleted (was: This is a Hive bug.) > flink hiveudf needs some check when it is using cache > - > > Key: FLINK-14243 > URL: https://issues.apache.org/jira/browse/FLINK-14243 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive, Table SQL / Planner >Affects Versions: 1.9.0 >Reporter: jackylau >Priority: Major > Attachments: Snipaste_2019-10-30_15-34-09.png > > > Flink 1.9 brings in the Hive connector, but there can be problems when the > original Hive UDF uses a cache. We know that Hive is process-level parallel > based on the JVM, while Flink/Spark is task-level parallel. If Flink just calls > the Hive UDF, thread-safety problems can occur when using a cache. > So we may need to check the Hive UDF code and, if it is not thread-safe, set > the Flink parallelism to 1
[GitHub] [flink] flinkbot edited a comment on issue #10009: [FLINK-14304] Avoid task starvation with mailbox
flinkbot edited a comment on issue #10009: [FLINK-14304] Avoid task starvation with mailbox URL: https://github.com/apache/flink/pull/10009#issuecomment-546724255 ## CI report: * 2b39236d76b1c56d240f2476c535568f0614c577 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/133732010) * 36d23db6c3e89c7044d4eaf916661745c69c349b : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133733201) * fbfcab3e7a1129007a0ccf491e0b91a6c3269bcb : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/133738302) * 462d65ad2a87e998174481c51b7d38abced291c0 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133968232) * 8ee3b9a975676432e034362a602e898531806df7 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/135039943) * c8b5ad6d1e2b0f9aea258199b780d9449376f648 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/135092421) * 011484f925fa1bbd4ea1c1c33f1689913acf032c : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/135110538) * 28cf3bdd960cf1b5fe7595504b10f3cb7cdf558c : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/135126818) * c7c84c8da069c79f9fcc841bbd50ad6c7193b0f0 : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build
[jira] [Updated] (FLINK-14602) Change Type of Field tasks from ConcurrentHashMap to HashMap
[ https://issues.apache.org/jira/browse/FLINK-14602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang updated FLINK-14602: - Summary: Change Type of Field tasks from ConcurrentHashMap to HashMap (was: Degenerate the current ConcurrentHashMap type of tasks to a normal HashMap type.) > Change Type of Field tasks from ConcurrentHashMap to HashMap > > > Key: FLINK-14602 > URL: https://issues.apache.org/jira/browse/FLINK-14602 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination > Reporter: vinoyang > Assignee: vinoyang > Priority: Major > > After FLINK-11417, ExecutionGraph became single-threaded, so it is no longer plagued by concurrency issues and the current ConcurrentHashMap type of tasks can be downgraded to a plain HashMap.
[GitHub] [flink] yanghua opened a new pull request #10101: [FLINK-14602] Change Type of Field tasks from ConcurrentHashMap to HashMap
yanghua opened a new pull request #10101: [FLINK-14602] Change Type of Field tasks from ConcurrentHashMap to HashMap URL: https://github.com/apache/flink/pull/10101 ## What is the purpose of the change *This pull request changes Type of Field tasks from ConcurrentHashMap to HashMap* ## Brief change log - *Change Type of Field tasks from ConcurrentHashMap to HashMap* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
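The rationale behind this change can be sketched as follows; `TaskRegistry` and its method names are illustrative stand-ins, not Flink's actual code. Once all access to the map is confined to a single thread (as the ExecutionGraph is after FLINK-11417), a plain `HashMap` is sufficient and avoids `ConcurrentHashMap`'s synchronization overhead.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the field-type change: single-threaded access makes
// HashMap sufficient where ConcurrentHashMap was previously used defensively.
public class TaskRegistry {
    // Before: private final Map<String, String> tasks = new ConcurrentHashMap<>();
    // After: all access happens on one thread, so no concurrent map is needed.
    private final Map<String, String> tasks = new HashMap<>();

    public void register(String vertexId, String task) {
        tasks.put(vertexId, task);
    }

    public String lookup(String vertexId) {
        return tasks.get(vertexId);
    }

    public int size() {
        return tasks.size();
    }
}
```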
[jira] [Updated] (FLINK-14602) Change Type of Field tasks from ConcurrentHashMap to HashMap
[ https://issues.apache.org/jira/browse/FLINK-14602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-14602: --- Labels: pull-request-available (was: ) > Change Type of Field tasks from ConcurrentHashMap to HashMap > > > Key: FLINK-14602 > URL: https://issues.apache.org/jira/browse/FLINK-14602 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > After FLINK-11417, we made ExecutionGraph be a single-thread mode. It will no > longer be plagued by concurrency issues. So, we can degenerate the current > ConcurrentHashMap type of tasks to a normal HashMap type. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on issue #10079: [FLINK-14594] Fix matching logics of ResourceSpec/ResourceProfile/Resource considering double values
flinkbot edited a comment on issue #10079: [FLINK-14594] Fix matching logics of ResourceSpec/ResourceProfile/Resource considering double values URL: https://github.com/apache/flink/pull/10079#issuecomment-549401980 ## CI report: * 5b7e9e832ed26f476d0149663b2f0dec9f1bc427 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/134874614) * 6915be3cf16f83c6850eb18115a333a4a1206080 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/135161948) * 9627a892cd7076abf01174376dbcdfc9baf41c13 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/135207817) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #10074: [FLINK-14499][Runtime / Metrics]MetricRegistry#getMetricQueryServiceGatewayRpcAddress is Nonnull
flinkbot edited a comment on issue #10074: [FLINK-14499][Runtime / Metrics]MetricRegistry#getMetricQueryServiceGatewayRpcAddress is Nonnull URL: https://github.com/apache/flink/pull/10074#issuecomment-549322411 ## CI report: * 4807c59ae20273d5b4611395ca5467217d7d0ca3 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/134842985) * fedd3f2abd43a67d6461b895145964fcdf98af0d : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/134961727) * 5c077aaf9166fbbd068f80eb61294d9cfea2eddc : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/135207803) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] GJL commented on a change in pull request #10082: [FLINK-14164][runtime] Add a meter ‘numberOfRestarts’ to show number of restarts as well as its rate
GJL commented on a change in pull request #10082: [FLINK-14164][runtime] Add a meter ‘numberOfRestarts’ to show number of restarts as well as its rate URL: https://github.com/apache/flink/pull/10082#discussion_r342974956 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ## @@ -193,6 +197,11 @@ public SchedulerBase( this.failoverTopology = executionGraph.getFailoverTopology(); this.inputsLocationsRetriever = new ExecutionGraphToInputsLocationsRetrieverAdapter(executionGraph); + + // Use the counter from execution graph to avoid modifying execution graph interfaces + // Can be a new SimpleCounter created here after the legacy scheduler is removed. + this.numberOfRestartsCounter = executionGraph.getNumberOfRestartsCounter(); + jobManagerJobMetricGroup.meter(NUMBER_OF_RESTARTS, new MeterView(numberOfRestartsCounter)); Review comment: cc: @stevenzwu This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #10101: [FLINK-14602] Change Type of Field tasks from ConcurrentHashMap to HashMap
flinkbot commented on issue #10101: [FLINK-14602] Change Type of Field tasks from ConcurrentHashMap to HashMap URL: https://github.com/apache/flink/pull/10101#issuecomment-550208670 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit d5de7837dc84014509fe5603bb9f5d379c597309 (Wed Nov 06 08:50:09 UTC 2019) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [flink] zhuzhurk commented on a change in pull request #10082: [FLINK-14164][runtime] Add a meter ‘numberOfRestarts’ to show number of restarts as well as its rate
zhuzhurk commented on a change in pull request #10082: [FLINK-14164][runtime] Add a meter ‘numberOfRestarts’ to show number of restarts as well as its rate URL: https://github.com/apache/flink/pull/10082#discussion_r342911786 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ## @@ -193,6 +197,11 @@ public SchedulerBase( this.failoverTopology = executionGraph.getFailoverTopology(); this.inputsLocationsRetriever = new ExecutionGraphToInputsLocationsRetrieverAdapter(executionGraph); + + // Use the counter from execution graph to avoid modifying execution graph interfaces + // Can be a new SimpleCounter created here after the legacy scheduler is removed. + this.numberOfRestartsCounter = executionGraph.getNumberOfRestartsCounter(); + jobManagerJobMetricGroup.meter(NUMBER_OF_RESTARTS, new MeterView(numberOfRestartsCounter)); Review comment: Yes the rate is awkward if the event happens in a very low frequency. I think a counter `numberOfRestarts` is needed to enable users to build alerts in a more flexible way. And the question is: Whether to introduce a meter `numberOfRestartsPerSecond`? - Pros: The meter enables users to build alerts for restarts even if their monitoring system does not support variations of values. - Cons: The integral of rate value is not accurate so that users cannot use it to build reliable alerts other than ">0". This is limited by the time interval used to sample metrics in Flink, as well as in the external metric collecting system. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
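The counter-versus-meter trade-off discussed above can be modeled with a small stand-in: the counter records the exact number of restarts (reliable for ">0" alerts), while the meter derives a per-second rate from periodic counter samples, whose integral is only approximate. This mimics the idea behind Flink's `SimpleCounter` and `MeterView` but is not their actual implementation.

```java
// Simplified model of the counter-plus-meter pattern: a counter of restarts,
// and a meter that computes events/second from periodic counter samples.
public class RestartMetricsSketch {

    static class Counter {
        private long count;
        void inc() { count++; }
        long getCount() { return count; }
    }

    static class RateMeter {
        private final Counter counter;
        private long lastCount;
        private double rate; // events per second since the last sample

        RateMeter(Counter counter) { this.counter = counter; }

        // Called periodically by a metrics timer, e.g. every intervalSeconds.
        void update(int intervalSeconds) {
            long current = counter.getCount();
            rate = (current - lastCount) / (double) intervalSeconds;
            lastCount = current;
        }

        double getRate() { return rate; }
    }
}
```

The design point from the review thread carries over: the counter alone gives exact totals, while the derived rate depends on the sampling interval, so alerts built on the rate are only reliable as "rate > 0" checks.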
[jira] [Updated] (FLINK-14612) Change Type of Field intermediateResults from ConcurrentHashMap to HashMap
[ https://issues.apache.org/jira/browse/FLINK-14612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang updated FLINK-14612: - Summary: Change Type of Field intermediateResults from ConcurrentHashMap to HashMap (was: Degenerate the current ConcurrentHashMap type of intermediateResults to a normal HashMap type.) > Change Type of Field intermediateResults from ConcurrentHashMap to HashMap > -- > > Key: FLINK-14612 > URL: https://issues.apache.org/jira/browse/FLINK-14612 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination > Reporter: vinoyang > Priority: Major > > After FLINK-11417, ExecutionGraph became single-threaded, so it is no longer plagued by concurrency issues and the current ConcurrentHashMap type of intermediateResults can be downgraded to a plain HashMap.
[GitHub] [flink] flinkbot edited a comment on issue #10097: [FLINK-14603][runtime]Notify the potential buffer consumers if the size of LocalBufferPool has been expanded.
flinkbot edited a comment on issue #10097: [FLINK-14603][runtime]Notify the potential buffer consumers if the size of LocalBufferPool has been expanded. URL: https://github.com/apache/flink/pull/10097#issuecomment-550166959 ## CI report: * f72b6f5096ae65beb1647edc0e1db84ae9d320a9 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/135180539) * d4ee0f8a5f87b5837ed2bcdaa70f5d64f6e675b4 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/135191058) * c973b6a83812e8bd548d0fcd2247b87efa1edf83 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/135207848) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] pnowojski commented on a change in pull request #10009: [FLINK-14304] Avoid task starvation with mailbox
pnowojski commented on a change in pull request #10009: [FLINK-14304] Avoid task starvation with mailbox URL: https://github.com/apache/flink/pull/10009#discussion_r342974709 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutor.java ## @@ -30,7 +27,44 @@ import java.util.concurrent.RejectedExecutionException; /** - * Interface for an {@link Executor} build around a {@link Mailbox}-based execution model. + * Interface for an {@link Executor} build around a mailbox-based execution model (see {@link TaskMailbox}). {@code + * MailboxExecutor} can also execute downstream messages of a mailbox by yielding control from the task thread. + * + * All submission functions can be called from any thread and will enqueue the action for further processing in a + * FIFO fashion. + * + * The yielding functions avoid the following situation: One operator cannot fully process an input record and + * blocks the task thread until some resources are available. However, since the introduction of the mailbox model + * blocking the task thread will not only block new inputs but also all events from being processed. If the resources + * depend on downstream operators being able to process such events (e.g., timers), then we may easily arrive at some + * livelocks. + * + * The yielding functions will only process events from the operator itself and any downstream operator. Events of upstream + * operators are only processed when the input has been fully processed or if they yield themselves. This method avoid + * congestion and potential deadlocks, but will process {@link Mail}s slightly out-of-order, effectively creating a view + * on the mailbox that contains no message from upstream operators. + * + * All yielding functions must be called in the mailbox thread (see {@link TaskMailbox#isMailboxThread()}) to not + * violate the single-threaded execution model. 
There are two typical cases, both waiting until the resource is + * available. The main difference is if the resource becomes available through a mailbox message itself or not. + * + * If the resource becomes available through a mailbox mail, we can effectively block the task thread. + * Implicitly, this requires the mail to be enqueued by a different thread. + * {@code + * while (resource not available) { + * mailboxExecutor.yield(); + * } + * } + * + * If the resource becomes available through an external mechanism or the corresponding mail needs to be enqueued + * in the task thread, we cannot block. + * {@code + * while (resource not available) { + * if (!mailboxExecutor.tryYield()) { + * do stuff or sleep for a small amount of time + * } + * } + * } */ Review comment: aren't we missing `@PublicEvolving` annotation here? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
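The yielding control flow described in the quoted Javadoc can be sketched with a toy single-threaded mailbox. This is not Flink's `MailboxExecutor`; all names are illustrative, and it only shows the first case, where the resource becomes available through a mail that is already enqueued.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy sketch of the yielding pattern: the task thread cannot proceed until a
// "resource" is freed, and freeing happens by processing a pending mail.
public class YieldSketch {
    private final Queue<Runnable> mailbox = new ArrayDeque<>();
    private boolean resourceAvailable;

    void submit(Runnable mail) {
        mailbox.add(mail);
    }

    // Runs one pending mail if there is one; returns whether anything ran.
    boolean tryYield() {
        Runnable mail = mailbox.poll();
        if (mail == null) {
            return false;
        }
        mail.run();
        return true;
    }

    // Yields until the resource is freed by a mail; returns how many mails ran.
    // In real code the empty-mailbox branch would sleep briefly or block.
    int waitForResource() {
        int processed = 0;
        while (!resourceAvailable && tryYield()) {
            processed++;
        }
        return processed;
    }

    static int demo() {
        YieldSketch s = new YieldSketch();
        s.submit(() -> { });                         // unrelated downstream mail
        s.submit(() -> s.resourceAvailable = true);  // mail that frees the resource
        return s.waitForResource();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 2
    }
}
```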
[GitHub] [flink] flinkbot commented on issue #10100: [FLINK-14627][tests] Refactor ExecutionGraph creation in tests as TestingExecutionGraphBuilder
flinkbot commented on issue #10100: [FLINK-14627][tests] Refactor ExecutionGraph creation in tests as TestingExecutionGraphBuilder URL: https://github.com/apache/flink/pull/10100#issuecomment-550210232 ## CI report: * 2ffe594419c91e32291507339f4fc4844dbb6c57 : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] pnowojski commented on a change in pull request #10009: [FLINK-14304] Avoid task starvation with mailbox
pnowojski commented on a change in pull request #10009: [FLINK-14304] Avoid task starvation with mailbox URL: https://github.com/apache/flink/pull/10009#discussion_r342963223 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java ## @@ -185,31 +171,19 @@ private Mail takeHeadInternal(int priority) throws IllegalStateException { } } - private boolean isEmpty() { - return count == 0; - } - - private boolean isPutAbleState() { - return state == State.OPEN; - } - - private boolean isTakeAbleState() { - return state != State.CLOSED; - } - private void checkPutStateConditions() { final State state = this.state; - if (!isPutAbleState()) { + if (this.state != OPEN) { Review comment: I mean instead of `if (condition) throw new IllegalStateException(msg)` you can just replace it with `checkState(condition, msg)`? But ok, `checkPutStateConditions` could be on the hot path, so it might be a good idea to hide creating the `msg` inside the `if` branch, so the message is not created every time. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
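The reviewer's point about hot-path message construction can be illustrated with a local stand-in (this `checkState` is written here for the sketch, not Flink's `org.apache.flink.util.Preconditions`; the enum and method names are illustrative): a `checkState` with a constant message is fine anywhere, but when the message is concatenated from state, an explicit `if`/`throw` keeps the allocation inside the failure branch.

```java
// Sketch of checkState vs. explicit if/throw on a hot path.
public class StateChecks {

    // Local stand-in for a Preconditions-style helper.
    static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }

    enum State { OPEN, QUIESCED, CLOSED }

    private State state = State.OPEN;

    // Cold path or constant message: checkState reads well and costs nothing extra.
    void checkTakeStateConditions() {
        checkState(state != State.CLOSED, "Mailbox is closed.");
    }

    // Hot path with a dynamic message: the concatenation only happens inside
    // the failure branch, so the common success case allocates nothing.
    void checkPutStateConditions() {
        if (state != State.OPEN) {
            throw new IllegalStateException(
                    "Mailbox is in state " + state + ", but must be OPEN for put operations.");
        }
    }

    void close() {
        state = State.CLOSED;
    }
}
```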
[GitHub] [flink] pnowojski commented on a change in pull request #10009: [FLINK-14304] Avoid task starvation with mailbox
pnowojski commented on a change in pull request #10009: [FLINK-14304] Avoid task starvation with mailbox URL: https://github.com/apache/flink/pull/10009#discussion_r342970858 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java ## @@ -77,22 +112,30 @@ public boolean hasMail() { @Override public Optional tryTake(int priority) throws IllegalStateException { + Optional head = tryTakeFromBatch(); Review comment: Ok, but in that case: - please add a comment about this contract somewhere in this class - and either please add an ITCase coverage for the case when two operators are ping ponging themselves with adding mails and yielding, which will test that `tryTake` & `take` do not expand the batch. - or add additional comment inside the existing unit test, why is it testing for that contract This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
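The batch contract requested here can be sketched as follows; this is a simplified, single-threaded stand-in for `TaskMailboxImpl`, not its actual implementation. It shows the property under discussion: mails enqueued after `createBatch()` are not served by `tryTakeFromBatch()` until the next batch is created.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified sketch of the mailbox batch contract: createBatch() snapshots the
// currently enqueued mails, and tryTakeFromBatch() only serves that snapshot,
// so mails enqueued afterwards (e.g. by a downstream operator) stay invisible
// until the next batch.
public class BatchMailboxSketch {
    private final Deque<String> queue = new ArrayDeque<>();
    private final Deque<String> batch = new ArrayDeque<>();

    void put(String mail) {
        queue.add(mail);
    }

    // Moves all currently queued mails into the batch; returns true if the
    // batch is non-empty afterwards.
    boolean createBatch() {
        while (!queue.isEmpty()) {
            batch.add(queue.poll());
        }
        return !batch.isEmpty();
    }

    // Serves only mails that were queued when createBatch() was called;
    // returns null once the batch is exhausted.
    String tryTakeFromBatch() {
        return batch.poll();
    }
}
```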
[GitHub] [flink] flinkbot edited a comment on issue #10099: [FLINK-14589] [Runtime/Coordination] Redundant slot requests with the same AllocationID lead…
flinkbot edited a comment on issue #10099: [FLINK-14589] [Runtime/Coordination] Redundant slot requests with the same AllocationID lead… URL: https://github.com/apache/flink/pull/10099#issuecomment-550199907 ## CI report: * 6bde76855e2ac3583581b28115db01901a87bc65 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/135207867) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-14628) Wordcount on Docker test (custom fs plugin) fails on Travis
Gary Yao created FLINK-14628: Summary: Wordcount on Docker test (custom fs plugin) fails on Travis Key: FLINK-14628 URL: https://issues.apache.org/jira/browse/FLINK-14628 Project: Flink Issue Type: Bug Components: FileSystems, Tests Affects Versions: 1.10.0 Reporter: Gary Yao https://api.travis-ci.org/v3/job/607616429/log.txt {noformat} Successfully tagged test_docker_embedded_job:latest ~/build/apache/flink sort: cannot read: '/home/travis/build/apache/flink/flink-end-to-end-tests/test-scripts/temp-test-directory-53405131685/out/docker_wc_out*': No such file or directory FAIL WordCount: Output hash mismatch. Got d41d8cd98f00b204e9800998ecf8427e, expected 72a690412be8928ba239c2da967328a5. head hexdump of actual: head: cannot open '/home/travis/build/apache/flink/flink-end-to-end-tests/test-scripts/temp-test-directory-53405131685/out/docker_wc_out*' for reading: No such file or directory [FAIL] Test script contains errors. Checking for errors... No errors in log files. Checking for exceptions... No exceptions in log files. Checking for non-empty .out files... grep: /home/travis/build/apache/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/log/*.out: No such file or directory No non-empty .out files. {noformat} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] yanghua opened a new pull request #10102: [FLINK-14612] Change Type of Field intermediateResults from ConcurrentHashMap to HashMap
yanghua opened a new pull request #10102: [FLINK-14612] Change Type of Field intermediateResults from ConcurrentHashMap to HashMap URL: https://github.com/apache/flink/pull/10102 ## What is the purpose of the change *This pull request changes Type of Field intermediateResults from ConcurrentHashMap to HashMap* ## Brief change log - *Change Type of Field intermediateResults from ConcurrentHashMap to HashMap* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-14612) Change Type of Field intermediateResults from ConcurrentHashMap to HashMap
[ https://issues.apache.org/jira/browse/FLINK-14612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-14612: --- Labels: pull-request-available (was: ) > Change Type of Field intermediateResults from ConcurrentHashMap to HashMap > -- > > Key: FLINK-14612 > URL: https://issues.apache.org/jira/browse/FLINK-14612 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: vinoyang >Priority: Major > Labels: pull-request-available > > After FLINK-11417, we made ExecutionGraph be a single-thread mode. It will no > longer be plagued by concurrency issues. So, we can degenerate the current > ConcurrentHashMap type of intermediateResults to a normal HashMap type. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14555) Streaming File Sink s3 end-to-end test stalls
[ https://issues.apache.org/jira/browse/FLINK-14555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16968189#comment-16968189 ] Gary Yao commented on FLINK-14555: -- Another instance https://api.travis-ci.org/v3/job/607616431/log.txt > Streaming File Sink s3 end-to-end test stalls > - > > Key: FLINK-14555 > URL: https://issues.apache.org/jira/browse/FLINK-14555 > Project: Flink > Issue Type: Bug > Components: Connectors / FileSystem, Tests >Affects Versions: 1.10.0 >Reporter: Gary Yao >Priority: Critical > Labels: test-stability > > [https://api.travis-ci.org/v3/job/603882577/log.txt] > {noformat} > == > Running 'Streaming File Sink s3 end-to-end test' > == > TEST_DATA_DIR: > /home/travis/build/apache/flink/flink-end-to-end-tests/test-scripts/temp-test-directory-36388677539 > Flink dist directory: > /home/travis/build/apache/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT > Found AWS bucket [secure], running the e2e test. > Found AWS access key, running the e2e test. > Found AWS secret key, running the e2e test. > Executing test with dynamic openSSL linkage (random selection between > 'dynamic' and 'static') > Setting up SSL with: internal OPENSSL dynamic > Using SAN > dns:travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866,ip:10.20.1.215,ip:172.17.0.1 > Certificate was added to keystore > Certificate was added to keystore > Certificate reply was installed in keystore > MAC verified OK > Setting up SSL with: rest OPENSSL dynamic > Using SAN > dns:travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866,ip:10.20.1.215,ip:172.17.0.1 > Certificate was added to keystore > Certificate was added to keystore > Certificate reply was installed in keystore > MAC verified OK > Mutual ssl auth: true > Use s3 output > Starting cluster. > Starting standalonesession daemon on host > travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866. > Starting taskexecutor daemon on host > travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866. 
> Waiting for dispatcher REST endpoint to come up... > Waiting for dispatcher REST endpoint to come up... > Waiting for dispatcher REST endpoint to come up... > Waiting for dispatcher REST endpoint to come up... > Waiting for dispatcher REST endpoint to come up... > Waiting for dispatcher REST endpoint to come up... > Waiting for dispatcher REST endpoint to come up... > Waiting for dispatcher REST endpoint to come up... > Waiting for dispatcher REST endpoint to come up... > Waiting for dispatcher REST endpoint to come up... > Waiting for dispatcher REST endpoint to come up... > Waiting for dispatcher REST endpoint to come up... > Dispatcher REST endpoint is up. > [INFO] 1 instance(s) of taskexecutor are already running on > travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866. > Starting taskexecutor daemon on host > travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866. > [INFO] 2 instance(s) of taskexecutor are already running on > travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866. > Starting taskexecutor daemon on host > travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866. > [INFO] 3 instance(s) of taskexecutor are already running on > travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866. > Starting taskexecutor daemon on host > travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866. > Submitting job. > Job (c3a9bb7d3f47d63ebccbec5acb1342cb) is running. > Waiting for job (c3a9bb7d3f47d63ebccbec5acb1342cb) to have at least 3 > completed checkpoints ... > Killing TM > TaskManager 9227 killed. > Starting TM > [INFO] 3 instance(s) of taskexecutor are already running on > travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866. > Starting taskexecutor daemon on host > travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866. > Waiting for restart to happen > Killing 2 TMs > TaskManager 8618 killed. > TaskManager 9658 killed. > Starting 2 TMs > [INFO] 2 instance(s) of taskexecutor are already running on > travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866. 
> Starting taskexecutor daemon on host > travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866. > [INFO] 3 instance(s) of taskexecutor are already running on > travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866. > Starting taskexecutor daemon on host > travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866. > Waiting for restart to happen > Waiting until all values have been produced > Number of produced values 18080/6 > No output has been received in the last 10m0s, this potentially indicates a > stalled build or something wrong with the build itself. > Check the details on how to adjust your build configuration on: > https://docs.travis-ci.com/user/common-build-problems/#build-times-out-because-no-output-was-rec
[jira] [Closed] (FLINK-14555) Streaming File Sink s3 end-to-end test stalls
[ https://issues.apache.org/jira/browse/FLINK-14555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao closed FLINK-14555. Resolution: Duplicate > Streaming File Sink s3 end-to-end test stalls > - > > Key: FLINK-14555 > URL: https://issues.apache.org/jira/browse/FLINK-14555 > Project: Flink > Issue Type: Bug > Components: Connectors / FileSystem, Tests >Affects Versions: 1.10.0 >Reporter: Gary Yao >Priority: Critical > Labels: test-stability > > [https://api.travis-ci.org/v3/job/603882577/log.txt] > {noformat} > == > Running 'Streaming File Sink s3 end-to-end test' > == > TEST_DATA_DIR: > /home/travis/build/apache/flink/flink-end-to-end-tests/test-scripts/temp-test-directory-36388677539 > Flink dist directory: > /home/travis/build/apache/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT > Found AWS bucket [secure], running the e2e test. > Found AWS access key, running the e2e test. > Found AWS secret key, running the e2e test. > Executing test with dynamic openSSL linkage (random selection between > 'dynamic' and 'static') > Setting up SSL with: internal OPENSSL dynamic > Using SAN > dns:travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866,ip:10.20.1.215,ip:172.17.0.1 > Certificate was added to keystore > Certificate was added to keystore > Certificate reply was installed in keystore > MAC verified OK > Setting up SSL with: rest OPENSSL dynamic > Using SAN > dns:travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866,ip:10.20.1.215,ip:172.17.0.1 > Certificate was added to keystore > Certificate was added to keystore > Certificate reply was installed in keystore > MAC verified OK > Mutual ssl auth: true > Use s3 output > Starting cluster. > Starting standalonesession daemon on host > travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866. > Starting taskexecutor daemon on host > travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866. > Waiting for dispatcher REST endpoint to come up... > Waiting for dispatcher REST endpoint to come up... 
> Waiting for dispatcher REST endpoint to come up... > Waiting for dispatcher REST endpoint to come up... > Waiting for dispatcher REST endpoint to come up... > Waiting for dispatcher REST endpoint to come up... > Waiting for dispatcher REST endpoint to come up... > Waiting for dispatcher REST endpoint to come up... > Waiting for dispatcher REST endpoint to come up... > Waiting for dispatcher REST endpoint to come up... > Waiting for dispatcher REST endpoint to come up... > Waiting for dispatcher REST endpoint to come up... > Dispatcher REST endpoint is up. > [INFO] 1 instance(s) of taskexecutor are already running on > travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866. > Starting taskexecutor daemon on host > travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866. > [INFO] 2 instance(s) of taskexecutor are already running on > travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866. > Starting taskexecutor daemon on host > travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866. > [INFO] 3 instance(s) of taskexecutor are already running on > travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866. > Starting taskexecutor daemon on host > travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866. > Submitting job. > Job (c3a9bb7d3f47d63ebccbec5acb1342cb) is running. > Waiting for job (c3a9bb7d3f47d63ebccbec5acb1342cb) to have at least 3 > completed checkpoints ... > Killing TM > TaskManager 9227 killed. > Starting TM > [INFO] 3 instance(s) of taskexecutor are already running on > travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866. > Starting taskexecutor daemon on host > travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866. > Waiting for restart to happen > Killing 2 TMs > TaskManager 8618 killed. > TaskManager 9658 killed. > Starting 2 TMs > [INFO] 2 instance(s) of taskexecutor are already running on > travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866. > Starting taskexecutor daemon on host > travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866. 
> [INFO] 3 instance(s) of taskexecutor are already running on > travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866. > Starting taskexecutor daemon on host > travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866. > Waiting for restart to happen > Waiting until all values have been produced > Number of produced values 18080/6 > No output has been received in the last 10m0s, this potentially indicates a > stalled build or something wrong with the build itself. > Check the details on how to adjust your build configuration on: > https://docs.travis-ci.com/user/common-build-problems/#build-times-out-because-no-output-was-received > The build has been terminated > {noformat} > -- This message was sent by Atlassian Jira (
[jira] [Commented] (FLINK-14311) Streaming File Sink end-to-end test failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-14311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16968191#comment-16968191 ] Gary Yao commented on FLINK-14311: -- Another instance https://api.travis-ci.org/v3/job/607616431/log.txt > Streaming File Sink end-to-end test failed on Travis > > > Key: FLINK-14311 > URL: https://issues.apache.org/jira/browse/FLINK-14311 > Project: Flink > Issue Type: Bug > Components: Connectors / FileSystem, Tests >Affects Versions: 1.10.0 >Reporter: Till Rohrmann >Priority: Critical > Labels: test-stability > Fix For: 1.10.0 > > > The {{Streaming File Sink end-to-end test}} fails on Travis because it does > not produce output for 10 minutes. > https://api.travis-ci.org/v3/job/591992274/log.txt -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14311) Streaming File Sink end-to-end test failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-14311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16968192#comment-16968192 ] Gary Yao commented on FLINK-14311: -- Another instance https://api.travis-ci.org/v3/job/603882577/log.txt > Streaming File Sink end-to-end test failed on Travis > > > Key: FLINK-14311 > URL: https://issues.apache.org/jira/browse/FLINK-14311 > Project: Flink > Issue Type: Bug > Components: Connectors / FileSystem, Tests >Affects Versions: 1.10.0 >Reporter: Till Rohrmann >Priority: Critical > Labels: test-stability > Fix For: 1.10.0 > > > The {{Streaming File Sink end-to-end test}} fails on Travis because it does > not produce output for 10 minutes. > https://api.travis-ci.org/v3/job/591992274/log.txt -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14505) SQL Client end-to-end test for Kafka 0.10 nightly end-to-end test failed on travis
[ https://issues.apache.org/jira/browse/FLINK-14505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16968194#comment-16968194 ] Gary Yao commented on FLINK-14505: -- Another instance: https://api.travis-ci.org/v3/job/607616437/log.txt > SQL Client end-to-end test for Kafka 0.10 nightly end-to-end test failed on > travis > -- > > Key: FLINK-14505 > URL: https://issues.apache.org/jira/browse/FLINK-14505 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka, Table SQL / Client, Tests >Affects Versions: 1.10.0 >Reporter: Yu Li >Priority: Critical > Labels: test-stability > Fix For: 1.10.0 > > > The {{SQL Client end-to-end test for Kafka 0.10}} nightly end-to-end test > failed on Travis with > {code} > [FAIL] 'SQL Client end-to-end test for Kafka 0.10' failed after 0 minutes and > 37 seconds! Test exited with exit code 1 > No taskexecutor daemon (pid: 26336) is running anymore on > travis-job-2c704099-0645-4182-942d-3fb5c2e10e54. > No standalonesession daemon to stop on host > travis-job-2c704099-0645-4182-942d-3fb5c2e10e54. > {code} > https://api.travis-ci.org/v3/job/600710614/log.txt -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot commented on issue #10102: [FLINK-14612] Change Type of Field intermediateResults from ConcurrentHashMap to HashMap
flinkbot commented on issue #10102: [FLINK-14612] Change Type of Field intermediateResults from ConcurrentHashMap to HashMap URL: https://github.com/apache/flink/pull/10102#issuecomment-550212774 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit ab01afead96b82972e207dbdc5fa3f37e7515292 (Wed Nov 06 09:00:46 UTC 2019) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! * **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-14612).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work. Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. 
For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] HuangXingBo opened a new pull request #10103: [FLINK-14506][python][build] Improve the release script for Python API release package
HuangXingBo opened a new pull request #10103: [FLINK-14506][python][build] Improve the release script for Python API release package URL: https://github.com/apache/flink/pull/10103 ## What is the purpose of the change *This pr improve the release script for Python API release package* ## Brief change log - *Support specifying install components in lint-python.sh* - *Support choose miniconda default python env to build python package in create_binary_release.sh* ## Verifying this change *This change is a release script changes without any test coverage.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-14506) Improve the release script for Python API release package
[ https://issues.apache.org/jira/browse/FLINK-14506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-14506: --- Labels: pull-request-available (was: ) > Improve the release script for Python API release package > - > > Key: FLINK-14506 > URL: https://issues.apache.org/jira/browse/FLINK-14506 > Project: Flink > Issue Type: Sub-task > Components: API / Python, Build System >Reporter: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.10.0 > > > Currently it assumes that the required Python environment(i.e. Python 3.5+, > setuptools, etc) are available in the local machine when executing > create_binary_release.sh to perform the release. > This could be improved and there are two options in my mind: > 1) Reuse the script defined in flink-python module(lint-python.sh) to create > the required virtual environment and build the Python package using the > created virtual environment. > 2) Document the dependencies at the page "[Create a Flink > Release|https://cwiki.apache.org/confluence/display/FLINK/Creating+a+Flink+Release#CreatingaFlinkRelease-Checklisttoproceedtothenextstep.1]"; > and add validation check in create_binary_release.sh to throw an meaningful > error with hints how to fix it. > Personally I prefer to option 1) as it's transparent for users. Welcome any > feedback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot commented on issue #10103: [FLINK-14506][python][build] Improve the release script for Python API release package
flinkbot commented on issue #10103: [FLINK-14506][python][build] Improve the release script for Python API release package URL: https://github.com/apache/flink/pull/10103#issuecomment-550214576 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 7c842be4f1d79ff3206322d72f4cb1dcebd52c7e (Wed Nov 06 09:05:16 UTC 2019) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! * **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-14506).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work. Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. 
For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Closed] (FLINK-14375) Avoid to notify ineffective state updates to scheduler
[ https://issues.apache.org/jira/browse/FLINK-14375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao closed FLINK-14375. Resolution: Fixed 1.10: 213ddb8f4b6e2c6ede726b4abfdf29f091ce2713 > Avoid to notify ineffective state updates to scheduler > -- > > Key: FLINK-14375 > URL: https://issues.apache.org/jira/browse/FLINK-14375 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Affects Versions: 1.10.0 >Reporter: Zhu Zhu >Assignee: Zhu Zhu >Priority: Major > Labels: pull-request-available > Fix For: 1.10.0 > > Time Spent: 40m > Remaining Estimate: 0h > > The DefaultScheduler triggers failover if a task is notified to be FAILED. > However, in the case the multiple tasks in the same region fail together, it > will trigger multiple failovers. The later triggered failovers are useless, > lead to concurrent failovers and will increase the restart attempts count. > I think the deep reason for this issue is that some fake state changes are > notified to the DefaultScheduler. > The case above is a FAILED state change from TM will turn a CANCELING vertex > to CANCELED, and the actual state transition is to CANCELED. But a FAILED > state is notified to DefaultScheduler. > And there can be another possible issue caused by it, that a FINISHED state > change is notified from TM when a vertex is CANCELING. The vertex will become > CANCELED, while its FINISHED state change will be notified to > DefaultScheduler which may trigger downstream task scheduling. > I'd propose to fix it by filtering out ineffective state updates in > {{SchedulerBase#updateTaskExecutionState}} and only notify effective ones to > scheduler. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on issue #10009: [FLINK-14304] Avoid task starvation with mailbox
flinkbot edited a comment on issue #10009: [FLINK-14304] Avoid task starvation with mailbox URL: https://github.com/apache/flink/pull/10009#issuecomment-546724255 ## CI report: * 2b39236d76b1c56d240f2476c535568f0614c577 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/133732010) * 36d23db6c3e89c7044d4eaf916661745c69c349b : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133733201) * fbfcab3e7a1129007a0ccf491e0b91a6c3269bcb : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/133738302) * 462d65ad2a87e998174481c51b7d38abced291c0 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133968232) * 8ee3b9a975676432e034362a602e898531806df7 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/135039943) * c8b5ad6d1e2b0f9aea258199b780d9449376f648 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/135092421) * 011484f925fa1bbd4ea1c1c33f1689913acf032c : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/135110538) * 28cf3bdd960cf1b5fe7595504b10f3cb7cdf558c : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/135126818) * c7c84c8da069c79f9fcc841bbd50ad6c7193b0f0 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/135211148) * efc3d3849f19ea846ec5d5df816e1c9ee7915619 : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-14613) Add validation check when applying UDF to tempral table key in Temporal Table Join condition
[ https://issues.apache.org/jira/browse/FLINK-14613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16968204#comment-16968204 ] Jark Wu commented on FLINK-14613: - For temporary solution, adding an exception is fine. However, even there is an UDF in the join key, if we still have other equi-conditions on field reference (above example), it should still work because we have lookup key. I think if the proper fixing is not a big job, we should fix the root problem. > Add validation check when applying UDF to tempral table key in Temporal > Table Join condition > --- > > Key: FLINK-14613 > URL: https://issues.apache.org/jira/browse/FLINK-14613 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.9.1 >Reporter: hailong wang >Priority: Major > Fix For: 1.10.0 > > > In Temporal Table Join, We don't support using UDF in tempral table join > key. For we can't analyze LookupKeys when call is an expression. When users > use like this, the program run normally, and the result will be wrong. So we > should add validation to prevent it. > The SQL as following: > {code:java} > INSERT INTO A > SELECT B.amount, B.currency, C.amount, C.product > FROM B join C FOR SYSTEM_TIME AS OF B.proctime > on B.amount = cancat(C.amount, 'r') and C.product = '1' > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14243) flink hiveudf needs some check when it is using cache
[ https://issues.apache.org/jira/browse/FLINK-14243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16968209#comment-16968209 ] jackylau commented on FLINK-14243: -- And i found the flink built in function JsonUtil has a lock contention problem at LOG. Although, flink1.9 has removed the call relation . So i think it should be fixed now. > flink hiveudf needs some check when it is using cache > - > > Key: FLINK-14243 > URL: https://issues.apache.org/jira/browse/FLINK-14243 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive, Table SQL / Planner >Affects Versions: 1.9.0 >Reporter: jackylau >Priority: Major > Attachments: Snipaste_2019-10-30_15-34-09.png > > > Flink1.9 brings in hive connector, but it will have some problem when the > original hive udf using cache. We konw that hive is processed level parallel > based on jvm, while flink/spark is task level parallel. If flink just calls > the hive udf, it wll exists thread-safe problem when using cache. > So it may need check the hive udf code and if it is not thread-safe, and set > the flink parallize=1 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on issue #10074: [FLINK-14499][Runtime / Metrics]MetricRegistry#getMetricQueryServiceGatewayRpcAddress is Nonnull
flinkbot edited a comment on issue #10074: [FLINK-14499][Runtime / Metrics]MetricRegistry#getMetricQueryServiceGatewayRpcAddress is Nonnull URL: https://github.com/apache/flink/pull/10074#issuecomment-549322411 ## CI report: * 4807c59ae20273d5b4611395ca5467217d7d0ca3 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/134842985) * fedd3f2abd43a67d6461b895145964fcdf98af0d : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/134961727) * 5c077aaf9166fbbd068f80eb61294d9cfea2eddc : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/135207803) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #10101: [FLINK-14602] Change Type of Field tasks from ConcurrentHashMap to HashMap
flinkbot commented on issue #10101: [FLINK-14602] Change Type of Field tasks from ConcurrentHashMap to HashMap URL: https://github.com/apache/flink/pull/10101#issuecomment-55034 ## CI report: * d5de7837dc84014509fe5603bb9f5d379c597309 : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #10100: [FLINK-14627][tests] Refactor ExecutionGraph creation in tests as TestingExecutionGraphBuilder
flinkbot edited a comment on issue #10100: [FLINK-14627][tests] Refactor ExecutionGraph creation in tests as TestingExecutionGraphBuilder URL: https://github.com/apache/flink/pull/10100#issuecomment-550210232 ## CI report: * 2ffe594419c91e32291507339f4fc4844dbb6c57 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/135211187) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #10103: [FLINK-14506][python][build] Improve the release script for Python API release package
flinkbot commented on issue #10103: [FLINK-14506][python][build] Improve the release script for Python API release package URL: https://github.com/apache/flink/pull/10103#issuecomment-550222346 ## CI report: * 7c842be4f1d79ff3206322d72f4cb1dcebd52c7e : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #10102: [FLINK-14612] Change Type of Field intermediateResults from ConcurrentHashMap to HashMap
flinkbot commented on issue #10102: [FLINK-14612] Change Type of Field intermediateResults from ConcurrentHashMap to HashMap URL: https://github.com/apache/flink/pull/10102#issuecomment-55078 ## CI report: * ab01afead96b82972e207dbdc5fa3f37e7515292 : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r342978122 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskBackPressureSampleService.java ## @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; + +import javax.annotation.Nonnegative; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import static java.util.Objects.requireNonNull; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Samples the output buffer availability of tasks for back pressure tracking. 
+ * + * @see org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker + */ +class TaskBackPressureSampleService { Review comment: TaskBackPressureSampleService -> BackPressureSampleService, because it is actually a `TaskExecutor` level service. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r342988898 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ## @@ -461,16 +461,15 @@ AbstractInvokable getInvokable() { return invokable; } - public StackTraceElement[] getStackTraceOfExecutingThread() { - final AbstractInvokable invokable = this.invokable; - - if (invokable == null) { - return new StackTraceElement[0]; + public boolean isAvailableForOutput() { + if (invokable == null || consumableNotifyingPartitionWriters.length == 0) { + return true; } - - return invokable.getExecutingThread() - .orElse(executingThread) - .getStackTrace(); + final CompletableFuture[] outputFutures = new CompletableFuture[consumableNotifyingPartitionWriters.length]; + for(int i = 0; i < outputFutures.length; ++i) { Review comment: whitespace after `for` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r342968437 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureSampleCoordinator.java ## @@ -295,42 +292,30 @@ int getNumberOfPendingSamples() { // /** -* A pending stack trace sample, which collects stack traces and owns a -* {@link StackTraceSample} promise. +* A pending task back pressure stats, which collects task back pressure Review comment: `A pending task back pressure sample` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r342958896 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java ## @@ -46,22 +46,20 @@ String getAddress(); /** -* Request a stack trace sample from the given task. +* Request to sample the back pressure ratio from the given task. * * @param executionAttemptID identifying the task to sample -* @param sampleId of the sample -* @param numSamples to take from the given task -* @param delayBetweenSamples to wait for -* @param maxStackTraceDepth of the returned sample -* @param timeout of the request -* @return Future of stack trace sample response +* @param sampleId id of the sample +* @param numSamples number of samples to take +* @param delayBetweenSamples time to wait between samples +* @param timeout rpc request timeout +* @return Future containing the task back pressure sampling results */ - CompletableFuture requestStackTraceSample( + CompletableFuture sampleTaskBackPressure( Review comment: sampleTaskBackPressure -> requestTaskBackPressure This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r342972014 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImpl.java ## @@ -50,82 +51,60 @@ * Back pressure statistics tracker. * * Back pressure is determined by sampling running tasks. If a task is - * slowed down by back pressure it will be stuck in memory requests to a - * {@link org.apache.flink.runtime.io.network.buffer.LocalBufferPool}. - * - * The back pressured stack traces look like this: - * - * - * java.lang.Object.wait(Native Method) - * o.a.f.[...].LocalBufferPool.requestBuffer(LocalBufferPool.java:163) - * o.a.f.[...].LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133) <--- BLOCKING - * request - * [...] - * + * slowed down by back pressure, there should be no free buffers in output + * {@link org.apache.flink.runtime.io.network.buffer.BufferPool} of the task. */ public class BackPressureStatsTrackerImpl implements BackPressureStatsTracker { private static final Logger LOG = LoggerFactory.getLogger(BackPressureStatsTrackerImpl.class); - /** Maximum stack trace depth for samples. */ - static final int MAX_STACK_TRACE_DEPTH = 8; - - /** Expected class name for back pressure indicating stack trace element. */ - static final String EXPECTED_CLASS_NAME = "org.apache.flink.runtime.io.network.buffer.LocalBufferPool"; - - /** Expected method name for back pressure indicating stack trace element. */ - static final String EXPECTED_METHOD_NAME = "requestBufferBuilderBlocking"; - /** Lock guarding trigger operations. */ private final Object lock = new Object(); - /* Stack trace sample coordinator. */ - private final StackTraceSampleCoordinator coordinator; + /** Back pressure sample coordinator. 
*/ Review comment: This comment can be removed because of useless information This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r342966862 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureSampleCoordinator.java ## @@ -156,46 +154,43 @@ public StackTraceSampleCoordinator(Executor executor, long sampleTimeout) { // Trigger all samples for (Execution execution: executions) { - final CompletableFuture stackTraceSampleFuture = execution.requestStackTraceSample( - sampleId, - numSamples, - delayBetweenSamples, - maxStackTraceDepth, - timeout); - - stackTraceSampleFuture.handleAsync( - (StackTraceSampleResponse stackTraceSampleResponse, Throwable throwable) -> { - if (stackTraceSampleResponse != null) { - collectStackTraces( - stackTraceSampleResponse.getSampleId(), - stackTraceSampleResponse.getExecutionAttemptID(), - stackTraceSampleResponse.getSamples()); + final CompletableFuture taskBackPressureFuture = + execution.sampleTaskBackPressure(sampleId, numSamples, delayBetweenSamples, timeout); + + taskBackPressureFuture.handleAsync( + (TaskBackPressureSampleResponse taskBackPressureSampleResponse, Throwable throwable) -> { + if (taskBackPressureSampleResponse != null) { + collectTaskBackPressureStat( Review comment: `collectTaskBackPressureStat` -> `collectTaskBackPressureSample`. It is better to unify some terms here. E.g. `cancelTaskBackPressureSample`, `collectTaskBackPressureSample`, `TaskBackPressureSampleResponse` Also for the below class `PendingTaskBackPressureStats` -> `PendingTaskBackPressureSample` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r342984468 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskOutputAvailabilitySampleableTaskAdapter.java ## @@ -26,18 +26,18 @@ import static java.util.Objects.requireNonNull; /** - * Adapts {@link Task} to {@link StackTraceSampleableTask}. + * Adapts {@link Task} to {@link OutputAvailabilitySampleableTask}. */ -class TaskStackTraceSampleableTaskAdapter implements StackTraceSampleableTask { +class TaskOutputAvailabilitySampleableTaskAdapter implements OutputAvailabilitySampleableTask { Review comment: We could simplify this by making the `Task` class implement `BackPressureSampleableTask` directly and then deleting this adapter class. Furthermore, we could remove the `#isRunning` method from the interface, because the task can check its state internally and return the boolean result via `BackPressureSampleableTask#isBackPressured`.
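A minimal sketch of what this suggestion could look like (all class, field, and method names here are hypothetical illustrations, not the actual Flink classes): the task implements the sampleable interface directly and folds the running-state check into `isBackPressured`, so the adapter and a separate `isRunning` method are no longer needed.

```java
// Hypothetical sketch: the task implements the sampleable interface directly,
// so the adapter class and the separate isRunning() method go away.
interface BackPressureSampleableTask {
    boolean isBackPressured();
}

class SampleableTask implements BackPressureSampleableTask {
    private volatile boolean running = true;
    private volatile boolean outputAvailable = true;

    @Override
    public boolean isBackPressured() {
        // The task judges its own state internally: a task that is not
        // running is never reported as back-pressured.
        return running && !outputAvailable;
    }

    void setOutputAvailable(boolean available) {
        this.outputAvailable = available;
    }

    void markFinished() {
        this.running = false;
    }
}
```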
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r342956700 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ## @@ -925,33 +925,30 @@ public void fail(Throwable t) { } /** -* Request a stack trace sample from the task of this execution. +* Request to sample the back pressure ratio from the task of this execution. * -* @param sampleId of the stack trace sample -* @param numSamples the sample should contain -* @param delayBetweenSamples to wait -* @param maxStackTraceDepth of the samples -* @param timeout until the request times out -* @return Future stack trace sample response +* @param sampleId id of the sample +* @param numSamples the number of samples to take +* @param delayBetweenSamples time to wait between samples +* @param timeout the request times out +* @return Future containing the task back pressure sampling results */ - public CompletableFuture requestStackTraceSample( + public CompletableFuture sampleTaskBackPressure( Review comment: nit : sampleTaskBackPressure -> requestTaskBackPressure This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r342965035 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureSampleCoordinator.java ## @@ -63,57 +64,55 @@ /** Time out after the expected sampling duration. */ private final long sampleTimeout; - /** In progress samples (guarded by lock). */ - private final Map pendingSamples = new HashMap<>(); + /** In progress samples. */ + @GuardedBy("lock") + private final Map pendingSamples = new HashMap<>(); /** A list of recent sample IDs to identify late messages vs. invalid ones. */ private final ArrayDeque recentPendingSamples = new ArrayDeque<>(NUM_GHOST_SAMPLE_IDS); - /** Sample ID counter (guarded by lock). */ + /** Sample ID counter. */ + @GuardedBy("lock") private int sampleIdCounter; /** -* Flag indicating whether the coordinator is still running (guarded by -* lock). +* Flag indicating whether the coordinator is still running. */ + @GuardedBy("lock") private boolean isShutDown; /** * Creates a new coordinator for the job. * -* @param executor to use to execute the futures +* @param executor Used to execute the futures. * @param sampleTimeout Time out after the expected sampling duration. * This is added to the expected duration of a * sample, which is determined by the number of * samples and the delay between each sample. */ - public StackTraceSampleCoordinator(Executor executor, long sampleTimeout) { + public BackPressureSampleCoordinator(Executor executor, long sampleTimeout) { checkArgument(sampleTimeout >= 0L); - this.executor = Preconditions.checkNotNull(executor); + this.executor = checkNotNull(executor); this.sampleTimeout = sampleTimeout; } /** -* Triggers a stack trace sample to all tasks. +* Triggers a task back pressure stats sample to all tasks. * * @param tasksToSample Tasks to sample. 
-* @param numSamples Number of stack trace samples to collect. +* @param numSamples Number of samples per task. * @param delayBetweenSamples Delay between consecutive samples. -* @param maxStackTraceDepth Maximum depth of the stack trace. 0 indicates -*no maximum and keeps the complete stack trace. -* @return A future of the completed stack trace sample +* @return A future of the completed task back pressure stats sample */ @SuppressWarnings("unchecked") - public CompletableFuture triggerStackTraceSample( + CompletableFuture triggerTaskBackPressureSample( ExecutionVertex[] tasksToSample, int numSamples, Review comment: `numSamples` and `delayBetweenSamples` should be passed into the constructor of this class instead of into the constructor of `BackPressureStatsTrackerImpl`, because these arguments are only used inside the coordinator.
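A hedged sketch of the proposed refactor (class and field names are simplified stand-ins, not the actual Flink code): the sampling parameters become constructor state of the coordinator, so callers of the trigger method no longer thread them through on every call.

```java
// Simplified sketch: numSamples and delayBetweenSamples move from per-call
// arguments into constructor state, since only the coordinator uses them.
class SampleCoordinatorSketch {
    private final int numSamples;
    private final long delayBetweenSamplesMillis;

    SampleCoordinatorSketch(int numSamples, long delayBetweenSamplesMillis) {
        if (numSamples <= 0) {
            throw new IllegalArgumentException("numSamples must be positive");
        }
        this.numSamples = numSamples;
        this.delayBetweenSamplesMillis = delayBetweenSamplesMillis;
    }

    // With the parameters held internally, derived values such as the
    // expected sampling duration no longer need per-call inputs either.
    long expectedSampleDurationMillis() {
        return numSamples * delayBetweenSamplesMillis;
    }
}
```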
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r342979874 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskBackPressureSampleService.java ## @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; + +import javax.annotation.Nonnegative; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import static java.util.Objects.requireNonNull; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Samples the output buffer availability of tasks for back pressure tracking. 
+ * + * @see org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker + */ +class TaskBackPressureSampleService { + + private final ScheduledExecutor scheduledExecutor; + + TaskBackPressureSampleService(final ScheduledExecutor scheduledExecutor) { + this.scheduledExecutor = requireNonNull(scheduledExecutor, "The scheduledExecutor must not be null."); + } + + /** +* Returns a future that completes with the back pressure ratio of a task. +* +* @param taskThe task to be sampled. +* @param numSamples The number of samples. +* @param delayBetweenSamples The time to wait between samples. +* @return A future containing the task back pressure ratio. +*/ + public CompletableFuture sampleTaskBackPressure( + final OutputAvailabilitySampleableTask task, + @Nonnegative final int numSamples, Review comment: `@Nonnegative` duplicates the check below: `checkArgument(numSamples > 0, "The numSamples must be positive.");`
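For illustration (a hypothetical helper, not the Flink code): `@Nonnegative` documents `>= 0` while the runtime check enforces `> 0`, so the annotation is redundant and even slightly weaker than the explicit precondition, which alone is enough.

```java
// Hypothetical validation helper: the runtime check already enforces a
// strictly positive value, which subsumes (and is stricter than) @Nonnegative.
class SampleChecks {
    static int checkPositive(int numSamples) {
        if (numSamples <= 0) {
            throw new IllegalArgumentException("The numSamples must be positive.");
        }
        return numSamples;
    }
}
```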
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r342981944 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskBackPressureSampleService.java ## @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; + +import javax.annotation.Nonnegative; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import static java.util.Objects.requireNonNull; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Samples the output buffer availability of tasks for back pressure tracking. 
+ * + * @see org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker + */ +class TaskBackPressureSampleService { + + private final ScheduledExecutor scheduledExecutor; + + TaskBackPressureSampleService(final ScheduledExecutor scheduledExecutor) { + this.scheduledExecutor = requireNonNull(scheduledExecutor, "The scheduledExecutor must not be null."); + } + + /** +* Returns a future that completes with the back pressure ratio of a task. +* +* @param taskThe task to be sampled. +* @param numSamples The number of samples. +* @param delayBetweenSamples The time to wait between samples. +* @return A future containing the task back pressure ratio. +*/ + public CompletableFuture sampleTaskBackPressure( + final OutputAvailabilitySampleableTask task, + @Nonnegative final int numSamples, + final Time delayBetweenSamples) { + + checkNotNull(task, "The task must not be null."); + checkArgument(numSamples > 0, "The numSamples must be positive."); + checkNotNull(delayBetweenSamples, "The delayBetweenSamples must not be null."); + + return sampleTaskBackPressure( + task, + numSamples, + delayBetweenSamples, + new ArrayList<>(numSamples), + new CompletableFuture<>()); + } + + private CompletableFuture sampleTaskBackPressure( + final OutputAvailabilitySampleableTask task, + final int numSamples, + final Time delayBetweenSamples, + final List taskOutputAvailability, + final CompletableFuture resultFuture) { + + final Optional isTaskAvailableForOutput = isTaskAvailableForOutput(task); + if (isTaskAvailableForOutput.isPresent()) { + taskOutputAvailability.add(isTaskAvailableForOutput.get()); + } else if (!taskOutputAvailability.isEmpty()) { + resultFuture.complete(calculateTaskBackPressureRatio(taskOutputAvailability)); + return resultFuture; + } else { + throw new IllegalStateException(String.format("Cannot sample task %s. 
" + + "Because the task is not running.", task.getExecutionId())); + } + + if (numSamples > 1) { + scheduledExecutor.schedule(() -> sampleTaskBackPressure( + task, + numSamples - 1, + delayBetweenSamples, + taskOutputAvailability, + resultFuture), delayBetweenSamples.getSize(), delayBetweenSamples.getUnit()); Review comment: Put `delayBetweenSamples.getSize(), delayBetweenSamples.getUnit()` on a separate line.
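The recursive scheduling loop quoted above can be sketched in isolation as follows (a simplified, self-contained version with hypothetical names; the real service samples a task's output availability and uses Flink's `ScheduledExecutor` and `Time` types): one sample is taken per invocation, the remainder is re-scheduled after the delay, and the future completes with the ratio of samples in which output was unavailable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Simplified sketch of the recursive sampling loop: take one availability
// sample now, re-schedule the rest after the delay, and complete the future
// with the back-pressure ratio once all samples are collected.
class BackPressureSampler {
    private final ScheduledExecutorService executor;

    BackPressureSampler(ScheduledExecutorService executor) {
        this.executor = executor;
    }

    CompletableFuture<Double> sample(BooleanSupplier outputAvailable, int numSamples, long delayMillis) {
        CompletableFuture<Double> result = new CompletableFuture<>();
        sampleOnce(outputAvailable, numSamples, delayMillis, new ArrayList<>(numSamples), result);
        return result;
    }

    private void sampleOnce(BooleanSupplier outputAvailable, int remaining, long delayMillis,
                            List<Boolean> samples, CompletableFuture<Double> result) {
        samples.add(outputAvailable.getAsBoolean());
        if (remaining > 1) {
            // Re-schedule the remaining samples after the configured delay.
            executor.schedule(
                () -> sampleOnce(outputAvailable, remaining - 1, delayMillis, samples, result),
                delayMillis,
                TimeUnit.MILLISECONDS);
        } else {
            // Back-pressure ratio: fraction of samples where output was NOT available.
            long unavailable = samples.stream().filter(a -> !a).count();
            result.complete((double) unavailable / samples.size());
        }
    }
}
```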
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r342974445 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImpl.java ## @@ -322,41 +266,18 @@ private OperatorBackPressureStats createStatsFromSample(StackTraceSample sample) } } - // Ratio of blocked samples to total samples per sub task. Array - // position corresponds to sub task index. - double[] backPressureRatio = new double[traces.size()]; - - for (Entry> entry : traces.entrySet()) { - int backPressureSamples = 0; - - List taskTraces = entry.getValue(); - - for (StackTraceElement[] trace : taskTraces) { - for (int i = trace.length - 1; i >= 0; i--) { - StackTraceElement elem = trace[i]; - - if (elem.getClassName().equals(EXPECTED_CLASS_NAME) && - elem.getMethodName().equals(EXPECTED_METHOD_NAME)) { - - backPressureSamples++; - break; // Continue with next stack trace - } - } - } + // Back pressure ratio of all tasks. Array position corresponds + // to sub task index. + double[] backPressureRatio = new double[backPressureRatioByTask.size()]; Review comment: backPressureRatio -> backPressureRatios This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
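A small illustration of the data shape behind this rename (a hypothetical helper; the real code is keyed by `ExecutionAttemptID` and resolves each entry to its subtask index): the per-task ratios collected as a map are flattened into an array whose position corresponds to the subtask index.

```java
import java.util.Map;

// Hypothetical helper mirroring the array-building step: entries keyed by
// subtask index are written into an array sized to the operator parallelism.
class BackPressureRatios {
    static double[] toArray(Map<Integer, Double> ratioBySubtaskIndex, int parallelism) {
        double[] backPressureRatios = new double[parallelism];
        for (Map.Entry<Integer, Double> entry : ratioBySubtaskIndex.entrySet()) {
            backPressureRatios[entry.getKey()] = entry.getValue();
        }
        return backPressureRatios;
    }
}
```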
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r342977589 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskBackPressureSampleService.java ## @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; + +import javax.annotation.Nonnegative; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import static java.util.Objects.requireNonNull; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Samples the output buffer availability of tasks for back pressure tracking. 
+ * + * @see org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker + */ +class TaskBackPressureSampleService { + + private final ScheduledExecutor scheduledExecutor; + + TaskBackPressureSampleService(final ScheduledExecutor scheduledExecutor) { + this.scheduledExecutor = requireNonNull(scheduledExecutor, "The scheduledExecutor must not be null."); Review comment: `requireNonNull` -> `checkNotNull` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r342971189 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureSampleCoordinator.java ## @@ -295,42 +292,30 @@ int getNumberOfPendingSamples() { // /** -* A pending stack trace sample, which collects stack traces and owns a -* {@link StackTraceSample} promise. +* A pending task back pressure stats, which collects task back pressure +* ratio and owns a {@link BackPressureStats} promise. * * Access pending sample in lock scope. */ - private static class PendingStackTraceSample { + private static class PendingTaskBackPressureStats { Review comment: The class naming should be consistent between `PendingTaskBackPressureStats` and `BackPressureStats`; I mean we should decide whether to retain the term `Task`.
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r342972877 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImpl.java ## @@ -250,25 +201,18 @@ public void shutDown() { } /** -* Invalidates the cache (irrespective of clean up interval). -*/ - void invalidateOperatorStatsCache() { - operatorStatsCache.invalidateAll(); - } - - /** -* Callback on completed stack trace sample. +* Callback on completed task back pressure sample. */ - class StackTraceSampleCompletionCallback implements BiFunction { + class TaskBackPressureSampleCompletionCallback implements BiFunction { Review comment: Also for the class name `TaskBackPressureSampleCompletionCallback`: whether to retain the term `Task` should be consistent with the other classes.
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r342971770 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStats.java ## @@ -89,28 +88,28 @@ public long getStartTime() { } /** -* Returns the time stamp, when all stack traces were collected at the -* JobManager. +* Returns the time stamp, when all back pressure stats were collected at +* the JobManager. * -* @return Time stamp, when all stack traces were collected at the +* @return Time stamp, when all back pressure stats were collected at the * JobManager */ public long getEndTime() { return endTime; } /** -* Returns the a map of stack traces by execution ID. +* Returns the a map of back pressure ratio by execution ID. * -* @return Map of stack traces by execution ID +* @return Map of back pressure ratio by execution ID */ - public Map> getStackTraces() { - return stackTracesByTask; + public Map getBackPressureRatioByTask() { + return backPressureRatioByTask; } @Override public String toString() { - return "StackTraceSample{" + + return "TaskBackPressureStats{" + Review comment: TaskBackPressureStats -> BackPressureStats This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r342969477 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStats.java ## @@ -21,44 +21,43 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import java.util.Collections; -import java.util.List; import java.util.Map; import static org.apache.flink.util.Preconditions.checkArgument; /** - * A sample of stack traces for one or more tasks. + * Task back pressure stats for one or more tasks. Review comment: Remove the prefix `Task`.
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r342981060 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskBackPressureSampleService.java ## @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; + +import javax.annotation.Nonnegative; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import static java.util.Objects.requireNonNull; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Samples the output buffer availability of tasks for back pressure tracking. 
+ * + * @see org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker + */ +class TaskBackPressureSampleService { + + private final ScheduledExecutor scheduledExecutor; + + TaskBackPressureSampleService(final ScheduledExecutor scheduledExecutor) { + this.scheduledExecutor = requireNonNull(scheduledExecutor, "The scheduledExecutor must not be null."); + } + + /** +* Returns a future that completes with the back pressure ratio of a task. +* +* @param taskThe task to be sampled. +* @param numSamples The number of samples. +* @param delayBetweenSamples The time to wait between samples. +* @return A future containing the task back pressure ratio. +*/ + public CompletableFuture sampleTaskBackPressure( + final OutputAvailabilitySampleableTask task, + @Nonnegative final int numSamples, + final Time delayBetweenSamples) { + + checkNotNull(task, "The task must not be null."); + checkArgument(numSamples > 0, "The numSamples must be positive."); + checkNotNull(delayBetweenSamples, "The delayBetweenSamples must not be null."); + + return sampleTaskBackPressure( + task, + numSamples, + delayBetweenSamples, + new ArrayList<>(numSamples), + new CompletableFuture<>()); + } + + private CompletableFuture sampleTaskBackPressure( + final OutputAvailabilitySampleableTask task, + final int numSamples, + final Time delayBetweenSamples, + final List taskOutputAvailability, + final CompletableFuture resultFuture) { + + final Optional isTaskAvailableForOutput = isTaskAvailableForOutput(task); + if (isTaskAvailableForOutput.isPresent()) { + taskOutputAvailability.add(isTaskAvailableForOutput.get()); + } else if (!taskOutputAvailability.isEmpty()) { + resultFuture.complete(calculateTaskBackPressureRatio(taskOutputAvailability)); + return resultFuture; + } else { + throw new IllegalStateException(String.format("Cannot sample task %s. 
" + + "Because the task is not running.", task.getExecutionId())); + } + + if (numSamples > 1) { + scheduledExecutor.schedule(() -> sampleTaskBackPressure( + task, + numSamples - 1, + delayBetweenSamples, + taskOutputAvailability, + resultFuture), delayBetweenSamples.getSize(), delayBetweenSamples.getUnit()); + } else { + resultFuture.complete(calculateTaskBackPressureRatio(taskOutputAvailability)); + } +
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r342984639 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ## @@ -461,16 +461,15 @@ AbstractInvokable getInvokable() { return invokable; } - public StackTraceElement[] getStackTraceOfExecutingThread() { - final AbstractInvokable invokable = this.invokable; - - if (invokable == null) { - return new StackTraceElement[0]; + public boolean isAvailableForOutput() { Review comment: `isAvailableForOutput` -> `isBackPressured`
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r342974206 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImpl.java ## @@ -296,21 +240,21 @@ public Void apply(StackTraceSample stackTraceSample, Throwable throwable) { } /** -* Creates the back pressure stats from a stack trace sample. +* Creates operator back pressure stats from task back pressure stats. * -* @param sample Stack trace sample to base stats on. +* @param stats Task back pressure stats. * -* @return Back pressure stats +* @return Operator back pressure stats */ - private OperatorBackPressureStats createStatsFromSample(StackTraceSample sample) { - Map> traces = sample.getStackTraces(); + private OperatorBackPressureStats createOperatorBackPressureStats(BackPressureStats stats) { Review comment: TBH I do not like the class name `OperatorBackPressureStats`, because compared with the `BackPressureStats` class the difference is not really a task/operator-level one. Actually they describe the same level; `OperatorBackPressureStats` only transforms the map from `BackPressureStats` into an array. Since we have not touched the `OperatorBackPressureStats` class in this PR, we can keep it as is for now.
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r342969795 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStats.java ## @@ -21,44 +21,43 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import java.util.Collections; -import java.util.List; import java.util.Map; import static org.apache.flink.util.Preconditions.checkArgument; /** - * A sample of stack traces for one or more tasks. + * Task back pressure stats for one or more tasks. * - * The sampling is triggered in {@link StackTraceSampleCoordinator}. + * The stats are calculated from sampling triggered in {@link BackPressureSampleCoordinator}. */ -public class StackTraceSample { +public class BackPressureStats { - /** ID of this sample (unique per job). */ + /** ID of the sample (unique per job). */ private final int sampleId; /** Time stamp, when the sample was triggered. */ private final long startTime; - /** Time stamp, when all stack traces were collected at the JobManager. */ + /** Time stamp, when all back pressure stats were collected at the JobManager. */ private final long endTime; - /** Map of stack traces by execution ID. */ - private final Map> stackTracesByTask; + /** Map of back pressure ratio by execution ID. */ + private final Map backPressureRatioByTask; Review comment: `backPressureRatioByTask` -> `backPressureRatiosByTask` or `backPressureRatios` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r342959404 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java ## @@ -56,20 +56,18 @@ public String getAddress() { } @Override - public CompletableFuture requestStackTraceSample( - ExecutionAttemptID executionAttemptID, - int sampleId, - int numSamples, - Time delayBetweenSamples, - int maxStackTraceDepth, - Time timeout) { - - return taskExecutorGateway.requestStackTraceSample( + public CompletableFuture sampleTaskBackPressure( + final ExecutionAttemptID executionAttemptID, Review comment: It is not really necessary to add `final` to method arguments, if I remember correctly.
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r342971685 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStats.java ## @@ -89,28 +88,28 @@ public long getStartTime() { } /** -* Returns the time stamp, when all stack traces were collected at the -* JobManager. +* Returns the time stamp, when all back pressure stats were collected at +* the JobManager. * -* @return Time stamp, when all stack traces were collected at the +* @return Time stamp, when all back pressure stats were collected at the * JobManager */ public long getEndTime() { return endTime; } /** -* Returns the a map of stack traces by execution ID. +* Returns the a map of back pressure ratio by execution ID. * -* @return Map of stack traces by execution ID +* @return Map of back pressure ratio by execution ID */ - public Map> getStackTraces() { - return stackTracesByTask; + public Map getBackPressureRatioByTask() { Review comment: getBackPressureRatioByTask -> getBackPressureRatios
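Under the renames suggested in these two review comments (`backPressureRatios` for the field and `getBackPressureRatios()` for the getter are the reviewer's suggestions, not necessarily the merged code), the stats holder might be sketched roughly as below; the `ExecutionAttemptID` key is simplified to a `String` for illustration:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Rough sketch of the renamed stats class; names follow the reviewer's
// suggestion, and ExecutionAttemptID is stood in by String.
class BackPressureStats {

    /** ID of the sample (unique per job). */
    private final int sampleId;

    /** Time stamp when the sample was triggered. */
    private final long startTime;

    /** Time stamp when all back pressure stats were collected at the JobManager. */
    private final long endTime;

    /** Map of back pressure ratios, keyed by execution attempt ID. */
    private final Map<String, Double> backPressureRatios;

    BackPressureStats(int sampleId, long startTime, long endTime, Map<String, Double> backPressureRatios) {
        this.sampleId = sampleId;
        this.startTime = startTime;
        this.endTime = endTime;
        // Defensive copy so later mutation of the input map cannot change the stats.
        this.backPressureRatios = Collections.unmodifiableMap(new HashMap<>(backPressureRatios));
    }

    public int getSampleId() {
        return sampleId;
    }

    public long getStartTime() {
        return startTime;
    }

    public long getEndTime() {
        return endTime;
    }

    public Map<String, Double> getBackPressureRatios() {
        return backPressureRatios;
    }
}
```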
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r342976963 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/OutputAvailabilitySampleableTask.java ## @@ -22,13 +22,13 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; /** - * Task interface used by {@link StackTraceSampleService} for back pressure tracking. + * Task interface used by {@link TaskBackPressureSampleService} for back pressure tracking. */ -interface StackTraceSampleableTask { +interface OutputAvailabilitySampleableTask { Review comment: Maybe call this class `BackPressureSampleableTask`, and rename the method below from `isAvailableForOutput` to `isBackPressured`. We do not need to expose in the interface the specific way back pressure is monitored. The interface should only describe that the task can be sampled for back pressure; implementations can monitor it in different ways.
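A minimal sketch of the interface under the proposed names (`BackPressureSampleableTask` and `isBackPressured()` are the reviewer's suggestions, not confirmed API), together with a toy implementation that derives back pressure from output availability:

```java
// The interface only promises that a task can be sampled for back pressure;
// it says nothing about how back pressure is detected.
interface BackPressureSampleableTask {

    /** Returns true if the task is currently back-pressured. */
    boolean isBackPressured();
}

// One possible implementation: with non-blocking outputs, a task is
// considered back-pressured exactly when its output is NOT available.
class OutputBasedTask implements BackPressureSampleableTask {

    private final boolean outputAvailable;

    OutputBasedTask(boolean outputAvailable) {
        this.outputAvailable = outputAvailable;
    }

    @Override
    public boolean isBackPressured() {
        return !outputAvailable;
    }
}
```

Keeping the interface this narrow means a future implementation could, for example, sample thread state instead of output availability without changing any caller.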
[GitHub] [flink] KurtYoung commented on a change in pull request #10096: [FLINK-14623][table-api] Add computed column information into TableSc…
KurtYoung commented on a change in pull request #10096: [FLINK-14623][table-api] Add computed column information into TableSc… URL: https://github.com/apache/flink/pull/10096#discussion_r342955630 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java ## @@ -209,18 +173,48 @@ public int getFieldCount() { * @param fieldIndex the index of the field */ public Optional getFieldName(int fieldIndex) { - if (fieldIndex < 0 || fieldIndex >= fieldNames.length) { + if (fieldIndex < 0 || fieldIndex >= columns.size()) { + return Optional.empty(); + } + return Optional.of(this.columns.get(fieldIndex).getName()); + } + + /** +* Returns the {@link TableColumn} instance for the given field index. +* +* @param fieldIndex the index of the field +*/ + public Optional getTableColumn(int fieldIndex) { + if (fieldIndex < 0 || fieldIndex >= columns.size()) { return Optional.empty(); } - return Optional.of(fieldNames[fieldIndex]); + return Optional.of(this.columns.get(fieldIndex)); + } + + /** +* Returns the {@link TableColumn} instance for the given field name. +* +* @param fieldName the name of the field +*/ + public Optional getTableColumn(String fieldName) { + return this.columns.stream() + .filter(column -> column.getName().equals(fieldName)) + .findFirst(); + } + + /** +* Returns all the {@link TableColumn}s for this table schema. +*/ + public TableColumn[] getTableColumns() { Review comment: Why not just return the `List<TableColumn>`?
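To illustrate the suggestion (a sketch, not the actual `TableSchema` code): returning an unmodifiable `List` view avoids the array copy while still protecting the internal state. `TableColumn` is reduced to a name-only `String` stand-in here:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class TableSchemaSketch {

    // Stand-in for the real List<TableColumn> (names only, for illustration).
    private final List<String> columns = new ArrayList<>();

    void addColumn(String name) {
        columns.add(name);
    }

    // Instead of columns.toArray(new TableColumn[0]), hand out the list
    // itself; the unmodifiable wrapper keeps callers from mutating it.
    public List<String> getTableColumns() {
        return Collections.unmodifiableList(columns);
    }
}
```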
[GitHub] [flink] KurtYoung commented on a change in pull request #10096: [FLINK-14623][table-api] Add computed column information into TableSc…
KurtYoung commented on a change in pull request #10096: [FLINK-14623][table-api] Add computed column information into TableSc… URL: https://github.com/apache/flink/pull/10096#discussion_r342949927 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableColumn.java ## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api; + +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.Objects; +import java.util.Optional; + +/** + * A table column represents a table column's structure with + * column name, column data type and computation expression(if it is a computed column). + */ +public class TableColumn { + + //~ Instance fields + + private final String name; + private final DataType type; + @Nullable + private final String expr; + + //~ Constructors --- + + /** +* Creates a {@link TableColumn} instance. 
+* +* @param name Column name +* @param type Column data type +* @param expr Column computation expression if it is a computed column +*/ + private TableColumn( + String name, + DataType type, + @Nullable String expr) { + this.name = name; + this.type = type; + this.expr = expr; + } + + //~ Methods + + /** +* Creates a table column from given name and data type. +*/ + public static TableColumn of(String name, DataType type) { + Preconditions.checkNotNull(name, "Column name can not be null!"); + Preconditions.checkArgument( + type != null, + "Column type can not be null!"); + return new TableColumn( + name, + type, + null); + } + + /** +* Creates a table column from given name and computation expression. +* +* @param name Name of the column +* @param expression SQL-style expression +*/ + public static TableColumn of(String name, DataType type, String expression) { + Preconditions.checkNotNull(name, "Column name can not be null!"); + Preconditions.checkNotNull( + type, + "Column type can not be null!"); + Preconditions.checkNotNull( + expression, Review comment: ditto This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] KurtYoung commented on a change in pull request #10096: [FLINK-14623][table-api] Add computed column information into TableSc…
KurtYoung commented on a change in pull request #10096: [FLINK-14623][table-api] Add computed column information into TableSc… URL: https://github.com/apache/flink/pull/10096#discussion_r342949897 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableColumn.java ## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api; + +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.Objects; +import java.util.Optional; + +/** + * A table column represents a table column's structure with + * column name, column data type and computation expression(if it is a computed column). + */ +public class TableColumn { + + //~ Instance fields + + private final String name; + private final DataType type; + @Nullable + private final String expr; + + //~ Constructors --- + + /** +* Creates a {@link TableColumn} instance. 
+* +* @param name Column name +* @param type Column data type +* @param expr Column computation expression if it is a computed column +*/ + private TableColumn( + String name, + DataType type, + @Nullable String expr) { + this.name = name; + this.type = type; + this.expr = expr; + } + + //~ Methods + + /** +* Creates a table column from given name and data type. +*/ + public static TableColumn of(String name, DataType type) { + Preconditions.checkNotNull(name, "Column name can not be null!"); + Preconditions.checkArgument( + type != null, + "Column type can not be null!"); + return new TableColumn( + name, + type, + null); + } + + /** +* Creates a table column from given name and computation expression. +* +* @param name Name of the column +* @param expression SQL-style expression +*/ + public static TableColumn of(String name, DataType type, String expression) { + Preconditions.checkNotNull(name, "Column name can not be null!"); + Preconditions.checkNotNull( + type, Review comment: no need to wrap line This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] KurtYoung commented on a change in pull request #10096: [FLINK-14623][table-api] Add computed column information into TableSc…
KurtYoung commented on a change in pull request #10096: [FLINK-14623][table-api] Add computed column information into TableSc… URL: https://github.com/apache/flink/pull/10096#discussion_r342988946 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java ## @@ -423,17 +525,15 @@ public Builder watermark(String rowtimeAttribute, String watermarkExpressionStri throw new IllegalStateException("Multiple watermark definition is not supported yet."); } this.watermarkSpecs.add(new WatermarkSpec(rowtimeAttribute, watermarkExpressionString, watermarkExprOutputType)); + validateWatermarkSpecs(this.columns, this.watermarkSpecs); Review comment: ditto: validate this in `build()`
[GitHub] [flink] KurtYoung commented on a change in pull request #10096: [FLINK-14623][table-api] Add computed column information into TableSc…
KurtYoung commented on a change in pull request #10096: [FLINK-14623][table-api] Add computed column information into TableSc… URL: https://github.com/apache/flink/pull/10096#discussion_r342989289 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java ## @@ -76,6 +77,8 @@ public static final String TABLE_SCHEMA_TYPE = "type"; + public static final String TABLE_SCHEMA_EXPR = "expr"; Review comment: `TABLE_SCHEMA_EXPR` looks obscure to me.
[GitHub] [flink] KurtYoung commented on a change in pull request #10096: [FLINK-14623][table-api] Add computed column information into TableSc…
KurtYoung commented on a change in pull request #10096: [FLINK-14623][table-api] Add computed column information into TableSc… URL: https://github.com/apache/flink/pull/10096#discussion_r342953235 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java ## @@ -346,22 +318,137 @@ public static Builder builder() { return new Builder(); } + //~ Tools -- + + /** +* Tools method to transform arrays of table names and types +* into a {@link TableColumn} list. +*/ + private static List getTableColumns( + String[] fieldNames, + TypeInformation[] fieldTypes) { + DataType[] fieldDataTypes = fromLegacyInfoToDataType(fieldTypes); + validateFields(fieldNames, fieldDataTypes); Review comment: IIUC, your purpose here is to validate before/when constructing `TableSchema`, right? I noticed there is only one place calling this method, namely `public TableSchema(String[] fieldNames, TypeInformation[] fieldTypes)`. If that is the case, I would suggest moving this validation out of this method.
[GitHub] [flink] KurtYoung commented on a change in pull request #10096: [FLINK-14623][table-api] Add computed column information into TableSc…
KurtYoung commented on a change in pull request #10096: [FLINK-14623][table-api] Add computed column information into TableSc… URL: https://github.com/apache/flink/pull/10096#discussion_r342991588 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java ## @@ -257,6 +267,43 @@ public void putIndexedFixedProperties(String key, List<String> subKeys, List<List<String>> subKeyValues) + /** +* For example: +* +* +* schema.fields.0.type = INT, schema.fields.0.name = test +* schema.fields.1.type = LONG, schema.fields.1.name = test2 +* schema.fields.2.type = LONG, schema.fields.1.expr = test + 1 +* +* +* The arity of each subKeyValues must match the arity of propertyKeys. +*/ + public void putIndexedOptionalProperties(String key, List<String> subKeys, List<List<String>> subKeyValues) { Review comment: just extend the original `putIndexedFixedProperties` to support null?
[GitHub] [flink] KurtYoung commented on a change in pull request #10096: [FLINK-14623][table-api] Add computed column information into TableSc…
KurtYoung commented on a change in pull request #10096: [FLINK-14623][table-api] Add computed column information into TableSc… URL: https://github.com/apache/flink/pull/10096#discussion_r342956231 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java ## @@ -249,8 +243,16 @@ public DataType toRowDataType() { public String toString() { final StringBuilder sb = new StringBuilder(); sb.append("root\n"); - for (int i = 0; i < fieldNames.length; i++) { - sb.append(" |-- ").append(fieldNames[i]).append(": ").append(fieldDataTypes[i]).append('\n'); + for (int i = 0; i < columns.size(); i++) { + sb.append(" |-- ") + .append(getFieldName(i).get()) + .append(": "); + if (columns.get(i).isGenerated()) { + sb.append(columns.get(i).getExpr().get()); Review comment: Shouldn't a computed column also output its field type?
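One way to answer the question in code (a hypothetical formatting helper, not the PR's actual output): always print the type, and append the expression only for computed columns, so the type is never dropped from `toString()`:

```java
class ColumnPrinter {

    // Prints "name: type" for plain columns and "name: type AS expr" for
    // computed columns; expr == null marks a plain (non-computed) column.
    static String describe(String name, String type, String expr) {
        StringBuilder sb = new StringBuilder();
        sb.append(" |-- ").append(name).append(": ").append(type);
        if (expr != null) {
            sb.append(" AS ").append(expr);
        }
        return sb.toString();
    }
}
```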
[GitHub] [flink] KurtYoung commented on a change in pull request #10096: [FLINK-14623][table-api] Add computed column information into TableSc…
KurtYoung commented on a change in pull request #10096: [FLINK-14623][table-api] Add computed column information into TableSc… URL: https://github.com/apache/flink/pull/10096#discussion_r342948250 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableColumn.java ## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api; + +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.Objects; +import java.util.Optional; + +/** + * A table column represents a table column's structure with + * column name, column data type and computation expression(if it is a computed column). + */ Review comment: add `@PublicEvolving` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] KurtYoung commented on a change in pull request #10096: [FLINK-14623][table-api] Add computed column information into TableSc…
KurtYoung commented on a change in pull request #10096: [FLINK-14623][table-api] Add computed column information into TableSc… URL: https://github.com/apache/flink/pull/10096#discussion_r342952024 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java ## @@ -346,22 +318,137 @@ public static Builder builder() { return new Builder(); } + //~ Tools -- + + /** +* Tools method to transform arrays of table names and types +* into a {@link TableColumn} list. +*/ + private static List getTableColumns( Review comment: no need to be static?
[GitHub] [flink] KurtYoung commented on a change in pull request #10096: [FLINK-14623][table-api] Add computed column information into TableSc…
KurtYoung commented on a change in pull request #10096: [FLINK-14623][table-api] Add computed column information into TableSc… URL: https://github.com/apache/flink/pull/10096#discussion_r342956037 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java ## @@ -209,18 +173,48 @@ public int getFieldCount() { * @param fieldIndex the index of the field */ public Optional getFieldName(int fieldIndex) { - if (fieldIndex < 0 || fieldIndex >= fieldNames.length) { + if (fieldIndex < 0 || fieldIndex >= columns.size()) { + return Optional.empty(); + } + return Optional.of(this.columns.get(fieldIndex).getName()); + } + + /** +* Returns the {@link TableColumn} instance for the given field index. +* +* @param fieldIndex the index of the field +*/ + public Optional getTableColumn(int fieldIndex) { + if (fieldIndex < 0 || fieldIndex >= columns.size()) { return Optional.empty(); } - return Optional.of(fieldNames[fieldIndex]); + return Optional.of(this.columns.get(fieldIndex)); + } + + /** +* Returns the {@link TableColumn} instance for the given field name. +* +* @param fieldName the name of the field +*/ + public Optional getTableColumn(String fieldName) { + return this.columns.stream() + .filter(column -> column.getName().equals(fieldName)) + .findFirst(); + } + + /** +* Returns all the {@link TableColumn}s for this table schema. +*/ + public TableColumn[] getTableColumns() { + return this.columns.toArray(new TableColumn[0]); } /** * Converts a table schema into a (nested) data type describing a {@link DataTypes#ROW(Field...)}. Review comment: Add a note here saying that this would include all computed columns in the RowType, and that the caller should be careful with it?
[GitHub] [flink] KurtYoung commented on a change in pull request #10096: [FLINK-14623][table-api] Add computed column information into TableSc…
KurtYoung commented on a change in pull request #10096: [FLINK-14623][table-api] Add computed column information into TableSc… URL: https://github.com/apache/flink/pull/10096#discussion_r342952283 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java ## @@ -346,22 +318,137 @@ public static Builder builder() { return new Builder(); } + //~ Tools -- + + /** +* Tools method to transform arrays of table names and types +* into a {@link TableColumn} list. +*/ + private static List getTableColumns( + String[] fieldNames, Review comment: nit: 2 tabs
[GitHub] [flink] KurtYoung commented on a change in pull request #10096: [FLINK-14623][table-api] Add computed column information into TableSc…
KurtYoung commented on a change in pull request #10096: [FLINK-14623][table-api] Add computed column information into TableSc… URL: https://github.com/apache/flink/pull/10096#discussion_r342956790 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java ## @@ -386,9 +486,11 @@ public Builder field(String name, DataType dataType) { public Builder fields(String[] names, DataType[] dataTypes) { Preconditions.checkNotNull(names); Preconditions.checkNotNull(dataTypes); - - fieldNames.addAll(Arrays.asList(names)); - fieldDataTypes.addAll(Arrays.asList(dataTypes)); + validateFields(names, dataTypes); Review comment: We should move this validation logic inside `build()`. That will be safer than the current approach. For example, the current solution can't really guarantee that all field names are unique: I can add column `a` with `field(String name, DataType dataType, String expression)`, and then add `a, b, c` with this method.
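The pattern the reviewer describes can be sketched as follows (hypothetical names, and only name uniqueness is checked here, while the real schema validates more): `field(...)` calls merely accumulate columns, and `build()` validates the complete set once, so duplicates added through different overloads are still caught:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class SchemaBuilderSketch {

    private final List<String> fieldNames = new ArrayList<>();

    // Accumulate only; no per-call validation, so no overload can bypass it.
    SchemaBuilderSketch field(String name) {
        fieldNames.add(name);
        return this;
    }

    // Validate once, over the full set of columns, when building.
    List<String> build() {
        Set<String> seen = new HashSet<>();
        for (String name : fieldNames) {
            if (!seen.add(name)) {
                throw new IllegalArgumentException("Duplicate field name: " + name);
            }
        }
        return new ArrayList<>(fieldNames);
    }
}
```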
[GitHub] [flink] KurtYoung commented on a change in pull request #10096: [FLINK-14623][table-api] Add computed column information into TableSc…
KurtYoung commented on a change in pull request #10096: [FLINK-14623][table-api] Add computed column information into TableSc… URL: https://github.com/apache/flink/pull/10096#discussion_r342949758 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableColumn.java ## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api; + +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.Objects; +import java.util.Optional; + +/** + * A table column represents a table column's structure with + * column name, column data type and computation expression(if it is a computed column). + */ +public class TableColumn { + + //~ Instance fields + + private final String name; + private final DataType type; + @Nullable + private final String expr; + + //~ Constructors --- + + /** +* Creates a {@link TableColumn} instance. 
+* +* @param name Column name +* @param type Column data type +* @param expr Column computation expression if it is a computed column +*/ + private TableColumn( + String name, + DataType type, + @Nullable String expr) { + this.name = name; + this.type = type; + this.expr = expr; + } + + //~ Methods + + /** +* Creates a table column from given name and data type. +*/ + public static TableColumn of(String name, DataType type) { + Preconditions.checkNotNull(name, "Column name can not be null!"); + Preconditions.checkArgument( Review comment: use `checkNotNull`? and no need to wrap line. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services