[GitHub] flink issue #3416: [FLINK-5918] [runtime] port range support for config task...

2017-04-21 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3416
  
@zentol Has this been merged?




[GitHub] flink issue #3749: Typo sick -> sink

2017-04-21 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3749
  
Good fix, funny typo ;-)

Merging this...




[GitHub] flink issue #3747: [FLINK-5623] [runtime] Fix TempBarrier dam has been close...

2017-04-21 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3747
  
Merging this...




[GitHub] flink issue #3648: [FLINK-6217] ContaineredTaskManagerParameters sets off-he...

2017-04-21 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3648
  
Good fix, thanks @haohui 

I was wondering - I am trying to advocate fewer dependencies in Flink 
(there is always the problem of shading and conflicts) - so if there is a way to 
do this without Guava, that would be great. I think `commons-lang3` has a joiner as 
well, or we might just add a joiner to Flink...




[GitHub] flink issue #3704: [FLINK-5756] Replace RocksDB dependency with FRocksDB

2017-04-20 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3704
  
@hsaputra There is a plan to do that and we are in touch with the RocksDB 
folks. The latest RocksDB master currently does not work for Flink, though, so 
we needed a custom backport to an earlier RocksDB version...




[GitHub] flink issue #3747: [FLINK-5623] [runtime] Fix TempBarrier dam has been close...

2017-04-20 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3747
  
+1 to this fix




[GitHub] flink issue #3741: [FLINK-6177] Add support for "Distributed Cache" in strea...

2017-04-20 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3741
  
Looks quite good now.

If I can ask you for one more follow-up: to have faster tests, it would be 
good to add the streaming distributed cache test and the batch distributed 
cache test to the same file.

Can you change the `DistributedCacheTest` to extend 
`StreamingMultipleProgramsTestBase` and put your test in there as well (rough 
sketch below)? That will cause only one distributed mini cluster to be spawned, 
which typically saves 1-2 seconds in tests. That may not seem like much, but it 
adds up over the thousands of tests Flink has by now...
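
For illustration, a rough sketch of that combined test class (class name, 
method names, and import paths assumed from this discussion, not the actual code):

```java
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.junit.Test;

public class DistributedCacheTest extends StreamingMultipleProgramsTestBase {

	@Test
	public void testStreamingDistributedCache() throws Exception {
		// streaming variant, runs against the shared mini cluster
	}

	@Test
	public void testBatchDistributedCache() throws Exception {
		// batch variant, reuses the same mini cluster instead of spawning a second one
	}
}
```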




[GitHub] flink issue #3701: [FLINK-6280] [scripts] Allow logging with Java flags

2017-04-20 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3701
  
Okay, got it. The docs and examples did help a lot.

This looks good to me, +1 to merge




[GitHub] flink issue #3598: [FLINK-6103] LocalFileSystem rename() uses File.renameTo(...

2017-04-20 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3598
  
Thanks, I'll take it from here...




[GitHub] flink issue #3598: [FLINK-6103] LocalFileSystem rename() uses File.renameTo(...

2017-04-19 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3598
  
I would do the following (rough sketch below):
  - Log nothing
  - Catch the errors that are regular "move failed" exceptions and return false:
    - `FileNotFoundException`
    - `DirectoryNotEmptyException`
    - `SecurityException`
  - Let all other `IOException`s bubble out




[GitHub] flink issue #3127: [FLINK-5481] Simplify Row creation

2017-04-19 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3127
  
Looks good, thank you!
Merging this...




[GitHub] flink issue #3738: [FLINK-6311] [Kinesis Connector] NPE in FlinkKinesisConsu...

2017-04-19 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3738
  
The `shardConsumersExecutor` variable is final. No need to make it 
`volatile`.




[GitHub] flink issue #3722: [FLINK-5646] Document JAR upload with the REST API

2017-04-19 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3722
  
Looks good, thanks.
Merging this...




[GitHub] flink issue #3709: [FLINK-6295]use LoadingCache instead of WeakHashMap to lo...

2017-04-19 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3709
  
@WangTaoTheTonic Yes, that is correct. @zentol's suggestion should work.

On access, if the `JobStatus` is suspended, remove the entry from the 
`WeakHashMap`.
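
A sketch of that eviction-on-access idea (hedged - the cache field and lookup 
method are assumed for illustration, not taken from the actual code):

```java
import java.util.Map;
import java.util.WeakHashMap;

private final Map<JobID, ExecutionGraph> graphCache = new WeakHashMap<>();

ExecutionGraph getGraph(JobID jobId) {
	ExecutionGraph graph = graphCache.get(jobId);
	if (graph != null && graph.getState() == JobStatus.SUSPENDED) {
		// drop suspended entries on access instead of waiting for GC
		graphCache.remove(jobId);
		return null;
	}
	return graph;
}
```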




[GitHub] flink issue #3703: [FLINK-5005] WIP: publish scala 2.12 artifacts

2017-04-19 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3703
  
We cannot add any other dependencies to the pom files. Adding "akka" back 
will create a conflict with the "flakka" files.

What we can do is either of the following two options:
  - Release "flakka" for Scala 2.12, and then we need to change nothing in 
Flink. The flakka code is at https://github.com/mxm/flakka - we can do the 
release; you could help us by checking out what needs to be done to use flakka 
with Scala 2.12 (if it is at all possible).

  - See if we can pull out the dependency as a property and use "flakka" 
in the Scala 2.10 and 2.11 cases and vanilla akka 2.4 in the 
java8/scala2.12 case. That would be a lot of Maven hacking, though - if 
possible, I would prefer the first variant (less complexity in Flink).

We also cannot add more Travis build profiles (builds take too long 
already). We need to keep that number as it is and simply select one of these 
profiles to use Scala 2.12 rather than, for example, 2.10.




[GitHub] flink issue #3703: [FLINK-5005] WIP: publish scala 2.12 artifacts

2017-04-19 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3703
  
One thing you can try and do is to run 
`TypeExtractionUtils.checkAndExtractLambda` to see if it is a generated 
serializable Lambda.
In the case of a Lambda, you could switch to a different code path 
(possibly not clean anything in the first version).

@twalthr may have some thoughts on that as well...
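
For illustration, a sketch of that branching (hedged - the exact 
`checkAndExtractLambda` and `ClosureCleaner` signatures should be checked 
against the current code):

```java
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.typeutils.TypeExtractionUtils;

static <F extends Function> F cleanIfRegularClass(F function) throws Exception {
	if (TypeExtractionUtils.checkAndExtractLambda(function) == null) {
		// a regular (anonymous) function class: keep the existing cleaning path
		ClosureCleaner.clean(function, true);
	}
	// a generated serializable lambda: possibly clean nothing in a first version
	return function;
}
```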




[GitHub] flink issue #3701: [FLINK-6280] [scripts] Allow logging with Java flags

2017-04-19 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3701
  
Can you explain a bit more what this change does exactly? Does it make sure 
that, if you put bash commands into the Java opts to compute flags, those get 
evaluated?

Can you elaborate on why this needs a change in the log rotation logic?




[GitHub] flink issue #3727: [FLINK-6312]update curator version to 2.12.0 to avoid pot...

2017-04-19 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3727
  
Good idea to upgrade Curator.
Unfortunately, it seems some behavior in Curator has changed. The change 
causes many tests to hang/fail:

https://travis-ci.org/apache/flink/builds/22291

I think this would need to be stabilized before we can merge the fix. Can 
you look into this?




[GitHub] flink issue #3736: [Flink-6013][metrics] Add Datadog HTTP metrics reporter

2017-04-19 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3736
  
A few more comments on this:

  - Guava is so conflict-heavy that we should avoid using it in the framework 
wherever possible. Adding a multi-MB dependency just to write 
`Lists.newArrayList` rather than `new ArrayList<>()` also seems a bit much. The 
use of the `Joiner` can also be replaced with a `StringBuilder` (see the sketch 
after this list).

  - You can drop the `logback-test.xml` - I think this is an unneeded 
artifact that some other modules still have for historic reasons.

  - Would be good to also make sure this is put into the `opt` folder in 
the build target, so it can be easily dropped into the `lib` folder.
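
For illustration, a dependency-free replacement for a Guava 
`Joiner.on(',').join(values)` call (method and variable names assumed):

```java
static String join(Iterable<String> values, char separator) {
	StringBuilder sb = new StringBuilder();
	for (String value : values) {
		if (sb.length() > 0) {
			sb.append(separator); // separator goes between elements, not before the first
		}
		sb.append(value);
	}
	return sb.toString();
}
```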




[GitHub] flink issue #3738: [FLINK-6311] [Kinesis Connector] NPE in FlinkKinesisConsu...

2017-04-19 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3738
  
Looks good. To make this really safe, we should actually make the 
`mainThread` variable `volatile`.




[GitHub] flink issue #3741: [FLINK-6177] Add support for "Distributed Cache" in strea...

2017-04-19 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3741
  
Thanks for contributing this, the added functionality looks good.

I would prefer to add this change without changing the dependencies and 
test base classes. You could, for example, change the test to throw an exception 
in the "validator function" if the word is not in the cache file (sketch below). 
That way you do not need to "collect back" the data.

Minor comment: Generating the input from a collection rather than a file 
usually makes the tests a bit more lightweight. In all newer tests, we try to 
do that.
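
A hedged sketch of that test shape (`wordsInFile` is a hypothetical helper 
that reads the cached file; the cache entry name is made up):

```java
DataStream<String> input = env.fromElements("apple", "banana", "cherry"); // collection-based input

input.map(new RichMapFunction<String, String>() {
	@Override
	public String map(String word) throws Exception {
		File cached = getRuntimeContext().getDistributedCache().getFile("wordCache");
		if (!wordsInFile(cached).contains(word)) {
			// fail the job instead of collecting results back for validation
			throw new IllegalStateException("Word not found in cache file: " + word);
		}
		return word;
	}
});
```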






[GitHub] flink issue #3704: [FLINK-5756] Replace RocksDB dependency with FRocksDB

2017-04-09 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3704
  
@SyinChwunLeo We will try and contribute the patch to RocksDB and will also 
soon try and move to a newer RocksDB version, as soon as its Java API works 
again for the required functions. The RocksDB folks mentioned that the next 
release of RocksDB is quite soon and should fix that.

That will hopefully address the issue. Until then, we cannot upgrade, 
unfortunately :-(




[GitHub] flink issue #3525: [FLINK-6020]add a random integer suffix to blob key to av...

2017-04-05 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3525
  
I am currently travelling and then attending Flink Forward. Will come back 
to this after that.

Quick feedback:
  - I am still thinking that the random suffix breaks the original idea of 
the cached blobs.
  - The blob manager counts references to files and does not delete them as 
long as someone has a reference. That prevents deletion while multiple parties 
work with the same jar.
  - Properly handling rename and add-reference under one lock, as well as 
de-reference and delete under the same lock, should fix it, I think (rough 
sketch after this list).
  - The blob manager needs to make sure it has an exclusive directory, so 
that no other process accesses the files. But I think that is the case already.
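
A rough sketch of the locking idea from the third point (all names assumed, 
not the actual blob manager code):

```java
private final Object lock = new Object();
private final Map<BlobKey, Integer> refCounts = new HashMap<>();

void moveAndAddReference(BlobKey key, File tempFile, File cachedFile) throws IOException {
	synchronized (lock) {
		if (refCounts.merge(key, 1, Integer::sum) == 1) {
			// first reference: rename into place under the same lock that guards the count
			Files.move(tempFile.toPath(), cachedFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
		}
	}
}

void releaseReference(BlobKey key, File cachedFile) throws IOException {
	synchronized (lock) {
		int remaining = refCounts.merge(key, -1, Integer::sum);
		if (remaining <= 0) {
			// last reference gone: de-reference and delete under the same lock
			refCounts.remove(key);
			Files.delete(cachedFile.toPath());
		}
	}
}
```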




[GitHub] flink issue #3563: [FLINK-2814] [optimizer] DualInputPlanNode cannot be cast...

2017-04-03 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3563
  
I think cherry-picking this into the `release-1.2` branch would be nice




[GitHub] flink issue #2753: [FLINK-4840] [metrics] Measure latency of record processi...

2017-03-29 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2753
  
A gentle ping about how to proceed here...
Are you interested in pursuing another implementation approach, or should 
we close this as "fix later"?






[GitHub] flink pull request #3524: [FLINK-6014][checkpoint] Allow the registration of...

2017-03-29 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3524#discussion_r108671962
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java ---
@@ -0,0 +1,192 @@
+ // [Apache license header, package declaration, and imports elided]
+/**
+ * A {@code SharedStateRegistry} will be deployed in the
+ * {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator} to
+ * maintain the reference count of those state objects shared among different
+ * checkpoints. Each shared state object must be identified by a unique key.
+ */
+public class SharedStateRegistry implements Serializable {
+
+   private static final long serialVersionUID = -8357254413007773970L;
+
+   /** All registered state objects */
+   private final Map<String, Tuple2<StateObject, Integer>> registeredStates = new HashMap<>();
+
+   /** All state objects that are not referenced any more */
+   private transient final List<StateObject> discardedStates = new ArrayList<>();
+
+   /**
+    * Register the state in the registry
+    *
+    * @param key The key of the state to register
+    * @param state The state to register
+    */
+   public void register(String key, StateObject state) {
+      Tuple2<StateObject, Integer> stateAndRefCnt = registeredStates.get(key);
+
+      if (stateAndRefCnt == null) {
+         registeredStates.put(key, new Tuple2<>(state, 1));
+      } else {
+         if (!stateAndRefCnt.f0.equals(state)) {
--- End diff --

This is a nice idea, but I think that `equals()` on state objects does not 
always work very well. On `ByteStreamHandle`, it is a bit expensive (compare 
all bytes, which may be up to a few megabytes) and on `FileStreamHandle` it 
assumes that the paths are normalized (upper/lower case, slashes, authority or 
no authority, ...). That's why I suggested to have a dedicated key. Then we 
only require the key to be "normalized" so it behaves well for `equals()`.




[GitHub] flink pull request #3524: [FLINK-6014][checkpoint] Allow the registration of...

2017-03-29 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3524#discussion_r108671268
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java ---
@@ -0,0 +1,192 @@
+ // [Apache license header, package declaration, and imports elided]
+/**
+ * A {@code SharedStateRegistry} will be deployed in the
+ * {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator} to
+ * maintain the reference count of those state objects shared among different
+ * checkpoints. Each shared state object must be identified by a unique key.
+ */
+public class SharedStateRegistry implements Serializable {
+
+   private static final long serialVersionUID = -8357254413007773970L;
+
+   /** All registered state objects */
+   private final Map<String, Tuple2<StateObject, Integer>> registeredStates = new HashMap<>();
--- End diff --

I think we can be a bit more efficient by having a `StateObjectAndCount` 
class rather than a `Tuple2` - that saves the reference for the boxed `Integer` 
(sketch below).
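
A sketch of such a holder class (name and fields assumed from this comment):

```java
final class StateObjectAndCount {

	final StateObject state;
	int referenceCount; // a plain int: no extra reference to a boxed Integer per entry

	StateObjectAndCount(StateObject state) {
		this.state = state;
		this.referenceCount = 1;
	}
}
```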




[GitHub] flink pull request #3524: [FLINK-6014][checkpoint] Allow the registration of...

2017-03-29 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3524#discussion_r108670950
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java ---
@@ -0,0 +1,192 @@
+ // [Apache license header, package declaration, and imports elided]
+/**
+ * A {@code SharedStateRegistry} will be deployed in the
+ * {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator} to
+ * maintain the reference count of those state objects shared among different
+ * checkpoints. Each shared state object must be identified by a unique key.
+ */
+public class SharedStateRegistry implements Serializable {
--- End diff --

I think we don't need to make the registry serializable. In one of my next 
changes, the `CompletedCheckpoint` is not serializable any more, and then the 
registry can also be non-serializable.




[GitHub] flink pull request #3524: [FLINK-6014][checkpoint] Allow the registration of...

2017-03-29 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3524#discussion_r108670845
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java ---
@@ -0,0 +1,192 @@
+ // [Apache license header, package declaration, and imports elided]
+/**
+ * A {@code SharedStateRegistry} will be deployed in the
+ * {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator} to
+ * maintain the reference count of those state objects shared among different
+ * checkpoints. Each shared state object must be identified by a unique key.
+ */
+public class SharedStateRegistry implements Serializable {
+
+   private static final long serialVersionUID = -8357254413007773970L;
+
+   /** All registered state objects */
+   private final Map<String, Tuple2<StateObject, Integer>> registeredStates = new HashMap<>();
+
+   /** All state objects that are not referenced any more */
+   private transient final List<StateObject> discardedStates = new ArrayList<>();
+
+   /**
+    * Register the state in the registry
+    *
+    * @param key The key of the state to register
+    * @param state The state to register
+    */
+   public void register(String key, StateObject state) {
+      Tuple2<StateObject, Integer> stateAndRefCnt = registeredStates.get(key);
+
+      if (stateAndRefCnt == null) {
+         registeredStates.put(key, new Tuple2<>(state, 1));
+      } else {
+         if (!stateAndRefCnt.f0.equals(state)) {
+            throw new IllegalStateException("Cannot register a key with different states.");
+         }
+
+         stateAndRefCnt.f1++;
+      }
+   }
+
+   /**
+    * Decrease the reference count of the state in the registry
+    *
+    * @param key The key of the state to unregister
+    */
+   public void unregister(String key) {
+      Tuple2<StateObject, Integer> stateAndRefCnt = registeredStates.get(key);
+
+      if (stateAndRefCnt == null) {
+         throw new IllegalStateException("Cannot unregister an unexisted state.");
+      }
+
+      stateAndRefCnt.f1--;
+
+      // Remove the state from the registry when it's not referenced any more.
+      if (stateAndRefCnt.f1 == 0) {
+         registeredStates.remove(key);
+         discardedStates.add(stateAndRefCnt.f0);
+      }
+   }
+
+   /**
+    * Register all the shared states in the given state handles.
+    *
+    * @param stateHandles The state handles to register their shared states
+    */
+   public void registerAll(Collection<CompositeStateHandle> stateHandles) {
+      synchronized (this) {
--- End diff --

It is good practice to use a dedicated lock object, because someone outside 
may hold a lock on `this`.
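
For illustration, a sketch with a private lock object (field name assumed):

```java
private final Object registryLock = new Object();

public void registerAll(Collection<CompositeStateHandle> stateHandles) {
	synchronized (registryLock) { // outside code cannot synchronize on this private object
		if (stateHandles != null) {
			for (CompositeStateHandle stateHandle : stateHandles) {
				stateHandle.register(this);
			}
		}
	}
}
```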




[GitHub] flink pull request #3524: [FLINK-6014][checkpoint] Allow the registration of...

2017-03-29 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3524#discussion_r108670700
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java ---
@@ -0,0 +1,192 @@
+ // [Apache license header, package declaration, and imports elided]
+/**
+ * A {@code SharedStateRegistry} will be deployed in the
+ * {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator} to
+ * maintain the reference count of those state objects shared among different
+ * checkpoints. Each shared state object must be identified by a unique key.
+ */
+public class SharedStateRegistry implements Serializable {
+
+   private static final long serialVersionUID = -8357254413007773970L;
+
+   /** All registered state objects */
+   private final Map<String, Tuple2<StateObject, Integer>> registeredStates = new HashMap<>();
+
+   /** All state objects that are not referenced any more */
+   private transient final List<StateObject> discardedStates = new ArrayList<>();
+
+   /**
+    * Register the state in the registry
+    *
+    * @param key The key of the state to register
+    * @param state The state to register
+    */
+   public void register(String key, StateObject state) {
+      Tuple2<StateObject, Integer> stateAndRefCnt = registeredStates.get(key);
+
+      if (stateAndRefCnt == null) {
+         registeredStates.put(key, new Tuple2<>(state, 1));
+      } else {
+         if (!stateAndRefCnt.f0.equals(state)) {
+            throw new IllegalStateException("Cannot register a key with different states.");
+         }
+
+         stateAndRefCnt.f1++;
+      }
+   }
+
+   /**
+    * Decrease the reference count of the state in the registry
+    *
+    * @param key The key of the state to unregister
+    */
+   public void unregister(String key) {
+      Tuple2<StateObject, Integer> stateAndRefCnt = registeredStates.get(key);
--- End diff --

This method lacks synchronization.




[GitHub] flink pull request #3524: [FLINK-6014][checkpoint] Allow the registration of...

2017-03-29 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3524#discussion_r108671445
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java ---
@@ -0,0 +1,192 @@
+ // [Apache license header, package declaration, and imports elided]
+/**
+ * A {@code SharedStateRegistry} will be deployed in the
+ * {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator} to
+ * maintain the reference count of those state objects shared among different
+ * checkpoints. Each shared state object must be identified by a unique key.
+ */
+public class SharedStateRegistry implements Serializable {
+
+   private static final long serialVersionUID = -8357254413007773970L;
+
+   /** All registered state objects */
+   private final Map<String, Tuple2<StateObject, Integer>> registeredStates = new HashMap<>();
+
+   /** All state objects that are not referenced any more */
+   private transient final List<StateObject> discardedStates = new ArrayList<>();
--- End diff --

I think we can drop the list reuse here - that would make it safer. Currently 
the list is returned outside the locked scope and can be used concurrently, 
which results in errors.
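
A sketch of the safer variant (method name hypothetical): hand out a fresh 
list under the lock instead of reusing the field.

```java
public List<StateObject> takeDiscardedStates() {
	synchronized (registryLock) {
		List<StateObject> result = new ArrayList<>(discardedStates);
		discardedStates.clear();
		return result; // the caller gets its own list, safe to use outside the lock
	}
}
```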




[GitHub] flink pull request #3524: [FLINK-6014][checkpoint] Allow the registration of...

2017-03-29 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3524#discussion_r108670677
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java ---
@@ -0,0 +1,192 @@
+ // [Apache license header, package declaration, and imports elided]
+/**
+ * A {@code SharedStateRegistry} will be deployed in the
+ * {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator} to
+ * maintain the reference count of those state objects shared among different
+ * checkpoints. Each shared state object must be identified by a unique key.
+ */
+public class SharedStateRegistry implements Serializable {
+
+   private static final long serialVersionUID = -8357254413007773970L;
+
+   /** All registered state objects */
+   private final Map<String, Tuple2<StateObject, Integer>> registeredStates = new HashMap<>();
+
+   /** All state objects that are not referenced any more */
+   private transient final List<StateObject> discardedStates = new ArrayList<>();
+
+   /**
+    * Register the state in the registry
+    *
+    * @param key The key of the state to register
+    * @param state The state to register
+    */
+   public void register(String key, StateObject state) {
+      Tuple2<StateObject, Integer> stateAndRefCnt = registeredStates.get(key);
--- End diff --

This method lacks synchronization.




[GitHub] flink pull request #3524: [FLINK-6014][checkpoint] Allow the registration of...

2017-03-29 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3524#discussion_r108670567
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java ---
@@ -0,0 +1,192 @@
+ // [Apache license header, package declaration, and imports elided]
+/**
+ * A {@code SharedStateRegistry} will be deployed in the
+ * {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator} to
+ * maintain the reference count of those state objects shared among different
+ * checkpoints. Each shared state object must be identified by a unique key.
+ */
+public class SharedStateRegistry implements Serializable {
+
+   private static final long serialVersionUID = -8357254413007773970L;
+
+   /** All registered state objects */
+   private final Map<String, Tuple2<StateObject, Integer>> registeredStates = new HashMap<>();
+
+   /** All state objects that are not referenced any more */
+   private transient final List<StateObject> discardedStates = new ArrayList<>();
+
+   /**
+    * Register the state in the registry
+    *
+    * @param key The key of the state to register
+    * @param state The state to register
+    */
+   public void register(String key, StateObject state) {
+      Tuple2<StateObject, Integer> stateAndRefCnt = registeredStates.get(key);
+
+      if (stateAndRefCnt == null) {
+         registeredStates.put(key, new Tuple2<>(state, 1));
+      } else {
+         if (!stateAndRefCnt.f0.equals(state)) {
+            throw new IllegalStateException("Cannot register a key with different states.");
+         }
+
+         stateAndRefCnt.f1++;
+      }
+   }
+
+   /**
+    * Decrease the reference count of the state in the registry
+    *
+    * @param key The key of the state to unregister
+    */
+   public void unregister(String key) {
+      Tuple2<StateObject, Integer> stateAndRefCnt = registeredStates.get(key);
+
+      if (stateAndRefCnt == null) {
+         throw new IllegalStateException("Cannot unregister an unexisted state.");
+      }
+
+      stateAndRefCnt.f1--;
+
+      // Remove the state from the registry when it's not referenced any more.
+      if (stateAndRefCnt.f1 == 0) {
+         registeredStates.remove(key);
+         discardedStates.add(stateAndRefCnt.f0);
+      }
+   }
+
+   /**
+    * Register all the shared states in the given state handles.
+    *
+    * @param stateHandles The state handles to register their shared states
+    */
+   public void registerAll(Collection<CompositeStateHandle> stateHandles) {
+      synchronized (this) {
+         if (stateHandles != null) {
+            for (CompositeStateHandle stateHandle : stateHandles) {
+               stateHandle.register(this);
+            }
+         }
+      }
+   }
+
+   /**
+    * Register all the shared states in the given state handle.
+    *
+    * @param stateHandle The state handle to register its shared states
+    */
+   public void registerAll(CompositeStateHandle stateHandle) {
+      if (stateHandle != null) {
+         synchronized (this) {
+            stateHandle.register(this);
+         }
+      }
+   }
+
+   /**
+    * Unregister all the shared states in the given state handles and ret

[GitHub] flink issue #3598: [FLINK-6103] LocalFileSystem rename() uses File.renameTo(...

2017-03-29 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3598
  
I think we don't need the logging in all of the cases. We typically avoid 
logging in "utility functions", which lack the context to tell whether the 
exception is in fact a problem worth mentioning, or whether it only adds noise 
to the log.

Other than that, I think it's good.




[GitHub] flink issue #3563: [FLINK-2814] [optimizer] DualInputPlanNode cannot be cast...

2017-03-29 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3563
  
@greghogan I think you can go ahead and merge this...




[GitHub] flink issue #3632: [FLINK-6176] [scripts] Add JARs to CLASSPATH deterministi...

2017-03-29 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3632
  
Looks like a good change. Do we want the same deterministic class path 
order also when launching Yarn and Mesos containers?

/cc @rmetzger @EronWright 




[GitHub] flink issue #3633: [FLINK-5982] [runtime] Refactor AbstractInvokable and Sta...

2017-03-29 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3633
  
Thanks, I will try and look at this over the next week.
It is quite a big change, so I might need a bit of time to review it 
properly...




[GitHub] flink issue #3642: SignalWindow

2017-03-29 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3642
  
I think this will not work. Windows are keys in hash tables. Mutating a key 
while it is already in the hash table makes it effectively impossible to find 
or delete it any more.
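
A self-contained demonstration of the problem, using a hypothetical mutable 
key (not the actual window classes):

```java
import java.util.HashMap;
import java.util.Map;

class MutableWindow {
	long start;

	MutableWindow(long start) { this.start = start; }

	@Override
	public int hashCode() { return Long.hashCode(start); }

	@Override
	public boolean equals(Object o) {
		return o instanceof MutableWindow && ((MutableWindow) o).start == start;
	}
}

class Demo {
	public static void main(String[] args) {
		Map<MutableWindow, String> windows = new HashMap<>();
		MutableWindow w = new MutableWindow(0);
		windows.put(w, "window state");

		w.start = 10; // mutate the key while it is in the map

		System.out.println(windows.get(w));    // null - the lookup hashes to a different bucket
		System.out.println(windows.remove(w)); // null - the entry can no longer be removed
	}
}
```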





[GitHub] flink issue #3639: Update TimeWindow.java

2017-03-29 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3639
  
Windows are immutable and must be, as they are keys in hash tables.
I am also not sure how this code can possibly compile, as it tries to 
update a final field.

For changes to the implementation of the windowing logic, please post on 
the mailing list or in JIRA what you want to solve and how you want to solve 
it. Because the logic is complex, we cannot simply merge changes without a good 
design discussion first.




[GitHub] flink issue #3640: [FLINK-6213] [yarn] terminate resource manager itself whe...

2017-03-29 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3640
  
Good idea to add the poison pill. But does it actually work when the poison 
pill is decorated?




[GitHub] flink issue #3633: [FLINK-5982] [runtime] Refactor AbstractInvokable and Sta...

2017-03-28 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3633
  
Thanks for this refactoring. I had only a quick look so far, but one 
thought was that this patch could probably be simplified by making 
`triggerCheckpoint()` (and restore / notifyComplete) not abstract in the 
`AbstractInvokable`, and instead letting it throw an 
`UnsupportedOperationException`.

That way, we avoid refactoring all the test classes to implement those 
methods (which would be nice).
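
A sketch of the suggested default (method name from this discussion, 
parameter list abbreviated):

```java
public abstract class AbstractInvokable {

	public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
		throw new UnsupportedOperationException(
				"triggerCheckpoint not supported by " + getClass().getName());
	}

	// restore / notifyComplete would get the same non-abstract default
}
```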




[GitHub] flink pull request #3610: [FLINK-6183]/[FLINK-6184] Prevent some NPE and unc...

2017-03-28 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3610#discussion_r108478998
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java ---
@@ -80,8 +80,17 @@ public TaskMetricGroup addTask(
taskName,
subtaskIndex,
attemptNumber);
-   tasks.put(executionAttemptID, task);
-   return task;
+   TaskMetricGroup prior = tasks.put(executionAttemptID, task);
+   if (prior == null) {
+   return task;
--- End diff --

Can you avoid adding `closeLocally()` by simply doing a "contains()" check 
before creating the group?
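
For illustration, a sketch of that check (the creation call is abbreviated as 
a hypothetical `createTaskMetricGroup`):

```java
TaskMetricGroup existing = tasks.get(executionAttemptID);
if (existing != null) {
	return existing; // nothing new was created, so nothing needs closeLocally()
}
TaskMetricGroup task = createTaskMetricGroup(taskName, subtaskIndex, attemptNumber);
tasks.put(executionAttemptID, task);
return task;
```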




[GitHub] flink issue #3627: Release 0.4 alpha.0

2017-03-28 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3627
  
Can you close this PR?
Looks like this is some confusion...




[GitHub] flink issue #3600: [FLINK-6117]Make setting of 'zookeeper.sasl.disable' work...

2017-03-28 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3600
  
I am a bit confused here - this seems to make things more complicated.

Before, if you wanted to use ZK and Kerberos, you only added `zookeeper` to 
`security.kerberos.login.contexts`. Now you additionally need to set 
`zookeeper.sasl.disable` to `false`?

Why don't we keep `zookeeper.sasl.disable` as `false` by default? If I 
understand it correctly, it is anyway only ever relevant when the ZooKeeper 
login context has been enabled...




[GitHub] flink issue #2252: [FLINK-3466] [runtime] Cancel state handled on state rest...

2017-03-27 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2252
  
I think it is, yes. We worked around it in the meantime...




[GitHub] flink pull request #1668: [FLINK-3257] Add Exactly-Once Processing Guarantee...

2017-03-25 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1668#discussion_r108034229
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java ---
@@ -64,6 +70,22 @@ public void init() throws Exception {
super.init();
}
 
+   @Override
+   protected boolean performCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
+      LOG.debug("Starting checkpoint {} on task {}", checkpointMetaData.getCheckpointId(), getName());
+
+      synchronized (getCheckpointLock()) {
+         if (isRunning()) {
+            dataChannel.put(new Either.Right(new CheckpointBarrier(checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp(), checkpointOptions)));
+            getEnvironment().acknowledgeCheckpoint(checkpointMetaData.getCheckpointId(), checkpointMetrics);
--- End diff --

Can the `IterationTailTask` contain operators as well, or is it always a 
task without operators? If it has operators, we cannot immediately acknowledge 
here, but need to delegate to the superclass checkpoint method instead.




[GitHub] flink pull request #3602: [FLINK-5715] Asynchronous snapshots for heap keyed...

2017-03-24 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3602#discussion_r107931282
  
--- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java ---
@@ -115,6 +117,14 @@ public static void stopTestCluster() {
@Before
public void initStateBackend() throws IOException {
switch (stateBackendEnum) {
+   case MEM_ASYNC:
+      this.stateBackend = new AsyncMemoryStateBackend(MAX_MEM_STATE_SIZE);
+      break;
+   case FILE_ASYNC: {
+      String backups = tempFolder.newFolder().getAbsolutePath();
+      this.stateBackend = new AsyncFsStateBackend("file://" + backups);
--- End diff --

To make this work cross-platform, always use `file.toUri()` or `new 
Path(file.toUri())`.
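
A sketch of the cross-platform variant for the quoted test code:

```java
case FILE_ASYNC: {
	// "file://" + absolute path breaks on Windows (drive letters, backslashes);
	// File.toURI() produces a well-formed URI on every platform
	this.stateBackend = new AsyncFsStateBackend(tempFolder.newFolder().toURI().toString());
	break;
}
```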




[GitHub] flink issue #3600: [FLINK-6117]Make setting of 'zookeeper.sasl.disable' work...

2017-03-24 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3600
  
@EronWright @vijikarthi If you find the time, what do you think about this 
patch?




[GitHub] flink issue #3600: [FLINK-6117]Make setting of 'zookeeper.sasl.disable' work...

2017-03-24 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3600
  
Yes, CI seems to be unstable.

Can you try pushing to the branch again (maybe change the commit message, then force-push) to retrigger the CI?




[GitHub] flink issue #3599: [FLINK-6174][HA]introduce a new election service to make ...

2017-03-24 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3599
  
I would suggest fixing this the following way:

  - There is an upcoming patch that makes the Flink codebase use the `HighAvailabilityServices` properly in all places.
  - We introduce a new HA mode called `yarnsimple` or so (next to `none` and `zookeeper`) and instantiate a new implementation of `HighAvailabilityServices` which is ZooKeeper-independent.
  - The new implementation of the High Availability Services does not use ZooKeeper. It uses a leader service that always grants the JobManager leadership, but also implements a way for TaskManagers to find the JobManager (to be seen how; possibly a file in HDFS or so). It also implements a ZooKeeper-independent CompletedCheckpointStore that finds checkpoints by maintaining a file with completed checkpoints. A minimal sketch of such an always-granting leader service follows this list.

That is all not a "proper" HA setup - it only works as long as there is strictly only one master. But it comes close and is ZooKeeper-independent.
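
A minimal sketch, with method names roughly as in Flink's `org.apache.flink.runtime.leaderelection` package at the time (the class name is hypothetical):

```java
import java.util.UUID;

import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;

/**
 * Always grants leadership to the single contender; only safe when it is
 * guaranteed that strictly one JobManager exists.
 */
public class AlwaysLeaderElectionService implements LeaderElectionService {

	private volatile LeaderContender contender;

	@Override
	public void start(LeaderContender contender) throws Exception {
		this.contender = contender;
		// no election: the sole contender is granted leadership right away
		contender.grantLeadership(UUID.randomUUID());
	}

	@Override
	public void stop() throws Exception {
		contender = null;
	}

	@Override
	public void confirmLeaderSessionID(UUID leaderSessionID) {
		// nothing to confirm: there is no external leader registry
	}

	@Override
	public boolean hasLeadership() {
		return contender != null;
	}
}
```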

Is that what you are looking for?




[GitHub] flink pull request #3598: [FLINK-6103] LocalFileSystem rename() uses File.re...

2017-03-24 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3598#discussion_r107925602
  
--- Diff: flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java ---
@@ -262,8 +265,13 @@ public FSDataOutputStream create(
 	public boolean rename(final Path src, final Path dst) throws IOException {
 		final File srcFile = pathToFile(src);
 		final File dstFile = pathToFile(dst);
-
-		return srcFile.renameTo(dstFile);
+		// Files.move fails if the destination directory doesn't exist
+		if (dstFile.getParentFile() != null && !dstFile.getParentFile().exists()) {
+			Files.createDirectories(dstFile.getParentFile().toPath());
+		}
+		// this throws an IOException if any error occurs
+		Files.move(srcFile.toPath(), dstFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
--- End diff --

I think that is inherited from the interface that is also used by the HdfsFileSystem. The outcomes are `true` (rename succeeded), `false` (rename failed), and `IOException` (communication with the filesystem failed).




[GitHub] flink pull request #3605: [FLINK-6181][Start scripts] Fix regex in start scr...

2017-03-24 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3605#discussion_r107923309
  
--- Diff: tools/travis_mvn_watchdog.sh ---
@@ -164,7 +164,7 @@ watchdog () {
 
 # Check the final fat jar for illegal artifacts
 check_shaded_artifacts() {
-   jar tf build-target/lib/flink-dist-*.jar > allClasses
--- End diff --

I assume that with the "-" this only matches something like `flink-dist-1.2-SNAPSHOT` or so, and with the fix it also matches `flink-dist.jar`...




[GitHub] flink pull request #3598: [FLINK-6103] LocalFileSystem rename() uses File.re...

2017-03-24 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3598#discussion_r107863788
  
--- Diff: flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java ---
@@ -262,8 +265,13 @@ public FSDataOutputStream create(
 	public boolean rename(final Path src, final Path dst) throws IOException {
 		final File srcFile = pathToFile(src);
 		final File dstFile = pathToFile(dst);
-
-		return srcFile.renameTo(dstFile);
+		// Files.move fails if the destination directory doesn't exist
+		if (dstFile.getParentFile() != null && !dstFile.getParentFile().exists()) {
+			Files.createDirectories(dstFile.getParentFile().toPath());
+		}
+		// this throws an IOException if any error occurs
+		Files.move(srcFile.toPath(), dstFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
--- End diff --

If we want to preserve the old semantics, we should catch the exception and 
return `false` in that case.
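
That could look roughly like this (simplified sketch; the parent-directory creation from the diff above is omitted):

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;

public class RenameSketch {

	// maps the IOException from Files.move() back to the old boolean contract
	public static boolean rename(final File srcFile, final File dstFile) {
		try {
			Files.move(srcFile.toPath(), dstFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
			return true;
		} catch (IOException e) {
			// old File.renameTo() semantics: report failure via the return value
			return false;
		}
	}
}
```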




[GitHub] flink issue #3583: [FLINK-6043] [web] Display exception timestamp

2017-03-24 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3583
  
Can we remove the `errorTimestamp` from the `notifyStateTransition`? I 
think it obfuscates the meaning a bit...




[GitHub] flink issue #3599: [FLINK-6174][HA]introduce a new election service to make ...

2017-03-23 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3599
  
-1 sorry.

This needs to go to the drawing board (a FLIP or a detailed JIRA discussion) before we consider a change that impacts the guarantees and failure modes so heavily.

Some initial comments:

  - In proper HA, you need some service that "locks" the leader, otherwise 
you are vulnerable to the "split brain" problem where a network partition makes 
multiple JobManagers work as leaders, each with some TaskManagers.

  - In FLIP-6, we are introducing the `HighAvailabilityServices` to allow for multiple levels of guarantees with different implementations. I can see that introducing a highly available but not split-brain-protected mode is interesting, but it should be a new mode rather than replace any existing one.





[GitHub] flink issue #3515: [FLINK-5890] [gelly] GatherSumApply broken when object re...

2017-03-23 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3515
  
It is a +1 :-)




[GitHub] flink issue #3592: [FLINK-5977] Rename MAX_ATTEMPTS_HISTORY_SIZE key

2017-03-22 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3592
  
Thanks for the pull request.
This has actually already been fixed in 
https://github.com/apache/flink/commit/81a143f6b42cf39d56a36222d14b5db0cc54addb

The commit also retains the old key as a deprecated parameter for backwards 
compatibility.




[GitHub] flink issue #3588: [FLINK-6144] [config] Port JobManager configuration optio...

2017-03-21 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3588
  
Good change, +1

merging this...




[GitHub] flink issue #3506: [FLINK-6000] Fix starting HA cluster with start-cluster.s...

2017-03-21 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3506
  
Good change, merging this for `1.2.1` and `1.3`.




[GitHub] flink issue #3540: [FLINK-6056] [build]apache-rat exclude flink directory in...

2017-03-21 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3540
  
Merging this...




[GitHub] flink issue #3556: [FLINK-6084][Cassandra] Promote transitive dependencies

2017-03-21 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3556
  
Looks good to me, +1 to merge for both `master` and `release-1.2`




[GitHub] flink issue #3567: [FLINK-6107] Add custom checkstyle for flink-streaming-ja...

2017-03-20 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3567
  
It would be awesome if we could make IntelliJ pick up the style config automatically. By committing some files under `.idea`, we might be able to do that...

On Mar 20, 2017 11:29 PM, "Greg Hogan" wrote:

> @aljoscha <https://github.com/aljoscha> should we host the new checkstyle
> under tools/maven/ alongside the existing checkstyle? There is already a
> ticket (FLINK-6137) to add a custom checkstyle to flink-cep and I don't see
> any of these rules being module-specific.





[GitHub] flink issue #3539: Flip1: fine gained recovery

2017-03-20 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3539
  
@shuai-xu Had a look at the changes. In general, I like the decoupling of 
failover handling from the execution graph.

Some suggestions:
  - How about we rename the `FailoverCoordinator` to `FailoverStrategy`?

  - We should probably implement one very simple failover strategy (restart 
all) and one better one (failover region based). That has the advantage of 
having a default implementation that is simple and robust (same as now) for all 
conservative users.

  - I would suggest letting commands like `fail()` and `cancel()` that go directly to the `ExecutionGraph` stay on the execution graph. They are meant to apply to the whole graph and should not be specially handled. The failover coordinator would then only handle failures from individual tasks (sketched below).
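
For illustration, a hypothetical shape of that split (the names `FailoverStrategy`, `onTaskFailure`, and `failGlobal` are taken from this discussion or assumed, not existing API at the time):

```java
public interface FailoverStrategy {

	// called when a single task fails; the strategy decides how much of the
	// graph to restart (everything, or only the affected failover region)
	void onTaskFailure(Execution failedTask, Throwable cause);
}

// the simple, conservative default: behave like today and restart everything
public class RestartAllStrategy implements FailoverStrategy {

	private final ExecutionGraph executionGraph;

	public RestartAllStrategy(ExecutionGraph executionGraph) {
		this.executionGraph = executionGraph;
	}

	@Override
	public void onTaskFailure(Execution failedTask, Throwable cause) {
		// escalate the individual failure to a global restart
		executionGraph.failGlobal(cause);
	}
}
```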





[GitHub] flink issue #3521: [FLINK-6027][checkpoint] Ignore the exception thrown by t...

2017-03-20 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3521
  
Looks good, thanks. Merging...




[GitHub] flink issue #3567: [FLINK-6107] Add custom checkstyle for flink-streaming-ja...

2017-03-20 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3567
  
Can you describe what rules this style actually enforces?




[GitHub] flink issue #3540: [FLINK-6056] [build]apache-rat exclude flink directory in...

2017-03-20 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3540
  
@shijinkui Got it, thanks.

+1 to merge this




[GitHub] flink issue #3563: [FLINK-2814] [optimizer] DualInputPlanNode cannot be cast...

2017-03-20 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3563
  
Nice, looks like a compact fix. 
With the explanation about test coverage, +1 from my side




[GitHub] flink issue #3524: [FLINK-6014][checkpoint] Allow the registration of state ...

2017-03-20 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3524
  
Can you give me some background on why you want `PendingCheckpoint` to also register its state immediately (and not only upon completion)?
I see no problem with that, just want to double-check whether we are changing the assumption from the original design doc, where you suggested that shared state can only be referenced by another checkpoint if it is already part of a committed checkpoint.




[GitHub] flink issue #3535: [FLINK-5713] Protect against NPE in WindowOperator window...

2017-03-20 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3535
  
I cannot really assess what this change is doing in detail, meaning how it affects typical workloads. I would need a bit more context.

The change looks small and innocent and seems to have decent tests ;-)




[GitHub] flink issue #3524: [FLINK-6014][checkpoint] Allow the registration of state ...

2017-03-20 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3524
  
@shixiaogang Thanks for the fast response!

In the initial design document, you suggested that shared state only becomes referable once it is part of a committed checkpoint.

I think what we need is a subclass of `StateHandle` that is a `SharedStateHandle`. I would suggest that the shared state handle has a method `String getKey()` (or `Object getKey()`) which gives the unique identifier of the shared state. The `SharedStateRegistry` internally could use something like a `Map<String, SharedStateHandle>`. I think that would give us a bit more flexibility in how we describe "equality of shared state": we don't need to make sure that the state handles themselves implement `equals()` such that it meets the semantics of the shared state registry. In the case we currently have, the `getKey()` method could return the normalized path of the file.
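
A sketch of that shape (hypothetical code, not part of the PR):

```java
import java.util.HashMap;
import java.util.Map;

// shared state is identified by an explicit key instead of relying on
// equals()/hashCode() of the state handles themselves
interface SharedStateHandle /* extends StateHandle */ {
	String getKey(); // e.g. the normalized file path
}

class SharedStateRegistry {
	// "equality of shared state" is defined purely by the key
	private final Map<String, SharedStateHandle> statesByKey = new HashMap<>();

	void register(SharedStateHandle handle) {
		statesByKey.putIfAbsent(handle.getKey(), handle);
	}
}
```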






[GitHub] flink issue #3525: [FLINK-6020]add a random integer suffix to blob key to av...

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3525
  
I think we should then fix this in the blob server.

The problem that only one writer should succeed upon collision should be fixable by using `Files.move()` with `ATOMIC_MOVE`. Only when that succeeds do we store the file in the blob store.
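
A rough sketch of the idea (a hypothetical helper, not the actual BlobServer code):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public final class AtomicBlobPut {

	static boolean moveIntoPlace(Path tmpFile, Path storageFile) throws IOException {
		try {
			// the atomic move guarantees that exactly one concurrent writer wins
			Files.move(tmpFile, storageFile, StandardCopyOption.ATOMIC_MOVE);
			return true; // we won the race; only now publish to the HA blob store
		} catch (IOException e) {
			if (Files.exists(storageFile)) {
				// another writer won; blob names are content hashes, so the
				// existing file has identical contents and this is a success
				return false;
			}
			throw e;
		}
	}
}
```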

What do you think?




[GitHub] flink pull request #3127: [FLINK-5481] Simplify Row creation

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3127#discussion_r106717641
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala ---
@@ -17,29 +17,51 @@
  */
 package org.apache.flink.table.api
 
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo}
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
+import org.apache.flink.api.java.typeutils.{Types => JTypes}
 
 /**
   * This class enumerates all supported types of the Table API.
   */
-object Types {
+object Types extends JTypes {
 
-  val STRING = BasicTypeInfo.STRING_TYPE_INFO
-  val BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO
+  val STRING = JTypes.STRING
+  val BOOLEAN = JTypes.BOOLEAN
 
-  val BYTE = BasicTypeInfo.BYTE_TYPE_INFO
-  val SHORT = BasicTypeInfo.SHORT_TYPE_INFO
-  val INT = BasicTypeInfo.INT_TYPE_INFO
-  val LONG = BasicTypeInfo.LONG_TYPE_INFO
-  val FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO
-  val DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO
-  val DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO
+  val BYTE = JTypes.BYTE
+  val SHORT = JTypes.SHORT
+  val INT = JTypes.INT
+  val LONG = JTypes.LONG
+  val FLOAT = JTypes.FLOAT
+  val DOUBLE = JTypes.DOUBLE
+  val DECIMAL = JTypes.DECIMAL
 
-  val DATE = SqlTimeTypeInfo.DATE
-  val TIME = SqlTimeTypeInfo.TIME
-  val TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP
+  val SQL_DATE = JTypes.SQL_DATE
+  val SQL_TIME = JTypes.SQL_TIME
+  val SQL_TIMESTAMP = JTypes.SQL_TIMESTAMP
   val INTERVAL_MONTHS = TimeIntervalTypeInfo.INTERVAL_MONTHS
   val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS
 
+  /**
+* Generates RowTypeInfo with default names (f1, f2 ..).
+* same as ``new RowTypeInfo(types)``
+*
+* @param types of Row fields. e.g. ROW(Types.STRING, Types.INT)
+*/
+  def ROW[T](types: TypeInformation[_]*)(implicit m: Manifest[T]) = {
--- End diff --

How about calling it `ROW` and `ROW_NAMED` or so? I think just adding 
another parameter is hacky...




[GitHub] flink issue #3524: [FLINK-6014][checkpoint] Allow the registration of state ...

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3524
  
One more question: Can the StateRegistry not directly drop states that are no longer referenced when they are unregistered (sketched below)? Is there a special reason for first collecting these states in a list, then getting them, and then dropping them?
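
For illustration, the direct-discard variant could look like this (all names, including `getKey()` and `discardState()`, are hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

class DirectDiscardRegistry {

	interface SharedStateHandle {
		String getKey();
		void discardState() throws Exception;
	}

	private final Map<String, Integer> refCounts = new HashMap<>();

	void unregister(SharedStateHandle handle) throws Exception {
		final String key = handle.getKey();
		final int remaining = refCounts.get(key) - 1;
		if (remaining > 0) {
			refCounts.put(key, remaining);
		} else {
			refCounts.remove(key);
			handle.discardState(); // dispose immediately, no intermediate list
		}
	}
}
```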




[GitHub] flink issue #3524: [FLINK-6014][checkpoint] Allow the registration of state ...

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3524
  
Thanks for opening this pull request. Adding a `CompositeStateHandle` and a 
`StateRegistry` is a good idea.

Some thoughts:

  - What do you think about making the `StateRegistry` into a 
`SharedStateRegistry` which only contains the handles to state that is shared 
across checkpoints? State that is exclusive to a checkpoint is not handled by 
that registry, but remains only in the checkpoint. That way we "isolate" the 
existing behavior against the coming changes and do not risk regressions in the 
state cleanup code (which is very critical for current users).

  - Another reason for the above suggestion is to also bring some other 
code into place that has some "fast paths" and "safety nets" for checkpoint 
cleanups (currently only with non-shared state), for example dropping a 
checkpoint simply by a `rm -r` (see https://github.com/apache/flink/pull/3522 
). We have seen that for various users the state cleanup problems are among the 
biggest problems they have, which we can address very well with the work 
started in the above linked pull request. These things would work together 
seamlessly if the registry deals only with shared state handles.

  - I am wondering if it is easier to put the registry into the checkpoint 
coordinator rather than the checkpoint stores. That way we need the code that 
deals with adding / failure handling / etc only once.





[GitHub] flink issue #3537: [FLINK-6050] [robustness] Register exception handler on t...

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3537
  
I think this is good, +1

Do we have a test that validates that completing a `Future` exceptionally 
also completes all result Futures of `thenApply` (or `thenApplyAsync`) 
functions with an exception?
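
A minimal JDK-based illustration of the property in question (Flink's own `Future` abstraction at the time should behave analogously):

```java
import java.util.concurrent.CompletableFuture;

public class ExceptionPropagationExample {

	public static void main(String[] args) {
		CompletableFuture<Integer> source = new CompletableFuture<>();
		CompletableFuture<String> derived = source.thenApply(Object::toString);

		source.completeExceptionally(new RuntimeException("boom"));

		// the derived future fails as well, without ever running the function
		System.out.println(derived.isCompletedExceptionally()); // prints true
	}
}
```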


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3521: [FLINK-6027][checkpoint] Ignore the exception thrown by t...

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3521
  
Actually, do you think you could add a test for this? Would be good to 
guard that for the future...




[GitHub] flink issue #3526: [FLINK-5999] [resMgnr] Move JobLeaderIdService shut down ...

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3526
  
Looks good to me, +1 to merge




[GitHub] flink pull request #3512: [FLINK-6008] collection of BlobServer improvements

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3512#discussion_r106674262
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
@@ -180,91 +180,159 @@ public URL getURL(final BlobKey requiredBlob) throws IOException {
 
 		// fallback: download from the BlobServer
 		final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE];
+		LOG.info("Downloading {} from {}", requiredBlob, serverAddress);
 
 		// loop over retries
 		int attempt = 0;
 		while (true) {
+			try (
+				final BlobClient bc = new BlobClient(serverAddress, blobClientConfig);
+				final InputStream is = bc.get(requiredBlob);
+				final OutputStream os = new FileOutputStream(localJarFile)
+			) {
+				getURLTransferFile(buf, is, os);
+
+				// success, we finished
+				return localJarFile.toURI().toURL();
+			}
+			catch (Throwable t) {
+				getURLOnException(requiredBlob.toString(), localJarFile, attempt, t);
 
-			if (attempt == 0) {
-				LOG.info("Downloading {} from {}", requiredBlob, serverAddress);
-			} else {
+				// retry
+				++attempt;
 				LOG.info("Downloading {} from {} (retry {})", requiredBlob, serverAddress, attempt);
 			}
+		} // end loop over retries
+	}
 
-		try {
-			BlobClient bc = null;
-			InputStream is = null;
-			OutputStream os = null;
+	/**
+	 * Returns the URL for the BLOB with the given parameters. The method will first attempt to
+	 * serve the BLOB from its local cache. If the BLOB is not in the cache, the method will try
+	 * to download it from this cache's BLOB server.
+	 *
+	 * @param jobId JobID of the file in the blob store
+	 * @param key   String key of the file in the blob store
+	 * @return URL referring to the local storage location of the BLOB.
+	 * @throws java.io.FileNotFoundException if the path does not exist;
+	 * @throws IOException Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server.
+	 */
+	public URL getURL(final JobID jobId, final String key) throws IOException {
+		checkArgument(jobId != null, "Job id cannot be null.");
+		checkArgument(key != null, "BLOB name cannot be null.");
 
-			try {
-				bc = new BlobClient(serverAddress, blobClientConfig);
-				is = bc.get(requiredBlob);
-				os = new FileOutputStream(localJarFile);
-
-				while (true) {
-					final int read = is.read(buf);
-					if (read < 0) {
-						break;
-					}
-					os.write(buf, 0, read);
-				}
-
-				// we do explicitly not use a finally block, because we want the closing
-				// in the regular case to throw exceptions and cause the writing to fail.
-				// But, the closing on exception should not throw further exceptions and
-				// let us keep the root exception
-				os.close();
-				os = null;
-				is.close();
-				is = null;
-				bc.close();
-				bc = null;
-
-				// success, we finished
-				return localJarFile.toURI().toURL();
-			}
-			catch (Throwable t) {
-				// we use "catch (Throwable)" to keep the root exception. Otherwise that exception
-				// it would be replaced by any exception thrown in the finally block
-				IOUtils.closeQuietly(

[GitHub] flink pull request #3512: [FLINK-6008] collection of BlobServer improvements

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3512#discussion_r106677990
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -400,6 +418,47 @@ public void delete(BlobKey key) throws IOException {
 	}
 
 	/**
+	 * Deletes the file associated with the given job and key if it exists in the local
+	 * storage of the blob server.
+	 *
+	 * @param jobId JobID of the file in the blob store
+	 * @param key   String key of the file in the blob store
+	 */
+	@Override
+	public void delete(JobID jobId, String key) {
+		checkArgument(jobId != null, "Job id must not be null.");
+		checkArgument(key != null, "BLOB name must not be null.");
+
+		final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, key);
+
+		if (localFile.exists()) {
+			if (!localFile.delete()) {
+				LOG.warn("Failed to delete locally BLOB " + key + " at " + localFile.getAbsolutePath());
+			}
+		}
+
+		blobStore.delete(jobId, key);
+	}
+
+	/**
+	 * Deletes all files associated with the given job id from the storage.
+	 *
+	 * @param jobId JobID of the files in the blob store
+	 */
+	@Override
+	public void deleteAll(final JobID jobId) {
+		checkArgument(jobId != null, "Job id must not be null.");
+
+		try {
+			BlobUtils.deleteJobDirectory(storageDir, jobId);
+		} catch (IOException e) {
--- End diff --

If we want to make sure we clean up in any case, we can actually catch `Exception` here.
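
Roughly (the log message is illustrative):

```java
try {
	BlobUtils.deleteJobDirectory(storageDir, jobId);
} catch (Exception e) {
	// catching Exception rather than IOException so that unexpected runtime
	// errors cannot break the cleanup path either
	LOG.warn("Failed to delete BLOB storage directory of job " + jobId, e);
}
```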




[GitHub] flink pull request #3512: [FLINK-6008] collection of BlobServer improvements

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3512#discussion_r106674837
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
@@ -180,91 +180,159 @@ public URL getURL(final BlobKey requiredBlob) throws IOException {
 
 		// fallback: download from the BlobServer
 		final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE];
+		LOG.info("Downloading {} from {}", requiredBlob, serverAddress);
 
 		// loop over retries
 		int attempt = 0;
 		while (true) {
+			try (
+				final BlobClient bc = new BlobClient(serverAddress, blobClientConfig);
+				final InputStream is = bc.get(requiredBlob);
+				final OutputStream os = new FileOutputStream(localJarFile)
+			) {
+				getURLTransferFile(buf, is, os);
+
+				// success, we finished
+				return localJarFile.toURI().toURL();
+			}
+			catch (Throwable t) {
+				getURLOnException(requiredBlob.toString(), localJarFile, attempt, t);
 
-			if (attempt == 0) {
-				LOG.info("Downloading {} from {}", requiredBlob, serverAddress);
-			} else {
+				// retry
+				++attempt;
 				LOG.info("Downloading {} from {} (retry {})", requiredBlob, serverAddress, attempt);
 			}
+		} // end loop over retries
+	}
 
-		try {
-			BlobClient bc = null;
-			InputStream is = null;
-			OutputStream os = null;
+	/**
+	 * Returns the URL for the BLOB with the given parameters. The method will first attempt to
+	 * serve the BLOB from its local cache. If the BLOB is not in the cache, the method will try
+	 * to download it from this cache's BLOB server.
+	 *
+	 * @param jobId JobID of the file in the blob store
+	 * @param key   String key of the file in the blob store
+	 * @return URL referring to the local storage location of the BLOB.
+	 * @throws java.io.FileNotFoundException if the path does not exist;
+	 * @throws IOException Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server.
+	 */
+	public URL getURL(final JobID jobId, final String key) throws IOException {
+		checkArgument(jobId != null, "Job id cannot be null.");
+		checkArgument(key != null, "BLOB name cannot be null.");
 
-			try {
-				bc = new BlobClient(serverAddress, blobClientConfig);
-				is = bc.get(requiredBlob);
-				os = new FileOutputStream(localJarFile);
-
-				while (true) {
-					final int read = is.read(buf);
-					if (read < 0) {
-						break;
-					}
-					os.write(buf, 0, read);
-				}
-
-				// we do explicitly not use a finally block, because we want the closing
-				// in the regular case to throw exceptions and cause the writing to fail.
-				// But, the closing on exception should not throw further exceptions and
-				// let us keep the root exception
-				os.close();
-				os = null;
-				is.close();
-				is = null;
-				bc.close();
-				bc = null;
-
-				// success, we finished
-				return localJarFile.toURI().toURL();
-			}
-			catch (Throwable t) {
-				// we use "catch (Throwable)" to keep the root exception. Otherwise that exception
-				// it would be replaced by any exception thrown in the finally block
-				IOUtils.closeQuietly(

[GitHub] flink pull request #3512: [FLINK-6008] collection of BlobServer improvements

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3512#discussion_r106675058
  
--- Diff: docs/setup/config.md ---
@@ -494,13 +494,13 @@ Previously this key was named `recovery.mode` and the default value was `standalone`.
 
 - `high-availability.zookeeper.path.root`: (Default `/flink`) Defines the root dir under which the ZooKeeper HA mode will create namespace directories. Previously this key was named `recovery.zookeeper.path.root`.
 
-- `high-availability.zookeeper.path.namespace`: (Default `/default_ns` in standalone cluster mode, or the <yarn-application-id> under YARN) Defines the subdirectory under the root dir where the ZooKeeper HA mode will create znodes. This allows to isolate multiple applications on the same ZooKeeper. Previously this key was named `recovery.zookeeper.path.namespace`.
+- `high-availability.cluster-id`: (Default `/default_ns` in standalone cluster mode, or the <yarn-application-id> under YARN) Defines the subdirectory under the root dir where the ZooKeeper HA mode will create znodes. This allows to isolate multiple applications on the same ZooKeeper. Previously this key was named `recovery.zookeeper.path.namespace` and `high-availability.zookeeper.path.namespace`.
--- End diff --

I would move these into the `### High Availability (HA)` section, because they are independent of ZooKeeper.




[GitHub] flink pull request #3512: [FLINK-6008] collection of BlobServer improvements

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3512#discussion_r106680476
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -1305,6 +1305,9 @@ class TaskManager(
 		s"${task.getExecutionState} to JobManager for task ${task.getTaskInfo.getTaskName} " +
 		s"(${task.getExecutionId})")
 
+  // delete all NAME_ADDRESSABLE BLOBs
+  libraryCacheManager.get.getBlobService.deleteAll(task.getJobID)
--- End diff --

Multiple tasks of the same job run in a TaskManager. This means that tasks would delete each other's blobs.




[GitHub] flink pull request #3512: [FLINK-6008] collection of BlobServer improvements

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3512#discussion_r106677799
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -400,6 +418,47 @@ public void delete(BlobKey key) throws IOException {
 	}
 
 	/**
+	 * Deletes the file associated with the given job and key if it exists in the local
+	 * storage of the blob server.
+	 *
+	 * @param jobId JobID of the file in the blob store
+	 * @param key   String key of the file in the blob store
+	 */
+	@Override
+	public void delete(JobID jobId, String key) {
+		checkArgument(jobId != null, "Job id must not be null.");
+		checkArgument(key != null, "BLOB name must not be null.");
+
+		final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, key);
+
+		if (localFile.exists()) {
--- End diff --

For concurrency safety, it is better to do `if (!delete && exists)`.
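
I.e., roughly (names taken from the diff above):

```java
// attempt the delete first and only treat it as a failure if the file still
// exists afterwards, so a concurrent delete between an exists() check and the
// delete() call is not reported as an error
if (!localFile.delete() && localFile.exists()) {
	LOG.warn("Failed to delete locally BLOB " + key + " at " + localFile.getAbsolutePath());
}
```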




[GitHub] flink issue #3521: [FLINK-6027][checkpoint] Ignore the exception thrown by t...

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3521
  
+1, merging this...




[GitHub] flink issue #3525: [FLINK-6020]add a random integer suffix to blob key to av...

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3525
  
I don't quite understand the issue. Currently, the name should exactly match the hash to make sure that each library is stored only once. Adding a random suffix destroys exactly that behavior.

In the case where multiple clients upload the same jar to *different* 
clusters, it should not be a problem, if they use different storage directories 
(which they should definitely do).

In the case where multiple clients upload the same jar to the *same* 
cluster, the first rename from tmp to file will succeed. The second rename from 
tmp to file will fail, but that's not a problem, because the file already 
exists with the same contents, and the client can assume success.




[GitHub] flink pull request #3127: [FLINK-5481] Simplify Row creation

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3127#discussion_r106664092
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala ---
@@ -17,29 +17,51 @@
  */
 package org.apache.flink.table.api
 
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo}
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
+import org.apache.flink.api.java.typeutils.{Types => JTypes}
 
 /**
   * This class enumerates all supported types of the Table API.
   */
-object Types {
+object Types extends JTypes {
 
-  val STRING = BasicTypeInfo.STRING_TYPE_INFO
-  val BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO
+  val STRING = JTypes.STRING
+  val BOOLEAN = JTypes.BOOLEAN
 
-  val BYTE = BasicTypeInfo.BYTE_TYPE_INFO
-  val SHORT = BasicTypeInfo.SHORT_TYPE_INFO
-  val INT = BasicTypeInfo.INT_TYPE_INFO
-  val LONG = BasicTypeInfo.LONG_TYPE_INFO
-  val FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO
-  val DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO
-  val DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO
+  val BYTE = JTypes.BYTE
+  val SHORT = JTypes.SHORT
+  val INT = JTypes.INT
+  val LONG = JTypes.LONG
+  val FLOAT = JTypes.FLOAT
+  val DOUBLE = JTypes.DOUBLE
+  val DECIMAL = JTypes.DECIMAL
 
-  val DATE = SqlTimeTypeInfo.DATE
-  val TIME = SqlTimeTypeInfo.TIME
-  val TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP
+  val SQL_DATE = JTypes.SQL_DATE
+  val SQL_TIME = JTypes.SQL_TIME
+  val SQL_TIMESTAMP = JTypes.SQL_TIMESTAMP
   val INTERVAL_MONTHS = TimeIntervalTypeInfo.INTERVAL_MONTHS
   val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS
 
+  /**
+* Generates RowTypeInfo with default names (f1, f2 ..).
+* same as ``new RowTypeInfo(types)``
+*
+* @param types of Row fields. e.g. ROW(Types.STRING, Types.INT)
+*/
+  def ROW[T](types: TypeInformation[_]*)(implicit m: Manifest[T]) = {
--- End diff --

Can you explain what the manifest is needed for?




[GitHub] flink issue #3543: [FLINK-5985] [Backport for 1.2] Report no task states for...

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3543
  
Looks good.

+1 for merging this!




[GitHub] flink pull request #3127: [FLINK-5481] Simplify Row creation

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3127#discussion_r106644720
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala ---
@@ -17,29 +17,51 @@
  */
 package org.apache.flink.table.api
 
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo}
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
+import org.apache.flink.api.java.typeutils.{Types => JTypes}
 
 /**
   * This class enumerates all supported types of the Table API.
   */
-object Types {
+object Types extends JTypes {
 
-  val STRING = BasicTypeInfo.STRING_TYPE_INFO
-  val BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO
+  val STRING = JTypes.STRING
+  val BOOLEAN = JTypes.BOOLEAN
 
-  val BYTE = BasicTypeInfo.BYTE_TYPE_INFO
-  val SHORT = BasicTypeInfo.SHORT_TYPE_INFO
-  val INT = BasicTypeInfo.INT_TYPE_INFO
-  val LONG = BasicTypeInfo.LONG_TYPE_INFO
-  val FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO
-  val DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO
-  val DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO
+  val BYTE = JTypes.BYTE
+  val SHORT = JTypes.SHORT
+  val INT = JTypes.INT
+  val LONG = JTypes.LONG
+  val FLOAT = JTypes.FLOAT
+  val DOUBLE = JTypes.DOUBLE
+  val DECIMAL = JTypes.DECIMAL
 
-  val DATE = SqlTimeTypeInfo.DATE
-  val TIME = SqlTimeTypeInfo.TIME
-  val TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP
+  val SQL_DATE = JTypes.SQL_DATE
+  val SQL_TIME = JTypes.SQL_TIME
+  val SQL_TIMESTAMP = JTypes.SQL_TIMESTAMP
   val INTERVAL_MONTHS = TimeIntervalTypeInfo.INTERVAL_MONTHS
   val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS
 
+  /**
+* Generates RowTypeInfo with default names (f1, f2 ..).
+* same as ``new RowTypeInfo(types)``
+*
+* @param types of Row fields. e.g. ROW(Types.STRING, Types.INT)
+*/
+  def ROW[T](types: TypeInformation[_]*)(implicit m: Manifest[T]) = {
--- End diff --

Out of curiosity: Why do you need the manifest? I think you don't need it, as you don't reference `m` anywhere...

Also, I think the common way of doing this would be:
```scala
def ROW[T: Manifest](types: TypeInformation[_]*) = { ...
```




[GitHub] flink issue #3127: [FLINK-5481] Simplify Row creation

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3127
  
Found one more small concern (inline comment)




[GitHub] flink issue #3127: [FLINK-5481] Simplify Row creation

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3127
  
Looks good to me, +1

@twalthr @fhueske Any concerns about merging this?




[GitHub] flink issue #3392: [FLINK-5883] Re-adding the Exception-thrown code for List...

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3392
  
+1
Merging this...




[GitHub] flink issue #3503: [FLINK-5995][checkpoints] fix Get a Exception when creati...

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3503
  
Looks good, merging this...




[GitHub] flink issue #3534: [FLINK-6018][statebackend] Properly initialise StateDescr...

2017-03-16 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3534
  
@aljoscha Can you share 
https://github.com/aljoscha/flink/tree/jira-6018-state-init-fixups with 
@tzulitai who is looking into serializer upgrade paths?




[GitHub] flink issue #3534: [FLINK-6018][statebackend] Properly initialise StateDescr...

2017-03-16 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3534
  
This change is probably uncritical:
  - It seems this code path was never executed (by chance) because the 
`RuntimeContext` pre-initializes state descriptors, and all the operators 
directly supply serializers to the descriptors.
  - As @aljoscha mentioned, in the current case of the KryoSerializer, adding more configuration is not a problem.

Okay, +1, this seems safe.




[GitHub] flink issue #3478: Flink 4816 Executions failed from "DEPLOYING" should reta...

2017-03-16 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3478
  
This change currently fails all CI builds. The main reason is that it assumes checkpointing is always active.




[GitHub] flink issue #3394: [FLINK-5810] [flip6] Introduce a hardened slot manager

2017-03-16 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3394
  
I have some suggested edits on top of this (not strictly tied to the 
changes here) in that commit: 
https://github.com/StephanEwen/incubator-flink/commit/910de21972992d0ed80f232f3a64bf107c6819f2




[GitHub] flink pull request #3394: [FLINK-5810] [flip6] Introduce a hardened slot man...

2017-03-16 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3394#discussion_r106507099
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 ---
@@ -21,519 +21,897 @@
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.BiFunction;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
 import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerServices;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
-import 
org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRegistered;
-import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected;
-import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
-import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorRegistration;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import 
org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
+import 
org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /**
- * SlotManager is responsible for receiving slot requests and do slot 
allocations. It allows to request
- * slots from registered TaskManagers and issues container allocation 
requests in case of there are not
- * enough available slots. Besides, it should sync its slot allocation 
with TaskManager's heartbeat.
- * 
- * The main operation principle of SlotManager is:
- * 
- * 1. All slot allocation status should be synced with TaskManager, 
which is the ground truth.
- * 2. All slots that have registered must be tracked, either by free 
pool or allocated pool.
- * 3. All slot requests will be handled by best efforts, there is no 
guarantee that one request will be
- * fulfilled in time or correctly allocated. Conflicts or timeout or some 
special error will happen, it should
- * be handled outside SlotManager. SlotManager will make each decision 
based on the information it currently
- * holds.
- * 
- * IMPORTANT: This class is Not Thread-safe.
+ * The slot manager is responsible for maintaining a view on all registered task manager slots,
+ * their allocation and all pending slot requests. Whenever a new slot is registered or an
+ * allocated slot is freed, it tries to fulfill another pending slot request. Whenever there
+ * are not enough slots available, the slot manager notifies the resource manager via
+ * {@link ResourceManagerActions#allocateResource(ResourceProfile)}.
+ *
+ * In order to free resources and avoid resource leaks, idling task managers (task managers whose
+ * slots are currently not used) and unfulfilled pending slot requests time out, triggering their
+ * release and failure, respectively.
  */
-public abstract class SlotManager {
+public class SlotManager implements AutoCloseable {
+
+   private static final Logger LOG = LoggerFactory.getLogger(SlotManager.

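A minimal, self-contained sketch of the matching behaviour the new javadoc
above describes: a freed or newly registered slot is first offered to the
pending requests, and a request that no free slot can serve becomes pending
and triggers a resource allocation. All names in the sketch (Profile, Slot,
SlotMatchingSketch) are simplified stand-ins invented for illustration; they
are not the real Flink types, and the real SlotManager additionally tracks
allocation IDs and confirms each allocation with the task executor.

    import java.util.ArrayDeque;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;
    import java.util.Queue;

    // Reduced stand-ins for ResourceProfile / TaskManagerSlot.
    class SlotMatchingSketch {

        static final class Profile {
            final int memoryMb;
            Profile(int memoryMb) { this.memoryMb = memoryMb; }
            // A request is matched by any slot offering at least as much memory.
            boolean isMatchedBy(Profile offered) { return offered.memoryMb >= memoryMb; }
        }

        static final class Slot {
            final String slotId;
            final Profile profile;
            boolean allocated;
            Slot(String slotId, Profile profile) { this.slotId = slotId; this.profile = profile; }
        }

        private final Map<String, Slot> freeSlots = new HashMap<>();
        private final Queue<Profile> pendingRequests = new ArrayDeque<>();

        // Called whenever a new slot is registered or an allocated slot is
        // freed: try to fulfill a pending request first, else keep the slot free.
        void onSlotAvailable(Slot slot) {
            Iterator<Profile> it = pendingRequests.iterator();
            while (it.hasNext()) {
                if (it.next().isMatchedBy(slot.profile)) {
                    it.remove();
                    slot.allocated = true;
                    return;
                }
            }
            freeSlots.put(slot.slotId, slot);
        }

        // Called for a new slot request: serve it from a free slot if possible,
        // otherwise park it as pending.
        void onSlotRequest(Profile request) {
            Iterator<Slot> it = freeSlots.values().iterator();
            while (it.hasNext()) {
                Slot slot = it.next();
                if (request.isMatchedBy(slot.profile)) {
                    it.remove();
                    slot.allocated = true;
                    return;
                }
            }
            pendingRequests.add(request);
            // This is the point where the real slot manager notifies the
            // resource manager via ResourceManagerActions#allocateResource(...).
        }
    }
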
[GitHub] flink pull request #3394: [FLINK-5810] [flip6] Introduce a hardened slot man...

2017-03-16 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3394#discussion_r106514810
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---

[GitHub] flink pull request #3394: [FLINK-5810] [flip6] Introduce a hardened slot man...

2017-03-16 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3394#discussion_r106515494
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---

[GitHub] flink pull request #3394: [FLINK-5810] [flip6] Introduce a hardened slot man...

2017-03-16 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3394#discussion_r106508907
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---

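The second paragraph of the new javadoc quoted above, where idling task
managers and unfulfilled pending slot requests time out, can be pictured as a
periodic check. The sketch below is an illustration only, using a plain JDK
ScheduledExecutorService; the name IdleTimeoutSketch and its methods are
invented here, and the real SlotManager schedules such checks through Flink's
own ScheduledExecutor abstraction instead.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    class IdleTimeoutSketch {

        private final Map<String, Long> idleSince = new ConcurrentHashMap<>();
        private final long taskManagerTimeoutMs;

        IdleTimeoutSketch(long taskManagerTimeoutMs) {
            this.taskManagerTimeoutMs = taskManagerTimeoutMs;
        }

        // Record that a task manager currently has no allocated slots.
        void markIdle(String taskManagerId) {
            idleSince.putIfAbsent(taskManagerId, System.currentTimeMillis());
        }

        // Record that a task manager got a slot allocated again.
        void markBusy(String taskManagerId) {
            idleSince.remove(taskManagerId);
        }

        // Schedule the periodic check; the caller shuts the executor down.
        ScheduledExecutorService start(long checkIntervalMs) {
            ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
            executor.scheduleWithFixedDelay(() -> {
                long now = System.currentTimeMillis();
                idleSince.forEach((taskManagerId, since) -> {
                    if (now - since >= taskManagerTimeoutMs) {
                        idleSince.remove(taskManagerId);
                        // The real implementation would ask the resource manager
                        // to release the task manager's container here.
                        System.out.println("Releasing idle task manager " + taskManagerId);
                    }
                });
            }, checkIntervalMs, checkIntervalMs, TimeUnit.MILLISECONDS);
            return executor;
        }
    }

scheduleWithFixedDelay is chosen over scheduleAtFixedRate so that a slow
check cannot cause runs to pile up.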