[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2425#discussion_r84035347

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---
    @@ -424,13 +481,17 @@ public String toString() {
            ResultPartitionID partitionId;

    +       String secureCookie = "";
    +
            public TaskEventRequest() {
            }

    -       TaskEventRequest(TaskEvent event, ResultPartitionID partitionId, InputChannelID receiverId) {
    +       TaskEventRequest(TaskEvent event, ResultPartitionID partitionId, InputChannelID receiverId,
    +               String secureCookie) {
                this.event = event;
                this.receiverId = receiverId;
                this.partitionId = partitionId;
    +           this.secureCookie = (secureCookie == null || secureCookie.length() == 0) ? "": secureCookie;
    --- End diff --

    Please pass a proper non-null secureCookie instead.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2425#discussion_r84027778

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
    @@ -725,7 +755,21 @@ else if (response != RETURN_OKAY) {
            Object msg = JobManagerMessages.getRequestBlobManagerPort();
            Future futureBlobPort = jobManager.ask(msg, askTimeout);

    +       Object secureCookieMsg = JobManagerMessages.getRequestBlobManagerSecureCookie();
    +       Future futureSecureCookie = jobManager.ask(secureCookieMsg, askTimeout);
    +
    +       try {
    +           String secureCookie = null;
    +
    +           Object cookie = Await.result(futureSecureCookie, askTimeout);
    +           if(cookie instanceof String) {
    +               secureCookie = (String) cookie;
    +           }
    --- End diff --

    We are transferring the cookie here from the JobManager? That should never be the case. The client has to provide the cookie; otherwise a client must not be able to communicate with the JobManager.
[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2425#discussion_r84033606

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---
    @@ -57,24 +61,37 @@
        // constructor in order to work with the generic deserializer.
        //

    -   static final int HEADER_LENGTH = 4 + 4 + 1; // frame length (4), magic number (4), msg ID (1)
    +   static final int HEADER_LENGTH = 4 + 4 + 4 + 1; // frame length (4), magic number (4), Cookie (4), msg ID (1)

        static final int MAGIC_NUMBER = 0xBADC0FFE;

    +   static final int BUFFER_SIZE = 65536;
    +
    +   static final Charset DEFAULT_CHARSET = Charset.forName("utf-8");
    +
        abstract ByteBuf write(ByteBufAllocator allocator) throws Exception;

        abstract void readFrom(ByteBuf buffer) throws Exception;

        //

    -   private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id) {
    -       return allocateBuffer(allocator, id, 0);
    +   private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, String secureCookie) {
    +       return allocateBuffer(allocator, id, secureCookie, 0);
        }

    -   private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, int length) {
    +   private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, String secureCookie, int length) {
    +       secureCookie = (secureCookie == null || secureCookie.length() == 0) ? "": secureCookie;
    +       length+=secureCookie.getBytes().length;
            final ByteBuf buffer = length != 0 ? allocator.directBuffer(HEADER_LENGTH + length) : allocator.directBuffer();
            buffer.writeInt(HEADER_LENGTH + length);
            buffer.writeInt(MAGIC_NUMBER);
    +
    +       buffer.writeInt(secureCookie.length());
    --- End diff --

    Here you're writing `secureCookie.length()`, but earlier you calculate with `secureCookie.getBytes().length`. I think this causes issues with non-ASCII chars.
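The char-count vs. byte-count mismatch flagged above can be demonstrated in a few lines of standalone Java. This is an illustrative sketch only; the class name and sample strings are made up and not part of the patch:

```java
import java.nio.charset.StandardCharsets;

// For non-ASCII strings, the number of Java chars differs from the number
// of UTF-8 bytes, so writing String.length() into the frame header while
// reserving getBytes().length bytes corrupts the framing.
public class CookieLengthDemo {
    public static void main(String[] args) {
        String asciiCookie = "secret";
        String nonAsciiCookie = "geheimnis-\u00fc"; // contains 'ü'

        // For pure ASCII the two lengths agree.
        System.out.println(asciiCookie.length());                                // 6
        System.out.println(asciiCookie.getBytes(StandardCharsets.UTF_8).length); // 6

        // 'ü' is one char but two UTF-8 bytes, so they diverge.
        System.out.println(nonAsciiCookie.length());                                // 11
        System.out.println(nonAsciiCookie.getBytes(StandardCharsets.UTF_8).length); // 12
    }
}
```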
[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2425#discussion_r84041460

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
    @@ -108,6 +111,11 @@
        private final Options ALL_OPTIONS;

    +   private static final String fileName = "yarn-app.ini";
    +   private static final String cookieKey = "secureCookie";
    --- End diff --

    Please see the YARN properties, which should be used for this purpose: `writeYarnProperties(..)`. Please, no new configuration file.
[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2425#discussion_r84035347

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---
    @@ -492,19 +553,22 @@ public void readFrom(ByteBuf buffer) {
            InputChannelID receiverId;

    +       String secureCookie = "";
    --- End diff --

    The `""` initialization is always overridden, please remove.
[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2425#discussion_r84039884

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java ---
    @@ -18,6 +18,7 @@
     package org.apache.flink.runtime.blob;

    +import org.apache.flink.configuration.ConfigConstants;
    --- End diff --

    Unused import.
[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2425#discussion_r84029858

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java ---
    @@ -199,6 +199,11 @@ public int getBlobServerPort() {
            return blobService.getPort();
        }

    +   public String getSecureCookie() {
    +       return blobService.getSecureCookie() == null
    +           ? "": blobService.getSecureCookie();
    --- End diff --

    I feel like this logic should be delegated to the blobService. Also, this might be clearer:

    ```java
    String secureCookie = blobService.getSecureCookie();
    return secureCookie != null ? secureCookie : "";
    ```
[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2425#discussion_r84035178

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---
    @@ -369,21 +423,24 @@ void readFrom(ByteBuf buffer) throws Exception {
            InputChannelID receiverId;

    +       String secureCookie = "";
    --- End diff --

    The `""` initialization is always overridden, please remove.
[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2425#discussion_r84033402

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---
    @@ -57,24 +61,37 @@
        // constructor in order to work with the generic deserializer.
        //

    -   static final int HEADER_LENGTH = 4 + 4 + 1; // frame length (4), magic number (4), msg ID (1)
    +   static final int HEADER_LENGTH = 4 + 4 + 4 + 1; // frame length (4), magic number (4), Cookie (4), msg ID (1)

        static final int MAGIC_NUMBER = 0xBADC0FFE;

    +   static final int BUFFER_SIZE = 65536;
    +
    +   static final Charset DEFAULT_CHARSET = Charset.forName("utf-8");
    +
        abstract ByteBuf write(ByteBufAllocator allocator) throws Exception;

        abstract void readFrom(ByteBuf buffer) throws Exception;

        //

    -   private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id) {
    -       return allocateBuffer(allocator, id, 0);
    +   private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, String secureCookie) {
    +       return allocateBuffer(allocator, id, secureCookie, 0);
        }

    -   private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, int length) {
    +   private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, String secureCookie, int length) {
    +       secureCookie = (secureCookie == null || secureCookie.length() == 0) ? "": secureCookie;
    +       length+=secureCookie.getBytes().length;
    --- End diff --

    This seems inefficient, always getting the byte array. The length should be supplied and be calculated just once.
[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2425#discussion_r84034881

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---
    @@ -369,21 +423,24 @@ void readFrom(ByteBuf buffer) throws Exception {
            InputChannelID receiverId;

    +       String secureCookie = "";
    +
            public PartitionRequest() {
            }

    -       PartitionRequest(ResultPartitionID partitionId, int queueIndex, InputChannelID receiverId) {
    +       PartitionRequest(ResultPartitionID partitionId, int queueIndex, InputChannelID receiverId, String secureCookie) {
                this.partitionId = partitionId;
                this.queueIndex = queueIndex;
                this.receiverId = receiverId;
    +           this.secureCookie = (secureCookie == null || secureCookie.length() == 0) ? "": secureCookie;
    --- End diff --

    These checks should not be necessary. Please remove.
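One way to drop the null/empty checks the reviewer objects to is to fail fast at construction time instead of silently substituting `""`. A minimal standalone sketch (class name and accessor are hypothetical, loosely mirroring the `PartitionRequest` constructor in the diff):

```java
import java.util.Objects;

// Instead of masking a null cookie with "", require callers to pass a
// proper value and reject null eagerly at the call site.
public class PartitionRequestSketch {
    private final String secureCookie;

    PartitionRequestSketch(String secureCookie) {
        // Throws NullPointerException immediately if the caller is misconfigured.
        this.secureCookie = Objects.requireNonNull(secureCookie, "secureCookie must not be null");
    }

    String getSecureCookie() {
        return secureCookie;
    }

    public static void main(String[] args) {
        System.out.println(new PartitionRequestSketch("secret").getSecureCookie()); // secret
        try {
            new PartitionRequestSketch(null);
        } catch (NullPointerException expected) {
            System.out.println("rejected null cookie");
        }
    }
}
```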
[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2425#discussion_r84029158

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java ---
    @@ -393,6 +399,24 @@ static void copyFromRecoveryPath(String recoveryPath, File localBlobFile) throws
        }

        /**
    +    * Utility method to validate secure cookie from Flink configuration instance
    +    * @throws
    +    */
    +   public static String validateAndGetSecureCookie(Configuration configuration) {
    +       String secureCookie = null;
    +       if(configuration.getBoolean(ConfigConstants.SECURITY_ENABLED, DEFAULT_SECURITY_ENABLED) == true) {
    +           secureCookie = configuration.getString(ConfigConstants.SECURITY_COOKIE, null);
    +           if(secureCookie == null) {
    +               String message = "Missing " + ConfigConstants.SECURITY_COOKIE +
    +                   " configuration in Flink configuration file";
    +               LOG.error(message);
    +               throw new RuntimeException(message);
    --- End diff --

    - Exceptions are logged anyways.
    - This should be `IllegalConfigurationException`.
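The shape the reviewer asks for, throwing a specific exception and leaving the logging to the caller's error handling, can be sketched standalone. Everything here is illustrative: the local exception class stands in for Flink's `IllegalConfigurationException`, and the key name and method signature are assumptions, not the real `ConfigConstants` API:

```java
// Validate-and-get with a dedicated configuration exception: no LOG.error
// before the throw, so the error is reported exactly once by whoever
// catches (and logs) it.
public class CookieValidationSketch {
    // Stand-in for org.apache.flink.configuration.IllegalConfigurationException.
    static class IllegalConfigurationException extends RuntimeException {
        IllegalConfigurationException(String message) {
            super(message);
        }
    }

    static String validateAndGetSecureCookie(boolean securityEnabled, String secureCookie) {
        if (securityEnabled && secureCookie == null) {
            throw new IllegalConfigurationException(
                "Missing secure cookie entry in Flink configuration file");
        }
        return secureCookie;
    }

    public static void main(String[] args) {
        System.out.println(validateAndGetSecureCookie(true, "abc")); // abc
        try {
            validateAndGetSecureCookie(true, null);
        } catch (IllegalConfigurationException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```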
[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2425#discussion_r84035615

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---
    @@ -527,13 +591,17 @@ void readFrom(ByteBuf buffer) throws Exception {
        static class CloseRequest extends NettyMessage {

            private static final byte ID = 5;

    +       String secureCookie = "";
    --- End diff --

    The initialization value is never used.
[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2425#discussion_r84034660

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---
    @@ -226,8 +277,9 @@ ByteBuf write(ByteBufAllocator allocator) throws IOException {
            int length = 16 + 4 + 1 + 4 + buffer.getSize();

            ByteBuf result = null;
    +       final String NO_SECURE_COOKIE= "";
    --- End diff --

    Missing space after the variable name.
[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2425#discussion_r84036664

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java ---
    @@ -30,16 +30,29 @@
        private final NettyMessageEncoder messageEncoder = new NettyMessageEncoder();

    -   private final NettyMessage.NettyMessageDecoder messageDecoder = new NettyMessage.NettyMessageDecoder();
    +   private final NettyMessage.NettyMessageDecoder serverMessageDecoder;
    +
    +   private final NettyMessage.NettyMessageDecoder clientMessageDecoder;

        private final ResultPartitionProvider partitionProvider;
        private final TaskEventDispatcher taskEventDispatcher;
        private final NetworkBufferPool networkbufferPool;
    +   private final String secureCookie;

    -   PartitionRequestProtocol(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher, NetworkBufferPool networkbufferPool) {
    +   PartitionRequestProtocol(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher,
    +           NetworkBufferPool networkbufferPool, String secureCookie) {
            this.partitionProvider = partitionProvider;
            this.taskEventDispatcher = taskEventDispatcher;
            this.networkbufferPool = networkbufferPool;
    +       this.secureCookie = (secureCookie == null || secureCookie.length() == 0) ? "": secureCookie;
    +
    +       serverMessageDecoder = new NettyMessage.NettyMessageDecoder(secureCookie);
    +
    +       /*
    +        * Client decoder does not validate the secure cookie from server since
    +        * the server protocol does not transmit the secure cookie on the wire
    +        */
    +       clientMessageDecoder = new NettyMessage.NettyMessageDecoder(null);
    --- End diff --

    But it still sends an empty cookie?
[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2425#discussion_r84026759

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
    @@ -272,4 +284,8 @@ private void closeSilently(Closeable closeable) {
            }
        }
     }
    +
    +   /* Secure cookie to authenticate */
    --- End diff --

    > authorize
[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2425#discussion_r84028789

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java ---
    @@ -53,6 +53,9 @@
        /** Internal code to identify a reference via jobId as the key */
        static final byte JOB_ID_SCOPE = 2;

    +   /** The maximum length of secure cookie. */
    +   static final int MAX_LENGTH_SECURE_COOKIE = 1024;
    --- End diff --

    Shouldn't this be tied to the buffer size which limits the cookie length?
[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2425#discussion_r84032194

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---
    @@ -57,24 +61,37 @@
        // constructor in order to work with the generic deserializer.
        //

    -   static final int HEADER_LENGTH = 4 + 4 + 1; // frame length (4), magic number (4), msg ID (1)
    +   static final int HEADER_LENGTH = 4 + 4 + 4 + 1; // frame length (4), magic number (4), Cookie (4), msg ID (1)
    --- End diff --

    4 bytes for the cookie or cookie **length**? How is this in line with the size restrictions assumed in other places?
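Presumably the 4 bytes hold the cookie *length*, with the variable-length cookie bytes following, which is why the writer must record the UTF-8 byte count rather than `String.length()`. A self-contained sketch of such length-prefixed framing, using `ByteBuffer` as a stand-in for Netty's `ByteBuf` (class and method names are made up for illustration):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Length-prefixed framing: a 4-byte length field followed by the cookie's
// UTF-8 bytes. Writer and reader agree because both use the byte count.
public class CookieFramingSketch {
    static byte[] frame(String cookie) {
        byte[] cookieBytes = cookie.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + cookieBytes.length);
        buf.putInt(cookieBytes.length); // 4-byte length prefix (byte count, not char count)
        buf.put(cookieBytes);           // variable-length payload
        return buf.array();
    }

    static String unframe(byte[] framed) {
        ByteBuffer buf = ByteBuffer.wrap(framed);
        byte[] cookieBytes = new byte[buf.getInt()];
        buf.get(cookieBytes);
        return new String(cookieBytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String cookie = "geheim-\u00fc"; // 8 chars, 9 UTF-8 bytes
        byte[] framed = frame(cookie);
        System.out.println(framed.length);               // 13 = 4-byte prefix + 9 bytes
        System.out.println(unframe(framed).equals(cookie)); // true: round-trips
    }
}
```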
[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2425#discussion_r84035467

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---
    @@ -492,19 +553,22 @@ public void readFrom(ByteBuf buffer) {
            InputChannelID receiverId;

    +       String secureCookie = "";
    +
            public CancelPartitionRequest() {
            }

    -       public CancelPartitionRequest(InputChannelID receiverId) {
    +       public CancelPartitionRequest(InputChannelID receiverId, String secureCookie) {
                this.receiverId = receiverId;
    +           this.secureCookie = (secureCookie == null || secureCookie.length() == 0) ? "": secureCookie;
    --- End diff --

    Please pass a properly initialized cookie instead.
[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2425#discussion_r84041338

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
    @@ -108,6 +111,11 @@
        private final Options ALL_OPTIONS;

    +   private static final String fileName = "yarn-app.ini";
    +   private static final String cookieKey = "secureCookie";
    +
    +   private final Option SECURE_COOKIE_OPTION;
    --- End diff --

    This option can be retrieved from the main options. There is no need for a YARN-specific option. Please remove.
[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2425#discussion_r84039282

    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -1008,6 +1008,9 @@ class JobManager(
        case RequestBlobManagerPort =>
            sender ! decorateMessage(libraryCacheManager.getBlobServerPort)

    +   case RequestBlobManagerSecureCookie =>
    +       sender ! decorateMessage(libraryCacheManager.getSecureCookie)
    --- End diff --

    That is not a good idea for security!
[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2425#discussion_r84020724

    --- Diff: docs/setup/cli.md ---
    @@ -217,6 +217,8 @@ Action "run" compiles and runs a program.
                                java.net.URLClassLoader}.
         -d,--detached         If present, runs the job in detached mode
    +    -k,--cookie           Secure cookie to authenticate
    --- End diff --

    The description could be a bit more elaborate. For example:

    > String to authorize Akka-based RPC communication
[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2425#discussion_r84030548

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java ---
    @@ -91,6 +94,14 @@ public NettyConfig(
        this.config = checkNotNull(config);

    +   boolean security = config.getBoolean(ConfigConstants.SECURITY_ENABLED, false);
    +   this.secureCookie = config.getString(ConfigConstants.SECURITY_COOKIE, "");
    +
    +   if(security && this.secureCookie == "") {
    +       LOG.error("Security is enabled but secure cookie is not provided");
    +       throw new IllegalConfigurationException("Security is enabled but secure cookie is not provided");
    --- End diff --

    Exceptions are logged anyways.
[GitHub] flink issue #2618: Refactoring the Continuous File Monitoring Function.
Github user mxm commented on the issue:

    https://github.com/apache/flink/pull/2618

    Could you please elaborate on the changes you made to the `ContinuousFileReaderOperator`?
[GitHub] flink pull request #2660: [FLINK-4833] properly log exceptions in CountMinHe...
GitHub user mxm opened a pull request:

    https://github.com/apache/flink/pull/2660

    [FLINK-4833] properly log exceptions in CountMinHeavyHitter

    This logs the underlying exception properly, which could help us to find the exact cause of the reported problems.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mxm/flink FLINK-4833

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2660.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2660

commit d117c59e704e9faad8d5001cb9b2164cd4aa7b9a
Author: Maximilian Michels
Date:   2016-10-19T13:50:15Z

    [FLINK-4833] properly log exceptions in CountMinHeavyHitter
[GitHub] flink issue #2659: [FLINK-4857] Remove throws clause from ZooKeeperUtils fun...
Github user mxm commented on the issue:

    https://github.com/apache/flink/pull/2659

    +1 LGTM
[GitHub] flink issue #2662: [FLINK-4824] [client] CliFrontend shows misleading error ...
Github user mxm commented on the issue:

    https://github.com/apache/flink/pull/2662

    Thanks for the PR @greghogan! Having a custom exception for missing arguments to a user program is a good approach. However, it requires the author of the program to use the custom exception; at the very least, we would have to adapt all the included examples.

    Additionally, it would be nice to throw another custom exception when no Flink job was generated during execution of the jar (which might be because of missing arguments). Currently, we simply throw a `ProgramInvocationException`, which can look like a serious error to the user when arguments are merely missing.

    So +1, but we might do some follow-ups to fully solve the issue.
[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2667#discussion_r84241679

    --- Diff: flink-contrib/docker-flink/docker-entrypoint.sh ---
    @@ -20,11 +20,19 @@
     if [ "$1" = "jobmanager" ]; then
         echo "Starting Job Manager"

    -    sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: `hostname -f`/g" $FLINK_HOME/conf/flink-conf.yaml
    +    #sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: `hostname -f`/g" $FLINK_HOME/conf/flink-conf.yaml
    +
    +    # make use of container linking and exploit the jobmanager entry in /etc/hosts
    +    sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: jobmanager/g" $FLINK_HOME/conf/flink-conf.yaml
    +
         sed -i -e "s/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: `grep -c ^processor /proc/cpuinfo`/g" $FLINK_HOME/conf/flink-conf.yaml
    --- End diff --

    This line has to go to the `taskmanager` section. Before, it didn't really matter because the config was shared, but now this setting will only be configured for the `JobManager` when, in fact, it is only used by the `TaskManager`.
[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2667#discussion_r84242416

    --- Diff: flink-contrib/docker-flink/docker-compose.yml ---
    @@ -20,16 +20,20 @@ version: "2"
     services:
       jobmanager:
         image: flink
    +    container_name: "jobmanager"
    +    expose:
    +      - "6123"
         ports:
           - "48081:8081"
         command: jobmanager
    -    volumes:
    -      - /opt/flink/conf
       taskmanager:
         image: flink
    +    expose:
    +      - "6121"
    +      - "6122"
         depends_on:
           - jobmanager
         command: taskmanager
    -    volumes_from:
    -      - jobmanager:ro
    +    links:
    --- End diff --

    `links` are a legacy feature as of Docker 1.9.0, but it's probably fine to stick with them for now.
[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2667#discussion_r84242735

    --- Diff: flink-contrib/docker-flink/docker-compose.sh ---
    @@ -0,0 +1,4 @@
    +#!/bin/sh
    --- End diff --

    Could we name this file `bluemix-docker-compose.sh`?
[GitHub] flink issue #2657: [FLINK-4853] [rm] Harden job manager registration at the ...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2657 This doesn't compile currently. Would you prefer that I review the PRs individually, or that I review the commits in this PR?
[GitHub] flink issue #2618: Refactoring the Continuous File Monitoring Function.
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2618 Thanks for updating the description. Let me take a look at the changes.
[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2667#discussion_r84242539 --- Diff: flink-contrib/docker-flink/docker-compose.yml --- @@ -20,16 +20,20 @@ version: "2" services: jobmanager: image: flink +container_name: "jobmanager" +expose: + - "6123" ports: - "48081:8081" command: jobmanager -volumes: - - /opt/flink/conf taskmanager: image: flink +expose: + - "6121" + - "6122" --- End diff -- What are these ports needed for? The TaskManager will always initiate the connection to the JobManager. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r84285533 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichFileInputSplit.java --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * An extended {@link FileInputSplit} that also includes information about: + * + * The modification time of the file this split belongs to. + * When checkpointing, the state of the split at the moment of the checkpoint. + * + * This class is used by the {@link ContinuousFileMonitoringFunction} and the + * {@link ContinuousFileReaderOperator} to perform continuous file processing. + * */ +public class RichFileInputSplit + extends FileInputSplit implements Comparable{ + + /** The modification time of the file this split belongs to. */ + private final long modificationTime; + + /** +* The state of the split. 
This information is used when +* restoring from a checkpoint and allows to resume reading the +* underlying file from the point we left off. +* */ + private S splitState; + + /** A special {@link RichFileInputSplit} signaling the end of the stream of splits.*/ + public static final RichFileInputSplit EOS = + new RichFileInputSplit<>(Long.MIN_VALUE, -1, null, -1, -1, null); + + /** +* Creates a {@link RichFileInputSplit} based on the file modification time and +* the rest of the information of the {@link FileInputSplit}, as returned by the +* underlying filesystem. +* +* @param modificationTime the modification file of the file this split belongs to +* @param split the rest of the information about the split +*/ + public RichFileInputSplit(long modificationTime, FileInputSplit split) { --- End diff -- Not sure about this constructor. I think I'd prefer something spelling out the parameters. This also avoids to create a regular FileInputSplit every time. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r84288480 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichFileInputSplit.java --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * An extended {@link FileInputSplit} that also includes information about: + * + * The modification time of the file this split belongs to. + * When checkpointing, the state of the split at the moment of the checkpoint. + * + * This class is used by the {@link ContinuousFileMonitoringFunction} and the + * {@link ContinuousFileReaderOperator} to perform continuous file processing. + * */ +public class RichFileInputSplit --- End diff -- I think you can drop the type parameter here since you don't gain any type safety from the parameter. It is never used in any argument which would make it meaningful. Instead just use `Serializable` for the state type. 
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r84280372 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java --- @@ -199,44 +196,39 @@ public void close() throws Exception { private final Object checkpointLock; private final SourceFunction.SourceContext readerContext; - private final Queue pendingSplits; - - private FileInputSplit currentSplit = null; + private final Queue> pendingSplits; - private S restoredFormatState = null; + private RichFileInputSplit currentSplit; - private volatile boolean isSplitOpen = false; + private volatile boolean isSplitOpen; private SplitReader(FileInputFormat format, TypeSerializer serializer, SourceFunction.SourceContext readerContext, Object checkpointLock, - Tuple3, FileInputSplit, S> restoredState) { + List> restoredState) { this.format = checkNotNull(format, "Unspecified FileInputFormat."); this.serializer = checkNotNull(serializer, "Unspecified Serializer."); this.readerContext = checkNotNull(readerContext, "Unspecified Reader Context."); this.checkpointLock = checkNotNull(checkpointLock, "Unspecified checkpoint lock."); - this.pendingSplits = new ArrayDeque<>(); this.isRunning = true; - // this is the case where a task recovers from a previous failed attempt - if (restoredState != null) { - List pending = restoredState.f0; - FileInputSplit current = restoredState.f1; - S formatState = restoredState.f2; - - for (FileInputSplit split : pending) { - pendingSplits.add(split); + this.pendingSplits = new PriorityQueue<>(100, new Comparator>() { --- End diff -- Why did you choose 100 as the initial size? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
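As background to the question above: the first argument to `java.util.PriorityQueue` is only an initial sizing hint, not a limit; the queue grows automatically, and the JDK default capacity is 11. A minimal illustration:

```java
import java.util.PriorityQueue;

public class QueueCapacitySketch {
    public static void main(String[] args) {
        // The initial capacity is only a sizing hint (the JDK default is 11);
        // the queue grows automatically, so there is no functional difference
        // between passing 11 and passing 100.
        PriorityQueue<Long> queue = new PriorityQueue<>(11);
        for (long i = 0; i < 1000; i++) {
            queue.add(i);
        }
        System.out.println(queue.size()); // far beyond the initial capacity
        System.out.println(queue.peek()); // smallest element is at the head
    }
}
```

So the choice of 100 only affects how soon the backing array is first resized, never correctness.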
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r84285924 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichFileInputSplit.java --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * An extended {@link FileInputSplit} that also includes information about: + * + * The modification time of the file this split belongs to. + * When checkpointing, the state of the split at the moment of the checkpoint. + * + * This class is used by the {@link ContinuousFileMonitoringFunction} and the + * {@link ContinuousFileReaderOperator} to perform continuous file processing. + * */ +public class RichFileInputSplit --- End diff -- The name rich :) I'd be happy if we could find another name. Rich doesn't really mean anything. How about `TimestampedFileInputSplit`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r84288567 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichFileInputSplit.java --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * An extended {@link FileInputSplit} that also includes information about: + * + * The modification time of the file this split belongs to. + * When checkpointing, the state of the split at the moment of the checkpoint. + * + * This class is used by the {@link ContinuousFileMonitoringFunction} and the + * {@link ContinuousFileReaderOperator} to perform continuous file processing. + * */ +public class RichFileInputSplit + extends FileInputSplit implements Comparable{ + + /** The modification time of the file this split belongs to. */ + private final long modificationTime; + + /** +* The state of the split. 
This information is used when +* restoring from a checkpoint and allows to resume reading the +* underlying file from the point we left off. +* */ + private S splitState; --- End diff -- ```java private Serializable splitState; ``` should be sufficient. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
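The suggestion above — dropping the type parameter and storing the state as a plain `Serializable` — could look like this minimal sketch (class and method names here are illustrative stand-ins, not Flink's actual classes):

```java
import java.io.Serializable;

// Illustrative sketch only: a split that keeps its checkpointed reader state
// as an opaque Serializable, so the class needs no type parameter.
public class TimestampedSplitSketch implements Serializable {

    private final long modificationTime;

    // Opaque state captured at checkpoint time; the input format that
    // produced it knows how to interpret it on restore.
    private Serializable splitState;

    public TimestampedSplitSketch(long modificationTime) {
        this.modificationTime = modificationTime;
    }

    public void setSplitState(Serializable state) {
        this.splitState = state;
    }

    public Serializable getSplitState() {
        return splitState;
    }

    public long getModificationTime() {
        return modificationTime;
    }

    public static void main(String[] args) {
        TimestampedSplitSketch split = new TimestampedSplitSketch(10L);
        split.setSplitState(42L); // any Serializable value works as state
        System.out.println(split.getSplitState());
    }
}
```

Since the parameter never appears in a method argument position, nothing is lost in type safety by this change.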
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r84288776 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichFileInputSplit.java --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * An extended {@link FileInputSplit} that also includes information about: + * + * The modification time of the file this split belongs to. + * When checkpointing, the state of the split at the moment of the checkpoint. + * + * This class is used by the {@link ContinuousFileMonitoringFunction} and the + * {@link ContinuousFileReaderOperator} to perform continuous file processing. + * */ +public class RichFileInputSplit + extends FileInputSplit implements Comparable{ + + /** The modification time of the file this split belongs to. */ + private final long modificationTime; + + /** +* The state of the split. 
This information is used when +* restoring from a checkpoint and allows to resume reading the +* underlying file from the point we left off. +* */ + private S splitState; + + /** A special {@link RichFileInputSplit} signaling the end of the stream of splits.*/ + public static final RichFileInputSplit EOS = + new RichFileInputSplit<>(Long.MIN_VALUE, -1, null, -1, -1, null); + + /** +* Creates a {@link RichFileInputSplit} based on the file modification time and +* the rest of the information of the {@link FileInputSplit}, as returned by the +* underlying filesystem. +* +* @param modificationTime the modification file of the file this split belongs to +* @param split the rest of the information about the split +*/ + public RichFileInputSplit(long modificationTime, FileInputSplit split) { + this(modificationTime, + split.getSplitNumber(), + split.getPath(), + split.getStart(), + split.getLength(), + split.getHostnames()); + } + + /** +* Constructor with the raw split information. +* +* @param modificationTime the modification file of the file this split belongs to +* @param numthe number of this input split +* @param file the file name +* @param start the position of the first byte in the file to process +* @param length the number of bytes in the file to process (-1 is flag for "read whole file") +* @param hosts the list of hosts containing the block, possibly null +*/ + private RichFileInputSplit(long modificationTime, int num, Path file, long start, long length, String[] hosts) { + super(num, file, start, length, hosts); + + Preconditions.checkArgument(modificationTime >= 0 || modificationTime == Long.MIN_VALUE, + "Invalid File Split Modification Time: "+ modificationTime +"."); + + this.modificationTime = modificationTime; + } + + /** +* Sets the state of the split. This information is used when +* restoring from a checkpoint and allows to resume reading the +* underlying file from the point we left off. 
+* +* This is applicable to {@link org.apache.flink.api.common.io.FileInputFormat FileInputFormats} +* that implement the {@link org.apache.flink.api.common.io.CheckpointableInputFormat +* CheckpointableInputFormat} interface. +* */ + public void setSplitState(S state) { + this.splitState = state; + }
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r84284953 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/RichFileInputSplitTest.java --- @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.test.checkpointing; + +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.streaming.api.functions.source.RichFileInputSplit; +import org.junit.Assert; +import org.junit.Test; + +public class RichFileInputSplitTest { + + @Test + public void testSplitEquality() { + + RichFileInputSplit eos1 = RichFileInputSplit.EOS; + RichFileInputSplit eos2 = RichFileInputSplit.EOS; + + Assert.assertEquals(eos1, eos2); + + FileInputSplit firstSplit = new FileInputSplit(2, new Path("test"), 0, 100, null); + RichFileInputSplit richFirstSplit = new RichFileInputSplit(10, firstSplit); + Assert.assertNotEquals(eos1, richFirstSplit); + Assert.assertNotEquals(richFirstSplit, firstSplit); + + FileInputSplit secondSplit = new FileInputSplit(2, new Path("test"), 0, 100, null); + RichFileInputSplit richSecondSplit = new RichFileInputSplit(10, secondSplit); + Assert.assertEquals(richFirstSplit, richSecondSplit); + Assert.assertNotEquals(richFirstSplit, firstSplit); + + FileInputSplit modSecondSplit = new FileInputSplit(2, new Path("test"), 0, 100, null); + RichFileInputSplit richModSecondSplit = new RichFileInputSplit(11, modSecondSplit); + Assert.assertNotEquals(richSecondSplit, richModSecondSplit); + + FileInputSplit thirdSplit = new FileInputSplit(2, new Path("test/test1"), 0, 100, null); + RichFileInputSplit richThirdSplit = new RichFileInputSplit(10, thirdSplit); + Assert.assertEquals(richThirdSplit.getModificationTime(), 10); + Assert.assertNotEquals(richFirstSplit, richThirdSplit); + + FileInputSplit thirdSplitCopy = new FileInputSplit(2, new Path("test/test1"), 0, 100, null); + RichFileInputSplit richThirdSplitCopy = new RichFileInputSplit(10, thirdSplitCopy); + Assert.assertEquals(richThirdSplitCopy, richThirdSplit); + } + + @Test + public void testSplitComparison() { + FileInputSplit firstSplit = new FileInputSplit(3, new Path("test/test1"), 0, 100, null); + RichFileInputSplit richFirstSplit = 
new RichFileInputSplit(10, firstSplit); + + FileInputSplit secondSplit = new FileInputSplit(2, new Path("test/test2"), 0, 100, null); + RichFileInputSplit richSecondSplit = new RichFileInputSplit(10, secondSplit); + + FileInputSplit thirdSplit = new FileInputSplit(1, new Path("test/test2"), 0, 100, null); + RichFileInputSplit richThirdSplit = new RichFileInputSplit(10, thirdSplit); + + FileInputSplit forthSplit = new FileInputSplit(0, new Path("test/test3"), 0, 100, null); + RichFileInputSplit richForthSplit = new RichFileInputSplit(11, forthSplit); + + // lexicographically on the path order + Assert.assertTrue(richFirstSplit.compareTo(richSecondSplit) < 0); + Assert.assertTrue(richFirstSplit.compareTo(richThirdSplit) < 0); + + // same mod time, same file so smaller split number first + Assert.assertTrue(richThirdSplit.compareTo(richSecondSplit) < 0); + + // smaller modification time first + Assert.assertTrue(richThirdSplit.compareTo(richForthSplit) < 0); + } + + @Test + public void testIllegalArgument() { + try { +
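The ordering exercised by the comparison test above — smaller modification time first, then path lexicographically, then smaller split number — can be sketched as a self-contained `compareTo` (hypothetical stand-in class, not Flink's):

```java
// Illustrative sketch of the split ordering tested above.
public class SplitOrderingSketch implements Comparable<SplitOrderingSketch> {

    final long modificationTime;
    final String path;
    final int splitNumber;

    SplitOrderingSketch(long modificationTime, String path, int splitNumber) {
        this.modificationTime = modificationTime;
        this.path = path;
        this.splitNumber = splitNumber;
    }

    @Override
    public int compareTo(SplitOrderingSketch o) {
        // 1. earlier modification time first
        if (modificationTime != o.modificationTime) {
            return Long.compare(modificationTime, o.modificationTime);
        }
        // 2. lexicographic path order
        int byPath = path.compareTo(o.path);
        // 3. same file: smaller split number first
        return byPath != 0 ? byPath : Integer.compare(splitNumber, o.splitNumber);
    }

    public static void main(String[] args) {
        SplitOrderingSketch first = new SplitOrderingSketch(10, "test/test1", 3);
        SplitOrderingSketch second = new SplitOrderingSketch(10, "test/test2", 2);
        SplitOrderingSketch third = new SplitOrderingSketch(10, "test/test2", 1);
        SplitOrderingSketch forth = new SplitOrderingSketch(11, "test/test3", 0);

        System.out.println(first.compareTo(second) < 0); // path order
        System.out.println(third.compareTo(second) < 0); // split number
        System.out.println(third.compareTo(forth) < 0);  // modification time
    }
}
```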
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r84281147 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichFileInputSplit.java --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * An extended {@link FileInputSplit} that also includes information about: + * + * The modification time of the file this split belongs to. + * When checkpointing, the state of the split at the moment of the checkpoint. + * + * This class is used by the {@link ContinuousFileMonitoringFunction} and the + * {@link ContinuousFileReaderOperator} to perform continuous file processing. + * */ +public class RichFileInputSplit + extends FileInputSplit implements Comparable{ + + /** The modification time of the file this split belongs to. */ + private final long modificationTime; + + /** +* The state of the split. 
This information is used when +* restoring from a checkpoint and allows to resume reading the +* underlying file from the point we left off. +* */ + private S splitState; + + /** A special {@link RichFileInputSplit} signaling the end of the stream of splits.*/ + public static final RichFileInputSplit EOS = + new RichFileInputSplit<>(Long.MIN_VALUE, -1, null, -1, -1, null); --- End diff -- Is it really necessary to have this special split? Couldn't you just have a `reader.stop()` method which stops the reader after the current split has been processed? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
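The alternative raised above — a `reader.stop()` instead of a special end-of-stream split — can be sketched with a stop flag checked between splits. This is a generic poison-pill-versus-flag illustration, not Flink's actual `SplitReader`:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ReaderStopSketch {

    // Alternative to an EOS sentinel element: a volatile stop flag checked
    // between splits, so the reader finishes the current split and exits.
    static class Reader implements Runnable {
        private final BlockingQueue<String> pendingSplits;
        private volatile boolean running = true;
        int processed = 0;

        Reader(BlockingQueue<String> pendingSplits) {
            this.pendingSplits = pendingSplits;
        }

        void stop() {
            running = false;
        }

        @Override
        public void run() {
            String split;
            // poll() returns null once the queue is drained; the flag ends
            // the loop without a special end-of-stream element in the queue.
            while (running && (split = pendingSplits.poll()) != null) {
                processed++; // stand-in for actually reading the split
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> splits = new ArrayBlockingQueue<>(4);
        splits.add("split-0");
        splits.add("split-1");

        Reader reader = new Reader(splits);
        Thread t = new Thread(reader);
        t.start();
        t.join(); // join establishes happens-before for reading 'processed'
        System.out.println(reader.processed);
    }
}
```

A sentinel has the advantage of flowing through the same queue as ordinary splits; a flag avoids the magic value. Either works, which is presumably what the question is probing.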
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r84279968 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java --- @@ -189,7 +186,7 @@ public void close() throws Exception { output.close(); } - private class SplitReader extends Thread { + private final class SplitReader extends Thread { --- End diff -- Making private classes final is not really necessary.
[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2667#discussion_r84290827 --- Diff: flink-contrib/docker-flink/docker-compose.yml --- @@ -20,16 +20,20 @@ version: "2" services: jobmanager: image: flink +container_name: "jobmanager" +expose: + - "6123" ports: - "48081:8081" command: jobmanager -volumes: - - /opt/flink/conf taskmanager: image: flink +expose: + - "6121" + - "6122" --- End diff -- Sure, makes sense since those ports are not reachable by TaskManagers running in different containers. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r84288975 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java --- @@ -347,34 +328,17 @@ public void run() { } } - private Tuple3, FileInputSplit, S> getReaderState() throws IOException { - List snapshot = new ArrayList<>(this.pendingSplits.size()); - for (FileInputSplit split: this.pendingSplits) { - snapshot.add(split); - } - - // remove the current split from the list if inside. - if (this.currentSplit != null && this.currentSplit.equals(pendingSplits.peek())) { - this.pendingSplits.remove(); - } - - if (this.currentSplit != null) { - if (this.format instanceof CheckpointableInputFormat) { - @SuppressWarnings("unchecked") - CheckpointableInputFormat checkpointableFormat = - (CheckpointableInputFormat) this.format; - - S formatState = this.isSplitOpen ? - checkpointableFormat.getCurrentState() : - restoredFormatState; - return new Tuple3<>(snapshot, currentSplit, formatState); - } else { - LOG.info("The format does not support checkpointing. The current input split will be re-read from start upon recovery."); - return new Tuple3<>(snapshot, currentSplit, null); + private List> getReaderState() throws IOException { + List> snapshot = new ArrayList<>(this.pendingSplits.size()); + if (currentSplit != null ) { + if (this.format instanceof CheckpointableInputFormat && this.isSplitOpen) { + S formatState = ((CheckpointableInputFormat, S>) this.format).getCurrentState(); --- End diff -- ```java Serializable formatState = ((CheckpointableInputFormat) this.format).getCurrentState(); ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r84276796 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java --- @@ -43,16 +41,18 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; -import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Comparator; import java.util.List; +import java.util.PriorityQueue; import java.util.Queue; +import static org.apache.flink.streaming.api.functions.source.RichFileInputSplit.EOS; import static org.apache.flink.util.Preconditions.checkState; import static org.apache.flink.util.Preconditions.checkNotNull; /** - * The operator that reads the {@link FileInputSplit splits} received from the preceding + * The operator that reads the {@link RichFileInputSplit splits} received from the preceding * {@link ContinuousFileMonitoringFunction}. Contrary to the {@link ContinuousFileMonitoringFunction} * which has a parallelism of 1, this operator can have DOP > 1. * --- End diff -- Generic types are not documented in the JavaDoc. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2662: [FLINK-4824] [client] CliFrontend shows misleading...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2662#discussion_r84448442 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/ProgramParametrizationException.java --- @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.program; + +/** + * Exception used to indicate that there is an error in the parametrization of a Flink program. + */ +public class ProgramParametrizationException extends RuntimeException { + /** +* Serial version UID for serialization interoperability. +*/ + private static final long serialVersionUID = 909054589029890262L; + + /** +* Creates a ProgramParametrizationException. +*/ + public ProgramParametrizationException() { + super(); --- End diff -- I think we should not even allow skipping the message here. This will simplify the code further and ensure a message is always displayed for the user.
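The suggestion above can be sketched as follows. This is a hypothetical standalone version, not the actual Flink class: the no-arg constructor is removed and the remaining constructor rejects `null`, so every instance is guaranteed to carry a user-facing message.

```java
import java.util.Objects;

/**
 * Sketch of the review suggestion (hypothetical, not the actual Flink class):
 * a parametrization exception that always carries a message. With no no-arg
 * constructor, callers cannot skip the message, and the CLI can always print
 * something meaningful.
 */
public class ProgramParametrizationException extends RuntimeException {

    private static final long serialVersionUID = 1L;

    public ProgramParametrizationException(String message) {
        // Reject null up front so downstream code needs no null check.
        super(Objects.requireNonNull(message, "message must not be null"));
    }
}
```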
[GitHub] flink pull request #2662: [FLINK-4824] [client] CliFrontend shows misleading...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2662#discussion_r84448541 --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java --- @@ -975,6 +981,32 @@ private int handleArgException(Exception e) { } /** +* Displays an optional exception message for incorrect program parametrization. +* +* @param e The exception to display. +* @return The return code for the process. +*/ + private int handleParametrizationException(ProgramParametrizationException e) { + String message = e.getMessage(); + if (message != null) { + System.err.println(message); + } --- End diff -- This block could be removed if we got rid of the zero-args constructor in `ProgramParametrizationException`.
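Once the message is enforced at construction time, the handler collapses to a single print plus return code. A minimal sketch (hypothetical helper names, not the actual `CliFrontend` code):

```java
/**
 * Hypothetical sketch: a CLI handler for a parametrization error whose
 * message is guaranteed non-null by the exception's constructor, so the
 * null check from the diff above disappears.
 */
public class CliErrorHandling {

    /** Assumed non-zero process return code for errors. */
    static final int RETURN_CODE_ERROR = 1;

    static int handleParametrizationException(RuntimeException e) {
        // No null check needed: the message was enforced at construction time.
        System.err.println(e.getMessage());
        return RETURN_CODE_ERROR;
    }

    public static void main(String[] args) {
        int code = handleParametrizationException(new RuntimeException("bad args"));
        assert code == RETURN_CODE_ERROR;
    }
}
```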
[GitHub] flink pull request #2671: [FLINK-4862] fix Timer register in ContinuousEvent...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2671#discussion_r84464167 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java --- @@ -88,9 +96,13 @@ public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) t @Override public void clear(W window, TriggerContext ctx) throws Exception { ReducingState fireTimestamp = ctx.getPartitionedState(stateDesc); - long timestamp = fireTimestamp.get(); - ctx.deleteEventTimeTimer(timestamp); - fireTimestamp.clear(); + Long timestamp = fireTimestamp.get(); + if (timestamp != null) { + ctx.deleteEventTimeTimer(timestamp); + fireTimestamp.clear(); + } else if (cachedFireTimestamp != null){ + ctx.deleteEventTimeTimer(cachedFireTimestamp); + } --- End diff -- The above `else if` block is not correct because there is only one instance of the trigger which is reused for each Window. Hence the abstraction using the state descriptor to retrieve the appropriate state.
[GitHub] flink pull request #2671: [FLINK-4862] fix Timer register in ContinuousEvent...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2671#discussion_r84464275 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java --- @@ -45,6 +45,12 @@ private final ReducingStateDescriptor stateDesc = new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE); + /** +* Used to preserve the fire timestamp before merge such that +* the corresponding timer could be cleared after merge +*/ + private Long cachedFireTimestamp = null; --- End diff -- This doesn't work because there is only one `Trigger` instance and this will potentially be overwritten by many Windows being merged.
[GitHub] flink pull request #2671: [FLINK-4862] fix Timer register in ContinuousEvent...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2671#discussion_r84465211 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java --- @@ -88,9 +96,13 @@ public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) t @Override public void clear(W window, TriggerContext ctx) throws Exception { ReducingState fireTimestamp = ctx.getPartitionedState(stateDesc); - long timestamp = fireTimestamp.get(); - ctx.deleteEventTimeTimer(timestamp); - fireTimestamp.clear(); + Long timestamp = fireTimestamp.get(); + if (timestamp != null) { + ctx.deleteEventTimeTimer(timestamp); + fireTimestamp.clear(); + } else if (cachedFireTimestamp != null){ + ctx.deleteEventTimeTimer(cachedFireTimestamp); + } --- End diff -- I think we're fine with not doing anything when `timestamp == null`. The old timer won't influence the newly merged window.
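The reviewer's core point — one trigger instance is shared by all windows, so instance fields are clobbered while state fetched via a per-window descriptor stays isolated — can be illustrated with a simplified model (hypothetical classes, not Flink's `Trigger` API):

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Simplified illustration (not Flink's Trigger API): a single trigger object
 * serves every window. An instance field holds only the value of whichever
 * window touched it last, while per-window state (what Flink's state
 * descriptor abstraction provides) stays isolated per window.
 */
public class SharedTriggerDemo {

    // One shared instance field -- the broken approach from the diff above.
    Long cachedFireTimestamp;

    // Per-window state, keyed here by a window id for illustration.
    final Map<String, Long> partitionedState = new HashMap<>();

    void onElement(String window, long timestamp) {
        cachedFireTimestamp = timestamp;           // overwritten across windows
        partitionedState.put(window, timestamp);   // isolated per window
    }

    public static void main(String[] args) {
        SharedTriggerDemo trigger = new SharedTriggerDemo();
        trigger.onElement("windowA", 1000L);
        trigger.onElement("windowB", 2000L);
        // The instance field now holds windowB's value even when asking about windowA...
        assert trigger.cachedFireTimestamp == 2000L;
        // ...while the per-window state still answers correctly for windowA.
        assert trigger.partitionedState.get("windowA") == 1000L;
    }
}
```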
[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2667#discussion_r84467266 --- Diff: flink-contrib/docker-flink/bluemix-docker-compose.sh --- @@ -0,0 +1,4 @@ +#!/bin/sh --- End diff -- Please add a license header here.
[GitHub] flink pull request #2671: [FLINK-4862] fix Timer register in ContinuousEvent...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2671#discussion_r84465826 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java --- @@ -88,9 +96,13 @@ public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) t @Override public void clear(W window, TriggerContext ctx) throws Exception { ReducingState fireTimestamp = ctx.getPartitionedState(stateDesc); - long timestamp = fireTimestamp.get(); - ctx.deleteEventTimeTimer(timestamp); - fireTimestamp.clear(); + Long timestamp = fireTimestamp.get(); + if (timestamp != null) { + ctx.deleteEventTimeTimer(timestamp); + fireTimestamp.clear(); --- End diff -- The above looks good.
[GitHub] flink issue #2667: README.md - Description of the bluemix specif…
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2667 Thank you for your contribution!
[GitHub] flink pull request #2671: [FLINK-4862] fix Timer register in ContinuousEvent...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2671#discussion_r84463868 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java --- @@ -99,8 +111,12 @@ public boolean canMerge() { } @Override - public TriggerResult onMerge(W window, OnMergeContext ctx) { + public TriggerResult onMerge(W window, OnMergeContext ctx) throws Exception { ctx.mergePartitionedState(stateDesc); + Long nextFireTimestamp = ctx.getPartitionedState(stateDesc).get(); + if (nextFireTimestamp != null) { + ctx.registerEventTimeTimer(nextFireTimestamp); + } --- End diff -- +1
[GitHub] flink pull request #2657: [FLINK-4853] [rm] Harden job manager registration ...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2657#discussion_r84502118 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java --- @@ -471,6 +498,149 @@ public void shutDownCluster(final ApplicationStatus finalStatus, final String op } // + // Testing methods + // + + /** +* Gets the leader session id of current resourceManager. +* +* @return return the leaderSessionId of current resourceManager, this returns null until the current resourceManager is granted leadership. +*/ + @VisibleForTesting + UUID getLeaderSessionId() { + return leaderSessionId; + } + + // + // Internal methods + // + + private void clearState() { + jobManagerRegistrations.clear(); + taskExecutors.clear(); + slotManager.clearState(); + + try { + jobLeaderIdService.clear(); + } catch (Exception e) { + onFatalError(new ResourceManagerException("Could not properly clear the job leader id service.", e)); + } + + leaderSessionId = new UUID(0, 0); + } + + /** +* Disconnects the job manager which is connected for the given job from the resource manager. 
+* +* @param jobId identifying the job whose leader shall be disconnected +*/ + protected void disconnectJobManager(JobID jobId, Exception cause) { + JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.remove(jobId); + + if (jobManagerRegistration != null) { + log.info("Disconnect job manager {}@{} for job {} from the resource manager.", + jobManagerRegistration.getLeaderID(), + jobManagerRegistration.getJobManagerGateway().getAddress(), + jobId); + + JobMasterGateway jobMasterGateway = jobManagerRegistration.getJobManagerGateway(); + + // tell the job manager about the disconnect + jobMasterGateway.disconnectResourceManager(jobManagerRegistration.getLeaderID(), getLeaderSessionId(), cause); + } else { + log.debug("There was no registered job manager for job {}.", jobId); + } + } + + /** +* Checks whether the given resource manager leader id is matching the current leader id. +* +* @param resourceManagerLeaderId to check +* @return True if the given leader id matches the actual leader id; otherwise false +*/ + protected boolean isValid(UUID resourceManagerLeaderId) { + if (resourceManagerLeaderId == null) { + return leaderSessionId == null; + } else { + return resourceManagerLeaderId.equals(leaderSessionId); + } + } + + protected void removeJob(JobID jobId) { + try { + jobLeaderIdService.removeJob(jobId); + } catch (Exception e) { + onFatalError(new ResourceManagerException("Could not remove job " + jobId + '.', e)); --- End diff -- This should not easily fail (e.g. closing a connection to Zookeeper throws an exception).
[GitHub] flink pull request #2657: [FLINK-4853] [rm] Harden job manager registration ...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2657#discussion_r84501555 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java --- @@ -471,6 +498,149 @@ public void shutDownCluster(final ApplicationStatus finalStatus, final String op } // + // Testing methods + // + + /** +* Gets the leader session id of current resourceManager. +* +* @return return the leaderSessionId of current resourceManager, this returns null until the current resourceManager is granted leadership. +*/ + @VisibleForTesting + UUID getLeaderSessionId() { + return leaderSessionId; + } + + // + // Internal methods + // + + private void clearState() { + jobManagerRegistrations.clear(); + taskExecutors.clear(); + slotManager.clearState(); + + try { + jobLeaderIdService.clear(); + } catch (Exception e) { + onFatalError(new ResourceManagerException("Could not properly clear the job leader id service.", e)); + } + + leaderSessionId = new UUID(0, 0); --- End diff -- We probably want to set this to null again to work with the `isValid` method (if we want to support null values for UUIDs). I would rather not allow null values at all.
[GitHub] flink pull request #2657: [FLINK-4853] [rm] Harden job manager registration ...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2657#discussion_r84500666 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java --- @@ -202,101 +205,125 @@ public void shutDown() throws Exception { // RPC methods // - /** -* Register a {@link JobMaster} at the resource manager. -* -* @param resourceManagerLeaderId The fencing token for the ResourceManager leader -* @param jobMasterAddressThe address of the JobMaster that registers -* @param jobID The Job ID of the JobMaster that registers -* @return Future registration response -*/ @RpcMethod - public Future registerJobMaster( - final UUID resourceManagerLeaderId, final UUID jobMasterLeaderId, - final String jobMasterAddress, final JobID jobID) { + public Future registerJobManager( + final UUID resourceManagerLeaderId, + final UUID jobManagerLeaderId, + final String jobManagerAddress, + final JobID jobId) { + + checkNotNull(resourceManagerLeaderId); + checkNotNull(jobManagerLeaderId); + checkNotNull(jobManagerAddress); + checkNotNull(jobId); + + if (isValid(resourceManagerLeaderId)) { + if (!jobLeaderIdService.containsJob(jobId)) { + try { + jobLeaderIdService.addJob(jobId); + } catch (Exception e) { + // This should actually never happen because, it should always be possible to add a new job + ResourceManagerException exception = new ResourceManagerException("Could not add the job " + + jobId + " to the job id leader service. 
This should never happen.", e); + + onFatalErrorAsync(exception); + + log.debug("Could not add job {} to job leader id service.", jobId, e); + return FlinkCompletableFuture.completedExceptionally(exception); + } + } - checkNotNull(jobMasterAddress); - checkNotNull(jobID); + log.info("Registering job manager {}@{} for job {}.", jobManagerLeaderId, jobManagerAddress, jobId); + + Future jobLeaderIdFuture; - // create a leader retriever in case it doesn't exist - final JobIdLeaderListener jobIdLeaderListener; - if (leaderListeners.containsKey(jobID)) { - jobIdLeaderListener = leaderListeners.get(jobID); - } else { try { - LeaderRetrievalService jobMasterLeaderRetriever = - highAvailabilityServices.getJobManagerLeaderRetriever(jobID); - jobIdLeaderListener = new JobIdLeaderListener(jobID, jobMasterLeaderRetriever); + jobLeaderIdFuture = jobLeaderIdService.getLeaderId(jobId); } catch (Exception e) { - log.warn("Failed to start JobMasterLeaderRetriever for job id {}", jobID, e); + // we cannot check the job leader id so let's fail + // TODO: Maybe it's also ok to skip this check in case that we cannot check the leader id + ResourceManagerException exception = new ResourceManagerException("Cannot obtain the " + + "job leader id future to verify the correct job leader.", e); + + onFatalErrorAsync(exception); --- End diff -- Declining seems ok in this case since the failure might be temporary.
[GitHub] flink pull request #2657: [FLINK-4853] [rm] Harden job manager registration ...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2657#discussion_r84500271 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java --- @@ -202,101 +205,125 @@ public void shutDown() throws Exception { // RPC methods // - /** -* Register a {@link JobMaster} at the resource manager. -* -* @param resourceManagerLeaderId The fencing token for the ResourceManager leader -* @param jobMasterAddressThe address of the JobMaster that registers -* @param jobID The Job ID of the JobMaster that registers -* @return Future registration response -*/ @RpcMethod - public Future registerJobMaster( - final UUID resourceManagerLeaderId, final UUID jobMasterLeaderId, - final String jobMasterAddress, final JobID jobID) { + public Future registerJobManager( + final UUID resourceManagerLeaderId, + final UUID jobManagerLeaderId, + final String jobManagerAddress, + final JobID jobId) { + + checkNotNull(resourceManagerLeaderId); + checkNotNull(jobManagerLeaderId); + checkNotNull(jobManagerAddress); + checkNotNull(jobId); + + if (isValid(resourceManagerLeaderId)) { + if (!jobLeaderIdService.containsJob(jobId)) { + try { + jobLeaderIdService.addJob(jobId); + } catch (Exception e) { + // This should actually never happen because, it should always be possible to add a new job + ResourceManagerException exception = new ResourceManagerException("Could not add the job " + + jobId + " to the job id leader service. This should never happen.", e); + + onFatalErrorAsync(exception); + + log.debug("Could not add job {} to job leader id service.", jobId, e); --- End diff -- Should this be logged on `error` level?
[GitHub] flink issue #2651: [FLINK-4847] Let RpcEndpoint.start/shutDown throw excepti...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2651 LGTM
[GitHub] flink pull request #2657: [FLINK-4853] [rm] Harden job manager registration ...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2657#discussion_r84500048 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java --- @@ -202,101 +205,125 @@ public void shutDown() throws Exception { // RPC methods // - /** -* Register a {@link JobMaster} at the resource manager. -* -* @param resourceManagerLeaderId The fencing token for the ResourceManager leader -* @param jobMasterAddressThe address of the JobMaster that registers -* @param jobID The Job ID of the JobMaster that registers -* @return Future registration response -*/ @RpcMethod - public Future registerJobMaster( - final UUID resourceManagerLeaderId, final UUID jobMasterLeaderId, - final String jobMasterAddress, final JobID jobID) { + public Future registerJobManager( + final UUID resourceManagerLeaderId, + final UUID jobManagerLeaderId, + final String jobManagerAddress, + final JobID jobId) { + + checkNotNull(resourceManagerLeaderId); + checkNotNull(jobManagerLeaderId); + checkNotNull(jobManagerAddress); + checkNotNull(jobId); + + if (isValid(resourceManagerLeaderId)) { + if (!jobLeaderIdService.containsJob(jobId)) { + try { + jobLeaderIdService.addJob(jobId); + } catch (Exception e) { + // This should actually never happen because, it should always be possible to add a new job + ResourceManagerException exception = new ResourceManagerException("Could not add the job " + --- End diff -- Actually, this might happen when the leader id service fails to start. It could be temporary and we might have to introduce some sort of retry rule here. Not in the scope of this PR though.
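The "retry rule" floated above could look roughly like this. A hypothetical generic helper, not Flink code: a transient operation is retried a bounded number of times before the last failure is surfaced.

```java
import java.util.concurrent.Callable;

/**
 * Hypothetical sketch of a bounded retry rule for transient failures such as
 * a leader id service that has not started yet. Illustration only; Flink
 * does not ship this helper.
 */
public class BoundedRetry {

    public static <T> T retry(Callable<T> operation, int maxAttempts) throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return operation.call();
            } catch (Exception e) {
                last = e; // remember the failure and try again
            }
        }
        throw last; // all attempts exhausted
    }

    public static void main(String[] args) throws Exception {
        final int[] calls = {0};
        // Fails twice, then succeeds on the third attempt.
        String result = retry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("transient");
            }
            return "registered";
        }, 3);
        assert "registered".equals(result);
        assert calls[0] == 3;
    }
}
```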
[GitHub] flink pull request #2657: [FLINK-4853] [rm] Harden job manager registration ...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2657#discussion_r84501742 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java --- @@ -471,6 +498,149 @@ public void shutDownCluster(final ApplicationStatus finalStatus, final String op } // + // Testing methods + // + + /** +* Gets the leader session id of current resourceManager. +* +* @return return the leaderSessionId of current resourceManager, this returns null until the current resourceManager is granted leadership. +*/ + @VisibleForTesting + UUID getLeaderSessionId() { + return leaderSessionId; + } + + // + // Internal methods + // + + private void clearState() { + jobManagerRegistrations.clear(); + taskExecutors.clear(); + slotManager.clearState(); + + try { + jobLeaderIdService.clear(); + } catch (Exception e) { + onFatalError(new ResourceManagerException("Could not properly clear the job leader id service.", e)); + } + + leaderSessionId = new UUID(0, 0); + } + + /** +* Disconnects the job manager which is connected for the given job from the resource manager. 
+* +* @param jobId identifying the job whose leader shall be disconnected +*/ + protected void disconnectJobManager(JobID jobId, Exception cause) { + JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.remove(jobId); + + if (jobManagerRegistration != null) { + log.info("Disconnect job manager {}@{} for job {} from the resource manager.", + jobManagerRegistration.getLeaderID(), + jobManagerRegistration.getJobManagerGateway().getAddress(), + jobId); + + JobMasterGateway jobMasterGateway = jobManagerRegistration.getJobManagerGateway(); + + // tell the job manager about the disconnect + jobMasterGateway.disconnectResourceManager(jobManagerRegistration.getLeaderID(), getLeaderSessionId(), cause); + } else { + log.debug("There was no registered job manager for job {}.", jobId); + } + } + + /** +* Checks whether the given resource manager leader id is matching the current leader id. +* +* @param resourceManagerLeaderId to check +* @return True if the given leader id matches the actual leader id; otherwise false +*/ + protected boolean isValid(UUID resourceManagerLeaderId) { + if (resourceManagerLeaderId == null) { + return leaderSessionId == null; --- End diff -- Should `null` always return `false` if we assume that we use a default UUID in non high availability mode?
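The two policies under discussion can be put side by side in a small sketch (hypothetical helper, not Flink code): the lenient variant treats a null request id as matching a null session id, while the strict variant rejects null outright and relies on a fixed default UUID when high availability is disabled.

```java
import java.util.Objects;
import java.util.UUID;

/**
 * Hypothetical sketch contrasting the two leader-id validation policies
 * discussed in the review. Not Flink code.
 */
public class LeaderIdValidation {

    /** Assumed default session id for non-HA setups, as suggested above. */
    static final UUID DEFAULT_SESSION_ID = new UUID(0, 0);

    /** Lenient variant: null matches a null current session id. */
    static boolean isValidLenient(UUID requestId, UUID currentId) {
        return Objects.equals(requestId, currentId);
    }

    /** Strict variant: null is never a valid leader id. */
    static boolean isValidStrict(UUID requestId, UUID currentId) {
        return requestId != null && requestId.equals(currentId);
    }

    public static void main(String[] args) {
        assert isValidLenient(null, null);
        assert !isValidStrict(null, DEFAULT_SESSION_ID);
        assert isValidStrict(DEFAULT_SESSION_ID, DEFAULT_SESSION_ID);
    }
}
```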
[GitHub] flink pull request #2657: [FLINK-4853] [rm] Harden job manager registration ...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2657#discussion_r84501182 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java --- @@ -202,101 +205,125 @@ public void shutDown() throws Exception { // RPC methods // - /** -* Register a {@link JobMaster} at the resource manager. -* -* @param resourceManagerLeaderId The fencing token for the ResourceManager leader -* @param jobMasterAddressThe address of the JobMaster that registers -* @param jobID The Job ID of the JobMaster that registers -* @return Future registration response -*/ @RpcMethod - public Future registerJobMaster( - final UUID resourceManagerLeaderId, final UUID jobMasterLeaderId, - final String jobMasterAddress, final JobID jobID) { + public Future registerJobManager( + final UUID resourceManagerLeaderId, + final UUID jobManagerLeaderId, + final String jobManagerAddress, + final JobID jobId) { + + checkNotNull(resourceManagerLeaderId); + checkNotNull(jobManagerLeaderId); + checkNotNull(jobManagerAddress); + checkNotNull(jobId); + + if (isValid(resourceManagerLeaderId)) { + if (!jobLeaderIdService.containsJob(jobId)) { + try { + jobLeaderIdService.addJob(jobId); + } catch (Exception e) { + // This should actually never happen because, it should always be possible to add a new job + ResourceManagerException exception = new ResourceManagerException("Could not add the job " + + jobId + " to the job id leader service. 
This should never happen.", e); + + onFatalErrorAsync(exception); + + log.debug("Could not add job {} to job leader id service.", jobId, e); + return FlinkCompletableFuture.completedExceptionally(exception); + } + } - checkNotNull(jobMasterAddress); - checkNotNull(jobID); + log.info("Registering job manager {}@{} for job {}.", jobManagerLeaderId, jobManagerAddress, jobId); + + Future jobLeaderIdFuture; - // create a leader retriever in case it doesn't exist - final JobIdLeaderListener jobIdLeaderListener; - if (leaderListeners.containsKey(jobID)) { - jobIdLeaderListener = leaderListeners.get(jobID); - } else { try { - LeaderRetrievalService jobMasterLeaderRetriever = - highAvailabilityServices.getJobManagerLeaderRetriever(jobID); - jobIdLeaderListener = new JobIdLeaderListener(jobID, jobMasterLeaderRetriever); + jobLeaderIdFuture = jobLeaderIdService.getLeaderId(jobId); } catch (Exception e) { - log.warn("Failed to start JobMasterLeaderRetriever for job id {}", jobID, e); + // we cannot check the job leader id so let's fail + // TODO: Maybe it's also ok to skip this check in case that we cannot check the leader id + ResourceManagerException exception = new ResourceManagerException("Cannot obtain the " + + "job leader id future to verify the correct job leader.", e); + + onFatalErrorAsync(exception); - return FlinkCompletableFuture.completed( - new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever")); + log.debug("Could not obtain the job leader id future to verify the correct job leader."); + return FlinkCompletableFuture.completedExceptionally(exception); } - leaderListeners.put(jobID, jobIdLeaderListener); - } + Future jobMasterGatewayFuture = getRpcService().connect(jobManagerAddress, JobMasterGateway.class); -
[GitHub] flink pull request #2655: [FLINK-4851] [rm] Introduce FatalErrorHandler and ...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2655#discussion_r84507291 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingFatalErrorHandler.java --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util; + +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicReference; + +/** + * Testing fatal error handler which records the occurred exceptions during the execution of the + * tests. Captured exceptions are thrown as a {@link TestingException}. 
+ */ +public class TestingFatalErrorHandler implements FatalErrorHandler { + private static final Logger LOG = LoggerFactory.getLogger(TestingFatalErrorHandler.class); + private final AtomicReference atomicThrowable; + + public TestingFatalErrorHandler() { + atomicThrowable = new AtomicReference<>(null); + } + + public void rethrowError() throws TestingException { + Throwable throwable = atomicThrowable.get(); + + if (throwable != null) { + throw new TestingException(throwable); + } + } + + public boolean hasExceptionOccurred() { + return atomicThrowable.get() != null; + } + + public Throwable getException() { + return atomicThrowable.get(); + } + + @Override + public void onFatalError(Throwable exception) { + LOG.error("OnFatalError:", exception); + + if (!atomicThrowable.compareAndSet(null, exception)) { + atomicThrowable.get().addSuppressed(exception); + } --- End diff -- Oh, I didn't know you could do that. That's neat.
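The "neat" pattern here, reduced to its core (a hypothetical standalone class, without the logging and test-exception wrapping of the original): the first error wins the compare-and-set, and every later error is attached to it via `Throwable.addSuppressed` instead of being silently dropped.

```java
import java.util.concurrent.atomic.AtomicReference;

/**
 * First-error-wins recorder: AtomicReference.compareAndSet keeps the first
 * error, and subsequent errors are preserved as suppressed throwables.
 * Simplified illustration of the pattern in the diff above.
 */
public class FirstErrorRecorder {

    private final AtomicReference<Throwable> firstError = new AtomicReference<>(null);

    public void onFatalError(Throwable error) {
        if (!firstError.compareAndSet(null, error)) {
            // An earlier error is already recorded; attach this one to it.
            firstError.get().addSuppressed(error);
        }
    }

    public Throwable getError() {
        return firstError.get();
    }

    public static void main(String[] args) {
        FirstErrorRecorder recorder = new FirstErrorRecorder();
        Throwable first = new RuntimeException("first");
        Throwable second = new RuntimeException("second");
        recorder.onFatalError(first);
        recorder.onFatalError(second);
        assert recorder.getError() == first;
        assert recorder.getError().getSuppressed()[0] == second;
    }
}
```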
[GitHub] flink pull request #2655: [FLINK-4851] [rm] Introduce FatalErrorHandler and ...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2655#discussion_r84506385 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java --- @@ -145,6 +171,10 @@ public void testRegisterJobMasterWithFailureLeaderListener() throws Exception { Future declineFuture = resourceManager.registerJobMaster(rmLeaderSessionId, jmLeaderSessionId, jobMasterAddress, unknownJobIDToHAServices); RegistrationResponse response = declineFuture.get(5, TimeUnit.SECONDS); assertTrue(response instanceof RegistrationResponse.Decline); + + if (testingFatalErrorHandler.hasExceptionOccurred()) { + testingFatalErrorHandler.rethrowError(); + } --- End diff -- This seems like a lot of boilerplate that we could abstract using a base testing class for ResourceManager tests.
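One way the suggested base class could look (a hypothetical, framework-free sketch with invented names): the base class owns the recorded error and exposes a single check method, so individual tests no longer repeat the `hasExceptionOccurred`/`rethrowError` block.

```java
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical base testing class abstracting the repeated error-check
 * boilerplate. Names (recordError, verifyNoFatalError) are invented for
 * illustration.
 */
public class ResourceManagerTestBase {

    private final AtomicReference<Throwable> handlerError = new AtomicReference<>(null);

    /** Called by the fatal error handler wired into the component under test. */
    public void recordError(Throwable t) {
        handlerError.compareAndSet(null, t);
    }

    /** One-liner for tests; replaces the repeated if/rethrow block. */
    public void verifyNoFatalError() throws Exception {
        Throwable t = handlerError.get();
        if (t != null) {
            throw new Exception("Fatal error occurred during the test.", t);
        }
    }
}
```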
[GitHub] flink pull request #2669: [FLINK-4871] [mini cluster] Add memory calculation...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2669#discussion_r84509754 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java --- @@ -96,11 +101,20 @@ public void setCommonRpcBindAddress(String bindAddress) { this.commonBindAddress = bindAddress; } + public void setManagedMemoryPerTaskManager(long managedMemoryPerTaskManager) { + checkArgument(managedMemoryPerTaskManager > 0, "must have more than 0 MB of memory for the TaskManager."); + this.managedMemoryPerTaskManager = managedMemoryPerTaskManager; + } + // // getters // public Configuration getConfiguration() { + // update the memory in case that we've changed the number of components (TM, RM, JM) + long memory = calculateManagedMemoryPerTaskManager(); --- End diff -- Getters should usually not perform any calculation. How about changing the method name to `updateConfiguration()`?
[GitHub] flink pull request #2669: [FLINK-4871] [mini cluster] Add memory calculation...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2669#discussion_r84508885 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java --- @@ -162,4 +180,63 @@ public String toString() { ", config=" + config + '}'; } + + /** +* Calculate the managed memory per task manager. The memory is calculated in the following +* order: +* +* 1. Return {@link #managedMemoryPerTaskManager} if set +* 2. Return config.getInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY) if set +* 3. Distribute the available free memory equally among all components (JMs, RMs and TMs) and +* calculate the managed memory from the share of memory for a single task manager. +* +* @return +*/ + private long calculateManagedMemoryPerTaskManager() { --- End diff -- `getOrCalculateManagedMemoryPerTaskManager`?
[GitHub] flink pull request #2669: [FLINK-4871] [mini cluster] Add memory calculation...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2669#discussion_r84509867 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java --- @@ -96,11 +101,20 @@ public void setCommonRpcBindAddress(String bindAddress) { this.commonBindAddress = bindAddress; } + public void setManagedMemoryPerTaskManager(long managedMemoryPerTaskManager) { + checkArgument(managedMemoryPerTaskManager > 0, "must have more than 0 MB of memory for the TaskManager."); + this.managedMemoryPerTaskManager = managedMemoryPerTaskManager; + } + // // getters // public Configuration getConfiguration() { + // update the memory in case that we've changed the number of components (TM, RM, JM) + long memory = calculateManagedMemoryPerTaskManager(); --- End diff -- After this method has been called, you can't change the memory configuration anymore because the config value will prevent new calculation in `calculateManagedMemoryPerTaskManager`. Is that desired?
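A sketch of what the explicit get-or-calculate step could look like, using a simplified stand-in class (names are illustrative, and the config-key lookup step from the javadoc is omitted). Recomputing on each call, rather than caching the result into the config, keeps later setter calls effective, which addresses the concern above.

```java
// Simplified stand-in for MiniClusterConfiguration (hypothetical names).
class MiniClusterConfigSketch {
    private long managedMemoryPerTaskManager = -1; // -1 means "not set"
    private int numTaskManagers = 1;

    void setNumTaskManagers(int numTaskManagers) {
        this.numTaskManagers = numTaskManagers;
    }

    void setManagedMemoryPerTaskManager(long managedMemoryPerTaskManager) {
        if (managedMemoryPerTaskManager <= 0) {
            throw new IllegalArgumentException(
                "must have more than 0 MB of memory for the TaskManager.");
        }
        this.managedMemoryPerTaskManager = managedMemoryPerTaskManager;
    }

    // Named so callers know a calculation may happen; recomputes every time
    // instead of writing the result back into the configuration, so changing
    // the number of TaskManagers later is still reflected.
    long getOrCalculateManagedMemoryPerTaskManager(long totalFreeMemoryMb) {
        if (managedMemoryPerTaskManager > 0) {
            return managedMemoryPerTaskManager; // explicitly set value wins
        }
        // otherwise distribute the free memory equally among the TaskManagers
        return totalFreeMemoryMb / numTaskManagers;
    }
}
```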
[GitHub] flink issue #2671: [FLINK-4862] fix Timer register in ContinuousEventTimeTri...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2671 Thanks for having a look @aljoscha. Thanks again for the PR @manuzhang. Merging.
[GitHub] flink pull request #2671: [FLINK-4862] fix Timer register in ContinuousEvent...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2671#discussion_r84689568 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java --- @@ -99,8 +111,12 @@ public boolean canMerge() { } @Override - public TriggerResult onMerge(W window, OnMergeContext ctx) { + public TriggerResult onMerge(W window, OnMergeContext ctx) throws Exception { ctx.mergePartitionedState(stateDesc); + Long nextFireTimestamp = ctx.getPartitionedState(stateDesc).get(); + if (nextFireTimestamp != null) { + ctx.registerEventTimeTimer(nextFireTimestamp); + } --- End diff -- Yes, you're right. It is actually handled correctly in `EventTimeTrigger` but not for the continuous trigger.
[GitHub] flink pull request #2657: [FLINK-4853] [rm] Harden job manager registration ...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2657#discussion_r84689078 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java --- @@ -202,101 +205,125 @@ public void shutDown() throws Exception { // RPC methods // - /** -* Register a {@link JobMaster} at the resource manager. -* -* @param resourceManagerLeaderId The fencing token for the ResourceManager leader -* @param jobMasterAddressThe address of the JobMaster that registers -* @param jobID The Job ID of the JobMaster that registers -* @return Future registration response -*/ @RpcMethod - public Future registerJobMaster( - final UUID resourceManagerLeaderId, final UUID jobMasterLeaderId, - final String jobMasterAddress, final JobID jobID) { + public Future registerJobManager( + final UUID resourceManagerLeaderId, + final UUID jobManagerLeaderId, + final String jobManagerAddress, + final JobID jobId) { + + checkNotNull(resourceManagerLeaderId); + checkNotNull(jobManagerLeaderId); + checkNotNull(jobManagerAddress); + checkNotNull(jobId); + + if (isValid(resourceManagerLeaderId)) { + if (!jobLeaderIdService.containsJob(jobId)) { + try { + jobLeaderIdService.addJob(jobId); + } catch (Exception e) { + // This should actually never happen because, it should always be possible to add a new job + ResourceManagerException exception = new ResourceManagerException("Could not add the job " + + jobId + " to the job id leader service. 
This should never happen.", e); + + onFatalErrorAsync(exception); + + log.debug("Could not add job {} to job leader id service.", jobId, e); + return FlinkCompletableFuture.completedExceptionally(exception); + } + } - checkNotNull(jobMasterAddress); - checkNotNull(jobID); + log.info("Registering job manager {}@{} for job {}.", jobManagerLeaderId, jobManagerAddress, jobId); + + Future jobLeaderIdFuture; - // create a leader retriever in case it doesn't exist - final JobIdLeaderListener jobIdLeaderListener; - if (leaderListeners.containsKey(jobID)) { - jobIdLeaderListener = leaderListeners.get(jobID); - } else { try { - LeaderRetrievalService jobMasterLeaderRetriever = - highAvailabilityServices.getJobManagerLeaderRetriever(jobID); - jobIdLeaderListener = new JobIdLeaderListener(jobID, jobMasterLeaderRetriever); + jobLeaderIdFuture = jobLeaderIdService.getLeaderId(jobId); } catch (Exception e) { - log.warn("Failed to start JobMasterLeaderRetriever for job id {}", jobID, e); + // we cannot check the job leader id so let's fail + // TODO: Maybe it's also ok to skip this check in case that we cannot check the leader id + ResourceManagerException exception = new ResourceManagerException("Cannot obtain the " + + "job leader id future to verify the correct job leader.", e); + + onFatalErrorAsync(exception); - return FlinkCompletableFuture.completed( - new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever")); + log.debug("Could not obtain the job leader id future to verify the correct job leader."); + return FlinkCompletableFuture.completedExceptionally(exception); } - leaderListeners.put(jobID, jobIdLeaderListener); - } + Future jobMasterGatewayFuture = getRpcService().connect(jobManagerAddress, JobMasterGateway.class); -
[GitHub] flink pull request #2692: [FLINK-4913][yarn] include user jar in system clas...
GitHub user mxm opened a pull request: https://github.com/apache/flink/pull/2692 [FLINK-4913][yarn] include user jar in system class loader When deploying a Yarn cluster for a single job, this change pre-configures the cluster to include the user jar(s) on all nodes. This eliminates the need to upload jar files through the BlobClient. More importantly, it loads the user classes only once and not on every instantiation of a Task. This also reduces the JobManager class loading upon recovery of a failed job. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/flink FLINK-4913 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2692.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2692 commit bfe0f1a1b5cfee200ff76e4b988fe43613c8d0c0 Author: Maximilian Michels Date: 2016-10-25T14:16:52Z [FLINK-4913][yarn] include user jar in system class loader When deploying a Yarn cluster for a single job, this change pre-configures the cluster to include the user jar(s) on all nodes. This eliminates the need to upload jar files through the BlobClient. More importantly, it loads the user classes only once and not on every instantiation of a Task. This also reduces the JobManager class loading upon recovery of a failed job.
[GitHub] flink issue #2425: FLINK-3930 Added shared secret based authorization for Fl...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2425 @vijikarthi I haven't forgotten about your PR. Thanks for the feedback. I'll get back to you today.
[GitHub] flink issue #2692: [FLINK-4913][yarn] include user jar in system class loade...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2692 Thanks for checking out the changes. Yes, it is handled correctly. Indeed, I started off just thinking about the user-provided jar but then discovered that the `PackagedProgram` extracts nested jars within that jar. The code handles that transparently in the sense that the shipped user jars are always represented by a list of URLs. Either a singleton list or multiple entries in case of nested jars.
[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2425#discussion_r85175840 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java --- @@ -108,6 +111,11 @@ private final Options ALL_OPTIONS; + private static final String fileName = "yarn-app.ini"; + private static final String cookieKey = "secureCookie"; --- End diff -- I think the ini file format is actually fine. Could also be JSON but I don't mind. To not break backwards-compatibility, I think we have to keep the behavior to use the last-used application id in case none is supplied. We could have an extra config entry for that.
[GitHub] flink issue #2425: FLINK-3930 Added shared secret based authorization for Fl...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2425 CC @uce to check out the network layer changes. This is a very sensitive and performance-critical part of Flink. We should be very sure nothing breaks it with the changes. @vijikarthi Please have a look at the null checks in the network code. I would replace them with `checkNotNull` and never pass any null values in there. It would be desirable that security, when turned off, adds no overhead even with the security support built in.
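The suggested pattern, failing fast at construction time instead of tolerating nulls on the hot network path, might look like the sketch below. It uses the stdlib `Objects.requireNonNull` in place of Flink's `Preconditions.checkNotNull`, and the class is a hypothetical stand-in, not actual Flink code.

```java
import java.util.Objects;

// Hypothetical holder illustrating the invariant: an empty string can encode
// "security disabled", but null is rejected immediately, so downstream code
// (e.g. the Netty message classes) never needs a null check.
class SecureCookieHolder {
    static final String NO_SECURE_COOKIE = "";

    private final String secureCookie;

    SecureCookieHolder(String secureCookie) {
        // Fail fast here rather than sprinkling "cookie == null ? \"\" : cookie"
        // checks through the performance-critical network code.
        this.secureCookie = Objects.requireNonNull(
            secureCookie, "secureCookie must not be null");
    }

    String getSecureCookie() {
        return secureCookie;
    }
}
```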
[GitHub] flink issue #2618: [FLINK-4800] Refactor the ContinuousFileMonitoringFunctio...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2618 Looks good @kl0u. Could you push the rebased version?
[GitHub] flink issue #2692: [FLINK-4913][yarn] include user jar in system class loade...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2692 Pushed a minor change to check which jars are required in case the cluster is used to execute multiple jobs with different dependencies. Merging after a local Travis run.
[GitHub] flink issue #2618: [FLINK-4800] Refactor the ContinuousFileMonitoringFunctio...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2618 Merged. Thank you!
[GitHub] flink issue #2718: [hotfix] Fixes the TimestampedInputSplit.EOS comparison.
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2718 I think we should get rid of the magic `EOS` split. We should simply set a flag in the reader to stop reading. Until then, this looks like a good fix.
[GitHub] flink pull request #2718: [hotfix] Fixes the TimestampedInputSplit.EOS compa...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2718#discussion_r85523016 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java --- @@ -90,4 +112,60 @@ public void testIllegalArgument() { } } } + + @Test + public void testPriorityQ() { + TimestampedFileInputSplit richFirstSplit = + new TimestampedFileInputSplit(0, 3, new Path("test/test1"), 0, 100, null); + + TimestampedFileInputSplit richSecondSplit = + new TimestampedFileInputSplit(10, 2, new Path("test/test2"), 0, 100, null); + + TimestampedFileInputSplit richThirdSplit = + new TimestampedFileInputSplit(10, 1, new Path("test/test2"), 0, 100, null); + + TimestampedFileInputSplit richForthSplit = + new TimestampedFileInputSplit(11, 0, new Path("test/test3"), 0, 100, null); + + TimestampedFileInputSplit richFifthSplit = + new TimestampedFileInputSplit(11, 1, new Path("test/test3"), 0, 100, null); + + TimestampedFileInputSplit eos = TimestampedFileInputSplit.EOS; + + Queue<TimestampedFileInputSplit> pendingSplits = new PriorityQueue<>(10, new Comparator<TimestampedFileInputSplit>() { + @Override + public int compare(TimestampedFileInputSplit o1, TimestampedFileInputSplit o2) { + return o1.compareTo(o2); + } + }); --- End diff -- `TimestampedFileInputSplit` is already `Comparable`. So you can skip the Comparator here.
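To illustrate the review point: `PriorityQueue` falls back to natural ordering when no `Comparator` is supplied, so a `Comparable` element type needs none. The `Split` class below is a simplified stand-in for `TimestampedFileInputSplit`, not the real class.

```java
import java.util.PriorityQueue;
import java.util.Queue;

// Simplified stand-in: ordered by modification time, then split number,
// mirroring the idea (but not the exact code) of TimestampedFileInputSplit.
class Split implements Comparable<Split> {
    final long modificationTime;
    final int splitNumber;

    Split(long modificationTime, int splitNumber) {
        this.modificationTime = modificationTime;
        this.splitNumber = splitNumber;
    }

    @Override
    public int compareTo(Split other) {
        // earlier modification time first, then lower split number
        int byTime = Long.compare(modificationTime, other.modificationTime);
        return byTime != 0 ? byTime : Integer.compare(splitNumber, other.splitNumber);
    }
}

// Usage: no Comparator argument is needed, natural ordering applies.
// Queue<Split> pendingSplits = new PriorityQueue<>();
```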
[GitHub] flink pull request #2732: [FLINK-4272] Create a JobClient for job control an...
GitHub user mxm opened a pull request: https://github.com/apache/flink/pull/2732 [FLINK-4272] Create a JobClient for job control and monitoring Also includes: [FLINK-4274] Expose new JobClient in the DataSet/DataStream API - rename JobClient class to JobClientActorUtils - introduce JobClient interface with two implementations - JobClientEager: starts an actor system right away and monitors the job - Move ClusterClient#cancel, ClusterClient#stop, ClusterClient#getAccumulators to JobClient - JobClientLazy: starts an actor system when requests are made by encapsulating the eager job client - Java and Scala API - JobClient integration - introduce ExecutionEnvironment#executeWithControl() - introduce StreamExecutionEnvironment#executeWithControl() - report errors during job execution as JobExecutionException instead of ProgramInvocationException and adapt test cases - provide finalizers to run code upon shutdown of client - use ActorGateway in JobListeningContext - add test case for JobClient implementations You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/flink FLINK-4272 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2732.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2732 commit a3f5cc00ec6959ca662ca925918816b4c8d984cd Author: Maximilian Michels Date: 2016-08-21T13:25:02Z [FLINK-4272] Create a JobClient for job control and monitoring Also includes: [FLINK-4274] Expose new JobClient in the DataSet/DataStream API - rename JobClient class to JobClientActorUtils - introduce JobClient interface with two implementations - JobClientEager: starts an actor system right away and monitors the job - Move ClusterClient#cancel, ClusterClient#stop, ClusterClient#getAccumulators to JobClient - JobClientLazy: starts an actor system when requests are made by encapsulating the eager 
job client - Java and Scala API - JobClient integration - introduce ExecutionEnvironment#executeWithControl() - introduce StreamExecutionEnvironment#executeWithControl() - report errors during job execution as JobExecutionException instead of ProgramInvocationException and adapt test cases - provide finalizers to run code upon shutdown of client - use ActorGateway in JobListeningContext - add test case for JobClient implementations
[GitHub] flink issue #2732: [FLINK-4272] Create a JobClient for job control and monit...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2732 Rebased to the latest changes on the master.
[GitHub] flink issue #2732: [FLINK-4272] Create a JobClient for job control and monit...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2732 CC @rmetzger @aljoscha Could you take a look at the changes?
[GitHub] flink issue #2734: Keytab & TLS support for Flink on Mesos Setup
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2734 Thanks for your PR @vijikarthi! I'll try to check it out as soon as possible.
[GitHub] flink pull request #2741: [FLINK-4998][yarn] fail if too many task slots are...
GitHub user mxm opened a pull request: https://github.com/apache/flink/pull/2741 [FLINK-4998][yarn] fail if too many task slots are configured This fails the deployment of the Yarn application if the number of task slots are configured to be larger than the maximum virtual cores of the Yarn cluster. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/flink FLINK-4998 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2741.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2741 commit 35c4ad3cb086abe6fa85c5755daa8a83fbdfbf56 Author: Maximilian Michels Date: 2016-11-02T15:37:56Z [FLINK-4998][yarn] fail if too many task slots are configured This fails the deployment of the Yarn application if the number of task slots are configured to be larger than the maximum virtual cores of the Yarn cluster.
[GitHub] flink issue #2741: [FLINK-4998][yarn] fail if too many task slots are config...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2741 Added a test case to verify the error reporting.
[GitHub] flink pull request #2745: [yarn] fix debug string displayed for failed appli...
GitHub user mxm opened a pull request: https://github.com/apache/flink/pull/2745 [yarn] fix debug string displayed for failed applications Merging for `master` and `release-1.1`. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2745.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2745 commit a285002c8b651b50c54d3660daf4e2a3994e11c4 Author: Maximilian Michels Date: 2016-11-01T10:02:39Z [yarn] fix debug string displayed for failed applications
[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2425#discussion_r86323404 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java --- @@ -101,6 +102,14 @@ public void run() { final byte[] buffer = new byte[BUFFER_SIZE]; while (true) { + + int keyLength = inputStream.read(); --- End diff -- Here the cookie length is limited to one byte.
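For context: `InputStream.read()` returns a single byte (a value from 0 to 255, or -1 at end of stream), so a length field written this way caps the cookie at 255 bytes. The sketch below shows a 4-byte length prefix that lifts the limit; it is a generic framing example, not the actual BlobServer protocol.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Generic length-prefixed framing sketch (hypothetical, in-memory only).
class LengthPrefixSketch {
    static byte[] frame(byte[] payload) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            out.writeInt(payload.length); // 4-byte length prefix, not one byte
            out.write(payload);
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e); // cannot happen for in-memory streams
        }
    }

    static byte[] unframe(byte[] framed) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(framed));
            int length = in.readInt(); // correctly recovers lengths > 255
            byte[] payload = new byte[length];
            in.readFully(payload);
            return payload;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A single `read()` call in place of `readInt()` would silently truncate any length above 255, which is exactly the limitation pointed out in the review.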
[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2425#discussion_r86321008 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java --- @@ -64,26 +64,25 @@ static final String NO_SECURE_COOKIE = ""; - static final Charset DEFAULT_CHARSET = Charset.forName("utf-8"); - abstract ByteBuf write(ByteBufAllocator allocator) throws Exception; abstract void readFrom(ByteBuf buffer) throws Exception; // private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, String secureCookie) { - return allocateBuffer(allocator, id, secureCookie, 0); + return allocateBuffer(allocator, id, 0, secureCookie); } - private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, String secureCookie, int length) { + private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, int length, String secureCookie) { + final Charset DEFAULT_CHARSET = Charset.forName("utf-8"); secureCookie = secureCookie == null ? "": secureCookie; --- End diff -- Simpler: ```java if (secureCookie == null) { secureCookie = NO_SECURE_COOKIE; } ```
[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2425#discussion_r86322286 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java --- @@ -120,6 +136,15 @@ static LengthFieldBasedFrameDecoder createFrameLengthDecoder() { @ChannelHandler.Sharable static class NettyMessageDecoder extends MessageToMessageDecoder { + private static final Logger LOG = LoggerFactory.getLogger(NettyMessageDecoder.class); + + final byte[] secureCookie; + + public NettyMessageDecoder(String secureCookie) { + secureCookie = secureCookie == null ? "": secureCookie; --- End diff -- Should be removed.
[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2425#discussion_r86321320 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java --- @@ -64,26 +64,25 @@ static final String NO_SECURE_COOKIE = ""; - static final Charset DEFAULT_CHARSET = Charset.forName("utf-8"); - abstract ByteBuf write(ByteBufAllocator allocator) throws Exception; abstract void readFrom(ByteBuf buffer) throws Exception; // private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, String secureCookie) { - return allocateBuffer(allocator, id, secureCookie, 0); + return allocateBuffer(allocator, id, 0, secureCookie); } - private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, String secureCookie, int length) { + private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, int length, String secureCookie) { + final Charset DEFAULT_CHARSET = Charset.forName("utf-8"); secureCookie = secureCookie == null ? "": secureCookie; --- End diff -- Still think we should never pass a null cookie. Then the check wouldn't be necessary.
[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2425#discussion_r86324769 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java --- @@ -54,24 +58,36 @@ // constructor in order to work with the generic deserializer. // - static final int HEADER_LENGTH = 4 + 4 + 1; // frame length (4), magic number (4), msg ID (1) + static final int HEADER_LENGTH = 4 + 4 + 4 + 1; // frame length (4), magic number (4), Cookie (4), msg ID (1) static final int MAGIC_NUMBER = 0xBADC0FFE; + static final String NO_SECURE_COOKIE = ""; + abstract ByteBuf write(ByteBufAllocator allocator) throws Exception; abstract void readFrom(ByteBuf buffer) throws Exception; // - private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id) { - return allocateBuffer(allocator, id, 0); + private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, String secureCookie) { + return allocateBuffer(allocator, id, 0, secureCookie); } - private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, int length) { + private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, int length, String secureCookie) { + final Charset DEFAULT_CHARSET = Charset.forName("utf-8"); + secureCookie = secureCookie == null ? "": secureCookie; --- End diff -- Should be removed in favor of never passing a null value here.
[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...
[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2425#discussion_r86340045 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java --- @@ -788,75 +719,125 @@ private void logAndSysout(String message) { } } - public static File getYarnPropertiesLocation(Configuration conf) { - String defaultPropertiesFileLocation = System.getProperty("java.io.tmpdir"); - String currentUser = System.getProperty("user.name"); - String propertiesFileLocation = - conf.getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation); - - return new File(propertiesFileLocation, YARN_PROPERTIES_FILE + currentUser); + public static File getYarnPropertiesLocation() { + String path = System.getProperty("user.home") + File.separator + YARN_APP_INI; + File stateFile; + try { + stateFile = new File(path); + if(!stateFile.exists()) { + stateFile.createNewFile(); + } + } catch(IOException e) { + throw new RuntimeException(e); + } + return stateFile; } - public static void persistAppState(String appId, String cookie) { - if(appId == null || cookie == null) { - return; + public static void persistAppState(YarnAppState appState) { + + final String appId = appState.getApplicationId(); + final String parallelism = appState.getParallelism(); + final String dynaProps = appState.getDynamicProperties(); + final String cookie = appState.getCookie(); + + if(appId == null) { + throw new RuntimeException("Missing application ID from Yarn application state"); } - String path = System.getProperty("user.home") + File.separator + fileName; - LOG.debug("Going to persist cookie for the appID: {} in {} ", appId, path); + + String path = getYarnPropertiesLocation().getAbsolutePath(); + + LOG.debug("Going to persist Yarn application state: {} in {}", appState, path); + try { - File f = new File(path); - if(!f.exists()) { - f.createNewFile(); - } HierarchicalINIConfiguration config = new HierarchicalINIConfiguration(path); + SubnodeConfiguration subNode = config.getSection(appId); - if (subNode.containsKey(cookieKey)) { - String errorMessage = "Secure Cookie is already found in "+ path + " for the appID: "+ appId; - LOG.error(errorMessage); - throw new RuntimeException(errorMessage); + if(!subNode.isEmpty()) { + throw new RuntimeException("Application with ID " + appId + " already exists"); } - subNode.addProperty(cookieKey, cookie); + + subNode.addProperty(YARN_PROPERTIES_PARALLELISM, parallelism); + subNode.addProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING, dynaProps); + subNode.addProperty(YARN_PROPERTIES_SECURE_COOKIE, cookie); + + //update latest entry section with the most recent App Id + config.clearTree(YARN_LATEST_ENTRY_SECTION_NAME); + SubnodeConfiguration activeAppSection = config.getSection(YARN_LATEST_ENTRY_SECTION_NAME); + activeAppSection.addProperty(YARN_APPLICATION_ID_KEY, appId); + config.save(); - LOG.debug("Persisted cookie for the appID: {}", appId); + LOG.debug("Persisted Yarn App state: {}", appState); } catch(Exception e) { - LOG.error("Exception occurred while persisting app state for app id: {}", appId, e); throw new RuntimeException(e); } } - public static String getAppSecureCookie(String appId) { + public static YarnAppState retrieveMostRecentYarnApp() { + String path = getYarnPropertiesLocation().getAbsolutePath(); + LOG.debug("Going to fetch app state from {}", path); + try { + HierarchicalINIConfiguration conf
[GitHub] flink pull request #2749: [FLINK-3813][yarn] wait for CLI to complete before...
GitHub user mxm opened a pull request: https://github.com/apache/flink/pull/2749 [FLINK-3813][yarn] wait for CLI to complete before checking output You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/flink FLINK-3813 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2749.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2749 commit e747d94215792ac68fde871226b49d6322a76781 Author: Maximilian Michels Date: 2016-11-03T14:11:10Z [FLINK-3813][yarn] wait for CLI to complete before checking output --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2425: FLINK-3930 Added shared secret based authorization for Fl...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2425 Thank you for the changes. I wonder, could we remove the cookie header completely for Netty or the BlobServer in case the authorization is turned off? The Netty protocol has a `MAGIC_NUMBER` which is checked when decoding the message. We could use a different "magic number" to check whether we use the normal or the cookie-based Netty protocol. This would eliminate all the overhead of the cookie transmission. Furthermore, we should strip the cookie from the message once we have verified it is correct.
[GitHub] flink pull request #2708: [FLINK-4946] [scripts] Load jar files from subdire...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2708#discussion_r86545553 --- Diff: flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh --- @@ -35,14 +33,15 @@ JVM_ARGS="$JVM_ARGS -Xmx512m" # Flink CLI client constructCLIClientClassPath() { - for jarfile in $FLINK_LIB_DIR/*.jar ; do - if [[ $CC_CLASSPATH = "" ]]; then - CC_CLASSPATH=$jarfile; - else - CC_CLASSPATH=$CC_CLASSPATH:$jarfile - fi - done - echo $CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS +while read -d '' -r jarfile ; do +if [[ $FLINK_CLASSPATH = "" ]]; then +CC_CLASSPATH="$jarfile"; +else +CC_CLASSPATH="$CC_CLASSPATH":"$jarfile" +fi +done < <(find "$FLINK_LIB_DIR" -name '*.jar' -print0) + +echo $CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS --- End diff -- Recursing over the lib folder for uploading is not necessary because Yarn already recursively uploads it into the home directory using `FileUtil#copyFromLocalFile`. We just have to make sure to generate the classpath including all subdirectories. The easiest fix would be to use `lib/**` instead of `lib/*`, which would be a one-line change.
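For reference, a minimal self-contained sketch of the recursive classpath loop from the diff above. Note that it tests `CC_CLASSPATH` rather than `FLINK_CLASSPATH` (the quoted diff appears to mix the two variable names up), and it pipes through `sort -z` so the ordering is deterministic; `FLINK_LIB_DIR` is assumed to be set by the caller, as in `yarn-session.sh`.

```shell
#!/usr/bin/env bash
# Sketch: build a classpath from every jar under $FLINK_LIB_DIR, including
# subdirectories (unlike a plain lib/* glob). Null-delimited output from
# find keeps paths with spaces intact.
constructCLIClientClassPath() {
    local CC_CLASSPATH=""
    while read -d '' -r jarfile; do
        if [[ -z "$CC_CLASSPATH" ]]; then
            CC_CLASSPATH="$jarfile"
        else
            CC_CLASSPATH="$CC_CLASSPATH:$jarfile"
        fi
    done < <(find "$FLINK_LIB_DIR" -name '*.jar' -print0 | sort -z)
    echo "$CC_CLASSPATH"
}
```

The real script would then append `$INTERNAL_HADOOP_CLASSPATHS` to the echoed result, as in the diff.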
[GitHub] flink issue #2703: [FLINK-4900] flink-master: Allow to deploy TM with contai...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2703 Thanks for the PR! Jenkins CI is currently broken but Travis CI passed. Looks good to me, but it would be great if you could take a look, @EronWright.
[GitHub] flink issue #2708: [FLINK-4946] [scripts] Load jar files from subdirectories...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2708 @greghogan It would be helpful to look at the Yarn container files which include the container environment variables. This would help us to figure out why the recursive `**` is not working. Another solution would be to recurse into directories and add the files to the classpath one-by-one.
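As a debugging aid for the suggestion above, one way to inspect what a container was actually launched with is to grep the `launch_container.sh` scripts that Yarn generates on the NodeManager. The helper below is hypothetical; the directory to scan must be supplied, since its location depends on the NodeManager configuration (`yarn.nodemanager.log-dirs` and related settings).

```shell
#!/usr/bin/env bash
# Hypothetical helper: print every CLASSPATH line from the container launch
# scripts that Yarn generated under the given directory. grep -H prefixes
# each match with the file name, so matches can be traced to a container.
show_container_classpath() {
    local logdir="$1"
    find "$logdir" -name 'launch_container.sh' -exec grep -H 'CLASSPATH' {} +
}
```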
[GitHub] flink pull request #3086: Improve docker setup
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/3086#discussion_r95410892 --- Diff: flink-contrib/docker-flink/Dockerfile --- @@ -22,9 +22,9 @@ FROM java:8-jre-alpine RUN apk add --no-cache bash snappy # Configure Flink version -ENV FLINK_VERSION=1.1.1 -ENV HADOOP_VERSION=27 -ENV SCALA_VERSION=2.11 +ARG FLINK_VERSION=1.1.3 --- End diff -- `ARG` is only available in newer versions of Docker. If we want to maintain backwards-compatibility, we should adjust the README to state `docker build --build-arg FLINK_VERSION=1.0.3`. As far as I know, we don't gain anything by using `ARG`.
[GitHub] flink pull request #3086: Improve docker setup
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/3086#discussion_r95411497 --- Diff: flink-contrib/docker-flink/docker-entrypoint.sh --- @@ -36,9 +39,9 @@ elif [ "$1" == "taskmanager" ]; then echo "Starting Task Manager" echo "config file: " && grep '^[^\n#]' $FLINK_HOME/conf/flink-conf.yaml $FLINK_HOME/bin/taskmanager.sh start + + # prevent script to exit + tail -f /dev/null else $@ --- End diff -- @greghogan Seems like a way to execute an arbitrary command inside the Docker container, passed as an argument to `docker run`.
[GitHub] flink pull request #3086: Improve docker setup
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/3086#discussion_r95411011 --- Diff: flink-contrib/docker-flink/docker-entrypoint.sh --- @@ -28,6 +28,9 @@ if [ "$1" == "jobmanager" ]; then echo "config file: " && grep '^[^\n#]' $FLINK_HOME/conf/flink-conf.yaml $FLINK_HOME/bin/jobmanager.sh start cluster + + # prevent script to exit + tail -f /dev/null --- End diff -- I think the proper way to fix this would be to call a non-daemonized startup script.
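A sketch of what a non-daemonized entrypoint could look like. The `start-foreground` mode is an assumption here (it is not confirmed by the discussion above; check the `bin/` scripts of the Flink version in use), but the structure shows the idea: the Flink process runs in the foreground via `exec`, so it becomes the container's main process and no `tail -f /dev/null` is needed.

```shell
#!/usr/bin/env bash
# Hypothetical entrypoint sketch. "start-foreground" is an assumed mode of
# the jobmanager.sh/taskmanager.sh scripts; exec replaces the shell so the
# Flink JVM becomes PID 1 of the container.
docker_entrypoint() {
    case "$1" in
        jobmanager)
            exec "$FLINK_HOME/bin/jobmanager.sh" start-foreground cluster ;;
        taskmanager)
            exec "$FLINK_HOME/bin/taskmanager.sh" start-foreground ;;
        *)
            # default: run an arbitrary command, as in the original script
            exec "$@" ;;
    esac
}
```

The real entrypoint file would end with `docker_entrypoint "$@"`.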