[jira] [Commented] (FLINK-11590) Redundant spaces occur when the string true is generated using 'case when' grammar

2019-02-13 Thread aitozi (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16766899#comment-16766899
 ] 

aitozi commented on FLINK-11590:


Hi, [~jiaqiang]

I think this is because the result type of a `case when` expression takes the 
longest length of all its branches, so in the example you posted it will be 
CHAR(5). A CHAR value is always right-padded with spaces to the declared length.

You can fix this by casting the result to VARCHAR or by using the trim() 
function. I think this is not a bug, but we can add a note to the Flink docs.
 
Thanks,
Aitozi
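
The padding described above can be illustrated with a minimal Java sketch. It
only mimics CHAR(n) semantics on plain strings; `padToChar` and
`caseWhenResult` are hypothetical helpers made up for this illustration and are
not Flink or Calcite APIs.

```java
public class CharPaddingSketch {

    // Hypothetical helper: right-pads a value with spaces to a fixed
    // CHAR(length), mimicking SQL CHAR semantics.
    public static String padToChar(String value, int length) {
        StringBuilder sb = new StringBuilder(value);
        while (sb.length() < length) {
            sb.append(' ');
        }
        return sb.toString();
    }

    // Models: CASE WHEN conditionA THEN 'false' ELSE 'true' END
    // under the assumption that the result type is CHAR(longest branch).
    public static String caseWhenResult(boolean conditionA) {
        String thenBranch = "false";
        String elseBranch = "true";
        int charLength = Math.max(thenBranch.length(), elseBranch.length());
        return padToChar(conditionA ? thenBranch : elseBranch, charLength);
    }

    public static void main(String[] args) {
        // The 'true' branch is padded to five characters.
        System.out.println("[" + caseWhenResult(false) + "]");
        // Trimming (or using a VARCHAR result type) removes the padding.
        System.out.println("[" + caseWhenResult(false).trim() + "]");
    }
}
```

The first line printed shows the trailing space from the report; the second
shows the effect of the trim() workaround.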

> Redundant spaces occur when the string true is generated using 'case when' 
> grammar
> --
>
> Key: FLINK-11590
> URL: https://issues.apache.org/jira/browse/FLINK-11590
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.6.4, 1.7.0
>Reporter: dong
>Priority: Major
>
> {code:java}
> case when conditionA then 'false'  else 'true'  end as field
> {code}
> When I execute the above statement, if the field is true, the output will be 
> 'true ' with an extra space.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-10079) Support external sink table in INSERT INTO statement

2019-02-13 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10079?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng closed FLINK-10079.
---
Resolution: Fixed

Closing the JIRA because the PR has been merged.

> Support external sink table in INSERT INTO statement
> 
>
> Key: FLINK-10079
> URL: https://issues.apache.org/jira/browse/FLINK-10079
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Jun Zhang
>Assignee: Jun Zhang
>Priority: Major
> Fix For: 1.6.4, 1.8.0
>
>
> In the documentation, there is a description:
> {quote}Once registered in a TableEnvironment, all tables defined in a 
> ExternalCatalog can be accessed from Table API or SQL queries by specifying 
> their full path, such as catalog.database.table.
> {quote}
> Currently, this is true only for source tables. For a sink table (specified in 
> the Table API or SQL), users have to register it explicitly even though 
> it is defined in a registered ExternalCatalog; otherwise a "No table was 
> registered under the name XXX" TableException is thrown.
> It would be better to keep source and sink tables consistent, so that 
> users get a more convenient way to insert into sink tables. 





[jira] [Closed] (FLINK-9776) Interrupt TaskThread only while in User/Operator code

2019-02-13 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng closed FLINK-9776.
--
Resolution: Fixed

The PR has been merged.

> Interrupt TaskThread only while in User/Operator code
> -
>
> Key: FLINK-9776
> URL: https://issues.apache.org/jira/browse/FLINK-9776
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.4, 1.8.0
>
>
> Upon cancellation, the task thread is periodically interrupted.
> This helps to pull the thread out of blocking operations in the user code.
> Once the thread leaves the user code, the repeated interrupts may interfere 
> with the shutdown cleanup logic, causing confusing exceptions.
> We should stop sending the periodic interrupts once the thread leaves the 
> user code.
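
The behaviour proposed above can be sketched roughly as follows. This is a
simplified illustration, not the actual Flink change: the class
`InterruptGateSketch` and its methods are hypothetical, and the sketch ignores
the small race between checking the flag and sending the interrupt.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class InterruptGateSketch {

    // Hypothetical flag: true only while the task thread runs user code.
    private final AtomicBoolean inUserCode = new AtomicBoolean(false);

    // Called by the task thread right before it invokes user code.
    public void enterUserCode() {
        inUserCode.set(true);
    }

    // Called by the task thread once user code has returned, before cleanup.
    public void leaveUserCode() {
        inUserCode.set(false);
    }

    // Called periodically by the canceller. The interrupt is only sent while
    // the task thread is still in user code; returns whether it was sent.
    public boolean maybeInterrupt(Thread taskThread) {
        if (inUserCode.get()) {
            taskThread.interrupt();
            return true;
        }
        return false;
    }
}
```

With such a gate, the shutdown cleanup logic that runs after
`leaveUserCode()` would no longer see repeated interrupts.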





[jira] [Commented] (FLINK-11382) Disable MetricFetcher if interval is configured to 0

2019-02-13 Thread leesf (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16766921#comment-16766921
 ] 

leesf commented on FLINK-11382:
---

[~Zentol] OK, I will open a follow-up to update the documentation. Happy 
Chinese New Year!

> Disable MetricFetcher if interval is configured to 0
> 
>
> Key: FLINK-11382
> URL: https://issues.apache.org/jira/browse/FLINK-11382
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics
>Reporter: Chesnay Schepler
>Assignee: leesf
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Follow-up for FLINK-10822 to disable the MetricFetcher completely if the 
> interval is configured to 0.
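
One way to read "disable completely" is to hand out a no-op fetcher when the
interval is 0. The sketch below illustrates that idea under this assumption;
`Fetcher`, `NoOpFetcher`, `PeriodicFetcher` and `create` are hypothetical names
and do not describe the classes in the actual pull request.

```java
public class MetricFetcherSketch {

    /** Hypothetical stand-in for the fetcher contract. */
    public interface Fetcher {
        void update();
    }

    /** Used when fetching is disabled: every update is a no-op. */
    public enum NoOpFetcher implements Fetcher {
        INSTANCE;

        @Override
        public void update() {
            // Intentionally empty: no metrics are queried.
        }
    }

    /** Stand-in for a real fetcher; it only counts update requests. */
    public static final class PeriodicFetcher implements Fetcher {
        private final long updateIntervalMillis;
        private int updates;

        public PeriodicFetcher(long updateIntervalMillis) {
            this.updateIntervalMillis = updateIntervalMillis;
        }

        @Override
        public void update() {
            updates++;
        }

        public int getUpdates() {
            return updates;
        }

        public long getUpdateIntervalMillis() {
            return updateIntervalMillis;
        }
    }

    /** An interval of 0 disables fetching; any other value enables it. */
    public static Fetcher create(long updateIntervalMillis) {
        if (updateIntervalMillis == 0) {
            return NoOpFetcher.INSTANCE;
        }
        return new PeriodicFetcher(updateIntervalMillis);
    }
}
```

Callers can then invoke `update()` unconditionally without checking whether
fetching is enabled.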





[jira] [Commented] (FLINK-9900) Failed to testRestoreBehaviourWithFaultyStateHandles (org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase)

2019-02-13 Thread sunjincheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16766913#comment-16766913
 ] 

sunjincheng commented on FLINK-9900:


Can we close the JIRA now that the PRs have been merged, [~Zentol] 
[~mingleizhang]?

> Failed to testRestoreBehaviourWithFaultyStateHandles 
> (org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase) 
> ---
>
> Key: FLINK-9900
> URL: https://issues.apache.org/jira/browse/FLINK-9900
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.5.1, 1.6.0
>Reporter: zhangminglei
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.4, 1.7.3, 1.8.0
>
>
> https://api.travis-ci.org/v3/job/405843617/log.txt
> Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 124.598 sec 
> <<< FAILURE! - in 
> org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase
>  
> testRestoreBehaviourWithFaultyStateHandles(org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase)
>  Time elapsed: 120.036 sec <<< ERROR!
>  org.junit.runners.model.TestTimedOutException: test timed out after 12 
> milliseconds
>  at sun.misc.Unsafe.park(Native Method)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
>  at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>  at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
>  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>  at 
> org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase.testRestoreBehaviourWithFaultyStateHandles(ZooKeeperHighAvailabilityITCase.java:244)
> Results :
> Tests in error: 
>  
> ZooKeeperHighAvailabilityITCase.testRestoreBehaviourWithFaultyStateHandles:244
>  » TestTimedOut
> Tests run: 1453, Failures: 0, Errors: 1, Skipped: 29





[jira] [Created] (FLINK-11591) Restoring LockableTypeSerializer's snapshot from 1.6 and below requires pre-processing before snapshot is valid to use

2019-02-13 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-11591:
---

 Summary: Restoring LockableTypeSerializer's snapshot from 1.6 and 
below requires pre-processing before snapshot is valid to use
 Key: FLINK-11591
 URL: https://issues.apache.org/jira/browse/FLINK-11591
 Project: Flink
  Issue Type: Bug
  Components: CEP, Type Serialization System
Reporter: Tzu-Li (Gordon) Tai
Assignee: Igal Shilman


In 1.6 and below, the {{LockableTypeSerializer}} incorrectly returns the 
element serializer's snapshot directly instead of wrapping it within an 
independent snapshot class:
https://github.com/apache/flink/blob/release-1.6/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java#L188

As a result, the written state information for this would be 
{{(LockableTypeSerializer, SomeArbitrarySnapshot)}}.

The problem occurs when restoring this in Flink 1.7+. Compatibility checks are 
now performed by providing the new serializer to the snapshot, so the call 
would be:
{{SomeArbitrarySnapshot.resolveSchemaCompatibility(newLockableTypeSerializer)}},
 which would not work since the arbitrary snapshot does not recognize the 
{{LockableTypeSerializer}}.

To fix this, we essentially need to preprocess that arbitrary snapshot when 
restoring from <= 1.6 version snapshots.

A proposed fix would be to have the following interface:
{code}
public interface RequiresLegacySerializerSnapshotPreprocessing {
    TypeSerializerSnapshot preprocessLegacySerializerSnapshot(
            TypeSerializerSnapshot legacySnapshot);
}
{code}

The {{LockableTypeSerializer}} would then implement this interface, and in the 
{{preprocessLegacySerializerSnapshot}} method, properly wrap that arbitrary 
element serializer snapshot into a {{LockableTypeSerializerSnapshot}}.

In general, this interface is useful to preprocess any problematic snapshot 
that was returned pre 1.7.

The point at which to check whether a serializer written in <= 1.6 savepoints 
implements this interface, and to preprocess the read snapshot, would be:
https://github.com/apache/flink/blob/a567a1ef628eadad21e11864ec328481cd6d7898/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java#L218
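
The proposed flow can be sketched with simplified stand-in types. Everything
in the block below is hypothetical and self-contained: `Snapshot`,
`ElementSnapshot`, `LockableSnapshot`, `LockableSerializer` and `restore` only
imitate the roles described above and are not Flink's real
`TypeSerializerSnapshot` API.

```java
public class LegacySnapshotPreprocessingSketch {

    /** Simplified stand-in for a serializer snapshot. */
    public interface Snapshot {
        String describe();
    }

    /** Stand-in for the proposed preprocessing interface. */
    public interface RequiresLegacySnapshotPreprocessing {
        Snapshot preprocessLegacySerializerSnapshot(Snapshot legacySnapshot);
    }

    /** The arbitrary element snapshot that 1.6 wrote directly. */
    public static final class ElementSnapshot implements Snapshot {
        @Override
        public String describe() {
            return "ElementSnapshot";
        }
    }

    /** The independent wrapper snapshot that should have been written. */
    public static final class LockableSnapshot implements Snapshot {
        private final Snapshot nested;

        public LockableSnapshot(Snapshot nested) {
            this.nested = nested;
        }

        @Override
        public String describe() {
            return "LockableSnapshot(" + nested.describe() + ")";
        }
    }

    /** Stand-in serializer that knows how to repair its legacy snapshot. */
    public static final class LockableSerializer
            implements RequiresLegacySnapshotPreprocessing {
        @Override
        public Snapshot preprocessLegacySerializerSnapshot(Snapshot legacySnapshot) {
            // Wrap the element snapshot so it matches the new format.
            return new LockableSnapshot(legacySnapshot);
        }
    }

    /** Restore-time hook: preprocess only if the written serializer asks for it. */
    public static Snapshot restore(Object writtenSerializer, Snapshot readSnapshot) {
        if (writtenSerializer instanceof RequiresLegacySnapshotPreprocessing) {
            return ((RequiresLegacySnapshotPreprocessing) writtenSerializer)
                    .preprocessLegacySerializerSnapshot(readSnapshot);
        }
        return readSnapshot;
    }
}
```

Serializers that do not implement the interface are unaffected, which keeps
the hook limited to problematic pre-1.7 snapshots.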





[jira] [Commented] (FLINK-11586) Check and port SlotSharingITCase to new code base

2019-02-13 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16766937#comment-16766937
 ] 

Till Rohrmann commented on FLINK-11586:
---

Sorry for the duplication of FLINK-10610. I'll write on the other PR thread.

> Check and port SlotSharingITCase to new code base
> -
>
> Key: FLINK-11586
> URL: https://issues.apache.org/jira/browse/FLINK-11586
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.8.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Check and port {{SlotSharingITCase}} to new code base.





[GitHub] tillrohrmann commented on issue #6883: [FLINK-10610] [tests] Port slot sharing cases to new codebase

2019-02-13 Thread GitBox
tillrohrmann commented on issue #6883: [FLINK-10610] [tests] Port slot sharing 
cases to new codebase
URL: https://github.com/apache/flink/pull/6883#issuecomment-463109196
 
 
   Sorry for joining so late to the party. I actually missed this PR and 
accidentally opened #7689. While porting the tests, I think that we can 
completely remove the `SlotSharingITCase`, since the test is no longer valid. 
If you remove the slot sharing groups the test would pass with the new code 
because of the queued scheduling. The only test which I would port is the 
`CoLocationConstraintITCase`  (#7690).
   
   Moreover, with #7676 I added some functionality for terminating and starting 
new `TaskExecutors` for the `MiniCluster`.
   
   I also think that `TaskManagerFailsWithSlotSharingITCase` should simply be 
removed, since it is the same as #7676, just with slot sharing. One could 
argue that slot sharing is the more common case and therefore should be used, 
but then we should simply adapt #7676.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Reopened] (FLINK-11587) Check and port CoLocationConstraintITCase to new code base

2019-02-13 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reopened FLINK-11587:
---

I think FLINK-10610 ports too many test cases which are not strictly needed. 
Therefore I would like to keep this issue to track the porting efforts for the 
{{CoLocationConstraintITCase}}.

> Check and port CoLocationConstraintITCase to new code base
> --
>
> Key: FLINK-11587
> URL: https://issues.apache.org/jira/browse/FLINK-11587
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.8.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
> Fix For: 1.8.0
>
>
> Check and port {{CoLocationConstraintITCase}} to new code base.





[GitHub] leesf opened a new pull request #7691: [hotfix] update documentation of metrics fetcher update interval

2019-02-13 Thread GitBox
leesf opened a new pull request #7691: [hotfix] update documentation of metrics 
fetcher update interval
URL: https://github.com/apache/flink/pull/7691
 
 
   
   ## What is the purpose of the change
   
   Update documentation of metrics fetcher update interval.
   
   ## Brief change log
   
   Update documentation of metrics fetcher update interval
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not documented)
   




[GitHub] leesf commented on issue #7691: [hotfix] update documentation of metrics fetcher update interval

2019-02-13 Thread GitBox
leesf commented on issue #7691: [hotfix] update documentation of metrics 
fetcher update interval
URL: https://github.com/apache/flink/pull/7691#issuecomment-463111873
 
 
   cc @zentol 




[GitHub] flinkbot commented on issue #7691: [hotfix] update documentation of metrics fetcher update interval

2019-02-13 Thread GitBox
flinkbot commented on issue #7691: [hotfix] update documentation of metrics 
fetcher update interval
URL: https://github.com/apache/flink/pull/7691#issuecomment-463111999
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❌ 1. The [description] looks good.
   * ❌ 2. There is [consensus] that the contribution should go into Flink.
   * ❔ 3. Needs [attention] from.
   * ❌ 4. The change fits into the overall [architecture].
   * ❌ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve the 1st aspect (similarly, it 
also supports the `consensus`, `architecture` and `quality` keywords)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval
   




[GitHub] GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base

2019-02-13 Thread GitBox
GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port 
TaskManagerFailsITCase to new code base
URL: https://github.com/apache/flink/pull/7676#discussion_r256292386
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 ##
 @@ -928,9 +931,16 @@ public void onFatalError(Throwable exception) {
 
private class TerminatingFatalErrorHandlerFactory {
 
+   /**
+* Create a new {@link TerminatingFatalErrorHandler} for the 
{@link TaskExecutor} with
+* the given index.
+*
+* @param index into the {{@link #taskManagers}} collection to 
identify the correct {@link TaskExecutor}.
 
 Review comment:
   Are the double curly braces `{{}}` intended?




[GitHub] GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base

2019-02-13 Thread GitBox
GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port 
TaskManagerFailsITCase to new code base
URL: https://github.com/apache/flink/pull/7676#discussion_r256289231
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 ##
 @@ -1430,6 +1422,125 @@ public void 
testReleasingTaskExecutorIfNoMoreSlotsRegistered() throws Exception
}
}
 
+   @Nonnull
+   private TestingResourceManagerGateway 
createAndRegisterTestingResourceManagerGateway() {
+   final TestingResourceManagerGateway 
testingResourceManagerGateway = new TestingResourceManagerGateway();
+   
rpcService.registerGateway(testingResourceManagerGateway.getAddress(), 
testingResourceManagerGateway);
+   return testingResourceManagerGateway;
+   }
+
+   /**
+* Tests that the job execution is failed if the TaskExecutor 
disconnects from the
+* JobMaster.
+*/
+   @Test
+   public void testJobFailureWhenGracefulTaskExecutorTermination() throws 
Exception {
+   runJobFailureWhenTaskExecutorTerminatesTest(
+   () -> heartbeatServices,
+   (localTaskManagerLocation, jobMasterGateway) -> 
jobMasterGateway.disconnectTaskManager(
+   localTaskManagerLocation.getResourceID(),
+   new FlinkException("Test disconnectTaskManager 
exception.")),
+   (jobMasterGateway, resourceID) -> ignored -> {});
+   }
+
+   @Test
+   public void testJobFailureWhenTaskExecutorHeartbeatTimeout() throws 
Exception {
+   final AtomicBoolean respondToHeartbeats = new 
AtomicBoolean(true);
+   runJobFailureWhenTaskExecutorTerminatesTest(
+   () -> fastHeartbeatServices,
+   (localTaskManagerLocation, jobMasterGateway) -> 
respondToHeartbeats.set(false),
+   (jobMasterGateway, taskManagerResourceId) -> resourceId 
-> {
+   if (respondToHeartbeats.get()) {
+   
jobMasterGateway.heartbeatFromTaskManager(taskManagerResourceId, new 
AccumulatorReport(Collections.emptyList()));
+   }
+   }
+   );
+   }
+
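+   private void runJobFailureWhenTaskExecutorTerminatesTest(
+   Supplier<HeartbeatServices> heartbeatSupplier,
+   BiConsumer<LocalTaskManagerLocation, JobMasterGateway> jobReachedRunningState,
+   BiFunction<JobMasterGateway, ResourceID, Consumer<ResourceID>> heartbeatConsumerFunction) throws Exception {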
+   final JobGraph jobGraph = createSingleVertexJobGraph();
+   final TestingOnCompletionActions onCompletionActions = new 
TestingOnCompletionActions();
+   final JobMaster jobMaster = createJobMaster(
+   new Configuration(),
+   jobGraph,
+   haServices,
+   new TestingJobManagerSharedServicesBuilder().build(),
+   heartbeatSupplier.get(),
 
 Review comment:
   ```suggestion
heartbeatServices,
   ```




[GitHub] GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base

2019-02-13 Thread GitBox
GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port 
TaskManagerFailsITCase to new code base
URL: https://github.com/apache/flink/pull/7676#discussion_r256290442
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 ##
 @@ -1430,6 +1422,125 @@ public void 
testReleasingTaskExecutorIfNoMoreSlotsRegistered() throws Exception
}
}
 
+   @Nonnull
+   private TestingResourceManagerGateway 
createAndRegisterTestingResourceManagerGateway() {
+   final TestingResourceManagerGateway 
testingResourceManagerGateway = new TestingResourceManagerGateway();
+   
rpcService.registerGateway(testingResourceManagerGateway.getAddress(), 
testingResourceManagerGateway);
+   return testingResourceManagerGateway;
+   }
+
+   /**
+* Tests that the job execution is failed if the TaskExecutor 
disconnects from the
+* JobMaster.
+*/
+   @Test
+   public void testJobFailureWhenGracefulTaskExecutorTermination() throws 
Exception {
+   runJobFailureWhenTaskExecutorTerminatesTest(
+   () -> heartbeatServices,
+   (localTaskManagerLocation, jobMasterGateway) -> 
jobMasterGateway.disconnectTaskManager(
+   localTaskManagerLocation.getResourceID(),
+   new FlinkException("Test disconnectTaskManager 
exception.")),
+   (jobMasterGateway, resourceID) -> ignored -> {});
+   }
+
+   @Test
+   public void testJobFailureWhenTaskExecutorHeartbeatTimeout() throws 
Exception {
+   final AtomicBoolean respondToHeartbeats = new 
AtomicBoolean(true);
+   runJobFailureWhenTaskExecutorTerminatesTest(
+   () -> fastHeartbeatServices,
+   (localTaskManagerLocation, jobMasterGateway) -> 
respondToHeartbeats.set(false),
+   (jobMasterGateway, taskManagerResourceId) -> resourceId 
-> {
+   if (respondToHeartbeats.get()) {
+   
jobMasterGateway.heartbeatFromTaskManager(taskManagerResourceId, new 
AccumulatorReport(Collections.emptyList()));
+   }
+   }
+   );
+   }
+
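+   private void runJobFailureWhenTaskExecutorTerminatesTest(
+   Supplier<HeartbeatServices> heartbeatSupplier,
+   BiConsumer<LocalTaskManagerLocation, JobMasterGateway> jobReachedRunningState,
+   BiFunction<JobMasterGateway, ResourceID, Consumer<ResourceID>> heartbeatConsumerFunction) throws Exception {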
+   final JobGraph jobGraph = createSingleVertexJobGraph();
+   final TestingOnCompletionActions onCompletionActions = new 
TestingOnCompletionActions();
+   final JobMaster jobMaster = createJobMaster(
+   new Configuration(),
+   jobGraph,
+   haServices,
+   new TestingJobManagerSharedServicesBuilder().build(),
+   heartbeatSupplier.get(),
+   onCompletionActions);
+
+   final TestingResourceManagerGateway 
testingResourceManagerGateway = 
createAndRegisterTestingResourceManagerGateway();
+   
notifyResourceManagerLeaderListeners(testingResourceManagerGateway);
+
+   try {
+   jobMaster.start(jobMasterId).get();
+
+   final JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
+
+   final LocalTaskManagerLocation taskManagerLocation = 
new LocalTaskManagerLocation();
+   final CompletableFuture 
taskDeploymentFuture = new CompletableFuture<>();
+   final TestingTaskExecutorGateway taskExecutorGateway = 
new TestingTaskExecutorGatewayBuilder()
+   
.setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> {
+   
taskDeploymentFuture.complete(taskDeploymentDescriptor.getExecutionAttemptId());
+   return 
CompletableFuture.completedFuture(Acknowledge.get());
+   })
+   
.setHeartbeatJobManagerConsumer(heartbeatConsumerFunction.apply(jobMasterGateway,
 taskManagerLocation.getResourceID()))
+   .createTestingTaskExecutorGateway();
+   
rpcService.registerGateway(taskExecutorGateway.getAddress(), 
taskExecutorGateway);
+
+   
jobMasterGateway.registerTaskManager(taskExecutorGateway.getAddress(), 
taskManagerLocation, testingTimeout).get();
+
+   offerSingleSlot(jobMasterGateway, taskManagerLocation);
+
+   final ExecutionAttemptID executionAttemptId = 
taskDeploymentFuture.get();
+
+   jobMasterGatewa

[GitHub] GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base

2019-02-13 Thread GitBox
GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port 
TaskManagerFailsITCase to new code base
URL: https://github.com/apache/flink/pull/7676#discussion_r256293420
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 ##
 @@ -215,6 +215,14 @@ public HighAvailabilityServices 
getHighAvailabilityServices() {
}
}
 
+   @VisibleForTesting
+   @Nonnull
+   protected Collection> 
getDispatcherResourceManagerComponents() {
 
 Review comment:
   I am confused about why this change is shown in the fixup commit.




[GitHub] GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base

2019-02-13 Thread GitBox
GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port 
TaskManagerFailsITCase to new code base
URL: https://github.com/apache/flink/pull/7676#discussion_r256289349
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 ##
 @@ -1430,6 +1422,125 @@ public void 
testReleasingTaskExecutorIfNoMoreSlotsRegistered() throws Exception
}
}
 
+   @Nonnull
+   private TestingResourceManagerGateway 
createAndRegisterTestingResourceManagerGateway() {
+   final TestingResourceManagerGateway 
testingResourceManagerGateway = new TestingResourceManagerGateway();
+   
rpcService.registerGateway(testingResourceManagerGateway.getAddress(), 
testingResourceManagerGateway);
+   return testingResourceManagerGateway;
+   }
+
+   /**
+* Tests that the job execution is failed if the TaskExecutor 
disconnects from the
+* JobMaster.
+*/
+   @Test
+   public void testJobFailureWhenGracefulTaskExecutorTermination() throws 
Exception {
+   runJobFailureWhenTaskExecutorTerminatesTest(
+   () -> heartbeatServices,
 
 Review comment:
   ```suggestion
heartbeatServices,
   ```




[GitHub] GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base

2019-02-13 Thread GitBox
GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port 
TaskManagerFailsITCase to new code base
URL: https://github.com/apache/flink/pull/7676#discussion_r256289829
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 ##
 @@ -1430,6 +1430,120 @@ public void 
testReleasingTaskExecutorIfNoMoreSlotsRegistered() throws Exception
}
}
 
+   @Nonnull
+   private TestingResourceManagerGateway 
createAndRegisterTestingResourceManagerGateway() {
+   final TestingResourceManagerGateway 
testingResourceManagerGateway = new TestingResourceManagerGateway();
+   
rpcService.registerGateway(testingResourceManagerGateway.getAddress(), 
testingResourceManagerGateway);
+   
rmLeaderRetrievalService.notifyListener(testingResourceManagerGateway.getAddress(),
 testingResourceManagerGateway.getFencingToken().toUUID());
+   return testingResourceManagerGateway;
+   }
+
+   /**
+* Tests that the job execution is failed if the TaskExecutor 
disconnects from the
+* JobMaster.
+*/
+   @Test
+   public void testJobFailureWhenGracefulTaskExecutorTermination() throws 
Exception {
+   runJobFailureWhenTaskExecutorTerminatesTest(
+   () -> heartbeatServices,
+   (localTaskManagerLocation, jobMasterGateway) -> 
jobMasterGateway.disconnectTaskManager(
+   localTaskManagerLocation.getResourceID(),
+   new FlinkException("Test disconnectTaskManager 
exception.")),
+   (jobMasterGateway, resourceID) -> ignored -> {});
+   }
+
+   @Test
+   public void testJobFailureWhenTaskExecutorHeartbeatTimeout() throws 
Exception {
+   final AtomicBoolean respondToHeartbeats = new 
AtomicBoolean(true);
+   runJobFailureWhenTaskExecutorTerminatesTest(
+   () -> fastHeartbeatServices,
+   (localTaskManagerLocation, jobMasterGateway) -> 
respondToHeartbeats.set(false),
+   (jobMasterGateway, taskManagerResourceId) -> resourceId 
-> {
+   if (respondToHeartbeats.get()) {
+   
jobMasterGateway.heartbeatFromTaskManager(taskManagerResourceId, new 
AccumulatorReport(Collections.emptyList()));
+   }
+   }
+   );
+   }
+
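+   private void runJobFailureWhenTaskExecutorTerminatesTest(
+   Supplier<HeartbeatServices> heartbeatSupplier,
+   BiConsumer<LocalTaskManagerLocation, JobMasterGateway> jobReachedRunningState,
+   BiFunction<JobMasterGateway, ResourceID, Consumer<ResourceID>> heartbeatConsumerFunction) throws Exception {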
+   final JobGraph jobGraph = createSingleVertexJobGraph();
+   final TestingOnCompletionActions onCompletionActions = new 
TestingOnCompletionActions();
+   final JobMaster jobMaster = createJobMaster(
+   new Configuration(),
+   jobGraph,
+   haServices,
+   new TestingJobManagerSharedServicesBuilder().build(),
+   heartbeatSupplier.get(),
+   onCompletionActions);
+
+   createAndRegisterTestingResourceManagerGateway();
+
+   try {
+   jobMaster.start(jobMasterId).get();
+
+   final JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
+
+   final LocalTaskManagerLocation taskManagerLocation = 
new LocalTaskManagerLocation();
+   final CompletableFuture 
taskDeploymentFuture = new CompletableFuture<>();
+   final TestingTaskExecutorGateway taskExecutorGateway = 
new TestingTaskExecutorGatewayBuilder()
+   
.setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> {
+   
taskDeploymentFuture.complete(taskDeploymentDescriptor.getExecutionAttemptId());
+   return 
CompletableFuture.completedFuture(Acknowledge.get());
+   })
+   
.setHeartbeatJobManagerConsumer(heartbeatConsumerFunction.apply(jobMasterGateway,
 taskManagerLocation.getResourceID()))
+   .createTestingTaskExecutorGateway();
+   
rpcService.registerGateway(taskExecutorGateway.getAddress(), 
taskExecutorGateway);
+
+   
jobMasterGateway.registerTaskManager(taskExecutorGateway.getAddress(), 
taskManagerLocation, testingTimeout).get();
+   final SlotOffer slotOffer = new SlotOffer(new 
AllocationID(), 0, ResourceProfile.UNKNOWN);
+   final Collection slotOffers = 
jobMasterGateway.offerSlots(taskManagerLocation.getResourceID(), 
Colle

[GitHub] GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base

2019-02-13 Thread GitBox
GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port 
TaskManagerFailsITCase to new code base
URL: https://github.com/apache/flink/pull/7676#discussion_r256289305
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 ##
 @@ -1430,6 +1422,125 @@ public void 
testReleasingTaskExecutorIfNoMoreSlotsRegistered() throws Exception
}
}
 
+   @Nonnull
+   private TestingResourceManagerGateway 
createAndRegisterTestingResourceManagerGateway() {
+   final TestingResourceManagerGateway 
testingResourceManagerGateway = new TestingResourceManagerGateway();
+   
rpcService.registerGateway(testingResourceManagerGateway.getAddress(), 
testingResourceManagerGateway);
+   return testingResourceManagerGateway;
+   }
+
+   /**
+* Tests that the job execution is failed if the TaskExecutor 
disconnects from the
+* JobMaster.
+*/
+   @Test
+   public void testJobFailureWhenGracefulTaskExecutorTermination() throws 
Exception {
+   runJobFailureWhenTaskExecutorTerminatesTest(
+   () -> heartbeatServices,
+   (localTaskManagerLocation, jobMasterGateway) -> 
jobMasterGateway.disconnectTaskManager(
+   localTaskManagerLocation.getResourceID(),
+   new FlinkException("Test disconnectTaskManager 
exception.")),
+   (jobMasterGateway, resourceID) -> ignored -> {});
+   }
+
+   @Test
+   public void testJobFailureWhenTaskExecutorHeartbeatTimeout() throws 
Exception {
+   final AtomicBoolean respondToHeartbeats = new 
AtomicBoolean(true);
+   runJobFailureWhenTaskExecutorTerminatesTest(
+   () -> fastHeartbeatServices,
 
 Review comment:
   ```suggestion
fastHeartbeatServices,
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base

2019-02-13 Thread GitBox
GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port 
TaskManagerFailsITCase to new code base
URL: https://github.com/apache/flink/pull/7676#discussion_r256295246
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java
 ##
 @@ -74,8 +74,13 @@ public TestingMiniCluster(TestingMiniClusterConfiguration 
miniClusterConfigurati
}
 
@Override
-   public void startTaskExecutor(boolean localCommunication) throws 
Exception {
 
 Review comment:
   Is it crucial for testing to be able to set the right `localCommunication` 
flag? If yes, a method overload that sets `localCommunication` to `false` would 
have been enough but I am not insisting on it.
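
A minimal sketch of the overload idea from this comment, assuming a simplified stand-in class. `OverloadSketch` and `startedFlags` are hypothetical names for illustration and are not part of `TestingMiniCluster`; only the `startTaskExecutor(boolean localCommunication)` signature comes from the diff above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a test cluster; this is NOT the real TestingMiniCluster.
class OverloadSketch {
    // Records the flag used for each started executor so the behaviour is observable.
    private final List<Boolean> started = new ArrayList<>();

    /** Full variant: the caller chooses the flag explicitly. */
    void startTaskExecutor(boolean localCommunication) {
        started.add(localCommunication);
    }

    /** Convenience overload: defaults localCommunication to false. */
    void startTaskExecutor() {
        startTaskExecutor(false);
    }

    List<Boolean> startedFlags() {
        return started;
    }

    public static void main(String[] args) {
        OverloadSketch cluster = new OverloadSketch();
        cluster.startTaskExecutor();      // uses the default flag
        cluster.startTaskExecutor(true);  // explicit flag
        System.out.println(cluster.startedFlags()); // prints "[false, true]"
    }
}
```

With an overload like this, existing callers keep the explicit flag while tests that do not care about it can call the no-argument variant.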




[GitHub] GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base

2019-02-13 Thread GitBox
GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port 
TaskManagerFailsITCase to new code base
URL: https://github.com/apache/flink/pull/7676#discussion_r256289155
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 ##
 @@ -1430,6 +1422,125 @@ public void 
testReleasingTaskExecutorIfNoMoreSlotsRegistered() throws Exception
}
}
 
+   @Nonnull
+   private TestingResourceManagerGateway 
createAndRegisterTestingResourceManagerGateway() {
+   final TestingResourceManagerGateway 
testingResourceManagerGateway = new TestingResourceManagerGateway();
+   
rpcService.registerGateway(testingResourceManagerGateway.getAddress(), 
testingResourceManagerGateway);
+   return testingResourceManagerGateway;
+   }
+
+   /**
+* Tests that the job execution is failed if the TaskExecutor 
disconnects from the
+* JobMaster.
+*/
+   @Test
+   public void testJobFailureWhenGracefulTaskExecutorTermination() throws 
Exception {
+   runJobFailureWhenTaskExecutorTerminatesTest(
+   () -> heartbeatServices,
+   (localTaskManagerLocation, jobMasterGateway) -> 
jobMasterGateway.disconnectTaskManager(
+   localTaskManagerLocation.getResourceID(),
+   new FlinkException("Test disconnectTaskManager 
exception.")),
+   (jobMasterGateway, resourceID) -> ignored -> {});
+   }
+
+   @Test
+   public void testJobFailureWhenTaskExecutorHeartbeatTimeout() throws 
Exception {
+   final AtomicBoolean respondToHeartbeats = new 
AtomicBoolean(true);
+   runJobFailureWhenTaskExecutorTerminatesTest(
+   () -> fastHeartbeatServices,
+   (localTaskManagerLocation, jobMasterGateway) -> 
respondToHeartbeats.set(false),
+   (jobMasterGateway, taskManagerResourceId) -> resourceId 
-> {
+   if (respondToHeartbeats.get()) {
+   
jobMasterGateway.heartbeatFromTaskManager(taskManagerResourceId, new 
AccumulatorReport(Collections.emptyList()));
+   }
+   }
+   );
+   }
+
+   private void runJobFailureWhenTaskExecutorTerminatesTest(
+   Supplier heartbeatSupplier,
 
 Review comment:
   Does `HeartbeatServices` have to be lazily supplied?
   
   ```suggestion
HeartbeatServices heartbeatServices,
   ```
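
To illustrate the question, here is a small sketch comparing a lazily supplied parameter with a plain one. `FakeHeartbeatServices`, `runWithSupplier` and `runWithValue` are hypothetical names and do not exist in Flink; the sketch assumes, as the quoted diff suggests, that the helper calls `get()` exactly once and right away.

```java
import java.util.function.Supplier;

class EagerVsLazySketch {
    /** Hypothetical stand-in for Flink's HeartbeatServices. */
    static final class FakeHeartbeatServices {
        final long intervalMs;

        FakeHeartbeatServices(long intervalMs) {
            this.intervalMs = intervalMs;
        }
    }

    // Lazy variant: the helper calls get() once, immediately, so laziness buys nothing.
    static long runWithSupplier(Supplier<FakeHeartbeatServices> supplier) {
        return supplier.get().intervalMs;
    }

    // Eager variant: same behaviour with a simpler signature and call site.
    static long runWithValue(FakeHeartbeatServices services) {
        return services.intervalMs;
    }

    public static void main(String[] args) {
        FakeHeartbeatServices fast = new FakeHeartbeatServices(1L);
        // Both variants observe the same already-constructed instance.
        System.out.println(runWithSupplier(() -> fast) == runWithValue(fast)); // prints "true"
    }
}
```

A supplier would only matter if constructing the value were expensive or had to be deferred until after some setup step, which this sketch assumes is not the case.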




[GitHub] GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base

2019-02-13 Thread GitBox
GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port 
TaskManagerFailsITCase to new code base
URL: https://github.com/apache/flink/pull/7676#discussion_r256289829
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 ##
 @@ -1430,6 +1430,120 @@ public void 
testReleasingTaskExecutorIfNoMoreSlotsRegistered() throws Exception
}
}
 
+   @Nonnull
+   private TestingResourceManagerGateway 
createAndRegisterTestingResourceManagerGateway() {
+   final TestingResourceManagerGateway 
testingResourceManagerGateway = new TestingResourceManagerGateway();
+   
rpcService.registerGateway(testingResourceManagerGateway.getAddress(), 
testingResourceManagerGateway);
+   
rmLeaderRetrievalService.notifyListener(testingResourceManagerGateway.getAddress(),
 testingResourceManagerGateway.getFencingToken().toUUID());
+   return testingResourceManagerGateway;
+   }
+
+   /**
+* Tests that the job execution is failed if the TaskExecutor 
disconnects from the
+* JobMaster.
+*/
+   @Test
+   public void testJobFailureWhenGracefulTaskExecutorTermination() throws 
Exception {
+   runJobFailureWhenTaskExecutorTerminatesTest(
+   () -> heartbeatServices,
+   (localTaskManagerLocation, jobMasterGateway) -> 
jobMasterGateway.disconnectTaskManager(
+   localTaskManagerLocation.getResourceID(),
+   new FlinkException("Test disconnectTaskManager 
exception.")),
+   (jobMasterGateway, resourceID) -> ignored -> {});
+   }
+
+   @Test
+   public void testJobFailureWhenTaskExecutorHeartbeatTimeout() throws 
Exception {
+   final AtomicBoolean respondToHeartbeats = new 
AtomicBoolean(true);
+   runJobFailureWhenTaskExecutorTerminatesTest(
+   () -> fastHeartbeatServices,
+   (localTaskManagerLocation, jobMasterGateway) -> 
respondToHeartbeats.set(false),
+   (jobMasterGateway, taskManagerResourceId) -> resourceId 
-> {
+   if (respondToHeartbeats.get()) {
+   
jobMasterGateway.heartbeatFromTaskManager(taskManagerResourceId, new 
AccumulatorReport(Collections.emptyList()));
+   }
+   }
+   );
+   }
+
+   private void runJobFailureWhenTaskExecutorTerminatesTest(
+   Supplier heartbeatSupplier,
+   BiConsumer 
jobReachedRunningState,
+   BiFunction> 
heartbeatConsumerFunction) throws Exception {
+   final JobGraph jobGraph = createSingleVertexJobGraph();
+   final TestingOnCompletionActions onCompletionActions = new 
TestingOnCompletionActions();
+   final JobMaster jobMaster = createJobMaster(
+   new Configuration(),
+   jobGraph,
+   haServices,
+   new TestingJobManagerSharedServicesBuilder().build(),
+   heartbeatSupplier.get(),
+   onCompletionActions);
+
+   createAndRegisterTestingResourceManagerGateway();
+
+   try {
+   jobMaster.start(jobMasterId).get();
+
+   final JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
+
+   final LocalTaskManagerLocation taskManagerLocation = 
new LocalTaskManagerLocation();
+   final CompletableFuture 
taskDeploymentFuture = new CompletableFuture<>();
+   final TestingTaskExecutorGateway taskExecutorGateway = 
new TestingTaskExecutorGatewayBuilder()
+   
.setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> {
+   
taskDeploymentFuture.complete(taskDeploymentDescriptor.getExecutionAttemptId());
+   return 
CompletableFuture.completedFuture(Acknowledge.get());
+   })
+   
.setHeartbeatJobManagerConsumer(heartbeatConsumerFunction.apply(jobMasterGateway,
 taskManagerLocation.getResourceID()))
+   .createTestingTaskExecutorGateway();
+   
rpcService.registerGateway(taskExecutorGateway.getAddress(), 
taskExecutorGateway);
+
+   
jobMasterGateway.registerTaskManager(taskExecutorGateway.getAddress(), 
taskManagerLocation, testingTimeout).get();
+   final SlotOffer slotOffer = new SlotOffer(new 
AllocationID(), 0, ResourceProfile.UNKNOWN);
+   final Collection slotOffers = 
jobMasterGateway.offerSlots(taskManagerLocation.getResourceID(), 
Colle

[GitHub] GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base

2019-02-13 Thread GitBox
GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port 
TaskManagerFailsITCase to new code base
URL: https://github.com/apache/flink/pull/7676#discussion_r256293420
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 ##
 @@ -215,6 +215,14 @@ public HighAvailabilityServices 
getHighAvailabilityServices() {
}
}
 
+   @VisibleForTesting
+   @Nonnull
+   protected Collection> 
getDispatcherResourceManagerComponents() {
 
 Review comment:
   I am confused about why this change is shown in the fixup commit.




[jira] [Updated] (FLINK-11591) Restoring LockableTypeSerializer's snapshot from 1.6 and below requires pre-processing before snapshot is valid to use

2019-02-13 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-11591:

Affects Version/s: 1.7.0, 1.7.1

> Restoring LockableTypeSerializer's snapshot from 1.6 and below requires 
> pre-processing before snapshot is valid to use
> --
>
> Key: FLINK-11591
> URL: https://issues.apache.org/jira/browse/FLINK-11591
> Project: Flink
>  Issue Type: Bug
>  Components: CEP, Type Serialization System
>Affects Versions: 1.7.0, 1.7.1
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Igal Shilman
>Priority: Major
>
> In 1.6 and below, the {{LockableTypeSerializer}} incorrectly returns the 
> element serializer's snapshot directly instead of wrapping it within an 
> independent snapshot class:
> https://github.com/apache/flink/blob/release-1.6/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java#L188
> As a result, the written state information for this serializer is 
> {{(LockableTypeSerializer, SomeArbitrarySnapshot)}}.
> The problem occurs when restoring this in Flink 1.7+. Compatibility 
> checks are now performed by providing the new serializer to the snapshot, 
> so the call would be 
> {{SomeArbitrarySnapshot.resolveSchemaCompatibility(newLockableTypeSerializer)}},
> which does not work since the arbitrary snapshot does not recognize the 
> {{LockableTypeSerializer}}.
> To fix this, we essentially need to preprocess that arbitrary snapshot when 
> restoring from snapshots written by versions <= 1.6.
> A proposed fix would be to have the following interface:
> {code}
> public interface RequiresLegacySerializerSnapshotPreprocessing {
>     TypeSerializerSnapshot preprocessLegacySerializerSnapshot(TypeSerializerSnapshot legacySnapshot);
> }
> {code}
> The {{LockableTypeSerializer}} would then implement this interface, and in 
> the {{preprocessLegacySerializerSnapshot}} method, properly wrap that 
> arbitrary element serializer snapshot into a 
> {{LockableTypeSerializerSnapshot}}.
> In general, this interface is useful for preprocessing any problematic 
> snapshot that was returned before 1.7.
> The place to check whether a serializer written in <= 1.6 savepoints 
> implements this interface, and to preprocess the read snapshot, would be:
> https://github.com/apache/flink/blob/a567a1ef628eadad21e11864ec328481cd6d7898/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java#L218
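
The proposal above can be sketched roughly as follows. This is only an illustration under stated assumptions: `Snapshot`, `Serializer`, `ElementSnapshot`, `LockableSnapshot`, `LockableSerializer`, `PlainSerializer` and `readLegacyPair` are hypothetical, simplified stand-ins and not Flink classes, and the real restore path in `TypeSerializerSerializationUtil` may differ.

```java
class LegacySnapshotSketch {
    /** Hypothetical, simplified stand-in for TypeSerializerSnapshot. */
    interface Snapshot {
        String name();
    }

    /** Hypothetical stand-in for a serializer. */
    interface Serializer {
    }

    /** Mirrors the proposed interface, with simplified types. */
    interface RequiresLegacyPreprocessing {
        Snapshot preprocessLegacySnapshot(Snapshot legacySnapshot);
    }

    /** The "arbitrary" element snapshot that <= 1.6 wrote directly. */
    static final class ElementSnapshot implements Snapshot {
        public String name() {
            return "ElementSnapshot";
        }
    }

    /** The independent wrapper snapshot that should have been written. */
    static final class LockableSnapshot implements Snapshot {
        final Snapshot nested;

        LockableSnapshot(Snapshot nested) {
            this.nested = nested;
        }

        public String name() {
            return "LockableSnapshot(" + nested.name() + ")";
        }
    }

    /** Serializer that knows how to repair its own legacy snapshot. */
    static final class LockableSerializer implements Serializer, RequiresLegacyPreprocessing {
        public Snapshot preprocessLegacySnapshot(Snapshot legacySnapshot) {
            return new LockableSnapshot(legacySnapshot);
        }
    }

    /** Serializer without legacy problems; its snapshot is used as written. */
    static final class PlainSerializer implements Serializer {
    }

    // Restore-time hook: wrap the snapshot only if the written serializer asks for it.
    static Snapshot readLegacyPair(Serializer writtenSerializer, Snapshot writtenSnapshot) {
        if (writtenSerializer instanceof RequiresLegacyPreprocessing) {
            return ((RequiresLegacyPreprocessing) writtenSerializer)
                .preprocessLegacySnapshot(writtenSnapshot);
        }
        return writtenSnapshot;
    }

    public static void main(String[] args) {
        Snapshot repaired = readLegacyPair(new LockableSerializer(), new ElementSnapshot());
        System.out.println(repaired.name()); // prints "LockableSnapshot(ElementSnapshot)"
    }
}
```

The design choice illustrated here is that the preprocessing is opt-in per serializer, so snapshots of serializers without the marker interface pass through unchanged.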



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann commented on a change in pull request #6883: [FLINK-10610] [tests] Port slot sharing cases to new codebase

2019-02-13 Thread GitBox
tillrohrmann commented on a change in pull request #6883: [FLINK-10610] [tests] 
Port slot sharing cases to new codebase
URL: https://github.com/apache/flink/pull/6883#discussion_r256294812
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 ##
 @@ -209,6 +209,13 @@ public HighAvailabilityServices 
getHighAvailabilityServices() {
}
}
 
+   TaskExecutor[] getTaskManagers() {
+   synchronized (lock) {
+   checkState(running, "MiniCluster is not yet running.");
+   return taskManagers;
+   }
+   }
 
 Review comment:
   I would like to do this differently. Please take a look at #7676.




[GitHub] GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base

2019-02-13 Thread GitBox
GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port 
TaskManagerFailsITCase to new code base
URL: https://github.com/apache/flink/pull/7676#discussion_r256296254
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 ##
 @@ -738,6 +732,26 @@ protected ResourceManagerRunner startResourceManager(
//  Internal methods
// 

 
+   @GuardedBy("lock")
+   private Collection> 
terminateTaskExecutor() {
 
 Review comment:
   ok




[GitHub] GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base

2019-02-13 Thread GitBox
GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port 
TaskManagerFailsITCase to new code base
URL: https://github.com/apache/flink/pull/7676#discussion_r256296223
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
 ##
 @@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.TestingMiniCluster;
+import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Integration tests for the {@link TaskExecutor}.
+ */
+public class TaskExecutorITCase extends TestLogger {
+
+   private static final Duration TESTING_TIMEOUT = Duration.ofMinutes(2L);
+   private static final int NUM_TMS = 2;
+   private static final int SLOTS_PER_TM = 2;
+   private static final int PARALLELISM = NUM_TMS * SLOTS_PER_TM;
+
+   private TestingMiniCluster miniCluster;
+
+   @Before
+   public void setup() throws Exception  {
+   miniCluster = new TestingMiniCluster(
+   new TestingMiniClusterConfiguration.Builder()
+   .setNumTaskManagers(NUM_TMS)
+   .setNumSlotsPerTaskManager(SLOTS_PER_TM)
+   .build(),
+   null);
+
+   miniCluster.start();
+   }
+
+   @After
+   public void teardown() throws Exception {
+   if (miniCluster != null) {
+   miniCluster.close();
+   }
+   }
+
+   /**
+* Tests that a job will be re-executed if a new TaskExecutor joins the 
cluster.
+*/
+   @Test
+   public void testNewTaskExecutorJoinsCluster() throws Exception {
+
+   final Deadline deadline = Deadline.fromNow(TESTING_TIMEOUT);
+
+   final JobGraph jobGraph = createJobGraph(PARALLELISM);
+
+   miniCluster.submitJob(jobGraph).get();
+
+   final CompletableFuture jobResultFuture = 
miniCluster.requestJobResult(jobGraph.getJobID());
+
+   assertThat(jobResultFuture.isDone(), is(false));
+
+   CommonTestUtils.waitUntilCondition(
+   jobIsRunning(() -> 
miniCluster.getExecutionGraph(jobGraph.getJobID())),
+   deadline,
+   20L);
+
+   // kill one TaskExecutor which should fail the job execution
+   miniCluster.terminateTaskExecutor(0);
+
+   final JobResult jobResult = jobResultFuture.get();
+
+   assertThat(jobResult.isSuccess(), is(false));
+
+   miniCluster.startTaskExecutor(false);
+
+   BlockingOperator.unblock();
+
+   miniCluster.submitJob(jobGraph).get();
+
+   miniCluster.requestJobResult(jobGraph.getJobID()).get();
+   }
+
+   private SupplierWithException 
jobIsRunning(Supplier> 
executionGraphFutureSupplier) {
+   final Predicate allExecutionsRunning = 
ExecutionGraphTestUtils.allExecutionsPredicate(ExecutionGraphTestUtils.isInExecutionState(ExecutionState.RUNNING)

[GitHub] tillrohrmann commented on a change in pull request #6883: [FLINK-10610] [tests] Port slot sharing cases to new codebase

2019-02-13 Thread GitBox
tillrohrmann commented on a change in pull request #6883: [FLINK-10610] [tests] 
Port slot sharing cases to new codebase
URL: https://github.com/apache/flink/pull/6883#discussion_r256295712
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
 ##
 @@ -337,6 +347,262 @@ public void testSchedulingAllAtOnce() throws Exception {
}
}
 
+   @Test
+   public void testSlotSharingForForwardJobWithCoLocationConstraint() 
throws Exception {
+   testSlotSharingForForwardJob(true);
+   }
+
+   @Test
+   public void testSlotSharingForForwardJobWithoutCoLocationConstraint() 
throws Exception {
+   testSlotSharingForForwardJob(false);
+   }
+
+   /**
+* This job runs in N slots with N senders and N receivers.
+* Unless slot sharing is used, it cannot complete.
+* Either with or without co-location constraint should not
+* make difference.
+*/
+   private void testSlotSharingForForwardJob(boolean 
withCoLocationConstraint) throws Exception {
+   final int parallelism = 11;
+
+   final MiniClusterConfiguration cfg = new 
MiniClusterConfiguration.Builder()
+   .setNumTaskManagers(1)
+   .setNumSlotsPerTaskManager(parallelism)
+   .setConfiguration(getDefaultConfiguration())
+   .build();
+
+   try (final MiniCluster miniCluster = new MiniCluster(cfg)) {
+   miniCluster.start();
+
+   final JobVertex sender = new JobVertex("Sender");
+   sender.setInvokableClass(CountDownLatchedSender.class);
+   sender.setParallelism(parallelism);
+
+   final JobVertex receiver = new JobVertex("Receiver");
+   
receiver.setInvokableClass(CountDownLatchedReceiver.class);
+   receiver.setParallelism(parallelism);
+
+   receiver.connectNewDataSetAsInput(sender, 
DistributionPattern.POINTWISE,
+   ResultPartitionType.PIPELINED);
+
+   final CountDownLatch countDownLatch = new 
CountDownLatch(parallelism);
+   CountDownLatchedSender.setLatch(countDownLatch);
+   CountDownLatchedReceiver.setLatch(countDownLatch);
+
+   final SlotSharingGroup sharingGroup = new 
SlotSharingGroup(sender.getID(), receiver.getID());
+   sender.setSlotSharingGroup(sharingGroup);
+   receiver.setSlotSharingGroup(sharingGroup);
+
+   if (withCoLocationConstraint) {
+   receiver.setStrictlyCoLocatedWith(sender);
+   }
+
+   final JobGraph jobGraph = new JobGraph("Pointwise Job", 
sender, receiver);
+
+   miniCluster.executeJobBlocking(jobGraph);
+   }
+   }
+
+   /**
+* A sender that does not exit until all receivers are running.
+*/
+   public static class CountDownLatchedSender extends AbstractInvokable {
+
+   private static CountDownLatch latch;
+
+   static void setLatch(CountDownLatch latch) {
+   CountDownLatchedSender.latch = latch;
+   }
+
+   public CountDownLatchedSender(Environment environment) {
+   super(environment);
+   }
+
+   @Override
+   public void invoke() throws Exception {
+   RecordWriter writer = new 
RecordWriter<>(getEnvironment().getWriter(0));
+
+   try {
+   writer.emit(new IntValue(42));
+   writer.emit(new IntValue(1337));
+   writer.flushAll();
+   } finally {
+   writer.clearBuffers();
+   latch.await();
+   }
+   }
+   }
+
+   /**
+* A receiver that counts down the latch on running.
+*/
+   public static class CountDownLatchedReceiver extends AbstractInvokable {
+
+   private static CountDownLatch latch;
+
+   static void setLatch(CountDownLatch latch) {
+   CountDownLatchedReceiver.latch = latch;
+   }
+
+   public CountDownLatchedReceiver(Environment environment) {
+   super(environment);
+   }
+
+   @Override
+   public void invoke() throws Exception {
+   latch.countDown();
+
+   RecordReader reader = new RecordReader<>(
+   getEnvironment().getInputGate(0),
+   IntValue.class,
+   

[GitHub] GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base

2019-02-13 Thread GitBox
GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port 
TaskManagerFailsITCase to new code base
URL: https://github.com/apache/flink/pull/7676#discussion_r256296239
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 ##
 @@ -945,4 +954,12 @@ public void onFatalError(Throwable exception) {
closeAsync();
}
}
+
+   private class TerminatingFatalErrorHandlerFactory {
+
+   @GuardedBy("lock")
+   private TerminatingFatalErrorHandler create() {
+   return new 
TerminatingFatalErrorHandler(taskManagers.size());
 
 Review comment:
   ok




[GitHub] GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base

2019-02-13 Thread GitBox
GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port 
TaskManagerFailsITCase to new code base
URL: https://github.com/apache/flink/pull/7676#discussion_r256296275
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 ##
 @@ -696,34 +686,38 @@ protected ResourceManagerRunner startResourceManager(
return resourceManagerRunner;
}
 
-   protected TaskExecutor[] startTaskManagers(
-   Configuration configuration,
-   HighAvailabilityServices haServices,
-   HeartbeatServices heartbeatServices,
-   MetricRegistry metricRegistry,
-   BlobCacheService blobCacheService,
-   int numTaskManagers,
-   RpcServiceFactory rpcServiceFactory) throws Exception {
+   @GuardedBy("lock")
+   private void startTaskManagers() throws Exception {
 
 Review comment:
   ok




[GitHub] tillrohrmann commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base

2019-02-13 Thread GitBox
tillrohrmann commented on a change in pull request #7676: [FLINK-11364][tests] 
Port TaskManagerFailsITCase to new code base
URL: https://github.com/apache/flink/pull/7676#discussion_r256297148
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 ##
 @@ -1430,6 +1422,125 @@ public void 
testReleasingTaskExecutorIfNoMoreSlotsRegistered() throws Exception
}
}
 
+   @Nonnull
+   private TestingResourceManagerGateway 
createAndRegisterTestingResourceManagerGateway() {
+   final TestingResourceManagerGateway 
testingResourceManagerGateway = new TestingResourceManagerGateway();
+   
rpcService.registerGateway(testingResourceManagerGateway.getAddress(), 
testingResourceManagerGateway);
+   return testingResourceManagerGateway;
+   }
+
+   /**
+* Tests that the job execution is failed if the TaskExecutor 
disconnects from the
+* JobMaster.
+*/
+   @Test
+   public void testJobFailureWhenGracefulTaskExecutorTermination() throws 
Exception {
+   runJobFailureWhenTaskExecutorTerminatesTest(
+   () -> heartbeatServices,
+   (localTaskManagerLocation, jobMasterGateway) -> 
jobMasterGateway.disconnectTaskManager(
+   localTaskManagerLocation.getResourceID(),
+   new FlinkException("Test disconnectTaskManager 
exception.")),
+   (jobMasterGateway, resourceID) -> ignored -> {});
+   }
+
+   @Test
+   public void testJobFailureWhenTaskExecutorHeartbeatTimeout() throws 
Exception {
+   final AtomicBoolean respondToHeartbeats = new 
AtomicBoolean(true);
+   runJobFailureWhenTaskExecutorTerminatesTest(
+   () -> fastHeartbeatServices,
+   (localTaskManagerLocation, jobMasterGateway) -> 
respondToHeartbeats.set(false),
+   (jobMasterGateway, taskManagerResourceId) -> resourceId 
-> {
+   if (respondToHeartbeats.get()) {
+   
jobMasterGateway.heartbeatFromTaskManager(taskManagerResourceId, new 
AccumulatorReport(Collections.emptyList()));
+   }
+   }
+   );
+   }
+
+   private void runJobFailureWhenTaskExecutorTerminatesTest(
+   Supplier heartbeatSupplier,
+   BiConsumer 
jobReachedRunningState,
+   BiFunction> heartbeatConsumerFunction) throws Exception {
+   final JobGraph jobGraph = createSingleVertexJobGraph();
+   final TestingOnCompletionActions onCompletionActions = new 
TestingOnCompletionActions();
+   final JobMaster jobMaster = createJobMaster(
+   new Configuration(),
+   jobGraph,
+   haServices,
+   new TestingJobManagerSharedServicesBuilder().build(),
+   heartbeatSupplier.get(),
+   onCompletionActions);
+
+   final TestingResourceManagerGateway 
testingResourceManagerGateway = 
createAndRegisterTestingResourceManagerGateway();
+   
notifyResourceManagerLeaderListeners(testingResourceManagerGateway);
+
+   try {
+   jobMaster.start(jobMasterId).get();
+
+   final JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
+
+   final LocalTaskManagerLocation taskManagerLocation = 
new LocalTaskManagerLocation();
+   final CompletableFuture 
taskDeploymentFuture = new CompletableFuture<>();
+   final TestingTaskExecutorGateway taskExecutorGateway = 
new TestingTaskExecutorGatewayBuilder()
+   
.setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> {
+   
taskDeploymentFuture.complete(taskDeploymentDescriptor.getExecutionAttemptId());
+   return 
CompletableFuture.completedFuture(Acknowledge.get());
+   })
+   
.setHeartbeatJobManagerConsumer(heartbeatConsumerFunction.apply(jobMasterGateway,
 taskManagerLocation.getResourceID()))
+   .createTestingTaskExecutorGateway();
+   
rpcService.registerGateway(taskExecutorGateway.getAddress(), 
taskExecutorGateway);
+
+   
jobMasterGateway.registerTaskManager(taskExecutorGateway.getAddress(), 
taskManagerLocation, testingTimeout).get();
+
+   offerSingleSlot(jobMasterGateway, taskManagerLocation);
+
+   final ExecutionAttemptID executionAttemptId = 
taskDeploymentFuture.get();
+
+   jobMas

[GitHub] tillrohrmann commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base

2019-02-13 Thread GitBox
tillrohrmann commented on a change in pull request #7676: [FLINK-11364][tests] 
Port TaskManagerFailsITCase to new code base
URL: https://github.com/apache/flink/pull/7676#discussion_r256297227
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 ##
 @@ -928,9 +931,16 @@ public void onFatalError(Throwable exception) {
 
private class TerminatingFatalErrorHandlerFactory {
 
+   /**
+* Create a new {@link TerminatingFatalErrorHandler} for the 
{@link TaskExecutor} with
+* the given index.
+*
+* @param index into the {{@link #taskManagers}} collection to 
identify the correct {@link TaskExecutor}.
 
 Review comment:
   unintended. will change it.




[GitHub] tillrohrmann commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base

2019-02-13 Thread GitBox
tillrohrmann commented on a change in pull request #7676: [FLINK-11364][tests] 
Port TaskManagerFailsITCase to new code base
URL: https://github.com/apache/flink/pull/7676#discussion_r256298061
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 ##
 @@ -1430,6 +1422,125 @@ public void 
testReleasingTaskExecutorIfNoMoreSlotsRegistered() throws Exception
}
}
 
+   @Nonnull
+   private TestingResourceManagerGateway 
createAndRegisterTestingResourceManagerGateway() {
+   final TestingResourceManagerGateway 
testingResourceManagerGateway = new TestingResourceManagerGateway();
+   
rpcService.registerGateway(testingResourceManagerGateway.getAddress(), 
testingResourceManagerGateway);
+   return testingResourceManagerGateway;
+   }
+
+   /**
+* Tests that the job execution is failed if the TaskExecutor 
disconnects from the
+* JobMaster.
+*/
+   @Test
+   public void testJobFailureWhenGracefulTaskExecutorTermination() throws 
Exception {
+   runJobFailureWhenTaskExecutorTerminatesTest(
+   () -> heartbeatServices,
+   (localTaskManagerLocation, jobMasterGateway) -> 
jobMasterGateway.disconnectTaskManager(
+   localTaskManagerLocation.getResourceID(),
+   new FlinkException("Test disconnectTaskManager 
exception.")),
+   (jobMasterGateway, resourceID) -> ignored -> {});
+   }
+
+   @Test
+   public void testJobFailureWhenTaskExecutorHeartbeatTimeout() throws 
Exception {
+   final AtomicBoolean respondToHeartbeats = new 
AtomicBoolean(true);
+   runJobFailureWhenTaskExecutorTerminatesTest(
+   () -> fastHeartbeatServices,
+   (localTaskManagerLocation, jobMasterGateway) -> 
respondToHeartbeats.set(false),
+   (jobMasterGateway, taskManagerResourceId) -> resourceId 
-> {
+   if (respondToHeartbeats.get()) {
+   
jobMasterGateway.heartbeatFromTaskManager(taskManagerResourceId, new 
AccumulatorReport(Collections.emptyList()));
+   }
+   }
+   );
+   }
+
+   private void runJobFailureWhenTaskExecutorTerminatesTest(
+   Supplier<HeartbeatServices> heartbeatSupplier,
 
 Review comment:
   No, this is not correct. Will change.




[GitHub] GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base

2019-02-13 Thread GitBox
GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port 
TaskManagerFailsITCase to new code base
URL: https://github.com/apache/flink/pull/7676#discussion_r256295246
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java
 ##
 @@ -74,8 +74,13 @@ public TestingMiniCluster(TestingMiniClusterConfiguration 
miniClusterConfigurati
}
 
@Override
-   public void startTaskExecutor(boolean localCommunication) throws 
Exception {
 
 Review comment:
   Is it crucial for testing to be able to set the right `localCommunication` 
flag? If yes, a method overload that sets `localCommunication` to `false` would 
have been enough but I am not insisting on it.
   edit: alternatively always use `false` with no option to override




[GitHub] GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base

2019-02-13 Thread GitBox
GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port 
TaskManagerFailsITCase to new code base
URL: https://github.com/apache/flink/pull/7676#discussion_r256295246
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java
 ##
 @@ -74,8 +74,13 @@ public TestingMiniCluster(TestingMiniClusterConfiguration 
miniClusterConfigurati
}
 
@Override
-   public void startTaskExecutor(boolean localCommunication) throws 
Exception {
 
 Review comment:
   Is it crucial for testing to be able to set the right `localCommunication` 
flag? If yes, a method overload that sets `localCommunication` to `false` would 
have been enough but I am not insisting on it.
   
   _edit:_ alternatively always use `false` with no option to override (if 
possible)
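
A minimal sketch of the overload idea, assuming a hypothetical stand-in class (`SketchTestingCluster` is not Flink's `TestingMiniCluster`, and its method bodies are invented for illustration). The no-argument variant pins `localCommunication` to `false`, while the flagged variant stays available for callers that need it:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a test cluster; NOT Flink's TestingMiniCluster.
class SketchTestingCluster {

    // Records the flag each task executor was started with, for inspection in tests.
    private final List<Boolean> startedFlags = new ArrayList<>();

    /** Full variant: the caller chooses the communication mode. */
    void startTaskExecutor(boolean localCommunication) {
        startedFlags.add(localCommunication);
    }

    /** Overload for tests: always starts with localCommunication = false. */
    void startTaskExecutor() {
        startTaskExecutor(false);
    }

    List<Boolean> startedFlags() {
        return startedFlags;
    }
}
```

Dropping the flagged variant entirely would correspond to the "always use `false`" alternative mentioned in the edit.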




[GitHub] tillrohrmann commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base

2019-02-13 Thread GitBox
tillrohrmann commented on a change in pull request #7676: [FLINK-11364][tests] 
Port TaskManagerFailsITCase to new code base
URL: https://github.com/apache/flink/pull/7676#discussion_r256300086
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java
 ##
 @@ -74,8 +74,13 @@ public TestingMiniCluster(TestingMiniClusterConfiguration 
miniClusterConfigurati
}
 
@Override
-   public void startTaskExecutor(boolean localCommunication) throws 
Exception {
 
 Review comment:
   Are you suggesting that we always start the `TaskExecutors` with 
`localCommunication = false`? Or only for the `TestingMiniCluster`? The latter 
should now be the case.




[jira] [Commented] (FLINK-11587) Check and port CoLocationConstraintITCase to new code base

2019-02-13 Thread Gary Yao (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16766944#comment-16766944
 ] 

Gary Yao commented on FLINK-11587:
--

ok

> Check and port CoLocationConstraintITCase to new code base
> --
>
> Key: FLINK-11587
> URL: https://issues.apache.org/jira/browse/FLINK-11587
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.8.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
> Fix For: 1.8.0
>
>
> Check and port {{CoLocationConstraintITCase}} to new code base.





[jira] [Commented] (FLINK-10658) org.apache.flink.util.FlinkException: Releasing shared slot parent.

2019-02-13 Thread chauncy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16766945#comment-16766945
 ] 

chauncy commented on FLINK-10658:
-

[~till.rohrmann] ok 

> org.apache.flink.util.FlinkException: Releasing shared slot parent.
> ---
>
> Key: FLINK-10658
> URL: https://issues.apache.org/jira/browse/FLINK-10658
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.5.4
>Reporter: chauncy
>Priority: Major
>
> I don't know when this exception is thrown. Can someone tell me? Thanks.





[GitHub] TisonKun commented on issue #6883: [FLINK-10610] [tests] Port slot sharing cases to new codebase

2019-02-13 Thread GitBox
TisonKun commented on issue #6883: [FLINK-10610] [tests] Port slot sharing 
cases to new codebase
URL: https://github.com/apache/flink/pull/6883#issuecomment-463116842
 
 
   @tillrohrmann thanks for your guidance. Now I see why slot sharing isn't 
really tested, and I agree that we should only port 
`CoLocationConstraintITCase`.
   
   I've looked at #7690 and found that the test passes even without the code 
below. Maybe we can set a CountDownLatch like this PR does?
   
   ```java
   final SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
   receiver.setSlotSharingGroup(slotSharingGroup);
   sender.setSlotSharingGroup(slotSharingGroup);
   
   receiver.setStrictlyCoLocatedWith(sender);
   ```
   
   Basically, it's OK with me if we close this thread and move the discussion 
to #7689, #7690 and #7676.




[GitHub] tillrohrmann commented on issue #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base

2019-02-13 Thread GitBox
tillrohrmann commented on issue #7676: [FLINK-11364][tests] Port 
TaskManagerFailsITCase to new code base
URL: https://github.com/apache/flink/pull/7676#issuecomment-463117312
 
 
   Thanks for the second round of review @GJL. I addressed your comments except 
for the last one, where I don't understand your proposal yet.




[GitHub] TisonKun commented on a change in pull request #7690: [FLINK-11587][tests] Port CoLocationConstraintITCase to new code base

2019-02-13 Thread GitBox
TisonKun commented on a change in pull request #7690: [FLINK-11587][tests] Port 
CoLocationConstraintITCase to new code base
URL: https://github.com/apache/flink/pull/7690#discussion_r256301442
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobExecutionITCase.java
 ##
 @@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.reader.RecordReader;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Integration tests for job scheduling.
+ */
+public class JobExecutionITCase extends TestLogger {
+
+   private static final int SLOTS_PER_TM = 11;
+   private static final int NUM_TMS = 2;
+   private static final int PARALLELISM = SLOTS_PER_TM * NUM_TMS;
+
+   @ClassRule
+   public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new 
MiniClusterResource(
+   new MiniClusterResourceConfiguration.Builder()
+   .setNumberSlotsPerTaskManager(SLOTS_PER_TM)
+   .setNumberTaskManagers(NUM_TMS)
+   .build());
+
+   @Test
+   public void testCoLocationConstraintJobExecution() throws Exception {
+   final JobGraph jobGraph = createJobGraph(PARALLELISM);
+
+   final MiniCluster miniCluster = 
MINI_CLUSTER_RESOURCE.getMiniCluster();
+
+   miniCluster.submitJob(jobGraph).get();
+
+   final CompletableFuture<JobResult> jobResultFuture = 
miniCluster.requestJobResult(jobGraph.getJobID());
+
+   assertThat(jobResultFuture.get().isSuccess(), is(true));
+   }
+
+   private JobGraph createJobGraph(int parallelism) {
+   final JobVertex sender = new JobVertex("Sender");
+   sender.setParallelism(parallelism);
+   sender.setInvokableClass(Sender.class);
+
+   final JobVertex receiver = new JobVertex("Receiver");
+   receiver.setParallelism(parallelism);
+   receiver.setInvokableClass(Receiver.class);
+
+   final SlotSharingGroup slotSharingGroup = new 
SlotSharingGroup();
+   receiver.setSlotSharingGroup(slotSharingGroup);
+   sender.setSlotSharingGroup(slotSharingGroup);
+
+   receiver.setStrictlyCoLocatedWith(sender);
 
 Review comment:
   The test can pass even without L82-86. The reason is that some 
Sender/Receiver subtasks start and finish quickly. We could make sure that no 
Sender exits until all Receivers are running, maybe by using a 
CountDownLatch as in #6883.
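
The latch idea can be sketched in plain Java, assuming threads as stand-ins for Sender and Receiver subtasks (the class and method names are hypothetical, and nothing here uses Flink's `AbstractInvokable`). Each receiver counts the latch down once it is running, and each sender blocks on the latch before it is allowed to finish:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: threads stand in for Sender/Receiver subtasks.
class LatchedTasksSketch {

    /** Returns how many senders finished before every receiver had started. */
    static int countEarlySenderExits(int parallelism) {
        final CountDownLatch receiversRunning = new CountDownLatch(parallelism);
        final AtomicInteger startedReceivers = new AtomicInteger();
        final AtomicInteger earlyExits = new AtomicInteger();
        // One thread per task, so blocked senders cannot starve the receivers.
        final ExecutorService pool = Executors.newFixedThreadPool(2 * parallelism);

        for (int i = 0; i < parallelism; i++) {
            pool.execute(() -> {
                try {
                    receiversRunning.await(); // sender may not exit before this returns
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                if (startedReceivers.get() < parallelism) {
                    earlyExits.incrementAndGet();
                }
            });
        }
        for (int i = 0; i < parallelism; i++) {
            pool.execute(() -> {
                startedReceivers.incrementAndGet();
                receiversRunning.countDown(); // receiver signals that it is running
            });
        }

        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return earlyExits.get();
    }
}
```

Note that a latch like this only forces all tasks to be alive at the same time; it says nothing about which slot each task was placed in.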




[jira] [Commented] (FLINK-11382) Disable MetricFetcher if interval is configured to 0

2019-02-13 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16766956#comment-16766956
 ] 

Chesnay Schepler commented on FLINK-11382:
--

[~xleesf] I've already updated the documentation, thanks for getting back to me 
though. :)

> Disable MetricFetcher if interval is configured to 0
> 
>
> Key: FLINK-11382
> URL: https://issues.apache.org/jira/browse/FLINK-11382
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics
>Reporter: Chesnay Schepler
>Assignee: leesf
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Follow-up for FLINK-10822 to disable the MetricFetcher completely if the 
> interval is configured to 0.





[GitHub] tillrohrmann commented on issue #6883: [FLINK-10610] [tests] Port slot sharing cases to new codebase

2019-02-13 Thread GitBox
tillrohrmann commented on issue #6883: [FLINK-10610] [tests] Port slot sharing 
cases to new codebase
URL: https://github.com/apache/flink/pull/6883#issuecomment-463119230
 
 
   You're right that #7690 does not contain any assertions that `receiver` and 
`sender` run in the same slot. The `CountDownLatch` does not enforce this (at 
least not the way we used it in this PR). Maybe we could send non-serializable 
records, which would need to go through a local channel without serialization, 
to test the functionality.




[GitHub] tillrohrmann commented on a change in pull request #7690: [FLINK-11587][tests] Port CoLocationConstraintITCase to new code base

2019-02-13 Thread GitBox
tillrohrmann commented on a change in pull request #7690: [FLINK-11587][tests] 
Port CoLocationConstraintITCase to new code base
URL: https://github.com/apache/flink/pull/7690#discussion_r256304674
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobExecutionITCase.java
 ##
 @@ -0,0 +1,144 @@
 
 Review comment:
   This is true. We don't have real assertions making sure that the tasks are 
being co-located. The `CountDownLatch` would enforce that both tasks are online 
at the same time. I think this is not what we want to guarantee here. Instead, 
we should test that the tasks are deployed in the same slot and thus use local 
channels for communication. Maybe a non-serializable record could do the trick 
here. I'll try it out.




[jira] [Commented] (FLINK-9900) Failed to testRestoreBehaviourWithFaultyStateHandles (org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase)

2019-02-13 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16766966#comment-16766966
 ] 

Chesnay Schepler commented on FLINK-9900:
-

[~sunjincheng121] No, because these only added more debugging details and did 
not fix the underlying issue.

> Failed to testRestoreBehaviourWithFaultyStateHandles 
> (org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase) 
> ---
>
> Key: FLINK-9900
> URL: https://issues.apache.org/jira/browse/FLINK-9900
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.5.1, 1.6.0
>Reporter: zhangminglei
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.4, 1.7.3, 1.8.0
>
>
> https://api.travis-ci.org/v3/job/405843617/log.txt
> Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 124.598 sec 
> <<< FAILURE! - in 
> org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase
>  
> testRestoreBehaviourWithFaultyStateHandles(org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase)
>  Time elapsed: 120.036 sec <<< ERROR!
>  org.junit.runners.model.TestTimedOutException: test timed out after 12 
> milliseconds
>  at sun.misc.Unsafe.park(Native Method)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
>  at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>  at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
>  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>  at 
> org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase.testRestoreBehaviourWithFaultyStateHandles(ZooKeeperHighAvailabilityITCase.java:244)
> Results :
> Tests in error: 
>  
> ZooKeeperHighAvailabilityITCase.testRestoreBehaviourWithFaultyStateHandles:244
>  » TestTimedOut
> Tests run: 1453, Failures: 0, Errors: 1, Skipped: 29





[jira] [Updated] (FLINK-9900) Failed to testRestoreBehaviourWithFaultyStateHandles (org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase)

2019-02-13 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-9900:

Fix Version/s: (was: 1.6.4)

> Failed to testRestoreBehaviourWithFaultyStateHandles 
> (org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase) 
> ---
>
> Key: FLINK-9900
> URL: https://issues.apache.org/jira/browse/FLINK-9900
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.5.1, 1.6.0
>Reporter: zhangminglei
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.3, 1.8.0
>
>
> https://api.travis-ci.org/v3/job/405843617/log.txt
> Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 124.598 sec 
> <<< FAILURE! - in 
> org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase
>  
> testRestoreBehaviourWithFaultyStateHandles(org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase)
>  Time elapsed: 120.036 sec <<< ERROR!
>  org.junit.runners.model.TestTimedOutException: test timed out after 12 
> milliseconds
>  at sun.misc.Unsafe.park(Native Method)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
>  at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>  at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
>  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>  at 
> org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase.testRestoreBehaviourWithFaultyStateHandles(ZooKeeperHighAvailabilityITCase.java:244)
> Results :
> Tests in error: 
>  
> ZooKeeperHighAvailabilityITCase.testRestoreBehaviourWithFaultyStateHandles:244
>  » TestTimedOut
> Tests run: 1453, Failures: 0, Errors: 1, Skipped: 29





[GitHub] tillrohrmann commented on a change in pull request #7690: [FLINK-11587][tests] Port CoLocationConstraintITCase to new code base

2019-02-13 Thread GitBox
tillrohrmann commented on a change in pull request #7690: [FLINK-11587][tests] 
Port CoLocationConstraintITCase to new code base
URL: https://github.com/apache/flink/pull/7690#discussion_r256308933
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobExecutionITCase.java
 ##
 @@ -0,0 +1,144 @@
 
 Review comment:
   Hmm, this doesn't work because we always serialize into a buffer, 
independent of the channel type. I think the only difference is whether the 
buffer goes through Netty or not.
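
A toy model of why the non-serializable record does not distinguish channel types, under loud assumptions: Java serialization stands in for Flink's own record serialization, and `Channel`, `LocalChannel` and `RemoteChannel` are hypothetical names, not Flink classes. Because the record is turned into bytes before any channel sees it, a non-serializable record fails the same way on both paths:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

// Hypothetical model; Java serialization is only a stand-in for Flink's serializers.
class SerializeBeforeChannelSketch {

    interface Channel {
        void send(byte[] buffer);
    }

    /** Hands the buffer over in-process. */
    static final class LocalChannel implements Channel {
        int bytesReceived;
        public void send(byte[] buffer) {
            bytesReceived += buffer.length;
        }
    }

    /** Would hand the buffer to the network stack; here it only counts bytes. */
    static final class RemoteChannel implements Channel {
        int bytesSent;
        public void send(byte[] buffer) {
            bytesSent += buffer.length;
        }
    }

    /** Serializes first, then sends. Returns false if the record is not serializable. */
    static boolean write(Object record, Channel channel) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(record); // happens regardless of the channel type
            out.flush();
            channel.send(bytes.toByteArray());
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In this model, `write(new Object(), new LocalChannel())` and `write(new Object(), new RemoteChannel())` both return `false`, so the outcome carries no information about co-location.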




[GitHub] tillrohrmann commented on a change in pull request #7690: [FLINK-11587][tests] Port CoLocationConstraintITCase to new code base

2019-02-13 Thread GitBox
tillrohrmann commented on a change in pull request #7690: [FLINK-11587][tests] 
Port CoLocationConstraintITCase to new code base
URL: https://github.com/apache/flink/pull/7690#discussion_r256310005
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobExecutionITCase.java
 ##
 @@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.reader.RecordReader;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Integration tests for job scheduling.
+ */
+public class JobExecutionITCase extends TestLogger {
+
+   private static final int SLOTS_PER_TM = 11;
+   private static final int NUM_TMS = 2;
+   private static final int PARALLELISM = SLOTS_PER_TM * NUM_TMS;
+
+   @ClassRule
+   public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+   new MiniClusterResourceConfiguration.Builder()
+   .setNumberSlotsPerTaskManager(SLOTS_PER_TM)
+   .setNumberTaskManagers(NUM_TMS)
+   .build());
+
+   @Test
+   public void testCoLocationConstraintJobExecution() throws Exception {
+   final JobGraph jobGraph = createJobGraph(PARALLELISM);
+
+   final MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
+
+   miniCluster.submitJob(jobGraph).get();
+
+   final CompletableFuture<JobResult> jobResultFuture = miniCluster.requestJobResult(jobGraph.getJobID());
+
+   assertThat(jobResultFuture.get().isSuccess(), is(true));
+   }
+
+   private JobGraph createJobGraph(int parallelism) {
+   final JobVertex sender = new JobVertex("Sender");
+   sender.setParallelism(parallelism);
+   sender.setInvokableClass(Sender.class);
+
+   final JobVertex receiver = new JobVertex("Receiver");
+   receiver.setParallelism(parallelism);
+   receiver.setInvokableClass(Receiver.class);
+
+   final SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
+   receiver.setSlotSharingGroup(slotSharingGroup);
+   sender.setSlotSharingGroup(slotSharingGroup);
+
+   receiver.setStrictlyCoLocatedWith(sender);
 
 Review comment:
   I think this should be the solution. What we can do is to start the 
`MiniCluster` with only local communication enabled. That way we won't start 
netty and the communication needs to happen strictly locally :-).


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-11590) Redundant spaces occur when the string true is generated using 'case when' grammar

2019-02-13 Thread dong (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11590?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dong closed FLINK-11590.

  Resolution: Not A Bug
Release Note: Not a bug, but the behavior should be noted in the Flink documentation.

> Redundant spaces occur when the string true is generated using 'case when' 
> grammar
> --
>
> Key: FLINK-11590
> URL: https://issues.apache.org/jira/browse/FLINK-11590
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.6.4, 1.7.0
>Reporter: dong
>Priority: Major
>
> {code:java}
> case when conditionA then 'false'  else 'true'  end as field
> {code}
> When I execute the above statement, if the field is true, the output will be 
> 'true ' with an extra space.
>  
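
The thread attributes the extra space to CHAR semantics: both branches are string literals, so the result type takes the length of the longest branch (CHAR(5) here) and shorter values are right-padded. The following is a minimal plain-Java sketch of that padding rule, not Flink or Calcite code; the class and method names are hypothetical and exist only for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of CHAR(n) padding; not part of Flink.
class CharPaddingSketch {

    /** The common CHAR width is assumed to be the length of the longest branch literal. */
    static int commonWidth(List<String> branchLiterals) {
        int width = 0;
        for (String literal : branchLiterals) {
            width = Math.max(width, literal.length());
        }
        return width;
    }

    /** Right-pads a value with spaces up to the fixed CHAR width. */
    static String padToChar(String value, int width) {
        StringBuilder sb = new StringBuilder(value);
        while (sb.length() < width) {
            sb.append(' ');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        int width = commonWidth(Arrays.asList("false", "true"));
        String padded = padToChar("true", width);
        System.out.println("[" + padded + "]");        // prints "[true ]"
        System.out.println("[" + padded.trim() + "]"); // prints "[true]"
    }
}
```

In SQL, the workarounds suggested in the thread are to cast the result to VARCHAR or to wrap it in trim(), which corresponds to the `trim()` call in the sketch.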



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol merged pull request #7686: [FLINK-11585][docs] Fix prefix matching

2019-02-13 Thread GitBox
zentol merged pull request #7686: [FLINK-11585][docs] Fix prefix matching
URL: https://github.com/apache/flink/pull/7686
 
 
   




[GitHub] rmetzger commented on issue #7638: [FLINK-11510] [DataStream] Add the MultiFieldSumAggregator to support KeyedStream.sum(int[] positionToSums )

2019-02-13 Thread GitBox
rmetzger commented on issue #7638: [FLINK-11510] [DataStream] Add the 
MultiFieldSumAggregator to support KeyedStream.sum(int[] positionToSums )
URL: https://github.com/apache/flink/pull/7638#issuecomment-463130233
 
 
   Thanks a lot for your contribution!
   I've commented on the PR because the tracking comment by flinkbot was 
outdated.
   Before I can review the pull request, I would like to get approval from 
somebody more familiar with the Flink API whether we have consensus to actually 
add this feature (see also the contribution guide: 
https://flink.apache.org/contribute-code.html#before-you-start-coding) 
   
   I've pinged Aljoscha in the JIRA ticket: 
https://issues.apache.org/jira/browse/FLINK-11510
   Let's hope he soon confirms that the feature is a good fit.




[GitHub] rmetzger commented on issue #7638: [FLINK-11510] [DataStream] Add the MultiFieldSumAggregator to support KeyedStream.sum(int[] positionToSums )

2019-02-13 Thread GitBox
rmetzger commented on issue #7638: [FLINK-11510] [DataStream] Add the 
MultiFieldSumAggregator to support KeyedStream.sum(int[] positionToSums )
URL: https://github.com/apache/flink/pull/7638#issuecomment-463130315
 
 
   @flinkbot approve description




[jira] [Closed] (FLINK-11585) Prefix matching in ConfigDocsGenerator can result in wrong assignments

2019-02-13 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-11585.

Resolution: Fixed

master: 0719db8895452ccb98548da1706cc65c34c99d76
1.7: 26abcfd4934d5ddae517f66f4485ad5d0c2f3593
1.6: 1cfa90c4b458b50e4216dfa7684591b0a025f98a

> Prefix matching in ConfigDocsGenerator can result in wrong assignments
> --
>
> Key: FLINK-11585
> URL: https://issues.apache.org/jira/browse/FLINK-11585
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.6.3, 1.7.1, 1.8.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.4, 1.7.3, 1.8.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> There are some cases where the key-prefix matching does not work as intended:
> * given the prefixes "a.b" and "a.b.c.d", then an option with a key "a.b.c.X" 
> will be assigned to the default groups instead of "a.b"
> * given a prefix "a.b", an option "a.c.b" will be matched to that group 
> instead of the default
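
Both cases above come down to matching on key-segment boundaries and preferring the longest matching prefix. The sketch below shows one way to get the intended assignments; it is an assumption about the desired behavior, not the actual ConfigDocsGenerator code, and `findGroup` is a hypothetical helper.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of segment-aware prefix matching; not the Flink implementation.
class PrefixMatchSketch {

    /** A prefix matches only the key itself or keys that continue with a '.' separator. */
    static boolean matches(String key, String prefix) {
        return key.equals(prefix) || key.startsWith(prefix + ".");
    }

    /** Returns the longest matching prefix, or empty if the key belongs to the default group. */
    static Optional<String> findGroup(String key, List<String> prefixes) {
        String best = null;
        for (String prefix : prefixes) {
            if (matches(key, prefix) && (best == null || prefix.length() > best.length())) {
                best = prefix;
            }
        }
        return Optional.ofNullable(best);
    }

    public static void main(String[] args) {
        List<String> prefixes = Arrays.asList("a.b", "a.b.c.d");
        System.out.println(findGroup("a.b.c.X", prefixes).orElse("default"));           // prints "a.b"
        System.out.println(findGroup("a.c.b", Arrays.asList("a.b")).orElse("default")); // prints "default"
    }
}
```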





[GitHub] tillrohrmann commented on a change in pull request #7682: [FLINK-11583][configuration] Support deprecated and fallback keys at once

2019-02-13 Thread GitBox
tillrohrmann commented on a change in pull request #7682: 
[FLINK-11583][configuration] Support deprecated and fallback keys at once
URL: https://github.com/apache/flink/pull/7682#discussion_r256317833
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/ConfigOption.java
 ##
 @@ -134,10 +138,13 @@
 * @return A new config options, with the given deprecated keys.
 */
public ConfigOption withDeprecatedKeys(String... deprecatedKeys) {
-   FallbackKey[] fallbackKeys = Arrays.stream(deprecatedKeys)
-   .map(FallbackKey::createDeprecatedKey)
+   final Stream<FallbackKey> newFallbackKeys = Arrays.stream(deprecatedKeys).map(FallbackKey::createFallbackKey);
 
 Review comment:
   Good idea. Thanks Chesnay.
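
The PR title describes supporting deprecated and fallback keys on the same option. One way to picture that is to have each `with...` call append to the keys already configured instead of replacing them. The sketch below is self-contained and uses hypothetical `AltKey` and `Option` classes; it is not the `ConfigOption` implementation, and the merge order shown is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch; AltKey and Option stand in for Flink's real classes.
class AlternativeKeysSketch {

    static final class AltKey {
        final String name;
        final boolean deprecated;

        AltKey(String name, boolean deprecated) {
            this.name = name;
            this.deprecated = deprecated;
        }
    }

    static final class Option {
        final String key;
        final List<AltKey> alternatives;

        Option(String key, List<AltKey> alternatives) {
            this.key = key;
            this.alternatives = alternatives;
        }

        Option withFallbackKeys(String... names) {
            return withKeys(false, names);
        }

        Option withDeprecatedKeys(String... names) {
            return withKeys(true, names);
        }

        // Keeps the existing alternative keys and appends the new ones (order is an assumption).
        private Option withKeys(boolean deprecated, String... names) {
            Stream<AltKey> added = Arrays.stream(names).map(n -> new AltKey(n, deprecated));
            List<AltKey> merged = Stream.concat(alternatives.stream(), added)
                .collect(Collectors.toList());
            return new Option(key, merged);
        }
    }

    public static void main(String[] args) {
        Option option = new Option("new.key", new ArrayList<>())
            .withFallbackKeys("fallback.key")
            .withDeprecatedKeys("old.key");
        for (AltKey k : option.alternatives) {
            System.out.println(k.name + " deprecated=" + k.deprecated);
        }
    }
}
```

Because each call returns a new `Option`, the two kinds of keys can be chained in either order without one call discarding what the other configured.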




[jira] [Commented] (FLINK-11510) Add the MultiFieldSumAggregator to support KeyedStream.sum(int[] positionToSums )

2019-02-13 Thread Robert Metzger (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16766973#comment-16766973
 ] 

Robert Metzger commented on FLINK-11510:


[~aljoscha] do you think this change is a good fit for Flink?

> Add the MultiFieldSumAggregator to support KeyedStream.sum(int[] 
> positionToSums )
> -
>
> Key: FLINK-11510
> URL: https://issues.apache.org/jira/browse/FLINK-11510
> Project: Flink
>  Issue Type: Improvement
>Reporter: wangpeibin
>Assignee: wangpeibin
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> The goal is to implement a KeyedStream API to sum with *multi field*.
> The example code would look like:
> {code:java}
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream<Tuple3<Long, Long, Integer>> src = env.fromCollection(Arrays.asList(
> new Tuple3<>(1L, 2L, 2),
> new Tuple3<>(1L, 3L, 3),
> new Tuple3<>(1L, 4L, 4),
> new Tuple3<>(2L, 2L, 2),
> new Tuple3<>(2L, 3L, 3),
> new Tuple3<>(2L, 4L, 4)
> ));
> src.keyBy(0)
> .sum(new int[] {1, 2}) // right now only sum(1) or sum("f0") is supported
> .print();
> env.execute();
>  
> {code}
> and the output is expected to be:
> {code:java}
> 1> (1,2,2)
> 1> (2,2,2)
> 1> (1,5,5)
> 1> (2,5,5)
> 1> (1,9,9)
> 1> (2,9,9)
> {code}
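
The requested semantics can be checked without Flink. The sketch below simulates a keyed rolling sum over several field positions in plain Java; it is only an illustration of the expected results, not the proposed MultiFieldSumAggregator, and it models every field as a `long` for simplicity (an assumption; the example above uses an Integer third field).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java simulation of a keyed rolling sum over multiple fields.
class MultiFieldSumSketch {

    /** For every input record, emits the running per-key sums of the given positions. */
    static List<long[]> rollingSum(List<long[]> records, int keyPos, int[] positionsToSum) {
        Map<Long, long[]> state = new HashMap<>();
        List<long[]> out = new ArrayList<>();
        for (long[] record : records) {
            long[] previous = state.get(record[keyPos]);
            long[] current;
            if (previous == null) {
                // First record for this key is emitted unchanged.
                current = record.clone();
            } else {
                // Copy the previous result so earlier outputs are not modified.
                current = previous.clone();
                for (int pos : positionsToSum) {
                    current[pos] += record[pos];
                }
            }
            state.put(record[keyPos], current);
            out.add(current);
        }
        return out;
    }

    public static void main(String[] args) {
        List<long[]> records = Arrays.asList(
            new long[] {1, 2, 2}, new long[] {1, 3, 3}, new long[] {1, 4, 4},
            new long[] {2, 2, 2}, new long[] {2, 3, 3}, new long[] {2, 4, 4});
        for (long[] result : rollingSum(records, 0, new int[] {1, 2})) {
            System.out.println(Arrays.toString(result));
        }
    }
}
```

The per-key sums match the expected output in the ticket: (1,2,2), (1,5,5), (1,9,9) for key 1 and (2,2,2), (2,5,5), (2,9,9) for key 2. The simulation emits them in input order, whereas a Flink job may interleave keys as shown above.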





[GitHub] rmetzger edited a comment on issue #7638: [FLINK-11510] [DataStream] Add the MultiFieldSumAggregator to support KeyedStream.sum(int[] positionToSums )

2019-02-13 Thread GitBox
rmetzger edited a comment on issue #7638: [FLINK-11510] [DataStream] Add the 
MultiFieldSumAggregator to support KeyedStream.sum(int[] positionToSums )
URL: https://github.com/apache/flink/pull/7638#issuecomment-463130233
 
 
   Thank you for your contribution!
   I've commented on the PR because the tracking comment by flinkbot was 
outdated.
   Before I can review the pull request, I would like to get approval from 
somebody more familiar with the Flink API whether we have consensus to actually 
add this feature (see also the contribution guide: 
https://flink.apache.org/contribute-code.html#before-you-start-coding) 
   
   I've pinged Aljoscha in the JIRA ticket: 
https://issues.apache.org/jira/browse/FLINK-11510
   Let's hope he soon confirms that the feature is a good fit.




[GitHub] rmetzger edited a comment on issue #7638: [FLINK-11510] [DataStream] Add the MultiFieldSumAggregator to support KeyedStream.sum(int[] positionToSums )

2019-02-13 Thread GitBox
rmetzger edited a comment on issue #7638: [FLINK-11510] [DataStream] Add the 
MultiFieldSumAggregator to support KeyedStream.sum(int[] positionToSums )
URL: https://github.com/apache/flink/pull/7638#issuecomment-463130233
 
 
   Thanks a lot for your contribution!
   I've commented on the PR because the tracking comment by flinkbot was 
outdated.
   Before I can review the pull request, I would like to get approval from 
somebody more familiar with the Flink API whether we have consensus to actually 
add this feature (see also the contribution guide: 
https://flink.apache.org/contribute-code.html#before-you-start-coding) 
   
   I've pinged Aljoscha in the JIRA ticket: 
https://issues.apache.org/jira/browse/FLINK-11510
   Let's hope he soon confirms that the feature is a good fit.




[jira] [Updated] (FLINK-11585) Prefix matching in ConfigDocsGenerator can result in wrong assignments

2019-02-13 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-11585:
-
Fix Version/s: (was: 1.7.2)
   1.7.3

> Prefix matching in ConfigDocsGenerator can result in wrong assignments
> --
>
> Key: FLINK-11585
> URL: https://issues.apache.org/jira/browse/FLINK-11585
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.6.3, 1.7.1, 1.8.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.4, 1.7.3, 1.8.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> There are some cases where the key-prefix matching does not work as intended:
> * given the prefixes "a.b" and "a.b.c.d", then an option with a key "a.b.c.X" 
> will be assigned to the default groups instead of "a.b"
> * given a prefix "a.b", an option "a.c.b" will be matched to that group 
> instead of the default





[GitHub] flinkbot edited a comment on issue #7638: [FLINK-11510] [DataStream] Add the MultiFieldSumAggregator to support KeyedStream.sum(int[] positionToSums )

2019-02-13 Thread GitBox
flinkbot edited a comment on issue #7638: [FLINK-11510] [DataStream] Add the 
MultiFieldSumAggregator to support KeyedStream.sum(int[] positionToSums )
URL: https://github.com/apache/flink/pull/7638#issuecomment-459738297
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @rmetzger [PMC]
   * ❌ 2. There is [consensus] that the contribution should go into Flink.
   * ❔ 3. Needs [attention] from.
   * ❌ 4. The change fits into the overall [architecture].
   * ❌ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve the 1st aspect (similarly, it 
also supports the `consensus`, `architecture` and `quality` keywords)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval
   




[GitHub] rmetzger edited a comment on issue #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer

2019-02-13 Thread GitBox
rmetzger edited a comment on issue #7679: [FLINK-11501][Kafka Connector] Add 
ratelimiting to Kafka consumer
URL: https://github.com/apache/flink/pull/7679#issuecomment-462943479
 
 
   Thank you :) 
   There's a cool new feature coming up in the pipeline: Labeling of the PR 
progress.




[GitHub] zentol commented on issue #7691: [hotfix] update documentation of metrics fetcher update interval

2019-02-13 Thread GitBox
zentol commented on issue #7691: [hotfix] update documentation of metrics 
fetcher update interval
URL: https://github.com/apache/flink/pull/7691#issuecomment-463136827
 
 
   Thanks for fixing this!




[GitHub] zentol merged pull request #7691: [hotfix] update documentation of metrics fetcher update interval

2019-02-13 Thread GitBox
zentol merged pull request #7691: [hotfix] update documentation of metrics 
fetcher update interval
URL: https://github.com/apache/flink/pull/7691
 
 
   




[GitHub] rmetzger commented on issue #5923: [FLINK-9253][network] make the maximum floating buffers count channel-type independent

2019-02-13 Thread GitBox
rmetzger commented on issue #5923: [FLINK-9253][network] make the maximum 
floating buffers count channel-type independent
URL: https://github.com/apache/flink/pull/5923#issuecomment-463141772
 
 
   I propose not including this PR in the 1.6.4 release.




[jira] [Reopened] (FLINK-9776) Interrupt TaskThread only while in User/Operator code

2019-02-13 Thread Robert Metzger (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger reopened FLINK-9776:
---

> Interrupt TaskThread only while in User/Operator code
> -
>
> Key: FLINK-9776
> URL: https://issues.apache.org/jira/browse/FLINK-9776
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.4, 1.8.0
>
>
> Upon cancellation, the task thread is periodically interrupted.
> This helps to pull the thread out of blocking operations in the user code.
> Once the thread leaves the user code, the repeated interrupts may interfere 
> with the shutdown cleanup logic, causing confusing exceptions.
> We should stop sending the periodic interrupts once the thread leaves the 
> user code.
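
The idea in the description can be sketched as an interrupter that only fires while a flag says the task thread is inside user code. The class below is a hypothetical illustration, not Flink's task cancellation code; the periodic timer is reduced to a `tick()` method so the behavior is easy to follow.

```java
// Hypothetical sketch of interrupting a thread only while it runs user code.
class GuardedInterrupterSketch {

    private final Thread target;
    private boolean inUserCode;

    GuardedInterrupterSketch(Thread target) {
        this.target = target;
    }

    /** Called by the task thread right before it enters user/operator code. */
    synchronized void enterUserCode() {
        inUserCode = true;
    }

    /** Called by the task thread right after it leaves user/operator code. */
    synchronized void leaveUserCode() {
        inUserCode = false;
    }

    /** One tick of the periodic interrupter; returns whether an interrupt was sent. */
    synchronized boolean tick() {
        if (inUserCode) {
            target.interrupt();
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        GuardedInterrupterSketch interrupter =
            new GuardedInterrupterSketch(Thread.currentThread());

        interrupter.enterUserCode();
        System.out.println("sent while in user code: " + interrupter.tick());
        // Clear the interrupt flag that the tick above set on this thread.
        Thread.interrupted();

        interrupter.leaveUserCode();
        System.out.println("sent after leaving user code: " + interrupter.tick());
    }
}
```

Making the three methods `synchronized` means no interrupt is sent after `leaveUserCode()` returns. An interrupt delivered just before that call can still leave the thread's interrupt flag set, so real cleanup logic would have to account for a pending flag.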





[jira] [Commented] (FLINK-9776) Interrupt TaskThread only while in User/Operator code

2019-02-13 Thread Robert Metzger (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767026#comment-16767026
 ] 

Robert Metzger commented on FLINK-9776:
---

This change has actually been merged to Flink 1.6.0 (as you can see from the 
commit) 
[https://github.com/apache/flink/commit/53e6657658bc750b78c32e91fa7e2c02e8c54e33]

The fixversion has probably been pushed forward because the ticket was still 
open.

I'm adjusting the fix version from 1.6.4 to 1.6.0

> Interrupt TaskThread only while in User/Operator code
> -
>
> Key: FLINK-9776
> URL: https://issues.apache.org/jira/browse/FLINK-9776
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.4, 1.8.0
>
>
> Upon cancellation, the task thread is periodically interrupted.
> This helps to pull the thread out of blocking operations in the user code.
> Once the thread leaves the user code, the repeated interrupts may interfere 
> with the shutdown cleanup logic, causing confusing exceptions.
> We should stop sending the periodic interrupts once the thread leaves the 
> user code.





[GitHub] sunjincheng121 commented on issue #5923: [FLINK-9253][network] make the maximum floating buffers count channel-type independent

2019-02-13 Thread GitBox
sunjincheng121 commented on issue #5923: [FLINK-9253][network] make the maximum 
floating buffers count channel-type independent
URL: https://github.com/apache/flink/pull/5923#issuecomment-463144891
 
 
   Hi @rmetzger, +1 for not including this PR in the 1.6.4 release.




[jira] [Updated] (FLINK-9776) Interrupt TaskThread only while in User/Operator code

2019-02-13 Thread Robert Metzger (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-9776:
--
Fix Version/s: (was: 1.6.4)
   1.6.2

> Interrupt TaskThread only while in User/Operator code
> -
>
> Key: FLINK-9776
> URL: https://issues.apache.org/jira/browse/FLINK-9776
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.2, 1.8.0
>
>
> Upon cancellation, the task thread is periodically interrupted.
> This helps to pull the thread out of blocking operations in the user code.
> Once the thread leaves the user code, the repeated interrupts may interfere 
> with the shutdown cleanup logic, causing confusing exceptions.
> We should stop sending the periodic interrupts once the thread leaves the 
> user code.





[jira] [Commented] (FLINK-9776) Interrupt TaskThread only while in User/Operator code

2019-02-13 Thread Robert Metzger (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767028#comment-16767028
 ] 

Robert Metzger commented on FLINK-9776:
---

There is no 1.6.0 version to choose from. Assigning it to 1.6.2 to indicate that it 
has been fixed early in the 1.6 releases.

> Interrupt TaskThread only while in User/Operator code
> -
>
> Key: FLINK-9776
> URL: https://issues.apache.org/jira/browse/FLINK-9776
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.2, 1.8.0
>
>
> Upon cancellation, the task thread is periodically interrupted.
> This helps to pull the thread out of blocking operations in the user code.
> Once the thread leaves the user code, the repeated interrupts may interfere 
> with the shutdown cleanup logic, causing confusing exceptions.
> We should stop sending the periodic interrupts once the thread leaves the 
> user code.





[jira] [Updated] (FLINK-10079) Support external sink table in INSERT INTO statement

2019-02-13 Thread Robert Metzger (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10079?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-10079:
---
Fix Version/s: (was: 1.6.4)
   1.6.2

> Support external sink table in INSERT INTO statement
> 
>
> Key: FLINK-10079
> URL: https://issues.apache.org/jira/browse/FLINK-10079
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Jun Zhang
>Assignee: Jun Zhang
>Priority: Major
> Fix For: 1.6.2, 1.8.0
>
>
> In the documentation, there is a description:
> {quote}Once registered in a TableEnvironment, all tables defined in a 
> ExternalCatalog can be accessed from Table API or SQL queries by specifying 
> their full path, such as catalog.database.table.
> {quote}
> Currently, this is true only for source tables. For a sink table (specified in 
> the Table API or SQL), users have to register it explicitly even though 
> it is defined in a registered ExternalCatalog; otherwise a "No table was 
> registered under the name XXX" TableException is thrown.
> It would be better to keep source and sink tables consistent, which would 
> give users a more convenient way to insert into sink tables.





[jira] [Commented] (FLINK-10079) Support external sink table in INSERT INTO statement

2019-02-13 Thread Robert Metzger (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10079?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767034#comment-16767034
 ] 

Robert Metzger commented on FLINK-10079:


Changing the fix version from 1.6.4 to 1.6.2.

> Support external sink table in INSERT INTO statement
> 
>
> Key: FLINK-10079
> URL: https://issues.apache.org/jira/browse/FLINK-10079
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Jun Zhang
>Assignee: Jun Zhang
>Priority: Major
> Fix For: 1.6.4, 1.8.0
>
>
> In the documentation, there is a description:
> {quote}Once registered in a TableEnvironment, all tables defined in a 
> ExternalCatalog can be accessed from Table API or SQL queries by specifying 
> their full path, such as catalog.database.table.
> {quote}
> Currently, this is true only for source tables. For a sink table (specified in 
> the Table API or SQL), users have to register it explicitly even though 
> it is defined in a registered ExternalCatalog; otherwise a "No table was 
> registered under the name XXX" TableException is thrown.
> It would be better to keep source and sink tables consistent, which would 
> give users a more convenient way to insert into sink tables.





[jira] [Reopened] (FLINK-10079) Support external sink table in INSERT INTO statement

2019-02-13 Thread Robert Metzger (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10079?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger reopened FLINK-10079:


> Support external sink table in INSERT INTO statement
> 
>
> Key: FLINK-10079
> URL: https://issues.apache.org/jira/browse/FLINK-10079
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Jun Zhang
>Assignee: Jun Zhang
>Priority: Major
> Fix For: 1.6.4, 1.8.0
>
>
> In the documentation, there is a description:
> {quote}Once registered in a TableEnvironment, all tables defined in a 
> ExternalCatalog can be accessed from Table API or SQL queries by specifying 
> their full path, such as catalog.database.table.
> {quote}
> Currently, this is true only for source tables. For a sink table (specified in 
> the Table API or SQL), users have to register it explicitly even though 
> it is defined in a registered ExternalCatalog; otherwise a "No table was 
> registered under the name XXX" TableException is thrown.
> It would be better to keep source and sink tables consistent, which would 
> give users a more convenient way to insert into sink tables.





[jira] [Closed] (FLINK-10079) Support external sink table in INSERT INTO statement

2019-02-13 Thread Robert Metzger (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10079?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger closed FLINK-10079.
--
Resolution: Fixed

> Support external sink table in INSERT INTO statement
> 
>
> Key: FLINK-10079
> URL: https://issues.apache.org/jira/browse/FLINK-10079
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Jun Zhang
>Assignee: Jun Zhang
>Priority: Major
> Fix For: 1.8.0, 1.6.2
>
>
> In the documentation, there is a description:
> {quote}Once registered in a TableEnvironment, all tables defined in a 
> ExternalCatalog can be accessed from Table API or SQL queries by specifying 
> their full path, such as catalog.database.table.
> {quote}
> Currently, this is true only for source tables. For a sink table (specified in 
> the Table API or SQL), users have to register it explicitly even though 
> it is defined in a registered ExternalCatalog; otherwise a "No table was 
> registered under the name XXX" TableException is thrown.
> It would be better to keep source and sink tables consistent, which would 
> give users a more convenient way to insert into sink tables.





[GitHub] GJL commented on issue #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base

2019-02-13 Thread GitBox
GJL commented on issue #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase 
to new code base
URL: https://github.com/apache/flink/pull/7676#issuecomment-463149521
 
 
   @flinkbot approve all




[GitHub] rmetzger commented on issue #7609: [FLINK-filesystems] [flink-s3-fs-hadoop] Add mirrored config key for s3 endpoint

2019-02-13 Thread GitBox
rmetzger commented on issue #7609: [FLINK-filesystems] [flink-s3-fs-hadoop] Add 
mirrored config key for s3 endpoint
URL: https://github.com/apache/flink/pull/7609#issuecomment-463149667
 
 
   Thank you for this contribution!
   This seems to be a valuable improvement for Flink, but I would like to first 
get a quick yes or no from @StephanEwen, who has worked a lot on the S3 
integration.
   If Stephan thinks the PR is a good fit for Flink, then I can help you 
merge it.
   
   By the way: I suggest you check out our [contribution 
guide](https://flink.apache.org/contribute-code.html#before-you-start-coding). 
We require PRs to be associated with a JIRA ticket. Once we have agreement on 
merging, I can help you create the ticket.




[GitHub] flinkbot edited a comment on issue #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base

2019-02-13 Thread GitBox
flinkbot edited a comment on issue #7676: [FLINK-11364][tests] Port 
TaskManagerFailsITCase to new code base
URL: https://github.com/apache/flink/pull/7676#issuecomment-462382064
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @GJL [committer]
   * ✅ 2. There is [consensus] that the contribution should go into Flink.
   - Approved by @GJL [committer]
   * ❔ 3. Needs [attention] from.
   * ✅ 4. The change fits into the overall [architecture].
   - Approved by @GJL [committer]
   * ✅ 5. Overall code [quality] is good.
   - Approved by @GJL [committer]
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve the 1st aspect (similarly, it 
also supports the `consensus`, `architecture` and `quality` keywords)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval
   




[GitHub] rmetzger commented on issue #7604: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer

2019-02-13 Thread GitBox
rmetzger commented on issue #7604: [FLINK-11249] FlinkKafkaProducer011 can not 
be migrated to FlinkKafkaProducer
URL: https://github.com/apache/flink/pull/7604#issuecomment-463151660
 
 
   Note to other reviewers: this is a companion pull request to 
https://github.com/apache/flink/pull/7405.
   @pnowojski is actively reviewing the changes there, but he's away for 2 
weeks.




[GitHub] rmetzger commented on a change in pull request #7575: [FLINK-11418][docs] Fix version of bundler to 1.16.1

2019-02-13 Thread GitBox
rmetzger commented on a change in pull request #7575: [FLINK-11418][docs] Fix 
version of bundler to 1.16.1
URL: https://github.com/apache/flink/pull/7575#discussion_r256342038
 
 

 ##
 File path: docs/README.md
 ##
 @@ -12,11 +12,11 @@ dependencies are installed locally when you build the documentation through the
 `build_docs.sh` script. If you want to install the software manually, use Ruby's
 Bundler Gem to install all dependencies:
 
-gem install bundler
+gem install bundler -v 1.16.1
 bundle install
 
-Note that in Ubuntu based systems, it may be necessary to install the `ruby-dev`
-via apt to build native code.
+Note that in Ubuntu based systems, it may be necessary to install the following
+packages `rubygems ruby-dev libssl-dev build-essential`.
 
 Review comment:
   Thanks!




[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)

2019-02-13 Thread GitBox
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler 
(rebased)
URL: https://github.com/apache/flink/pull/7662#discussion_r256341909
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/Scheduler.java
 ##
 @@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/**
+ * Scheduler that assigns tasks to slots. This class is currently work in progress, comments will be updated as we
+ * move forward.
+ */
+public class Scheduler implements SlotProvider, SlotOwner {
+
+   /** Logger */
+   private final Logger log = LoggerFactory.getLogger(getClass());
+
+   /** Strategy that selects the best slot for a given slot allocation request. */
+   @Nonnull
+   private final SlotSelectionStrategy slotSelectionStrategy;
+
+   /** The slot pool from which slots are allocated. */
+   @Nonnull
+   private final SlotPool slotPool;
+
+   /** Executor for running tasks in the job master's main thread. */
+   @Nonnull
+   private ComponentMainThreadExecutor componentMainThreadExecutor;
+
+   /** Managers for the different slot sharing groups. */
+   @Nonnull
+   private final Map<SlotSharingGroupId, SlotSharingManager> slotSharingManagersMap;
+
+   public Scheduler(
+   @Nonnull Map<SlotSharingGroupId, SlotSharingManager> slotSharingManagersMap,
 
 Review comment:
   sounds good to me.




[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)

2019-02-13 Thread GitBox
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler 
(rebased)
URL: https://github.com/apache/flink/pull/7662#discussion_r256342262
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/Scheduler.java
 ##
 @@ -0,0 +1,519 @@
+/**
+ * Scheduler that assigns tasks to slots. This class is currently work in progress, comments will be updated as we
+ * move forward.
+ */
+public class Scheduler implements SlotProvider, SlotOwner {
+
+   /** Logger */
+   private final Logger log = LoggerFactory.getLogger(getClass());
+
+   /** Strategy that selects the best slot for a given slot allocation request. */
+   @Nonnull
+   private final SlotSelectionStrategy slotSelectionStrategy;
+
+   /** The slot pool from which slots are allocated. */
+   @Nonnull
+   private final SlotPool slotPool;
+
+   /** Executor for running tasks in the job master's main thread. */
+   @Nonnull
+   private ComponentMainThreadExecutor componentMainThreadExecutor;
 
 Review comment:
   I'll revisit this part on the second pass and try to make up my mind ;-)




[GitHub] zentol commented on issue #7682: [FLINK-11583][configuration] Support deprecated and fallback keys at once

2019-02-13 Thread GitBox
zentol commented on issue #7682: [FLINK-11583][configuration] Support 
deprecated and fallback keys at once
URL: https://github.com/apache/flink/pull/7682#issuecomment-463153846
 
 
   @tillrohrmann Do you want to take another look?
   




[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)

2019-02-13 Thread GitBox
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler 
(rebased)
URL: https://github.com/apache/flink/pull/7662#discussion_r256343326
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
 ##
 @@ -0,0 +1,1324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.util.clock.Clock;
+import org.apache.flink.runtime.util.clock.SystemClock;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The slot pool serves slot request issued by {@link ExecutionGraph}. It will attempt to acquire new slots
+ * from the ResourceManager when it cannot serve a slot request. If no ResourceManager is currently available,
+ * or it gets a decline from the ResourceManager, or a request times out, it fails the slot request. The slot pool also
+ * holds all the slots that were offered to it and accepted, and can thus provides registered free slots even if the
+ * ResourceManager is down. The slots will only be released when they are useless, e.g. when the job is fully running
+ * but we still have some free slots.
+ *
+ * All the allocation or the slot offering will be identified by self generated AllocationID, we will use it to
+ * eliminate ambiguities.
+ *
+ * TODO : Make pending requests location preference aware
+ * TODO : Make pass location preferences to ResourceManager when sending a slot request
+ */
+public class SlotPoolImpl implements SlotPool, AllocatedSlotActions {
+
+   protected final Logger log = LoggerFactory.getLogger(getClass());
+
+   /** The interval (in milliseconds) in which the SlotPool writes its slot distribution on debug level. */
+   private static final int STATUS_LOG_INTERVAL_MS = 60_000;
+
+   private final JobID jo

[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)

2019-02-13 Thread GitBox
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler 
(rebased)
URL: https://github.com/apache/flink/pull/7662#discussion_r256343812
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
 ##
 @@ -0,0 +1,1324 @@
+/**
+ * The slot pool serves slot request issued by {@link ExecutionGraph}. It will attempt to acquire new slots
+ * from the ResourceManager when it cannot serve a slot request. If no ResourceManager is currently available,
+ * or it gets a decline from the ResourceManager, or a request times out, it fails the slot request. The slot pool also
+ * holds all the slots that were offered to it and accepted, and can thus provides registered free slots even if the
+ * ResourceManager is down. The slots will only be released when they are useless, e.g. when the job is fully running
+ * but we still have some free slots.
+ *
+ * All the allocation or the slot offering will be identified by self generated AllocationID, we will use it to
+ * eliminate ambiguities.
+ *
+ * TODO : Make pending requests location preference aware
+ * TODO : Make pass location preferences to ResourceManager when sending a slot request
+ */
+public class SlotPoolImpl implements SlotPool, AllocatedSlotActions {
+
+   protected final Logger log = LoggerFactory.getLogger(getClass());
+
+   /** The interval (in milliseconds) in which the SlotPool writes its slot distribution on debug level. */
+   private static final int STATUS_LOG_INTERVAL_MS = 60_000;
+
+   private final JobID jo

[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)

2019-02-13 Thread GitBox
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler 
(rebased)
URL: https://github.com/apache/flink/pull/7662#discussion_r256344303
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolResource.java
 ##
 @@ -19,54 +19,55 @@
 package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
-import org.apache.flink.runtime.rpc.RpcService;
 
 import org.junit.rules.ExternalResource;
 
 import javax.annotation.Nonnull;
 
+import java.util.HashMap;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
 
 /**
- * {@link ExternalResource} which provides a {@link SlotPool}.
+ * {@link ExternalResource} which provides a {@link SlotPoolImpl}.
  */
 public class SlotPoolResource extends ExternalResource {
 
 Review comment:
   Ok, I'll double check it.




[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)

2019-02-13 Thread GitBox
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler 
(rebased)
URL: https://github.com/apache/flink/pull/7662#discussion_r256344401
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java
 ##
 @@ -69,33 +62,18 @@
 import static org.junit.Assert.fail;
 
 /**
- * Tests for the SlotPool using a proper RPC setup.
+ * Tests for the SlotPoolImpl interactions.
  */
-public class SlotPoolRpcTest extends TestLogger {
-
-   private static RpcService rpcService;
-
-   private static final Time timeout = Time.seconds(10L);
+public class SlotPoolInteractionsTest extends TestLogger {
 
 Review comment:
   Alright, thanks for the clarification.




[GitHub] rmetzger commented on issue #7575: [FLINK-11418][docs] Fix version of bundler to 1.16.1

2019-02-13 Thread GitBox
rmetzger commented on issue #7575: [FLINK-11418][docs] Fix version of bundler 
to 1.16.1
URL: https://github.com/apache/flink/pull/7575#issuecomment-463155627
 
 
   I will push this fix to master now




[GitHub] tillrohrmann commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base

2019-02-13 Thread GitBox
tillrohrmann commented on a change in pull request #7676: [FLINK-11364][tests] 
Port TaskManagerFailsITCase to new code base
URL: https://github.com/apache/flink/pull/7676#discussion_r256345295
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java
 ##
 @@ -74,8 +74,13 @@ public TestingMiniCluster(TestingMiniClusterConfiguration miniClusterConfigurati
}
 
@Override
-   public void startTaskExecutor(boolean localCommunication) throws Exception {
 
 Review comment:
   Actually, this functionality was needed for #7690.




[GitHub] asfgit closed pull request #7575: [FLINK-11418][docs] Fix version of bundler to 1.16.1

2019-02-13 Thread GitBox
asfgit closed pull request #7575: [FLINK-11418][docs] Fix version of bundler to 
1.16.1
URL: https://github.com/apache/flink/pull/7575
 
 
   




[jira] [Closed] (FLINK-11418) Unable to build docs in Docker image

2019-02-13 Thread Robert Metzger (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger closed FLINK-11418.
--
   Resolution: Fixed
Fix Version/s: 1.8.0

Pushed fix to master in 5e8e00b463 for release 1.8.0

> Unable to build docs in Docker image
> 
>
> Key: FLINK-11418
> URL: https://issues.apache.org/jira/browse/FLINK-11418
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.8.0
>Reporter: Robert Metzger
>Assignee: Robert Metzger
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Running 
> {code:java}
> cd flink/docs/docker
> ./run.sh{code}
>  
> And then in the container
> {code:java}
> Welcome to Apache Flink docs
> To build, execute
> ./build_docs.sh
> To watch and regenerate automatically
> ./build_docs.sh -p
> and access http://localhost:4000
> bash-4.4$ ./build_docs.sh -p
> Traceback (most recent call last):
> 2: from /usr/local/bin/bundle:23:in `<main>'
> 1: from /usr/share/rubygems/rubygems.rb:308:in `activate_bin_path'
> /usr/share/rubygems/rubygems.rb:289:in `find_spec_for_exe': can't find gem 
> bundler (>= 0.a) with executable bundle (Gem::GemNotFoundException){code}
> I believe there's something wrong.
>  





[GitHub] tillrohrmann commented on a change in pull request #7682: [FLINK-11583][configuration] Support deprecated and fallback keys at once

2019-02-13 Thread GitBox
tillrohrmann commented on a change in pull request #7682: 
[FLINK-11583][configuration] Support deprecated and fallback keys at once
URL: https://github.com/apache/flink/pull/7682#discussion_r256345558
 
 

 ##
 File path: 
flink-core/src/test/java/org/apache/flink/configuration/ConfigOptionTest.java
 ##
 @@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link ConfigOption}.
+ */
+public class ConfigOptionTest {
 
 Review comment:
   Let's extend `TestLogger` here




[GitHub] rmetzger commented on issue #7519: FLINK-11378 - Allow writing to Hadoop compatible Filesystems via Hado…

2019-02-13 Thread GitBox
rmetzger commented on issue #7519: FLINK-11378 - Allow writing to Hadoop 
compatible Filesystems via Hado…
URL: https://github.com/apache/flink/pull/7519#issuecomment-463158636
 
 
   @martijnvdgrift Since we are approaching the 1.8 release, it would be nice 
to get this in (if GCS really has all the semantics we expect).




[jira] [Commented] (FLINK-11378) Allow HadoopRecoverableWriter to write to Hadoop compatible Filesystems

2019-02-13 Thread Robert Metzger (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767056#comment-16767056
 ] 

Robert Metzger commented on FLINK-11378:


[~martijnvdgrift]: I gave you contributor permissions in our Jira (so that I 
can assign you) and assigned you, because you've opened a PR for this.

> Allow HadoopRecoverableWriter to write to Hadoop compatible Filesystems
> ---
>
> Key: FLINK-11378
> URL: https://issues.apache.org/jira/browse/FLINK-11378
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystem
>Reporter: Martijn van de Grift
>Assignee: Martijn van de Grift
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> At a client we're using Flink jobs to read data from Kafka and write it to 
> GCS. In earlier versions, we used `BucketingFileSink` for this, but we 
> want to switch to the newer `StreamingFileSink`.
> Since we're running Flink on Google's DataProc, we're using the Hadoop 
> compatible GCS 
> [connector|https://github.com/GoogleCloudPlatform/bigdata-interop] made by 
> Google. This currently doesn't work on Flink, because Flink checks for an HDFS 
> scheme in `HadoopRecoverableWriter`.
> We've successfully run our jobs by creating a custom Flink distribution which has 
> the HDFS scheme check removed.
>  
>  
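
As an illustration of the scheme check described in the issue, here is a minimal, self-contained sketch. It is not Flink's actual `HadoopRecoverableWriter` code: the class `SchemeCheckSketch`, both method names, and the example URIs are hypothetical, and the relaxed variant is only one possible way to admit Hadoop compatible file systems such as GCS.

```java
import java.net.URI;
import java.util.Locale;

// Hypothetical sketch; names and logic are assumptions, not Flink's implementation.
final class SchemeCheckSketch {

    // Strict variant: only paths with the "hdfs" scheme are accepted,
    // which is the behaviour the issue reports as blocking GCS.
    public static boolean isAcceptedStrict(URI path) {
        String scheme = path.getScheme();
        return scheme != null && "hdfs".equals(scheme.toLowerCase(Locale.ROOT));
    }

    // Relaxed variant (assumption): any path that carries a scheme is accepted,
    // leaving it to the underlying file system to support the required operations.
    public static boolean isAcceptedRelaxed(URI path) {
        String scheme = path.getScheme();
        return scheme != null && !scheme.isEmpty();
    }

    public static void main(String[] args) {
        URI hdfs = URI.create("hdfs://namenode:8020/out/part-0");
        URI gcs = URI.create("gs://example-bucket/out/part-0");

        System.out.println(isAcceptedStrict(hdfs));  // true
        System.out.println(isAcceptedStrict(gcs));   // false
        System.out.println(isAcceptedRelaxed(gcs));  // true
    }
}
```

Whether a relaxed check is safe depends on the point raised in the PR comment: the target file system has to provide the semantics the writer relies on.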





[jira] [Assigned] (FLINK-11378) Allow HadoopRecoverableWriter to write to Hadoop compatible Filesystems

2019-02-13 Thread Robert Metzger (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger reassigned FLINK-11378:
--

Assignee: Martijn van de Grift

> Allow HadoopRecoverableWriter to write to Hadoop compatible Filesystems
> ---
>
> Key: FLINK-11378
> URL: https://issues.apache.org/jira/browse/FLINK-11378
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystem
>Reporter: Martijn van de Grift
>Assignee: Martijn van de Grift
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> At a client we're using Flink jobs to read data from Kafka and write it to 
> GCS. In earlier versions, we used `BucketingFileSink` for this, but we 
> want to switch to the newer `StreamingFileSink`.
> Since we're running Flink on Google's DataProc, we're using the Hadoop 
> compatible GCS 
> [connector|https://github.com/GoogleCloudPlatform/bigdata-interop] made by 
> Google. This currently doesn't work on Flink, because Flink checks for an HDFS 
> scheme in `HadoopRecoverableWriter`.
> We've successfully run our jobs by creating a custom Flink distribution which has 
> the HDFS scheme check removed.
>  
>  





[GitHub] TisonKun commented on a change in pull request #7690: [FLINK-11587][tests] Port CoLocationConstraintITCase to new code base

2019-02-13 Thread GitBox
TisonKun commented on a change in pull request #7690: [FLINK-11587][tests] Port 
CoLocationConstraintITCase to new code base
URL: https://github.com/apache/flink/pull/7690#discussion_r256355337
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobExecutionITCase.java
 ##
 @@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.reader.RecordReader;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Integration tests for job scheduling.
+ */
+public class JobExecutionITCase extends TestLogger {
+
+   private static final int SLOTS_PER_TM = 11;
+   private static final int NUM_TMS = 2;
+   private static final int PARALLELISM = SLOTS_PER_TM * NUM_TMS;
+
+   @ClassRule
+   public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+   new MiniClusterResourceConfiguration.Builder()
+   .setNumberSlotsPerTaskManager(SLOTS_PER_TM)
+   .setNumberTaskManagers(NUM_TMS)
+   .build());
+
+   @Test
+   public void testCoLocationConstraintJobExecution() throws Exception {
+   final JobGraph jobGraph = createJobGraph(PARALLELISM);
+
+   final MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
+
+   miniCluster.submitJob(jobGraph).get();
+
+   final CompletableFuture<JobResult> jobResultFuture = miniCluster.requestJobResult(jobGraph.getJobID());
+
+   assertThat(jobResultFuture.get().isSuccess(), is(true));
+   }
+
+   private JobGraph createJobGraph(int parallelism) {
+   final JobVertex sender = new JobVertex("Sender");
+   sender.setParallelism(parallelism);
+   sender.setInvokableClass(Sender.class);
+
+   final JobVertex receiver = new JobVertex("Receiver");
+   receiver.setParallelism(parallelism);
+   receiver.setInvokableClass(Receiver.class);
+
+   final SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
+   receiver.setSlotSharingGroup(slotSharingGroup);
+   sender.setSlotSharingGroup(slotSharingGroup);
+
+   receiver.setStrictlyCoLocatedWith(sender);
 
 Review comment:
   Makes sense :-)




[GitHub] TisonKun closed pull request #6883: [FLINK-10610] [tests] Port slot sharing cases to new codebase

2019-02-13 Thread GitBox
TisonKun closed pull request #6883: [FLINK-10610] [tests] Port slot sharing 
cases to new codebase
URL: https://github.com/apache/flink/pull/6883
 
 
   




[jira] [Closed] (FLINK-10610) Port slot sharing cases to new codebase

2019-02-13 Thread TisonKun (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TisonKun closed FLINK-10610.

Resolution: Duplicate

> Port slot sharing cases to new codebase
> ---
>
> Key: FLINK-10610
> URL: https://issues.apache.org/jira/browse/FLINK-10610
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Port {{CoLocationConstraintITCase}} and {{SlotSharingITCase}} to new codebase





[GitHub] zentol commented on issue #7689: [FLINK-11586][tests] Remove legacy SlotSharingITCase

2019-02-13 Thread GitBox
zentol commented on issue #7689: [FLINK-11586][tests] Remove legacy 
SlotSharingITCase
URL: https://github.com/apache/flink/pull/7689#issuecomment-463176833
 
 
   @flinkbot approve all 
   
   +1 for commit `eccdc2465b7afc90564d23fd730910b9f7dd877d` (We can merge it 
before the other PRs have been finished)




[GitHub] flinkbot edited a comment on issue #7689: [FLINK-11586][tests] Remove legacy SlotSharingITCase

2019-02-13 Thread GitBox
flinkbot edited a comment on issue #7689: [FLINK-11586][tests] Remove legacy 
SlotSharingITCase
URL: https://github.com/apache/flink/pull/7689#issuecomment-462952910
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @zentol [PMC]
   * ✅ 2. There is [consensus] that the contribution should go into Flink.
   - Approved by @zentol [PMC]
   * ❔ 3. Needs [attention] from.
   * ✅ 4. The change fits into the overall [architecture].
   - Approved by @zentol [PMC]
   * ✅ 5. Overall code [quality] is good.
   - Approved by @zentol [PMC]
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve the 1st aspect (similarly, it 
also supports the `consensus`, `architecture` and `quality` keywords)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval
   




[GitHub] zentol commented on issue #7690: [FLINK-11587][tests] Port CoLocationConstraintITCase to new code base

2019-02-13 Thread GitBox
zentol commented on issue #7690: [FLINK-11587][tests] Port 
CoLocationConstraintITCase to new code base
URL: https://github.com/apache/flink/pull/7690#issuecomment-463178398
 
 
   @flinkbot approve description
   @flinkbot approve consensus
   @flinkbot approve architecture




[GitHub] rmetzger commented on issue #7457: [FLINK-11297][docs] Add a doc link of jobmanager ha details.

2019-02-13 Thread GitBox
rmetzger commented on issue #7457: [FLINK-11297][docs] Add a doc link of 
jobmanager ha details.
URL: https://github.com/apache/flink/pull/7457#issuecomment-463178624
 
 
   I'm merging this change



