[jira] [Commented] (FLINK-3655) Allow comma-separated or multiple directories to be specified for FileInputFormat

2017-10-06 Thread Vishnu Viswanath (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16195159#comment-16195159
 ] 

Vishnu Viswanath commented on FLINK-3655:
-

was looking for this feature. why wasn't this ever merged?

> Allow comma-separated or multiple directories to be specified for 
> FileInputFormat
> -
>
> Key: FLINK-3655
> URL: https://issues.apache.org/jira/browse/FLINK-3655
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.0
>Reporter: Gna Phetsarath
>Priority: Minor
>  Labels: starter
>
> Allow comma-separated or multiple directories to be specified for 
> FileInputFormat so that a DataSource will process the directories 
> sequentially.
>
> env.readFile("/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*")
> in Scala
>env.readFile(paths: Seq[String])
> or 
>   env.readFile(path: String, otherPaths: String*)
> Wildcard support would be a bonus.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-4660) HadoopFileSystem (with S3A) may leak connections, which cause job to stuck in a restarting loop

2017-09-26 Thread Vishnu Viswanath (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16180517#comment-16180517
 ] 

Vishnu Viswanath commented on FLINK-4660:
-

in which version is this fixed? I am using 1.3.1 and getting similar exception 
when reading input split from S3.
{code}
2017-09-26 08:47:27,220 INFO  
org.apache.flink.api.common.io.LocatableInputSplitAssigner- Assigning 
remote split to host ip-10-150-98-185
2017-09-26 08:47:27,344 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- CHAIN 
DataSource (at ...Job$$anonfun$main$4$$anonfun$apply$3.apply(Job.scala:138) 
(org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at 
..sources.SourceSelector$.selectSource(SourceSelector.scala:17)) -> Map 
(from: ) (6/8) (df8e44219270f80170e6d027b77b246f) switched from RUNNING to 
FAILED.
com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout 
waiting for connection from pool
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:972)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:676)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:650)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:633)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$300(AmazonHttpClient.java:601)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:583)
at 
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:447)
at 
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4137)
at 
com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1346)
at 
io.grhodes.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:72)
at 
io.grhodes.hadoop.fs.s3a.S3AInputStream.openIfNeeded(S3AInputStream.java:43)
at io.grhodes.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:137)
at java.io.DataInputStream.read(DataInputStream.java:149)
at 
org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:72)
at 
org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:669)
at 
org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:490)
at 
org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:48)
at 
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:145)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting 
for connection from pool
at 
org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:286)
at 
org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:263)
at sun.reflect.GeneratedMethodAccessor31.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
at com.amazonaws.http.conn.$Proxy16.get(Unknown Source)
at 
org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:190)
at 
org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:184)
at 
org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:184)
at 
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
at 
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:55)
at 
com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1115)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:964)
... 19 more
2017-09-26 08:47:27,345 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job 
Job_at_09/26/2017_08:44:08 (74a0b9f0eab746705ad88817849e5c4b) switched from 
state RUNNING to FAILING.
com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout 
waiting for connection from pool
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:972)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:676)
at 

[jira] [Created] (FLINK-7299) Write GenericRecord using AvroOutputFormat

2017-07-30 Thread Vishnu Viswanath (JIRA)
Vishnu Viswanath created FLINK-7299:
---

 Summary: Write GenericRecord using AvroOutputFormat
 Key: FLINK-7299
 URL: https://issues.apache.org/jira/browse/FLINK-7299
 Project: Flink
  Issue Type: Improvement
  Components: Batch Connectors and Input/Output Formats
Reporter: Vishnu Viswanath
Assignee: Vishnu Viswanath
Priority: Minor


Allow AvroOutputFormat to write GenericRecords



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6726) Allow setting Timers in ProcessWindowFunction

2017-05-31 Thread Vishnu Viswanath (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16031263#comment-16031263
 ] 

Vishnu Viswanath commented on FLINK-6726:
-

Hi [~aljoscha],
I have implemented the changes and had to do the following:
 - Created a new timer service(userTimerService) in WindowOperator because the 
Triggerable tied to the InternalTimerService is the WindowOperator.
 - Created a new Triggerable class inside WindowOperator which will be tied to 
the userTimerService
 - Added onTimer function to interface InternalWindowFunction so that new 
Triggerable in WindowOperator can invoke it once the timer fires.

Does this sound ok? Have to add tests, will create a PR once that is done.

> Allow setting Timers in ProcessWindowFunction
> -
>
> Key: FLINK-6726
> URL: https://issues.apache.org/jira/browse/FLINK-6726
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Vishnu Viswanath
>Assignee: Vishnu Viswanath
>Priority: Minor
>
> Allow registration of timers in ProcessWindowFunction.
> {code}
> public abstract void registerEventTimeTimer(long time);
> public abstract void registerProcessingTimeTimer(long time);
> {code}
> This is based on one of the use case that I have, where I need to register an 
> EventTimeTimer that will clean the elements in the Window State based on some 
> condition. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6726) Allow setting Timers in ProcessWindowFunction

2017-05-25 Thread vishnu viswanath (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6726?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vishnu viswanath updated FLINK-6726:

Description: 
Allow registration of timers in ProcessWindowFunction.

{code}
public abstract void registerEventTimeTimer(long time);
public abstract void registerProcessingTimeTimer(long time);
{code}

This is based on one of the use case that I have, where I need to register an 
EventTimeTimer that will clean the elements in the Window State based on some 
condition. 

  was:
Allow registration of timers in ProcessWindowFunction.

{code}
public abstract void registerEventTimeTimer(long time);
public abstract void registerProcessingTimeTimer(long time);
{code}

This is based on one of the use case that I have, where I need to register an 
EventTimeTimer that will clean the elements in it's state based on some 
condition. 


> Allow setting Timers in ProcessWindowFunction
> -
>
> Key: FLINK-6726
> URL: https://issues.apache.org/jira/browse/FLINK-6726
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: vishnu viswanath
>Assignee: vishnu viswanath
>Priority: Minor
>
> Allow registration of timers in ProcessWindowFunction.
> {code}
> public abstract void registerEventTimeTimer(long time);
> public abstract void registerProcessingTimeTimer(long time);
> {code}
> This is based on one of the use case that I have, where I need to register an 
> EventTimeTimer that will clean the elements in the Window State based on some 
> condition. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6726) Allow setting Timers in ProcessWindowFunction

2017-05-25 Thread vishnu viswanath (JIRA)
vishnu viswanath created FLINK-6726:
---

 Summary: Allow setting Timers in ProcessWindowFunction
 Key: FLINK-6726
 URL: https://issues.apache.org/jira/browse/FLINK-6726
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Reporter: vishnu viswanath
Assignee: vishnu viswanath
Priority: Minor


Allow registration of timers in ProcessWindowFunction.

{code}
public abstract void registerEventTimeTimer(long time);
public abstract void registerProcessingTimeTimer(long time);
{code}

This is based on one of the use case that I have, where I need to register an 
EventTimeTimer that will clean the elements in it's state based on some 
condition. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5053) Incremental / lightweight snapshots for checkpoints

2017-05-11 Thread vishnu viswanath (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16006987#comment-16006987
 ] 

vishnu viswanath commented on FLINK-5053:
-

is this task complete, I see all the pull requests for the subtasks are merged. 
If the work is complete, I would like to try this out :). Have been waiting for 
Incremental checkpoint for one of the tasks that I am working on. 

> Incremental / lightweight snapshots for checkpoints
> ---
>
> Key: FLINK-5053
> URL: https://issues.apache.org/jira/browse/FLINK-5053
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Xiaogang Shi
>
> There is currently basically no difference between savepoints and checkpoints 
> in Flink and both are created through exactly the same process.
> However, savepoints and checkpoints have a slightly different meaning which 
> we should take into account to keep Flink efficient:
> - Savepoints are (typically infrequently) triggered by the user to create a 
> state from which the application can be restarted, e.g. because Flink, some 
> code, or the parallelism needs to be changed.
> - Checkpoints are (typically frequently) triggered by the System to allow for 
> fast recovery in case of failure, but keeping the job/system unchanged.
> This means that savepoints and checkpoints can have different properties in 
> that:
> - Savepoint should represent a state of the application, where 
> characteristics of the job (e.g. parallelism) can be adjusted for the next 
> restart. One example for things that savepoints need to be aware of are 
> key-groups. Savepoints can potentially be a little more expensive than 
> checkpoints, because they are usually created a lot less frequently through 
> the user.
> - Checkpoints are frequently triggered by the system to allow for fast 
> failure recovery. However, failure recovery leaves all characteristics of the 
> job unchanged. This checkpoints do not have to be aware of those, e.g. think 
> again of key groups. Checkpoints should run faster than creating savepoints, 
> in particular it would be nice to have incremental checkpoints.
> For a first approach, I would suggest the following steps/changes:
> - In checkpoint coordination: differentiate between triggering checkpoints 
> and savepoints. Introduce properties for checkpoints that describe their set 
> of abilities, e.g. "is-key-group-aware", "is-incremental".
> - In state handle infrastructure: introduce state handles that reflect 
> incremental checkpoints and drop full key-group awareness, i.e. covering 
> folders instead of files and not having keygroup_id -> file/offset mapping, 
> but keygroup_range -> folder?
> - Backend side: We should start with RocksDB by reintroducing something 
> similar to semi-async snapshots, but using 
> BackupableDBOptions::setShareTableFiles(true) and transferring only new 
> incremental outputs to HDFS. Notice that using RocksDB's internal backup 
> mechanism is giving up on the information about individual key-groups. But as 
> explained above, this should be totally acceptable for checkpoints, while 
> savepoints should use the key-group-aware fully async mode. Of course we also 
> need to implement the ability to restore from both types of snapshots.
> One problem in the suggested approach is still that even checkpoints should 
> support scale-down, in case that only a smaller number of instances is left 
> available in a recovery case.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6372) change-scala-version.sh does not change version for flink-gelly-examples

2017-04-24 Thread vishnu viswanath (JIRA)
vishnu viswanath created FLINK-6372:
---

 Summary: change-scala-version.sh does not change version for 
flink-gelly-examples
 Key: FLINK-6372
 URL: https://issues.apache.org/jira/browse/FLINK-6372
 Project: Flink
  Issue Type: Bug
Reporter: vishnu viswanath
Assignee: vishnu viswanath


change-scala-version.sh does not change the version for flink-gelly-examples in 
bin.xml. This is causing build to fail if using scala 2.11, since its looking 
for flink-gelly-examples_2.10



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4369) EvictingWindowOperator Must Actually Evict Elements

2016-11-15 Thread vishnu viswanath (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15667309#comment-15667309
 ] 

vishnu viswanath commented on FLINK-4369:
-

Fixed by [FLINK-4174]

> EvictingWindowOperator Must Actually Evict Elements
> ---
>
> Key: FLINK-4369
> URL: https://issues.apache.org/jira/browse/FLINK-4369
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.1.0
>Reporter: Aljoscha Krettek
>Assignee: vishnu viswanath
>Priority: Blocker
>
> {{EvictingWindowOperator}} does not actually remove evicted elements from the 
> state. They are only filtered from the Iterable that is given to the 
> WindowFunction.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-4174) Enhance Window Evictor

2016-11-15 Thread vishnu viswanath (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4174?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vishnu viswanath closed FLINK-4174.
---
Resolution: Fixed

> Enhance Window Evictor
> --
>
> Key: FLINK-4174
> URL: https://issues.apache.org/jira/browse/FLINK-4174
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: vishnu viswanath
>Assignee: vishnu viswanath
>
> Enhance the current functionality of Evictor as per this [design 
> document|https://docs.google.com/document/d/1Rr7xzlItYqvFXLyyy-Yv0vvw8f29QYAjm5i9E4A_JlU/edit].
> This includes:
> - Allow eviction of elements from the window in any order (not only from the 
> beginning). To do this Evictor must go through the list of elements and 
> remove the elements that have to be evicted instead of the current approach 
> of : returning the count of elements to be removed from beginning.
> - Allow eviction to be done before/after applying the window function.
> FLIP page for this enhancement : 
> [FLIP-4|https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-4369) EvictingWindowOperator Must Actually Evict Elements

2016-11-15 Thread vishnu viswanath (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vishnu viswanath closed FLINK-4369.
---
Resolution: Fixed

> EvictingWindowOperator Must Actually Evict Elements
> ---
>
> Key: FLINK-4369
> URL: https://issues.apache.org/jira/browse/FLINK-4369
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.1.0
>Reporter: Aljoscha Krettek
>Assignee: vishnu viswanath
>Priority: Blocker
>
> {{EvictingWindowOperator}} does not actually remove evicted elements from the 
> state. They are only filtered from the Iterable that is given to the 
> WindowFunction.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-4174) Enhance Window Evictor

2016-10-17 Thread vishnu viswanath (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4174?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vishnu viswanath reassigned FLINK-4174:
---

Assignee: vishnu viswanath

> Enhance Window Evictor
> --
>
> Key: FLINK-4174
> URL: https://issues.apache.org/jira/browse/FLINK-4174
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: vishnu viswanath
>Assignee: vishnu viswanath
>
> Enhance the current functionality of Evictor as per this [design 
> document|https://docs.google.com/document/d/1Rr7xzlItYqvFXLyyy-Yv0vvw8f29QYAjm5i9E4A_JlU/edit].
> This includes:
> - Allow eviction of elements from the window in any order (not only from the 
> beginning). To do this Evictor must go through the list of elements and 
> remove the elements that have to be evicted instead of the current approach 
> of : returning the count of elements to be removed from beginning.
> - Allow eviction to be done before/after applying the window function.
> FLIP page for this enhancement : 
> [FLIP-4|https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4174) Enhance Window Evictor

2016-07-25 Thread vishnu viswanath (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4174?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vishnu viswanath updated FLINK-4174:

Description: 
Enhance the current functionality of Evictor as per this [design 
document|https://docs.google.com/document/d/1Rr7xzlItYqvFXLyyy-Yv0vvw8f29QYAjm5i9E4A_JlU/edit].

This includes:
- Allow eviction of elements from the window in any order (not only from the 
beginning). To do this Evictor must go through the list of elements and remove 
the elements that have to be evicted instead of the current approach of : 
returning the count of elements to be removed from beginning.
- Allow eviction to be done before/after applying the window function.

FLIP page for this enhancement : 
[FLIP-4|https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor]





  was:
Enhance the current functionality of Evictor as per this [design 
document|https://docs.google.com/document/d/1Rr7xzlItYqvFXLyyy-Yv0vvw8f29QYAjm5i9E4A_JlU/edit].

This includes:
- Allow eviction of elements from the window in any order (not only from the 
beginning). To do this Evictor must go through the list of elements and remove 
the elements that have to be evicted instead of the current approach of : 
returning the count of elements to be removed from beginning.
- Allow eviction to be done before/after applying the window function.






> Enhance Window Evictor
> --
>
> Key: FLINK-4174
> URL: https://issues.apache.org/jira/browse/FLINK-4174
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: vishnu viswanath
>
> Enhance the current functionality of Evictor as per this [design 
> document|https://docs.google.com/document/d/1Rr7xzlItYqvFXLyyy-Yv0vvw8f29QYAjm5i9E4A_JlU/edit].
> This includes:
> - Allow eviction of elements from the window in any order (not only from the 
> beginning). To do this Evictor must go through the list of elements and 
> remove the elements that have to be evicted instead of the current approach 
> of : returning the count of elements to be removed from beginning.
> - Allow eviction to be done before/after applying the window function.
> FLIP page for this enhancement : 
> [FLIP-4|https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4174) Enhance Window Evictor

2016-07-07 Thread vishnu viswanath (JIRA)
vishnu viswanath created FLINK-4174:
---

 Summary: Enhance Window Evictor
 Key: FLINK-4174
 URL: https://issues.apache.org/jira/browse/FLINK-4174
 Project: Flink
  Issue Type: Sub-task
  Components: Streaming
Reporter: vishnu viswanath


Enhance the current functionality of Evictor as per this [design 
document|https://docs.google.com/document/d/1Rr7xzlItYqvFXLyyy-Yv0vvw8f29QYAjm5i9E4A_JlU/edit].

This includes:
- Allow eviction of elements from the window in any order (not only from the 
beginning). To do this Evictor must go through the list of elements and remove 
the elements that have to be evicted instead of the current approach of : 
returning the count of elements to be removed from beginning.
- Allow eviction to be done before/after applying the window function.







--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3962) JMXReporter doesn't properly register/deregister metrics

2016-06-23 Thread vishnu viswanath (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15347222#comment-15347222
 ] 

vishnu viswanath commented on FLINK-3962:
-

I am getting this exception in Local execution aswell. (from IDE or when run 
through `flink run`)

{code}
17:16:38,095 ERROR org.apache.flink.metrics.reporter.JMXReporter
 - A metric with the name 
org.apache.flink.metrics:key0=localhost,key1=taskmanager,key2=f7bccd0c64bc9c3643647913eced7104,key3=Streaming_Job,key4=Map,key5=0,name=numRecordsIn
 was already registered.
javax.management.InstanceAlreadyExistsException: 
org.apache.flink.metrics:key0=localhost,key1=taskmanager,key2=f7bccd0c64bc9c3643647913eced7104,key3=Streaming_Job,key4=Map,key5=0,name=numRecordsIn
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
at 
com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
at 
org.apache.flink.metrics.reporter.JMXReporter.notifyOfAddedMetric(JMXReporter.java:109)
at 
org.apache.flink.metrics.MetricRegistry.register(MetricRegistry.java:174)
at 
org.apache.flink.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:194)
at 
org.apache.flink.metrics.groups.AbstractMetricGroup.counter(AbstractMetricGroup.java:150)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:141)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:235)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
{code}

> JMXReporter doesn't properly register/deregister metrics
> 
>
> Key: FLINK-3962
> URL: https://issues.apache.org/jira/browse/FLINK-3962
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Stephan Ewen
> Fix For: 1.1.0
>
>
> The following fails our Yarn tests because it checks for errors in the 
> jobmanager/taskmanager logs:
> {noformat}
> 2016-05-23 19:20:02,349 ERROR org.apache.flink.metrics.reporter.JMXReporter   
>   - A metric with the name 
> org.apache.flink.metrics:key0=testing-worker-linux-docker-05a6b382-3386-linux-4,key1=taskmanager,key2=9398ca9392af615e9d1896d0bd7ff52a,key3=Flink_Java_Job_at_Mon_May_23_19-20-00_UTC_2016,key4=,name=numBytesIn
>  was already registered.
> javax.management.InstanceAlreadyExistsException: 
> org.apache.flink.metrics:key0=testing-worker-linux-docker-05a6b382-3386-linux-4,key1=taskmanager,key2=9398ca9392af615e9d1896d0bd7ff52a,key3=Flink_Java_Job_at_Mon_May_23_19-20-00_UTC_2016,key4=,name=numBytesIn
>   at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
>   at 
> com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
>   at 
> org.apache.flink.metrics.reporter.JMXReporter.notifyOfAddedMetric(JMXReporter.java:76)
>   at 
> org.apache.flink.metrics.MetricRegistry.register(MetricRegistry.java:177)
>   at 
> org.apache.flink.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:191)
>   at 
> org.apache.flink.metrics.groups.AbstractMetricGroup.counter(AbstractMetricGroup.java:144)
>   at 
> org.apache.flink.metrics.groups.IOMetricGroup.(IOMetricGroup.java:40)
>   at 
> org.apache.flink.metrics.groups.TaskMetricGroup.(TaskMetricGroup.java:68)
>   at 
> org.apache.flink.metrics.groups.JobMetricGroup.addTask(JobMetricGroup.java:74)
>   at 
> org.apache.flink.metrics.groups.TaskManagerMetricGroup.addTaskForJob(TaskManagerMetricGroup.java:86)
>   at 
>