[jira] [Commented] (FLINK-5506) Java 8 - CommunityDetection.java:158 - java.lang.NullPointerException

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16279261#comment-16279261
 ] 

ASF GitHub Bot commented on FLINK-5506:
---

GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/5126

[FLINK-5506] [gelly] Fix CommunityDetection NullPointerException

## What is the purpose of the change

This fixes a regression which can result in a `NullPointerException` when 
running the `CommunityDetection` algorithm. When a new highest-valued label 
had a negative summed value, the lookup was performed on the old label value, 
which did not necessarily exist in the dictionary.

## Brief change log

- replaced `Double.MIN_VALUE` with the intended (and original) 
`-Double.MAX_VALUE`
- added `Graph#mapVertices` and `Graph#mapEdges` variants accepting 
`TypeHint` rather than `TypeInformation`; this seemed useful for Java 8 
lambdas, which require an explicit type definition with OpenJDK (see the 
sketch after this list)
- added additional tests for `CommunityDetection`
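
A minimal usage sketch of the `TypeHint` variants from the second bullet; the 
overload's exact signature here is an assumption based on this change log, not 
the final API:

```java
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.graph.Graph;

// Hypothetical sketch: with the TypeInformation-based overloads, OpenJDK cannot
// infer a Java 8 lambda's return type, so the caller must state it explicitly.
// A TypeHint variant keeps the generic type without TypeInformation boilerplate.
public class TypeHintSketch {
    static Graph<Long, Double, Double> doubleVertexValues(Graph<Long, Double, Double> graph) {
        return graph.mapVertices(
            vertex -> vertex.getValue() * 2.0,
            new TypeHint<Double>() {});
    }
}
```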

## Verifying this change

`CommunityDetectionTest#testWithSingletonEdgeGraph` is a user-supplied 
example which fails without the bug fix.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
5506_communitydetection_nullpointerexception

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5126.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5126


commit 8d57371616a742ba29548192d5d8a4018cafd26b
Author: Greg Hogan 
Date:   2017-12-05T17:36:18Z

[hotfix] [gelly] Graph API mapVertices/mapEdges with TypeHint

commit 8c131b42cfa95df34333a9ca733c9337f75695ba
Author: Greg Hogan 
Date:   2017-12-05T17:38:50Z

[FLINK-5506] [gelly] Fix CommunityDetection NullPointerException

Double.MIN_VALUE != min(double)
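
For reference, a standalone check of the distinction the commit message points 
to: `Double.MIN_VALUE` is the smallest positive double, not the most negative 
one.

```java
public class DoubleMinCheck {
    public static void main(String[] args) {
        // Double.MIN_VALUE is the smallest POSITIVE double (4.9E-324), so
        // seeding a running maximum with it silently ignores negative scores.
        System.out.println(Double.MIN_VALUE > 0);            // true
        System.out.println(-Double.MAX_VALUE);               // -1.7976931348623157E308

        double wrongMax = Math.max(Double.MIN_VALUE, -1.0);  // stays 4.9E-324
        double rightMax = Math.max(-Double.MAX_VALUE, -1.0); // -1.0, as intended
        System.out.println(wrongMax + " vs " + rightMax);
    }
}
```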




> Java 8 - CommunityDetection.java:158 - java.lang.NullPointerException
> -
>
> Key: FLINK-5506
> URL: https://issues.apache.org/jira/browse/FLINK-5506
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.1.4, 1.3.2, 1.4.1
>Reporter: Miguel E. Coimbra
>Assignee: Greg Hogan
>  Labels: easyfix, newbie
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> Reporting this here as per Vasia's advice.
> I am having the following problem while trying out the 
> org.apache.flink.graph.library.CommunityDetection algorithm of the Gelly API 
> (Java).
> Specs: JDK 1.8.0_102 x64
> Apache Flink: 1.1.4
> Suppose I have a very small (I tried an example with 38 vertices as well) 
> dataset stored in a tab-separated file 3-vertex.tsv:
> {code}
> #id1	id2	score
> 0	1	0
> 0	2	0
> 0	3	0
> {code}
> This is just a central vertex with 3 neighbors (disconnected between 
> themselves).
> I am loading the dataset and executing the algorithm with the following code:
> {code}
> // Load the data from the .tsv file.
> final DataSet<Tuple3<Long, Long, Double>> edgeTuples = env.readCsvFile(inputPath)
>     .fieldDelimiter("\t") // fields are separated by tabs
>     .ignoreComments("#")  // comments start with "#"
>     .types(Long.class, Long.class, Double.class);
> // Generate a graph and add reverse edges (undirected).
> final Graph<Long, Long, Double> graph = Graph.fromTupleDataSet(edgeTuples,
>     new MapFunction<Long, Long>() {
>         private static final long serialVersionUID = 8713516577419451509L;
>         public Long map(Long value) {
>             return value;
>         }
>     },
>     env).getUndirected();
> // CommunityDetection parameters.
> final double hopAttenuationDelta = 0.5d;
> final int iterationCount = 10;
> // Prepare and 

[jira] [Commented] (FLINK-8174) Mesos RM unable to accept offers for unreserved resources

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16279100#comment-16279100
 ] 

ASF GitHub Bot commented on FLINK-8174:
---

Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/5114
  
@tillrohrmann I believe your feedback has been addressed; please take a 
look, thanks.


> Mesos RM unable to accept offers for unreserved resources
> -
>
> Key: FLINK-8174
> URL: https://issues.apache.org/jira/browse/FLINK-8174
> Project: Flink
>  Issue Type: Bug
>  Components: Mesos
>Affects Versions: 1.4.0, 1.3.3
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Flink has suffered a regression due to FLINK-7294.   Any attempt to accept a 
> resource offer that is based on unreserved resources will fail, because Flink 
> (as of FLINK-7294) erroneously insists that the resource come from a prior 
> reservation.
> Looking at the original issue, the problem may have been misdiagnosed.  
> Ideally Flink should work with both reserved and unreserved resources, but 
> the latter is a more common situation that is now broken.
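
For context, a minimal sketch of the distinction at play, assuming the classic 
Mesos protobuf API: an unreserved resource carries the default role `*`, so a 
scheduler that insists on a prior reservation declines exactly these offers.

```java
import org.apache.mesos.Protos.Resource;

// Sketch under the stated assumption: unreserved resources have role "*",
// so requiring a non-"*" role rejects every unreserved offer.
public class ReservationCheck {
    static boolean isReserved(Resource resource) {
        return !"*".equals(resource.getRole());
    }
}
```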





[jira] [Commented] (FLINK-8174) Mesos RM unable to accept offers for unreserved resources

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278956#comment-16278956
 ] 

ASF GitHub Bot commented on FLINK-8174:
---

Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/5114#discussion_r155026726
  
--- Diff: 
flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/LaunchCoordinator.scala
 ---
@@ -185,7 +189,7 @@ class LaunchCoordinator(
 // process the assignments into a set of operations (reserve 
and/or launch)
 val slaveId = assignments.getLeasesUsed.get(0).getOffer.getSlaveId
 val offerIds = 
assignments.getLeasesUsed.asScala.map(_.getOffer.getId)
-val operations = processAssignments(slaveId, assignments, 
remaining.toMap)
+val operations = processAssignments(LOG, slaveId, assignments, 
remaining.toMap)
--- End diff --

The issue is that the class extends `Actor` which has a `log` field that we 
don't intend to use.  I will use `log` in `processAssignments`.




[jira] [Commented] (FLINK-8085) Thin out the LogicalSlot interface

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278898#comment-16278898
 ] 

ASF GitHub Bot commented on FLINK-8085:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5087#discussion_r155018207
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
 ---
@@ -267,10 +270,77 @@ public void testAnyPreferredLocationCalculation() 
throws ExecutionException, Int
assertThat(preferredLocations, 
containsInAnyOrder(taskManagerLocation1, taskManagerLocation3));
}
 
+   /**
+* Checks that the {@link Execution} termination future is only 
completed after the
+* assigned slot has been released.
+*
+* NOTE: This test only fails spuriously without the fix of this 
commit. Thus, one has
+* to execute this test multiple times to see the failure.
+*/
+   @Test
+   public void testTerminationFutureIsCompletedAfterSlotRelease() throws 
Exception {
+   final JobVertexID jobVertexId = new JobVertexID();
+   final JobVertex jobVertex = new JobVertex("Test vertex", 
jobVertexId);
+   jobVertex.setInvokableClass(NoOpInvokable.class);
+
+   final SingleSlotTestingSlotOwner slotOwner = new 
SingleSlotTestingSlotOwner();
+
+   final SimpleSlot slot = new SimpleSlot(
+   new JobID(),
+   slotOwner,
+   new LocalTaskManagerLocation(),
+   0,
+   new SimpleAckingTaskManagerGateway());
+
+   final ProgrammedSlotProvider slotProvider = new 
ProgrammedSlotProvider(1);
+   slotProvider.addSlot(jobVertexId, 0, 
CompletableFuture.completedFuture(slot));
+
+   ExecutionGraph executionGraph = 
ExecutionGraphTestUtils.createSimpleTestGraph(
+   new JobID(),
+   slotProvider,
+   new NoRestartStrategy(),
+   jobVertex);
+
+   ExecutionJobVertex executionJobVertex = 
executionGraph.getJobVertex(jobVertexId);
+
+   ExecutionVertex executionVertex = 
executionJobVertex.getTaskVertices()[0];
+
+   assertTrue(executionVertex.scheduleForExecution(slotProvider, 
false, LocationPreferenceConstraint.ANY));
+
+   Execution currentExecutionAttempt = 
executionVertex.getCurrentExecutionAttempt();
+
+   CompletableFuture returnedSlotFuture = 
slotOwner.getReturnedSlotFuture();
+   CompletableFuture terminationFuture = 
executionVertex.cancel();
+
+   // run canceling in a separate thread to allow an interleaving 
between termination
+   // future callback registrations
+   CompletableFuture.runAsync(
+   () -> currentExecutionAttempt.cancelingComplete(),
+   TestingUtils.defaultExecutor());
--- End diff --

If an executor is created, where is it shut down afterwards? I think this 
should be just fine:

```
new Thread() {
@Override
public void run() {
currentExecutionAttempt.cancelingComplete();
}
}.start();
```


> Thin out the LogicalSlot interface
> --
>
> Key: FLINK-8085
> URL: https://issues.apache.org/jira/browse/FLINK-8085
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
>
> The {{LogicalSlot}} interface contains method which we don't strictly need 
> (e.g. {{isCanceled}}, {{isReleased}}). Moreover, we should decouple the 
> {{LogicalSlot}} from the {{Execution}} by only setting the 
> {{ExecutionAttemptID}} instead of {{Execution}}.
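
A hypothetical sketch of the direction described above; the interface and 
method names are illustrative, not the final API:

```java
import java.util.concurrent.CompletableFuture;

import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;

// Illustrative sketch: the state probes (isCanceled/isReleased) are dropped,
// and the payload is identified by an ExecutionAttemptID rather than by a
// reference to the Execution itself.
interface ThinLogicalSlot {
    TaskManagerLocation getTaskManagerLocation();

    boolean tryAssignPayload(ExecutionAttemptID attemptId);

    CompletableFuture<?> releaseSlot();
}
```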





[jira] [Commented] (FLINK-8085) Thin out the LogicalSlot interface

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278891#comment-16278891
 ] 

ASF GitHub Bot commented on FLINK-8085:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5087#discussion_r155016510
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
 ---
@@ -267,10 +270,77 @@ public void testAnyPreferredLocationCalculation() 
throws ExecutionException, Int
assertThat(preferredLocations, 
containsInAnyOrder(taskManagerLocation1, taskManagerLocation3));
}
 
+   /**
+* Checks that the {@link Execution} termination future is only 
completed after the
+* assigned slot has been released.
+*
+* NOTE: This test only fails spuriously without the fix of this 
commit. Thus, one has
+* to execute this test multiple times to see the failure.
+*/
+   @Test
+   public void testTerminationFutureIsCompletedAfterSlotRelease() throws 
Exception {
+   final JobVertexID jobVertexId = new JobVertexID();
+   final JobVertex jobVertex = new JobVertex("Test vertex", 
jobVertexId);
+   jobVertex.setInvokableClass(NoOpInvokable.class);
+
+   final SingleSlotTestingSlotOwner slotOwner = new 
SingleSlotTestingSlotOwner();
+
+   final SimpleSlot slot = new SimpleSlot(
+   new JobID(),
+   slotOwner,
+   new LocalTaskManagerLocation(),
+   0,
+   new SimpleAckingTaskManagerGateway());
+
+   final ProgrammedSlotProvider slotProvider = new 
ProgrammedSlotProvider(1);
+   slotProvider.addSlot(jobVertexId, 0, 
CompletableFuture.completedFuture(slot));
+
+   ExecutionGraph executionGraph = 
ExecutionGraphTestUtils.createSimpleTestGraph(
+   new JobID(),
+   slotProvider,
+   new NoRestartStrategy(),
+   jobVertex);
+
+   ExecutionJobVertex executionJobVertex = 
executionGraph.getJobVertex(jobVertexId);
+
+   ExecutionVertex executionVertex = 
executionJobVertex.getTaskVertices()[0];
+
+   assertTrue(executionVertex.scheduleForExecution(slotProvider, 
false, LocationPreferenceConstraint.ANY));
+
+   Execution currentExecutionAttempt = 
executionVertex.getCurrentExecutionAttempt();
+
+   CompletableFuture returnedSlotFuture = 
slotOwner.getReturnedSlotFuture();
+   CompletableFuture terminationFuture = 
executionVertex.cancel();
+
+   // run canceling in a separate thread to allow an interleaving 
between termination
+   // future callback registrations
+   CompletableFuture.runAsync(
+   () -> currentExecutionAttempt.cancelingComplete(),
+   TestingUtils.defaultExecutor());
+
+   // to increase probability for problematic interleaving, let 
the current thread yield the processor
+   Thread.yield();
+
+   CompletableFuture restartFuture = 
terminationFuture.thenApply(
+   ignored -> {
+   try {
+   assertTrue(returnedSlotFuture.isDone());
+   } catch (Exception e) {
+   throw new CompletionException(e);
--- End diff --

`isDone` does not throw an exception. `assertTrue` throws an `Error`, which 
is not an `Exception`. Is it possible to get into this code path?
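
A standalone illustration of the point: JUnit's `assertTrue` signals failure 
with an `AssertionError`, which a `catch (Exception e)` block does not 
intercept.

```java
public class CatchDemo {
    public static void main(String[] args) {
        try {
            // Mimics a failing assertTrue(...): JUnit throws AssertionError.
            throw new AssertionError("assertion failed");
        } catch (Exception e) {
            // Never reached: AssertionError extends Error, not Exception.
            System.out.println("caught as Exception");
        } catch (Error e) {
            System.out.println("caught as Error: " + e.getMessage());
        }
    }
}
```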




[jira] [Commented] (FLINK-8085) Thin out the LogicalSlot interface

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278883#comment-16278883
 ] 

ASF GitHub Bot commented on FLINK-8085:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5087#discussion_r155015331
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
 ---
@@ -294,11 +297,11 @@ public void testScheduleWithDyingInstances() {

i2.markDead();

-   for (SimpleSlot slot : slots) {
-   if (slot.getOwner() == i2) {
-   assertTrue(slot.isCanceled());
+   for (LogicalSlot slot : slots) {
+   if 
(Objects.equals(slot.getTaskManagerLocation().getResourceID(), 
i2.getTaskManagerID())) {
--- End diff --

I think `slot.getTaskManagerLocation().getResourceID()` cannot return null. 
Is there a need to use `Objects.equals`?




[jira] [Commented] (FLINK-8085) Thin out the LogicalSlot interface

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278868#comment-16278868
 ] 

ASF GitHub Bot commented on FLINK-8085:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5087#discussion_r155012983
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ---
@@ -331,8 +334,19 @@ public void setInitialState(TaskStateSnapshot 
checkpointStateHandles) {
 *
 * @return A future for the execution's termination
 */
-   public CompletableFuture getTerminationFuture() {
-   return terminationFuture;
+   @Override
+   public CompletableFuture getTerminalStateFuture() {
+   return terminalStateFuture;
+   }
+
+   /**
+* Gets the release future which is completed once the execution 
reaches a terminal
+* state and the assigned resource has been released.
+*
+* @return
--- End diff --

`@return` tag can be removed.




[jira] [Commented] (FLINK-4812) Report Watermark metrics in all operators

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278865#comment-16278865
 ] 

ASF GitHub Bot commented on FLINK-4812:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5125
  
There are a few reasons for that:

1) It keeps the `Outputs` immutable. We need all metrics to be available up 
front before creating the output, but since the OperatorIOMetricGroup is 
created in `StreamOperator#setup` (which requires the Output) this just doesn't 
work. We could create the OperatorMetricGroup before calling setup, but that 
would mean a) moving even more things into the OperatorChain (and that thing is 
loaded _as is_) and b) requiring a way to pass said group to the operator (or 
living with a duplicate call to `TaskMetricGroup#addOperator`).

2) It's hard to support "having the same watermark for in/output" with 
pre-registered metrics, as these would by design be independent.


> Report Watermark metrics in all operators
> -
>
> Key: FLINK-4812
> URL: https://issues.apache.org/jira/browse/FLINK-4812
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>Priority: Critical
> Fix For: 1.5.0
>
>
> As reported by a user, Flink does currently not export the current low 
> watermark for sources 
> (http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/currentLowWatermark-metric-not-reported-for-all-tasks-td13770.html).
> This JIRA is for adding such a metric for the sources as well.





[jira] [Commented] (FLINK-8193) Rework quickstart exclusions

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278843#comment-16278843
 ] 

ASF GitHub Bot commented on FLINK-8193:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5118
  
+1 for the latest suggestion


> Rework quickstart exclusions
> 
>
> Key: FLINK-8193
> URL: https://issues.apache.org/jira/browse/FLINK-8193
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System, Quickstarts
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>
> The quickstart poms currently contain 2 entirely separate dependency 
> exclusion mechanisms:
> A) manual exclusions of dependencies (including transitive dependencies) via 
> the {{artifactSet}} of the {{maven-shade-plugin}}
> B) automatic exclusion by marking dependencies as {{provided}} with the 
> {{build-jar}} profile
> I propose removing A) entirely. By default, all dependencies will be included 
> in the jar to allow execution in the IDE (which setting all dependencies to 
> provided would not). To execute things on a cluster we now always refer to 
> Option B), which is trivial to maintain and also reliable.





[jira] [Commented] (FLINK-4812) Report Watermark metrics in all operators

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278839#comment-16278839
 ] 

ASF GitHub Bot commented on FLINK-4812:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5125
  
What's a bit strange is that other input/output related metrics are created 
in `OperatorIOMetricGroup` and then handed to the `Outputs` while the watermark 
gauges are created in the `Outputs` and then handed to the operator metrics.




[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278832#comment-16278832
 ] 

ASF GitHub Bot commented on FLINK-7468:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4559
  
@NicoK , sorry I missed that synchronization comment; I will submit updates 
for the latest comments tomorrow.


> Implement sender backlog logic for credit-based
> ---
>
> Key: FLINK-7468
> URL: https://issues.apache.org/jira/browse/FLINK-7468
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> Receivers should know how many buffers are available on the sender side (the 
> backlog). The receivers use this information to decide how to distribute 
> floating buffers.
> The {{ResultSubpartition}} maintains the backlog, which only counts the 
> number of buffers in this subpartition, not the number of events. 
> The backlog is increased when a buffer is added to this subpartition, and 
> decreased when a buffer is polled from it.
> The backlog is attached to the {{BufferResponse}} by the sender as an absolute 
> value after the buffer has been transferred.
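
A minimal sketch of the bookkeeping described above, with illustrative names 
rather than Flink's actual classes:

```java
// Illustrative sketch only: the backlog counts buffers (not events), and the
// value observed when polling is what the sender attaches to the outgoing
// BufferResponse as an absolute value.
class BacklogSketch {
    private int buffersInBacklog;

    // Called when a buffer (not an event) is added to the subpartition.
    synchronized void onBufferAdded() {
        buffersInBacklog++;
    }

    // Called when a buffer is polled; the returned count travels with the
    // BufferResponse so receivers can distribute floating buffers.
    synchronized int onBufferPolled() {
        return --buffersInBacklog;
    }
}
```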





[jira] [Commented] (FLINK-4812) Report Watermark metrics in all operators

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278831#comment-16278831
 ] 

ASF GitHub Bot commented on FLINK-4812:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5125#discussion_r155004802
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
 ---
@@ -80,7 +91,10 @@ public void init() throws Exception {
this.headOperator);
 
// make sure that stream tasks report their I/O statistics
-   
inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup());
+   
inputProcessor.setupMetrics(getEnvironment().getMetricGroup().getIOMetricGroup(),
 input1WatermarkGauge, input2WatermarkGauge);
+
+   
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_1_WATERMARK, 
input1WatermarkGauge);
--- End diff --

yes




[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278826#comment-16278826
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4533
  
@NicoK , thanks for the patch you provided offline; I have submitted a new 
commit with your changes. :)


> Implement Netty receiver outgoing pipeline for credit-based
> ---
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> The related works are :
> *  We define a new message called {{AddCredit}} to notify the incremental 
> credit during data shuffle. 
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the 
> channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}} 
> and sends its unannounced credit. The credit is reset to zero after each send.
> * That way, messages are sent as often as the network has capacity and 
> contain as much credit as available for the channel at that point in time. 
> Otherwise, it would only add latency to the announcements and not increase 
> throughput.
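
A condensed sketch of the enqueue-on-zero-to-positive rule described above, 
again with illustrative names rather than Flink's actual netty handler:

```java
import java.util.ArrayDeque;

// Illustrative sketch: a channel is enqueued only when its unannounced credit
// goes from zero to positive, and the credit is reset to zero on each send.
class CreditAnnouncerSketch {
    static class Channel {
        int unannouncedCredit;
    }

    private final ArrayDeque<Channel> announceQueue = new ArrayDeque<>();

    void onCreditAdded(Channel channel, int credit) {
        int before = channel.unannouncedCredit;
        channel.unannouncedCredit += credit;
        if (before == 0) {               // 0 -> positive: schedule announcement
            announceQueue.add(channel);
        }
    }

    // Called whenever the outgoing pipeline becomes writable.
    void onChannelWritable() {
        Channel next = announceQueue.poll();
        if (next != null) {
            int toAnnounce = next.unannouncedCredit;
            next.unannouncedCredit = 0;  // reset to zero after each send
            sendAddCredit(next, toAnnounce);
        }
    }

    private void sendAddCredit(Channel channel, int credit) {
        // Placeholder for writing an AddCredit message to the network.
    }
}
```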





[jira] [Commented] (FLINK-4812) Report Watermark metrics in all operators

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278822#comment-16278822
 ] 

ASF GitHub Bot commented on FLINK-4812:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5125#discussion_r155002960
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
 ---
@@ -80,7 +91,10 @@ public void init() throws Exception {
this.headOperator);
 
// make sure that stream tasks report their I/O statistics
-   
inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup());
+   
inputProcessor.setupMetrics(getEnvironment().getMetricGroup().getIOMetricGroup(),
 input1WatermarkGauge, input2WatermarkGauge);
+
+   
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_1_WATERMARK, 
input1WatermarkGauge);
--- End diff --

so for two-input operators you will get three metrics: "input1", "input2", 
and "input" where the last is the combined min, right?




[jira] [Commented] (FLINK-8174) Mesos RM unable to accept offers for unreserved resources

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278817#comment-16278817
 ] 

ASF GitHub Bot commented on FLINK-8174:
---

Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/5114#discussion_r155000484
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
 ---
@@ -217,15 +222,13 @@ public String toString() {

dynamicProperties.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, 
taskManagerHostname);
}
 
-   // use the assigned ports for the TM
-   if (assignment.getAssignedPorts().size() < TM_PORT_KEYS.length) 
{
-   throw new IllegalArgumentException("unsufficient # of 
ports assigned");
-   }
-   for (int i = 0; i < TM_PORT_KEYS.length; i++) {
-   int port = assignment.getAssignedPorts().get(i);
-   String key = TM_PORT_KEYS[i];
-   taskInfo.addResources(ranges("ports", 
mesosConfiguration.frameworkInfo().getRole(), range(port, port)));
-   dynamicProperties.setInteger(key, port);
+   // take needed ports for the TM
+   List portResources = 
allocation.takeRanges("ports", TM_PORT_KEYS.length, roles);
+   taskInfo.addAllResources(portResources);
+   Iterator portsToAssign = 
Iterators.forArray(TM_PORT_KEYS);
+   rangeValues(portResources).forEach(port -> 
dynamicProperties.setLong(portsToAssign.next(), port));
--- End diff --

A call to `takeRanges` won't give more than requested, so it won't happen.




[jira] [Commented] (FLINK-4812) Report Watermark metrics in all operators

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278813#comment-16278813
 ] 

ASF GitHub Bot commented on FLINK-4812:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5125#discussion_r154999100
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
 ---
@@ -80,7 +91,10 @@ public void init() throws Exception {
this.headOperator);
 
// make sure that stream tasks report their I/O statistics
-   
inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup());
+   
inputProcessor.setupMetrics(getEnvironment().getMetricGroup().getIOMetricGroup(),
 input1WatermarkGauge, input2WatermarkGauge);
+
+   
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_1_WATERMARK, 
input1WatermarkGauge);
--- End diff --

oh wait. No, this is different. For TwoInputStreamTasks we register the 
input1/2 watermark metrics here, but the common min watermark metric in the 
operator chain.




[jira] [Commented] (FLINK-8174) Mesos RM unable to accept offers for unreserved resources

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278814#comment-16278814
 ] 

ASF GitHub Bot commented on FLINK-8174:
---

Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/5114#discussion_r154999162
  
--- Diff: 
flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/LaunchCoordinator.scala
 ---
@@ -148,14 +149,17 @@ class LaunchCoordinator(
 
 case Event(offers: ResourceOffers, data: GatherData) =>
   val leases = offers.offers().asScala.map(
-new VMLeaseObject(_).asInstanceOf[VirtualMachineLease])
+new Offer(_).asInstanceOf[VirtualMachineLease])
   if(LOG.isInfoEnabled) {
 val (cpus, mem) = leases.foldLeft((0.0,0.0)) {
   (z,o) => (z._1 + o.cpuCores(), z._2 + o.memoryMB())
 }
 LOG.info(s"Received offer(s) of $mem MB, $cpus cpus:")
 for(l <- leases) {
-  LOG.info(s"  ${l.getId} from ${l.hostname()} of ${l.memoryMB()} 
MB, ${l.cpuCores()} cpus")
+  val reservations = 
l.asInstanceOf[Offer].getResources.asScala.map(_.getRole).toSet
--- End diff --

(see above)




[jira] [Commented] (FLINK-8174) Mesos RM unable to accept offers for unreserved resources

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278806#comment-16278806
 ] 

ASF GitHub Bot commented on FLINK-8174:
---

Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/5114#discussion_r154998531
  
--- Diff: 
flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/LaunchCoordinator.scala
 ---
@@ -148,14 +149,17 @@ class LaunchCoordinator(
 
 case Event(offers: ResourceOffers, data: GatherData) =>
   val leases = offers.offers().asScala.map(
-new VMLeaseObject(_).asInstanceOf[VirtualMachineLease])
+new Offer(_).asInstanceOf[VirtualMachineLease])
--- End diff --

If I recall correctly, the collection must be of a certain type.




[jira] [Commented] (FLINK-4812) Report Watermark metrics in all operators

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278808#comment-16278808
 ] 

ASF GitHub Bot commented on FLINK-4812:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5125#discussion_r154998623
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
 ---
@@ -80,7 +91,10 @@ public void init() throws Exception {
this.headOperator);
 
// make sure that stream tasks report their I/O statistics
-   
inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup());
+   
inputProcessor.setupMetrics(getEnvironment().getMetricGroup().getIOMetricGroup(),
 input1WatermarkGauge, input2WatermarkGauge);
+
+   
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_1_WATERMARK, 
input1WatermarkGauge);
--- End diff --

i think



> Report Watermark metrics in all operators
> -
>
> Key: FLINK-4812
> URL: https://issues.apache.org/jira/browse/FLINK-4812
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>Priority: Critical
> Fix For: 1.5.0
>
>
> As reported by a user, Flink currently does not export the current low 
> watermark for sources 
> (http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/currentLowWatermark-metric-not-reported-for-all-tasks-td13770.html).
> This JIRA is for adding such a metric for the sources as well.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-4812) Report Watermark metrics in all operators

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278807#comment-16278807
 ] 

ASF GitHub Bot commented on FLINK-4812:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5125#discussion_r154998584
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
 ---
@@ -80,7 +91,10 @@ public void init() throws Exception {
this.headOperator);
 
// make sure that stream tasks report their I/O statistics
-   
inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup());
+   
inputProcessor.setupMetrics(getEnvironment().getMetricGroup().getIOMetricGroup(),
 input1WatermarkGauge, input2WatermarkGauge);
+
+   
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_1_WATERMARK, 
input1WatermarkGauge);
--- End diff --

The head operator is never touched in the OperatorChain; we only set up 
chained operators.


> Report Watermark metrics in all operators
> -
>
> Key: FLINK-4812
> URL: https://issues.apache.org/jira/browse/FLINK-4812
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>Priority: Critical
> Fix For: 1.5.0
>
>
> As reported by a user, Flink currently does not export the current low 
> watermark for sources 
> (http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/currentLowWatermark-metric-not-reported-for-all-tasks-td13770.html).
> This JIRA is for adding such a metric for the sources as well.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5125: [WIP][FLINK-4812][metrics] Expose currentLowWaterm...

2017-12-05 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5125#discussion_r154998584
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
 ---
@@ -80,7 +91,10 @@ public void init() throws Exception {
this.headOperator);
 
// make sure that stream tasks report their I/O statistics
-   
inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup());
+   
inputProcessor.setupMetrics(getEnvironment().getMetricGroup().getIOMetricGroup(),
 input1WatermarkGauge, input2WatermarkGauge);
+
+   
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_1_WATERMARK, 
input1WatermarkGauge);
--- End diff --

The head operator is never touched in the OperatorChain; we only set up 
chained operators.


---


[GitHub] flink pull request #5114: [FLINK-8174] [mesos] Mesos RM unable to accept off...

2017-12-05 Thread EronWright
Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/5114#discussion_r154998531
  
--- Diff: 
flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/LaunchCoordinator.scala
 ---
@@ -148,14 +149,17 @@ class LaunchCoordinator(
 
 case Event(offers: ResourceOffers, data: GatherData) =>
   val leases = offers.offers().asScala.map(
-new VMLeaseObject(_).asInstanceOf[VirtualMachineLease])
+new Offer(_).asInstanceOf[VirtualMachineLease])
--- End diff --

If I recall correctly, the collection must be of a certain type.


---


[jira] [Commented] (FLINK-4812) Report Watermark metrics in all operators

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278793#comment-16278793
 ] 

ASF GitHub Bot commented on FLINK-4812:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5125#discussion_r154986786
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
 ---
@@ -80,7 +91,10 @@ public void init() throws Exception {
this.headOperator);
 
// make sure that stream tasks report their I/O statistics
-   
inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup());
+   
inputProcessor.setupMetrics(getEnvironment().getMetricGroup().getIOMetricGroup(),
 input1WatermarkGauge, input2WatermarkGauge);
+
+   
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_1_WATERMARK, 
input1WatermarkGauge);
--- End diff --

Doesn't this override what the `OperatorChain` does according to the 
`WatermarkMetricPreference`?


> Report Watermark metrics in all operators
> -
>
> Key: FLINK-4812
> URL: https://issues.apache.org/jira/browse/FLINK-4812
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>Priority: Critical
> Fix For: 1.5.0
>
>
> As reported by a user, Flink currently does not export the current low 
> watermark for sources 
> (http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/currentLowWatermark-metric-not-reported-for-all-tasks-td13770.html).
> This JIRA is for adding such a metric for the sources as well.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-4812) Report Watermark metrics in all operators

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278795#comment-16278795
 ] 

ASF GitHub Bot commented on FLINK-4812:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5125#discussion_r154951890
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/CopyingDirectedOutput.java
 ---
@@ -36,7 +36,7 @@
@SuppressWarnings({"unchecked", "rawtypes"})
public CopyingDirectedOutput(
List outputSelectors,
-   List, StreamEdge>> 
outputs) {
+   List>, StreamEdge>> outputs) {
--- End diff --

nice  


> Report Watermark metrics in all operators
> -
>
> Key: FLINK-4812
> URL: https://issues.apache.org/jira/browse/FLINK-4812
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>Priority: Critical
> Fix For: 1.5.0
>
>
> As reported by a user, Flink currently does not export the current low 
> watermark for sources 
> (http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/currentLowWatermark-metric-not-reported-for-all-tasks-td13770.html).
> This JIRA is for adding such a metric for the sources as well.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-4812) Report Watermark metrics in all operators

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278794#comment-16278794
 ] 

ASF GitHub Bot commented on FLINK-4812:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5125#discussion_r154986979
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
 ---
@@ -135,4 +135,14 @@ OperatorSnapshotResult snapshotState(
MetricGroup getMetricGroup();
 
OperatorID getOperatorID();
+   
--- End diff --

If we keep this it should be marked `@Internal`.


> Report Watermark metrics in all operators
> -
>
> Key: FLINK-4812
> URL: https://issues.apache.org/jira/browse/FLINK-4812
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>Priority: Critical
> Fix For: 1.5.0
>
>
> As reported by a user, Flink currently does not export the current low 
> watermark for sources 
> (http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/currentLowWatermark-metric-not-reported-for-all-tasks-td13770.html).
> This JIRA is for adding such a metric for the sources as well.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5125: [WIP][FLINK-4812][metrics] Expose currentLowWaterm...

2017-12-05 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5125#discussion_r154951890
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/CopyingDirectedOutput.java
 ---
@@ -36,7 +36,7 @@
@SuppressWarnings({"unchecked", "rawtypes"})
public CopyingDirectedOutput(
List outputSelectors,
-   List, StreamEdge>> 
outputs) {
+   List>, StreamEdge>> outputs) {
--- End diff --

nice 😉 


---


[jira] [Commented] (FLINK-7880) flink-queryable-state-java fails with core-dump

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278791#comment-16278791
 ] 

ASF GitHub Bot commented on FLINK-7880:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5062#discussion_r154955042
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
 ---
@@ -1480,7 +1259,52 @@ public String fold(String accumulator, Tuple2 value) throws Exception
 
/   General Utility Methods 
//
 
-   private CompletableFuture 
notifyWhenJobStatusIs(
+   /**
+* A wrapper of the job graph that makes sure to cancell the job and 
wait for
--- End diff --

typo: cancell -> cancel


> flink-queryable-state-java fails with core-dump
> ---
>
> Key: FLINK-7880
> URL: https://issues.apache.org/jira/browse/FLINK-7880
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State, Tests
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Kostas Kloudas
>Priority: Critical
>  Labels: test-stability
>
> The {{flink-queryable-state-java}} module fails on Travis with a core dump.
> https://travis-ci.org/tillrohrmann/flink/jobs/289949829



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5125: [WIP][FLINK-4812][metrics] Expose currentLowWaterm...

2017-12-05 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5125#discussion_r154986979
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
 ---
@@ -135,4 +135,14 @@ OperatorSnapshotResult snapshotState(
MetricGroup getMetricGroup();
 
OperatorID getOperatorID();
+   
--- End diff --

If we keep this it should be marked `@Internal`.


---


[GitHub] flink pull request #5125: [WIP][FLINK-4812][metrics] Expose currentLowWaterm...

2017-12-05 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5125#discussion_r154986786
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
 ---
@@ -80,7 +91,10 @@ public void init() throws Exception {
this.headOperator);
 
// make sure that stream tasks report their I/O statistics
-   
inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup());
+   
inputProcessor.setupMetrics(getEnvironment().getMetricGroup().getIOMetricGroup(),
 input1WatermarkGauge, input2WatermarkGauge);
+
+   
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_1_WATERMARK, 
input1WatermarkGauge);
--- End diff --

Doesn't this override what the `OperatorChain` does according to the 
`WatermarkMetricPreference`?


---


[GitHub] flink pull request #5062: [FLINK-7880][QS] Wait for proper resource cleanup ...

2017-12-05 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5062#discussion_r154955042
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
 ---
@@ -1480,7 +1259,52 @@ public String fold(String accumulator, Tuple2 value) throws Exception
 
/   General Utility Methods 
//
 
-   private CompletableFuture 
notifyWhenJobStatusIs(
+   /**
+* A wrapper of the job graph that makes sure to cancell the job and 
wait for
--- End diff --

typo: cancell -> cancel


---


[jira] [Commented] (FLINK-8201) YarnResourceManagerTest causes license checking failure

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278779#comment-16278779
 ] 

ASF GitHub Bot commented on FLINK-8201:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5123
  
Ok, sounds good to me. @tillrohrmann could you please have a look at this? 
It's a very small change but I know too little about these parts to be sure 
that it doesn't cause other problems.


> YarnResourceManagerTest causes license checking failure
> ---
>
> Key: FLINK-8201
> URL: https://issues.apache.org/jira/browse/FLINK-8201
> Project: Flink
>  Issue Type: Bug
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
>
> YarnResourceManagerTest generates a temporary taskmanager config file in the 
> flink-yarn module root folder and never clears it, which makes license 
> checking fail when we run {{mvn clean verify}} multiple times in the same 
> source folder.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5123: [FLINK-8201] YarnResourceManagerTest causes license check...

2017-12-05 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5123
  
Ok, sounds good to me. @tillrohrmann could you please have a look at this? 
It's a very small change but I know too little about these parts to be sure 
that it doesn't cause other problems.


---


[jira] [Closed] (FLINK-8186) AvroInputFormat regression: fails to deserialize GenericRecords on standalone cluster with hadoop27 compat

2017-12-05 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-8186.
---
Resolution: Fixed

Fixed on release-1.4 in
612f07bbd2dc9aa1abf01ebf817c8341c1ad7a10

Fixed on master in 
f6e24ab60c3d434186d9cd9906c6562433617ff7

> AvroInputFormat regression: fails to deserialize GenericRecords on standalone 
> cluster with hadoop27 compat
> --
>
> Key: FLINK-8186
> URL: https://issues.apache.org/jira/browse/FLINK-8186
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.0
>Reporter: Sebastian Klemke
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0
>
> Attachments: GenericRecordCount.java, pom.xml
>
>
> The following job runs fine on a Flink 1.3.2 cluster, but fails on a Flink 
> 1.4.0 RC2 standalone cluster, "hadoop27" flavour:
> {code}
> public class GenericRecordCount {
> public static void main(String[] args) throws Exception {
> String input = ParameterTool.fromArgs(args).getRequired("input");
> ExecutionEnvironment env = 
> ExecutionEnvironment.getExecutionEnvironment();
> long count = env.readFile(new AvroInputFormat<>(new Path(input), 
> GenericRecord.class), input)
> .count();
> System.out.printf("Counted %d records\n", count);
> }
> }
> {code}
> Runs fine in LocalExecutionEnvironment and also on no-hadoop flavour 
> standalone cluster, though. Exception thrown in Flink 1.4.0 hadoop27:
> {code}
> 12/01/2017 13:22:09 DataSource (at 
> readFile(ExecutionEnvironment.java:514) 
> (org.apache.flink.formats.avro.AvroInputFormat))(4/4) switched to FAILED
> java.lang.RuntimeException: java.lang.NoSuchMethodException: 
> org.apache.avro.generic.GenericRecord.()
> at 
> org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:353)
> at 
> org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:369)
> at org.apache.avro.reflect.ReflectData.newRecord(ReflectData.java:901)
> at 
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:212)
> at 
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
> at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
> at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
> at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
> at 
> org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165)
> at 
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:167)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NoSuchMethodException: 
> org.apache.avro.generic.GenericRecord.()
> at java.lang.Class.getConstructor0(Class.java:3082)
> at java.lang.Class.getDeclaredConstructor(Class.java:2178)
> at 
> org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:347)
> ... 11 more
> {code}
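
(The root cause is visible in the last frames of the trace: Avro's {{SpecificData.newInstance}} reflectively looks up a no-argument constructor, and {{GenericRecord}} is an interface, so none exists. A minimal reproduction of that reflective failure, independent of Flink:)

{code}
import org.apache.avro.generic.GenericRecord;

public class NoSuchMethodDemo {
    public static void main(String[] args) throws Exception {
        // Throws java.lang.NoSuchMethodException: interfaces have no
        // constructors, which is exactly what SpecificData.newInstance hits.
        GenericRecord.class.getDeclaredConstructor();
    }
}
{code}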



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278745#comment-16278745
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4533
  
oh, you were faster...


> Implement Netty receiver outgoing pipeline for credit-based
> ---
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> The related works are:
> * We define a new message called {{AddCredit}} to notify the incremental 
> credit during data shuffle. 
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the 
> channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}} 
> and sends its unannounced credit. The credit is reset to zero after each send.
> * That way, messages are sent as often as the network has capacity and 
> contain as much credit as available for the channel at that point in time. 
> Otherwise, it would only add latency to the announcements and not increase 
> throughput.
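
(A rough sketch of the unannounced-credit bookkeeping described above; this is a simplified model, not the actual {{RemoteInputChannel}} code.)

{code}
import java.util.concurrent.atomic.AtomicInteger;

// Simplified per-channel credit counter.
class UnannouncedCredit {

    private final AtomicInteger credit = new AtomicInteger(0);

    // Returns true iff the credit went up from zero, i.e. the channel
    // must be (re-)enqueued in the pipeline.
    boolean increase(int delta) {
        return credit.getAndAdd(delta) == 0;
    }

    // Called when the channel becomes writable: take everything that has
    // accumulated and reset to zero, so a single AddCredit message carries
    // as much credit as is available at that point in time.
    int getAndReset() {
        return credit.getAndSet(0);
    }
}
{code}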



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4533: [FLINK-7416][network] Implement Netty receiver outgoing p...

2017-12-05 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4533
  
oh, you were faster...


---


[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278730#comment-16278730
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4533
  
ok, I fiddled with my remarks a bit and you are right about the problems with 
reducing the visibility, which is partly caused by the 
`CreditBasedClientHandler` not yet replacing `PartitionRequestClientHandler`.

I'll send you my suggestions offline (as I can't attach them here) and they 
should fix all my last remarks.


> Implement Netty receiver outgoing pipeline for credit-based
> ---
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> The related works are:
> * We define a new message called {{AddCredit}} to notify the incremental 
> credit during data shuffle. 
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the 
> channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}} 
> and sends its unannounced credit. The credit is reset to zero after each send.
> * That way, messages are sent as often as the network has capacity and 
> contain as much credit as available for the channel at that point in time. 
> Otherwise, it would only add latency to the announcements and not increase 
> throughput.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4533: [FLINK-7416][network] Implement Netty receiver outgoing p...

2017-12-05 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4533
  
ok, I fiddled with my remarks a bit and you are right about the problems with 
reducing the visibility, which is partly caused by the 
`CreditBasedClientHandler` not yet replacing `PartitionRequestClientHandler`.

I'll send you my suggestions offline (as I can't attach them here) and they 
should fix all my last remarks.


---


[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278721#comment-16278721
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4533
  
@NicoK @pnowojski, I have updated the code to address the above comments.


> Implement Netty receiver outgoing pipeline for credit-based
> ---
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> The related works are:
> * We define a new message called {{AddCredit}} to notify the incremental 
> credit during data shuffle. 
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the 
> channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}} 
> and sends its unannounced credit. The credit is reset to zero after each send.
> * That way, messages are sent as often as the network has capacity and 
> contain as much credit as available for the channel at that point in time. 
> Otherwise, it would only add latency to the announcements and not increase 
> throughput.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4533: [FLINK-7416][network] Implement Netty receiver outgoing p...

2017-12-05 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4533
  
@NicoK @pnowojski, I have updated the code to address the above comments.


---


[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278717#comment-16278717
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r154977986
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
 ---
@@ -244,54 +237,89 @@ public void testCancelBeforeActive() throws Exception 
{
}
 
/**
-* Verifies that {@link RemoteInputChannel} is enqueued in the 
pipeline, and
-* {@link AddCredit} message is sent to the producer.
+* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline 
for notifying credits,
+* and verifies the behaviour of credit notification by triggering 
channel's writability changed.
 */
@Test
public void testNotifyCreditAvailable() throws Exception {
+   final SingleInputGate inputGate = createSingleInputGate();
+   final RemoteInputChannel inputChannel1 = 
spy(createRemoteInputChannel(inputGate));
+   final RemoteInputChannel inputChannel2 = 
spy(createRemoteInputChannel(inputGate));
final CreditBasedClientHandler handler = new 
CreditBasedClientHandler();
-   final EmbeddedChannel channel = new EmbeddedChannel(handler);
+   final EmbeddedChannel channel = spy(new 
EmbeddedChannel(handler));
--- End diff --

agree


> Implement Netty receiver outgoing pipeline for credit-based
> ---
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> The related works are:
> * We define a new message called {{AddCredit}} to notify the incremental 
> credit during data shuffle. 
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the 
> channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}} 
> and sends its unannounced credit. The credit is reset to zero after each send.
> * That way, messages are sent as often as the network has capacity and 
> contain as much credit as available for the channel at that point in time. 
> Otherwise, it would only add latency to the announcements and not increase 
> throughput.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...

2017-12-05 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r154977986
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
 ---
@@ -244,54 +237,89 @@ public void testCancelBeforeActive() throws Exception 
{
}
 
/**
-* Verifies that {@link RemoteInputChannel} is enqueued in the 
pipeline, and
-* {@link AddCredit} message is sent to the producer.
+* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline 
for notifying credits,
+* and verifies the behaviour of credit notification by triggering 
channel's writability changed.
 */
@Test
public void testNotifyCreditAvailable() throws Exception {
+   final SingleInputGate inputGate = createSingleInputGate();
+   final RemoteInputChannel inputChannel1 = 
spy(createRemoteInputChannel(inputGate));
+   final RemoteInputChannel inputChannel2 = 
spy(createRemoteInputChannel(inputGate));
final CreditBasedClientHandler handler = new 
CreditBasedClientHandler();
-   final EmbeddedChannel channel = new EmbeddedChannel(handler);
+   final EmbeddedChannel channel = spy(new 
EmbeddedChannel(handler));
--- End diff --

agree


---


[jira] [Commented] (FLINK-8200) RocksDBAsyncSnapshotTest should use temp folder instead of folder with fixed name

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278715#comment-16278715
 ] 

ASF GitHub Bot commented on FLINK-8200:
---

Github user wenlong88 commented on the issue:

https://github.com/apache/flink/pull/5122
  
@StephanEwen Thanks for reviewing, I have updated the PR according to your 
comment.


> RocksDBAsyncSnapshotTest should use temp folder instead of folder with fixed name
> -
>
> Key: FLINK-8200
> URL: https://issues.apache.org/jira/browse/FLINK-8200
> Project: Flink
>  Issue Type: Bug
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
>
> The following case failed when a different user ran the test on the same 
> machine.
> Tests run: 4, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 3.226 sec <<< 
> FAILURE! - in 
> org.apache.flink.contrib.streaming.state.RocksDBAsyncSnapshotTest
> testCleanupOfSnapshotsInFailureCase(org.apache.flink.contrib.streaming.state.RocksDBAsyncSnapshotTest)
>   Time elapsed: 0.023 sec  <<< ERROR!
> java.io.IOException: No local storage directories available. Local DB files 
> directory 'file:/tmp/foobar' does not exist and cannot be created.
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.lazyInitializeForJob(RocksDBStateBackend.java:251)
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:300)
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBAsyncSnapshotTest.testCleanupOfSnapshotsInFailureCase(RocksDBAsyncSnapshotTest.java:338)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5122: [FLINK-8200] RocksDBAsyncSnapshotTest should use temp fol...

2017-12-05 Thread wenlong88
Github user wenlong88 commented on the issue:

https://github.com/apache/flink/pull/5122
  
@StephanEwen Thanks for reviewing, I have updated the PR according to your 
comment.


---


[jira] [Commented] (FLINK-8201) YarnResourceManagerTest causes license checking failure

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278711#comment-16278711
 ] 

ASF GitHub Bot commented on FLINK-8201:
---

Github user wenlong88 commented on the issue:

https://github.com/apache/flink/pull/5123
  
@aljoscha yes, the task manager config file is created in the $PWD dir, and 
files created in a temporary folder will be cleared automatically by JUnit.
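
(For illustration, the standard JUnit pattern for such files; a generic sketch, not the PR's actual change.)

{code}
import java.io.File;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class ConfigFileTest {

    // JUnit creates the folder before each test and deletes it together
    // with its contents afterwards, so no stray files survive repeated
    // `mvn clean verify` runs.
    @Rule
    public final TemporaryFolder temporaryFolder = new TemporaryFolder();

    @Test
    public void testWritesConfigIntoTempFolder() throws Exception {
        File config = temporaryFolder.newFile("taskmanager-conf.yaml");
        // ... write the config and run the test against it ...
    }
}
{code}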


> YarnResourceManagerTest causes license checking failure
> ---
>
> Key: FLINK-8201
> URL: https://issues.apache.org/jira/browse/FLINK-8201
> Project: Flink
>  Issue Type: Bug
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
>
> YarnResourceManagerTest generates a temporary taskmanager config file in the 
> flink-yarn module root folder and never clears it, which makes license 
> checking fail when we run {{mvn clean verify}} multiple times in the same 
> source folder.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5123: [FLINK-8201] YarnResourceManagerTest causes license check...

2017-12-05 Thread wenlong88
Github user wenlong88 commented on the issue:

https://github.com/apache/flink/pull/5123
  
@aljoscha yes, the task manager config file is created in the $PWD dir, and 
files created in a temporary folder will be cleared automatically by JUnit.


---


[jira] [Commented] (FLINK-8139) Check for proper equals() and hashCode() when registering a table

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278686#comment-16278686
 ] 

ASF GitHub Bot commented on FLINK-8139:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5065#discussion_r154961218
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
 ---
@@ -98,7 +98,9 @@ abstract class BatchTableEnvironment(
 
 tableSource match {
   case batchTableSource: BatchTableSource[_] =>
-registerTableInternal(name, new 
BatchTableSourceTable(batchTableSource))
+val table = new BatchTableSourceTable(batchTableSource)
+checkValidTableSourceType(tableSource)
+registerTableInternal(name, table)
--- End diff --

Couldn't we just add the check in `registerTableInternal`?


> Check for proper equals() and hashCode() when registering a table
> -
>
> Key: FLINK-8139
> URL: https://issues.apache.org/jira/browse/FLINK-8139
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Aegeaner
>
> In the current Table API & SQL implementation we compare {{Row}}s at 
> different positions. E.g., for joining we test rows for equality or put them 
> into state. A heap state backend requires proper hashCode() and equals() in 
> order to work correctly. Thus, every type in the Table API needs to have these 
> methods implemented.
> We need to check via reflection that all fields of a row implement methods 
> that differ from {{Object.equals()}} and {{Object.hashCode()}}, both coming 
> from TableSource and DataStream/DataSet.
> Additionally, for array types, the {{Row}} class should use 
> {{Arrays.deepEquals()}} and {{Arrays.deepHashCode()}} instead of the non-deep 
> variants.
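
(A minimal sketch of such a reflection check; the helper name is made up, and real code would also need to recurse into array component types.)

{code}
// Illustrative check: a class provides its own equals()/hashCode() if the
// declaring class of those methods is not java.lang.Object itself.
public final class EqualityCheck {

    public static boolean overridesEqualsAndHashCode(Class<?> clazz) {
        try {
            return clazz.getMethod("equals", Object.class).getDeclaringClass() != Object.class
                && clazz.getMethod("hashCode").getDeclaringClass() != Object.class;
        } catch (NoSuchMethodException e) {
            // Unreachable: both methods are declared on Object.
            throw new AssertionError(e);
        }
    }
}
{code}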



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8139) Check for proper equals() and hashCode() when registering a table

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278688#comment-16278688
 ] 

ASF GitHub Bot commented on FLINK-8139:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5065#discussion_r154965051
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -721,6 +722,42 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 */
   protected def checkValidTableName(name: String): Unit
 
+  /**
+* Checks if the chosen table type is valid.
+* @param table The table to check
+*/
+  protected def checkValidTableType(table: Table): Unit = {
+val types = table.getSchema.getTypes
+checkTypeArray(types)
+  }
+
+  private def checkTypeArray(types: Array[TypeInformation[_]]) = {
+for (typeInfo <- types) {
+  if(!typeInfo.asInstanceOf[TypeInformation[_]].isBasicType &&
+!typeInfo.isInstanceOf[PrimitiveArrayTypeInfo[_]]) {
--- End diff --

Also add `BasicArrayTypeInfo` and `ObjectArrayTypeInfo`. For 
`ObjectArrayTypeInfo` we need to check the component types as well.


> Check for proper equals() and hashCode() when registering a table
> -
>
> Key: FLINK-8139
> URL: https://issues.apache.org/jira/browse/FLINK-8139
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Aegeaner
>
> In the current Table API & SQL implementation we compare {{Row}}s at 
> different positions. E.g., for joining we test rows for equality or put them 
> into state. A heap state backend requires proper hashCode() and equals() in 
> order to work correctly. Thus, every type in the Table API needs to have these 
> methods implemented.
> We need to check via reflection that all fields of a row implement methods 
> that differ from {{Object.equals()}} and {{Object.hashCode()}}, both coming 
> from TableSource and DataStream/DataSet.
> Additionally, for array types, the {{Row}} class should use 
> {{Arrays.deepEquals()}} and {{Arrays.deepHashCode()}} instead of the non-deep 
> variants.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8139) Check for proper equals() and hashCode() when registering a table

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278687#comment-16278687
 ] 

ASF GitHub Bot commented on FLINK-8139:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5065#discussion_r154967459
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -411,6 +411,7 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 }
 
 checkValidTableName(name)
+checkValidTableType(table)
--- End diff --

I would call this method `validateInputTypes` and rename 
`org.apache.flink.table.api.TableEnvironment#validateType` to 
`validateOutputTypes`.


> Check for proper equals() and hashCode() when registering a table
> -
>
> Key: FLINK-8139
> URL: https://issues.apache.org/jira/browse/FLINK-8139
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Aegeaner
>
> In the current Table API & SQL implementation we compare {{Row}}s at 
> different positions. E.g., for joining we test rows for equality or put them 
> into state. A heap state backend requires proper hashCode() and equals() in 
> order to work correctly. Thus, every type in the Table API needs to have these 
> methods implemented.
> We need to check via reflection that all fields of a row implement methods 
> that differ from {{Object.equals()}} and {{Object.hashCode()}}, both coming 
> from TableSource and DataStream/DataSet.
> Additionally, for array types, the {{Row}} class should use 
> {{Arrays.deepEquals()}} and {{Arrays.deepHashCode()}} instead of the non-deep 
> variants.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278684#comment-16278684
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r154972475
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
 ---
@@ -244,54 +237,89 @@ public void testCancelBeforeActive() throws Exception 
{
}
 
/**
-* Verifies that {@link RemoteInputChannel} is enqueued in the 
pipeline, and
-* {@link AddCredit} message is sent to the producer.
+* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline 
for notifying credits,
+* and verifies the behaviour of credit notification by triggering 
channel's writability changed.
 */
@Test
public void testNotifyCreditAvailable() throws Exception {
+   final SingleInputGate inputGate = createSingleInputGate();
+   final RemoteInputChannel inputChannel1 = 
spy(createRemoteInputChannel(inputGate));
+   final RemoteInputChannel inputChannel2 = 
spy(createRemoteInputChannel(inputGate));
final CreditBasedClientHandler handler = new 
CreditBasedClientHandler();
-   final EmbeddedChannel channel = new EmbeddedChannel(handler);
+   final EmbeddedChannel channel = spy(new 
EmbeddedChannel(handler));
 
-   final RemoteInputChannel inputChannel = 
createRemoteInputChannel(mock(SingleInputGate.class));
+   // Increase the credits to enqueue the input channels
+   inputChannel1.increaseCredit(1);
+   inputChannel2.increaseCredit(1);
+   handler.notifyCreditAvailable(inputChannel1);
+   handler.notifyCreditAvailable(inputChannel2);
 
-   // Enqueue the input channel
-   handler.notifyCreditAvailable(inputChannel);
+   channel.runPendingTasks();
+
+   // The two input channels should notify credits via writable 
channel
+   assertTrue(channel.isWritable());
+   assertEquals(channel.readOutbound().getClass(), 
AddCredit.class);
+   verify(inputChannel1, times(1)).getAndResetCredit();
+   verify(inputChannel2, times(1)).getAndResetCredit();
--- End diff --

agree


> Implement Netty receiver outgoing pipeline for credit-based
> ---
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> The related works are:
> * We define a new message called {{AddCredit}} to notify the incremental 
> credit during data shuffle. 
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the 
> channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}} 
> and sends its unannounced credit. The credit is reset to zero after each send.
> * That way, messages are sent as often as the network has capacity and 
> contain as much credit as available for the channel at that point in time. 
> Otherwise, it would only add latency to the announcements and not increase 
> throughput.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8139) Check for proper equals() and hashCode() when registering a table

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278689#comment-16278689
 ] 

ASF GitHub Bot commented on FLINK-8139:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5065#discussion_r154968022
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -721,6 +722,42 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 */
   protected def checkValidTableName(name: String): Unit
 
+  /**
+* Checks if the chosen table type is valid.
+* @param table The table to check
+*/
+  protected def checkValidTableType(table: Table): Unit = {
--- End diff --

Please merge the following three methods into one and move it to companion 
object.


> Check for proper equals() and hashCode() when registering a table
> -
>
> Key: FLINK-8139
> URL: https://issues.apache.org/jira/browse/FLINK-8139
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Aegeaner
>
> In the current Table API & SQL implementation we compare {{Row}}s at 
> different positions. E.g., for joining we test rows for equality or put them 
> into state. A heap state backend requires proper hashCode() and equals() in 
> order to work correctly. Thus, every type in the Table API needs to have these 
> methods implemented.
> We need to check via reflection that all fields of a row implement methods 
> that differ from {{Object.equals()}} and {{Object.hashCode()}}, both coming 
> from TableSource and DataStream/DataSet.
> Additionally, for array types, the {{Row}} class should use 
> {{Arrays.deepEquals()}} and {{Arrays.deepHashCode()}} instead of the non-deep 
> variants.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8139) Check for proper equals() and hashCode() when registering a table

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278685#comment-16278685
 ] 

ASF GitHub Bot commented on FLINK-8139:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5065#discussion_r154964510
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -721,6 +722,42 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 */
   protected def checkValidTableName(name: String): Unit
 
+  /**
+* Checks if the chosen table type is valid.
+* @param table The table to check
+*/
+  protected def checkValidTableType(table: Table): Unit = {
+val types = table.getSchema.getTypes
+checkTypeArray(types)
+  }
+
+  private def checkTypeArray(types: Array[TypeInformation[_]]) = {
+for (typeInfo <- types) {
+  if(!typeInfo.asInstanceOf[TypeInformation[_]].isBasicType &&
--- End diff --

Missing space.


> Check for proper equals() and hashCode() when registering a table
> -
>
> Key: FLINK-8139
> URL: https://issues.apache.org/jira/browse/FLINK-8139
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Aegeaner
>
> In the current Table API & SQL implementation we compare {{Row}}s at 
> different positions. E.g., for joining we test rows for equality or put them 
> into state. A heap state backend requires proper hashCode() and equals() in 
> order to work correctly. Thus, every type in the Table API needs to have these 
> methods implemented.
> We need to check via reflection that all fields of a row implement methods 
> that differ from {{Object.equals()}} and {{Object.hashCode()}}, both coming 
> from TableSource and DataStream/DataSet.
> Additionally, for array types, the {{Row}} class should use 
> {{Arrays.deepEquals()}} and {{Arrays.deepHashCode()}} instead of the non-deep 
> variants.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5065: [FLINK-8139][table] Check for proper equals() and ...

2017-12-05 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5065#discussion_r154965051
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -721,6 +722,42 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 */
   protected def checkValidTableName(name: String): Unit
 
+  /**
+* Checks if the chosen table type is valid.
+* @param table The table to check
+*/
+  protected def checkValidTableType(table: Table): Unit = {
+val types = table.getSchema.getTypes
+checkTypeArray(types)
+  }
+
+  private def checkTypeArray(types: Array[TypeInformation[_]]) = {
+for (typeInfo <- types) {
+  if(!typeInfo.asInstanceOf[TypeInformation[_]].isBasicType &&
+!typeInfo.isInstanceOf[PrimitiveArrayTypeInfo[_]]) {
--- End diff --

Also add `BasicArrayTypeInfo` and `ObjectArrayTypeInfo`. For 
`ObjectArrayTypeInfo` we need to check the component types as well.


---


[GitHub] flink pull request #5065: [FLINK-8139][table] Check for proper equals() and ...

2017-12-05 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5065#discussion_r154964510
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -721,6 +722,42 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 */
   protected def checkValidTableName(name: String): Unit
 
+  /**
+* Checks if the chosen table type is valid.
+* @param table The table to check
+*/
+  protected def checkValidTableType(table: Table): Unit = {
+val types = table.getSchema.getTypes
+checkTypeArray(types)
+  }
+
+  private def checkTypeArray(types: Array[TypeInformation[_]]) = {
+for (typeInfo <- types) {
+  if(!typeInfo.asInstanceOf[TypeInformation[_]].isBasicType &&
--- End diff --

Missing space.


---


[GitHub] flink pull request #5065: [FLINK-8139][table] Check for proper equals() and ...

2017-12-05 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5065#discussion_r154961218
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
 ---
@@ -98,7 +98,9 @@ abstract class BatchTableEnvironment(
 
 tableSource match {
   case batchTableSource: BatchTableSource[_] =>
-registerTableInternal(name, new 
BatchTableSourceTable(batchTableSource))
+val table = new BatchTableSourceTable(batchTableSource)
+checkValidTableSourceType(tableSource)
+registerTableInternal(name, table)
--- End diff --

Couldn't we just add the check in `registerTableInternal`?


---


[GitHub] flink pull request #5065: [FLINK-8139][table] Check for proper equals() and ...

2017-12-05 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5065#discussion_r154967459
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -411,6 +411,7 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 }
 
 checkValidTableName(name)
+checkValidTableType(table)
--- End diff --

I would call this method `validateInputTypes` and rename 
`org.apache.flink.table.api.TableEnvironment#validateType` to 
`validateOutputTypes`.


---


[GitHub] flink pull request #5065: [FLINK-8139][table] Check for proper equals() and ...

2017-12-05 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5065#discussion_r154968022
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -721,6 +722,42 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 */
   protected def checkValidTableName(name: String): Unit
 
+  /**
+* Checks if the chosen table type is valid.
+* @param table The table to check
+*/
+  protected def checkValidTableType(table: Table): Unit = {
--- End diff --

Please merge the following three methods into one and move it to companion 
object.


---


[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...

2017-12-05 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r154972475
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
 ---
@@ -244,54 +237,89 @@ public void testCancelBeforeActive() throws Exception 
{
}
 
/**
-* Verifies that {@link RemoteInputChannel} is enqueued in the 
pipeline, and
-* {@link AddCredit} message is sent to the producer.
+* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline 
for notifying credits,
+* and verifies the behaviour of credit notification by triggering 
channel's writability changed.
 */
@Test
public void testNotifyCreditAvailable() throws Exception {
+   final SingleInputGate inputGate = createSingleInputGate();
+   final RemoteInputChannel inputChannel1 = 
spy(createRemoteInputChannel(inputGate));
+   final RemoteInputChannel inputChannel2 = 
spy(createRemoteInputChannel(inputGate));
final CreditBasedClientHandler handler = new 
CreditBasedClientHandler();
-   final EmbeddedChannel channel = new EmbeddedChannel(handler);
+   final EmbeddedChannel channel = spy(new 
EmbeddedChannel(handler));
 
-   final RemoteInputChannel inputChannel = 
createRemoteInputChannel(mock(SingleInputGate.class));
+   // Increase the credits to enqueue the input channels
+   inputChannel1.increaseCredit(1);
+   inputChannel2.increaseCredit(1);
+   handler.notifyCreditAvailable(inputChannel1);
+   handler.notifyCreditAvailable(inputChannel2);
 
-   // Enqueue the input channel
-   handler.notifyCreditAvailable(inputChannel);
+   channel.runPendingTasks();
+
+   // The two input channels should notify credits via writable 
channel
+   assertTrue(channel.isWritable());
+   assertEquals(channel.readOutbound().getClass(), 
AddCredit.class);
+   verify(inputChannel1, times(1)).getAndResetCredit();
+   verify(inputChannel2, times(1)).getAndResetCredit();
--- End diff --

agree


---


[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278681#comment-16278681
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r154972024
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -280,94 +280,120 @@ public String toString() {
// 

 
/**
-* Enqueue this input channel in the pipeline for sending unannounced 
credits to producer.
+* Enqueue this input channel in the pipeline for notifying the 
producer of unannounced credit.
 */
void notifyCreditAvailable() {
-   //TODO in next PR
+   checkState(partitionRequestClient != null, "Tried to send task 
event to producer before requesting a queue.");
+
+   // We should skip the notification if this channel is already 
released.
+   if (!isReleased.get()) {
+   partitionRequestClient.notifyCreditAvailable(this);
+   }
}
 
/**
-* Exclusive buffer is recycled to this input channel directly and it 
may trigger notify
-* credit to producer.
+* Exclusive buffer is recycled to this input channel directly and it 
may trigger return extra
+* floating buffer and notify increased credit to the producer.
 *
 * @param segment The exclusive segment of this channel.
 */
@Override
public void recycle(MemorySegment segment) {
-   synchronized (availableBuffers) {
-   // Important: the isReleased check should be inside the 
synchronized block.
-   // that way the segment can also be returned to global 
pool after added into
-   // the available queue during releasing all resources.
+   int numAddedBuffers;
+
+   synchronized (bufferQueue) {
+   // Important: check the isReleased state inside 
synchronized block, so there is no
+   // race condition when recycle and releaseAllResources 
running in parallel.
if (isReleased.get()) {
try {
-   
inputGate.returnExclusiveSegments(Arrays.asList(segment));
+   
inputGate.returnExclusiveSegments(Collections.singletonList(segment));
return;
} catch (Throwable t) {
ExceptionUtils.rethrow(t);
}
}
-   availableBuffers.add(new Buffer(segment, this));
+   numAddedBuffers = bufferQueue.addExclusiveBuffer(new 
Buffer(segment, this), numRequiredBuffers);
}
 
-   if (unannouncedCredit.getAndAdd(1) == 0) {
+   if (numAddedBuffers > 0 && unannouncedCredit.getAndAdd(1) == 0) 
{
notifyCreditAvailable();
}
}
 
public int getNumberOfAvailableBuffers() {
-   synchronized (availableBuffers) {
-   return availableBuffers.size();
+   synchronized (bufferQueue) {
+   return bufferQueue.getAvailableBufferSize();
}
}
 
+   @VisibleForTesting
+   public int getNumberOfRequiredBuffers() {
--- End diff --

yes


> Implement Netty receiver outgoing pipeline for credit-based
> ---
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> The related works are:
> * We define a new message called {{AddCredit}} to notify the incremental 
> credit during data shuffle. 
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the 
> channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}} 
> and sends its unannounced credit. The credit is reset to zero after each send.
> * That way, messages are sent as often as the network has capacity and 
> contain as much credit as available for the channel at that point in time. 
> Otherwise, it would only add latency to the announcements and not increase 
> throughput.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278677#comment-16278677
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r154971179
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -223,11 +229,13 @@ void notifySubpartitionConsumed() {
/**
 * Releases all exclusive and floating buffers, closes the partition 
request client.
 */
+   @VisibleForTesting
@Override
-   void releaseAllResources() throws IOException {
+   public void releaseAllResources() throws IOException {
--- End diff --

yes


> Implement Netty receiver outgoing pipeline for credit-based
> ---
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is part of the work on credit-based network flow control.
> The related changes are:
> * We define a new message called {{AddCredit}} to announce incremental
> credit during data shuffle.
> * Whenever an {{InputChannel}}'s unannounced credit goes up from zero, the
> channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}}
> and sends its unannounced credit. The credit is reset to zero after each send.
> * That way, messages are sent as often as the network has capacity and
> contain as much credit as is available for the channel at that point in time.
> Otherwise, we would only add latency to the announcements without increasing
> throughput.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)



[jira] [Commented] (FLINK-7692) Support user-defined variables in Metrics

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278664#comment-16278664
 ] 

ASF GitHub Bot commented on FLINK-7692:
---

Github user tony810430 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5115#discussion_r154968275
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
 ---
@@ -441,14 +441,14 @@ private MetricGroup addGroup(String name, boolean 
keyed) {
}
else {
// return a non-registered group that is 
immediately closed already
-   GenericMetricGroup closedGroup = 
createChildGroup(name);
+   GenericMetricGroup closedGroup = 
createChildGroupInKeyedContext(name);
--- End diff --

Have addressed it.


> Support user-defined variables in Metrics
> -
>
> Key: FLINK-7692
> URL: https://issues.apache.org/jira/browse/FLINK-7692
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Wei-Che Wei
>Priority: Minor
> Fix For: 1.5.0
>
>
> Reporters that identify metrics with a set of key-value pairs are currently 
> limited to the variables defined by Flink, like the taskmanager ID, with 
> users not being able to supply their own.
> This is inconsistent with reporters that use metric identifiers, which freely
> include user-defined groups constructed via {{MetricGroup#addGroup(String
> name)}}.
> I propose adding a new method {{MetricGroup#addGroup(String key, String
> name)}} that adds a new key-value pair to the {{variables}} map in its
> constructor. When constructing the metric identifier the key should be
> included as well, yielding the same result as constructing the metric
> group tree via {{group.addGroup(key).addGroup(value)}}.
> For this, a new {{KeyedGenericMetricGroup}} should be created that resembles
> the unkeyed version, with slight modifications to the constructor and the
> {{getScopeComponents}} method.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

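A rough sketch of the proposed mechanics (hypothetical standalone code, not the actual {{AbstractMetricGroup}}); it assumes Flink's angle-bracket convention for variable keys such as {{<host>}}:

{code}
import java.util.HashMap;
import java.util.Map;

// Hedged sketch: a keyed child group copies its parent's variables map and
// adds the user-supplied key-value pair.
class KeyedGroupSketch {
    static Map<String, String> childVariables(Map<String, String> parentVars,
                                              String key, String value) {
        Map<String, String> vars = new HashMap<>(parentVars);
        vars.put("<" + key + ">", value); // assumed "<variable>" key convention
        return vars;
    }

    public static void main(String[] args) {
        Map<String, String> parent = new HashMap<>();
        parent.put("<host>", "tm-1");
        // prints {<host>=tm-1, <datacenter>=eu-west-1} (map order may vary)
        System.out.println(childVariables(parent, "datacenter", "eu-west-1"));
    }
}
{code}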


[jira] [Commented] (FLINK-8198) Useless check against -1 in LimitedConnectionsFileSystem#fromConfig

2017-12-05 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278663#comment-16278663
 ] 

Ted Yu commented on FLINK-8198:
---

Thanks for the fix, Stephan.

> Useless check against -1 in LimitedConnectionsFileSystem#fromConfig
> ---
>
> Key: FLINK-8198
> URL: https://issues.apache.org/jira/browse/FLINK-8198
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: Stephan Ewen
>Priority: Minor
> Fix For: 1.4.0, 1.5.0
>
>
> {code}
> return new ConnectionLimitingSettings(
> totalLimit == -1 ? 0 : totalLimit,
> limitIn == -1 ? 0 : limitIn,
> limitOut == -1 ? 0 : limitOut,
> openTimeout,
> {code}
> If any of the 3 variables is negative, control would have returned in the if 
> block above.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

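To make the dead-code argument concrete, a hedged, condensed sketch (hypothetical names, not the actual {{LimitedConnectionsFileSystem#fromConfig}}):

{code}
// Condensed sketch of the control flow described above: a preceding guard
// handles negative values, so a later "== -1" ternary can never fire.
class DeadCheckSketch {
    static int normalizeLimit(int limit) {
        if (limit < 0) {
            return 0; // control leaves here for any negative value, including -1
        }
        return limit == -1 ? 0 : limit; // dead branch: limit >= 0 at this point
    }
}
{code}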

[GitHub] flink pull request #4911: [FLINK-7878] [api] make resource type extendible i...

2017-12-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4911#discussion_r154965613
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
 ---
@@ -183,17 +240,124 @@ public int hashCode() {
result = 31 * result + directMemoryInMB;
result = 31 * result + nativeMemoryInMB;
result = 31 * result + stateSizeInMB;
+   result = 31 * result + extendedResources.hashCode();
return result;
}
 
@Override
public String toString() {
+   String extend = "";
+   for (Resource resource : extendedResources.values()) {
+   extend += ", " + resource.name + "=" + resource.value;
+   }
return "ResourceSpec{" +
"cpuCores=" + cpuCores +
", heapMemoryInMB=" + heapMemoryInMB +
", directMemoryInMB=" + directMemoryInMB +
", nativeMemoryInMB=" + nativeMemoryInMB +
-   ", stateSizeInMB=" + stateSizeInMB +
+   ", stateSizeInMB=" + stateSizeInMB + extend +
'}';
}
+
+   public static abstract class Resource implements Serializable {
+   final private String name;
+
+   final private Double value;
+
+   final private ResourceAggregateType type;
+
+   public Resource(String name, double value, 
ResourceAggregateType type) {
+   this.name = checkNotNull(name);
+   this.value = Double.valueOf(value);
+   this.type = checkNotNull(type);
+   }
+
+   Resource merge(Resource other) {
+   Preconditions.checkArgument(getClass() == 
other.getClass(), "Merge with different resource type");
+   
Preconditions.checkArgument(this.name.equals(other.name), "Merge with different 
resource name");
+   
Preconditions.checkArgument(this.type.equals(other.type), "Merge with different 
aggregate type");
+
+   Double value = null;
+   switch (type) {
+   case AGGREGATE_TYPE_MAX :
+   value = 
other.value.compareTo(this.value) > 0 ? other.value : this.value;
+   break;
+
+   case AGGREGATE_TYPE_SUM:
+   default:
+   value = this.value + other.value;
+   }
+
+   Resource resource = create(value, type);
+   return resource;
+   }
+
+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   } else if (o != null && getClass() == o.getClass()) {
+   Resource other = (Resource) o;
+
+   return name.equals(other.name) && 
type.equals(other.type) && value.equals(other.value);
+   } else {
+   return false;
+   }
+   }
+
+   @Override
+   public int hashCode() {
+   int result = name != null ? name.hashCode() : 0;
+   result = 31 * result + type.ordinal();
+   result = 31 * result + value.hashCode();
+   return result;
+   }
+
+   /**
+* create a resource of the same resource type
+*
+* @param value the value of the resource
+* @param type the aggregate type of the resource
+* @return a new instance of the sub resource
+*/
+   protected abstract Resource create(double value, 
ResourceAggregateType type);
+   }
+
+   /**
+* The GPU resource.
+*/
+   public static class GPUResource extends Resource {
+
+   public GPUResource(double value) {
+   this(value, ResourceAggregateType.AGGREGATE_TYPE_SUM);
+   }
+
+   public GPUResource(double value, ResourceAggregateType type) {
+   super("GPU", value, type);
+   }
+
+   @Override
+   public Resource create(double value, ResourceAggregateType 
type) {
+   return new GPUResource(value, type);
+   }
+   }
+
+   /**
+* The FPGA resource.
+*/
+   public static class FPGAResource extends Resource {

[jira] [Commented] (FLINK-7878) Extend the resource type user can define in ResourceSpec

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278654#comment-16278654
 ] 

ASF GitHub Bot commented on FLINK-7878:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4911#discussion_r154965118
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
 ---
@@ -37,11 +44,24 @@
  * Direct Memory Size
  * Native Memory Size
  * State Size
+ * Extended resources
  * 
  */
 @Internal
 public class ResourceSpec implements Serializable {
 
+   public enum ResourceAggregateType {
+   /**
+* Denotes keeping the sum of the values with same name when 
merging two resource specs for operator chaining
+*/
+   AGGREGATE_TYPE_SUM,
+
+   /**
+* Denotes keeping the max of the values with same name when 
merging two resource specs for operator chaining
+*/
+   AGGREGATE_TYPE_MAX
+   }
--- End diff --

Let's move this enum to `Resource`


> Extend the resource type user can define in ResourceSpec
> 
>
> Key: FLINK-7878
> URL: https://issues.apache.org/jira/browse/FLINK-7878
> Project: Flink
>  Issue Type: Improvement
>  Components: DataSet API, DataStream API
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> Now Flink only supports letting users define how much CPU and memory an
> operator uses, but the resources in a cluster are becoming more varied. For
> example, an application for image processing may need GPUs, while others may
> need FPGAs.
> CPU and memory alone are not enough, and the number of resource types keeps
> growing, so we need to make the ResourceSpec extensible.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

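The two aggregate types quoted in the diff decide how an extended resource combines when the specs of chained operators are merged. A standalone sketch of that semantics (illustrative only, not the {{ResourceSpec}} source):

{code}
// Illustrative only: SUM adds chained operators' demands, MAX keeps the larger.
class MergeSketch {
    enum AggregateType { SUM, MAX }

    static double merge(double a, double b, AggregateType type) {
        return type == AggregateType.MAX ? Math.max(a, b) : a + b;
    }

    public static void main(String[] args) {
        System.out.println(merge(1.0, 2.0, AggregateType.SUM)); // 3.0: demands add up
        System.out.println(merge(1.0, 2.0, AggregateType.MAX)); // 2.0: the larger is kept
    }
}
{code}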

[GitHub] flink pull request #4911: [FLINK-7878] [api] make resource type extendible i...

2017-12-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4911#discussion_r154965306
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
 ---
@@ -183,17 +240,124 @@ public int hashCode() {
result = 31 * result + directMemoryInMB;
result = 31 * result + nativeMemoryInMB;
result = 31 * result + stateSizeInMB;
+   result = 31 * result + extendedResources.hashCode();
return result;
}
 
@Override
public String toString() {
+   String extend = "";
+   for (Resource resource : extendedResources.values()) {
+   extend += ", " + resource.name + "=" + resource.value;
+   }
return "ResourceSpec{" +
"cpuCores=" + cpuCores +
", heapMemoryInMB=" + heapMemoryInMB +
", directMemoryInMB=" + directMemoryInMB +
", nativeMemoryInMB=" + nativeMemoryInMB +
-   ", stateSizeInMB=" + stateSizeInMB +
+   ", stateSizeInMB=" + stateSizeInMB + extend +
'}';
}
+
+   public static abstract class Resource implements Serializable {
+   final private String name;
+
+   final private Double value;
--- End diff --

Wrong order of keywords: `private final`


---


[GitHub] flink pull request #4911: [FLINK-7878] [api] make resource type extendible i...

2017-12-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4911#discussion_r154499927
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
 ---
@@ -183,17 +240,124 @@ public int hashCode() {
result = 31 * result + directMemoryInMB;
result = 31 * result + nativeMemoryInMB;
result = 31 * result + stateSizeInMB;
+   result = 31 * result + extendedResources.hashCode();
return result;
}
 
@Override
public String toString() {
+   String extend = "";
+   for (Resource resource : extendedResources.values()) {
+   extend += ", " + resource.name + "=" + resource.value;
+   }
return "ResourceSpec{" +
"cpuCores=" + cpuCores +
", heapMemoryInMB=" + heapMemoryInMB +
", directMemoryInMB=" + directMemoryInMB +
", nativeMemoryInMB=" + nativeMemoryInMB +
-   ", stateSizeInMB=" + stateSizeInMB +
+   ", stateSizeInMB=" + stateSizeInMB + extend +
'}';
}
+
+   public static abstract class Resource implements Serializable {
+   final private String name;
+
+   final private Double value;
+
+   final private ResourceAggregateType type;
+
+   public Resource(String name, double value, 
ResourceAggregateType type) {
+   this.name = checkNotNull(name);
+   this.value = Double.valueOf(value);
+   this.type = checkNotNull(type);
+   }
+
+   Resource merge(Resource other) {
+   Preconditions.checkArgument(getClass() == 
other.getClass(), "Merge with different resource type");
+   
Preconditions.checkArgument(this.name.equals(other.name), "Merge with different 
resource name");
+   
Preconditions.checkArgument(this.type.equals(other.type), "Merge with different 
aggregate type");
+
+   Double value = null;
+   switch (type) {
+   case AGGREGATE_TYPE_MAX :
+   value = 
other.value.compareTo(this.value) > 0 ? other.value : this.value;
+   break;
+
+   case AGGREGATE_TYPE_SUM:
+   default:
+   value = this.value + other.value;
+   }
+
+   Resource resource = create(value, type);
+   return resource;
+   }
+
+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   } else if (o != null && getClass() == o.getClass()) {
+   Resource other = (Resource) o;
+
+   return name.equals(other.name) && 
type.equals(other.type) && value.equals(other.value);
+   } else {
+   return false;
+   }
+   }
+
+   @Override
+   public int hashCode() {
+   int result = name != null ? name.hashCode() : 0;
+   result = 31 * result + type.ordinal();
+   result = 31 * result + value.hashCode();
+   return result;
+   }
+
+   /**
+* create a resource of the same resource type
--- End diff --

Capital letter


---


[jira] [Commented] (FLINK-7878) Extend the resource type user can define in ResourceSpec

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278653#comment-16278653
 ] 

ASF GitHub Bot commented on FLINK-7878:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4911#discussion_r154500054
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
 ---
@@ -83,18 +102,23 @@ public ResourceSpec(double cpuCores, int 
heapMemoryInMB) {
 * @param directMemoryInMB The size of the java nio direct memory, in 
megabytes.
 * @param nativeMemoryInMB The size of the native memory, in megabytes.
 * @param stateSizeInMB The state size for storing in checkpoint.
+* @param extendedResources The extended resources, associated with the 
resource manager used
 */
public ResourceSpec(
double cpuCores,
int heapMemoryInMB,
int directMemoryInMB,
int nativeMemoryInMB,
-   int stateSizeInMB) {
+   int stateSizeInMB,
+   Resource... extendedResources) {
--- End diff --

I think we have far too many constructors for creating a `ResourceSpec`. I 
would suggest offering a single constructor plus a builder for the 
`ResourceSpec`. In the initial version, this builder should only allow setting a 
GPU resource and no other resources. Moreover, we should make this constructor 
protected.


> Extend the resource type user can define in ResourceSpec
> 
>
> Key: FLINK-7878
> URL: https://issues.apache.org/jira/browse/FLINK-7878
> Project: Flink
>  Issue Type: Improvement
>  Components: DataSet API, DataStream API
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> Now Flink only supports letting users define how much CPU and memory an
> operator uses, but the resources in a cluster are becoming more varied. For
> example, an application for image processing may need GPUs, while others may
> need FPGAs.
> CPU and memory alone are not enough, and the number of resource types keeps
> growing, so we need to make the ResourceSpec extensible.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

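A hedged sketch of what the suggested builder could look like (hypothetical API derived from the review comment, not merged code):

{code}
// Builder restricted to the basic resources plus GPU, as the review suggests.
public class ResourceSpecBuilder {
    private double cpuCores;
    private int heapMemoryInMB;
    private double gpuResource; // the only extended resource allowed initially

    public ResourceSpecBuilder setCpuCores(double cpuCores) {
        this.cpuCores = cpuCores;
        return this;
    }

    public ResourceSpecBuilder setHeapMemoryInMB(int heapMemoryInMB) {
        this.heapMemoryInMB = heapMemoryInMB;
        return this;
    }

    public ResourceSpecBuilder setGPUResource(double gpuResource) {
        this.gpuResource = gpuResource;
        return this;
    }

    // build() would invoke the single protected ResourceSpec constructor.
}
{code}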


[jira] [Commented] (FLINK-7878) Extend the resource type user can define in ResourceSpec

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278651#comment-16278651
 ] 

ASF GitHub Bot commented on FLINK-7878:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4911#discussion_r154499904
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
 ---
@@ -183,17 +240,124 @@ public int hashCode() {
result = 31 * result + directMemoryInMB;
result = 31 * result + nativeMemoryInMB;
result = 31 * result + stateSizeInMB;
+   result = 31 * result + extendedResources.hashCode();
return result;
}
 
@Override
public String toString() {
+   String extend = "";
+   for (Resource resource : extendedResources.values()) {
+   extend += ", " + resource.name + "=" + resource.value;
+   }
return "ResourceSpec{" +
"cpuCores=" + cpuCores +
", heapMemoryInMB=" + heapMemoryInMB +
", directMemoryInMB=" + directMemoryInMB +
", nativeMemoryInMB=" + nativeMemoryInMB +
-   ", stateSizeInMB=" + stateSizeInMB +
+   ", stateSizeInMB=" + stateSizeInMB + extend +
'}';
}
+
+   public static abstract class Resource implements Serializable {
+   final private String name;
+
+   final private Double value;
--- End diff --

use primitive type


> Extend the resource type user can define in ResourceSpec
> 
>
> Key: FLINK-7878
> URL: https://issues.apache.org/jira/browse/FLINK-7878
> Project: Flink
>  Issue Type: Improvement
>  Components: DataSet API, DataStream API
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> Now Flink only supports letting users define how much CPU and memory an
> operator uses, but the resources in a cluster are becoming more varied. For
> example, an application for image processing may need GPUs, while others may
> need FPGAs.
> CPU and memory alone are not enough, and the number of resource types keeps
> growing, so we need to make the ResourceSpec extensible.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)




[GitHub] flink pull request #4911: [FLINK-7878] [api] make resource type extendible i...

2017-12-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4911#discussion_r154500041
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/operators/ResourceSpecTest.java
 ---
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators;
+
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for ResourceSpec class, including its all public api: isValid, 
lessThanOrEqual, equals, hashCode and merge.
+ */
+public class ResourceSpecTest extends TestLogger {
+
+   @Test
+   public void testIsValid() throws Exception {
+   ResourceSpec rs = new ResourceSpec(1.0, 100);
+   assertTrue(rs.isValid());
+
+   rs = new ResourceSpec(1.0, 100, new 
ResourceSpec.GPUResource(1));
+   assertTrue(rs.isValid());
+
+   rs = new ResourceSpec(1.0, 100, new 
ResourceSpec.GPUResource(-1));
+   assertFalse(rs.isValid());
+   }
+
+   @Test
+   public void testLessThanOrEqual() throws Exception {
+   ResourceSpec rs1 = new ResourceSpec(1.0, 100);
+   ResourceSpec rs2 = new ResourceSpec(1.0, 100);
+   assertTrue(rs1.lessThanOrEqual(rs2));
+   assertTrue(rs2.lessThanOrEqual(rs1));
+
+   rs2 = new ResourceSpec(1.0, 100, new 
ResourceSpec.FPGAResource(1));
+   assertTrue(rs1.lessThanOrEqual(rs2));
+   assertFalse(rs2.lessThanOrEqual(rs1));
+
+   ResourceSpec rs3 = new ResourceSpec(1.0, 100, new 
ResourceSpec.FPGAResource(2));
+   assertFalse(rs3.lessThanOrEqual(rs2));
+   assertTrue(rs2.lessThanOrEqual(rs3));
+
+   ResourceSpec rs4 = new ResourceSpec(1.0, 100,
+   new ResourceSpec.FPGAResource(1),
+   new ResourceSpec.GPUResource( 1));
+   assertFalse(rs3.lessThanOrEqual(rs4));
+   assertFalse(rs4.lessThanOrEqual(rs3));
+   }
+
+   @Test
+   public void testEquals() throws Exception {
+   ResourceSpec rs1 = new ResourceSpec(1.0, 100);
+   ResourceSpec rs2 = new ResourceSpec(1.0, 100);
+   assertTrue(rs1.equals(rs2));
+   assertTrue(rs2.equals(rs1));
+
+   ResourceSpec rs3 = new ResourceSpec(1.0, 100, new 
ResourceSpec.FPGAResource(2.2));
+   ResourceSpec rs4 = new ResourceSpec(1.0, 100, new 
ResourceSpec.FPGAResource( 1));
+   assertFalse(rs3.equals(rs4));
+
+   ResourceSpec rs5 = new ResourceSpec(1.0, 100, new 
ResourceSpec.FPGAResource(2.2));
+   assertTrue(rs3.equals(rs5));
+
+   ResourceSpec rs6 = new ResourceSpec(1.0, 100,
+   new ResourceSpec.FPGAResource(2),
+   new ResourceSpec.GPUResource( 0.5));
+   ResourceSpec rs7 = new ResourceSpec(1.0, 100,
+   new ResourceSpec.FPGAResource( 2),
+   new ResourceSpec.GPUResource(0.5, 
ResourceSpec.ResourceAggregateType.AGGREGATE_TYPE_MAX));
+   assertFalse(rs6.equals(rs7));
+   }
+
+   @Test
+   public void testHashCode() throws Exception {
+   ResourceSpec rs1 = new ResourceSpec(1.0, 100);
+   ResourceSpec rs2 = new ResourceSpec(1.0, 100);
+   assertEquals(rs1.hashCode(), rs2.hashCode());
+
+   ResourceSpec rs3 = new ResourceSpec(1.0, 100, new 
ResourceSpec.FPGAResource(2.2));
+   ResourceSpec rs4 = new ResourceSpec(1.0, 100, new 
ResourceSpec.FPGAResource(1));
+   assertFalse(rs3.hashCode() == rs4.hashCode());
+
+   


[GitHub] flink pull request #4911: [FLINK-7878] [api] make resource type extendible i...

2017-12-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4911#discussion_r154499917
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
 ---
@@ -183,17 +240,124 @@ public int hashCode() {
result = 31 * result + directMemoryInMB;
result = 31 * result + nativeMemoryInMB;
result = 31 * result + stateSizeInMB;
+   result = 31 * result + extendedResources.hashCode();
return result;
}
 
@Override
public String toString() {
+   String extend = "";
+   for (Resource resource : extendedResources.values()) {
+   extend += ", " + resource.name + "=" + resource.value;
+   }
return "ResourceSpec{" +
"cpuCores=" + cpuCores +
", heapMemoryInMB=" + heapMemoryInMB +
", directMemoryInMB=" + directMemoryInMB +
", nativeMemoryInMB=" + nativeMemoryInMB +
-   ", stateSizeInMB=" + stateSizeInMB +
+   ", stateSizeInMB=" + stateSizeInMB + extend +
'}';
}
+
+   public static abstract class Resource implements Serializable {
+   final private String name;
+
+   final private Double value;
+
+   final private ResourceAggregateType type;
+
+   public Resource(String name, double value, 
ResourceAggregateType type) {
+   this.name = checkNotNull(name);
+   this.value = Double.valueOf(value);
+   this.type = checkNotNull(type);
+   }
+
+   Resource merge(Resource other) {
+   Preconditions.checkArgument(getClass() == 
other.getClass(), "Merge with different resource type");
+   
Preconditions.checkArgument(this.name.equals(other.name), "Merge with different 
resource name");
+   
Preconditions.checkArgument(this.type.equals(other.type), "Merge with different 
aggregate type");
+
+   Double value = null;
+   switch (type) {
+   case AGGREGATE_TYPE_MAX :
+   value = 
other.value.compareTo(this.value) > 0 ? other.value : this.value;
--- End diff --

`Math.max` should do the trick here.


---

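For contrast, a hedged sketch of the boxed and primitive variants (assumes {{value}} becomes a primitive {{double}}, as another comment in this review suggests):

{code}
// Illustrative only; not the ResourceSpec source.
class MaxSketch {
    // Boxed form from the diff above, for contrast:
    static Double mergeMaxBoxed(Double a, Double b) {
        return b.compareTo(a) > 0 ? b : a;
    }

    // Primitive form the reviewer suggests; no boxing, reads clearer:
    static double mergeMax(double a, double b) {
        return Math.max(a, b);
    }
}
{code}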

[jira] [Commented] (FLINK-7878) Extend the resource type user can define in ResourceSpec

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278652#comment-16278652
 ] 

ASF GitHub Bot commented on FLINK-7878:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4911#discussion_r154964974
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
 ---
@@ -133,14 +162,31 @@ public int getStateSize() {
return this.stateSizeInMB;
}
 
+   public Map getExtendedResources() {
+   Map resources = new 
HashMap<>(extendedResources.size());
+   for (Resource resource : extendedResources.values()) {
+   resources.put(resource.name, resource.value);
+   }
+   return Collections.unmodifiableMap(resources);
+   }
--- End diff --

Let's remove this method because it exposes the internal resources.


> Extend the resource type user can define in ResourceSpec
> 
>
> Key: FLINK-7878
> URL: https://issues.apache.org/jira/browse/FLINK-7878
> Project: Flink
>  Issue Type: Improvement
>  Components: DataSet API, DataStream API
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> Now Flink only supports letting users define how much CPU and memory an
> operator uses, but the resources in a cluster are becoming more varied. For
> example, an application for image processing may need GPUs, while others may
> need FPGAs.
> CPU and memory alone are not enough, and the number of resource types keeps
> growing, so we need to make the ResourceSpec extensible.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)



[GitHub] flink pull request #4911: [FLINK-7878] [api] make resource type extendible i...

2017-12-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4911#discussion_r154964117
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
 ---
@@ -61,18 +81,17 @@
/** How many state size in mb are used */
private final int stateSizeInMB;
 
+   private final Map extendedResources = new 
HashMap<>(1);
+
/**
 * Creates a new ResourceSpec with basic common resources.
 *
 * @param cpuCores The number of CPU cores (possibly fractional, i.e., 
0.2 cores)
 * @param heapMemoryInMB The size of the java heap memory, in megabytes.
+* @param extendedResources The extended resources, associated with the 
resource manager used
 */
-   public ResourceSpec(double cpuCores, int heapMemoryInMB) {
-   this.cpuCores = cpuCores;
-   this.heapMemoryInMB = heapMemoryInMB;
-   this.directMemoryInMB = 0;
-   this.nativeMemoryInMB = 0;
-   this.stateSizeInMB = 0;
+   public ResourceSpec(double cpuCores, int heapMemoryInMB, Resource... 
extendedResources) {
+   this(cpuCores, heapMemoryInMB, 0, 0, 0, extendedResources);
--- End diff --

Let's remove this constructor.


---

