[GitHub] yanghua commented on issue #7571: [FLINK-10724] Refactor failure handling in check point coordinator

2019-02-14 Thread GitBox
yanghua commented on issue #7571: [FLINK-10724] Refactor failure handling in 
check point coordinator
URL: https://github.com/apache/flink/pull/7571#issuecomment-463938350
 
 
   Hi @kl0u, can you help review this PR? Or @tillrohrmann, can you recommend 
a committer to help review it?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua commented on a change in pull request #7571: [FLINK-10724] Refactor failure handling in check point coordinator

2019-02-14 Thread GitBox
yanghua commented on a change in pull request #7571: [FLINK-10724] Refactor 
failure handling in check point coordinator
URL: https://github.com/apache/flink/pull/7571#discussion_r257126465
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -1277,6 +1293,34 @@ private void discardCheckpoint(PendingCheckpoint pendingCheckpoint, @Nullable Th
 		}
 	}
 
+	private void tryHandleCheckpointException(DeclineCheckpoint message, PendingCheckpoint currentCheckpoint) {
+		boolean needFailJob = message.getReason() != null && !(message.getReason() instanceof CheckpointDeclineException)
+			&& failOnCheckpointingErrors && !currentCheckpoint.getProps().isSavepoint();
+
+		if (needFailJob) {
+			boolean processed = false;
+			ExecutionAttemptID failedExecutionID = message.getTaskExecutionId();
+			for (ExecutionVertex vertex : tasksToWaitFor) {
+				if (vertex.getCurrentExecutionAttempt().getAttemptId().equals(failedExecutionID)) {
+					if (currentPeriodicTrigger != null) {
+						currentPeriodicTrigger.cancel(true);
+						currentPeriodicTrigger = null;
+					}
+
+					vertex.fail(message.getReason());
 
 Review comment:
   We can't make `Execution` or `ExecutionGraph` fail inside 
`CheckpointCoordinator`, so we can throw the failure out and let the caller 
make the decision.
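
   That alternative could be sketched roughly as follows. This is a hedged 
illustration only: the class and method names below are hypothetical, not 
Flink's actual API — the point is just that the coordinator throws instead of 
calling `vertex.fail(...)` itself.

```java
/**
 * Illustrative sketch: instead of failing an ExecutionVertex inside the
 * checkpoint coordinator, surface the decline as an exception and let the
 * caller (e.g. the scheduler) decide what to do. All names are hypothetical.
 */
public class DeclineSketch {

    /** Carries the original decline reason out of the coordinator. */
    static class CheckpointDeclinedException extends RuntimeException {
        CheckpointDeclinedException(Throwable reason) {
            super("checkpoint declined", reason);
        }
    }

    /** Throw the decision out rather than acting on the vertex here. */
    static void handleDecline(Throwable reason, boolean failJobOnError) {
        if (failJobOnError) {
            throw new CheckpointDeclinedException(reason);
        }
        // otherwise: just drop the pending checkpoint and keep running
    }

    public static void main(String[] args) {
        try {
            handleDecline(new RuntimeException("task error"), true);
        } catch (CheckpointDeclinedException e) {
            System.out.println("caller decides: " + e.getMessage());
        }
    }
}
```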




[GitHub] yanghua commented on a change in pull request #7571: [FLINK-10724] Refactor failure handling in check point coordinator

2019-02-14 Thread GitBox
yanghua commented on a change in pull request #7571: [FLINK-10724] Refactor 
failure handling in check point coordinator
URL: https://github.com/apache/flink/pull/7571#discussion_r257125937
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
 ##
 @@ -308,8 +309,12 @@ public void apply(
env.execute("Tumbling Window Test");
}
catch (Exception e) {
-   e.printStackTrace();
-   fail(e.getMessage());
+			if (e.getCause().getCause() instanceof CancellationException) {
 
 Review comment:
   There is a concurrent CancellationException, which I initially suspect is 
caused by the mini cluster (a multithreaded analog of a Flink cluster) 
executing the job. So, I am currently ignoring this exception.
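
   As a side note, checking a fixed nesting depth like `e.getCause().getCause()` 
is brittle: it can throw a NullPointerException or miss the exception if the 
wrapping depth changes. A sketch of walking the whole cause chain instead (a 
hypothetical helper, not Flink test code):

```java
import java.util.concurrent.CancellationException;

/**
 * Illustrative sketch: search the entire cause chain for an exception type
 * instead of assuming it sits exactly two levels deep.
 */
public class CauseChainSketch {

    /** Returns true if any throwable in the cause chain matches the type. */
    static boolean hasCause(Throwable t, Class<? extends Throwable> type) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (type.isInstance(cur)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable wrapped = new RuntimeException("job failed",
            new RuntimeException("task failed", new CancellationException()));
        System.out.println(hasCause(wrapped, CancellationException.class));   // true
        System.out.println(hasCause(new RuntimeException("plain"),
            CancellationException.class));                                    // false
    }
}
```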




[jira] [Commented] (FLINK-10819) The instability problem of CI, JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure test fail.

2019-02-14 Thread Congxian Qiu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16769016#comment-16769016
 ] 

Congxian Qiu commented on FLINK-10819:
--

Another instance:  https://travis-ci.org/klion26/flink/jobs/493604011

> The instability problem of CI, 
> JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure test 
> fail.
> ---
>
> Key: FLINK-10819
> URL: https://issues.apache.org/jira/browse/FLINK-10819
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: sunjincheng
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.8.0
>
>
> Found the following error in the process of CI:
> Results :
> Tests in error: 
>  JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure:331 » 
> IllegalArgument
> Tests run: 1463, Failures: 0, Errors: 1, Skipped: 29
> 18:40:55.828 [INFO] 
> 
> 18:40:55.829 [INFO] BUILD FAILURE
> 18:40:55.829 [INFO] 
> 
> 18:40:55.830 [INFO] Total time: 30:19 min
> 18:40:55.830 [INFO] Finished at: 2018-11-07T18:40:55+00:00
> 18:40:56.294 [INFO] Final Memory: 92M/678M
> 18:40:56.294 [INFO] 
> 
> 18:40:56.294 [WARNING] The requested profile "include-kinesis" could not be 
> activated because it does not exist.
> 18:40:56.295 [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-surefire-plugin:2.18.1:test 
> (integration-tests) on project flink-tests_2.11: There are test failures.
> 18:40:56.295 [ERROR] 
> 18:40:56.295 [ERROR] Please refer to 
> /home/travis/build/sunjincheng121/flink/flink-tests/target/surefire-reports 
> for the individual test results.
> 18:40:56.295 [ERROR] -> [Help 1]
> 18:40:56.295 [ERROR] 
> 18:40:56.295 [ERROR] To see the full stack trace of the errors, re-run Maven 
> with the -e switch.
> 18:40:56.295 [ERROR] Re-run Maven using the -X switch to enable full debug 
> logging.
> 18:40:56.295 [ERROR] 
> 18:40:56.295 [ERROR] For more information about the errors and possible 
> solutions, please read the following articles:
> 18:40:56.295 [ERROR] [Help 1] 
> http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
> MVN exited with EXIT CODE: 1.
> Trying to KILL watchdog (11329).
> ./tools/travis_mvn_watchdog.sh: line 269: 11329 Terminated watchdog
> PRODUCED build artifacts.
> But after the rerun, the error disappeared. 
> Currently, no specific reason has been found; we will continue to monitor it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11618) [state] Refactor operator state repartition mechanism

2019-02-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11618:
---
Labels: pull-request-available  (was: )

> [state] Refactor operator state repartition mechanism
> -
>
> Key: FLINK-11618
> URL: https://issues.apache.org/jira/browse/FLINK-11618
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.7.0
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Currently we have the following state assignment strategy for operator state:
>  * When parallelism is not changed:
>  ** If we only have even-split redistributed state, state assignment tries 
> to stay the same as previously (though actually it is not always the same).
>  ** If we have union redistributed state, all the operator state would be 
> redistributed in the new state assignment.
>  * When parallelism is changed:
>  ** All the operator state would be redistributed in the new state assignment.
> There existed two problems *when parallelism is not changed*:
>  # If we only have even-split redistributed state, the current implementation 
> cannot actually ensure that the state assignment stays the same as previously. 
> This is because the current 
> {{StateAssignmentOperation#collectPartitionableStates}} would repartition 
> {{managedOperatorStates}} without subtask-index information. For example, if 
> we have an operator state with parallelism 2, and subtask-0's managed 
> operator state is empty while subtask-1's is not: although the new 
> parallelism is still 2, after 
> {{StateAssignmentOperation#collectPartitionableStates}} and state assignment, 
> subtask-0 would be assigned the managed operator state while subtask-1 would 
> get none.
>  # We should only redistribute union state and not touch the even-split 
> state. Redistributing even-split state would cause unexpected behavior once 
> {{RestartPipelinedRegionStrategy}} supports restoring state.
> We should fix the above two problems; this issue is a prerequisite of 
> FLINK-10712 and FLINK-10713.





[GitHub] flinkbot commented on issue #7711: [FLINK-11618][state] Refactor operator state repartition mechanism

2019-02-14 Thread GitBox
flinkbot commented on issue #7711: [FLINK-11618][state] Refactor operator state 
repartition mechanism
URL: https://github.com/apache/flink/pull/7711#issuecomment-463928531
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❌ 1. The [description] looks good.
   * ❌ 2. There is [consensus] that the contribution should go into Flink.
   * ❔ 3. Needs [attention] from.
   * ❌ 4. The change fits into the overall [architecture].
   * ❌ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve the 1st aspect (similarly, it 
also supports the `consensus`, `architecture` and `quality` keywords)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval
   




[GitHub] Myasuka opened a new pull request #7711: [FLINK-11618][state] Refactor operator state repartition mechanism

2019-02-14 Thread GitBox
Myasuka opened a new pull request #7711: [FLINK-11618][state] Refactor operator 
state repartition mechanism
URL: https://github.com/apache/flink/pull/7711
 
 
   ## What is the purpose of the change
   There existed two problems during state assignment when parallelism is not 
changed:
   
   1.  If we only have even-split redistributed state, the current 
implementation cannot actually ensure that the state assignment stays the same 
as previously. This is because the current 
`StateAssignmentOperation#collectPartitionableStates` would repartition 
managedOperatorStates without subtask-index information. For example, if we 
have an operator state with parallelism 2, and subtask-0's managed operator 
state is empty while subtask-1's is not: although the new parallelism is still 
2, after `StateAssignmentOperation#collectPartitionableStates` and state 
assignment, subtask-0 would be assigned the managed operator state while 
subtask-1 would get none.
   1.  We should only redistribute union state and not touch the even-split 
state. Redistributing even-split state would cause unexpected behavior once 
`RestartPipelinedRegionStrategy` supports restoring state.
   
   This PR will fix the above two problems.
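
   The first problem can be reproduced with a toy simulation of round-robin 
redistribution that flattens per-subtask state and loses subtask indices. This 
is a hedged sketch only: the class and method names below are hypothetical and 
deliberately simplified, not Flink's actual repartitioning code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Illustrative sketch: collecting even-split operator state without
 * subtask-index information can shuffle assignments even when parallelism
 * is unchanged. All names here are hypothetical.
 */
public class EvenSplitSketch {

    /** Collect per-subtask states into a flat list, losing subtask indices. */
    static List<String> collectWithoutIndex(List<List<String>> perSubtask) {
        List<String> flat = new ArrayList<>();
        for (List<String> s : perSubtask) {
            flat.addAll(s);
        }
        return flat;
    }

    /** Round-robin the flat list back onto the same number of subtasks. */
    static List<List<String>> redistribute(List<String> flat, int parallelism) {
        List<List<String>> out = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            out.add(new ArrayList<>());
        }
        for (int i = 0; i < flat.size(); i++) {
            out.get(i % parallelism).add(flat.get(i));
        }
        return out;
    }

    public static void main(String[] args) {
        // subtask-0 has no state, subtask-1 has one partition
        List<List<String>> before = new ArrayList<>();
        before.add(new ArrayList<>());
        before.add(Arrays.asList("partition-A"));

        List<List<String>> after = redistribute(collectWithoutIndex(before), 2);

        // parallelism is unchanged, yet the state moved to subtask-0
        System.out.println(after); // [[partition-A], []]
    }
}
```

   Keeping the subtask index alongside each collected state, as this PR does, 
is what lets the assignment stay stable in the unchanged-parallelism case.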
   
   ## Brief change log
   
 - Refactor `StateAssignmentOperation#collectPartitionableStates` so that the 
returned `managedOperatorStates` carries subtask-index information.
 - Refactor the API of `OperatorStateRepartitioner` and its implementation 
`RoundRobinOperatorStateRepartitioner`. The logic of comparing old and new 
parallelism is now located in `OperatorStateRepartitioner#repartitionState`.
 - Refactor `AbstractStreamOperatorTestHarness` to extract the logic of 
repartitioning operator state into a new method 
`AbstractStreamOperatorTestHarness#repartitionOperatorState` instead of keeping 
it in `AbstractStreamOperatorTestHarness#initializeState`.
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - Added `StateAssignmentOperationTest` to verify the logic of 
`StateAssignmentOperation#reDistributePartitionableStates`.
 - All existing tests using `AbstractStreamOperatorTestHarness` verify that the 
refactored `StateAssignmentOperation` generates the same results as before.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: yes, this could affect state 
assignment when restoring checkpoint
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable; this is an 
internal implementation, the current docs should be enough.
   




[jira] [Assigned] (FLINK-10937) Add scripts to create docker image for k8s

2019-02-14 Thread Chunhui Shi (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chunhui Shi reassigned FLINK-10937:
---

Assignee: Chunhui Shi  (was: JIN SUN)

> Add scripts to create docker image for k8s
> --
>
> Key: FLINK-10937
> URL: https://issues.apache.org/jira/browse/FLINK-10937
> Project: Flink
>  Issue Type: Sub-task
>Reporter: JIN SUN
>Assignee: Chunhui Shi
>Priority: Major
>
> Add scripts to build a Docker image for Flink on native k8s. 





[jira] [Assigned] (FLINK-10936) Implement Command line tools

2019-02-14 Thread Chunhui Shi (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10936?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chunhui Shi reassigned FLINK-10936:
---

Assignee: Chunhui Shi  (was: JIN SUN)

> Implement Command line tools
> 
>
> Key: FLINK-10936
> URL: https://issues.apache.org/jira/browse/FLINK-10936
> Project: Flink
>  Issue Type: Sub-task
>Reporter: JIN SUN
>Assignee: Chunhui Shi
>Priority: Major
>
> Implement command-line tools to start Kubernetes sessions: 
>  * k8s-session.sh to start and stop a session, like yarn-session.sh does
>  * a customized command line that will be invoked by CliFrontend and 
> ./bin/flink run to submit jobs to a Kubernetes cluster





[jira] [Assigned] (FLINK-10935) Implement KubeClient with Fabric8 Kubernetes clients

2019-02-14 Thread Chunhui Shi (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chunhui Shi reassigned FLINK-10935:
---

Assignee: Chunhui Shi  (was: JIN SUN)

> Implement KubeClient with Fabric8 Kubernetes clients 
> 
>
> Key: FLINK-10935
> URL: https://issues.apache.org/jira/browse/FLINK-10935
> Project: Flink
>  Issue Type: Sub-task
>Reporter: JIN SUN
>Assignee: Chunhui Shi
>Priority: Major
>
> Implement KubeClient with Fabric8 Kubernetes clients and add tests





[jira] [Assigned] (FLINK-9495) Implement ResourceManager for Kubernetes

2019-02-14 Thread Chunhui Shi (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chunhui Shi reassigned FLINK-9495:
--

Assignee: Chunhui Shi  (was: JIN SUN)

> Implement ResourceManager for Kubernetes
> 
>
> Key: FLINK-9495
> URL: https://issues.apache.org/jira/browse/FLINK-9495
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Affects Versions: 1.5.0
>Reporter: Elias Levy
>Assignee: Chunhui Shi
>Priority: Major
>
> I noticed there is no issue for developing a Kubernetes specific 
> ResourceManager under FLIP-6, so I am creating this issue.





[jira] [Assigned] (FLINK-10934) Implement KubernetesJobClusterEntrypoint

2019-02-14 Thread Chunhui Shi (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chunhui Shi reassigned FLINK-10934:
---

Assignee: Chunhui Shi  (was: JIN SUN)

> Implement KubernetesJobClusterEntrypoint
> 
>
> Key: FLINK-10934
> URL: https://issues.apache.org/jira/browse/FLINK-10934
> Project: Flink
>  Issue Type: Sub-task
>Reporter: JIN SUN
>Assignee: Chunhui Shi
>Priority: Major
>
> * Implement KubernetesJobClusterEntrypoint





[jira] [Assigned] (FLINK-10939) Add documents for Flink on native k8s

2019-02-14 Thread Chunhui Shi (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chunhui Shi reassigned FLINK-10939:
---

Assignee: Chunhui Shi  (was: JIN SUN)

> Add documents for Flink on native k8s
> -
>
> Key: FLINK-10939
> URL: https://issues.apache.org/jira/browse/FLINK-10939
> Project: Flink
>  Issue Type: Sub-task
>Reporter: JIN SUN
>Assignee: Chunhui Shi
>Priority: Major
>






[jira] [Assigned] (FLINK-10938) Enable Flink on native k8s E2E Tests in Travis CI

2019-02-14 Thread Chunhui Shi (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10938?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chunhui Shi reassigned FLINK-10938:
---

Assignee: Chunhui Shi  (was: JIN SUN)

> Enable Flink on native k8s E2E Tests in Travis CI 
> --
>
> Key: FLINK-10938
> URL: https://issues.apache.org/jira/browse/FLINK-10938
> Project: Flink
>  Issue Type: Sub-task
>Reporter: JIN SUN
>Assignee: Chunhui Shi
>Priority: Major
>
> Add E2E tests to verify Flink on K8s integration





[jira] [Assigned] (FLINK-10968) Implement TaskManager Entrypoint

2019-02-14 Thread Chunhui Shi (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chunhui Shi reassigned FLINK-10968:
---

Assignee: Chunhui Shi  (was: JIN SUN)

> Implement TaskManager Entrypoint
> 
>
> Key: FLINK-10968
> URL: https://issues.apache.org/jira/browse/FLINK-10968
> Project: Flink
>  Issue Type: Sub-task
>Reporter: JIN SUN
>Assignee: Chunhui Shi
>Priority: Major
>
> Implement the main() entrypoint to start the TaskManager pod.





[jira] [Assigned] (FLINK-10933) Implement KubernetesSessionClusterEntrypoint

2019-02-14 Thread Chunhui Shi (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chunhui Shi reassigned FLINK-10933:
---

Assignee: Chunhui Shi  (was: JIN SUN)

> Implement KubernetesSessionClusterEntrypoint
> 
>
> Key: FLINK-10933
> URL: https://issues.apache.org/jira/browse/FLINK-10933
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: JIN SUN
>Assignee: Chunhui Shi
>Priority: Major
>
> * Implement KubernetesSessionClusterEntrypoint
>  * Implement TaskManager Entrypoint





[GitHub] sunjincheng121 commented on a change in pull request #7664: [FLINK-11449][table] Uncouple the Expression class from RexNodes.

2019-02-14 Thread GitBox
sunjincheng121 commented on a change in pull request #7664: 
[FLINK-11449][table] Uncouple the Expression class from RexNodes.
URL: https://github.com/apache/flink/pull/7664#discussion_r255337933
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
 ##
 @@ -414,15 +414,15 @@ abstract class BatchTableEnvironment(
 * @tparam T The type of the [[DataSet]].
 */
   protected def registerDataSetInternal[T](
-  name: String, dataSet: DataSet[T], fields: Array[Expression]): Unit = {
+  name: String, dataSet: DataSet[T], fields: Array[PlannerExpression]): Unit = {
 
 Review comment:
   The `XXXInternal` methods are internal methods that the user does not 
perceive, so IMO there is no need to change them to `Expression`? (Just to 
reduce the changed code for the current PR?)
   
   So far I have not found the benefits of having `PlannerExpression` inherit 
from `Expression` (except reducing some changed code), because we already have 
the visitor mode to establish the conversion bridge from `Expression` to 
`PlannerExpression`. `Expression` is used at the user API level, and 
`Expression` is converted to `PlannerExpression` at the implementation level. 
We can indeed make `PlannerExpression` inherit from `Expression`. Can you 
elaborate on the benefits of doing this?
   
   




[GitHub] sunjincheng121 commented on a change in pull request #7664: [FLINK-11449][table] Uncouple the Expression class from RexNodes.

2019-02-14 Thread GitBox
sunjincheng121 commented on a change in pull request #7664: 
[FLINK-11449][table] Uncouple the Expression class from RexNodes.
URL: https://github.com/apache/flink/pull/7664#discussion_r255365157
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/UnresolvedFieldReference.java
 ##
 @@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions;
+
+/**
+ * Unresolved field reference expression.
+ */
+public class UnresolvedFieldReference extends LeafExpression {
 
 Review comment:
   How about call it `FieldReference` ?




[jira] [Assigned] (FLINK-9955) Kubernetes ClusterDescriptor

2019-02-14 Thread Chunhui Shi (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chunhui Shi reassigned FLINK-9955:
--

Assignee: Chunhui Shi  (was: JIN SUN)

> Kubernetes ClusterDescriptor
> 
>
> Key: FLINK-9955
> URL: https://issues.apache.org/jira/browse/FLINK-9955
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Reporter: Till Rohrmann
>Assignee: Chunhui Shi
>Priority: Major
> Fix For: 1.8.0
>
>
> In order to start programmatically a Flink cluster on Kubernetes, we need a 
> {{KubernetesClusterDescriptor}} implementation.





[jira] [Assigned] (FLINK-9953) Active Kubernetes integration

2019-02-14 Thread Chunhui Shi (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chunhui Shi reassigned FLINK-9953:
--

Assignee: Chunhui Shi  (was: JIN SUN)

> Active Kubernetes integration
> -
>
> Key: FLINK-9953
> URL: https://issues.apache.org/jira/browse/FLINK-9953
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination, ResourceManager
>Reporter: Till Rohrmann
>Assignee: Chunhui Shi
>Priority: Major
> Fix For: 1.8.0
>
>
> This is the umbrella issue tracking Flink's active Kubernetes integration. 
> Active means in this context that the {{ResourceManager}} can talk to 
> Kubernetes to launch new pods similar to Flink's Yarn and Mesos integration.
>  
> Document can be found here: 
> [https://docs.google.com/document/d/1Zmhui_29VASPcBOEqyMWnF3L6WEWZ4kedrCqya0WaAk/edit?usp=sharing]
>  





[jira] [Assigned] (FLINK-10932) Initial flink-kubernetes module with empty implementation

2019-02-14 Thread Chunhui Shi (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chunhui Shi reassigned FLINK-10932:
---

Assignee: Chunhui Shi  (was: JIN SUN)

> Initial flink-kubernetes module with empty implementation
> -
>
> Key: FLINK-10932
> URL: https://issues.apache.org/jira/browse/FLINK-10932
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: JIN SUN
>Assignee: Chunhui Shi
>Priority: Major
>  Labels: pull-request-available
>
> Initial the skeleton module to start native kubernetes integration. 





[jira] [Created] (FLINK-11619) Make ScheduleMode configurable via user code or configuration file

2019-02-14 Thread yuqi (JIRA)
yuqi created FLINK-11619:


 Summary: Make ScheduleMode configurable via user code or 
configuration file 
 Key: FLINK-11619
 URL: https://issues.apache.org/jira/browse/FLINK-11619
 Project: Flink
  Issue Type: Improvement
Reporter: yuqi
Assignee: yuqi


Currently, the schedule mode for stream jobs is always {{ScheduleMode.EAGER}}; 
see StreamingJobGraphGenerator#createJobGraph:

{code:java}
// make sure that all vertices start immediately
jobGraph.setScheduleMode(ScheduleMode.EAGER);
{code}

Given this, we can make ScheduleMode configurable so users can adapt it to 
different environments. Users could set this option via env.setScheduleMode() 
in code, or make it optional in the configuration file. 

Anyone's help and suggestions are welcome.
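
As a rough sketch of what the configuration-file side of this proposal might 
look like: parse the configured value and fall back to the current default, 
EAGER, when it is absent or invalid. This is purely hypothetical — neither the 
parsing helper nor its name exists in Flink; only the ScheduleMode values 
mirror the real enum.

{code:java}
/**
 * Hypothetical sketch of reading a schedule mode from configuration,
 * defaulting to EAGER (today's hard-coded behavior) when unset or invalid.
 */
public class ScheduleModeSketch {
    enum ScheduleMode { EAGER, LAZY_FROM_SOURCES }

    static ScheduleMode parse(String configured) {
        if (configured == null) {
            return ScheduleMode.EAGER;
        }
        try {
            return ScheduleMode.valueOf(configured.trim().toUpperCase());
        } catch (IllegalArgumentException e) {
            // unknown value: keep the current default rather than fail
            return ScheduleMode.EAGER;
        }
    }

    public static void main(String[] args) {
        System.out.println(parse(null));                // EAGER
        System.out.println(parse("lazy_from_sources")); // LAZY_FROM_SOURCES
        System.out.println(parse("bogus"));             // EAGER
    }
}
{code}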






[jira] [Updated] (FLINK-11334) Migrate enum serializers to use new serialization compatibility abstractions

2019-02-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11334:
---
Labels: pull-request-available  (was: )

> Migrate enum serializers to use new serialization compatibility abstractions
> 
>
> Key: FLINK-11334
> URL: https://issues.apache.org/jira/browse/FLINK-11334
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing, Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: boshu Zheng
>Priority: Major
>  Labels: pull-request-available
>
> This subtask covers the migration of:
> * EnumSerializerConfigSnapshot
> * ScalaEnumSerializerConfigSnapshot
> to use the new serialization compatibility APIs ({{TypeSerializerSnapshot}} 
> and {{TypeSerializerSchemaCompatibility}}).
> The enum serializer snapshots should be implemented so that on restore the 
> Enum constants can be reordered (a case for serializer reconfiguration), and 
> new Enum constants can be added.
> Serializers are only considered to have completed migration according to the 
> defined list of things to check in FLINK-11327.
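
The reordering requirement can be illustrated with a toy remapping: data written 
as ordinals against the snapshotted constant order must be resolved by name, not 
by the current ordinal. This is a sketch only — not the actual 
{{TypeSerializerSnapshot}} API; the enum and helper names are made up.

{code:java}
import java.util.Arrays;
import java.util.List;

/**
 * Illustrative sketch: an enum serializer snapshot records the constant
 * order at write time, so ordinals can be remapped after constants are
 * reordered or new ones added.
 */
public class EnumSnapshotSketch {
    enum Color { RED, GREEN, BLUE }

    /** Map an ordinal written under the snapshotted order to today's constant. */
    static Color restore(int writtenOrdinal, List<String> snapshottedOrder) {
        return Color.valueOf(snapshottedOrder.get(writtenOrdinal));
    }

    public static void main(String[] args) {
        // The old job snapshotted the order [BLUE, RED, GREEN]: ordinal 0 meant BLUE.
        List<String> oldOrder = Arrays.asList("BLUE", "RED", "GREEN");
        System.out.println(restore(0, oldOrder)); // BLUE, not RED
    }
}
{code}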





[GitHub] flinkbot commented on issue #7710: [FLINK-11334][core] Migrate enum serializers to use new serialization compatibility abstractions

2019-02-14 Thread GitBox
flinkbot commented on issue #7710: [FLINK-11334][core] Migrate enum serializers 
to use new serialization compatibility abstractions
URL: https://github.com/apache/flink/pull/7710#issuecomment-463891813
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❌ 1. The [description] looks good.
   * ❌ 2. There is [consensus] that the contribution should go into Flink.
   * ❔ 3. Needs [attention] from.
   * ❌ 4. The change fits into the overall [architecture].
   * ❌ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve the 1st aspect (similarly, it 
also supports the `consensus`, `architecture` and `quality` keywords)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval
   




[GitHub] klion26 opened a new pull request #7710: [FLINK-11334][core] Migrate enum serializers to use new serialization compatibility abstractions

2019-02-14 Thread GitBox
klion26 opened a new pull request #7710: [FLINK-11334][core] Migrate enum 
serializers to use new serialization compatibility abstractions
URL: https://github.com/apache/flink/pull/7710
 
 
   
   ## What is the purpose of the change
   
   Migrate enum serializers to use new serialization compatibility abstractions
   
   ## Brief change log
   
   This commit 097a3b9 touches EnumSerializer:
- return an `EnumSerializerSnapshot` (replacing the old config snapshot) when 
calling `EnumSerializer#snapshotConfiguration`.
- add a test `EnumSerializerSnapshotMigrationTest` to verify the change.
- remove the function `EnumSerializer#ensureCompatibility`.
   
   ## Verifying this change
   
   
   This change is already covered by existing tests, such as 
`EnumSerializerTest`, and by the added migration test 
`EnumSerializerSnapshotMigrationTest`.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (**no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**no**)
 - The serializers: (**yes**)
 - The runtime per-record code paths (performance sensitive): (**no**)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**no**)
 - The S3 file system connector: (**no**)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**no**)
 - If yes, how is the feature documented? (**not applicable**)
   




[GitHub] KurtYoung edited a comment on issue #7664: [FLINK-11449][table] Uncouple the Expression class from RexNodes.

2019-02-14 Thread GitBox
KurtYoung edited a comment on issue #7664: [FLINK-11449][table] Uncouple the 
Expression class from RexNodes.
URL: https://github.com/apache/flink/pull/7664#issuecomment-463553997
 
 
   Thanks @sunjincheng121 for this great effort. I just quickly went through the 
changes, and have some design comments:
   1. It appears to me that I can't tell the difference between `Expression` 
and `PlannerExpression`. What are the scenarios for each? Take 
`FilterableTableSource` for example: currently it takes a list of `Expression` 
as input, but the method `applyPredicate` clearly happens during the optimize 
phase, so my feeling is we should use `PlannerExpression` during optimization. 
(I know `FilterableTableSource` is API and should only accept `Expression`, 
which is also API, but the logic seems a little weird.)
   
   2. What about just using the current `Expression` for the API, by getting rid 
of the `toRexNode` method? The only problem is that the number of expressions is 
quite big and keeps increasing. To solve this, we could follow the advice 
@twalthr suggested: categorize all the expressions we currently have and squash 
some of them. For example, we can squash "IF" and "ABS" into `CallExpression`, 
and use `FunctionDefinition` to represent the detailed information, quite 
similar to Calcite's `RexNode` and `SqlOperator`. And with an 
`ExpressionVisitor`, we can directly map `Expression` to `RexNode`, which seems 
much clearer. 
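
   The squashing idea described in point 2 (one generic call node plus a 
function definition, translated by a visitor) can be illustrated in plain Java. 
All names below are illustrative stand-ins for the design being discussed, not 
the API that flink-table eventually shipped.

```java
public class ExpressionSketch {

    /** Identifies a function such as IF or ABS; plays the role SqlOperator plays for RexNode. */
    public static final class FunctionDefinition {
        public final String name;
        public FunctionDefinition(String name) { this.name = name; }
    }

    public interface Expression {
        <R> R accept(ExpressionVisitor<R> visitor);
    }

    /** One generic call node replaces a per-function class hierarchy (If, Abs, ...). */
    public static final class CallExpression implements Expression {
        public final FunctionDefinition function;
        public final java.util.List<Expression> args;
        public CallExpression(FunctionDefinition function, Expression... args) {
            this.function = function;
            this.args = java.util.Arrays.asList(args);
        }
        public <R> R accept(ExpressionVisitor<R> visitor) { return visitor.visitCall(this); }
    }

    public static final class Literal implements Expression {
        public final Object value;
        public Literal(Object value) { this.value = value; }
        public <R> R accept(ExpressionVisitor<R> visitor) { return visitor.visitLiteral(this); }
    }

    /** The planner-side translation (to RexNode in the real system) lives in a visitor. */
    public interface ExpressionVisitor<R> {
        R visitCall(CallExpression call);
        R visitLiteral(Literal literal);
    }

    /** Example visitor that renders to a string instead of a RexNode. */
    public static String render(Expression e) {
        return e.accept(new ExpressionVisitor<String>() {
            public String visitCall(CallExpression call) {
                StringBuilder sb = new StringBuilder(call.function.name).append('(');
                for (int i = 0; i < call.args.size(); i++) {
                    if (i > 0) sb.append(", ");
                    sb.append(render(call.args.get(i)));
                }
                return sb.append(')').toString();
            }
            public String visitLiteral(Literal literal) {
                return String.valueOf(literal.value);
            }
        });
    }
}
```

   The API surface then stays fixed (`CallExpression`, `Literal`, and a few 
others) while new functions only add `FunctionDefinition` instances, and the 
planner keeps the `Expression`-to-`RexNode` mapping in its own visitor.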




[GitHub] carp84 commented on a change in pull request #7674: [FLINK-10043] [State Backends] Refactor RocksDBKeyedStateBackend object construction/initialization/restore code

2019-02-14 Thread GitBox
carp84 commented on a change in pull request #7674: [FLINK-10043] [State 
Backends] Refactor RocksDBKeyedStateBackend object 
construction/initialization/restore code
URL: https://github.com/apache/flink/pull/7674#discussion_r257088835
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackendBuilder.java
 ##
 @@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.util.TernaryBoolean;
+
+import java.util.Collection;
+
+public class OperatorStateBackendBuilder implements 
StateBackendBuilder {
 
 Review comment:
   OK, I will focus on the RocksDB backend refactor here; maybe we could use 
the builder approach for all backends in other PRs.




[GitHub] carp84 commented on a change in pull request #7674: [FLINK-10043] [State Backends] Refactor RocksDBKeyedStateBackend object construction/initialization/restore code

2019-02-14 Thread GitBox
carp84 commented on a change in pull request #7674: [FLINK-10043] [State 
Backends] Refactor RocksDBKeyedStateBackend object 
construction/initialization/restore code
URL: https://github.com/apache/flink/pull/7674#discussion_r257088134
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackendBuilder.java
 ##
 @@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.util.TernaryBoolean;
+
+import java.util.Collection;
+
+public class OperatorStateBackendBuilder implements 
StateBackendBuilder {
+   private final Environment env;
+   private final String operatorIdentifier;
+   protected Collection restoredState;
+   private TernaryBoolean asynchronousSnapshots;
+
+   public OperatorStateBackendBuilder(Environment env,
+  
String operatorIdentifier) {
+   this.env = env;
+   this.operatorIdentifier = operatorIdentifier;
+   }
+
+   @Override
+   public OperatorStateBackend build() {
+   return new DefaultOperatorStateBackend(
+   env.getUserClassLoader(),
+   env.getExecutionConfig(),
+   isUsingAsynchronousSnapshots());
+   }
+
+   @Override
+   public OperatorStateBackendBuilder setRestoreStateHandles(Collection 
restoredStateHandles) {
 
 Review comment:
   The `setRestoreStateHandles` method has been removed from the builder 
interface following the previous review suggestion.




[GitHub] tweise merged pull request #7672: [FLINK-11568][kinesis] Don't obscure important Kinesis exceptions

2019-02-14 Thread GitBox
tweise merged pull request #7672: [FLINK-11568][kinesis] Don't obscure 
important Kinesis exceptions
URL: https://github.com/apache/flink/pull/7672
 
 
   




[GitHub] flinkbot edited a comment on issue #7672: [FLINK-11568][kinesis] Don't obscure important Kinesis exceptions

2019-02-14 Thread GitBox
flinkbot edited a comment on issue #7672: [FLINK-11568][kinesis] Don't obscure 
important Kinesis exceptions
URL: https://github.com/apache/flink/pull/7672#issuecomment-461938418
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @tweise [committer]
   * ✅ 2. There is [consensus] that the contribution should go into Flink.
   - Approved by @tweise [committer]
   * ❔ 3. Needs [attention] from.
   * ✅ 4. The change fits into the overall [architecture].
   - Approved by @tweise [committer]
   * ✅ 5. Overall code [quality] is good.
   - Approved by @tweise [committer]
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
   ## Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve the 1st aspect (similarly, it 
also supports the `consensus`, `architecture` and `quality` keywords)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval
   




[GitHub] tweise commented on issue #7672: [FLINK-11568][kinesis] Don't obscure important Kinesis exceptions

2019-02-14 Thread GitBox
tweise commented on issue #7672: [FLINK-11568][kinesis] Don't obscure important 
Kinesis exceptions
URL: https://github.com/apache/flink/pull/7672#issuecomment-463882250
 
 
   @flinkbot approve all




[GitHub] tweise commented on a change in pull request #7672: [FLINK-11568][kinesis] Don't obscure important Kinesis exceptions

2019-02-14 Thread GitBox
tweise commented on a change in pull request #7672: [FLINK-11568][kinesis] 
Don't obscure important Kinesis exceptions
URL: https://github.com/apache/flink/pull/7672#discussion_r257085977
 
 

 ##
 File path: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/AlwaysThrowsDeserializationSchema.java
 ##
 @@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.testutils;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.core.testutils.OneShotLatch;
+
+import java.io.IOException;
+
+/**
+ * A DeserializationSchema which always throws an exception when the 
deserialize method is called. Also supports
+ * waiting on a latch until at least one exception has been thrown.
+ */
+public class AlwaysThrowsDeserializationSchema implements 
DeserializationSchema {
 
 Review comment:
   This class could probably also have been defined nested in the test, and 
some of the boilerplate avoided by extending an existing schema with an 
override for what is relevant here. Just something to keep in mind for future 
work.
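
   The suggested pattern (extend an existing schema and override only the 
method under test) can be sketched with a minimal stand-in. The interface below 
is an illustrative simplification, not Flink's actual `DeserializationSchema`.

```java
public class ThrowingSchemaSketch {

    /** Minimal stand-in for the connector's DeserializationSchema (no Flink dependency). */
    public interface DeserializationSchema<T> {
        T deserialize(byte[] message);
    }

    /** A working base schema that a test double can extend. */
    public static class StringSchema implements DeserializationSchema<String> {
        public String deserialize(byte[] message) {
            return new String(message, java.nio.charset.StandardCharsets.UTF_8);
        }
    }

    /** The review suggestion: override only the behavior under test, inline. */
    public static DeserializationSchema<String> alwaysThrows() {
        return new StringSchema() {
            @Override
            public String deserialize(byte[] message) {
                throw new RuntimeException("deserialize should never succeed in this test");
            }
        };
    }
}
```

   An anonymous subclass like this keeps the test double next to the test that 
uses it, instead of a standalone class carrying full boilerplate.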




[jira] [Assigned] (FLINK-11613) Translate the "Project Template for Scala" page into Chinese

2019-02-14 Thread Tom Goong (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tom Goong reassigned FLINK-11613:
-

Assignee: Tom Goong

> Translate the "Project Template for Scala" page into Chinese
> 
>
> Key: FLINK-11613
> URL: https://issues.apache.org/jira/browse/FLINK-11613
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: Jark Wu
>Assignee: Tom Goong
>Priority: Major
>
> The page url is 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/projectsetup/scala_api_quickstart.html
> The markdown file is located in 
> flink/docs/dev/projectsetup/scala_api_quickstart.zh.md
> The markdown file will be created once FLINK-11529 is merged.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] wujinhu commented on issue #7599: [FLINK-11442] upgrade oss sdk version

2019-02-14 Thread GitBox
wujinhu commented on issue #7599: [FLINK-11442] upgrade oss sdk version
URL: https://github.com/apache/flink/pull/7599#issuecomment-463874242
 
 
   Thanks @StefanRRichter 




[jira] [Commented] (FLINK-11373) CliFrontend cuts off reason for error messages

2019-02-14 Thread leesf (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16768840#comment-16768840
 ] 

leesf commented on FLINK-11373:
---

[~AT-Fieldless], hi, thanks for your advice, but I am still working on this 
issue.

> CliFrontend cuts off reason for error messages
> --
>
> Key: FLINK-11373
> URL: https://issues.apache.org/jira/browse/FLINK-11373
> Project: Flink
>  Issue Type: Bug
>  Components: Startup Shell Scripts
>Affects Versions: 1.5.6, 1.6.3, 1.7.1
>Reporter: Maximilian Michels
>Assignee: leesf
>Priority: Minor
>  Labels: pull-request-available, starter
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The CliFrontend seems to only print the first message in the stack trace and 
> not any of its causes.
> {noformat}
> bin/flink run /non-existing/path
> Could not build the program from JAR file.
> Use the help option (-h or --help) to get help on the command.
> {noformat}
> Notice, the underlying cause of this message is FileNotFoundException.
> Consider changing 
> a) the error message for this particular case 
> b) the way the stack trace messages are trimmed





[jira] [Assigned] (FLINK-11613) Translate the "Project Template for Scala" page into Chinese

2019-02-14 Thread leesf (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

leesf reassigned FLINK-11613:
-

Assignee: (was: leesf)

> Translate the "Project Template for Scala" page into Chinese
> 
>
> Key: FLINK-11613
> URL: https://issues.apache.org/jira/browse/FLINK-11613
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: Jark Wu
>Priority: Major
>
> The page url is 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/projectsetup/scala_api_quickstart.html
> The markdown file is located in 
> flink/docs/dev/projectsetup/scala_api_quickstart.zh.md
> The markdown file will be created once FLINK-11529 is merged.





[GitHub] glaksh100 commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer

2019-02-14 Thread GitBox
glaksh100 commented on a change in pull request #7679: [FLINK-11501][Kafka 
Connector] Add ratelimiting to Kafka consumer
URL: https://github.com/apache/flink/pull/7679#discussion_r257072070
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
 ##
 @@ -311,4 +323,19 @@ private static void setDeserializer(Properties props) {
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
deSerName);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
deSerName);
}
+
+
+   // 

+   // Rate Limiter specific methods
+   // 

+
+   /**
+* Set a rate limiter to ratelimit bytes read from Kafka.
+* @param kafkaRateLimiter
+* @param maxBytesPerSecond
+*/
+   public void setRateLimiter(FlinkConnectorRateLimiter kafkaRateLimiter, 
long maxBytesPerSecond) {
 
 Review comment:
   Fixed. 




[GitHub] glaksh100 commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer

2019-02-14 Thread GitBox
glaksh100 commented on a change in pull request #7679: [FLINK-11501][Kafka 
Connector] Add ratelimiting to Kafka consumer
URL: https://github.com/apache/flink/pull/7679#discussion_r257072024
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/config/DefaultKafkaRateLimiter.java
 ##
 @@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.config;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+import 
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.RateLimiter;
+
+/**
+ * A default KafkaRateLimiter that uses Guava's RateLimiter to throttle the 
bytes read from Kafka.
+ */
+public class DefaultKafkaRateLimiter implements FlinkConnectorRateLimiter {
 
 Review comment:
   I think that is a better name. Thanks for the suggestion.




[GitHub] glaksh100 commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer

2019-02-14 Thread GitBox
glaksh100 commented on a change in pull request #7679: [FLINK-11501][Kafka 
Connector] Add ratelimiting to Kafka consumer
URL: https://github.com/apache/flink/pull/7679#discussion_r257072051
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/config/FlinkConnectorRateLimiter.java
 ##
 @@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+/**
+ * An interface to create a ratelimiter
+ *
+ * The ratelimiter is configured via {@link #setRate(long)} and
+ * created via {@link #create()}.
+ * An example implementation can be found {@link DefaultKafkaRateLimiter}.
+ * */
+
+@PublicEvolving
+public interface FlinkConnectorRateLimiter {
+
+   void open(RuntimeContext runtimeContext);
+
+   /** Creates a rate limiter.  */
+T create();
 
 Review comment:
   This does look confusing. Essentially, we need `runtimeContext` to be 
initialized to be able to determine the per-subtask rate. Perhaps the `open()` 
method alone should suffice (and all related logic can be implemented there)?
   
   As for `acquire()` not being here, that was my bad. I have fixed this to 
include `acquire()` and completely removed the Guava import in the 
`KafkaConsumerThread` class (which was not the case before). 
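
   The lifecycle under discussion (the user configures a global rate, `open()` 
divides it across parallel subtasks, `acquire()` throttles each fetch) can be 
sketched without Flink or Guava. All names below are illustrative stand-ins for 
the interface being designed, and the token bucket is a simplification of 
Guava's `RateLimiter`, which the PR actually delegates to.

```java
public class RateLimiterSketch {

    /** Illustrative stand-in for the connector rate limiter interface being designed. */
    public interface ConnectorRateLimiter {
        void setRate(long globalBytesPerSecond); // configured by the user before the job starts
        void open(int numParallelSubtasks);      // divides the global rate across subtasks
        void acquire(int bytes);                 // blocks until the bytes may be read
        long getEffectiveRate();                 // per-subtask rate, known only after open()
    }

    /** Simple token-bucket implementation for illustration. */
    public static class TokenBucketRateLimiter implements ConnectorRateLimiter {
        private long globalRate;
        private long subtaskRate;
        private double tokens;
        private long lastRefillNanos;

        public void setRate(long globalBytesPerSecond) {
            this.globalRate = globalBytesPerSecond;
        }

        public void open(int numParallelSubtasks) {
            this.subtaskRate = globalRate / numParallelSubtasks;
            this.tokens = subtaskRate; // start with one second's worth of budget
            this.lastRefillNanos = System.nanoTime();
        }

        public void acquire(int bytes) {
            refill();
            while (tokens < bytes) { // block until enough budget has accumulated
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                refill();
            }
            tokens -= bytes;
        }

        public long getEffectiveRate() {
            return subtaskRate;
        }

        private void refill() {
            long now = System.nanoTime();
            tokens = Math.min(subtaskRate, tokens + subtaskRate * (now - lastRefillNanos) / 1e9);
            lastRefillNanos = now;
        }
    }
}
```

   With a global rate of 1 MB/s and parallelism 4, each subtask throttles 
itself to 250 KB/s, which is why the real interface needs the runtime context 
in `open()` to learn the parallelism.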




[GitHub] tweise commented on a change in pull request #7706: [FLINK-11617] [kinesis connector] Kinesis Connector getRecords() failure logging is misleading

2019-02-14 Thread GitBox
tweise commented on a change in pull request #7706: [FLINK-11617] [kinesis 
connector] Kinesis Connector getRecords() failure logging is misleading
URL: https://github.com/apache/flink/pull/7706#discussion_r257053063
 
 

 ##
 File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ##
 @@ -262,8 +262,8 @@ public GetRecordsResult getRecords(String shardIterator, 
int maxRecordsToGet) th
}
 
if (getRecordsResult == null) {
-   throw new RuntimeException("Rate Exceeded for 
getRecords operation - all " + getRecordsMaxRetries +
-   " retry attempts returned 
ProvisionedThroughputExceededException.");
+   throw new RuntimeException("Retries exceeded for 
getRecords operation - all " + getRecordsMaxRetries +
 
 Review comment:
   Please also fix same issue further down in `getShardIterator` 
(https://github.com/apache/flink/pull/7706/files#diff-ed02b5340df65de06c19eb93fe90a920L340)
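
   The messaging fix being requested (report retry exhaustion and the real 
cause rather than assuming one exception type) can be sketched as a generic 
retry helper. This is plain illustrative Java, not KinesisProxy's actual code; 
in particular, the real proxy also applies jittered backoff between attempts.

```java
public class RetrySketch {

    @FunctionalInterface
    public interface Attempt<T> {
        T call() throws Exception;
    }

    /**
     * Retries an operation up to maxRetries times. On exhaustion, the thrown
     * message states how many attempts failed, and the last failure is attached
     * as the cause instead of being hard-coded into the message text.
     */
    public static <T> T retry(Attempt<T> op, int maxRetries, String opName) {
        Exception last = null;
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            try {
                return op.call();
            } catch (Exception e) {
                last = e; // a recoverable error, e.g. a throughput-exceeded exception
            }
        }
        throw new RuntimeException("Retries exceeded for " + opName + " operation - all "
                + maxRetries + " retry attempts failed.", last);
    }
}
```

   Keeping the cause chain intact means `getRecords` and `getShardIterator` can 
share one accurate message instead of each claiming a specific failure reason.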




[GitHub] tweise commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer

2019-02-14 Thread GitBox
tweise commented on a change in pull request #7679: [FLINK-11501][Kafka 
Connector] Add ratelimiting to Kafka consumer
URL: https://github.com/apache/flink/pull/7679#discussion_r257046152
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/config/DefaultKafkaRateLimiter.java
 ##
 @@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.config;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+import 
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.RateLimiter;
+
+/**
+ * A default KafkaRateLimiter that uses Guava's RateLimiter to throttle the 
bytes read from Kafka.
+ */
+public class DefaultKafkaRateLimiter implements FlinkConnectorRateLimiter {
 
 Review comment:
   Since there is nothing Kafka specific here, but the implementation is 
specific to Guava, would `GuavaFlinkConnectorRateLimiter` be a better name?




[GitHub] tweise commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer

2019-02-14 Thread GitBox
tweise commented on a change in pull request #7679: [FLINK-11501][Kafka 
Connector] Add ratelimiting to Kafka consumer
URL: https://github.com/apache/flink/pull/7679#discussion_r257047597
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/config/FlinkConnectorRateLimiter.java
 ##
 @@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+/**
+ * An interface to create a ratelimiter
+ *
+ * The ratelimiter is configured via {@link #setRate(long)} and
+ * created via {@link #create()}.
+ * An example implementation can be found {@link DefaultKafkaRateLimiter}.
+ * */
+
+@PublicEvolving
+public interface FlinkConnectorRateLimiter {
+
+   void open(RuntimeContext runtimeContext);
+
+   /** Creates a rate limiter.  */
+T create();
 
 Review comment:
   This is confusing, the rate limiter was already created by the user.
   
   Why is `acquire` not defined here?




[GitHub] tweise commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer

2019-02-14 Thread GitBox
tweise commented on a change in pull request #7679: [FLINK-11501][Kafka 
Connector] Add ratelimiting to Kafka consumer
URL: https://github.com/apache/flink/pull/7679#discussion_r257045352
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
 ##
 @@ -311,4 +323,19 @@ private static void setDeserializer(Properties props) {
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
deSerName);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
deSerName);
}
+
+
+   // 

+   // Rate Limiter specific methods
+   // 

+
+   /**
+* Set a rate limiter to ratelimit bytes read from Kafka.
+* @param kafkaRateLimiter
+* @param maxBytesPerSecond
+*/
+   public void setRateLimiter(FlinkConnectorRateLimiter kafkaRateLimiter, 
long maxBytesPerSecond) {
 
 Review comment:
   Why the second parameter? That should be a setter on 
`DefaultKafkaRateLimiter`




[GitHub] glaksh100 commented on issue #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer

2019-02-14 Thread GitBox
glaksh100 commented on issue #7679: [FLINK-11501][Kafka Connector] Add 
ratelimiting to Kafka consumer
URL: https://github.com/apache/flink/pull/7679#issuecomment-463830228
 
 
   @tweise @becketqin I updated the code to include a 
`FlinkConnectorRateLimiter` and also included a `DefaultKafkaRateLimiter` that 
implements the interface using Guava's RateLimiter. Let me know what you think. 




[GitHub] glaksh100 commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer

2019-02-14 Thread GitBox
glaksh100 commented on a change in pull request #7679: [FLINK-11501][Kafka 
Connector] Add ratelimiting to Kafka consumer
URL: https://github.com/apache/flink/pull/7679#discussion_r257040349
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/config/RateLimiterFactory.java
 ##
 @@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.config;
+
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+
+import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.RateLimiter;
+
+import java.util.Properties;
+
+/**
+ * A RateLimiterFactory that configures and creates a rate limiter.
+ */
+public class RateLimiterFactory {
+
+   /** Flag that indicates if ratelimiting is enabled. */
+   private static final String RATELIMITING_FLAG = "consumer.ratelimiting.enabled";
 
 Review comment:
   I ended up passing the rate value to the setter that sets the ratelimiter on 
the consumer. Let me know if that looks alright.
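   The change discussed above wires a configured rate value into Guava's `RateLimiter`. As a rough, JDK-only sketch of the token-bucket idea behind it (this is not the PR's `DefaultKafkaRateLimiter`; the class and the `consumer.ratelimiting.rate` property key are hypothetical):

```java
import java.util.Properties;

// Simplified token-bucket limiter, illustrating what Guava's RateLimiter
// provides to the Kafka consumer. Hypothetical sketch, not the PR's code.
class SimpleRateLimiter {
    private final double permitsPerSecond;
    private double storedPermits;     // currently available permits
    private long lastRefillNanos;     // last time the bucket was refilled

    SimpleRateLimiter(double permitsPerSecond) {
        this.permitsPerSecond = permitsPerSecond;
        this.storedPermits = permitsPerSecond; // start with one second's budget
        this.lastRefillNanos = System.nanoTime();
    }

    // Hypothetical property key; the PR passes the rate through a setter instead.
    static SimpleRateLimiter fromProperties(Properties props) {
        double rate = Double.parseDouble(
                props.getProperty("consumer.ratelimiting.rate", "1.0"));
        return new SimpleRateLimiter(rate);
    }

    /** Non-blocking acquire: returns true if a permit was available. */
    synchronized boolean tryAcquire() {
        refill();
        if (storedPermits >= 1.0) {
            storedPermits -= 1.0;
            return true;
        }
        return false;
    }

    // Add permits earned since the last call, capped at one second's worth.
    private void refill() {
        long now = System.nanoTime();
        storedPermits = Math.min(permitsPerSecond,
                storedPermits + (now - lastRefillNanos) / 1e9 * permitsPerSecond);
        lastRefillNanos = now;
    }
}
```

   Guava's real implementation additionally offers a blocking `acquire()` with smooth bursting; a rate-limited consumer would call it once per fetched batch or record.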




[GitHub] glaksh100 commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer

2019-02-14 Thread GitBox
glaksh100 commented on a change in pull request #7679: [FLINK-11501][Kafka 
Connector] Add ratelimiting to Kafka consumer
URL: https://github.com/apache/flink/pull/7679#discussion_r257040325
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/config/RateLimiterFactory.java
 ##
 @@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.config;
+
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+
+import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.RateLimiter;
+
+import java.util.Properties;
+
+/**
+ * A RateLimiterFactory that configures and creates a rate limiter.
+ */
+public class RateLimiterFactory {
 
 Review comment:
   I have not made this change yet. Perhaps we could do this as part of 
augmenting the consumer and making the code more customizable (i.e., in another 
PR)?




[GitHub] Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2019-02-14 Thread GitBox
Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added 
PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r257039386
 
 

 ##
 File path: flink-connectors/flink-connector-pubsub/pom.xml
 ##
 @@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-connectors</artifactId>
+		<version>1.8-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-pubsub_${scala.binary.version}</artifactId>
+	<name>flink-connector-pubsub</name>
+
+	<packaging>jar</packaging>
+
+	<dependencyManagement>
+		<dependencies>
+			<dependency>
+				<groupId>com.google.cloud</groupId>
+				<artifactId>google-cloud-bom</artifactId>
+				<version>0.70.0-alpha</version>
+				<type>pom</type>
+				<scope>import</scope>
+			</dependency>
+		</dependencies>
+	</dependencyManagement>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>com.google.cloud</groupId>
+			<artifactId>google-cloud-pubsub</artifactId>
+			<exclusions>
+				<exclusion>
+					<groupId>com.google.guava</groupId>
+					<artifactId>guava-jdk5</artifactId>
+				</exclusion>
+			</exclusions>
+			<scope>provided</scope>
 
 Review comment:
   Done, I rebased our commits on upstream master.




[GitHub] Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2019-02-14 Thread GitBox
Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added 
PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r257039169
 
 

 ##
 File path: flink-examples/flink-examples-streaming/pom.xml
 ##
 @@ -562,6 +578,43 @@ under the License.



+					<execution>
+						<id>PubSub</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<shadeTestJar>false</shadeTestJar>
+							<shadedArtifactAttached>true</shadedArtifactAttached>
+							<createDependencyReducedPom>false</createDependencyReducedPom>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>org.apache.flink.streaming.examples.pubsub.PubSubExample</mainClass>
+								</transformer>
+							</transformers>
+							<finalName>PubSub</finalName>
+							<artifactSet>
+								<includes>
+									<include>org.apache.flink:flink-connector-pubsub_${scala.binary.version}</include>
+								</includes>
+							</artifactSet>
+							<filters>
+								<filter>
+									<includes>
+										<include>org/apache/flink/streaming/connectors/pubsub/**</include>
 
 Review comment:
   - Fixed the javadoc comment.
   - I've changed where the example is stored 
(flink-examples/flink-examples-build-helper/flink-examples-streaming-pubsub/), 
could you have a look at this again? Indentation is fixed through this change 
as well.
   - I've removed `SerializableCredentialsProvider`. The reason I added it is 
because several PubSub classes require a `CredentialsProvider` rather than 
`Credentials`. But I agree we might as well rewrap Credentials using Google's 
`FixedCredentialsProvider` (which is basically the 
`SerializableCredentialsProvider`). Good one!
   
   **Regarding the BoundedPubSubSource:**
   
   The idea was this would be useful when running integration tests for 'end 
users'. Hence it's not in test scope. Now that we've been using this code 
ourselves for a while now, in our projects we prefer using the 'normal' 
PubSubSource and start/restart the job should it be needed for tests.
   
   Although I do like the concept of being able to `Bound` SourceFunctions like 
this (either a maximum number of received messages or an idle time), I would 
propose removing the `BoundedPubSubSource` altogether; this lets us simplify the 
Builder inside the PubSubSource as well and makes everything a bit cleaner.
   
   What do you think? @zentol @rmetzger 
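   The bounding idea floated above (stop a SourceFunction after a maximum number of received messages or after an idle period) can be sketched as a small, self-contained predicate. This is purely illustrative and not code from the PR; all names are made up:

```java
// Tracks the two proposed bounds for a source: max message count and max idle
// time. A bounded source's run loop would exit once reached(...) returns true.
class SourceBound {
    private final long maxMessages;
    private final long maxIdleMillis;
    private long received;
    private long lastMessageMillis;

    SourceBound(long maxMessages, long maxIdleMillis, long nowMillis) {
        this.maxMessages = maxMessages;
        this.maxIdleMillis = maxIdleMillis;
        this.lastMessageMillis = nowMillis;
    }

    // Record one received message at the given time.
    void onMessage(long nowMillis) {
        received++;
        lastMessageMillis = nowMillis;
    }

    // True once either bound has been hit.
    boolean reached(long nowMillis) {
        return received >= maxMessages
                || nowMillis - lastMessageMillis > maxIdleMillis;
    }
}
```

   Either bound alone would also work; checking both in one predicate keeps the source's run loop to a single exit condition.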




[GitHub] GJL commented on issue #7697: [FLINK-11578][tests] Port BackPressureStatsTrackerImplITCase to new code base

2019-02-14 Thread GitBox
GJL commented on issue #7697: [FLINK-11578][tests] Port 
BackPressureStatsTrackerImplITCase to new code base
URL: https://github.com/apache/flink/pull/7697#issuecomment-463793079
 
 
   Thanks for the review... merging.




[GitHub] GJL commented on a change in pull request #7697: [FLINK-11578][tests] Port BackPressureStatsTrackerImplITCase to new code base

2019-02-14 Thread GitBox
GJL commented on a change in pull request #7697: [FLINK-11578][tests] Port 
BackPressureStatsTrackerImplITCase to new code base
URL: https://github.com/apache/flink/pull/7697#discussion_r257008935
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
 ##
 @@ -39,277 +32,200 @@
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.minicluster.TestingMiniCluster;
+import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
+import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.util.TestLogger;
 
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.AllVerticesRunning;
-import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ExecutionGraphFound;
-import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestExecutionGraph;
-import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Simple back pressured task test.
+ *
+ * @see BackPressureStatsTrackerImpl
  */
 public class BackPressureStatsTrackerImplITCase extends TestLogger {
 
-   private static NetworkBufferPool networkBufferPool;
-   private static ActorSystem testActorSystem;
+   private static final int TIMEOUT_SECONDS = 10;
 
 Review comment:
   Will change to long.




[jira] [Updated] (FLINK-11597) Remove legacy JobManagerActorTestUtils

2019-02-14 Thread Gary Yao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-11597:
-
Description: Remove legacy JobManagerActorTestUtils

> Remove legacy JobManagerActorTestUtils
> --
>
> Key: FLINK-11597
> URL: https://issues.apache.org/jira/browse/FLINK-11597
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.8.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
> Fix For: 1.8.0
>
>
> Remove legacy JobManagerActorTestUtils



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flinkbot commented on issue #7709: [FLINK-11577][tests] Replace StackTraceSampleCoordinatorITCase

2019-02-14 Thread GitBox
flinkbot commented on issue #7709: [FLINK-11577][tests] Replace 
StackTraceSampleCoordinatorITCase 
URL: https://github.com/apache/flink/pull/7709#issuecomment-463787310
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❌ 1. The [description] looks good.
   * ❌ 2. There is [consensus] that the contribution should go into to Flink.
   * ❔ 3. Needs [attention] from.
   * ❌ 4. The change fits into the overall [architecture].
   * ❌ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve the 1st aspect (similarly, it 
also supports the `consensus`, `architecture` and `quality` keywords)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval
   




[GitHub] GJL opened a new pull request #7709: [FLINK-11577][tests] Replace StackTraceSampleCoordinatorITCase

2019-02-14 Thread GitBox
GJL opened a new pull request #7709: [FLINK-11577][tests] Replace 
StackTraceSampleCoordinatorITCase 
URL: https://github.com/apache/flink/pull/7709
 
 
   ## What is the purpose of the change
   
   *This removes the StackTraceSampleCoordinatorITCase test, and replaces it 
with 
`StackTraceSampleServiceTest#testShouldReturnPartialResultIfTaskStopsRunningDuringSampling`*
   
   
   ## Brief change log
   
 - *See commits*
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - *Added unit test `StackTraceSampleServiceTest`*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   




[jira] [Reopened] (FLINK-10457) Support SequenceFile for StreamingFileSink

2019-02-14 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reopened FLINK-10457:
--

> Support SequenceFile for StreamingFileSink
> --
>
> Key: FLINK-10457
> URL: https://issues.apache.org/jira/browse/FLINK-10457
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector, Streaming Connectors
>Affects Versions: 1.7.0
>Reporter: Jihyun Cho
>Assignee: Kostas Kloudas
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.2
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> SequenceFile is a major file format in the Hadoop ecosystem.
> It makes files simple to manage and easy to combine with other tools.
> So the SequenceFile format is still needed, even though the sink already
> supports Parquet and ORC.





[jira] [Reopened] (FLINK-11487) Support for writing data to Apache Flume

2019-02-14 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reopened FLINK-11487:
--

> Support for writing data to Apache Flume
> 
>
> Key: FLINK-11487
> URL: https://issues.apache.org/jira/browse/FLINK-11487
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.7.1
> Environment: JDK 1.8
> Scala 2.11
> Flink 1.7.1
> Apache Flume 1.6.0
>Reporter: ambition
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.2, 1.8.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Flume is a distributed, reliable, and available service for efficiently
> collecting, aggregating, and moving large amounts of data, and it has many
> users. Unfortunately, Flink does not currently support writing data to Flume.
> The following are the official Flume website and GitHub source addresses:
> [Apache Flume website|http://flume.apache.org/index.html] 
> [Apache Flume github|https://github.com/apache/flume]
>  





[jira] [Updated] (FLINK-10457) Support SequenceFile for StreamingFileSink

2019-02-14 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-10457:
-
Fix Version/s: 1.8.0

> Support SequenceFile for StreamingFileSink
> --
>
> Key: FLINK-10457
> URL: https://issues.apache.org/jira/browse/FLINK-10457
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector, Streaming Connectors
>Affects Versions: 1.7.0
>Reporter: Jihyun Cho
>Assignee: Kostas Kloudas
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.2, 1.8.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> SequenceFile is a major file format in the Hadoop ecosystem.
> It makes files simple to manage and easy to combine with other tools.
> So the SequenceFile format is still needed, even though the sink already
> supports Parquet and ORC.





[jira] [Closed] (FLINK-11487) Support for writing data to Apache Flume

2019-02-14 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-11487.

  Resolution: Invalid
Release Note:   (was: [FLINK-4446] Remove Flume connector (now in Bahir))

Bahir contains a Flume connector

> Support for writing data to Apache Flume
> 
>
> Key: FLINK-11487
> URL: https://issues.apache.org/jira/browse/FLINK-11487
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.7.1
> Environment: JDK 1.8
> Scala 2.11
> Flink 1.7.1
> Apache Flume 1.6.0
>Reporter: ambition
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Flume is a distributed, reliable, and available service for efficiently
> collecting, aggregating, and moving large amounts of data, and it has many
> users. Unfortunately, Flink does not currently support writing data to Flume.
> The following are the official Flume website and GitHub source addresses:
> [Apache Flume website|http://flume.apache.org/index.html] 
> [Apache Flume github|https://github.com/apache/flume]
>  





[jira] [Closed] (FLINK-10457) Support SequenceFile for StreamingFileSink

2019-02-14 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-10457.

Resolution: Fixed

> Support SequenceFile for StreamingFileSink
> --
>
> Key: FLINK-10457
> URL: https://issues.apache.org/jira/browse/FLINK-10457
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector, Streaming Connectors
>Affects Versions: 1.7.0
>Reporter: Jihyun Cho
>Assignee: Kostas Kloudas
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.2, 1.8.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> SequenceFile is a major file format in the Hadoop ecosystem.
> It makes files simple to manage and easy to combine with other tools.
> So the SequenceFile format is still needed, even though the sink already
> supports Parquet and ORC.





[jira] [Updated] (FLINK-11487) Support for writing data to Apache Flume

2019-02-14 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-11487:
-
Fix Version/s: (was: 1.7.2)
   (was: 1.8.0)

> Support for writing data to Apache Flume
> 
>
> Key: FLINK-11487
> URL: https://issues.apache.org/jira/browse/FLINK-11487
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.7.1
> Environment: JDK 1.8
> Scala 2.11
> Flink 1.7.1
> Apache Flume 1.6.0
>Reporter: ambition
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Flume is a distributed, reliable, and available service for efficiently
> collecting, aggregating, and moving large amounts of data, and it has many
> users. Unfortunately, Flink does not currently support writing data to Flume.
> The following are the official Flume website and GitHub source addresses:
> [Apache Flume website|http://flume.apache.org/index.html] 
> [Apache Flume github|https://github.com/apache/flume]
>  





[jira] [Updated] (FLINK-10910) Harden Kubernetes e2e test

2019-02-14 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-10910:
-
Issue Type: Improvement  (was: Bug)

> Harden Kubernetes e2e test
> --
>
> Key: FLINK-10910
> URL: https://issues.apache.org/jira/browse/FLINK-10910
> Project: Flink
>  Issue Type: Improvement
>  Components: E2E Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Dawid Wysakowicz
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.6.4, 1.7.2, 1.8.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The {{Kubernetes test}} (e2e test) sometimes fails with the following output:
> {code}
> ==
> Running 'Run Kubernetes test'
> ==
> TEST_DATA_DIR: 
> /home/admin/flink/flink-end-to-end-tests/test-scripts/temp-test-directory-40594844780
> Flink dist directory: /home/admin/flink-1.7.0
> /home/admin/flink/flink-end-to-end-tests/test-scripts/test_kubernetes_embedded_job.sh:
>  line 44: none: command not found
> Using flink dist: ../../flink-dist/target/flink-*-bin
> ./
> ./flink-1.7-SNAPSHOT/
> ./flink-1.7-SNAPSHOT/LICENSE
> ./flink-1.7-SNAPSHOT/examples/
> ./flink-1.7-SNAPSHOT/examples/gelly/
> ./flink-1.7-SNAPSHOT/examples/gelly/flink-gelly-examples_2.11-1.7-SNAPSHOT.jar
> ./flink-1.7-SNAPSHOT/examples/streaming/
> ./flink-1.7-SNAPSHOT/examples/streaming/WindowJoin.jar
> ./flink-1.7-SNAPSHOT/examples/streaming/SocketWindowWordCount.jar
> ./flink-1.7-SNAPSHOT/examples/streaming/StateMachineExample.jar
> ./flink-1.7-SNAPSHOT/examples/streaming/Kafka010Example.jar
> ./flink-1.7-SNAPSHOT/examples/streaming/Kafka011Example.jar
> ./flink-1.7-SNAPSHOT/examples/streaming/SessionWindowing.jar
> ./flink-1.7-SNAPSHOT/examples/streaming/IncrementalLearning.jar
> ./flink-1.7-SNAPSHOT/examples/streaming/KafkaExample.jar
> ./flink-1.7-SNAPSHOT/examples/streaming/WordCount.jar
> ./flink-1.7-SNAPSHOT/examples/streaming/Twitter.jar
> ./flink-1.7-SNAPSHOT/examples/streaming/Iteration.jar
> ./flink-1.7-SNAPSHOT/examples/streaming/TopSpeedWindowing.jar
> ./flink-1.7-SNAPSHOT/examples/batch/
> ./flink-1.7-SNAPSHOT/examples/batch/KMeans.jar
> ./flink-1.7-SNAPSHOT/examples/batch/PageRank.jar
> ./flink-1.7-SNAPSHOT/examples/batch/WebLogAnalysis.jar
> ./flink-1.7-SNAPSHOT/examples/batch/WordCount.jar
> ./flink-1.7-SNAPSHOT/examples/batch/EnumTriangles.jar
> ./flink-1.7-SNAPSHOT/examples/batch/DistCp.jar
> ./flink-1.7-SNAPSHOT/examples/batch/TransitiveClosure.jar
> ./flink-1.7-SNAPSHOT/examples/batch/ConnectedComponents.jar
> ./flink-1.7-SNAPSHOT/examples/python/
> ./flink-1.7-SNAPSHOT/examples/python/streaming/
> ./flink-1.7-SNAPSHOT/examples/python/streaming/fibonacci.py
> ./flink-1.7-SNAPSHOT/examples/python/streaming/word_count.py
> ./flink-1.7-SNAPSHOT/examples/python/batch/
> ./flink-1.7-SNAPSHOT/examples/python/batch/TriangleEnumeration.py
> ./flink-1.7-SNAPSHOT/examples/python/batch/TPCHQuery3.py
> ./flink-1.7-SNAPSHOT/examples/python/batch/WebLogAnalysis.py
> ./flink-1.7-SNAPSHOT/examples/python/batch/TPCHQuery10.py
> ./flink-1.7-SNAPSHOT/examples/python/batch/WordCount.py
> ./flink-1.7-SNAPSHOT/examples/python/batch/__init__.py
> ./flink-1.7-SNAPSHOT/log/
> ./flink-1.7-SNAPSHOT/opt/
> ./flink-1.7-SNAPSHOT/opt/flink-metrics-ganglia-1.7-SNAPSHOT.jar
> ./flink-1.7-SNAPSHOT/opt/flink-table_2.11-1.7-SNAPSHOT.jar
> ./flink-1.7-SNAPSHOT/opt/flink-metrics-dropwizard-1.7-SNAPSHOT.jar
> ./flink-1.7-SNAPSHOT/opt/flink-metrics-prometheus-1.7-SNAPSHOT.jar
> ./flink-1.7-SNAPSHOT/opt/flink-cep_2.11-1.7-SNAPSHOT.jar
> ./flink-1.7-SNAPSHOT/opt/flink-metrics-graphite-1.7-SNAPSHOT.jar
> ./flink-1.7-SNAPSHOT/opt/flink-gelly-scala_2.11-1.7-SNAPSHOT.jar
> ./flink-1.7-SNAPSHOT/opt/flink-s3-fs-hadoop-1.7-SNAPSHOT.jar
> ./flink-1.7-SNAPSHOT/opt/flink-metrics-statsd-1.7-SNAPSHOT.jar
> ./flink-1.7-SNAPSHOT/opt/flink-queryable-state-runtime_2.11-1.7-SNAPSHOT.jar
> ./flink-1.7-SNAPSHOT/opt/flink-streaming-python_2.11-1.7-SNAPSHOT.jar
> ./flink-1.7-SNAPSHOT/opt/flink-ml_2.11-1.7-SNAPSHOT.jar
> ./flink-1.7-SNAPSHOT/opt/flink-gelly_2.11-1.7-SNAPSHOT.jar
> ./flink-1.7-SNAPSHOT/opt/flink-swift-fs-hadoop-1.7-SNAPSHOT.jar
> ./flink-1.7-SNAPSHOT/opt/flink-cep-scala_2.11-1.7-SNAPSHOT.jar
> ./flink-1.7-SNAPSHOT/opt/flink-metrics-slf4j-1.7-SNAPSHOT.jar
> ./flink-1.7-SNAPSHOT/opt/flink-s3-fs-presto-1.7-SNAPSHOT.jar
> ./flink-1.7-SNAPSHOT/opt/flink-metrics-datadog-1.7-SNAPSHOT.jar
> ./flink-1.7-SNAPSHOT/opt/flink-sql-client_2.11-1.7-SNAPSHOT.jar
> ./flink-1.7-SNAPSHOT/NOTICE
> ./flink-1.7-SNAPSHOT/README.txt
> ./flink-1.7-SNAPSHOT/lib/
> ./flink-1.7-SNAPSHOT/lib/flink-python_2.11-1.7-SNAPSHOT.jar
> 

[jira] [Updated] (FLINK-11201) Document SBT dependency requirements when using MiniClusterResource

2019-02-14 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-11201:
-
Summary: Document SBT dependency requirements when using 
MiniClusterResource  (was: flink-test-utils dependency issue)

> Document SBT dependency requirements when using MiniClusterResource
> ---
>
> Key: FLINK-11201
> URL: https://issues.apache.org/jira/browse/FLINK-11201
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: eugen yushin
>Assignee: Till Rohrmann
>Priority: Major
> Fix For: 1.7.2, 1.8.0
>
>
> Starting with Flink 1.7, the `runtime.testutils.MiniClusterResource` class is
> missing from the `flink-test-utils` distribution.
> Steps to reproduce (Scala code)
> build.sbt
> {code}
> name := "flink-17-test-issue"
> organization := "x.y.z"
> scalaVersion := "2.11.12"
> val flinkVersion = "1.7.0"
> libraryDependencies ++= Seq(
>   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
>   "org.scalatest" %% "scalatest" % "3.0.5" % Test,
>   "org.apache.flink" %% "flink-test-utils" % flinkVersion % Test
> //  ,"org.apache.flink" %% "flink-runtime" % flinkVersion % Test classifier 
> Artifact.TestsClassifier
> )
> {code}
> test class:
> {code}
> class SimpleTest extends AbstractTestBase with FlatSpecLike {
>   implicit val env: StreamExecutionEnvironment = 
> StreamExecutionEnvironment.getExecutionEnvironment
>   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>   env.setParallelism(1)
>   env.setRestartStrategy(RestartStrategies.noRestart())
>   "SimpleTest" should "work" in {
> val inputDs = env.fromElements(1,2,3)
> inputDs.print()
> env.execute()
>   }
> }
> {code}
> Results in:
> {code}
> A needed class was not found. This could be due to an error in your runpath. 
> Missing class: org/apache/flink/runtime/testutils/MiniClusterResource
> java.lang.NoClassDefFoundError: 
> org/apache/flink/runtime/testutils/MiniClusterResource
> ...
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.flink.runtime.testutils.MiniClusterResource
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   ... 31 more
> {code}
> This can be fixed by adding flink-runtime distribution with test classifier 
> into dependencies list.





[GitHub] tillrohrmann merged pull request #7701: [FLINK-11602] Remove legacy AkkaJobManagerRetriever

2019-02-14 Thread GitBox
tillrohrmann merged pull request #7701: [FLINK-11602] Remove legacy 
AkkaJobManagerRetriever
URL: https://github.com/apache/flink/pull/7701
 
 
   




[jira] [Resolved] (FLINK-11602) Remove legacy AkkaJobManagerRetriever

2019-02-14 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-11602.
---
Resolution: Fixed

Fixed via 
https://github.com/apache/flink/commit/02ff4bfe90d8e8b896c9f1a1bdbe8d43a48f5de7

> Remove legacy AkkaJobManagerRetriever
> -
>
> Key: FLINK-11602
> URL: https://issues.apache.org/jira/browse/FLINK-11602
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.8.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>






[jira] [Updated] (FLINK-10721) Kafka discovery-loop exceptions may be swallowed

2019-02-14 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-10721:
-
Summary: Kafka discovery-loop exceptions may be swallowed  (was: 
kafkaFetcher runFetchLoop throw exception will cause follow-up code not execute 
in FlinkKafkaConsumerBase run method )

> Kafka discovery-loop exceptions may be swallowed
> 
>
> Key: FLINK-10721
> URL: https://issues.apache.org/jira/browse/FLINK-10721
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.6.2
>Reporter: zhaoshijie
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.4, 1.7.2, 1.8.0
>
>
> In FlinkKafkaConsumerBase's run method (line 721 on the master branch), if
> kafkaFetcher.runFetchLoop() throws an exception, the follow-up code does not
> execute. In particular, when the discoveryLoopThread throws, its finally block
> calls cancel; cancel invokes kafkaFetcher.cancel, which in the Kafka09Fetcher
> implementation calls handover.close, so handover.pollNext throws a
> ClosedException. As a result, discoveryLoopError is never thrown and the real
> culprit exception is swallowed.
> failed log like this:
> {code:java}
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply$mcV$sp(JobManager.scala:1229)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply(JobManager.scala:1172)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply(JobManager.scala:1172)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: 
> org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:180)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.cancel(Kafka09Fetcher.java:174)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:753)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase$2.run(FlinkKafkaConsumerBase.java:695)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> Should we modify it as follows?
> {code:java}
> try {
>     kafkaFetcher.runFetchLoop();
> } catch (Exception e) {
>     // if discoveryLoopErrorRef is not null, we should throw the real culprit exception
>     if (discoveryLoopErrorRef.get() != null) {
>         throw new RuntimeException(discoveryLoopErrorRef.get());
>     } else {
>         throw e;
>     }
> }
> {code}
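A self-contained sketch of the pattern the snippet above relies on (hypothetical names, not Flink's actual classes): the background discovery thread records its failure in an `AtomicReference`, and the caller prefers that recorded root cause over the secondary exception observed after shutdown.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch (not Flink code): prefer the root cause recorded by a
// background discovery thread over the secondary exception that the fetch
// loop throws after being cancelled, so the real culprit is not swallowed.
public class PreferRecordedCause {

    public static RuntimeException pickCulprit(
            AtomicReference<Throwable> discoveryError, Exception fetchLoopError) {
        Throwable recorded = discoveryError.get();
        if (recorded != null) {
            // The discovery thread failed first; surface its error.
            return new RuntimeException(recorded);
        }
        return new RuntimeException(fetchLoopError);
    }

    public static void main(String[] args) {
        AtomicReference<Throwable> discoveryError = new AtomicReference<>();
        // Simulate: the discovery thread fails and records its error ...
        discoveryError.set(new IllegalStateException("partition discovery failed"));
        // ... then the fetch loop dies with a secondary "handover closed" error.
        Exception secondary = new IllegalStateException("handover closed");

        RuntimeException surfaced = pickCulprit(discoveryError, secondary);
        // The surfaced exception wraps the discovery failure, not the secondary one.
        System.out.println(surfaced.getCause().getMessage()); // prints "partition discovery failed"
    }
}
```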





[jira] [Closed] (FLINK-10807) KafkaConsumer still consume removed topic after changing topics list

2019-02-14 Thread Aljoscha Krettek (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-10807.

Resolution: Duplicate

> KafkaConsumer still consume removed topic after changing topics list
> 
>
> Key: FLINK-10807
> URL: https://issues.apache.org/jira/browse/FLINK-10807
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.6.2
>Reporter: bupt_ljy
>Assignee: bupt_ljy
>Priority: Major
>
> subscribedPartitionsToStartOffsets in KafkaConsumerBase gets its values from 
> restoredState, which is initialized in initializeState and during partition 
> discovery. However, if we remove a topic from the topics list and restore the 
> Flink program, restoredState still contains the removed topic, and the 
> fetcher still fetches the data of the removed topic.
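A minimal illustration of one possible fix direction (hypothetical names, not Flink's actual classes): filter the restored topic/offset state against the currently configured topic list before subscribing, so a removed topic is not consumed again after restore.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: drop restored entries whose topic is no longer in the
// job's configured topic list. Names here are illustrative only.
public class RestoredStateFilter {

    // Keep only restored entries whose topic is still configured.
    public static Map<String, Long> filter(Map<String, Long> restoredOffsetsByTopic,
                                           Set<String> configuredTopics) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> e : restoredOffsetsByTopic.entrySet()) {
            if (configuredTopics.contains(e.getKey())) {
                result.put(e.getKey(), e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> restored = new HashMap<>();
        restored.put("orders", 42L);
        restored.put("legacy-topic", 7L);   // removed from the job's topic list
        Map<String, Long> kept = filter(restored, Set.of("orders", "payments"));
        System.out.println(kept);           // only "orders" survives the restore
    }
}
```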





[jira] [Updated] (FLINK-11618) [state] Refactor operator state repartition mechanism

2019-02-14 Thread Yun Tang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-11618:
-
Description: 
Currently we have state assignment strategy of operator state below:
 * When parallelism not changed:
 ** If we only have even-split redistributed state, state assignment would try 
to keep as the same as previously (actually not always the same).
 ** If we have union redistributed state, all the operator state would be 
redistributed as the new state assignment.
 * When parallelism changed:
 ** all the operator state would be redistributed as the new state assignment.

There exist two problems *when parallelism is not changed*:
 # If we only have even-split redistributed state, the current implementation 
actually cannot ensure that the state assignment stays the same as previously. 
This is because the current {{StateAssignmentOperation#collectPartitionableStates}} 
would repartition {{managedOperatorStates}} without subtask-index information. 
Take an example: if we have an operator state with parallelism 2, where 
subtask-0's managed operator state is empty while subtask-1's is not. Although 
the new parallelism is still 2, after 
{{StateAssignmentOperation#collectPartitionableStates}} and state assignment, 
subtask-0 would be assigned the managed operator state while subtask-1 would 
get none.
 # We should only redistribute union state and not touch the even-split state. 
Redistributing even-split state would cause unexpected behavior once 
{{RestartPipelinedRegionStrategy}} supports restoring state.

We should fix the above two problems and this issue is a prerequisite of 
FLINK-10712 and FLINK-10713 .

  was:
Currently we have state assignment strategy of operator state below:
 * When parallelism not changed:
 ** If we only have even-split redistributed state, state assignment would try 
to keep as the same as previously (actually not always the same).
 ** If we have union redistributed state, all the operator state would be 
redistributed as the new state assignment.
 * When parallelism changed:
 ** all the operator state would be redistributed as the new state assignment.

There existed two problems *when parallelism not changed*:
 # If we only have even-split redistributed state, current implementation 
actually cannot ensure state assignment to keep as the same as previously. This 
is because current {{StateAssignmentOperation#collectPartitionableStates}} 
would repartition {{managedOperatorStates}} without subtask-index information. 
Take an example, if we have an operator-state with parallelism as 2, and 
subtask-0's managed-operatorstate is empty while subtask-1 not. Although new 
parallelism still keeps as 2, after 
{{StateAssignmentOperation#collectPartitionableStates}}, subtask-0 would be 
assigned the managed-operatorstate but subtask-1 get none.
 # We should only redistribute union state and not touch the even-split state. 
Redistribute even-split state would cause unexpected behavior after 
{{RestartPipelinedRegionStrategy}} supported to restore state.

We should fix the above two problems and this issue is a prerequisite of 
FLINK-10712 and FLINK-10713 .
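The two redistribution modes discussed above can be sketched as follows (an illustrative toy, not Flink's `StateAssignmentOperation`): even-split round-robins the collected elements across the new subtasks, which is exactly why, without subtask-index information, even an unchanged parallelism can end up with a different assignment, while union hands every subtask the full set.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative toy of the two operator-state redistribution modes:
// even-split shares the collected elements across the new subtasks,
// union broadcasts the complete set to every subtask.
public class RedistributionModes {

    // Even-split: round-robin all elements across the new parallelism.
    public static <T> List<List<T>> evenSplit(List<T> allElements, int newParallelism) {
        List<List<T>> assignment = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            assignment.add(new ArrayList<>());
        }
        for (int i = 0; i < allElements.size(); i++) {
            assignment.get(i % newParallelism).add(allElements.get(i));
        }
        return assignment;
    }

    // Union: every subtask receives the complete element list.
    public static <T> List<List<T>> union(List<T> allElements, int newParallelism) {
        List<List<T>> assignment = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            assignment.add(new ArrayList<>(allElements));
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Even with unchanged parallelism, round-robin over the pooled elements
        // ignores which subtask originally owned each element.
        System.out.println(evenSplit(List.of(1, 2, 3, 4, 5), 2)); // prints [[1, 3, 5], [2, 4]]
        System.out.println(union(List.of(1, 2), 2));              // prints [[1, 2], [1, 2]]
    }
}
```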


> [state] Refactor operator state repartition mechanism
> -
>
> Key: FLINK-11618
> URL: https://issues.apache.org/jira/browse/FLINK-11618
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.7.0
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
> Fix For: 1.8.0
>
>
> Currently we have state assignment strategy of operator state below:
>  * When parallelism not changed:
>  ** If we only have even-split redistributed state, state assignment would 
> try to keep as the same as previously (actually not always the same).
>  ** If we have union redistributed state, all the operator state would be 
> redistributed as the new state assignment.
>  * When parallelism changed:
>  ** all the operator state would be redistributed as the new state assignment.
> There existed two problems *when parallelism not changed*:
>  # If we only have even-split redistributed state, current implementation 
> actually cannot ensure state assignment to keep as the same as previously. 
> This is because current 
> {{StateAssignmentOperation#collectPartitionableStates}} would repartition 
> {{managedOperatorStates}} without subtask-index information. Take an example, 
> if we have an operator-state with parallelism as 2, and subtask-0's 
> managed-operatorstate is empty while subtask-1 not. Although new parallelism 
> still keeps as 2, after 
> {{StateAssignmentOperation#collectPartitionableStates}} and state assigned, 
> subtask-0 would be assigned the managed-operatorstate while subtask-1 got 
> none.
>  # We should only redistribute union state and not touch the even-split 
> state. Redistribute even-split state would cause unexpected behavior after 
> {{RestartPipelinedRegionStrategy}} supported to restore state.

[jira] [Created] (FLINK-11618) [state] Refactor operator state repartition mechanism

2019-02-14 Thread Yun Tang (JIRA)
Yun Tang created FLINK-11618:


 Summary: [state] Refactor operator state repartition mechanism
 Key: FLINK-11618
 URL: https://issues.apache.org/jira/browse/FLINK-11618
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing
Affects Versions: 1.7.0
Reporter: Yun Tang
Assignee: Yun Tang
 Fix For: 1.8.0


Currently we have state assignment strategy of operator state below:
 * When parallelism not changed:
 ** If we only have even-split redistributed state, state assignment would try 
to keep as the same as previously (actually not always the same).
 ** If we have union redistributed state, all the operator state would be 
redistributed as the new state assignment.
 * When parallelism changed:
 ** all the operator state would be redistributed as the new state assignment.

There existed two problems *when parallelism not changed*:
 # If we only have even-split redistributed state, current implementation 
actually cannot ensure state assignment to keep as the same as previously. This 
is because current {{StateAssignmentOperation#collectPartitionableStates}} 
would repartition {{managedOperatorStates}} without subtask-index information. 
Take an example, if we have an operator-state with parallelism as 2, and 
subtask-0's managed-operatorstate is empty while subtask-1 not. Although new 
parallelism still keeps as 2, after 
{{StateAssignmentOperation#collectPartitionableStates}}, subtask-0 would be 
assigned the managed-operatorstate but subtask-1 get none.
 # We should only redistribute union state and not touch the even-split state. 
Redistribute even-split state would cause unexpected behavior after 
{{RestartPipelinedRegionStrategy}} supported to restore state.

We should fix the above two problems and this issue is a prerequisite of 
FLINK-10712 and FLINK-10713 .





[GitHub] StefanRRichter commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)

2019-02-14 Thread GitBox
StefanRRichter commented on a change in pull request #7662: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler 
(rebased)
URL: https://github.com/apache/flink/pull/7662#discussion_r256882174
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolCoLocationTest.java
 ##
 @@ -77,8 +76,8 @@ public void testSimpleCoLocatedSlotScheduling() throws 
ExecutionException, Inter
 
final TaskManagerLocation taskManagerLocation = new 
LocalTaskManagerLocation();
 
-   final SlotPoolGateway slotPoolGateway = 
slotPoolResource.getSlotPoolGateway();
-   
slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get();
+   final SlotPoolImpl slotPoolGateway = 
slotPoolResource.getSlotPool();
 
 Review comment:
   But can be rewritten


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] StefanRRichter commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)

2019-02-14 Thread GitBox
StefanRRichter commented on a change in pull request #7662: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler 
(rebased)
URL: https://github.com/apache/flink/pull/7662#discussion_r256881369
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolCoLocationTest.java
 ##
 @@ -77,8 +76,8 @@ public void testSimpleCoLocatedSlotScheduling() throws 
ExecutionException, Inter
 
final TaskManagerLocation taskManagerLocation = new 
LocalTaskManagerLocation();
 
-   final SlotPoolGateway slotPoolGateway = 
slotPoolResource.getSlotPoolGateway();
-   
slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get();
+   final SlotPoolImpl slotPoolGateway = 
slotPoolResource.getSlotPool();
 
 Review comment:
   It is actually required for method `offerSlot`




[GitHub] zentol commented on issue #7692: [FLINK-11121][license] Check and update licensing notes for Aliyun FS

2019-02-14 Thread GitBox
zentol commented on issue #7692: [FLINK-11121][license] Check and update 
licensing notes for Aliyun FS
URL: https://github.com/apache/flink/pull/7692#issuecomment-463667794
 
 
   Needs a rebase now that #7599 was merged.




[GitHub] StefanRRichter commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)

2019-02-14 Thread GitBox
StefanRRichter commented on a change in pull request #7662: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler 
(rebased)
URL: https://github.com/apache/flink/pull/7662#discussion_r256878807
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingComponentMainThreadExecutor.java
 ##
 @@ -77,9 +77,18 @@ public TestingComponentMainThreadExecutorServiceAdapter 
getMainThreadExecutor()
 */
public static class Resource extends ExternalResource {
 
+   private long shutdownTimeoutMillis;
private TestingComponentMainThreadExecutor 
componentMainThreadTestExecutor;
private ScheduledExecutorService innerExecutorService;
 
+   public Resource() {
+   this(500L);
 
 Review comment:
   Yes, I think 500ms is (more) than enough for what the tests need. 
Technically I think they could also all run fine with 0L.




[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)

2019-02-14 Thread GitBox
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler 
(rebased)
URL: https://github.com/apache/flink/pull/7662#discussion_r256852009
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
 ##
 @@ -125,8 +146,10 @@ public void testScheduleToDeploying() {
new 
ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.defaultExecutionContext(;
final SimpleSlot slot = instance.allocateSimpleSlot();
 
+   Scheduler scheduler = mock(Scheduler.class);
CompletableFuture future = new 
CompletableFuture<>();
future.complete(slot);
+   when(scheduler.allocateSlot(any(SlotRequestId.class), 
any(ScheduledUnit.class), any(SlotProfile.class), anyBoolean(), 
any(Time.class))).thenReturn(future);
 
 Review comment:
   Please remove mocks




[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)

2019-02-14 Thread GitBox
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler 
(rebased)
URL: https://github.com/apache/flink/pull/7662#discussion_r256859785
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java
 ##
 @@ -72,8 +71,8 @@ public void testSingleQueuedSharedSlotScheduling() throws 
Exception {
(SlotRequest slotRequest) -> 
allocationIdFuture.complete(slotRequest.getAllocationId()));
 
LocalTaskManagerLocation taskManagerLocation = new 
LocalTaskManagerLocation();
-   final SlotPoolGateway slotPoolGateway = 
slotPoolResource.getSlotPoolGateway();
-   
slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get();
+   final SlotPoolImpl slotPool = slotPoolResource.getSlotPool();
+   slotPoolResource.executeInMainThreadAndJoin(() -> 
slotPool.registerTaskManager(taskManagerLocation.getResourceID()));
 
 Review comment:
   I think `executeInMainThreadAndJoin` is not needed




[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)

2019-02-14 Thread GitBox
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler 
(rebased)
URL: https://github.com/apache/flink/pull/7662#discussion_r256864344
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
 ##
 @@ -730,31 +676,33 @@ public void testReleasingIdleSlotFailed() throws 
Exception {
// expected
}
 
-   } finally {
-   RpcUtils.terminateRpcEndpoint(slotPool, timeout);
}
}
 
+// private  V executeInMainThreadAndJoin(Supplier supplier) throws 
Exception {
+// return CompletableFuture.supplyAsync(supplier, 
mainThreadExecutor).get();
+// }
 
 Review comment:
   Please remove




[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)

2019-02-14 Thread GitBox
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler 
(rebased)
URL: https://github.com/apache/flink/pull/7662#discussion_r256842090
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotSelectionStrategyTestBase.java
 ##
 @@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.types;
+
+import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.instance.SimpleSlotContext;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.TestLogger;
+
+import java.net.InetAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+
+public abstract class SlotSelectionStrategyTestBase extends TestLogger {
+
+   protected final ResourceProfile resourceProfile = new 
ResourceProfile(2, 1024);
+   protected final ResourceProfile biggerResourceProfile = new 
ResourceProfile(3, 1024);
+
+   protected final AllocationID aid1 = new AllocationID();
+   protected final AllocationID aid2 = new AllocationID();
+   protected final AllocationID aid3 = new AllocationID();
+   protected final AllocationID aid4 = new AllocationID();
+   protected final AllocationID aidX = new AllocationID();
+
+   protected final TaskManagerLocation tml1 = new TaskManagerLocation(new 
ResourceID("tm-1"), InetAddress.getLoopbackAddress(), 42);
+   protected final TaskManagerLocation tml2 = new TaskManagerLocation(new 
ResourceID("tm-2"), InetAddress.getLoopbackAddress(), 43);
+   protected final TaskManagerLocation tml3 = new TaskManagerLocation(new 
ResourceID("tm-3"), InetAddress.getLoopbackAddress(), 44);
+   protected final TaskManagerLocation tml4 = new TaskManagerLocation(new 
ResourceID("tm-4"), InetAddress.getLoopbackAddress(), 45);
+   protected final TaskManagerLocation tmlX = new TaskManagerLocation(new 
ResourceID("tm-X"), InetAddress.getLoopbackAddress(), 46);
+
+   protected final TaskManagerGateway taskManagerGateway = new 
SimpleAckingTaskManagerGateway();
+
+   protected SimpleSlotContext ssc1 = new SimpleSlotContext(aid1, tml1, 1, 
taskManagerGateway, resourceProfile);
+   protected SimpleSlotContext ssc2 = new SimpleSlotContext(aid2, tml2, 2, 
taskManagerGateway, biggerResourceProfile);
+   protected SimpleSlotContext ssc3 = new SimpleSlotContext(aid3, tml3, 3, 
taskManagerGateway, resourceProfile);
+   protected SimpleSlotContext ssc4 = new SimpleSlotContext(aid4, tml4, 4, 
taskManagerGateway, resourceProfile);
 
 Review comment:
   Can these fields be `final`?




[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)

2019-02-14 Thread GitBox
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler 
(rebased)
URL: https://github.com/apache/flink/pull/7662#discussion_r256859879
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolResource.java
 ##
 @@ -102,8 +97,10 @@ protected void after() {
}
 
private void terminateSlotPool() {
-   slotPool.shutDown();
-   CompletableFuture terminationFuture = 
slotPool.getTerminationFuture();
-   terminationFuture.join();
+   slotPool.close();
+   }
+
+   public  V executeInMainThreadAndJoin(Supplier supplier) throws 
Exception {
 
 Review comment:
   I think this method should not be necessary.




[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)

2019-02-14 Thread GitBox
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler 
(rebased)
URL: https://github.com/apache/flink/pull/7662#discussion_r256868137
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
 ##
 @@ -70,9 +70,10 @@
 import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
 import org.apache.flink.runtime.jobmaster.exceptions.JobModificationException;
 import 
org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
+import org.apache.flink.runtime.jobmaster.slotpool.SchedulerFactory;
+import org.apache.flink.runtime.jobmaster.slotpool.Scheduler;
 
 Review comment:
   Wrong import order




[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)

2019-02-14 Thread GitBox
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler 
(rebased)
URL: https://github.com/apache/flink/pull/7662#discussion_r256851648
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
 ##
 @@ -58,14 +69,21 @@ public void testSlotReleasedWhenScheduledImmediately() {
 
CompletableFuture future = new 
CompletableFuture<>();
future.complete(slot);
+   Scheduler scheduler = new TestingScheduler() {
+   @Override
+   public CompletableFuture 
allocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit scheduledUnit,
+   SlotProfile slotProfile,
+   boolean allowQueuedScheduling,
+   Time allocationTimeout) {
+   return future;
+   }
+   };
 
assertEquals(ExecutionState.CREATED, 
vertex.getExecutionState());
// try to deploy to the slot
-   vertex.scheduleForExecution(
-   new TestingSlotProvider(ignore -> future),
-   false,
-   LocationPreferenceConstraint.ALL,
-   Collections.emptySet());
+   vertex.scheduleForExecution(scheduler, false, 
LocationPreferenceConstraint.ALL, Collections.emptySet());
 
 Review comment:
   This can be done more easily by replacing `scheduler` with `new 
TestingSlotProvider(ignored -> future)`.
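As a general illustration of the suggestion above (the real `TestingSlotProvider` is a Flink test utility; the names below are hypothetical): a single-method provider interface lets a test supply the whole behavior as one lambda instead of a handwritten stub class or a mock.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical sketch of a lambda-backed test double: the testing
// implementation simply delegates to a function supplied by the test.
public class LambdaTestDouble {

    // Minimal single-method provider interface, a stand-in for the real one.
    public interface SlotProvider {
        CompletableFuture<String> allocateSlot(String requestId);
    }

    // Adapt a plain function into the provider interface.
    public static SlotProvider testingProvider(
            Function<String, CompletableFuture<String>> fn) {
        return fn::apply;
    }

    public static void main(String[] args) {
        // The test controls the outcome through a single lambda.
        CompletableFuture<String> future = CompletableFuture.completedFuture("slot-1");
        SlotProvider provider = testingProvider(ignored -> future);
        System.out.println(provider.allocateSlot("req-7").join()); // prints "slot-1"
    }
}
```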




[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)

2019-02-14 Thread GitBox
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler 
(rebased)
URL: https://github.com/apache/flink/pull/7662#discussion_r256855501
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/TestingScheduler.java
 ##
 @@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.Scheduler;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+
+public class TestingScheduler implements Scheduler {
+   @Override
+   public void start(@Nonnull ComponentMainThreadExecutor 
mainThreadExecutor) {
+
+   }
+
+   @Override
+   public CompletableFuture allocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit scheduledUnit,
+   SlotProfile slotProfile,
+   boolean allowQueuedScheduling,
+   Time allocationTimeout) {
+   return null;
 
 Review comment:
   Since we don't need this class, it doesn't matter, but I think it would be 
better to return some sane default values which would not break the method 
contract in a testing implementation.




[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)

2019-02-14 Thread GitBox
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler 
(rebased)
URL: https://github.com/apache/flink/pull/7662#discussion_r256855186
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/TestingScheduler.java
 ##
 @@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.Scheduler;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+
+public class TestingScheduler implements Scheduler {
 
 Review comment:
   I think we don't need this class.




[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)

2019-02-14 Thread GitBox
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler 
(rebased)
URL: https://github.com/apache/flink/pull/7662#discussion_r256857264
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AvailableSlotsTest.java
 ##
 @@ -43,7 +41,7 @@
 
 
 Review comment:
   `DEFAULT_TESTING_BIG_PROFILE` is no longer needed here.




[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)

2019-02-14 Thread GitBox
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler 
(rebased)
URL: https://github.com/apache/flink/pull/7662#discussion_r256852629
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingComponentMainThreadExecutor.java
 ##
 @@ -77,9 +77,18 @@ public TestingComponentMainThreadExecutorServiceAdapter 
getMainThreadExecutor()
 */
public static class Resource extends ExternalResource {
 
+   private long shutdownTimeoutMillis;
private TestingComponentMainThreadExecutor 
componentMainThreadTestExecutor;
private ScheduledExecutorService innerExecutorService;
 
+   public Resource() {
+   this(500L);
 
 Review comment:
   Not equivalent to the old shutdown timeout. Was this change in value 
intended?




[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)

2019-02-14 Thread GitBox
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler 
(rebased)
URL: https://github.com/apache/flink/pull/7662#discussion_r256871460
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -525,12 +525,12 @@ public void setInitialState(@Nullable 
JobManagerTaskRestore taskRestore) {
slotProvider.allocateSlot(
 
 Review comment:
   I think callbacks on the `logicalSlotFuture` no longer need to be executed 
in the main thread executor, because the `logicalSlotFuture` should be 
completed by the main thread. This should affect line `Execution.java:548`.
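   The `CompletableFuture` property this relies on can be shown in isolation: a non-async callback registered before completion runs on the thread that completes the future, so an explicit main-thread executor becomes redundant once the main thread does the completing. This is a generic sketch, not the `Execution` code itself:

```java
import java.util.concurrent.CompletableFuture;

// Demonstrates that a non-async dependent action runs on the completing
// thread, which is the property the review comment relies on.
class CallbackThreadDemo {
    static String callbackThreadName() {
        CompletableFuture<String> slotFuture = new CompletableFuture<>();
        StringBuilder seenThread = new StringBuilder();
        // thenAccept (non-async): executed on whichever thread completes the future
        slotFuture.thenAccept(slot -> seenThread.append(Thread.currentThread().getName()));
        slotFuture.complete("slot-1"); // completed from the current thread
        return seenThread.toString();
    }
}
```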




[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)

2019-02-14 Thread GitBox
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler 
(rebased)
URL: https://github.com/apache/flink/pull/7662#discussion_r256867000
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java
 ##
 @@ -281,13 +250,11 @@ public void testExtraSlotsAreKept() throws Exception {
final TaskManagerLocation taskManagerLocation = new 
LocalTaskManagerLocation();
final TaskManagerGateway taskManagerGateway = new 
SimpleAckingTaskManagerGateway();
 
-   
slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get();
+   testMainThreadExecutor.execute(() 
->pool.registerTaskManager(taskManagerLocation.getResourceID()));
 
 Review comment:
   whitespace missing after `->`




[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)

2019-02-14 Thread GitBox
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler 
(rebased)
URL: https://github.com/apache/flink/pull/7662#discussion_r256859426
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java
 ##
 @@ -52,17 +52,16 @@
 import static org.junit.Assert.fail;
 
 /**
- * Test cases for slot sharing with the {@link SlotPool}.
+ * Test cases for slot sharing with the {@link SlotPoolImpl}.
  */
 public class SlotPoolSlotSharingTest extends TestLogger {
 
@ClassRule
public static final TestingRpcServiceResource testingRpcServiceResource 
= new TestingRpcServiceResource();
 
 Review comment:
   Should no longer be needed.




[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)

2019-02-14 Thread GitBox
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler 
(rebased)
URL: https://github.com/apache/flink/pull/7662#discussion_r256863219
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
 ##
 @@ -97,34 +93,30 @@
 
private TestingResourceManagerGateway resourceManagerGateway;
 
+   private ComponentMainThreadExecutor mainThreadExecutor =
+   
TestingComponentMainThreadExecutorServiceAdapter.forMainThread();
+
@Before
public void setUp() throws Exception {
-   this.rpcService = new TestingRpcService();
this.jobId = new JobID();
 
taskManagerLocation = new LocalTaskManagerLocation();
taskManagerGateway = new SimpleAckingTaskManagerGateway();
resourceManagerGateway = new TestingResourceManagerGateway();
}
 
-   @After
-   public void tearDown() throws Exception {
-   RpcUtils.terminateRpcService(rpcService, timeout);
-   }
-
@Test
public void testAllocateSimpleSlot() throws Exception {
CompletableFuture slotRequestFuture = new 
CompletableFuture<>();
-   resourceManagerGateway.setRequestSlotConsumer(slotRequest -> 
slotRequestFuture.complete(slotRequest));
-
-   final SlotPool slotPool = new SlotPool(rpcService, jobId, 
LocationPreferenceSchedulingStrategy.getInstance());
+   
resourceManagerGateway.setRequestSlotConsumer(slotRequestFuture::complete);
 
-   try {
-   SlotPoolGateway slotPoolGateway = 
setupSlotPool(slotPool, resourceManagerGateway);
-   
slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
+   try (SlotPoolImpl newSlotPool = new SlotPoolImpl(jobId)) {
+   SlotPoolImpl slotPool = setupSlotPool(newSlotPool, 
resourceManagerGateway, mainThreadExecutor);
 
 Review comment:
   The aliasing of `newSlotPool` with `slotPool` is a bit confusing. I think we 
can change `setupSlotPool` to return `void`.
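   A minimal sketch of the suggested refactoring, using illustrative class names rather than the real `SlotPoolImpl`: when the setup helper returns `void` and only configures the pool passed in, the test keeps a single variable and the aliasing disappears.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative pool standing in for SlotPoolImpl (not the real Flink class).
class PoolSketch implements AutoCloseable {
    final AtomicBoolean started = new AtomicBoolean(false);
    void start() { started.set(true); }
    @Override public void close() { started.set(false); }
}

class SetupSketch {
    // Before: `PoolSketch setupSlotPool(PoolSketch pool, ...)` returned its own
    // argument, tempting callers to bind it under a second name.
    // After: void return; callers keep using the variable they created.
    static void setupSlotPool(PoolSketch pool) {
        pool.start();
    }
}
```

   In a test this reads as `try (PoolSketch pool = new PoolSketch()) { SetupSketch.setupSlotPool(pool); ... }`, with only one name in scope.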




[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)

2019-02-14 Thread GitBox
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler 
(rebased)
URL: https://github.com/apache/flink/pull/7662#discussion_r256858255
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolCoLocationTest.java
 ##
 @@ -77,8 +76,8 @@ public void testSimpleCoLocatedSlotScheduling() throws 
ExecutionException, Inter
 
final TaskManagerLocation taskManagerLocation = new 
LocalTaskManagerLocation();
 
-   final SlotPoolGateway slotPoolGateway = 
slotPoolResource.getSlotPoolGateway();
-   
slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get();
+   final SlotPoolImpl slotPoolGateway = 
slotPoolResource.getSlotPool();
 
 Review comment:
   could be `final SlotPool slotPool`
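   The suggestion is the usual program-to-the-interface rule: declare the test variable by its interface type, which also removes the now-misleading `slotPoolGateway` name. A schematic example with illustrative names:

```java
// Illustrative interface/implementation pair (not the real Flink classes).
interface PoolApi {
    boolean offersSlots();
}

class PoolApiImpl implements PoolApi {
    @Override public boolean offersSlots() { return true; }
}

class DeclarationSketch {
    // Declaring against the interface keeps the test decoupled from the
    // concrete implementation type:
    static PoolApi makePool() {
        final PoolApi slotPool = new PoolApiImpl(); // rather than: PoolApiImpl slotPoolGateway = ...
        return slotPool;
    }
}
```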




[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)

2019-02-14 Thread GitBox
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler 
(rebased)
URL: https://github.com/apache/flink/pull/7662#discussion_r256863744
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
 ##
 @@ -160,19 +150,18 @@ public void testAllocationFulfilledByReturnedSlot() 
throws Exception {
}
});
 
-   final SlotPool slotPool = new SlotPool(rpcService, jobId, 
LocationPreferenceSchedulingStrategy.getInstance());
-
-   try {
-   SlotPoolGateway slotPoolGateway = 
setupSlotPool(slotPool, resourceManagerGateway);
+   try (SlotPoolImpl newSlotPool = new SlotPoolImpl(jobId)) {
+   SlotPoolImpl slotPool = setupSlotPool(newSlotPool, 
resourceManagerGateway, mainThreadExecutor);
 
 Review comment:
   I assume the same aliasing concern applies to the other tests.




[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)

2019-02-14 Thread GitBox
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler 
(rebased)
URL: https://github.com/apache/flink/pull/7662#discussion_r256849683
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
 ##
 @@ -357,11 +355,8 @@ public void testActionsWhileCancelling() {
AkkaUtils.getDefaultTimeout());
setVertexState(vertex, 
ExecutionState.CANCELING);
 
-   vertex.scheduleForExecution(
-   new TestingSlotProvider(ignore -> new 
CompletableFuture<>()),
-   false,
-   LocationPreferenceConstraint.ALL,
-   Collections.emptySet());
+   Scheduler scheduler = mock(Scheduler.class);
+   vertex.scheduleForExecution(scheduler, false, 
LocationPreferenceConstraint.ALL, Collections.emptySet());
 
 Review comment:
   We could avoid the mock by using the `ProgrammedSlotProvider` here.




[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)

2019-02-14 Thread GitBox
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler 
(rebased)
URL: https://github.com/apache/flink/pull/7662#discussion_r256851926
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
 ##
 @@ -92,6 +110,9 @@ public void testSlotReleasedWhenScheduledQueued() {
 
final CompletableFuture future = new 
CompletableFuture<>();
 
+   Scheduler scheduler = mock(Scheduler.class);
+   when(scheduler.allocateSlot(any(SlotRequestId.class), 
any(ScheduledUnit.class), any(SlotProfile.class), anyBoolean(), 
any(Time.class))).thenReturn(future);
 
 Review comment:
   Please remove mocks
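   One mock-free alternative is a small hand-written test double. The interface below is an illustrative stand-in for Flink's `Scheduler` (the real `allocateSlot` takes a `SlotRequestId`, `ScheduledUnit`, `SlotProfile`, a boolean, and a `Time`):

```java
import java.util.concurrent.CompletableFuture;

// Illustrative scheduler interface (not the real Flink signature).
interface StubScheduler {
    CompletableFuture<String> allocateSlot(String requestId);
}

// Replaces mock(Scheduler.class) + when(...).thenReturn(future): the double's
// behavior is explicit in code and needs no mocking framework.
class FixedFutureScheduler implements StubScheduler {
    private final CompletableFuture<String> future;

    FixedFutureScheduler(CompletableFuture<String> future) {
        this.future = future;
    }

    @Override
    public CompletableFuture<String> allocateSlot(String requestId) {
        return future; // always hand back the preconfigured future
    }
}
```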




[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)

2019-02-14 Thread GitBox
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler 
(rebased)
URL: https://github.com/apache/flink/pull/7662#discussion_r256864748
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
 ##
 @@ -805,53 +750,61 @@ public void testFreeFailedSlots() throws Exception {
 */
@Test
public void testFailingAllocationFailsPendingSlotRequests() throws 
Exception {
-   final SlotPool slotPool = new SlotPool(rpcService, jobId, 
LocationPreferenceSchedulingStrategy.getInstance());
 
-   try {
+   try (SlotPoolImpl newSlotPool = new SlotPoolImpl(jobId)) {
final CompletableFuture 
allocationIdFuture = new CompletableFuture<>();

resourceManagerGateway.setRequestSlotConsumer(slotRequest -> 
allocationIdFuture.complete(slotRequest.getAllocationId()));
-   final SlotPoolGateway slotPoolGateway = 
setupSlotPool(slotPool, resourceManagerGateway);
 
-   final CompletableFuture slotFuture = 
allocateSlot(slotPoolGateway, new SlotRequestId());
+   SlotPool slotPool = setupSlotPool(newSlotPool, 
resourceManagerGateway, mainThreadExecutor);
+   Scheduler scheduler = setupScheduler(slotPool, 
mainThreadExecutor);
+
+   final CompletableFuture slotFuture = 
allocateSlot(scheduler, new SlotRequestId());
 
final AllocationID allocationId = 
allocationIdFuture.get();
 
assertThat(slotFuture.isDone(), is(false));
 
final FlinkException cause = new FlinkException("Fail 
pending slot request failure.");
-   final 
CompletableFuture> responseFuture = 
slotPoolGateway.failAllocation(allocationId, cause);
+   final Optional responseFuture = 
slotPool.failAllocation(allocationId, cause);
 
-   assertThat(responseFuture.get().isPresent(), is(false));
+   assertThat(responseFuture.isPresent(), is(false));
 
try {
slotFuture.get();
fail("Expected a slot allocation failure.");
} catch (ExecutionException ee) {

assertThat(ExceptionUtils.stripExecutionException(ee), equalTo(cause));
}
-   } finally {
-   RpcUtils.terminateRpcEndpoint(slotPool, timeout);
}
}
 
-   private CompletableFuture allocateSlot(SlotPoolGateway 
slotPoolGateway, SlotRequestId slotRequestId) {
-   return slotPoolGateway.allocateSlot(
+   private CompletableFuture allocateSlot(Scheduler 
scheduler, SlotRequestId slotRequestId) {
+   return scheduler.allocateSlot(
slotRequestId,
new DummyScheduledUnit(),
SlotProfile.noRequirements(),
true,
timeout);
}
 
-   private static SlotPoolGateway setupSlotPool(
-   SlotPool slotPool,
-   ResourceManagerGateway resourceManagerGateway) throws 
Exception {
+   private static SlotPoolImpl setupSlotPool(
 
 Review comment:
   Let's change it to `void` in order to solve the aliasing problem in the 
tests.




[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)

2019-02-14 Thread GitBox
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler 
(rebased)
URL: https://github.com/apache/flink/pull/7662#discussion_r256860858
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
 ##
 @@ -1,531 +1,531 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmaster.slotpool;
-
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.clusterframework.types.SlotProfile;
-import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.instance.SimpleSlotContext;
-import org.apache.flink.runtime.instance.SlotSharingGroupId;
-import org.apache.flink.runtime.jobmanager.scheduler.Locality;
-import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner;
-import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.jobmaster.SlotContext;
-import org.apache.flink.runtime.jobmaster.SlotRequestId;
-import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
-import org.apache.flink.util.AbstractID;
-import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import java.util.Collections;
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Test cases for the {@link SlotSharingManager}.
- */
-public class SlotSharingManagerTest extends TestLogger {
-
-   private static final SlotSharingGroupId SLOT_SHARING_GROUP_ID = new 
SlotSharingGroupId();
-
-   private static final DummySlotOwner SLOT_OWNER = new DummySlotOwner();
-
-   @Test
-   public void testRootSlotCreation() {
-   final TestingAllocatedSlotActions allocatedSlotActions = new 
TestingAllocatedSlotActions();
-
-   final SlotSharingManager slotSharingManager = new 
SlotSharingManager(
-   SLOT_SHARING_GROUP_ID,
-   allocatedSlotActions,
-   SLOT_OWNER);
-
-   SlotRequestId slotRequestId = new SlotRequestId();
-   SlotRequestId allocatedSlotRequestId = new SlotRequestId();
-
-   final SlotSharingManager.MultiTaskSlot multiTaskSlot = 
slotSharingManager.createRootSlot(
-   slotRequestId,
-   new CompletableFuture<>(),
-   allocatedSlotRequestId);
-
-   assertEquals(slotRequestId, multiTaskSlot.getSlotRequestId());
-   assertNotNull(slotSharingManager.getTaskSlot(slotRequestId));
-   }
-
-   @Test
-   public void testRootSlotRelease() throws ExecutionException, 
InterruptedException {
-   final CompletableFuture slotReleasedFuture = new 
CompletableFuture<>();
-   final TestingAllocatedSlotActions allocatedSlotActions = new 
TestingAllocatedSlotActions();
-
-   allocatedSlotActions.setReleaseSlotConsumer(
-   tuple3 -> slotReleasedFuture.complete(tuple3.f0));
-
-   final SlotSharingManager slotSharingManager = new 
SlotSharingManager(
-   SLOT_SHARING_GROUP_ID,
-   allocatedSlotActions,
-   SLOT_OWNER);
-
-   SlotRequestId slotRequestId = new SlotRequestId();
-   SlotRequestId allocatedSlotRequestId = new SlotRequestId();
-
-   CompletableFuture slotContextFuture = new 
CompletableFuture<>();
-
-   SlotSharingManager.MultiTaskSlot rootSlot = 
slotSharingManager.createRootSlot(
-   slotRequestId,
-   slotContextFuture,
-  

[GitHub] flinkbot commented on issue #7708: [WIP][FLINK-10569][tests] Clean up uses of Scheduler and Instance in valid tests(Batch 2)

2019-02-14 Thread GitBox
flinkbot commented on issue #7708: [WIP][FLINK-10569][tests] Clean up uses of 
Scheduler and Instance in valid tests(Batch 2)
URL: https://github.com/apache/flink/pull/7708#issuecomment-463659717
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❌ 1. The [description] looks good.
   * ❌ 2. There is [consensus] that the contribution should go into to Flink.
   * ❔ 3. Needs [attention] from.
   * ❌ 4. The change fits into the overall [architecture].
   * ❌ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve the 1st aspect (similarly, it 
also supports the `consensus`, `architecture` and `quality` keywords)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval
   




[GitHub] TisonKun opened a new pull request #7708: [FLINK-10569][tests] Clean up uses of Scheduler and Instance in valid tests(Batch 2)

2019-02-14 Thread GitBox
TisonKun opened a new pull request #7708: [FLINK-10569][tests] Clean up uses of 
Scheduler and Instance in valid tests(Batch 2)
URL: https://github.com/apache/flink/pull/7708
 
 
   ## What is the purpose of the change
   
   1. Remove `Instance` usage in ExecutionGraphDeploymentTest
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector:(no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable) 
   




[jira] [Commented] (FLINK-11373) CliFrontend cuts off reason for error messages

2019-02-14 Thread JIRA


[ 
https://issues.apache.org/jira/browse/FLINK-11373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16768320#comment-16768320
 ] 

金钟镛 commented on FLINK-11373:
-

Hello, is anyone still working on this issue? If not, do you mind assigning it to me?

> CliFrontend cuts off reason for error messages
> --
>
> Key: FLINK-11373
> URL: https://issues.apache.org/jira/browse/FLINK-11373
> Project: Flink
>  Issue Type: Bug
>  Components: Startup Shell Scripts
>Affects Versions: 1.5.6, 1.6.3, 1.7.1
>Reporter: Maximilian Michels
>Assignee: leesf
>Priority: Minor
>  Labels: pull-request-available, starter
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The CliFrontend seems to only print the first message in the strace trace and 
> not any of its causes.
> {noformat}
> bin/flink run /non-existing/path
> Could not build the program from JAR file.
> Use the help option (-h or --help) to get help on the command.
> {noformat}
> Notice, the underlying cause of this message is FileNotFoundException.
> Consider changing 
> a) the error message for this particular case 
> b) the way the stack trace messages are trimmed



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11617) Kinesis Connector getRecords() failure logging is misleading

2019-02-14 Thread Jamie Grier (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16768330#comment-16768330
 ] 

Jamie Grier commented on FLINK-11617:
-

Here's an example:

 

Stacktrace is:

java.lang.RuntimeException: Rate Exceeded for getRecords operation - all 3 retry attempts returned ProvisionedThroughputExceededException.
	at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:234)
	at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:373)
	at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:216)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

But the root cause is actually given by this log line:

Got recoverable SdkClientException. Backing off for 140 millis (null (Service: AmazonKinesis; Status Code: 500; Error Code: InternalFailure; Request ID: c49c8e5b-a068-9733-9043-b215d51b0aa1))

 

 

> Kinesis Connector getRecords() failure logging is misleading
> 
>
> Key: FLINK-11617
> URL: https://issues.apache.org/jira/browse/FLINK-11617
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.5.6, 1.6.3, 1.7.1
>Reporter: Jamie Grier
>Assignee: Jamie Grier
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> There isn't enough information in the current logging to diagnose a 
> getRecords() failure.  Also there is a hardcoded string that states the 
> failure cause was always ProvisionedThroughputExceededException which isn't 
> true.  There are many possible causes of failures.  This is misleading.





[GitHub] StefanRRichter merged pull request #7599: [FLINK-11442] upgrade oss sdk version

2019-02-14 Thread GitBox
StefanRRichter merged pull request #7599: [FLINK-11442] upgrade oss sdk version
URL: https://github.com/apache/flink/pull/7599
 
 
   




[jira] [Commented] (FLINK-11616) Flink official document has an error

2019-02-14 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16768307#comment-16768307
 ] 

Fabian Hueske commented on FLINK-11616:
---

You would need to provide a pull request.
Once it gets merged, you'll be the author of a commit in Flink.

> Flink official document has an error
> 
>
> Key: FLINK-11616
> URL: https://issues.apache.org/jira/browse/FLINK-11616
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: xulinjie
>Assignee: xulinjie
>Priority: Major
> Attachments: wx20190214-214...@2x.png
>
>
> The page url is 
> [https://ci.apache.org/projects/flink/flink-docs-master/tutorials/flink_on_windows.html]
> The mistake is in paragraph “Installing Flink from Git”.
> “The solution is to adjust the Cygwin settings to deal with the correct line 
> endings by following these three steps:”,
> The sequence of steps you wrote was "1, 2, 1". But I think you might want to 
> write "1, 2, 3".





[jira] [Updated] (FLINK-10241) Reduce performance/stability impact of latency metrics

2019-02-14 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10241?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-10241:

Fix Version/s: (was: 1.6.4)
   1.6.5

> Reduce performance/stability impact of latency metrics
> --
>
> Key: FLINK-10241
> URL: https://issues.apache.org/jira/browse/FLINK-10241
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.5.0, 1.6.0, 1.7.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Critical
> Fix For: 1.8.0, 1.6.5
>
>
> Umbrella issue for performance/stability improvements around the latency 
> metrics.





[jira] [Updated] (FLINK-10954) Hardlink from files of previous local stored state might cross devices

2019-02-14 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-10954:

Fix Version/s: (was: 1.6.4)
   1.6.5

> Hardlink from files of previous local stored state might cross devices
> --
>
> Key: FLINK-10954
> URL: https://issues.apache.org/jira/browse/FLINK-10954
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.2
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
> Fix For: 1.7.3, 1.8.0, 1.6.5
>
>
> Currently, local recovery's base directories is initialized from 
> '{{io.tmp.dirs}}' if parameter '{{taskmanager.state.local.root-dirs}}' is not 
> set. For Yarn environment, the tmp dirs is replaced by its '{{LOCAL_DIRS}}', 
> which might consist of directories from different devices, such as 
> /dump/1/nm-local-dir, /dump/2/nm-local-dir. The local directory for RocksDB 
> is initialized from IOManager's spillingDirectories, which might located in 
> different device from local recovery's folder. However, hard-link between 
> different devices is not allowed, it will throw exception below:
> {code:java}
> java.nio.file.FileSystemException: target -> souce: Invalid cross-device link
> {code}




