[GitHub] flink issue #5580: [FLINK-8620] Enable shipping custom files to BlobStore an...

2018-02-27 Thread ifndef-SleePy
Github user ifndef-SleePy commented on the issue:

https://github.com/apache/flink/pull/5580
  
@zentol All right, got your point. That's a problem indeed. 


---


[GitHub] flink issue #5580: [FLINK-8620] Enable shipping custom files to BlobStore an...

2018-02-27 Thread ifndef-SleePy
Github user ifndef-SleePy commented on the issue:

https://github.com/apache/flink/pull/5580
  
If the behavior of `RestClusterClient` changes some day, for example by 
uploading the jars via the REST API, we could simply upload the user-defined 
files via the REST API as well. That's not a big problem, right?


---


[GitHub] flink issue #5580: [FLINK-8620] Enable shipping custom files to BlobStore an...

2018-02-27 Thread ifndef-SleePy
Github user ifndef-SleePy commented on the issue:

https://github.com/apache/flink/pull/5580
  
Hi @zentol 

In Flip-6 the client communicates with the cluster via the REST API, that's 
true. However, this does not include blobs. Currently, in the 
`RestClusterClient.submit()` method, the client uses the blob service to upload 
the user's jars. So this PR does not break the rule.

I think this feature is quite useful. Spark and even MapReduce support 
uploading user-defined jars/files/archives, and I think we should support it 
too. With this feature, users would be able to migrate to Flink more smoothly.

What do you think?


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-02-26 Thread ifndef-SleePy
Github user ifndef-SleePy commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r170804963
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 ---
@@ -314,6 +315,9 @@ public static TaskExecutor startTaskManager(
 
TaskManagerConfiguration taskManagerConfiguration = 
TaskManagerConfiguration.fromConfiguration(configuration);
 
+   final FileCache fileCache = new 
FileCache(taskManagerServicesConfiguration.getTmpDirPaths(),
--- End diff --

It seems that the `fileCache` here is not used at all.


---


[GitHub] flink issue #5048: [Flink-7871] [flip6] SlotPool should release unused slots...

2018-02-09 Thread ifndef-SleePy
Github user ifndef-SleePy commented on the issue:

https://github.com/apache/flink/pull/5048
  
Hi @tillrohrmann, I just updated the PR. Sorry for the long delay. 


---


[GitHub] flink pull request #5322: [hotfix] [REST] Fix WebMonitorEndpoint that missin...

2018-01-19 Thread ifndef-SleePy
GitHub user ifndef-SleePy opened a pull request:

https://github.com/apache/flink/pull/5322

[hotfix] [REST] Fix WebMonitorEndpoint that missing parameters while 
initializing handlers

## What is the purpose of the change

* Fix bugs in my recent PRs about migrating subtask REST handlers

## Brief change log

* Some `MessageHeaders` were missing when initializing handlers in 
`WebMonitorEndpoint`
* There is a wrong parameter type in 
`SubtaskExecutionAttemptAccumulatorsHeaders`

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink FLINK-REST-handlers-hotfix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5322.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5322


commit 23dd66822352ae7d1d6f0e60494f9541c57714e8
Author: biao.liub <biao.liub@...>
Date:   2018-01-20T05:26:29Z

[hotfix] [REST] Fix WebMonitorEndpoint that missing parameters while 
initializing handlers




---


[GitHub] flink pull request #5287: [FLINK-8367] [REST] Migrate SubtaskCurrentAttemptD...

2018-01-12 Thread ifndef-SleePy
GitHub user ifndef-SleePy opened a pull request:

https://github.com/apache/flink/pull/5287

[FLINK-8367] [REST] Migrate SubtaskCurrentAttemptDetailsHandler to Flip-6 
REST handler

## What is the purpose of the change

* Migrate 
`org.apache.flink.runtime.rest.handler.legacy.SubtaskCurrentAttemptDetailsHandler`
 to Flip-6 `WebMonitorEndpoint`.
* This PR is based on [#5285](https://github.com/apache/flink/pull/5285)

## Brief change log

* Add `SubtaskCurrentAttemptDetailsHandler` in Flip-6 REST framework.

## Verifying this change

* This change added unit tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink FLINK-8367

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5287.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5287


commit 16b3f11b96b1a8b0f290945af3d255d47739992e
Author: biao.liub <biao.liub@...>
Date:   2018-01-10T06:25:07Z

[FLINK-8368] Migrate 
org.apache.flink.runtime.rest.handler.legacy.SubtaskExecutionAttemptDetailsHandler
 to new a REST handler that registered in WebMonitorEndpoint

commit 50eb6e72293abc02ef17707f3d70e069d2f2a463
Author: biao.liub <biao.liub@...>
Date:   2018-01-12T08:38:55Z

[FLINK-8368] Add attempts path info that is missing in 
SubtaskExecutionAttemptDetailsHeaders

commit 97124dfe717702b46f0a105b78cf86f18ac063a9
Author: biao.liub <biao.liub@...>
Date:   2018-01-12T08:33:08Z

[FLINK-8369] Migrate SubtaskExecutionAttemptAccumulatorsHandler to Flip-6 
REST endpoint

commit 2617cf22f69bd1e7ec6e34056bce6120a1164bd9
Author: biao.liub <biao.liub@...>
Date:   2018-01-12T08:33:57Z

[FLINK-8369] Migrate SubtaskExecutionAttemptAccumulatorsHandler to Flip-6 
REST endpoint

commit 52199031f81c7a267c28cf340035bc4299ac7f3c
Author: biao.liub <biao.liub@...>
Date:   2018-01-12T09:57:05Z

[FLINK-8369] Fix some compiling issues.

commit bd8e42b8e5442978e865ba0629534c555fcb1e16
Author: biao.liub <biao.liub@...>
Date:   2018-01-12T13:37:48Z

[FLINK-8367] Migrate SubtaskCurrentAttemptDetailsHandler to new a REST 
handler




---


[GitHub] flink pull request #5285: [FLINK-8369] [REST] Migrate SubtaskExecutionAttemp...

2018-01-12 Thread ifndef-SleePy
GitHub user ifndef-SleePy opened a pull request:

https://github.com/apache/flink/pull/5285

[FLINK-8369] [REST] Migrate SubtaskExecutionAttemptAccumulatorsHandler to 
new a REST handler

## What is the purpose of the change

* Migrate 
`org.apache.flink.runtime.rest.handler.legacy.SubtaskExecutionAttemptAccumulatorsHandler`
 to Flip-6 `WebMonitorEndpoint`.

* This PR is based on [#5270](https://github.com/apache/flink/pull/5270)

## Brief change log

* Add `SubtaskExecutionAttemptAccumulatorsHandler` in Flip-6 REST 
framework.
* Make inner class `UserAccumulator` public so that it is reusable.

## Verifying this change

* This change added unit tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink FLINK-8369

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5285.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5285


commit 16b3f11b96b1a8b0f290945af3d255d47739992e
Author: biao.liub <biao.liub@...>
Date:   2018-01-10T06:25:07Z

[FLINK-8368] Migrate 
org.apache.flink.runtime.rest.handler.legacy.SubtaskExecutionAttemptDetailsHandler
 to new a REST handler that registered in WebMonitorEndpoint

commit 50eb6e72293abc02ef17707f3d70e069d2f2a463
Author: biao.liub <biao.liub@...>
Date:   2018-01-12T08:38:55Z

[FLINK-8368] Add attempts path info that is missing in 
SubtaskExecutionAttemptDetailsHeaders

commit 97124dfe717702b46f0a105b78cf86f18ac063a9
Author: biao.liub <biao.liub@...>
Date:   2018-01-12T08:33:08Z

[FLINK-8369] Migrate SubtaskExecutionAttemptAccumulatorsHandler to Flip-6 
REST endpoint

commit 2617cf22f69bd1e7ec6e34056bce6120a1164bd9
Author: biao.liub <biao.liub@...>
Date:   2018-01-12T08:33:57Z

[FLINK-8369] Migrate SubtaskExecutionAttemptAccumulatorsHandler to Flip-6 
REST endpoint

commit 52199031f81c7a267c28cf340035bc4299ac7f3c
Author: biao.liub <biao.liub@...>
Date:   2018-01-12T09:57:05Z

[FLINK-8369] Fix some compiling issues.




---


[GitHub] flink pull request #5270: [FLINK-8368] [REST] Migrate SubtaskExecutionAttemp...

2018-01-09 Thread ifndef-SleePy
GitHub user ifndef-SleePy opened a pull request:

https://github.com/apache/flink/pull/5270

[FLINK-8368] [REST] Migrate SubtaskExecutionAttemptDetailsHandler to new a 
REST handler

## What is the purpose of the change

* Migrate 
`org.apache.flink.runtime.rest.handler.legacy.SubtaskExecutionAttemptDetailsHandler`
 to Flip-6 `WebMonitorEndpoint`.

## Brief change log

* Add some abstractions around `JobVertexHandler` and 
`SubtaskAttemptHandler`.
* Add `SubtaskExecutionAttemptDetailsHandler` in the Flip-6 REST framework.
* Rename inner class `JobVertexMetrics` to public class `IOMetricsInfo` to 
make it more reusable.

## Verifying this change

* This change added unit tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink FLINK-8368

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5270.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5270


commit 29182f04255f77288e207aef9e7015862d3e9a8c
Author: biao.liub <biao.liub@...>
Date:   2018-01-10T06:25:07Z

[FLINK-8368] Migrate 
org.apache.flink.runtime.rest.handler.legacy.SubtaskExecutionAttemptDetailsHandler
 to new a REST handler that registered in WebMonitorEndpoint




---


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-13 Thread ifndef-SleePy
Github user ifndef-SleePy commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r156621784
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ---
@@ -266,104 +279,367 @@ public void disconnectResourceManager() {
// 

 
@Override
-   public CompletableFuture allocateSlot(
-   SlotRequestID requestId,
-   ScheduledUnit task,
-   ResourceProfile resources,
-   Iterable locationPreferences,
+   public CompletableFuture allocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit scheduledUnit,
+   ResourceProfile resourceProfile,
+   Collection locationPreferences,
+   boolean allowQueuedScheduling,
Time timeout) {
 
-   return internalAllocateSlot(requestId, task, resources, 
locationPreferences);
+   return internalAllocateSlot(
+   slotRequestId,
+   scheduledUnit,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
}
 
-   @Override
-   public void returnAllocatedSlot(Slot slot) {
-   internalReturnAllocatedSlot(slot);
+   private CompletableFuture internalAllocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit task,
+   ResourceProfile resourceProfile,
+   Collection locationPreferences,
+   boolean allowQueuedScheduling) {
+
+   final SlotSharingGroupId slotSharingGroupId = 
task.getSlotSharingGroupId();
+
+   if (slotSharingGroupId != null) {
+   // allocate slot with slot sharing
+   final SlotSharingManager multiTaskSlotManager = 
slotSharingManagers.computeIfAbsent(
+   slotSharingGroupId,
+   id -> new SlotSharingManager(
+   id,
+   this,
+   providerAndOwner));
+
+   final SlotSharingManager.MultiTaskSlotLocality 
multiTaskSlotFuture;
+
+   try {
+   if (task.getCoLocationConstraint() != null) {
+   multiTaskSlotFuture = 
allocateCoLocatedMultiTaskSlot(
+   task.getCoLocationConstraint(),
+   multiTaskSlotManager,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
+   } else {
+   multiTaskSlotFuture = 
allocateMultiTaskSlot(
+   task.getJobVertexId(), 
multiTaskSlotManager,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
+   }
+   } catch (NoResourceAvailableException 
noResourceException) {
+   return 
FutureUtils.completedExceptionally(noResourceException);
+   }
+
+   // sanity check
+   
Preconditions.checkState(!multiTaskSlotFuture.getMultiTaskSlot().contains(task.getJobVertexId()));
+
+   final SlotSharingManager.SingleTaskSlot leave = 
multiTaskSlotFuture.getMultiTaskSlot().allocateSingleTaskSlot(
+   slotRequestId,
+   task.getJobVertexId(),
+   multiTaskSlotFuture.getLocality());
+
+   return leave.getLogicalSlotFuture();
+   } else {
+   // request an allocated slot to assign a single logical 
slot to
+   CompletableFuture 
slotAndLocalityFuture = requestAllocatedSlot(
+   slotRequestId,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
+
+   return slotAndLocalityFuture.thenApply(
+   (SlotAndLocality slotAndLocality) -> {
+   final AllocatedSlot allocatedSlot = 
slotAndLocality.g

[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-13 Thread ifndef-SleePy
Github user ifndef-SleePy commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r154871284
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java
 ---
@@ -19,68 +19,104 @@
 package org.apache.flink.runtime.jobmanager.scheduler;
 
 import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
 public class ScheduledUnit {
-   
+
+   @Nullable
private final Execution vertexExecution;
-   
-   private final SlotSharingGroup sharingGroup;
-   
-   private final CoLocationConstraint locationConstraint;
+
+   private final JobVertexID jobVertexId;
+
+   @Nullable
+   private final SlotSharingGroupId slotSharingGroupId;
+
+   @Nullable
+   private final CoLocationConstraint coLocationConstraint;

// 


public ScheduledUnit(Execution task) {
-   Preconditions.checkNotNull(task);
-   
-   this.vertexExecution = task;
-   this.sharingGroup = null;
-   this.locationConstraint = null;
+   this(
+   Preconditions.checkNotNull(task),
+   task.getVertex().getJobvertexId(),
+   null,
+   null);
}

-   public ScheduledUnit(Execution task, SlotSharingGroup sharingUnit) {
-   Preconditions.checkNotNull(task);
-   
-   this.vertexExecution = task;
-   this.sharingGroup = sharingUnit;
-   this.locationConstraint = null;
+   public ScheduledUnit(Execution task, SlotSharingGroupId 
slotSharingGroupId) {
+   this(
+   Preconditions.checkNotNull(task),
+   task.getVertex().getJobvertexId(),
+   slotSharingGroupId,
+   null);
}

-   public ScheduledUnit(Execution task, SlotSharingGroup sharingUnit, 
CoLocationConstraint locationConstraint) {
-   Preconditions.checkNotNull(task);
-   Preconditions.checkNotNull(sharingUnit);
-   Preconditions.checkNotNull(locationConstraint);
-   
+   public ScheduledUnit(
+   Execution task,
+   SlotSharingGroupId slotSharingGroupId,
+   CoLocationConstraint coLocationConstraint) {
+   this(
+   Preconditions.checkNotNull(task),
+   task.getVertex().getJobvertexId(),
+   slotSharingGroupId,
+   coLocationConstraint);
+   }
+
+   public ScheduledUnit(
+   JobVertexID jobVertexId,
+   SlotSharingGroupId slotSharingGroupId,
+   CoLocationConstraint coLocationConstraint) {
+   this(
+   null,
+   jobVertexId,
+   slotSharingGroupId,
+   coLocationConstraint);
+   }
+
+   public ScheduledUnit(
+   Execution task,
+   JobVertexID jobVertexId,
--- End diff --

We can get the JobVertexID from the Execution. Do we need it as a constructor parameter?
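One way to resolve this question, sketched with hypothetical stand-in types (a simplified `Execution` and a `String` vertex id, not Flink's `ScheduledUnit` or `JobVertexID`): constructors that receive an `Execution` derive the vertex id from it, and the two-argument canonical constructor is kept private, so an explicit id is only supplied together with a `null` execution.

```java
import java.util.Objects;

/** Hypothetical simplification of the constructor chain discussed above; not Flink code. */
public final class ScheduledUnitSketch {

    /** Stand-in for an Execution that already knows its job vertex id. */
    static final class Execution {
        private final String jobVertexId;

        Execution(String jobVertexId) {
            this.jobVertexId = jobVertexId;
        }

        String getJobVertexId() {
            return jobVertexId;
        }
    }

    private final Execution execution; // null when no concrete Execution exists
    private final String jobVertexId;  // always set

    /** The vertex id is derived from the Execution, so callers cannot pass a mismatching one. */
    public ScheduledUnitSketch(Execution execution) {
        this(Objects.requireNonNull(execution), execution.getJobVertexId());
    }

    /** For callers that only know the vertex id. */
    public ScheduledUnitSketch(String jobVertexId) {
        this(null, jobVertexId);
    }

    /** Canonical constructor, private so the (execution, id) pair is never supplied externally. */
    private ScheduledUnitSketch(Execution execution, String jobVertexId) {
        this.execution = execution;
        this.jobVertexId = Objects.requireNonNull(jobVertexId);
    }

    public Execution getExecution() {
        return execution;
    }

    public String getJobVertexId() {
        return jobVertexId;
    }

    public static void main(String[] args) {
        ScheduledUnitSketch fromExecution = new ScheduledUnitSketch(new Execution("vertex-a"));
        ScheduledUnitSketch fromId = new ScheduledUnitSketch("vertex-b");
        System.out.println(fromExecution.getJobVertexId()); // vertex-a
        System.out.println(fromId.getExecution());          // null
    }
}
```

With this shape, the public API never asks for information the `Execution` already carries, which is the redundancy the comment points at.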


---


[GitHub] flink issue #5048: [Flink-7871] [flip6] SlotPool should release unused slots...

2017-12-04 Thread ifndef-SleePy
Github user ifndef-SleePy commented on the issue:

https://github.com/apache/flink/pull/5048
  
Hi Till, thank you for reviewing.

I agree that it is much simpler to use `scheduleRunAsync`.

[PR-5091](https://github.com/apache/flink/pull/5091) is not a small 
rework, but it looks really great to me. I will rebase it and address the 
comments in a few days.


---


[GitHub] flink pull request #5048: [Flink-7871] [flip6] SlotPool should release unuse...

2017-11-22 Thread ifndef-SleePy
GitHub user ifndef-SleePy opened a pull request:

https://github.com/apache/flink/pull/5048

[Flink-7871] [flip6] SlotPool should release unused slots to RM

## What is the purpose of the change

* This pull request makes SlotPool release unused slots back to the RM if the 
slots are idle for some time

## Brief change log

* Add a TimerService in SlotPool
* Each available slot registers a timer in the TimerService and unregisters 
it when the slot is removed or becomes allocated
* If the timeout fires, the slot is released by notifying the TM
* Add notifySlotUnused method in TaskExecutorGateway
* Make some fixed params configurable in SlotPool
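The change log above describes a per-slot idle timer. A minimal sketch of that idea in plain Java follows, assuming a `ScheduledExecutorService` as the timer service and string slot ids. The class `IdleSlotTracker` and its methods are hypothetical and are not the `SlotPool` or `TimerService` code from this PR; the `releaseAction` callback stands in for notifying the TaskManager.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical sketch of idle-slot release; not the SlotPool code from this PR. */
public class IdleSlotTracker implements AutoCloseable {

    /** One registration per "slot became available" event; identity distinguishes stale timers. */
    private static final class IdleTimer {
        ScheduledFuture<?> future;
    }

    private final ScheduledExecutorService timerService =
            Executors.newSingleThreadScheduledExecutor();
    private final Map<String, IdleTimer> timers = new HashMap<>();
    private final long idleTimeoutMillis;
    private final Consumer<String> releaseAction; // stands in for "notify the TM"

    public IdleSlotTracker(long idleTimeoutMillis, Consumer<String> releaseAction) {
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.releaseAction = releaseAction;
    }

    /** Slot became available: (re)start its idle timer. */
    public synchronized void slotAvailable(String slotId) {
        cancelTimer(slotId);
        IdleTimer timer = new IdleTimer();
        timers.put(slotId, timer);
        timer.future = timerService.schedule(
                () -> onTimeout(slotId, timer), idleTimeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Slot was allocated or removed: it is no longer idle, so drop its timer. */
    public synchronized void slotAllocatedOrRemoved(String slotId) {
        cancelTimer(slotId);
    }

    // Always called while holding the lock.
    private void cancelTimer(String slotId) {
        IdleTimer timer = timers.remove(slotId);
        if (timer != null) {
            timer.future.cancel(false);
        }
    }

    private synchronized void onTimeout(String slotId, IdleTimer timer) {
        // Ignore timers that were cancelled or replaced after they had already fired.
        if (timers.get(slotId) == timer) {
            timers.remove(slotId);
            releaseAction.accept(slotId);
        }
    }

    @Override
    public void close() {
        timerService.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        try (IdleSlotTracker tracker = new IdleSlotTracker(
                50, slotId -> System.out.println("released " + slotId))) {
            tracker.slotAvailable("slot-1");
            tracker.slotAvailable("slot-2");
            tracker.slotAllocatedOrRemoved("slot-2"); // slot-2 gets used, so it is kept
            Thread.sleep(200);                        // only slot-1 is released
        }
    }
}
```

Comparing the stored timer by identity in `onTimeout` guards against a timer that fired just before its slot was allocated or re-registered: cancelling a `ScheduledFuture` does not stop a task that has already started running.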

## Verifying this change

This change added tests in AvailableSlotsTest

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no, but adds a new `@Public(Evolving)` class SlotOptions)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink FLINK-7871

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5048.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5048


commit b16d206b1ff1a459de10dd5d08094172386d3707
Author: biao.liub <biao.l...@alibaba-inc.com>
Date:   2017-11-16T10:11:20Z

[FLINK-7871] [flip6] SlotPool will release unused slot if it is idle for a 
period.

commit 19e9ccfb69b6e1529ed346a00389b682eecdfcfd
Author: biao.liub <biao.l...@alibaba-inc.com>
Date:   2017-11-16T11:15:16Z

[FLINK-7871] [flip6] Stop timer service in SlotPool elegantly.

commit 0484f7457e46180fe49ae4f3483afa92c0b20ac5
Author: biao.liub <biao.l...@alibaba-inc.com>
Date:   2017-11-17T02:58:57Z

[FLINK-7871] [flip6] Fix rebasing error.

commit 2a012582b1e169a90c5674ed294d95c12029ee8d
Author: biao.liub <biao.l...@alibaba-inc.com>
Date:   2017-11-17T03:19:40Z

[FLINK-7871] [flip6] Fix AvailableSlotsTest.

commit bb8b8f889735ab7a93c837db3c33b3f0900755db
Author: biao.liub <biao.l...@alibaba-inc.com>
Date:   2017-11-17T08:15:01Z

[FLINK-7871] [flip6] Add super.postStop() in SlotPool.postStop()

commit 29fe70cec7de136e5959c9f128d32ece97fb68fc
Author: biao.liub <biao.l...@alibaba-inc.com>
Date:   2017-11-17T08:36:41Z

[Flink-7871] [flip6] SlotPool should release unused slots to RM

Summary:
## What is the purpose of the change

* This pull request makes SlotPool release unused slots back to RM if the 
slots is idle for some time

## Brief change log

* Add a TimerService in SlotPool
* Each available slot will register a timer in TimerService, and will 
unregister when the slot is removed or becomes allocated
* If timeout happened, the slot will be released by notifying TM
* Add notifySlotUnused method in TaskExecutorGateway
* Make some fixed params configurable in SlotPool

## Verifying this change

This change added tests in AvailableSlotsTest

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no, but add a new '@Public(Evolving)' class SlotOptions)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)

Test Plan: UT done

Reviewers: 海涛, 辅机

Differential Revision: https://aone.alibaba-inc.com/code/D350078

commit 7b144e853df1d9a807464832a3582922c91672d1
Author: biao.liub <biao.l...@alibaba-inc.com>
Date:   2017-11-17T09:19:40Z

[Flink-7871] [flip6] SlotPool should release unused slots to RM

Summary:
## What is the purpose of the change

* This pull request makes SlotPool release unused slots back to RM if the 
slots is idle for some time

## Brie

[GitHub] flink issue #3810: [FLINK-6397] MultipleProgramsTestBase does not reset Cont...

2017-05-12 Thread ifndef-SleePy
Github user ifndef-SleePy commented on the issue:

https://github.com/apache/flink/pull/3810
  
Nice work!
I have rebased onto master.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3810: [FLINK-6397] MultipleProgramsTestBase does not reset Cont...

2017-05-10 Thread ifndef-SleePy
Github user ifndef-SleePy commented on the issue:

https://github.com/apache/flink/pull/3810
  
OK, got it. Thank you for the explanation. Are there any more problems with 
this pull request?


---


[GitHub] flink issue #3810: [FLINK-6397] MultipleProgramsTestBase does not reset Cont...

2017-05-08 Thread ifndef-SleePy
Github user ifndef-SleePy commented on the issue:

https://github.com/apache/flink/pull/3810
  
Hi, I updated this pull request. I don't know why there was no notification 
in JIRA or by email.


---


[GitHub] flink pull request #3810: [FLINK-6397] MultipleProgramsTestBase does not res...

2017-05-07 Thread ifndef-SleePy
Github user ifndef-SleePy commented on a diff in the pull request:

https://github.com/apache/flink/pull/3810#discussion_r115170542
  
--- Diff: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
 ---
@@ -80,29 +80,36 @@

protected final TestExecutionMode mode;
 
-   
+   private TestEnvironment testEnvironment;
+
+   private CollectionTestEnvironment collectionTestEnvironment;
+
public MultipleProgramsTestBase(TestExecutionMode mode) {
this.mode = mode;
-   
+
switch(mode){
case CLUSTER:
-   new TestEnvironment(cluster, 4).setAsContext();
+   testEnvironment = new TestEnvironment(cluster, 
4);
--- End diff --

Thank you for responding. My last comment was not correct. What I actually 
meant is that we cannot move this code to `@BeforeClass` and `@AfterClass`. 
We can move it to `@Before` and `@After`, but that brings a little overhead, 
since `@Before` and `@After` are called for each test method. I pushed a new 
commit using `@Before` and `@After`.


---


[GitHub] flink issue #3810: [FLINK-6397] MultipleProgramsTestBase does not reset Cont...

2017-05-06 Thread ifndef-SleePy
Github user ifndef-SleePy commented on the issue:

https://github.com/apache/flink/pull/3810
  
I think Till recently implemented a static `unsetAsContext` method in 
`TestEnvironment`. I rebased the implementation onto master and re-committed 
the pull request.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3810: [FLINK-6397] MultipleProgramsTestBase does not res...

2017-05-06 Thread ifndef-SleePy
Github user ifndef-SleePy commented on a diff in the pull request:

https://github.com/apache/flink/pull/3810#discussion_r115133175
  
--- Diff: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
 ---
@@ -80,29 +80,36 @@

protected final TestExecutionMode mode;
 
-   
+   private TestEnvironment testEnvironment;
+
+   private CollectionTestEnvironment collectionTestEnvironment;
+
public MultipleProgramsTestBase(TestExecutionMode mode) {
this.mode = mode;
-   
+
switch(mode){
case CLUSTER:
-   new TestEnvironment(cluster, 4).setAsContext();
+   testEnvironment = new TestEnvironment(cluster, 
4);
--- End diff --

@zentol I just found that I cannot move this code into an `@Before` method, 
because it does not work with JUnit Parameterized. I propose keeping this 
code in the constructor. What do you think?


---


[GitHub] flink pull request #3810: [Flink-6397] MultipleProgramsTestBase does not res...

2017-05-03 Thread ifndef-SleePy
Github user ifndef-SleePy commented on a diff in the pull request:

https://github.com/apache/flink/pull/3810#discussion_r114693043
  
--- Diff: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
 ---
@@ -80,29 +80,36 @@

protected final TestExecutionMode mode;
 
-   
+   private TestEnvironment testEnvironment;
+
+   private CollectionTestEnvironment collectionTestEnvironment;
+
public MultipleProgramsTestBase(TestExecutionMode mode) {
this.mode = mode;
-   
+
switch(mode){
case CLUSTER:
-   new TestEnvironment(cluster, 4).setAsContext();
+   testEnvironment = new TestEnvironment(cluster, 
4);
--- End diff --

Sounds reasonable.


---


[GitHub] flink issue #3810: [Flink-6397] MultipleProgramsTestBase does not reset Cont...

2017-05-03 Thread ifndef-SleePy
Github user ifndef-SleePy commented on the issue:

https://github.com/apache/flink/pull/3810
  
@StephanEwen 
I agree that a static unset method would be a much simpler implementation. 
Do you think it's acceptable that the unset method is static but the set 
method is not static in TestEnvironment? Or we could implement the set method as static 
too, but that will make more changes. 
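To illustrate the asymmetry being discussed, here is a minimal sketch assuming a hypothetical `TestContext` class (not Flink's `TestEnvironment`): installing a context needs an instance that knows what to install, while resetting only clears shared static state, so it can be static.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for a test context holder; not Flink's TestEnvironment API. */
public class TestContext {

    // Shared factory consulted by code under test; null means "use the default".
    private static Supplier<String> contextFactory;

    private final String name;

    public TestContext(String name) {
        this.name = name;
    }

    /** Instance method: installs this particular environment as the context. */
    public void setAsContext() {
        contextFactory = () -> name;
    }

    /** Static method: resetting does not need to know which instance was installed. */
    public static void unsetAsContext() {
        contextFactory = null;
    }

    /** What code under test would see. */
    public static String currentEnvironment() {
        Supplier<String> factory = contextFactory;
        return factory == null ? "default" : factory.get();
    }

    public static void main(String[] args) {
        new TestContext("cluster").setAsContext();
        try {
            System.out.println(currentEnvironment()); // cluster
        } finally {
            TestContext.unsetAsContext();
        }
        System.out.println(currentEnvironment()); // default
    }
}
```

A static reset also fits JUnit 4 teardown hooks such as `@AfterClass`, which must themselves be static and therefore have no instance to call a method on.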


---


[GitHub] flink pull request #3810: [Flink-6397] MultipleProgramsTestBase does not res...

2017-05-02 Thread ifndef-SleePy
GitHub user ifndef-SleePy opened a pull request:

https://github.com/apache/flink/pull/3810

[Flink-6397] MultipleProgramsTestBase does not reset ContextEnvironment

Reset the ContextEnvironment when testing is finished in 
MultipleProgramsTestBase.java and some other ITCases.
Remove some unused imports.
Add an unsetContext method in TestEnvironment.java.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink FLINK-6397

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3810.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3810


commit 0932dcdf0d78a783b2921fa26ae80a6a4889c3ca
Author: biao.liub <biao.l...@alibaba-inc.com>
Date:   2017-05-02T08:07:34Z

[FLINK-6397] Unset context when integration test is finished.

commit d0becf4331c7649b0711ec37154501fc39a7177b
Author: biao.liub <biao.l...@alibaba-inc.com>
Date:   2017-05-02T11:01:47Z

[FLINK-6397] Fix test cases.




---


[GitHub] flink issue #3395: [FLINK-5861] Components of TaskManager support updating J...

2017-03-09 Thread ifndef-SleePy
Github user ifndef-SleePy commented on the issue:

https://github.com/apache/flink/pull/3395
  
Hi Till, I almost missed these comments! I didn't see them until a few 
minutes ago.

I fully understand your concern. Let me explain a bit more in response to 
your comments.
1. I agree with most of your suggestions, such as the null check, the 
formatting problems and TestLogger.
2. Currently the synchronization problem will not happen. I think replacing 
the field value is safe, since that's an atomic operation. Correct me if I'm 
wrong.
3. This PR will not work without the other reconciliation PRs: nobody will 
notify these listeners. We have actually implemented reconnection between the 
TM and JM, and this PR works together with that code. I submitted this PR 
without the other reconciliation PRs because I think it is independent of the 
other parts, and I believe filing one huge PR is terrible for both the 
reviewer and the writer. However, this PR on its own left you confused. Sorry 
about that. 
4. Actually I'm not sure the listener pattern is the best way to do this, but 
I think it's the simplest way, requiring the fewest modifications to the 
current implementation. If the TM reconnects to a new JM, how else can we 
update the JobMasterGateway held by the components? I can't figure out a 
better way other than reimplementing these components.

Anyway, thank you for reviewing and leaving so many comments! I agree with 
you that we should close this PR for now. Once we reach an agreement on the 
main reconciliation PRs, we can discuss what this PR tries to implement. 
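Points 2 and 4 can be illustrated together. Java guarantees that reads and writes of object references are atomic (JLS §17.7), but atomicity alone does not guarantee that another thread observes the new reference; declaring the field `volatile` adds that visibility guarantee. The sketch below shows a component that swaps its gateway reference when the TaskManager reconnects. The types (`GatewaySketch`, `GatewayListener`, `ComponentSketch`, `TaskManagerSketch`) are hypothetical stand-ins, not the classes from this PR.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical demo of the listener approach; not the PR's actual classes.
class GatewayListenerDemo {
    // Registers a component, simulates a reconnect, returns the gateway it now holds.
    static String reconnectAndRead() {
        TaskManagerSketch tm = new TaskManagerSketch();
        ComponentSketch component = new ComponentSketch(new GatewaySketch("jm-1"));
        tm.addListener(component);
        tm.onJobManagerReconnected(new GatewaySketch("jm-2"));
        return component.currentGateway().address();
    }

    public static void main(String[] args) {
        System.out.println(reconnectAndRead()); // jm-2
    }
}

// Stand-in for a JobMasterGateway: just carries an address.
final class GatewaySketch {
    private final String address;
    GatewaySketch(String address) { this.address = address; }
    String address() { return address; }
}

interface GatewayListener {
    void gatewayChanged(GatewaySketch newGateway);
}

// A TaskManager component that must always talk to the current JobManager.
class ComponentSketch implements GatewayListener {
    // Reference assignment is atomic; volatile makes the new value visible to other threads.
    private volatile GatewaySketch gateway;

    ComponentSketch(GatewaySketch initial) { this.gateway = initial; }

    GatewaySketch currentGateway() { return gateway; }

    @Override
    public void gatewayChanged(GatewaySketch newGateway) { this.gateway = newGateway; }
}

class TaskManagerSketch {
    // Copy-on-write list so notification is safe even if listeners are added concurrently.
    private final List<GatewayListener> listeners = new CopyOnWriteArrayList<>();

    void addListener(GatewayListener listener) { listeners.add(listener); }

    // Called once the TaskManager has reconnected to a (possibly new) JobManager.
    void onJobManagerReconnected(GatewaySketch newGateway) {
        for (GatewayListener listener : listeners) {
            listener.gatewayChanged(newGateway);
        }
    }
}
```

As point 3 notes, something must call the notification method after reconnection; in this sketch that is the hypothetical `onJobManagerReconnected`.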


---


[GitHub] flink pull request #3395: [FLINK-5861] Components of TaskManager support upd...

2017-03-09 Thread ifndef-SleePy
Github user ifndef-SleePy closed the pull request at:

https://github.com/apache/flink/pull/3395


---


[GitHub] flink issue #3487: [FLINK-5980] Expose max-parallelism value in RuntimeConte...

2017-03-08 Thread ifndef-SleePy
Github user ifndef-SleePy commented on the issue:

https://github.com/apache/flink/pull/3487
  
Thank you, Stephan and zentol.
Good question and suggestion. I didn't give batch jobs much consideration.
I also think naming the variable "numberOfKeyGroups" in TaskInfo is a bad 
idea. Keeping max-parallelism is better: it's more general and makes much 
more sense in other scenarios.
I will make sure it works with batch jobs and update this PR later.
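For keyed streaming state, max-parallelism is the number of key groups: each key is hashed into one of `maxParallelism` key groups, and contiguous ranges of key groups are assigned to the parallel subtasks. That is why "numberOfKeyGroups" and max-parallelism describe the same value for keyed streaming jobs. The sketch below mirrors the range formula used by Flink's `KeyGroupRangeAssignment` (`keyGroup * parallelism / maxParallelism`) but simplifies key hashing to `Math.floorMod(key.hashCode(), maxParallelism)`; Flink additionally applies a murmur hash to `hashCode()`. The class name `KeyGroupDemo` is hypothetical.

```java
// Simplified key-group assignment; hashing is not Flink's exact scheme.
class KeyGroupDemo {
    // Maps a key into one of maxParallelism key groups (Flink also murmur-hashes hashCode()).
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Maps a key group to the subtask that owns it: groups are split into contiguous ranges.
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // fixed number of key groups
        int parallelism = 4;      // can change on rescaling, up to maxParallelism
        int keyGroup = assignToKeyGroup("user-42", maxParallelism);
        int subtask = operatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup);
        System.out.println("key group " + keyGroup + " -> subtask " + subtask);
    }
}
```

With 128 key groups and parallelism 4, key groups 0 to 31 go to subtask 0, 32 to 63 to subtask 1, and so on; rescaling changes only `parallelism`, never the key-to-group mapping.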


---


[GitHub] flink pull request #3487: [FLINK-5980] Expose max-parallelism value in Runti...

2017-03-07 Thread ifndef-SleePy
Github user ifndef-SleePy commented on a diff in the pull request:

https://github.com/apache/flink/pull/3487#discussion_r104825967
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
 ---
@@ -85,6 +85,13 @@
int getIndexOfThisSubtask();
 
/**
+* Gets the number of max-parallelism with which the parallel task runs.
+*
+* @return The max-parallelism with which the parallel task runs.
+*/
+   int getMaxParallelismOfSubtasks();
--- End diff --

That's a better name.


---


[GitHub] flink pull request #3487: [FLINK-5980] Expose max-parallelism value in Runti...

2017-03-07 Thread ifndef-SleePy
GitHub user ifndef-SleePy opened a pull request:

https://github.com/apache/flink/pull/3487

[FLINK-5980] Expose max-parallelism value in RuntimeContext.

Add a new method named getMaxParallelismOfSubtasks to RuntimeContext.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink jira-5980

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3487.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3487


commit b7844d43d053d447a905fa727916683e06ec583d
Author: biao.liub <biao.l...@alibaba-inc.com>
Date:   2017-03-07T12:11:10Z

[FLINK-5980] Expose max-parallelism value in RuntimeContext.




---


[GitHub] flink pull request #3396: [FLINK-5879] Fix bug about ExecutionAttemptID, add...

2017-02-23 Thread ifndef-SleePy
Github user ifndef-SleePy closed the pull request at:

https://github.com/apache/flink/pull/3396


---


[GitHub] flink issue #3396: [FLINK-5879] Fix bug about ExecutionAttemptID, add super(...

2017-02-23 Thread ifndef-SleePy
Github user ifndef-SleePy commented on the issue:

https://github.com/apache/flink/pull/3396
  
Ah, you are right. I think I mixed up this issue with another one. Thank 
you for pointing it out.


---


[GitHub] flink pull request #3396: [FLINK-5879] Fix bug about ExecutionAttemptID, add...

2017-02-22 Thread ifndef-SleePy
GitHub user ifndef-SleePy opened a pull request:

https://github.com/apache/flink/pull/3396

[FLINK-5879] Fix bug about ExecutionAttemptID, add super() in constructor.
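As general Java background for this title (not a claim about the actual `ExecutionAttemptID` code): when a constructor body does not begin with an explicit `this(...)` or `super(...)` call, the compiler inserts an implicit `super()` (JLS §8.8.7). The hypothetical classes below show that an explicit no-argument `super()` behaves the same as leaving it out.

```java
// Hypothetical classes; not Flink's AbstractID / ExecutionAttemptID.
class ImplicitSuperDemo {
    static int baseConstructorCalls = 0;

    static class BaseId {
        final long value;

        BaseId() {
            baseConstructorCalls++;
            this.value = 7L;
        }
    }

    // No explicit super(): the compiler inserts super() automatically.
    static class ImplicitId extends BaseId {
        ImplicitId() { }
    }

    // Explicit super(): equivalent to the implicit version.
    static class ExplicitId extends BaseId {
        ExplicitId() { super(); }
    }

    // Returns true if both subclasses ran the base constructor exactly once each.
    static boolean bothRunBaseConstructor() {
        baseConstructorCalls = 0;
        long a = new ImplicitId().value;
        long b = new ExplicitId().value;
        return baseConstructorCalls == 2 && a == 7L && b == 7L;
    }

    public static void main(String[] args) {
        System.out.println(bothRunBaseConstructor()); // true
    }
}
```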



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink jira-5879

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3396.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3396






---


[GitHub] flink pull request #3395: [FLINK-5861] Components of TaskManager support upd...

2017-02-22 Thread ifndef-SleePy
GitHub user ifndef-SleePy opened a pull request:

https://github.com/apache/flink/pull/3395

[FLINK-5861] Components of TaskManager support updating JobManagerConnection



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink jira-5861

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3395.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3395






---


[GitHub] flink pull request #3296: [FLINK-5784] Fix bug about registering JobManagerL...

2017-02-12 Thread ifndef-SleePy
GitHub user ifndef-SleePy opened a pull request:

https://github.com/apache/flink/pull/3296

[FLINK-5784] Fix bug about registering JobManagerLeaderListener in 
LeaderRetrievalService multiple times

[FLINK-5784] Fix bug about registering JobManagerLeaderListener in 
LeaderRetrievalService multiple times.
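One general way to prevent this class of bug is to make registration idempotent per job, so a listener is started with the retrieval service only on the first registration. A minimal sketch using `ConcurrentMap.putIfAbsent`; `RegisterOnceDemo`, `LeaderListener` and `addJob` are hypothetical names, the counter stands in for actually starting leader retrieval, and this is not necessarily how the PR fixes the issue.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical registry that starts at most one leader listener per job id.
class RegisterOnceDemo {
    // Stand-in for a leader retrieval listener.
    static final class LeaderListener {
        final String jobId;
        LeaderListener(String jobId) { this.jobId = jobId; }
    }

    private final ConcurrentMap<String, LeaderListener> listeners = new ConcurrentHashMap<>();
    // Counts how often a listener was actually "started" with the retrieval service.
    private final AtomicInteger started = new AtomicInteger();

    // Returns true only for the first registration of a given job id.
    boolean addJob(String jobId) {
        LeaderListener candidate = new LeaderListener(jobId);
        // putIfAbsent returns null only if no listener was registered for this job yet.
        if (listeners.putIfAbsent(jobId, candidate) == null) {
            started.incrementAndGet(); // a real service would start retrieval here
            return true;
        }
        return false;
    }

    int startedCount() { return started.get(); }

    public static void main(String[] args) {
        RegisterOnceDemo demo = new RegisterOnceDemo();
        demo.addJob("job-1");
        demo.addJob("job-1"); // ignored: already registered
        demo.addJob("job-2");
        System.out.println(demo.startedCount()); // 2
    }
}
```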

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink jira-5784

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3296.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3296


commit d827f1d87ab8a15458b328fb10beeb70f4515493
Author: biao.liub <biao.l...@alibaba-inc.com>
Date:   2017-02-13T03:06:46Z

[FLINK-5784] Fix bug about registering JobManagerLeaderListener in 
LeaderRetrievalService multiple times.




---


[GitHub] flink pull request #2877: [FLINK-5141] Implement Flip6LocalStreamEnvironment...

2016-12-19 Thread ifndef-SleePy
Github user ifndef-SleePy closed the pull request at:

https://github.com/apache/flink/pull/2877


---


[GitHub] flink pull request #2828: [FLINK-5093] java.util.ConcurrentModificationExcep...

2016-11-28 Thread ifndef-SleePy
Github user ifndef-SleePy closed the pull request at:

https://github.com/apache/flink/pull/2828


---


[GitHub] flink pull request #2817: [FLINK-5076] Shutting down TM when shutting down m...

2016-11-28 Thread ifndef-SleePy
Github user ifndef-SleePy closed the pull request at:

https://github.com/apache/flink/pull/2817


---


[GitHub] flink pull request #2877: [FLINK-5141] Implement Flip6LocalStreamEnvironment...

2016-11-27 Thread ifndef-SleePy
GitHub user ifndef-SleePy opened a pull request:

https://github.com/apache/flink/pull/2877

[FLINK-5141] Implement Flip6LocalStreamEnvironment to run new mini cluster 
in flip-6 branch

Implement a StreamEnvironment to run the flip-6 mini cluster. Following 
Till's suggestion, I have named it Flip6LocalStreamEnvironment for now.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink jira-5141

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2877.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2877


commit 1ed9ea9a929d1000e46687912b23b715931bb384
Author: biao.liub <biao.l...@alibaba-inc.com>
Date:   2016-11-23T09:02:11Z

[FLINK-5141] Implement MiniClusterStreamEnvironment for new mini
cluster.

commit cca637ea1a3bee82802dbd237c7fc4b8d4bdee84
Author: biao.liub <biao.l...@alibaba-inc.com>
Date:   2016-11-24T10:38:50Z

[FLINK-5141] Rename MiniClusterStreamEnvironment to 
Flip6LocalStreamEnvironment.




---


[GitHub] flink pull request #2828: [FLINK-5093] java.util.ConcurrentModificationExcep...

2016-11-18 Thread ifndef-SleePy
GitHub user ifndef-SleePy opened a pull request:

https://github.com/apache/flink/pull/2828

[FLINK-5093] java.util.ConcurrentModificationException is thrown when 
stopping TimerService

[FLINK-5093] Fix the java.util.ConcurrentModificationException thrown 
while stopping TimerService. Add a new method "unregisterAllTimeouts" and a 
unit test for it.
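The failure described here is the standard fail-fast iterator behavior of `java.util` collections: structurally modifying a `HashMap` while iterating over its key set makes the iterator throw `ConcurrentModificationException`. A minimal sketch, assuming the service keeps its timeouts in a `HashMap` and that stopping it calls `unregisterTimeout` for each key; the class and method bodies are hypothetical, and only the name `unregisterAllTimeouts` comes from the PR.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical timer service; not Flink's TimerService implementation.
class TimerServiceSketch {
    // Maps a key to its scheduled timeout handle (a String stands in for the handle).
    private final Map<String, String> timeouts = new HashMap<>();

    void registerTimeout(String key) { timeouts.put(key, "timeout-" + key); }

    void unregisterTimeout(String key) { timeouts.remove(key); }

    // Buggy: unregisterTimeout removes entries from the map being iterated.
    void stopBuggy() {
        for (String key : timeouts.keySet()) {
            unregisterTimeout(key);
        }
    }

    // Fixed: iterate over a snapshot of the keys, so removals do not affect the loop.
    void unregisterAllTimeouts() {
        for (String key : new ArrayList<>(timeouts.keySet())) {
            unregisterTimeout(key);
        }
    }

    int size() { return timeouts.size(); }

    public static void main(String[] args) {
        TimerServiceSketch buggy = new TimerServiceSketch();
        buggy.registerTimeout("a");
        buggy.registerTimeout("b");
        try {
            buggy.stopBuggy();
        } catch (ConcurrentModificationException e) {
            System.out.println("stopBuggy: " + e.getClass().getSimpleName());
        }
        TimerServiceSketch fixed = new TimerServiceSketch();
        fixed.registerTimeout("a");
        fixed.registerTimeout("b");
        fixed.unregisterAllTimeouts();
        System.out.println("remaining timeouts: " + fixed.size()); // 0
    }
}
```

Iterating over a copy is one option; a service that owns the map could equally cancel every handle and then call `clear()`.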

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink jira-5093

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2828.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2828


commit 21fdd8da33c634f5b6a19ed8ea7af9c18bcbe9bc
Author: biao.liub <biao.l...@alibaba-inc.com>
Date:   2016-11-18T10:15:37Z

[FLINK-5093] Fix bug about throwing ConcurrentModificationException when 
stopping TimerService.

commit 24406bbe77ba211ce99f268a85adbb391cc605df
Author: biao.liub <biao.l...@alibaba-inc.com>
Date:   2016-11-18T10:32:36Z

[FLINK-5093] Remove useless import.




---


[GitHub] flink pull request #2817: [FLINK-5076] Shutting down TM when shutting down m...

2016-11-16 Thread ifndef-SleePy
GitHub user ifndef-SleePy opened a pull request:

https://github.com/apache/flink/pull/2817

[FLINK-5076] Shutting down TM when shutting down mini cluster.

This PR ([FLINK-5076](https://issues.apache.org/jira/browse/FLINK-5076)) shuts 
down the task manager when the new mini cluster is shut down.
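A minimal sketch of the idea, assuming the mini cluster owns its task managers and stops them before the master-side component; the ordering and all names (`MiniClusterSketch`, `Component`) are assumptions for illustration, not the PR's code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mini cluster that owns and shuts down its task managers.
class MiniClusterSketch {
    // Stand-in component whose shutdown just records its name.
    static final class Component {
        final String name;
        final List<String> log;
        Component(String name, List<String> log) { this.name = name; this.log = log; }
        void shutdown() { log.add(name); }
    }

    private final List<String> shutdownLog = new ArrayList<>();
    private final Component jobManager = new Component("jobmanager", shutdownLog);
    private final List<Component> taskManagers = new ArrayList<>();

    MiniClusterSketch(int numTaskManagers) {
        for (int i = 0; i < numTaskManagers; i++) {
            taskManagers.add(new Component("taskmanager-" + i, shutdownLog));
        }
    }

    // Assumed order: task managers first, then the master-side component.
    List<String> shutdown() {
        for (Component tm : taskManagers) {
            tm.shutdown();
        }
        taskManagers.clear();
        jobManager.shutdown();
        return shutdownLog;
    }

    public static void main(String[] args) {
        System.out.println(new MiniClusterSketch(2).shutdown());
        // [taskmanager-0, taskmanager-1, jobmanager]
    }
}
```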

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink jira-5076

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2817.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2817


commit fabb666c2c4f29392287b1f7a04d47a3e021f75e
Author: biao.liub <biao.l...@alibaba-inc.com>
Date:   2016-11-16T09:54:48Z

[FLINK-5076] Shutting down TM when shutting down mini cluster.




---