[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...

2016-10-19 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r84035347
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---
@@ -424,13 +481,17 @@ public String toString() {

	ResultPartitionID partitionId;

+	String secureCookie = "";
+
	public TaskEventRequest() {
	}

-	TaskEventRequest(TaskEvent event, ResultPartitionID partitionId, InputChannelID receiverId) {
+	TaskEventRequest(TaskEvent event, ResultPartitionID partitionId, InputChannelID receiverId,
+			String secureCookie) {
		this.event = event;
		this.receiverId = receiverId;
		this.partitionId = partitionId;
+		this.secureCookie = (secureCookie == null || secureCookie.length() == 0) ? "": secureCookie;
--- End diff --

Please pass a proper non-null secureCookie instead.
--- End diff --

Please pass a proper non-null secureCookie instead.




[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...

2016-10-19 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r84027778
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
@@ -725,7 +755,21 @@ else if (response != RETURN_OKAY) {
	Object msg = JobManagerMessages.getRequestBlobManagerPort();
	Future futureBlobPort = jobManager.ask(msg, askTimeout);

+	Object secureCookieMsg = JobManagerMessages.getRequestBlobManagerSecureCookie();
+	Future futureSecureCookie = jobManager.ask(secureCookieMsg, askTimeout);
+
	try {
+		String secureCookie = null;
+
+		Object cookie = Await.result(futureSecureCookie, askTimeout);
+		if(cookie instanceof String) {
+			secureCookie = (String) cookie;
+		}
--- End diff --

We are transferring the cookie here from the JobManager? That should never 
be the case. The client has to provide the cookie; otherwise a client must not 
be able to communicate with the JobManager.
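
A minimal sketch of the direction this comment implies (my illustration, not code from the PR): the client derives the cookie from its own configuration instead of requesting it from the JobManager over RPC. `ConfigConstants.SECURITY_COOKIE` is the key used elsewhere in this PR; the exception type follows the `IllegalConfigurationException` suggestion made later in this thread.

```java
// Sketch only: client-side lookup of the shared secret from the client's
// own Flink Configuration, never from the JobManager.
private static String clientSecureCookie(Configuration configuration) {
	String secureCookie = configuration.getString(ConfigConstants.SECURITY_COOKIE, null);
	if (secureCookie == null) {
		throw new IllegalConfigurationException(
			"Security is enabled but no secure cookie is set in the client configuration");
	}
	return secureCookie;
}
```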




[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...

2016-10-19 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r84033606
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---
@@ -57,24 +61,37 @@
	// constructor in order to work with the generic deserializer.
	// 

-	static final int HEADER_LENGTH = 4 + 4 + 1; // frame length (4), magic number (4), msg ID (1)
+	static final int HEADER_LENGTH = 4 + 4 + 4 + 1; // frame length (4), magic number (4), Cookie (4), msg ID (1)

	static final int MAGIC_NUMBER = 0xBADC0FFE;

+	static final int BUFFER_SIZE = 65536;
+
+	static final Charset DEFAULT_CHARSET = Charset.forName("utf-8");
+
	abstract ByteBuf write(ByteBufAllocator allocator) throws Exception;

	abstract void readFrom(ByteBuf buffer) throws Exception;

	// 

-	private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id) {
-		return allocateBuffer(allocator, id, 0);
+	private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, String secureCookie) {
+		return allocateBuffer(allocator, id, secureCookie, 0);
	}

-	private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, int length) {
+	private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, String secureCookie, int length) {
+		secureCookie = (secureCookie == null || secureCookie.length() == 0) ? "": secureCookie;
+		length+=secureCookie.getBytes().length;
		final ByteBuf buffer = length != 0 ? allocator.directBuffer(HEADER_LENGTH + length) : allocator.directBuffer();
		buffer.writeInt(HEADER_LENGTH + length);
		buffer.writeInt(MAGIC_NUMBER);
+
+		buffer.writeInt(secureCookie.length());
--- End diff --

Here you're writing `secureCookie.length()`, but earlier you calculate the 
length with `secureCookie.getBytes().length`. I think this causes issues with 
non-ASCII chars.
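
To make the mismatch concrete, a small standalone example (mine, not from the PR), assuming the cookie bytes go out as UTF-8:

```java
import java.nio.charset.StandardCharsets;

public class CookieLengthDemo {
	public static void main(String[] args) {
		String cookie = "geheim-ü";                 // hypothetical cookie with one non-ASCII char
		int charCount = cookie.length();            // 8 UTF-16 code units
		int byteCount = cookie.getBytes(StandardCharsets.UTF_8).length; // 9 bytes, 'ü' takes two
		// A header that announces charCount but then writes byteCount bytes corrupts the frame.
		System.out.println(charCount + " chars vs " + byteCount + " bytes");
	}
}
```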




[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...

2016-10-19 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r84041460
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -108,6 +111,11 @@
 
private final Options ALL_OPTIONS;
 
+   private static final String fileName = "yarn-app.ini";
+   private static final String cookieKey = "secureCookie";
--- End diff --

Please see the Yarn properties, which should be used for this purpose: 
`writeYarnProperties(..)`.

Please don't add a new configuration file.




[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...

2016-10-19 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r84035425
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---
@@ -492,19 +553,22 @@ public void readFrom(ByteBuf buffer) {
 
InputChannelID receiverId;
 
+   String secureCookie = "";
--- End diff --

The `""` initialization is always overridden, please remove.




[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...

2016-10-19 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r84039884
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java ---
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.blob;
 
+import org.apache.flink.configuration.ConfigConstants;
--- End diff --

Unused import




[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...

2016-10-19 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r84029858
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java ---
@@ -199,6 +199,11 @@ public int getBlobServerPort() {
	return blobService.getPort();
}

+	public String getSecureCookie() {
+		return blobService.getSecureCookie() == null
+			? "": blobService.getSecureCookie();
--- End diff --

I feel like this logic should be delegated to the blobService. Also, this 
might be clearer:
```java
String secureCookie = blobService.getSecureCookie();
return secureCookie != null ? secureCookie : "";
```




[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...

2016-10-19 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r84035178
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---
@@ -369,21 +423,24 @@ void readFrom(ByteBuf buffer) throws Exception {
 
InputChannelID receiverId;
 
+   String secureCookie = "";
--- End diff --

The `""` initialization is always overridden, please remove.




[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...

2016-10-19 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r84033402
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---
@@ -57,24 +61,37 @@
	// constructor in order to work with the generic deserializer.
	// 

-	static final int HEADER_LENGTH = 4 + 4 + 1; // frame length (4), magic number (4), msg ID (1)
+	static final int HEADER_LENGTH = 4 + 4 + 4 + 1; // frame length (4), magic number (4), Cookie (4), msg ID (1)

	static final int MAGIC_NUMBER = 0xBADC0FFE;

+	static final int BUFFER_SIZE = 65536;
+
+	static final Charset DEFAULT_CHARSET = Charset.forName("utf-8");
+
	abstract ByteBuf write(ByteBufAllocator allocator) throws Exception;

	abstract void readFrom(ByteBuf buffer) throws Exception;

	// 

-	private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id) {
-		return allocateBuffer(allocator, id, 0);
+	private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, String secureCookie) {
+		return allocateBuffer(allocator, id, secureCookie, 0);
	}

-	private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, int length) {
+	private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, String secureCookie, int length) {
+		secureCookie = (secureCookie == null || secureCookie.length() == 0) ? "": secureCookie;
+		length+=secureCookie.getBytes().length;
--- End diff --

This seems inefficient; the byte array is created on every call just to obtain 
its length. The length should be calculated once and supplied.
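
A sketch of the suggested fix (illustrative, reusing names and constants from the diff above; the caller encodes the cookie once, and the byte array drives both the announced length and the payload):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;

// Illustrative rewrite, not the PR's code: the cookie is encoded once by the
// caller and passed as bytes, so the byte count is computed a single time.
final class AllocateSketch {
	static final int HEADER_LENGTH = 4 + 4 + 4 + 1; // values from the diff above
	static final int MAGIC_NUMBER = 0xBADC0FFE;

	static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, byte[] cookieBytes, int length) {
		length += cookieBytes.length;
		ByteBuf buffer = allocator.directBuffer(HEADER_LENGTH + length);
		buffer.writeInt(HEADER_LENGTH + length);
		buffer.writeInt(MAGIC_NUMBER);
		buffer.writeInt(cookieBytes.length); // byte count, consistent with writeBytes below
		buffer.writeBytes(cookieBytes);
		buffer.writeByte(id);
		return buffer;
	}
}
```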




[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...

2016-10-19 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r84034881
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---
@@ -369,21 +423,24 @@ void readFrom(ByteBuf buffer) throws Exception {

	InputChannelID receiverId;

+	String secureCookie = "";
+
	public PartitionRequest() {
	}

-	PartitionRequest(ResultPartitionID partitionId, int queueIndex, InputChannelID receiverId) {
+	PartitionRequest(ResultPartitionID partitionId, int queueIndex, InputChannelID receiverId, String secureCookie) {
		this.partitionId = partitionId;
		this.queueIndex = queueIndex;
		this.receiverId = receiverId;
+		this.secureCookie = (secureCookie == null || secureCookie.length() == 0) ? "": secureCookie;
--- End diff --

These checks should not be necessary. Please remove.




[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...

2016-10-19 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r84029158
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java ---
@@ -393,6 +399,24 @@ static void copyFromRecoveryPath(String recoveryPath, File localBlobFile) throws
	}

	/**
+	 * Utility method to validate secure cookie from Flink configuration instance
+	 * @throws
+	 */
+	public static String validateAndGetSecureCookie(Configuration configuration) {
+		String secureCookie = null;
+		if(configuration.getBoolean(ConfigConstants.SECURITY_ENABLED, DEFAULT_SECURITY_ENABLED) == true) {
+			secureCookie = configuration.getString(ConfigConstants.SECURITY_COOKIE, null);
+			if(secureCookie == null) {
+				String message = "Missing " + ConfigConstants.SECURITY_COOKIE + " configuration in Flink configuration file";
+				LOG.error(message);
+				throw new RuntimeException(message);
--- End diff --

- Exceptions are logged anyways.
- This should be `IllegalConfigurationException`
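
A sketch of the method after both points (my reading of the review, not the PR's final code; the constants are the ones from the diff above):

```java
// Sketch: no LOG.error before the throw, and IllegalConfigurationException
// instead of RuntimeException.
public static String validateAndGetSecureCookie(Configuration configuration) {
	if (configuration.getBoolean(ConfigConstants.SECURITY_ENABLED, DEFAULT_SECURITY_ENABLED)) {
		String secureCookie = configuration.getString(ConfigConstants.SECURITY_COOKIE, null);
		if (secureCookie == null) {
			throw new IllegalConfigurationException(
				"Missing " + ConfigConstants.SECURITY_COOKIE + " configuration in Flink configuration file");
		}
		return secureCookie;
	}
	return null;
}
```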




[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...

2016-10-19 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r84035615
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---
@@ -527,13 +591,17 @@ void readFrom(ByteBuf buffer) throws Exception {
static class CloseRequest extends NettyMessage {
 
private static final byte ID = 5;
+   String secureCookie = "";
--- End diff --

The initialization value is never used.




[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...

2016-10-19 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r84034660
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---
@@ -226,8 +277,9 @@ ByteBuf write(ByteBufAllocator allocator) throws IOException {
int length = 16 + 4 + 1 + 4 + buffer.getSize();
 
ByteBuf result = null;
+   final String NO_SECURE_COOKIE= "";
--- End diff --

Missing space after the variable name.




[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...

2016-10-19 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r84036664
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java ---
@@ -30,16 +30,29 @@

	private final NettyMessageEncoder messageEncoder = new NettyMessageEncoder();

-	private final NettyMessage.NettyMessageDecoder messageDecoder = new NettyMessage.NettyMessageDecoder();
+	private final NettyMessage.NettyMessageDecoder serverMessageDecoder;
+
+	private final NettyMessage.NettyMessageDecoder clientMessageDecoder;

	private final ResultPartitionProvider partitionProvider;
	private final TaskEventDispatcher taskEventDispatcher;
	private final NetworkBufferPool networkbufferPool;
+	private final String secureCookie;

-	PartitionRequestProtocol(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher, NetworkBufferPool networkbufferPool) {
+	PartitionRequestProtocol(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher,
+			NetworkBufferPool networkbufferPool, String secureCookie) {
		this.partitionProvider = partitionProvider;
		this.taskEventDispatcher = taskEventDispatcher;
		this.networkbufferPool = networkbufferPool;
+		this.secureCookie = (secureCookie == null || secureCookie.length() == 0) ? "": secureCookie;
+
+		serverMessageDecoder = new NettyMessage.NettyMessageDecoder(secureCookie);
+
+		/*
+		 * Client decoder does not validate the secure cookie from server since
+		 * the server protocol does not transmit the secure cookie on the wire
+		 */
+		clientMessageDecoder = new NettyMessage.NettyMessageDecoder(null);
--- End diff --

But it still sends an empty cookie?




[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...

2016-10-19 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r84026759
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
@@ -272,4 +284,8 @@ private void closeSilently(Closeable closeable) {
}
}
}
+
+   /* Secure cookie to authenticate */
--- End diff --

> authorize




[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...

2016-10-19 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r84028789
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java ---
@@ -53,6 +53,9 @@
/** Internal code to identify a reference via jobId as the key */
static final byte JOB_ID_SCOPE = 2;
 
+   /** The maximum length of secure cookie. */
+   static final int MAX_LENGTH_SECURE_COOKIE = 1024;
--- End diff --

Shouldn't this be tied to the buffer size which limits the cookie length?
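
One hedged way to express that coupling (hypothetical; it assumes the `BUFFER_SIZE` and `HEADER_LENGTH` constants from the `NettyMessage` diffs above are the relevant limits and would be visible here):

```java
// Hypothetical: derive the cookie limit from the transport buffer instead of
// an unrelated magic number, so the two cannot drift apart.
static final int MAX_LENGTH_SECURE_COOKIE = NettyMessage.BUFFER_SIZE - NettyMessage.HEADER_LENGTH;
```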




[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...

2016-10-19 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r84032194
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---
@@ -57,24 +61,37 @@
	// constructor in order to work with the generic deserializer.
	// 

-	static final int HEADER_LENGTH = 4 + 4 + 1; // frame length (4), magic number (4), msg ID (1)
+	static final int HEADER_LENGTH = 4 + 4 + 4 + 1; // frame length (4), magic number (4), Cookie (4), msg ID (1)
--- End diff --

4 bytes for the cookie or cookie **length**? How is this in line with the 
size restrictions assumed in other places? 




[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...

2016-10-19 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r84035467
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---
@@ -492,19 +553,22 @@ public void readFrom(ByteBuf buffer) {

	InputChannelID receiverId;

+	String secureCookie = "";
+
	public CancelPartitionRequest() {
	}

-	public CancelPartitionRequest(InputChannelID receiverId) {
+	public CancelPartitionRequest(InputChannelID receiverId, String secureCookie) {
		this.receiverId = receiverId;
+		this.secureCookie = (secureCookie == null || secureCookie.length() == 0) ? "": secureCookie;
--- End diff --

Please pass a properly initialized cookie instead.




[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...

2016-10-19 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r84041338
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -108,6 +111,11 @@
 
private final Options ALL_OPTIONS;
 
+   private static final String fileName = "yarn-app.ini";
+   private static final String cookieKey = "secureCookie";
+
+   private final Option SECURE_COOKIE_OPTION;
--- End diff --

This option can be retrieved from the main options. No need for a 
Yarn-specific option. Please remove.




[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...

2016-10-19 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r84039282
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
@@ -1008,6 +1008,9 @@ class JobManager(
 case RequestBlobManagerPort =>
   sender ! decorateMessage(libraryCacheManager.getBlobServerPort)
 
+case RequestBlobManagerSecureCookie =>
+  sender ! decorateMessage(libraryCacheManager.getSecureCookie)
--- End diff --

That is not a good idea for security! 




[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...

2016-10-19 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r84020724
  
--- Diff: docs/setup/cli.md ---
@@ -217,6 +217,8 @@ Action "run" compiles and runs a program.
 
java.net.URLClassLoader}.
 -d,--detached  If present, runs the job in detached mode
+ -k,--cookie  Secure cookie to authenticate
--- End diff --

The description could be a bit more elaborate. For example,

> String to authorize Akka-based RPC communication





[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...

2016-10-19 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r84030548
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java ---
@@ -91,6 +94,14 @@ public NettyConfig(

	this.config = checkNotNull(config);

+	boolean security = config.getBoolean(ConfigConstants.SECURITY_ENABLED, false);
+	this.secureCookie = config.getString(ConfigConstants.SECURITY_COOKIE, "");
+
+	if(security && this.secureCookie == "") {
+		LOG.error("Security is enabled but secure cookie is not provided");
+		throw new IllegalConfigurationException("Security is enabled but secure cookie is not provided");
--- End diff --

Exceptions are logged anyways. 




[GitHub] flink issue #2618: Refactoring the Continuous File Monitoring Function.

2016-10-19 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2618
  
Could you please elaborate on the changes you made to the 
`ContinuousFileReaderOperator`?




[GitHub] flink pull request #2660: [FLINK-4833] properly log exceptions in CountMinHe...

2016-10-19 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/2660

[FLINK-4833] properly log exceptions in CountMinHeavyHitter

This logs the underlying exception properly, which could help us find the 
exact cause of the reported problems.
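
As a generic illustration of the pattern (not the actual `CountMinHeavyHitter` code, which isn't quoted here): pass the throwable itself to the logger or to the wrapping exception, so the root cause isn't reduced to its message string.

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class LoggingSketch {
	private static final Logger LOG = LoggerFactory.getLogger(LoggingSketch.class);

	static void run(Runnable work) {
		try {
			work.run();
		} catch (RuntimeException e) {
			// LOG.error("failed: " + e.getMessage()) would drop the stack trace;
			// passing the exception keeps the full cause chain in the logs.
			LOG.error("Operation failed", e);
			throw new RuntimeException("Operation failed", e); // cause preserved for callers
		}
	}
}
```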

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink FLINK-4833

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2660.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2660


commit d117c59e704e9faad8d5001cb9b2164cd4aa7b9a
Author: Maximilian Michels 
Date:   2016-10-19T13:50:15Z

[FLINK-4833] properly log exceptions in CountMinHeavyHitter






[GitHub] flink issue #2659: [FLINK-4857] Remove throws clause from ZooKeeperUtils fun...

2016-10-19 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2659
  
+1 LGTM




[GitHub] flink issue #2662: [FLINK-4824] [client] CliFrontend shows misleading error ...

2016-10-20 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2662
  
Thanks for the PR @greghogan! Having a custom exception for missing 
arguments to a user program is a good approach. However, it requires the author 
of the program to use the custom exception. At least, we would have to adapt 
all the included examples. Additionally, it would be nice to throw another 
custom exception when no Flink job was generated during execution of the jar 
(which might be because of missing arguments). Currently, we simply throw a 
`ProgramInvocationException`, which could look like a serious error to the user 
when arguments are merely missing.

So +1 but we might do some follow-ups to fully solve the issue.




[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2667#discussion_r84241679
  
--- Diff: flink-contrib/docker-flink/docker-entrypoint.sh ---
@@ -20,11 +20,19 @@
 
 if [ "$1" = "jobmanager" ]; then
 echo "Starting Job Manager"
-    sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: `hostname -f`/g" $FLINK_HOME/conf/flink-conf.yaml
+    #sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: `hostname -f`/g" $FLINK_HOME/conf/flink-conf.yaml
+
+    # make use of container linking and exploit the jobmanager entry in /etc/hosts
+    sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: jobmanager/g" $FLINK_HOME/conf/flink-conf.yaml
+
     sed -i -e "s/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: `grep -c ^processor /proc/cpuinfo`/g" $FLINK_HOME/conf/flink-conf.yaml
--- End diff --

This line has to go to the `taskmanager` section. Before, it didn't really 
matter because the config was shared but now this setting will just be 
configured for the `JobManager` when, in fact, it is only used by the 
`TaskManager`.




[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2667#discussion_r84242416
  
--- Diff: flink-contrib/docker-flink/docker-compose.yml ---
@@ -20,16 +20,20 @@ version: "2"
 services:
   jobmanager:
 image: flink
+container_name: "jobmanager"
+expose:
+  - "6123"
 ports:
   - "48081:8081"
 command: jobmanager
-volumes:
-  - /opt/flink/conf
 
   taskmanager:
 image: flink
+expose:
+  - "6121"
+  - "6122"
 depends_on:
   - jobmanager
 command: taskmanager
-volumes_from:
-  - jobmanager:ro
+links:
--- End diff --

`links` is a legacy feature as of Docker 1.9.0, but it's probably fine to stick 
with it for now.




[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2667#discussion_r84242735
  
--- Diff: flink-contrib/docker-flink/docker-compose.sh ---
@@ -0,0 +1,4 @@
+#!/bin/sh
--- End diff --

Could we name this file `bluemix-docker-compose.sh`?




[GitHub] flink issue #2657: [FLINK-4853] [rm] Harden job manager registration at the ...

2016-10-20 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2657
  
This doesn't compile currently. Would you prefer that I review the PRs 
individually, or review the commits in this PR?




[GitHub] flink issue #2618: Refactoring the Continuous File Monitoring Function.

2016-10-20 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2618
  
Thanks for updating the description. Let me take a look at the changes.




[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2667#discussion_r84242539
  
--- Diff: flink-contrib/docker-flink/docker-compose.yml ---
@@ -20,16 +20,20 @@ version: "2"
 services:
   jobmanager:
 image: flink
+container_name: "jobmanager"
+expose:
+  - "6123"
 ports:
   - "48081:8081"
 command: jobmanager
-volumes:
-  - /opt/flink/conf
 
   taskmanager:
 image: flink
+expose:
+  - "6121"
+  - "6122"
--- End diff --

What are these ports needed for? The TaskManager will always initiate the 
connection to the JobManager.




[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r84285533
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichFileInputSplit.java ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * An extended {@link FileInputSplit} that also includes information about:
+ *
+ * The modification time of the file this split belongs to.
+ * When checkpointing, the state of the split at the moment of the checkpoint.
+ *
+ * This class is used by the {@link ContinuousFileMonitoringFunction} and the
+ * {@link ContinuousFileReaderOperator} to perform continuous file processing.
+ */
+public class RichFileInputSplit<S extends Serializable>
+		extends FileInputSplit implements Comparable<RichFileInputSplit<S>> {
+
+   /** The modification time of the file this split belongs to. */
+   private final long modificationTime;
+
+   /**
+* The state of the split. This information is used when
+* restoring from a checkpoint and allows to resume reading the
+* underlying file from the point we left off.
+* */
+   private S splitState;
+
+	/** A special {@link RichFileInputSplit} signaling the end of the stream of splits. */
+	public static final RichFileInputSplit EOS =
+		new RichFileInputSplit<>(Long.MIN_VALUE, -1, null, -1, -1, null);
+
+	/**
+	 * Creates a {@link RichFileInputSplit} based on the file modification time and
+	 * the rest of the information of the {@link FileInputSplit}, as returned by the
+	 * underlying filesystem.
+	 *
+	 * @param modificationTime the modification time of the file this split belongs to
+	 * @param split the rest of the information about the split
+	 */
+	public RichFileInputSplit(long modificationTime, FileInputSplit split) {
--- End diff --

Not sure about this constructor. I think I'd prefer something spelling out 
the parameters. That also avoids creating a regular FileInputSplit every time.
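
A sketch of what spelling out the parameters could look like (essentially promoting the private constructor shown further down in this diff to the public entry point; illustrative only):

```java
// Sketch: callers pass the raw split fields directly, so no intermediate
// FileInputSplit object is created just to be unpacked again.
public RichFileInputSplit(long modificationTime, int num, Path file, long start, long length, String[] hosts) {
	super(num, file, start, length, hosts);
	this.modificationTime = modificationTime;
}
```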




[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r84288480
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichFileInputSplit.java ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * An extended {@link FileInputSplit} that also includes information about:
+ *
+ * The modification time of the file this split belongs to.
+ * When checkpointing, the state of the split at the moment of the checkpoint.
+ *
+ * This class is used by the {@link ContinuousFileMonitoringFunction} and the
+ * {@link ContinuousFileReaderOperator} to perform continuous file processing.
+ */
+public class RichFileInputSplit<S extends Serializable>
--- End diff --

I think you can drop the type parameter here since you don't gain any type 
safety from the parameter. It is never used in any argument which would make it 
meaningful. Instead just use `Serializable` for the state type.
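
A sketch of the suggested shape (my illustration, not the PR's code; the `compareTo` ordering follows the tests quoted further down in this thread):

```java
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;

import java.io.Serializable;

// Illustrative only: no type parameter; the split state is a plain Serializable.
public class RichFileInputSplit extends FileInputSplit implements Comparable<RichFileInputSplit> {

	private final long modificationTime;
	private Serializable splitState;

	public RichFileInputSplit(long modificationTime, int num, Path file, long start, long length, String[] hosts) {
		super(num, file, start, length, hosts);
		this.modificationTime = modificationTime;
	}

	public void setSplitState(Serializable state) {
		this.splitState = state;
	}

	public Serializable getSplitState() {
		return splitState;
	}

	@Override
	public int compareTo(RichFileInputSplit o) {
		// modification time first, then path, then split number (per the tests)
		int cmp = Long.compare(this.modificationTime, o.modificationTime);
		if (cmp == 0) {
			cmp = String.valueOf(getPath()).compareTo(String.valueOf(o.getPath()));
		}
		return cmp != 0 ? cmp : Integer.compare(getSplitNumber(), o.getSplitNumber());
	}
}
```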




[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r84280372
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---
@@ -199,44 +196,39 @@ public void close() throws Exception {
private final Object checkpointLock;
private final SourceFunction.SourceContext readerContext;
 
-	private final Queue<FileInputSplit> pendingSplits;
-
-	private FileInputSplit currentSplit = null;
+	private final Queue<RichFileInputSplit<S>> pendingSplits;

-	private S restoredFormatState = null;
+	private RichFileInputSplit<S> currentSplit;

-	private volatile boolean isSplitOpen = false;
+	private volatile boolean isSplitOpen;

	private SplitReader(FileInputFormat format,
			TypeSerializer serializer,
			SourceFunction.SourceContext readerContext,
			Object checkpointLock,
-			Tuple3<List<FileInputSplit>, FileInputSplit, S> restoredState) {
+			List<RichFileInputSplit<S>> restoredState) {

		this.format = checkNotNull(format, "Unspecified FileInputFormat.");
		this.serializer = checkNotNull(serializer, "Unspecified Serializer.");
		this.readerContext = checkNotNull(readerContext, "Unspecified Reader Context.");
		this.checkpointLock = checkNotNull(checkpointLock, "Unspecified checkpoint lock.");

-		this.pendingSplits = new ArrayDeque<>();
		this.isRunning = true;

-		// this is the case where a task recovers from a previous failed attempt
-		if (restoredState != null) {
-			List<FileInputSplit> pending = restoredState.f0;
-			FileInputSplit current = restoredState.f1;
-			S formatState = restoredState.f2;
-
-			for (FileInputSplit split : pending) {
-				pendingSplits.add(split);
+		this.pendingSplits = new PriorityQueue<>(100, new Comparator<RichFileInputSplit<S>>() {
--- End diff --

Why did you choose 100 as the initial size?




[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r84285924
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichFileInputSplit.java ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * An extended {@link FileInputSplit} that also includes information about:
+ *
+ * The modification time of the file this split belongs to.
+ * When checkpointing, the state of the split at the moment of the checkpoint.
+ *
+ * This class is used by the {@link ContinuousFileMonitoringFunction} and the
+ * {@link ContinuousFileReaderOperator} to perform continuous file processing.
+ */
+public class RichFileInputSplit<S extends Serializable>
--- End diff --

The name rich :) I'd be happy if we could find another name. Rich doesn't 
really mean anything. How about `TimestampedFileInputSplit`?




[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r84288567
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichFileInputSplit.java ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * An extended {@link FileInputSplit} that also includes information about:
+ *
+ * The modification time of the file this split belongs to.
+ * When checkpointing, the state of the split at the moment of the checkpoint.
+ *
+ * This class is used by the {@link ContinuousFileMonitoringFunction} and the
+ * {@link ContinuousFileReaderOperator} to perform continuous file processing.
+ */
+public class RichFileInputSplit<S extends Serializable>
+		extends FileInputSplit implements Comparable<RichFileInputSplit<S>> {
+
+   /** The modification time of the file this split belongs to. */
+   private final long modificationTime;
+
+   /**
+* The state of the split. This information is used when
+* restoring from a checkpoint and allows to resume reading the
+* underlying file from the point we left off.
+* */
+   private S splitState;
--- End diff --

```java
private Serializable splitState;
```
should be sufficient.




[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r84288776
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichFileInputSplit.java ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * An extended {@link FileInputSplit} that also includes information about:
+ *
+ * The modification time of the file this split belongs to.
+ * When checkpointing, the state of the split at the moment of the checkpoint.
+ *
+ * This class is used by the {@link ContinuousFileMonitoringFunction} and the
+ * {@link ContinuousFileReaderOperator} to perform continuous file processing.
+ */
+public class RichFileInputSplit<S extends Serializable>
+		extends FileInputSplit implements Comparable<RichFileInputSplit<S>> {
+
+   /** The modification time of the file this split belongs to. */
+   private final long modificationTime;
+
+   /**
+* The state of the split. This information is used when
+* restoring from a checkpoint and allows to resume reading the
+* underlying file from the point we left off.
+* */
+   private S splitState;
+
+	/** A special {@link RichFileInputSplit} signaling the end of the stream of splits. */
+	public static final RichFileInputSplit EOS =
+		new RichFileInputSplit<>(Long.MIN_VALUE, -1, null, -1, -1, null);
+
+	/**
+	 * Creates a {@link RichFileInputSplit} based on the file modification time and
+	 * the rest of the information of the {@link FileInputSplit}, as returned by the
+	 * underlying filesystem.
+	 *
+	 * @param modificationTime the modification time of the file this split belongs to
+	 * @param split the rest of the information about the split
+	 */
+   public RichFileInputSplit(long modificationTime, FileInputSplit split) {
+   this(modificationTime,
+   split.getSplitNumber(),
+   split.getPath(),
+   split.getStart(),
+   split.getLength(),
+   split.getHostnames());
+   }
+
+	/**
+	 * Constructor with the raw split information.
+	 *
+	 * @param modificationTime the modification time of the file this split belongs to
+	 * @param num    the number of this input split
+	 * @param file   the file name
+	 * @param start  the position of the first byte in the file to process
+	 * @param length the number of bytes in the file to process (-1 is flag for "read whole file")
+	 * @param hosts  the list of hosts containing the block, possibly null
+	 */
+	private RichFileInputSplit(long modificationTime, int num, Path file, long start, long length, String[] hosts) {
+		super(num, file, start, length, hosts);
+
+		Preconditions.checkArgument(modificationTime >= 0 || modificationTime == Long.MIN_VALUE,
+			"Invalid File Split Modification Time: "+ modificationTime +".");
+
+		this.modificationTime = modificationTime;
+	}
+
+	/**
+	 * Sets the state of the split. This information is used when
+	 * restoring from a checkpoint and allows to resume reading the
+	 * underlying file from the point we left off.
+	 *
+	 * This is applicable to {@link org.apache.flink.api.common.io.FileInputFormat FileInputFormats}
+	 * that implement the {@link org.apache.flink.api.common.io.CheckpointableInputFormat
+	 * CheckpointableInputFormat} interface.
+	 */
+	public void setSplitState(S state) {
+		this.splitState = state;
+	}

[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r84284953
  
--- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/RichFileInputSplitTest.java ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.source.RichFileInputSplit;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RichFileInputSplitTest {
+
+   @Test
+   public void testSplitEquality() {
+
+   RichFileInputSplit eos1 = RichFileInputSplit.EOS;
+   RichFileInputSplit eos2 = RichFileInputSplit.EOS;
+
+   Assert.assertEquals(eos1, eos2);
+
+		FileInputSplit firstSplit = new FileInputSplit(2, new Path("test"), 0, 100, null);
+		RichFileInputSplit richFirstSplit = new RichFileInputSplit(10, firstSplit);
+		Assert.assertNotEquals(eos1, richFirstSplit);
+		Assert.assertNotEquals(richFirstSplit, firstSplit);
+
+		FileInputSplit secondSplit = new FileInputSplit(2, new Path("test"), 0, 100, null);
+		RichFileInputSplit richSecondSplit = new RichFileInputSplit(10, secondSplit);
+		Assert.assertEquals(richFirstSplit, richSecondSplit);
+		Assert.assertNotEquals(richFirstSplit, firstSplit);
+
+		FileInputSplit modSecondSplit = new FileInputSplit(2, new Path("test"), 0, 100, null);
+		RichFileInputSplit richModSecondSplit = new RichFileInputSplit(11, modSecondSplit);
+		Assert.assertNotEquals(richSecondSplit, richModSecondSplit);
+
+		FileInputSplit thirdSplit = new FileInputSplit(2, new Path("test/test1"), 0, 100, null);
+		RichFileInputSplit richThirdSplit = new RichFileInputSplit(10, thirdSplit);
+		Assert.assertEquals(richThirdSplit.getModificationTime(), 10);
+		Assert.assertNotEquals(richFirstSplit, richThirdSplit);
+
+		FileInputSplit thirdSplitCopy = new FileInputSplit(2, new Path("test/test1"), 0, 100, null);
+		RichFileInputSplit richThirdSplitCopy = new RichFileInputSplit(10, thirdSplitCopy);
+		Assert.assertEquals(richThirdSplitCopy, richThirdSplit);
+	}
+
+   @Test
+   public void testSplitComparison() {
+		FileInputSplit firstSplit = new FileInputSplit(3, new Path("test/test1"), 0, 100, null);
+		RichFileInputSplit richFirstSplit = new RichFileInputSplit(10, firstSplit);
+
+		FileInputSplit secondSplit = new FileInputSplit(2, new Path("test/test2"), 0, 100, null);
+		RichFileInputSplit richSecondSplit = new RichFileInputSplit(10, secondSplit);
+
+		FileInputSplit thirdSplit = new FileInputSplit(1, new Path("test/test2"), 0, 100, null);
+		RichFileInputSplit richThirdSplit = new RichFileInputSplit(10, thirdSplit);
+
+		FileInputSplit forthSplit = new FileInputSplit(0, new Path("test/test3"), 0, 100, null);
+		RichFileInputSplit richForthSplit = new RichFileInputSplit(11, forthSplit);
+
+		// lexicographically on the path order
+		Assert.assertTrue(richFirstSplit.compareTo(richSecondSplit) < 0);
+		Assert.assertTrue(richFirstSplit.compareTo(richThirdSplit) < 0);
+
+		// same mod time, same file so smaller split number first
+		Assert.assertTrue(richThirdSplit.compareTo(richSecondSplit) < 0);
+
+		// smaller modification time first
+		Assert.assertTrue(richThirdSplit.compareTo(richForthSplit) < 0);
+	}
+
+   @Test
+   public void testIllegalArgument() {
+   try {
+

[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r84281147
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichFileInputSplit.java
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * An extended {@link FileInputSplit} that also includes information about:
+ * <ul>
+ *     <li>The modification time of the file this split belongs to.</li>
+ *     <li>When checkpointing, the state of the split at the moment of the checkpoint.</li>
+ * </ul>
+ * This class is used by the {@link ContinuousFileMonitoringFunction} and the
+ * {@link ContinuousFileReaderOperator} to perform continuous file processing.
+ * */
+public class RichFileInputSplit<S extends Serializable>
+   extends FileInputSplit implements Comparable<RichFileInputSplit<S>> {
+
+   /** The modification time of the file this split belongs to. */
+   private final long modificationTime;
+
+   /**
+* The state of the split. This information is used when
+* restoring from a checkpoint and allows to resume reading the
+* underlying file from the point we left off.
+* */
+   private S splitState;
+
+   /** A special {@link RichFileInputSplit} signaling the end of the stream of splits.*/
+   public static final RichFileInputSplit<Serializable> EOS =
+   new RichFileInputSplit<>(Long.MIN_VALUE, -1, null, -1, -1, null);
--- End diff --

Is it really necessary to have this special split? Couldn't you just have a 
`reader.stop()` method which stops the reader after the current split has been 
processed?
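
For illustration, a minimal sketch of what such a `stop()` could look like (a hypothetical reader skeleton, not the PR's actual `SplitReader`; the flag lets the split in progress finish before the loop exits):

```java
import org.apache.flink.core.fs.FileInputSplit;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class StoppableSplitReader implements Runnable {

	private final BlockingQueue<FileInputSplit> pendingSplits = new LinkedBlockingQueue<>();

	private volatile boolean running = true;

	@Override
	public void run() {
		try {
			while (running) {
				// poll with a timeout so a stop() call is noticed even when idle
				FileInputSplit split = pendingSplits.poll(100, TimeUnit.MILLISECONDS);
				if (split != null) {
					readSplit(split); // the current split is always read to completion
				}
			}
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		}
	}

	void addSplit(FileInputSplit split) {
		pendingSplits.add(split);
	}

	/** Stops the reader once the split currently being read has been processed. */
	void stop() {
		running = false;
	}

	private void readSplit(FileInputSplit split) {
		// open the input format, emit the split's records, close the split
	}
}
```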




[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r84279968
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
 ---
@@ -189,7 +186,7 @@ public void close() throws Exception {
output.close();
}
 
-   private class SplitReader extends Thread {
+   private final class SplitReader extends Thread {
--- End diff --

Making private classes final is not really necessary.




[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2667#discussion_r84290827
  
--- Diff: flink-contrib/docker-flink/docker-compose.yml ---
@@ -20,16 +20,20 @@ version: "2"
 services:
   jobmanager:
 image: flink
+container_name: "jobmanager"
+expose:
+  - "6123"
 ports:
   - "48081:8081"
 command: jobmanager
-volumes:
-  - /opt/flink/conf
 
   taskmanager:
 image: flink
+expose:
+  - "6121"
+  - "6122"
--- End diff --

Sure, makes sense since those ports are not reachable by TaskManagers 
running in different containers.




[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r84288975
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
 ---
@@ -347,34 +328,17 @@ public void run() {
}
}
 
-   private Tuple3<List<FileInputSplit>, FileInputSplit, S> getReaderState() throws IOException {
-   List<FileInputSplit> snapshot = new ArrayList<>(this.pendingSplits.size());
-   for (FileInputSplit split: this.pendingSplits) {
-   snapshot.add(split);
-   }
-
-   // remove the current split from the list if inside.
-   if (this.currentSplit != null && this.currentSplit.equals(pendingSplits.peek())) {
-   this.pendingSplits.remove();
-   }
-
-   if (this.currentSplit != null) {
-   if (this.format instanceof CheckpointableInputFormat) {
-   @SuppressWarnings("unchecked")
-   CheckpointableInputFormat<FileInputSplit, S> checkpointableFormat =
-   (CheckpointableInputFormat<FileInputSplit, S>) this.format;
-
-   S formatState = this.isSplitOpen ?
-   checkpointableFormat.getCurrentState() :
-   restoredFormatState;
-   return new Tuple3<>(snapshot, currentSplit, formatState);
-   } else {
-   LOG.info("The format does not support checkpointing. The current input split will be re-read from start upon recovery.");
-   return new Tuple3<>(snapshot, currentSplit, null);
+   private List<RichFileInputSplit<S>> getReaderState() throws IOException {
+   List<RichFileInputSplit<S>> snapshot = new ArrayList<>(this.pendingSplits.size());
+   if (currentSplit != null ) {
+   if (this.format instanceof CheckpointableInputFormat && this.isSplitOpen) {
+   S formatState = ((CheckpointableInputFormat<RichFileInputSplit<S>, S>) this.format).getCurrentState();
--- End diff --

```java
Serializable formatState = ((CheckpointableInputFormat) this.format).getCurrentState();
```




[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r84276796
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
 ---
@@ -43,16 +41,18 @@
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
-import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
+import java.util.PriorityQueue;
 import java.util.Queue;
 
+import static org.apache.flink.streaming.api.functions.source.RichFileInputSplit.EOS;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * The operator that reads the {@link FileInputSplit splits} received from the preceding
+ * The operator that reads the {@link RichFileInputSplit splits} received from the preceding
  * {@link ContinuousFileMonitoringFunction}. Contrary to the {@link ContinuousFileMonitoringFunction}
  * which has a parallelism of 1, this operator can have DOP > 1.
  * 
--- End diff --

Generic types are not documented in the JavaDoc.




[GitHub] flink pull request #2662: [FLINK-4824] [client] CliFrontend shows misleading...

2016-10-21 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2662#discussion_r84448442
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/ProgramParametrizationException.java
 ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+/**
+ * Exception used to indicate that there is an error in the parametrization of a Flink program.
+ */
+public class ProgramParametrizationException extends RuntimeException {
+   /**
+* Serial version UID for serialization interoperability.
+*/
+   private static final long serialVersionUID = 909054589029890262L;
+
+   /**
+* Creates a ProgramParametrizationException.
+*/
+   public ProgramParametrizationException() {
+   super();
--- End diff --

I think we should not even allow skipping the message here. This will 
simplify the code further and ensure that some message is displayed for the user.
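
For illustration, a minimal sketch with the message made mandatory (assuming `Preconditions.checkNotNull` from `org.apache.flink.util`):

```java
import org.apache.flink.util.Preconditions;

public class ProgramParametrizationException extends RuntimeException {

	private static final long serialVersionUID = 909054589029890262L;

	/** The message is mandatory, so the CLI always has something to display. */
	public ProgramParametrizationException(String message) {
		super(Preconditions.checkNotNull(message));
	}
}
```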




[GitHub] flink pull request #2662: [FLINK-4824] [client] CliFrontend shows misleading...

2016-10-21 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2662#discussion_r84448541
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -975,6 +981,32 @@ private int handleArgException(Exception e) {
}
 
/**
+* Displays an optional exception message for incorrect program parametrization.
+*
+* @param e The exception to display.
+* @return The return code for the process.
+*/
+   private int handleParametrizationException(ProgramParametrizationException e) {
+   String message = e.getMessage();
+   if (message != null) {
+   System.err.println(message);
+   }
--- End diff --

This block could be removed if we got rid of the zero-args constructor in 
`ProgramParametrizationException`.




[GitHub] flink pull request #2671: [FLINK-4862] fix Timer register in ContinuousEvent...

2016-10-21 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2671#discussion_r84464167
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
 ---
@@ -88,9 +96,13 @@ public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) t
@Override
public void clear(W window, TriggerContext ctx) throws Exception {
ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
-   long timestamp = fireTimestamp.get();
-   ctx.deleteEventTimeTimer(timestamp);
-   fireTimestamp.clear();
+   Long timestamp = fireTimestamp.get();
+   if (timestamp != null) {
+   ctx.deleteEventTimeTimer(timestamp);
+   fireTimestamp.clear();
+   } else if (cachedFireTimestamp != null){
+   ctx.deleteEventTimeTimer(cachedFireTimestamp);
+   }
--- End diff --

The above `else if` block is not correct because there is only one instance 
of the trigger, which is reused for every window. Hence the abstraction of 
retrieving the appropriate per-window state through the state descriptor.
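
For illustration, the state-descriptor pattern sketched on `clear()` (mirroring the hunk above, with the instance-field fallback dropped):

```java
@Override
public void clear(W window, TriggerContext ctx) throws Exception {
	// getPartitionedState() scopes the state to the current key and window,
	// even though this Trigger instance is shared across all windows
	ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
	Long timestamp = fireTimestamp.get();
	if (timestamp != null) {
		ctx.deleteEventTimeTimer(timestamp);
		fireTimestamp.clear();
	}
	// no fallback to an instance field: it could hold a value from a different window
}
```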




[GitHub] flink pull request #2671: [FLINK-4862] fix Timer register in ContinuousEvent...

2016-10-21 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2671#discussion_r84464275
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
 ---
@@ -45,6 +45,12 @@
private final ReducingStateDescriptor<Long> stateDesc =
new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);
 
+   /**
+* Used to preserve the fire timestamp before merge such that
+* the corresponding timer could be cleared after merge
+*/
+   private Long cachedFireTimestamp = null;
--- End diff --

This doesn't work because there is only one `Trigger` instance and this 
will potentially be overwritten by many Windows being merged. 




[GitHub] flink pull request #2671: [FLINK-4862] fix Timer register in ContinuousEvent...

2016-10-21 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2671#discussion_r84465211
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
 ---
@@ -88,9 +96,13 @@ public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) t
@Override
public void clear(W window, TriggerContext ctx) throws Exception {
ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
-   long timestamp = fireTimestamp.get();
-   ctx.deleteEventTimeTimer(timestamp);
-   fireTimestamp.clear();
+   Long timestamp = fireTimestamp.get();
+   if (timestamp != null) {
+   ctx.deleteEventTimeTimer(timestamp);
+   fireTimestamp.clear();
+   } else if (cachedFireTimestamp != null){
+   ctx.deleteEventTimeTimer(cachedFireTimestamp);
+   }
--- End diff --

I think we're fine with not doing anything when `timestamp == null`. The 
old timer won't influence the newly merged window.




[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…

2016-10-21 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2667#discussion_r84467266
  
--- Diff: flink-contrib/docker-flink/bluemix-docker-compose.sh ---
@@ -0,0 +1,4 @@
+#!/bin/sh
--- End diff --

A license header needs to be added here.




[GitHub] flink pull request #2671: [FLINK-4862] fix Timer register in ContinuousEvent...

2016-10-21 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2671#discussion_r84465826
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
 ---
@@ -88,9 +96,13 @@ public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) t
@Override
public void clear(W window, TriggerContext ctx) throws Exception {
ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
-   long timestamp = fireTimestamp.get();
-   ctx.deleteEventTimeTimer(timestamp);
-   fireTimestamp.clear();
+   Long timestamp = fireTimestamp.get();
+   if (timestamp != null) {
+   ctx.deleteEventTimeTimer(timestamp);
+   fireTimestamp.clear();
--- End diff --

The above looks good.




[GitHub] flink issue #2667: README.md - Description of the bluemix specif…

2016-10-21 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2667
  
Thank you for your contribution!




[GitHub] flink pull request #2671: [FLINK-4862] fix Timer register in ContinuousEvent...

2016-10-21 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2671#discussion_r84463868
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
 ---
@@ -99,8 +111,12 @@ public boolean canMerge() {
}
 
@Override
-   public TriggerResult onMerge(W window, OnMergeContext ctx) {
+   public TriggerResult onMerge(W window, OnMergeContext ctx) throws Exception {
ctx.mergePartitionedState(stateDesc);
+   Long nextFireTimestamp = ctx.getPartitionedState(stateDesc).get();
+   if (nextFireTimestamp != null) {
+   ctx.registerEventTimeTimer(nextFireTimestamp);
+   }
--- End diff --

+1




[GitHub] flink pull request #2657: [FLINK-4853] [rm] Harden job manager registration ...

2016-10-21 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2657#discussion_r84502118
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -471,6 +498,149 @@ public void shutDownCluster(final ApplicationStatus finalStatus, final String op
}
 
// ------------------------------------------------------------------------
+   //  Testing methods
+   // ------------------------------------------------------------------------
+
+   /**
+* Gets the leader session id of current resourceManager.
+*
+* @return return the leaderSessionId of current resourceManager, this returns null until the current resourceManager is granted leadership.
+*/
+   @VisibleForTesting
+   UUID getLeaderSessionId() {
+   return leaderSessionId;
+   }
+
+   // ------------------------------------------------------------------------
+   //  Internal methods
+   // ------------------------------------------------------------------------
+
+   private void clearState() {
+   jobManagerRegistrations.clear();
+   taskExecutors.clear();
+   slotManager.clearState();
+
+   try {
+   jobLeaderIdService.clear();
+   } catch (Exception e) {
+   onFatalError(new ResourceManagerException("Could not properly clear the job leader id service.", e));
+   }
+
+   leaderSessionId = new UUID(0, 0);
+   }
+
+   /**
+* Disconnects the job manager which is connected for the given job from the resource manager.
+*
+* @param jobId identifying the job whose leader shall be disconnected
+*/
+   protected void disconnectJobManager(JobID jobId, Exception cause) {
+   JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.remove(jobId);
+
+   if (jobManagerRegistration != null) {
+   log.info("Disconnect job manager {}@{} for job {} from the resource manager.",
+   jobManagerRegistration.getLeaderID(),
+   jobManagerRegistration.getJobManagerGateway().getAddress(),
+   jobId);
+
+   JobMasterGateway jobMasterGateway = jobManagerRegistration.getJobManagerGateway();
+
+   // tell the job manager about the disconnect
+   jobMasterGateway.disconnectResourceManager(jobManagerRegistration.getLeaderID(), getLeaderSessionId(), cause);
+   } else {
+   log.debug("There was no registered job manager for job {}.", jobId);
+   }
+   }
+
+   /**
+* Checks whether the given resource manager leader id is matching the current leader id.
+*
+* @param resourceManagerLeaderId to check
+* @return True if the given leader id matches the actual leader id; otherwise false
+*/
+   protected boolean isValid(UUID resourceManagerLeaderId) {
+   if (resourceManagerLeaderId == null) {
+   return leaderSessionId == null;
+   } else {
+   return resourceManagerLeaderId.equals(leaderSessionId);
+   }
+   }
+
+   protected void removeJob(JobID jobId) {
+   try {
+   jobLeaderIdService.removeJob(jobId);
+   } catch (Exception e) {
+   onFatalError(new ResourceManagerException("Could not remove job " + jobId + '.', e));
--- End diff --

This should not escalate to a fatal error so easily (e.g. when closing a 
connection to Zookeeper throws an exception).




[GitHub] flink pull request #2657: [FLINK-4853] [rm] Harden job manager registration ...

2016-10-21 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2657#discussion_r84501555
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -471,6 +498,149 @@ public void shutDownCluster(final ApplicationStatus finalStatus, final String op
}
 
// ------------------------------------------------------------------------
+   //  Testing methods
+   // ------------------------------------------------------------------------
+
+   /**
+* Gets the leader session id of current resourceManager.
+*
+* @return return the leaderSessionId of current resourceManager, this returns null until the current resourceManager is granted leadership.
+*/
+   @VisibleForTesting
+   UUID getLeaderSessionId() {
+   return leaderSessionId;
+   }
+
+   // ------------------------------------------------------------------------
+   //  Internal methods
+   // ------------------------------------------------------------------------
+
+   private void clearState() {
+   jobManagerRegistrations.clear();
+   taskExecutors.clear();
+   slotManager.clearState();
+
+   try {
+   jobLeaderIdService.clear();
+   } catch (Exception e) {
+   onFatalError(new ResourceManagerException("Could not properly clear the job leader id service.", e));
+   }
+
+   leaderSessionId = new UUID(0, 0);
--- End diff --

We probably want to set this to null again to work with the `isValid` 
method (if we want to support null values for UUIDs). I would rather not allow 
null values at all.




[GitHub] flink pull request #2657: [FLINK-4853] [rm] Harden job manager registration ...

2016-10-21 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2657#discussion_r84500666
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -202,101 +205,125 @@ public void shutDown() throws Exception {
//  RPC methods
// ------------------------------------------------------------------------
 
-   /**
-* Register a {@link JobMaster} at the resource manager.
-*
-* @param resourceManagerLeaderId The fencing token for the ResourceManager leader
-* @param jobMasterAddress        The address of the JobMaster that registers
-* @param jobID                   The Job ID of the JobMaster that registers
-* @return Future registration response
-*/
@RpcMethod
-   public Future<RegistrationResponse> registerJobMaster(
-   final UUID resourceManagerLeaderId, final UUID jobMasterLeaderId,
-   final String jobMasterAddress, final JobID jobID) {
+   public Future<RegistrationResponse> registerJobManager(
+   final UUID resourceManagerLeaderId,
+   final UUID jobManagerLeaderId,
+   final String jobManagerAddress,
+   final JobID jobId) {
+
+   checkNotNull(resourceManagerLeaderId);
+   checkNotNull(jobManagerLeaderId);
+   checkNotNull(jobManagerAddress);
+   checkNotNull(jobId);
+
+   if (isValid(resourceManagerLeaderId)) {
+   if (!jobLeaderIdService.containsJob(jobId)) {
+   try {
+   jobLeaderIdService.addJob(jobId);
+   } catch (Exception e) {
+   // This should actually never happen because, it should always be possible to add a new job
+   ResourceManagerException exception = new ResourceManagerException("Could not add the job " +
+   jobId + " to the job id leader service. This should never happen.", e);
+
+   onFatalErrorAsync(exception);
+
+   log.debug("Could not add job {} to job leader id service.", jobId, e);
+   return FlinkCompletableFuture.completedExceptionally(exception);
+   }
+   }
 
-   checkNotNull(jobMasterAddress);
-   checkNotNull(jobID);
+   log.info("Registering job manager {}@{} for job {}.", jobManagerLeaderId, jobManagerAddress, jobId);
+
+   Future<UUID> jobLeaderIdFuture;
 
-   // create a leader retriever in case it doesn't exist
-   final JobIdLeaderListener jobIdLeaderListener;
-   if (leaderListeners.containsKey(jobID)) {
-   jobIdLeaderListener = leaderListeners.get(jobID);
-   } else {
try {
-   LeaderRetrievalService jobMasterLeaderRetriever =
-   highAvailabilityServices.getJobManagerLeaderRetriever(jobID);
-   jobIdLeaderListener = new JobIdLeaderListener(jobID, jobMasterLeaderRetriever);
+   jobLeaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
} catch (Exception e) {
-   log.warn("Failed to start JobMasterLeaderRetriever for job id {}", jobID, e);
+   // we cannot check the job leader id so let's fail
+   // TODO: Maybe it's also ok to skip this check in case that we cannot check the leader id
+   ResourceManagerException exception = new ResourceManagerException("Cannot obtain the " +
+   "job leader id future to verify the correct job leader.", e);
+
+   onFatalErrorAsync(exception);
--- End diff --

Declining seems ok in this case since the failure might be temporary.




[GitHub] flink pull request #2657: [FLINK-4853] [rm] Harden job manager registration ...

2016-10-21 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2657#discussion_r84500271
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -202,101 +205,125 @@ public void shutDown() throws Exception {
//  RPC methods
// ------------------------------------------------------------------------
 
-   /**
-* Register a {@link JobMaster} at the resource manager.
-*
-* @param resourceManagerLeaderId The fencing token for the ResourceManager leader
-* @param jobMasterAddress        The address of the JobMaster that registers
-* @param jobID                   The Job ID of the JobMaster that registers
-* @return Future registration response
-*/
@RpcMethod
-   public Future<RegistrationResponse> registerJobMaster(
-   final UUID resourceManagerLeaderId, final UUID jobMasterLeaderId,
-   final String jobMasterAddress, final JobID jobID) {
+   public Future<RegistrationResponse> registerJobManager(
+   final UUID resourceManagerLeaderId,
+   final UUID jobManagerLeaderId,
+   final String jobManagerAddress,
+   final JobID jobId) {
+
+   checkNotNull(resourceManagerLeaderId);
+   checkNotNull(jobManagerLeaderId);
+   checkNotNull(jobManagerAddress);
+   checkNotNull(jobId);
+
+   if (isValid(resourceManagerLeaderId)) {
+   if (!jobLeaderIdService.containsJob(jobId)) {
+   try {
+   jobLeaderIdService.addJob(jobId);
+   } catch (Exception e) {
+   // This should actually never happen because, it should always be possible to add a new job
+   ResourceManagerException exception = new ResourceManagerException("Could not add the job " +
+   jobId + " to the job id leader service. This should never happen.", e);
+
+   onFatalErrorAsync(exception);
+
+   log.debug("Could not add job {} to job leader id service.", jobId, e);
--- End diff --

Should this be logged on `error` level?




[GitHub] flink issue #2651: [FLINK-4847] Let RpcEndpoint.start/shutDown throw excepti...

2016-10-21 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2651
  
LGTM




[GitHub] flink pull request #2657: [FLINK-4853] [rm] Harden job manager registration ...

2016-10-21 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2657#discussion_r84500048
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -202,101 +205,125 @@ public void shutDown() throws Exception {
//  RPC methods
// ------------------------------------------------------------------------
 
-   /**
-* Register a {@link JobMaster} at the resource manager.
-*
-* @param resourceManagerLeaderId The fencing token for the ResourceManager leader
-* @param jobMasterAddress        The address of the JobMaster that registers
-* @param jobID                   The Job ID of the JobMaster that registers
-* @return Future registration response
-*/
@RpcMethod
-   public Future<RegistrationResponse> registerJobMaster(
-   final UUID resourceManagerLeaderId, final UUID jobMasterLeaderId,
-   final String jobMasterAddress, final JobID jobID) {
+   public Future<RegistrationResponse> registerJobManager(
+   final UUID resourceManagerLeaderId,
+   final UUID jobManagerLeaderId,
+   final String jobManagerAddress,
+   final JobID jobId) {
+
+   checkNotNull(resourceManagerLeaderId);
+   checkNotNull(jobManagerLeaderId);
+   checkNotNull(jobManagerAddress);
+   checkNotNull(jobId);
+
+   if (isValid(resourceManagerLeaderId)) {
+   if (!jobLeaderIdService.containsJob(jobId)) {
+   try {
+   jobLeaderIdService.addJob(jobId);
+   } catch (Exception e) {
+   // This should actually never happen because, it should always be possible to add a new job
+   ResourceManagerException exception = new ResourceManagerException("Could not add the job " +
--- End diff --

Actually, this might happen when the leader id service fails to start. It 
could be temporary and we might have to introduce some sort of retry rule here. 
Not in the scope of this PR though.
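
For illustration, a hypothetical bounded-retry wrapper (the method name `addJobWithRetry` and the `maxAttempts` parameter are made up, not part of this PR):

```java
private void addJobWithRetry(JobID jobId, int maxAttempts) throws ResourceManagerException {
	Exception lastFailure = null;
	for (int attempt = 1; attempt <= maxAttempts; attempt++) {
		try {
			jobLeaderIdService.addJob(jobId);
			return;
		} catch (Exception e) {
			lastFailure = e;
			log.debug("Attempt {} to add job {} to the job leader id service failed.", attempt, jobId, e);
		}
	}
	throw new ResourceManagerException(
		"Could not add job " + jobId + " after " + maxAttempts + " attempts.", lastFailure);
}
```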




[GitHub] flink pull request #2657: [FLINK-4853] [rm] Harden job manager registration ...

2016-10-21 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2657#discussion_r84501742
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -471,6 +498,149 @@ public void shutDownCluster(final ApplicationStatus finalStatus, final String op
}
 
// ------------------------------------------------------------------------
+   //  Testing methods
+   // ------------------------------------------------------------------------
+
+   /**
+* Gets the leader session id of current resourceManager.
+*
+* @return return the leaderSessionId of current resourceManager, this returns null until the current resourceManager is granted leadership.
+*/
+   @VisibleForTesting
+   UUID getLeaderSessionId() {
+   return leaderSessionId;
+   }
+
+   // ------------------------------------------------------------------------
+   //  Internal methods
+   // ------------------------------------------------------------------------
+
+   private void clearState() {
+   jobManagerRegistrations.clear();
+   taskExecutors.clear();
+   slotManager.clearState();
+
+   try {
+   jobLeaderIdService.clear();
+   } catch (Exception e) {
+   onFatalError(new ResourceManagerException("Could not properly clear the job leader id service.", e));
+   }
+
+   leaderSessionId = new UUID(0, 0);
+   }
+
+   /**
+* Disconnects the job manager which is connected for the given job from the resource manager.
+*
+* @param jobId identifying the job whose leader shall be disconnected
+*/
+   protected void disconnectJobManager(JobID jobId, Exception cause) {
+   JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.remove(jobId);
+
+   if (jobManagerRegistration != null) {
+   log.info("Disconnect job manager {}@{} for job {} from the resource manager.",
+   jobManagerRegistration.getLeaderID(),
+   jobManagerRegistration.getJobManagerGateway().getAddress(),
+   jobId);
+
+   JobMasterGateway jobMasterGateway = jobManagerRegistration.getJobManagerGateway();
+
+   // tell the job manager about the disconnect
+   jobMasterGateway.disconnectResourceManager(jobManagerRegistration.getLeaderID(), getLeaderSessionId(), cause);
+   } else {
+   log.debug("There was no registered job manager for job {}.", jobId);
+   }
+   }
+
+   /**
+* Checks whether the given resource manager leader id is matching the current leader id.
+*
+* @param resourceManagerLeaderId to check
+* @return True if the given leader id matches the actual leader id; otherwise false
+*/
+   protected boolean isValid(UUID resourceManagerLeaderId) {
+   if (resourceManagerLeaderId == null) {
+   return leaderSessionId == null;
--- End diff --

Should `null` always return `false` if we assume that we use a default UUID 
in non-high-availability mode?
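
For illustration, a sketch of that variant (assuming `leaderSessionId` itself is never null and non-HA setups use a fixed default UUID such as `new UUID(0, 0)`):

```java
protected boolean isValid(UUID resourceManagerLeaderId) {
	// UUID.equals(null) returns false, so a null id can never match
	return leaderSessionId.equals(resourceManagerLeaderId);
}
```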




[GitHub] flink pull request #2657: [FLINK-4853] [rm] Harden job manager registration ...

2016-10-21 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2657#discussion_r84501182
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -202,101 +205,125 @@ public void shutDown() throws Exception {
//  RPC methods
// ------------------------------------------------------------------------
 
-   /**
-* Register a {@link JobMaster} at the resource manager.
-*
-* @param resourceManagerLeaderId The fencing token for the ResourceManager leader
-* @param jobMasterAddress        The address of the JobMaster that registers
-* @param jobID                   The Job ID of the JobMaster that registers
-* @return Future registration response
-*/
@RpcMethod
-   public Future<RegistrationResponse> registerJobMaster(
-   final UUID resourceManagerLeaderId, final UUID jobMasterLeaderId,
-   final String jobMasterAddress, final JobID jobID) {
+   public Future<RegistrationResponse> registerJobManager(
+   final UUID resourceManagerLeaderId,
+   final UUID jobManagerLeaderId,
+   final String jobManagerAddress,
+   final JobID jobId) {
+
+   checkNotNull(resourceManagerLeaderId);
+   checkNotNull(jobManagerLeaderId);
+   checkNotNull(jobManagerAddress);
+   checkNotNull(jobId);
+
+   if (isValid(resourceManagerLeaderId)) {
+   if (!jobLeaderIdService.containsJob(jobId)) {
+   try {
+   jobLeaderIdService.addJob(jobId);
+   } catch (Exception e) {
+   // This should actually never happen because, it should always be possible to add a new job
+   ResourceManagerException exception = new ResourceManagerException("Could not add the job " +
+   jobId + " to the job id leader service. This should never happen.", e);
+
+   onFatalErrorAsync(exception);
+
+   log.debug("Could not add job {} to job leader id service.", jobId, e);
+   return FlinkCompletableFuture.completedExceptionally(exception);
+   }
+   }
 
-   checkNotNull(jobMasterAddress);
-   checkNotNull(jobID);
+   log.info("Registering job manager {}@{} for job {}.", jobManagerLeaderId, jobManagerAddress, jobId);
+
+   Future<UUID> jobLeaderIdFuture;
 
-   // create a leader retriever in case it doesn't exist
-   final JobIdLeaderListener jobIdLeaderListener;
-   if (leaderListeners.containsKey(jobID)) {
-   jobIdLeaderListener = leaderListeners.get(jobID);
-   } else {
try {
-   LeaderRetrievalService jobMasterLeaderRetriever =
-   highAvailabilityServices.getJobManagerLeaderRetriever(jobID);
-   jobIdLeaderListener = new JobIdLeaderListener(jobID, jobMasterLeaderRetriever);
+   jobLeaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
} catch (Exception e) {
-   log.warn("Failed to start JobMasterLeaderRetriever for job id {}", jobID, e);
+   // we cannot check the job leader id so let's fail
+   // TODO: Maybe it's also ok to skip this check in case that we cannot check the leader id
+   ResourceManagerException exception = new ResourceManagerException("Cannot obtain the " +
+   "job leader id future to verify the correct job leader.", e);
+
+   onFatalErrorAsync(exception);
 
-   return FlinkCompletableFuture.completed(
-   new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever"));
+   log.debug("Could not obtain the job leader id future to verify the correct job leader.");
+   return FlinkCompletableFuture.completedExceptionally(exception);
}
 
-   leaderListeners.put(jobID, jobIdLeaderListener);
-   }
+   Future<JobMasterGateway> jobMasterGatewayFuture = getRpcService().connect(jobManagerAddress, JobMasterGateway.class);
 
- 

[GitHub] flink pull request #2655: [FLINK-4851] [rm] Introduce FatalErrorHandler and ...

2016-10-21 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2655#discussion_r84507291
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingFatalErrorHandler.java
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Testing fatal error handler which records the occurred exceptions during the execution of the
+ * tests. Captured exceptions are thrown as a {@link TestingException}.
+ */
+public class TestingFatalErrorHandler implements FatalErrorHandler {
+   private static final Logger LOG = LoggerFactory.getLogger(TestingFatalErrorHandler.class);
+   private final AtomicReference<Throwable> atomicThrowable;
+
+   public TestingFatalErrorHandler() {
+   atomicThrowable = new AtomicReference<>(null);
+   }
+
+   public void rethrowError() throws TestingException {
+   Throwable throwable = atomicThrowable.get();
+
+   if (throwable != null) {
+   throw new TestingException(throwable);
+   }
+   }
+
+   public boolean hasExceptionOccurred() {
+   return atomicThrowable.get() != null;
+   }
+
+   public Throwable getException() {
+   return atomicThrowable.get();
+   }
+
+   @Override
+   public void onFatalError(Throwable exception) {
+   LOG.error("OnFatalError:", exception);
+
+   if (!atomicThrowable.compareAndSet(null, exception)) {
+   atomicThrowable.get().addSuppressed(exception);
+   }
--- End diff --

Oh, I didn't know you could do that. That's neat.




[GitHub] flink pull request #2655: [FLINK-4851] [rm] Introduce FatalErrorHandler and ...

2016-10-21 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2655#discussion_r84506385
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
 ---
@@ -145,6 +171,10 @@ public void testRegisterJobMasterWithFailureLeaderListener() throws Exception {
Future<RegistrationResponse> declineFuture = resourceManager.registerJobMaster(rmLeaderSessionId, jmLeaderSessionId, jobMasterAddress, unknownJobIDToHAServices);
RegistrationResponse response = declineFuture.get(5, TimeUnit.SECONDS);
assertTrue(response instanceof RegistrationResponse.Decline);
+
+   if (testingFatalErrorHandler.hasExceptionOccurred()) {
+   testingFatalErrorHandler.rethrowError();
+   }
--- End diff --

This seems like a lot of boilerplate that we could abstract using a base 
testing class for ResourceManager tests.
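
For illustration, a sketch of such a base class (the name `ResourceManagerTestBase` is hypothetical; JUnit's `@After` runs the check after every test):

```java
import org.apache.flink.runtime.util.TestingFatalErrorHandler;

import org.junit.After;

public abstract class ResourceManagerTestBase {

	protected final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();

	@After
	public void rethrowFatalErrors() throws Exception {
		// no-op when no fatal error was recorded during the test
		testingFatalErrorHandler.rethrowError();
	}
}
```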




[GitHub] flink pull request #2669: [FLINK-4871] [mini cluster] Add memory calculation...

2016-10-21 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2669#discussion_r84509754
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
 ---
@@ -96,11 +101,20 @@ public void setCommonRpcBindAddress(String bindAddress) {
this.commonBindAddress = bindAddress;
}
 
+   public void setManagedMemoryPerTaskManager(long managedMemoryPerTaskManager) {
+   checkArgument(managedMemoryPerTaskManager > 0, "must have more than 0 MB of memory for the TaskManager.");
+   this.managedMemoryPerTaskManager = managedMemoryPerTaskManager;
+   }
+
// ------------------------------------------------------------------------
//  getters
// ------------------------------------------------------------------------
 
public Configuration getConfiguration() {
+   // update the memory in case that we've changed the number of components (TM, RM, JM)
+   long memory = calculateManagedMemoryPerTaskManager();
--- End diff --

Getters should usually not perform any calculation. How about changing the 
method name to `updateConfiguration()`? 
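
For illustration, a sketch of that split (assuming the computed value is written back under `ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY`):

```java
public void updateConfiguration() {
	// compute once, explicitly, instead of as a side effect of the getter
	long memory = calculateManagedMemoryPerTaskManager();
	config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memory);
}

public Configuration getConfiguration() {
	return config; // plain getter, no recalculation
}
```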




[GitHub] flink pull request #2669: [FLINK-4871] [mini cluster] Add memory calculation...

2016-10-21 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2669#discussion_r84508885
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
 ---
@@ -162,4 +180,63 @@ public String toString() {
", config=" + config +
'}';
}
+
+   /**
+* Calculate the managed memory per task manager. The memory is calculated in the following
+* order:
+*
+* 1. Return {@link #managedMemoryPerTaskManager} if set
+* 2. Return config.getInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY) if set
+* 3. Distribute the available free memory equally among all components (JMs, RMs and TMs) and
+* calculate the managed memory from the share of memory for a single task manager.
+*
+* @return
+*/
+   private long calculateManagedMemoryPerTaskManager() {
--- End diff --

`getOrCalculateManagedMemoryPerTaskManager`? 😃




[GitHub] flink pull request #2669: [FLINK-4871] [mini cluster] Add memory calculation...

2016-10-21 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2669#discussion_r84509867
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
 ---
@@ -96,11 +101,20 @@ public void setCommonRpcBindAddress(String bindAddress) {
this.commonBindAddress = bindAddress;
}
 
+   public void setManagedMemoryPerTaskManager(long managedMemoryPerTaskManager) {
+   checkArgument(managedMemoryPerTaskManager > 0, "must have more than 0 MB of memory for the TaskManager.");
+   this.managedMemoryPerTaskManager = managedMemoryPerTaskManager;
+   }
+
// ------------------------------------------------------------------------
//  getters
// ------------------------------------------------------------------------
 
public Configuration getConfiguration() {
+   // update the memory in case that we've changed the number of components (TM, RM, JM)
+   long memory = calculateManagedMemoryPerTaskManager();
--- End diff --

After this method has been called, you can't change the memory 
configuration anymore because the config value will prevent new calculation in 
`calculateManagedMemoryPerTaskManager`. Is that desired?




[GitHub] flink issue #2671: [FLINK-4862] fix Timer register in ContinuousEventTimeTri...

2016-10-24 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2671
  
Thanks for having a look @aljoscha. Thanks again for the PR @manuzhang. 
Merging.




[GitHub] flink pull request #2671: [FLINK-4862] fix Timer register in ContinuousEvent...

2016-10-24 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2671#discussion_r84689568
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
 ---
@@ -99,8 +111,12 @@ public boolean canMerge() {
}
 
@Override
-   public TriggerResult onMerge(W window, OnMergeContext ctx) {
+   public TriggerResult onMerge(W window, OnMergeContext ctx) throws Exception {
ctx.mergePartitionedState(stateDesc);
+   Long nextFireTimestamp = ctx.getPartitionedState(stateDesc).get();
+   if (nextFireTimestamp != null) {
+   ctx.registerEventTimeTimer(nextFireTimestamp);
+   }
--- End diff --

Yes, you're right. It is actually handled correctly in `EventTimeTrigger` 
but not for the continuous trigger.




[GitHub] flink pull request #2657: [FLINK-4853] [rm] Harden job manager registration ...

2016-10-24 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2657#discussion_r84689078
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -202,101 +205,125 @@ public void shutDown() throws Exception {
//  RPC methods
// ------------------------------------------------------------------------
 
-   /**
-* Register a {@link JobMaster} at the resource manager.
-*
-* @param resourceManagerLeaderId The fencing token for the ResourceManager leader
-* @param jobMasterAddress        The address of the JobMaster that registers
-* @param jobID                   The Job ID of the JobMaster that registers
-* @return Future registration response
-*/
@RpcMethod
-   public Future<RegistrationResponse> registerJobMaster(
-   final UUID resourceManagerLeaderId, final UUID jobMasterLeaderId,
-   final String jobMasterAddress, final JobID jobID) {
+   public Future<RegistrationResponse> registerJobManager(
+   final UUID resourceManagerLeaderId,
+   final UUID jobManagerLeaderId,
+   final String jobManagerAddress,
+   final JobID jobId) {
+
+   checkNotNull(resourceManagerLeaderId);
+   checkNotNull(jobManagerLeaderId);
+   checkNotNull(jobManagerAddress);
+   checkNotNull(jobId);
+
+   if (isValid(resourceManagerLeaderId)) {
+   if (!jobLeaderIdService.containsJob(jobId)) {
+   try {
+   jobLeaderIdService.addJob(jobId);
+   } catch (Exception e) {
+   // This should actually never happen because, it should always be possible to add a new job
+   ResourceManagerException exception = new ResourceManagerException("Could not add the job " +
+   jobId + " to the job id leader service. This should never happen.", e);
+
+   onFatalErrorAsync(exception);
+
+   log.debug("Could not add job {} to job leader id service.", jobId, e);
+   return FlinkCompletableFuture.completedExceptionally(exception);
+   }
+   }
 
-   checkNotNull(jobMasterAddress);
-   checkNotNull(jobID);
+   log.info("Registering job manager {}@{} for job {}.", jobManagerLeaderId, jobManagerAddress, jobId);
+
+   Future<UUID> jobLeaderIdFuture;
 
-   // create a leader retriever in case it doesn't exist
-   final JobIdLeaderListener jobIdLeaderListener;
-   if (leaderListeners.containsKey(jobID)) {
-   jobIdLeaderListener = leaderListeners.get(jobID);
-   } else {
try {
-   LeaderRetrievalService jobMasterLeaderRetriever =
-   highAvailabilityServices.getJobManagerLeaderRetriever(jobID);
-   jobIdLeaderListener = new JobIdLeaderListener(jobID, jobMasterLeaderRetriever);
+   jobLeaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
} catch (Exception e) {
-   log.warn("Failed to start JobMasterLeaderRetriever for job id {}", jobID, e);
+   // we cannot check the job leader id so let's fail
+   // TODO: Maybe it's also ok to skip this check in case that we cannot check the leader id
+   ResourceManagerException exception = new ResourceManagerException("Cannot obtain the " +
+   "job leader id future to verify the correct job leader.", e);
+
+   onFatalErrorAsync(exception);
 
-   return FlinkCompletableFuture.completed(
-   new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever"));
+   log.debug("Could not obtain the job leader id future to verify the correct job leader.");
+   return FlinkCompletableFuture.completedExceptionally(exception);
}
 
-   leaderListeners.put(jobID, jobIdLeaderListener);
-   }
+   Future<JobMasterGateway> jobMasterGatewayFuture = getRpcService().connect(jobManagerAddress, JobMasterGateway.class);
 
- 

[GitHub] flink pull request #2692: [FLINK-4913][yarn] include user jar in system clas...

2016-10-25 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/2692

[FLINK-4913][yarn] include user jar in system class loader

When deploying a Yarn cluster for a single job, this change
pre-configures the cluster to include the user jar(s) on all nodes.
This eliminates the need to upload jar files through the
BlobClient. More importantly, it loads the user classes only once and
not on every instantiation of a Task. This also reduces the JobManager
class loading upon recovery of a failed job.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink FLINK-4913

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2692.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2692


commit bfe0f1a1b5cfee200ff76e4b988fe43613c8d0c0
Author: Maximilian Michels 
Date:   2016-10-25T14:16:52Z

[FLINK-4913][yarn] include user jar in system class loader

When deploying a Yarn cluster for a single job, this change
pre-configures the cluster to include the user jar(s) on all nodes.
This eliminates the need to upload jar files through the
BlobClient. More importantly, it loads the user classes only once and
not on every instantiation of a Task. This also reduces the JobManager
class loading upon recovery of a failed job.






[GitHub] flink issue #2425: FLINK-3930 Added shared secret based authorization for Fl...

2016-10-26 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2425
  
@vijikarthi I haven't forgotten about your PR. Thanks for the feedback. 
I'll get back to you today.




[GitHub] flink issue #2692: [FLINK-4913][yarn] include user jar in system class loade...

2016-10-26 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2692
  
Thanks for checking out the changes.

Yes, it is handled correctly. Indeed, I started off just thinking about the 
user-provided jar but then discovered that the `PackagedProgram` extracts 
nested jars within that jar. The code handles that transparently in the sense 
that the shipped user jars are always represented by a list of URLs: either a 
singleton list or one with multiple entries in case of nested jars.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...

2016-10-26 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r85175840
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -108,6 +111,11 @@
 
private final Options ALL_OPTIONS;
 
+   private static final String fileName = "yarn-app.ini";
+   private static final String cookieKey = "secureCookie";
--- End diff --

I think the ini file format is actually fine. It could also be JSON, but I 
don't mind either way. To avoid breaking backwards-compatibility, I think we 
have to keep the behavior of using the last-used application id when none is 
supplied. We could have an extra config entry for that.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2425: FLINK-3930 Added shared secret based authorization for Fl...

2016-10-27 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2425
  
CC @uce to check out the network layer changes. This is a very sensitive 
and performance-critical part of Flink. We should be very sure these changes 
don't break it.

@vijikarthi Please have a look at the null checks in the network code. I 
would replace them with `checkNotNull` and never pass any null values in there. 
It would be desirable that, with security turned off, the built-in security 
support doesn't add any overhead.
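
A minimal sketch of the suggested pattern, using Flink's `Preconditions` 
utility (the message class shown is illustrative, not code from this PR):

```java
import static org.apache.flink.util.Preconditions.checkNotNull;

// Sketch: validate eagerly in the constructor instead of tolerating nulls
// downstream; callers without a cookie pass an empty string explicitly.
class ExampleMessage {
    private final String secureCookie;

    ExampleMessage(String secureCookie) {
        // fails fast with a clear message instead of masking a null later
        this.secureCookie = checkNotNull(secureCookie, "secureCookie must not be null");
    }
}
```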


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2618: [FLINK-4800] Refactor the ContinuousFileMonitoringFunctio...

2016-10-27 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2618
  
Looks good @kl0u. Could you push the rebased version?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2692: [FLINK-4913][yarn] include user jar in system class loade...

2016-10-27 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2692
  
Pushed a minor change to check which jars are required in case the cluster 
is used to execute multiple jobs with different dependencies. Merging after a 
local Travis run.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2618: [FLINK-4800] Refactor the ContinuousFileMonitoringFunctio...

2016-10-27 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2618
  
Merged. Thank you!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2718: [hotfix] Fixes the TimestampedInputSplit.EOS comparison.

2016-10-28 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2718
  
I think we should get rid of the magic `EOS` split. We should simply set a 
flag in the reader to stop reading. Until then, this looks like a good fix.
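
A minimal sketch of the flag-based alternative, assuming a reader thread 
with a run loop (illustrative, not the actual reader code):

```java
// Sketch: replace the sentinel EOS split with an explicit shutdown flag.
class SplitReader implements Runnable {
    private volatile boolean running = true;

    /** Called by the monitoring side instead of enqueueing a magic split. */
    void stop() {
        running = false;
    }

    @Override
    public void run() {
        while (running) {
            // poll the next pending split and read its records ...
        }
    }
}
```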


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2718: [hotfix] Fixes the TimestampedInputSplit.EOS compa...

2016-10-28 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2718#discussion_r85523016
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
 ---
@@ -90,4 +112,60 @@ public void testIllegalArgument() {
}
}
}
+
+   @Test
+   public void testPriorityQ() {
+   TimestampedFileInputSplit richFirstSplit =
+   new TimestampedFileInputSplit(0, 3, new Path("test/test1"), 0, 100, null);
+
+   TimestampedFileInputSplit richSecondSplit =
+   new TimestampedFileInputSplit(10, 2, new Path("test/test2"), 0, 100, null);
+
+   TimestampedFileInputSplit richThirdSplit =
+   new TimestampedFileInputSplit(10, 1, new Path("test/test2"), 0, 100, null);
+
+   TimestampedFileInputSplit richForthSplit =
+   new TimestampedFileInputSplit(11, 0, new Path("test/test3"), 0, 100, null);
+
+   TimestampedFileInputSplit richFifthSplit =
+   new TimestampedFileInputSplit(11, 1, new Path("test/test3"), 0, 100, null);
+
+   TimestampedFileInputSplit eos = TimestampedFileInputSplit.EOS;
+
+   Queue<TimestampedFileInputSplit> pendingSplits = new PriorityQueue<>(10, new Comparator<TimestampedFileInputSplit>() {
+   @Override
+   public int compare(TimestampedFileInputSplit o1, TimestampedFileInputSplit o2) {
+   return o1.compareTo(o2);
+   }
+   });
--- End diff --

`TimestampedFileInputSplit` is already `Comparable`. So you can skip the 
Comparator here.
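
Assuming the natural ordering is what the test wants, a minimal sketch of 
the simplification:

```java
import java.util.PriorityQueue;
import java.util.Queue;

// TimestampedFileInputSplit's Comparable implementation supplies the order.
Queue<TimestampedFileInputSplit> pendingSplits = new PriorityQueue<>(10);
```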


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2732: [FLINK-4272] Create a JobClient for job control an...

2016-10-31 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/2732

[FLINK-4272] Create a JobClient for job control and monitoring

Also includes: [FLINK-4274] Expose new JobClient in the DataSet/DataStream 
API

- rename JobClient class to JobClientActorUtils

- introduce JobClient interface with two implementations
  - JobClientEager: starts an actor system right away and monitors the job
  - JobClientLazy: starts an actor system when requests are made, by
    encapsulating the eager job client

- move ClusterClient#cancel, ClusterClient#stop,
  ClusterClient#getAccumulators to JobClient

- Java and Scala API
  - JobClient integration
  - introduce ExecutionEnvironment#executeWithControl()
  - introduce StreamExecutionEnvironment#executeWithControl()
    (see the usage sketch after this list)

- report errors during job execution as JobExecutionException instead of
  ProgramInvocationException and adapt test cases

- provide finalizers to run code upon shutdown of client

- use ActorGateway in JobListeningContext

- add test case for JobClient implementations
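
A hedged usage sketch of the API described above; `executeWithControl()` 
and the `JobClient` control methods are taken from this description, but 
the exact signatures are assumptions, since the PR was still under review:

```java
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;

// Illustrative only, not the final merged API.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

env.fromElements(1, 2, 3)
   .map(i -> i * 2)
   .output(new DiscardingOutputFormat<>());

// returns a JobClient handle instead of blocking until job completion
JobClient client = env.executeWithControl();

// control and monitoring calls moved over from ClusterClient
client.cancel();
```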

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink FLINK-4272

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2732.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2732


commit a3f5cc00ec6959ca662ca925918816b4c8d984cd
Author: Maximilian Michels 
Date:   2016-08-21T13:25:02Z

[FLINK-4272] Create a JobClient for job control and monitoring

Also includes: [FLINK-4274] Expose new JobClient in the DataSet/DataStream 
API

- rename JobClient class to JobClientActorUtils

- introduce JobClient interface with two implementations
  - JobClientEager: starts an actor system right away and monitors the job
  - JobClientLazy: starts an actor system when requests are made, by
    encapsulating the eager job client

- move ClusterClient#cancel, ClusterClient#stop,
  ClusterClient#getAccumulators to JobClient

- Java and Scala API
  - JobClient integration
  - introduce ExecutionEnvironment#executeWithControl()
  - introduce StreamExecutionEnvironment#executeWithControl()

- report errors during job execution as JobExecutionException instead of
  ProgramInvocationException and adapt test cases

- provide finalizers to run code upon shutdown of client

- use ActorGateway in JobListeningContext

- add test case for JobClient implementations




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2732: [FLINK-4272] Create a JobClient for job control and monit...

2016-11-01 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2732
  
Rebased to the latest changes on the master.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2732: [FLINK-4272] Create a JobClient for job control and monit...

2016-11-01 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2732
  
CC @rmetzger @aljoscha Could you take a look at the changes?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2734: Keytab & TLS support for Flink on Mesos Setup

2016-11-02 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2734
  
Thanks for your PR @vijikarthi! I'll try to check it out as soon as 
possible.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2741: [FLINK-4998][yarn] fail if too many task slots are...

2016-11-02 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/2741

[FLINK-4998][yarn] fail if too many task slots are configured

This fails the deployment of the Yarn application if the number of task
slots is configured to be larger than the maximum number of virtual cores
of the Yarn cluster.
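
A minimal sketch of such a check, assuming access to Yarn's 
maximum-resource response (identifier names are illustrative, not the 
PR's actual code):

```java
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;

// Sketch: fail fast during deployment when the configured slots per
// TaskManager exceed what a single Yarn container could ever provide.
static void validateTaskSlots(int configuredSlots, GetNewApplicationResponse appResponse) {
    int maxVcores = appResponse.getMaximumResourceCapability().getVirtualCores();
    if (configuredSlots > maxVcores) {
        throw new IllegalArgumentException(
            "The number of task slots per TaskManager (" + configuredSlots
            + ") exceeds the maximum number of virtual cores ("
            + maxVcores + ") available in the Yarn cluster.");
    }
}
```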

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink FLINK-4998

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2741.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2741


commit 35c4ad3cb086abe6fa85c5755daa8a83fbdfbf56
Author: Maximilian Michels 
Date:   2016-11-02T15:37:56Z

[FLINK-4998][yarn] fail if too many task slots are configured

This fails the deployment of the Yarn application if the number of task
slots is configured to be larger than the maximum number of virtual cores
of the Yarn cluster.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2741: [FLINK-4998][yarn] fail if too many task slots are config...

2016-11-02 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2741
  
Added a test case to verify the error reporting.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2745: [yarn] fix debug string displayed for failed appli...

2016-11-03 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/2745

[yarn] fix debug string displayed for failed applications

Merging for `master` and `release-1.1`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2745.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2745


commit a285002c8b651b50c54d3660daf4e2a3994e11c4
Author: Maximilian Michels 
Date:   2016-11-01T10:02:39Z

[yarn] fix debug string displayed for failed applications




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...

2016-11-03 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r86323404
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
 ---
@@ -101,6 +102,14 @@ public void run() {
final byte[] buffer = new byte[BUFFER_SIZE];
 
while (true) {
+
+   int keyLength = inputStream.read();
--- End diff --

Here the cookie length is limited to one byte.
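
Since `InputStream#read()` returns at most one byte (0-255), cookies longer 
than 255 bytes cannot be expressed. A hedged sketch of reading a four-byte 
length instead (illustrative, not necessarily the fix the PR should take):

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

// Sketch: a 4-byte big-endian length removes the 255-byte cap imposed by
// a single-byte read.
static int readCookieLength(InputStream inputStream) throws IOException {
    return new DataInputStream(inputStream).readInt();
}
```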


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...

2016-11-03 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r86321008
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
 ---
@@ -64,26 +64,25 @@
 
static final String NO_SECURE_COOKIE = "";
 
-   static final Charset DEFAULT_CHARSET = Charset.forName("utf-8");
-
abstract ByteBuf write(ByteBufAllocator allocator) throws Exception;
 
abstract void readFrom(ByteBuf buffer) throws Exception;
 
// ------------------------------------------------------------------------
 
private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, String secureCookie) {
-   return allocateBuffer(allocator, id, secureCookie, 0);
+   return allocateBuffer(allocator, id, 0, secureCookie);
}
 
-   private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, String secureCookie, int length) {
+   private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, int length, String secureCookie) {
+   final Charset DEFAULT_CHARSET = Charset.forName("utf-8");
secureCookie = secureCookie == null ? "": secureCookie;
--- End diff --

Simpler:
```java
if (secureCookie == null) {
secureCookie = NO_SECURE_COOKIE;
}
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...

2016-11-03 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r86322286
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
 ---
@@ -120,6 +136,15 @@ static LengthFieldBasedFrameDecoder createFrameLengthDecoder() {
@ChannelHandler.Sharable
static class NettyMessageDecoder extends MessageToMessageDecoder<ByteBuf> {
 
+   private static final Logger LOG = LoggerFactory.getLogger(NettyMessageDecoder.class);
+
+   final byte[] secureCookie;
+
+   public NettyMessageDecoder(String secureCookie) {
+   secureCookie = secureCookie == null ? "": secureCookie;
--- End diff --

Should be removed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...

2016-11-03 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r86321320
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
 ---
@@ -64,26 +64,25 @@
 
static final String NO_SECURE_COOKIE = "";
 
-   static final Charset DEFAULT_CHARSET = Charset.forName("utf-8");
-
abstract ByteBuf write(ByteBufAllocator allocator) throws Exception;
 
abstract void readFrom(ByteBuf buffer) throws Exception;
 
// ------------------------------------------------------------------------
 
private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, String secureCookie) {
-   return allocateBuffer(allocator, id, secureCookie, 0);
+   return allocateBuffer(allocator, id, 0, secureCookie);
}
 
-   private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, String secureCookie, int length) {
+   private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, int length, String secureCookie) {
+   final Charset DEFAULT_CHARSET = Charset.forName("utf-8");
secureCookie = secureCookie == null ? "": secureCookie;
--- End diff --

Still think we should never pass a null cookie. Then the check wouldn't be 
necessary.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...

2016-11-03 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r86324769
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
 ---
@@ -54,24 +58,36 @@
// constructor in order to work with the generic deserializer.
// 

 
-   static final int HEADER_LENGTH = 4 + 4 + 1; // frame length (4), magic number (4), msg ID (1)
+   static final int HEADER_LENGTH = 4 + 4 + 4 + 1; // frame length (4), magic number (4), Cookie (4), msg ID (1)
 
static final int MAGIC_NUMBER = 0xBADC0FFE;
 
+   static final String NO_SECURE_COOKIE = "";
+
abstract ByteBuf write(ByteBufAllocator allocator) throws Exception;
 
abstract void readFrom(ByteBuf buffer) throws Exception;
 
// ------------------------------------------------------------------------
 
-   private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id) {
-   return allocateBuffer(allocator, id, 0);
+   private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, String secureCookie) {
+   return allocateBuffer(allocator, id, 0, secureCookie);
}
 
-   private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, int length) {
+   private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, int length, String secureCookie) {
+   final Charset DEFAULT_CHARSET = Charset.forName("utf-8");
+   secureCookie = secureCookie == null ? "": secureCookie;
--- End diff --

Should be removed in favor of never passing a null value here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...

2016-11-03 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r86340045
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -788,75 +719,125 @@ private void logAndSysout(String message) {
}
}
 
-   public static File getYarnPropertiesLocation(Configuration conf) {
-   String defaultPropertiesFileLocation = System.getProperty("java.io.tmpdir");
-   String currentUser = System.getProperty("user.name");
-   String propertiesFileLocation =
-   conf.getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation);
-
-   return new File(propertiesFileLocation, YARN_PROPERTIES_FILE + currentUser);
+   public static File getYarnPropertiesLocation() {
+   String path = System.getProperty("user.home") + File.separator + YARN_APP_INI;
+   File stateFile;
+   try {
+   stateFile = new File(path);
+   if(!stateFile.exists()) {
+   stateFile.createNewFile();
+   }
+   } catch(IOException e) {
+   throw new RuntimeException(e);
+   }
+   return stateFile;
}
 
-   public static void persistAppState(String appId, String cookie) {
-   if(appId == null || cookie == null) {
-   return;
+   public static void persistAppState(YarnAppState appState) {
+
+   final String appId = appState.getApplicationId();
+   final String parallelism = appState.getParallelism();
+   final String dynaProps = appState.getDynamicProperties();
+   final String cookie = appState.getCookie();
+
+   if(appId == null) {
+   throw new RuntimeException("Missing application ID from Yarn application state");
}
-   String path = System.getProperty("user.home") + File.separator + fileName;
-   LOG.debug("Going to persist cookie for the appID: {} in {} ", appId, path);
+
+   String path = getYarnPropertiesLocation().getAbsolutePath();
+
+   LOG.debug("Going to persist Yarn application state: {} in {}", appState, path);
+
try {
-   File f = new File(path);
-   if(!f.exists()) {
-   f.createNewFile();
-   }
HierarchicalINIConfiguration config = new HierarchicalINIConfiguration(path);
+
SubnodeConfiguration subNode = config.getSection(appId);
-   if (subNode.containsKey(cookieKey)) {
-   String errorMessage = "Secure Cookie is already found in "+ path + " for the appID: "+ appId;
-   LOG.error(errorMessage);
-   throw new RuntimeException(errorMessage);
+   if(!subNode.isEmpty()) {
+   throw new RuntimeException("Application with ID " + appId + "already exists");
}
-   subNode.addProperty(cookieKey, cookie);
+
+   subNode.addProperty(YARN_PROPERTIES_PARALLELISM, parallelism);
+   subNode.addProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING, dynaProps);
+   subNode.addProperty(YARN_PROPERTIES_SECURE_COOKIE, cookie);
+
+   //update latest entry section with the most recent APP Id
+   config.clearTree(YARN_LATEST_ENTRY_SECTION_NAME);
+   SubnodeConfiguration activeAppSection = config.getSection(YARN_LATEST_ENTRY_SECTION_NAME);
+   activeAppSection.addProperty(YARN_APPLICATION_ID_KEY, appId);
+
config.save();
-   LOG.debug("Persisted cookie for the appID: {}", appId);
+   LOG.debug("Persisted Yarn App state: {}", appState);
} catch(Exception e) {
-   LOG.error("Exception occurred while persisting app state for app id: {}", appId, e);
throw new RuntimeException(e);
}
}
 
-   public static String getAppSecureCookie(String appId) {
+   public static YarnAppState retrieveMostRecentYarnApp() {
+   String path = getYarnPropertiesLocation().getAbsolutePath();
+   LOG.debug("Going to fetch app state from {}", path);
+   try {
+   HierarchicalINIConfiguration conf

[GitHub] flink pull request #2749: [FLINK-3813][yarn] wait for CLI to complete before...

2016-11-03 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/2749

[FLINK-3813][yarn] wait for CLI to complete before checking output



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink FLINK-3813

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2749.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2749


commit e747d94215792ac68fde871226b49d6322a76781
Author: Maximilian Michels 
Date:   2016-11-03T14:11:10Z

[FLINK-3813][yarn] wait for CLI to complete before checking output




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2425: FLINK-3930 Added shared secret based authorization for Fl...

2016-11-04 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2425
  
Thank you for the changes. I wonder, could we remove the cookie header 
completely for Netty or the BlobServer in case the authorization is turned off? 
The Netty protocol has a `MAGIC_NUMBER` which is checked when decoding the 
message. We could use a different "magic number" to check whether we use the 
normal or the cookie-based Netty protocol. This would eliminate all the 
overhead of the cookie transmission. Furthermore, we should strip the cookie 
from the message once we have verified it is correct.
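
A rough sketch of the two-magic-number idea; the secure constant and the 
branch are assumptions for illustration, not code from the PR:

```java
// Illustrative only: distinguish the plain and cookie-based protocols by
// their magic number, so that with security off no cookie bytes are ever
// written or checked.
static final int MAGIC_NUMBER = 0xBADC0FFE;         // existing plain protocol
static final int MAGIC_NUMBER_SECURE = 0xBADC0FFD;  // hypothetical secure variant

void checkMagicNumber(int receivedMagic, boolean securityEnabled) {
    int expected = securityEnabled ? MAGIC_NUMBER_SECURE : MAGIC_NUMBER;
    if (receivedMagic != expected) {
        throw new IllegalStateException(String.format(
            "Network stream corrupted: received incorrect magic number 0x%08X", receivedMagic));
    }
}
```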


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2708: [FLINK-4946] [scripts] Load jar files from subdire...

2016-11-04 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2708#discussion_r86545553
  
--- Diff: flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh ---
@@ -35,14 +33,15 @@ JVM_ARGS="$JVM_ARGS -Xmx512m"
 # Flink CLI client
 constructCLIClientClassPath() {
 
-   for jarfile in $FLINK_LIB_DIR/*.jar ; do
-   if [[ $CC_CLASSPATH = "" ]]; then
-   CC_CLASSPATH=$jarfile;
-   else
-   CC_CLASSPATH=$CC_CLASSPATH:$jarfile
-   fi
-   done
-   echo $CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS
+while read -d '' -r jarfile ; do
+if [[ $FLINK_CLASSPATH = "" ]]; then
+CC_CLASSPATH="$jarfile";
+else
+CC_CLASSPATH="$CC_CLASSPATH":"$jarfile"
+fi
+done < <(find "$FLINK_LIB_DIR" -name '*.jar' -print0)
+
+echo $CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS
--- End diff --

Recursing over the lib folder for uploading is not necessary because Yarn 
already recursively uploads it into the home directory using 
`FileUtil#copyFromLocalFile`. We just have to make sure to generate the 
classpath including all subdirectories. The easiest fix would be to use 
`lib/**` instead of `lib/*`, which would be a one-line change.
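
For illustration, the suggested one-liner might look like this in the 
classpath-assembly code (the builder variable and context are assumptions, 
not the actual Flink code):

```java
import java.io.File;

StringBuilder classPathBuilder = new StringBuilder();

// before: only jars directly under lib/ land on the container classpath
// classPathBuilder.append("lib").append(File.separator).append("*").append(File.pathSeparator);

// suggested: a recursive glob intended to cover subdirectories as well
classPathBuilder.append("lib").append(File.separator).append("**").append(File.pathSeparator);
```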


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2703: [FLINK-4900] flink-master: Allow to deploy TM with contai...

2016-11-04 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2703
  
Thanks for the PR! Jenkins CI is currently broken but Travis CI passed. 
Looks good to me but would be great if you could take a look @EronWright.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2708: [FLINK-4946] [scripts] Load jar files from subdirectories...

2016-11-08 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2708
  
@greghogan It would be helpful to look at the Yarn container files which 
include the container environment variables. This would help us to figure out 
why the recursive `**` is not working. Another solution would be to recurse 
into directories and add the files to the classpath one-by-one.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3086: Improve docker setup

2017-01-10 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/3086#discussion_r95410892
  
--- Diff: flink-contrib/docker-flink/Dockerfile ---
@@ -22,9 +22,9 @@ FROM java:8-jre-alpine
 RUN apk add --no-cache bash snappy
 
 # Configure Flink version
-ENV FLINK_VERSION=1.1.1
-ENV HADOOP_VERSION=27
-ENV SCALA_VERSION=2.11
+ARG FLINK_VERSION=1.1.3
--- End diff --

`ARG` is only available in newer versions of Docker. If we want to maintain 
backwards-compatibility, we should adjust the README to state `docker build 
--env FLINK_VERSION=1.0.3`. As far as I know, we don't gain anything by using 
`ARG`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3086: Improve docker setup

2017-01-10 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/3086#discussion_r95411497
  
--- Diff: flink-contrib/docker-flink/docker-entrypoint.sh ---
@@ -36,9 +39,9 @@ elif [ "$1" == "taskmanager" ]; then
 echo "Starting Task Manager"
 echo "config file: " && grep '^[^\n#]' $FLINK_HOME/conf/flink-conf.yaml
 $FLINK_HOME/bin/taskmanager.sh start
+
+  # prevent script to exit
+  tail -f /dev/null
 else
 $@
--- End diff --

@greghogan Seems like a way to execute an arbitrary command inside the 
Docker container, passed as an argument to `docker run`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3086: Improve docker setup

2017-01-10 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/3086#discussion_r95411011
  
--- Diff: flink-contrib/docker-flink/docker-entrypoint.sh ---
@@ -28,6 +28,9 @@ if [ "$1" == "jobmanager" ]; then
 
 echo "config file: " && grep '^[^\n#]' $FLINK_HOME/conf/flink-conf.yaml
 $FLINK_HOME/bin/jobmanager.sh start cluster
+
+  # prevent script to exit
+  tail -f /dev/null
--- End diff --

I think the proper way to fix this would be to call a non-daemonized 
startup script.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

