[GitHub] yanghua commented on issue #7571: [FLINK-10724] Refactor failure handling in check point coordinator
yanghua commented on issue #7571: [FLINK-10724] Refactor failure handling in check point coordinator URL: https://github.com/apache/flink/pull/7571#issuecomment-463938350 Hi @kl0u, can you help review this PR? Or @tillrohrmann, can you recommend a committer to help review this PR? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] yanghua commented on a change in pull request #7571: [FLINK-10724] Refactor failure handling in check point coordinator
yanghua commented on a change in pull request #7571: [FLINK-10724] Refactor failure handling in check point coordinator URL: https://github.com/apache/flink/pull/7571#discussion_r257126465 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ## @@ -1277,6 +1293,34 @@ private void discardCheckpoint(PendingCheckpoint pendingCheckpoint, @Nullable Th } } + private void tryHandleCheckpointException(DeclineCheckpoint message, PendingCheckpoint currentCheckpoint) { + boolean needFailJob = message.getReason() != null && !(message.getReason() instanceof CheckpointDeclineException) + && failOnCheckpointingErrors && !currentCheckpoint.getProps().isSavepoint(); + + if (needFailJob) { + boolean processed = false; + ExecutionAttemptID failedExecutionID = message.getTaskExecutionId(); + for (ExecutionVertex vertex : tasksToWaitFor) { + if (vertex.getCurrentExecutionAttempt().getAttemptId().equals(failedExecutionID)) { + if (currentPeriodicTrigger != null) { + currentPeriodicTrigger.cancel(true); + currentPeriodicTrigger = null; + } + + vertex.fail(message.getReason()); Review comment: We can't fail the `Execution` or `ExecutionGraph` from within the `CheckpointCoordinator`, so we can throw the decision outward instead.
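The decline-handling logic in the diff above hinges on a single `needFailJob` predicate. The following self-contained sketch isolates that decision; the `CheckpointDeclineException` class here is a hypothetical stand-in for Flink's real exception type, and the method is a simplification, not the actual coordinator code: a job fails only for a real error (not an ordinary decline), only when `failOnCheckpointingErrors` is set, and never for savepoints.

```java
// Hypothetical stand-in for Flink's CheckpointDeclineException, for illustration only.
class CheckpointDeclineException extends Exception {}

public class DeclineDecision {
    /**
     * Decide whether a declined checkpoint should fail the job, mirroring the
     * needFailJob predicate in the diff: fail only on unexpected errors, only
     * when configured to do so, and never for savepoints.
     */
    static boolean shouldFailJob(Throwable reason,
                                 boolean failOnCheckpointingErrors,
                                 boolean isSavepoint) {
        return reason != null
            && !(reason instanceof CheckpointDeclineException)
            && failOnCheckpointingErrors
            && !isSavepoint;
    }

    public static void main(String[] args) {
        // An ordinary decline never fails the job ...
        System.out.println(shouldFailJob(new CheckpointDeclineException(), true, false));
        // ... but an unexpected error does, when configured ...
        System.out.println(shouldFailJob(new RuntimeException("disk full"), true, false));
        // ... unless the pending checkpoint is a savepoint.
        System.out.println(shouldFailJob(new RuntimeException("disk full"), true, true));
    }
}
```

The review discussion then concerns *where* the resulting failure is triggered, not the predicate itself: the suggestion is to surface the decision out of the coordinator rather than call `vertex.fail(...)` inside it.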
[GitHub] yanghua commented on a change in pull request #7571: [FLINK-10724] Refactor failure handling in check point coordinator
yanghua commented on a change in pull request #7571: [FLINK-10724] Refactor failure handling in check point coordinator URL: https://github.com/apache/flink/pull/7571#discussion_r257125937 ## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java ## @@ -308,8 +309,12 @@ public void apply( env.execute("Tumbling Window Test"); } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); + if (e.getCause().getCause() instanceof CancellationException) { Review comment: A concurrent CancellationException occurs here; my initial suspicion is that it is caused by the mini cluster (a multithreaded, in-process Flink cluster) executing the job. For now I am ignoring this exception.
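One hazard with the `e.getCause().getCause()` chain in the diff is that it throws a NullPointerException whenever the cause chain is shorter than two levels. A safer pattern, sketched here from scratch (similar in spirit to Flink's `ExceptionUtils.findThrowable`, but this is not Flink's code), walks the chain defensively:

```java
import java.util.Optional;
import java.util.concurrent.CancellationException;

public class CauseChain {
    /**
     * Walk the cause chain looking for an instance of the given type. Unlike
     * chained getCause() calls, this cannot NPE when the chain is shorter
     * than expected, and it finds the target at any depth.
     */
    static <T extends Throwable> Optional<T> findThrowable(Throwable top, Class<T> type) {
        for (Throwable t = top; t != null; t = t.getCause()) {
            if (type.isInstance(t)) {
                return Optional.of(type.cast(t));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // A CancellationException buried two levels deep is found ...
        Exception nested = new Exception(new RuntimeException(new CancellationException("cancelled")));
        System.out.println(findThrowable(nested, CancellationException.class).isPresent());
        // ... and an exception with no cause chain is handled without an NPE.
        System.out.println(findThrowable(new Exception("plain"), CancellationException.class).isPresent());
    }
}
```

With such a helper, the test's catch block could check for the cancellation at any depth instead of assuming exactly two wrapper levels.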
[jira] [Commented] (FLINK-10819) The instability problem of CI, JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure test fail.
[ https://issues.apache.org/jira/browse/FLINK-10819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16769016#comment-16769016 ] Congxian Qiu commented on FLINK-10819: -- Another instance: https://travis-ci.org/klion26/flink/jobs/493604011 > The instability problem of CI, > JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure test > fail. > --- > > Key: FLINK-10819 > URL: https://issues.apache.org/jira/browse/FLINK-10819 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.7.0 >Reporter: sunjincheng >Priority: Critical > Labels: test-stability > Fix For: 1.8.0 > > > Found the following error in the process of CI: > Results : > Tests in error: > JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure:331 » > IllegalArgument > Tests run: 1463, Failures: 0, Errors: 1, Skipped: 29 > 18:40:55.828 [INFO] > > 18:40:55.829 [INFO] BUILD FAILURE > 18:40:55.829 [INFO] > > 18:40:55.830 [INFO] Total time: 30:19 min > 18:40:55.830 [INFO] Finished at: 2018-11-07T18:40:55+00:00 > 18:40:56.294 [INFO] Final Memory: 92M/678M > 18:40:56.294 [INFO] > > 18:40:56.294 [WARNING] The requested profile "include-kinesis" could not be > activated because it does not exist. > 18:40:56.295 [ERROR] Failed to execute goal > org.apache.maven.plugins:maven-surefire-plugin:2.18.1:test > (integration-tests) on project flink-tests_2.11: There are test failures. > 18:40:56.295 [ERROR] > 18:40:56.295 [ERROR] Please refer to > /home/travis/build/sunjincheng121/flink/flink-tests/target/surefire-reports > for the individual test results. > 18:40:56.295 [ERROR] -> [Help 1] > 18:40:56.295 [ERROR] > 18:40:56.295 [ERROR] To see the full stack trace of the errors, re-run Maven > with the -e switch. > 18:40:56.295 [ERROR] Re-run Maven using the -X switch to enable full debug > logging. 
> 18:40:56.295 [ERROR] > 18:40:56.295 [ERROR] For more information about the errors and possible > solutions, please read the following articles: > 18:40:56.295 [ERROR] [Help 1] > http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException > MVN exited with EXIT CODE: 1. > Trying to KILL watchdog (11329). > ./tools/travis_mvn_watchdog.sh: line 269: 11329 Terminated watchdog > PRODUCED build artifacts. > But after the rerun, the error disappeared. > Currently, no specific cause has been found; we will continue to watch this. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11618) [state] Refactor operator state repartition mechanism
[ https://issues.apache.org/jira/browse/FLINK-11618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11618: --- Labels: pull-request-available (was: ) > [state] Refactor operator state repartition mechanism > - > > Key: FLINK-11618 > URL: https://issues.apache.org/jira/browse/FLINK-11618 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.7.0 >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > Currently the state assignment strategy for operator state is as follows: > * When the parallelism is unchanged: > ** If we only have even-split redistributed state, state assignment tries to keep the same assignment as before (though in fact it is not always the same). > ** If we have union redistributed state, all the operator state is redistributed in the new state assignment. > * When the parallelism changed: > ** all the operator state is redistributed in the new state assignment. > There are two problems *when the parallelism is unchanged*: > # If we only have even-split redistributed state, the current implementation cannot actually ensure that the state assignment stays the same as before. This is because the current {{StateAssignmentOperation#collectPartitionableStates}} repartitions {{managedOperatorStates}} without subtask-index information. For example, if we have an operator state with parallelism 2, where subtask-0's managed operator state is empty while subtask-1's is not: although the new parallelism is still 2, after {{StateAssignmentOperation#collectPartitionableStates}} and state assignment, subtask-0 is assigned the managed operator state while subtask-1 gets none. > # We should only redistribute union state and not touch the even-split state. Redistributing even-split state would cause unexpected behavior once {{RestartPipelinedRegionStrategy}} supports restoring state.
> We should fix the above two problems; this issue is a prerequisite of FLINK-10712 and FLINK-10713.
[GitHub] flinkbot commented on issue #7711: [FLINK-11618][state] Refactor operator state repartition mechanism
flinkbot commented on issue #7711: [FLINK-11618][state] Refactor operator state repartition mechanism URL: https://github.com/apache/flink/pull/7711#issuecomment-463928531 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❌ 1. The [description] looks good. * ❌ 2. There is [consensus] that the contribution should go into Flink. * ❔ 3. Needs [attention] from. * ❌ 4. The change fits into the overall [architecture]. * ❌ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve the 1st aspect (similarly, it also supports the `consensus`, `architecture` and `quality` keywords) - `@flinkbot approve all` to approve all aspects - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval
[GitHub] Myasuka opened a new pull request #7711: [FLINK-11618][state] Refactor operator state repartition mechanism
Myasuka opened a new pull request #7711: [FLINK-11618][state] Refactor operator state repartition mechanism URL: https://github.com/apache/flink/pull/7711 ## What is the purpose of the change There are two problems during state assignment when the parallelism is unchanged: 1. If we only have even-split redistributed state, the current implementation cannot actually ensure that the state assignment stays the same as before. This is because the current `StateAssignmentOperation#collectPartitionableStates` repartitions managedOperatorStates without subtask-index information. For example, if we have an operator state with parallelism 2, where subtask-0's managed operator state is empty while subtask-1's is not: although the new parallelism is still 2, after `StateAssignmentOperation#collectPartitionableStates` and state assignment, subtask-0 is assigned the managed operator state while subtask-1 gets none. 2. We should only redistribute union state and not touch the even-split state. Redistributing even-split state would cause unexpected behavior once `RestartPipelinedRegionStrategy` supports restoring state. This PR fixes the above two problems. ## Brief change log - Refactor `StateAssignmentOperation#collectPartitionableStates` so that the returned `managedOperatorStates` carries subtask-index information. - Refactor the API of `OperatorStateRepartitioner` and its implementation `RoundRobinOperatorStateRepartitioner`. The logic of comparing old and new parallelism is now located in `OperatorStateRepartitioner#repartitionState`. - Refactor `AbstractStreamOperatorTestHarness` to extract the logic of repartitioning operator state into a new method `AbstractStreamOperatorTestHarness#repartitionOperatorState` instead of keeping it in `AbstractStreamOperatorTestHarness#initializeState`.
## Verifying this change This change added tests and can be verified as follows: - Added `StateAssignmentOperationTest` to verify the logic of `StateAssignmentOperation#reDistributePartitionableStates`. - All existing tests using `AbstractStreamOperatorTestHarness` verify that the refactored `StateAssignmentOperation` generates the same results as before. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: yes, this could affect state assignment when restoring a checkpoint - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable; this is an internal implementation, so the current docs should be enough.
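To make the two redistribution modes in this discussion concrete, here is a small self-contained sketch (made-up names, not Flink's actual `OperatorStateRepartitioner` code): even-split divides the collected state entries round-robin across the new subtasks, while union hands every subtask the full collection. It also shows why collecting entries into one flat list, without subtask indices, can reshuffle state even when the parallelism is unchanged.

```java
import java.util.ArrayList;
import java.util.List;

public class RepartitionSketch {
    /** Even-split: divide all collected entries round-robin over the new parallelism. */
    static List<List<String>> evenSplit(List<String> entries, int newParallelism) {
        List<List<String>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>());
        }
        for (int i = 0; i < entries.size(); i++) {
            result.get(i % newParallelism).add(entries.get(i));
        }
        return result;
    }

    /** Union: every new subtask receives the complete collection of entries. */
    static List<List<String>> union(List<String> entries, int newParallelism) {
        List<List<String>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(entries));
        }
        return result;
    }

    public static void main(String[] args) {
        // Suppose these three entries were collected from the old subtasks into
        // a flat list, losing which subtask each entry came from.
        List<String> entries = List.of("a", "b", "c");
        System.out.println(evenSplit(entries, 2)); // entries spread round-robin
        System.out.println(union(entries, 2));     // entries duplicated to all subtasks
    }
}
```

Even if "a" and "c" originally both lived on subtask-1, the flat round-robin split above reassigns them, which is exactly the first problem the PR description identifies: with subtask indices preserved, an unchanged parallelism should mean an unchanged assignment.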
[jira] [Assigned] (FLINK-10937) Add scripts to create docker image for k8s
[ https://issues.apache.org/jira/browse/FLINK-10937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chunhui Shi reassigned FLINK-10937: --- Assignee: Chunhui Shi (was: JIN SUN) > Add scripts to create docker image for k8s > -- > > Key: FLINK-10937 > URL: https://issues.apache.org/jira/browse/FLINK-10937 > Project: Flink > Issue Type: Sub-task >Reporter: JIN SUN >Assignee: Chunhui Shi >Priority: Major > > Add script to build docker image for flink on native k8s.
[jira] [Assigned] (FLINK-10936) Implement Command line tools
[ https://issues.apache.org/jira/browse/FLINK-10936?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chunhui Shi reassigned FLINK-10936: --- Assignee: Chunhui Shi (was: JIN SUN) > Implement Command line tools > > > Key: FLINK-10936 > URL: https://issues.apache.org/jira/browse/FLINK-10936 > Project: Flink > Issue Type: Sub-task >Reporter: JIN SUN >Assignee: Chunhui Shi >Priority: Major > > Implement command tools to start kubernetes sessions: > * k8s-session.sh to start and stop a session like we did in yarn-session.sh > * customized command line that will be invoked by CliFrontEnd and > ./bin/flink run to submit job to kubernetes cluster
[jira] [Assigned] (FLINK-10935) Implement KubeClient with Fabric8 Kubernetes clients
[ https://issues.apache.org/jira/browse/FLINK-10935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chunhui Shi reassigned FLINK-10935: --- Assignee: Chunhui Shi (was: JIN SUN) > Implement KubeClient with Fabric8 Kubernetes clients > > > Key: FLINK-10935 > URL: https://issues.apache.org/jira/browse/FLINK-10935 > Project: Flink > Issue Type: Sub-task >Reporter: JIN SUN >Assignee: Chunhui Shi >Priority: Major > > Implement KubeClient with Fabric8 Kubernetes clients and add tests
[jira] [Assigned] (FLINK-9495) Implement ResourceManager for Kubernetes
[ https://issues.apache.org/jira/browse/FLINK-9495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chunhui Shi reassigned FLINK-9495: -- Assignee: Chunhui Shi (was: JIN SUN) > Implement ResourceManager for Kubernetes > > > Key: FLINK-9495 > URL: https://issues.apache.org/jira/browse/FLINK-9495 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Affects Versions: 1.5.0 >Reporter: Elias Levy >Assignee: Chunhui Shi >Priority: Major > > I noticed there is no issue for developing a Kubernetes specific > ResourceManager under FLIP-6, so I am creating this issue.
[jira] [Assigned] (FLINK-10934) Implement KubernetesJobClusterEntrypoint
[ https://issues.apache.org/jira/browse/FLINK-10934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chunhui Shi reassigned FLINK-10934: --- Assignee: Chunhui Shi (was: JIN SUN) > Implement KubernetesJobClusterEntrypoint > > > Key: FLINK-10934 > URL: https://issues.apache.org/jira/browse/FLINK-10934 > Project: Flink > Issue Type: Sub-task >Reporter: JIN SUN >Assignee: Chunhui Shi >Priority: Major > > * Implement KubernetesJobClusterEntrypoint
[jira] [Assigned] (FLINK-10939) Add documents for Flink on native k8s
[ https://issues.apache.org/jira/browse/FLINK-10939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chunhui Shi reassigned FLINK-10939: --- Assignee: Chunhui Shi (was: JIN SUN) > Add documents for Flink on native k8s > - > > Key: FLINK-10939 > URL: https://issues.apache.org/jira/browse/FLINK-10939 > Project: Flink > Issue Type: Sub-task >Reporter: JIN SUN >Assignee: Chunhui Shi >Priority: Major >
[jira] [Assigned] (FLINK-10938) Enable Flink on native k8s E2E Tests in Travis CI
[ https://issues.apache.org/jira/browse/FLINK-10938?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chunhui Shi reassigned FLINK-10938: --- Assignee: Chunhui Shi (was: JIN SUN) > Enable Flink on native k8s E2E Tests in Travis CI > -- > > Key: FLINK-10938 > URL: https://issues.apache.org/jira/browse/FLINK-10938 > Project: Flink > Issue Type: Sub-task >Reporter: JIN SUN >Assignee: Chunhui Shi >Priority: Major > > Add E2E tests to verify Flink on K8s integration
[jira] [Assigned] (FLINK-10968) Implement TaskManager Entrypoint
[ https://issues.apache.org/jira/browse/FLINK-10968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chunhui Shi reassigned FLINK-10968: --- Assignee: Chunhui Shi (was: JIN SUN) > Implement TaskManager Entrypoint > > > Key: FLINK-10968 > URL: https://issues.apache.org/jira/browse/FLINK-10968 > Project: Flink > Issue Type: Sub-task >Reporter: JIN SUN >Assignee: Chunhui Shi >Priority: Major > > implement the main() entrypoint to start task manager pod.
[jira] [Assigned] (FLINK-10933) Implement KubernetesSessionClusterEntrypoint
[ https://issues.apache.org/jira/browse/FLINK-10933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chunhui Shi reassigned FLINK-10933: --- Assignee: Chunhui Shi (was: JIN SUN) > Implement KubernetesSessionClusterEntrypoint > > > Key: FLINK-10933 > URL: https://issues.apache.org/jira/browse/FLINK-10933 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Reporter: JIN SUN >Assignee: Chunhui Shi >Priority: Major > > * Implement KubernetesSessionClusterEntrypoint > * Implement TaskManager Entrypoint
[GitHub] sunjincheng121 commented on a change in pull request #7664: [FLINK-11449][table] Uncouple the Expression class from RexNodes.
sunjincheng121 commented on a change in pull request #7664: [FLINK-11449][table] Uncouple the Expression class from RexNodes. URL: https://github.com/apache/flink/pull/7664#discussion_r255337933 ## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala ## @@ -414,15 +414,15 @@ abstract class BatchTableEnvironment( * @tparam T The type of the [[DataSet]]. */ protected def registerDataSetInternal[T]( - name: String, dataSet: DataSet[T], fields: Array[Expression]): Unit = { + name: String, dataSet: DataSet[T], fields: Array[PlannerExpression]): Unit = { Review comment: The `XXXInternal` methods are internal methods that the user never sees, so IMO there is no need for them to take `Expression`? (It would also reduce the amount of changed code in the current PR.) So far I have not found a benefit of making `PlannerExpression` inherit from `Expression` (except reducing some of the changed code), because we already have the visitor pattern to establish the conversion bridge from `Expression` to `PlannerExpression`: `Expression` is used at the user API level and is converted to `PlannerExpression` at the implementation level. We can indeed make `PlannerExpression` inherit from `Expression`; can you elaborate on the benefits of doing this?
[GitHub] sunjincheng121 commented on a change in pull request #7664: [FLINK-11449][table] Uncouple the Expression class from RexNodes.
sunjincheng121 commented on a change in pull request #7664: [FLINK-11449][table] Uncouple the Expression class from RexNodes. URL: https://github.com/apache/flink/pull/7664#discussion_r255365157 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/UnresolvedFieldReference.java ## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.expressions; + +/** + * Unresolved field reference expression. + */ +public class UnresolvedFieldReference extends LeafExpression { Review comment: How about calling it `FieldReference`?
[jira] [Assigned] (FLINK-9955) Kubernetes ClusterDescriptor
[ https://issues.apache.org/jira/browse/FLINK-9955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chunhui Shi reassigned FLINK-9955: -- Assignee: Chunhui Shi (was: JIN SUN) > Kubernetes ClusterDescriptor > > > Key: FLINK-9955 > URL: https://issues.apache.org/jira/browse/FLINK-9955 > Project: Flink > Issue Type: Sub-task > Components: Client >Reporter: Till Rohrmann >Assignee: Chunhui Shi >Priority: Major > Fix For: 1.8.0 > > > In order to start programmatically a Flink cluster on Kubernetes, we need a > {{KubernetesClusterDescriptor}} implementation.
[jira] [Assigned] (FLINK-9953) Active Kubernetes integration
[ https://issues.apache.org/jira/browse/FLINK-9953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chunhui Shi reassigned FLINK-9953: -- Assignee: Chunhui Shi (was: JIN SUN) > Active Kubernetes integration > - > > Key: FLINK-9953 > URL: https://issues.apache.org/jira/browse/FLINK-9953 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination, ResourceManager >Reporter: Till Rohrmann >Assignee: Chunhui Shi >Priority: Major > Fix For: 1.8.0 > > > This is the umbrella issue tracking Flink's active Kubernetes integration. > Active means in this context that the {{ResourceManager}} can talk to > Kubernetes to launch new pods similar to Flink's Yarn and Mesos integration. > > Document can be found here: > [https://docs.google.com/document/d/1Zmhui_29VASPcBOEqyMWnF3L6WEWZ4kedrCqya0WaAk/edit?usp=sharing]
[jira] [Assigned] (FLINK-10932) Initial flink-kubernetes module with empty implementation
[ https://issues.apache.org/jira/browse/FLINK-10932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chunhui Shi reassigned FLINK-10932: --- Assignee: Chunhui Shi (was: JIN SUN) > Initial flink-kubernetes module with empty implementation > - > > Key: FLINK-10932 > URL: https://issues.apache.org/jira/browse/FLINK-10932 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Reporter: JIN SUN >Assignee: Chunhui Shi >Priority: Major > Labels: pull-request-available > > Initialize the skeleton module to start the native Kubernetes integration.
[jira] [Created] (FLINK-11619) Make ScheduleMode configurable via user code or configuration file
yuqi created FLINK-11619: Summary: Make ScheduleMode configurable via user code or configuration file Key: FLINK-11619 URL: https://issues.apache.org/jira/browse/FLINK-11619 Project: Flink Issue Type: Improvement Reporter: yuqi Assignee: yuqi Currently, the schedule mode for streaming jobs is always EAGER; see StreamingJobGraphGenerator#createJobGraph: {code:java} // make sure that all vertices start immediately jobGraph.setScheduleMode(ScheduleMode.EAGER); {code} Given this, we can make ScheduleMode configurable so users can adapt it to different environments. Users could set this option via env.setScheduleMode() in code, or make it optional in the configuration. Any help and suggestions are welcome.
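The proposed `env.setScheduleMode()` does not exist yet; the issue is a proposal. A minimal self-contained sketch of the idea (the `JobGraphConfig` class is a hypothetical stand-in, not Flink's API; the enum values mirror Flink's `ScheduleMode` at the time):

```java
public class ScheduleModeSketch {
    // Mirrors Flink's ScheduleMode options: start all vertices at once,
    // or start sources first and downstream vertices lazily.
    enum ScheduleMode { EAGER, LAZY_FROM_SOURCES }

    /** Hypothetical stand-in for the proposed configurable behavior. */
    static class JobGraphConfig {
        // Today's hard-coded behavior in StreamingJobGraphGenerator#createJobGraph.
        private ScheduleMode scheduleMode = ScheduleMode.EAGER;

        void setScheduleMode(ScheduleMode mode) { this.scheduleMode = mode; }
        ScheduleMode getScheduleMode() { return scheduleMode; }
    }

    public static void main(String[] args) {
        JobGraphConfig config = new JobGraphConfig();
        System.out.println(config.getScheduleMode()); // the default stays EAGER
        // The issue proposes letting users override this, e.g. via env.setScheduleMode(...):
        config.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES);
        System.out.println(config.getScheduleMode());
    }
}
```

The design question the issue raises is then only where the override comes from: user code (the setter above) or a key in the configuration file, with EAGER remaining the default for streaming jobs.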
[jira] [Updated] (FLINK-11334) Migrate enum serializers to use new serialization compatibility abstractions
[ https://issues.apache.org/jira/browse/FLINK-11334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11334: --- Labels: pull-request-available (was: ) > Migrate enum serializers to use new serialization compatibility abstractions > > > Key: FLINK-11334 > URL: https://issues.apache.org/jira/browse/FLINK-11334 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing, Type Serialization System >Reporter: Tzu-Li (Gordon) Tai >Assignee: boshu Zheng >Priority: Major > Labels: pull-request-available > > This subtask covers migration of: > * EnumSerializerConfigSnapshot > * ScalaEnumSerializerConfigSnapshot > to use the new serialization compatibility APIs ({{TypeSerializerSnapshot}} > and {{TypeSerializerSchemaCompatibility}}). > The enum serializer snapshots should be implemented so that on restore the > order of Enum constants can be reordered (a case for serializer > reconfiguration), as well as adding new Enum constants. > Serializers are only considered to have completed migration according to the > defined list of things to check in FLINK-11327.
[GitHub] flinkbot commented on issue #7710: [FLINK-11334][core] Migrate enum serializers to use new serialization compatibility abstractions
flinkbot commented on issue #7710: [FLINK-11334][core] Migrate enum serializers to use new serialization compatibility abstractions URL: https://github.com/apache/flink/pull/7710#issuecomment-463891813 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❌ 1. The [description] looks good. * ❌ 2. There is [consensus] that the contribution should go into Flink. * ❔ 3. Needs [attention] from. * ❌ 4. The change fits into the overall [architecture]. * ❌ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve the 1st aspect (similarly, it also supports the `consensus`, `architecture` and `quality` keywords) - `@flinkbot approve all` to approve all aspects - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval
[GitHub] klion26 opened a new pull request #7710: [FLINK-11334][core] Migrate enum serializers to use new serialization compatibility abstractions
klion26 opened a new pull request #7710: [FLINK-11334][core] Migrate enum serializers to use new serialization compatibility abstractions URL: https://github.com/apache/flink/pull/7710 ## What is the purpose of the change Migrate enum serializers to use the new serialization compatibility abstractions ## Brief change log This commit 097a3b9 touches EnumSerializer: - return a new `EnumSerializerSnapshot` from `EnumSerializer#snapshotConfiguration` in place of the old config snapshot. - add a test `EnumSerializerSnapshotMigrationTest` to verify the change. - remove the function `EnumSerializer#ensureCompatibility`. ## Verifying this change This change is already covered by existing tests, such as `EnumSerializerTest`, and by the added migration test `EnumSerializerSnapshotMigrationTest`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (**no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**no**) - The serializers: (**yes**) - The runtime per-record code paths (performance sensitive): (**no**) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**no**) - The S3 file system connector: (**no**) ## Documentation - Does this pull request introduce a new feature? (**no**) - If yes, how is the feature documented? (**not applicable**)
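The requirement from FLINK-11334 that enum constants may be reordered or added between the snapshot and the restore boils down to resolving old ordinals by *name* rather than by position. This self-contained sketch shows the idea only; it is not Flink's `EnumSerializerSnapshot` implementation, and all names here are made up:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class EnumSnapshotSketch {
    // Current enum: RED and GREEN swapped relative to the snapshot, BLUE newly added.
    enum Color { GREEN, RED, BLUE }

    /**
     * Rebuild the (old ordinal -> constant) mapping from the constant names
     * recorded in the snapshot, so ordinals written by the old serializer
     * still resolve correctly after reordering or adding constants.
     */
    static Map<Integer, Color> restoreOrder(String[] snapshotNames) {
        Map<Integer, Color> byOldOrdinal = new LinkedHashMap<>();
        for (int i = 0; i < snapshotNames.length; i++) {
            byOldOrdinal.put(i, Color.valueOf(snapshotNames[i]));
        }
        return byOldOrdinal;
    }

    public static void main(String[] args) {
        // Names in the order they were recorded when the old savepoint was taken.
        String[] snapshot = {"RED", "GREEN"};
        Map<Integer, Color> restored = restoreOrder(snapshot);
        System.out.println(restored.get(0)); // old ordinal 0 still means RED
        System.out.println(restored.get(1)); // old ordinal 1 still means GREEN
    }
}
```

A snapshot that stored raw ordinals instead of names would silently map old data to the wrong constants after such a reorder, which is precisely the "serializer reconfiguration" case the JIRA issue calls out.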
[GitHub] KurtYoung edited a comment on issue #7664: [FLINK-11449][table] Uncouple the Expression class from RexNodes.
KurtYoung edited a comment on issue #7664: [FLINK-11449][table] Uncouple the Expression class from RexNodes. URL: https://github.com/apache/flink/pull/7664#issuecomment-463553997 Thanks @sunjincheng121 for this great effort. I just quickly went through the changes, and have some design comments: 1. I can't tell the difference between `Expression` and `PlannerExpression`. What are the scenarios for each? Take `FilterableTableSource` for example: currently it takes a list of `Expression` as input. But the method `applyPredicate` clearly happens during the optimize phase, and my feeling is we should use `PlannerExpression` during optimization. (I know `FilterableTableSource` is API and should only accept `Expression`, which is also API, but the logic seems a little weird.) 2. What about just using the current `Expression` for the API, by getting rid of the `toRexNode` method? The only problem is that the number of expressions is quite big and keeps increasing. To solve this, we could follow the advice @twalthr suggested: categorize all expressions we currently have and squash some of them. For example, we can squash "IF" and "ABS" into `CallExpression`, and use `FunctionDefinition` to represent the detailed information, quite similar to Calcite's `RexNode` and `SqlOperator`. And for `ExpressionVisitor`, we can directly map `Expression` to `RexNode`, which seems much clearer.
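The suggestion of squashing per-function expression classes like "IF" and "ABS" into one generic call node might look like the following self-contained sketch. The class names are modeled loosely on the proposal (`CallExpression`, `FunctionDefinition`), but this is an illustration, not the actual Flink Table API classes:

```java
import java.util.List;

public class ExpressionSketch {
    interface Expression {}

    /** Identifies a function abstractly, similar in role to Calcite's SqlOperator. */
    static class FunctionDefinition {
        final String name;
        FunctionDefinition(String name) { this.name = name; }
    }

    /** A literal leaf expression. */
    static class ValueLiteral implements Expression {
        final Object value;
        ValueLiteral(Object value) { this.value = value; }
    }

    /** One generic call node instead of a dedicated class per built-in function. */
    static class CallExpression implements Expression {
        final FunctionDefinition function;
        final List<Expression> args;
        CallExpression(FunctionDefinition function, List<Expression> args) {
            this.function = function;
            this.args = args;
        }
    }

    public static void main(String[] args) {
        FunctionDefinition abs = new FunctionDefinition("ABS");
        // ABS(-3) as a generic call: the function's identity is data, not a subclass.
        CallExpression call = new CallExpression(abs, List.of(new ValueLiteral(-3)));
        System.out.println(call.function.name + " with " + call.args.size() + " arg");
    }
}
```

With this shape, adding a new built-in function means adding a `FunctionDefinition` constant rather than a new expression class, and an `ExpressionVisitor` that translates to `RexNode` only has to handle a handful of node kinds.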
[GitHub] carp84 commented on a change in pull request #7674: [FLINK-10043] [State Backends] Refactor RocksDBKeyedStateBackend object construction/initialization/restore code
carp84 commented on a change in pull request #7674: [FLINK-10043] [State Backends] Refactor RocksDBKeyedStateBackend object construction/initialization/restore code URL: https://github.com/apache/flink/pull/7674#discussion_r257088835 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackendBuilder.java ## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.util.TernaryBoolean; + +import java.util.Collection; + +public class OperatorStateBackendBuilder implements StateBackendBuilder { Review comment: Ok, will focus on the rocksdb backend refactor here and maybe we could use the builder way for all backends in other PRs.
[GitHub] carp84 commented on a change in pull request #7674: [FLINK-10043] [State Backends] Refactor RocksDBKeyedStateBackend object construction/initialization/restore code
carp84 commented on a change in pull request #7674: [FLINK-10043] [State Backends] Refactor RocksDBKeyedStateBackend object construction/initialization/restore code URL: https://github.com/apache/flink/pull/7674#discussion_r257088134

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackendBuilder.java ## @@ -0,0 +1,61 @@

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.runtime.state;

import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.util.TernaryBoolean;

import java.util.Collection;

public class OperatorStateBackendBuilder implements StateBackendBuilder {
	private final Environment env;
	private final String operatorIdentifier;
	protected Collection restoredState;
	private TernaryBoolean asynchronousSnapshots;

	public OperatorStateBackendBuilder(Environment env, String operatorIdentifier) {
		this.env = env;
		this.operatorIdentifier = operatorIdentifier;
	}

	@Override
	public OperatorStateBackend build() {
		return new DefaultOperatorStateBackend(
			env.getUserClassLoader(),
			env.getExecutionConfig(),
			isUsingAsynchronousSnapshots());
	}

	@Override
	public OperatorStateBackendBuilder setRestoreStateHandles(Collection restoredStateHandles) {
```

Review comment: The `setRestoreStateHandles` method has been removed from the builder interface following the previous review suggestion.
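As a standalone illustration of the builder approach under discussion — collect all construction and restore inputs first, then build the backend in one place — here is a minimal sketch with simplified, hypothetical types (not the actual Flink classes):

```java
// Minimal sketch of the state-backend builder pattern from the review thread.
// All types are simplified stand-ins for the Flink classes.
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class BuilderSketch {
    static class StateHandle {
        final String name;
        StateHandle(String name) { this.name = name; }
    }

    static class OperatorStateBackend {
        final String operatorIdentifier;
        final boolean asynchronousSnapshots;
        final List<StateHandle> restoredState;

        OperatorStateBackend(String id, boolean async, List<StateHandle> restored) {
            this.operatorIdentifier = id;
            this.asynchronousSnapshots = async;
            this.restoredState = restored;
        }
    }

    /** Gathers every input, so construction/restore logic lives in build(). */
    static class OperatorStateBackendBuilder {
        private final String operatorIdentifier;
        private boolean asynchronousSnapshots = true;
        private final List<StateHandle> restoredState = new ArrayList<>();

        OperatorStateBackendBuilder(String operatorIdentifier) {
            this.operatorIdentifier = operatorIdentifier;
        }

        OperatorStateBackendBuilder setAsynchronousSnapshots(boolean async) {
            this.asynchronousSnapshots = async;
            return this;
        }

        // Restore handles go through the concrete builder, not a shared
        // interface method (mirroring the review outcome above).
        OperatorStateBackendBuilder addRestoredState(Collection<StateHandle> handles) {
            this.restoredState.addAll(handles);
            return this;
        }

        OperatorStateBackend build() {
            return new OperatorStateBackend(operatorIdentifier, asynchronousSnapshots, restoredState);
        }
    }
}
```

The design benefit being debated: the backend object is created fully initialized, so partially-restored backends never escape the builder.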
[GitHub] tweise merged pull request #7672: [FLINK-11568][kinesis] Don't obscure important Kinesis exceptions
tweise merged pull request #7672: [FLINK-11568][kinesis] Don't obscure important Kinesis exceptions URL: https://github.com/apache/flink/pull/7672
[GitHub] flinkbot edited a comment on issue #7672: [FLINK-11568][kinesis] Don't obscure important Kinesis exceptions
flinkbot edited a comment on issue #7672: [FLINK-11568][kinesis] Don't obscure important Kinesis exceptions URL: https://github.com/apache/flink/pull/7672#issuecomment-461938418 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Review Progress

* ✅ 1. The [description] looks good. - Approved by @tweise [committer]
* ✅ 2. There is [consensus] that the contribution should go into Flink. - Approved by @tweise [committer]
* ❔ 3. Needs [attention] from.
* ✅ 4. The change fits into the overall [architecture]. - Approved by @tweise [committer]
* ✅ 5. Overall code [quality] is good. - Approved by @tweise [committer]

Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process.

Bot commands The @flinkbot bot supports the following commands:

- `@flinkbot approve description` to approve the 1st aspect (similarly, it also supports the `consensus`, `architecture` and `quality` keywords)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval
[GitHub] tweise commented on issue #7672: [FLINK-11568][kinesis] Don't obscure important Kinesis exceptions
tweise commented on issue #7672: [FLINK-11568][kinesis] Don't obscure important Kinesis exceptions URL: https://github.com/apache/flink/pull/7672#issuecomment-463882250 @flinkbot approve all
[GitHub] tweise commented on a change in pull request #7672: [FLINK-11568][kinesis] Don't obscure important Kinesis exceptions
tweise commented on a change in pull request #7672: [FLINK-11568][kinesis] Don't obscure important Kinesis exceptions URL: https://github.com/apache/flink/pull/7672#discussion_r257085977 ## File path: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/AlwaysThrowsDeserializationSchema.java ## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.testutils; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.core.testutils.OneShotLatch; + +import java.io.IOException; + +/** + * A DeserializationSchema which always throws an exception when the deserialize method is called. Also supports + * waiting on a latch until at least one exception has been thrown. 
+ */ +public class AlwaysThrowsDeserializationSchema implements DeserializationSchema { Review comment: This class could probably also have been defined nested in the test, and some of the boilerplate avoided by extending an existing schema with an override for what is relevant here. Just something to keep in mind for future work.
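The suggestion above — reuse an existing schema and override only the failing behavior — can be sketched with simplified stand-in types (not the real Flink `DeserializationSchema` or the PR's class):

```java
// Sketch: instead of a full new test-utility class, extend an existing schema
// and override only deserialize(). Types are simplified stand-ins.
import java.io.IOException;

public class ThrowingSchemaSketch {
    /** Stand-in for an existing, working deserialization schema. */
    static class SimpleStringSchema {
        public String deserialize(byte[] message) throws IOException {
            return new String(message);
        }
    }

    // Could live nested inside the test class; only the failure is spelled out.
    static final SimpleStringSchema ALWAYS_THROWS = new SimpleStringSchema() {
        @Override
        public String deserialize(byte[] message) throws IOException {
            throw new IOException("expected test failure");
        }
    };

    static boolean throwsOnDeserialize(SimpleStringSchema schema) {
        try {
            schema.deserialize(new byte[] {1});
            return false;
        } catch (IOException e) {
            return true;
        }
    }
}
```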
[jira] [Assigned] (FLINK-11613) Translate the "Project Template for Scala" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tom Goong reassigned FLINK-11613: - Assignee: Tom Goong > Translate the "Project Template for Scala" page into Chinese > > > Key: FLINK-11613 > URL: https://issues.apache.org/jira/browse/FLINK-11613 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Jark Wu >Assignee: Tom Goong >Priority: Major > > The page url is > https://ci.apache.org/projects/flink/flink-docs-master/dev/projectsetup/scala_api_quickstart.html > The markdown file is located in > flink/docs/dev/projectsetup/scala_api_quickstart.zh.md > The markdown file will be created once FLINK-11529 is merged. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] wujinhu commented on issue #7599: [FLINK-11442] upgrade oss sdk version
wujinhu commented on issue #7599: [FLINK-11442] upgrade oss sdk version URL: https://github.com/apache/flink/pull/7599#issuecomment-463874242 Thanks @StefanRRichter
[jira] [Commented] (FLINK-11373) CliFrontend cuts off reason for error messages
[ https://issues.apache.org/jira/browse/FLINK-11373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16768840#comment-16768840 ] leesf commented on FLINK-11373: --- [~AT-Fieldless], hi, thanks for your advice, but I am still working on this issue. > CliFrontend cuts off reason for error messages > -- > > Key: FLINK-11373 > URL: https://issues.apache.org/jira/browse/FLINK-11373 > Project: Flink > Issue Type: Bug > Components: Startup Shell Scripts >Affects Versions: 1.5.6, 1.6.3, 1.7.1 >Reporter: Maximilian Michels >Assignee: leesf >Priority: Minor > Labels: pull-request-available, starter > Time Spent: 10m > Remaining Estimate: 0h > > The CliFrontend seems to only print the first message in the stack trace and > not any of its causes. > {noformat} > bin/flink run /non-existing/path > Could not build the program from JAR file. > Use the help option (-h or --help) to get help on the command. > {noformat} > Notice, the underlying cause of this message is FileNotFoundException. > Consider changing > a) the error message for this particular case > b) the way the stack trace messages are trimmed
[jira] [Assigned] (FLINK-11613) Translate the "Project Template for Scala" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] leesf reassigned FLINK-11613: - Assignee: (was: leesf) > Translate the "Project Template for Scala" page into Chinese > > > Key: FLINK-11613 > URL: https://issues.apache.org/jira/browse/FLINK-11613 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Jark Wu >Priority: Major > > The page url is > https://ci.apache.org/projects/flink/flink-docs-master/dev/projectsetup/scala_api_quickstart.html > The markdown file is located in > flink/docs/dev/projectsetup/scala_api_quickstart.zh.md > The markdown file will be created once FLINK-11529 is merged.
[GitHub] glaksh100 commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer
glaksh100 commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer URL: https://github.com/apache/flink/pull/7679#discussion_r257072070 ## File path: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java ## @@ -311,4 +323,19 @@ private static void setDeserializer(Properties props) { props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deSerName); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deSerName); } + + + // + // Rate Limiter specific methods + // + + /** +* Set a rate limiter to ratelimit bytes read from Kafka. +* @param kafkaRateLimiter +* @param maxBytesPerSecond +*/ + public void setRateLimiter(FlinkConnectorRateLimiter kafkaRateLimiter, long maxBytesPerSecond) { Review comment: Fixed.
[GitHub] glaksh100 commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer
glaksh100 commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer URL: https://github.com/apache/flink/pull/7679#discussion_r257072024 ## File path: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/config/DefaultKafkaRateLimiter.java ## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.config; + +import org.apache.flink.api.common.functions.RuntimeContext; + +import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.RateLimiter; + +/** + * A default KafkaRateLimiter that uses Guava's RateLimiter to throttle the bytes read from Kafka. + */ +public class DefaultKafkaRateLimiter implements FlinkConnectorRateLimiter { Review comment: I think that is a better name. Thanks for the suggestion.
[GitHub] glaksh100 commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer
glaksh100 commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer URL: https://github.com/apache/flink/pull/7679#discussion_r257072051 ## File path: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/config/FlinkConnectorRateLimiter.java ## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.config; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.RuntimeContext; + +/** + * An interface to create a ratelimiter + * + * The ratelimiter is configured via {@link #setRate(long)} and + * created via {@link #create()}. + * An example implementation can be found {@link DefaultKafkaRateLimiter}. + * */ + +@PublicEvolving +public interface FlinkConnectorRateLimiter { + + void open(RuntimeContext runtimeContext); + + /** Creates a rate limiter. */ +T create(); Review comment: This does look confusing. Essentially, we need `runtimeContext` to be initialized to be able to determine the per-subtask rate. Perhaps, only the `open()` method should suffice (and all related logic can be implemented there)? 
As for `acquire()` not being here, that was my bad. I have fixed this to include `acquire()` and completely removed the Guava import in the `KafkaConsumerThread` class (which was not the case before).
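The interface shape that emerges from this thread — `setRate` for the global rate, `open` deriving a per-subtask rate from the runtime context, and `acquire` on the interface itself — might look like the following sketch. All names and the trivial sleep-based throttle are illustrative stand-ins, not the final `FlinkConnectorRateLimiter` API:

```java
// Sketch of a connector rate limiter: setRate() configures a global rate,
// open() divides it across parallel subtasks, acquire() blocks per batch.
public class RateLimiterSketch {
    /** Stand-in for the runtime context a connector subtask receives. */
    static class RuntimeContext {
        final int numberOfParallelSubtasks;
        RuntimeContext(int n) { this.numberOfParallelSubtasks = n; }
    }

    interface FlinkConnectorRateLimiter {
        void setRate(long globalBytesPerSecond);
        void open(RuntimeContext ctx);
        void acquire(int permits) throws InterruptedException;
    }

    /** Trivial sleep-based implementation, for illustration only. */
    static class SimpleRateLimiter implements FlinkConnectorRateLimiter {
        private long globalRate;
        long localRate; // package-visible so the example can inspect it

        public void setRate(long globalBytesPerSecond) {
            this.globalRate = globalBytesPerSecond;
        }

        public void open(RuntimeContext ctx) {
            // Per-subtask share of the global rate, as discussed in the review.
            this.localRate = globalRate / ctx.numberOfParallelSubtasks;
        }

        public void acquire(int permits) throws InterruptedException {
            // Sleep long enough that `permits` bytes fit within the local rate.
            long millis = (permits * 1000L) / Math.max(localRate, 1);
            Thread.sleep(millis);
        }
    }
}
```

With `acquire` on the interface, the consumer thread needs no Guava import at all; a Guava-backed implementation is just one possible `FlinkConnectorRateLimiter`.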
[GitHub] tweise commented on a change in pull request #7706: [FLINK-11617] [kinesis connector] Kinesis Connector getRecords() failure logging is misleading
tweise commented on a change in pull request #7706: [FLINK-11617] [kinesis connector] Kinesis Connector getRecords() failure logging is misleading URL: https://github.com/apache/flink/pull/7706#discussion_r257053063 ## File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ## @@ -262,8 +262,8 @@ public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) th } if (getRecordsResult == null) { - throw new RuntimeException("Rate Exceeded for getRecords operation - all " + getRecordsMaxRetries + - " retry attempts returned ProvisionedThroughputExceededException."); + throw new RuntimeException("Retries exceeded for getRecords operation - all " + getRecordsMaxRetries + Review comment: Please also fix the same issue further down in `getShardIterator` (https://github.com/apache/flink/pull/7706/files#diff-ed02b5340df65de06c19eb93fe90a920L340)
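The pattern being fixed — a retry loop whose final message hard-codes one failure cause even though other exceptions may have occurred — can be illustrated with a standalone retry helper that preserves the actual last exception. This is hypothetical code, not the `KinesisProxy` implementation:

```java
// Sketch: a retry loop whose failure message reflects the real last exception
// instead of claiming a single hard-coded cause.
import java.util.function.Supplier;

public class RetrySketch {
    static <T> T retry(Supplier<T> op, int maxRetries) {
        RuntimeException last = null;
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            try {
                return op.get();
            } catch (RuntimeException e) {
                last = e; // keep the real cause for the final message
            }
        }
        // Generic wording plus the chained cause, rather than naming one
        // exception type that may not have been what actually happened.
        throw new RuntimeException(
            "Retries exceeded - all " + maxRetries + " retry attempts failed.", last);
    }
}
```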
[GitHub] tweise commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer
tweise commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer URL: https://github.com/apache/flink/pull/7679#discussion_r257046152 ## File path: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/config/DefaultKafkaRateLimiter.java ## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.config; + +import org.apache.flink.api.common.functions.RuntimeContext; + +import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.RateLimiter; + +/** + * A default KafkaRateLimiter that uses Guava's RateLimiter to throttle the bytes read from Kafka. + */ +public class DefaultKafkaRateLimiter implements FlinkConnectorRateLimiter { Review comment: Since there is nothing Kafka specific here, but the implementation is specific to Guava, would `GuavaFlinkConnectorRateLimiter` be a better name?
[GitHub] tweise commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer
tweise commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer URL: https://github.com/apache/flink/pull/7679#discussion_r257047597 ## File path: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/config/FlinkConnectorRateLimiter.java ## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.config; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.RuntimeContext; + +/** + * An interface to create a ratelimiter + * + * The ratelimiter is configured via {@link #setRate(long)} and + * created via {@link #create()}. + * An example implementation can be found {@link DefaultKafkaRateLimiter}. + * */ + +@PublicEvolving +public interface FlinkConnectorRateLimiter { + + void open(RuntimeContext runtimeContext); + + /** Creates a rate limiter. */ +T create(); Review comment: This is confusing, the rate limiter was already created by the user. Why is `acquire` not defined here?
[GitHub] tweise commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer
tweise commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer URL: https://github.com/apache/flink/pull/7679#discussion_r257045352 ## File path: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java ## @@ -311,4 +323,19 @@ private static void setDeserializer(Properties props) { props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deSerName); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deSerName); } + + + // + // Rate Limiter specific methods + // + + /** +* Set a rate limiter to ratelimit bytes read from Kafka. +* @param kafkaRateLimiter +* @param maxBytesPerSecond +*/ + public void setRateLimiter(FlinkConnectorRateLimiter kafkaRateLimiter, long maxBytesPerSecond) { Review comment: Why the second parameter? That should be a setter on `DefaultKafkaRateLimiter`.
[GitHub] glaksh100 commented on issue #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer
glaksh100 commented on issue #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer URL: https://github.com/apache/flink/pull/7679#issuecomment-463830228 @tweise @becketqin I updated the code to include a `FlinkConnectorRateLimiter` and also included a `DefaultKafkaRateLimiter` that implements the interface using Guava's RateLimiter. Let me know what you think.
[GitHub] glaksh100 commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer
glaksh100 commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer URL: https://github.com/apache/flink/pull/7679#discussion_r257040349 ## File path: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/config/RateLimiterFactory.java ## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.config; + +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; + +import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.RateLimiter; + +import java.util.Properties; + +/** + * A RateLimiterFactory that configures and creates a rate limiter. + */ +public class RateLimiterFactory { + + /** Flag that indicates if ratelimiting is enabled. */ + private static final String RATELIMITING_FLAG = "consumer.ratelimiting.enabled"; Review comment: I ended up passing the rate value to the setter that sets the ratelimiter on the consumer. Let me know if that looks alright.
[GitHub] glaksh100 commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer
glaksh100 commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer URL: https://github.com/apache/flink/pull/7679#discussion_r257040325 ## File path: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/config/RateLimiterFactory.java ## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.config; + +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; + +import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.RateLimiter; + +import java.util.Properties; + +/** + * A RateLimiterFactory that configures and creates a rate limiter. + */ +public class RateLimiterFactory { Review comment: I have not made this change yet. Perhaps we could do this as part of augmenting the consumer and making the code more customizable (i.e., in another PR)?
[GitHub] Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r257039386 ## File path: flink-connectors/flink-connector-pubsub/pom.xml ## @@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-connectors</artifactId>
+		<version>1.8-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-pubsub_${scala.binary.version}</artifactId>
+	<name>flink-connector-pubsub</name>
+
+	<packaging>jar</packaging>
+
+	<dependencyManagement>
+		<dependencies>
+			<dependency>
+				<groupId>com.google.cloud</groupId>
+				<artifactId>google-cloud-bom</artifactId>
+				<version>0.70.0-alpha</version>
+				<type>pom</type>
+				<scope>import</scope>
+			</dependency>
+		</dependencies>
+	</dependencyManagement>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>com.google.cloud</groupId>
+			<artifactId>google-cloud-pubsub</artifactId>
+			<exclusions>
+				<exclusion>
+					<groupId>com.google.guava</groupId>
+					<artifactId>guava-jdk5</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+
+			provided
Review comment: Done, I rebased our commits onto upstream master.
[GitHub] Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r257039169 ## File path: flink-examples/flink-examples-streaming/pom.xml ## @@ -562,6 +578,43 @@ under the License. + + + PubSub + package + + shade + + + false + true + false + + + org.apache.flink.streaming.examples.pubsub.PubSubExample + + + PubSub + + + org.apache.flink:flink-connector-pubsub_${scala.binary.version} + + org/apache/flink/streaming/connectors/pubsub/** Review comment:
- Fixed the javadoc comment.
- I've changed where the example is stored (flink-examples/flink-examples-build-helper/flink-examples-streaming-pubsub/), could you have a look at this again? Indentation is fixed through this change as well.
- I've removed `SerializableCredentialsProvider`. I added it because several PubSub classes require a `CredentialsProvider` rather than `Credentials`, but I agree we might as well rewrap `Credentials` using Google's `FixedCredentialsProvider` (which is basically the `SerializableCredentialsProvider`). Good one!

**Regarding the BoundedPubSubSource:** The idea was that this would be useful when 'end users' run integration tests; hence it is not in test scope. Now that we've been using this code ourselves for a while, in our own projects we prefer using the 'normal' PubSubSource and restarting the job when tests need it. Although I do like the concept of being able to `Bound` SourceFunctions like this (either by a maximum number of received messages or by idle time), I would propose removing the `BoundedPubSubSource` altogether; that lets us simplify the Builder inside the PubSubSource as well and makes everything a bit cleaner. What do you think? @zentol @rmetzger
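The "bounding" idea discussed above — stopping a source after a maximum number of received messages or an idle period — can be sketched without any Flink or PubSub types. The interfaces below are simplified stand-ins, not Flink's SourceFunction API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Sketch of a "bounded" pull loop: drain an underlying source until either a
// message-count bound or an idle-time bound is hit. Names are illustrative.
public class BoundedSourceSketch {

    /**
     * Pulls from {@code poll} (null means nothing currently available) and
     * stops when {@code maxMessages} have been collected or no message has
     * arrived for {@code idleTimeoutMillis}.
     */
    public static List<String> drain(Supplier<String> poll, int maxMessages, long idleTimeoutMillis) {
        List<String> out = new ArrayList<>();
        long lastMessageAt = System.currentTimeMillis();
        while (out.size() < maxMessages
                && System.currentTimeMillis() - lastMessageAt < idleTimeoutMillis) {
            String msg = poll.get();
            if (msg != null) {
                out.add(msg);
                lastMessageAt = System.currentTimeMillis();
            }
        }
        return out;
    }
}
```

Wrapping an unbounded source this way keeps the bound out of the production source itself, which is the separation the reviewers are weighing against simply removing `BoundedPubSubSource`.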
[GitHub] GJL commented on issue #7697: [FLINK-11578][tests] Port BackPressureStatsTrackerImplITCase to new code base
GJL commented on issue #7697: [FLINK-11578][tests] Port BackPressureStatsTrackerImplITCase to new code base URL: https://github.com/apache/flink/pull/7697#issuecomment-463793079 Thanks for the review... merging.
[GitHub] GJL commented on a change in pull request #7697: [FLINK-11578][tests] Port BackPressureStatsTrackerImplITCase to new code base
GJL commented on a change in pull request #7697: [FLINK-11578][tests] Port BackPressureStatsTrackerImplITCase to new code base URL: https://github.com/apache/flink/pull/7697#discussion_r257008935 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java ## @@ -39,277 +32,200 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.messages.JobManagerMessages; -import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; -import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.minicluster.TestingMiniCluster; +import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration; +import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable; +import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.util.TestLogger; -import akka.actor.ActorSystem; -import akka.testkit.JavaTestKit; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeDiagnosingMatcher; +import org.junit.After; +import org.junit.Before; import org.junit.Test; +import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Optional; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; +import java.util.stream.IntStream; -import scala.concurrent.duration.FiniteDuration; - -import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.AllVerticesRunning; -import static 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ExecutionGraphFound; -import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestExecutionGraph; -import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning; +import static org.apache.flink.util.Preconditions.checkState; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; /** * Simple back pressured task test. + * + * @see BackPressureStatsTrackerImpl */ public class BackPressureStatsTrackerImplITCase extends TestLogger { - private static NetworkBufferPool networkBufferPool; - private static ActorSystem testActorSystem; + private static final int TIMEOUT_SECONDS = 10; Review comment: Will change to long.
[jira] [Updated] (FLINK-11597) Remove legacy JobManagerActorTestUtils
[ https://issues.apache.org/jira/browse/FLINK-11597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao updated FLINK-11597: - Description: Remove legacy JobManagerActorTestUtils > Remove legacy JobManagerActorTestUtils > -- > > Key: FLINK-11597 > URL: https://issues.apache.org/jira/browse/FLINK-11597 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.8.0 >Reporter: TisonKun >Assignee: TisonKun >Priority: Major > Fix For: 1.8.0 > > > Remove legacy JobManagerActorTestUtils -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flinkbot commented on issue #7709: [FLINK-11577][tests] Replace StackTraceSampleCoordinatorITCase
flinkbot commented on issue #7709: [FLINK-11577][tests] Replace StackTraceSampleCoordinatorITCase URL: https://github.com/apache/flink/pull/7709#issuecomment-463787310 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❌ 1. The [description] looks good. * ❌ 2. There is [consensus] that the contribution should go into Flink. * ❔ 3. Needs [attention] from. * ❌ 4. The change fits into the overall [architecture]. * ❌ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve the 1st aspect (similarly, it also supports the `consensus`, `architecture` and `quality` keywords) - `@flinkbot approve all` to approve all aspects - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval
[GitHub] GJL opened a new pull request #7709: [FLINK-11577][tests] Replace StackTraceSampleCoordinatorITCase
GJL opened a new pull request #7709: [FLINK-11577][tests] Replace StackTraceSampleCoordinatorITCase URL: https://github.com/apache/flink/pull/7709 ## What is the purpose of the change *This removes the StackTraceSampleCoordinatorITCase test, and replaces it with `StackTraceSampleServiceTest#testShouldReturnPartialResultIfTaskStopsRunningDuringSampling`* ## Brief change log - *See commits* ## Verifying this change This change added tests and can be verified as follows: - *Added unit test `StackTraceSampleServiceTest`* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
[jira] [Reopened] (FLINK-10457) Support SequenceFile for StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler reopened FLINK-10457: -- > Support SequenceFile for StreamingFileSink > -- > > Key: FLINK-10457 > URL: https://issues.apache.org/jira/browse/FLINK-10457 > Project: Flink > Issue Type: New Feature > Components: filesystem-connector, Streaming Connectors >Affects Versions: 1.7.0 >Reporter: Jihyun Cho >Assignee: Kostas Kloudas >Priority: Major > Labels: pull-request-available > Fix For: 1.7.2 > > Time Spent: 10m > Remaining Estimate: 0h > > SequenceFile is a major file format in the Hadoop ecosystem. > It is simple to manage and easy to combine with other tools. > So we still need the SequenceFile format, even though the sink already supports > Parquet and ORC.
[jira] [Reopened] (FLINK-11487) Support for writing data to Apache Flume
[ https://issues.apache.org/jira/browse/FLINK-11487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler reopened FLINK-11487: -- > Support for writing data to Apache Flume > > > Key: FLINK-11487 > URL: https://issues.apache.org/jira/browse/FLINK-11487 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Affects Versions: 1.7.1 > Environment: JDK 1.8 > Scala 2.11 > Flink 1.7.1 > Apache Flume 1.6.0 >Reporter: ambition >Priority: Major > Labels: pull-request-available > Fix For: 1.7.2, 1.8.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Flume is a distributed, reliable, and available service for efficiently > collecting, aggregating, and moving large amounts of data, and has many users. > Unfortunately, Flink does not currently support writing data to Flume. > The official Flume website and GitHub repository are: > [Apache Flume website|http://flume.apache.org/index.html] > [Apache Flume github|https://github.com/apache/flume] >
[jira] [Updated] (FLINK-10457) Support SequenceFile for StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-10457: - Fix Version/s: 1.8.0 > Support SequenceFile for StreamingFileSink > -- > > Key: FLINK-10457 > URL: https://issues.apache.org/jira/browse/FLINK-10457 > Project: Flink > Issue Type: New Feature > Components: filesystem-connector, Streaming Connectors >Affects Versions: 1.7.0 >Reporter: Jihyun Cho >Assignee: Kostas Kloudas >Priority: Major > Labels: pull-request-available > Fix For: 1.7.2, 1.8.0 > > Time Spent: 10m > Remaining Estimate: 0h > > SequenceFile is a major file format in the Hadoop ecosystem. > It is simple to manage and easy to combine with other tools. > So we still need the SequenceFile format, even though the sink already supports > Parquet and ORC.
[jira] [Closed] (FLINK-11487) Support for writing data to Apache Flume
[ https://issues.apache.org/jira/browse/FLINK-11487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-11487. Resolution: Invalid Release Note: (was: [FLINK-4446] Remove Flume connector (now in Bahir)) Bahir contains a Flume connector > Support for writing data to Apache Flume > > > Key: FLINK-11487 > URL: https://issues.apache.org/jira/browse/FLINK-11487 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Affects Versions: 1.7.1 > Environment: JDK 1.8 > Scala 2.11 > Flink 1.7.1 > Apache Flume 1.6.0 >Reporter: ambition >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Flume is a distributed, reliable, and available service for efficiently > collecting, aggregating, and moving large amounts of data, and has many users. > Unfortunately, Flink does not currently support writing data to Flume. > The official Flume website and GitHub repository are: > [Apache Flume website|http://flume.apache.org/index.html] > [Apache Flume github|https://github.com/apache/flume] >
[jira] [Closed] (FLINK-10457) Support SequenceFile for StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-10457. Resolution: Fixed > Support SequenceFile for StreamingFileSink > -- > > Key: FLINK-10457 > URL: https://issues.apache.org/jira/browse/FLINK-10457 > Project: Flink > Issue Type: New Feature > Components: filesystem-connector, Streaming Connectors >Affects Versions: 1.7.0 >Reporter: Jihyun Cho >Assignee: Kostas Kloudas >Priority: Major > Labels: pull-request-available > Fix For: 1.7.2, 1.8.0 > > Time Spent: 10m > Remaining Estimate: 0h > > SequenceFile is a major file format in the Hadoop ecosystem. > It is simple to manage and easy to combine with other tools. > So we still need the SequenceFile format, even though the sink already supports > Parquet and ORC.
[jira] [Updated] (FLINK-11487) Support for writing data to Apache Flume
[ https://issues.apache.org/jira/browse/FLINK-11487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-11487: - Fix Version/s: (was: 1.7.2) (was: 1.8.0) > Support for writing data to Apache Flume > > > Key: FLINK-11487 > URL: https://issues.apache.org/jira/browse/FLINK-11487 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Affects Versions: 1.7.1 > Environment: JDK 1.8 > Scala 2.11 > Flink 1.7.1 > Apache Flume 1.6.0 >Reporter: ambition >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Flume is a distributed, reliable, and available service for efficiently > collecting, aggregating, and moving large amounts of data, and has many users. > Unfortunately, Flink does not currently support writing data to Flume. > The official Flume website and GitHub repository are: > [Apache Flume website|http://flume.apache.org/index.html] > [Apache Flume github|https://github.com/apache/flume] >
[jira] [Updated] (FLINK-10910) Harden Kubernetes e2e test
[ https://issues.apache.org/jira/browse/FLINK-10910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-10910: - Issue Type: Improvement (was: Bug) > Harden Kubernetes e2e test > -- > > Key: FLINK-10910 > URL: https://issues.apache.org/jira/browse/FLINK-10910 > Project: Flink > Issue Type: Improvement > Components: E2E Tests >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Dawid Wysakowicz >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.6.4, 1.7.2, 1.8.0 > > Time Spent: 20m > Remaining Estimate: 0h > > The {{Kubernetes test}} (e2e test) sometimes fails with the following output: > {code} > == > Running 'Run Kubernetes test' > == > TEST_DATA_DIR: > /home/admin/flink/flink-end-to-end-tests/test-scripts/temp-test-directory-40594844780 > Flink dist directory: /home/admin/flink-1.7.0 > /home/admin/flink/flink-end-to-end-tests/test-scripts/test_kubernetes_embedded_job.sh: > line 44: none: command not f > ound > Using flink dist: ../../flink-dist/target/flink-*-bin > ./ > ./flink-1.7-SNAPSHOT/ > ./flink-1.7-SNAPSHOT/LICENSE > ./flink-1.7-SNAPSHOT/examples/ > ./flink-1.7-SNAPSHOT/examples/gelly/ > ./flink-1.7-SNAPSHOT/examples/gelly/flink-gelly-examples_2.11-1.7-SNAPSHOT.jar > ./flink-1.7-SNAPSHOT/examples/streaming/ > ./flink-1.7-SNAPSHOT/examples/streaming/WindowJoin.jar > ./flink-1.7-SNAPSHOT/examples/streaming/SocketWindowWordCount.jar > ./flink-1.7-SNAPSHOT/examples/streaming/StateMachineExample.jar > ./flink-1.7-SNAPSHOT/examples/streaming/Kafka010Example.jar > ./flink-1.7-SNAPSHOT/examples/streaming/Kafka011Example.jar > ./flink-1.7-SNAPSHOT/examples/streaming/SessionWindowing.jar > ./flink-1.7-SNAPSHOT/examples/streaming/IncrementalLearning.jar > ./flink-1.7-SNAPSHOT/examples/streaming/KafkaExample.jar > ./flink-1.7-SNAPSHOT/examples/streaming/WordCount.jar > ./flink-1.7-SNAPSHOT/examples/streaming/Twitter.jar > ./flink-1.7-SNAPSHOT/examples/streaming/Iteration.jar > 
./flink-1.7-SNAPSHOT/examples/streaming/TopSpeedWindowing.jar > ./flink-1.7-SNAPSHOT/examples/batch/ > ./flink-1.7-SNAPSHOT/examples/batch/KMeans.jar > ./flink-1.7-SNAPSHOT/examples/batch/PageRank.jar > ./flink-1.7-SNAPSHOT/examples/batch/WebLogAnalysis.jar > ./flink-1.7-SNAPSHOT/examples/batch/WordCount.jar > ./flink-1.7-SNAPSHOT/examples/batch/EnumTriangles.jar > ./flink-1.7-SNAPSHOT/examples/batch/DistCp.jar > ./flink-1.7-SNAPSHOT/examples/batch/TransitiveClosure.jar > ./flink-1.7-SNAPSHOT/examples/batch/ConnectedComponents.jar > ./flink-1.7-SNAPSHOT/examples/python/ > ./flink-1.7-SNAPSHOT/examples/python/streaming/ > ./flink-1.7-SNAPSHOT/examples/python/streaming/fibonacci.py > ./flink-1.7-SNAPSHOT/examples/python/streaming/word_count.py > ./flink-1.7-SNAPSHOT/examples/python/batch/ > ./flink-1.7-SNAPSHOT/examples/python/batch/TriangleEnumeration.py > ./flink-1.7-SNAPSHOT/examples/python/batch/TPCHQuery3.py > ./flink-1.7-SNAPSHOT/examples/python/batch/WebLogAnalysis.py > ./flink-1.7-SNAPSHOT/examples/python/batch/TPCHQuery10.py > ./flink-1.7-SNAPSHOT/examples/python/batch/WordCount.py > ./flink-1.7-SNAPSHOT/examples/python/batch/__init__.py > ./flink-1.7-SNAPSHOT/log/ > ./flink-1.7-SNAPSHOT/opt/ > ./flink-1.7-SNAPSHOT/opt/flink-metrics-ganglia-1.7-SNAPSHOT.jar > ./flink-1.7-SNAPSHOT/opt/flink-table_2.11-1.7-SNAPSHOT.jar > ./flink-1.7-SNAPSHOT/opt/flink-metrics-dropwizard-1.7-SNAPSHOT.jar > ./flink-1.7-SNAPSHOT/opt/flink-metrics-prometheus-1.7-SNAPSHOT.jar > ./flink-1.7-SNAPSHOT/opt/flink-cep_2.11-1.7-SNAPSHOT.jar > ./flink-1.7-SNAPSHOT/opt/flink-metrics-graphite-1.7-SNAPSHOT.jar > ./flink-1.7-SNAPSHOT/opt/flink-gelly-scala_2.11-1.7-SNAPSHOT.jar > ./flink-1.7-SNAPSHOT/opt/flink-s3-fs-hadoop-1.7-SNAPSHOT.jar > ./flink-1.7-SNAPSHOT/opt/flink-metrics-statsd-1.7-SNAPSHOT.jar > ./flink-1.7-SNAPSHOT/opt/flink-queryable-state-runtime_2.11-1.7-SNAPSHOT.jar > ./flink-1.7-SNAPSHOT/opt/flink-streaming-python_2.11-1.7-SNAPSHOT.jar > 
./flink-1.7-SNAPSHOT/opt/flink-ml_2.11-1.7-SNAPSHOT.jar > ./flink-1.7-SNAPSHOT/opt/flink-gelly_2.11-1.7-SNAPSHOT.jar > ./flink-1.7-SNAPSHOT/opt/flink-swift-fs-hadoop-1.7-SNAPSHOT.jar > ./flink-1.7-SNAPSHOT/opt/flink-cep-scala_2.11-1.7-SNAPSHOT.jar > ./flink-1.7-SNAPSHOT/opt/flink-metrics-slf4j-1.7-SNAPSHOT.jar > ./flink-1.7-SNAPSHOT/opt/flink-s3-fs-presto-1.7-SNAPSHOT.jar > ./flink-1.7-SNAPSHOT/opt/flink-metrics-datadog-1.7-SNAPSHOT.jar > ./flink-1.7-SNAPSHOT/opt/flink-sql-client_2.11-1.7-SNAPSHOT.jar > ./flink-1.7-SNAPSHOT/NOTICE > ./flink-1.7-SNAPSHOT/README.txt > ./flink-1.7-SNAPSHOT/lib/ > ./flink-1.7-SNAPSHOT/lib/flink-python_2.11-1.7-SNAPSHOT.jar >
[jira] [Updated] (FLINK-11201) Document SBT dependency requirements when using MiniClusterResource
[ https://issues.apache.org/jira/browse/FLINK-11201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-11201: - Summary: Document SBT dependency requirements when using MiniClusterResource (was: flink-test-utils dependency issue) > Document SBT dependency requirements when using MiniClusterResource > --- > > Key: FLINK-11201 > URL: https://issues.apache.org/jira/browse/FLINK-11201 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.7.0 >Reporter: eugen yushin >Assignee: Till Rohrmann >Priority: Major > Fix For: 1.7.2, 1.8.0 > > > Starting with Flink 1.7, there's lack of > `runtime.testutils.MiniClusterResource` class in `flink-test-utils` > distribution. > Steps to reproduce (Scala code) > build.sbt > {code} > name := "flink-17-test-issue" > organization := "x.y.z" > scalaVersion := "2.11.12" > val flinkVersion = "1.7.0" > libraryDependencies ++= Seq( > "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided, > "org.scalatest" %% "scalatest" % "3.0.5" % Test, > "org.apache.flink" %% "flink-test-utils" % flinkVersion % Test > // ,"org.apache.flink" %% "flink-runtime" % flinkVersion % Test classifier > Artifact.TestsClassifier > ) > {code} > test class: > {code} > class SimpleTest extends AbstractTestBase with FlatSpecLike { > implicit val env: StreamExecutionEnvironment = > StreamExecutionEnvironment.getExecutionEnvironment > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > env.setParallelism(1) > env.setRestartStrategy(RestartStrategies.noRestart()) > "SimpleTest" should "work" in { > val inputDs = env.fromElements(1,2,3) > inputDs.print() > env.execute() > } > } > {code} > Results in: > {code} > A needed class was not found. This could be due to an error in your runpath. > Missing class: org/apache/flink/runtime/testutils/MiniClusterResource > java.lang.NoClassDefFoundError: > org/apache/flink/runtime/testutils/MiniClusterResource > ... 
> Caused by: java.lang.ClassNotFoundException: > org.apache.flink.runtime.testutils.MiniClusterResource > at java.net.URLClassLoader.findClass(URLClassLoader.java:381) > at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > ... 31 more > {code} > This can be fixed by adding the flink-runtime artifact with the test classifier > to the dependency list.
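The commented-out line in the reporter's build.sbt is exactly the workaround described above. A minimal sketch of the fixed dependency list (coordinates taken from the issue; adjust `flinkVersion` for your setup):

```scala
val flinkVersion = "1.7.0"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
  "org.scalatest" %% "scalatest" % "3.0.5" % Test,
  "org.apache.flink" %% "flink-test-utils" % flinkVersion % Test,
  // pulls in flink-runtime's test classes, including MiniClusterResource:
  "org.apache.flink" %% "flink-runtime" % flinkVersion % Test classifier Artifact.TestsClassifier
)
```

The `tests` classifier is what makes SBT fetch the `flink-runtime-*-tests.jar` artifact that `flink-test-utils` relies on.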
[GitHub] tillrohrmann merged pull request #7701: [FLINK-11602] Remove legacy AkkaJobManagerRetriever
tillrohrmann merged pull request #7701: [FLINK-11602] Remove legacy AkkaJobManagerRetriever URL: https://github.com/apache/flink/pull/7701 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Resolved] (FLINK-11602) Remove legacy AkkaJobManagerRetriever
[ https://issues.apache.org/jira/browse/FLINK-11602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-11602. --- Resolution: Fixed Fixed via https://github.com/apache/flink/commit/02ff4bfe90d8e8b896c9f1a1bdbe8d43a48f5de7 > Remove legacy AkkaJobManagerRetriever > - > > Key: FLINK-11602 > URL: https://issues.apache.org/jira/browse/FLINK-11602 > Project: Flink > Issue Type: Sub-task >Affects Versions: 1.8.0 >Reporter: TisonKun >Assignee: TisonKun >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > Time Spent: 20m > Remaining Estimate: 0h >
[jira] [Updated] (FLINK-10721) Kafka discovery-loop exceptions may be swallowed
[ https://issues.apache.org/jira/browse/FLINK-10721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-10721: - Summary: Kafka discovery-loop exceptions may be swallowed (was: kafkaFetcher runFetchLoop throw exception will cause follow-up code not execute in FlinkKafkaConsumerBase run method ) > Kafka discovery-loop exceptions may be swallowed > > > Key: FLINK-10721 > URL: https://issues.apache.org/jira/browse/FLINK-10721 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.6.2 >Reporter: zhaoshijie >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.6.4, 1.7.2, 1.8.0 > > > In the FlinkKafkaConsumerBase run method, at line 721 (master branch), if > kafkaFetcher.runFetchLoop() throws an exception (the discoveryLoopThread throws, its finally > block calls cancel, which calls kafkaFetcher.cancel; in Kafka09Fetcher this calls > handover.close, which in turn makes handover.pollNext throw a ClosedException), then the > subsequent code does not execute. In particular, discoveryLoopError is never thrown, so the > real culprit exception is swallowed. > The failure log looks like this: > {code:java} > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
> at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply$mcV$sp(JobManager.scala:1229) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply(JobManager.scala:1172) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply(JobManager.scala:1172) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: > org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException > at > org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:180) > at > org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.cancel(Kafka09Fetcher.java:174) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:753) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase$2.run(FlinkKafkaConsumerBase.java:695) > at java.lang.Thread.run(Thread.java:745) > {code} > Shoud we modify it as follows? 
> {code:java} > try { > kafkaFetcher.runFetchLoop(); > } catch (Exception e) { > // if discoveryLoopErrorRef is not null, we should > // throw the real culprit exception > if (discoveryLoopErrorRef.get() != null) { > throw new > RuntimeException(discoveryLoopErrorRef.get()); > } else { > throw e; > } > } > {code}
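The try/catch proposed in the issue can be distilled into a small, self-contained sketch. Class and method names below are illustrative, not FlinkKafkaConsumerBase's exact shape; the point is that an error recorded by the discovery thread takes precedence over the secondary exception thrown by the cancelled fetch loop:

```java
import java.util.concurrent.atomic.AtomicReference;

// Sketch of the proposed fix: when the fetch loop dies as a side effect of
// cancellation (e.g. Handover.ClosedException), rethrow the error the
// partition-discovery thread recorded instead, since that is the real culprit.
public class DiscoveryErrorRethrow {

    private final AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<>();

    /** Called by the discovery thread when partition discovery fails. */
    public void recordDiscoveryError(Exception e) {
        discoveryLoopErrorRef.set(e);
    }

    /** Runs the fetch loop, preferring the discovery error over the loop's own failure. */
    public void runFetchLoopGuarded(Runnable fetchLoop) throws Exception {
        try {
            fetchLoop.run();
        } catch (Exception e) {
            Exception discoveryError = discoveryLoopErrorRef.get();
            if (discoveryError != null) {
                // the fetch loop only failed because cancel() closed the handover
                throw new RuntimeException(discoveryError);
            } else {
                throw e;
            }
        }
    }
}
```

This pattern is worth noting because the secondary exception otherwise masks the root cause in the job failure log, which is exactly what the quoted stack trace shows.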
[jira] [Closed] (FLINK-10807) KafkaConsumer still consume removed topic after changing topics list
[ https://issues.apache.org/jira/browse/FLINK-10807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-10807. Resolution: Duplicate > KafkaConsumer still consume removed topic after changing topics list > > > Key: FLINK-10807 > URL: https://issues.apache.org/jira/browse/FLINK-10807 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.6.2 >Reporter: bupt_ljy >Assignee: bupt_ljy >Priority: Major > > subscribedPartitionsToStartOffsets in KafkaConsumerBase gets its values > from restoredState, which is initialized in initializeState and by discovering > partitions. However, if we remove a topic from the topics list and restore the > Flink program, the restoredState still keeps the removed topic, and the fetcher > still fetches the data of the removed topic.
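The behavior the reporter is after — dropping restored offsets whose topic is no longer subscribed — can be sketched as a simple filter over the restored state. The key format and method names below are assumptions for illustration, not Flink's actual fix:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Illustrative sketch: on restore, keep only offsets for partitions whose
// topic is still in the subscribed topics list. Keys are "topic-partition"
// strings here purely for brevity.
public class RestoredStateFilterSketch {

    public static Map<String, Long> filterRestoredOffsets(
            Map<String, Long> restoredOffsetsByTopicPartition,
            Set<String> subscribedTopics) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> e : restoredOffsetsByTopicPartition.entrySet()) {
            // strip the trailing "-<partition>" suffix to recover the topic name
            String topic = e.getKey().substring(0, e.getKey().lastIndexOf('-'));
            if (subscribedTopics.contains(topic)) {
                result.put(e.getKey(), e.getValue());
            }
        }
        return result;
    }
}
```

Without such a filter, the restored map re-seeds the fetcher with the stale topic's partitions, which is exactly the symptom the issue describes.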
[jira] [Updated] (FLINK-11618) [state] Refactor operator state repartition mechanism
[ https://issues.apache.org/jira/browse/FLINK-11618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang updated FLINK-11618: - Description: Currently we have state assignment strategy of operator state below: * When parallelism not changed: ** If we only have even-split redistributed state, state assignment would try to keep as the same as previously (actually not always the same). ** If we have union redistributed state, all the operator state would be redistributed as the new state assignment. * When parallelism changed: ** all the operator state would be redistributed as the new state assignment. There existed two problems *when parallelism not changed*: # If we only have even-split redistributed state, current implementation actually cannot ensure state assignment to keep as the same as previously. This is because current {{StateAssignmentOperation#collectPartitionableStates}} would repartition {{managedOperatorStates}} without subtask-index information. Take an example, if we have a operator-state with parallelism as 2, and subtask-0's managed-operatorstate is empty while subtask-1 not. Although new parallelism still keeps as 2, after {{StateAssignmentOperation#collectPartitionableStates}} and state assigned, subtask-0 would be assigned the managed-operatorstate while subtask-1 got none. # We should only redistribute union state and not touch the even-split state. Redistribute even-split state would cause unexpected behavior after {{RestartPipelinedRegionStrategy}} supported to restore state. We should fix the above two problems and this issue is a prerequisite of FLINK-10712 and FLINK-10713 . was: Currently we have state assignment strategy of operator state below: * When parallelism not changed: ** If we only have even-split redistributed state, state assignment would try to keep as the same as previously (actually not always the same). 
** If we have union redistributed state, all the operator state is redistributed in the new state assignment. * When parallelism changed: ** all the operator state is redistributed in the new state assignment. There exist two problems *when parallelism is unchanged*: # If we only have even-split redistributed state, the current implementation actually cannot ensure state assignment stays the same as previously. This is because the current {{StateAssignmentOperation#collectPartitionableStates}} would repartition {{managedOperatorStates}} without subtask-index information. For example, if we have an operator state with parallelism 2, and subtask-0's managed operator state is empty while subtask-1's is not: although the new parallelism is still 2, after {{StateAssignmentOperation#collectPartitionableStates}}, subtask-0 would be assigned the managed operator state but subtask-1 gets none. # We should only redistribute union state and not touch the even-split state. Redistributing even-split state would cause unexpected behavior once {{RestartPipelinedRegionStrategy}} supports restoring state. We should fix the above two problems; this issue is a prerequisite of FLINK-10712 and FLINK-10713. > [state] Refactor operator state repartition mechanism > - > > Key: FLINK-11618 > URL: https://issues.apache.org/jira/browse/FLINK-11618 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.7.0 >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Major > Fix For: 1.8.0 > > > Currently we have the following state assignment strategy for operator state: > * When parallelism is unchanged: > ** If we only have even-split redistributed state, state assignment tries > to keep the same assignment as previously (actually not always the same). > ** If we have union redistributed state, all the operator state is > redistributed in the new state assignment.
> * When parallelism changed: > ** all the operator state is redistributed in the new state assignment. > There exist two problems *when parallelism is unchanged*: > # If we only have even-split redistributed state, the current implementation > actually cannot ensure state assignment stays the same as previously. > This is because the current > {{StateAssignmentOperation#collectPartitionableStates}} would repartition > {{managedOperatorStates}} without subtask-index information. For example, > if we have an operator state with parallelism 2, and subtask-0's > managed operator state is empty while subtask-1's is not: although the new > parallelism is still 2, after > {{StateAssignmentOperation#collectPartitionableStates}} and state assignment, > subtask-0 would be assigned the managed operator state while subtask-1 gets > none. > # We should only redistribute union state and not touch the even-split state.
[jira] [Created] (FLINK-11618) [state] Refactor operator state repartition mechanism
Yun Tang created FLINK-11618: Summary: [state] Refactor operator state repartition mechanism Key: FLINK-11618 URL: https://issues.apache.org/jira/browse/FLINK-11618 Project: Flink Issue Type: Improvement Components: State Backends, Checkpointing Affects Versions: 1.7.0 Reporter: Yun Tang Assignee: Yun Tang Fix For: 1.8.0 Currently we have the following state assignment strategy for operator state: * When parallelism is unchanged: ** If we only have even-split redistributed state, state assignment tries to keep the same assignment as previously (actually not always the same). ** If we have union redistributed state, all the operator state is redistributed in the new state assignment. * When parallelism changed: ** all the operator state is redistributed in the new state assignment. There exist two problems *when parallelism is unchanged*: # If we only have even-split redistributed state, the current implementation actually cannot ensure state assignment stays the same as previously. This is because the current {{StateAssignmentOperation#collectPartitionableStates}} would repartition {{managedOperatorStates}} without subtask-index information. For example, if we have an operator state with parallelism 2, and subtask-0's managed operator state is empty while subtask-1's is not: although the new parallelism is still 2, after {{StateAssignmentOperation#collectPartitionableStates}}, subtask-0 would be assigned the managed operator state but subtask-1 gets none. # We should only redistribute union state and not touch the even-split state. Redistributing even-split state would cause unexpected behavior once {{RestartPipelinedRegionStrategy}} supports restoring state. We should fix the above two problems; this issue is a prerequisite of FLINK-10712 and FLINK-10713.
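The two redistribution modes discussed above can be illustrated with a minimal sketch. This is a hypothetical simplification, not Flink's actual StateAssignmentOperation: with unchanged parallelism, even-split state should map each subtask back to exactly its own previous state, even when some subtasks' state is empty, whereas union-redistributed state hands every new subtask the union of all previous state.

```java
import java.util.*;

// Hypothetical sketch of the two operator-state redistribution modes
// (method names are illustrative, not Flink's real StateAssignmentOperation).
public class OperatorStateRepartitionSketch {

    // Even-split, parallelism unchanged: each new subtask should get back
    // exactly the state its predecessor owned (an identity mapping), even
    // when some subtasks' state is empty -- the property the issue says the
    // current implementation violates.
    static List<List<String>> evenSplitUnchanged(List<List<String>> previous) {
        return new ArrayList<>(previous);
    }

    // Union redistribution: every new subtask receives the union of all
    // previous subtasks' state.
    static List<List<String>> union(List<List<String>> previous, int newParallelism) {
        List<String> all = new ArrayList<>();
        for (List<String> s : previous) {
            all.addAll(s);
        }
        List<List<String>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(all));
        }
        return result;
    }

    public static void main(String[] args) {
        // subtask-0 is empty, subtask-1 holds state; parallelism stays 2
        List<List<String>> prev = Arrays.asList(
            Collections.<String>emptyList(),
            Collections.singletonList("partition-A"));

        System.out.println(evenSplitUnchanged(prev)); // [[], [partition-A]]
        System.out.println(union(prev, 2));           // [[partition-A], [partition-A]]
    }
}
```

In the example from the issue, a repartition that ignores the subtask index could instead hand "partition-A" to subtask-0 and nothing to subtask-1, which is the bug being described.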
[GitHub] StefanRRichter commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)
StefanRRichter commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased) URL: https://github.com/apache/flink/pull/7662#discussion_r256882174 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolCoLocationTest.java ## @@ -77,8 +76,8 @@ public void testSimpleCoLocatedSlotScheduling() throws ExecutionException, Inter final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); - final SlotPoolGateway slotPoolGateway = slotPoolResource.getSlotPoolGateway(); - slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get(); + final SlotPoolImpl slotPoolGateway = slotPoolResource.getSlotPool(); Review comment: But can be rewritten This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] StefanRRichter commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)
StefanRRichter commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased) URL: https://github.com/apache/flink/pull/7662#discussion_r256881369 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolCoLocationTest.java ## @@ -77,8 +76,8 @@ public void testSimpleCoLocatedSlotScheduling() throws ExecutionException, Inter final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); - final SlotPoolGateway slotPoolGateway = slotPoolResource.getSlotPoolGateway(); - slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get(); + final SlotPoolImpl slotPoolGateway = slotPoolResource.getSlotPool(); Review comment: It is actually required for method `offerSlot`
[GitHub] zentol commented on issue #7692: [FLINK-11121][license] Check and update licensing notes for Aliyun FS
zentol commented on issue #7692: [FLINK-11121][license] Check and update licensing notes for Aliyun FS URL: https://github.com/apache/flink/pull/7692#issuecomment-463667794 Needs a rebase now that #7599 was merged.
[GitHub] StefanRRichter commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)
StefanRRichter commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased) URL: https://github.com/apache/flink/pull/7662#discussion_r256878807 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingComponentMainThreadExecutor.java ## @@ -77,9 +77,18 @@ public TestingComponentMainThreadExecutorServiceAdapter getMainThreadExecutor() */ public static class Resource extends ExternalResource { + private long shutdownTimeoutMillis; private TestingComponentMainThreadExecutor componentMainThreadTestExecutor; private ScheduledExecutorService innerExecutorService; + public Resource() { + this(500L); Review comment: Yes, I think 500ms is (more than) enough for what the tests need. Technically I think they could all run fine with 0L.
[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased) URL: https://github.com/apache/flink/pull/7662#discussion_r256852009 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java ## @@ -125,8 +146,10 @@ public void testScheduleToDeploying() { new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.defaultExecutionContext(; final SimpleSlot slot = instance.allocateSimpleSlot(); + Scheduler scheduler = mock(Scheduler.class); CompletableFuture future = new CompletableFuture<>(); future.complete(slot); + when(scheduler.allocateSlot(any(SlotRequestId.class), any(ScheduledUnit.class), any(SlotProfile.class), anyBoolean(), any(Time.class))).thenReturn(future); Review comment: Please remove mocks
[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased) URL: https://github.com/apache/flink/pull/7662#discussion_r256859785 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java ## @@ -72,8 +71,8 @@ public void testSingleQueuedSharedSlotScheduling() throws Exception { (SlotRequest slotRequest) -> allocationIdFuture.complete(slotRequest.getAllocationId())); LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); - final SlotPoolGateway slotPoolGateway = slotPoolResource.getSlotPoolGateway(); - slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get(); + final SlotPoolImpl slotPool = slotPoolResource.getSlotPool(); + slotPoolResource.executeInMainThreadAndJoin(() -> slotPool.registerTaskManager(taskManagerLocation.getResourceID())); Review comment: I think `executeInMainThreadAndJoin` is not needed
[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased) URL: https://github.com/apache/flink/pull/7662#discussion_r256864344 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java ## @@ -730,31 +676,33 @@ public void testReleasingIdleSlotFailed() throws Exception { // expected } - } finally { - RpcUtils.terminateRpcEndpoint(slotPool, timeout); } } +// private V executeInMainThreadAndJoin(Supplier supplier) throws Exception { +// return CompletableFuture.supplyAsync(supplier, mainThreadExecutor).get(); +// } Review comment: Please remove
[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased) URL: https://github.com/apache/flink/pull/7662#discussion_r256842090 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotSelectionStrategyTestBase.java ## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.clusterframework.types; + +import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; +import org.apache.flink.runtime.instance.SimpleSlotContext; +import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.jobmaster.SlotContext; +import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.TestLogger; + +import java.net.InetAddress; +import java.util.Collections; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; + +public abstract class SlotSelectionStrategyTestBase extends TestLogger { + + protected final ResourceProfile resourceProfile = new ResourceProfile(2, 1024); + protected final ResourceProfile biggerResourceProfile = new ResourceProfile(3, 1024); + + protected final AllocationID aid1 = new AllocationID(); + protected final AllocationID aid2 = new AllocationID(); + protected final AllocationID aid3 = new AllocationID(); + protected final AllocationID aid4 = new AllocationID(); + protected final AllocationID aidX = new AllocationID(); + + protected final TaskManagerLocation tml1 = new TaskManagerLocation(new ResourceID("tm-1"), InetAddress.getLoopbackAddress(), 42); + protected final TaskManagerLocation tml2 = new TaskManagerLocation(new ResourceID("tm-2"), InetAddress.getLoopbackAddress(), 43); + protected final TaskManagerLocation tml3 = new TaskManagerLocation(new ResourceID("tm-3"), InetAddress.getLoopbackAddress(), 44); + protected final TaskManagerLocation tml4 = new TaskManagerLocation(new ResourceID("tm-4"), InetAddress.getLoopbackAddress(), 45); + protected final TaskManagerLocation tmlX = new TaskManagerLocation(new ResourceID("tm-X"), InetAddress.getLoopbackAddress(), 46); + + protected final TaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway(); + + protected SimpleSlotContext 
ssc1 = new SimpleSlotContext(aid1, tml1, 1, taskManagerGateway, resourceProfile); + protected SimpleSlotContext ssc2 = new SimpleSlotContext(aid2, tml2, 2, taskManagerGateway, biggerResourceProfile); + protected SimpleSlotContext ssc3 = new SimpleSlotContext(aid3, tml3, 3, taskManagerGateway, resourceProfile); + protected SimpleSlotContext ssc4 = new SimpleSlotContext(aid4, tml4, 4, taskManagerGateway, resourceProfile); Review comment: Can these fields be `final`?
[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased) URL: https://github.com/apache/flink/pull/7662#discussion_r256859879 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolResource.java ## @@ -102,8 +97,10 @@ protected void after() { } private void terminateSlotPool() { - slotPool.shutDown(); - CompletableFuture terminationFuture = slotPool.getTerminationFuture(); - terminationFuture.join(); + slotPool.close(); + } + + public V executeInMainThreadAndJoin(Supplier supplier) throws Exception { Review comment: I think this method should not be necessary.
[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased) URL: https://github.com/apache/flink/pull/7662#discussion_r256868137 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ## @@ -70,9 +70,10 @@ import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; import org.apache.flink.runtime.jobmaster.exceptions.JobModificationException; import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory; +import org.apache.flink.runtime.jobmaster.slotpool.SchedulerFactory; +import org.apache.flink.runtime.jobmaster.slotpool.Scheduler; Review comment: Wrong import order
[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased) URL: https://github.com/apache/flink/pull/7662#discussion_r256851648 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java ## @@ -58,14 +69,21 @@ public void testSlotReleasedWhenScheduledImmediately() { CompletableFuture future = new CompletableFuture<>(); future.complete(slot); + Scheduler scheduler = new TestingScheduler() { + @Override + public CompletableFuture allocateSlot( + SlotRequestId slotRequestId, + ScheduledUnit scheduledUnit, + SlotProfile slotProfile, + boolean allowQueuedScheduling, + Time allocationTimeout) { + return future; + } + }; assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); // try to deploy to the slot - vertex.scheduleForExecution( - new TestingSlotProvider(ignore -> future), - false, - LocationPreferenceConstraint.ALL, - Collections.emptySet()); + vertex.scheduleForExecution(scheduler, false, LocationPreferenceConstraint.ALL, Collections.emptySet()); Review comment: This can be done easier by replacing `scheduler` with `new TestingSlotProvider(ignored -> future)`.
[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased) URL: https://github.com/apache/flink/pull/7662#discussion_r256855501 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/TestingScheduler.java ## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmanager.scheduler; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.jobmaster.slotpool.Scheduler; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.concurrent.CompletableFuture; + +public class TestingScheduler implements Scheduler { + @Override + public void start(@Nonnull ComponentMainThreadExecutor mainThreadExecutor) { + + } + + @Override + public CompletableFuture allocateSlot( + SlotRequestId slotRequestId, + ScheduledUnit scheduledUnit, + SlotProfile slotProfile, + boolean allowQueuedScheduling, + Time allocationTimeout) { + return null; Review comment: Since we don't need this class, it doesn't matter, but I think it would be better to return some sane default values which would not break the method contract in a testing implementation.
[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased) URL: https://github.com/apache/flink/pull/7662#discussion_r256855186 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/TestingScheduler.java ## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager.scheduler; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.jobmaster.slotpool.Scheduler; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.concurrent.CompletableFuture; + +public class TestingScheduler implements Scheduler { Review comment: I think we don't need this class.
[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased) URL: https://github.com/apache/flink/pull/7662#discussion_r256857264 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AvailableSlotsTest.java ## @@ -43,7 +41,7 @@ Review comment: `DEFAULT_TESTING_BIG_PROFILE` is no longer needed here.
[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased) URL: https://github.com/apache/flink/pull/7662#discussion_r256852629 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingComponentMainThreadExecutor.java ## @@ -77,9 +77,18 @@ public TestingComponentMainThreadExecutorServiceAdapter getMainThreadExecutor() */ public static class Resource extends ExternalResource { + private long shutdownTimeoutMillis; private TestingComponentMainThreadExecutor componentMainThreadTestExecutor; private ScheduledExecutorService innerExecutorService; + public Resource() { + this(500L); Review comment: Not equivalent with the old shut down timeout. Was this change in value intended?
[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased) URL: https://github.com/apache/flink/pull/7662#discussion_r256871460 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ## @@ -525,12 +525,12 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) { slotProvider.allocateSlot( Review comment: I think callbacks on the `logicalSlotFuture` no longer need to be executed in the main thread executor, because the `logicalSlotFuture` should be completed by the main thread. This should affect line `Execution.java:548`.
[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased) URL: https://github.com/apache/flink/pull/7662#discussion_r256867000 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java ## @@ -281,13 +250,11 @@ public void testExtraSlotsAreKept() throws Exception { final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); final TaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway(); - slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get(); + testMainThreadExecutor.execute(() ->pool.registerTaskManager(taskManagerLocation.getResourceID())); Review comment: whitespace missing after `->`
[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased) URL: https://github.com/apache/flink/pull/7662#discussion_r256859426 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java ## @@ -52,17 +52,16 @@ import static org.junit.Assert.fail; /** - * Test cases for slot sharing with the {@link SlotPool}. + * Test cases for slot sharing with the {@link SlotPoolImpl}. */ public class SlotPoolSlotSharingTest extends TestLogger { @ClassRule public static final TestingRpcServiceResource testingRpcServiceResource = new TestingRpcServiceResource(); Review comment: Should no longer be needed.
[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased) URL: https://github.com/apache/flink/pull/7662#discussion_r256863219 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java ## @@ -97,34 +93,30 @@ private TestingResourceManagerGateway resourceManagerGateway; + private ComponentMainThreadExecutor mainThreadExecutor = + TestingComponentMainThreadExecutorServiceAdapter.forMainThread(); + @Before public void setUp() throws Exception { - this.rpcService = new TestingRpcService(); this.jobId = new JobID(); taskManagerLocation = new LocalTaskManagerLocation(); taskManagerGateway = new SimpleAckingTaskManagerGateway(); resourceManagerGateway = new TestingResourceManagerGateway(); } - @After - public void tearDown() throws Exception { - RpcUtils.terminateRpcService(rpcService, timeout); - } - @Test public void testAllocateSimpleSlot() throws Exception { CompletableFuture slotRequestFuture = new CompletableFuture<>(); - resourceManagerGateway.setRequestSlotConsumer(slotRequest -> slotRequestFuture.complete(slotRequest)); - - final SlotPool slotPool = new SlotPool(rpcService, jobId, LocationPreferenceSchedulingStrategy.getInstance()); + resourceManagerGateway.setRequestSlotConsumer(slotRequestFuture::complete); - try { - SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway); slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()); + try (SlotPoolImpl newSlotPool = new SlotPoolImpl(jobId)) { + SlotPoolImpl slotPool = setupSlotPool(newSlotPool, resourceManagerGateway, mainThreadExecutor); Review comment: The aliasing of `newSlotPool` with `slotPool` is a bit confusing. I think we can change `setupSlotPool` to return `void`.
[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased) URL: https://github.com/apache/flink/pull/7662#discussion_r256858255 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolCoLocationTest.java ## @@ -77,8 +76,8 @@ public void testSimpleCoLocatedSlotScheduling() throws ExecutionException, Inter final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); - final SlotPoolGateway slotPoolGateway = slotPoolResource.getSlotPoolGateway(); - slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get(); + final SlotPoolImpl slotPoolGateway = slotPoolResource.getSlotPool(); Review comment: could be `final SlotPool slotPool`
[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased) URL: https://github.com/apache/flink/pull/7662#discussion_r256863744 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java ## @@ -160,19 +150,18 @@ public void testAllocationFulfilledByReturnedSlot() throws Exception { } }); - final SlotPool slotPool = new SlotPool(rpcService, jobId, LocationPreferenceSchedulingStrategy.getInstance()); - - try { - SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway); + try (SlotPoolImpl newSlotPool = new SlotPoolImpl(jobId)) { + SlotPoolImpl slotPool = setupSlotPool(newSlotPool, resourceManagerGateway, mainThreadExecutor); Review comment: I assume the same aliasing issue applies to the other tests as well.
[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased) URL: https://github.com/apache/flink/pull/7662#discussion_r256849683 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java ## @@ -357,11 +355,8 @@ public void testActionsWhileCancelling() { AkkaUtils.getDefaultTimeout()); setVertexState(vertex, ExecutionState.CANCELING); - vertex.scheduleForExecution( - new TestingSlotProvider(ignore -> new CompletableFuture<>()), - false, - LocationPreferenceConstraint.ALL, - Collections.emptySet()); + Scheduler scheduler = mock(Scheduler.class); + vertex.scheduleForExecution(scheduler, false, LocationPreferenceConstraint.ALL, Collections.emptySet()); Review comment: We could avoid the mock by using the `ProgrammedSlotProvider` here.
[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased) URL: https://github.com/apache/flink/pull/7662#discussion_r256851926 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java ## @@ -92,6 +110,9 @@ public void testSlotReleasedWhenScheduledQueued() { final CompletableFuture future = new CompletableFuture<>(); + Scheduler scheduler = mock(Scheduler.class); + when(scheduler.allocateSlot(any(SlotRequestId.class), any(ScheduledUnit.class), any(SlotProfile.class), anyBoolean(), any(Time.class))).thenReturn(future); Review comment: Please remove mocks
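The review comment above asks to replace the Mockito stubbing with something explicit. A common alternative is a small hand-written test double. The sketch below is only an illustration of that idea, not Flink's actual code: the `Scheduler` interface here is a simplified stand-in for Flink's real scheduler interface (which takes `SlotRequestId`, `ScheduledUnit`, `SlotProfile`, a boolean, and a `Time` argument), reduced to the one call the test exercises.

```java
import java.util.concurrent.CompletableFuture;

// Simplified stand-in for the scheduler interface used in the test.
interface Scheduler {
    CompletableFuture<String> allocateSlot(String slotRequestId);
}

// Hand-rolled test double: the stubbed behavior is plain Java code in
// the test, instead of being hidden behind mock(...) and when(...) matchers.
final class TestingScheduler implements Scheduler {
    private final CompletableFuture<String> slotFuture;

    TestingScheduler(CompletableFuture<String> slotFuture) {
        this.slotFuture = slotFuture;
    }

    @Override
    public CompletableFuture<String> allocateSlot(String slotRequestId) {
        // Always hand back the future the test controls.
        return slotFuture;
    }
}

public class TestingSchedulerExample {
    public static void main(String[] args) {
        CompletableFuture<String> future = new CompletableFuture<>();
        Scheduler scheduler = new TestingScheduler(future);

        CompletableFuture<String> result = scheduler.allocateSlot("request-1");
        future.complete("slot-42");
        System.out.println(result.join());
    }
}
```

The test double keeps the stubbed contract visible at the call site, which is presumably why reviewers prefer it over `mock(Scheduler.class)` in these tests.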
[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased) URL: https://github.com/apache/flink/pull/7662#discussion_r256864748 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java ## @@ -805,53 +750,61 @@ public void testFreeFailedSlots() throws Exception { */ @Test public void testFailingAllocationFailsPendingSlotRequests() throws Exception { - final SlotPool slotPool = new SlotPool(rpcService, jobId, LocationPreferenceSchedulingStrategy.getInstance()); - try { + try (SlotPoolImpl newSlotPool = new SlotPoolImpl(jobId)) { final CompletableFuture allocationIdFuture = new CompletableFuture<>(); resourceManagerGateway.setRequestSlotConsumer(slotRequest -> allocationIdFuture.complete(slotRequest.getAllocationId())); - final SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway); - final CompletableFuture slotFuture = allocateSlot(slotPoolGateway, new SlotRequestId()); + SlotPool slotPool = setupSlotPool(newSlotPool, resourceManagerGateway, mainThreadExecutor); + Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor); + + final CompletableFuture slotFuture = allocateSlot(scheduler, new SlotRequestId()); final AllocationID allocationId = allocationIdFuture.get(); assertThat(slotFuture.isDone(), is(false)); final FlinkException cause = new FlinkException("Fail pending slot request failure."); - final CompletableFuture> responseFuture = slotPoolGateway.failAllocation(allocationId, cause); + final Optional responseFuture = slotPool.failAllocation(allocationId, cause); - assertThat(responseFuture.get().isPresent(), is(false)); + assertThat(responseFuture.isPresent(), is(false)); try { slotFuture.get(); fail("Expected a slot allocation failure."); } catch (ExecutionException ee) { assertThat(ExceptionUtils.stripExecutionException(ee), equalTo(cause)); } - } finally { - 
RpcUtils.terminateRpcEndpoint(slotPool, timeout); } } - private CompletableFuture allocateSlot(SlotPoolGateway slotPoolGateway, SlotRequestId slotRequestId) { - return slotPoolGateway.allocateSlot( + private CompletableFuture allocateSlot(Scheduler scheduler, SlotRequestId slotRequestId) { + return scheduler.allocateSlot( slotRequestId, new DummyScheduledUnit(), SlotProfile.noRequirements(), true, timeout); } - private static SlotPoolGateway setupSlotPool( - SlotPool slotPool, - ResourceManagerGateway resourceManagerGateway) throws Exception { + private static SlotPoolImpl setupSlotPool( Review comment: Let's change it to `void` in order to solve the aliasing problem in the tests. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
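The aliasing problem the reviewer describes can be sketched with stand-in types (the real Flink `SlotPoolImpl` setup involves a resource manager gateway and a main-thread executor, which are omitted here): when a setup helper returns the object it was handed, tests end up with two names for the same instance; making the helper `void` leaves a single name.

```java
// Stand-in for SlotPoolImpl, reduced to a start/started pair.
final class SlotPool {
    private boolean started;
    void start() { started = true; }
    boolean isStarted() { return started; }
}

public class SetupSlotPoolExample {

    // Before: "SlotPool slotPool = setupSlotPool(newSlotPool, ...);"
    // gave two names for one object. After the suggested change the
    // helper configures the pool in place and returns nothing.
    static void setupSlotPool(SlotPool slotPool) {
        slotPool.start();
    }

    public static void main(String[] args) {
        SlotPool slotPool = new SlotPool();  // single name, no alias
        setupSlotPool(slotPool);
        System.out.println(slotPool.isStarted());
    }
}
```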
[GitHub] tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)
tillrohrmann commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased) URL: https://github.com/apache/flink/pull/7662#discussion_r256860858 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java ## @@ -1,531 +1,531 @@ (full-file diff of SlotSharingManagerTest.java; the message is truncated before the review comment)
[GitHub] flinkbot commented on issue #7708: [WIP][FLINK-10569][tests] Clean up uses of Scheduler and Instance in valid tests(Batch 2)
flinkbot commented on issue #7708: [WIP][FLINK-10569][tests] Clean up uses of Scheduler and Instance in valid tests (Batch 2) URL: https://github.com/apache/flink/pull/7708#issuecomment-463659717 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❌ 1. The [description] looks good. * ❌ 2. There is [consensus] that the contribution should go into Flink. * ❔ 3. Needs [attention] from. * ❌ 4. The change fits into the overall [architecture]. * ❌ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve the 1st aspect (similarly, it also supports the `consensus`, `architecture` and `quality` keywords) - `@flinkbot approve all` to approve all aspects - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval
[GitHub] TisonKun opened a new pull request #7708: [FLINK-10569][tests] Clean up uses of Scheduler and Instance in valid tests(Batch 2)
TisonKun opened a new pull request #7708: [FLINK-10569][tests] Clean up uses of Scheduler and Instance in valid tests (Batch 2) URL: https://github.com/apache/flink/pull/7708 ## What is the purpose of the change 1. Remove `Instance` usage in ExecutionGraphDeploymentTest ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable)
[jira] [Commented] (FLINK-11373) CliFrontend cuts off reason for error messages
[ https://issues.apache.org/jira/browse/FLINK-11373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16768320#comment-16768320 ] 金钟镛 commented on FLINK-11373: - Hello, is anyone still working on this issue? If not, do you mind assigning it to me? > CliFrontend cuts off reason for error messages > -- > > Key: FLINK-11373 > URL: https://issues.apache.org/jira/browse/FLINK-11373 > Project: Flink > Issue Type: Bug > Components: Startup Shell Scripts >Affects Versions: 1.5.6, 1.6.3, 1.7.1 >Reporter: Maximilian Michels >Assignee: leesf >Priority: Minor > Labels: pull-request-available, starter > Time Spent: 10m > Remaining Estimate: 0h > > The CliFrontend seems to only print the first message in the stack trace and > not any of its causes. > {noformat} > bin/flink run /non-existing/path > Could not build the program from JAR file. > Use the help option (-h or --help) to get help on the command. > {noformat} > Notice, the underlying cause of this message is FileNotFoundException. > Consider changing > a) the error message for this particular case > b) the way the stack trace messages are trimmed -- This message was sent by Atlassian JIRA (v7.6.3#76005)
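The core of FLINK-11373 is that only the outermost exception message is printed while the causes are dropped. A minimal sketch of option (b) — walking the cause chain when building the error message — could look like the following. This is an illustrative helper, not the actual `CliFrontend` code:

```java
import java.io.FileNotFoundException;

public class CauseChainExample {

    // Hypothetical helper: include every cause in the printed message,
    // so the FileNotFoundException behind "Could not build the program
    // from JAR file." is not lost.
    static String describeWithCauses(Throwable t) {
        StringBuilder sb = new StringBuilder(String.valueOf(t.getMessage()));
        for (Throwable cause = t.getCause(); cause != null; cause = cause.getCause()) {
            sb.append("\nCaused by: ").append(cause);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Exception root = new FileNotFoundException("/non-existing/path");
        Exception wrapped =
            new RuntimeException("Could not build the program from JAR file.", root);
        System.out.println(describeWithCauses(wrapped));
    }
}
```

With a helper like this, the example from the issue would report the underlying `FileNotFoundException` alongside the top-level message instead of silently trimming it.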
[jira] [Commented] (FLINK-11617) Kinesis Connector getRecords() failure logging is misleading
[ https://issues.apache.org/jira/browse/FLINK-11617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16768330#comment-16768330 ] Jamie Grier commented on FLINK-11617: - Here's an example: Stacktrace is: {{java.lang.RuntimeException: Rate Exceeded for getRecords operation - all 3 retry attempts returned ProvisionedThroughputExceededException.}} {{ at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:234)}} {{ at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:373)}} {{ at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:216)}} {{ at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266)}} {{ at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)}} {{ at java.lang.Thread.run(Thread.java:748)}} But the root cause is actually given by this log line: {{Got recoverable SdkClientException. Backing off for 140 millis (null (Service: AmazonKinesis; Status Code: 500; Error Code: InternalFailure; Request ID: c49c8e5b-a068-9733-9043-b215d51b0aa1))}} > Kinesis Connector getRecords() failure logging is misleading > > > Key: FLINK-11617 > URL: https://issues.apache.org/jira/browse/FLINK-11617 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.5.6, 1.6.3, 1.7.1 >Reporter: Jamie Grier >Assignee: Jamie Grier >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > There isn't enough information in the current logging to diagnose a > getRecords() failure. Also there is a hardcoded string that states the > failure cause was always ProvisionedThroughputExceededException which isn't > true. There are many possible causes of failures. This is misleading. 
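The misleading part of FLINK-11617 is that the final error hardcodes `ProvisionedThroughputExceededException` as the reason, while the log line with the real SDK failure is the only place the root cause appears. A retry wrapper that chains the last observed exception would surface it in the final error instead. This is a hedged sketch of that idea, not the actual `KinesisProxy.getRecords()` code:

```java
import java.util.function.Supplier;

public class RetryWithCauseExample {

    // Hypothetical retry helper: remember the last exception and attach
    // it as the cause, so the final error reports what actually failed
    // (e.g. an SDK InternalFailure) rather than a hardcoded message.
    static <T> T callWithRetries(Supplier<T> call, int maxAttempts) {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.get();
            } catch (RuntimeException e) {
                last = e; // real code would log here and back off before retrying
            }
        }
        throw new RuntimeException(
            "getRecords failed - all " + maxAttempts + " retry attempts failed", last);
    }

    public static void main(String[] args) {
        try {
            callWithRetries(() -> {
                throw new IllegalStateException("InternalFailure (Status Code: 500)");
            }, 3);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage() + " / root cause: " + e.getCause().getMessage());
        }
    }
}
```

The resulting stack trace then carries the true failure (here the simulated `InternalFailure`) as `Caused by:`, which is exactly the information the current logging hides.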
[GitHub] StefanRRichter merged pull request #7599: [FLINK-11442] upgrade oss sdk version
StefanRRichter merged pull request #7599: [FLINK-11442] upgrade oss sdk version URL: https://github.com/apache/flink/pull/7599
[jira] [Commented] (FLINK-11616) Flink official document has an error
[ https://issues.apache.org/jira/browse/FLINK-11616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16768307#comment-16768307 ] Fabian Hueske commented on FLINK-11616: --- You would need to provide a pull request. Once it got merged you'll be the author of a commit in Flink. > Flink official document has an error > > > Key: FLINK-11616 > URL: https://issues.apache.org/jira/browse/FLINK-11616 > Project: Flink > Issue Type: Bug > Components: Documentation >Reporter: xulinjie >Assignee: xulinjie >Priority: Major > Attachments: wx20190214-214...@2x.png > > > The page url is > [https://ci.apache.org/projects/flink/flink-docs-master/tutorials/flink_on_windows.html] > The mistake is in paragraph “Installing Flink from Git”. > “The solution is to adjust the Cygwin settings to deal with the correct line > endings by following these three steps:”, > The sequence of steps you wrote was "1, 2, 1". But I think you might want to > write "1, 2, 3".
[jira] [Updated] (FLINK-10241) Reduce performance/stability impact of latency metrics
[ https://issues.apache.org/jira/browse/FLINK-10241?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng updated FLINK-10241: Fix Version/s: (was: 1.6.4) 1.6.5 > Reduce performance/stability impact of latency metrics > -- > > Key: FLINK-10241 > URL: https://issues.apache.org/jira/browse/FLINK-10241 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.5.0, 1.6.0, 1.7.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Critical > Fix For: 1.8.0, 1.6.5 > > > Umbrella issue for performance/stability improvements around the latency > metrics.
[jira] [Updated] (FLINK-10954) Hardlink from files of previous local stored state might cross devices
[ https://issues.apache.org/jira/browse/FLINK-10954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng updated FLINK-10954: Fix Version/s: (was: 1.6.4) 1.6.5 > Hardlink from files of previous local stored state might cross devices > -- > > Key: FLINK-10954 > URL: https://issues.apache.org/jira/browse/FLINK-10954 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.6.2 >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Major > Fix For: 1.7.3, 1.8.0, 1.6.5 > > > Currently, local recovery's base directories are initialized from > '{{io.tmp.dirs}}' if parameter '{{taskmanager.state.local.root-dirs}}' is not > set. For Yarn environments, the tmp dirs are replaced by Yarn's '{{LOCAL_DIRS}}', > which might consist of directories from different devices, such as > /dump/1/nm-local-dir, /dump/2/nm-local-dir. The local directory for RocksDB > is initialized from IOManager's spillingDirectories, which might be located on a > different device from local recovery's folder. However, hard-linking between > different devices is not allowed; it will throw the exception below: > {code:java} > java.nio.file.FileSystemException: target -> souce: Invalid cross-device link > {code}
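The failure mode in FLINK-10954 — `Files.createLink` rejecting a hardlink across devices — is typically handled by falling back to a copy when the link fails. The following is a hedged sketch of that pattern (not the actual Flink fix, whose chosen directory layout may avoid the problem differently):

```java
import java.io.IOException;
import java.nio.file.FileSystemException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class LinkOrCopyExample {

    // Hypothetical fallback: try a hard link first (cheap, same device);
    // if the file system rejects it — e.g. the "Invalid cross-device link"
    // FileSystemException from the issue — fall back to a plain copy.
    static void linkOrCopy(Path source, Path target) throws IOException {
        try {
            Files.createLink(target, source);
        } catch (FileSystemException | UnsupportedOperationException e) {
            Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("link-or-copy");
        Path source = dir.resolve("source.txt");
        Files.write(source, "local state".getBytes());

        // On the same device this creates a hard link; across devices it copies.
        linkOrCopy(source, dir.resolve("target.txt"));
        System.out.println(new String(Files.readAllBytes(dir.resolve("target.txt"))));
    }
}
```

A copy is slower than a link, so this trades the performance benefit of local recovery's hardlinks for correctness when `LOCAL_DIRS` spans devices.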