[jira] [Commented] (FLINK-8966) Port AvroExternalJarProgramITCase to flip6

2018-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16425088#comment-16425088
 ] 

ASF GitHub Bot commented on FLINK-8966:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5766
  
merging.


> Port AvroExternalJarProgramITCase to flip6
> --
>
> Key: FLINK-8966
> URL: https://issues.apache.org/jira/browse/FLINK-8966
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5766: [FLINK-8966][tests] Port AvroExternalJarProgramITCase to ...

2018-04-03 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5766
  
merging.


---


[jira] [Assigned] (FLINK-6924) ADD LOG(X) supported in TableAPI

2018-04-03 Thread Jiayi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiayi reassigned FLINK-6924:


Assignee: Jiayi  (was: zjuwangg)

> ADD LOG(X) supported in TableAPI
> 
>
> Key: FLINK-6924
> URL: https://issues.apache.org/jira/browse/FLINK-6924
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: Jiayi
>Priority: Major
>  Labels: starter
>
> See FLINK-6891 for detail.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-8953) Resolve unresolved field references in FieldComputer expressions

2018-04-03 Thread Renjie Liu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Renjie Liu reassigned FLINK-8953:
-

Assignee: Renjie Liu

> Resolve unresolved field references in FieldComputer expressions
> 
>
> Key: FLINK-8953
> URL: https://issues.apache.org/jira/browse/FLINK-8953
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Renjie Liu
>Priority: Major
>
> When implementing the {{FieldComputer.getExpression}} method, it is not 
> possible to use API classes but only internal expression case classes.
> It would be great to also define timestamp extractors like:
> {code}
>   def getExpression(fieldAccesses: Array[ResolvedFieldReference]): Expression = {
> // 'x.cast(Types.LONG)
> // ExpressionParser.parseExpression("x.cast(LONG)")
>   }
> {code}
> An even better solution would be to provide different `getExpression()` 
> methods that an implementor can override. The general goal should be to 
> define this as naturally as possible. In the future we should also support SQL:
> {code}
>   def getJavaExpression(fieldAccesses: Array[ResolvedFieldReference]): String = {
> "x.cast(LONG)"
>   }
>   def getSQLExpression(fieldAccesses: Array[ResolvedFieldReference]): String = {
> "CAST(x AS LONG)"
>   }
> {code}
> The final design is still up for discussion. These are just ideas.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9088) Upgrade Nifi connector dependency to 1.6.0

2018-04-03 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-9088:
--
Component/s: Streaming Connectors

> Upgrade Nifi connector dependency to 1.6.0
> --
>
> Key: FLINK-9088
> URL: https://issues.apache.org/jira/browse/FLINK-9088
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Reporter: Ted Yu
>Assignee: Hai Zhou
>Priority: Major
>
> Currently the NiFi dependency is 0.6.1.
> We should upgrade to 1.6.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-8971) Create general purpose testing job

2018-04-03 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16421148#comment-16421148
 ] 

mingleizhang edited comment on FLINK-8971 at 4/4/18 1:49 AM:
-

Hi [~till.rohrmann], the links you posted here return "page not found": [heavily 
misbehaved 
job|https://github.com/rmetzger/flink-testing-jobs/blob/flink-1.4/basic-jobs/src/main/java/com/dataartisans/heavymisbehaved/HeavyMisbehavedJob.java]
 and [state machine 
job|https://github.com/dataArtisans/flink-testing-jobs/tree/master/streaming-state-machine].
 Could you please check them again?


was (Author: mingleizhang):
Page not found [heavily misbehaved 
job|https://github.com/rmetzger/flink-testing-jobs/blob/flink-1.4/basic-jobs/src/main/java/com/dataartisans/heavymisbehaved/HeavyMisbehavedJob.java]
 and state machine job also.

> Create general purpose testing job
> --
>
> Key: FLINK-8971
> URL: https://issues.apache.org/jira/browse/FLINK-8971
> Project: Flink
>  Issue Type: Task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Priority: Critical
> Fix For: 1.5.0
>
>
> In order to write better end-to-end tests we need a general purpose testing 
> job which comprises as many Flink aspects as possible. These include 
> different types for records and state, user defined components, state types 
> and operators.
> The job should allow activating certain misbehaviors, such as slowing 
> certain paths down or throwing exceptions to simulate failures.
> The job should come with a data generator which generates input data such 
> that the job can verify its own behavior. This includes the state as well as 
> the input/output records.
> We already have the [heavily misbehaved 
> job|https://github.com/rmetzger/flink-testing-jobs/blob/flink-1.4/basic-jobs/src/main/java/com/dataartisans/heavymisbehaved/HeavyMisbehavedJob.java]
>  which simulates some misbehavior. There is also the [state machine 
> job|https://github.com/dataArtisans/flink-testing-jobs/tree/master/streaming-state-machine]
>  which can verify itself for invalid state changes which indicate data loss. 
> We should incorporate their characteristics into the new general purpose job.
> Additionally, the general purpose job should contain the following aspects:
> * Job containing a sliding window aggregation
> * At least one generic Kryo type
> * At least one generic Avro type
> * At least one Avro specific record type
> * At least one input type for which we register a Kryo serializer
> * At least one input type for which we provide a user defined serializer
> * At least one state type for which we provide a user defined serializer
> * At least one state type which uses the AvroSerializer
> * Include an operator with ValueState
> * Value state changes should be verified (e.g. predictable series of values)
> * Include an operator with operator state
> * Include an operator with broadcast state
> * Broadcast state changes should be verified (e.g. predictable series of 
> values)
> * Include union state
> * User defined watermark assigner
> The job should be made available in the {{flink-end-to-end-tests}} module.
> This issue is intended to serve as an umbrella issue for developing and 
> extending this job.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9091) Failure while enforcing releasability in building flink-json module

2018-04-03 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-9091:
--
Component/s: Build System

> Failure while enforcing releasability in building flink-json module
> ---
>
> Key: FLINK-9091
> URL: https://issues.apache.org/jira/browse/FLINK-9091
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: Ted Yu
>Assignee: Hai Zhou
>Priority: Major
> Attachments: f-json.out
>
>
> Got the following when building flink-json module:
> {code}
> [WARNING] Rule 0: org.apache.maven.plugins.enforcer.DependencyConvergence 
> failed with message:
> Failed while enforcing releasability. See above detailed error message.
> ...
> [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-enforcer-plugin:3.0.0-M1:enforce 
> (dependency-convergence) on project flink-json: Some Enforcer rules have 
> failed.   Look above for specific messages explaining why the rule failed. -> 
> [Help 1]
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8933) Avoid calling Class#newInstance

2018-04-03 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-8933:
--
Description: 
Class#newInstance is deprecated starting in Java 9 - 
https://bugs.openjdk.java.net/browse/JDK-6850612 - because it may throw 
undeclared checked exceptions.


The suggested replacement is getDeclaredConstructor().newInstance(), which 
wraps the checked exceptions in InvocationTargetException.

  was:
Class#newInstance is deprecated starting in Java 9 - 
https://bugs.openjdk.java.net/browse/JDK-6850612 - because it may throw 
undeclared checked exceptions.

The suggested replacement is getDeclaredConstructor().newInstance(), which 
wraps the checked exceptions in InvocationTargetException.
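As a quick illustration, here is a minimal, self-contained sketch of the suggested 
migration; the {{MyType}} class and the wrapping {{RuntimeException}} are placeholders 
for this example only, not code from the Flink code base:

{code:java}
public class NewInstanceMigration {

    // Placeholder type standing in for whatever class is instantiated reflectively.
    public static class MyType {
        public MyType() {}
    }

    public static MyType create() {
        try {
            // Deprecated since Java 9 and able to propagate undeclared checked exceptions:
            // return MyType.class.newInstance();

            // Suggested replacement: checked exceptions thrown by the constructor are
            // wrapped in an InvocationTargetException instead of leaking out undeclared.
            return MyType.class.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            // Covers NoSuchMethodException, InstantiationException, IllegalAccessException
            // and InvocationTargetException.
            throw new RuntimeException("Could not instantiate MyType", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(create());
    }
}
{code}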


> Avoid calling Class#newInstance
> ---
>
> Key: FLINK-8933
> URL: https://issues.apache.org/jira/browse/FLINK-8933
> Project: Flink
>  Issue Type: Task
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Minor
>
> Class#newInstance is deprecated starting in Java 9 - 
> https://bugs.openjdk.java.net/browse/JDK-6850612 - because it may throw 
> undeclared checked exceptions.
> The suggested replacement is getDeclaredConstructor().newInstance(), which 
> wraps the checked exceptions in InvocationTargetException.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9127) Filesystem State Backend logged incorrectly

2018-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424831#comment-16424831
 ] 

ASF GitHub Bot commented on FLINK-9127:
---

GitHub user skidder opened a pull request:

https://github.com/apache/flink/pull/5810

[FLINK-9127] [Core] Filesystem State Backend logged incorrectly

## What is the purpose of the change

This pull request fixes a message logged during startup of the Flink 
TaskManager and JobManager when a filesystem backend is in use.

The old, incorrect behavior produced a log message indicating that a 
heap-memory backend is in use.

## Brief change log

  - Fix the log message produced by the StateBackendLoader class in Core.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/skidder/flink FLINK-9127

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5810.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5810


commit 9e3774bb2a0ceedacde584a47aa9146bcbf47a3a
Author: Scott Kidder 
Date:   2018-04-04T01:03:46Z

[FLINK-9127] Fix log message for filesystem backend




> Filesystem State Backend logged incorrectly
> ---
>
> Key: FLINK-9127
> URL: https://issues.apache.org/jira/browse/FLINK-9127
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.3.2, 1.4.2
>Reporter: Scott Kidder
>Priority: Trivial
>
> When using a filesystem backend, the 
> '[StateBackendLoader|https://github.com/apache/flink/blob/1f9c2d9740ffea2b59b8f5f3da287a0dc890ddbf/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java#L123]'
>  class produces a log message stating: "State backend is set to heap memory". 
> Example:
> {{2018-04-04 00:45:49,591 INFO  
> org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend 
> is set to heap memory (checkpoints to filesystem 
> "hdfs://hdfs:8020/flink/checkpoints")}}
> It looks like this resulted from some copy-pasta of the previous 
> case-statement that matches on the memory backend. This bug is also present 
> in earlier releases (1.3.2, 1.4.0) of Flink in the 'AbstractStateBackend' 
> class.
> This log statement should be corrected to indicate that a filesystem backend 
> is in use.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5810: [FLINK-9127] [Core] Filesystem State Backend logge...

2018-04-03 Thread skidder
GitHub user skidder opened a pull request:

https://github.com/apache/flink/pull/5810

[FLINK-9127] [Core] Filesystem State Backend logged incorrectly

## What is the purpose of the change

This pull request fixes a message logged during startup of the Flink 
TaskManager and JobManager when a filesystem backend is in use.

The old, incorrect behavior produced a log message indicating that a 
heap-memory backend is in use.

## Brief change log

  - Fix the log message produced by the StateBackendLoader class in Core.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/skidder/flink FLINK-9127

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5810.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5810


commit 9e3774bb2a0ceedacde584a47aa9146bcbf47a3a
Author: Scott Kidder 
Date:   2018-04-04T01:03:46Z

[FLINK-9127] Fix log message for filesystem backend




---


[jira] [Created] (FLINK-9127) Filesystem State Backend logged incorrectly

2018-04-03 Thread Scott Kidder (JIRA)
Scott Kidder created FLINK-9127:
---

 Summary: Filesystem State Backend logged incorrectly
 Key: FLINK-9127
 URL: https://issues.apache.org/jira/browse/FLINK-9127
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.4.2, 1.3.2
Reporter: Scott Kidder


When using a filesystem backend, the 
'[StateBackendLoader|https://github.com/apache/flink/blob/1f9c2d9740ffea2b59b8f5f3da287a0dc890ddbf/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java#L123]'
 class produces a log message stating: "State backend is set to heap memory". 

Example:

{{2018-04-04 00:45:49,591 INFO  
org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend 
is set to heap memory (checkpoints to filesystem 
"hdfs://hdfs:8020/flink/checkpoints")}}

It looks like this resulted from some copy-pasta of the previous case-statement 
that matches on the memory backend. This bug is also present in earlier 
releases (1.3.2, 1.4.0) of Flink in the 'AbstractStateBackend' class.

This log statement should be corrected to indicate that a filesystem backend is 
in use.
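For illustration only, the copy-paste pattern described above could look like the 
simplified sketch below; this is not the actual StateBackendLoader code, and the 
class, method, and return values are made up for the example:

{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BackendLoggingSketch {

    private static final Logger LOG = LoggerFactory.getLogger(BackendLoggingSketch.class);

    static String loadBackend(String backendName, String checkpointDir) {
        switch (backendName.toLowerCase()) {
            case "jobmanager":
                LOG.info("State backend is set to heap memory (checkpoints to JobManager)");
                return "memory";
            case "filesystem":
                // Copied from the branch above; the message should say that a filesystem
                // backend is in use, which is exactly the bug reported here.
                LOG.info("State backend is set to heap memory (checkpoints to filesystem \"{}\")",
                        checkpointDir);
                return "filesystem";
            default:
                throw new IllegalArgumentException("Unknown state backend: " + backendName);
        }
    }

    public static void main(String[] args) {
        loadBackend("filesystem", "hdfs://hdfs:8020/flink/checkpoints");
    }
}
{code}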



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8835) Fix TaskManager config keys

2018-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424817#comment-16424817
 ] 

ASF GitHub Bot commented on FLINK-8835:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/5808
  
cc @StephanEwen @aljoscha 


> Fix TaskManager config keys
> ---
>
> Key: FLINK-8835
> URL: https://issues.apache.org/jira/browse/FLINK-8835
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Reporter: Stephan Ewen
>Assignee: mingleizhang
>Priority: Blocker
>  Labels: easy-fix
> Fix For: 1.5.0
>
>
> Many new config keys in the TaskManager don't follow the proper naming 
> scheme. We need to clean those up before the release. I would also suggest 
> keeping the key names short, because that makes it easier for users.
> When doing this cleanup pass over the config keys, I would suggest also 
> making some of the existing keys more hierarchical and harmonizing them with 
> the common scheme in Flink.
> h1. New Keys
> * {{taskmanager.network.credit-based-flow-control.enabled}} to 
> {{taskmanager.network.credit-model}}.
> h1. Existing Keys
> * {{taskmanager.debug.memory.startLogThread}} => 
> {{taskmanager.debug.memory.log}}
> * {{taskmanager.debug.memory.logIntervalMs}} => 
> {{taskmanager.debug.memory.log-interval}}
> * {{taskmanager.initial-registration-pause}} => 
> {{taskmanager.registration.initial-backoff}}
> * {{taskmanager.max-registration-pause}} => 
> {{taskmanager.registration.max-backoff}}
> * {{taskmanager.refused-registration-pause}} => 
> {{taskmanager.registration.refused-backoff}}
> * {{taskmanager.maxRegistrationDuration}} => 
> {{taskmanager.registration.timeout}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5808: [FLINK-8835] [taskmanager] Fix TaskManager config keys

2018-04-03 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/5808
  
cc @StephanEwen @aljoscha 


---


[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-04-03 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424625#comment-16424625
 ] 

Steven Zhen Wu edited comment on FLINK-9061 at 4/3/18 9:43 PM:
---

[~jgrier] [~StephanEwen]

Here is our thinking. If you think it makes sense, we can submit a PR for 
checkpoints. As Stephan mentioned earlier, savepoints probably need to be 
tackled separately.

1) New config to enable dynamic entropy injection:
{code:java}
# user has full control on checkpoint path (including entropy key substitution)
# _ENTROPY_KEY_ can be at any part of the checkpoint path
state.backend.fs.checkpointdir: s3://bucket/_ENTROPY_KEY_/...

# boolean flag to enable entropy injection
state.backend.fs.checkpointdir.injectEntropy.enabled: true

# substring for entropy substitution 
state.backend.fs.checkpointdir.injectEntropy.key: _ENTROPY_KEY_

# optional: number of chars. default is 4
state.backend.fs.checkpointdir.injectEntropy.length: 4{code}
 

2) Random vs. hash: we are generating random hex characters for the entropy. A 
hash should work equally well. I am not strongly biased either way, though I 
don't see much benefit of a hash over randomness; a deterministic hash doesn't 
seem to add much.

3) Our current implementation does the entropy substitution during operator 
initialization, which means the entropy stays the same for the operator's 
lifecycle, but different operators get different entropy, which is the key to 
scaling parallel operators. Conceptually, a better way is probably to do the 
entropy substitution for each S3 write; we can make that change if it is 
desired. Practically, it probably doesn't make much difference in terms of 
spreading the load and throughput, because either way each operator gets its 
own entropy.

Thanks,

Steven
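To make point 3 concrete, here is a minimal sketch of how the substitution itself 
could be done; the config keys above are the proposal, while the helper class and 
method below are purely illustrative and not an agreed-upon Flink API:

{code:java}
import java.util.concurrent.ThreadLocalRandom;

public class EntropyInjectionSketch {

    private static final String ENTROPY_KEY = "_ENTROPY_KEY_";
    private static final char[] HEX = "0123456789abcdef".toCharArray();

    /** Replaces the entropy marker in the configured checkpoint path with random hex characters. */
    static String injectEntropy(String checkpointDir, int length) {
        StringBuilder entropy = new StringBuilder(length);
        ThreadLocalRandom rnd = ThreadLocalRandom.current();
        for (int i = 0; i < length; i++) {
            entropy.append(HEX[rnd.nextInt(HEX.length)]);
        }
        return checkpointDir.replace(ENTROPY_KEY, entropy.toString());
    }

    public static void main(String[] args) {
        // e.g. s3://bucket/_ENTROPY_KEY_/flink/checkpoints -> s3://bucket/a3f9/flink/checkpoints
        System.out.println(injectEntropy("s3://bucket/_ENTROPY_KEY_/flink/checkpoints", 4));
    }
}
{code}

Doing this once per operator (as described above) or once per S3 write only changes 
where the substitution is invoked; the substitution itself stays the same.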


was (Author: stevenz3wu):
[~jgrier] [~StephanEwen]

Here are our thinking. if you think it makes sense, we can submit a PR for 
checkpoint. As Stephan mentioned earlier, savepoint probably needs to be 
tackled separately.

1) new config to enable dynamic entropy injection
{code:java}
# user has full control on checkpoint path (including entropy key substitution)
# _ENTROPY_KEY_ can be at any part of the checkpoint path
state.backend.fs.checkpointdir: s3://bucket/_ENTROPY_KEY_/...

# boolean flag to enable entropy injection
state.backend.fs.checkpointdir.injectEntropy.enabled: true

# substring for entropy substitution 
state.backend.fs.checkpointdir.injectEntropy.key: _ENTROPY_KEY_

# optional: number of chars. default is 4
state.backend.fs.checkpointdir.injectEntropy.length: 4{code}
 

2) random v.s. hash: we are generating random hex chars for entropy. hash 
should work equally well. I am not strongly biased either way, even though I 
don't see much benefit of hash over random. deterministic hash doesn't seem to 
give much benefit

3) our current implementation does the entropy substitution during operator 
initialization. Conceptually, a better way is probably doing entropy 
substitution for each S3 write. We can make the change if this is desired. 
Practically, it probably doesn't make much difference in terms of spreading the 
load and throughput, because either way each operator got its own entropy prefix

Thanks,

Steven

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-04-03 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424625#comment-16424625
 ] 

Steven Zhen Wu edited comment on FLINK-9061 at 4/3/18 9:24 PM:
---

[~jgrier] [~StephanEwen]

Here are our thinking. if you think it makes sense, we can submit a PR for 
checkpoint. As Stephan mentioned earlier, savepoint probably needs to be 
tackled separately.

1) new config to enable dynamic entropy injection
{code:java}
# user has full control on checkpoint path (including entropy key substitution)
# _ENTROPY_KEY_ can be at any part of the checkpoint path
state.backend.fs.checkpointdir: s3://bucket/_ENTROPY_KEY_/...

# boolean flag to enable entropy injection
state.backend.fs.checkpointdir.injectEntropy.enabled: true

# substring for entropy substitution 
state.backend.fs.checkpointdir.injectEntropy.key: _ENTROPY_KEY_

# optional: number of chars. default is 4
state.backend.fs.checkpointdir.injectEntropy.length: 4{code}
 

2) random v.s. hash: we are generating random hex chars for entropy. hash 
should work equally well. I am not strongly biased either way, even though I 
don't see much benefit of hash over random. deterministic hash doesn't seem to 
give much benefit

3) our current implementation does the entropy substitution during operator 
initialization. Conceptually, a better way is probably doing entropy 
substitution for each S3 write. We can make the change if this is desired. 
Practically, it probably doesn't make much difference in terms of spreading the 
load and throughput, because either way each operator got its own entropy prefix

Thanks,

Steven


was (Author: stevenz3wu):
[~jgrier] [~StephanEwen]

Here are our thinking. if you think it makes sense, we can submit a PR for 
checkpoint. As Stephan mentioned earlier, savepoint probably needs to be 
tackled separately.

1) new config to enable dynamic entropy injection
{code:java}
# user has full control on checkpoint path (including entropy key substitution)
# _ENTROPY_KEY_ can be at any part of the checkpoint path
state.backend.fs.checkpointdir: s3://bucket/_ENTROPY_KEY_/...

# boolean flag to enable entropy injection
state.backend.fs.checkpointdir.injectEntropy.enabled: true

# substring for entropy substitution 
state.backend.fs.checkpointdir.injectEntropy.key: _ENTROPY_KEY_

# optional: number of chars. default is 4
state.backend.fs.checkpointdir.injectEntropy.length: 4{code}
 

2) random v.s. hash: we are generating random hex chars for entropy. hash 
should work equally well. I am not strongly biased either way, even though I 
don't see much benefit of hash over random. deterministic hash doesn't seem to 
give much benefit

3) our current implementation does the entropy substitution during operator 
initialization. Conceptually, a better way is probably doing entropy 
substitution for each S3 write. Practically, it probably doesn't make much 
difference in terms of spreading the load and throughput, because either way 
each operator got its own entropy prefix

Thanks,

Steven

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-04-03 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424625#comment-16424625
 ] 

Steven Zhen Wu edited comment on FLINK-9061 at 4/3/18 9:23 PM:
---

[~jgrier] [~StephanEwen]

Here are our thinking. if you think it makes sense, we can submit a PR for 
checkpoint. As Stephan mentioned earlier, savepoint probably needs to be 
tackled separately.

1) new config to enable dynamic entropy injection
{code:java}
# user has full control on checkpoint path (including entropy key substitution)
# _ENTROPY_KEY_ can be at any part of the checkpoint path
state.backend.fs.checkpointdir: s3://bucket/_ENTROPY_KEY_/...

# boolean flag to enable entropy injection
state.backend.fs.checkpointdir.injectEntropy.enabled: true

# substring for entropy substitution 
state.backend.fs.checkpointdir.injectEntropy.key: _ENTROPY_KEY_

# optional: number of chars. default is 4
state.backend.fs.checkpointdir.injectEntropy.length: 4{code}
 

2) random v.s. hash: we are generating random hex chars for entropy. hash 
should work equally well. I am not strongly biased either way, even though I 
don't see much benefit of hash over random. deterministic hash doesn't seem to 
give much benefit

3) our current implementation does the entropy substitution during operator 
initialization. Conceptually, a better way is probably doing entropy 
substitution for each S3 write. Practically, it probably doesn't make much 
difference in terms of spreading the load and throughput, because either way 
each operator got its own entropy prefix

Thanks,

Steven


was (Author: stevenz3wu):
[~jgrier] [~StephanEwen]

Here are our thinking. if you think it makes sense, we can submit a PR for 
checkpoint. As Stephan mentioned earlier, savepoint probably needs to be 
tackled separately.

1) new config to enable dynamic entropy injection
{code:java}
# user has full control on checkpoint path (including entropy key substitution)
# _ENTROPY_KEY_ can be at any part of the checkpoint path
state.backend.fs.checkpointdir: s3://bucket/_ENTROPY_KEY_/...
# boolean flag to enable entropy injection
state.backend.fs.checkpointdir.injectEntropy.enabled: true
# substring for entropy substitution 
state.backend.fs.checkpointdir.injectEntropy.key: _ENTROPY_KEY_
# optional: number of chars. default is 4
state.backend.fs.checkpointdir.injectEntropy.length: 4{code}
 

2) random v.s. hash: we are generating random hex chars for entropy. hash 
should work equally well. I am not strongly biased either way, even though I 
don't see much benefit of hash over random. deterministic hash doesn't seem to 
give much benefit

3) our current implementation does the entropy substitution during operator 
initialization. Conceptually, a better way is probably doing entropy 
substitution for each S3 write. Practically, it probably doesn't make much 
difference in terms of spreading the load and throughput, because either way 
each operator got its own entropy prefix

Thanks,

Steven

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-04-03 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424625#comment-16424625
 ] 

Steven Zhen Wu edited comment on FLINK-9061 at 4/3/18 9:23 PM:
---

[~jgrier] [~StephanEwen]

Here are our thinking. if you think it makes sense, we can submit a PR for 
checkpoint. As Stephan mentioned earlier, savepoint probably needs to be 
tackled separately.

1) new config to enable dynamic entropy injection
{code:java}
# user has full control on checkpoint path (including entropy key substitution)
# _ENTROPY_KEY_ can be at any part of the checkpoint path
state.backend.fs.checkpointdir: s3://bucket/_ENTROPY_KEY_/...
# boolean flag to enable entropy injection
state.backend.fs.checkpointdir.injectEntropy.enabled: true
# substring for entropy substitution 
state.backend.fs.checkpointdir.injectEntropy.key: _ENTROPY_KEY_
# optional: number of chars. default is 4
state.backend.fs.checkpointdir.injectEntropy.length: 4{code}
 

2) random v.s. hash: we are generating random hex chars for entropy. hash 
should work equally well. I am not strongly biased either way, even though I 
don't see much benefit of hash over random. deterministic hash doesn't seem to 
give much benefit

3) our current implementation does the entropy substitution during operator 
initialization. Conceptually, a better way is probably doing entropy 
substitution for each S3 write. Practically, it probably doesn't make much 
difference in terms of spreading the load and throughput, because either way 
each operator got its own entropy prefix

Thanks,

Steven


was (Author: stevenz3wu):
[~jgrier] [~StephanEwen]

Here are our thinking. if you think it makes sense, we can submit a PR for 
checkpoint. As Stephan mentioned earlier, savepoint probably needs to be 
tackled separately.

1) new config to enable dynamic entropy injection
{code:java}
# user has full control on checkpoint path (including entropy key substitution)
# _ENTROPY_KEY_ can be at any part of the checkpoint path
state.backend.fs.checkpointdir: s3://bucket/_ENTROPY_KEY_/...
# boolean flag to enable entropy injection
state.backend.fs.checkpointdir.injectEntropy.enabled: true
# substring for entropy substitution 
state.backend.fs.checkpointdir.injectEntropy.key: _ENTROPY_KEY_
# optional: number of chars. default is 4
state.backend.fs.checkpointdir.injectEntropy.length: 4{code}
 

2) random v.s. hash: we are generating random hexes for entropy. hash should 
work equally well. I am not strongly biased either way, even though I don't see 
much benefit of hash over random. deterministic hash doesn't seem to give much 
benefit

3) our current implementation does the entropy substitution during operator 
initialization. Conceptually, a better way is probably doing entropy 
substitution for each S3 write. Practically, it probably doesn't make much 
difference in terms of spreading the load and throughput, because either way 
each operator got its own entropy prefix

Thanks,

Steven

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-04-03 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424625#comment-16424625
 ] 

Steven Zhen Wu edited comment on FLINK-9061 at 4/3/18 9:22 PM:
---

[~jgrier] [~StephanEwen]

Here are our thinking. if you think it makes sense, we can submit a PR for 
checkpoint. As Stephan mentioned earlier, savepoint probably needs to be 
tackled separately.

1) new config to enable dynamic entropy injection
{code:java}
# user has full control on checkpoint path (including entropy key substitution)
# _ENTROPY_KEY_ can be at any part of the checkpoint path
state.backend.fs.checkpointdir: s3://bucket/_ENTROPY_KEY_/...
# boolean flag to enable entropy injection
state.backend.fs.checkpointdir.injectEntropy.enabled: true
# substring for entropy substitution 
state.backend.fs.checkpointdir.injectEntropy.key: _ENTROPY_KEY_
# optional: number of chars. default is 4
state.backend.fs.checkpointdir.injectEntropy.length: 4{code}
 

2) random v.s. hash: we are generating random hexes for entropy. hash should 
work equally well. I am not strongly biased either way, even though I don't see 
much benefit of hash over random. deterministic hash doesn't seem to give much 
benefit

3) our current implementation does the entropy substitution during operator 
initialization. Conceptually, a better way is probably doing entropy 
substitution for each S3 write. Practically, it probably doesn't make much 
difference in terms of spreading the load and throughput, because either way 
each operator got its own entropy prefix

Thanks,

Steven


was (Author: stevenz3wu):
[~jgrier] [~StephanEwen]

Here are our thinking. if you think it makes sense, we can submit a PR for 
checkpoint. As Stephan mentioned earlier, savepoint probably needs to be 
tackled separately.

1) new config to enable dynamic entropy injection
{code:java}
# user has full control on checkpoint path (including entropy key substitution)
# _ENTROPY_KEY_ can be at any part of the checkpoint path
state.backend.fs.checkpointdir: s3://bucket/_ENTROPY_KEY_/...
# boolean flag to enable entropy injection
state.backend.fs.checkpointdir.injectEntropy.enabled: true
# substring for entropy substitution 
state.backend.fs.checkpointdir.injectEntropy.key: _ENTROPY_KEY_
# optional: number of chars
state.backend.fs.checkpointdir.injectEntropy.length: 4{code}
 

2) random v.s. hash: we are generating random hexes for entropy. hash should 
work equally well. I am not strongly biased either way, even though I don't see 
much benefit of hash over random. deterministic hash doesn't seem to give much 
benefit

3) our current implementation does the entropy substitution during operator 
initialization. Conceptually, a better way is probably doing entropy 
substitution for each S3 write. Practically, it probably doesn't make much 
difference in terms of spreading the load and throughput, because either way 
each operator got its own entropy prefix

Thanks,

Steven

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-04-03 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424625#comment-16424625
 ] 

Steven Zhen Wu commented on FLINK-9061:
---

[~jgrier] [~StephanEwen]

Here are our thinking. if you think it makes sense, we can submit a PR for 
checkpoint. As Stephan mentioned earlier, savepoint probably needs to be 
tackled separately.

1) new config to enable dynamic entropy injection
{code:java}
# user has full control on checkpoint path (including entropy key substitution)
# _ENTROPY_KEY_ can be at any part of the checkpoint path
state.backend.fs.checkpointdir: s3://bucket/_ENTROPY_KEY_/...
# boolean flag to enable entropy injection
state.backend.fs.checkpointdir.injectEntropy.enabled: true
# substring for entropy substitution 
state.backend.fs.checkpointdir.injectEntropy.key: _ENTROPY_KEY_
# optional: number of chars
state.backend.fs.checkpointdir.injectEntropy.length: 4{code}
 

2) random v.s. hash: we are generating random hexes for entropy. hash should 
work equally well. I am not strongly biased either way, even though I don't see 
much benefit of hash over random. deterministic hash doesn't seem to give much 
benefit

3) our current implementation does the entropy substitution during operator 
initialization. Conceptually, a better way is probably doing entropy 
substitution for each S3 write. Practically, it probably doesn't make much 
difference in terms of spreading the load and throughput, because either way 
each operator got its own entropy prefix

Thanks,

Steven

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8697) Rename DummyFlinkKafkaConsumer in Kinesis tests

2018-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424586#comment-16424586
 ] 

ASF GitHub Bot commented on FLINK-8697:
---

GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/5809

[FLINK-8697] [Kinesis Connector] Rename DummyFlinkKafkaConsumer in 
KinesisDataFetcherTest

## What is the purpose of the change

`DummyFlinkKafkaConsumer` in `KinesisDataFetcherTest` should be named 
`DummyFlinkKinesisConsumer`

## Brief change log

Rename `DummyFlinkKafkaConsumer` to `DummyFlinkKinesisConsumer` in Kinesis 
tests

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

none

## Documentation

none

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-8697

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5809.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5809


commit 6c836a62248b51c762558e87a3e80410a19262c0
Author: Bowen Li 
Date:   2018-04-03T20:53:23Z

[FLINK-8697] Rename DummyFlinkKafkaConsumer in Kinesis tests




> Rename DummyFlinkKafkaConsumer in Kinesis tests
> ---
>
> Key: FLINK-8697
> URL: https://issues.apache.org/jira/browse/FLINK-8697
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector, Tests
>Affects Versions: 1.5.0, 1.4.1
>Reporter: Chesnay Schepler
>Assignee: Bowen Li
>Priority: Trivial
>  Labels: easy-fix, starter
> Fix For: 1.5.0, 1.6.0, 1.4.3
>
>
> In {{KinesisDataFetcherTest}} exists a class
> {code}
> private static class DummyFlinkKafkaConsumer extends 
> FlinkKinesisConsumer {
> {code}
> The class should be called {{DummyFlinkKinesisConsumer}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8697) Rename DummyFlinkKafkaConsumer in Kinesis tests

2018-04-03 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-8697:

Fix Version/s: 1.6.0

> Rename DummyFlinkKafkaConsumer in Kinesis tests
> ---
>
> Key: FLINK-8697
> URL: https://issues.apache.org/jira/browse/FLINK-8697
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector, Tests
>Affects Versions: 1.5.0, 1.4.1
>Reporter: Chesnay Schepler
>Assignee: Bowen Li
>Priority: Trivial
>  Labels: easy-fix, starter
> Fix For: 1.5.0, 1.6.0, 1.4.3
>
>
> In {{KinesisDataFetcherTest}} exists a class
> {code}
> private static class DummyFlinkKafkaConsumer extends 
> FlinkKinesisConsumer {
> {code}
> The class should be called {{DummyFlinkKinesisConsumer}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5809: [FLINK-8697] [Kinesis Connector] Rename DummyFlink...

2018-04-03 Thread bowenli86
GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/5809

[FLINK-8697] [Kinesis Connector] Rename DummyFlinkKafkaConsumer in 
KinesisDataFetcherTest

## What is the purpose of the change

`DummyFlinkKafkaConsumer` in `KinesisDataFetcherTest` should be named 
`DummyFlinkKinesisConsumer`

## Brief change log

Rename `DummyFlinkKafkaConsumer` to `DummyFlinkKinesisConsumer` in Kinesis 
tests

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

none

## Documentation

none

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-8697

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5809.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5809


commit 6c836a62248b51c762558e87a3e80410a19262c0
Author: Bowen Li 
Date:   2018-04-03T20:53:23Z

[FLINK-8697] Rename DummyFlinkKafkaConsumer in Kinesis tests




---


[jira] [Assigned] (FLINK-8697) Rename DummyFlinkKafkaConsumer in Kinesis tests

2018-04-03 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li reassigned FLINK-8697:
---

Assignee: Bowen Li

> Rename DummyFlinkKafkaConsumer in Kinesis tests
> ---
>
> Key: FLINK-8697
> URL: https://issues.apache.org/jira/browse/FLINK-8697
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector, Tests
>Affects Versions: 1.5.0, 1.4.1
>Reporter: Chesnay Schepler
>Assignee: Bowen Li
>Priority: Trivial
>  Labels: easy-fix, starter
> Fix For: 1.5.0, 1.4.3
>
>
> In {{KinesisDataFetcherTest}} exists a class
> {code}
> private static class DummyFlinkKafkaConsumer extends 
> FlinkKinesisConsumer {
> {code}
> The class should be called {{DummyFlinkKinesisConsumer}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-04-03 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424552#comment-16424552
 ] 

Steven Zhen Wu commented on FLINK-9061:
---

It seems that S3 walks through the key prefix from left to right until it finds 
some randomness for partitioning; it is more sophisticated than just using the 
first few characters.

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5797: [FLINK-9104][doc]Re-generate REST API documentatio...

2018-04-03 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5797#discussion_r178913403
  
--- Diff: 
flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java ---
@@ -258,6 +265,37 @@ private static String createMessageHtmlEntry(Class 
messageClass, Class emp
return json;
}
 
+   /**
+* Create character escapes for HTML when generating JSON 
request/response string.
+*/
+   private static class HTMLCharacterEscapes extends CharacterEscapes {
--- End diff --

Good point. Added comments to illustrate the necessity.


---


[jira] [Commented] (FLINK-9104) Re-generate REST API documentation for FLIP-6

2018-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424376#comment-16424376
 ] 

ASF GitHub Bot commented on FLINK-9104:
---

Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5797#discussion_r178913403
  
--- Diff: 
flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java ---
@@ -258,6 +265,37 @@ private static String createMessageHtmlEntry(Class 
messageClass, Class emp
return json;
}
 
+   /**
+* Create character escapes for HTML when generating JSON 
request/response string.
+*/
+   private static class HTMLCharacterEscapes extends CharacterEscapes {
--- End diff --

Good point. Added comments to illustrate the necessity.

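For readers not familiar with the Jackson hook being discussed, a self-contained 
sketch of an HTML-escaping CharacterEscapes subclass could look like the following; 
this is an assumption-based illustration, not the code from the pull request:

{code:java}
import com.fasterxml.jackson.core.SerializableString;
import com.fasterxml.jackson.core.io.CharacterEscapes;
import com.fasterxml.jackson.core.io.SerializedString;

public class HtmlCharacterEscapesSketch extends CharacterEscapes {

    private final int[] asciiEscapes;

    public HtmlCharacterEscapesSketch() {
        // Start from the standard JSON escapes and additionally mark HTML-sensitive
        // characters as requiring a custom escape sequence.
        int[] escapes = CharacterEscapes.standardAsciiEscapesForJSON();
        escapes['<'] = CharacterEscapes.ESCAPE_CUSTOM;
        escapes['>'] = CharacterEscapes.ESCAPE_CUSTOM;
        escapes['&'] = CharacterEscapes.ESCAPE_CUSTOM;
        this.asciiEscapes = escapes;
    }

    @Override
    public int[] getEscapeCodesForAscii() {
        return asciiEscapes;
    }

    @Override
    public SerializableString getEscapeSequence(int ch) {
        switch (ch) {
            case '<': return new SerializedString("&lt;");
            case '>': return new SerializedString("&gt;");
            case '&': return new SerializedString("&amp;");
            default: return null; // fall back to standard escaping
        }
    }
}
{code}

Such an escaper would typically be registered on the generator's JsonFactory via 
setCharacterEscapes(...) before the JSON request/response examples are embedded 
into the generated HTML.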

> Re-generate REST API documentation for FLIP-6 
> --
>
> Key: FLINK-9104
> URL: https://issues.apache.org/jira/browse/FLINK-9104
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Rong Rong
>Priority: Blocker
>  Labels: flip-6
>
> The API documentation is missing for several handlers, e.g., 
> {{SavepointHandlers}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #3117: [FLINK-5480] Introduce user-provided hash for JobVertexes

2018-04-03 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3117
  
@ShashwatRastogi-Reflektion I'm not entirely sure, but one thing you could 
try is to explicitly disable chaining 
(`StreamExecutionEnvironment#disableOperatorChaining`). This way the ID of each 
operator (that now runs as a separate task) should be logged / be visible in 
the UI.


---


[jira] [Commented] (FLINK-5480) User-provided hashes for operators

2018-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424315#comment-16424315
 ] 

ASF GitHub Bot commented on FLINK-5480:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3117
  
@ShashwatRastogi-Reflektion I'm not entirely sure, but one thing you could 
try is to explicitly disable chaining 
(`StreamExecutionEnvironment#disableOperatorChaining`). This way the ID of each 
operator (that now runs as a separate task) should be logged / be visible in 
the UI.
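A short, hedged sketch of the suggested workaround, combined with the user-provided 
hashes this ticket introduced; the pipeline and the hash value below are placeholders, 
not taken from the job being discussed:

{code:java}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UidHashExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Disable chaining so every operator runs as its own task and its ID
        // shows up in the logs and the web UI.
        env.disableOperatorChaining();

        env.fromElements(1, 2, 3)
            .map(new MapFunction<Integer, Integer>() {
                @Override
                public Integer map(Integer value) {
                    return value * 2;
                }
            })
            .uid("double-values")                            // stable, user-chosen ID
            .setUidHash("2e588ce1aefdafa2a8b5f31c0b254b7d")  // or pin a legacy hash directly (placeholder value)
            .print();

        env.execute("uid-hash-example");
    }
}
{code}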


> User-provided hashes for operators
> --
>
> Key: FLINK-5480
> URL: https://issues.apache.org/jira/browse/FLINK-5480
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.2.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.2.0, 1.3.0
>
>
> We could allow users to provided (alternative) hashes for operators in a 
> StreamGraph. This can make migration between Flink versions easier, in case 
> the automatically produced hashes between versions are incompatible. For 
> example, users could just copy the old hashes from the web ui to their job.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9010) NoResourceAvailableException with FLIP-6

2018-04-03 Thread Piotr Nowojski (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424252#comment-16424252
 ] 

Piotr Nowojski commented on FLINK-9010:
---

No, I didn't experience any issues. 

> NoResourceAvailableException with FLIP-6 
> -
>
> Key: FLINK-9010
> URL: https://issues.apache.org/jira/browse/FLINK-9010
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> I was trying to run a bigger program with 400 slots (100 TMs, 2 slots each) 
> with FLIP-6 mode and a checkpointing interval of 1000 and got the following 
> exception:
> {code}
> 2018-03-16 03:41:20,154 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Received new container: 
> container_1521038088305_0257_01_000101 - Remaining pending container 
> requests: 302
> 2018-03-16 03:41:20,154 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TaskExecutor container_1521038088305_0257_01_000101 will be 
> started with container size 8192 MB, JVM heap size 5120 MB, JVM direct memory 
> limit 3072 MB
> 2018-03-16 03:41:20,154 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote keytab path obtained null
> 2018-03-16 03:41:20,154 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote keytab principal obtained null
> 2018-03-16 03:41:20,154 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote yarn conf path obtained null
> 2018-03-16 03:41:20,154 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote krb5 path obtained null
> 2018-03-16 03:41:20,155 INFO  org.apache.flink.yarn.Utils 
>   - Copying from 
> file:/mnt/yarn/usercache/hadoop/appcache/application_1521038088305_0257/container_1521038088305_0257_01_01/3cd0c7d7-ccc1-4680-83a5-54e64dd637bc-taskmanager-conf.yaml
>  to 
> hdfs://ip-172-31-1-91.eu-west-1.compute.internal:8020/user/hadoop/.flink/application_1521038088305_0257/3cd0c7d7-ccc1-4680-83a5-54e64dd637bc-taskmanager-conf.yaml
> 2018-03-16 03:41:20,165 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Prepared local resource for modified yaml: resource { scheme: 
> "hdfs" host: "ip-172-31-1-91.eu-west-1.compute.internal" port: 8020 file: 
> "/user/hadoop/.flink/application_1521038088305_0257/3cd0c7d7-ccc1-4680-83a5-54e64dd637bc-taskmanager-conf.yaml"
>  } size: 595 timestamp: 1521171680164 type: FILE visibility: APPLICATION
> 2018-03-16 03:41:20,168 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Creating container launch context for TaskManagers
> 2018-03-16 03:41:20,168 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Starting TaskManagers with command: $JAVA_HOME/bin/java 
> -Xms5120m -Xmx5120m -XX:MaxDirectMemorySize=3072m  
> -Dlog.file=/taskmanager.log 
> -Dlogback.configurationFile=file:./logback.xml 
> -Dlog4j.configuration=file:./log4j.properties 
> org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . 1> 
> /taskmanager.out 2> /taskmanager.err
> 2018-03-16 03:41:20,176 INFO  
> org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy  - 
> Opening proxy : ip-172-31-3-221.eu-west-1.compute.internal:8041
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Received new container: 
> container_1521038088305_0257_01_000102 - Remaining pending container 
> requests: 301
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TaskExecutor container_1521038088305_0257_01_000102 will be 
> started with container size 8192 MB, JVM heap size 5120 MB, JVM direct memory 
> limit 3072 MB
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote keytab path obtained null
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote keytab principal obtained null
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote yarn conf path obtained null
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote krb5 path obtained null
> 2018-03-16 03:41:20,181 INFO  org.apache.flink.yarn.Utils 
>   - Copying from 
> file:/mnt/yarn/usercache/hadoop/appcache/application_1521038088305_0257/container_1521038088305_0257_01_01/6766be70-82f7-4999-a371-11c27527fb6e-taskmanager-conf.yaml
>  to 
> hdfs://ip-172-31-1-91.eu-west-1.compute.internal:8020/user/ha

[GitHub] flink issue #3117: [FLINK-5480] Introduce user-provided hash for JobVertexes

2018-04-03 Thread ShashwatRastogi-Reflektion
Github user ShashwatRastogi-Reflektion commented on the issue:

https://github.com/apache/flink/pull/3117
  
@zentol I am using flink 1.3.2.
Normally, I don't use `--allowNonRestoredState` and I get an error while restarting the job. After using `--allowNonRestoredState` the job starts up fine, but there is data loss because of the lost state, which is what I want to avoid.
I think the problem with my implementation is that I am wrongly assigning some ids to the operators. Is there a sure-shot way of finding out the uid or uid-hash generated by Flink for each operator (with/without chaining)?


---


[jira] [Commented] (FLINK-5480) User-provided hashes for operators

2018-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424244#comment-16424244
 ] 

ASF GitHub Bot commented on FLINK-5480:
---

Github user ShashwatRastogi-Reflektion commented on the issue:

https://github.com/apache/flink/pull/3117
  
@zentol I am using flink 1.3.2.
Normally, I don't use `--allowNonRestoredState` and I get an error while restarting the job. After using `--allowNonRestoredState` the job starts up fine, but there is data loss because of the lost state, which is what I want to avoid.
I think the problem with my implementation is that I am wrongly assigning some ids to the operators. Is there a sure-shot way of finding out the uid or uid-hash generated by Flink for each operator (with/without chaining)?


> User-provided hashes for operators
> --
>
> Key: FLINK-5480
> URL: https://issues.apache.org/jira/browse/FLINK-5480
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.2.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.2.0, 1.3.0
>
>
> We could allow users to provided (alternative) hashes for operators in a 
> StreamGraph. This can make migration between Flink versions easier, in case 
> the automatically produced hashes between versions are incompatible. For 
> example, users could just copy the old hashes from the web ui to their job.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9087) Return value of broadcastEvent should be closed in StreamTask#performCheckpoint

2018-04-03 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424237#comment-16424237
 ] 

Ted Yu commented on FLINK-9087:
---

[~NicoK]:
Since you were recently working on related code, mind sharing your thought ?

> Return value of broadcastEvent should be closed in 
> StreamTask#performCheckpoint
> ---
>
> Key: FLINK-9087
> URL: https://issues.apache.org/jira/browse/FLINK-9087
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
> for (StreamRecordWriter>> 
> streamRecordWriter : streamRecordWriters) {
>   try {
> streamRecordWriter.broadcastEvent(message);
> {code}
> The BufferConsumer returned by broadcastEvent() should be closed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-5480) User-provided hashes for operators

2018-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424206#comment-16424206
 ] 

ASF GitHub Bot commented on FLINK-5480:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3117
  
@ShashwatRastogi-Reflektion It's a bit odd that there are different IDs shown, I will have to look into that. It may be that one display accounts for the uid while the other one doesn't; in any case one of them should be the task ID.

I should've asked earlier; which version of Flink are you using?

If it is 1.2 or below, then I don't know at the moment what the problem could be.
If it is 1.3 or above, the steps I mentioned have to be done for each operator, not for each task. That said, I'm not sure if we actually expose the id of each operator anywhere in 1.3 in a nice way... You may have to resort to trial & error: if a state can't be assigned to an operator you should get an exception containing the ID of the state (unless you explicitly allow [non-restored state](https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#allowing-non-restored-state)), and you can then use that ID as the uid hash. If you are already on 1.4 you can figure them out with the metric system (for example with the JMXReporter).


> User-provided hashes for operators
> --
>
> Key: FLINK-5480
> URL: https://issues.apache.org/jira/browse/FLINK-5480
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.2.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.2.0, 1.3.0
>
>
> We could allow users to provided (alternative) hashes for operators in a 
> StreamGraph. This can make migration between Flink versions easier, in case 
> the automatically produced hashes between versions are incompatible. For 
> example, users could just copy the old hashes from the web ui to their job.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #3117: [FLINK-5480] Introduce user-provided hash for JobVertexes

2018-04-03 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3117
  
@ShashwatRastogi-Reflektion It's a bit odd that there are different IDs shown, I will have to look into that. It may be that one display accounts for the uid while the other one doesn't; in any case one of them should be the task ID.

I should've asked earlier; which version of Flink are you using?

If it is 1.2 or below, then I don't know at the moment what the problem could be.
If it is 1.3 or above, the steps I mentioned have to be done for each operator, not for each task. That said, I'm not sure if we actually expose the id of each operator anywhere in 1.3 in a nice way... You may have to resort to trial & error: if a state can't be assigned to an operator you should get an exception containing the ID of the state (unless you explicitly allow [non-restored state](https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#allowing-non-restored-state)), and you can then use that ID as the uid hash. If you are already on 1.4 you can figure them out with the metric system (for example with the JMXReporter).
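
For illustration, a minimal sketch of how a recovered hash could then be attached to an operator via `setUidHash` (the hash value below is just the one from the log quoted earlier in this thread; the pipeline itself is made up):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UidHashExample {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		env.fromElements("a", "b", "c")
			// 32-character hash copied from the old job's logs / web UI / exception message
			.setUidHash("5c69d4ad31a844978a740c1d24297b68")
			.filter(s -> !s.isEmpty())
			// for newly written jobs, prefer assigning a stable uid instead
			.uid("filter-non-empty")
			.print();

		env.execute("uid-hash-example");
	}
}
```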


---


[jira] [Commented] (FLINK-9087) Return value of broadcastEvent should be closed in StreamTask#performCheckpoint

2018-04-03 Thread Triones Deng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424169#comment-16424169
 ] 

Triones Deng commented on FLINK-9087:
-

[~yuzhih...@gmail.com]   when I ran the test, I found the following in
{code:java}
public BufferConsumer broadcastEvent(AbstractEvent event) throws IOException {
    try (BufferConsumer eventBufferConsumer = EventSerializer.toBufferConsumer(event)) {
        ...
        // retain the buffer so that it can be recycled by each channel of targetPartition
        targetPartition.addBufferConsumer(eventBufferConsumer.copy(), targetChannel);
    }
    ...
    return eventBufferConsumer;
}
{code}

This calls targetPartition.addBufferConsumer() with a copy of the eventBufferConsumer, so all BufferConsumers produced by copy() share the same buffer, and each copy calls AbstractReferenceCountedByteBuf.retain() (AbstractReferenceCountedByteBuf is a Netty class).

All targetPartition implementations, such as AbstractCollectingResultPartitionWriter and ResultPartition, call the close method of the BufferConsumer, so in the end the buffer in eventBufferConsumer will be released. ResultPartition also calls notifyDataAvailable, which consumes the data asynchronously. So maybe we should leave the return value alone, what do you think? Or just change the method signature to void?

Note that FLINK-7315 plans to use Flink's buffers in Netty, and one of its sub-tasks, FLINK-7518, has a solution. I am new here, any suggestions?
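
For illustration only, what the ticket's proposal (closing the returned consumer at the call site) would roughly look like, using the names quoted in the issue; this is a fragment sketch, not an agreed fix:
{code:java}
// sketch of StreamTask#performCheckpoint closing the returned consumer
for (StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> streamRecordWriter : streamRecordWriters) {
    try (BufferConsumer eventBufferConsumer = streamRecordWriter.broadcastEvent(message)) {
        // intentionally empty: try-with-resources releases this writer's reference,
        // while the copies already handed to the result partition keep their own
    }
}
{code}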

> Return value of broadcastEvent should be closed in 
> StreamTask#performCheckpoint
> ---
>
> Key: FLINK-9087
> URL: https://issues.apache.org/jira/browse/FLINK-9087
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
> for (StreamRecordWriter>> 
> streamRecordWriter : streamRecordWriters) {
>   try {
> streamRecordWriter.broadcastEvent(message);
> {code}
> The BufferConsumer returned by broadcastEvent() should be closed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9126) Ability for CassandraInputFormat to output data into a Custom Cassandra Annotated Pojo

2018-04-03 Thread Jeffrey Carter (JIRA)
Jeffrey Carter created FLINK-9126:
-

 Summary: Ability for CassandraInputFormat to output data into a 
Custom Cassandra Annotated Pojo
 Key: FLINK-9126
 URL: https://issues.apache.org/jira/browse/FLINK-9126
 Project: Flink
  Issue Type: New Feature
  Components: DataSet API
Affects Versions: 1.4.2
Reporter: Jeffrey Carter
 Fix For: 1.5.0
 Attachments: CassandraPojoInputFormatText.rtf

*First time proposing new update so apologies if I missed anything*

Currently the DataSet API only has the ability to output data received from Cassandra as a source as a Tuple. This feature would allow the data to be output as a custom POJO that the user has created and annotated using the Datastax API. This would remove the need for very long Tuples to be created by the DataSet and then mapped to the custom POJO.

 

The changes to the CassandraInputFormat object would be minimal, but would 
require importing the Datastax API into the class. Another option is to make a 
similar, but slightly different class called CassandraPojoInputFormat.

I have already gotten code for this working in my own project, but want other 
thoughts as to the best way this should go about being implemented.

 

//Example of its use in main

CassandraPojoInputFormat<CustomCassandraPojo> cassandraInputFormat =
    new CassandraPojoInputFormat<>(queryToRun, defaultClusterBuilder, CustomCassandraPojo.class);
cassandraInputFormat.configure(null);
cassandraInputFormat.open(null);

DataSet<CustomCassandraPojo> outputTestSet =
    exEnv.createInput(cassandraInputFormat, TypeInformation.of(new TypeHint<CustomCassandraPojo>(){}));

 

//The class that I currently have set up

[^CassandraPojoInputFormatText.rtf]

 

Will make another Jira Issue for the Output version next if this is approved
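
For illustration, the row-to-POJO mapping inside such a CassandraPojoInputFormat could be built on the Datastax object mapper roughly as in the sketch below ({{CustomCassandraPojo}} is a placeholder for the user's annotated class, and the query string stands in for queryToRun; the attached file contains the actual proposal):

{code:java}
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.MappingManager;
import com.datastax.driver.mapping.Result;

public class CassandraPojoMappingSketch {
	public static void main(String[] args) {
		try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
				Session session = cluster.connect()) {

			// the POJO annotated with the Datastax mapping annotations (@Table/@Column)
			Mapper<CustomCassandraPojo> mapper =
				new MappingManager(session).mapper(CustomCassandraPojo.class);

			ResultSet rs = session.execute("SELECT * FROM my_keyspace.my_table");
			Result<CustomCassandraPojo> pojos = mapper.map(rs);
			for (CustomCassandraPojo pojo : pojos) {
				System.out.println(pojo);
			}
		}
	}
}
{code}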



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9076) Make credit-based floating buffers optional

2018-04-03 Thread Nico Kruber (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424137#comment-16424137
 ] 

Nico Kruber commented on FLINK-9076:


The max can already be configured via 
{{taskmanager.network.memory.floating-buffers-per-gate}}. This is about 
lowering the min value to 1 or 0 as before.
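
For illustration only, the option mentioned above is a regular configuration key; e.g. (the value 8 is just an example, not a recommendation):
{code:java}
import org.apache.flink.configuration.Configuration;

public class FloatingBuffersConfigExample {
	public static void main(String[] args) {
		Configuration config = new Configuration();
		// same key as taskmanager.network.memory.floating-buffers-per-gate in flink-conf.yaml
		config.setInteger("taskmanager.network.memory.floating-buffers-per-gate", 8);
		System.out.println(config);
	}
}
{code}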

> Make credit-based floating buffers optional
> ---
>
> Key: FLINK-9076
> URL: https://issues.apache.org/jira/browse/FLINK-9076
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Currently, floating buffers (per gate) are always required in case 
> credit-based flow control is enabled. This, however, increases our minimum 
> number of required network buffers.
> Instead, without changing too much, we could already work with a minimum of 
> one or zero floating buffers and set the max to the configured value. This 
> way, if there are not enough buffers, all {{LocalBufferPool}}s will at least 
> share the available ones.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9076) Make credit-based floating buffers optional

2018-04-03 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-9076:
---
Description: 
Currently, floating buffers (per gate) are always required in case credit-based 
flow control is enabled. This, however, increases our minimum number of 
required network buffers.

Instead, without changing too much, we could already work with a minimum of one 
or zero floating buffers and set the max to the configured value. This way, if 
there are not enough buffers, all {{LocalBufferPool}}s will at least share the 
available ones.

  was:
Currently, floating buffers (per gate) are always required in case credit-based 
flow control is enabled. This, however, increases our minimum number of 
required network buffers.

Instead, without changing too much, we could already work with a minimum of one 
floating buffer and set the max to the configured value. This way, if there are 
not enough buffers, all {{LocalBufferPool}}s will at least share the available 
ones.


> Make credit-based floating buffers optional
> ---
>
> Key: FLINK-9076
> URL: https://issues.apache.org/jira/browse/FLINK-9076
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Currently, floating buffers (per gate) are always required in case 
> credit-based flow control is enabled. This, however, increases our minimum 
> number of required network buffers.
> Instead, without changing too much, we could already work with a minimum of 
> one or zero floating buffers and set the max to the configured value. This 
> way, if there are not enough buffers, all {{LocalBufferPool}}s will at least 
> share the available ones.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-5480) User-provided hashes for operators

2018-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424128#comment-16424128
 ] 

ASF GitHub Bot commented on FLINK-5480:
---

Github user ShashwatRastogi-Reflektion commented on the issue:

https://github.com/apache/flink/pull/3117
  
Hey @zentol 

Thank you for replying back. I was trying to do exactly the same thing, but 
I think I am messing something up that is why it isn't working in my case.

In my logs, I get the task description like:
`2018-04-03 12:24:45,876 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Filter -> events-with-timestamp -> filtered-events-with-timestamp -> events-mapped-to-session (1/1) (5c69d4ad31a844978a740c1d24297b68) switched from CREATED to SCHEDULED.` Is the hash present in this log statement called the uid hash?

This hash is not the same as the one I get in the UI.

![image](https://user-images.githubusercontent.com/29359103/38256652-6a5e7d58-377c-11e8-8124-16bc2cd7e6e0.png)

I have tried using both, but neither of them works.

Also, my operators are chained together, so I will get one uid-hash for the entire chain, right?
And I would be setting the same uid-hash for all operators in the chain?


> User-provided hashes for operators
> --
>
> Key: FLINK-5480
> URL: https://issues.apache.org/jira/browse/FLINK-5480
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.2.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.2.0, 1.3.0
>
>
> We could allow users to provided (alternative) hashes for operators in a 
> StreamGraph. This can make migration between Flink versions easier, in case 
> the automatically produced hashes between versions are incompatible. For 
> example, users could just copy the old hashes from the web ui to their job.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8707) Excessive amount of files opened by flink task manager

2018-04-03 Thread Nico Kruber (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424125#comment-16424125
 ] 

Nico Kruber commented on FLINK-8707:


I still need to look at the detailed logs you two provided, but let me already 
add one note about {{lsof | wc -l}}: Whenever you spawn a child process (on 
Linux), the new process inherits the file descriptors of the parent and hence 
the duplication in this report. Basically, a single file descriptor is reported 
for every spawned thread. Using {{lsof -p }} will not show the task's file 
descriptors, as will {{lsof -K i}} for all PIDs. Therefore, whenever you want 
to count the overall number of descriptors for a machine, you should use the 
latter.

> Excessive amount of files opened by flink task manager
> --
>
> Key: FLINK-8707
> URL: https://issues.apache.org/jira/browse/FLINK-8707
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.3.2
> Environment: NAME="Red Hat Enterprise Linux Server"
> VERSION="7.3 (Maipo)"
> Two boxes, each with a Job Manager & Task Manager, using Zookeeper for HA.
> flink.yaml below with some settings (removed exact box names) etc:
> env.log.dir: ...some dir...residing on the same box
> env.pid.dir: some dir...residing on the same box
> metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
> metrics.reporters: jmx
> state.backend: filesystem
> state.backend.fs.checkpointdir: file:///some_nfs_mount
> state.checkpoints.dir: file:///some_nfs_mount
> state.checkpoints.num-retained: 3
> high-availability.cluster-id: /tst
> high-availability.storageDir: file:///some_nfs_mount/ha
> high-availability: zookeeper
> high-availability.zookeeper.path.root: /flink
> high-availability.zookeeper.quorum: ...list of zookeeper boxes
> env.java.opts.jobmanager: ...some extra jar args
> jobmanager.archive.fs.dir: some dir...residing on the same box
> jobmanager.web.submit.enable: true
> jobmanager.web.tmpdir:  some dir...residing on the same box
> env.java.opts.taskmanager: some extra jar args
> taskmanager.tmp.dirs:  some dir...residing on the same box/var/tmp
> taskmanager.network.memory.min: 1024MB
> taskmanager.network.memory.max: 2048MB
> blob.storage.directory:  some dir...residing on the same box
>Reporter: Alexander Gardner
>Assignee: Piotr Nowojski
>Priority: Critical
> Fix For: 1.5.0
>
> Attachments: AfterRunning-3-jobs-Box2-TM-JCONSOLE.png, 
> AfterRunning-3-jobs-TM-FDs-BOX2.jpg, AfterRunning-3-jobs-lsof-p.box2-TM, 
> AfterRunning-3-jobs-lsof.box2-TM, AterRunning-3-jobs-Box1-TM-JCONSOLE.png, 
> box1-jobmgr-lsof, box1-taskmgr-lsof, box2-jobmgr-lsof, box2-taskmgr-lsof, 
> ll.txt, ll.txt, lsof.txt, lsof.txt, lsofp.txt, lsofp.txt
>
>
> The job manager has less FDs than the task manager.
>  
> Hi
> A support alert indicated that there were a lot of open files for the boxes 
> running Flink.
> There were 4 flink jobs that were dormant but had consumed a number of msgs 
> from Kafka using the FlinkKafkaConsumer010.
> A simple general lsof:
> $ lsof | wc -l       ->  returned 153114 open file descriptors.
> Focusing on the TaskManager process (process ID = 12154):
> $ lsof | grep 12154 | wc -l-    > returned 129322 open FDs
> $ lsof -p 12154 | wc -l   -> returned 531 FDs
> There were 228 threads running for the task manager.
>  
> Drilling down a bit further, looking at a_inode and FIFO entries: 
> $ lsof -p 12154 | grep a_inode | wc -l = 100 FDs
> $ lsof -p 12154 | grep FIFO | wc -l  = 200 FDs
> $ /proc/12154/maps = 920 entries.
> Apart from lsof identifying lots of JARs and SOs being referenced there were 
> also 244 child processes for the task manager process.
> Noticed that in each environment, a creep of file descriptors...are the above 
> figures deemed excessive for the no of FDs in use? I know Flink uses Netty - 
> is it using a separate Selector for reads & writes? 
> Additionally Flink uses memory mapped files? or direct bytebuffers are these 
> skewing the numbers of FDs shown?
> Example of one child process ID 6633:
> java 12154 6633 dfdev 387u a_inode 0,9 0 5869 [eventpoll]
>  java 12154 6633 dfdev 388r FIFO 0,8 0t0 459758080 pipe
>  java 12154 6633 dfdev 389w FIFO 0,8 0t0 459758080 pipe
> Lasty, cannot identify yet the reason for the creep in FDs even if Flink is 
> pretty dormant or has dormant jobs. Production nodes are not experiencing 
> excessive amounts of throughput yet either.
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #3117: [FLINK-5480] Introduce user-provided hash for JobVertexes

2018-04-03 Thread ShashwatRastogi-Reflektion
Github user ShashwatRastogi-Reflektion commented on the issue:

https://github.com/apache/flink/pull/3117
  
Hey @zentol 

Thank you for replying back. I was trying to do exactly the same thing, but 
I think I am messing something up that is why it isn't working in my case.

In my logs, I get the task description like:
`2018-04-03 12:24:45,876 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Filter -> events-with-timestamp -> filtered-events-with-timestamp -> events-mapped-to-session (1/1) (5c69d4ad31a844978a740c1d24297b68) switched from CREATED to SCHEDULED.` Is the hash present in this log statement called the uid hash?

This hash is not the same as the one I get in the UI.

![image](https://user-images.githubusercontent.com/29359103/38256652-6a5e7d58-377c-11e8-8124-16bc2cd7e6e0.png)

I have tried using both, but neither of them works.

Also, my operators are chained together, so I will get one uid-hash for the entire chain, right?
And I would be setting the same uid-hash for all operators in the chain?


---


[jira] [Closed] (FLINK-6567) ExecutionGraphMetricsTest fails on Windows CI

2018-04-03 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-6567.

   Resolution: Fixed
Fix Version/s: (was: 1.6.0)
   1.5.0

Fixed via
master: db366cd3d02a823f93185f29ca7ae93da9e2a04b
1.5.0: 515069e14f770ebfb86df27dc27b858ded51c6d5

> ExecutionGraphMetricsTest fails on Windows CI
> -
>
> Key: FLINK-6567
> URL: https://issues.apache.org/jira/browse/FLINK-6567
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.3.0, 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.5.0
>
>
> The {{testExecutionGraphRestartTimeMetric}} fails every time i run it on 
> AppVeyor. It also very rarely failed for me locally.
> The test fails at Line 235 if the RUNNING timestamp is equal to the 
> RESTARTING timestamp, which may happen when combining a fast test with a low 
> resolution clock.
> A simple fix would be to increase the timestamp between RUNNING and 
> RESTARTING by adding a 50ms sleep timeout into the 
> {{TestingRestartStrategy#canRestart()}} method, as this one is called before 
> transitioning to the RESTARTING state.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9094) AccumulatorLiveITCase unstable on Travis

2018-04-03 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-9094.

   Resolution: Fixed
Fix Version/s: 1.5.0

Fixed via
master: 78c3d9b0c657bf06a712ce453edb02da13fa3acf
1.5.0: b6982c502196b5059b1bf576f620f45ce0e3aa72

> AccumulatorLiveITCase unstable on Travis
> 
>
> Key: FLINK-9094
> URL: https://issues.apache.org/jira/browse/FLINK-9094
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.5.0
>
>
> {{AccumulatorLiveITCase}} unstable on Travis.
> https://api.travis-ci.org/v3/job/358509206/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6567) ExecutionGraphMetricsTest fails on Windows CI

2018-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424105#comment-16424105
 ] 

ASF GitHub Bot commented on FLINK-6567:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5782


> ExecutionGraphMetricsTest fails on Windows CI
> -
>
> Key: FLINK-6567
> URL: https://issues.apache.org/jira/browse/FLINK-6567
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.3.0, 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.6.0
>
>
> The {{testExecutionGraphRestartTimeMetric}} fails every time i run it on 
> AppVeyor. It also very rarely failed for me locally.
> The test fails at Line 235 if the RUNNING timestamp is equal to the 
> RESTARTING timestamp, which may happen when combining a fast test with a low 
> resolution clock.
> A simple fix would be to increase the timestamp between RUNNING and 
> RESTARTING by adding a 50ms sleep timeout into the 
> {{TestingRestartStrategy#canRestart()}} method, as this one is called before 
> transitioning to the RESTARTING state.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9094) AccumulatorLiveITCase unstable on Travis

2018-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424106#comment-16424106
 ] 

ASF GitHub Bot commented on FLINK-9094:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5771


> AccumulatorLiveITCase unstable on Travis
> 
>
> Key: FLINK-9094
> URL: https://issues.apache.org/jira/browse/FLINK-9094
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
>
> {{AccumulatorLiveITCase}} unstable on Travis.
> https://api.travis-ci.org/v3/job/358509206/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5782: [FLINK-6567] [tests] Harden ExecutionGraphMetricsT...

2018-04-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5782


---


[GitHub] flink pull request #5771: [FLINK-9094] [tests] Harden AccumulatorLiveITCase

2018-04-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5771


---


[jira] [Commented] (FLINK-9120) Task Manager Fault Tolerance issue

2018-04-03 Thread Sihua Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424091#comment-16424091
 ] 

Sihua Zhou commented on FLINK-9120:
---

Hi [~dhirajpraj], sorry for the delayed reply, I was busy with something else. I think [~till.rohrmann]'s reply is the best explanation for my "TM doesn't unregister from JM properly in standalone model".

> Task Manager Fault Tolerance issue
> --
>
> Key: FLINK-9120
> URL: https://issues.apache.org/jira/browse/FLINK-9120
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, Configuration, Core
>Affects Versions: 1.4.2
>Reporter: dhiraj prajapati
>Priority: Critical
> Attachments: flink-dhiraj.prajapati-client-ip-10-14-25-115.log, 
> flink-dhiraj.prajapati-client-ip-10-14-25-115.log, 
> flink-dhiraj.prajapati-jobmanager-5-ip-10-14-25-115.log, 
> flink-dhiraj.prajapati-jobmanager-5-ip-10-14-25-115.log, 
> flink-dhiraj.prajapati-taskmanager-5-ip-10-14-25-116.log, 
> flink-dhiraj.prajapati-taskmanager-5-ip-10-14-25-116.log
>
>
> HI, 
> I have set up a flink 1.4 cluster with 1 job manager and two task managers. 
> The configs taskmanager.numberOfTaskSlots and parallelism.default were set 
> to 2 on each node. I submitted a job to this cluster and it runs fine. To 
> test fault tolerance, I killed one task manager. I was expecting the job to 
> run fine because one of the 2 task managers was still up and running. 
> However, the job failed. Am I missing something? 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9076) Make credit-based floating buffers optional

2018-04-03 Thread Piotr Nowojski (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424083#comment-16424083
 ] 

Piotr Nowojski commented on FLINK-9076:
---

[~NicoK], do you mean max should be configured by a user, or should it be a 
dynamically adjusted value?

> Make credit-based floating buffers optional
> ---
>
> Key: FLINK-9076
> URL: https://issues.apache.org/jira/browse/FLINK-9076
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Currently, floating buffers (per gate) are always required in case 
> credit-based flow control is enabled. This, however, increases our minimum 
> number of required network buffers.
> Instead, without changing too much, we could already work with a minimum of 
> one floating buffer and set the max to the configured value. This way, if 
> there are not enough buffers, all {{LocalBufferPool}}s will at least share 
> the available ones.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8910) Introduce automated end-to-end test for local recovery (including sticky scheduling)

2018-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424072#comment-16424072
 ] 

ASF GitHub Bot commented on FLINK-8910:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r178836145
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java ---
@@ -107,4 +131,12 @@ public int getAttemptNumber() {
public String getTaskNameWithSubtasks() {
return this.taskNameWithSubtasks;
}
+
+   /**
+* Returns the allocation id for where this task is executed.
+* @return the allocation id for where this task is executed.
+*/
+   public String getAllocationID() {
--- End diff --

What I find problematic about deleting the DFS files is that not all state 
is (yet) covered by local recovery and it is also a lot harder to debug the 
cause if there is an actual scheduling problem. With the current code, you can 
easily see which allocation was lost. REST API might be an option if it is 
somehow exposed there.


> Introduce automated end-to-end test for local recovery (including sticky 
> scheduling)
> 
>
> Key: FLINK-8910
> URL: https://issues.apache.org/jira/browse/FLINK-8910
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> We should have an automated end-to-end test that can run nightly to check 
> that sticky allocation and local recovery work as expected.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5676: [FLINK-8910][tests] Automated end-to-end test for ...

2018-04-03 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r178836145
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java ---
@@ -107,4 +131,12 @@ public int getAttemptNumber() {
public String getTaskNameWithSubtasks() {
return this.taskNameWithSubtasks;
}
+
+   /**
+* Returns the allocation id for where this task is executed.
+* @return the allocation id for where this task is executed.
+*/
+   public String getAllocationID() {
--- End diff --

What I find problematic about deleting the DFS files is that not all state 
is (yet) covered by local recovery and it is also a lot harder to debug the 
cause if there is an actual scheduling problem. With the current code, you can 
easily see which allocation was lost. REST API might be an option if it is 
somehow exposed there.


---


[jira] [Commented] (FLINK-8982) End-to-end test: Queryable state

2018-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424060#comment-16424060
 ] 

ASF GitHub Bot commented on FLINK-8982:
---

Github user florianschmidt1994 commented on the issue:

https://github.com/apache/flink/pull/5807
  
Thanks! I addressed the changes in the last two commits


> End-to-end test: Queryable state
> 
>
> Key: FLINK-8982
> URL: https://issues.apache.org/jira/browse/FLINK-8982
> Project: Flink
>  Issue Type: Sub-task
>  Components: Queryable State, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Florian Schmidt
>Priority: Blocker
> Fix For: 1.5.0
>
>
> We should add an end-to-end test which verifies that {{Queryable State}} is 
> working.
> [~florianschmidt] and [~kkl0u] could you please provide more details for the 
> description.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5807: [FLINK-8982][E2E Tests] Add test for known failure of que...

2018-04-03 Thread florianschmidt1994
Github user florianschmidt1994 commented on the issue:

https://github.com/apache/flink/pull/5807
  
Thanks! I addressed the changes in the last two commits


---


[jira] [Commented] (FLINK-8982) End-to-end test: Queryable state

2018-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424052#comment-16424052
 ] 

ASF GitHub Bot commented on FLINK-8982:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5807#discussion_r178828372
  
--- Diff: flink-end-to-end-tests/run-pre-commit-tests.sh ---
@@ -37,6 +37,14 @@ echo "Flink distribution directory: $FLINK_DIR"
 
 EXIT_CODE=0
 
--- End diff --

I would recommend moving it to the nightly tests. Queryable state is not a core component and the normal builds are already timing out.


> End-to-end test: Queryable state
> 
>
> Key: FLINK-8982
> URL: https://issues.apache.org/jira/browse/FLINK-8982
> Project: Flink
>  Issue Type: Sub-task
>  Components: Queryable State, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Florian Schmidt
>Priority: Blocker
> Fix For: 1.5.0
>
>
> We should add an end-to-end test which verifies that {{Queryable State}} is 
> working.
> [~florianschmidt] and [~kkl0u] could you please provide more details for the 
> description.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...

2018-04-03 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5807#discussion_r178827723
  
--- Diff: 
flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailInformation.java
 ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Javadoc.
+ */
+public class EmailInformation implements Serializable {
+
+   private static final long serialVersionUID = -8956979869800484909L;
+
+   public void setEmailId(EmailId emailId) {
+   this.emailId = emailId;
+   }
+
+   private EmailId emailId;
+
+// public void setTimestamp(Instant timestamp) {
+// this.timestamp = timestamp;
+// }
+
+   //private Instant timestamp;
+
+   public void setStuff(List stuff) {
+   this.stuff = stuff;
+   }
+
+   private List stuff;
+
+   public void setAsdf(Long asdf) {
+   this.asdf = asdf;
+   }
+
+   private Long asdf = 0L;
+
+   private transient LabelSurrogate label;
+
+   public EmailInformation() {
+
+   }
+
+   public EmailInformation(Email email) {
+   emailId = email.getEmailId();
+   //  timestamp = email.getTimestamp();
+   stuff = new ArrayList<>();
+   stuff.add("1");
+   stuff.add("2");
+   stuff.add("3");
+   label = email.getLabel();
+   }
+
+   public EmailId getEmailId() {
+   return emailId;
+   }
+
+// //public Instant getTimestamp() {
+// return timestamp;
+// }
+
+   public List getStuff() {
+   return stuff;
+   }
+
+   public Long getAsdf() {
+   return asdf;
+   }
+
+   public LabelSurrogate getLabel() {
+   return label;
+   }
+
+   public void setLabel(LabelSurrogate label) {
+   this.label = label;
+   }
+
+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   }
+   if (o == null || getClass() != o.getClass()) {
+   return false;
+   }
+   EmailInformation that = (EmailInformation) o;
+   return Objects.equals(emailId, that.emailId) &&
+// Objects.equals(timestamp, that.timestamp) &&
--- End diff --

remove.


---


[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...

2018-04-03 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5807#discussion_r178828967
  
--- Diff: 
flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsBugPoc.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.util.Collector;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Random;
+
+/**
+ * Javadoc.
+ */
+public class QsBugPoc {
+
+   public static final String QUERYABLE_STATE_NAME = "state";
+   public static final String STATE_NAME = "state";
+
+   public static void main(final String[] args) throws Exception {
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   RocksDBStateBackend rocksDb = new 
RocksDBStateBackend("file:///tmp/deleteme-rocksdb");
+   env.setStateBackend(rocksDb);
--- End diff --

The checkpoint dir can be passed in as a parameter, and here it should be a path inside the `TEST_DIR` of the test itself. In addition, everything should be explicitly cleaned up, e.g. checkpoints, potential input/output data, etc.
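
A minimal sketch of what parameterizing the path could look like, assuming the test passes it via program arguments (names are illustrative, not from this PR):

```java
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParameterizedBackendSketch {
	public static void main(String[] args) throws Exception {
		final ParameterTool pt = ParameterTool.fromArgs(args);
		// e.g. --state-backend-path file://$TEST_DATA_DIR/rocksdb-checkpoints
		final String backendPath = pt.getRequired("state-backend-path");

		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setStateBackend(new RocksDBStateBackend(backendPath));

		env.fromElements(1, 2, 3).print();
		env.execute("parameterized-backend-sketch");
	}
}
```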


---


[jira] [Commented] (FLINK-8982) End-to-end test: Queryable state

2018-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424053#comment-16424053
 ] 

ASF GitHub Bot commented on FLINK-8982:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5807#discussion_r178827698
  
--- Diff: 
flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailInformation.java
 ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Javadoc.
+ */
+public class EmailInformation implements Serializable {
+
+   private static final long serialVersionUID = -8956979869800484909L;
+
+   public void setEmailId(EmailId emailId) {
+   this.emailId = emailId;
+   }
+
+   private EmailId emailId;
+
+// public void setTimestamp(Instant timestamp) {
+// this.timestamp = timestamp;
+// }
+
+   //private Instant timestamp;
+
+   public void setStuff(List stuff) {
+   this.stuff = stuff;
+   }
+
+   private List stuff;
+
+   public void setAsdf(Long asdf) {
+   this.asdf = asdf;
+   }
+
+   private Long asdf = 0L;
+
+   private transient LabelSurrogate label;
+
+   public EmailInformation() {
+
+   }
+
+   public EmailInformation(Email email) {
+   emailId = email.getEmailId();
+   //  timestamp = email.getTimestamp();
--- End diff --

remove.


> End-to-end test: Queryable state
> 
>
> Key: FLINK-8982
> URL: https://issues.apache.org/jira/browse/FLINK-8982
> Project: Flink
>  Issue Type: Sub-task
>  Components: Queryable State, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Florian Schmidt
>Priority: Blocker
> Fix For: 1.5.0
>
>
> We should add an end-to-end test which verifies that {{Queryable State}} is 
> working.
> [~florianschmidt] and [~kkl0u] could you please provide more details for the 
> description.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...

2018-04-03 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5807#discussion_r178828372
  
--- Diff: flink-end-to-end-tests/run-pre-commit-tests.sh ---
@@ -37,6 +37,14 @@ echo "Flink distribution directory: $FLINK_DIR"
 
 EXIT_CODE=0
 
--- End diff --

I would recommend moving it to the nightly tests. Queryable state is not a core component and the normal builds are already timing out.


---


[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...

2018-04-03 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5807#discussion_r178827995
  
--- Diff: 
flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/LabelSurrogate.java
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+/**
+ * Javadoc.
+ */
+public class LabelSurrogate {
+
+   private Type type;
+   private String foo;
+
+   public LabelSurrogate(Type type, String foo) {
+   this.type = type;
+   this.foo = foo;
+   }
+
+   public Type getType() {
+   return type;
+   }
+
+   public void setType(Type type) {
+   this.type = type;
+   }
+
+   public String getFoo() {
+   return foo;
+   }
+
+   public void setFoo(String foo) {
+   this.foo = foo;
+   }
+
+   @Override
+   public String toString() {
+   return "LabelSurrogate{" +
+   "type=" + type +
+   ", foo='" + foo + '\'' +
+   '}';
+   }
+
+   /**
+* Javadoc.
+*/
--- End diff --

Same here.


---


[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...

2018-04-03 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5807#discussion_r178827936
  
--- Diff: 
flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/LabelSurrogate.java
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+/**
+ * Javadoc.
+ */
--- End diff --

This is just a placeholder comment to make the checkstyle verification pass. Please write a real comment.
This also holds for other places.


---


[jira] [Commented] (FLINK-8982) End-to-end test: Queryable state

2018-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424048#comment-16424048
 ] 

ASF GitHub Bot commented on FLINK-8982:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5807#discussion_r178827936
  
--- Diff: 
flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/LabelSurrogate.java
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+/**
+ * Javadoc.
+ */
--- End diff --

This is just a placeholder comment to make the checkstyle verification pass. Please write a real comment.
This also holds for other places.


> End-to-end test: Queryable state
> 
>
> Key: FLINK-8982
> URL: https://issues.apache.org/jira/browse/FLINK-8982
> Project: Flink
>  Issue Type: Sub-task
>  Components: Queryable State, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Florian Schmidt
>Priority: Blocker
> Fix For: 1.5.0
>
>
> We should add an end-to-end test which verifies that {{Queryable State}} is 
> working.
> [~florianschmidt] and [~kkl0u] could you please provide more details for the 
> description.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...

2018-04-03 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5807#discussion_r178829164
  
--- Diff: 
flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsBugPoc.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.util.Collector;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Random;
+
+/**
+ * Javadoc.
+ */
+public class QsBugPoc {
+
+   public static final String QUERYABLE_STATE_NAME = "state";
+   public static final String STATE_NAME = "state";
+
+   public static void main(final String[] args) throws Exception {
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   RocksDBStateBackend rocksDb = new 
RocksDBStateBackend("file:///tmp/deleteme-rocksdb");
+   env.setStateBackend(rocksDb);
--- End diff --

Also check for different backends, i.e. file and rocks. You can have a look 
at `test_ha.sh`.
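
For example, something like this in the job's `main()` (only a sketch; the argument handling and the checkpoint path are made up for illustration):

```java
// assumes: import org.apache.flink.runtime.state.filesystem.FsStateBackend;
// pick the state backend from a program argument, e.g. "file" or "rocks"
final String backend = args.length > 0 ? args[0] : "rocks";
final String checkpointDir = "file:///tmp/qs-test-checkpoints"; // illustrative path

if ("file".equals(backend)) {
	env.setStateBackend(new FsStateBackend(checkpointDir));
} else {
	env.setStateBackend(new RocksDBStateBackend(checkpointDir));
}
```

The test script could then invoke the job once per backend.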


---


[jira] [Commented] (FLINK-8982) End-to-end test: Queryable state

2018-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424054#comment-16424054
 ] 

ASF GitHub Bot commented on FLINK-8982:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5807#discussion_r178827995
  
--- Diff: 
flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/LabelSurrogate.java
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+/**
+ * Javadoc.
+ */
+public class LabelSurrogate {
+
+   private Type type;
+   private String foo;
+
+   public LabelSurrogate(Type type, String foo) {
+   this.type = type;
+   this.foo = foo;
+   }
+
+   public Type getType() {
+   return type;
+   }
+
+   public void setType(Type type) {
+   this.type = type;
+   }
+
+   public String getFoo() {
+   return foo;
+   }
+
+   public void setFoo(String foo) {
+   this.foo = foo;
+   }
+
+   @Override
+   public String toString() {
+   return "LabelSurrogate{" +
+   "type=" + type +
+   ", foo='" + foo + '\'' +
+   '}';
+   }
+
+   /**
+* Javadoc.
+*/
--- End diff --

Same here.


> End-to-end test: Queryable state
> 
>
> Key: FLINK-8982
> URL: https://issues.apache.org/jira/browse/FLINK-8982
> Project: Flink
>  Issue Type: Sub-task
>  Components: Queryable State, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Florian Schmidt
>Priority: Blocker
> Fix For: 1.5.0
>
>
> We should add an end-to-end test which verifies that {{Queryable State}} is 
> working.
> [~florianschmidt] and [~kkl0u] could you please provide more details for the 
> description.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...

2018-04-03 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5807#discussion_r178827415
  
--- Diff: 
flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailInformation.java
 ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Javadoc.
+ */
+public class EmailInformation implements Serializable {
+
+   private static final long serialVersionUID = -8956979869800484909L;
+
+   public void setEmailId(EmailId emailId) {
+   this.emailId = emailId;
+   }
+
+   private EmailId emailId;
+
+// public void setTimestamp(Instant timestamp) {
--- End diff --

Remove commented methods.


---


[jira] [Commented] (FLINK-8982) End-to-end test: Queryable state

2018-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424045#comment-16424045
 ] 

ASF GitHub Bot commented on FLINK-8982:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5807#discussion_r178827543
  
--- Diff: 
flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailInformation.java
 ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Javadoc.
+ */
+public class EmailInformation implements Serializable {
+
+   private static final long serialVersionUID = -8956979869800484909L;
+
+   public void setEmailId(EmailId emailId) {
+   this.emailId = emailId;
+   }
+
+   private EmailId emailId;
+
+// public void setTimestamp(Instant timestamp) {
+// this.timestamp = timestamp;
+// }
+
+   //private Instant timestamp;
--- End diff --

same here (remove commented field).


> End-to-end test: Queryable state
> 
>
> Key: FLINK-8982
> URL: https://issues.apache.org/jira/browse/FLINK-8982
> Project: Flink
>  Issue Type: Sub-task
>  Components: Queryable State, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Florian Schmidt
>Priority: Blocker
> Fix For: 1.5.0
>
>
> We should add an end-to-end test which verifies that {{Queryable State}} is 
> working.
> [~florianschmidt] and [~kkl0u] could you please provide more details for the 
> description.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8982) End-to-end test: Queryable state

2018-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424049#comment-16424049
 ] 

ASF GitHub Bot commented on FLINK-8982:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5807#discussion_r178827723
  
--- Diff: 
flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailInformation.java
 ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Javadoc.
+ */
+public class EmailInformation implements Serializable {
+
+   private static final long serialVersionUID = -8956979869800484909L;
+
+   public void setEmailId(EmailId emailId) {
+   this.emailId = emailId;
+   }
+
+   private EmailId emailId;
+
+// public void setTimestamp(Instant timestamp) {
+// this.timestamp = timestamp;
+// }
+
+   //private Instant timestamp;
+
+   public void setStuff(List<String> stuff) {
+   this.stuff = stuff;
+   }
+
+   private List<String> stuff;
+
+   public void setAsdf(Long asdf) {
+   this.asdf = asdf;
+   }
+
+   private Long asdf = 0L;
+
+   private transient LabelSurrogate label;
+
+   public EmailInformation() {
+
+   }
+
+   public EmailInformation(Email email) {
+   emailId = email.getEmailId();
+   //  timestamp = email.getTimestamp();
+   stuff = new ArrayList<>();
+   stuff.add("1");
+   stuff.add("2");
+   stuff.add("3");
+   label = email.getLabel();
+   }
+
+   public EmailId getEmailId() {
+   return emailId;
+   }
+
+// //public Instant getTimestamp() {
+// return timestamp;
+// }
+
+   public List<String> getStuff() {
+   return stuff;
+   }
+
+   public Long getAsdf() {
+   return asdf;
+   }
+
+   public LabelSurrogate getLabel() {
+   return label;
+   }
+
+   public void setLabel(LabelSurrogate label) {
+   this.label = label;
+   }
+
+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   }
+   if (o == null || getClass() != o.getClass()) {
+   return false;
+   }
+   EmailInformation that = (EmailInformation) o;
+   return Objects.equals(emailId, that.emailId) &&
+// Objects.equals(timestamp, that.timestamp) &&
--- End diff --

remove.


> End-to-end test: Queryable state
> 
>
> Key: FLINK-8982
> URL: https://issues.apache.org/jira/browse/FLINK-8982
> Project: Flink
>  Issue Type: Sub-task
>  Components: Queryable State, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Florian Schmidt
>Priority: Blocker
> Fix For: 1.5.0
>
>
> We should add an end-to-end test which verifies that {{Queryable State}} is 
> working.
> [~florianschmidt] and [~kkl0u] could you please provide more details for the 
> description.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8982) End-to-end test: Queryable state

2018-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424051#comment-16424051
 ] 

ASF GitHub Bot commented on FLINK-8982:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5807#discussion_r178827678
  
--- Diff: 
flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailInformation.java
 ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Javadoc.
+ */
+public class EmailInformation implements Serializable {
+
+   private static final long serialVersionUID = -8956979869800484909L;
+
+   public void setEmailId(EmailId emailId) {
+   this.emailId = emailId;
+   }
+
+   private EmailId emailId;
+
+// public void setTimestamp(Instant timestamp) {
+// this.timestamp = timestamp;
+// }
+
+   //private Instant timestamp;
+
+   public void setStuff(List<String> stuff) {
+   this.stuff = stuff;
+   }
+
+   private List<String> stuff;
+
+   public void setAsdf(Long asdf) {
+   this.asdf = asdf;
+   }
+
+   private Long asdf = 0L;
+
+   private transient LabelSurrogate label;
+
+   public EmailInformation() {
+
+   }
+
+   public EmailInformation(Email email) {
+   emailId = email.getEmailId();
+   //  timestamp = email.getTimestamp();
+   stuff = new ArrayList<>();
+   stuff.add("1");
+   stuff.add("2");
+   stuff.add("3");
+   label = email.getLabel();
+   }
+
+   public EmailId getEmailId() {
+   return emailId;
+   }
+
+// //public Instant getTimestamp() {
--- End diff --

remove.


> End-to-end test: Queryable state
> 
>
> Key: FLINK-8982
> URL: https://issues.apache.org/jira/browse/FLINK-8982
> Project: Flink
>  Issue Type: Sub-task
>  Components: Queryable State, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Florian Schmidt
>Priority: Blocker
> Fix For: 1.5.0
>
>
> We should add an end-to-end test which verifies that {{Queryable State}} is 
> working.
> [~florianschmidt] and [~kkl0u] could you please provide more details for the 
> description.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8982) End-to-end test: Queryable state

2018-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424050#comment-16424050
 ] 

ASF GitHub Bot commented on FLINK-8982:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5807#discussion_r178828967
  
--- Diff: 
flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsBugPoc.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.util.Collector;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Random;
+
+/**
+ * Javadoc.
+ */
+public class QsBugPoc {
+
+   public static final String QUERYABLE_STATE_NAME = "state";
+   public static final String STATE_NAME = "state";
+
+   public static void main(final String[] args) throws Exception {
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   RocksDBStateBackend rocksDb = new 
RocksDBStateBackend("file:///tmp/deleteme-rocksdb");
+   env.setStateBackend(rocksDb);
--- End diff --

The checkpoint directory can be a parameter, and here it should be a path inside 
the `TEST_DIR` of the test itself. In addition, everything should be explicitly 
cleaned up at the end, e.g. checkpoints and any input/output data.
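
For example (a sketch only; `checkpoint-dir` is a made-up parameter name, and the test script would pass a path under its `TEST_DIR` and remove it again when it is done):

{code:java}
// assumes: import org.apache.flink.api.java.utils.ParameterTool;
final ParameterTool params = ParameterTool.fromArgs(args);
final String checkpointDir = params.getRequired("checkpoint-dir");
env.setStateBackend(new RocksDBStateBackend(checkpointDir));
{code}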


> End-to-end test: Queryable state
> 
>
> Key: FLINK-8982
> URL: https://issues.apache.org/jira/browse/FLINK-8982
> Project: Flink
>  Issue Type: Sub-task
>  Components: Queryable State, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Florian Schmidt
>Priority: Blocker
> Fix For: 1.5.0
>
>
> We should add an end-to-end test which verifies that {{Queryable State}} is 
> working.
> [~florianschmidt] and [~kkl0u] could you please provide more details for the 
> description.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8982) End-to-end test: Queryable state

2018-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424046#comment-16424046
 ] 

ASF GitHub Bot commented on FLINK-8982:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5807#discussion_r178827415
  
--- Diff: 
flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailInformation.java
 ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Javadoc.
+ */
+public class EmailInformation implements Serializable {
+
+   private static final long serialVersionUID = -8956979869800484909L;
+
+   public void setEmailId(EmailId emailId) {
+   this.emailId = emailId;
+   }
+
+   private EmailId emailId;
+
+// public void setTimestamp(Instant timestamp) {
--- End diff --

Remove commented methods.


> End-to-end test: Queryable state
> 
>
> Key: FLINK-8982
> URL: https://issues.apache.org/jira/browse/FLINK-8982
> Project: Flink
>  Issue Type: Sub-task
>  Components: Queryable State, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Florian Schmidt
>Priority: Blocker
> Fix For: 1.5.0
>
>
> We should add an end-to-end test which verifies that {{Queryable State}} is 
> working.
> [~florianschmidt] and [~kkl0u] could you please provide more details for the 
> description.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8982) End-to-end test: Queryable state

2018-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424047#comment-16424047
 ] 

ASF GitHub Bot commented on FLINK-8982:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5807#discussion_r178829164
  
--- Diff: 
flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsBugPoc.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.util.Collector;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Random;
+
+/**
+ * Javadoc.
+ */
+public class QsBugPoc {
+
+   public static final String QUERYABLE_STATE_NAME = "state";
+   public static final String STATE_NAME = "state";
+
+   public static void main(final String[] args) throws Exception {
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   RocksDBStateBackend rocksDb = new 
RocksDBStateBackend("file:///tmp/deleteme-rocksdb");
+   env.setStateBackend(rocksDb);
--- End diff --

Also check for different backends, i.e. file and rocks. You can have a look 
at `test_ha.sh`.


> End-to-end test: Queryable state
> 
>
> Key: FLINK-8982
> URL: https://issues.apache.org/jira/browse/FLINK-8982
> Project: Flink
>  Issue Type: Sub-task
>  Components: Queryable State, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Florian Schmidt
>Priority: Blocker
> Fix For: 1.5.0
>
>
> We should add an end-to-end test which verifies that {{Queryable State}} is 
> working.
> [~florianschmidt] and [~kkl0u] could you please provide more details for the 
> description.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...

2018-04-03 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5807#discussion_r178827698
  
--- Diff: 
flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailInformation.java
 ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Javadoc.
+ */
+public class EmailInformation implements Serializable {
+
+   private static final long serialVersionUID = -8956979869800484909L;
+
+   public void setEmailId(EmailId emailId) {
+   this.emailId = emailId;
+   }
+
+   private EmailId emailId;
+
+// public void setTimestamp(Instant timestamp) {
+// this.timestamp = timestamp;
+// }
+
+   //private Instant timestamp;
+
+   public void setStuff(List<String> stuff) {
+   this.stuff = stuff;
+   }
+
+   private List<String> stuff;
+
+   public void setAsdf(Long asdf) {
+   this.asdf = asdf;
+   }
+
+   private Long asdf = 0L;
+
+   private transient LabelSurrogate label;
+
+   public EmailInformation() {
+
+   }
+
+   public EmailInformation(Email email) {
+   emailId = email.getEmailId();
+   //  timestamp = email.getTimestamp();
--- End diff --

remove.


---


[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...

2018-04-03 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5807#discussion_r178827543
  
--- Diff: 
flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailInformation.java
 ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Javadoc.
+ */
+public class EmailInformation implements Serializable {
+
+   private static final long serialVersionUID = -8956979869800484909L;
+
+   public void setEmailId(EmailId emailId) {
+   this.emailId = emailId;
+   }
+
+   private EmailId emailId;
+
+// public void setTimestamp(Instant timestamp) {
+// this.timestamp = timestamp;
+// }
+
+   //private Instant timestamp;
--- End diff --

same here (remove commented field).


---


[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...

2018-04-03 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5807#discussion_r178827678
  
--- Diff: 
flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailInformation.java
 ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Javadoc.
+ */
+public class EmailInformation implements Serializable {
+
+   private static final long serialVersionUID = -8956979869800484909L;
+
+   public void setEmailId(EmailId emailId) {
+   this.emailId = emailId;
+   }
+
+   private EmailId emailId;
+
+// public void setTimestamp(Instant timestamp) {
+// this.timestamp = timestamp;
+// }
+
+   //private Instant timestamp;
+
+   public void setStuff(List<String> stuff) {
+   this.stuff = stuff;
+   }
+
+   private List<String> stuff;
+
+   public void setAsdf(Long asdf) {
+   this.asdf = asdf;
+   }
+
+   private Long asdf = 0L;
+
+   private transient LabelSurrogate label;
+
+   public EmailInformation() {
+
+   }
+
+   public EmailInformation(Email email) {
+   emailId = email.getEmailId();
+   //  timestamp = email.getTimestamp();
+   stuff = new ArrayList<>();
+   stuff.add("1");
+   stuff.add("2");
+   stuff.add("3");
+   label = email.getLabel();
+   }
+
+   public EmailId getEmailId() {
+   return emailId;
+   }
+
+// //public Instant getTimestamp() {
--- End diff --

remove.


---


[jira] [Commented] (FLINK-9078) End-to-end test: Add test that verifies that a specific classloading issue with avro is fixed

2018-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424042#comment-16424042
 ] 

ASF GitHub Bot commented on FLINK-9078:
---

Github user florianschmidt1994 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5762#discussion_r178828282
  
--- Diff: 
flink-end-to-end-tests/flink-avro-classloading-test/src/main/avro/user.avsc ---
@@ -0,0 +1,9 @@
+{
+ "type": "record",
+ "namespace": "org.apache.flink.tests.streaming",
+ "name": "User",
+ "fields": [
+   { "name": "first", "type": "string" },
+   { "name": "last", "type": "string" }
--- End diff --

I added some more complex examples and verified that the initial test case 
still fails on 1.3.X and works on 1.4.X as expected
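
For instance, a nested array field along these lines (illustrative only, not necessarily one of the fields that was actually added):

{code}
{ "name": "addresses", "type": { "type": "array", "items": {
    "type": "record", "name": "Address", "fields": [
      { "name": "city", "type": "string" },
      { "name": "zip",  "type": "string" }
    ] } } }
{code}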


> End-to-end test: Add test that verifies that a specific classloading issue 
> with avro is fixed
> -
>
> Key: FLINK-9078
> URL: https://issues.apache.org/jira/browse/FLINK-9078
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Florian Schmidt
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5762: [FLINK-9078][E2ETests] Add test that verifies corr...

2018-04-03 Thread florianschmidt1994
Github user florianschmidt1994 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5762#discussion_r178828282
  
--- Diff: 
flink-end-to-end-tests/flink-avro-classloading-test/src/main/avro/user.avsc ---
@@ -0,0 +1,9 @@
+{
+ "type": "record",
+ "namespace": "org.apache.flink.tests.streaming",
+ "name": "User",
+ "fields": [
+   { "name": "first", "type": "string" },
+   { "name": "last", "type": "string" }
--- End diff --

I added some more complex examples and verified that the initial test case 
still fails on 1.3.X and works on 1.4.X as expected


---


[jira] [Commented] (FLINK-9120) Task Manager Fault Tolerance issue

2018-04-03 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424035#comment-16424035
 ] 

Till Rohrmann commented on FLINK-9120:
--

Hi [~dhirajpraj],

the logs show that, when trying to restart, the JM did not yet recognize the killed 
TM as gone. Thus, it tries to re-deploy tasks to this machine. When it finally 
realizes that the TM has been killed, it fails the job. At this point it would try 
to recover the job; however, since the number of restart attempts is depleted 
(set to 3), it fails the job terminally. Please try to raise the number of restart 
attempts. This should hopefully fix your problem.
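
For example, with the fixed-delay restart strategy this can be raised in {{flink-conf.yaml}} (the keys are the documented ones; the values below are only illustrative):

{code}
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 10
restart-strategy.fixed-delay.delay: 10 s
{code}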

> Task Manager Fault Tolerance issue
> --
>
> Key: FLINK-9120
> URL: https://issues.apache.org/jira/browse/FLINK-9120
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, Configuration, Core
>Affects Versions: 1.4.2
>Reporter: dhiraj prajapati
>Priority: Critical
> Attachments: flink-dhiraj.prajapati-client-ip-10-14-25-115.log, 
> flink-dhiraj.prajapati-client-ip-10-14-25-115.log, 
> flink-dhiraj.prajapati-jobmanager-5-ip-10-14-25-115.log, 
> flink-dhiraj.prajapati-jobmanager-5-ip-10-14-25-115.log, 
> flink-dhiraj.prajapati-taskmanager-5-ip-10-14-25-116.log, 
> flink-dhiraj.prajapati-taskmanager-5-ip-10-14-25-116.log
>
>
> Hi, 
> I have set up a flink 1.4 cluster with 1 job manager and two task managers. 
> The configs taskmanager.numberOfTaskSlots and parallelism.default were set 
> to 2 on each node. I submitted a job to this cluster and it runs fine. To 
> test fault tolerance, I killed one task manager. I was expecting the job to 
> run fine because one of the 2 task managers was still up and running. 
> However, the job failed. Am I missing something? 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8835) Fix TaskManager config keys

2018-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424030#comment-16424030
 ] 

ASF GitHub Bot commented on FLINK-8835:
---

GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/5808

[FLINK-8835] [taskmanager] Fix TaskManager config keys

## What is the purpose of the change

Fix the TaskManager config keys to make them easier for users.

## Brief change log

Rename the original config keys and the corresponding variable names to simpler, more consistent forms.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink 
flink-8835-taskmanager-config-key

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5808.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5808


commit a4df41079edd228ea346227f2f04b41d694c0420
Author: zhangminglei 
Date:   2018-04-03T13:23:19Z

[FLINK-8835] [taskmanager] Fix TaskManager config keys




> Fix TaskManager config keys
> ---
>
> Key: FLINK-8835
> URL: https://issues.apache.org/jira/browse/FLINK-8835
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Reporter: Stephan Ewen
>Assignee: mingleizhang
>Priority: Blocker
>  Labels: easy-fix
> Fix For: 1.5.0
>
>
> Many new config keys in the TaskManager don't follow the proper naming 
> scheme. We need to clear those up before the release. I would also suggest to 
> keep the key names short, because that makes it easier for users.
> When doing this cleanup pass over the config keys, I would suggest to also 
> make some of the existing keys more hierarchical and harmonize them with the 
> common scheme in Flink.
> h1. New Keys
> * {{taskmanager.network.credit-based-flow-control.enabled}} to 
> {{taskmanager.network.credit-model}}.
> h1. Existing Keys
> * {{taskmanager.debug.memory.startLogThread}} => 
> {{taskmanager.debug.memory.log}}
> * {{taskmanager.debug.memory.logIntervalMs}} => 
> {{taskmanager.debug.memory.log-interval}}
> * {{taskmanager.initial-registration-pause}} => 
> {{taskmanager.registration.initial-backoff}}
> * {{taskmanager.max-registration-pause}} => 
> {{taskmanager.registration.max-backoff}}
> * {{taskmanager.refused-registration-pause}} => 
> {{taskmanager.registration.refused-backoff}}
> * {{taskmanager.maxRegistrationDuration}} => 
> {{taskmanager.registration.timeout}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5808: [FLINK-8835] [taskmanager] Fix TaskManager config ...

2018-04-03 Thread zhangminglei
GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/5808

[FLINK-8835] [taskmanager] Fix TaskManager config keys

## What is the purpose of the change

Fix the TaskManager config keys to make them easier for users.

## Brief change log

Rename the original config keys and the corresponding variable names to simpler, more consistent forms.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink 
flink-8835-taskmanager-config-key

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5808.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5808


commit a4df41079edd228ea346227f2f04b41d694c0420
Author: zhangminglei 
Date:   2018-04-03T13:23:19Z

[FLINK-8835] [taskmanager] Fix TaskManager config keys




---


[jira] [Commented] (FLINK-8910) Introduce automated end-to-end test for local recovery (including sticky scheduling)

2018-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424024#comment-16424024
 ] 

ASF GitHub Bot commented on FLINK-8910:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r178824035
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java ---
@@ -107,4 +131,12 @@ public int getAttemptNumber() {
public String getTaskNameWithSubtasks() {
return this.taskNameWithSubtasks;
}
+
+   /**
+* Returns the allocation id for where this task is executed.
+* @return the allocation id for where this task is executed.
+*/
+   public String getAllocationID() {
--- End diff --

Hmm, what about testing it a bit more indirectly, by removing the 
checkpoint files on the DFS? Then you can only recover if you recover locally. 
Or by querying the REST interface? We might have to add the information to the 
`VertexTaskDetail`.

Otherwise, we start mixing concerns and expose unnecessary information to 
the user via the `AbstractRuntimeUDFContext`. Moreover, not every function has 
access to this information right now. For example the 
`RichAsyncFunctionRuntimeContext` does not expose it.
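
For the first variant, the test script could do something like the following after a checkpoint has completed and before the failure is triggered (sketch only; `CHECKPOINT_DIR` stands for whatever directory the test configures as the checkpoint location):

{code}
# remove the "remote" checkpoint data so that a successful restore
# can only have come from the task-local state copies
rm -rf "$CHECKPOINT_DIR"/*/chk-*
{code}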


> Introduce automated end-to-end test for local recovery (including sticky 
> scheduling)
> 
>
> Key: FLINK-8910
> URL: https://issues.apache.org/jira/browse/FLINK-8910
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> We should have an automated end-to-end test that can run nightly to check 
> that sticky allocation and local recovery work as expected.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5676: [FLINK-8910][tests] Automated end-to-end test for ...

2018-04-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r178824035
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java ---
@@ -107,4 +131,12 @@ public int getAttemptNumber() {
public String getTaskNameWithSubtasks() {
return this.taskNameWithSubtasks;
}
+
+   /**
+* Returns the allocation id for where this task is executed.
+* @return the allocation id for where this task is executed.
+*/
+   public String getAllocationID() {
--- End diff --

Hmm, what about testing it a bit more indirectly, by removing the 
checkpoint files on the DFS? Then you can only recover if you recover locally. 
Or by querying the REST interface? We might have to add the information to the 
`VertexTaskDetail`.

Otherwise, we start mixing concerns and expose unnecessary information to 
the user via the `AbstractRuntimeUDFContext`. Moreover, not every function has 
access to this information right now. For example the 
`RichAsyncFunctionRuntimeContext` does not expose it.


---


[jira] [Comment Edited] (FLINK-8707) Excessive amount of files opened by flink task manager

2018-04-03 Thread Alon Galant (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424006#comment-16424006
 ] 

Alon Galant edited comment on FLINK-8707 at 4/3/18 1:24 PM:


Hey, 

I have a similar problem, this is my thread on Flink ML:

[https://lists.apache.org/thread.html/ce89ed53533ec19ec0a7b74de0f96ac22506d5b2564c2949fd54d6ce@%3Cuser.flink.apache.org%3E]

 I've created a whitelist of 16 Customer-Ids that I filter from the Kafka topic 
into Flink, so I need to have around the same number of files that the data is 
being written to at once.

 

I attached 3 files:

lsof.txt - the result for $lsof > lsof.txt

lsofp.txt - the result for $lsof -p 5514 > lsofp.txt (5514 is the flink 
process)

ll.txt - the result for $ls -la /tmp/hadoop-flink/s3a/ > ll.txt

/tmp/hadoop-flink/s3a/ is the directory Flink uses to store the tmp files it 
works on, before uploading them to s3. 

I ran these commands on my task manager (I run 2 task managers and a total of 8 
task slots, 4 on each tm)

 

Here are some lsof | wc -l results:

 
{code:java}
less lsof.txt | wc -l --> 44228
less lsofp.txt | wc -l --> 403
less ll.txt | wc -l --> 64
less lsof.txt | grep output-1620244093829221334.tmp | wc -l --> 108 (this is an 
example of an output file, I think all of them gives the same result)
{code}
 

 

From ll.txt we can see that there are 4 files for each customerId (for each 
partition), so I guess that every task slot opens its own file.

For each 'output' file there are 108 FDs.

My problem is that I want to be able to handle around 500 customers, and I want 
to still be able to use high concurrency. 

When I tried to use 32 task slots on 2 TM, I got more than a million FDs on 
lsof

This is the exception I got when I've enabled all of the Ids:

 
{code:java}
java.lang.RuntimeException: Error parsing YAML configuration.
at 
org.apache.flink.configuration.GlobalConfiguration.loadYAMLResource(GlobalConfiguration.java:178)
at 
org.apache.flink.configuration.GlobalConfiguration.loadConfiguration(GlobalConfiguration.java:112)
at 
org.apache.flink.configuration.GlobalConfiguration.loadConfiguration(GlobalConfiguration.java:79)
at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopConfiguration(HadoopFileSystem.java:178)
at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:408)
at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:351)
at 
org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at 
org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:106)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:232)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:653)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:640)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:246)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: /srv/flink-1.2.1/conf/flink-conf.yaml 
(Too many open files)
at java.io.FileInputStream.open(Native Method)
at java.io.FileInputStream.<init>(FileInputStream.java:146)
at 
org.apache.flink.configuration.GlobalConfiguration.loadYAMLResource(GlobalConfiguration.java:144)
... 14 more
{code}
I increased the ulimit to 500,000 but it's still not enough, and I guess this 
is too much anyhow.

 

I'd love to get some help :)

Thanks,

Alon

 

[^ll.txt]

[^lsof.txt]

[^lsofp.txt]


was (Author: galantaa):
Hey, 

I have a similar problem, this is my thread on Flink ML:

[https://lists.apache.org/thread.html/ce89ed53533ec19ec0a7b74de0f96ac22506d5b2564c2949fd54d6ce@%3Cuser.flink.apache.org%3E]

 I've created a whitelist of 16 Customer-Ids that I filter from the Kafka topic 
into Flink, so I need to have around the same number of files that the data is 
being written to at once.

 

I attached 3 files:

lsof.txt - the result for $lsof > lsof.txt

lsofp.txt - the result for $lsof -p 5514 > lsofp.txt (5514 is the flink 
process)

ll.txt - the result for $ls -la /tmp/hadoop-flink/s3a/ > ll.txt

/tmp/hadoop-flink/s3a/ is the directory Flink uses to store the tmp files it 
works on, before uploading them to s3. 

I ran these commands on my task manager (I run 2 task managers and a total of 8 
task slots, 4 on each tm)

 

Here are some lsof | w

[jira] [Comment Edited] (FLINK-8707) Excessive amount of files opened by flink task manager

2018-04-03 Thread Alon Galant (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424006#comment-16424006
 ] 

Alon Galant edited comment on FLINK-8707 at 4/3/18 1:22 PM:


Hey, 

I have a similar problem, this is my thread on Flink ML:

[https://lists.apache.org/thread.html/ce89ed53533ec19ec0a7b74de0f96ac22506d5b2564c2949fd54d6ce@%3Cuser.flink.apache.org%3E]

 I've created a whitelist of 16 Customer-Ids that I filter from the Kafka topic 
into Flink, so I need to have around the same number of files that the data is 
being written to at once.

 

I attached 3 files:

lsof.txt - the result for $lsof > lsof.txt

lsofp.txt - the result for $lsof -p 5514 > lsofp.txt (5514 is the flink 
process)

ll.txt - the result for $ls -la /tmp/hadoop-flink/s3a/ > ll.txt

/tmp/hadoop-flink/s3a/ is the directory Flink uses to store the tmp files it 
works on, before uploading them to s3. 

I ran these commands on my task manager (I run 2 task managers and a total of 8 
task slots, 4 on each tm)

 

Here are some lsof | wc -l results:

 
{code:java}
less lsof.txt | wc -l --> 44228
less lsofp.txt | wc -l --> 403
less ll.txt | wc -l --> 64
less lsof.txt | grep output-1620244093829221334.tmp | wc -l --> 108 (this is an 
example of an output file, I think all of them gives the same result)
{code}
 

 

From ll.txt we can see that there are 4 files for each customerId (for each 
partition), so I guess that every task slot opens its own file.

For each 'output' file there are 108 FDs.

My problem is that I want to be able to handle around 500 customers, and I want 
to still be able to use high concurrency. 

When I tried to use 32 task slots on 2 TM, I got more than a million FDs on 
lsof

This is the exception I got when I've enabled all of the Ids:

 
{code:java}
java.lang.RuntimeException: Error parsing YAML configuration.
at 
org.apache.flink.configuration.GlobalConfiguration.loadYAMLResource(GlobalConfiguration.java:178)
at 
org.apache.flink.configuration.GlobalConfiguration.loadConfiguration(GlobalConfiguration.java:112)
at 
org.apache.flink.configuration.GlobalConfiguration.loadConfiguration(GlobalConfiguration.java:79)
at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopConfiguration(HadoopFileSystem.java:178)
at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:408)
at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:351)
at 
org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at 
org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:106)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:232)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:653)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:640)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:246)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: /srv/flink-1.2.1/conf/flink-conf.yaml 
(Too many open files)
at java.io.FileInputStream.open(Native Method)
at java.io.FileInputStream.<init>(FileInputStream.java:146)
at 
org.apache.flink.configuration.GlobalConfiguration.loadYAMLResource(GlobalConfiguration.java:144)
... 14 more
{code}
I increased the ulimit to 500,000 but it's still not enough, and I guess this 
is too much anyhow.

 

I'd love to get some help!

Thanks,

Alon

 

[^ll.txt]

[^lsof.txt]

[^lsofp.txt]


was (Author: galantaa):
Hey, 

I have a similar problem, this is my thread on Flink ML:

[https://lists.apache.org/thread.html/ce89ed53533ec19ec0a7b74de0f96ac22506d5b2564c2949fd54d6ce@%3Cuser.flink.apache.org%3E]

 I've created a whitelist of 16 Customer-Ids that I filter from the Kafka topic 
into Flink, so I need to have around the same number of files that the data is 
being written to at once.

 

I attached 3 files:

lsof.txt - the result for $lsof > lsof.txt

lsofp.txt - the result for $lsof -p 5514 > lsofp.txt (5514 is the flink 
process)

ll.txt - the result for $ls -la /tmp/hadoop-flink/s3a/ > ll.txt

/tmp/hadoop-flink/s3a/ is the directory Flink uses to store the tmp files it 
works on, before uploading them to s3. 

I ran these commands on my task manager (I run 2 task managers and a total of 8 
task slots, 4 on each tm)

 

Here are some lsof | wc 

[jira] [Commented] (FLINK-8707) Excessive amount of files opened by flink task manager

2018-04-03 Thread Alon Galant (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424006#comment-16424006
 ] 

Alon Galant commented on FLINK-8707:


Hey, 

I have a similar problem, this is my thread on Flink ML:

[https://lists.apache.org/thread.html/ce89ed53533ec19ec0a7b74de0f96ac22506d5b2564c2949fd54d6ce@%3Cuser.flink.apache.org%3E]

 I've created a whitelist of 16 Customer-Ids that I filter from the Kafka topic 
into Flink, so I need to have around the same number of files that the data is 
being written to at once.

 

I attached 3 files:

lsof.txt - the result for $lsof > lsof.txt

lsofp.txt - the result for $lsof -p 5514 > lsofp.txt (5514 is the flink 
process)

ll.txt - the result for $ls -la /tmp/hadoop-flink/s3a/ > ll.txt

/tmp/hadoop-flink/s3a/ is the directory Flink uses to store the tmp files it 
works on, before uploading them to s3. 

I ran these commands on my task manager (I run 2 task managers and a total of 8 
task slots, 4 on each tm)

 

Here are some lsof | wc -l results:

 
{code:java}
// code placeholder
less lsof.txt | wc -l --> 44228
less lsofp.txt | wc -l --> 403
less ll.txt | wc -l --> 64
less lsof.txt | grep output-1620244093829221334.tmp | wc -l --> 108 (this is an 
example of an output file, I think all of them gives the same result)


{code}
 

 

From ll.txt we can see that there are 4 files for each customerId (for each 
partition), so I guess that every task slot opens its own file.

For each 'output' file there are 108 FDs.

My problem is that I want to be able to handle around 500 customers, and I want 
to still be able to use high concurrency. 

When I tried to use 32 task slots on 2 TM, I got more than a million FDs on 
lsof

This is the exception I got when I've enabled all of the Ids:

 
{code:java}
// code placeholder
java.lang.RuntimeException: Error parsing YAML configuration.
at 
org.apache.flink.configuration.GlobalConfiguration.loadYAMLResource(GlobalConfiguration.java:178)
at 
org.apache.flink.configuration.GlobalConfiguration.loadConfiguration(GlobalConfiguration.java:112)
at 
org.apache.flink.configuration.GlobalConfiguration.loadConfiguration(GlobalConfiguration.java:79)
at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopConfiguration(HadoopFileSystem.java:178)
at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:408)
at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:351)
at 
org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at 
org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:106)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:232)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:653)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:640)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:246)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: /srv/flink-1.2.1/conf/flink-conf.yaml 
(Too many open files)
at java.io.FileInputStream.open(Native Method)
at java.io.FileInputStream.<init>(FileInputStream.java:146)
at 
org.apache.flink.configuration.GlobalConfiguration.loadYAMLResource(GlobalConfiguration.java:144)
... 14 more
{code}
I increased the ulimit to 500,000 but it's still not enough, and I guess this 
is too much anyhow.

 

I'd love to get some help!

Thanks,

Alon

 

[^ll.txt]

[^lsof.txt]

[^lsofp.txt]

> Excessive amount of files opened by flink task manager
> --
>
> Key: FLINK-8707
> URL: https://issues.apache.org/jira/browse/FLINK-8707
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.3.2
> Environment: NAME="Red Hat Enterprise Linux Server"
> VERSION="7.3 (Maipo)"
> Two boxes, each with a Job Manager & Task Manager, using Zookeeper for HA.
> flink.yaml below with some settings (removed exact box names) etc:
> env.log.dir: ...some dir...residing on the same box
> env.pid.dir: some dir...residing on the same box
> metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
> metrics.reporters: jmx
> state.backend: filesystem
> state.backend.fs.checkpointdir: file:///some_nfs_mount
> state.checkpoints

[jira] [Commented] (FLINK-8910) Introduce automated end-to-end test for local recovery (including sticky scheduling)

2018-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424002#comment-16424002
 ] 

ASF GitHub Bot commented on FLINK-8910:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r178819991
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java ---
@@ -107,4 +131,12 @@ public int getAttemptNumber() {
public String getTaskNameWithSubtasks() {
return this.taskNameWithSubtasks;
}
+
+   /**
+* Returns the allocation id for where this task is executed.
+* @return the allocation id for where this task is executed.
+*/
+   public String getAllocationID() {
--- End diff --

Previously, I had a "hacky" version that extracted the allocation id from a 
to string method of the state store but I think stephan was more in favour of 
exposing this information through the context if needed. I cannot see how we 
can test if we have the same allocation id without having some way to access it.


> Introduce automated end-to-end test for local recovery (including sticky 
> scheduling)
> 
>
> Key: FLINK-8910
> URL: https://issues.apache.org/jira/browse/FLINK-8910
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> We should have an automated end-to-end test that can run nightly to check 
> that sticky allocation and local recovery work as expected.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5676: [FLINK-8910][tests] Automated end-to-end test for ...

2018-04-03 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r178819991
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java ---
@@ -107,4 +131,12 @@ public int getAttemptNumber() {
public String getTaskNameWithSubtasks() {
return this.taskNameWithSubtasks;
}
+
+   /**
+* Returns the allocation id for where this task is executed.
+* @return the allocation id for where this task is executed.
+*/
+   public String getAllocationID() {
--- End diff --

Previously, I had a "hacky" version that extracted the allocation id from a 
to string method of the state store but I think stephan was more in favour of 
exposing this information through the context if needed. I cannot see how we 
can test if we have the same allocation id without having some way to access it.


---


[jira] [Updated] (FLINK-8707) Excessive amount of files opened by flink task manager

2018-04-03 Thread Alon Galant (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8707?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alon Galant updated FLINK-8707:
---
Attachment: lsof.txt

> Excessive amount of files opened by flink task manager
> --
>
> Key: FLINK-8707
> URL: https://issues.apache.org/jira/browse/FLINK-8707
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.3.2
> Environment: NAME="Red Hat Enterprise Linux Server"
> VERSION="7.3 (Maipo)"
> Two boxes, each with a Job Manager & Task Manager, using Zookeeper for HA.
> flink.yaml below with some settings (removed exact box names) etc:
> env.log.dir: ...some dir...residing on the same box
> env.pid.dir: some dir...residing on the same box
> metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
> metrics.reporters: jmx
> state.backend: filesystem
> state.backend.fs.checkpointdir: file:///some_nfs_mount
> state.checkpoints.dir: file:///some_nfs_mount
> state.checkpoints.num-retained: 3
> high-availability.cluster-id: /tst
> high-availability.storageDir: file:///some_nfs_mount/ha
> high-availability: zookeeper
> high-availability.zookeeper.path.root: /flink
> high-availability.zookeeper.quorum: ...list of zookeeper boxes
> env.java.opts.jobmanager: ...some extra jar args
> jobmanager.archive.fs.dir: some dir...residing on the same box
> jobmanager.web.submit.enable: true
> jobmanager.web.tmpdir:  some dir...residing on the same box
> env.java.opts.taskmanager: some extra jar args
> taskmanager.tmp.dirs:  some dir...residing on the same box/var/tmp
> taskmanager.network.memory.min: 1024MB
> taskmanager.network.memory.max: 2048MB
> blob.storage.directory:  some dir...residing on the same box
>Reporter: Alexander Gardner
>Assignee: Piotr Nowojski
>Priority: Critical
> Fix For: 1.5.0
>
> Attachments: AfterRunning-3-jobs-Box2-TM-JCONSOLE.png, 
> AfterRunning-3-jobs-TM-FDs-BOX2.jpg, AfterRunning-3-jobs-lsof-p.box2-TM, 
> AfterRunning-3-jobs-lsof.box2-TM, AterRunning-3-jobs-Box1-TM-JCONSOLE.png, 
> box1-jobmgr-lsof, box1-taskmgr-lsof, box2-jobmgr-lsof, box2-taskmgr-lsof, 
> ll.txt, ll.txt, lsof.txt, lsof.txt, lsofp.txt, lsofp.txt
>
>
> The job manager has fewer FDs than the task manager.
>  
> Hi
> A support alert indicated that there were a lot of open files for the boxes 
> running Flink.
> There were 4 flink jobs that were dormant but had consumed a number of msgs 
> from Kafka using the FlinkKafkaConsumer010.
> A simple general lsof:
> $ lsof | wc -l       ->  returned 153114 open file descriptors.
> Focusing on the TaskManager process (process ID = 12154):
> $ lsof | grep 12154 | wc -l    ->  returned 129322 open FDs
> $ lsof -p 12154 | wc -l   -> returned 531 FDs
> There were 228 threads running for the task manager.
>  
> Drilling down a bit further, looking at a_inode and FIFO entries: 
> $ lsof -p 12154 | grep a_inode | wc -l = 100 FDs
> $ lsof -p 12154 | grep FIFO | wc -l  = 200 FDs
> $ /proc/12154/maps = 920 entries.
> Apart from lsof identifying lots of JARs and SOs being referenced there were 
> also 244 child processes for the task manager process.
> Noticed that in each environment there is a creep of file descriptors - are 
> the above figures deemed excessive for the number of FDs in use? I know Flink 
> uses Netty - is it using a separate Selector for reads & writes?
> Additionally, Flink uses memory-mapped files or direct ByteBuffers - are 
> these skewing the numbers of FDs shown?
> Example of one child process ID 6633:
> java 12154 6633 dfdev 387u a_inode 0,9 0 5869 [eventpoll]
>  java 12154 6633 dfdev 388r FIFO 0,8 0t0 459758080 pipe
>  java 12154 6633 dfdev 389w FIFO 0,8 0t0 459758080 pipe
> Lastly, I cannot yet identify the reason for the creep in FDs even though 
> Flink is pretty dormant or has dormant jobs. Production nodes are not 
> experiencing excessive amounts of throughput yet either.
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8707) Excessive amount of files opened by flink task manager

2018-04-03 Thread Alon Galant (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8707?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alon Galant updated FLINK-8707:
---
Attachment: lsofp.txt
lsof.txt
ll.txt

> Excessive amount of files opened by flink task manager
> --
>
> Key: FLINK-8707
> URL: https://issues.apache.org/jira/browse/FLINK-8707
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.3.2
> Environment: NAME="Red Hat Enterprise Linux Server"
> VERSION="7.3 (Maipo)"
> Two boxes, each with a Job Manager & Task Manager, using Zookeeper for HA.
> flink.yaml below with some settings (removed exact box names) etc:
> env.log.dir: ...some dir...residing on the same box
> env.pid.dir: some dir...residing on the same box
> metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
> metrics.reporters: jmx
> state.backend: filesystem
> state.backend.fs.checkpointdir: file:///some_nfs_mount
> state.checkpoints.dir: file:///some_nfs_mount
> state.checkpoints.num-retained: 3
> high-availability.cluster-id: /tst
> high-availability.storageDir: file:///some_nfs_mount/ha
> high-availability: zookeeper
> high-availability.zookeeper.path.root: /flink
> high-availability.zookeeper.quorum: ...list of zookeeper boxes
> env.java.opts.jobmanager: ...some extra jar args
> jobmanager.archive.fs.dir: some dir...residing on the same box
> jobmanager.web.submit.enable: true
> jobmanager.web.tmpdir:  some dir...residing on the same box
> env.java.opts.taskmanager: some extra jar args
> taskmanager.tmp.dirs:  some dir...residing on the same box/var/tmp
> taskmanager.network.memory.min: 1024MB
> taskmanager.network.memory.max: 2048MB
> blob.storage.directory:  some dir...residing on the same box
>Reporter: Alexander Gardner
>Assignee: Piotr Nowojski
>Priority: Critical
> Fix For: 1.5.0
>
> Attachments: AfterRunning-3-jobs-Box2-TM-JCONSOLE.png, 
> AfterRunning-3-jobs-TM-FDs-BOX2.jpg, AfterRunning-3-jobs-lsof-p.box2-TM, 
> AfterRunning-3-jobs-lsof.box2-TM, AterRunning-3-jobs-Box1-TM-JCONSOLE.png, 
> box1-jobmgr-lsof, box1-taskmgr-lsof, box2-jobmgr-lsof, box2-taskmgr-lsof, 
> ll.txt, ll.txt, lsof.txt, lsofp.txt, lsofp.txt
>
>
> The job manager has fewer FDs than the task manager.
>  
> Hi
> A support alert indicated that there were a lot of open files for the boxes 
> running Flink.
> There were 4 flink jobs that were dormant but had consumed a number of msgs 
> from Kafka using the FlinkKafkaConsumer010.
> A simple general lsof:
> $ lsof | wc -l       ->  returned 153114 open file descriptors.
> Focusing on the TaskManager process (process ID = 12154):
> $ lsof | grep 12154 | wc -l    ->  returned 129322 open FDs
> $ lsof -p 12154 | wc -l   -> returned 531 FDs
> There were 228 threads running for the task manager.
>  
> Drilling down a bit further, looking at a_inode and FIFO entries: 
> $ lsof -p 12154 | grep a_inode | wc -l = 100 FDs
> $ lsof -p 12154 | grep FIFO | wc -l  = 200 FDs
> $ /proc/12154/maps = 920 entries.
> Apart from lsof identifying lots of JARs and SOs being referenced there were 
> also 244 child processes for the task manager process.
> Noticed that in each environment there is a creep of file descriptors - are 
> the above figures deemed excessive for the number of FDs in use? I know Flink 
> uses Netty - is it using a separate Selector for reads & writes?
> Additionally, Flink uses memory-mapped files or direct ByteBuffers - are 
> these skewing the numbers of FDs shown?
> Example of one child process ID 6633:
> java 12154 6633 dfdev 387u a_inode 0,9 0 5869 [eventpoll]
>  java 12154 6633 dfdev 388r FIFO 0,8 0t0 459758080 pipe
>  java 12154 6633 dfdev 389w FIFO 0,8 0t0 459758080 pipe
> Lastly, I cannot yet identify the reason for the creep in FDs even though 
> Flink is pretty dormant or has dormant jobs. Production nodes are not 
> experiencing excessive amounts of throughput yet either.
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8707) Excessive amount of files opened by flink task manager

2018-04-03 Thread Alon Galant (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8707?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alon Galant updated FLINK-8707:
---
Attachment: lsofp.txt

> Excessive amount of files opened by flink task manager
> --
>
> Key: FLINK-8707
> URL: https://issues.apache.org/jira/browse/FLINK-8707
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.3.2
> Environment: NAME="Red Hat Enterprise Linux Server"
> VERSION="7.3 (Maipo)"
> Two boxes, each with a Job Manager & Task Manager, using Zookeeper for HA.
> flink.yaml below with some settings (removed exact box names) etc:
> env.log.dir: ...some dir...residing on the same box
> env.pid.dir: some dir...residing on the same box
> metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
> metrics.reporters: jmx
> state.backend: filesystem
> state.backend.fs.checkpointdir: file:///some_nfs_mount
> state.checkpoints.dir: file:///some_nfs_mount
> state.checkpoints.num-retained: 3
> high-availability.cluster-id: /tst
> high-availability.storageDir: file:///some_nfs_mount/ha
> high-availability: zookeeper
> high-availability.zookeeper.path.root: /flink
> high-availability.zookeeper.quorum: ...list of zookeeper boxes
> env.java.opts.jobmanager: ...some extra jar args
> jobmanager.archive.fs.dir: some dir...residing on the same box
> jobmanager.web.submit.enable: true
> jobmanager.web.tmpdir:  some dir...residing on the same box
> env.java.opts.taskmanager: some extra jar args
> taskmanager.tmp.dirs:  some dir...residing on the same box/var/tmp
> taskmanager.network.memory.min: 1024MB
> taskmanager.network.memory.max: 2048MB
> blob.storage.directory:  some dir...residing on the same box
>Reporter: Alexander Gardner
>Assignee: Piotr Nowojski
>Priority: Critical
> Fix For: 1.5.0
>
> Attachments: AfterRunning-3-jobs-Box2-TM-JCONSOLE.png, 
> AfterRunning-3-jobs-TM-FDs-BOX2.jpg, AfterRunning-3-jobs-lsof-p.box2-TM, 
> AfterRunning-3-jobs-lsof.box2-TM, AterRunning-3-jobs-Box1-TM-JCONSOLE.png, 
> box1-jobmgr-lsof, box1-taskmgr-lsof, box2-jobmgr-lsof, box2-taskmgr-lsof, 
> ll.txt, ll.txt, lsof.txt, lsofp.txt, lsofp.txt
>
>
> The job manager has fewer FDs than the task manager.
>  
> Hi
> A support alert indicated that there were a lot of open files for the boxes 
> running Flink.
> There were 4 flink jobs that were dormant but had consumed a number of msgs 
> from Kafka using the FlinkKafkaConsumer010.
> A simple general lsof:
> $ lsof | wc -l       ->  returned 153114 open file descriptors.
> Focusing on the TaskManager process (process ID = 12154):
> $ lsof | grep 12154 | wc -l    ->  returned 129322 open FDs
> $ lsof -p 12154 | wc -l   -> returned 531 FDs
> There were 228 threads running for the task manager.
>  
> Drilling down a bit further, looking at a_inode and FIFO entries: 
> $ lsof -p 12154 | grep a_inode | wc -l = 100 FDs
> $ lsof -p 12154 | grep FIFO | wc -l  = 200 FDs
> $ /proc/12154/maps = 920 entries.
> Apart from lsof identifying lots of JARs and SOs being referenced there were 
> also 244 child processes for the task manager process.
> Noticed that in each environment there is a creep of file descriptors - are 
> the above figures deemed excessive for the number of FDs in use? I know Flink 
> uses Netty - is it using a separate Selector for reads & writes?
> Additionally, Flink uses memory-mapped files or direct ByteBuffers - are 
> these skewing the numbers of FDs shown?
> Example of one child process ID 6633:
> java 12154 6633 dfdev 387u a_inode 0,9 0 5869 [eventpoll]
>  java 12154 6633 dfdev 388r FIFO 0,8 0t0 459758080 pipe
>  java 12154 6633 dfdev 389w FIFO 0,8 0t0 459758080 pipe
> Lastly, I cannot yet identify the reason for the creep in FDs even though 
> Flink is pretty dormant or has dormant jobs. Production nodes are not 
> experiencing excessive amounts of throughput yet either.
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8707) Excessive amount of files opened by flink task manager

2018-04-03 Thread Alon Galant (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8707?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alon Galant updated FLINK-8707:
---
Attachment: ll.txt

> Excessive amount of files opened by flink task manager
> --
>
> Key: FLINK-8707
> URL: https://issues.apache.org/jira/browse/FLINK-8707
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.3.2
> Environment: NAME="Red Hat Enterprise Linux Server"
> VERSION="7.3 (Maipo)"
> Two boxes, each with a Job Manager & Task Manager, using Zookeeper for HA.
> flink.yaml below with some settings (removed exact box names) etc:
> env.log.dir: ...some dir...residing on the same box
> env.pid.dir: some dir...residing on the same box
> metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
> metrics.reporters: jmx
> state.backend: filesystem
> state.backend.fs.checkpointdir: file:///some_nfs_mount
> state.checkpoints.dir: file:///some_nfs_mount
> state.checkpoints.num-retained: 3
> high-availability.cluster-id: /tst
> high-availability.storageDir: file:///some_nfs_mount/ha
> high-availability: zookeeper
> high-availability.zookeeper.path.root: /flink
> high-availability.zookeeper.quorum: ...list of zookeeper boxes
> env.java.opts.jobmanager: ...some extra jar args
> jobmanager.archive.fs.dir: some dir...residing on the same box
> jobmanager.web.submit.enable: true
> jobmanager.web.tmpdir:  some dir...residing on the same box
> env.java.opts.taskmanager: some extra jar args
> taskmanager.tmp.dirs:  some dir...residing on the same box/var/tmp
> taskmanager.network.memory.min: 1024MB
> taskmanager.network.memory.max: 2048MB
> blob.storage.directory:  some dir...residing on the same box
>Reporter: Alexander Gardner
>Assignee: Piotr Nowojski
>Priority: Critical
> Fix For: 1.5.0
>
> Attachments: AfterRunning-3-jobs-Box2-TM-JCONSOLE.png, 
> AfterRunning-3-jobs-TM-FDs-BOX2.jpg, AfterRunning-3-jobs-lsof-p.box2-TM, 
> AfterRunning-3-jobs-lsof.box2-TM, AterRunning-3-jobs-Box1-TM-JCONSOLE.png, 
> box1-jobmgr-lsof, box1-taskmgr-lsof, box2-jobmgr-lsof, box2-taskmgr-lsof, 
> ll.txt, ll.txt, lsof.txt, lsofp.txt, lsofp.txt
>
>
> The job manager has fewer FDs than the task manager.
>  
> Hi
> A support alert indicated that there were a lot of open files for the boxes 
> running Flink.
> There were 4 flink jobs that were dormant but had consumed a number of msgs 
> from Kafka using the FlinkKafkaConsumer010.
> A simple general lsof:
> $ lsof | wc -l       ->  returned 153114 open file descriptors.
> Focusing on the TaskManager process (process ID = 12154):
> $ lsof | grep 12154 | wc -l    ->  returned 129322 open FDs
> $ lsof -p 12154 | wc -l   -> returned 531 FDs
> There were 228 threads running for the task manager.
>  
> Drilling down a bit further, looking at a_inode and FIFO entries: 
> $ lsof -p 12154 | grep a_inode | wc -l = 100 FDs
> $ lsof -p 12154 | grep FIFO | wc -l  = 200 FDs
> $ /proc/12154/maps = 920 entries.
> Apart from lsof identifying lots of JARs and SOs being referenced there were 
> also 244 child processes for the task manager process.
> Noticed that in each environment there is a creep of file descriptors - are 
> the above figures deemed excessive for the number of FDs in use? I know Flink 
> uses Netty - is it using a separate Selector for reads & writes?
> Additionally, Flink uses memory-mapped files or direct ByteBuffers - are 
> these skewing the numbers of FDs shown?
> Example of one child process ID 6633:
> java 12154 6633 dfdev 387u a_inode 0,9 0 5869 [eventpoll]
>  java 12154 6633 dfdev 388r FIFO 0,8 0t0 459758080 pipe
>  java 12154 6633 dfdev 389w FIFO 0,8 0t0 459758080 pipe
> Lastly, I cannot yet identify the reason for the creep in FDs even though 
> Flink is pretty dormant or has dormant jobs. Production nodes are not 
> experiencing excessive amounts of throughput yet either.
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-5480) User-provided hashes for operators

2018-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16423983#comment-16423983
 ] 

ASF GitHub Bot commented on FLINK-5480:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3117
  
@ShashwatRastogi-Reflektion 

1. For each task whose state should be loaded from the savepoint S1:
* Determine the existing hash by searching for the task ID in the logs/UI
* Set the uidHash for that task to that value via 
`SingleOutputStreamOperator#setUidHash`

2. For each task:
* Set the uid to whatever value you wish to use in the future via 
`SingleOutputStreamOperator#setUid`

3. Resume the job from the savepoint S1.
4. Create a new savepoint S2, and remove all calls to `setUidHash`
5. Resume the job from the savepoint S2.
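
For illustration, here is a minimal sketch of what the job code could look like while both calls are in place (steps 1-3 above). This is not code from this thread: the operator, the hash value and the uid are placeholders, and the uid is assigned via `SingleOutputStreamOperator#uid`, the public DataStream API method for setting an operator's uid.

{code:java}
// Hedged sketch: placeholder operator, hash and uid - adapt these to the real job.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UidMigrationSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> source = env.socketTextStream("localhost", 9999);

        source
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String value) {
                    return value.toUpperCase();
                }
            })
            // Step 1: pin the operator to the hash the old savepoint S1 was created with
            // (placeholder value; copy the real hash from the logs/UI).
            .setUidHash("2e588ce1c86a9d46e2e96b5a1f2f1b9a")
            // Step 2: assign the uid to be used for all future savepoints.
            .uid("my-map-operator")
            .print();

        env.execute("uid migration sketch");
    }
}
{code}

Once the job has been resumed from S1 and a new savepoint S2 has been taken, the `setUidHash` call can be dropped and only the `uid` call remains (steps 4-5).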


> User-provided hashes for operators
> --
>
> Key: FLINK-5480
> URL: https://issues.apache.org/jira/browse/FLINK-5480
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.2.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.2.0, 1.3.0
>
>
> We could allow users to provided (alternative) hashes for operators in a 
> StreamGraph. This can make migration between Flink versions easier, in case 
> the automatically produced hashes between versions are incompatible. For 
> example, users could just copy the old hashes from the web ui to their job.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #3117: [FLINK-5480] Introduce user-provided hash for JobVertexes

2018-04-03 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3117
  
@ShashwatRastogi-Reflektion 

1. For each task whose state should be loaded from the savepoint S1:
* Determine the existing hash by searching for the task ID in the logs/UI
* Set the uidHash for that task to that value via 
`SingleOutputStreamOperator#setUidHash`

2. For each task:
* Set the uid to whatever value you wish to use in the future via 
`SingleOutputStreamOperator#setUid`

3. Resume the job from the savepoint S1.
4. Create a new savepoint S2, and remove all calls to `setUidHash`
5. Resume the job from the savepoint S2.


---


[jira] [Commented] (FLINK-8966) Port AvroExternalJarProgramITCase to flip6

2018-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16423978#comment-16423978
 ] 

ASF GitHub Bot commented on FLINK-8966:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5766
  
@tillrohrmann Could you take another look? I moved the blob upload into a 
separate method.


> Port AvroExternalJarProgramITCase to flip6
> --
>
> Key: FLINK-8966
> URL: https://issues.apache.org/jira/browse/FLINK-8966
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5766: [FLINK-8966][tests] Port AvroExternalJarProgramITCase to ...

2018-04-03 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5766
  
@tillrohrmann Could you take another look? I moved the blob upload into a 
separate method.


---


[jira] [Commented] (FLINK-8982) End-to-end test: Queryable state

2018-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16423973#comment-16423973
 ] 

ASF GitHub Bot commented on FLINK-8982:
---

GitHub user florianschmidt1994 opened a pull request:

https://github.com/apache/flink/pull/5807

[FLINK-8982][E2E Tests] Add test for known failure of queryable state

 ## What is the purpose of the change
Add an end-to-end test covering the changes that @kl0u introduced in 
https://github.com/apache/flink/pull/5691 to fix a known issue with concurrent 
access to queryable state, verifying that access to queryable state works as 
expected.

## Brief change log
- Add a Flink app with queryable state that continuously updates MapState (see 
the sketch after this list)
- Add a queryable state client that periodically queries the map state
- Add an end-to-end test that runs the client against the app and verifies that 
no unexpected exceptions occur
- Integrate end-to-end test in testsuite
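
A rough sketch of what the client side of such a test can look like is below. It is not the code from this PR; the host, port, job id, state name and key are placeholders.

{code:java}
// Hedged sketch only: host, port, job id, state name and key are placeholders.
import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.queryablestate.client.QueryableStateClient;

public class QueryableStateClientSketch {

    public static void main(String[] args) throws Exception {
        // Connect to the queryable state proxy of a task manager.
        QueryableStateClient client = new QueryableStateClient("localhost", 9069);

        MapStateDescriptor<String, Long> descriptor =
                new MapStateDescriptor<>("map-state", String.class, Long.class);

        // Id of the running job (placeholder: passed on the command line).
        JobID jobId = JobID.fromHexString(args[0]);

        // Ask for the MapState registered under the name "queryable-map" for one key.
        CompletableFuture<MapState<String, Long>> future = client.getKvState(
                jobId,
                "queryable-map",
                "some-key",
                BasicTypeInfo.STRING_TYPE_INFO,
                descriptor);

        MapState<String, Long> state = future.get();
        System.out.println("value = " + state.get("some-key"));

        client.shutdownAndWait(); // assumed shutdown method; adjust to the client version in use
    }
}
{code}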

## Verifying this change
This change added tests and can be verified as follows:
- Run `./run-pre-commit-tests.sh`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not applicable


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/florianschmidt1994/flink 
end-to-end-tests-for-queryable-state

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5807.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5807


commit a94b06728c25df9a5c71a484f49cfabb36eb1460
Author: Florian Schmidt 
Date:   2018-03-13T13:13:08Z

[FLINK-8982][E2E Tests] Add test for known failure of queryable state




> End-to-end test: Queryable state
> 
>
> Key: FLINK-8982
> URL: https://issues.apache.org/jira/browse/FLINK-8982
> Project: Flink
>  Issue Type: Sub-task
>  Components: Queryable State, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Florian Schmidt
>Priority: Blocker
> Fix For: 1.5.0
>
>
> We should add an end-to-end test which verifies that {{Queryable State}} is 
> working.
> [~florianschmidt] and [~kkl0u] could you please provide more details for the 
> description.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...

2018-04-03 Thread florianschmidt1994
GitHub user florianschmidt1994 opened a pull request:

https://github.com/apache/flink/pull/5807

[FLINK-8982][E2E Tests] Add test for known failure of queryable state

 ## What is the purpose of the change
Add an end-to-end test covering the changes that @kl0u introduced in 
https://github.com/apache/flink/pull/5691 to fix a known issue with concurrent 
access to queryable state, verifying that access to queryable state works as 
expected.

## Brief change log
- Add a Flink app with queryable state that continuously updates MapState
- Add a queryable state client that periodically queries the map state
- Add an end-to-end test that runs the client against the app and verifies that 
no unexpected exceptions occur
- Integrate end-to-end test in testsuite

## Verifying this change
This change added tests and can be verified as follows:
- Run `./run-pre-commit-tests.sh`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not applicable


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/florianschmidt1994/flink 
end-to-end-tests-for-queryable-state

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5807.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5807


commit a94b06728c25df9a5c71a484f49cfabb36eb1460
Author: Florian Schmidt 
Date:   2018-03-13T13:13:08Z

[FLINK-8982][E2E Tests] Add test for known failure of queryable state




---


[jira] [Commented] (FLINK-9094) AccumulatorLiveITCase unstable on Travis

2018-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16423961#comment-16423961
 ] 

ASF GitHub Bot commented on FLINK-9094:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5771
  
Thanks for the review @zentol. Merging this PR.


> AccumulatorLiveITCase unstable on Travis
> 
>
> Key: FLINK-9094
> URL: https://issues.apache.org/jira/browse/FLINK-9094
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
>
> {{AccumulatorLiveITCase}} unstable on Travis.
> https://api.travis-ci.org/v3/job/358509206/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6567) ExecutionGraphMetricsTest fails on Windows CI

2018-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16423962#comment-16423962
 ] 

ASF GitHub Bot commented on FLINK-6567:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5782
  
Thanks for the review @zentol. Merging this PR.


> ExecutionGraphMetricsTest fails on Windows CI
> -
>
> Key: FLINK-6567
> URL: https://issues.apache.org/jira/browse/FLINK-6567
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.3.0, 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.6.0
>
>
> The {{testExecutionGraphRestartTimeMetric}} fails every time I run it on 
> AppVeyor. It also very rarely failed for me locally.
> The test fails at Line 235 if the RUNNING timestamp is equal to the 
> RESTARTING timestamp, which may happen when combining a fast test with a 
> low-resolution clock.
> A simple fix would be to increase the time between RUNNING and RESTARTING by 
> adding a 50 ms sleep into the {{TestingRestartStrategy#canRestart()}} method, 
> as this one is called before transitioning to the RESTARTING state.
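
As an aside, a minimal sketch of the hardening idea described above (illustrative only, not the actual Flink test code):

{code:java}
// Wait until the millisecond clock has advanced so that two consecutive
// state transitions can never record the same timestamp.
public class DistinctTimestampsSketch {

    /** Returns a timestamp that is strictly greater than the given one. */
    static long nextDistinctTimestamp(long previous) throws InterruptedException {
        long now = System.currentTimeMillis();
        while (now <= previous) {
            Thread.sleep(1); // low-resolution clocks may need a few iterations
            now = System.currentTimeMillis();
        }
        return now;
    }

    public static void main(String[] args) throws InterruptedException {
        long runningTimestamp = System.currentTimeMillis();
        long restartingTimestamp = nextDistinctTimestamp(runningTimestamp);
        System.out.println(runningTimestamp + " < " + restartingTimestamp);
    }
}
{code}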



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5782: [FLINK-6567] [tests] Harden ExecutionGraphMetricsTest

2018-04-03 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5782
  
Thanks for the review @zentol. Merging this PR.


---


[GitHub] flink issue #5771: [FLINK-9094] [tests] Harden AccumulatorLiveITCase

2018-04-03 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5771
  
Thanks for the review @zentol. Merging this PR.


---


[jira] [Updated] (FLINK-8835) Fix TaskManager config keys

2018-04-03 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang updated FLINK-8835:

Description: 
Many new config keys in the TaskManager don't follow the proper naming scheme. 
We need to clear those up before the release. I would also suggest to keep the 
key names short, because that makes it easier for users.

When doing this cleanup pass over the config keys, I would suggest to also make 
some of the existing keys more hierarchical harmonize them with the common 
scheme in Flink.

h1. New Keys

* {{taskmanager.network.credit-based-flow-control.enabled}} to 
{{taskmanager.network.credit-model}}.

h1. Existing Keys

* {{taskmanager.debug.memory.startLogThread}} => 
{{taskmanager.debug.memory.log}}

* {{taskmanager.debug.memory.logIntervalMs}} => 
{{taskmanager.debug.memory.log-interval}}

* {{taskmanager.initial-registration-pause}} => 
{{taskmanager.registration.initial-backoff}}

* {{taskmanager.max-registration-pause}} => 
{{taskmanager.registration.max-backoff}}

* {{taskmanager.refused-registration-pause}} => 
{{taskmanager.registration.refused-backoff}}

* {{taskmanager.maxRegistrationDuration}} => 
{{taskmanager.registration.timeout}}
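
For illustration, a sketch of setting the renamed keys programmatically; the values are placeholders, not recommendations:

{code:java}
// Hedged sketch: placeholder values, using only the renamed key strings listed above.
import org.apache.flink.configuration.Configuration;

public class RenamedTaskManagerKeysSketch {

    public static void main(String[] args) {
        Configuration config = new Configuration();

        config.setBoolean("taskmanager.network.credit-model", true);
        config.setBoolean("taskmanager.debug.memory.log", true);
        config.setLong("taskmanager.debug.memory.log-interval", 5000L);
        config.setString("taskmanager.registration.initial-backoff", "500 ms");
        config.setString("taskmanager.registration.max-backoff", "30 s");
        config.setString("taskmanager.registration.refused-backoff", "10 s");
        config.setString("taskmanager.registration.timeout", "5 min");

        System.out.println(config);
    }
}
{code}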


  was:
Many new config keys in the TaskManager don't follow the proper naming scheme. 
We need to clear those up before the release. I would also suggest to keep the 
key names short, because that makes it easier for users.

When doing this cleanup pass over the config keys, I would suggest to also make 
some of the existing keys more hierarchical harmonize them with the common 
scheme in Flink.

h1. New Keys

* {{taskmanager.network.credit-based-flow-control.enabled}} to 
{{taskmanager.network.credit-model}}.

* {{taskmanager.exactly-once.blocking.data.enabled}} to 
{{task.checkpoint.alignment.blocking}} (we already have 
{{task.checkpoint.alignment.max-size}})

h1. Existing Keys

* {{taskmanager.debug.memory.startLogThread}} => 
{{taskmanager.debug.memory.log}}

* {{taskmanager.debug.memory.logIntervalMs}} => 
{{taskmanager.debug.memory.log-interval}}

* {{taskmanager.initial-registration-pause}} => 
{{taskmanager.registration.initial-backoff}}

* {{taskmanager.max-registration-pause}} => 
{{taskmanager.registration.max-backoff}}

* {{taskmanager.refused-registration-pause}} 
{{taskmanager.registration.refused-backoff}}

* {{taskmanager.maxRegistrationDuration}} ==> * 
{{taskmanager.registration.timeout}}



> Fix TaskManager config keys
> ---
>
> Key: FLINK-8835
> URL: https://issues.apache.org/jira/browse/FLINK-8835
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Reporter: Stephan Ewen
>Assignee: mingleizhang
>Priority: Blocker
>  Labels: easy-fix
> Fix For: 1.5.0
>
>
> Many new config keys in the TaskManager don't follow the proper naming 
> scheme. We need to clean those up before the release. I would also suggest 
> keeping the key names short, because that makes it easier for users.
> When doing this cleanup pass over the config keys, I would also suggest 
> making some of the existing keys more hierarchical and harmonizing them with 
> the common scheme in Flink.
> h1. New Keys
> * {{taskmanager.network.credit-based-flow-control.enabled}} to 
> {{taskmanager.network.credit-model}}.
> h1. Existing Keys
> * {{taskmanager.debug.memory.startLogThread}} => 
> {{taskmanager.debug.memory.log}}
> * {{taskmanager.debug.memory.logIntervalMs}} => 
> {{taskmanager.debug.memory.log-interval}}
> * {{taskmanager.initial-registration-pause}} => 
> {{taskmanager.registration.initial-backoff}}
> * {{taskmanager.max-registration-pause}} => 
> {{taskmanager.registration.max-backoff}}
> * {{taskmanager.refused-registration-pause}} => 
> {{taskmanager.registration.refused-backoff}}
> * {{taskmanager.maxRegistrationDuration}} => 
> {{taskmanager.registration.timeout}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

