[jira] [Commented] (FLINK-2810) Warn user if bc not installed
[ https://issues.apache.org/jira/browse/FLINK-2810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943704#comment-14943704 ] ASF GitHub Bot commented on FLINK-2810: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/1228#discussion_r41172866 --- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh --- @@ -73,6 +73,13 @@ if [[ $STARTSTOP == "start" ]]; then TM_OFFHEAP_SIZE=${FLINK_TM_MEM_MANAGED_SIZE} TM_HEAP_SIZE=$((FLINK_TM_HEAP - FLINK_TM_MEM_MANAGED_SIZE)) else +# Bash only performs integer arithmetic so floating point computation is performed using bc +BC_PATH=`command -v bc` +if [[ $? -eq 1 || ! -f $BC_PATH ]]; then --- End diff -- That should be sufficient. Thanks. > Warn user if bc not installed > - > > Key: FLINK-2810 > URL: https://issues.apache.org/jira/browse/FLINK-2810 > Project: Flink > Issue Type: Improvement > Components: Command-line client >Affects Versions: 0.10 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > Fix For: 0.10 > > > taskmanager.sh will print the following message when starting the cluster if > bc is not installed and off-heap memory is enabled and configured as a ratio. > The script should first check that bc is installed and otherwise print a > specific message. > {noformat} > [ERROR] Configured TaskManager managed memory fraction is not a valid value. > Please set 'taskmanager.memory.fraction' in flink-conf.yaml > {noformat} > Examples of distributions where bc is not installed by default are the > Debian images for Google Compute Engine. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2811][web-dashboard]Add page with Confi...
Github user uce commented on the pull request: https://github.com/apache/flink/pull/1219#issuecomment-145617179 I think alphabetically is better. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2811][web-dashboard]Add page with Confi...
Github user sachingoel0101 commented on the pull request: https://github.com/apache/flink/pull/1219#issuecomment-145620411 I was thinking more about the actual pixel length but achieving that is proving to be kind of hard. Alphabetically should be okay I think.
[jira] [Commented] (FLINK-2811) Add page with configuration overview
[ https://issues.apache.org/jira/browse/FLINK-2811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943780#comment-14943780 ] ASF GitHub Bot commented on FLINK-2811: --- Github user sachingoel0101 commented on the pull request: https://github.com/apache/flink/pull/1219#issuecomment-145620411 I was thinking more about the actual pixel length but achieving that is proving to be kind of hard. Alphabetically should be okay I think. > Add page with configuration overview > > > Key: FLINK-2811 > URL: https://issues.apache.org/jira/browse/FLINK-2811 > Project: Flink > Issue Type: Sub-task > Components: Webfrontend >Affects Versions: 0.10 >Reporter: Robert Metzger >Assignee: Sachin Goel > Fix For: 0.10 > > > The old web interface contained a page to view the configuration of the > JobManager. > This issue is about adding the page again.
[jira] [Commented] (FLINK-2066) Make delay between execution retries configurable
[ https://issues.apache.org/jira/browse/FLINK-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943852#comment-14943852 ] Chengxuan Wang commented on FLINK-2066: --- Hi Maximilian, do I need to make the change based on 0.10? I think I made the change based on 0.9. > Make delay between execution retries configurable > - > > Key: FLINK-2066 > URL: https://issues.apache.org/jira/browse/FLINK-2066 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 0.9, 0.10 >Reporter: Stephan Ewen >Assignee: Nuno Miguel Marques dos Santos >Priority: Blocker > Labels: starter > Fix For: 0.10 > > > Flink allows specifying a delay between execution retries. This helps to let > some external failure causes fully manifest themselves before the restart is > attempted. > The delay is currently defined only system wide. > We should add it to the {{ExecutionConfig}} of a job to allow per-job > specification.
[jira] [Commented] (FLINK-2797) CLI: Missing option to submit jobs in detached mode
[ https://issues.apache.org/jira/browse/FLINK-2797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943860#comment-14943860 ] ASF GitHub Bot commented on FLINK-2797: --- Github user sachingoel0101 commented on the pull request: https://github.com/apache/flink/pull/1214#issuecomment-145635600 @uce, I have pushed a commit with my fix for disabling eager execution in detached mode. Please have a look. If it's okay, I can go ahead with adding docs for it. > CLI: Missing option to submit jobs in detached mode > --- > > Key: FLINK-2797 > URL: https://issues.apache.org/jira/browse/FLINK-2797 > Project: Flink > Issue Type: Bug > Components: Command-line client >Affects Versions: 0.9, 0.10 >Reporter: Maximilian Michels >Assignee: Sachin Goel > Fix For: 0.10 > > > Jobs can only be submitted in detached mode using YARN but not on a > standalone installation. This has been requested by users who want to submit > a job, get the job id, and later query its status.
[jira] [Commented] (FLINK-2767) Add support Scala 2.11 to Scala shell
[ https://issues.apache.org/jira/browse/FLINK-2767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943701#comment-14943701 ] ASF GitHub Bot commented on FLINK-2767: --- Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/1197#issuecomment-145606437 It occurred due to a jline library version mismatch between Scala 2.10 and 2.11. Since Scala 2.11.5, we don't need the jline dependency; the Scala REPL uses its own jline library. So I upgraded the Scala minor version to 2.11.7 and moved jline to the Scala 2.10-only dependency list. > Add support Scala 2.11 to Scala shell > - > > Key: FLINK-2767 > URL: https://issues.apache.org/jira/browse/FLINK-2767 > Project: Flink > Issue Type: Improvement > Components: Scala Shell >Affects Versions: 0.10 >Reporter: Chiwan Park >Assignee: Chiwan Park > > Since FLINK-2200 is resolved, the Flink community provides JARs for Scala > 2.11. But currently, there is no Scala shell with Scala 2.11. If we add > support for Scala 2.11 to the Scala shell, users with Scala 2.11 could use Flink > easily.
[jira] [Commented] (FLINK-2741) Use single log statement in TestLogger
[ https://issues.apache.org/jira/browse/FLINK-2741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943816#comment-14943816 ] ASF GitHub Bot commented on FLINK-2741: --- Github user rerngvit commented on the pull request: https://github.com/apache/flink/pull/1221#issuecomment-145629889 Thanks all for suggestions. > Use single log statement in TestLogger > -- > > Key: FLINK-2741 > URL: https://issues.apache.org/jira/browse/FLINK-2741 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: master >Reporter: Ufuk Celebi >Assignee: rerngvit yanggratoke >Priority: Trivial > > {{TestLogger}} prints log statements before and after tests. Currently this > is done via multiple {{log.info}} statements. Sometimes this leads to > interleaved output with failure stack traces. > I would like to change it to a single statement with new lines: > {code} > 17:30:31,887 ERROR A - - > 17:30:31,891 INFO B - Shutting down remote daemon. > 17:30:31,895 ERROR A - Test testJobManagerCleanUp(A) failed with: > ... > 17:30:31,909 ERROR A - = > {code} > to > {code} > 17:30:31,891 INFO B - Shutting down remote daemon. > 17:30:31,887 ERROR A - > - > Test testJobManagerCleanUp(A) failed with: > ... > = > {code} > Any opinions? Does this improve readability?
[GitHub] flink pull request: [FLINK-2741] - Use single log statement in Tes...
Github user rerngvit commented on the pull request: https://github.com/apache/flink/pull/1221#issuecomment-145629889 Thanks all for suggestions.
[jira] [Commented] (FLINK-2811) Add page with configuration overview
[ https://issues.apache.org/jira/browse/FLINK-2811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943848#comment-14943848 ] ASF GitHub Bot commented on FLINK-2811: --- Github user sachingoel0101 commented on the pull request: https://github.com/apache/flink/pull/1219#issuecomment-145633502 Yes. You're right. I'd misunderstood Stephan's comment. The goal is to group similar keys together, and sorting alphabetically will definitely take care of that. On a separate note, does it make sense to have this type of structure: *Job Manager* jobmanager.* *Akka* akka.* *Recovery* ha.* and so on. Of course, if no keys are available under a group, it won't be displayed at all. Or we can just leave it at sorted alphabetically. Your call. > Add page with configuration overview > > > Key: FLINK-2811 > URL: https://issues.apache.org/jira/browse/FLINK-2811 > Project: Flink > Issue Type: Sub-task > Components: Webfrontend >Affects Versions: 0.10 >Reporter: Robert Metzger >Assignee: Sachin Goel > Fix For: 0.10 > > > The old web interface contained a page to view the configuration of the > JobManager. > This issue is about adding the page again.
[GitHub] flink pull request: [FLINK-2811][web-dashboard]Add page with Confi...
Github user sachingoel0101 commented on the pull request: https://github.com/apache/flink/pull/1219#issuecomment-145633502 Yes. You're right. I'd misunderstood Stephan's comment. The goal is to group similar keys together, and sorting alphabetically will definitely take care of that. On a separate note, does it make sense to have this type of structure: *Job Manager* jobmanager.* *Akka* akka.* *Recovery* ha.* and so on. Of course, if no keys are available under a group, it won't be displayed at all. Or we can just leave it at sorted alphabetically. Your call.
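The grouped layout proposed in the comment above (a heading per key prefix, with empty groups never shown) can be sketched over a flat key list. This is only an illustration of the grouping idea with made-up key names, not the dashboard implementation:

```shell
#!/usr/bin/env bash
# Sort configuration keys alphabetically, then emit a heading whenever the
# first dotted segment (the group) changes. A group with no keys simply
# never produces a heading.
printf '%s\n' \
    'jobmanager.rpc.port' \
    'akka.ask.timeout' \
    'jobmanager.heap.mb' |
sort |
awk -F. '$1 != prev { print "[" $1 "]"; prev = $1 } { print "  " $0 }'
# prints:
# [akka]
#   akka.ask.timeout
# [jobmanager]
#   jobmanager.heap.mb
#   jobmanager.rpc.port
```

Sorting first means the grouping falls out for free: keys sharing a prefix are already adjacent, so one pass with `awk` suffices.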
[GitHub] flink pull request: [FLINK-2797][cli] Add support for running jobs...
Github user sachingoel0101 commented on the pull request: https://github.com/apache/flink/pull/1214#issuecomment-145635600 @uce, I have pushed a commit with my fix for disabling eager execution in detached mode. Please have a look. If it's okay, I can go ahead with adding docs for it.
[GitHub] flink pull request: [FLINK-2810] Warn user if bc not installed
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/1228#discussion_r41172866 --- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh --- @@ -73,6 +73,13 @@ if [[ $STARTSTOP == "start" ]]; then TM_OFFHEAP_SIZE=${FLINK_TM_MEM_MANAGED_SIZE} TM_HEAP_SIZE=$((FLINK_TM_HEAP - FLINK_TM_MEM_MANAGED_SIZE)) else +# Bash only performs integer arithmetic so floating point computation is performed using bc +BC_PATH=`command -v bc` +if [[ $? -eq 1 || ! -f $BC_PATH ]]; then --- End diff -- That should be sufficient. Thanks.
[jira] [Commented] (FLINK-2811) Add page with configuration overview
[ https://issues.apache.org/jira/browse/FLINK-2811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943761#comment-14943761 ] ASF GitHub Bot commented on FLINK-2811: --- Github user uce commented on the pull request: https://github.com/apache/flink/pull/1219#issuecomment-145617179 I think alphabetically is better. > Add page with configuration overview > > > Key: FLINK-2811 > URL: https://issues.apache.org/jira/browse/FLINK-2811 > Project: Flink > Issue Type: Sub-task > Components: Webfrontend >Affects Versions: 0.10 >Reporter: Robert Metzger >Assignee: Sachin Goel > Fix For: 0.10 > > > The old web interface contained a page to view the configuration of the > JobManager. > This issue is about adding the page again.
[GitHub] flink pull request: [FLINK-2811][web-dashboard]Add page with Confi...
Github user uce commented on the pull request: https://github.com/apache/flink/pull/1219#issuecomment-145630566 I see. But pixel length or number of characters does not make it easier to navigate when you want to look up a specific key, e.g. `jobmanager.X.Y`.
[jira] [Commented] (FLINK-2811) Add page with configuration overview
[ https://issues.apache.org/jira/browse/FLINK-2811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943822#comment-14943822 ] ASF GitHub Bot commented on FLINK-2811: --- Github user uce commented on the pull request: https://github.com/apache/flink/pull/1219#issuecomment-145630566 I see. But pixel length or number of characters does not make it easier to navigate when you want to look up a specific key, e.g. `jobmanager.X.Y`. > Add page with configuration overview > > > Key: FLINK-2811 > URL: https://issues.apache.org/jira/browse/FLINK-2811 > Project: Flink > Issue Type: Sub-task > Components: Webfrontend >Affects Versions: 0.10 >Reporter: Robert Metzger >Assignee: Sachin Goel > Fix For: 0.10 > > > The old web interface contained a page to view the configuration of the > JobManager. > This issue is about adding the page again.
[GitHub] flink pull request: [FLINK-2767] [scala shell] Add Scala 2.11 supp...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/1197#issuecomment-145606437 It occurred due to a jline library version mismatch between Scala 2.10 and 2.11. Since Scala 2.11.5, we don't need the jline dependency; the Scala REPL uses its own jline library. So I upgraded the Scala minor version to 2.11.7 and moved jline to the Scala 2.10-only dependency list.
[GitHub] flink pull request: [FLINK-2815] [REFACTOR] Remove Pact from class...
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/1218#issuecomment-145740343 Updated based on review.
[jira] [Commented] (FLINK-2815) [REFACTOR] Remove Pact from class and file names since it is no longer valid reference
[ https://issues.apache.org/jira/browse/FLINK-2815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944491#comment-14944491 ] ASF GitHub Bot commented on FLINK-2815: --- Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/1218#issuecomment-145740343 Updated based on review. > [REFACTOR] Remove Pact from class and file names since it is no longer valid > reference > -- > > Key: FLINK-2815 > URL: https://issues.apache.org/jira/browse/FLINK-2815 > Project: Flink > Issue Type: Task >Reporter: Henry Saputra >Assignee: Henry Saputra >Priority: Minor > > Remove the Pact word from class and file names in Apache Flink. > Pact was the name used in Stratosphere times to refer to the concept of > distributed datasets (similar to the Flink Dataset). > It was used when Pact and Nephele were still separate concepts. > As part of the 0.10 cleanup effort, let's remove the Pact names to avoid > confusion.
[GitHub] flink pull request: [FLINK-2066][core] Add delay between execution...
Github user WangCHX commented on the pull request: https://github.com/apache/flink/pull/1223#issuecomment-145740461 Thank you very much.
[jira] [Commented] (FLINK-2066) Make delay between execution retries configurable
[ https://issues.apache.org/jira/browse/FLINK-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944495#comment-14944495 ] ASF GitHub Bot commented on FLINK-2066: --- Github user WangCHX commented on the pull request: https://github.com/apache/flink/pull/1223#issuecomment-145740461 Thank you very much. > Make delay between execution retries configurable > - > > Key: FLINK-2066 > URL: https://issues.apache.org/jira/browse/FLINK-2066 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 0.9, 0.10 >Reporter: Stephan Ewen >Assignee: Nuno Miguel Marques dos Santos >Priority: Blocker > Labels: starter > Fix For: 0.10 > > > Flink allows specifying a delay between execution retries. This helps to let > some external failure causes fully manifest themselves before the restart is > attempted. > The delay is currently defined only system wide. > We should add it to the {{ExecutionConfig}} of a job to allow per-job > specification.
[jira] [Commented] (FLINK-2283) Make grouped reduce/fold/aggregations stateful using Partitioned state
[ https://issues.apache.org/jira/browse/FLINK-2283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944008#comment-14944008 ] ASF GitHub Bot commented on FLINK-2283: --- Github user mbalassi commented on the pull request: https://github.com/apache/flink/pull/1155#issuecomment-145665200 @StephanEwen: Thanks for the heads-up. Now I have addressed the comments and would like to merge this tomorrow if there are no objections. > Make grouped reduce/fold/aggregations stateful using Partitioned state > -- > > Key: FLINK-2283 > URL: https://issues.apache.org/jira/browse/FLINK-2283 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 0.10 >Reporter: Gyula Fora >Assignee: Márton Balassi >Priority: Minor > > Currently the inner state of the grouped aggregations is not persisted as an > operator state. > These operators should be reimplemented to use the newly introduced > partitioned state abstractions, which will make them fault tolerant and > scalable for the future. > A suggested implementation would be to use a stateful mapper to implement the > desired behaviour.
[jira] [Created] (FLINK-2820) Configuration not passed to JobGraphGenerator
Greg Hogan created FLINK-2820: - Summary: Configuration not passed to JobGraphGenerator Key: FLINK-2820 URL: https://issues.apache.org/jira/browse/FLINK-2820 Project: Flink Issue Type: Bug Components: Command-line client Affects Versions: master Reporter: Greg Hogan Assignee: Greg Hogan Priority: Minor This was previously reported as FLINK-2625 (commit 8a84937215ea575fa94a00d11c2517902d252756). The Client class was concurrently refactored with FLINK-2097 (commit 71bf2f570861daae53b24bfcf1d06aedb85311b9).
[GitHub] flink pull request: [FLINK-2283] [streaming] Make grouped reduce/f...
Github user mbalassi commented on the pull request: https://github.com/apache/flink/pull/1155#issuecomment-145665200 @StephanEwen: Thanks for the heads-up. Now I have addressed the comments and would like to merge this tomorrow if there are no objections.
[jira] [Assigned] (FLINK-2737) KeyedDataStream should not be a subclass of DataStream
[ https://issues.apache.org/jira/browse/FLINK-2737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek reassigned FLINK-2737: --- Assignee: Aljoscha Krettek > KeyedDataStream should not be a subclass of DataStream > -- > > Key: FLINK-2737 > URL: https://issues.apache.org/jira/browse/FLINK-2737 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 0.10 >Reporter: Stephan Ewen >Assignee: Aljoscha Krettek > Fix For: 0.10 > >
[GitHub] flink pull request: [FLINK-2786] Remove Spargel from source code a...
GitHub user vasia opened a pull request: https://github.com/apache/flink/pull/1229 [FLINK-2786] Remove Spargel from source code and update docs. I also ported 2 Spargel tests that we hadn't copied over to Gelly. You can merge this pull request into a Git repository by running: $ git pull https://github.com/vasia/flink flink-2786 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1229.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1229 commit 4391e8abf121f5d8ba6b8bdcd7d9a811c4d67806 Author: vasia Date: 2015-10-05T18:08:55Z [FLINK-2786] Remove Spargel code and docs; Port Spargel tests to Gelly; Remove Beta badge from Gelly
[jira] [Created] (FLINK-2819) Add Windowed Join/CoGroup Operator Based on Tagged Union
Aljoscha Krettek created FLINK-2819: --- Summary: Add Windowed Join/CoGroup Operator Based on Tagged Union Key: FLINK-2819 URL: https://issues.apache.org/jira/browse/FLINK-2819 Project: Flink Issue Type: Sub-task Components: Streaming Reporter: Aljoscha Krettek Assignee: Aljoscha Krettek Fix For: 0.10 This will add a Join/CoGroup operation that reuses the new windowing code. The implementation should be similar to how a join can be implemented on MapReduce: using tags for the two input sides and then pulling them apart again in the reduce operation.
[jira] [Commented] (FLINK-2354) Recover running jobs on JobManager failure
[ https://issues.apache.org/jira/browse/FLINK-2354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14942997#comment-14942997 ] ASF GitHub Bot commented on FLINK-2354: --- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41115679 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.shared.SharedCount; +import org.apache.curator.framework.recipes.shared.VersionedValue; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * {@link CheckpointIDCounter} instances for JobManagers running in {@link RecoveryMode#ZOOKEEPER}. 
+ * + * Each counter creates a ZNode: + * + * +O /flink/checkpoint-counter/job-id 1 [persistent] + * . + * . + * . + * +O /flink/checkpoint-counter/job-id N [persistent] + * + * + * The checkpoints IDs are required to be ascending (per job). In order to guarantee this in case + * of job manager failures we use ZooKeeper to have a shared counter across job manager instances. + */ +public class ZooKeeperCheckpointIDCounter implements CheckpointIDCounter { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCheckpointIDCounter.class); + + /** Curator ZooKeeper client */ + private final CuratorFramework client; + + /** Path of the shared count */ + private final String counterPath; + + /** Curator recipe for shared counts */ + private final SharedCount sharedCount; + + /** Connection state listener to monitor the client connection */ + private final SharedCountConnectionStateListener connStateListener = + new SharedCountConnectionStateListener(); + + /** +* Creates a {@link ZooKeeperCheckpointIDCounter} instance. +* +* @param client Curator ZooKeeper client +* @param counterPath ZooKeeper path for the counter. It's sufficient to have a path per-job. 
+* @throws Exception +*/ + public ZooKeeperCheckpointIDCounter(CuratorFramework client, String counterPath) throws Exception { + this.client = checkNotNull(client, "Curator client"); + this.counterPath = checkNotNull(counterPath, "Counter path"); + this.sharedCount = new SharedCount(client, counterPath, 1); + } + + @Override + public void start() throws Exception { + sharedCount.start(); + client.getConnectionStateListenable().addListener(connStateListener); + } + + @Override + public void stop() throws Exception { + sharedCount.close(); + client.getConnectionStateListenable().removeListener(connStateListener); + + LOG.info("Removing {} from ZooKeeper", counterPath); + client.delete().deletingChildrenIfNeeded().inBackground().forPath(counterPath); + } + + @Override + public long getAndIncrement() throws Exception { + while (true) { + ConnectionState connState = connStateListener.getLastState(); + + if (connState != null) { + throw new IllegalStateException("Connection state: " + connState); + } + + VersionedValue current = sharedCount.getVersionedValue(); + + Integer
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41115679 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.shared.SharedCount; +import org.apache.curator.framework.recipes.shared.VersionedValue; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * {@link CheckpointIDCounter} instances for JobManagers running in {@link RecoveryMode#ZOOKEEPER}. + * + * Each counter creates a ZNode: + * + * +O /flink/checkpoint-counter/job-id 1 [persistent] + * . + * . + * . + * +O /flink/checkpoint-counter/job-id N [persistent] + * + * + * The checkpoints IDs are required to be ascending (per job). 
In order to guarantee this in case + * of job manager failures we use ZooKeeper to have a shared counter across job manager instances. + */ +public class ZooKeeperCheckpointIDCounter implements CheckpointIDCounter { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCheckpointIDCounter.class); + + /** Curator ZooKeeper client */ + private final CuratorFramework client; + + /** Path of the shared count */ + private final String counterPath; + + /** Curator recipe for shared counts */ + private final SharedCount sharedCount; + + /** Connection state listener to monitor the client connection */ + private final SharedCountConnectionStateListener connStateListener = + new SharedCountConnectionStateListener(); + + /** +* Creates a {@link ZooKeeperCheckpointIDCounter} instance. +* +* @param client Curator ZooKeeper client +* @param counterPath ZooKeeper path for the counter. It's sufficient to have a path per-job. +* @throws Exception +*/ + public ZooKeeperCheckpointIDCounter(CuratorFramework client, String counterPath) throws Exception { + this.client = checkNotNull(client, "Curator client"); + this.counterPath = checkNotNull(counterPath, "Counter path"); + this.sharedCount = new SharedCount(client, counterPath, 1); + } + + @Override + public void start() throws Exception { + sharedCount.start(); + client.getConnectionStateListenable().addListener(connStateListener); + } + + @Override + public void stop() throws Exception { + sharedCount.close(); + client.getConnectionStateListenable().removeListener(connStateListener); + + LOG.info("Removing {} from ZooKeeper", counterPath); + client.delete().deletingChildrenIfNeeded().inBackground().forPath(counterPath); + } + + @Override + public long getAndIncrement() throws Exception { + while (true) { + ConnectionState connState = connStateListener.getLastState(); + + if (connState != null) { + throw new IllegalStateException("Connection state: " + connState); + } + + VersionedValue current = 
sharedCount.getVersionedValue(); + + Integer newCount = current.getValue() + 1; + + if (sharedCount.trySetCount(current, newCount)) { + return current.getValue(); + } + } + } + +
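The retry loop in `getAndIncrement` above is an optimistic compare-and-set: read the versioned value, attempt a conditional update, and retry if another JobManager incremented the counter in between. The same pattern can be sketched in plain Java with an `AtomicReference` standing in for Curator's `SharedCount`/`trySetCount` (the class and method names below are illustrative, not Flink's actual code):

```java
import java.util.concurrent.atomic.AtomicReference;

class OptimisticCounter {

    // Mimics Curator's VersionedValue<Integer>: a value plus its version.
    static final class Versioned {
        final int value;
        final int version;
        Versioned(int value, int version) { this.value = value; this.version = version; }
    }

    // Stand-in for the shared counter ZNode; starts at 1 like SharedCount(client, path, 1).
    private final AtomicReference<Versioned> shared = new AtomicReference<>(new Versioned(1, 0));

    Versioned getVersionedValue() {
        return shared.get();
    }

    // Mimics SharedCount.trySetCount: the write succeeds only if no other
    // writer has changed the value since `expected` was read.
    boolean trySetCount(Versioned expected, int newCount) {
        return shared.compareAndSet(expected, new Versioned(newCount, expected.version + 1));
    }

    // Same shape as ZooKeeperCheckpointIDCounter#getAndIncrement: retry the
    // conditional update until it wins, then return the pre-increment value.
    long getAndIncrement() {
        while (true) {
            Versioned current = getVersionedValue();
            if (trySetCount(current, current.value + 1)) {
                return current.value;
            }
        }
    }

    public static void main(String[] args) {
        OptimisticCounter counter = new OptimisticCounter();
        System.out.println(counter.getAndIncrement()); // 1
        System.out.println(counter.getAndIncrement()); // 2
    }
}
```

Because every writer must present the version it read, two JobManagers can never both succeed with the same base value, which is what makes the checkpoint IDs strictly ascending per job.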
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41115736 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java --- @@ -19,29 +19,28 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobID; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.Serializable; +import java.util.ArrayList; import java.util.List; /** * A successful checkpoint describes a checkpoint after all required tasks acknowledged it (with their state) * and that is considered completed. */ -public class SuccessfulCheckpoint { - - private static final Logger LOG = LoggerFactory.getLogger(SuccessfulCheckpoint.class); - +public class SuccessfulCheckpoint implements Serializable { + + private static final long serialVersionUID = -8360248179615702014L; + private final JobID job; private final long checkpointID; private final long timestamp; - private final List states; - + private final ArrayList states; --- End diff -- Because of Serializability. SuccessfulCheckpoint has been changed to be Serializable. I figured this is OK, because it is an "internal" interface and the only usage creates an instance with ArrayList. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2354) Recover running jobs on JobManager failure
[ https://issues.apache.org/jira/browse/FLINK-2354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14942998#comment-14942998 ] ASF GitHub Bot commented on FLINK-2354: --- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41115736 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java --- @@ -19,29 +19,28 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobID; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.Serializable; +import java.util.ArrayList; import java.util.List; /** * A successful checkpoint describes a checkpoint after all required tasks acknowledged it (with their state) * and that is considered completed. */ -public class SuccessfulCheckpoint { - - private static final Logger LOG = LoggerFactory.getLogger(SuccessfulCheckpoint.class); - +public class SuccessfulCheckpoint implements Serializable { + + private static final long serialVersionUID = -8360248179615702014L; + private final JobID job; private final long checkpointID; private final long timestamp; - private final List states; - + private final ArrayList states; --- End diff -- Because of Serializability. SuccessfulCheckpoint has been changed to be Serializable. I figured this is OK, because it is an "internal" interface and the only usage creates an instance with ArrayList. > Recover running jobs on JobManager failure > -- > > Key: FLINK-2354 > URL: https://issues.apache.org/jira/browse/FLINK-2354 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Affects Versions: master >Reporter: Ufuk Celebi >Assignee: Ufuk Celebi > Fix For: 0.10 > > > tl;dr Persist JobGraphs in state backend and coordinate reference to state > handle via ZooKeeper. > Problem: When running multiple JobManagers in high availability mode, the > leading job manager loses all running jobs when it fails.
After a new > leading job manager is elected, it is not possible to recover any previously > running jobs. > Solution: The leading job manager, which receives the job graph writes 1) the > job graph to a state backend, and 2) a reference to the respective state > handle to ZooKeeper. In general, job graphs can become large (multiple MBs, > because they include closures etc.). ZooKeeper is not designed for data of > this size. The level of indirection via the reference to the state backend > keeps the data in ZooKeeper small. > Proposed ZooKeeper layout: > /flink (default) > +- currentJobs >+- job id i > +- state handle reference of job graph i > The 'currentJobs' node needs to be persistent to allow recovery of jobs > between job managers. The currentJobs node needs to satisfy the following > invariant: There is a reference to a job graph with id i IFF the respective > job graph needs to be recovered by a newly elected job manager leader. > With this in place, jobs will be recovered from their initial state (as if > resubmitted). The next step is to backup the runtime state handles of > checkpoints in a similar manner. > --- > This work will be based on [~trohrm...@apache.org]'s implementation of > FLINK-2291. The leader election service notifies the job manager about > granted/revoked leadership. This notification happens via Akka and thus > serially *per* job manager, but results in eventually consistent state > between job managers. For some snapshots of time it is possible to have a new > leader granted leadership, before the old one has been revoked its leadership. > [~trohrm...@apache.org], can you confirm that leadership does not guarantee > mutually exclusive access to the shared 'currentJobs' state? 
> For example, the following can happen: > - JM 1 is leader, JM 2 is standby > - JOB i is running (and hence /flink/currentJobs/i exists) > - ZK notifies leader election service (LES) of JM 1 and JM 2 > - LES 2 immediately notifies JM 2 about granted leadership, but LES 1 > notification revoking leadership takes longer > - JOB i finishes (TMs don't notice leadership change yet) and JM 1 receives > final JobStatusChange > - JM 2 resubmits the job /flink/currentJobs/i > - JM 1 removes /flink/currentJobs/i, because it is now finished > => inconsistent state (wrt the specified invariant above) > If it is indeed a problem, we can circumvent this with a Curator recipe for > [shared locks|http://curator.apache.org/curator-recipes/shared-lock.html] to > coordinate the access to currentJobs. The lock needs to be acquired on >
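The fix proposed above, serializing all access to `currentJobs` through a shared lock, can be illustrated with an in-process sketch. Here a `ReentrantLock` stands in for the Curator shared-lock recipe and a map stands in for the `/flink/currentJobs` ZNodes; all names and the `hdfs://` path are illustrative, not Flink's actual implementation:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

class CurrentJobsRegistry {

    // Stand-in for /flink/currentJobs: job id -> state handle reference.
    private final Map<String, String> currentJobs = new HashMap<>();

    // Stand-in for the Curator shared lock: the old leader's
    // "remove finished job i" and the new leader's "recover job i"
    // check cannot interleave.
    private final ReentrantLock lock = new ReentrantLock();

    void submit(String jobId, String stateHandleRef) {
        lock.lock();
        try {
            currentJobs.put(jobId, stateHandleRef);
        } finally {
            lock.unlock();
        }
    }

    // Old leader: drop the reference once the job finished.
    void markFinished(String jobId) {
        lock.lock();
        try {
            currentJobs.remove(jobId);
        } finally {
            lock.unlock();
        }
    }

    // New leader: recover a job IFF its reference still exists,
    // which is exactly the invariant stated in the issue.
    boolean shouldRecover(String jobId) {
        lock.lock();
        try {
            return currentJobs.containsKey(jobId);
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        CurrentJobsRegistry registry = new CurrentJobsRegistry();
        registry.submit("job-i", "hdfs:///flink/recovery/job-i"); // illustrative handle
        System.out.println(registry.shouldRecover("job-i")); // true
        registry.markFinished("job-i");
        System.out.println(registry.shouldRecover("job-i")); // false
    }
}
```

In the real design the lock and map live in ZooKeeper, so the lock must be acquired before the finished-job cleanup as well as before the recovery check; otherwise the race between JM 1 and JM 2 described above remains possible.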
[GitHub] flink pull request: [FLINK-2741] - Use single log statement in Tes...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1221#discussion_r41115826 --- Diff: flink-core/src/test/java/org/apache/flink/util/TestLogger.java --- @@ -37,23 +37,26 @@ @Override public void starting(Description description) { - log.info(""); - log.info("Test {} is running.", description); - log.info(""); + log.info("" --- End diff -- I would add a new line here before the line as well. Then we have everything together with the same indentation.
[jira] [Commented] (FLINK-2741) Use single log statement in TestLogger
[ https://issues.apache.org/jira/browse/FLINK-2741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14942999#comment-14942999 ] ASF GitHub Bot commented on FLINK-2741: --- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1221#discussion_r41115826 --- Diff: flink-core/src/test/java/org/apache/flink/util/TestLogger.java --- @@ -37,23 +37,26 @@ @Override public void starting(Description description) { - log.info(""); - log.info("Test {} is running.", description); - log.info(""); + log.info("" --- End diff -- I would add a new line here before the line as well. Then we have everything together with the same indentation. > Use single log statement in TestLogger > -- > > Key: FLINK-2741 > URL: https://issues.apache.org/jira/browse/FLINK-2741 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: master >Reporter: Ufuk Celebi >Assignee: rerngvit yanggratoke >Priority: Trivial > > {{TestLogger}} prints log statements before and after tests. Currently this > is done via multiple {{log.info}} statements. Sometimes this leads to > interleaved output with failure stack traces. > I would like to change it to a single statement with new lines: > {code} > 17:30:31,887 ERROR A - - > 17:30:31,891 INFO B - Shutting down remote daemon. > 17:30:31,895 ERROR A - Test testJobManagerCleanUp(A) failed with: > ... > 17:30:31,909 ERROR A - = > {code} > to > {code} > 17:30:31,891 INFO B - Shutting down remote daemon. > 17:30:31,887 ERROR A - > - > Test testJobManagerCleanUp(A) failed with: > ... > = > {code} > Any opinions? Does this improve readability? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
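The single-statement approach described in FLINK-2741 can be sketched in plain Java: build the whole banner as one string, so a single log call emits it as one record and concurrent output (such as a failure stack trace) cannot interleave between the separator lines. The class and method names below are illustrative, not Flink's actual `TestLogger`:

```java
class TestBanner {

    private static final String SEPARATOR =
        "================================================================================";

    // One string -> one log record. With the old three log.info(...) calls,
    // another thread could log between the separator and the message line.
    static String startingBanner(String description) {
        return String.format("%n%s%nTest %s is running.%n%s",
            SEPARATOR, description, SEPARATOR);
    }

    public static void main(String[] args) {
        // In TestLogger this string would be passed to a single log.info(...) call.
        System.out.println(startingBanner("testJobManagerCleanUp(A)"));
    }
}
```

The leading `%n` keeps the banner on its own lines even though the logger prefixes the first line with a timestamp, matching the "to" example in the issue.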
[GitHub] flink pull request: Flink 1745
Github user uce commented on the pull request: https://github.com/apache/flink/pull/1220#issuecomment-145449146 Thanks for this impressive PR. A minor comment: could you edit the title of the PR to include more details than the issue ID (we can't do it because the ASF is managing the github account). It's hard to track the PRs otherwise. I would just go with the JIRA title: `[FLINK-1745] Add exact k-nearest-neighbours algorithm to machine learning library`
[jira] [Commented] (FLINK-1745) Add exact k-nearest-neighbours algorithm to machine learning library
[ https://issues.apache.org/jira/browse/FLINK-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943004#comment-14943004 ] ASF GitHub Bot commented on FLINK-1745: --- Github user uce commented on the pull request: https://github.com/apache/flink/pull/1220#issuecomment-145449146 Thanks for this impressive PR. A minor comment: could you edit the title of the PR to include more details than the issue ID (we can't do it because the ASF is managing the github account). It's hard to track the PRs otherwise. I would just go with the JIRA title: `[FLINK-1745] Add exact k-nearest-neighbours algorithm to machine learning library` > Add exact k-nearest-neighbours algorithm to machine learning library > > > Key: FLINK-1745 > URL: https://issues.apache.org/jira/browse/FLINK-1745 > Project: Flink > Issue Type: New Feature > Components: Machine Learning Library >Reporter: Till Rohrmann >Assignee: Daniel Blazevski > Labels: ML, Starter > > Even though the k-nearest-neighbours (kNN) [1,2] algorithm is quite trivial > it is still used as a means to classify data and to do regression. This issue > focuses on the implementation of an exact kNN (H-BNLJ, H-BRJ) algorithm as > proposed in [2]. > Could be a starter task. > Resources: > [1] [http://en.wikipedia.org/wiki/K-nearest_neighbors_algorithm] > [2] [https://www.cs.utah.edu/~lifeifei/papers/mrknnj.pdf]
[GitHub] flink pull request: [FLINK-2790] [yarn] [ha] Adds high availabilit...
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/1213#issuecomment-145450115 I think TMs are only kept alive if their containers have been properly started. If the AM happens to die while the TM containers are started up, I think they will be terminated as well. Another question is how did you kill the AM and what do you mean with "[...] restarting properly. But I think that's not the expected behavior"?
[jira] [Commented] (FLINK-2790) Add high availability support for Yarn
[ https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943008#comment-14943008 ] ASF GitHub Bot commented on FLINK-2790: --- Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/1213#issuecomment-145450115 I think TMs are only kept alive if their containers have been properly started. If the AM happens to die while the TM containers are started up, I think they will be terminated as well. Another question is how did you kill the AM and what do you mean with "[...] restarting properly. But I think that's not the expected behavior"? > Add high availability support for Yarn > -- > > Key: FLINK-2790 > URL: https://issues.apache.org/jira/browse/FLINK-2790 > Project: Flink > Issue Type: Sub-task > Components: JobManager, TaskManager >Reporter: Till Rohrmann > Fix For: 0.10 > > > Add master high availability support for Yarn. The idea is to let Yarn > restart a failed application master in a new container. For that, we set the > number of application retries to something greater than 1. > From version 2.4.0 onwards, it is possible to reuse already started > containers for the TaskManagers, thus, avoiding unnecessary restart delays. > From version 2.6.0 onwards, it is possible to specify an interval in which > the number of application attempts has to be exceeded in order to fail the > job. This will prevent long running jobs from eventually depleting all > available application attempts.
[jira] [Commented] (FLINK-2790) Add high availability support for Yarn
[ https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943014#comment-14943014 ] ASF GitHub Bot commented on FLINK-2790: --- Github user uce commented on the pull request: https://github.com/apache/flink/pull/1213#issuecomment-145451351 Looks like a lot of work to figure out the different version behaviours. Good job and thanks for the clear explanation. :) I guess Robert meant with "not restarting properly" that the TMs were restarted as well. How does the way you kill the AM affect recovery? I will try this out later today.
[GitHub] flink pull request: [FLINK-2790] [yarn] [ha] Adds high availabilit...
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/1213#issuecomment-145456411 I was just curious whether he killed them gracefully with a `PoisonPill` or via killing the JVM process.
[jira] [Commented] (FLINK-2790) Add high availability support for Yarn
[ https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943029#comment-14943029 ] ASF GitHub Bot commented on FLINK-2790: --- Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/1213#issuecomment-145456411 I was just curious whether he killed them gracefully with a `PoisonPill` or via killing the JVM process.
[GitHub] flink pull request: [FLINK-2811][web-dashboard]Add page with Confi...
Github user sachingoel0101 commented on the pull request: https://github.com/apache/flink/pull/1219#issuecomment-145458067 On the user end, a url '/config' makes more sense IMO. On the server end, `config` is already mapped to something else, so I chose a `/jmconfig` there. If you think `/jobManagerConfig` is better however, let me know. I will push a commit for that. On a related note, '/jobmanager/config' might be a better choice for url. Are there any job manager statistics which should be made available under a `/jobmanager` url? Running jobs and completed jobs are already there.
[jira] [Commented] (FLINK-2811) Add page with configuration overview
[ https://issues.apache.org/jira/browse/FLINK-2811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943030#comment-14943030 ] ASF GitHub Bot commented on FLINK-2811: --- Github user sachingoel0101 commented on the pull request: https://github.com/apache/flink/pull/1219#issuecomment-145458067 On the user end, a url '/config' makes more sense IMO. On the server end, `config` is already mapped to something else, so I chose a `/jmconfig` there. If you think `/jobManagerConfig` is better however, let me know. I will push a commit for that. On a related note, '/jobmanager/config' might be a better choice for url. Are there any job manager statistics which should be made available under a `/jobmanager` url? Running jobs and completed jobs are already there. > Add page with configuration overview > > > Key: FLINK-2811 > URL: https://issues.apache.org/jira/browse/FLINK-2811 > Project: Flink > Issue Type: Sub-task > Components: Webfrontend >Affects Versions: 0.10 >Reporter: Robert Metzger >Assignee: Sachin Goel > Fix For: 0.10 > > > The old web interface contained a page to view the configuration of the > JobManager. > This issue is about adding the page again.
[GitHub] flink pull request: [FLINK-2806] [scala-api] Add a TypeInformation...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/1212#issuecomment-145462515 Looks good to me. @tillrohrmann can you have a quick look? You recently did some work on the TypeInformation and TypeSerializers.
[GitHub] flink pull request: [FLINK-2642] [table] Scala Table API crashes w...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/1209#issuecomment-145462685 @twalthr Please go ahead and merge it. :smile: Then we can close the issue.
[jira] [Commented] (FLINK-2730) Add CPU/Network utilization graphs to new web dashboard
[ https://issues.apache.org/jira/browse/FLINK-2730?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943067#comment-14943067 ] Sachin Goel commented on FLINK-2730: Of course the charts are updated in real time. Here's an example: http://jsfiddle.net/sachingoel0101/4je935Lu/ > Add CPU/Network utilization graphs to new web dashboard > --- > > Key: FLINK-2730 > URL: https://issues.apache.org/jira/browse/FLINK-2730 > Project: Flink > Issue Type: Sub-task > Components: Webfrontend >Affects Versions: 0.10 >Reporter: Stephan Ewen >Assignee: Sachin Goel > Fix For: 0.10 > > > The charts rendered in the previous dashboard should be added to the new web > dashboard.
[jira] [Updated] (FLINK-2550) Rework DataStream API
[ https://issues.apache.org/jira/browse/FLINK-2550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-2550: Issue Type: Sub-task (was: Improvement) Parent: FLINK-2674 > Rework DataStream API > - > > Key: FLINK-2550 > URL: https://issues.apache.org/jira/browse/FLINK-2550 > Project: Flink > Issue Type: Sub-task > Components: Streaming >Affects Versions: 0.9 >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > Fix For: 0.10 > > > After discussions on the mailing list we arrived at a consensus to rework the > streaming API to make it more fool-proof and easier to use. The resulting > design document is available here: > https://cwiki.apache.org/confluence/display/FLINK/Streams+and+Operations+on+Streams
[jira] [Commented] (FLINK-2642) Scala Table API crashes when executing word count example
[ https://issues.apache.org/jira/browse/FLINK-2642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943063#comment-14943063 ] ASF GitHub Bot commented on FLINK-2642: --- Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/1209#issuecomment-145462685 @twalthr Please go ahead and merge it. :smile: Then we can close the issue. > Scala Table API crashes when executing word count example > - > > Key: FLINK-2642 > URL: https://issues.apache.org/jira/browse/FLINK-2642 > Project: Flink > Issue Type: Bug > Components: Table API > Environment: current master (0.10) >Reporter: Jonas Traub >Assignee: Timo Walther > > I tried to run the examples provided in the documentation of Flink's Table > API. Unfortunately, the Scala word count example provided in the > [documentation|https://ci.apache.org/projects/flink/flink-docs-master/libs/table.html] > doesn't work and does not give a meaningful exception. > (Other examples work fine) > Here my code: > {code:java} > package org.apache.flink.examples.scala > import org.apache.flink.api.scala._ > import org.apache.flink.api.scala.table._ > object WordCount { > def main(args: Array[String]): Unit = { > // set up execution environment > val env = ExecutionEnvironment.getExecutionEnvironment > case class WC(word: String, count: Int) > val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", > 1)) > val expr = input.toTable > val result = expr.groupBy('word).select('word, 'count.sum as > 'count).toDataSet[WC] > result.print() > } > } > {code} > Here the thrown exception: > {code} > Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
> at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:414) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33) > at > org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) > at > org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28) > at akka.actor.Actor$class.aroundReceive(Actor.scala:465) > at > org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:104) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > at akka.actor.ActorCell.invoke(ActorCell.scala:487) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) > at akka.dispatch.Mailbox.run(Mailbox.scala:221) > at akka.dispatch.Mailbox.exec(Mailbox.scala:231) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.Exception: The user defined 'open(Configuration)' method > in class 
org.apache.flink.api.table.runtime.ExpressionSelectFunction caused > an exception: null > at > org.apache.flink.runtime.operators.RegularPactTask.openUserCode(RegularPactTask.java:1368) > at > org.apache.flink.runtime.operators.chaining.ChainedMapDriver.openTask(ChainedMapDriver.java:47) > at > org.apache.flink.runtime.operators.RegularPactTask.openChainedTasks(RegularPactTask.java:1408) > at > org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:142) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.NullPointerException > at > org.apache.flink.api.table.codegen.IndentStringContext$$anonfun$j$2.apply(Indenter.scala:30) > at > org.apache.flink.api.table.codegen.IndentStringContext$$anonfun$j$2.apply(Indenter.scala:23) > at >
[jira] [Commented] (FLINK-2806) No TypeInfo for Scala's Nothing type
[ https://issues.apache.org/jira/browse/FLINK-2806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943062#comment-14943062 ] ASF GitHub Bot commented on FLINK-2806: --- Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/1212#issuecomment-145462515 Looks good to me. @tillrohrmann can you have a quick look? You recently did some work on the TypeInformation and TypeSerializers. > No TypeInfo for Scala's Nothing type > > > Key: FLINK-2806 > URL: https://issues.apache.org/jira/browse/FLINK-2806 > Project: Flink > Issue Type: Bug > Components: Scala API >Reporter: Gabor Gevay >Assignee: Gabor Gevay >Priority: Minor > > When writing some generic code, I encountered a situation where I needed a > TypeInformation[Nothing]. Two problems prevent me from getting it: > 1. TypeInformationGen.mkTypeInfo doesn't return a real > TypeInformation[Nothing]. (It actually returns a casted null in that case.) > 2. The line > implicit def createTypeInformation[T]: TypeInformation[T] = macro > TypeUtils.createTypeInfo[T] > does not fire in some situations when it should, when T = Nothing. (I guess > this is a compiler bug.) > I will open a PR shortly.
[GitHub] flink pull request: Stream API Refactoring
Github user ktzoumas commented on a diff in the pull request: https://github.com/apache/flink/pull/1215#discussion_r41125302 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java --- @@ -81,56 +81,56 @@ protected IterativeDataStream(DataStream dataStream, long maxWaitTime) { /** * Changes the feedback type of the iteration and allows the user to apply * co-transformations on the input and feedback stream, as in a -* {@link ConnectedDataStream}. +* {@link ConnectedStreams}. * * * For type safety the user needs to define the feedback type * * @param feedbackTypeString *String describing the type information of the feedback stream. -* @return A {@link ConnectedIterativeDataStream}. +* @return A {@link ConnectedIterativeDataStreams}. */ - public ConnectedIterativeDataStream withFeedbackType(String feedbackTypeString) { + public ConnectedIterativeDataStreams withFeedbackType(String feedbackTypeString) { return withFeedbackType(TypeInfoParser. parse(feedbackTypeString)); } /** * Changes the feedback type of the iteration and allows the user to apply * co-transformations on the input and feedback stream, as in a -* {@link ConnectedDataStream}. +* {@link ConnectedStreams}. * * * For type safety the user needs to define the feedback type * * @param feedbackTypeClass *Class of the elements in the feedback stream. -* @return A {@link ConnectedIterativeDataStream}. +* @return A {@link ConnectedIterativeDataStreams}. */ - public ConnectedIterativeDataStream withFeedbackType(Class feedbackTypeClass) { + public ConnectedIterativeDataStreams withFeedbackType(Class feedbackTypeClass) { return withFeedbackType(TypeExtractor.getForClass(feedbackTypeClass)); --- End diff -- Why ConnectedIterativeDataStreams and not ConnectedIterativeStreams (following the naming of the other classes)?
[GitHub] flink pull request: [FLINK-2806] [scala-api] Add a TypeInformation...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1212#discussion_r41125339 --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaNothingTypeInfo.scala --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.scala.typeutils + +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.typeutils.TypeSerializer + +class ScalaNothingTypeInfo extends TypeInformation[Nothing] { + + override def isBasicType: Boolean = false + override def isTupleType: Boolean = false + override def getArity: Int = 0 + override def getTotalFields: Int = 0 + override def getTypeClass: Class[Nothing] = classOf[Nothing] + override def isKeyType: Boolean = false + + override def createSerializer(config: ExecutionConfig): TypeSerializer[Nothing] = +(new NothingSerializer).asInstanceOf[TypeSerializer[Nothing]] + + override def hashCode(): Int = 42 --- End diff -- Could we use `classOf[ScalaNothingTypeInfo].hashCode` here? Otherwise, all objects we create end up in the bucket with hash code `42`. 
[GitHub] flink pull request: [FLINK-2806] [scala-api] Add a TypeInformation...
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/1212#issuecomment-145476115 LGTM. Only one minor comment.
[jira] [Commented] (FLINK-2806) No TypeInfo for Scala's Nothing type
[ https://issues.apache.org/jira/browse/FLINK-2806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943126#comment-14943126 ] ASF GitHub Bot commented on FLINK-2806: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1212#discussion_r41125339 --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaNothingTypeInfo.scala --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.scala.typeutils + +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.typeutils.TypeSerializer + +class ScalaNothingTypeInfo extends TypeInformation[Nothing] { + + override def isBasicType: Boolean = false + override def isTupleType: Boolean = false + override def getArity: Int = 0 + override def getTotalFields: Int = 0 + override def getTypeClass: Class[Nothing] = classOf[Nothing] + override def isKeyType: Boolean = false + + override def createSerializer(config: ExecutionConfig): TypeSerializer[Nothing] = +(new NothingSerializer).asInstanceOf[TypeSerializer[Nothing]] + + override def hashCode(): Int = 42 --- End diff -- Could we use `classOf[ScalaNothingTypeInfo].hashCode` here? Otherwise, all objects we create end up in the bucket with hash code `42`. > No TypeInfo for Scala's Nothing type > > > Key: FLINK-2806 > URL: https://issues.apache.org/jira/browse/FLINK-2806 > Project: Flink > Issue Type: Bug > Components: Scala API >Reporter: Gabor Gevay >Assignee: Gabor Gevay >Priority: Minor > > When writing some generic code, I encountered a situation where I needed a > TypeInformation[Nothing]. Two problems prevent me from getting it: > 1. TypeInformationGen.mkTypeInfo doesn't return a real > TypeInformation[Nothing]. (It actually returns a casted null in that case.) > 2. The line > implicit def createTypeInformation[T]: TypeInformation[T] = macro > TypeUtils.createTypeInfo[T] > does not fire in some situations when it should, when T = Nothing. (I guess > this is a compiler bug.) > I will open a PR shortly. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
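The review point above is about the `equals`/`hashCode` contract: a literal `42` satisfies the contract but puts every such type-info object in the same hash bucket, while using the class's own hash code keeps distinct type-info classes in distinct buckets. A minimal, hypothetical Java analogue (not the Scala code under review; `ConstantHash` and `ClassHash` are illustrative stand-ins):

```java
import java.util.HashMap;
import java.util.Map;

// Mimics "override def hashCode(): Int = 42": every instance of every
// type-info class written this way collides in the same hash bucket.
class ConstantHash {
    @Override public int hashCode() { return 42; }
    @Override public boolean equals(Object o) { return o instanceof ConstantHash; }
}

// Mimics the suggested classOf[ScalaNothingTypeInfo].hashCode: still constant
// per class, but different type-info classes get different hash codes.
class ClassHash {
    @Override public int hashCode() { return ClassHash.class.hashCode(); }
    @Override public boolean equals(Object o) { return o instanceof ClassHash; }
}

public class HashCodeDemo {
    public static void main(String[] args) {
        Map<Object, String> byTypeInfo = new HashMap<>();
        byTypeInfo.put(new ClassHash(), "nothing-type-info");
        // equals/hashCode agree, so a fresh instance finds the entry.
        System.out.println(byTypeInfo.get(new ClassHash())); // prints "nothing-type-info"
    }
}
```

Either choice is correct with respect to the contract; the class-based code simply avoids needless collisions when several type-info singletons are used as map keys.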
[GitHub] flink pull request: Stream API Refactoring
Github user ktzoumas commented on a diff in the pull request: https://github.com/apache/flink/pull/1215#discussion_r41125467 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java --- @@ -87,7 +207,7 @@ public GroupedDataStream(DataStream dataStream, KeySelectorkeySelect TypeInformation outType = TypeExtractor.getFoldReturnTypes(clean(folder), getType(), Utils.getCallLocationName(), true); - return transform("Grouped Fold", outType, new StreamGroupedFold (clean(folder), + return transform("Grouped Fold", outType, new StreamGroupedFold<>(clean(folder), keySelector, initialValue)); --- End diff -- "Grouped Fold" or simply "Fold"?
[GitHub] flink pull request: Stream API Refactoring
Github user ktzoumas commented on a diff in the pull request: https://github.com/apache/flink/pull/1215#discussion_r41125422 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java --- @@ -24,49 +24,169 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction; -import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType; import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator; import org.apache.flink.streaming.api.functions.aggregation.SumAggregator; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.StreamGroupedFold; import org.apache.flink.streaming.api.operators.StreamGroupedReduce; +import org.apache.flink.streaming.api.transformations.OneInputTransformation; +import org.apache.flink.streaming.api.transformations.PartitionTransformation; +import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; +import org.apache.flink.streaming.api.windowing.time.AbstractTime; +import org.apache.flink.streaming.api.windowing.time.EventTime; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.streaming.runtime.partitioner.HashPartitioner; +import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; /** 
- * A GroupedDataStream represents a {@link DataStream} which has been - * partitioned by the given {@link KeySelector}. Operators like {@link #reduce}, - * {@link #fold} etc. can be applied on the {@link GroupedDataStream} to - * get additional functionality by the grouping. + * A {@code KeyedStream} represents a {@link DataStream} on which operator state is + * partitioned by key using a provided {@link KeySelector}. Typical operations supported by a + * {@code DataStream} are also possible on a {@code KeyedStream}, with the exception of + * partitioning methods such as shuffle, forward and keyBy. * - * @param <T> The type of the elements in the Grouped Stream. + * + * Reduce-style operations, such as {@link #reduce}, {@link #sum} and {@link #fold} work on elements + * that have the same key. + * + * @param <T> The type of the elements in the Keyed Stream. * @param <KEY> The type of the key in the Keyed Stream. */ -public class GroupedDataStream<T, KEY> extends KeyedDataStream<T, KEY> { +public class KeyedStream<T, KEY> extends DataStream<T> { + + protected final KeySelector<T, KEY> keySelector; + + /** +* Creates a new {@link KeyedStream} using the given {@link KeySelector} +* to partition operator state by key.
+* +* @param dataStream +*Base stream of data +* @param keySelector +*Function for determining state partitions +*/ + public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) { + super(dataStream.getExecutionEnvironment(), new PartitionTransformation<>(dataStream.getTransformation(), new HashPartitioner<>(keySelector))); + this.keySelector = keySelector; + } + + + public KeySelector<T, KEY> getKeySelector() { + return this.keySelector; + } + + + @Override + protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) { + throw new UnsupportedOperationException("Cannot override partitioning for KeyedStream."); + } + + + @Override + public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName, + TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) { + + SingleOutputStreamOperator<R, ?> returnStream = super.transform(operatorName, outTypeInfo, operator); + + ((OneInputTransformation<T, R>) returnStream.getTransformation()).setStateKeySelector(keySelector); + return returnStream; + } + + + + @Override + public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) { + DataStreamSink<T> result =
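The constructor quoted in the diff wires a key selector into a hash partitioner so that all records with the same key reach the same parallel operator instance. A minimal sketch of that routing idea (an assumption-laden illustration, not Flink's actual `HashPartitioner` code; `channelFor` and the first-letter key selector are hypothetical):

```java
import java.util.function.Function;

public class KeyedRouting {

    // Maps a key to one of numChannels parallel channels, the way a hash
    // partitioner would. Masking avoids the Math.abs(Integer.MIN_VALUE) trap.
    public static int channelFor(Object key, int numChannels) {
        return (key.hashCode() & 0x7fffffff) % numChannels;
    }

    public static void main(String[] args) {
        // Stand-in for a KeySelector<String, String>: key = first letter.
        Function<String, String> keySelector = word -> word.substring(0, 1);
        int parallelism = 4;

        // Records with the same key are always routed to the same channel,
        // which is what makes per-key operator state possible downstream.
        int c1 = channelFor(keySelector.apply("apple"), parallelism);
        int c2 = channelFor(keySelector.apply("apricot"), parallelism);
        System.out.println(c1 == c2); // prints "true": both keys are "a"
    }
}
```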
[jira] [Commented] (FLINK-2806) No TypeInfo for Scala's Nothing type
[ https://issues.apache.org/jira/browse/FLINK-2806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943127#comment-14943127 ] ASF GitHub Bot commented on FLINK-2806: --- Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/1212#issuecomment-145476115 LGTM. Only one minor comment. > No TypeInfo for Scala's Nothing type > > > Key: FLINK-2806 > URL: https://issues.apache.org/jira/browse/FLINK-2806 > Project: Flink > Issue Type: Bug > Components: Scala API >Reporter: Gabor Gevay >Assignee: Gabor Gevay >Priority: Minor > > When writing some generic code, I encountered a situation where I needed a > TypeInformation[Nothing]. Two problems prevent me from getting it: > 1. TypeInformationGen.mkTypeInfo doesn't return a real > TypeInformation[Nothing]. (It actually returns a casted null in that case.) > 2. The line > implicit def createTypeInformation[T]: TypeInformation[T] = macro > TypeUtils.createTypeInfo[T] > does not fire in some situations when it should, when T = Nothing. (I guess > this is a compiler bug.) > I will open a PR shortly. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: Stream API Refactoring
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/1215#discussion_r41125469 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java --- @@ -81,56 +81,56 @@ protected IterativeDataStream(DataStream dataStream, long maxWaitTime) { /** * Changes the feedback type of the iteration and allows the user to apply * co-transformations on the input and feedback stream, as in a -* {@link ConnectedDataStream}. +* {@link ConnectedStreams}. * * * For type safety the user needs to define the feedback type * * @param feedbackTypeString *String describing the type information of the feedback stream. -* @return A {@link ConnectedIterativeDataStream}. +* @return A {@link ConnectedIterativeDataStreams}. */ - public ConnectedIterativeDataStream withFeedbackType(String feedbackTypeString) { + public ConnectedIterativeDataStreams withFeedbackType(String feedbackTypeString) { return withFeedbackType(TypeInfoParser.parse(feedbackTypeString)); } /** * Changes the feedback type of the iteration and allows the user to apply * co-transformations on the input and feedback stream, as in a -* {@link ConnectedDataStream}. +* {@link ConnectedStreams}. * * * For type safety the user needs to define the feedback type * * @param feedbackTypeClass *Class of the elements in the feedback stream. -* @return A {@link ConnectedIterativeDataStream}. +* @return A {@link ConnectedIterativeDataStreams}. */ - public ConnectedIterativeDataStream withFeedbackType(Class feedbackTypeClass) { + public ConnectedIterativeDataStreams withFeedbackType(Class feedbackTypeClass) { return withFeedbackType(TypeExtractor.getForClass(feedbackTypeClass)); --- End diff -- You're right, I'll also rename those.
[jira] [Commented] (FLINK-2354) Recover running jobs on JobManager failure
[ https://issues.apache.org/jira/browse/FLINK-2354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943132#comment-14943132 ] ASF GitHub Bot commented on FLINK-2354: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41126140 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java --- @@ -871,7 +898,13 @@ public Object call() throws Exception { } }, executionContext); } else { - restart(); + future(new Callable() { --- End diff -- Good catch :-) > Recover running jobs on JobManager failure > -- > > Key: FLINK-2354 > URL: https://issues.apache.org/jira/browse/FLINK-2354 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Affects Versions: master >Reporter: Ufuk Celebi >Assignee: Ufuk Celebi > Fix For: 0.10 > > > tl;dr Persist JobGraphs in state backend and coordinate reference to state > handle via ZooKeeper. > Problem: When running multiple JobManagers in high availability mode, the > leading job manager looses all running jobs when it fails. After a new > leading job manager is elected, it is not possible to recover any previously > running jobs. > Solution: The leading job manager, which receives the job graph writes 1) the > job graph to a state backend, and 2) a reference to the respective state > handle to ZooKeeper. In general, job graphs can become large (multiple MBs, > because they include closures etc.). ZooKeeper is not designed for data of > this size. The level of indirection via the reference to the state backend > keeps the data in ZooKeeper small. > Proposed ZooKeeper layout: > /flink (default) > +- currentJobs >+- job id i > +- state handle reference of job graph i > The 'currentJobs' node needs to be persistent to allow recovery of jobs > between job managers. 
The currentJobs node needs to satisfy the following > invariant: There is a reference to a job graph with id i IFF the respective > job graph needs to be recovered by a newly elected job manager leader. > With this in place, jobs will be recovered from their initial state (as if > resubmitted). The next step is to backup the runtime state handles of > checkpoints in a similar manner. > --- > This work will be based on [~trohrm...@apache.org]'s implementation of > FLINK-2291. The leader election service notifies the job manager about > granted/revoked leadership. This notification happens via Akka and thus > serially *per* job manager, but results in eventually consistent state > between job managers. For some snapshots of time it is possible to have a new > leader granted leadership, before the old one has been revoked its leadership. > [~trohrm...@apache.org], can you confirm that leadership does not guarantee > mutually exclusive access to the shared 'currentJobs' state? > For example, the following can happen: > - JM 1 is leader, JM 2 is standby > - JOB i is running (and hence /flink/currentJobs/i exists) > - ZK notifies leader election service (LES) of JM 1 and JM 2 > - LES 2 immediately notifies JM 2 about granted leadership, but LES 1 > notification revoking leadership takes longer > - JOB i finishes (TMs don't notice leadership change yet) and JM 1 receives > final JobStatusChange > - JM 2 resubmits the job /flink/currentJobs/i > - JM 1 removes /flink/currentJobs/i, because it is now finished > => inconsistent state (wrt the specified invariant above) > If it is indeed a problem, we can circumvent this with a Curator recipe for > [shared locks|http://curator.apache.org/curator-recipes/shared-lock.html] to > coordinate the access to currentJobs. The lock needs to be acquired on > leadership. 
> --- > Minimum required tests: > - Unit tests for job graph serialization and writing to state backend and > ZooKeeper with expected nodes > - Unit tests for job submission to job manager in leader/non-leader state > - Unit tests for leadership granting/revoking and job submission/restarting > interleavings > - Process failure integration tests with single and multiple running jobs -- This message was sent by Atlassian JIRA (v6.3.4#6332)
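The issue description above hinges on one design move: because ZooKeeper is not built for multi-MB payloads, the full job graph goes to a state backend and ZooKeeper only stores a small reference (the state handle) under `/flink/currentJobs/<job-id>`. A toy sketch of that level of indirection, assuming hypothetical names (`JobGraphStore`, and two in-memory maps standing in for the state backend and for ZooKeeper):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class JobGraphStore {
    // Stand-in for the state backend: holds the large serialized job graphs.
    private final Map<String, byte[]> stateBackend = new HashMap<>();
    // Stand-in for ZooKeeper: holds only small handle strings per job id.
    private final Map<String, String> zooKeeper = new HashMap<>();

    public void putJobGraph(String jobId, byte[] serializedJobGraph) {
        String handle = "backend://" + UUID.randomUUID();       // state handle
        stateBackend.put(handle, serializedJobGraph);           // big payload
        zooKeeper.put("/flink/currentJobs/" + jobId, handle);   // small pointer
    }

    // A newly elected leader recovers by following the pointer: the invariant
    // is that a reference exists under currentJobs iff the job must be recovered.
    public byte[] recoverJobGraph(String jobId) {
        String handle = zooKeeper.get("/flink/currentJobs/" + jobId);
        return handle == null ? null : stateBackend.get(handle);
    }
}
```

The sketch deliberately omits the locking question raised at the end of the description; a Curator shared lock around `putJobGraph`/`removeJobGraph` would be the place to enforce mutually exclusive access.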
[GitHub] flink pull request: Stream API Refactoring
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1215#issuecomment-145482078 Impressive work, looks good! Merging this means that we need to commit to reworking the `join()` implementation very soon. If that is the case, +1 from my side
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41126140 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java --- @@ -871,7 +898,13 @@ public Object call() throws Exception { } }, executionContext); } else { - restart(); + future(new Callable() { --- End diff -- Good catch :-)
[GitHub] flink pull request: Stream API Refactoring
Github user ktzoumas commented on the pull request: https://github.com/apache/flink/pull/1215#issuecomment-145482079 +1 to merge. This will make the testing of the new API much easier
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41126346 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphs.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobgraph.JobGraph; + +import java.util.Collections; +import java.util.List; + +/** + * {@link SubmittedJobGraph} instances for JobManagers running in {@link RecoveryMode#STANDALONE}. + * + * All operations are NoOps, because {@link JobGraph} instances cannot be recovered in this + * recovery mode. 
+ */ +public class StandaloneSubmittedJobGraphs implements SubmittedJobGraphs { + + @Override + public void start(SubmittedJobGraphListener jobGraphListener) throws Exception { + // Nothing to do + } + + @Override + public void stop() { + // Nothing to do + } + + @Override + public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception { + // Nothing to do + } + + @Override + public void removeJobGraph(JobID jobId) throws Exception { + // Nothing to do + } + + @Override + public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception { + throw new IllegalStateException("StandaloneSubmittedJobGraphs cannot recover job graphs. " + + "How did you end up here?"); + } + + @Override + public List<SubmittedJobGraph> recoverJobGraphs() throws Exception { + return Collections.emptyList(); --- End diff -- In `recoverJobGraph`, an exception is thrown, whereas here an empty list is returned. Maybe we should do it in a consistent manner.
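The consistency point in the review is that callers of a no-op store should not have to guess which recovery method throws and which returns empty. One way to resolve it, sketched with hypothetical names (not the interface from the PR), is to make both methods report "nothing to recover" uniformly:

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Illustrative stand-in for the SubmittedJobGraphs interface from the PR.
interface GraphStore<G> {
    Optional<G> recoverJobGraph(String jobId);
    List<G> recoverJobGraphs();
}

// In standalone mode nothing is ever persisted, so both recovery methods
// consistently report an empty result instead of one of them throwing.
final class NoOpGraphStore<G> implements GraphStore<G> {
    @Override public Optional<G> recoverJobGraph(String jobId) {
        return Optional.empty();
    }
    @Override public List<G> recoverJobGraphs() {
        return Collections.emptyList();
    }
}

public class NoOpStoreDemo {
    public static void main(String[] args) {
        GraphStore<String> store = new NoOpGraphStore<>();
        System.out.println(store.recoverJobGraph("job-42").isPresent()); // prints "false"
        System.out.println(store.recoverJobGraphs().size());             // prints "0"
    }
}
```

The alternative resolution, having both methods throw `IllegalStateException`, is equally consistent; the important property is that the two methods agree.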
[jira] [Commented] (FLINK-2354) Recover running jobs on JobManager failure
[ https://issues.apache.org/jira/browse/FLINK-2354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943135#comment-14943135 ] ASF GitHub Bot commented on FLINK-2354: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41126346 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphs.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobgraph.JobGraph; + +import java.util.Collections; +import java.util.List; + +/** + * {@link SubmittedJobGraph} instances for JobManagers running in {@link RecoveryMode#STANDALONE}. + * + * All operations are NoOps, because {@link JobGraph} instances cannot be recovered in this + * recovery mode. 
+ */ +public class StandaloneSubmittedJobGraphs implements SubmittedJobGraphs { + + @Override + public void start(SubmittedJobGraphListener jobGraphListener) throws Exception { + // Nothing to do + } + + @Override + public void stop() { + // Nothing to do + } + + @Override + public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception { + // Nothing to do + } + + @Override + public void removeJobGraph(JobID jobId) throws Exception { + // Nothing to do + } + + @Override + public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception { + throw new IllegalStateException("StandaloneSubmittedJobGraphs cannot recover job graphs. " + + "How did you end up here?"); + } + + @Override + public List recoverJobGraphs() throws Exception { + return Collections.emptyList(); --- End diff -- In `recoverJobGraph`, an exception is thrown, whereas here an empty list is returned. Maybe we should do it in a consistent manner. > Recover running jobs on JobManager failure > -- > > Key: FLINK-2354 > URL: https://issues.apache.org/jira/browse/FLINK-2354 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Affects Versions: master >Reporter: Ufuk Celebi >Assignee: Ufuk Celebi > Fix For: 0.10 > > > tl;dr Persist JobGraphs in state backend and coordinate reference to state > handle via ZooKeeper. > Problem: When running multiple JobManagers in high availability mode, the > leading job manager looses all running jobs when it fails. After a new > leading job manager is elected, it is not possible to recover any previously > running jobs. > Solution: The leading job manager, which receives the job graph writes 1) the > job graph to a state backend, and 2) a reference to the respective state > handle to ZooKeeper. In general, job graphs can become large (multiple MBs, > because they include closures etc.). ZooKeeper is not designed for data of > this size. The level of indirection via the reference to the state backend > keeps the data in ZooKeeper small. 
> Proposed ZooKeeper layout: > /flink (default) > +- currentJobs >+- job id i > +- state handle reference of job graph i > The 'currentJobs' node needs to be persistent to allow recovery of jobs > between job managers. The currentJobs node needs to satisfy the following > invariant: There is a reference to a job graph with id i IFF the respective > job graph needs to be recovered by a newly elected job manager leader. > With this in place, jobs will be recovered from their initial state (as if > resubmitted). The next step is to backup the runtime state handles of > checkpoints in a similar manner. > --- > This work will be based on [~trohrm...@apache.org]'s
[jira] [Commented] (FLINK-2354) Recover running jobs on JobManager failure
[ https://issues.apache.org/jira/browse/FLINK-2354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943137#comment-14943137 ] ASF GitHub Bot commented on FLINK-2354: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41126510 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphs.java --- @@ -0,0 +1,356 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.utils.ZKPaths; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandle; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.ConcurrentModificationException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +/** + * {@link SubmittedJobGraph} instances for JobManagers running in {@link RecoveryMode#ZOOKEEPER}. + * + * Each job graph creates ZNode: + * + * +O /flink/jobgraphs/job-id 1 [persistent] + * . + * . + * . + * +O /flink/jobgraphs/job-id N [persistent] + * + * + * The root path is watched to detect concurrent modifications in corner situations where + * multiple instances operate concurrently. The job manager acts as a {@link SubmittedJobGraphListener} + * to react to such situations. + */ +public class ZooKeeperSubmittedJobGraphs implements SubmittedJobGraphs { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperSubmittedJobGraphs.class); + + /** Lock to synchronize with the {@link SubmittedJobGraphListener}. */ + private final Object cacheLock = new Object(); + + /** Client (not a namespace facade) */ + private final CuratorFramework client; + + /** The set of IDs of all added job graphs. 
*/ + private final Set addedJobGraphs = new HashSet<>(); + + /** Completed checkpoints in ZooKeeper */ + private final ZooKeeperStateHandleStore jobGraphsInZooKeeper; + + /** +* Cache to monitor all children. This is used to detect races with other instances working +* on the same state. +*/ + private final PathChildrenCache pathCache; + + /** The external listener to be notified on races. */ + private SubmittedJobGraphListener jobGraphListener; + + /** Flag indicating whether this instance is running. */ + private boolean isRunning; + + public ZooKeeperSubmittedJobGraphs( + CuratorFramework client, + String currentJobsPath, + StateHandleProvider stateHandleProvider) throws Exception { + + checkNotNull(client, "Curator client"); --- End diff -- `this.client = checkNotNull()`. > Recover running jobs on JobManager failure > -- > > Key: FLINK-2354 > URL: https://issues.apache.org/jira/browse/FLINK-2354 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Affects Versions: master >Reporter: Ufuk Celebi >Assignee: Ufuk
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41126510 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphs.java --- @@ -0,0 +1,356 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.utils.ZKPaths; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandle; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.ConcurrentModificationException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +/** + * {@link SubmittedJobGraph} instances for JobManagers running in {@link RecoveryMode#ZOOKEEPER}. + * + * Each job graph creates ZNode: + * + * +O /flink/jobgraphs/job-id 1 [persistent] + * . + * . + * . + * +O /flink/jobgraphs/job-id N [persistent] + * + * + * The root path is watched to detect concurrent modifications in corner situations where + * multiple instances operate concurrently. The job manager acts as a {@link SubmittedJobGraphListener} + * to react to such situations. + */ +public class ZooKeeperSubmittedJobGraphs implements SubmittedJobGraphs { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperSubmittedJobGraphs.class); + + /** Lock to synchronize with the {@link SubmittedJobGraphListener}. */ + private final Object cacheLock = new Object(); + + /** Client (not a namespace facade) */ + private final CuratorFramework client; + + /** The set of IDs of all added job graphs. 
*/ + private final Set addedJobGraphs = new HashSet<>(); + + /** Completed checkpoints in ZooKeeper */ + private final ZooKeeperStateHandleStore jobGraphsInZooKeeper; + + /** +* Cache to monitor all children. This is used to detect races with other instances working +* on the same state. +*/ + private final PathChildrenCache pathCache; + + /** The external listener to be notified on races. */ + private SubmittedJobGraphListener jobGraphListener; + + /** Flag indicating whether this instance is running. */ + private boolean isRunning; + + public ZooKeeperSubmittedJobGraphs( + CuratorFramework client, + String currentJobsPath, + StateHandleProvider stateHandleProvider) throws Exception { + + checkNotNull(client, "Curator client"); --- End diff -- `this.client = checkNotNull()`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
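The suggestion in the review comment above (fold the null check into the field assignment instead of checking and assigning in two steps) can be sketched as follows. This is a minimal stand-in: `checkNotNull` here is a hand-rolled version of Guava's `Preconditions.checkNotNull`, and the `Store` class is illustrative, not Flink's actual `ZooKeeperSubmittedJobGraphs`.

```java
// Sketch of the "this.client = checkNotNull(client, ...)" idiom.
// The helper stands in for Guava's Preconditions.checkNotNull;
// all class names are illustrative.
public class NullCheckIdiom {

    static <T> T checkNotNull(T ref, String message) {
        if (ref == null) {
            throw new NullPointerException(message);
        }
        return ref; // returning the argument is what enables one-step assignment
    }

    static final class Store {
        private final Object client;

        Store(Object client) {
            // validate and assign in a single step
            this.client = checkNotNull(client, "Curator client");
        }

        Object client() {
            return this.client;
        }
    }

    public static void main(String[] args) {
        Object client = new Object();
        Store store = new Store(client);
        if (store.client() != client) {
            throw new AssertionError("client not stored");
        }
        boolean threw = false;
        try {
            new Store(null);
        } catch (NullPointerException expected) {
            threw = true;
        }
        if (!threw) {
            throw new AssertionError("null was accepted");
        }
        System.out.println("ok");
    }
}
```

Because `checkNotNull` returns its argument, the constructor stays one line per field while still rejecting nulls eagerly.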
[GitHub] flink pull request: [FLINK-2809] [scala-api] Added UnitTypeInfo an...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/1217#discussion_r41126581 --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/UnitSerializer.scala --- @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.scala.typeutils + +import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton +import org.apache.flink.core.memory.{DataInputView, DataOutputView} + +class UnitSerializer extends TypeSerializerSingleton[Unit] { + + def isImmutableType: Boolean = true + + def createInstance: Unit = () + + def copy(from: Unit): Unit = () + + def copy(from: Unit, reuse: Unit): Unit = () + + def getLength: Int = 1 + + def serialize(record: Unit, target: DataOutputView) { +target.write(0) + } + + def deserialize(source: DataInputView): Unit = { +source.readByte --- End diff -- The `readByte()` and `write()` methods should have parentheses (both for semantics and to avoid confusion that the Unit parentheses belong to the method call).
I think we need an entry in the Scala coding guidelines for this (or better, a style check), because it seems that dropping parentheses when Scala calls Java functions happens a bit too eagerly (I think most IDEs do not make very good suggestions there either).
[jira] [Commented] (FLINK-2809) DataSet[Unit] doesn't work
[ https://issues.apache.org/jira/browse/FLINK-2809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943140#comment-14943140 ] ASF GitHub Bot commented on FLINK-2809: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/1217#discussion_r41126581 --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/UnitSerializer.scala --- @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.scala.typeutils + +import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton +import org.apache.flink.core.memory.{DataInputView, DataOutputView} + +class UnitSerializer extends TypeSerializerSingleton[Unit] { + + def isImmutableType: Boolean = true + + def createInstance: Unit = () + + def copy(from: Unit): Unit = () + + def copy(from: Unit, reuse: Unit): Unit = () + + def getLength: Int = 1 + + def serialize(record: Unit, target: DataOutputView) { +target.write(0) + } + + def deserialize(source: DataInputView): Unit = { +source.readByte --- End diff -- The `readByte()` and `write()` method should have parenthesis (both for semantics and to avoid confusion that the Unit parenthesis belong to the method call... I think we need an entry in the coding guidelines for Scala there (or better, a style check), that because it seems that dropping the parenthesis when Scala calls Java functions happens a bit too eagerly (I think most IDEs make not very good suggestions there as well). > DataSet[Unit] doesn't work > -- > > Key: FLINK-2809 > URL: https://issues.apache.org/jira/browse/FLINK-2809 > Project: Flink > Issue Type: Bug > Components: Scala API >Reporter: Gabor Gevay >Assignee: Gabor Gevay >Priority: Minor > > The following code creates a DataSet\[Unit\]: > val env = ExecutionEnvironment.createLocalEnvironment() > val a = env.fromElements(1,2,3) > val b = a.map (_ => ()) > b.writeAsText("/tmp/xxx") > env.execute() > This doesn't work, because a VoidSerializer is created, which can't cope with > a BoxedUnit. See exception below. > I'm now thinking about creating a UnitSerializer class. > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
> at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314) > at > scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29) > at akka.actor.Actor$class.aroundReceive(Actor.scala:465) > at > org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > at akka.actor.ActorCell.invoke(ActorCell.scala:487) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) > at akka.dispatch.Mailbox.run(Mailbox.scala:221) > at akka.dispatch.Mailbox.exec(Mailbox.scala:231) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be > cast to java.lang.Void > at >
[GitHub] flink pull request: [FLINK-2797][cli] Add support for running jobs...
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/1214#issuecomment-145483163 Fair enough, we can do this but the error reporting must be pretty good. Otherwise it might confuse users because not everyone gets that eager execution spawns multiple jobs. The documentation should also explain why detached jobs cannot contain eager executions. I'm not sure whether we can still include this in 0.10 but please go ahead if you want to improve the pull request. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2797) CLI: Missing option to submit jobs in detached mode
[ https://issues.apache.org/jira/browse/FLINK-2797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943141#comment-14943141 ] ASF GitHub Bot commented on FLINK-2797: --- Github user mxm commented on the pull request: https://github.com/apache/flink/pull/1214#issuecomment-145483163 Fair enough, we can do this but the error reporting must be pretty good. Otherwise it might confuse users because not everyone gets that eager execution spawns multiple jobs. The documentation should also explain why detached jobs cannot contain eager executions. I'm not sure whether we can still include this in 0.10 but please go ahead if you want to improve the pull request. > CLI: Missing option to submit jobs in detached mode > --- > > Key: FLINK-2797 > URL: https://issues.apache.org/jira/browse/FLINK-2797 > Project: Flink > Issue Type: Bug > Components: Command-line client >Affects Versions: 0.9, 0.10 >Reporter: Maximilian Michels >Assignee: Sachin Goel > Fix For: 0.10 > > > Jobs can only be submitted in detached mode using YARN but not on a > standalone installation. This has been requested by users who want to submit > a job, get the job id, and later query its status. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2821) Change Akka configuration to allow accessing actors from different URLs
Robert Metzger created FLINK-2821: - Summary: Change Akka configuration to allow accessing actors from different URLs Key: FLINK-2821 URL: https://issues.apache.org/jira/browse/FLINK-2821 Project: Flink Issue Type: Bug Components: Distributed Runtime Reporter: Robert Metzger

Akka expects the actor's URL to match exactly. Cases where users have complained about this, as pointed out here: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Error-trying-to-access-JM-through-proxy-td3018.html
- Proxy routing (as described there: the client sends to the proxy URL, but the receiver recognizes only the original URL)
- Using hostname / IP interchangeably does not work (we solved this by always putting IP addresses into URLs, never hostnames)
- Binding to multiple interfaces (any local 0.0.0.0) does not work. There is still no solution to that (but it seems not too much of a restriction)

I am aware that this is not possible due to Akka, so it is actually not a Flink bug. But I think we should track the resolution of the issue here anyway, because it affects our users' satisfaction.
[jira] [Commented] (FLINK-2066) Make delay between execution retries configurable
[ https://issues.apache.org/jira/browse/FLINK-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944324#comment-14944324 ] ASF GitHub Bot commented on FLINK-2066: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1223#issuecomment-145708133 Thanks a lot for your contribution to Flink! Sorry for not giving you feedback earlier. Many committers are currently busy preparing the next Flink release. Can you also update the documentation about the Execution Conf ? There is an entire section on the available parameters: https://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#execution-configuration You can find the markdown files for the documentation in the `docs/` directory of the Flink source. I quickly looked over the code, and I think its in a good shape! > Make delay between execution retries configurable > - > > Key: FLINK-2066 > URL: https://issues.apache.org/jira/browse/FLINK-2066 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 0.9, 0.10 >Reporter: Stephan Ewen >Assignee: Nuno Miguel Marques dos Santos >Priority: Blocker > Labels: starter > Fix For: 0.10 > > > Flink allows to specify a delay between execution retries. This helps to let > some external failure causes fully manifest themselves before the restart is > attempted. > The delay is currently defined only system wide. > We should add it to the {{ExecutionConfig}} of a job to allow per-job > specification. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2066][core] Add delay between execution...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1223#issuecomment-145708133 Thanks a lot for your contribution to Flink! Sorry for not giving you feedback earlier. Many committers are currently busy preparing the next Flink release. Can you also update the documentation about the ExecutionConfig? There is an entire section on the available parameters: https://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#execution-configuration You can find the markdown files for the documentation in the `docs/` directory of the Flink source. I quickly looked over the code, and I think it's in good shape!
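The feature under discussion in FLINK-2066 is a configurable delay between execution retries, so transient external failures can clear before the next attempt. The idea can be illustrated with a generic retry helper. This is a conceptual sketch only; it does not use Flink's `ExecutionConfig` API, and all names (`RetryWithDelay`, `Attempt`) are made up for illustration.

```java
// Conceptual sketch of "delay between execution retries": retry a failing
// action up to numRetries times, pausing delayMillis between attempts.
// Not Flink code; a generic stand-in for the configurable per-job delay.
public class RetryWithDelay {

    interface Attempt<T> {
        T run() throws Exception;
    }

    static <T> T retry(Attempt<T> attempt, int numRetries, long delayMillis)
            throws Exception {
        Exception last = null;
        for (int i = 0; i <= numRetries; i++) {
            try {
                return attempt.run();
            } catch (Exception e) {
                last = e;
                if (i < numRetries) {
                    // let external failure causes manifest before retrying
                    Thread.sleep(delayMillis);
                }
            }
        }
        throw last; // all attempts exhausted
    }

    public static void main(String[] args) throws Exception {
        final int[] calls = {0};
        // fails twice, succeeds on the third attempt
        String result = retry(() -> {
            if (++calls[0] < 3) {
                throw new RuntimeException("transient failure");
            }
            return "done";
        }, 3, 10L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Making the delay a per-job setting (rather than system-wide, as the issue describes) amounts to carrying `delayMillis` in the job's execution configuration instead of a global constant.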
[jira] [Commented] (FLINK-2767) Add support Scala 2.11 to Scala shell
[ https://issues.apache.org/jira/browse/FLINK-2767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944329#comment-14944329 ] ASF GitHub Bot commented on FLINK-2767: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1197#issuecomment-145708723 Thank you for the review @nikste! > Add support Scala 2.11 to Scala shell > - > > Key: FLINK-2767 > URL: https://issues.apache.org/jira/browse/FLINK-2767 > Project: Flink > Issue Type: Improvement > Components: Scala Shell >Affects Versions: 0.10 >Reporter: Chiwan Park >Assignee: Chiwan Park > > Since FLINK-2200 is resolved, the Flink community provides JARs for Scala > 2.11. But currently, there is no Scala shell with Scala 2.11. If we add > support Scala 2.11 to Scala shell, the user with Scala 2.11 could use Flink > easily. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2767] [scala shell] Add Scala 2.11 supp...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1197#issuecomment-145708723 Thank you for the review @nikste! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2786) Remove Spargel from source code and update documentation in favor of Gelly
[ https://issues.apache.org/jira/browse/FLINK-2786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943876#comment-14943876 ] ASF GitHub Bot commented on FLINK-2786: --- GitHub user vasia opened a pull request: https://github.com/apache/flink/pull/1229 [FLINK-2786] Remove Spargel from source code and update docs. I also ported 2 Spargel tests that we hadn't copied over to Gelly. You can merge this pull request into a Git repository by running: $ git pull https://github.com/vasia/flink flink-2786 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1229.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1229 commit 4391e8abf121f5d8ba6b8bdcd7d9a811c4d67806 Author: vasia Date: 2015-10-05T18:08:55Z [FLINK-2786] Remove Spargel code and docs; Port Spargel tests to Gelly; Remove Beta badge from Gelly > Remove Spargel from source code and update documentation in favor of Gelly > -- > > Key: FLINK-2786 > URL: https://issues.apache.org/jira/browse/FLINK-2786 > Project: Flink > Issue Type: Sub-task > Components: Documentation, Spargel >Reporter: Henry Saputra >Assignee: Vasia Kalavri > > With Gelly getting more mature and ready to be a top-level project for Flink, > we need to remove the deprecated Spargel library from the source and documentation. > Gelly copies the library code it needs from Spargel, so there should not be a hard > dependency between the 2 modules.
[GitHub] flink pull request: [FLINK-2798] Serve static files for the new we...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1222#issuecomment-145733149 Thank you for the review. I'm trying to update the PR within the next 24 hours. Sorry for not explaining why I changed all the request URLs to relative paths: when accessing the web interface through YARN, the files are not served from the root address but through a proxy at the RM, e.g. `http://quickstart.cloudera:8088/proxy/application_1440768826963_0005/`. By making all request URLs relative, we can make sure everything works as expected. Let me know if there is a more elegant way of avoiding this. I made a similar change to the old web interface.
[jira] [Commented] (FLINK-2798) Prepare new web dashboard for executing in on YARN
[ https://issues.apache.org/jira/browse/FLINK-2798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944453#comment-14944453 ] ASF GitHub Bot commented on FLINK-2798: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1222#issuecomment-145733149 Thank you for the review. I'm trying to update the PR within the next 24 hours Sorry for not explaining why I changed all the request URLs to relative paths: When accessing the web interface through YARN, the files are not served from the root address, but using a proxy from the RM ` http://quickstart.cloudera:8088/proxy/application_1440768826963_0005/`. By making all request URLs relative, we can make sure everything is working as expected. Let me know if there is a more elegant way of avoiding this. I made a similar change to the old web interface. > Prepare new web dashboard for executing in on YARN > -- > > Key: FLINK-2798 > URL: https://issues.apache.org/jira/browse/FLINK-2798 > Project: Flink > Issue Type: Improvement > Components: Webfrontend, YARN Client >Reporter: Robert Metzger >Assignee: Robert Metzger > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2066][core] Add delay between execution...
Github user WangCHX commented on the pull request: https://github.com/apache/flink/pull/1223#issuecomment-145733727 Thank you. Sure. I will do it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2066) Make delay between execution retries configurable
[ https://issues.apache.org/jira/browse/FLINK-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944455#comment-14944455 ] ASF GitHub Bot commented on FLINK-2066: --- Github user WangCHX commented on the pull request: https://github.com/apache/flink/pull/1223#issuecomment-145733727 Thank you. Sure. I will do it. > Make delay between execution retries configurable > - > > Key: FLINK-2066 > URL: https://issues.apache.org/jira/browse/FLINK-2066 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 0.9, 0.10 >Reporter: Stephan Ewen >Assignee: Nuno Miguel Marques dos Santos >Priority: Blocker > Labels: starter > Fix For: 0.10 > > > Flink allows to specify a delay between execution retries. This helps to let > some external failure causes fully manifest themselves before the restart is > attempted. > The delay is currently defined only system wide. > We should add it to the {{ExecutionConfig}} of a job to allow per-job > specification. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-2066) Make delay between execution retries configurable
[ https://issues.apache.org/jira/browse/FLINK-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-2066: -- Affects Version/s: 0.10 > Make delay between execution retries configurable > - > > Key: FLINK-2066 > URL: https://issues.apache.org/jira/browse/FLINK-2066 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 0.9, 0.10 >Reporter: Stephan Ewen >Assignee: Nuno Miguel Marques dos Santos >Priority: Blocker > Labels: starter > Fix For: 0.10 > > > Flink allows to specify a delay between execution retries. This helps to let > some external failure causes fully manifest themselves before the restart is > attempted. > The delay is currently defined only system wide. > We should add it to the {{ExecutionConfig}} of a job to allow per-job > specification. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-2066) Make delay between execution retries configurable
[ https://issues.apache.org/jira/browse/FLINK-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-2066: -- Fix Version/s: 0.10 > Make delay between execution retries configurable > - > > Key: FLINK-2066 > URL: https://issues.apache.org/jira/browse/FLINK-2066 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 0.9, 0.10 >Reporter: Stephan Ewen >Assignee: Nuno Miguel Marques dos Santos >Priority: Blocker > Labels: starter > Fix For: 0.10 > > > Flink allows to specify a delay between execution retries. This helps to let > some external failure causes fully manifest themselves before the restart is > attempted. > The delay is currently defined only system wide. > We should add it to the {{ExecutionConfig}} of a job to allow per-job > specification. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41127946 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java --- @@ -47,8 +55,10 @@ public static CuratorFramework startCuratorFramework(Configuration configuration) { String zkQuorum = configuration.getString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, ""); - if(zkQuorum == null || zkQuorum.equals("")) { - throw new RuntimeException("No valid ZooKeeper quorum has been specified."); + if (zkQuorum == null || zkQuorum.equals("")) { + throw new RuntimeException("No valid ZooKeeper quorum has been specified. " + + "You can specify the quorum via the configuration key '" + + ConfigConstants.ZOOKEEPER_QUORUM_KEY + "'."); --- End diff -- Good one :-) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
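The improved error message in the diff above follows a useful pattern: when a required configuration value is missing, fail with a message that names the exact key the user must set. A minimal sketch of the same pattern (the key string and class name here are illustrative, not Flink's actual constants):

```java
// Sketch of "actionable config error" validation: the exception message
// names the configuration key to set. Key/class names are illustrative.
public class ConfigCheck {

    static final String ZOOKEEPER_QUORUM_KEY = "ha.zookeeper.quorum";

    static String requireNonEmpty(String value, String key) {
        if (value == null || value.isEmpty()) {
            throw new RuntimeException(
                "No valid ZooKeeper quorum has been specified. "
                + "You can specify the quorum via the configuration key '"
                + key + "'.");
        }
        return value;
    }

    public static void main(String[] args) {
        try {
            requireNonEmpty("", ZOOKEEPER_QUORUM_KEY);
        } catch (RuntimeException e) {
            // the message tells the user exactly which key to set
            System.out.println(e.getMessage().contains(ZOOKEEPER_QUORUM_KEY));
        }
    }
}
```

Compared to a bare "no valid quorum" error, this saves the user a trip through the documentation to find the right key.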
[jira] [Resolved] (FLINK-2802) Watermark triggered operators cannot progress with cyclic flows
[ https://issues.apache.org/jira/browse/FLINK-2802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels resolved FLINK-2802. --- Resolution: Fixed Fix Version/s: 0.10 As far as I understand this has been resolved. Resolving but feel free to reopen. > Watermark triggered operators cannot progress with cyclic flows > --- > > Key: FLINK-2802 > URL: https://issues.apache.org/jira/browse/FLINK-2802 > Project: Flink > Issue Type: Bug > Components: Streaming >Reporter: Gyula Fora >Assignee: Gyula Fora >Priority: Blocker > Fix For: 0.10 > > > The problem is that we can easily create a cyclic watermark (time) dependency > in the stream graph which will result in a deadlock for watermark triggered > operators such as the `WindowOperator`. > A solution to this could be to emit a Long.MAX_VALUE watermark from the > iteration sources. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-2504) ExternalSortLargeRecordsITCase.testSortWithLongAndShortRecordsMixed failed spuriously
[ https://issues.apache.org/jira/browse/FLINK-2504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-2504: -- Affects Version/s: 0.10 > ExternalSortLargeRecordsITCase.testSortWithLongAndShortRecordsMixed failed > spuriously > - > > Key: FLINK-2504 > URL: https://issues.apache.org/jira/browse/FLINK-2504 > Project: Flink > Issue Type: Bug >Affects Versions: 0.10 >Reporter: Till Rohrmann >Priority: Critical > Labels: test-stability > Fix For: 0.10 > > > The test > {{ExternalSortLargeRecordsITCase.testSortWithLongAndShortRecordsMixed}} > failed in one of my Travis builds: > https://travis-ci.org/tillrohrmann/flink/jobs/74881883 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-2670) Unstable CombineTaskTest
[ https://issues.apache.org/jira/browse/FLINK-2670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-2670: -- Fix Version/s: 0.10 > Unstable CombineTaskTest > > > Key: FLINK-2670 > URL: https://issues.apache.org/jira/browse/FLINK-2670 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 0.10 >Reporter: Matthias J. Sax >Assignee: Stephan Ewen >Priority: Critical > Labels: test-stability > Fix For: 0.10 > > > Fails with > {noformat} > == > Maven produced no output for 300 seconds. > == > {noformat} > https://travis-ci.org/apache/flink/jobs/80344487 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-2504) ExternalSortLargeRecordsITCase.testSortWithLongAndShortRecordsMixed failed spuriously
[ https://issues.apache.org/jira/browse/FLINK-2504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-2504: -- Fix Version/s: 0.10 > ExternalSortLargeRecordsITCase.testSortWithLongAndShortRecordsMixed failed > spuriously > - > > Key: FLINK-2504 > URL: https://issues.apache.org/jira/browse/FLINK-2504 > Project: Flink > Issue Type: Bug >Affects Versions: 0.10 >Reporter: Till Rohrmann >Priority: Critical > Labels: test-stability > Fix For: 0.10 > > > The test > {{ExternalSortLargeRecordsITCase.testSortWithLongAndShortRecordsMixed}} > failed in one of my Travis builds: > https://travis-ci.org/tillrohrmann/flink/jobs/74881883 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-2670) Unstable CombineTaskTest
[ https://issues.apache.org/jira/browse/FLINK-2670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-2670: -- Affects Version/s: 0.10 > Unstable CombineTaskTest > > > Key: FLINK-2670 > URL: https://issues.apache.org/jira/browse/FLINK-2670 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 0.10 >Reporter: Matthias J. Sax >Assignee: Stephan Ewen >Priority: Critical > Labels: test-stability > Fix For: 0.10 > > > Fails with > {noformat} > == > Maven produced no output for 300 seconds. > == > {noformat} > https://travis-ci.org/apache/flink/jobs/80344487 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41129860 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java --- @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.zookeeper; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.utils.ZKPaths; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.util.InstantiationUtil; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.ConcurrentModificationException; +import java.util.List; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * State handles backed by ZooKeeper. + * + * Added state is persisted via {@link StateHandle}s, which in turn are written to + * ZooKeeper. 
This level of indirection is necessary to keep the amount of data in ZooKeeper + * small. ZooKeeper is built for data in the KB range whereas state can grow to multiple MBs. + * + * State modifications require some care, because it is possible that certain failures bring + * the state handle backend and ZooKeeper out of sync. + * + * ZooKeeper holds the ground truth about state handles, i.e. the following holds: + * + * + * State handle in ZooKeeper => State handle exists + * + * + * But not: + * + * + * State handle exists => State handle in ZooKeeper + * + * + * There can be lingering state handles when failures happen during operation. They + * need to be cleaned up manually (see FLINK-2513, https://issues.apache.org/jira/browse/FLINK-2513, + * about a possible way to overcome this). + * + * @param <T> Type of state + */ +public class ZooKeeperStateHandleStore { + + /** Curator ZooKeeper client */ + private final CuratorFramework client; + + /** State handle provider */ + private final StateHandleProvider stateHandleProvider; + + /** +* Creates a {@link ZooKeeperStateHandleStore}. +* +* @param client The Curator ZooKeeper client. Important: It is +*expected that the client's namespace ensures that the root +*path is exclusive for all state handles managed by this +*instance, e.g. client.usingNamespace("/stateHandles") +* @param stateHandleProvider The state handle provider for the state +*/ + public ZooKeeperStateHandleStore( + CuratorFramework client, + StateHandleProvider stateHandleProvider) { + + this.client = checkNotNull(client, "Curator client"); + this.stateHandleProvider = checkNotNull(stateHandleProvider, "State handle provider"); + } + + /** +* Creates a state handle and stores it in ZooKeeper with create mode {@link +* CreateMode#PERSISTENT}.
+* +* @see #add(String, Serializable, CreateMode) +*/ + public ZooKeeperStateHandle add(String pathInZooKeeper, T state) throws Exception { + return add(pathInZooKeeper, state, CreateMode.PERSISTENT); + } + + /** +* Creates a state handle and stores it in ZooKeeper. +* +* Important: This will not store the actual state in +* ZooKeeper, but create a state handle and store it in ZooKeeper. This level of indirection +* makes sure that data in ZooKeeper is small. +* +* @param pathInZooKeeper Destination path in ZooKeeper (expected to *not* exist yet
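The class javadoc above describes the core design: the (possibly multi-MB) state lives in a state handle backend, and ZooKeeper stores only a small handle, so "state handle in ZooKeeper" implies the state exists, but not the other way around. The following self-contained Java sketch illustrates why the write ordering (state first, handle second) yields exactly that one-directional guarantee. The class `StateHandleIndirection`, its `add`/`get` methods, and the in-memory maps standing in for ZooKeeper and the backend are all hypothetical, not Flink's actual classes:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the state-handle indirection described above.
public class StateHandleIndirection {

	// Stand-ins for the state handle backend and for ZooKeeper.
	static Map<String, String> backend = new HashMap<>();   // handleId -> large state
	static Map<String, String> zookeeper = new HashMap<>(); // ZK path  -> small handleId

	// Mirrors the ordering that creates the documented invariant: the state is
	// persisted to the backend FIRST, then the handle is written to ZooKeeper.
	// A crash between the two steps leaves a lingering backend entry, but never
	// a ZooKeeper entry that points at missing state.
	static void add(String pathInZooKeeper, String largeState, boolean crashBeforeZkWrite) {
		String handleId = "handle-" + backend.size();
		backend.put(handleId, largeState);        // step 1: persist the state
		if (crashBeforeZkWrite) {
			return;                               // simulated failure
		}
		zookeeper.put(pathInZooKeeper, handleId); // step 2: store the small handle in ZK
	}

	static String get(String pathInZooKeeper) {
		// Resolve the handle stored in ZooKeeper, then fetch the state from the backend.
		return backend.get(zookeeper.get(pathInZooKeeper));
	}

	public static void main(String[] args) {
		add("/jobgraphs/job-1", "multi-MB serialized job graph", false);
		System.out.println(get("/jobgraphs/job-1"));

		add("/jobgraphs/job-2", "another big state", true); // crash: lingering handle
		System.out.println(zookeeper.containsKey("/jobgraphs/job-2")); // false
		System.out.println(backend.size()); // 2: the orphan needs manual cleanup
	}
}
```

In the real store, step 2 uses Curator to create the ZNode; the FLINK-2513 issue referenced in the javadoc tracks cleaning up the orphaned handles this ordering can leave behind.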
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41129869 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java --- (same ZooKeeperStateHandleStore.java hunk quoted above)
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41129801 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java --- (same ZooKeeperStateHandleStore.java hunk quoted above)
[jira] [Commented] (FLINK-2354) Recover running jobs on JobManager failure
[ https://issues.apache.org/jira/browse/FLINK-2354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943207#comment-14943207 ] ASF GitHub Bot commented on FLINK-2354: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41129801 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java --- (same ZooKeeperStateHandleStore.java hunk quoted above)
[jira] [Commented] (FLINK-2354) Recover running jobs on JobManager failure
[ https://issues.apache.org/jira/browse/FLINK-2354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943209#comment-14943209 ] ASF GitHub Bot commented on FLINK-2354: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41129869 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java --- (same ZooKeeperStateHandleStore.java hunk quoted above)
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41129709 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java --- (same ZooKeeperStateHandleStore.java hunk quoted above)
[jira] [Commented] (FLINK-2354) Recover running jobs on JobManager failure
[ https://issues.apache.org/jira/browse/FLINK-2354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943205#comment-14943205 ] ASF GitHub Bot commented on FLINK-2354: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41129709 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java --- (same ZooKeeperStateHandleStore.java hunk quoted above)
[jira] [Resolved] (FLINK-2367) “flink-xx-jobmanager-linux-3lsu.log" file can't auto be recovered/detected after mistaking delete
[ https://issues.apache.org/jira/browse/FLINK-2367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels resolved FLINK-2367.
---
Resolution: Auto Closed

Until further clarification I'm closing this issue. Feel free to reopen.

> “flink-xx-jobmanager-linux-3lsu.log" file can't auto be recovered/detected
> after mistaking delete
> -
>
> Key: FLINK-2367
> URL: https://issues.apache.org/jira/browse/FLINK-2367
> Project: Flink
> Issue Type: Improvement
> Components: JobManager
> Affects Versions: 0.9
> Environment: Linux
> Reporter: chenliang613
> Assignee: chenliang613
> Priority: Minor
> Labels: reliability
> Fix For: 0.9.0
>
>
> To verify that the system is adequately reliable, testers sometimes deliberately delete files.
> Steps:
> 1. Go to "flink\build-target\log".
> 2. Delete the "flink-xx-jobmanager-linux-3lsu.log" file.
> 3. Run jobs that write log output; the system gives no error even though the log output can no longer be written correctly.
> 4. When a job fails and you check the log file to find the reason, the log file cannot be found.
> The JobManager must be restarted to regenerate the log file before jobs can continue.
[jira] [Commented] (FLINK-2354) Recover running jobs on JobManager failure
[ https://issues.apache.org/jira/browse/FLINK-2354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943208#comment-14943208 ] ASF GitHub Bot commented on FLINK-2354: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41129860 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java --- (same ZooKeeperStateHandleStore.java hunk quoted above)
[jira] [Commented] (FLINK-2354) Recover running jobs on JobManager failure
[ https://issues.apache.org/jira/browse/FLINK-2354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943218#comment-14943218 ] ASF GitHub Bot commented on FLINK-2354: --- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41130914 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java --- @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.zookeeper; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.utils.ZKPaths; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.util.InstantiationUtil; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.ConcurrentModificationException; +import java.util.List; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * State handles backed by ZooKeeper. 
+ * + * Added state is persisted via {@link StateHandle}s, which in turn are written to + * ZooKeeper. This level of indirection is necessary to keep the amount of data in ZooKeeper + * small. ZooKeeper is built for data in the KB range whereas state can grow to multiple MBs. + * + * State modifications require some care, because it is possible that certain failures bring + * the state handle backend and ZooKeeper out of sync. + * + * ZooKeeper holds the ground truth about state handles, i.e. the following holds: + * + * + * State handle in ZooKeeper => State handle exists + * + * + * But not: + * + * + * State handle exists => State handle in ZooKeeper + * + * + * There can be lingering state handles when failures happen during operation. They + * need to be cleaned up manually (see FLINK-2513, https://issues.apache.org/jira/browse/FLINK-2513, + * about a possible way to overcome this). + * + * @param <T> Type of state + */ +public class ZooKeeperStateHandleStore<T extends Serializable> { + + /** Curator ZooKeeper client */ + private final CuratorFramework client; + + /** State handle provider */ + private final StateHandleProvider<T> stateHandleProvider; + + /** +* Creates a {@link ZooKeeperStateHandleStore}. +* +* @param client The Curator ZooKeeper client. Important: It is +*expected that the client's namespace ensures that the root +*path is exclusive for all state handles managed by this +*instance, e.g. client.usingNamespace("/stateHandles") +* @param stateHandleProvider The state handle provider for the state +*/ + public ZooKeeperStateHandleStore( + CuratorFramework client, + StateHandleProvider<T> stateHandleProvider) { + + this.client = checkNotNull(client, "Curator client"); + this.stateHandleProvider = checkNotNull(stateHandleProvider, "State handle provider"); + } + + /** +* Creates a state handle and stores it in ZooKeeper with create mode {@link +* CreateMode#PERSISTENT}.
+* +* @see #add(String, Serializable, CreateMode) +*/ + public ZooKeeperStateHandle add(String pathInZooKeeper, T state) throws Exception { + return add(pathInZooKeeper, state, CreateMode.PERSISTENT); + } + + /** +* Creates a state handle and stores it in ZooKeeper. +* +* Important: This will not store the actual state in +* ZooKeeper, but create a state handle and store it in ZooKeeper. This level of indirection +* makes sure that data in ZooKeeper is small.
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41130914 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java --- @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.zookeeper; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.utils.ZKPaths; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.util.InstantiationUtil; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.ConcurrentModificationException; +import java.util.List; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * State handles backed by ZooKeeper. + * + * Added state is persisted via {@link StateHandle}s, which in turn are written to + * ZooKeeper. This level of indirection is necessary to keep the amount of data in ZooKeeper + * small. 
ZooKeeper is built for data in the KB range whereas state can grow to multiple MBs. + * + * State modifications require some care, because it is possible that certain failures bring + * the state handle backend and ZooKeeper out of sync. + * + * ZooKeeper holds the ground truth about state handles, i.e. the following holds: + * + * + * State handle in ZooKeeper => State handle exists + * + * + * But not: + * + * + * State handle exists => State handle in ZooKeeper + * + * + * There can be lingering state handles when failures happen during operation. They + * need to be cleaned up manually (see FLINK-2513, https://issues.apache.org/jira/browse/FLINK-2513, + * about a possible way to overcome this). + * + * @param <T> Type of state + */ +public class ZooKeeperStateHandleStore<T extends Serializable> { + + /** Curator ZooKeeper client */ + private final CuratorFramework client; + + /** State handle provider */ + private final StateHandleProvider<T> stateHandleProvider; + + /** +* Creates a {@link ZooKeeperStateHandleStore}. +* +* @param client The Curator ZooKeeper client. Important: It is +*expected that the client's namespace ensures that the root +*path is exclusive for all state handles managed by this +*instance, e.g. client.usingNamespace("/stateHandles") +* @param stateHandleProvider The state handle provider for the state +*/ + public ZooKeeperStateHandleStore( + CuratorFramework client, + StateHandleProvider<T> stateHandleProvider) { + + this.client = checkNotNull(client, "Curator client"); + this.stateHandleProvider = checkNotNull(stateHandleProvider, "State handle provider"); + } + + /** +* Creates a state handle and stores it in ZooKeeper with create mode {@link +* CreateMode#PERSISTENT}. +* +* @see #add(String, Serializable, CreateMode) +*/ + public ZooKeeperStateHandle add(String pathInZooKeeper, T state) throws Exception { + return add(pathInZooKeeper, state, CreateMode.PERSISTENT); + } + + /** +* Creates a state handle and stores it in ZooKeeper.
+* +* Important: This will not store the actual state in +* ZooKeeper, but create a state handle and store it in ZooKeeper. This level of indirection +* makes sure that data in ZooKeeper is small. +* +* @param pathInZooKeeper Destination path in ZooKeeper (expected to *not* exist yet and +
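The javadoc above hinges on one idea: ZooKeeper stores only a small handle, while the actual (potentially large) state lives in a separate backend. A minimal, ZooKeeper-free sketch of that indirection, using plain maps as stand-ins for ZooKeeper and the state backend (all class and method names here are hypothetical, not Flink's actual API):

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * Sketch of the state-handle indirection: the large state is written to an
 * external store, and the "ZooKeeper" node holds only a small handle
 * pointing to it. Plain maps stand in for ZooKeeper and the state backend.
 */
public class HandleIndirectionSketch {

    /** Small, serializable pointer to externally stored state. */
    static final class Handle implements Serializable {
        final String location;
        Handle(String location) { this.location = location; }
    }

    // Stand-in for the real state backend (e.g. a DFS) holding large state.
    private final Map<String, Object> blobStore = new HashMap<>();
    // Stand-in for ZooKeeper: paths map to small handles only.
    private final Map<String, Handle> zooKeeper = new HashMap<>();

    /** Writes the state to the backend and stores only a handle under the path. */
    Handle add(String pathInZooKeeper, Object state) {
        String location = UUID.randomUUID().toString();
        blobStore.put(location, state);          // large payload stays out of "ZooKeeper"
        Handle handle = new Handle(location);
        zooKeeper.put(pathInZooKeeper, handle);  // only the small handle is stored
        return handle;
    }

    /** Resolves the handle stored under the path back to the actual state. */
    Object get(String pathInZooKeeper) {
        Handle handle = zooKeeper.get(pathInZooKeeper);
        return handle == null ? null : blobStore.get(handle.location);
    }

    public static void main(String[] args) {
        HandleIndirectionSketch store = new HandleIndirectionSketch();
        store.add("/checkpoints/1", "large-state-payload");
        System.out.println(store.get("/checkpoints/1"));
    }
}
```

This also illustrates why the two can drift apart: if a failure happens after the backend write but before the "ZooKeeper" write, a lingering blob exists with no handle pointing to it, which is exactly the cleanup problem FLINK-2513 discusses.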
[jira] [Commented] (FLINK-2767) Add support Scala 2.11 to Scala shell
[ https://issues.apache.org/jira/browse/FLINK-2767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943297#comment-14943297 ] ASF GitHub Bot commented on FLINK-2767: --- Github user nikste commented on the pull request: https://github.com/apache/flink/pull/1197#issuecomment-145510215 For me it says: ~~~bash Failed to created JLineReader: java.lang.NoClassDefFoundError: jline/console/completer/Completer Falling back to SimpleReader. ~~~ > Add support Scala 2.11 to Scala shell > - > > Key: FLINK-2767 > URL: https://issues.apache.org/jira/browse/FLINK-2767 > Project: Flink > Issue Type: Improvement > Components: Scala Shell >Affects Versions: 0.10 >Reporter: Chiwan Park >Assignee: Chiwan Park > > Since FLINK-2200 is resolved, the Flink community provides JARs for Scala > 2.11. But currently, there is no Scala shell for Scala 2.11. If we add > support for Scala 2.11 to the Scala shell, users of Scala 2.11 could use Flink > easily.
[GitHub] flink pull request: [FLINK-2767] [scala shell] Add Scala 2.11 supp...
Github user nikste commented on the pull request: https://github.com/apache/flink/pull/1197#issuecomment-145510215 For me it says: ~~~bash Failed to created JLineReader: java.lang.NoClassDefFoundError: jline/console/completer/Completer Falling back to SimpleReader. ~~~
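The `NoClassDefFoundError` quoted above means the JLine classes are simply absent from the shell's runtime classpath, so the REPL falls back to its simple reader. A small sketch of probing for a class and falling back, in the spirit of (but not identical to) what the Scala REPL does; the `ClasspathProbe` class and its fallback logic are hypothetical:

```java
/**
 * Probes whether a class is loadable from the current classpath, mimicking
 * how a REPL can fall back to a simpler line reader when JLine is missing.
 * This is an illustrative sketch, not the Scala shell's actual code.
 */
public class ClasspathProbe {

    /** Returns true if the named class can be loaded from the current classpath. */
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException | NoClassDefFoundError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Without the jline jar on the classpath, this takes the fallback branch.
        if (!isOnClasspath("jline.console.completer.Completer")) {
            System.out.println("Falling back to SimpleReader.");
        }
    }
}
```

The practical fix in such cases is packaging: making sure the jline jar ends up on the classpath of the shaded/assembled distribution for the given Scala version.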
[jira] [Commented] (FLINK-1675) Rework Accumulators
[ https://issues.apache.org/jira/browse/FLINK-1675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943301#comment-14943301 ] Ufuk Celebi commented on FLINK-1675: Max, there was this PR which addressed (5): https://github.com/apache/flink/pull/570 > Rework Accumulators > --- > > Key: FLINK-1675 > URL: https://issues.apache.org/jira/browse/FLINK-1675 > Project: Flink > Issue Type: Bug > Components: JobManager, TaskManager >Affects Versions: 0.9 >Reporter: Stephan Ewen > Fix For: 0.10 > > > The accumulators need an overhaul to address various issues: > 1. User-defined Accumulator classes crash the client, because it is not > using the user code classloader to decode the received message. > 2. They should be attached to the ExecutionGraph, not the dedicated > AccumulatorManager. That also makes them accessible for archived execution > graphs. > 3. Accumulators should be sent periodically, as part of the heartbeat that > sends metrics. This allows them to be updated in real time. > 4. Accumulators should be stored fine-grained (per ExecutionVertex, or per > execution) and the final value should be computed by merging all involved > ones. This allows users to access the per-subtask accumulators, which is > often interesting. > 5. Accumulators should subsume the aggregators by allowing them to be "versioned" > with a superstep. The versioned ones should be redistributed to the cluster > after each superstep.
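Point (4) of the issue above, keeping accumulators per subtask and computing the final value by merging, can be sketched as follows. The `Map<String, Long>` representation and counter-style merge are illustrative assumptions, not Flink's actual accumulator API:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Sketch of fine-grained accumulator storage: values are kept per subtask
 * (per execution) and the final value is computed on access by merging.
 * This keeps the per-subtask values available to users as well.
 */
public class AccumulatorMergeSketch {

    /** Merges the per-subtask accumulator maps into one final view, summing counters. */
    static Map<String, Long> mergeSubtaskAccumulators(List<Map<String, Long>> perSubtask) {
        Map<String, Long> merged = new HashMap<>();
        for (Map<String, Long> subtask : perSubtask) {
            for (Map.Entry<String, Long> e : subtask.entrySet()) {
                // Sum values that appear under the same accumulator name.
                merged.merge(e.getKey(), e.getValue(), Long::sum);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Long> subtask0 = new HashMap<>();
        subtask0.put("records-read", 100L);
        Map<String, Long> subtask1 = new HashMap<>();
        subtask1.put("records-read", 250L);

        // Per-subtask values stay accessible; the final value is the merge.
        System.out.println(mergeSubtaskAccumulators(Arrays.asList(subtask0, subtask1)));
    }
}
```

Merging on access (rather than maintaining a single global value) is what makes point (3) workable too: each heartbeat can ship only the sender's current per-subtask values.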