[GitHub] [flink] zjuwangg commented on issue #10092: [FLINK-14580][hive] add HiveModuleFactory, HiveModuleDescriptor, and HiveModuleDescriptorValidator

2019-11-06 Thread GitBox
zjuwangg commented on issue #10092: [FLINK-14580][hive] add HiveModuleFactory, 
HiveModuleDescriptor, and HiveModuleDescriptorValidator
URL: https://github.com/apache/flink/pull/10092#issuecomment-550192726
 
 
   LGTM!


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #10079: [FLINK-14594] Fix matching logics of ResourceSpec/ResourceProfile/Resource considering double values

2019-11-06 Thread GitBox
zhuzhurk commented on a change in pull request #10079: [FLINK-14594] Fix 
matching logics of ResourceSpec/ResourceProfile/Resource considering double 
values
URL: https://github.com/apache/flink/pull/10079#discussion_r342958883
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
 ##
 @@ -145,13 +148,15 @@ private ResourceSpec() {
 * @param other Reference to resource to merge in.
 * @return The new resource with merged values.
 */
-   public ResourceSpec merge(ResourceSpec other) {
+   public ResourceSpec merge(final ResourceSpec other) {
+   checkNotNull(other, "Cannot merge with null resources");
+
if (this.equals(UNKNOWN) || other.equals(UNKNOWN)) {
return UNKNOWN;
}
 
ResourceSpec target = new ResourceSpec(
-   this.cpuCores + other.cpuCores,
+   this.cpuCores.merge(other.cpuCores).getValue(),
 
 Review comment:
  It's for backward compatibility. But I think you are right: we can change the 
constructor to directly accept a ResourceValue. The change is made in 
9627a892cd7076abf01174376dbcdfc9baf41c13.




[jira] [Created] (FLINK-14627) Refactor ExecutionGraph creation in tests as TestingExecutionGraphBuilder

2019-11-06 Thread Zili Chen (Jira)
Zili Chen created FLINK-14627:
-

 Summary: Refactor ExecutionGraph creation in tests as 
TestingExecutionGraphBuilder
 Key: FLINK-14627
 URL: https://issues.apache.org/jira/browse/FLINK-14627
 Project: Flink
  Issue Type: Improvement
  Components: Tests
Reporter: Zili Chen
Assignee: Zili Chen
 Fix For: 1.10.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14627) Refactor ExecutionGraph creation in tests as TestingExecutionGraphBuilder

2019-11-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-14627:
---
Labels: pull-request-available  (was: )

> Refactor ExecutionGraph creation in tests as TestingExecutionGraphBuilder
> -
>
> Key: FLINK-14627
> URL: https://issues.apache.org/jira/browse/FLINK-14627
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Reporter: Zili Chen
>Assignee: Zili Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>






[GitHub] [flink] TisonKun opened a new pull request #10100: [FLINK-14627][tests] Refactor ExecutionGraph creation in tests as TestingExecutionGraphBuilder

2019-11-06 Thread GitBox
TisonKun opened a new pull request #10100: [FLINK-14627][tests] Refactor 
ExecutionGraph creation in tests as TestingExecutionGraphBuilder
URL: https://github.com/apache/flink/pull/10100
 
 
   ## What is the purpose of the change
   
   Refactor ExecutionGraph creation in tests as TestingExecutionGraphBuilder
   
   ## Brief change log
   
   
   ## Verifying this change
   
   This change is a code refactor without functionality changes, so it is 
covered by existing tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   




[GitHub] [flink] pnowojski commented on a change in pull request #10009: [FLINK-14304] Avoid task starvation with mailbox

2019-11-06 Thread GitBox
pnowojski commented on a change in pull request #10009: [FLINK-14304] Avoid 
task starvation with mailbox
URL: https://github.com/apache/flink/pull/10009#discussion_r342962212
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java
 ##
 @@ -211,26 +215,27 @@ private void checkTakeStateConditions() {
 
@Override
public void quiesce() {
+   final ReentrantLock lock = this.lock;
 
 Review comment:
   Ok, let's keep it as black magic. I've read the explanation of why it may 
actually help (`final` fields are useless to the JVM's optimizer, but `final` 
local variables can actually work, as they are truly immutable).
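
For reference, a minimal sketch of the idiom being discussed, as it appears throughout `java.util.concurrent` (e.g. `ArrayBlockingQueue`): the lock field is copied into a `final` local before use. The class below is illustrative, not Flink's `TaskMailboxImpl`.

```java
// Sketch of the "final local copy" lock idiom. Functionally identical to
// using the field directly; the final local is provably immutable, which
// can let the JIT avoid re-reading the field.
import java.util.concurrent.locks.ReentrantLock;

public class FinalLocalLockSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private int counter;

    public void increment() {
        final ReentrantLock lock = this.lock; // the "black magic" local copy
        lock.lock();
        try {
            counter++;
        } finally {
            lock.unlock();
        }
    }

    public int get() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return counter;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        FinalLocalLockSketch sketch = new FinalLocalLockSketch();
        sketch.increment();
        sketch.increment();
        System.out.println(sketch.get()); // prints 2
    }
}
```

Whether the idiom helps in practice depends on the JIT; it never hurts correctness, which is why j.u.c keeps it.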




[GitHub] [flink] flinkbot commented on issue #10100: [FLINK-14627][tests] Refactor ExecutionGraph creation in tests as TestingExecutionGraphBuilder

2019-11-06 Thread GitBox
flinkbot commented on issue #10100: [FLINK-14627][tests] Refactor 
ExecutionGraph creation in tests as TestingExecutionGraphBuilder
URL: https://github.com/apache/flink/pull/10100#issuecomment-550197124
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 2ffe594419c91e32291507339f4fc4844dbb6c57 (Wed Nov 06 
08:16:49 UTC 2019)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[jira] [Commented] (FLINK-14602) Degenerate the current ConcurrentHashMap type of tasks to a normal HashMap type.

2019-11-06 Thread vinoyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16968156#comment-16968156
 ] 

vinoyang commented on FLINK-14602:
--

[~gjy] Any opinion? If you do not have any comments, I will open a PR to fix it.

> Degenerate the current ConcurrentHashMap type of tasks to a normal HashMap 
> type.
> 
>
> Key: FLINK-14602
> URL: https://issues.apache.org/jira/browse/FLINK-14602
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: vinoyang
>Priority: Major
>
> After FLINK-11417, we made ExecutionGraph single-threaded. It will no longer 
> be plagued by concurrency issues, so we can degenerate the current 
> ConcurrentHashMap type of tasks to a normal HashMap type.
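
A hedged sketch (not Flink code; all names invented) of why such a degradation is safe: when every mutation of the map is funneled through a single-threaded executor, the map is confined to one thread and a plain `HashMap` suffices; the synchronization a `ConcurrentHashMap` provides is redundant.

```java
// Illustrative only: thread confinement makes a plain HashMap safe.
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SingleThreadConfinementSketch {
    // Mutated only from the single-threaded executor below, so no
    // ConcurrentHashMap is needed.
    private final Map<String, String> tasks = new HashMap<>();
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

    public void register(String id, String task) {
        mainThread.execute(() -> tasks.put(id, task)); // always the same thread
    }

    public int sizeAndShutdown() {
        mainThread.shutdown();
        try {
            mainThread.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // Safe to read here: executor termination establishes a
        // happens-before edge with all submitted tasks.
        return tasks.size();
    }

    public static void main(String[] args) {
        SingleThreadConfinementSketch sketch = new SingleThreadConfinementSketch();
        sketch.register("a", "task-a");
        sketch.register("b", "task-b");
        System.out.println(sketch.sizeAndShutdown()); // prints 2
    }
}
```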





[jira] [Commented] (FLINK-14596) Update MapView and ListView interface to new type system

2019-11-06 Thread hailong wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16968158#comment-16968158
 ] 

hailong wang commented on FLINK-14596:
--

[~jark] Thank you all the same.

> Update MapView and ListView interface to new type system
> 
>
> Key: FLINK-14596
> URL: https://issues.apache.org/jira/browse/FLINK-14596
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Major
> Fix For: 1.10.0
>
>
> MapView and ListView still use TypeInformation as constructor parameters. We 
> should deprecate that and expose the new type system through the constructor. 
> We can also make the data type member field private and use reflection to 
> access it, so that it is invisible to users.





[GitHub] [flink] flinkbot edited a comment on issue #10074: [FLINK-14499][Runtime / Metrics]MetricRegistry#getMetricQueryServiceGatewayRpcAddress is Nonnull

2019-11-06 Thread GitBox
flinkbot edited a comment on issue #10074: [FLINK-14499][Runtime / 
Metrics]MetricRegistry#getMetricQueryServiceGatewayRpcAddress is Nonnull
URL: https://github.com/apache/flink/pull/10074#issuecomment-549322411
 
 
   
   ## CI report:
   
   * 4807c59ae20273d5b4611395ca5467217d7d0ca3 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/134842985)
   * fedd3f2abd43a67d6461b895145964fcdf98af0d : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/134961727)
   * 5c077aaf9166fbbd068f80eb61294d9cfea2eddc : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[GitHub] [flink] flinkbot edited a comment on issue #10079: [FLINK-14594] Fix matching logics of ResourceSpec/ResourceProfile/Resource considering double values

2019-11-06 Thread GitBox
flinkbot edited a comment on issue #10079: [FLINK-14594] Fix matching logics of 
ResourceSpec/ResourceProfile/Resource considering double values
URL: https://github.com/apache/flink/pull/10079#issuecomment-549401980
 
 
   
   ## CI report:
   
   * 5b7e9e832ed26f476d0149663b2f0dec9f1bc427 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/134874614)
   * 6915be3cf16f83c6850eb18115a333a4a1206080 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/135161948)
   * 9627a892cd7076abf01174376dbcdfc9baf41c13 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[jira] [Updated] (FLINK-14613) Add validation check when applying UDF to temporal table key in Temporal Table Join condition

2019-11-06 Thread hailong wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hailong wang updated FLINK-14613:
-
Description: 
In a Temporal Table Join, we do not support using a UDF on the temporal table 
join key, because LookupKeys cannot be analyzed when the call is an expression. 
When users write such a query, the program runs normally but the result is 
wrong, so we should add validation to prevent it.

The SQL is as follows:
{code:java}
INSERT INTO A
SELECT B.amount, B.currency, C.amount, C.product 
FROM B join C FOR SYSTEM_TIME AS OF B.proctime 
on B.amount = cancat(C.amount, 'r') and C.product = '1'
{code}

  was:
In a Temporal Table Join, we do not support using a UDF on the temporal table 
join key, because LookupKeys cannot be analyzed when the call is an expression. 
When users write such a query, the program runs normally but the result is 
wrong, so we should add validation to prevent it.

The SQL is as follows:
{code:java}
INSERT INTO A
SELECT B.amount, B.currency, C.amount, C.product 
FROM B join C FOR SYSTEM_TIME AS OF B.proctime 
on B.amount = C.amount and C.product = '1'
{code}


> Add validation check when applying UDF to temporal table key in Temporal 
> Table Join condition 
> ---
>
> Key: FLINK-14613
> URL: https://issues.apache.org/jira/browse/FLINK-14613
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.9.1
>Reporter: hailong wang
>Priority: Major
> Fix For: 1.10.0
>
>
> In a Temporal Table Join, we do not support using a UDF on the temporal table 
> join key, because LookupKeys cannot be analyzed when the call is an 
> expression. When users write such a query, the program runs normally but the 
> result is wrong, so we should add validation to prevent it.
> The SQL is as follows:
> {code:java}
> INSERT INTO A
> SELECT B.amount, B.currency, C.amount, C.product 
> FROM B join C FOR SYSTEM_TIME AS OF B.proctime 
> on B.amount = cancat(C.amount, 'r') and C.product = '1'
> {code}





[GitHub] [flink] flinkbot edited a comment on issue #10094: [FLINK-14611][runtime] Move allVerticesInSameSlotSharingGroupByDefault setting from ExecutionConfig to StreamGraph

2019-11-06 Thread GitBox
flinkbot edited a comment on issue #10094: [FLINK-14611][runtime] Move 
allVerticesInSameSlotSharingGroupByDefault setting from ExecutionConfig to 
StreamGraph
URL: https://github.com/apache/flink/pull/10094#issuecomment-550114409
 
 
   
   ## CI report:
   
   * 8d20db4cdea3e4e2a69590362675e7bf7ec1a913 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/135157539)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[GitHub] [flink] flinkbot edited a comment on issue #10097: [FLINK-14603][runtime]Notify the potential buffer consumers if the size of LocalBufferPool has been expanded.

2019-11-06 Thread GitBox
flinkbot edited a comment on issue #10097: [FLINK-14603][runtime]Notify the 
potential buffer consumers if the size of LocalBufferPool has been expanded.
URL: https://github.com/apache/flink/pull/10097#issuecomment-550166959
 
 
   
   ## CI report:
   
   * f72b6f5096ae65beb1647edc0e1db84ae9d320a9 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/135180539)
   * d4ee0f8a5f87b5837ed2bcdaa70f5d64f6e675b4 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/135191058)
   * c973b6a83812e8bd548d0fcd2247b87efa1edf83 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[GitHub] [flink] flinkbot commented on issue #10099: [FLINK-14589] [Runtime/Coordination] Redundant slot requests with the same AllocationID lead…

2019-11-06 Thread GitBox
flinkbot commented on issue #10099: [FLINK-14589] [Runtime/Coordination] 
Redundant slot requests with the same AllocationID lead…
URL: https://github.com/apache/flink/pull/10099#issuecomment-550199907
 
 
   
   ## CI report:
   
   * 6bde76855e2ac3583581b28115db01901a87bc65 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[jira] [Commented] (FLINK-14613) Add validation check when applying UDF to temporal table key in Temporal Table Join condition

2019-11-06 Thread hailong wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16968164#comment-16968164
 ] 

hailong wang commented on FLINK-14613:
--

Hi [~jark], I'm sorry for giving a wrong example; I have updated it. In short, 
when a UDF is used on the temporal table join key, the result is wrong, because 
the join condition actually applied is still B.amount = C.amount, not 
B.amount = cancat(C.amount, 'r'). So when a query is written like this, should 
we throw an error during validation?

> Add validation check when applying UDF to temporal table key in Temporal 
> Table Join condition 
> ---
>
> Key: FLINK-14613
> URL: https://issues.apache.org/jira/browse/FLINK-14613
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.9.1
>Reporter: hailong wang
>Priority: Major
> Fix For: 1.10.0
>
>
> In a Temporal Table Join, we do not support using a UDF on the temporal table 
> join key, because LookupKeys cannot be analyzed when the call is an 
> expression. When users write such a query, the program runs normally but the 
> result is wrong, so we should add validation to prevent it.
> The SQL is as follows:
> {code:java}
> INSERT INTO A
> SELECT B.amount, B.currency, C.amount, C.product 
> FROM B join C FOR SYSTEM_TIME AS OF B.proctime 
> on B.amount = cancat(C.amount, 'r') and C.product = '1'
> {code}





[GitHub] [flink] flinkbot edited a comment on issue #9703: [FLINK-14038]Add default GC options for flink on yarn to facilitate debugging

2019-11-06 Thread GitBox
flinkbot edited a comment on issue #9703: [FLINK-14038]Add default GC options 
for flink on yarn to facilitate debugging
URL: https://github.com/apache/flink/pull/9703#issuecomment-532581942
 
 
   
   ## CI report:
   
   * 1b930d19f27909ad5e2759eb6c5471c2ce07e8b4 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128133485)
   * 977ccb5d91869e37027069d8b2b490bf850253ed : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/129659424)
   * 8347093d4cb32ed752bc01f5cd98abb2d803df94 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/130842273)
   * 796de65585c861a67c46ba8c578e08302ade2cdc : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/133371242)
   * 5817aa535fb834889eebb96478b7a40f936fb3c3 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/134200223)
   * 040b9878337aa7b919f16d2cfb1c9bc590b31a7e : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/134418855)
   * 32120e401687204ef737ebe01875e293be71d720 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/134550254)
   * bb787cffdb56629b880c50edbf368fa81f11db58 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/134797060)
   * 0cd39929e456b8ab0a1d1c20dc0b05b29d92d8b0 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/135018920)
   * 560c6f43ae1b26dd81b360d5f32207c459824def : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/135182722)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[jira] [Assigned] (FLINK-14602) Degenerate the current ConcurrentHashMap type of tasks to a normal HashMap type.

2019-11-06 Thread Gary Yao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao reassigned FLINK-14602:


Assignee: vinoyang

> Degenerate the current ConcurrentHashMap type of tasks to a normal HashMap 
> type.
> 
>
> Key: FLINK-14602
> URL: https://issues.apache.org/jira/browse/FLINK-14602
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>
> After FLINK-11417, we made ExecutionGraph single-threaded. It will no longer 
> be plagued by concurrency issues, so we can degenerate the current 
> ConcurrentHashMap type of tasks to a normal HashMap type.





[jira] [Commented] (FLINK-14602) Degenerate the current ConcurrentHashMap type of tasks to a normal HashMap type.

2019-11-06 Thread Gary Yao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16968166#comment-16968166
 ] 

Gary Yao commented on FLINK-14602:
--

I assigned the ticket to you, [~yanghua]


> Degenerate the current ConcurrentHashMap type of tasks to a normal HashMap 
> type.
> 
>
> Key: FLINK-14602
> URL: https://issues.apache.org/jira/browse/FLINK-14602
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>
> After FLINK-11417, we made ExecutionGraph single-threaded. It will no longer 
> be plagued by concurrency issues, so we can degenerate the current 
> ConcurrentHashMap type of tasks to a normal HashMap type.





[GitHub] [flink] GJL commented on a change in pull request #10082: [FLINK-14164][runtime] Add a meter ‘numberOfRestarts’ to show number of restarts as well as its rate

2019-11-06 Thread GitBox
GJL commented on a change in pull request #10082: [FLINK-14164][runtime] Add a 
meter ‘numberOfRestarts’ to show number of restarts as well as its rate
URL: https://github.com/apache/flink/pull/10082#discussion_r342643086
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 ##
 @@ -193,6 +197,11 @@ public SchedulerBase(
this.failoverTopology = executionGraph.getFailoverTopology();
 
this.inputsLocationsRetriever = new 
ExecutionGraphToInputsLocationsRetrieverAdapter(executionGraph);
+
+   // Use the counter from execution graph to avoid modifying 
execution graph interfaces
+   // Can be a new SimpleCounter created here after the legacy 
scheduler is removed.
+   this.numberOfRestartsCounter = 
executionGraph.getNumberOfRestartsCounter();
+   jobManagerJobMetricGroup.meter(NUMBER_OF_RESTARTS, new 
MeterView(numberOfRestartsCounter));
 
 Review comment:
   Now that I think about it, I find _restarts per second_ to be an awkward 
unit because:
   1. It will normally be very small (by default < 1/60)
   1. It is hard to come up with reasonable alerting thresholds other than _"> 
0"_. For example, alerting on _number of restarts > 10 in the past hour_ is 
impossible.
   
   If a user had a time series database such as InfluxDB in place, the total 
number of restarts would suffice because the database can calculate the 
difference. I know that the requirement to introduce a meter comes from the 
user mailing list. I don't see a good solution at the moment.




[jira] [Commented] (FLINK-14243) flink hiveudf needs some check when it is using cache

2019-11-06 Thread jackylau (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16968168#comment-16968168
 ] 

jackylau commented on FLINK-14243:
--

[~lzljs3620320] [~jark] Yep, I read HIVE-16196 and the related issues. It is a 
Hive bug, and my Hive is hive-1.1.0-cdh5.7.0-src, which is affected by the 
issue. Thanks!

> flink hiveudf needs some check when it is using cache
> -
>
> Key: FLINK-14243
> URL: https://issues.apache.org/jira/browse/FLINK-14243
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive, Table SQL / Planner
>Affects Versions: 1.9.0
>Reporter: jackylau
>Priority: Major
> Fix For: 1.10.0
>
> Attachments: Snipaste_2019-10-30_15-34-09.png
>
>
> Flink 1.9 brings in the Hive connector, but there can be problems when the 
> original Hive UDF uses a cache. We know that Hive parallelizes at the process 
> (JVM) level, while Flink/Spark parallelize at the task level. If Flink simply 
> calls the Hive UDF, thread-safety problems can arise when a cache is used.
> So we may need to check the Hive UDF code and, if it is not thread-safe, set 
> the Flink parallelism to 1.





[GitHub] [flink] GJL commented on a change in pull request #10082: [FLINK-14164][runtime] Add a meter ‘numberOfRestarts’ to show number of restarts as well as its rate

2019-11-06 Thread GitBox
GJL commented on a change in pull request #10082: [FLINK-14164][runtime] Add a 
meter ‘numberOfRestarts’ to show number of restarts as well as its rate
URL: https://github.com/apache/flink/pull/10082#discussion_r342643086
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 ##
 @@ -193,6 +197,11 @@ public SchedulerBase(
this.failoverTopology = executionGraph.getFailoverTopology();
 
this.inputsLocationsRetriever = new 
ExecutionGraphToInputsLocationsRetrieverAdapter(executionGraph);
+
+   // Use the counter from execution graph to avoid modifying 
execution graph interfaces
+   // Can be a new SimpleCounter created here after the legacy 
scheduler is removed.
+   this.numberOfRestartsCounter = 
executionGraph.getNumberOfRestartsCounter();
+   jobManagerJobMetricGroup.meter(NUMBER_OF_RESTARTS, new 
MeterView(numberOfRestartsCounter));
 
 Review comment:
   Now that I think about it, I find _restarts per second_ to be an awkward 
unit because:
   1. It will normally be very small (by default < 1/60)
   1. It is hard to come up with reasonable alerting thresholds other than _"> 
0"_. For example, alerting on _number of restarts > 10 in the past hour_ is 
impossible.
   
   If a user had a time series database such as InfluxDB in place, the total 
number of restarts would suffice because the database can calculate the 
difference. I know that the requirement to introduce a meter [comes from the 
user mailing 
list](http://mail-archives.apache.org/mod_mbox/flink-dev/201909.mbox/%3cCAOmjRb2ti9MXOD2jFy0XzWViwoNM6tvU4DB5hSnG_=zbvec...@mail.gmail.com%3e).
 I don't see a good solution at the moment.




[jira] [Commented] (FLINK-14243) flink hiveudf needs some check when it is using cache

2019-11-06 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16968170#comment-16968170
 ] 

Jark Wu commented on FLINK-14243:
-

Thanks for verifying this. I will close this issue then. 

> flink hiveudf needs some check when it is using cache
> -
>
> Key: FLINK-14243
> URL: https://issues.apache.org/jira/browse/FLINK-14243
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive, Table SQL / Planner
>Affects Versions: 1.9.0
>Reporter: jackylau
>Priority: Major
> Fix For: 1.10.0
>
> Attachments: Snipaste_2019-10-30_15-34-09.png
>
>
> Flink 1.9 brings in the Hive connector, but there can be problems when the 
> original Hive UDF uses a cache. We know that Hive parallelizes at the process 
> (JVM) level, while Flink/Spark parallelize at the task level. If Flink simply 
> calls the Hive UDF, thread-safety problems can arise when a cache is used.
> So we may need to check the Hive UDF code and, if it is not thread-safe, set 
> the Flink parallelism to 1.





[jira] [Closed] (FLINK-14243) flink hiveudf needs some check when it is using cache

2019-11-06 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-14243.
---
Fix Version/s: (was: 1.10.0)
   Resolution: Not A Bug

This is a Hive bug.

> flink hiveudf needs some check when it is using cache
> -
>
> Key: FLINK-14243
> URL: https://issues.apache.org/jira/browse/FLINK-14243
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive, Table SQL / Planner
>Affects Versions: 1.9.0
>Reporter: jackylau
>Priority: Major
> Attachments: Snipaste_2019-10-30_15-34-09.png
>
>
> Flink 1.9 brings in the Hive connector, but there can be problems when the 
> original Hive UDF uses a cache. We know that Hive parallelizes at the process 
> (JVM) level, while Flink/Spark parallelize at the task level. If Flink simply 
> calls the Hive UDF, thread-safety problems can arise when a cache is used.
> So we may need to check the Hive UDF code and, if it is not thread-safe, set 
> the Flink parallelism to 1.





[jira] [Issue Comment Deleted] (FLINK-14243) flink hiveudf needs some check when it is using cache

2019-11-06 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-14243:

Comment: was deleted

(was: This is a Hive bug.)

> flink hiveudf needs some check when it is using cache
> -
>
> Key: FLINK-14243
> URL: https://issues.apache.org/jira/browse/FLINK-14243
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive, Table SQL / Planner
>Affects Versions: 1.9.0
>Reporter: jackylau
>Priority: Major
> Attachments: Snipaste_2019-10-30_15-34-09.png
>
>
> Flink 1.9 brings in the Hive connector, but there can be a problem when an
> existing Hive UDF uses a cache. We know that Hive parallelizes at the process
> level (per JVM), while Flink/Spark parallelize at the task (thread) level. If
> Flink just calls the Hive UDF, thread-safety problems can arise when the UDF
> uses a cache.
> So it may be necessary to check the Hive UDF code and, if it is not
> thread-safe, set the Flink parallelism to 1.





[GitHub] [flink] flinkbot edited a comment on issue #10009: [FLINK-14304] Avoid task starvation with mailbox

2019-11-06 Thread GitBox
flinkbot edited a comment on issue #10009: [FLINK-14304] Avoid task starvation 
with mailbox
URL: https://github.com/apache/flink/pull/10009#issuecomment-546724255
 
 
   
   ## CI report:
   
   * 2b39236d76b1c56d240f2476c535568f0614c577 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/133732010)
   * 36d23db6c3e89c7044d4eaf916661745c69c349b : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/133733201)
   * fbfcab3e7a1129007a0ccf491e0b91a6c3269bcb : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/133738302)
   * 462d65ad2a87e998174481c51b7d38abced291c0 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/133968232)
   * 8ee3b9a975676432e034362a602e898531806df7 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/135039943)
   * c8b5ad6d1e2b0f9aea258199b780d9449376f648 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/135092421)
   * 011484f925fa1bbd4ea1c1c33f1689913acf032c : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/135110538)
   * 28cf3bdd960cf1b5fe7595504b10f3cb7cdf558c : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/135126818)
   * c7c84c8da069c79f9fcc841bbd50ad6c7193b0f0 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-14602) Change Type of Field tasks from ConcurrentHashMap to HashMap

2019-11-06 Thread vinoyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang updated FLINK-14602:
-
Summary: Change Type of Field tasks from ConcurrentHashMap to HashMap  
(was: Degenerate the current ConcurrentHashMap type of tasks to a normal 
HashMap type.)

> Change Type of Field tasks from ConcurrentHashMap to HashMap
> 
>
> Key: FLINK-14602
> URL: https://issues.apache.org/jira/browse/FLINK-14602
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>
> After FLINK-11417, the ExecutionGraph runs in single-threaded mode and is no
> longer plagued by concurrency issues. So, we can downgrade the current
> ConcurrentHashMap type of the tasks field to a plain HashMap.
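The thread-confinement argument behind this change can be sketched as follows (illustrative names, not Flink's actual classes): once every access to the map is funneled through one single-threaded executor, the map itself needs no synchronization, so a plain `HashMap` suffices.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative sketch, not Flink code: all map access is confined to a
// single-threaded executor, so a plain HashMap is safe.
public class SingleThreadedRegistry {

    private final Map<Integer, String> tasks = new HashMap<>();
    private final ExecutorService mainThreadExecutor = Executors.newSingleThreadExecutor();

    public CompletableFuture<Void> registerTask(int id, String name) {
        // Mutations happen only on the executor's single thread.
        return CompletableFuture.runAsync(() -> tasks.put(id, name), mainThreadExecutor);
    }

    public CompletableFuture<Integer> taskCount() {
        // Reads run on the same thread, so they see all prior writes.
        return CompletableFuture.supplyAsync(tasks::size, mainThreadExecutor);
    }

    public void shutdown() {
        mainThreadExecutor.shutdown();
    }

    public static void main(String[] args) {
        SingleThreadedRegistry registry = new SingleThreadedRegistry();
        registry.registerTask(1, "source").join();
        registry.registerTask(2, "sink").join();
        System.out.println(registry.taskCount().join()); // 2
        registry.shutdown();
    }
}
```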





[GitHub] [flink] yanghua opened a new pull request #10101: [FLINK-14602] Change Type of Field tasks from ConcurrentHashMap to HashMap

2019-11-06 Thread GitBox
yanghua opened a new pull request #10101: [FLINK-14602] Change Type of Field 
tasks from ConcurrentHashMap to HashMap
URL: https://github.com/apache/flink/pull/10101
 
 
   ## What is the purpose of the change
   
   *This pull request changes Type of Field tasks from ConcurrentHashMap to 
HashMap*
   
   ## Brief change log
   
 - *Change Type of Field tasks from ConcurrentHashMap to HashMap*
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ **not documented**)
   




[jira] [Updated] (FLINK-14602) Change Type of Field tasks from ConcurrentHashMap to HashMap

2019-11-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-14602:
---
Labels: pull-request-available  (was: )

> Change Type of Field tasks from ConcurrentHashMap to HashMap
> 
>
> Key: FLINK-14602
> URL: https://issues.apache.org/jira/browse/FLINK-14602
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> After FLINK-11417, the ExecutionGraph runs in single-threaded mode and is no
> longer plagued by concurrency issues. So, we can downgrade the current
> ConcurrentHashMap type of the tasks field to a plain HashMap.





[GitHub] [flink] flinkbot edited a comment on issue #10079: [FLINK-14594] Fix matching logics of ResourceSpec/ResourceProfile/Resource considering double values

2019-11-06 Thread GitBox
flinkbot edited a comment on issue #10079: [FLINK-14594] Fix matching logics of 
ResourceSpec/ResourceProfile/Resource considering double values
URL: https://github.com/apache/flink/pull/10079#issuecomment-549401980
 
 
   
   ## CI report:
   
   * 5b7e9e832ed26f476d0149663b2f0dec9f1bc427 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/134874614)
   * 6915be3cf16f83c6850eb18115a333a4a1206080 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/135161948)
   * 9627a892cd7076abf01174376dbcdfc9baf41c13 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/135207817)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[GitHub] [flink] flinkbot edited a comment on issue #10074: [FLINK-14499][Runtime / Metrics]MetricRegistry#getMetricQueryServiceGatewayRpcAddress is Nonnull

2019-11-06 Thread GitBox
flinkbot edited a comment on issue #10074: [FLINK-14499][Runtime / 
Metrics]MetricRegistry#getMetricQueryServiceGatewayRpcAddress is Nonnull
URL: https://github.com/apache/flink/pull/10074#issuecomment-549322411
 
 
   
   ## CI report:
   
   * 4807c59ae20273d5b4611395ca5467217d7d0ca3 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/134842985)
   * fedd3f2abd43a67d6461b895145964fcdf98af0d : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/134961727)
   * 5c077aaf9166fbbd068f80eb61294d9cfea2eddc : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/135207803)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[GitHub] [flink] GJL commented on a change in pull request #10082: [FLINK-14164][runtime] Add a meter ‘numberOfRestarts’ to show number of restarts as well as its rate

2019-11-06 Thread GitBox
GJL commented on a change in pull request #10082: [FLINK-14164][runtime] Add a 
meter ‘numberOfRestarts’ to show number of restarts as well as its rate
URL: https://github.com/apache/flink/pull/10082#discussion_r342974956
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 ##
 @@ -193,6 +197,11 @@ public SchedulerBase(
this.failoverTopology = executionGraph.getFailoverTopology();
 
this.inputsLocationsRetriever = new 
ExecutionGraphToInputsLocationsRetrieverAdapter(executionGraph);
+
+   // Use the counter from execution graph to avoid modifying 
execution graph interfaces
+   // Can be a new SimpleCounter created here after the legacy 
scheduler is removed.
+   this.numberOfRestartsCounter = 
executionGraph.getNumberOfRestartsCounter();
+   jobManagerJobMetricGroup.meter(NUMBER_OF_RESTARTS, new 
MeterView(numberOfRestartsCounter));
 
 Review comment:
   cc: @stevenzwu




[GitHub] [flink] flinkbot commented on issue #10101: [FLINK-14602] Change Type of Field tasks from ConcurrentHashMap to HashMap

2019-11-06 Thread GitBox
flinkbot commented on issue #10101: [FLINK-14602] Change Type of Field tasks 
from ConcurrentHashMap to HashMap
URL: https://github.com/apache/flink/pull/10101#issuecomment-550208670
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit d5de7837dc84014509fe5603bb9f5d379c597309 (Wed Nov 06 
08:50:09 UTC 2019)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied
according to the order of the review items. For consensus, approval by a Flink
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] zhuzhurk commented on a change in pull request #10082: [FLINK-14164][runtime] Add a meter ‘numberOfRestarts’ to show number of restarts as well as its rate

2019-11-06 Thread GitBox
zhuzhurk commented on a change in pull request #10082: [FLINK-14164][runtime] 
Add a meter ‘numberOfRestarts’ to show number of restarts as well as its rate
URL: https://github.com/apache/flink/pull/10082#discussion_r342911786
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 ##
 @@ -193,6 +197,11 @@ public SchedulerBase(
this.failoverTopology = executionGraph.getFailoverTopology();
 
this.inputsLocationsRetriever = new 
ExecutionGraphToInputsLocationsRetrieverAdapter(executionGraph);
+
+   // Use the counter from execution graph to avoid modifying 
execution graph interfaces
+   // Can be a new SimpleCounter created here after the legacy 
scheduler is removed.
+   this.numberOfRestartsCounter = 
executionGraph.getNumberOfRestartsCounter();
+   jobManagerJobMetricGroup.meter(NUMBER_OF_RESTARTS, new 
MeterView(numberOfRestartsCounter));
 
 Review comment:
   Yes, the rate is awkward if the event happens at a very low frequency.
   I think a counter `numberOfRestarts` is needed to enable users to build 
alerts in a more flexible way.
   And the question is: Whether to introduce a meter 
`numberOfRestartsPerSecond`?
   - Pros: The meter enables users to build alerts for restarts even if their 
monitoring system does not support variations of values. 
   - Cons: The integral of the rate value is not accurate, so users cannot use 
it to build reliable alerts other than ">0". This is limited by the time 
interval used to sample metrics in Flink, as well as in the external metric 
collection system.
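The counter-versus-meter distinction discussed here can be sketched like this (simplified stand-ins, not Flink's actual `Counter`/`MeterView` classes): the counter carries the exact restart count, while the meter derives an approximate rate from periodic samples of that counter, which is why integrating the rate back over time is not exact.

```java
// Simplified stand-ins for Flink's Counter and MeterView, for illustration.
public class RestartMetricsSketch {

    static final class Counter {
        private long count;
        void inc() { count++; }
        long getCount() { return count; }
    }

    // Derives a rate from two successive samples of the underlying counter.
    static final class MeterView {
        private final Counter counter;
        private long lastCount;

        MeterView(Counter counter) { this.counter = counter; }

        double updateAndGetRate(double elapsedSeconds) {
            long current = counter.getCount();
            double rate = (current - lastCount) / elapsedSeconds;
            lastCount = current;
            return rate;
        }
    }

    public static void main(String[] args) {
        Counter numberOfRestarts = new Counter();
        MeterView restartsPerSecond = new MeterView(numberOfRestarts);

        numberOfRestarts.inc();
        numberOfRestarts.inc();

        // Two restarts over a 10-second sampling interval -> 0.2 restarts/s;
        // the exact count (2) is only preserved by the counter itself.
        System.out.println(numberOfRestarts.getCount()); // 2
        System.out.println(restartsPerSecond.updateAndGetRate(10.0)); // 0.2
    }
}
```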




[jira] [Updated] (FLINK-14612) Change Type of Field intermediateResults from ConcurrentHashMap to HashMap

2019-11-06 Thread vinoyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang updated FLINK-14612:
-
Summary: Change Type of Field intermediateResults from ConcurrentHashMap to 
HashMap  (was: Degenerate the current ConcurrentHashMap type of 
intermediateResults to a normal HashMap type.)

> Change Type of Field intermediateResults from ConcurrentHashMap to HashMap
> --
>
> Key: FLINK-14612
> URL: https://issues.apache.org/jira/browse/FLINK-14612
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: vinoyang
>Priority: Major
>
> After FLINK-11417, the ExecutionGraph runs in single-threaded mode and is no
> longer plagued by concurrency issues. So, we can downgrade the current
> ConcurrentHashMap type of the intermediateResults field to a plain HashMap.





[GitHub] [flink] flinkbot edited a comment on issue #10097: [FLINK-14603][runtime]Notify the potential buffer consumers if the size of LocalBufferPool has been expanded.

2019-11-06 Thread GitBox
flinkbot edited a comment on issue #10097: [FLINK-14603][runtime]Notify the 
potential buffer consumers if the size of LocalBufferPool has been expanded.
URL: https://github.com/apache/flink/pull/10097#issuecomment-550166959
 
 
   
   ## CI report:
   
   * f72b6f5096ae65beb1647edc0e1db84ae9d320a9 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/135180539)
   * d4ee0f8a5f87b5837ed2bcdaa70f5d64f6e675b4 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/135191058)
   * c973b6a83812e8bd548d0fcd2247b87efa1edf83 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/135207848)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[GitHub] [flink] pnowojski commented on a change in pull request #10009: [FLINK-14304] Avoid task starvation with mailbox

2019-11-06 Thread GitBox
pnowojski commented on a change in pull request #10009: [FLINK-14304] Avoid 
task starvation with mailbox
URL: https://github.com/apache/flink/pull/10009#discussion_r342974709
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutor.java
 ##
 @@ -30,7 +27,44 @@
 import java.util.concurrent.RejectedExecutionException;
 
 /**
- * Interface for an {@link Executor} build around a {@link Mailbox}-based 
execution model.
+ * Interface for an {@link Executor} built around a mailbox-based execution 
model (see {@link TaskMailbox}). {@code
+ * MailboxExecutor} can also execute downstream messages of a mailbox by 
yielding control from the task thread.
+ *
+ * All submission functions can be called from any thread and will enqueue 
the action for further processing in a
+ * FIFO fashion.
+ *
+ * The yielding functions avoid the following situation: One operator 
cannot fully process an input record and
+ * blocks the task thread until some resources are available. However, since 
the introduction of the mailbox model,
+ * blocking the task thread will not only block new inputs but also all events 
from being processed. If the resources
+ * depend on downstream operators being able to process such events (e.g., 
timers), then we may easily arrive at some
+ * livelocks.
+ *
+ * The yielding functions will only process events from the operator itself 
and any downstream operator. Events of upstream
+ * operators are only processed when the input has been fully processed or if 
they yield themselves. This approach avoids
+ * congestion and potential deadlocks, but will process {@link Mail}s slightly 
out-of-order, effectively creating a view
+ * on the mailbox that contains no message from upstream operators.
+ *
+ * All yielding functions must be called in the mailbox thread (see 
{@link TaskMailbox#isMailboxThread()}) to not
+ * violate the single-threaded execution model. There are two typical cases, 
both waiting until the resource is
+ * available. The main difference is if the resource becomes available through 
a mailbox message itself or not.
+ *
+ * If the resource becomes available through a mailbox mail, we can 
effectively block the task thread.
+ * Implicitly, this requires the mail to be enqueued by a different thread.
+ * {@code
+ * while (resource not available) {
+ * mailboxExecutor.yield();
+ * }
+ * }
+ *
+ * If the resource becomes available through an external mechanism or the 
corresponding mail needs to be enqueued
+ * in the task thread, we cannot block.
+ * {@code
+ * while (resource not available) {
+ * if (!mailboxExecutor.tryYield()) {
+ * do stuff or sleep for a small amount of time
+ * }
+ * }
+ * }
  */
 
 Review comment:
   aren't we missing `@PublicEvolving` annotation here?




[GitHub] [flink] flinkbot commented on issue #10100: [FLINK-14627][tests] Refactor ExecutionGraph creation in tests as TestingExecutionGraphBuilder

2019-11-06 Thread GitBox
flinkbot commented on issue #10100: [FLINK-14627][tests] Refactor 
ExecutionGraph creation in tests as TestingExecutionGraphBuilder
URL: https://github.com/apache/flink/pull/10100#issuecomment-550210232
 
 
   
   ## CI report:
   
   * 2ffe594419c91e32291507339f4fc4844dbb6c57 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[GitHub] [flink] pnowojski commented on a change in pull request #10009: [FLINK-14304] Avoid task starvation with mailbox

2019-11-06 Thread GitBox
pnowojski commented on a change in pull request #10009: [FLINK-14304] Avoid 
task starvation with mailbox
URL: https://github.com/apache/flink/pull/10009#discussion_r342963223
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java
 ##
 @@ -185,31 +171,19 @@ private Mail takeHeadInternal(int priority) throws 
IllegalStateException {
}
}
 
-   private boolean isEmpty() {
-   return count == 0;
-   }
-
-   private boolean isPutAbleState() {
-   return state == State.OPEN;
-   }
-
-   private boolean isTakeAbleState() {
-   return state != State.CLOSED;
-   }
-
private void checkPutStateConditions() {
final State state = this.state;
-   if (!isPutAbleState()) {
+   if (this.state != OPEN) {
 
 Review comment:
   I mean instead of `if (condition) throw new IllegalStateException(msg)` you 
can just replace it with `checkState(condition, msg)`?
   
   But ok, `checkPutStateConditions` could be on the hot path, so it might be a 
good idea to hide creating the `msg` inside the `if` branch, so the message is 
not created every time.
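The trade-off in this thread can be sketched as follows (`checkState` here is a local stand-in for the Guava/Flink utility, not the real class): the `checkState(condition, msg)` form is concise, but when the message is concatenated eagerly it is built on every invocation, so on a hot path a manual `if` that builds the string only in the failure branch can be cheaper.

```java
// Local stand-in for Guava/Flink Preconditions.checkState, for illustration.
public class CheckStateSketch {

    enum State { OPEN, QUIESCED, CLOSED }

    static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }

    // Concise form: the message string is concatenated eagerly on EVERY call,
    // even when the check passes.
    static void checkPutStateEager(State state) {
        checkState(state == State.OPEN, "Mailbox is in state " + state + ", but must be OPEN.");
    }

    // Hot-path form: the message is only built inside the failure branch.
    static void checkPutStateLazy(State state) {
        if (state != State.OPEN) {
            throw new IllegalStateException("Mailbox is in state " + state + ", but must be OPEN.");
        }
    }

    public static void main(String[] args) {
        checkPutStateEager(State.OPEN); // passes silently
        checkPutStateLazy(State.OPEN);  // passes silently
        try {
            checkPutStateLazy(State.CLOSED);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```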




[GitHub] [flink] pnowojski commented on a change in pull request #10009: [FLINK-14304] Avoid task starvation with mailbox

2019-11-06 Thread GitBox
pnowojski commented on a change in pull request #10009: [FLINK-14304] Avoid 
task starvation with mailbox
URL: https://github.com/apache/flink/pull/10009#discussion_r342970858
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java
 ##
 @@ -77,22 +112,30 @@ public boolean hasMail() {
 
@Override
public Optional tryTake(int priority) throws 
IllegalStateException {
+   Optional head = tryTakeFromBatch();
 
 Review comment:
   Ok, but in that case:
   
   - please add a comment about this contract somewhere in this class
   - and either add ITCase coverage for the case where two operators ping-pong 
by adding mails and yielding, which will test that `tryTake` & `take` do not 
expand the batch,
   - or add an additional comment inside the existing unit test explaining why 
it tests for that contract




[GitHub] [flink] flinkbot edited a comment on issue #10099: [FLINK-14589] [Runtime/Coordination] Redundant slot requests with the same AllocationID lead…

2019-11-06 Thread GitBox
flinkbot edited a comment on issue #10099: [FLINK-14589] [Runtime/Coordination] 
Redundant slot requests with the same AllocationID lead…
URL: https://github.com/apache/flink/pull/10099#issuecomment-550199907
 
 
   
   ## CI report:
   
   * 6bde76855e2ac3583581b28115db01901a87bc65 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/135207867)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[jira] [Created] (FLINK-14628) Wordcount on Docker test (custom fs plugin) fails on Travis

2019-11-06 Thread Gary Yao (Jira)
Gary Yao created FLINK-14628:


 Summary: Wordcount on Docker test (custom fs plugin) fails on 
Travis
 Key: FLINK-14628
 URL: https://issues.apache.org/jira/browse/FLINK-14628
 Project: Flink
  Issue Type: Bug
  Components: FileSystems, Tests
Affects Versions: 1.10.0
Reporter: Gary Yao


https://api.travis-ci.org/v3/job/607616429/log.txt

{noformat}
Successfully tagged test_docker_embedded_job:latest
~/build/apache/flink
sort: cannot read: 
'/home/travis/build/apache/flink/flink-end-to-end-tests/test-scripts/temp-test-directory-53405131685/out/docker_wc_out*':
 No such file or directory
FAIL WordCount: Output hash mismatch.  Got d41d8cd98f00b204e9800998ecf8427e, 
expected 72a690412be8928ba239c2da967328a5.
head hexdump of actual:
head: cannot open 
'/home/travis/build/apache/flink/flink-end-to-end-tests/test-scripts/temp-test-directory-53405131685/out/docker_wc_out*'
 for reading: No such file or directory
[FAIL] Test script contains errors.
Checking for errors...
No errors in log files.
Checking for exceptions...
No exceptions in log files.
Checking for non-empty .out files...
grep: 
/home/travis/build/apache/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/log/*.out:
 No such file or directory
No non-empty .out files.

{noformat}





[GitHub] [flink] yanghua opened a new pull request #10102: [FLINK-14612] Change Type of Field intermediateResults from ConcurrentHashMap to HashMap

2019-11-06 Thread GitBox
yanghua opened a new pull request #10102: [FLINK-14612] Change Type of Field 
intermediateResults from ConcurrentHashMap to HashMap
URL: https://github.com/apache/flink/pull/10102
 
 
   
   
   ## What is the purpose of the change
   
   *This pull request changes Type of Field intermediateResults from 
ConcurrentHashMap to HashMap*
   
   
   ## Brief change log
   
 - *Change Type of Field intermediateResults from ConcurrentHashMap to 
HashMap*
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ **not documented**)
   




[jira] [Updated] (FLINK-14612) Change Type of Field intermediateResults from ConcurrentHashMap to HashMap

2019-11-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-14612:
---
Labels: pull-request-available  (was: )

> Change Type of Field intermediateResults from ConcurrentHashMap to HashMap
> --
>
> Key: FLINK-14612
> URL: https://issues.apache.org/jira/browse/FLINK-14612
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> After FLINK-11417, the ExecutionGraph runs in single-threaded mode and is no
> longer plagued by concurrency issues. So, we can downgrade the current
> ConcurrentHashMap type of the intermediateResults field to a plain HashMap.





[jira] [Commented] (FLINK-14555) Streaming File Sink s3 end-to-end test stalls

2019-11-06 Thread Gary Yao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16968189#comment-16968189
 ] 

Gary Yao commented on FLINK-14555:
--

Another instance https://api.travis-ci.org/v3/job/607616431/log.txt

> Streaming File Sink s3 end-to-end test stalls
> -
>
> Key: FLINK-14555
> URL: https://issues.apache.org/jira/browse/FLINK-14555
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem, Tests
>Affects Versions: 1.10.0
>Reporter: Gary Yao
>Priority: Critical
>  Labels: test-stability
>
> [https://api.travis-ci.org/v3/job/603882577/log.txt]
> {noformat}
> ==
> Running 'Streaming File Sink s3 end-to-end test'
> ==
> TEST_DATA_DIR: 
> /home/travis/build/apache/flink/flink-end-to-end-tests/test-scripts/temp-test-directory-36388677539
> Flink dist directory: 

[jira] [Closed] (FLINK-14555) Streaming File Sink s3 end-to-end test stalls

2019-11-06 Thread Gary Yao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao closed FLINK-14555.

Resolution: Duplicate

> Streaming File Sink s3 end-to-end test stalls
> -
>
> Key: FLINK-14555
> URL: https://issues.apache.org/jira/browse/FLINK-14555
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem, Tests
>Affects Versions: 1.10.0
>Reporter: Gary Yao
>Priority: Critical
>  Labels: test-stability
>
> [https://api.travis-ci.org/v3/job/603882577/log.txt]
> {noformat}
> ==
> Running 'Streaming File Sink s3 end-to-end test'
> ==
> TEST_DATA_DIR: 
> /home/travis/build/apache/flink/flink-end-to-end-tests/test-scripts/temp-test-directory-36388677539
> Flink dist directory: 
> /home/travis/build/apache/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT
> Found AWS bucket [secure], running the e2e test.
> Found AWS access key, running the e2e test.
> Found AWS secret key, running the e2e test.
> Executing test with dynamic openSSL linkage (random selection between 
> 'dynamic' and 'static')
> Setting up SSL with: internal OPENSSL dynamic
> Using SAN 
> dns:travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866,ip:10.20.1.215,ip:172.17.0.1
> Certificate was added to keystore
> Certificate was added to keystore
> Certificate reply was installed in keystore
> MAC verified OK
> Setting up SSL with: rest OPENSSL dynamic
> Using SAN 
> dns:travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866,ip:10.20.1.215,ip:172.17.0.1
> Certificate was added to keystore
> Certificate was added to keystore
> Certificate reply was installed in keystore
> MAC verified OK
> Mutual ssl auth: true
> Use s3 output
> Starting cluster.
> Starting standalonesession daemon on host 
> travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866.
> Starting taskexecutor daemon on host 
> travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866.
> Waiting for dispatcher REST endpoint to come up...
> Waiting for dispatcher REST endpoint to come up...
> Waiting for dispatcher REST endpoint to come up...
> Waiting for dispatcher REST endpoint to come up...
> Waiting for dispatcher REST endpoint to come up...
> Waiting for dispatcher REST endpoint to come up...
> Waiting for dispatcher REST endpoint to come up...
> Waiting for dispatcher REST endpoint to come up...
> Waiting for dispatcher REST endpoint to come up...
> Waiting for dispatcher REST endpoint to come up...
> Waiting for dispatcher REST endpoint to come up...
> Waiting for dispatcher REST endpoint to come up...
> Dispatcher REST endpoint is up.
> [INFO] 1 instance(s) of taskexecutor are already running on 
> travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866.
> Starting taskexecutor daemon on host 
> travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866.
> [INFO] 2 instance(s) of taskexecutor are already running on 
> travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866.
> Starting taskexecutor daemon on host 
> travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866.
> [INFO] 3 instance(s) of taskexecutor are already running on 
> travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866.
> Starting taskexecutor daemon on host 
> travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866.
> Submitting job.
> Job (c3a9bb7d3f47d63ebccbec5acb1342cb) is running.
> Waiting for job (c3a9bb7d3f47d63ebccbec5acb1342cb) to have at least 3 
> completed checkpoints ...
> Killing TM
> TaskManager 9227 killed.
> Starting TM
> [INFO] 3 instance(s) of taskexecutor are already running on 
> travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866.
> Starting taskexecutor daemon on host 
> travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866.
> Waiting for restart to happen
> Killing 2 TMs
> TaskManager 8618 killed.
> TaskManager 9658 killed.
> Starting 2 TMs
> [INFO] 2 instance(s) of taskexecutor are already running on 
> travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866.
> Starting taskexecutor daemon on host 
> travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866.
> [INFO] 3 instance(s) of taskexecutor are already running on 
> travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866.
> Starting taskexecutor daemon on host 
> travis-job-4624d10f-b0e3-45e1-8615-972ff1d44866.
> Waiting for restart to happen
> Waiting until all values have been produced
> Number of produced values 18080/6
> No output has been received in the last 10m0s, this potentially indicates a 
> stalled build or something wrong with the build itself.
> Check the details on how to adjust your build configuration on: 
> https://docs.travis-ci.com/user/common-build-problems/#build-times-out-because-no-output-was-received
> The build has been terminated
> {noformat}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

[jira] [Commented] (FLINK-14311) Streaming File Sink end-to-end test failed on Travis

2019-11-06 Thread Gary Yao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16968191#comment-16968191
 ] 

Gary Yao commented on FLINK-14311:
--

Another instance https://api.travis-ci.org/v3/job/607616431/log.txt

> Streaming File Sink end-to-end test failed on Travis
> 
>
> Key: FLINK-14311
> URL: https://issues.apache.org/jira/browse/FLINK-14311
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem, Tests
>Affects Versions: 1.10.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.10.0
>
>
> The {{Streaming File Sink end-to-end test}} fails on Travis because it does 
> not produce output for 10 minutes.
> https://api.travis-ci.org/v3/job/591992274/log.txt





[jira] [Commented] (FLINK-14311) Streaming File Sink end-to-end test failed on Travis

2019-11-06 Thread Gary Yao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16968192#comment-16968192
 ] 

Gary Yao commented on FLINK-14311:
--

Another instance https://api.travis-ci.org/v3/job/603882577/log.txt

> Streaming File Sink end-to-end test failed on Travis
> 
>
> Key: FLINK-14311
> URL: https://issues.apache.org/jira/browse/FLINK-14311
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem, Tests
>Affects Versions: 1.10.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.10.0
>
>
> The {{Streaming File Sink end-to-end test}} fails on Travis because it does 
> not produce output for 10 minutes.
> https://api.travis-ci.org/v3/job/591992274/log.txt





[jira] [Commented] (FLINK-14505) SQL Client end-to-end test for Kafka 0.10 nightly end-to-end test failed on travis

2019-11-06 Thread Gary Yao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16968194#comment-16968194
 ] 

Gary Yao commented on FLINK-14505:
--

Another instance: https://api.travis-ci.org/v3/job/607616437/log.txt

> SQL Client end-to-end test for Kafka 0.10 nightly end-to-end test failed on 
> travis
> --
>
> Key: FLINK-14505
> URL: https://issues.apache.org/jira/browse/FLINK-14505
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka, Table SQL / Client, Tests
>Affects Versions: 1.10.0
>Reporter: Yu Li
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.10.0
>
>
> The {{SQL Client end-to-end test for Kafka 0.10}} nightly end-to-end test 
> failed on Travis with
> {code}
> [FAIL] 'SQL Client end-to-end test for Kafka 0.10' failed after 0 minutes and 
> 37 seconds! Test exited with exit code 1
> No taskexecutor daemon (pid: 26336) is running anymore on 
> travis-job-2c704099-0645-4182-942d-3fb5c2e10e54.
> No standalonesession daemon to stop on host 
> travis-job-2c704099-0645-4182-942d-3fb5c2e10e54.
> {code}
> https://api.travis-ci.org/v3/job/600710614/log.txt





[GitHub] [flink] flinkbot commented on issue #10102: [FLINK-14612] Change Type of Field intermediateResults from ConcurrentHashMap to HashMap

2019-11-06 Thread GitBox
flinkbot commented on issue #10102: [FLINK-14612] Change Type of Field 
intermediateResults from ConcurrentHashMap to HashMap
URL: https://github.com/apache/flink/pull/10102#issuecomment-550212774
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit ab01afead96b82972e207dbdc5fa3f37e7515292 (Wed Nov 06 
09:00:46 UTC 2019)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
* **This pull request references an unassigned [Jira 
ticket](https://issues.apache.org/jira/browse/FLINK-14612).** According to the 
[code contribution 
guide](https://flink.apache.org/contributing/contribute-code.html), tickets 
need to be assigned before starting with the implementation work.
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] HuangXingBo opened a new pull request #10103: [FLINK-14506][python][build] Improve the release script for Python API release package

2019-11-06 Thread GitBox
HuangXingBo opened a new pull request #10103: [FLINK-14506][python][build] 
Improve the release script for Python API release package
URL: https://github.com/apache/flink/pull/10103
 
 
   ## What is the purpose of the change
   
   *This PR improves the release script for the Python API release package*
   
   ## Brief change log
   
 - *Support specifying install components in lint-python.sh*
 - *Support choosing the Miniconda default Python env to build the Python package in 
create_binary_release.sh*
   
   ## Verifying this change
   
   *This change is a release script change without any test coverage.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not documented)
   




[jira] [Updated] (FLINK-14506) Improve the release script for Python API release package

2019-11-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-14506:
---
Labels: pull-request-available  (was: )

> Improve the release script for Python API release package
> -
>
> Key: FLINK-14506
> URL: https://issues.apache.org/jira/browse/FLINK-14506
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python, Build System
>Reporter: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>
> Currently it assumes that the required Python environment (i.e., Python 3.5+, 
> setuptools, etc.) is available on the local machine when executing 
> create_binary_release.sh to perform the release. 
> This could be improved and there are two options in my mind:
> 1) Reuse the script defined in the flink-python module (lint-python.sh) to create 
> the required virtual environment and build the Python package using the 
> created virtual environment.
> 2) Document the dependencies at the page "[Create a Flink 
> Release|https://cwiki.apache.org/confluence/display/FLINK/Creating+a+Flink+Release#CreatingaFlinkRelease-Checklisttoproceedtothenextstep.1]";
>  and add a validation check in create_binary_release.sh to throw a meaningful 
> error with hints on how to fix it.
> Personally I prefer option 1) as it's transparent for users. Welcome any 
> feedback. 





[GitHub] [flink] flinkbot commented on issue #10103: [FLINK-14506][python][build] Improve the release script for Python API release package

2019-11-06 Thread GitBox
flinkbot commented on issue #10103: [FLINK-14506][python][build] Improve the 
release script for Python API release package
URL: https://github.com/apache/flink/pull/10103#issuecomment-550214576
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 7c842be4f1d79ff3206322d72f4cb1dcebd52c7e (Wed Nov 06 
09:05:16 UTC 2019)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
* **This pull request references an unassigned [Jira 
ticket](https://issues.apache.org/jira/browse/FLINK-14506).** According to the 
[code contribution 
guide](https://flink.apache.org/contributing/contribute-code.html), tickets 
need to be assigned before starting with the implementation work.
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[jira] [Closed] (FLINK-14375) Avoid notifying ineffective state updates to scheduler

2019-11-06 Thread Gary Yao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao closed FLINK-14375.

Resolution: Fixed

1.10: 213ddb8f4b6e2c6ede726b4abfdf29f091ce2713

> Avoid notifying ineffective state updates to scheduler
> --
>
> Key: FLINK-14375
> URL: https://issues.apache.org/jira/browse/FLINK-14375
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> The DefaultScheduler triggers failover if a task is notified to be FAILED. 
> However, in the case that multiple tasks in the same region fail together, it 
> will trigger multiple failovers. The later-triggered failovers are useless: they 
> lead to concurrent failovers and will increase the restart attempt count.
> I think the deep reason for this issue is that some fake state changes are 
> notified to the DefaultScheduler.
> In the case above, a FAILED state change from the TM turns a CANCELING vertex 
> into CANCELED, so the actual state transition is to CANCELED. But a FAILED 
> state is notified to the DefaultScheduler.
> And there can be another possible issue caused by this: a FINISHED state 
> change is notified from the TM when a vertex is CANCELING. The vertex will become 
> CANCELED, while its FINISHED state change will be notified to 
> DefaultScheduler which may trigger downstream task scheduling.
> I'd propose to fix it by filtering out ineffective state updates in 
> {{SchedulerBase#updateTaskExecutionState}} and only notify effective ones to 
> scheduler.
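The filtering proposed above can be sketched in a few lines. The class, fields, and transition rule below are simplified stand-ins for illustration, not Flink's actual SchedulerBase internals:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of "only notify effective state updates": a reported
// state change reaches the scheduler only if the vertex actually ended up in
// the reported state. A FAILED report against a CANCELING vertex resolves to
// CANCELED, so it is treated as ineffective and swallowed.
public class StateUpdateFilterSketch {

    enum State { RUNNING, CANCELING, CANCELED, FAILED, FINISHED }

    static State currentState = State.RUNNING;
    static final List<State> notifiedToScheduler = new ArrayList<>();

    // Simplified vertex transition rule: FAILED/FINISHED reported while
    // CANCELING turns the vertex into CANCELED instead.
    static State applyTransition(State reported) {
        if (currentState == State.CANCELING
                && (reported == State.FAILED || reported == State.FINISHED)) {
            currentState = State.CANCELED;
        } else {
            currentState = reported;
        }
        return currentState;
    }

    static void updateTaskExecutionState(State reported) {
        State actual = applyTransition(reported);
        if (actual == reported) {           // effective update only
            notifiedToScheduler.add(reported);
        }
    }

    public static void main(String[] args) {
        updateTaskExecutionState(State.FAILED);   // effective: RUNNING -> FAILED
        currentState = State.CANCELING;
        updateTaskExecutionState(State.FAILED);   // ineffective: lands in CANCELED
        System.out.println(notifiedToScheduler);  // prints [FAILED]
    }
}
```

With this filter in place, the second FAILED report never reaches the scheduler, so no concurrent failover is triggered and the restart attempt count stays correct.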





[GitHub] [flink] flinkbot edited a comment on issue #10009: [FLINK-14304] Avoid task starvation with mailbox

2019-11-06 Thread GitBox
flinkbot edited a comment on issue #10009: [FLINK-14304] Avoid task starvation 
with mailbox
URL: https://github.com/apache/flink/pull/10009#issuecomment-546724255
 
 
   
   ## CI report:
   
   * 2b39236d76b1c56d240f2476c535568f0614c577 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/133732010)
   * 36d23db6c3e89c7044d4eaf916661745c69c349b : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/133733201)
   * fbfcab3e7a1129007a0ccf491e0b91a6c3269bcb : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/133738302)
   * 462d65ad2a87e998174481c51b7d38abced291c0 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/133968232)
   * 8ee3b9a975676432e034362a602e898531806df7 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/135039943)
   * c8b5ad6d1e2b0f9aea258199b780d9449376f648 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/135092421)
   * 011484f925fa1bbd4ea1c1c33f1689913acf032c : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/135110538)
   * 28cf3bdd960cf1b5fe7595504b10f3cb7cdf558c : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/135126818)
   * c7c84c8da069c79f9fcc841bbd50ad6c7193b0f0 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/135211148)
   * efc3d3849f19ea846ec5d5df816e1c9ee7915619 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[jira] [Commented] (FLINK-14613) Add validation check when applying UDF to temporal table key in Temporal Table Join condition

2019-11-06 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16968204#comment-16968204
 ] 

Jark Wu commented on FLINK-14613:
-

As a temporary solution, adding an exception is fine. However, even if there is a 
UDF in the join key, as long as we still have other equi-conditions on field references 
(as in the example above), it should still work because we have a lookup key.

I think if the proper fix is not a big job, we should fix the root problem. 

> Add validation check when applying UDF to temporal table key in Temporal 
> Table Join condition 
> ---
>
> Key: FLINK-14613
> URL: https://issues.apache.org/jira/browse/FLINK-14613
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.9.1
>Reporter: hailong wang
>Priority: Major
> Fix For: 1.10.0
>
>
> In a temporal table join, we don't support using a UDF in the temporal table join 
> key, because we can't analyze the lookup keys when the call is an expression. When users 
> write SQL like this, the program runs normally, but the result will be wrong. So we 
> should add a validation check to prevent it.
> The SQL as following:
> {code:java}
> INSERT INTO A
> SELECT B.amount, B.currency, C.amount, C.product 
> FROM B join C FOR SYSTEM_TIME AS OF B.proctime 
> on B.amount = concat(C.amount, 'r') and C.product = '1'
> {code}





[jira] [Commented] (FLINK-14243) flink hiveudf needs some check when it is using cache

2019-11-06 Thread jackylau (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16968209#comment-16968209
 ] 

jackylau commented on FLINK-14243:
--

I also found that the Flink built-in function JsonUtil has a lock contention problem 
at LOG, although Flink 1.9 has removed the call relation.

So I think it should be fixed now.

> flink hiveudf needs some check when it is using cache
> -
>
> Key: FLINK-14243
> URL: https://issues.apache.org/jira/browse/FLINK-14243
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive, Table SQL / Planner
>Affects Versions: 1.9.0
>Reporter: jackylau
>Priority: Major
> Attachments: Snipaste_2019-10-30_15-34-09.png
>
>
> Flink 1.9 brings in the Hive connector, but there can be problems when the 
> original Hive UDF uses a cache. We know that Hive is process-level parallel 
> based on the JVM, while Flink/Spark are task-level parallel. If Flink just calls 
> the Hive UDF, thread-safety problems can occur when the UDF uses a cache.
> So we may need to check the Hive UDF code and, if it is not thread-safe, set 
> the Flink parallelism to 1.
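To illustrate the hazard, here is a minimal sketch (a hypothetical helper class, not the Hive or Flink UDF API): a per-JVM cache backed by a plain HashMap can be corrupted when several Flink task threads call the UDF concurrently, while ConcurrentHashMap.computeIfAbsent makes the lookup-or-compute step atomic:

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical UDF-style helper (illustration only): Hive assumes process-level
// isolation, but Flink runs several task threads in one JVM, so a shared cache
// must be thread-safe. computeIfAbsent computes each missing value atomically;
// a plain HashMap here could corrupt its internal state under concurrent put().
public class CachedUdfSketch {

    private static final ConcurrentHashMap<String, String> CACHE =
            new ConcurrentHashMap<>();

    // Stand-in for an expensive UDF body whose results are cached per key.
    public static String eval(String key) {
        return CACHE.computeIfAbsent(key, k -> k.toUpperCase());
    }

    public static void main(String[] args) {
        System.out.println(eval("flink")); // prints FLINK
        System.out.println(eval("flink")); // second call served from the cache
    }
}
```

Setting the Flink parallelism to 1, as suggested above, avoids the race but gives up parallelism; making the cache itself thread-safe keeps both.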





[GitHub] [flink] flinkbot edited a comment on issue #10074: [FLINK-14499][Runtime / Metrics]MetricRegistry#getMetricQueryServiceGatewayRpcAddress is Nonnull

2019-11-06 Thread GitBox
flinkbot edited a comment on issue #10074: [FLINK-14499][Runtime / 
Metrics]MetricRegistry#getMetricQueryServiceGatewayRpcAddress is Nonnull
URL: https://github.com/apache/flink/pull/10074#issuecomment-549322411
 
 
   
   ## CI report:
   
   * 4807c59ae20273d5b4611395ca5467217d7d0ca3 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/134842985)
   * fedd3f2abd43a67d6461b895145964fcdf98af0d : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/134961727)
   * 5c077aaf9166fbbd068f80eb61294d9cfea2eddc : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/135207803)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[GitHub] [flink] flinkbot commented on issue #10101: [FLINK-14602] Change Type of Field tasks from ConcurrentHashMap to HashMap

2019-11-06 Thread GitBox
flinkbot commented on issue #10101: [FLINK-14602] Change Type of Field tasks 
from ConcurrentHashMap to HashMap
URL: https://github.com/apache/flink/pull/10101#issuecomment-55034
 
 
   
   ## CI report:
   
   * d5de7837dc84014509fe5603bb9f5d379c597309 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[GitHub] [flink] flinkbot edited a comment on issue #10100: [FLINK-14627][tests] Refactor ExecutionGraph creation in tests as TestingExecutionGraphBuilder

2019-11-06 Thread GitBox
flinkbot edited a comment on issue #10100: [FLINK-14627][tests] Refactor 
ExecutionGraph creation in tests as TestingExecutionGraphBuilder
URL: https://github.com/apache/flink/pull/10100#issuecomment-550210232
 
 
   
   ## CI report:
   
   * 2ffe594419c91e32291507339f4fc4844dbb6c57 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/135211187)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[GitHub] [flink] flinkbot commented on issue #10103: [FLINK-14506][python][build] Improve the release script for Python API release package

2019-11-06 Thread GitBox
flinkbot commented on issue #10103: [FLINK-14506][python][build] Improve the 
release script for Python API release package
URL: https://github.com/apache/flink/pull/10103#issuecomment-550222346
 
 
   
   ## CI report:
   
   * 7c842be4f1d79ff3206322d72f4cb1dcebd52c7e : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[GitHub] [flink] flinkbot commented on issue #10102: [FLINK-14612] Change Type of Field intermediateResults from ConcurrentHashMap to HashMap

2019-11-06 Thread GitBox
flinkbot commented on issue #10102: [FLINK-14612] Change Type of Field 
intermediateResults from ConcurrentHashMap to HashMap
URL: https://github.com/apache/flink/pull/10102#issuecomment-55078
 
 
   
   ## CI report:
   
   * ab01afead96b82972e207dbdc5fa3f37e7515292 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-06 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r342978122
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskBackPressureSampleService.java
 ##
 @@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+
+import javax.annotation.Nonnegative;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Samples the output buffer availability of tasks for back pressure tracking.
+ *
+ * @see 
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker
+ */
+class TaskBackPressureSampleService {
 
 Review comment:
   TaskBackPressureSampleService -> BackPressureSampleService, because it is 
actually a `TaskExecutor` level service.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-06 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r342988898
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
 ##
 @@ -461,16 +461,15 @@ AbstractInvokable getInvokable() {
return invokable;
}
 
-   public StackTraceElement[] getStackTraceOfExecutingThread() {
-   final AbstractInvokable invokable = this.invokable;
-
-   if (invokable == null) {
-   return new StackTraceElement[0];
+   public boolean isAvailableForOutput() {
+   if (invokable == null || 
consumableNotifyingPartitionWriters.length == 0) {
+   return true;
}
-
-   return invokable.getExecutingThread()
-   .orElse(executingThread)
-   .getStackTrace();
+   final CompletableFuture[] outputFutures = new 
CompletableFuture[consumableNotifyingPartitionWriters.length];
+   for(int i = 0; i < outputFutures.length; ++i) {
 
 Review comment:
   whitespace after `for`
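   As context for the diff above: the new `isAvailableForOutput` check aggregates the partition writers' availability futures. A self-contained sketch of that pattern follows; the class and method names here are illustrative, not Flink's actual API.

   ```java
   import java.util.concurrent.CompletableFuture;

   // Standalone sketch of the availability check in the diff above: a task is
   // considered available for output only when every partition writer's
   // availability future has already completed. (Illustrative only; the real
   // Task iterates over consumableNotifyingPartitionWriters.)
   public class OutputAvailabilitySketch {

       static boolean isAvailableForOutput(CompletableFuture<?>[] outputFutures) {
           // No writers at all means nothing can block the output.
           if (outputFutures.length == 0) {
               return true;
           }
           // allOf(...) is done exactly when every individual future is done.
           return CompletableFuture.allOf(outputFutures).isDone();
       }

       public static void main(String[] args) {
           CompletableFuture<Void> available = CompletableFuture.completedFuture(null);
           CompletableFuture<Void> blocked = new CompletableFuture<>();

           System.out.println(isAvailableForOutput(new CompletableFuture<?>[] {available}));          // true
           System.out.println(isAvailableForOutput(new CompletableFuture<?>[] {available, blocked})); // false
       }
   }
   ```

   `allOf(...).isDone()` is equivalent to checking `isDone()` on each future individually, but keeps the check to a single expression.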




[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-06 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r342968437
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureSampleCoordinator.java
 ##
 @@ -295,42 +292,30 @@ int getNumberOfPendingSamples() {
// 

 
/**
-* A pending stack trace sample, which collects stack traces and owns a
-* {@link StackTraceSample} promise.
+* A pending task back pressure stats, which collects task back pressure
 
 Review comment:
   `A pending task back pressure sample`




[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-06 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r342958896
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
 ##
 @@ -46,22 +46,20 @@
String getAddress();
 
/**
-* Request a stack trace sample from the given task.
+* Request  to sample the back pressure ratio from the given task.
 *
 * @param executionAttemptID identifying the task to sample
-* @param sampleId of the sample
-* @param numSamples to take from the given task
-* @param delayBetweenSamples to wait for
-* @param maxStackTraceDepth of the returned sample
-* @param timeout of the request
-* @return Future of stack trace sample response
+* @param sampleId id of the sample
+* @param numSamples number of samples to take
+* @param delayBetweenSamples time to wait between samples
+* @param timeout rpc request timeout
+* @return Future containing the task back pressure sampling results
 */
-   CompletableFuture requestStackTraceSample(
+   CompletableFuture 
sampleTaskBackPressure(
 
 Review comment:
   sampleTaskBackPressure -> requestTaskBackPressure




[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-06 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r342972014
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImpl.java
 ##
 @@ -50,82 +51,60 @@
  * Back pressure statistics tracker.
  *
  * Back pressure is determined by sampling running tasks. If a task is
- * slowed down by back pressure it will be stuck in memory requests to a
- * {@link org.apache.flink.runtime.io.network.buffer.LocalBufferPool}.
- *
- * The back pressured stack traces look like this:
- *
- * 
- * java.lang.Object.wait(Native Method)
- * o.a.f.[...].LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
- * o.a.f.[...].LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133) 
<--- BLOCKING
- * request
- * [...]
- * 
+ * slowed down by back pressure, there should be no free buffers in output
+ * {@link org.apache.flink.runtime.io.network.buffer.BufferPool} of the task.
  */
 public class BackPressureStatsTrackerImpl implements BackPressureStatsTracker {
 
private static final Logger LOG = 
LoggerFactory.getLogger(BackPressureStatsTrackerImpl.class);
 
-   /** Maximum stack trace depth for samples. */
-   static final int MAX_STACK_TRACE_DEPTH = 8;
-
-   /** Expected class name for back pressure indicating stack trace 
element. */
-   static final String EXPECTED_CLASS_NAME = 
"org.apache.flink.runtime.io.network.buffer.LocalBufferPool";
-
-   /** Expected method name for back pressure indicating stack trace 
element. */
-   static final String EXPECTED_METHOD_NAME = 
"requestBufferBuilderBlocking";
-
/** Lock guarding trigger operations. */
private final Object lock = new Object();
 
-   /* Stack trace sample coordinator. */
-   private final StackTraceSampleCoordinator coordinator;
+   /** Back pressure sample coordinator. */
 
 Review comment:
   This comment can be removed because it carries no useful information.




[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-06 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r342966862
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureSampleCoordinator.java
 ##
 @@ -156,46 +154,43 @@ public StackTraceSampleCoordinator(Executor executor, 
long sampleTimeout) {
 
// Trigger all samples
for (Execution execution: executions) {
-   final 
CompletableFuture stackTraceSampleFuture = 
execution.requestStackTraceSample(
-   sampleId,
-   numSamples,
-   delayBetweenSamples,
-   maxStackTraceDepth,
-   timeout);
-
-   stackTraceSampleFuture.handleAsync(
-   (StackTraceSampleResponse 
stackTraceSampleResponse, Throwable throwable) -> {
-   if (stackTraceSampleResponse != 
null) {
-   collectStackTraces(
-   
stackTraceSampleResponse.getSampleId(),
-   
stackTraceSampleResponse.getExecutionAttemptID(),
-   
stackTraceSampleResponse.getSamples());
+   final 
CompletableFuture taskBackPressureFuture =
+   
execution.sampleTaskBackPressure(sampleId, numSamples, delayBetweenSamples, 
timeout);
+
+   taskBackPressureFuture.handleAsync(
+   (TaskBackPressureSampleResponse 
taskBackPressureSampleResponse, Throwable throwable) -> {
+   if 
(taskBackPressureSampleResponse != null) {
+   
collectTaskBackPressureStat(
 
 Review comment:
   `collectTaskBackPressureStat` -> `collectTaskBackPressureSample`. It would be better to unify the terminology here,
   e.g. `cancelTaskBackPressureSample`, `collectTaskBackPressureSample`, `TaskBackPressureSampleResponse`.
   Also rename the class below: `PendingTaskBackPressureStats` -> `PendingTaskBackPressureSample`.




[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-06 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r342984468
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskOutputAvailabilitySampleableTaskAdapter.java
 ##
 @@ -26,18 +26,18 @@
 import static java.util.Objects.requireNonNull;
 
 /**
- * Adapts {@link Task} to {@link StackTraceSampleableTask}.
+ * Adapts {@link Task} to {@link OutputAvailabilitySampleableTask}.
  */
-class TaskStackTraceSampleableTaskAdapter implements StackTraceSampleableTask {
+class TaskOutputAvailabilitySampleableTaskAdapter implements 
OutputAvailabilitySampleableTask {
 
 Review comment:
   We could simplify this by making the `Task` class implement `BackPressureSampleableTask` directly and then deleting this adapter class. Furthermore, we could remove the `#isRunning` method from the interface, because the task can check its state internally and return the boolean value via `BackPressureSampleableTask#isBackPressured`.




[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-06 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r342956700
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -925,33 +925,30 @@ public void fail(Throwable t) {
}
 
/**
-* Request a stack trace sample from the task of this execution.
+* Request to sample the back pressure ratio from the task of this 
execution.
 *
-* @param sampleId of the stack trace sample
-* @param numSamples the sample should contain
-* @param delayBetweenSamples to wait
-* @param maxStackTraceDepth of the samples
-* @param timeout until the request times out
-* @return Future stack trace sample response
+* @param sampleId id of the sample
+* @param numSamples the number of samples to take
+* @param delayBetweenSamples time to wait between samples
+* @param timeout the request times out
+* @return Future containing the task back pressure sampling results
 */
-   public CompletableFuture 
requestStackTraceSample(
+   public CompletableFuture 
sampleTaskBackPressure(
 
 Review comment:
   nit : sampleTaskBackPressure -> requestTaskBackPressure




[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-06 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r342965035
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureSampleCoordinator.java
 ##
 @@ -63,57 +64,55 @@
/** Time out after the expected sampling duration. */
private final long sampleTimeout;
 
-   /** In progress samples (guarded by lock). */
-   private final Map pendingSamples = 
new HashMap<>();
+   /** In progress samples. */
+   @GuardedBy("lock")
+   private final Map pendingSamples 
= new HashMap<>();
 
/** A list of recent sample IDs to identify late messages vs. invalid 
ones. */
private final ArrayDeque recentPendingSamples = new 
ArrayDeque<>(NUM_GHOST_SAMPLE_IDS);
 
-   /** Sample ID counter (guarded by lock). */
+   /** Sample ID counter. */
+   @GuardedBy("lock")
private int sampleIdCounter;
 
/**
-* Flag indicating whether the coordinator is still running (guarded by
-* lock).
+* Flag indicating whether the coordinator is still running.
 */
+   @GuardedBy("lock")
private boolean isShutDown;
 
/**
 * Creates a new coordinator for the job.
 *
-* @param executor to use to execute the futures
+* @param executor  Used to execute the futures.
 * @param sampleTimeout Time out after the expected sampling duration.
 *  This is added to the expected duration of a
 *  sample, which is determined by the number of
 *  samples and the delay between each sample.
 */
-   public StackTraceSampleCoordinator(Executor executor, long 
sampleTimeout) {
+   public BackPressureSampleCoordinator(Executor executor, long 
sampleTimeout) {
checkArgument(sampleTimeout >= 0L);
-   this.executor = Preconditions.checkNotNull(executor);
+   this.executor = checkNotNull(executor);
this.sampleTimeout = sampleTimeout;
}
 
/**
-* Triggers a stack trace sample to all tasks.
+* Triggers a task back pressure stats sample to all tasks.
 *
 * @param tasksToSample   Tasks to sample.
-* @param numSamples  Number of stack trace samples to collect.
+* @param numSamples  Number of samples per task.
 * @param delayBetweenSamples Delay between consecutive samples.
-* @param maxStackTraceDepth  Maximum depth of the stack trace. 0 
indicates
-*no maximum and keeps the complete stack 
trace.
-* @return A future of the completed stack trace sample
+* @return A future of the completed task back pressure stats sample
 */
@SuppressWarnings("unchecked")
-   public CompletableFuture triggerStackTraceSample(
+   CompletableFuture triggerTaskBackPressureSample(
ExecutionVertex[] tasksToSample,
int numSamples,
 
 Review comment:
   `numSamples` and `delayBetweenSamples` should be passed into the constructor of this class instead of the constructor of `BackPressureStatsTrackerImpl`, because these arguments are only used inside the coordinator.
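   A minimal sketch of the suggested refactoring, assuming simplified names (the real coordinator also holds the executor, the pending-sample map, etc.):

   ```java
   // Sketch of the suggested change: move the sampling parameters into the
   // coordinator's constructor, since only the coordinator uses them.
   // (Illustrative names; ExecutionVertex and the trigger logic are omitted.)
   public class SampleCoordinatorSketch {

       private final int numSamples;
       private final long delayBetweenSamplesMillis;

       SampleCoordinatorSketch(int numSamples, long delayBetweenSamplesMillis) {
           if (numSamples <= 0) {
               throw new IllegalArgumentException("numSamples must be positive");
           }
           this.numSamples = numSamples;
           this.delayBetweenSamplesMillis = delayBetweenSamplesMillis;
       }

       // With the parameters held as fields, the trigger method no longer needs
       // them passed on every call; derived values can be computed locally.
       long expectedSampleDurationMillis() {
           return numSamples * delayBetweenSamplesMillis;
       }

       public static void main(String[] args) {
           SampleCoordinatorSketch coordinator = new SampleCoordinatorSketch(10, 50);
           System.out.println(coordinator.expectedSampleDurationMillis()); // prints 500
       }
   }
   ```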




[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-06 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r342979874
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskBackPressureSampleService.java
 ##
 @@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+
+import javax.annotation.Nonnegative;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Samples the output buffer availability of tasks for back pressure tracking.
+ *
+ * @see 
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker
+ */
+class TaskBackPressureSampleService {
+
+   private final ScheduledExecutor scheduledExecutor;
+
+   TaskBackPressureSampleService(final ScheduledExecutor 
scheduledExecutor) {
+   this.scheduledExecutor = requireNonNull(scheduledExecutor, "The 
scheduledExecutor must not be null.");
+   }
+
+   /**
+* Returns a future that completes with the back pressure ratio of a 
task.
+*
+* @param taskThe task to be sampled.
+* @param numSamples  The number of samples.
+* @param delayBetweenSamples The time to wait between samples.
+* @return A future containing the task back pressure ratio.
+*/
+   public CompletableFuture sampleTaskBackPressure(
+   final OutputAvailabilitySampleableTask task,
+   @Nonnegative final int numSamples,
 
 Review comment:
   `@Nonnegative` is redundant with the `checkArgument(numSamples > 0, "The numSamples must be positive.");` below.
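   A minimal standalone illustration of why the runtime check subsumes the annotation: `checkArgument` fails fast at runtime for any non-positive value, whereas `@Nonnegative` is only a hint for static analysis tools. (`CheckArgumentSketch` is a stand-in here, not Flink's `Preconditions` class.)

   ```java
   // Stand-in for the checkArgument pattern under discussion (not Flink's
   // actual Preconditions class).
   public class CheckArgumentSketch {

       static void checkArgument(boolean condition, String message) {
           if (!condition) {
               throw new IllegalArgumentException(message);
           }
       }

       // The runtime check rejects zero and negative values on every call,
       // which is why the @Nonnegative annotation adds nothing here.
       static int validateNumSamples(int numSamples) {
           checkArgument(numSamples > 0, "The numSamples must be positive.");
           return numSamples;
       }

       public static void main(String[] args) {
           System.out.println(validateNumSamples(10)); // prints 10
           try {
               validateNumSamples(0);
           } catch (IllegalArgumentException e) {
               System.out.println("rejected: " + e.getMessage());
           }
       }
   }
   ```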




[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-06 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r342981944
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskBackPressureSampleService.java
 ##
 @@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+
+import javax.annotation.Nonnegative;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Samples the output buffer availability of tasks for back pressure tracking.
+ *
+ * @see 
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker
+ */
+class TaskBackPressureSampleService {
+
+   private final ScheduledExecutor scheduledExecutor;
+
+   TaskBackPressureSampleService(final ScheduledExecutor 
scheduledExecutor) {
+   this.scheduledExecutor = requireNonNull(scheduledExecutor, "The 
scheduledExecutor must not be null.");
+   }
+
+   /**
+* Returns a future that completes with the back pressure ratio of a 
task.
+*
+* @param taskThe task to be sampled.
+* @param numSamples  The number of samples.
+* @param delayBetweenSamples The time to wait between samples.
+* @return A future containing the task back pressure ratio.
+*/
+   public CompletableFuture sampleTaskBackPressure(
+   final OutputAvailabilitySampleableTask task,
+   @Nonnegative final int numSamples,
+   final Time delayBetweenSamples) {
+
+   checkNotNull(task, "The task must not be null.");
+   checkArgument(numSamples > 0, "The numSamples must be 
positive.");
+   checkNotNull(delayBetweenSamples, "The delayBetweenSamples must 
not be null.");
+
+   return sampleTaskBackPressure(
+   task,
+   numSamples,
+   delayBetweenSamples,
+   new ArrayList<>(numSamples),
+   new CompletableFuture<>());
+   }
+
+   private CompletableFuture sampleTaskBackPressure(
+   final OutputAvailabilitySampleableTask task,
+   final int numSamples,
+   final Time delayBetweenSamples,
+   final List taskOutputAvailability,
+   final CompletableFuture resultFuture) {
+
+   final Optional isTaskAvailableForOutput = 
isTaskAvailableForOutput(task);
+   if (isTaskAvailableForOutput.isPresent()) {
+   
taskOutputAvailability.add(isTaskAvailableForOutput.get());
+   } else if (!taskOutputAvailability.isEmpty()) {
+   
resultFuture.complete(calculateTaskBackPressureRatio(taskOutputAvailability));
+   return resultFuture;
+   } else {
+   throw new IllegalStateException(String.format("Cannot 
sample task %s. " +
+   "Because the task is not running.", 
task.getExecutionId()));
+   }
+
+   if (numSamples > 1) {
+   scheduledExecutor.schedule(() -> sampleTaskBackPressure(
+   task,
+   numSamples - 1,
+   delayBetweenSamples,
+   taskOutputAvailability,
+   resultFuture), delayBetweenSamples.getSize(), 
delayBetweenSamples.getUnit());
 
 Review comment:
   Put `delayBetweenSamples.getSize(), delayBetweenSamples.getUnit()` on a separate line.
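   For context, the recursion being formatted here can be sketched standalone as follows. This is a simplified version of the sampling loop, assuming `java.util.concurrent` types in place of Flink's `ScheduledExecutor`/`Time` abstractions and a plain `BooleanSupplier` in place of the sampleable task:

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.Executors;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.TimeUnit;
   import java.util.function.BooleanSupplier;

   // Simplified sketch of the recursive sampling pattern in the diff above:
   // take numSamples boolean samples, waiting delayMillis between them, then
   // complete a future with the back pressure ratio (the fraction of samples
   // in which the task reported back pressure).
   public class BackPressureSamplingSketch {

       // Daemon thread so the sketch does not keep the JVM alive.
       private final ScheduledExecutorService scheduler =
               Executors.newSingleThreadScheduledExecutor(r -> {
                   Thread t = new Thread(r);
                   t.setDaemon(true);
                   return t;
               });

       public CompletableFuture<Double> sample(
               BooleanSupplier isBackPressured, int numSamples, long delayMillis) {
           CompletableFuture<Double> result = new CompletableFuture<>();
           sampleOnce(isBackPressured, numSamples, delayMillis, new ArrayList<>(), result);
           return result;
       }

       private void sampleOnce(
               BooleanSupplier isBackPressured,
               int remaining,
               long delayMillis,
               List<Boolean> samples,
               CompletableFuture<Double> result) {
           samples.add(isBackPressured.getAsBoolean());
           if (remaining > 1) {
               // Re-schedule this method with one fewer remaining sample.
               scheduler.schedule(
                       () -> sampleOnce(isBackPressured, remaining - 1, delayMillis, samples, result),
                       delayMillis,
                       TimeUnit.MILLISECONDS);
           } else {
               long backPressured = samples.stream().filter(b -> b).count();
               result.complete((double) backPressured / samples.size());
           }
       }

       public static void main(String[] args) {
           BackPressureSamplingSketch sketch = new BackPressureSamplingSketch();
           // A task that always reports back pressure yields a ratio of 1.0.
           System.out.println(sketch.sample(() -> true, 3, 1).join()); // prints 1.0
       }
   }
   ```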

--

[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-06 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r342974445
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImpl.java
 ##
 @@ -322,41 +266,18 @@ private OperatorBackPressureStats 
createStatsFromSample(StackTraceSample sample)
}
}
 
-   // Ratio of blocked samples to total samples per sub 
task. Array
-   // position corresponds to sub task index.
-   double[] backPressureRatio = new double[traces.size()];
-
-   for (Entry> entry : traces.entrySet()) {
-   int backPressureSamples = 0;
-
-   List taskTraces = 
entry.getValue();
-
-   for (StackTraceElement[] trace : taskTraces) {
-   for (int i = trace.length - 1; i >= 0; 
i--) {
-   StackTraceElement elem = 
trace[i];
-
-   if 
(elem.getClassName().equals(EXPECTED_CLASS_NAME) &&
-   
elem.getMethodName().equals(EXPECTED_METHOD_NAME)) {
-
-   backPressureSamples++;
-   break; // Continue with 
next stack trace
-   }
-   }
-   }
+   // Back pressure ratio of all tasks. Array position 
corresponds
+   // to sub task index.
+   double[] backPressureRatio = new 
double[backPressureRatioByTask.size()];
 
 Review comment:
   backPressureRatio -> backPressureRatios




[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-06 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r342977589
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskBackPressureSampleService.java
 ##
 @@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+
+import javax.annotation.Nonnegative;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Samples the output buffer availability of tasks for back pressure tracking.
+ *
+ * @see 
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker
+ */
+class TaskBackPressureSampleService {
+
+   private final ScheduledExecutor scheduledExecutor;
+
+   TaskBackPressureSampleService(final ScheduledExecutor 
scheduledExecutor) {
+   this.scheduledExecutor = requireNonNull(scheduledExecutor, "The 
scheduledExecutor must not be null.");
 
 Review comment:
   `requireNonNull` -> `checkNotNull`




[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-06 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r342971189
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureSampleCoordinator.java
 ##
 @@ -295,42 +292,30 @@ int getNumberOfPendingSamples() {
// 

 
/**
-* A pending stack trace sample, which collects stack traces and owns a
-* {@link StackTraceSample} promise.
+* A pending task back pressure stats, which collects task back pressure
+* ratio and owns a {@link BackPressureStats} promise.
 *
 * Access pending sample in lock scope.
 */
-   private static class PendingStackTraceSample {
+   private static class PendingTaskBackPressureStats {
 
 Review comment:
   The class naming should be consistent between `PendingTaskBackPressureStats` and `BackPressureStats`; that is, decide whether to retain the term `Task` in both or neither.




[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-06 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r342972877
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImpl.java
 ##
 @@ -250,25 +201,18 @@ public void shutDown() {
}
 
/**
-* Invalidates the cache (irrespective of clean up interval).
-*/
-   void invalidateOperatorStatsCache() {
-   operatorStatsCache.invalidateAll();
-   }
-
-   /**
-* Callback on completed stack trace sample.
+* Callback on completed task back pressure sample.
 */
-   class StackTraceSampleCompletionCallback implements 
BiFunction {
+   class TaskBackPressureSampleCompletionCallback implements 
BiFunction {
 
 Review comment:
   Also, whether `TaskBackPressureSampleCompletionCallback` retains the term `Task` should be consistent with the other classes.




[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-06 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r342971770
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStats.java
 ##
 @@ -89,28 +88,28 @@ public long getStartTime() {
}
 
/**
-* Returns the time stamp, when all stack traces were collected at the
-* JobManager.
+* Returns the time stamp, when all back pressure stats were collected 
at
+* the JobManager.
 *
-* @return Time stamp, when all stack traces were collected at the
+* @return Time stamp, when all back pressure stats were collected at 
the
 * JobManager
 */
public long getEndTime() {
return endTime;
}
 
/**
-* Returns the a map of stack traces by execution ID.
+* Returns a map of back pressure ratio by execution ID.
 *
-* @return Map of stack traces by execution ID
+* @return Map of back pressure ratio by execution ID
 */
-   public Map<ExecutionAttemptID, List<StackTraceElement[]>> getStackTraces() {
-   return stackTracesByTask;
+   public Map<ExecutionAttemptID, Double> getBackPressureRatioByTask() {
+   return backPressureRatioByTask;
}
 
@Override
public String toString() {
-   return "StackTraceSample{" +
+   return "TaskBackPressureStats{" +
 
 Review comment:
   TaskBackPressureStats -> BackPressureStats




[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-06 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r342969477
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStats.java
 ##
 @@ -21,44 +21,43 @@
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
- * A sample of stack traces for one or more tasks.
+ * Task back pressure stats for one or more tasks.
 
 Review comment:
   Remove prefix `Task`




[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-06 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r342981060
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskBackPressureSampleService.java
 ##
 @@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+
+import javax.annotation.Nonnegative;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Samples the output buffer availability of tasks for back pressure tracking.
+ *
+ * @see org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker
+ */
+class TaskBackPressureSampleService {
+
+   private final ScheduledExecutor scheduledExecutor;
+
+   TaskBackPressureSampleService(final ScheduledExecutor scheduledExecutor) {
+   this.scheduledExecutor = requireNonNull(scheduledExecutor, "The scheduledExecutor must not be null.");
+   }
+
+   /**
+* Returns a future that completes with the back pressure ratio of a task.
+*
+* @param taskThe task to be sampled.
+* @param numSamples  The number of samples.
+* @param delayBetweenSamples The time to wait between samples.
+* @return A future containing the task back pressure ratio.
+*/
+   public CompletableFuture<Double> sampleTaskBackPressure(
+   final OutputAvailabilitySampleableTask task,
+   @Nonnegative final int numSamples,
+   final Time delayBetweenSamples) {
+
+   checkNotNull(task, "The task must not be null.");
+   checkArgument(numSamples > 0, "The numSamples must be positive.");
+   checkNotNull(delayBetweenSamples, "The delayBetweenSamples must not be null.");
+
+   return sampleTaskBackPressure(
+   task,
+   numSamples,
+   delayBetweenSamples,
+   new ArrayList<>(numSamples),
+   new CompletableFuture<>());
+   }
+
+   private CompletableFuture<Double> sampleTaskBackPressure(
+   final OutputAvailabilitySampleableTask task,
+   final int numSamples,
+   final Time delayBetweenSamples,
+   final List<Boolean> taskOutputAvailability,
+   final CompletableFuture<Double> resultFuture) {
+
+   final Optional<Boolean> isTaskAvailableForOutput = isTaskAvailableForOutput(task);
+   if (isTaskAvailableForOutput.isPresent()) {
+   taskOutputAvailability.add(isTaskAvailableForOutput.get());
+   } else if (!taskOutputAvailability.isEmpty()) {
+   resultFuture.complete(calculateTaskBackPressureRatio(taskOutputAvailability));
+   return resultFuture;
+   } else {
+   throw new IllegalStateException(String.format("Cannot sample task %s. " +
+   "Because the task is not running.", task.getExecutionId()));
+   }
+
+   if (numSamples > 1) {
+   scheduledExecutor.schedule(() -> sampleTaskBackPressure(
+   task,
+   numSamples - 1,
+   delayBetweenSamples,
+   taskOutputAvailability,
+   resultFuture), delayBetweenSamples.getSize(), delayBetweenSamples.getUnit());
+   } else {
+   resultFuture.complete(calculateTaskBackPressureRatio(taskOutputAvailability));
+   }
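As a standalone illustration of the self-rescheduling sampling pattern in the diff above, here is a minimal Java sketch. All names (`BackPressureSamplingSketch`, `sample`, `collect`, the probe parameter) are hypothetical stand-ins, not the Flink classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Hypothetical sketch of the self-rescheduling sampling pattern; not the Flink code. */
class BackPressureSamplingSketch {

    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();

    /** Collects numSamples probes, spaced delayMillis apart, and completes with the true-ratio. */
    CompletableFuture<Double> sample(BooleanSupplier probe, int numSamples, long delayMillis) {
        CompletableFuture<Double> result = new CompletableFuture<>();
        collect(probe, numSamples, delayMillis, new ArrayList<>(), result);
        return result;
    }

    private void collect(
            BooleanSupplier probe,
            int remaining,
            long delayMillis,
            List<Boolean> samples,
            CompletableFuture<Double> result) {
        samples.add(probe.getAsBoolean());
        if (remaining > 1) {
            // Re-schedule itself on the executor until all samples are collected,
            // mirroring the recursive sampleTaskBackPressure call in the diff.
            executor.schedule(
                    () -> collect(probe, remaining - 1, delayMillis, samples, result),
                    delayMillis,
                    TimeUnit.MILLISECONDS);
        } else {
            long positives = samples.stream().filter(b -> b).count();
            result.complete((double) positives / samples.size());
            executor.shutdown();
        }
    }
}
```

The future-based shape lets the caller wait or compose without blocking the sampling thread.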

[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-06 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r342984639
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
 ##
 @@ -461,16 +461,15 @@ AbstractInvokable getInvokable() {
return invokable;
}
 
-   public StackTraceElement[] getStackTraceOfExecutingThread() {
-   final AbstractInvokable invokable = this.invokable;
-
-   if (invokable == null) {
-   return new StackTraceElement[0];
+   public boolean isAvailableForOutput() {
 
 Review comment:
   `isAvailableForOutput` -> `isBackPressured`




[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-06 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r342974206
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImpl.java
 ##
 @@ -296,21 +240,21 @@ public Void apply(StackTraceSample stackTraceSample, Throwable throwable) {
}
 
/**
-* Creates the back pressure stats from a stack trace sample.
+* Creates operator back pressure stats from task back pressure stats.
 *
-* @param sample Stack trace sample to base stats on.
+* @param stats Task back pressure stats.
 *
-* @return Back pressure stats
+* @return Operator back pressure stats
 */
-   private OperatorBackPressureStats createStatsFromSample(StackTraceSample sample) {
-   Map<ExecutionAttemptID, List<StackTraceElement[]>> traces = sample.getStackTraces();
+   private OperatorBackPressureStats createOperatorBackPressureStats(BackPressureStats stats) {
 
 Review comment:
  TBH I do not like the class name `OperatorBackPressureStats`, because compared 
with the `BackPressureStats` class the difference is not a task/operator-level one. 
They actually indicate the same level; the only transformation is from the map in 
`BackPressureStats` to arrays in `OperatorBackPressureStats`.
   
   Since we have not touched the class `OperatorBackPressureStats` in this PR, 
we can keep it as is for now.
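The relationship the reviewer describes (same level of stats, with `OperatorBackPressureStats` merely turning the per-task map into an array) can be sketched with simplified stand-in types; nothing below is the actual Flink code:

```java
import java.util.*;

/** Simplified stand-in: flattening a per-subtask ratio map into an array. */
class OperatorStatsSketch {
    static double[] toRatioArray(Map<Integer, Double> ratiosBySubtaskIndex, int parallelism) {
        double[] ratios = new double[parallelism];
        for (Map.Entry<Integer, Double> e : ratiosBySubtaskIndex.entrySet()) {
            // Same data, array layout instead of a map: no level change involved.
            ratios[e.getKey()] = e.getValue();
        }
        return ratios;
    }
}
```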




[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-06 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r342969795
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStats.java
 ##
 @@ -21,44 +21,43 @@
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
- * A sample of stack traces for one or more tasks.
+ * Task back pressure stats for one or more tasks.
  *
- * The sampling is triggered in {@link StackTraceSampleCoordinator}.
+ * The stats are calculated from sampling triggered in {@link BackPressureSampleCoordinator}.
  */
-public class StackTraceSample {
+public class BackPressureStats {
 
-   /** ID of this sample (unique per job). */
+   /** ID of the sample (unique per job). */
private final int sampleId;
 
/** Time stamp, when the sample was triggered. */
private final long startTime;
 
-   /** Time stamp, when all stack traces were collected at the JobManager. */
+   /** Time stamp, when all back pressure stats were collected at the JobManager. */
private final long endTime;
 
-   /** Map of stack traces by execution ID. */
-   private final Map<ExecutionAttemptID, List<StackTraceElement[]>> stackTracesByTask;
+   /** Map of back pressure ratio by execution ID. */
+   private final Map<ExecutionAttemptID, Double> backPressureRatioByTask;
 
 Review comment:
   `backPressureRatioByTask` -> `backPressureRatiosByTask` or 
`backPressureRatios`




[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-06 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r342959404
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
 ##
 @@ -56,20 +56,18 @@ public String getAddress() {
}
 
@Override
-   public CompletableFuture requestStackTraceSample(
-   ExecutionAttemptID executionAttemptID,
-   int sampleId,
-   int numSamples,
-   Time delayBetweenSamples,
-   int maxStackTraceDepth,
-   Time timeout) {
-
-   return taskExecutorGateway.requestStackTraceSample(
+   public CompletableFuture sampleTaskBackPressure(
+   final ExecutionAttemptID executionAttemptID,
 
 Review comment:
  It is not really necessary to add `final` to method arguments, if I remember 
correctly.




[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-06 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r342971685
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStats.java
 ##
 @@ -89,28 +88,28 @@ public long getStartTime() {
}
 
/**
-* Returns the time stamp, when all stack traces were collected at the
-* JobManager.
+* Returns the time stamp, when all back pressure stats were collected at
+* the JobManager.
 *
-* @return Time stamp, when all stack traces were collected at the
+* @return Time stamp, when all back pressure stats were collected at the
 * JobManager
 */
public long getEndTime() {
return endTime;
}
 
/**
-* Returns the a map of stack traces by execution ID.
+* Returns a map of back pressure ratio by execution ID.
 *
-* @return Map of stack traces by execution ID
+* @return Map of back pressure ratio by execution ID
 */
-   public Map<ExecutionAttemptID, List<StackTraceElement[]>> getStackTraces() {
-   return stackTracesByTask;
+   public Map<ExecutionAttemptID, Double> getBackPressureRatioByTask() {
 
 Review comment:
   getBackPressureRatioByTask -> getBackPressureRatios




[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-06 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r342976963
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/OutputAvailabilitySampleableTask.java
 ##
 @@ -22,13 +22,13 @@
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 
 /**
- * Task interface used by {@link StackTraceSampleService} for back pressure tracking.
+ * Task interface used by {@link TaskBackPressureSampleService} for back pressure tracking.
  */
-interface StackTraceSampleableTask {
+interface OutputAvailabilitySampleableTask {
 
 Review comment:
  Maybe call this class `BackPressureSampleableTask`. Also rename the method below: 
`isAvailableForOutput` -> `isBackPressured`.
   We do not need to expose in the interface the specific way back pressure is 
monitored. The interface only describes that a task can be sampled for back 
pressure; concrete implementations can monitor back pressure in different ways.
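The separation the reviewer suggests, an interface that only states a task can be sampled for back pressure while leaving the monitoring mechanism to implementations, might look like the following hypothetical sketch (names taken from the review, not the final Flink code):

```java
/** Hypothetical sketch of the suggested interface: no mention of how back pressure is detected. */
interface BackPressureSampleableTask {
    boolean isBackPressured();
}

/** One possible implementation monitors output buffer availability, as in this PR. */
class OutputBufferBasedTask implements BackPressureSampleableTask {
    private final java.util.function.BooleanSupplier outputAvailable;

    OutputBufferBasedTask(java.util.function.BooleanSupplier outputAvailable) {
        this.outputAvailable = outputAvailable;
    }

    @Override
    public boolean isBackPressured() {
        // Back-pressured exactly when the task cannot currently emit output.
        return !outputAvailable.getAsBoolean();
    }
}
```

Other implementations could detect back pressure differently without changing the interface.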




[GitHub] [flink] KurtYoung commented on a change in pull request #10096: [FLINK-14623][table-api] Add computed column information into TableSc…

2019-11-06 Thread GitBox
KurtYoung commented on a change in pull request #10096: 
[FLINK-14623][table-api] Add computed column information into TableSc…
URL: https://github.com/apache/flink/pull/10096#discussion_r342955630
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
 ##
 @@ -209,18 +173,48 @@ public int getFieldCount() {
 * @param fieldIndex the index of the field
 */
	public Optional<String> getFieldName(int fieldIndex) {
-   if (fieldIndex < 0 || fieldIndex >= fieldNames.length) {
+   if (fieldIndex < 0 || fieldIndex >= columns.size()) {
+   return Optional.empty();
+   }
+   return Optional.of(this.columns.get(fieldIndex).getName());
+   }
+
+   /**
+* Returns the {@link TableColumn} instance for the given field index.
+*
+* @param fieldIndex the index of the field
+*/
+   public Optional<TableColumn> getTableColumn(int fieldIndex) {
+   if (fieldIndex < 0 || fieldIndex >= columns.size()) {
return Optional.empty();
}
-   return Optional.of(fieldNames[fieldIndex]);
+   return Optional.of(this.columns.get(fieldIndex));
+   }
+
+   /**
+* Returns the {@link TableColumn} instance for the given field name.
+*
+* @param fieldName the name of the field
+*/
+   public Optional<TableColumn> getTableColumn(String fieldName) {
+   return this.columns.stream()
+   .filter(column -> column.getName().equals(fieldName))
+   .findFirst();
+   }
+
+   /**
+* Returns all the {@link TableColumn}s for this table schema.
+*/
+   public TableColumn[] getTableColumns() {
 
 Review comment:
  Why not just return the `List<TableColumn>`?




[GitHub] [flink] KurtYoung commented on a change in pull request #10096: [FLINK-14623][table-api] Add computed column information into TableSc…

2019-11-06 Thread GitBox
KurtYoung commented on a change in pull request #10096: 
[FLINK-14623][table-api] Add computed column information into TableSc…
URL: https://github.com/apache/flink/pull/10096#discussion_r342949927
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableColumn.java
 ##
 @@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * A table column represents a table column's structure with
+ * column name, column data type and computation expression(if it is a 
computed column).
+ */
+public class TableColumn {
+
+   //~ Instance fields 

+
+   private final String name;
+   private final DataType type;
+   @Nullable
+   private final String expr;
+
+   //~ Constructors 
---
+
+   /**
+* Creates a {@link TableColumn} instance.
+*
+* @param name Column name
+* @param type Column data type
+* @param expr Column computation expression if it is a computed column
+*/
+   private TableColumn(
+   String name,
+   DataType type,
+   @Nullable String expr) {
+   this.name = name;
+   this.type = type;
+   this.expr = expr;
+   }
+
+   //~ Methods 

+
+   /**
+* Creates a table column from given name and data type.
+*/
+   public static TableColumn of(String name, DataType type) {
+   Preconditions.checkNotNull(name, "Column name can not be 
null!");
+   Preconditions.checkArgument(
+   type != null,
+   "Column type can not be null!");
+   return new TableColumn(
+   name,
+   type,
+   null);
+   }
+
+   /**
+* Creates a table column from given name and computation expression.
+*
+* @param name Name of the column
+* @param expression SQL-style expression
+*/
+   public static TableColumn of(String name, DataType type, String 
expression) {
+   Preconditions.checkNotNull(name, "Column name can not be 
null!");
+   Preconditions.checkNotNull(
+   type,
+   "Column type can not be null!");
+   Preconditions.checkNotNull(
+   expression,
 
 Review comment:
   ditto
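The factory-method API quoted above can be exercised roughly as follows; `ColumnSketch` is a simplified stand-in for illustration (types as plain strings), not Flink's `TableColumn`:

```java
import java.util.Optional;

/** Simplified stand-in for the TableColumn API under review; not the Flink class. */
final class ColumnSketch {
    private final String name;
    private final String type;
    private final String expr; // null => physical column, non-null => computed column

    private ColumnSketch(String name, String type, String expr) {
        this.name = name;
        this.type = type;
        this.expr = expr;
    }

    static ColumnSketch of(String name, String type) {
        return new ColumnSketch(name, type, null);
    }

    static ColumnSketch of(String name, String type, String expr) {
        return new ColumnSketch(name, type, expr);
    }

    boolean isGenerated() {
        return expr != null;
    }

    Optional<String> getExpr() {
        return Optional.ofNullable(expr);
    }

    String getName() { return name; }
    String getType() { return type; }
}
```

The two overloads keep the nullable expression out of the public constructor, which is the design the diff follows.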




[GitHub] [flink] KurtYoung commented on a change in pull request #10096: [FLINK-14623][table-api] Add computed column information into TableSc…

2019-11-06 Thread GitBox
KurtYoung commented on a change in pull request #10096: 
[FLINK-14623][table-api] Add computed column information into TableSc…
URL: https://github.com/apache/flink/pull/10096#discussion_r342949897
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableColumn.java
 ##
 @@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * A table column represents a table column's structure with
+ * column name, column data type and computation expression(if it is a 
computed column).
+ */
+public class TableColumn {
+
+   //~ Instance fields 

+
+   private final String name;
+   private final DataType type;
+   @Nullable
+   private final String expr;
+
+   //~ Constructors 
---
+
+   /**
+* Creates a {@link TableColumn} instance.
+*
+* @param name Column name
+* @param type Column data type
+* @param expr Column computation expression if it is a computed column
+*/
+   private TableColumn(
+   String name,
+   DataType type,
+   @Nullable String expr) {
+   this.name = name;
+   this.type = type;
+   this.expr = expr;
+   }
+
+   //~ Methods 

+
+   /**
+* Creates a table column from given name and data type.
+*/
+   public static TableColumn of(String name, DataType type) {
+   Preconditions.checkNotNull(name, "Column name can not be 
null!");
+   Preconditions.checkArgument(
+   type != null,
+   "Column type can not be null!");
+   return new TableColumn(
+   name,
+   type,
+   null);
+   }
+
+   /**
+* Creates a table column from given name and computation expression.
+*
+* @param name Name of the column
+* @param expression SQL-style expression
+*/
+   public static TableColumn of(String name, DataType type, String 
expression) {
+   Preconditions.checkNotNull(name, "Column name can not be 
null!");
+   Preconditions.checkNotNull(
+   type,
 
 Review comment:
   no need to wrap line




[GitHub] [flink] KurtYoung commented on a change in pull request #10096: [FLINK-14623][table-api] Add computed column information into TableSc…

2019-11-06 Thread GitBox
KurtYoung commented on a change in pull request #10096: 
[FLINK-14623][table-api] Add computed column information into TableSc…
URL: https://github.com/apache/flink/pull/10096#discussion_r342988946
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
 ##
 @@ -423,17 +525,15 @@ public Builder watermark(String rowtimeAttribute, String watermarkExpressionStri
throw new IllegalStateException("Multiple watermark definition is not supported yet.");
}
this.watermarkSpecs.add(new WatermarkSpec(rowtimeAttribute, watermarkExpressionString, watermarkExprOutputType));
+   validateWatermarkSpecs(this.columns, this.watermarkSpecs);
 
 Review comment:
   ditto: validate this in `build()`




[GitHub] [flink] KurtYoung commented on a change in pull request #10096: [FLINK-14623][table-api] Add computed column information into TableSc…

2019-11-06 Thread GitBox
KurtYoung commented on a change in pull request #10096: 
[FLINK-14623][table-api] Add computed column information into TableSc…
URL: https://github.com/apache/flink/pull/10096#discussion_r342989289
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
 ##
 @@ -76,6 +77,8 @@
 
public static final String TABLE_SCHEMA_TYPE = "type";
 
+   public static final String TABLE_SCHEMA_EXPR = "expr";
 
 Review comment:
   `TABLE_SCHEMA_EXPR` looks obscure to me.




[GitHub] [flink] KurtYoung commented on a change in pull request #10096: [FLINK-14623][table-api] Add computed column information into TableSc…

2019-11-06 Thread GitBox
KurtYoung commented on a change in pull request #10096: 
[FLINK-14623][table-api] Add computed column information into TableSc…
URL: https://github.com/apache/flink/pull/10096#discussion_r342953235
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
 ##
 @@ -346,22 +318,137 @@ public static Builder builder() {
return new Builder();
}
 
+   //~ Tools 
--
+
+   /**
+* Tools method to transform arrays of table names and types
+* into a {@link TableColumn} list.
+*/
+   private static List<TableColumn> getTableColumns(
+   String[] fieldNames,
+   TypeInformation<?>[] fieldTypes) {
+   DataType[] fieldDataTypes = fromLegacyInfoToDataType(fieldTypes);
+   validateFields(fieldNames, fieldDataTypes);
 
 Review comment:
  IIUC your purpose here is to validate before/when constructing `TableSchema`, 
right? I noticed there is only one place calling this method, which is `public 
TableSchema(String[] fieldNames, TypeInformation[] fieldTypes)`.
   
   If this is true, I would suggest moving this validation out of this method. 



[GitHub] [flink] KurtYoung commented on a change in pull request #10096: [FLINK-14623][table-api] Add computed column information into TableSc…

2019-11-06 Thread GitBox
KurtYoung commented on a change in pull request #10096: 
[FLINK-14623][table-api] Add computed column information into TableSc…
URL: https://github.com/apache/flink/pull/10096#discussion_r342991588
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
 ##
 @@ -257,6 +267,43 @@ public void putIndexedFixedProperties(String key, List<String> subKeys, List<List<String>> subKeyValues) {
+* <p>For example:
+*
+* <pre>
+* schema.fields.0.type = INT, schema.fields.0.name = test
+* schema.fields.1.type = LONG, schema.fields.1.name = test2
+* schema.fields.2.type = LONG, schema.fields.1.expr = test + 1
+* </pre>
+*
+* The arity of each subKeyValues must match the arity of propertyKeys.
+*/
+   public void putIndexedOptionalProperties(String key, List<String> subKeys, List<List<String>> subKeyValues) {
 
 Review comment:
   just extend the original `putIndexedFixedProperties` to support null?
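The behavior under discussion, writing indexed `schema.fields.N.*` properties while simply skipping absent (null) sub-values, can be sketched with a hypothetical helper; this is not the `DescriptorProperties` implementation:

```java
import java.util.*;

/** Hypothetical sketch: flatten indexed sub-properties, omitting null values. */
class IndexedPropsSketch {
    static Map<String, String> put(String key, List<String> subKeys, List<List<String>> rows) {
        Map<String, String> out = new LinkedHashMap<>();
        for (int i = 0; i < rows.size(); i++) {
            List<String> row = rows.get(i);
            for (int k = 0; k < subKeys.size(); k++) {
                String value = row.get(k);
                if (value != null) { // "optional": a null sub-value simply produces no key
                    out.put(key + "." + i + "." + subKeys.get(k), value);
                }
            }
        }
        return out;
    }
}
```

Under this reading, the reviewer's suggestion amounts to letting the existing fixed-properties method tolerate nulls instead of adding a parallel method.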




[GitHub] [flink] KurtYoung commented on a change in pull request #10096: [FLINK-14623][table-api] Add computed column information into TableSc…

2019-11-06 Thread GitBox
KurtYoung commented on a change in pull request #10096: 
[FLINK-14623][table-api] Add computed column information into TableSc…
URL: https://github.com/apache/flink/pull/10096#discussion_r342956231
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
 ##
 @@ -249,8 +243,16 @@ public DataType toRowDataType() {
public String toString() {
final StringBuilder sb = new StringBuilder();
sb.append("root\n");
-   for (int i = 0; i < fieldNames.length; i++) {
-   sb.append(" |-- ").append(fieldNames[i]).append(": ").append(fieldDataTypes[i]).append('\n');
+   for (int i = 0; i < columns.size(); i++) {
+   sb.append(" |-- ")
+   .append(getFieldName(i).get())
+   .append(": ");
+   if (columns.get(i).isGenerated()) {
+   sb.append(columns.get(i).getExpr().get());
 
 Review comment:
   computed column also should output field type?
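What the reviewer asks for, printing the field type for computed columns as well, could produce lines like the ones below; the format and the `SchemaPrinterSketch` helper are hypothetical, not the actual Flink output:

```java
/** Hypothetical sketch: a column line always shows the type, plus "AS <expr>" when computed. */
class SchemaPrinterSketch {
    static String line(String name, String type, String exprOrNull) {
        StringBuilder sb = new StringBuilder(" |-- ").append(name).append(": ").append(type);
        if (exprOrNull != null) {
            sb.append(" AS ").append(exprOrNull);
        }
        return sb.toString();
    }
}
```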




[GitHub] [flink] KurtYoung commented on a change in pull request #10096: [FLINK-14623][table-api] Add computed column information into TableSc…

2019-11-06 Thread GitBox
KurtYoung commented on a change in pull request #10096: 
[FLINK-14623][table-api] Add computed column information into TableSc…
URL: https://github.com/apache/flink/pull/10096#discussion_r342948250
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableColumn.java
 ##
 @@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * A table column represents a table column's structure with
+ * column name, column data type and computation expression(if it is a 
computed column).
+ */
 
 Review comment:
   add `@PublicEvolving`




[GitHub] [flink] KurtYoung commented on a change in pull request #10096: [FLINK-14623][table-api] Add computed column information into TableSc…

2019-11-06 Thread GitBox
KurtYoung commented on a change in pull request #10096: 
[FLINK-14623][table-api] Add computed column information into TableSc…
URL: https://github.com/apache/flink/pull/10096#discussion_r342952024
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
 ##
 @@ -346,22 +318,137 @@ public static Builder builder() {
return new Builder();
}
 
+   //~ Tools 
--
+
+   /**
+* Tools method to transform arrays of table names and types
+* into a {@link TableColumn} list.
+*/
+   private static List getTableColumns(
 
 Review comment:
   no need to be static?




[GitHub] [flink] KurtYoung commented on a change in pull request #10096: [FLINK-14623][table-api] Add computed column information into TableSc…

2019-11-06 Thread GitBox
KurtYoung commented on a change in pull request #10096: 
[FLINK-14623][table-api] Add computed column information into TableSc…
URL: https://github.com/apache/flink/pull/10096#discussion_r342956037
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
 ##
 @@ -209,18 +173,48 @@ public int getFieldCount() {
 * @param fieldIndex the index of the field
 */
public Optional getFieldName(int fieldIndex) {
-   if (fieldIndex < 0 || fieldIndex >= fieldNames.length) {
+   if (fieldIndex < 0 || fieldIndex >= columns.size()) {
+   return Optional.empty();
+   }
+   return Optional.of(this.columns.get(fieldIndex).getName());
+   }
+
+   /**
+* Returns the {@link TableColumn} instance for the given field index.
+*
+* @param fieldIndex the index of the field
+*/
+   public Optional getTableColumn(int fieldIndex) {
+   if (fieldIndex < 0 || fieldIndex >= columns.size()) {
return Optional.empty();
}
-   return Optional.of(fieldNames[fieldIndex]);
+   return Optional.of(this.columns.get(fieldIndex));
+   }
+
+   /**
+* Returns the {@link TableColumn} instance for the given field name.
+*
+* @param fieldName the name of the field
+*/
+   public Optional getTableColumn(String fieldName) {
+   return this.columns.stream()
+   .filter(column -> column.getName().equals(fieldName))
+   .findFirst();
+   }
+
+   /**
+* Returns all the {@link TableColumn}s for this table schema.
+*/
+   public TableColumn[] getTableColumns() {
+   return this.columns.toArray(new TableColumn[0]);
}
 
/**
 * Converts a table schema into a (nested) data type describing a 
{@link DataTypes#ROW(Field...)}.
 
 Review comment:
   Add a note here saying that this would include all computed columns in the 
RowType, and that the caller should be careful with it?
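   A hedged sketch of the caution being requested: if the row type derived from the schema contains every column, computed or not, then a caller that needs only physical fields has to filter the computed ones out itself. `Column` is a simplified stand-in for `TableColumn`; the method names are assumptions for illustration only.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;

   public class RowTypeFilterSketch {

       // Simplified stand-in for TableColumn.
       static final class Column {
           final String name;
           final String expr; // non-null for a computed column
           Column(String name, String expr) {
               this.name = name;
               this.expr = expr;
           }
           boolean isGenerated() {
               return expr != null;
           }
       }

       // Mirrors the documented behavior: every column, computed or not,
       // contributes a field to the row type.
       static List<String> rowFields(List<Column> columns) {
           List<String> fields = new ArrayList<>();
           for (Column c : columns) {
               fields.add(c.name);
           }
           return fields;
       }

       // What a careful caller would do when only physical columns are wanted.
       static List<String> physicalFields(List<Column> columns) {
           List<String> fields = new ArrayList<>();
           for (Column c : columns) {
               if (!c.isGenerated()) {
                   fields.add(c.name);
               }
           }
           return fields;
       }

       public static void main(String[] args) {
           List<Column> cols = Arrays.asList(
               new Column("a", null),
               new Column("b", "a + 1"));
           System.out.println(rowFields(cols));      // [a, b]
           System.out.println(physicalFields(cols)); // [a]
       }
   }
   ```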




[GitHub] [flink] KurtYoung commented on a change in pull request #10096: [FLINK-14623][table-api] Add computed column information into TableSc…

2019-11-06 Thread GitBox
KurtYoung commented on a change in pull request #10096: 
[FLINK-14623][table-api] Add computed column information into TableSc…
URL: https://github.com/apache/flink/pull/10096#discussion_r342952283
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
 ##
 @@ -346,22 +318,137 @@ public static Builder builder() {
return new Builder();
}
 
+   //~ Tools 
--
+
+   /**
+* Tools method to transform arrays of table names and types
+* into a {@link TableColumn} list.
+*/
+   private static List getTableColumns(
+   String[] fieldNames,
 
 Review comment:
   nit: 2 tabs




[GitHub] [flink] KurtYoung commented on a change in pull request #10096: [FLINK-14623][table-api] Add computed column information into TableSc…

2019-11-06 Thread GitBox
KurtYoung commented on a change in pull request #10096: 
[FLINK-14623][table-api] Add computed column information into TableSc…
URL: https://github.com/apache/flink/pull/10096#discussion_r342956790
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
 ##
 @@ -386,9 +486,11 @@ public Builder field(String name, DataType dataType) {
public Builder fields(String[] names, DataType[] dataTypes) {
Preconditions.checkNotNull(names);
Preconditions.checkNotNull(dataTypes);
-
-   fieldNames.addAll(Arrays.asList(names));
-   fieldDataTypes.addAll(Arrays.asList(dataTypes));
+   validateFields(names, dataTypes);
 
 Review comment:
   We should move this validation logic inside `build()`. That will be safer 
than the current approach. For example, you can't really guarantee that the 
field names are all unique with the current solution: I can add column `a` with 
`field(String name, DataType dataType, String expression)`, and then add `a, b, 
c` with this method. 
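   A minimal sketch of the suggestion, under the assumption that the builder methods only accumulate columns and `build()` validates the union of everything added. This way duplicates across different `field(...)` / `fields(...)` calls are caught, which per-call validation cannot do. The `Builder` below is a simplified stand-in for `TableSchema.Builder`, not the real API.

   ```java
   import java.util.ArrayList;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;

   public class BuilderValidationSketch {

       static final class Builder {
           private final List<String> names = new ArrayList<>();

           // Also stands in for the computed-column overload: just accumulate.
           Builder field(String name) {
               names.add(name);
               return this;
           }

           Builder fields(String... moreNames) {
               for (String n : moreNames) {
                   names.add(n);
               }
               return this;
           }

           List<String> build() {
               // Validation runs once, over all accumulated columns, so a
               // duplicate introduced across separate builder calls fails too.
               Set<String> seen = new HashSet<>();
               for (String n : names) {
                   if (!seen.add(n)) {
                       throw new IllegalStateException("Duplicate field name: " + n);
                   }
               }
               return new ArrayList<>(names);
           }
       }

       public static void main(String[] args) {
           Builder b = new Builder().field("a").fields("a", "b", "c");
           try {
               b.build();
               throw new AssertionError("expected duplicate-name failure");
           } catch (IllegalStateException expected) {
               System.out.println("caught: " + expected.getMessage());
           }
       }
   }
   ```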




[GitHub] [flink] KurtYoung commented on a change in pull request #10096: [FLINK-14623][table-api] Add computed column information into TableSc…

2019-11-06 Thread GitBox
KurtYoung commented on a change in pull request #10096: 
[FLINK-14623][table-api] Add computed column information into TableSc…
URL: https://github.com/apache/flink/pull/10096#discussion_r342949758
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableColumn.java
 ##
 @@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * A table column represents a table column's structure with
+ * column name, column data type and computation expression(if it is a 
computed column).
+ */
+public class TableColumn {
+
+   //~ Instance fields 

+
+   private final String name;
+   private final DataType type;
+   @Nullable
+   private final String expr;
+
+   //~ Constructors 
---
+
+   /**
+* Creates a {@link TableColumn} instance.
+*
+* @param name Column name
+* @param type Column data type
+* @param expr Column computation expression if it is a computed column
+*/
+   private TableColumn(
+   String name,
+   DataType type,
+   @Nullable String expr) {
+   this.name = name;
+   this.type = type;
+   this.expr = expr;
+   }
+
+   //~ Methods 

+
+   /**
+* Creates a table column from given name and data type.
+*/
+   public static TableColumn of(String name, DataType type) {
+   Preconditions.checkNotNull(name, "Column name can not be 
null!");
+   Preconditions.checkArgument(
 
 Review comment:
   Use `checkNotNull`? And there is no need to wrap the line. 



