[jira] [Commented] (FLINK-8966) Port AvroExternalJarProgramITCase to flip6
[ https://issues.apache.org/jira/browse/FLINK-8966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16425088#comment-16425088 ]

ASF GitHub Bot commented on FLINK-8966:
---------------------------------------

Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/5766

    merging.

> Port AvroExternalJarProgramITCase to flip6
> ------------------------------------------
>
> Key: FLINK-8966
> URL: https://issues.apache.org/jira/browse/FLINK-8966
> Project: Flink
> Issue Type: Improvement
> Components: Tests
> Affects Versions: 1.5.0
> Reporter: Chesnay Schepler
> Assignee: Chesnay Schepler
> Priority: Blocker
> Fix For: 1.5.0

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-6924) ADD LOG(X) supported in TableAPI
[ https://issues.apache.org/jira/browse/FLINK-6924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jiayi reassigned FLINK-6924:
----------------------------

Assignee: Jiayi  (was: zjuwangg)

> ADD LOG(X) supported in TableAPI
> --------------------------------
>
> Key: FLINK-6924
> URL: https://issues.apache.org/jira/browse/FLINK-6924
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Affects Versions: 1.4.0
> Reporter: sunjincheng
> Assignee: Jiayi
> Priority: Major
> Labels: starter
>
> See FLINK-6891 for details.
[jira] [Assigned] (FLINK-8953) Resolve unresolved field references in FieldComputer expressions
[ https://issues.apache.org/jira/browse/FLINK-8953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Renjie Liu reassigned FLINK-8953:
---------------------------------

Assignee: Renjie Liu

> Resolve unresolved field references in FieldComputer expressions
> ----------------------------------------------------------------
>
> Key: FLINK-8953
> URL: https://issues.apache.org/jira/browse/FLINK-8953
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Renjie Liu
> Priority: Major
>
> When implementing the {{FieldComputer.getExpression}} method, it is not
> possible to use API classes; only internal expression case classes can be used.
> It would be great to also be able to define timestamp extractors like:
> {code}
> def getExpression(fieldAccesses: Array[ResolvedFieldReference]): Expression = {
>   // 'x.cast(Types.LONG)
>   // ExpressionParser.parseExpression("x.cast(LONG)")
> }
> {code}
> An even better solution would be to provide different {{getExpression()}}
> methods that an implementor can override. The general goal should be to make
> this as natural as possible. In the future we should also support SQL:
> {code}
> def getJavaExpression(fieldAccesses: Array[ResolvedFieldReference]): String = {
>   "x.cast(LONG)"
> }
>
> def getSQLExpression(fieldAccesses: Array[ResolvedFieldReference]): String = {
>   "CAST(x AS LONG)"
> }
> {code}
> The final design is still up for discussion; these are just ideas.
[jira] [Updated] (FLINK-9088) Upgrade Nifi connector dependency to 1.6.0
[ https://issues.apache.org/jira/browse/FLINK-9088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ted Yu updated FLINK-9088:
--------------------------

Component/s: Streaming Connectors

> Upgrade Nifi connector dependency to 1.6.0
> ------------------------------------------
>
> Key: FLINK-9088
> URL: https://issues.apache.org/jira/browse/FLINK-9088
> Project: Flink
> Issue Type: Improvement
> Components: Streaming Connectors
> Reporter: Ted Yu
> Assignee: Hai Zhou
> Priority: Major
>
> The current NiFi dependency is 0.6.1. We should upgrade to 1.6.0.
[jira] [Comment Edited] (FLINK-8971) Create general purpose testing job
[ https://issues.apache.org/jira/browse/FLINK-8971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16421148#comment-16421148 ]

mingleizhang edited comment on FLINK-8971 at 4/4/18 1:49 AM:
-------------------------------------------------------------

Hi [~till.rohrmann], the links you posted here return "page not found": [heavily misbehaved job|https://github.com/rmetzger/flink-testing-jobs/blob/flink-1.4/basic-jobs/src/main/java/com/dataartisans/heavymisbehaved/HeavyMisbehavedJob.java] and [state machine job|https://github.com/dataArtisans/flink-testing-jobs/tree/master/streaming-state-machine]. Could you please check them again?

was (Author: mingleizhang):
Page not found for the [heavily misbehaved job|https://github.com/rmetzger/flink-testing-jobs/blob/flink-1.4/basic-jobs/src/main/java/com/dataartisans/heavymisbehaved/HeavyMisbehavedJob.java] and the state machine job as well.

> Create general purpose testing job
> ----------------------------------
>
> Key: FLINK-8971
> URL: https://issues.apache.org/jira/browse/FLINK-8971
> Project: Flink
> Issue Type: Task
> Components: Tests
> Affects Versions: 1.5.0
> Reporter: Till Rohrmann
> Priority: Critical
> Fix For: 1.5.0
>
> In order to write better end-to-end tests we need a general-purpose testing
> job which covers as many Flink aspects as possible: different record and
> state types, user-defined components, and operators.
> The job should allow activating certain misbehaviors, such as slowing
> certain paths down or throwing exceptions to simulate failures.
> The job should come with a data generator which produces input data such
> that the job can verify its own behavior. This includes the state as well as
> the input/output records.
> We already have the [heavily misbehaved job|https://github.com/rmetzger/flink-testing-jobs/blob/flink-1.4/basic-jobs/src/main/java/com/dataartisans/heavymisbehaved/HeavyMisbehavedJob.java]
> which simulates some misbehavior. There is also the [state machine job|https://github.com/dataArtisans/flink-testing-jobs/tree/master/streaming-state-machine]
> which can verify itself for invalid state changes that indicate data loss.
> We should incorporate their characteristics into the new general-purpose job.
> Additionally, the general-purpose job should contain the following aspects:
> * A sliding window aggregation
> * At least one generic Kryo type
> * At least one generic Avro type
> * At least one Avro specific record type
> * At least one input type for which we register a Kryo serializer
> * At least one input type for which we provide a user-defined serializer
> * At least one state type for which we provide a user-defined serializer
> * At least one state type which uses the AvroSerializer
> * An operator with ValueState
> * Value state changes should be verified (e.g. a predictable series of values)
> * An operator with operator state
> * An operator with broadcast state
> * Broadcast state changes should be verified (e.g. a predictable series of values)
> * Union state
> * A user-defined watermark assigner
> The job should be made available in the {{flink-end-to-end-tests}} module.
> This issue is intended to serve as an umbrella issue for developing and
> extending this job.
[jira] [Updated] (FLINK-9091) Failure while enforcing releasability in building flink-json module
[ https://issues.apache.org/jira/browse/FLINK-9091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ted Yu updated FLINK-9091:
--------------------------

Component/s: Build System

> Failure while enforcing releasability in building flink-json module
> -------------------------------------------------------------------
>
> Key: FLINK-9091
> URL: https://issues.apache.org/jira/browse/FLINK-9091
> Project: Flink
> Issue Type: Bug
> Components: Build System
> Reporter: Ted Yu
> Assignee: Hai Zhou
> Priority: Major
> Attachments: f-json.out
>
> Got the following when building the flink-json module:
> {code}
> [WARNING] Rule 0: org.apache.maven.plugins.enforcer.DependencyConvergence failed with message:
> Failed while enforcing releasability. See above detailed error message.
> ...
> [ERROR] Failed to execute goal org.apache.maven.plugins:maven-enforcer-plugin:3.0.0-M1:enforce
> (dependency-convergence) on project flink-json: Some Enforcer rules have failed.
> Look above for specific messages explaining why the rule failed. -> [Help 1]
> {code}
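Dependency-convergence failures like this are usually resolved by pinning the conflicting transitive dependency to one version in {{dependencyManagement}} of the affected module (or its parent). A hypothetical sketch follows; the artifact and version shown are assumptions for illustration, since the real conflicting artifact must be read from the detailed enforcer output in the attached f-json.out:

```xml
<!-- Hypothetical sketch: pin the conflicting transitive dependency so the
     dependency-convergence rule sees a single version. The artifact below
     is an assumed example, not taken from the actual enforcer log. -->
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
      <version>2.7.9</version>
    </dependency>
  </dependencies>
</dependencyManagement>
```

Alternatively, an `<exclusion>` on the dependency that drags in the older version achieves the same convergence.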
[jira] [Updated] (FLINK-8933) Avoid calling Class#newInstance
[ https://issues.apache.org/jira/browse/FLINK-8933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ted Yu updated FLINK-8933:
--------------------------

Description:
Class#newInstance is deprecated starting in Java 9 - https://bugs.openjdk.java.net/browse/JDK-6850612 - because it may throw undeclared checked exceptions.
The suggested replacement is getDeclaredConstructor().newInstance(), which wraps the checked exceptions in InvocationTargetException.

was:
Class#newInstance is deprecated starting in Java 9 - https://bugs.openjdk.java.net/browse/JDK-6850612 - because it may throw undeclared checked exceptions.
The suggested replacement is getDeclaredConstructor().newInstance(), which wraps the checked exceptions in InvocationException.

> Avoid calling Class#newInstance
> -------------------------------
>
> Key: FLINK-8933
> URL: https://issues.apache.org/jira/browse/FLINK-8933
> Project: Flink
> Issue Type: Task
> Reporter: Ted Yu
> Assignee: vinoyang
> Priority: Minor
>
> Class#newInstance is deprecated starting in Java 9 -
> https://bugs.openjdk.java.net/browse/JDK-6850612 - because it may throw
> undeclared checked exceptions.
> The suggested replacement is getDeclaredConstructor().newInstance(), which
> wraps the checked exceptions in InvocationTargetException.
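The replacement can be seen in a minimal, self-contained sketch using only the JDK (the class and helper names below are illustrative, not Flink code):

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;

public class NewInstanceDemo {

    // Illustrative type whose no-arg constructor declares a checked exception;
    // Class#newInstance would rethrow such an exception undeclared.
    static class Risky {
        Risky() throws Exception {
            // may throw a checked exception
        }
    }

    static <T> T create(Class<T> clazz) {
        try {
            // Preferred replacement for clazz.newInstance(): any checked
            // exception thrown by the constructor surfaces wrapped in
            // InvocationTargetException instead of escaping undeclared.
            Constructor<T> ctor = clazz.getDeclaredConstructor();
            ctor.setAccessible(true);
            return ctor.newInstance();
        } catch (InvocationTargetException e) {
            // The constructor itself threw; unwrap the real cause.
            throw new RuntimeException("constructor threw", e.getCause());
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("cannot instantiate " + clazz.getName(), e);
        }
    }

    public static void main(String[] args) {
        Risky r = create(Risky.class);
        System.out.println(r != null); // prints "true"
    }
}
```

Note the catch order: `InvocationTargetException` must be caught before its supertype `ReflectiveOperationException` so the constructor's own failure can be distinguished from a missing or inaccessible constructor.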
[jira] [Commented] (FLINK-9127) Filesystem State Backend logged incorrectly
[ https://issues.apache.org/jira/browse/FLINK-9127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424831#comment-16424831 ]

ASF GitHub Bot commented on FLINK-9127:
---------------------------------------

GitHub user skidder opened a pull request:

    https://github.com/apache/flink/pull/5810

    [FLINK-9127] [Core] Filesystem State Backend logged incorrectly

    ## What is the purpose of the change

    This pull request fixes a message logged during startup of the Flink
    TaskManager and JobManager when a filesystem state backend is in use.
    The old, incorrect behavior produced a log message indicating that a
    heap-memory backend is in use.

    ## Brief change log

    - Fix the log message produced by the StateBackendLoader class in Core.

    ## Verifying this change

    This change is a trivial rework / code cleanup without any test coverage.

    ## Does this pull request potentially affect one of the following parts:

    - Dependencies (does it add or upgrade a dependency): no
    - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
    - The serializers: no
    - The runtime per-record code paths (performance sensitive): no
    - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
    - The S3 file system connector: no

    ## Documentation

    - Does this pull request introduce a new feature? no

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/skidder/flink FLINK-9127

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5810.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #5810

commit 9e3774bb2a0ceedacde584a47aa9146bcbf47a3a
Author: Scott Kidder
Date: 2018-04-04T01:03:46Z

    [FLINK-9127] Fix log message for filesystem backend

> Filesystem State Backend logged incorrectly
> -------------------------------------------
>
> Key: FLINK-9127
> URL: https://issues.apache.org/jira/browse/FLINK-9127
> Project: Flink
> Issue Type: Bug
> Components: Core
> Affects Versions: 1.3.2, 1.4.2
> Reporter: Scott Kidder
> Priority: Trivial
>
> When using a filesystem backend, the
> '[StateBackendLoader|https://github.com/apache/flink/blob/1f9c2d9740ffea2b59b8f5f3da287a0dc890ddbf/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java#L123]'
> class produces a log message stating: "State backend is set to heap memory". Example:
> {{2018-04-04 00:45:49,591 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoints to filesystem "hdfs://hdfs:8020/flink/checkpoints")}}
> It looks like this resulted from a copy-paste of the previous case
> statement that matches on the memory backend. This bug is also present in
> earlier releases (1.3.2, 1.4.0) of Flink in the 'AbstractStateBackend' class.
> This log statement should be corrected to indicate that a filesystem backend
> is in use.
[jira] [Created] (FLINK-9127) Filesystem State Backend logged incorrectly
Scott Kidder created FLINK-9127:
-------------------------------

Summary: Filesystem State Backend logged incorrectly
Key: FLINK-9127
URL: https://issues.apache.org/jira/browse/FLINK-9127
Project: Flink
Issue Type: Bug
Components: Core
Affects Versions: 1.4.2, 1.3.2
Reporter: Scott Kidder

When using a filesystem backend, the '[StateBackendLoader|https://github.com/apache/flink/blob/1f9c2d9740ffea2b59b8f5f3da287a0dc890ddbf/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java#L123]' class produces a log message stating: "State backend is set to heap memory". Example:

{{2018-04-04 00:45:49,591 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoints to filesystem "hdfs://hdfs:8020/flink/checkpoints")}}

It looks like this resulted from a copy-paste of the previous case statement that matches on the memory backend. This bug is also present in earlier releases (1.3.2, 1.4.0) of Flink in the 'AbstractStateBackend' class. This log statement should be corrected to indicate that a filesystem backend is in use.
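The fix amounts to giving the filesystem branch its own log line instead of the one copied from the memory branch. A schematic sketch follows; the method, constants, and message strings are illustrative, not the actual Flink StateBackendLoader source:

```java
public class BackendLogDemo {

    // Schematic sketch of the two case statements in a backend loader; the
    // names and messages here are assumptions for illustration only.
    static String describeBackend(String backendName, String checkpointUri) {
        switch (backendName) {
            case "jobmanager":
                return "State backend is set to heap memory (checkpoints to JobManager)";
            case "filesystem":
                // The reported bug: this branch reused the heap-memory
                // message from the case above.
                return "State backend is set to filesystem (checkpoints to \""
                        + checkpointUri + "\")";
            default:
                return "Unknown state backend: " + backendName;
        }
    }

    public static void main(String[] args) {
        System.out.println(
                describeBackend("filesystem", "hdfs://hdfs:8020/flink/checkpoints"));
    }
}
```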
[jira] [Commented] (FLINK-8835) Fix TaskManager config keys
[ https://issues.apache.org/jira/browse/FLINK-8835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424817#comment-16424817 ]

ASF GitHub Bot commented on FLINK-8835:
---------------------------------------

Github user zhangminglei commented on the issue:

    https://github.com/apache/flink/pull/5808

    cc @StephanEwen @aljoscha

> Fix TaskManager config keys
> ---------------------------
>
> Key: FLINK-8835
> URL: https://issues.apache.org/jira/browse/FLINK-8835
> Project: Flink
> Issue Type: Bug
> Components: TaskManager
> Reporter: Stephan Ewen
> Assignee: mingleizhang
> Priority: Blocker
> Labels: easy-fix
> Fix For: 1.5.0
>
> Many new config keys in the TaskManager don't follow the proper naming
> scheme. We need to clean those up before the release. I would also suggest
> keeping the key names short, because that makes it easier for users.
> When doing this cleanup pass over the config keys, I would suggest also
> making some of the existing keys more hierarchical to harmonize them with
> the common scheme in Flink.
> h1. New Keys
> * {{taskmanager.network.credit-based-flow-control.enabled}} => {{taskmanager.network.credit-model}}
> h1. Existing Keys
> * {{taskmanager.debug.memory.startLogThread}} => {{taskmanager.debug.memory.log}}
> * {{taskmanager.debug.memory.logIntervalMs}} => {{taskmanager.debug.memory.log-interval}}
> * {{taskmanager.initial-registration-pause}} => {{taskmanager.registration.initial-backoff}}
> * {{taskmanager.max-registration-pause}} => {{taskmanager.registration.max-backoff}}
> * {{taskmanager.refused-registration-pause}} => {{taskmanager.registration.refused-backoff}}
> * {{taskmanager.maxRegistrationDuration}} => {{taskmanager.registration.timeout}}
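Renames like these are typically shipped with the old key kept as a deprecated fallback so existing configurations keep working. A minimal sketch of that lookup using plain Java maps (an illustration, not Flink's actual ConfigOption API):

```java
import java.util.HashMap;
import java.util.Map;

public class ConfigFallbackDemo {

    /** Read {@code newKey}; if unset, fall back to the deprecated
     *  {@code oldKey}; otherwise return {@code dflt}. */
    static String get(Map<String, String> conf,
                      String newKey, String oldKey, String dflt) {
        if (conf.containsKey(newKey)) {
            return conf.get(newKey);
        }
        if (conf.containsKey(oldKey)) {
            return conf.get(oldKey);
        }
        return dflt;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        // A user still setting the pre-rename key:
        conf.put("taskmanager.maxRegistrationDuration", "5 min");
        // The value is honored under the new key name.
        System.out.println(get(conf, "taskmanager.registration.timeout",
                "taskmanager.maxRegistrationDuration", "infinite")); // prints "5 min"
    }
}
```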
[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424625#comment-16424625 ]

Steven Zhen Wu edited comment on FLINK-9061 at 4/3/18 9:43 PM:
--------------------------------------------------------------

[~jgrier] [~StephanEwen] Here is our thinking; if you think it makes sense, we can submit a PR for checkpoints. As Stephan mentioned earlier, savepoints probably need to be tackled separately.

1) New config to enable dynamic entropy injection:
{code}
# user has full control over the checkpoint path (including entropy key substitution)
# _ENTROPY_KEY_ can be at any part of the checkpoint path
state.backend.fs.checkpointdir: s3://bucket/_ENTROPY_KEY_/...
# boolean flag to enable entropy injection
state.backend.fs.checkpointdir.injectEntropy.enabled: true
# substring for entropy substitution
state.backend.fs.checkpointdir.injectEntropy.key: _ENTROPY_KEY_
# optional: number of chars, default is 4
state.backend.fs.checkpointdir.injectEntropy.length: 4
{code}

2) Random vs. hash: we are generating random hex chars for entropy. A hash should work equally well. I am not strongly biased either way, though I don't see much benefit of hash over random; a deterministic hash doesn't seem to add much.

3) Our current implementation does the entropy substitution during operator initialization, which means it stays the same for the operator's lifecycle. Different operators get different entropy, however, which is the key for scaling parallel operators. Conceptually, a better way is probably to do the entropy substitution for each S3 write; we can make that change if desired. Practically, it probably doesn't make much difference in terms of spreading the load and throughput, because either way each operator gets its own entropy prefix.

Thanks, Steven

was (Author: stevenz3wu):
[~jgrier] [~StephanEwen] Here are our thinking. if you think it makes sense, we can submit a PR for checkpoint. As Stephan mentioned earlier, savepoint probably needs to be tackled separately.

1) new config to enable dynamic entropy injection
{code}
# user has full control on checkpoint path (including entropy key substitution)
# _ENTROPY_KEY_ can be at any part of the checkpoint path
state.backend.fs.checkpointdir: s3://bucket/_ENTROPY_KEY_/...
# boolean flag to enable entropy injection
state.backend.fs.checkpointdir.injectEntropy.enabled: true
# substring for entropy substitution
state.backend.fs.checkpointdir.injectEntropy.key: _ENTROPY_KEY_
# optional: number of chars. default is 4
state.backend.fs.checkpointdir.injectEntropy.length: 4
{code}

2) random v.s. hash: we are generating random hex chars for entropy. hash should work equally well. I am not strongly biased either way, even though I don't see much benefit of hash over random. deterministic hash doesn't seem to give much benefit

3) our current implementation does the entropy substitution during operator initialization. Conceptually, a better way is probably doing entropy substitution for each S3 write. We can make the change if this is desired. Practically, it probably doesn't make much difference in terms of spreading the load and throughput, because either way each operator got its own entropy prefix

Thanks, Steven

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -----------------------------------------------------------------------------
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
> Issue Type: Bug
> Components: FileSystem, State Backends, Checkpointing
> Affects Versions: 1.4.2
> Reporter: Jamie Grier
> Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale
> jobs (those with many total tasks). The issue is that we are writing all the
> checkpoint data under a common key prefix. This is the worst-case scenario
> for S3 performance, since the key is used as a partition key.
>
> In the worst case, checkpoints fail with a 500 status code coming back from
> S3 and an internal error type of TooBusyException.
>
> One possible solution would be to add a hook in the Flink filesystem code
> that allows me to "rewrite" paths. For example, say I have the checkpoint
> directory set to:
>
> s3://bucket/flink/checkpoints
>
> I would hook that and rewrite the path to:
>
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original path
>
> This would distribute the checkpoint write load around the S3 cluster evenly.
>
> For reference:
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>
> Have other people hit this issue? Any other ideas for solutions? This is a
> pretty serious problem for people trying to checkpoint to S3.
>
> -Jamie
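The substitution described in points 1) and 3) above can be sketched in plain Java. The helper below is an illustration of the proposal, not Flink code; it reuses the {{_ENTROPY_KEY_}} marker and the 4-character default from the config sketch:

```java
import java.security.SecureRandom;

public class EntropyInjector {

    private static final String ENTROPY_KEY = "_ENTROPY_KEY_";
    private static final char[] HEX = "0123456789abcdef".toCharArray();
    private static final SecureRandom RANDOM = new SecureRandom();

    /**
     * Replace the entropy marker with {@code length} random hex chars, e.g.
     * s3://bucket/_ENTROPY_KEY_/checkpoints -> s3://bucket/9f3a/checkpoints.
     * Paths without the marker pass through unchanged (injection disabled).
     */
    static String injectEntropy(String checkpointPath, int length) {
        if (!checkpointPath.contains(ENTROPY_KEY)) {
            return checkpointPath;
        }
        StringBuilder entropy = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            entropy.append(HEX[RANDOM.nextInt(16)]);
        }
        return checkpointPath.replace(ENTROPY_KEY, entropy.toString());
    }

    public static void main(String[] args) {
        // Each call yields a fresh random prefix; as discussed in point 3),
        // calling once per operator (init) or once per write both spread the
        // S3 partition-key load.
        System.out.println(
                injectEntropy("s3://bucket/_ENTROPY_KEY_/flink/checkpoints", 4));
    }
}
```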
[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424625#comment-16424625 ] Steven Zhen Wu edited comment on FLINK-9061 at 4/3/18 9:23 PM:
---
[~jgrier] [~StephanEwen] Here is our thinking. If you think it makes sense, we can submit a PR for checkpointing. As Stephan mentioned earlier, savepoints probably need to be tackled separately.

1) New config options to enable dynamic entropy injection:
{code:java}
# user has full control over the checkpoint path (including entropy key substitution)
# _ENTROPY_KEY_ can be in any part of the checkpoint path
state.backend.fs.checkpointdir: s3://bucket/_ENTROPY_KEY_/...

# boolean flag to enable entropy injection
state.backend.fs.checkpointdir.injectEntropy.enabled: true

# substring to substitute with entropy
state.backend.fs.checkpointdir.injectEntropy.key: _ENTROPY_KEY_

# optional: number of chars, default is 4
state.backend.fs.checkpointdir.injectEntropy.length: 4
{code}
2) Random vs. hash: we generate random hex chars for the entropy. A hash should work equally well; I am not strongly biased either way, though a deterministic hash does not seem to offer much benefit over random chars.

3) Our current implementation performs the entropy substitution during operator initialization. Conceptually, a better approach would probably be to do the substitution for each S3 write. Practically, it likely makes little difference in terms of spreading the load and throughput, because either way each operator gets its own entropy prefix.

Thanks, Steven

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
> Issue Type: Bug
> Components: FileSystem, State Backends, Checkpointing
> Affects Versions: 1.4.2
> Reporter: Jamie Grier
> Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale
> jobs (those with many total tasks). The issue is that we are writing all the
> checkpoint data under a common key prefix. This is the worst-case scenario
> for S3 performance, since the key is used as a partition key.
>
> In the worst case, checkpoints fail with a 500 status code coming back from S3
> and an internal error type of TooBusyException.
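The proposed `_ENTROPY_KEY_` substitution can be sketched as follows. This is a hypothetical helper mirroring the config above (the function and parameter names are illustrative), using random hex chars as in option 2:

```python
import secrets

def inject_entropy(path: str, key: str = "_ENTROPY_KEY_", length: int = 4) -> str:
    """Replace the placeholder with `length` random hex characters.

    If the placeholder is absent, the path is returned unchanged, so the
    feature is a no-op for users who do not opt in."""
    if key not in path:
        return path
    # token_hex(n) yields 2n hex chars; round up, then trim to the exact length
    entropy = secrets.token_hex((length + 1) // 2)[:length]
    return path.replace(key, entropy)
```

With the config above, `s3://bucket/_ENTROPY_KEY_/...` would resolve to something like `s3://bucket/a3f9/...`, giving each operator its own randomized key prefix.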
--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8697) Rename DummyFlinkKafkaConsumer in Kinesis tests
[ https://issues.apache.org/jira/browse/FLINK-8697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424586#comment-16424586 ] ASF GitHub Bot commented on FLINK-8697: --- GitHub user bowenli86 opened a pull request: https://github.com/apache/flink/pull/5809 [FLINK-8697] [Kinesis Connector] Rename DummyFlinkKafkaConsumer in KinesisDataFetcherTest ## What is the purpose of the change `DummyFlinkKafkaConsumer` in `KinesisDataFetcherTest` should be named `DummyFlinkKinesisConsumer` ## Brief change log Rename `DummyFlinkKafkaConsumer` to `DummyFlinkKinesisConsumer` in Kinesis tests ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: none ## Documentation none You can merge this pull request into a Git repository by running: $ git pull https://github.com/bowenli86/flink FLINK-8697 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5809.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5809 commit 6c836a62248b51c762558e87a3e80410a19262c0 Author: Bowen Li Date: 2018-04-03T20:53:23Z [FLINK-8697] Rename DummyFlinkKafkaConsumer in Kinesis tests > Rename DummyFlinkKafkaConsumer in Kinesis tests > --- > > Key: FLINK-8697 > URL: https://issues.apache.org/jira/browse/FLINK-8697 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector, Tests >Affects Versions: 1.5.0, 1.4.1 >Reporter: Chesnay Schepler >Assignee: Bowen Li >Priority: Trivial > Labels: easy-fix, starter > Fix For: 1.5.0, 1.6.0, 1.4.3 > > > In {{KinesisDataFetcherTest}} exists a class > {code} > private static class DummyFlinkKafkaConsumer extends > FlinkKinesisConsumer { > {code} > The class should be called {{DummyFlinkKinesisConsumer}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8697) Rename DummyFlinkKafkaConsumer in Kinesis tests
[ https://issues.apache.org/jira/browse/FLINK-8697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-8697: Fix Version/s: 1.6.0 > Rename DummyFlinkKafkaConsumer in Kinesis tests > --- > > Key: FLINK-8697 > URL: https://issues.apache.org/jira/browse/FLINK-8697 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector, Tests >Affects Versions: 1.5.0, 1.4.1 >Reporter: Chesnay Schepler >Assignee: Bowen Li >Priority: Trivial > Labels: easy-fix, starter > Fix For: 1.5.0, 1.6.0, 1.4.3 > > > In {{KinesisDataFetcherTest}} exists a class > {code} > private static class DummyFlinkKafkaConsumer extends > FlinkKinesisConsumer { > {code} > The class should be called {{DummyFlinkKinesisConsumer}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-8697) Rename DummyFlinkKafkaConsumer in Kinesis tests
[ https://issues.apache.org/jira/browse/FLINK-8697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li reassigned FLINK-8697: --- Assignee: Bowen Li > Rename DummyFlinkKafkaConsumer in Kinesis tests > --- > > Key: FLINK-8697 > URL: https://issues.apache.org/jira/browse/FLINK-8697 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector, Tests >Affects Versions: 1.5.0, 1.4.1 >Reporter: Chesnay Schepler >Assignee: Bowen Li >Priority: Trivial > Labels: easy-fix, starter > Fix For: 1.5.0, 1.4.3 > > > In {{KinesisDataFetcherTest}} exists a class > {code} > private static class DummyFlinkKafkaConsumer extends > FlinkKinesisConsumer { > {code} > The class should be called {{DummyFlinkKinesisConsumer}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424552#comment-16424552 ] Steven Zhen Wu commented on FLINK-9061:
---
It seems that S3 walks through the key prefix from left to right until it finds some randomness for partitioning; it is more sophisticated than just the first few chars.

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
> Issue Type: Bug
> Components: FileSystem, State Backends, Checkpointing
> Affects Versions: 1.4.2
> Reporter: Jamie Grier
> Priority: Critical
>
--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5797: [FLINK-9104][doc]Re-generate REST API documentatio...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5797#discussion_r178913403 --- Diff: flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java --- @@ -258,6 +265,37 @@ private static String createMessageHtmlEntry(Class messageClass, Class emp return json; } + /** +* Create character escapes for HTML when generating JSON request/response string. +*/ + private static class HTMLCharacterEscapes extends CharacterEscapes { --- End diff -- good point. added in comments to illustrate the necessity. ---
[jira] [Commented] (FLINK-9104) Re-generate REST API documentation for FLIP-6
[ https://issues.apache.org/jira/browse/FLINK-9104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424376#comment-16424376 ] ASF GitHub Bot commented on FLINK-9104: --- Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5797#discussion_r178913403 --- Diff: flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java --- @@ -258,6 +265,37 @@ private static String createMessageHtmlEntry(Class messageClass, Class emp return json; } + /** +* Create character escapes for HTML when generating JSON request/response string. +*/ + private static class HTMLCharacterEscapes extends CharacterEscapes { --- End diff -- good point. added in comments to illustrate the necessity. > Re-generate REST API documentation for FLIP-6 > -- > > Key: FLINK-9104 > URL: https://issues.apache.org/jira/browse/FLINK-9104 > Project: Flink > Issue Type: Bug > Components: REST >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Rong Rong >Priority: Blocker > Labels: flip-6 > > The API documentation is missing for several handlers, e.g., > {{SavepointHandlers}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
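The `HTMLCharacterEscapes` class in the diff above hooks Jackson's `CharacterEscapes` so that generated JSON stays safe when embedded in HTML documentation pages. A rough sketch of the underlying idea (in Python, purely illustrative; the real implementation overrides Jackson's escape table instead of post-processing strings):

```python
import json

# HTML-significant characters, replaced with JSON \u00XX escapes so the
# payload cannot close a tag or start markup when inlined in an HTML page.
_HTML_UNSAFE = {"<": "\\u003c", ">": "\\u003e", "&": "\\u0026"}

def html_safe_json(obj) -> str:
    """Serialize obj to JSON with HTML-unsafe characters escaped."""
    s = json.dumps(obj)
    for ch, esc in _HTML_UNSAFE.items():
        s = s.replace(ch, esc)
    return s
```

Since `<`, `>`, and `&` are never JSON structural characters, the replacements only affect string contents, and the output remains valid JSON that parses back to the original value.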
[GitHub] flink issue #3117: [FLINK-5480] Introduce user-provided hash for JobVertexes
Github user zentol commented on the issue: https://github.com/apache/flink/pull/3117 @ShashwatRastogi-Reflektion I'm not entirely sure, but one thing you could try is to explicitly disable chaining (`StreamExecutionEnvironment#disableOperatorChaining`). This way the ID of each operator (that now runs as a separate task) should be logged / be visible in the UI. ---
[jira] [Commented] (FLINK-9010) NoResourceAvailableException with FLIP-6
[ https://issues.apache.org/jira/browse/FLINK-9010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424252#comment-16424252 ] Piotr Nowojski commented on FLINK-9010: --- No, I didn't experience any issues. > NoResourceAvailableException with FLIP-6 > - > > Key: FLINK-9010 > URL: https://issues.apache.org/jira/browse/FLINK-9010 > Project: Flink > Issue Type: Bug > Components: ResourceManager >Affects Versions: 1.5.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > I was trying to run a bigger program with 400 slots (100 TMs, 2 slots each) > with FLIP-6 mode and a checkpointing interval of 1000 and got the following > exception: > {code} > 2018-03-16 03:41:20,154 INFO org.apache.flink.yarn.YarnResourceManager > - Received new container: > container_1521038088305_0257_01_000101 - Remaining pending container > requests: 302 > 2018-03-16 03:41:20,154 INFO org.apache.flink.yarn.YarnResourceManager > - TaskExecutor container_1521038088305_0257_01_000101 will be > started with container size 8192 MB, JVM heap size 5120 MB, JVM direct memory > limit 3072 MB > 2018-03-16 03:41:20,154 INFO org.apache.flink.yarn.YarnResourceManager > - TM:remote keytab path obtained null > 2018-03-16 03:41:20,154 INFO org.apache.flink.yarn.YarnResourceManager > - TM:remote keytab principal obtained null > 2018-03-16 03:41:20,154 INFO org.apache.flink.yarn.YarnResourceManager > - TM:remote yarn conf path obtained null > 2018-03-16 03:41:20,154 INFO org.apache.flink.yarn.YarnResourceManager > - TM:remote krb5 path obtained null > 2018-03-16 03:41:20,155 INFO org.apache.flink.yarn.Utils > - Copying from > file:/mnt/yarn/usercache/hadoop/appcache/application_1521038088305_0257/container_1521038088305_0257_01_01/3cd0c7d7-ccc1-4680-83a5-54e64dd637bc-taskmanager-conf.yaml > to > 
hdfs://ip-172-31-1-91.eu-west-1.compute.internal:8020/user/hadoop/.flink/application_1521038088305_0257/3cd0c7d7-ccc1-4680-83a5-54e64dd637bc-taskmanager-conf.yaml > 2018-03-16 03:41:20,165 INFO org.apache.flink.yarn.YarnResourceManager > - Prepared local resource for modified yaml: resource { scheme: > "hdfs" host: "ip-172-31-1-91.eu-west-1.compute.internal" port: 8020 file: > "/user/hadoop/.flink/application_1521038088305_0257/3cd0c7d7-ccc1-4680-83a5-54e64dd637bc-taskmanager-conf.yaml" > } size: 595 timestamp: 1521171680164 type: FILE visibility: APPLICATION > 2018-03-16 03:41:20,168 INFO org.apache.flink.yarn.YarnResourceManager > - Creating container launch context for TaskManagers > 2018-03-16 03:41:20,168 INFO org.apache.flink.yarn.YarnResourceManager > - Starting TaskManagers with command: $JAVA_HOME/bin/java > -Xms5120m -Xmx5120m -XX:MaxDirectMemorySize=3072m > -Dlog.file=/taskmanager.log > -Dlogback.configurationFile=file:./logback.xml > -Dlog4j.configuration=file:./log4j.properties > org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . 
1> > /taskmanager.out 2> /taskmanager.err > 2018-03-16 03:41:20,176 INFO > org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - > Opening proxy : ip-172-31-3-221.eu-west-1.compute.internal:8041 > 2018-03-16 03:41:20,180 INFO org.apache.flink.yarn.YarnResourceManager > - Received new container: > container_1521038088305_0257_01_000102 - Remaining pending container > requests: 301 > 2018-03-16 03:41:20,180 INFO org.apache.flink.yarn.YarnResourceManager > - TaskExecutor container_1521038088305_0257_01_000102 will be > started with container size 8192 MB, JVM heap size 5120 MB, JVM direct memory > limit 3072 MB > 2018-03-16 03:41:20,180 INFO org.apache.flink.yarn.YarnResourceManager > - TM:remote keytab path obtained null > 2018-03-16 03:41:20,180 INFO org.apache.flink.yarn.YarnResourceManager > - TM:remote keytab principal obtained null > 2018-03-16 03:41:20,180 INFO org.apache.flink.yarn.YarnResourceManager > - TM:remote yarn conf path obtained null > 2018-03-16 03:41:20,180 INFO org.apache.flink.yarn.YarnResourceManager > - TM:remote krb5 path obtained null > 2018-03-16 03:41:20,181 INFO org.apache.flink.yarn.Utils > - Copying from > file:/mnt/yarn/usercache/hadoop/appcache/application_1521038088305_0257/container_1521038088305_0257_01_01/6766be70-82f7-4999-a371-11c27527fb6e-taskmanager-conf.yaml > to > hdfs://ip-172-31-1-91.eu-west-1.compute.internal:8020/user/ha
[GitHub] flink issue #3117: [FLINK-5480] Introduce user-provided hash for JobVertexes
Github user ShashwatRastogi-Reflektion commented on the issue: https://github.com/apache/flink/pull/3117

@zentol I am using Flink 1.3.2. Normally, I don't use `--allowNonRestoredState` and I get an error while restarting the job. After using `allowNonRestoredState` the job starts up fine, but there is data loss because of the lost state, which is what I want to avoid. I think the problem with my implementation is that I am wrongly assigning some ids to the operators. Is there a sure-shot way of finding out the uid or uid-hash generated by Flink for each operator (with/without chaining)?
---
[jira] [Commented] (FLINK-9087) Return value of broadcastEvent should be closed in StreamTask#performCheckpoint
[ https://issues.apache.org/jira/browse/FLINK-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424237#comment-16424237 ] Ted Yu commented on FLINK-9087:
---
[~NicoK]: Since you were recently working on related code, mind sharing your thoughts?

> Return value of broadcastEvent should be closed in
> StreamTask#performCheckpoint
> ---
>
> Key: FLINK-9087
> URL: https://issues.apache.org/jira/browse/FLINK-9087
> Project: Flink
> Issue Type: Bug
> Reporter: Ted Yu
> Priority: Minor
>
> {code}
> for (StreamRecordWriter>> streamRecordWriter : streamRecordWriters) {
>     try {
>         streamRecordWriter.broadcastEvent(message);
> {code}
> The BufferConsumer returned by broadcastEvent() should be closed.
--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-5480) User-provided hashes for operators
[ https://issues.apache.org/jira/browse/FLINK-5480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424206#comment-16424206 ] ASF GitHub Bot commented on FLINK-5480:
---
Github user zentol commented on the issue: https://github.com/apache/flink/pull/3117

@ShashwatRastogi-Reflektion It's a bit odd that there are different IDs shown; I will have to look into that. It may be that one display accounts for the uid while the other one doesn't; in any case one of them should be the task ID.

I should've asked earlier: which version of Flink are you using? If it is 1.2 or below, then I don't know at the moment what the problem could be. If it is 1.3 or above, the steps I mentioned have to be done for each operator, not each task. That said, I'm not sure we actually expose the ID of each operator anywhere in 1.3 in a nice way... You may have to resort to trial & error; if a state can't be assigned to an operator you should get an exception containing the ID of the state (unless you explicitly allow [non-restored state](https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#allowing-non-restored-state)), which you can then use as the uid hash. If you are already on 1.4 you can figure them out with the metric system (for example with the JMXReporter).

> User-provided hashes for operators
> --
>
> Key: FLINK-5480
> URL: https://issues.apache.org/jira/browse/FLINK-5480
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API
> Affects Versions: 1.2.0
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Priority: Major
> Fix For: 1.2.0, 1.3.0
>
> We could allow users to provide (alternative) hashes for operators in a
> StreamGraph. This can make migration between Flink versions easier, in case
> the automatically produced hashes between versions are incompatible. For
> example, users could just copy the old hashes from the web UI to their job.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
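The restore-matching behaviour described in the comment above — saved state keyed by operator ID, failing unless non-restored state is explicitly allowed — can be modeled roughly like this. The hashing scheme and all names here are purely illustrative (Flink derives operator IDs differently, not via SHA-256):

```python
import hashlib

def operator_id(uid: str) -> str:
    # Stand-in for Flink's deterministic uid -> operator-ID derivation.
    return hashlib.sha256(uid.encode()).hexdigest()[:32]

def restore_state(saved: dict, uids: list, allow_non_restored: bool = False) -> dict:
    """Map saved state entries (keyed by operator ID) back onto operator uids."""
    by_id = {operator_id(u): u for u in uids}
    restored = {}
    for state_id, state in saved.items():
        if state_id in by_id:
            restored[by_id[state_id]] = state
        elif not allow_non_restored:
            # mirrors the exception that names the unmatched state's ID,
            # which the user can then supply as a uid hash
            raise ValueError(f"no operator for state {state_id}")
    return restored
```

This also shows why `allowNonRestoredState` loses data in Shashwat's case: unmatched state is silently dropped rather than reported.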
[jira] [Commented] (FLINK-9087) Return value of broadcastEvent should be closed in StreamTask#performCheckpoint
[ https://issues.apache.org/jira/browse/FLINK-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424169#comment-16424169 ] Triones Deng commented on FLINK-9087: - [~yuzhih...@gmail.com] When I ran the test, I found that {code:java} public BufferConsumer broadcastEvent(AbstractEvent event) throws IOException { try (BufferConsumer eventBufferConsumer = EventSerializer.toBufferConsumer(event)) { ... // retain the buffer so that it can be recycled by each channel of targetPartition targetPartition.addBufferConsumer(eventBufferConsumer.copy(), targetChannel); } ... return eventBufferConsumer; } {code} calls targetPartition.addBufferConsumer() with a copy of the eventBufferConsumer, so all BufferConsumers produced by copy() share the same buffer, and this calls AbstractReferenceCountedByteBuf.retain() (AbstractReferenceCountedByteBuf is a Netty class). All targetPartition implementations, such as AbstractCollectingResultPartitionWriter and ResultPartition, call the close method of the BufferConsumer, so in the end the buffer in eventBufferConsumer is released. ResultPartition calls notifyDataAvailable, which consumes the data asynchronously. So we had better leave the return value alone; what do you think? Or should we just change the method signature to void? Note that FLINK-7315 plans to use Flink's buffers in Netty, and its sub-task FLINK-7518 has a solution. I am new here; any suggestions? > Return value of broadcastEvent should be closed in > StreamTask#performCheckpoint > --- > > Key: FLINK-9087 > URL: https://issues.apache.org/jira/browse/FLINK-9087 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Priority: Minor > > {code} > for (StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> > streamRecordWriter : streamRecordWriters) { > try { > streamRecordWriter.broadcastEvent(message); > {code} > The BufferConsumer returned by broadcastEvent() should be closed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
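The ownership rule the ticket asks for can be sketched with stand-in types (these are simplified classes for illustration, not Flink's actual BufferConsumer or RecordWriter API): the caller closes the consumer returned by broadcastEvent, releasing its share of the reference count.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class BroadcastCloseSketch {
    // Stand-in for a reference-counted buffer consumer (illustrative only).
    static class BufferConsumer implements AutoCloseable {
        static final AtomicInteger open = new AtomicInteger();
        BufferConsumer() { open.incrementAndGet(); }
        @Override public void close() { open.decrementAndGet(); }
    }

    // Stand-in for RecordWriter#broadcastEvent: hands back a consumer that
    // the caller (e.g. StreamTask#performCheckpoint) is responsible for closing.
    static BufferConsumer broadcastEvent() { return new BufferConsumer(); }

    public static void main(String[] args) {
        // try-with-resources guarantees the returned consumer is closed even
        // if handing the event to the channels throws.
        try (BufferConsumer consumer = broadcastEvent()) {
            // event has been added to all target channels here
        }
        System.out.println("open consumers: " + BufferConsumer.open.get());
    }
}
```

The same pattern works whether or not the signature is later changed to void; with void, the close simply moves inside broadcastEvent itself.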
[jira] [Created] (FLINK-9126) Ability for CassandraInputFormat to output data into a Custom Cassandra Annotated Pojo
Jeffrey Carter created FLINK-9126: - Summary: Ability for CassandraInputFormat to output data into a Custom Cassandra Annotated Pojo Key: FLINK-9126 URL: https://issues.apache.org/jira/browse/FLINK-9126 Project: Flink Issue Type: New Feature Components: DataSet API Affects Versions: 1.4.2 Reporter: Jeffrey Carter Fix For: 1.5.0 Attachments: CassandraPojoInputFormatText.rtf *First time proposing new update so apologies if I missed anything* Currently the DataSet API only has the ability to output data received from Cassandra as a source as a Tuple. This feature would allow the data to be output as a custom POJO that the user has created and annotated using the Datastax API. This would remove the need for very long Tuples to be created by the DataSet and then mapped to the custom POJO. The changes to the CassandraInputFormat object would be minimal, but would require importing the Datastax API into the class. Another option is to make a similar, but slightly different, class called CassandraPojoInputFormat. I have already gotten code for this working in my own project, but want other thoughts as to the best way this should go about being implemented. //Example of its use in main CassandraPojoInputFormat<CustomCassandraPojo> cassandraInputFormat = new CassandraPojoInputFormat<>(queryToRun, defaultClusterBuilder, CustomCassandraPojo.class); cassandraInputFormat.configure(null); cassandraInputFormat.open(null); DataSet<CustomCassandraPojo> outputTestSet = exEnv.createInput(cassandraInputFormat, TypeInformation.of(new TypeHint<CustomCassandraPojo>(){})); //The class that I currently have set up [^CassandraPojoInputFormatText.rtf] Will make another Jira Issue for the Output version next if this is approved -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9076) Make credit-based floating buffers optional
[ https://issues.apache.org/jira/browse/FLINK-9076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424137#comment-16424137 ] Nico Kruber commented on FLINK-9076: The max can already be configured via {{taskmanager.network.memory.floating-buffers-per-gate}}. This is about lowering the min value to 1 or 0 as before. > Make credit-based floating buffers optional > --- > > Key: FLINK-9076 > URL: https://issues.apache.org/jira/browse/FLINK-9076 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.5.0, 1.6.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Blocker > Fix For: 1.5.0 > > > Currently, floating buffers (per gate) are always required in case > credit-based flow control is enabled. This, however, increases our minimum > number of required network buffers. > Instead, without changing too much, we could already work with a minimum of > one or zero floating buffers and set the max to the configured value. This > way, if there are not enough buffers, all {{LocalBufferPool}}s will at least > share the available ones. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9076) Make credit-based floating buffers optional
[ https://issues.apache.org/jira/browse/FLINK-9076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber updated FLINK-9076: --- Description: Currently, floating buffers (per gate) are always required in case credit-based flow control is enabled. This, however, increases our minimum number of required network buffers. Instead, without changing too much, we could already work with a minimum of one or zero floating buffers and set the max to the configured value. This way, if there are not enough buffers, all {{LocalBufferPool}}s will at least share the available ones. was: Currently, floating buffers (per gate) are always required in case credit-based flow control is enabled. This, however, increases our minimum number of required network buffers. Instead, without changing too much, we could already work with a minimum of one floating buffer and set the max to the configured value. This way, if there are not enough buffers, all {{LocalBufferPool}}s will at least share the available ones. > Make credit-based floating buffers optional > --- > > Key: FLINK-9076 > URL: https://issues.apache.org/jira/browse/FLINK-9076 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.5.0, 1.6.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Blocker > Fix For: 1.5.0 > > > Currently, floating buffers (per gate) are always required in case > credit-based flow control is enabled. This, however, increases our minimum > number of required network buffers. > Instead, without changing too much, we could already work with a minimum of > one or zero floating buffers and set the max to the configured value. This > way, if there are not enough buffers, all {{LocalBufferPool}}s will at least > share the available ones. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
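For reference, the per-gate maximum discussed in these comments maps to a flink-conf.yaml entry like the following (the value shown is purely illustrative, not a recommendation):

```yaml
# flink-conf.yaml
# Upper bound on floating network buffers each input gate may use;
# the proposal above is about lowering the *minimum* to one or zero.
taskmanager.network.memory.floating-buffers-per-gate: 8
```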
[jira] [Commented] (FLINK-5480) User-provided hashes for operators
[ https://issues.apache.org/jira/browse/FLINK-5480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424128#comment-16424128 ] ASF GitHub Bot commented on FLINK-5480: --- Github user ShashwatRastogi-Reflektion commented on the issue: https://github.com/apache/flink/pull/3117 Hey @zentol Thank you for replying back. I was trying to do exactly the same thing, but I think I am messing something up that is why it isn't working in my case. In my logs, i get the task description like: `2018-04-03 12:24:45,876 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: Custom Source -> Filter -> events-with-timestamp -> filtered-events-with-timestamp -> events-mapped-to-session (1/1) (5c69d4ad31a844978a740c1d24297b68) switched from CREATED to SCHEDULED.` Is the hash present in this log statement called uid hash? This hash is not the same which I get in the UI. ![image](https://user-images.githubusercontent.com/29359103/38256652-6a5e7d58-377c-11e8-8124-16bc2cd7e6e0.png) I have tried using both and both of them doesn't work. Also, my operators are chained together so I will get one uid-hash for the entire chain, right? And, I would be setting the same uid-hash for all operators in the chain? > User-provided hashes for operators > -- > > Key: FLINK-5480 > URL: https://issues.apache.org/jira/browse/FLINK-5480 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.2.0, 1.3.0 > > > We could allow users to provided (alternative) hashes for operators in a > StreamGraph. This can make migration between Flink versions easier, in case > the automatically produced hashes between versions are incompatible. For > example, users could just copy the old hashes from the web ui to their job. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8707) Excessive amount of files opened by flink task manager
[ https://issues.apache.org/jira/browse/FLINK-8707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424125#comment-16424125 ] Nico Kruber commented on FLINK-8707: I still need to look at the detailed logs you two provided, but let me already add one note about {{lsof | wc -l}}: Whenever you spawn a child process (on Linux), the new process inherits the file descriptors of the parent and hence the duplication in this report. Basically, a single file descriptor is reported for every spawned thread. Using {{lsof -p }} will not show the task's file descriptors, as will {{lsof -K i}} for all PIDs. Therefore, whenever you want to count the overall number of descriptors for a machine, you should use the latter. > Excessive amount of files opened by flink task manager > -- > > Key: FLINK-8707 > URL: https://issues.apache.org/jira/browse/FLINK-8707 > Project: Flink > Issue Type: Bug > Components: TaskManager >Affects Versions: 1.3.2 > Environment: NAME="Red Hat Enterprise Linux Server" > VERSION="7.3 (Maipo)" > Two boxes, each with a Job Manager & Task Manager, using Zookeeper for HA. 
> flink.yaml below with some settings (removed exact box names) etc: > env.log.dir: ...some dir...residing on the same box > env.pid.dir: some dir...residing on the same box > metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter > metrics.reporters: jmx > state.backend: filesystem > state.backend.fs.checkpointdir: file:///some_nfs_mount > state.checkpoints.dir: file:///some_nfs_mount > state.checkpoints.num-retained: 3 > high-availability.cluster-id: /tst > high-availability.storageDir: file:///some_nfs_mount/ha > high-availability: zookeeper > high-availability.zookeeper.path.root: /flink > high-availability.zookeeper.quorum: ...list of zookeeper boxes > env.java.opts.jobmanager: ...some extra jar args > jobmanager.archive.fs.dir: some dir...residing on the same box > jobmanager.web.submit.enable: true > jobmanager.web.tmpdir: some dir...residing on the same box > env.java.opts.taskmanager: some extra jar args > taskmanager.tmp.dirs: some dir...residing on the same box/var/tmp > taskmanager.network.memory.min: 1024MB > taskmanager.network.memory.max: 2048MB > blob.storage.directory: some dir...residing on the same box >Reporter: Alexander Gardner >Assignee: Piotr Nowojski >Priority: Critical > Fix For: 1.5.0 > > Attachments: AfterRunning-3-jobs-Box2-TM-JCONSOLE.png, > AfterRunning-3-jobs-TM-FDs-BOX2.jpg, AfterRunning-3-jobs-lsof-p.box2-TM, > AfterRunning-3-jobs-lsof.box2-TM, AterRunning-3-jobs-Box1-TM-JCONSOLE.png, > box1-jobmgr-lsof, box1-taskmgr-lsof, box2-jobmgr-lsof, box2-taskmgr-lsof, > ll.txt, ll.txt, lsof.txt, lsof.txt, lsofp.txt, lsofp.txt > > > The job manager has less FDs than the task manager. > > Hi > A support alert indicated that there were a lot of open files for the boxes > running Flink. > There were 4 flink jobs that were dormant but had consumed a number of msgs > from Kafka using the FlinkKafkaConsumer010. > A simple general lsof: > $ lsof | wc -l -> returned 153114 open file descriptors. 
> Focusing on the TaskManager process (process ID = 12154): > $ lsof | grep 12154 | wc -l- > returned 129322 open FDs > $ lsof -p 12154 | wc -l -> returned 531 FDs > There were 228 threads running for the task manager. > > Drilling down a bit further, looking at a_inode and FIFO entries: > $ lsof -p 12154 | grep a_inode | wc -l = 100 FDs > $ lsof -p 12154 | grep FIFO | wc -l = 200 FDs > $ /proc/12154/maps = 920 entries. > Apart from lsof identifying lots of JARs and SOs being referenced there were > also 244 child processes for the task manager process. > Noticed that in each environment, a creep of file descriptors...are the above > figures deemed excessive for the no of FDs in use? I know Flink uses Netty - > is it using a separate Selector for reads & writes? > Additionally Flink uses memory mapped files? or direct bytebuffers are these > skewing the numbers of FDs shown? > Example of one child process ID 6633: > java 12154 6633 dfdev 387u a_inode 0,9 0 5869 [eventpoll] > java 12154 6633 dfdev 388r FIFO 0,8 0t0 459758080 pipe > java 12154 6633 dfdev 389w FIFO 0,8 0t0 459758080 pipe > Lasty, cannot identify yet the reason for the creep in FDs even if Flink is > pretty dormant or has dormant jobs. Production nodes are not experiencing > excessive amounts of throughput yet either. > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-6567) ExecutionGraphMetricsTest fails on Windows CI
[ https://issues.apache.org/jira/browse/FLINK-6567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-6567. Resolution: Fixed Fix Version/s: (was: 1.6.0) 1.5.0 Fixed via master: db366cd3d02a823f93185f29ca7ae93da9e2a04b 1.5.0: 515069e14f770ebfb86df27dc27b858ded51c6d5 > ExecutionGraphMetricsTest fails on Windows CI > - > > Key: FLINK-6567 > URL: https://issues.apache.org/jira/browse/FLINK-6567 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.3.0, 1.4.0 >Reporter: Chesnay Schepler >Assignee: Till Rohrmann >Priority: Blocker > Labels: test-stability > Fix For: 1.5.0 > > > The {{testExecutionGraphRestartTimeMetric}} fails every time i run it on > AppVeyor. It also very rarely failed for me locally. > The test fails at Line 235 if the RUNNING timestamp is equal to the > RESTARTING timestamp, which may happen when combining a fast test with a low > resolution clock. > A simple fix would be to increase the timestamp between RUNNING and > RESTARTING by adding a 50ms sleep timeout into the > {{TestingRestartStrategy#canRestart()}} method, as this one is called before > transitioning to the RESTARTING state. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
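The fix described in the ticket can be sketched as follows; the RestartStrategy interface here is a simplified stand-in for Flink's, and the 50 ms delay is the value from the description above:

```java
public class RestartTimestampSketch {
    // Simplified stand-in for Flink's restart strategy interface.
    interface RestartStrategy {
        boolean canRestart();
    }

    // TestingRestartStrategy sketch: sleep before allowing the restart so the
    // RESTARTING timestamp is strictly later than the RUNNING one, even on a
    // clock with millisecond (or coarser) resolution.
    static final RestartStrategy STRATEGY = () -> {
        try {
            Thread.sleep(50L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return true;
    };

    public static void main(String[] args) {
        long runningTs = System.currentTimeMillis();    // RUNNING transition
        STRATEGY.canRestart();                          // called before RESTARTING
        long restartingTs = System.currentTimeMillis(); // RESTARTING transition
        // The metric test requires the timestamps to differ.
        System.out.println(restartingTs > runningTs);
    }
}
```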
[jira] [Closed] (FLINK-9094) AccumulatorLiveITCase unstable on Travis
[ https://issues.apache.org/jira/browse/FLINK-9094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-9094. Resolution: Fixed Fix Version/s: 1.5.0 Fixed via master: 78c3d9b0c657bf06a712ce453edb02da13fa3acf 1.5.0: b6982c502196b5059b1bf576f620f45ce0e3aa72 > AccumulatorLiveITCase unstable on Travis > > > Key: FLINK-9094 > URL: https://issues.apache.org/jira/browse/FLINK-9094 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Critical > Fix For: 1.5.0 > > > {{AccumulatorLiveITCase}} unstable on Travis. > https://api.travis-ci.org/v3/job/358509206/log.txt -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6567) ExecutionGraphMetricsTest fails on Windows CI
[ https://issues.apache.org/jira/browse/FLINK-6567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424105#comment-16424105 ] ASF GitHub Bot commented on FLINK-6567: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5782 > ExecutionGraphMetricsTest fails on Windows CI > - > > Key: FLINK-6567 > URL: https://issues.apache.org/jira/browse/FLINK-6567 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.3.0, 1.4.0 >Reporter: Chesnay Schepler >Assignee: Till Rohrmann >Priority: Blocker > Labels: test-stability > Fix For: 1.6.0 > > > The {{testExecutionGraphRestartTimeMetric}} fails every time i run it on > AppVeyor. It also very rarely failed for me locally. > The test fails at Line 235 if the RUNNING timestamp is equal to the > RESTARTING timestamp, which may happen when combining a fast test with a low > resolution clock. > A simple fix would be to increase the timestamp between RUNNING and > RESTARTING by adding a 50ms sleep timeout into the > {{TestingRestartStrategy#canRestart()}} method, as this one is called before > transitioning to the RESTARTING state. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9094) AccumulatorLiveITCase unstable on Travis
[ https://issues.apache.org/jira/browse/FLINK-9094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424106#comment-16424106 ] ASF GitHub Bot commented on FLINK-9094: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5771 > AccumulatorLiveITCase unstable on Travis > > > Key: FLINK-9094 > URL: https://issues.apache.org/jira/browse/FLINK-9094 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Critical > > {{AccumulatorLiveITCase}} unstable on Travis. > https://api.travis-ci.org/v3/job/358509206/log.txt -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9120) Task Manager Fault Tolerance issue
[ https://issues.apache.org/jira/browse/FLINK-9120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424091#comment-16424091 ] Sihua Zhou commented on FLINK-9120: --- Hi [~dhirajpraj] sorry for the delay reply, I was busy on something before, I think [~till.rohrmann]'s reply is the best explanation for my "TM doesn't unregister from JM properly in standalone model". > Task Manager Fault Tolerance issue > -- > > Key: FLINK-9120 > URL: https://issues.apache.org/jira/browse/FLINK-9120 > Project: Flink > Issue Type: Bug > Components: Cluster Management, Configuration, Core >Affects Versions: 1.4.2 >Reporter: dhiraj prajapati >Priority: Critical > Attachments: flink-dhiraj.prajapati-client-ip-10-14-25-115.log, > flink-dhiraj.prajapati-client-ip-10-14-25-115.log, > flink-dhiraj.prajapati-jobmanager-5-ip-10-14-25-115.log, > flink-dhiraj.prajapati-jobmanager-5-ip-10-14-25-115.log, > flink-dhiraj.prajapati-taskmanager-5-ip-10-14-25-116.log, > flink-dhiraj.prajapati-taskmanager-5-ip-10-14-25-116.log > > > HI, > I have set up a flink 1.4 cluster with 1 job manager and two task managers. > The configs taskmanager.numberOfTaskSlots and parallelism.default were set > to 2 on each node. I submitted a job to this cluster and it runs fine. To > test fault tolerance, I killed one task manager. I was expecting the job to > run fine because one of the 2 task managers was still up and running. > However, the job failed. Am I missing something? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9076) Make credit-based floating buffers optional
[ https://issues.apache.org/jira/browse/FLINK-9076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424083#comment-16424083 ] Piotr Nowojski commented on FLINK-9076: --- [~NicoK], do you mean max should be configured by a user, or should it be a dynamically adjusted value? > Make credit-based floating buffers optional > --- > > Key: FLINK-9076 > URL: https://issues.apache.org/jira/browse/FLINK-9076 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.5.0, 1.6.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Blocker > Fix For: 1.5.0 > > > Currently, floating buffers (per gate) are always required in case > credit-based flow control is enabled. This, however, increases our minimum > number of required network buffers. > Instead, without changing too much, we could already work with a minimum of > one floating buffer and set the max to the configured value. This way, if > there are not enough buffers, all {{LocalBufferPool}}s will at least share > the available ones. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8910) Introduce automated end-to-end test for local recovery (including sticky scheduling)
[ https://issues.apache.org/jira/browse/FLINK-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424072#comment-16424072 ] ASF GitHub Bot commented on FLINK-8910: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5676#discussion_r178836145 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java --- @@ -107,4 +131,12 @@ public int getAttemptNumber() { public String getTaskNameWithSubtasks() { return this.taskNameWithSubtasks; } + + /** +* Returns the allocation id for where this task is executed. +* @return the allocation id for where this task is executed. +*/ + public String getAllocationID() { --- End diff -- What I find problematic about deleting the DFS files is that not all state is (yet) covered by local recovery and it is also a lot harder to debug the cause if there is an actual scheduling problem. With the current code, you can easily see which allocation was lost. REST API might be an option if it is somehow exposed there. > Introduce automated end-to-end test for local recovery (including sticky > scheduling) > > > Key: FLINK-8910 > URL: https://issues.apache.org/jira/browse/FLINK-8910 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.5.0 > > > We should have an automated end-to-end test that can run nightly to check > that sticky allocation and local recovery work as expected. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8982) End-to-end test: Queryable state
[ https://issues.apache.org/jira/browse/FLINK-8982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424060#comment-16424060 ] ASF GitHub Bot commented on FLINK-8982: --- Github user florianschmidt1994 commented on the issue: https://github.com/apache/flink/pull/5807 Thanks! I addressed the changes in the last two commits > End-to-end test: Queryable state > > > Key: FLINK-8982 > URL: https://issues.apache.org/jira/browse/FLINK-8982 > Project: Flink > Issue Type: Sub-task > Components: Queryable State, Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Florian Schmidt >Priority: Blocker > Fix For: 1.5.0 > > > We should add an end-to-end test which verifies that {{Queryable State}} is > working. > [~florianschmidt] and [~kkl0u] could you please provide more details for the > description. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8982) End-to-end test: Queryable state
[ https://issues.apache.org/jira/browse/FLINK-8982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424052#comment-16424052 ] ASF GitHub Bot commented on FLINK-8982: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5807#discussion_r178828372 --- Diff: flink-end-to-end-tests/run-pre-commit-tests.sh --- @@ -37,6 +37,14 @@ echo "Flink distribution directory: $FLINK_DIR" EXIT_CODE=0 --- End diff -- I would recommend to move it to the nightly tests. Queryable state is not a core component and the normal builds are already timing out. > End-to-end test: Queryable state > > > Key: FLINK-8982 > URL: https://issues.apache.org/jira/browse/FLINK-8982 > Project: Flink > Issue Type: Sub-task > Components: Queryable State, Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Florian Schmidt >Priority: Blocker > Fix For: 1.5.0 > > > We should add an end-to-end test which verifies that {{Queryable State}} is > working. > [~florianschmidt] and [~kkl0u] could you please provide more details for the > description. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5807#discussion_r178827723 --- Diff: flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailInformation.java --- @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests.queryablestate; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * Javadoc. 
+ */ +public class EmailInformation implements Serializable { + + private static final long serialVersionUID = -8956979869800484909L; + + public void setEmailId(EmailId emailId) { + this.emailId = emailId; + } + + private EmailId emailId; + +// public void setTimestamp(Instant timestamp) { +// this.timestamp = timestamp; +// } + + //private Instant timestamp; + + public void setStuff(List stuff) { + this.stuff = stuff; + } + + private List stuff; + + public void setAsdf(Long asdf) { + this.asdf = asdf; + } + + private Long asdf = 0L; + + private transient LabelSurrogate label; + + public EmailInformation() { + + } + + public EmailInformation(Email email) { + emailId = email.getEmailId(); + // timestamp = email.getTimestamp(); + stuff = new ArrayList<>(); + stuff.add("1"); + stuff.add("2"); + stuff.add("3"); + label = email.getLabel(); + } + + public EmailId getEmailId() { + return emailId; + } + +// //public Instant getTimestamp() { +// return timestamp; +// } + + public List getStuff() { + return stuff; + } + + public Long getAsdf() { + return asdf; + } + + public LabelSurrogate getLabel() { + return label; + } + + public void setLabel(LabelSurrogate label) { + this.label = label; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + EmailInformation that = (EmailInformation) o; + return Objects.equals(emailId, that.emailId) && +// Objects.equals(timestamp, that.timestamp) && --- End diff -- remove. ---
[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5807#discussion_r178828967 --- Diff: flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsBugPoc.java --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests.queryablestate; + +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.util.Collector; + +import java.time.Duration; +import java.time.Instant; +import java.util.Random; + +/** + * Javadoc. 
+ */ +public class QsBugPoc { + + public static final String QUERYABLE_STATE_NAME = "state"; + public static final String STATE_NAME = "state"; + + public static void main(final String[] args) throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + RocksDBStateBackend rocksDb = new RocksDBStateBackend("file:///tmp/deleteme-rocksdb"); + env.setStateBackend(rocksDb); --- End diff -- The dir to checkpoint can be a parameter and here it should be a path in the `TEST_DIR` of the test itself. In addition, everything should be explicitly cleaned up, e.g. checkpoints, potential output/input data, etc. ---
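The two requests in the comment above — take the checkpoint directory as a parameter rooted in the test's `TEST_DIR`, and clean everything up afterwards — can be sketched without Flink dependencies. The class and method names below are illustrative, not part of the PR:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.stream.Stream;

/**
 * Sketch: resolve a test checkpoint directory from program arguments
 * (falling back to a path under the test's own directory) and delete it
 * recursively when the test is done, so no residue is left behind.
 */
final class CheckpointDirUtil {

    /** Resolve the checkpoint URI from the first argument, defaulting to a path under testDir. */
    static String resolveCheckpointUri(String[] args, Path testDir) {
        Path dir = (args.length > 0)
                ? Paths.get(args[0])
                : testDir.resolve("rocksdb-checkpoints");
        return dir.toUri().toString();
    }

    /** Recursively delete the checkpoint directory, children first. */
    static void cleanUp(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(dir)) {
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
    }
}
```

In the actual job, the resolved URI would be handed to `new RocksDBStateBackend(uri)` instead of the hard-coded `file:///tmp/deleteme-rocksdb`, and the harness script would pass `$TEST_DIR` in and call the cleanup step in its exit trap.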
[jira] [Commented] (FLINK-8982) End-to-end test: Queryable state
[ https://issues.apache.org/jira/browse/FLINK-8982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424053#comment-16424053 ] ASF GitHub Bot commented on FLINK-8982: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5807#discussion_r178827698 --- Diff: flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailInformation.java --- @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests.queryablestate; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * Javadoc. 
+ */ +public class EmailInformation implements Serializable { + + private static final long serialVersionUID = -8956979869800484909L; + + public void setEmailId(EmailId emailId) { + this.emailId = emailId; + } + + private EmailId emailId; + +// public void setTimestamp(Instant timestamp) { +// this.timestamp = timestamp; +// } + + //private Instant timestamp; + + public void setStuff(List stuff) { + this.stuff = stuff; + } + + private List stuff; + + public void setAsdf(Long asdf) { + this.asdf = asdf; + } + + private Long asdf = 0L; + + private transient LabelSurrogate label; + + public EmailInformation() { + + } + + public EmailInformation(Email email) { + emailId = email.getEmailId(); + // timestamp = email.getTimestamp(); --- End diff -- remove. > End-to-end test: Queryable state > > > Key: FLINK-8982 > URL: https://issues.apache.org/jira/browse/FLINK-8982 > Project: Flink > Issue Type: Sub-task > Components: Queryable State, Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Florian Schmidt >Priority: Blocker > Fix For: 1.5.0 > > > We should add an end-to-end test which verifies that {{Queryable State}} is > working. > [~florianschmidt] and [~kkl0u] could you please provide more details for the > description. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5807#discussion_r178828372 --- Diff: flink-end-to-end-tests/run-pre-commit-tests.sh --- @@ -37,6 +37,14 @@ echo "Flink distribution directory: $FLINK_DIR" EXIT_CODE=0 --- End diff -- I would recommend moving it to the nightly tests. Queryable state is not a core component, and the normal builds are already timing out. ---

[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5807#discussion_r178827995 --- Diff: flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/LabelSurrogate.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests.queryablestate; + +/** + * Javadoc. + */ +public class LabelSurrogate { + + private Type type; + private String foo; + + public LabelSurrogate(Type type, String foo) { + this.type = type; + this.foo = foo; + } + + public Type getType() { + return type; + } + + public void setType(Type type) { + this.type = type; + } + + public String getFoo() { + return foo; + } + + public void setFoo(String foo) { + this.foo = foo; + } + + @Override + public String toString() { + return "LabelSurrogate{" + + "type=" + type + + ", foo='" + foo + '\'' + + '}'; + } + + /** +* Javadoc. +*/ --- End diff -- Same here. ---
[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5807#discussion_r178827936 --- Diff: flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/LabelSurrogate.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests.queryablestate; + +/** + * Javadoc. + */ --- End diff -- This is just a placeholder comment for checkstyle verification to pass. Please write a real comment. This holds also for other places. ---
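As a sketch of what the reviewer is asking for, the `/** Javadoc. */` placeholder can be replaced by a class-level comment that actually describes the type. The wording below is a guess at the class's purpose, not the author's documentation:

```java
/**
 * A serializable stand-in for an email label, used by the queryable-state
 * end-to-end test so label values can be stored in keyed state without
 * depending on a full production label class.
 */
class LabelSurrogate {

    /** Category of the label; an enum keeps the set of valid categories closed. */
    enum Type { IMPORTANT, SPAM, OTHER }

    private final Type type;
    private final String foo;

    LabelSurrogate(Type type, String foo) {
        this.type = type;
        this.foo = foo;
    }

    Type getType() {
        return type;
    }

    String getFoo() {
        return foo;
    }
}
```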
[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5807#discussion_r178829164 --- Diff: flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsBugPoc.java --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests.queryablestate; + +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.util.Collector; + +import java.time.Duration; +import java.time.Instant; +import java.util.Random; + +/** + * Javadoc. 
+ */ +public class QsBugPoc { + + public static final String QUERYABLE_STATE_NAME = "state"; + public static final String STATE_NAME = "state"; + + public static void main(final String[] args) throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + RocksDBStateBackend rocksDb = new RocksDBStateBackend("file:///tmp/deleteme-rocksdb"); + env.setStateBackend(rocksDb); --- End diff -- Also check for different backends, i.e. file and rocks. You can have a look to the `test_ha.sh`. ---
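Running the same job once per backend, as the comment above suggests (the way `test_ha.sh` parameterizes its runs), amounts to selecting the state backend from a test parameter. A minimal sketch of that selection logic, with hypothetical names and without the Flink classes themselves:

```java
/**
 * Sketch: map a backend name passed by the test harness (e.g. "file" or
 * "rocks") to the checkpoint path the corresponding backend would use.
 * In the real job, the result would feed new FsStateBackend(path) or
 * new RocksDBStateBackend(path) respectively.
 */
final class BackendSelector {

    /** Build the checkpoint path for the named backend under the test's base directory. */
    static String checkpointPathFor(String backend, String baseDir) {
        switch (backend) {
            case "rocks":
                return baseDir + "/rocksdb-checkpoints";
            case "file":
                return baseDir + "/fs-checkpoints";
            default:
                throw new IllegalArgumentException("Unknown backend: " + backend);
        }
    }
}
```

The harness side would then loop over the backend names and invoke the job once per name, failing fast on an unknown value.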
[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5807#discussion_r178827415 --- Diff: flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailInformation.java --- @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests.queryablestate; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * Javadoc. + */ +public class EmailInformation implements Serializable { + + private static final long serialVersionUID = -8956979869800484909L; + + public void setEmailId(EmailId emailId) { + this.emailId = emailId; + } + + private EmailId emailId; + +// public void setTimestamp(Instant timestamp) { --- End diff -- Remove commented methods. ---
[jira] [Commented] (FLINK-8982) End-to-end test: Queryable state
[ https://issues.apache.org/jira/browse/FLINK-8982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424045#comment-16424045 ] ASF GitHub Bot commented on FLINK-8982: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5807#discussion_r178827543 --- Diff: flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailInformation.java --- @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests.queryablestate; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * Javadoc. + */ +public class EmailInformation implements Serializable { + + private static final long serialVersionUID = -8956979869800484909L; + + public void setEmailId(EmailId emailId) { + this.emailId = emailId; + } + + private EmailId emailId; + +// public void setTimestamp(Instant timestamp) { +// this.timestamp = timestamp; +// } + + //private Instant timestamp; --- End diff -- same here (remove commented field). 
[jira] [Commented] (FLINK-8982) End-to-end test: Queryable state
[ https://issues.apache.org/jira/browse/FLINK-8982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424049#comment-16424049 ] ASF GitHub Bot commented on FLINK-8982: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5807#discussion_r178827723 --- Diff: flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailInformation.java --- @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests.queryablestate; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * Javadoc. 
+ */ +public class EmailInformation implements Serializable { + + private static final long serialVersionUID = -8956979869800484909L; + + public void setEmailId(EmailId emailId) { + this.emailId = emailId; + } + + private EmailId emailId; + +// public void setTimestamp(Instant timestamp) { +// this.timestamp = timestamp; +// } + + //private Instant timestamp; + + public void setStuff(List stuff) { + this.stuff = stuff; + } + + private List stuff; + + public void setAsdf(Long asdf) { + this.asdf = asdf; + } + + private Long asdf = 0L; + + private transient LabelSurrogate label; + + public EmailInformation() { + + } + + public EmailInformation(Email email) { + emailId = email.getEmailId(); + // timestamp = email.getTimestamp(); + stuff = new ArrayList<>(); + stuff.add("1"); + stuff.add("2"); + stuff.add("3"); + label = email.getLabel(); + } + + public EmailId getEmailId() { + return emailId; + } + +// //public Instant getTimestamp() { +// return timestamp; +// } + + public List getStuff() { + return stuff; + } + + public Long getAsdf() { + return asdf; + } + + public LabelSurrogate getLabel() { + return label; + } + + public void setLabel(LabelSurrogate label) { + this.label = label; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + EmailInformation that = (EmailInformation) o; + return Objects.equals(emailId, that.emailId) && +// Objects.equals(timestamp, that.timestamp) && --- End diff -- remove. > End-to-end test: Queryable state > > > Key: FLINK-8982 > URL: https://issues.apache.org/jira/browse/FLINK-8982 > Project: Flink > Issue Type: Sub-task > Components: Queryable State, Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Florian Schmidt >Priority: Blocker > Fix For: 1.5.0 > > > We should add an end-to-end test which verifies that {{Queryable State}} is > working. 
> [~florianschmidt] and [~kkl0u] could you please provide more details for the > description. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8982) End-to-end test: Queryable state
[ https://issues.apache.org/jira/browse/FLINK-8982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424051#comment-16424051 ] ASF GitHub Bot commented on FLINK-8982: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5807#discussion_r178827678 --- Diff: flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailInformation.java --- @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests.queryablestate; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * Javadoc. 
+ */ +public class EmailInformation implements Serializable { + + private static final long serialVersionUID = -8956979869800484909L; + + public void setEmailId(EmailId emailId) { + this.emailId = emailId; + } + + private EmailId emailId; + +// public void setTimestamp(Instant timestamp) { +// this.timestamp = timestamp; +// } + + //private Instant timestamp; + + public void setStuff(List stuff) { + this.stuff = stuff; + } + + private List stuff; + + public void setAsdf(Long asdf) { + this.asdf = asdf; + } + + private Long asdf = 0L; + + private transient LabelSurrogate label; + + public EmailInformation() { + + } + + public EmailInformation(Email email) { + emailId = email.getEmailId(); + // timestamp = email.getTimestamp(); + stuff = new ArrayList<>(); + stuff.add("1"); + stuff.add("2"); + stuff.add("3"); + label = email.getLabel(); + } + + public EmailId getEmailId() { + return emailId; + } + +// //public Instant getTimestamp() { --- End diff -- remove. > End-to-end test: Queryable state > > > Key: FLINK-8982 > URL: https://issues.apache.org/jira/browse/FLINK-8982 > Project: Flink > Issue Type: Sub-task > Components: Queryable State, Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Florian Schmidt >Priority: Blocker > Fix For: 1.5.0 > > > We should add an end-to-end test which verifies that {{Queryable State}} is > working. > [~florianschmidt] and [~kkl0u] could you please provide more details for the > description. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8982) End-to-end test: Queryable state
[ https://issues.apache.org/jira/browse/FLINK-8982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424047#comment-16424047 ] ASF GitHub Bot commented on FLINK-8982: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5807#discussion_r178829164 --- Diff: flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsBugPoc.java --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.tests.queryablestate; + +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.util.Collector; + +import java.time.Duration; +import java.time.Instant; +import java.util.Random; + +/** + * Javadoc. + */ +public class QsBugPoc { + + public static final String QUERYABLE_STATE_NAME = "state"; + public static final String STATE_NAME = "state"; + + public static void main(final String[] args) throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + RocksDBStateBackend rocksDb = new RocksDBStateBackend("file:///tmp/deleteme-rocksdb"); + env.setStateBackend(rocksDb); --- End diff -- Also check for different backends, i.e. file and rocks. You can have a look to the `test_ha.sh`. > End-to-end test: Queryable state > > > Key: FLINK-8982 > URL: https://issues.apache.org/jira/browse/FLINK-8982 > Project: Flink > Issue Type: Sub-task > Components: Queryable State, Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Florian Schmidt >Priority: Blocker > Fix For: 1.5.0 > > > We should add an end-to-end test which verifies that {{Queryable State}} is > working. > [~florianschmidt] and [~kkl0u] could you please provide more details for the > description. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
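The reviewer's suggestion could be implemented the way the shell-based end-to-end tests such as `test_ha.sh` do it: instead of hard-coding `RocksDBStateBackend` in the job, let the harness select the backend via `flink-conf.yaml` and run the same job once per backend. A minimal sketch, assuming the standard Flink 1.5 state-backend config keys (the checkpoint path is purely illustrative):

```yaml
# Run 1: RocksDB backend
state.backend: rocksdb
state.checkpoints.dir: file:///tmp/qs-e2e-checkpoints

# Run 2: swap in the file-based backend and re-run the same job
# state.backend: filesystem
```

The job itself would then drop the hard-coded `env.setStateBackend(...)` call and use whatever backend the cluster configuration provides.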
[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5807#discussion_r178827698 --- Diff: flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailInformation.java --- @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests.queryablestate; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * Javadoc. 
+ */ +public class EmailInformation implements Serializable { + + private static final long serialVersionUID = -8956979869800484909L; + + public void setEmailId(EmailId emailId) { + this.emailId = emailId; + } + + private EmailId emailId; + +// public void setTimestamp(Instant timestamp) { +// this.timestamp = timestamp; +// } + + //private Instant timestamp; + + public void setStuff(List stuff) { + this.stuff = stuff; + } + + private List stuff; + + public void setAsdf(Long asdf) { + this.asdf = asdf; + } + + private Long asdf = 0L; + + private transient LabelSurrogate label; + + public EmailInformation() { + + } + + public EmailInformation(Email email) { + emailId = email.getEmailId(); + // timestamp = email.getTimestamp(); --- End diff -- remove. ---
[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5807#discussion_r178827543 --- Diff: flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailInformation.java --- @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests.queryablestate; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * Javadoc. + */ +public class EmailInformation implements Serializable { + + private static final long serialVersionUID = -8956979869800484909L; + + public void setEmailId(EmailId emailId) { + this.emailId = emailId; + } + + private EmailId emailId; + +// public void setTimestamp(Instant timestamp) { +// this.timestamp = timestamp; +// } + + //private Instant timestamp; --- End diff -- same here (remove commented field). ---
[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5807#discussion_r178827678 --- Diff: flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailInformation.java --- @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests.queryablestate; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * Javadoc. 
+ */ +public class EmailInformation implements Serializable { + + private static final long serialVersionUID = -8956979869800484909L; + + public void setEmailId(EmailId emailId) { + this.emailId = emailId; + } + + private EmailId emailId; + +// public void setTimestamp(Instant timestamp) { +// this.timestamp = timestamp; +// } + + //private Instant timestamp; + + public void setStuff(List stuff) { + this.stuff = stuff; + } + + private List stuff; + + public void setAsdf(Long asdf) { + this.asdf = asdf; + } + + private Long asdf = 0L; + + private transient LabelSurrogate label; + + public EmailInformation() { + + } + + public EmailInformation(Email email) { + emailId = email.getEmailId(); + // timestamp = email.getTimestamp(); + stuff = new ArrayList<>(); + stuff.add("1"); + stuff.add("2"); + stuff.add("3"); + label = email.getLabel(); + } + + public EmailId getEmailId() { + return emailId; + } + +// //public Instant getTimestamp() { --- End diff -- remove. ---
[jira] [Commented] (FLINK-9078) End-to-end test: Add test that verifies that a specific classloading issue with avro is fixed
[ https://issues.apache.org/jira/browse/FLINK-9078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424042#comment-16424042 ] ASF GitHub Bot commented on FLINK-9078: --- Github user florianschmidt1994 commented on a diff in the pull request: https://github.com/apache/flink/pull/5762#discussion_r178828282 --- Diff: flink-end-to-end-tests/flink-avro-classloading-test/src/main/avro/user.avsc --- @@ -0,0 +1,9 @@ +{ + "type": "record", + "namespace": "org.apache.flink.tests.streaming", + "name": "User", + "fields": [ + { "name": "first", "type": "string" }, + { "name": "last", "type": "string" } --- End diff -- I added some more complex examples and verified that the initial test case still fails on 1.3.X and works on 1.4.X as expected > End-to-end test: Add test that verifies that a specific classloading issue > with avro is fixed > - > > Key: FLINK-9078 > URL: https://issues.apache.org/jira/browse/FLINK-9078 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.5.0 >Reporter: Florian Schmidt >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
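The diff above only shows the original two-field record; the "more complex examples" mentioned in the comment are not included in the excerpt. A schema in that spirit — adding a nullable union, an array, and a nested record — might look like the following (field names are illustrative, not necessarily what the PR actually added):

```json
{
  "type": "record",
  "namespace": "org.apache.flink.tests.streaming",
  "name": "User",
  "fields": [
    { "name": "first", "type": "string" },
    { "name": "last", "type": "string" },
    { "name": "nickname", "type": ["null", "string"], "default": null },
    { "name": "emails", "type": { "type": "array", "items": "string" }, "default": [] },
    { "name": "address", "type": {
        "type": "record",
        "name": "Address",
        "fields": [
          { "name": "city", "type": "string" },
          { "name": "zip", "type": "string" }
        ]
      }
    }
  ]
}
```

Nested records and unions exercise more of the Avro-generated classes, which is the point of a classloading regression test.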
[jira] [Commented] (FLINK-9120) Task Manager Fault Tolerance issue
[ https://issues.apache.org/jira/browse/FLINK-9120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424035#comment-16424035 ] Till Rohrmann commented on FLINK-9120: -- Hi [~dhirajpraj], the logs show that the JM did not yet recognize the killed TM as killed when trying to restart. Thus, it tries to re-deploy tasks to this machine. When it finally realizes that the TM has been killed, it fails the jobs. At this point, it would try to recover the job; however, since the number of restart attempts is depleted (set to 3), it will fail the job terminally. Please try to raise the number of retry attempts. This should hopefully fix your problem. > Task Manager Fault Tolerance issue > -- > > Key: FLINK-9120 > URL: https://issues.apache.org/jira/browse/FLINK-9120 > Project: Flink > Issue Type: Bug > Components: Cluster Management, Configuration, Core >Affects Versions: 1.4.2 >Reporter: dhiraj prajapati >Priority: Critical > Attachments: flink-dhiraj.prajapati-client-ip-10-14-25-115.log, > flink-dhiraj.prajapati-client-ip-10-14-25-115.log, > flink-dhiraj.prajapati-jobmanager-5-ip-10-14-25-115.log, > flink-dhiraj.prajapati-jobmanager-5-ip-10-14-25-115.log, > flink-dhiraj.prajapati-taskmanager-5-ip-10-14-25-116.log, > flink-dhiraj.prajapati-taskmanager-5-ip-10-14-25-116.log > > > Hi, > I have set up a flink 1.4 cluster with 1 job manager and two task managers. > The configs taskmanager.numberOfTaskSlots and parallelism.default were set > to 2 on each node. I submitted a job to this cluster and it runs fine. To > test fault tolerance, I killed one task manager. I was expecting the job to > run fine because one of the 2 task managers was still up and running. > However, the job failed. Am I missing something? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
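For reference, the number of restart attempts Till mentions is controlled by the configured restart strategy. A sketch of the relevant `flink-conf.yaml` entries for Flink 1.4 (values are illustrative; the same strategy can also be set per job via `env.setRestartStrategy(...)`):

```yaml
restart-strategy: fixed-delay
# Allow more attempts so a slowly-detected TaskManager failure does not
# exhaust the restart budget before the JobManager notices the dead TM:
restart-strategy.fixed-delay.attempts: 10
restart-strategy.fixed-delay.delay: 10 s
```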
[jira] [Commented] (FLINK-8835) Fix TaskManager config keys
[ https://issues.apache.org/jira/browse/FLINK-8835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424030#comment-16424030 ] ASF GitHub Bot commented on FLINK-8835: --- GitHub user zhangminglei opened a pull request: https://github.com/apache/flink/pull/5808 [FLINK-8835] [taskmanager] Fix TaskManager config keys ## What is the purpose of the change Fix TaskManager config keys to make it easier for users. ## Brief change log Change the original keys and variable names to an easier form. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zhangminglei/flink flink-8835-taskmanager-config-key Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5808.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5808 commit a4df41079edd228ea346227f2f04b41d694c0420 Author: zhangminglei Date: 2018-04-03T13:23:19Z [FLINK-8835] [taskmanager] Fix TaskManager config keys > Fix TaskManager config keys > --- > > Key: FLINK-8835 > URL: https://issues.apache.org/jira/browse/FLINK-8835 > Project: Flink > Issue Type: Bug > Components: TaskManager >Reporter: Stephan Ewen >Assignee: mingleizhang >Priority: Blocker > Labels: easy-fix > Fix For: 1.5.0 > > > Many new config keys in the TaskManager don't follow the proper naming > scheme. We need to clean those up before the release. I would also suggest > keeping the key names short, because that makes it easier for users. > When doing this cleanup pass over the config keys, I would suggest to also > make some of the existing keys more hierarchical and harmonize them with the > common scheme in Flink. > h1. New Keys > * {{taskmanager.network.credit-based-flow-control.enabled}} to > {{taskmanager.network.credit-model}}. > h1.
Existing Keys > * {{taskmanager.debug.memory.startLogThread}} => {{taskmanager.debug.memory.log}} > * {{taskmanager.debug.memory.logIntervalMs}} => {{taskmanager.debug.memory.log-interval}} > * {{taskmanager.initial-registration-pause}} => {{taskmanager.registration.initial-backoff}} > * {{taskmanager.max-registration-pause}} => {{taskmanager.registration.max-backoff}} > * {{taskmanager.refused-registration-pause}} => {{taskmanager.registration.refused-backoff}} > * {{taskmanager.maxRegistrationDuration}} => {{taskmanager.registration.timeout}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
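Renames like the ones listed in the ticket are typically shipped with the old key kept as a deprecated fallback, so that existing configurations keep working. The following is a self-contained model of that lookup order in plain Java — illustrative only, not Flink's actual `ConfigOption` API:

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative model of "new key wins, deprecated key is a fallback". */
public class DeprecatedKeyLookup {

    static String get(Map<String, String> conf, String newKey, String deprecatedKey, String defaultValue) {
        if (conf.containsKey(newKey)) {
            return conf.get(newKey);        // new-style key takes precedence
        }
        if (conf.containsKey(deprecatedKey)) {
            return conf.get(deprecatedKey); // pre-rename key is still honored
        }
        return defaultValue;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("taskmanager.maxRegistrationDuration", "5 min"); // only the old key is set
        // Resolves through the deprecated key:
        System.out.println(get(conf, "taskmanager.registration.timeout",
                "taskmanager.maxRegistrationDuration", "Inf")); // prints "5 min"
    }
}
```

Flink's real implementation attaches deprecated keys to each `ConfigOption` (via `withDeprecatedKeys`), but the resolution order is the same idea.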
[jira] [Commented] (FLINK-8910) Introduce automated end-to-end test for local recovery (including sticky scheduling)
[ https://issues.apache.org/jira/browse/FLINK-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424024#comment-16424024 ] ASF GitHub Bot commented on FLINK-8910: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5676#discussion_r178824035 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java --- @@ -107,4 +131,12 @@ public int getAttemptNumber() { public String getTaskNameWithSubtasks() { return this.taskNameWithSubtasks; } + + /** +* Returns the allocation id for where this task is executed. +* @return the allocation id for where this task is executed. +*/ + public String getAllocationID() { --- End diff -- Hmm, what about testing it a bit more indirectly, by removing the checkpoint files on the DFS. Then you can only recover if you recover locally. Or by querying the REST interface? We might have to add the information to the `VertexTaskDetail`. Otherwise, we start mixing concerns and expose unnecessary information to the user via the `AbstractRuntimeUDFContext`. Moreover, not every function has access to this information right now. For example the `RichAsyncFunctionRuntimeContext` does not expose it. > Introduce automated end-to-end test for local recovery (including sticky > scheduling) > > > Key: FLINK-8910 > URL: https://issues.apache.org/jira/browse/FLINK-8910 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.5.0 > > > We should have an automated end-to-end test that can run nightly to check > that sticky allocation and local recovery work as expected. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-8707) Excessive amount of files opened by flink task manager
[ https://issues.apache.org/jira/browse/FLINK-8707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424006#comment-16424006 ] Alon Galant edited comment on FLINK-8707 at 4/3/18 1:24 PM: Hey, I have a similar problem, this is my thread on Flink ML: [https://lists.apache.org/thread.html/ce89ed53533ec19ec0a7b74de0f96ac22506d5b2564c2949fd54d6ce@%3Cuser.flink.apache.org%3E] I've created a whitelist of 16 Customer-Ids that I filter from the Kafka topic into Flink, so I need to have around the same number of files that the data is being written to at once. I attached 3 files: lsof.txt - the result for $lsof > lsof.txt lsofp.txt - the result for $lsof -p 5514 > lsofp.txt (5514 is the flink process) ll.txt - the result for $ls -la /tmp/hadoop-flink/s3a/ > ll.txt /tmp/hadoop-flink/s3a/ is the directory Flink uses to store the tmp files it works on, before uploading them to s3. I ran these commands on my task manager (I run 2 task managers and a total of 8 task slots, 4 on each TM). Here are some lsof | wc -l results: {code:java} less lsof.txt | wc -l --> 44228 less lsofp.txt | wc -l --> 403 less ll.txt | wc -l --> 64 less lsof.txt | grep output-1620244093829221334.tmp | wc -l --> 108 (this is an example of an output file, I think all of them give the same result) {code} From ll.txt we can see that there are 4 files for each customerId (for each partition), so I guess that every task slot opens its own file. For each 'output' file there are 108 FDs. My problem is that I want to be able to handle around 500 customers, and I want to still be able to use high concurrency. When I tried to use 32 task slots on 2 TMs, I got more than a million FDs on lsof. This is the exception I got when I've enabled all of the Ids: {code:java} java.lang.RuntimeException: Error parsing YAML configuration.
at org.apache.flink.configuration.GlobalConfiguration.loadYAMLResource(GlobalConfiguration.java:178) at org.apache.flink.configuration.GlobalConfiguration.loadConfiguration(GlobalConfiguration.java:112) at org.apache.flink.configuration.GlobalConfiguration.loadConfiguration(GlobalConfiguration.java:79) at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopConfiguration(HadoopFileSystem.java:178) at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:408) at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:351) at org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) at org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:106) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:232) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:653) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:640) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:246) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.FileNotFoundException: /srv/flink-1.2.1/conf/flink-conf.yaml (Too many open files) at java.io.FileInputStream.open(Native Method) at java.io.FileInputStream.<init>(FileInputStream.java:146) at org.apache.flink.configuration.GlobalConfiguration.loadYAMLResource(GlobalConfiguration.java:144) ... 14 more {code} I increased the ulimit to 500,000 but it's still not enough, and I guess this is too much anyhow.
I'd love to get some help :) Thanks, Alon [^ll.txt] [^lsof.txt] [^lsofp.txt]
[jira] [Commented] (FLINK-8707) Excessive amount of files opened by flink task manager
[ https://issues.apache.org/jira/browse/FLINK-8707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424006#comment-16424006 ] Alon Galant commented on FLINK-8707: Hey, I have a similar problem, this is my thread on Flink ML: [https://lists.apache.org/thread.html/ce89ed53533ec19ec0a7b74de0f96ac22506d5b2564c2949fd54d6ce@%3Cuser.flink.apache.org%3E] I've created a whitelist of 16 Customer-Ids that I filter from the Kafka topic into Flink, so I need to have around the same number of files that the data is being written to at once. I attached 3 files: lsof.txt - the result for $lsof > lsof.txt lsofp.txt - the result for $lsof -p 5514 > lsofp.txt (5514 is the flink proccess) ll.txt - the result for $ls -la /tmp/hadoop-flink/s3a/ > ll.txt /tmp/hadoop-flink/s3a/ is the directory Flink uses to store the tmp files it works on, before uploading them to s3. I ran these commands on my task manager (I run 2 tasks manager and a total of 8 task slots, 4 on each tm) Here are some lsof | wc -l results: {code:java} // code placeholder less lsof.txt | wc -l --> 44228 less lsofp.txt | wc -l --> 403 less ll.txt | wc -l --> 64 less lsof.txt | grep output-1620244093829221334.tmp | wc -l --> 108 (this is an example of an output file, I think all of them gives the same result) {code} >From ll.txt we can see that there are 4 files for each customerId (for each >partition), so I guess that every task slot opens its own file. For each 'output' file there are 108 FDs. My problem is that I want to be able to handle around 500 customers, and I want to still be able to use high concurrency. When I tried to use 32 tasks slots on 2 TM, I got more than a million FDs on lsof This is the exception I got when I've enabled all of the Ids: {code:java} // code placeholder java.lang.RuntimeException: Error parsing YAML configuration. 
at org.apache.flink.configuration.GlobalConfiguration.loadYAMLResource(GlobalConfiguration.java:178) at org.apache.flink.configuration.GlobalConfiguration.loadConfiguration(GlobalConfiguration.java:112) at org.apache.flink.configuration.GlobalConfiguration.loadConfiguration(GlobalConfiguration.java:79) at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopConfiguration(HadoopFileSystem.java:178) at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:408) at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:351) at org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) at org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:106) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:232) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:653) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:640) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:246) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.FileNotFoundException: /srv/flink-1.2.1/conf/flink-conf.yaml (Too many open files) at java.io.FileInputStream.open(Native Method) at java.io.FileInputStream.(FileInputStream.java:146) at org.apache.flink.configuration.GlobalConfiguration.loadYAMLResource(GlobalConfiguration.java:144) ... 14 more {code} I increased the ulimit to 500,000 but it's still not enough, and I guess this is too much anyhow. I'd love to get some help! 
Thanks, Alon [^ll.txt] [^lsof.txt] [^lsofp.txt] > Excessive amount of files opened by flink task manager > -- > > Key: FLINK-8707 > URL: https://issues.apache.org/jira/browse/FLINK-8707 > Project: Flink > Issue Type: Bug > Components: TaskManager >Affects Versions: 1.3.2 > Environment: NAME="Red Hat Enterprise Linux Server" > VERSION="7.3 (Maipo)" > Two boxes, each with a Job Manager & Task Manager, using Zookeeper for HA. > flink.yaml below with some settings (removed exact box names) etc: > env.log.dir: ...some dir...residing on the same box > env.pid.dir: some dir...residing on the same box > metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter > metrics.reporters: jmx > state.backend: filesystem > state.backend.fs.checkpointdir: file:///some_nfs_mount > state.checkpoints
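One detail worth noting in the numbers above: on Linux, `lsof` without `-p` can print one row per *task* (thread) for each descriptor, so a heavily threaded TaskManager may show 44228 rows while `lsof -p` shows only 403 actual FDs; the 108 rows per output file are likely the same few descriptors repeated across threads. A minimal, self-contained Java sketch that counts a JVM's descriptors once each (Linux-only, via /proc):

```java
import java.io.File;

public class FdCount {
    public static void main(String[] args) {
        // Each entry in /proc/self/fd is one open descriptor of this JVM
        // process, counted once regardless of how many threads share it.
        // This matches `ls /proc/<pid>/fd | wc -l` rather than plain lsof.
        String[] fds = new File("/proc/self/fd").list();
        System.out.println("open file descriptors: " + (fds == null ? -1 : fds.length));
    }
}
```

Comparing this number against the plain `lsof | wc -l` output shows how much of the apparent count is per-thread duplication rather than real descriptors.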
[jira] [Commented] (FLINK-8910) Introduce automated end-to-end test for local recovery (including sticky scheduling)
[ https://issues.apache.org/jira/browse/FLINK-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424002#comment-16424002 ] ASF GitHub Bot commented on FLINK-8910: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5676#discussion_r178819991 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java --- @@ -107,4 +131,12 @@ public int getAttemptNumber() { public String getTaskNameWithSubtasks() { return this.taskNameWithSubtasks; } + + /** +* Returns the allocation id for where this task is executed. +* @return the allocation id for where this task is executed. +*/ + public String getAllocationID() { --- End diff -- Previously, I had a "hacky" version that extracted the allocation id from the toString() method of the state store, but I think Stephan was more in favour of exposing this information through the context if needed. I cannot see how we can test that we have the same allocation id without some way to access it. > Introduce automated end-to-end test for local recovery (including sticky > scheduling) > > > Key: FLINK-8910 > URL: https://issues.apache.org/jira/browse/FLINK-8910 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.5.0 > > > We should have an automated end-to-end test that can run nightly to check > that sticky allocation and local recovery work as expected. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5676: [FLINK-8910][tests] Automated end-to-end test for ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5676#discussion_r178819991 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java --- @@ -107,4 +131,12 @@ public int getAttemptNumber() { public String getTaskNameWithSubtasks() { return this.taskNameWithSubtasks; } + + /** +* Returns the allocation id for where this task is executed. +* @return the allocation id for where this task is executed. +*/ + public String getAllocationID() { --- End diff -- Previously, I had a "hacky" version that extracted the allocation id from the toString() method of the state store, but I think Stephan was more in favour of exposing this information through the context if needed. I cannot see how we can test that we have the same allocation id without some way to access it. ---
[jira] [Updated] (FLINK-8707) Excessive amount of files opened by flink task manager
[ https://issues.apache.org/jira/browse/FLINK-8707?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alon Galant updated FLINK-8707: --- Attachment: lsof.txt > Excessive amount of files opened by flink task manager > -- > > Key: FLINK-8707 > URL: https://issues.apache.org/jira/browse/FLINK-8707 > Project: Flink > Issue Type: Bug > Components: TaskManager >Affects Versions: 1.3.2 > Environment: NAME="Red Hat Enterprise Linux Server" > VERSION="7.3 (Maipo)" > Two boxes, each with a Job Manager & Task Manager, using Zookeeper for HA. > flink.yaml below with some settings (removed exact box names) etc: > env.log.dir: ...some dir...residing on the same box > env.pid.dir: some dir...residing on the same box > metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter > metrics.reporters: jmx > state.backend: filesystem > state.backend.fs.checkpointdir: file:///some_nfs_mount > state.checkpoints.dir: file:///some_nfs_mount > state.checkpoints.num-retained: 3 > high-availability.cluster-id: /tst > high-availability.storageDir: file:///some_nfs_mount/ha > high-availability: zookeeper > high-availability.zookeeper.path.root: /flink > high-availability.zookeeper.quorum: ...list of zookeeper boxes > env.java.opts.jobmanager: ...some extra jar args > jobmanager.archive.fs.dir: some dir...residing on the same box > jobmanager.web.submit.enable: true > jobmanager.web.tmpdir: some dir...residing on the same box > env.java.opts.taskmanager: some extra jar args > taskmanager.tmp.dirs: some dir...residing on the same box/var/tmp > taskmanager.network.memory.min: 1024MB > taskmanager.network.memory.max: 2048MB > blob.storage.directory: some dir...residing on the same box >Reporter: Alexander Gardner >Assignee: Piotr Nowojski >Priority: Critical > Fix For: 1.5.0 > > Attachments: AfterRunning-3-jobs-Box2-TM-JCONSOLE.png, > AfterRunning-3-jobs-TM-FDs-BOX2.jpg, AfterRunning-3-jobs-lsof-p.box2-TM, > AfterRunning-3-jobs-lsof.box2-TM, 
AterRunning-3-jobs-Box1-TM-JCONSOLE.png, > box1-jobmgr-lsof, box1-taskmgr-lsof, box2-jobmgr-lsof, box2-taskmgr-lsof, > ll.txt, ll.txt, lsof.txt, lsof.txt, lsofp.txt, lsofp.txt > > > The job manager has fewer FDs than the task manager. > > Hi > A support alert indicated that there were a lot of open files for the boxes > running Flink. > There were 4 flink jobs that were dormant but had consumed a number of messages > from Kafka using the FlinkKafkaConsumer010. > A simple general lsof: > $ lsof | wc -l -> returned 153114 open file descriptors. > Focusing on the TaskManager process (process ID = 12154): > $ lsof | grep 12154 | wc -l -> returned 129322 open FDs > $ lsof -p 12154 | wc -l -> returned 531 FDs > There were 228 threads running for the task manager. > > Drilling down a bit further, looking at a_inode and FIFO entries: > $ lsof -p 12154 | grep a_inode | wc -l = 100 FDs > $ lsof -p 12154 | grep FIFO | wc -l = 200 FDs > $ /proc/12154/maps = 920 entries. > Apart from lsof identifying lots of JARs and SOs being referenced, there were > also 244 child processes for the task manager process. > Noticed in each environment a creep of file descriptors... are the above > figures deemed excessive for the number of FDs in use? I know Flink uses Netty - > is it using a separate Selector for reads & writes? > Additionally, Flink uses memory-mapped files or direct byte buffers - are these > skewing the numbers of FDs shown? > Example of one child process ID 6633: > java 12154 6633 dfdev 387u a_inode 0,9 0 5869 [eventpoll] > java 12154 6633 dfdev 388r FIFO 0,8 0t0 459758080 pipe > java 12154 6633 dfdev 389w FIFO 0,8 0t0 459758080 pipe > Lastly, cannot yet identify the reason for the creep in FDs even if Flink is > pretty dormant or has dormant jobs. Production nodes are not experiencing > excessive amounts of throughput yet either. > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8707) Excessive amount of files opened by flink task manager
[ https://issues.apache.org/jira/browse/FLINK-8707?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alon Galant updated FLINK-8707: --- Attachment: lsofp.txt lsof.txt ll.txt -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8707) Excessive amount of files opened by flink task manager
[ https://issues.apache.org/jira/browse/FLINK-8707?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alon Galant updated FLINK-8707: --- Attachment: lsofp.txt -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8707) Excessive amount of files opened by flink task manager
[ https://issues.apache.org/jira/browse/FLINK-8707?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alon Galant updated FLINK-8707: --- Attachment: ll.txt -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-5480) User-provided hashes for operators
[ https://issues.apache.org/jira/browse/FLINK-5480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16423983#comment-16423983 ] ASF GitHub Bot commented on FLINK-5480: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/3117 @ShashwatRastogi-Reflektion 1. For each task to load from the savepoint S1: * Determine the existing hash by searching for the task ID in the logs/UI * Set the uidHash for that task to that value via `SingleOutputStreamOperator#setUidHash` 2. For each task: * Set the uid to whatever value you wish to use in the future via `SingleOutputStreamOperator#setUid` 3. Resume the job from the savepoint S1. 4. Create a new savepoint S2, and remove all calls to `setUidHash`. 5. Resume the job from the savepoint S2. > User-provided hashes for operators > -- > > Key: FLINK-5480 > URL: https://issues.apache.org/jira/browse/FLINK-5480 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.2.0, 1.3.0 > > > We could allow users to provide (alternative) hashes for operators in a > StreamGraph. This can make migration between Flink versions easier, in case > the automatically produced hashes between versions are incompatible. For > example, users could just copy the old hashes from the web ui to their job. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #3117: [FLINK-5480] Introduce user-provided hash for JobVertexes
Github user zentol commented on the issue: https://github.com/apache/flink/pull/3117 @ShashwatRastogi-Reflektion 1. For each task to load from the savepoint S1: * Determine the existing hash by searching for the task ID in the logs/UI * Set the uidHash for that task to that value via `SingleOutputStreamOperator#setUidHash` 2. For each task: * Set the uid to whatever value you wish to use in the future via `SingleOutputStreamOperator#setUid` 3. Resume the job from the savepoint S1. 4. Create a new savepoint S2, and remove all calls to `setUidHash`. 5. Resume the job from the savepoint S2. ---
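Expressed as code, the steps above might look like the following sketch. Assumptions: a made-up pipeline and uid, and a made-up 32-hex-digit hash standing in for the value read from the logs/web UI; in the DataStream API the actual methods are `SingleOutputStreamOperator#setUidHash` and `#uid`. This needs Flink on the classpath and is illustrative only:

```java
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UidMigrationSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Steps 1-3: pin the operator's old auto-generated hash (read from
        // the logs/UI; the value below is hypothetical) and assign the uid
        // you want going forward, then resume from savepoint S1.
        SingleOutputStreamOperator<String> mapped = env
                .fromElements("a", "b", "c")
                .map(String::toUpperCase)
                .setUidHash("2e3c3e8a9f0b1c2d3e4f5a6b7c8d9e0f") // hypothetical hash from S1
                .uid("uppercase-mapper");                        // new stable uid

        mapped.print();
        env.execute("uid-migration");

        // Steps 4-5: after taking savepoint S2 from the running job, delete
        // the setUidHash(...) call and resume from S2 with only uid(...).
    }
}
```

The point of the two-phase dance is that S1 only knows the operator by its old hash, while S2 (taken after step 3) knows it by the new uid, so the temporary `setUidHash` call can be dropped.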
[jira] [Commented] (FLINK-8966) Port AvroExternalJarProgramITCase to flip6
[ https://issues.apache.org/jira/browse/FLINK-8966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16423978#comment-16423978 ] ASF GitHub Bot commented on FLINK-8966: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/5766 @tillrohrmann Could you take another look? I moved the blob upload into a separate method. > Port AvroExternalJarProgramITCase to flip6 > -- > > Key: FLINK-8966 > URL: https://issues.apache.org/jira/browse/FLINK-8966 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5766: [FLINK-8966][tests] Port AvroExternalJarProgramITCase to ...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5766 @tillrohrmann Could you take another look? I moved the blob upload into a separate method. ---
[jira] [Commented] (FLINK-8982) End-to-end test: Queryable state
[ https://issues.apache.org/jira/browse/FLINK-8982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16423973#comment-16423973 ] ASF GitHub Bot commented on FLINK-8982: --- GitHub user florianschmidt1994 opened a pull request: https://github.com/apache/flink/pull/5807 [FLINK-8982][E2E Tests] Add test for known failure of queryable state ## What is the purpose of the change Add an end-to-end test to verify that the changes that @kl0u introduced in https://github.com/apache/flink/pull/5691 fix a known issue with concurrent access to queryable state, by verifying that access to queryable state works as expected. ## Brief change log - Add a Flink app with queryable state that continuously updates MapState - Add a queryable state client that periodically queries the map state - Add an end-to-end test that runs the client against the app and verifies that no unexpected exceptions occur - Integrate the end-to-end test into the test suite ## Verifying this change This change added tests and can be verified as follows: - Run `./run-pre-commit-tests.sh` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? 
not applicable You can merge this pull request into a Git repository by running: $ git pull https://github.com/florianschmidt1994/flink end-to-end-tests-for-queryable-state Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5807.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5807 commit a94b06728c25df9a5c71a484f49cfabb36eb1460 Author: Florian Schmidt Date: 2018-03-13T13:13:08Z [FLINK-8982][E2E Tests] Add test for known failure of queryable state > End-to-end test: Queryable state > > > Key: FLINK-8982 > URL: https://issues.apache.org/jira/browse/FLINK-8982 > Project: Flink > Issue Type: Sub-task > Components: Queryable State, Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Florian Schmidt >Priority: Blocker > Fix For: 1.5.0 > > > We should add an end-to-end test which verifies that {{Queryable State}} is > working. > [~florianschmidt] and [~kkl0u] could you please provide more details for the > description. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...
GitHub user florianschmidt1994 opened a pull request: https://github.com/apache/flink/pull/5807 [FLINK-8982][E2E Tests] Add test for known failure of queryable state ## What is the purpose of the change Add an end-to-end test to verify that the changes that @kl0u introduced in https://github.com/apache/flink/pull/5691 fix a known issue with concurrent access to queryable state, by verifying that access to queryable state works as expected. ## Brief change log - Add a Flink app with queryable state that continuously updates MapState - Add a queryable state client that periodically queries the map state - Add an end-to-end test that runs the client against the app and verifies that no unexpected exceptions occur - Integrate the end-to-end test into the test suite ## Verifying this change This change added tests and can be verified as follows: - Run `./run-pre-commit-tests.sh` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? 
not applicable You can merge this pull request into a Git repository by running: $ git pull https://github.com/florianschmidt1994/flink end-to-end-tests-for-queryable-state Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5807.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5807 commit a94b06728c25df9a5c71a484f49cfabb36eb1460 Author: Florian Schmidt Date: 2018-03-13T13:13:08Z [FLINK-8982][E2E Tests] Add test for known failure of queryable state ---
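For readers who want to try the query side by hand, a standalone client along the lines of what such a test exercises might look like the sketch below. Assumptions: the proxy host/port (9069 is the default queryable state proxy port), the state name "query-name", the key, and the key/value types are all placeholders that must match what the job registered via its queryable state descriptor; verify the exact `QueryableStateClient` API against your Flink version before relying on it:

```java
import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.queryablestate.client.QueryableStateClient;

public class QueryableStateClientSketch {
    public static void main(String[] args) throws Exception {
        // Host and port of a TaskManager's queryable state proxy.
        QueryableStateClient client = new QueryableStateClient("localhost", 9069);
        // Job id of the running job, e.g. copied from the web UI.
        JobID jobId = JobID.fromHexString(args[0]);

        // Must mirror the descriptor the job used when registering the
        // state as queryable (same name and types).
        MapStateDescriptor<String, Long> descriptor =
                new MapStateDescriptor<>("query-name", Types.STRING, Types.LONG);

        CompletableFuture<MapState<String, Long>> future =
                client.getKvState(jobId, "query-name", "some-key", Types.STRING, descriptor);

        MapState<String, Long> state = future.join();
        System.out.println("value for some-key: " + state.get("some-key"));
    }
}
```

A test like the one in this PR essentially runs such a query in a loop and fails if any query throws an unexpected exception.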
[jira] [Commented] (FLINK-9094) AccumulatorLiveITCase unstable on Travis
[ https://issues.apache.org/jira/browse/FLINK-9094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16423961#comment-16423961 ] ASF GitHub Bot commented on FLINK-9094: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5771 Thanks for the review @zentol. Merging this PR. > AccumulatorLiveITCase unstable on Travis > > > Key: FLINK-9094 > URL: https://issues.apache.org/jira/browse/FLINK-9094 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Critical > > {{AccumulatorLiveITCase}} unstable on Travis. > https://api.travis-ci.org/v3/job/358509206/log.txt -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6567) ExecutionGraphMetricsTest fails on Windows CI
[ https://issues.apache.org/jira/browse/FLINK-6567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16423962#comment-16423962 ] ASF GitHub Bot commented on FLINK-6567: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5782 Thanks for the review @zentol. Merging this PR. > ExecutionGraphMetricsTest fails on Windows CI > - > > Key: FLINK-6567 > URL: https://issues.apache.org/jira/browse/FLINK-6567 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.3.0, 1.4.0 >Reporter: Chesnay Schepler >Assignee: Till Rohrmann >Priority: Blocker > Labels: test-stability > Fix For: 1.6.0 > > > The {{testExecutionGraphRestartTimeMetric}} fails every time I run it on > AppVeyor. It also very rarely failed for me locally. > The test fails at Line 235 if the RUNNING timestamp is equal to the > RESTARTING timestamp, which may happen when combining a fast test with a > low-resolution clock. > A simple fix would be to increase the time between the RUNNING and > RESTARTING transitions by adding a 50ms sleep timeout to the > {{TestingRestartStrategy#canRestart()}} method, as this one is called before > transitioning to the RESTARTING state. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5782: [FLINK-6567] [tests] Harden ExecutionGraphMetricsTest
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5782 Thanks for the review @zentol. Merging this PR. ---
[GitHub] flink issue #5771: [FLINK-9094] [tests] Harden AccumulatorLiveITCase
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5771 Thanks for the review @zentol. Merging this PR. ---
[jira] [Updated] (FLINK-8835) Fix TaskManager config keys
[ https://issues.apache.org/jira/browse/FLINK-8835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mingleizhang updated FLINK-8835: Description: Many new config keys in the TaskManager don't follow the proper naming scheme. We need to clear those up before the release. I would also suggest to keep the key names short, because that makes it easier for users. When doing this cleanup pass over the config keys, I would suggest to also make some of the existing keys more hierarchical and harmonize them with the common scheme in Flink. h1. New Keys * {{taskmanager.network.credit-based-flow-control.enabled}} to {{taskmanager.network.credit-model}}. h1. Existing Keys * {{taskmanager.debug.memory.startLogThread}} => {{taskmanager.debug.memory.log}} * {{taskmanager.debug.memory.logIntervalMs}} => {{taskmanager.debug.memory.log-interval}} * {{taskmanager.initial-registration-pause}} => {{taskmanager.registration.initial-backoff}} * {{taskmanager.max-registration-pause}} => {{taskmanager.registration.max-backoff}} * {{taskmanager.refused-registration-pause}} => {{taskmanager.registration.refused-backoff}} * {{taskmanager.maxRegistrationDuration}} => {{taskmanager.registration.timeout}} was: Many new config keys in the TaskManager don't follow the proper naming scheme. We need to clear those up before the release. I would also suggest to keep the key names short, because that makes it easier for users. When doing this cleanup pass over the config keys, I would suggest to also make some of the existing keys more hierarchical and harmonize them with the common scheme in Flink. h1. New Keys * {{taskmanager.network.credit-based-flow-control.enabled}} to {{taskmanager.network.credit-model}}. * {{taskmanager.exactly-once.blocking.data.enabled}} to {{task.checkpoint.alignment.blocking}} (we already have {{task.checkpoint.alignment.max-size}}) h1. Existing Keys * {{taskmanager.debug.memory.startLogThread}} => {{taskmanager.debug.memory.log}} * {{taskmanager.debug.memory.logIntervalMs}} => {{taskmanager.debug.memory.log-interval}} * {{taskmanager.initial-registration-pause}} => {{taskmanager.registration.initial-backoff}} * {{taskmanager.max-registration-pause}} => {{taskmanager.registration.max-backoff}} * {{taskmanager.refused-registration-pause}} => {{taskmanager.registration.refused-backoff}} * {{taskmanager.maxRegistrationDuration}} => {{taskmanager.registration.timeout}} > Fix TaskManager config keys > --- > > Key: FLINK-8835 > URL: https://issues.apache.org/jira/browse/FLINK-8835 > Project: Flink > Issue Type: Bug > Components: TaskManager >Reporter: Stephan Ewen >Assignee: mingleizhang >Priority: Blocker > Labels: easy-fix > Fix For: 1.5.0 > > > Many new config keys in the TaskManager don't follow the proper naming > scheme. We need to clear those up before the release. I would also suggest to > keep the key names short, because that makes it easier for users. > When doing this cleanup pass over the config keys, I would suggest to also > make some of the existing keys more hierarchical and harmonize them with the > common scheme in Flink. > h1. New Keys > * {{taskmanager.network.credit-based-flow-control.enabled}} to > {{taskmanager.network.credit-model}}. > h1. Existing Keys > * {{taskmanager.debug.memory.startLogThread}} => > {{taskmanager.debug.memory.log}} > * {{taskmanager.debug.memory.logIntervalMs}} => > {{taskmanager.debug.memory.log-interval}} > * {{taskmanager.initial-registration-pause}} => > {{taskmanager.registration.initial-backoff}} > * {{taskmanager.max-registration-pause}} => > {{taskmanager.registration.max-backoff}} > * {{taskmanager.refused-registration-pause}} => > {{taskmanager.registration.refused-backoff}} > * {{taskmanager.maxRegistrationDuration}} => > {{taskmanager.registration.timeout}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
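Assuming the renames above land as listed, a flink-conf.yaml fragment using the new key names might look like this; the values are illustrative only, and the exact key names and accepted value formats should be checked against the Flink 1.5 configuration docs:

```yaml
# Renamed TaskManager keys (old names in comments):
taskmanager.debug.memory.log: true              # was taskmanager.debug.memory.startLogThread
taskmanager.debug.memory.log-interval: 5000     # was taskmanager.debug.memory.logIntervalMs
taskmanager.registration.initial-backoff: 500 ms   # was taskmanager.initial-registration-pause
taskmanager.registration.max-backoff: 30 s         # was taskmanager.max-registration-pause
taskmanager.registration.refused-backoff: 10 s     # was taskmanager.refused-registration-pause
taskmanager.registration.timeout: 5 min            # was taskmanager.maxRegistrationDuration
```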