[jira] [Commented] (FLINK-3318) Add support for quantifiers to CEP's pattern API
[ https://issues.apache.org/jira/browse/FLINK-3318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837834#comment-15837834 ] ASF GitHub Bot commented on FLINK-3318: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/2361#discussion_r97789775 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java --- @@ -80,25 +82,17 @@ // return a factory for empty NFAs return new NFAFactoryImpl(inputTypeSerializer, 0, Collections.emptyList(), timeoutHandling); } else { + ArrayList > patterns = createPatternsList(pattern); --- End diff -- The `patterns` can become a `List` instead of `ArrayList`. It is good to have the most generic type possible as argument or return type, as implementations may change. > Add support for quantifiers to CEP's pattern API > > > Key: FLINK-3318 > URL: https://issues.apache.org/jira/browse/FLINK-3318 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.0.0 >Reporter: Till Rohrmann >Assignee: Ivan Mushketyk >Priority: Minor > > It would be a good addition to extend the pattern API to support quantifiers > known from regular expressions (e.g. Kleene star, ?, +, or count bounds). > This would considerably enrich the set of supported patterns. > Implementing the count bounds could be done by unrolling the pattern state. > In order to support the Kleene star operator, the {{NFACompiler}} has to be > extended to insert epsilon-transition between a Kleene start state and the > succeeding pattern state. In order to support {{?}}, one could insert two > paths from the preceding state, one which accepts the event and another which > directly goes into the next pattern state. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2361: [FLINK-3318][cep] Add support for quantifiers to C...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/2361#discussion_r97790271 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java --- @@ -130,26 +114,166 @@ } // add the beginning state - final State beginningState; + State beginningState = states.get(BEGINNING_STATE_NAME);; + addTransitions(beginningState, -1, patterns, states); + return new NFAFactoryImpl(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling); + } + } - if (states.containsKey(BEGINNING_STATE_NAME)) { - beginningState = states.get(BEGINNING_STATE_NAME); - } else { - beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start); - states.put(BEGINNING_STATE_NAME, beginningState); - } + private static void addTransitions(State currentState, int patternPos, ArrayList> patterns, Map states) { + Pattern succeedingPattern = patterns.get(patternPos + 1); + State succeedingState = states.get(succeedingPattern.getName()); - beginningState.addStateTransition(new StateTransition( + if (shouldRepeatPattern(patternPos, patterns)) { + expandRepeatingPattern(currentState, patternPos, patterns, states); + } else { + currentState.addStateTransition(new StateTransition( StateTransitionAction.TAKE, - currentState, - (FilterFunction) currentPattern.getFilterFunction() + succeedingState, + (FilterFunction) succeedingPattern.getFilterFunction() )); - return new NFAFactoryImpl(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling); + if (shouldAddSelfTransition(succeedingPattern)) { + addTransitionToSelf(succeedingPattern, succeedingState); + } + if (isPatternOptional(succeedingPattern)) { + addOptionalTransitions(currentState, patternPos, patterns, states); + } + } + } + + private static void addOptionalTransitions(State currentState, int patternPos, ArrayList > patterns, Map states) { + int firstNonOptionalPattern = findFirstNonOptionalPattern(patterns, 
patternPos + 1); + + for (int optionalPatternPos = patternPos + 2; + optionalPatternPos < Math.min(firstNonOptionalPattern + 1, patterns.size()); + optionalPatternPos++) { + + Pattern optionalPattern = patterns.get(optionalPatternPos); + State optionalState = states.get(optionalPattern.getName()); + currentState.addStateTransition(new StateTransition<>( + StateTransitionAction.TAKE, + optionalState, + (FilterFunction) optionalPattern.getFilterFunction())); } } /** +* Expand a pattern number of times and connect expanded states. E.g. count(3) wil result in: +* +* +-+ +---+ +---+ +* |State+->|State#1+->|State#2+ +* +--+--+ +---+ +--++ +*/ + private static void expandRepeatingPattern(State currentState, int patternPos, + ArrayList > patterns, Map states) { + Pattern succeedingPattern = patterns.get(patternPos + 1); + State succeedingState = states.get(succeedingPattern.getName()); + Pattern currentPattern = patterns.get(patternPos); + + State currentRepeatingState = null; + State nextRepeatingState = currentState; + for (int i = 1; i < currentPattern.getMaxCount(); i++) { + currentRepeatingState = nextRepeatingState; + nextRepeatingState = new State<>( + currentState.getName() + "#" + i, + State.StateType.Normal); + states.put(nextRepeatingState.getName(), nextRepeatingState); +
[jira] [Commented] (FLINK-3318) Add support for quantifiers to CEP's pattern API
[ https://issues.apache.org/jira/browse/FLINK-3318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837835#comment-15837835 ] ASF GitHub Bot commented on FLINK-3318: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/2361#discussion_r97790228 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java --- @@ -130,26 +114,166 @@ } // add the beginning state - final State beginningState; + State beginningState = states.get(BEGINNING_STATE_NAME);; + addTransitions(beginningState, -1, patterns, states); + return new NFAFactoryImpl(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling); + } + } - if (states.containsKey(BEGINNING_STATE_NAME)) { - beginningState = states.get(BEGINNING_STATE_NAME); - } else { - beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start); - states.put(BEGINNING_STATE_NAME, beginningState); - } + private static void addTransitions(State currentState, int patternPos, ArrayList> patterns, Map states) { + Pattern succeedingPattern = patterns.get(patternPos + 1); + State succeedingState = states.get(succeedingPattern.getName()); - beginningState.addStateTransition(new StateTransition( + if (shouldRepeatPattern(patternPos, patterns)) { + expandRepeatingPattern(currentState, patternPos, patterns, states); + } else { + currentState.addStateTransition(new StateTransition( StateTransitionAction.TAKE, - currentState, - (FilterFunction) currentPattern.getFilterFunction() + succeedingState, + (FilterFunction) succeedingPattern.getFilterFunction() )); - return new NFAFactoryImpl(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling); + if (shouldAddSelfTransition(succeedingPattern)) { + addTransitionToSelf(succeedingPattern, succeedingState); + } + if (isPatternOptional(succeedingPattern)) { + addOptionalTransitions(currentState, patternPos, patterns, states); + } + } + } + + 
private static void addOptionalTransitions(State currentState, int patternPos, ArrayList > patterns, Map states) { + int firstNonOptionalPattern = findFirstNonOptionalPattern(patterns, patternPos + 1); + + for (int optionalPatternPos = patternPos + 2; + optionalPatternPos < Math.min(firstNonOptionalPattern + 1, patterns.size()); + optionalPatternPos++) { + + Pattern optionalPattern = patterns.get(optionalPatternPos); + State optionalState = states.get(optionalPattern.getName()); + currentState.addStateTransition(new StateTransition<>( + StateTransitionAction.TAKE, + optionalState, + (FilterFunction) optionalPattern.getFilterFunction())); } } /** +* Expand a pattern number of times and connect expanded states. E.g. count(3) wil result in: +* +* +-+ +---+ +---+ +* |State+->|State#1+->|State#2+ +* +--+--+ +---+ +--++ +*/ + private static void expandRepeatingPattern(State currentState, int patternPos, + ArrayList > patterns, Map states) { + Pattern succeedingPattern = patterns.get(patternPos + 1); + State succeedingState = states.get(succeedingPattern.getName()); + Pattern currentPattern = patterns.get(patternPos); + + State currentRepeatingState = null; + State nextRepeatingState = currentState; + for (int i = 1; i < currentPattern.getMaxCount(); i++) { + currentRepeatingState = nextRepeatingState; + nextRepeatingState = new State<>(
[jira] [Commented] (FLINK-3318) Add support for quantifiers to CEP's pattern API
[ https://issues.apache.org/jira/browse/FLINK-3318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837831#comment-15837831 ] ASF GitHub Bot commented on FLINK-3318: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/2361#discussion_r97790026 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java --- @@ -130,26 +114,166 @@ } // add the beginning state - final State beginningState; + State beginningState = states.get(BEGINNING_STATE_NAME);; + addTransitions(beginningState, -1, patterns, states); + return new NFAFactoryImpl(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling); + } + } - if (states.containsKey(BEGINNING_STATE_NAME)) { - beginningState = states.get(BEGINNING_STATE_NAME); - } else { - beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start); - states.put(BEGINNING_STATE_NAME, beginningState); - } + private static void addTransitions(State currentState, int patternPos, ArrayList> patterns, Map states) { + Pattern succeedingPattern = patterns.get(patternPos + 1); + State succeedingState = states.get(succeedingPattern.getName()); - beginningState.addStateTransition(new StateTransition( + if (shouldRepeatPattern(patternPos, patterns)) { + expandRepeatingPattern(currentState, patternPos, patterns, states); + } else { + currentState.addStateTransition(new StateTransition( StateTransitionAction.TAKE, - currentState, - (FilterFunction) currentPattern.getFilterFunction() + succeedingState, + (FilterFunction) succeedingPattern.getFilterFunction() )); - return new NFAFactoryImpl(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling); + if (shouldAddSelfTransition(succeedingPattern)) { + addTransitionToSelf(succeedingPattern, succeedingState); + } + if (isPatternOptional(succeedingPattern)) { + addOptionalTransitions(currentState, patternPos, patterns, states); + } + } + } + + 
private static void addOptionalTransitions(State currentState, int patternPos, ArrayList > patterns, Map states) { + int firstNonOptionalPattern = findFirstNonOptionalPattern(patterns, patternPos + 1); + + for (int optionalPatternPos = patternPos + 2; + optionalPatternPos < Math.min(firstNonOptionalPattern + 1, patterns.size()); + optionalPatternPos++) { + + Pattern optionalPattern = patterns.get(optionalPatternPos); + State optionalState = states.get(optionalPattern.getName()); + currentState.addStateTransition(new StateTransition<>( + StateTransitionAction.TAKE, + optionalState, + (FilterFunction) optionalPattern.getFilterFunction())); } } /** +* Expand a pattern number of times and connect expanded states. E.g. count(3) wil result in: +* +* +-+ +---+ +---+ +* |State+->|State#1+->|State#2+ +* +--+--+ +---+ +--++ +*/ + private static void expandRepeatingPattern(State currentState, int patternPos, + ArrayList > patterns, Map states) { --- End diff -- The `patterns` can become a `List` instead of `ArrayList`. > Add support for quantifiers to CEP's pattern API > > > Key: FLINK-3318 > URL: https://issues.apache.org/jira/browse/FLINK-3318 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.0.0 >Reporter: Till Rohrmann >Assignee: Ivan Mushketyk >Priority: Minor > > It would be a good
[GitHub] flink pull request #2361: [FLINK-3318][cep] Add support for quantifiers to C...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/2361#discussion_r97790228 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java --- @@ -130,26 +114,166 @@ } // add the beginning state - final State beginningState; + State beginningState = states.get(BEGINNING_STATE_NAME);; + addTransitions(beginningState, -1, patterns, states); + return new NFAFactoryImpl(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling); + } + } - if (states.containsKey(BEGINNING_STATE_NAME)) { - beginningState = states.get(BEGINNING_STATE_NAME); - } else { - beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start); - states.put(BEGINNING_STATE_NAME, beginningState); - } + private static void addTransitions(State currentState, int patternPos, ArrayList> patterns, Map states) { + Pattern succeedingPattern = patterns.get(patternPos + 1); + State succeedingState = states.get(succeedingPattern.getName()); - beginningState.addStateTransition(new StateTransition( + if (shouldRepeatPattern(patternPos, patterns)) { + expandRepeatingPattern(currentState, patternPos, patterns, states); + } else { + currentState.addStateTransition(new StateTransition( StateTransitionAction.TAKE, - currentState, - (FilterFunction) currentPattern.getFilterFunction() + succeedingState, + (FilterFunction) succeedingPattern.getFilterFunction() )); - return new NFAFactoryImpl(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling); + if (shouldAddSelfTransition(succeedingPattern)) { + addTransitionToSelf(succeedingPattern, succeedingState); + } + if (isPatternOptional(succeedingPattern)) { + addOptionalTransitions(currentState, patternPos, patterns, states); + } + } + } + + private static void addOptionalTransitions(State currentState, int patternPos, ArrayList > patterns, Map states) { + int firstNonOptionalPattern = findFirstNonOptionalPattern(patterns, 
patternPos + 1); + + for (int optionalPatternPos = patternPos + 2; + optionalPatternPos < Math.min(firstNonOptionalPattern + 1, patterns.size()); + optionalPatternPos++) { + + Pattern optionalPattern = patterns.get(optionalPatternPos); + State optionalState = states.get(optionalPattern.getName()); + currentState.addStateTransition(new StateTransition<>( + StateTransitionAction.TAKE, + optionalState, + (FilterFunction) optionalPattern.getFilterFunction())); } } /** +* Expand a pattern number of times and connect expanded states. E.g. count(3) wil result in: +* +* +-+ +---+ +---+ +* |State+->|State#1+->|State#2+ +* +--+--+ +---+ +--++ +*/ + private static void expandRepeatingPattern(State currentState, int patternPos, + ArrayList > patterns, Map states) { + Pattern succeedingPattern = patterns.get(patternPos + 1); + State succeedingState = states.get(succeedingPattern.getName()); + Pattern currentPattern = patterns.get(patternPos); + + State currentRepeatingState = null; + State nextRepeatingState = currentState; + for (int i = 1; i < currentPattern.getMaxCount(); i++) { + currentRepeatingState = nextRepeatingState; + nextRepeatingState = new State<>( + currentState.getName() + "#" + i, + State.StateType.Normal); + states.put(nextRepeatingState.getName(), nextRepeatingState); +
[jira] [Commented] (FLINK-3318) Add support for quantifiers to CEP's pattern API
[ https://issues.apache.org/jira/browse/FLINK-3318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837828#comment-15837828 ] ASF GitHub Bot commented on FLINK-3318: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/2361#discussion_r97790105 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java --- @@ -130,26 +114,166 @@ } // add the beginning state - final State beginningState; + State beginningState = states.get(BEGINNING_STATE_NAME);; + addTransitions(beginningState, -1, patterns, states); + return new NFAFactoryImpl(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling); + } + } - if (states.containsKey(BEGINNING_STATE_NAME)) { - beginningState = states.get(BEGINNING_STATE_NAME); - } else { - beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start); - states.put(BEGINNING_STATE_NAME, beginningState); - } + private static void addTransitions(State currentState, int patternPos, ArrayList> patterns, Map states) { + Pattern succeedingPattern = patterns.get(patternPos + 1); + State succeedingState = states.get(succeedingPattern.getName()); - beginningState.addStateTransition(new StateTransition( + if (shouldRepeatPattern(patternPos, patterns)) { + expandRepeatingPattern(currentState, patternPos, patterns, states); + } else { + currentState.addStateTransition(new StateTransition( StateTransitionAction.TAKE, - currentState, - (FilterFunction) currentPattern.getFilterFunction() + succeedingState, + (FilterFunction) succeedingPattern.getFilterFunction() )); - return new NFAFactoryImpl(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling); + if (shouldAddSelfTransition(succeedingPattern)) { + addTransitionToSelf(succeedingPattern, succeedingState); + } + if (isPatternOptional(succeedingPattern)) { + addOptionalTransitions(currentState, patternPos, patterns, states); + } + } + } + + 
private static void addOptionalTransitions(State currentState, int patternPos, ArrayList > patterns, Map states) { + int firstNonOptionalPattern = findFirstNonOptionalPattern(patterns, patternPos + 1); + + for (int optionalPatternPos = patternPos + 2; + optionalPatternPos < Math.min(firstNonOptionalPattern + 1, patterns.size()); + optionalPatternPos++) { + + Pattern optionalPattern = patterns.get(optionalPatternPos); + State optionalState = states.get(optionalPattern.getName()); + currentState.addStateTransition(new StateTransition<>( + StateTransitionAction.TAKE, + optionalState, + (FilterFunction) optionalPattern.getFilterFunction())); } } /** +* Expand a pattern number of times and connect expanded states. E.g. count(3) wil result in: +* +* +-+ +---+ +---+ +* |State+->|State#1+->|State#2+ +* +--+--+ +---+ +--++ +*/ + private static void expandRepeatingPattern(State currentState, int patternPos, + ArrayList > patterns, Map states) { + Pattern succeedingPattern = patterns.get(patternPos + 1); + State succeedingState = states.get(succeedingPattern.getName()); + Pattern currentPattern = patterns.get(patternPos); + + State currentRepeatingState = null; + State nextRepeatingState = currentState; + for (int i = 1; i < currentPattern.getMaxCount(); i++) { + currentRepeatingState = nextRepeatingState; + nextRepeatingState = new State<>(
[jira] [Commented] (FLINK-3318) Add support for quantifiers to CEP's pattern API
[ https://issues.apache.org/jira/browse/FLINK-3318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837830#comment-15837830 ] ASF GitHub Bot commented on FLINK-3318: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/2361#discussion_r97789988 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java --- @@ -130,26 +114,166 @@ } // add the beginning state - final State beginningState; + State beginningState = states.get(BEGINNING_STATE_NAME);; + addTransitions(beginningState, -1, patterns, states); + return new NFAFactoryImpl(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling); + } + } - if (states.containsKey(BEGINNING_STATE_NAME)) { - beginningState = states.get(BEGINNING_STATE_NAME); - } else { - beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start); - states.put(BEGINNING_STATE_NAME, beginningState); - } + private static void addTransitions(State currentState, int patternPos, ArrayList> patterns, Map states) { + Pattern succeedingPattern = patterns.get(patternPos + 1); + State succeedingState = states.get(succeedingPattern.getName()); - beginningState.addStateTransition(new StateTransition( + if (shouldRepeatPattern(patternPos, patterns)) { + expandRepeatingPattern(currentState, patternPos, patterns, states); + } else { + currentState.addStateTransition(new StateTransition( StateTransitionAction.TAKE, - currentState, - (FilterFunction) currentPattern.getFilterFunction() + succeedingState, + (FilterFunction) succeedingPattern.getFilterFunction() )); - return new NFAFactoryImpl(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling); + if (shouldAddSelfTransition(succeedingPattern)) { + addTransitionToSelf(succeedingPattern, succeedingState); + } + if (isPatternOptional(succeedingPattern)) { + addOptionalTransitions(currentState, patternPos, patterns, states); + } + } + } + + 
private static void addOptionalTransitions(State currentState, int patternPos, ArrayList > patterns, Map states) { --- End diff -- The `patterns` can become a `List` instead of `ArrayList`. > Add support for quantifiers to CEP's pattern API > > > Key: FLINK-3318 > URL: https://issues.apache.org/jira/browse/FLINK-3318 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.0.0 >Reporter: Till Rohrmann >Assignee: Ivan Mushketyk >Priority: Minor > > It would be a good addition to extend the pattern API to support quantifiers > known from regular expressions (e.g. Kleene star, ?, +, or count bounds). > This would considerably enrich the set of supported patterns. > Implementing the count bounds could be done by unrolling the pattern state. > In order to support the Kleene star operator, the {{NFACompiler}} has to be > extended to insert epsilon-transition between a Kleene start state and the > succeeding pattern state. In order to support {{?}}, one could insert two > paths from the preceding state, one which accepts the event and another which > directly goes into the next pattern state. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3318) Add support for quantifiers to CEP's pattern API
[ https://issues.apache.org/jira/browse/FLINK-3318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837832#comment-15837832 ] ASF GitHub Bot commented on FLINK-3318: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/2361#discussion_r97790271 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java --- @@ -130,26 +114,166 @@ } // add the beginning state - final State beginningState; + State beginningState = states.get(BEGINNING_STATE_NAME);; + addTransitions(beginningState, -1, patterns, states); + return new NFAFactoryImpl(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling); + } + } - if (states.containsKey(BEGINNING_STATE_NAME)) { - beginningState = states.get(BEGINNING_STATE_NAME); - } else { - beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start); - states.put(BEGINNING_STATE_NAME, beginningState); - } + private static void addTransitions(State currentState, int patternPos, ArrayList> patterns, Map states) { + Pattern succeedingPattern = patterns.get(patternPos + 1); + State succeedingState = states.get(succeedingPattern.getName()); - beginningState.addStateTransition(new StateTransition( + if (shouldRepeatPattern(patternPos, patterns)) { + expandRepeatingPattern(currentState, patternPos, patterns, states); + } else { + currentState.addStateTransition(new StateTransition( StateTransitionAction.TAKE, - currentState, - (FilterFunction) currentPattern.getFilterFunction() + succeedingState, + (FilterFunction) succeedingPattern.getFilterFunction() )); - return new NFAFactoryImpl(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling); + if (shouldAddSelfTransition(succeedingPattern)) { + addTransitionToSelf(succeedingPattern, succeedingState); + } + if (isPatternOptional(succeedingPattern)) { + addOptionalTransitions(currentState, patternPos, patterns, states); + } + } + } + + 
private static void addOptionalTransitions(State currentState, int patternPos, ArrayList > patterns, Map states) { + int firstNonOptionalPattern = findFirstNonOptionalPattern(patterns, patternPos + 1); + + for (int optionalPatternPos = patternPos + 2; + optionalPatternPos < Math.min(firstNonOptionalPattern + 1, patterns.size()); + optionalPatternPos++) { + + Pattern optionalPattern = patterns.get(optionalPatternPos); + State optionalState = states.get(optionalPattern.getName()); + currentState.addStateTransition(new StateTransition<>( + StateTransitionAction.TAKE, + optionalState, + (FilterFunction) optionalPattern.getFilterFunction())); } } /** +* Expand a pattern number of times and connect expanded states. E.g. count(3) wil result in: +* +* +-+ +---+ +---+ +* |State+->|State#1+->|State#2+ +* +--+--+ +---+ +--++ +*/ + private static void expandRepeatingPattern(State currentState, int patternPos, + ArrayList > patterns, Map states) { + Pattern succeedingPattern = patterns.get(patternPos + 1); + State succeedingState = states.get(succeedingPattern.getName()); + Pattern currentPattern = patterns.get(patternPos); + + State currentRepeatingState = null; + State nextRepeatingState = currentState; + for (int i = 1; i < currentPattern.getMaxCount(); i++) { + currentRepeatingState = nextRepeatingState; + nextRepeatingState = new State<>(
[jira] [Commented] (FLINK-3318) Add support for quantifiers to CEP's pattern API
[ https://issues.apache.org/jira/browse/FLINK-3318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837833#comment-15837833 ] ASF GitHub Bot commented on FLINK-3318: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/2361#discussion_r97789351 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java --- @@ -43,7 +43,14 @@ public State(final String name, final StateType stateType) { this.name = name; this.stateType = stateType; - stateTransitions = new ArrayList(); + stateTransitions = new ArrayList<>(); + } + + public State(String name, StateType stateType, Collection stateTransitions) { --- End diff -- I agree with @chermenin and this class can be reverted to its previous state. In general, PRs should have the smallest diff possible in order to be easier to review. > Add support for quantifiers to CEP's pattern API > > > Key: FLINK-3318 > URL: https://issues.apache.org/jira/browse/FLINK-3318 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.0.0 >Reporter: Till Rohrmann >Assignee: Ivan Mushketyk >Priority: Minor > > It would be a good addition to extend the pattern API to support quantifiers > known from regular expressions (e.g. Kleene star, ?, +, or count bounds). > This would considerably enrich the set of supported patterns. > Implementing the count bounds could be done by unrolling the pattern state. > In order to support the Kleene star operator, the {{NFACompiler}} has to be > extended to insert epsilon-transition between a Kleene start state and the > succeeding pattern state. In order to support {{?}}, one could insert two > paths from the preceding state, one which accepts the event and another which > directly goes into the next pattern state. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2361: [FLINK-3318][cep] Add support for quantifiers to C...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/2361#discussion_r97790385 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java --- @@ -130,26 +114,166 @@ } // add the beginning state - final State beginningState; + State beginningState = states.get(BEGINNING_STATE_NAME);; + addTransitions(beginningState, -1, patterns, states); + return new NFAFactoryImpl(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling); + } + } - if (states.containsKey(BEGINNING_STATE_NAME)) { - beginningState = states.get(BEGINNING_STATE_NAME); - } else { - beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start); - states.put(BEGINNING_STATE_NAME, beginningState); - } + private static void addTransitions(State currentState, int patternPos, ArrayList> patterns, Map states) { + Pattern succeedingPattern = patterns.get(patternPos + 1); + State succeedingState = states.get(succeedingPattern.getName()); - beginningState.addStateTransition(new StateTransition( + if (shouldRepeatPattern(patternPos, patterns)) { + expandRepeatingPattern(currentState, patternPos, patterns, states); + } else { + currentState.addStateTransition(new StateTransition( StateTransitionAction.TAKE, - currentState, - (FilterFunction) currentPattern.getFilterFunction() + succeedingState, + (FilterFunction) succeedingPattern.getFilterFunction() )); - return new NFAFactoryImpl(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling); + if (shouldAddSelfTransition(succeedingPattern)) { + addTransitionToSelf(succeedingPattern, succeedingState); + } + if (isPatternOptional(succeedingPattern)) { + addOptionalTransitions(currentState, patternPos, patterns, states); + } + } + } + + private static void addOptionalTransitions(State currentState, int patternPos, ArrayList > patterns, Map states) { + int firstNonOptionalPattern = findFirstNonOptionalPattern(patterns, 
patternPos + 1); + + for (int optionalPatternPos = patternPos + 2; + optionalPatternPos < Math.min(firstNonOptionalPattern + 1, patterns.size()); + optionalPatternPos++) { + + Pattern optionalPattern = patterns.get(optionalPatternPos); + State optionalState = states.get(optionalPattern.getName()); + currentState.addStateTransition(new StateTransition<>( + StateTransitionAction.TAKE, + optionalState, + (FilterFunction) optionalPattern.getFilterFunction())); } } /** +* Expand a pattern number of times and connect expanded states. E.g. count(3) wil result in: +* +* +-+ +---+ +---+ +* |State+->|State#1+->|State#2+ +* +--+--+ +---+ +--++ +*/ + private static void expandRepeatingPattern(State currentState, int patternPos, + ArrayList > patterns, Map states) { + Pattern succeedingPattern = patterns.get(patternPos + 1); + State succeedingState = states.get(succeedingPattern.getName()); + Pattern currentPattern = patterns.get(patternPos); + + State currentRepeatingState = null; + State nextRepeatingState = currentState; + for (int i = 1; i < currentPattern.getMaxCount(); i++) { + currentRepeatingState = nextRepeatingState; + nextRepeatingState = new State<>( + currentState.getName() + "#" + i, + State.StateType.Normal); + states.put(nextRepeatingState.getName(), nextRepeatingState); +
[GitHub] flink pull request #2361: [FLINK-3318][cep] Add support for quantifiers to C...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/2361#discussion_r97789988 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java --- @@ -130,26 +114,166 @@ } // add the beginning state - final State beginningState; + State beginningState = states.get(BEGINNING_STATE_NAME);; + addTransitions(beginningState, -1, patterns, states); + return new NFAFactoryImpl(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling); + } + } - if (states.containsKey(BEGINNING_STATE_NAME)) { - beginningState = states.get(BEGINNING_STATE_NAME); - } else { - beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start); - states.put(BEGINNING_STATE_NAME, beginningState); - } + private static void addTransitions(State currentState, int patternPos, ArrayList> patterns, Map states) { + Pattern succeedingPattern = patterns.get(patternPos + 1); + State succeedingState = states.get(succeedingPattern.getName()); - beginningState.addStateTransition(new StateTransition( + if (shouldRepeatPattern(patternPos, patterns)) { + expandRepeatingPattern(currentState, patternPos, patterns, states); + } else { + currentState.addStateTransition(new StateTransition( StateTransitionAction.TAKE, - currentState, - (FilterFunction) currentPattern.getFilterFunction() + succeedingState, + (FilterFunction) succeedingPattern.getFilterFunction() )); - return new NFAFactoryImpl(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling); + if (shouldAddSelfTransition(succeedingPattern)) { + addTransitionToSelf(succeedingPattern, succeedingState); + } + if (isPatternOptional(succeedingPattern)) { + addOptionalTransitions(currentState, patternPos, patterns, states); + } + } + } + + private static void addOptionalTransitions(State currentState, int patternPos, ArrayList > patterns, Map states) { --- End diff -- The `patterns` can become a `List` instead of `ArrayList`. 
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2361: [FLINK-3318][cep] Add support for quantifiers to C...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/2361#discussion_r97789900 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java --- @@ -130,26 +114,166 @@ } // add the beginning state - final State beginningState; + State beginningState = states.get(BEGINNING_STATE_NAME);; + addTransitions(beginningState, -1, patterns, states); + return new NFAFactoryImpl(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling); + } + } - if (states.containsKey(BEGINNING_STATE_NAME)) { - beginningState = states.get(BEGINNING_STATE_NAME); - } else { - beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start); - states.put(BEGINNING_STATE_NAME, beginningState); - } + private static void addTransitions(State currentState, int patternPos, ArrayList> patterns, Map states) { --- End diff -- The `patterns` can become a `List` instead of `ArrayList`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2361: [FLINK-3318][cep] Add support for quantifiers to C...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/2361#discussion_r97789775 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java --- @@ -80,25 +82,17 @@ // return a factory for empty NFAs return new NFAFactoryImpl(inputTypeSerializer, 0, Collections.emptyList(), timeoutHandling); } else { + ArrayList > patterns = createPatternsList(pattern); --- End diff -- The `patterns` can become a `List` instead of `ArrayList`. It is good to have the most generic type possible as argument or return type, as implementations may change. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5599) State interface docs often refer to keyed state only
[ https://issues.apache.org/jira/browse/FLINK-5599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837815#comment-15837815 ] Aljoscha Krettek commented on FLINK-5599: - Definite +1, yes. > State interface docs often refer to keyed state only > > > Key: FLINK-5599 > URL: https://issues.apache.org/jira/browse/FLINK-5599 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Ufuk Celebi >Priority: Minor > > The JavaDocs of the {{State}} interface (and related classes) often mention > keyed state only as the state interface was only exposed for keyed state > until Flink 1.1. With the new {{CheckpointedFunction}} interface, this has > changed and the docs should be adjusted accordingly. > Would be nice to address this with 1.2.0 so that the JavaDocs are updated for > users. [~stefanrichte...@gmail.com] or [~aljoscha] maybe you can have a look > at this briefly? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5092) Add integration with Sonarqube and code coverage
[ https://issues.apache.org/jira/browse/FLINK-5092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837809#comment-15837809 ] ASF GitHub Bot commented on FLINK-5092: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2836#discussion_r97788511 --- Diff: flink-contrib/flink-storm-examples/pom.xml --- @@ -364,6 +363,18 @@ under the License. + + shade-flink --- End diff -- With this we execute the shade-plugin also for other build-profiles, correct? (same for `flink-examples/flink-examples-streaming`) It would be cool if this would only be done when creating the coverage report, so that the normal build-time is unaffected. > Add integration with Sonarqube and code coverage > > > Key: FLINK-5092 > URL: https://issues.apache.org/jira/browse/FLINK-5092 > Project: Flink > Issue Type: Improvement > Components: Tests >Reporter: Boris Osipov >Assignee: Boris Osipov > > It would be good to have the opportunity to generate test coverage reports > for Flink and analyze code by SonarQube. > Parts of tasks: > - add generate test coverage reports for Flink with new maven profile > - implement integration with https://analysis.apache.org/ -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2836: [FLINK-5092] Add maven profile with code coverage ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2836#discussion_r97788511 --- Diff: flink-contrib/flink-storm-examples/pom.xml --- @@ -364,6 +363,18 @@ under the License. + + shade-flink --- End diff -- With this we execute the shade-plugin also for other build-profiles, correct? (same for `flink-examples/flink-examples-streaming`) It would be cool if this would only be done when creating the coverage report, so that the normal build-time is unaffected. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5638) Deadlock when closing two chained async I/O operators
[ https://issues.apache.org/jira/browse/FLINK-5638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837807#comment-15837807 ] ASF GitHub Bot commented on FLINK-5638: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/3210 Agreed, the problem is a little bit that you don't control from where the `close` method is called... Thanks for the review @StephanEwen. Merging once Travis has passed. > Deadlock when closing two chained async I/O operators > - > > Key: FLINK-5638 > URL: https://issues.apache.org/jira/browse/FLINK-5638 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.2.0, 1.3.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.2.0, 1.3.0 > > > The {{AsyncWaitOperator}} can deadlock in a special cases when closing two > chained {{AsyncWaitOperator}} while there is still one element between these > two operators in flight. > The deadlock scenario is the following: Given two chained > {{AsyncWaitOperators}} {{a1}} and {{a2}}. {{a1}} has its last element > completed. This notifies {{a1's}} {{Emitter}}, {{e1}}, to remove the element > from the queue and output it to {{a2}}. This poll and output operation > happens under the checkpoint lock. Since {{a1}} and {{a2}} are chained, the > {{e1}} thread will directly call {{a2's}} {{processElement}} function. In > this function, we try to add the new element to the {{StreamElementQueue}}. > Now assume that this queue is full. Then the operation will release the > checkpoint lock and wait until it is notified again. > In the meantime, {{a1.close()}} is called by the {{StreamTask}}, because we > have consumed all input. The close operation also happens under the > checkpoint lock. First the close method waits until all elements from the > {{StreamElementQueue}} have been processed (== empty). This happens by > waiting on the checkpoint lock. Next the {{e1}} is interrupted and we join on > {{e1}}. 
When interrupting {{e1}}, it currently waits on the checkpoint lock. > Since the closing operation does not release the checkpoint lock, {{e1}} > cannot regain the synchronization lock and voila we have a deadlock. > There are two problems which cause the problem: > 1. We assume that the {{AsyncWaitOperator}} has processed all its elements if > the queue is empty. This is usually the case if the output operation is > atomic. However in the chained case it can happen that the emitter thread has > to wait to insert the element into the queue of the next > {{AsyncWaitOperator}}. Under these circumstances, we release the checkpoint > lock and, thus, the output operation is no longer atomic. We can solve this > problem by polling the last queue element after we have outputted it instead > of before. > 2. We interrupt the emitter thread while holding the checkpoint lock and not > freeing it again. Under these circumstances, the interrupt signal is > meaningless because the emitter thread also needs control over the checkpoint > lock. We should solve the problem by waiting on the checkpoint lock and > periodically checking whether the thread has already stopped or not. > https://s3.amazonaws.com/archive.travis-ci.org/jobs/194729330/log.txt -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #3210: [backport] [FLINK-5638] [asyncIO] Fix deadlock when closi...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/3210 Agreed, the problem is a little bit that you don't control from where the `close` method is called... Thanks for the review @StephanEwen. Merging once Travis has passed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5643) StateUtil.discardStateFuture fails when state future contains null value
[ https://issues.apache.org/jira/browse/FLINK-5643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837805#comment-15837805 ] ASF GitHub Bot commented on FLINK-5643: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3212 > StateUtil.discardStateFuture fails when state future contains null value > > > Key: FLINK-5643 > URL: https://issues.apache.org/jira/browse/FLINK-5643 > Project: Flink > Issue Type: Bug >Affects Versions: 1.2.0, 1.3.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.2.0, 1.3.0 > > > When calling {{StateUtil.discardStateFuture}} with a state future which > contains a null value, then the operation fails with a NPE. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-5643) StateUtil.discardStateFuture fails when state future contains null value
[ https://issues.apache.org/jira/browse/FLINK-5643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-5643. Resolution: Fixed 1.3.0: cf9f4c77c8e785567f036cc2fbb97c0cd16979a7 1.2.0: f61f331ff04426e8b07117e2a568749fb036e1cc > StateUtil.discardStateFuture fails when state future contains null value > > > Key: FLINK-5643 > URL: https://issues.apache.org/jira/browse/FLINK-5643 > Project: Flink > Issue Type: Bug >Affects Versions: 1.2.0, 1.3.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.2.0, 1.3.0 > > > When calling {{StateUtil.discardStateFuture}} with a state future which > contains a null value, then the operation fails with a NPE. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #3212: [FLINK-5643] Fix NPE in StateUtil
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3212 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5643) StateUtil.discardStateFuture fails when state future contains null value
[ https://issues.apache.org/jira/browse/FLINK-5643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837800#comment-15837800 ] ASF GitHub Bot commented on FLINK-5643: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/3212 Forwarding review from #3213. Merging this PR. > StateUtil.discardStateFuture fails when state future contains null value > > > Key: FLINK-5643 > URL: https://issues.apache.org/jira/browse/FLINK-5643 > Project: Flink > Issue Type: Bug >Affects Versions: 1.2.0, 1.3.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.2.0, 1.3.0 > > > When calling {{StateUtil.discardStateFuture}} with a state future which > contains a null value, then the operation fails with a NPE. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #3212: [FLINK-5643] Fix NPE in StateUtil
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/3212 Forwarding review from #3213. Merging this PR. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5643) StateUtil.discardStateFuture fails when state future contains null value
[ https://issues.apache.org/jira/browse/FLINK-5643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837798#comment-15837798 ] ASF GitHub Bot commented on FLINK-5643: --- Github user tillrohrmann closed the pull request at: https://github.com/apache/flink/pull/3213 > StateUtil.discardStateFuture fails when state future contains null value > > > Key: FLINK-5643 > URL: https://issues.apache.org/jira/browse/FLINK-5643 > Project: Flink > Issue Type: Bug >Affects Versions: 1.2.0, 1.3.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.2.0, 1.3.0 > > > When calling {{StateUtil.discardStateFuture}} with a state future which > contains a null value, then the operation fails with a NPE. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #3213: [backport] [FLINK-5643] Fix NPE in StateUtil
Github user tillrohrmann closed the pull request at: https://github.com/apache/flink/pull/3213 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #3213: [backport] [FLINK-5643] Fix NPE in StateUtil
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/3213 Thanks for the review @rmetzger. Merging this PR. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5643) StateUtil.discardStateFuture fails when state future contains null value
[ https://issues.apache.org/jira/browse/FLINK-5643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837795#comment-15837795 ] ASF GitHub Bot commented on FLINK-5643: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/3213 Thanks for the review @rmetzger. Merging this PR. > StateUtil.discardStateFuture fails when state future contains null value > > > Key: FLINK-5643 > URL: https://issues.apache.org/jira/browse/FLINK-5643 > Project: Flink > Issue Type: Bug >Affects Versions: 1.2.0, 1.3.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.2.0, 1.3.0 > > > When calling {{StateUtil.discardStateFuture}} with a state future which > contains a null value, then the operation fails with a NPE. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5546) java.io.tmpdir setted as project build directory in surefire plugin
[ https://issues.apache.org/jira/browse/FLINK-5546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837776#comment-15837776 ] ASF GitHub Bot commented on FLINK-5546: --- Github user shijinkui commented on the issue: https://github.com/apache/flink/pull/3190 @greghogan That make sense. Let JUnit to create root dir and delete temporary dir recursively. > java.io.tmpdir setted as project build directory in surefire plugin > --- > > Key: FLINK-5546 > URL: https://issues.apache.org/jira/browse/FLINK-5546 > Project: Flink > Issue Type: Test >Affects Versions: 1.2.0, 1.3.0 > Environment: CentOS 7.2 >Reporter: Syinchwun Leo >Assignee: shijinkui > > When multiple Linux users run test at the same time, flink-runtime module may > fail. User A creates /tmp/cacheFile, and User B will have no permission to > visit the fold. > Failed tests: > FileCacheDeleteValidationTest.setup:79 Error initializing the test: > /tmp/cacheFile (Permission denied) > Tests in error: > IOManagerTest.channelEnumerator:54 » Runtime Could not create storage > director... > Tests run: 1385, Failures: 1, Errors: 1, Skipped: 8 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #3190: [FLINK-5546][build] java.io.tmpdir setted as project buil...
Github user shijinkui commented on the issue: https://github.com/apache/flink/pull/3190 @greghogan That make sense. Let JUnit to create root dir and delete temporary dir recursively. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3214: [FLINK-5644] Remove metric: Task#lastCheckpointSiz...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/3214 [FLINK-5644] Remove metric: Task#lastCheckpointSize This PR removes the lastCheckpointSize metric that was broken when the key-groups were introduced. I couldn't find an easy way to fix it instead. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 5644_metric_chsize Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3214.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3214 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5644) Task#lastCheckpointSize metric broken
[ https://issues.apache.org/jira/browse/FLINK-5644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837773#comment-15837773 ] ASF GitHub Bot commented on FLINK-5644: --- GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/3214 [FLINK-5644] Remove metric: Task#lastCheckpointSize This PR removes the lastCheckpointSize metric that was broken when the key-groups were introduced. I couldn't find an easy way to fix it instead. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 5644_metric_chsize Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3214.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3214 > Task#lastCheckpointSize metric broken > - > > Key: FLINK-5644 > URL: https://issues.apache.org/jira/browse/FLINK-5644 > Project: Flink > Issue Type: Bug > Components: Metrics, Streaming >Affects Versions: 1.2.0, 1.3.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: 1.2.0, 1.3.0 > > > The lastCheckpointSize metric was broken when we introduced the key-groups. I > couldn't find an easy way to fix the metric, as such I propose to remove it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5644) Task#lastCheckpointSize metric broken
Chesnay Schepler created FLINK-5644: --- Summary: Task#lastCheckpointSize metric broken Key: FLINK-5644 URL: https://issues.apache.org/jira/browse/FLINK-5644 Project: Flink Issue Type: Bug Components: Metrics, Streaming Affects Versions: 1.2.0, 1.3.0 Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.2.0, 1.3.0 The lastCheckpointSize metric was broken when we introduced the key-groups. I couldn't find an easy way to fix the metric, as such I propose to remove it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5551) NPE at SourceStreamTask
[ https://issues.apache.org/jira/browse/FLINK-5551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837769#comment-15837769 ] Aljoscha Krettek commented on FLINK-5551: - I think this can happen when canceling happens before the Task/Operator is properly set up? The exception is a follow-up exceptions that happens because of the canceling. Do we need to address this, [~StephanEwen]? > NPE at SourceStreamTask > --- > > Key: FLINK-5551 > URL: https://issues.apache.org/jira/browse/FLINK-5551 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Andrey > > Prerequisites: > * Configure hdfs-based backend for the job > * cancel it using webapp admin > In logs during job cancel: > {code} > 2017-01-17 16:22:08,756 INFO > org.apache.flink.streaming.runtime.tasks.StreamTask - Using > user-defined state backend: File State Backend @ > hdfs://host:port/flink-checkpoints > 2017-01-17 16:22:08,756 INFO org.apache.flink.runtime.taskmanager.Task > - Attempting to cancel task Source: Custom Source -> Filter > (3/8) > 2017-01-17 16:22:08,756 INFO org.apache.flink.runtime.taskmanager.Task > - Source: Custom Source -> Filter (3/8) switched to CANCELING > 2017-01-17 16:22:08,756 INFO org.apache.flink.runtime.taskmanager.Task > - Triggering cancellation of task code Source: Custom Source -> > Filter (3/8) (559a224c8ef78884db727dab0b2d5e99). > 2017-01-17 16:22:08,756 ERROR org.apache.flink.runtime.taskmanager.Task > - Error while canceling the task > java.lang.NullPointerException > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:61) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:349) > at > org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1214) > at java.lang.Thread.run(Thread.java:745) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-5637) Default Flink configuration contains whitespace characters, causing parser WARNings
[ https://issues.apache.org/jira/browse/FLINK-5637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger reassigned FLINK-5637: - Assignee: Robert Metzger > Default Flink configuration contains whitespace characters, causing parser > WARNings > --- > > Key: FLINK-5637 > URL: https://issues.apache.org/jira/browse/FLINK-5637 > Project: Flink > Issue Type: Bug >Affects Versions: 1.2.0, 1.3.0 >Reporter: Robert Metzger >Assignee: Robert Metzger > Labels: starter > > {code} > 2017-01-25 09:45:30,670 WARN > org.apache.flink.configuration.GlobalConfiguration- Error while > trying to split key and value in configuration file > /yarn/nm/usercache/robert/appcache/application_1485249546281_0018/container_1485249546281_0018_01_01/flink-conf.yaml: > > {code} > The whitespace is currently in line 67: > {code} > #== > > # The address under which the web-based runtime monitor listens. > {code} > I think we should add a test to the {{GlobalConfigurationTest}} that ensures > the configuration file we are shipping doesn't produce any WARNings by > default. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5643) StateUtil.discardStateFuture fails when state future contains null value
[ https://issues.apache.org/jira/browse/FLINK-5643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837761#comment-15837761 ] ASF GitHub Bot commented on FLINK-5643: --- Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/3213 +1 > StateUtil.discardStateFuture fails when state future contains null value > > > Key: FLINK-5643 > URL: https://issues.apache.org/jira/browse/FLINK-5643 > Project: Flink > Issue Type: Bug >Affects Versions: 1.2.0, 1.3.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.2.0, 1.3.0 > > > When calling {{StateUtil.discardStateFuture}} with a state future which > contains a null value, then the operation fails with a NPE. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5516) Hardcoded paths in flink-python/.../PythonPlanBinder.java
[ https://issues.apache.org/jira/browse/FLINK-5516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837760#comment-15837760 ] Chesnay Schepler commented on FLINK-5516: - Yeah this is easy to implement; it is pretty much a one-liner in the PythonPlanBinder. There is only one small thing to be wary of: The path we are talking about here is where we upload the python library, to then register it in the DistributedCache. The default for this is "hdfs:/tmp". However. if you execute in a local environment (i.e. the tests) then this is changed to "file:/flink". So...we could change the default to "file:..." and force the user to configure a path. Or keep the current behavior, but introduce a flag so that we don't override the user-specified location. > Hardcoded paths in flink-python/.../PythonPlanBinder.java > - > > Key: FLINK-5516 > URL: https://issues.apache.org/jira/browse/FLINK-5516 > Project: Flink > Issue Type: Improvement > Components: Python API >Reporter: Felix seibert > > The PythonPlanBinder.java contains three hardcoded filesystem paths: > {code:java} > public static final String FLINK_PYTHON_FILE_PATH = > System.getProperty("java.io.tmpdir") + File.separator + "flink_plan"; > private static String FLINK_HDFS_PATH = "hdfs:/tmp"; > public static final String FLINK_TMP_DATA_DIR = > System.getProperty("java.io.tmpdir") + File.separator + "flink_data"; > {code} > _FLINK_PYTHON_FILE_PATH_ and _FLINK_TMP_DATA_DIR_ are configurable by > modifying _java.io.tmpdir_. > For _FLINK_HDFS_PATH_, there is no way of configuring otherwise but modifying > the source. > Is it possible to make all three parameters configurable in the usual flink > configuration files (like flink-conf.yaml)? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #3213: [FLINK-5643] Fix NPE in StateUtil
Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/3213 +1 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Resolved] (FLINK-5492) BootstrapTools log wrong address of started ActorSystem
[ https://issues.apache.org/jira/browse/FLINK-5492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-5492. --- Resolution: Fixed Fix Version/s: 1.3.0 1.2.0 > BootstrapTools log wrong address of started ActorSystem > --- > > Key: FLINK-5492 > URL: https://issues.apache.org/jira/browse/FLINK-5492 > Project: Flink > Issue Type: Bug >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Fix For: 1.2.0, 1.3.0 > > > When starting an {{ActorSystem}} via the {{Bootstrap}} tools, then the > {{startActorSystem}} function logs the IP resolved from the provided hostname > as the {{ActorSystem}} address. However, then the function uses the > unresolved hostname to start the {{ActorSystem}}. Since Akka matches the > ActorSystem's address and the destination address of the incoming message we > should log the URL which is used to start the {{ActorSystem}} and not the > resolved IP (messages with the IP will usually be rejected). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5492) BootstrapTools log wrong address of started ActorSystem
[ https://issues.apache.org/jira/browse/FLINK-5492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837756#comment-15837756 ] Robert Metzger commented on FLINK-5492: --- Fixed for 1.2 in http://git-wip-us.apache.org/repos/asf/flink/commit/887c074a Fixed for 1.3 in http://git-wip-us.apache.org/repos/asf/flink/commit/cf6b3fb2 > BootstrapTools log wrong address of started ActorSystem > --- > > Key: FLINK-5492 > URL: https://issues.apache.org/jira/browse/FLINK-5492 > Project: Flink > Issue Type: Bug >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Fix For: 1.2.0, 1.3.0 > > > When starting an {{ActorSystem}} via the {{Bootstrap}} tools, then the > {{startActorSystem}} function logs the IP resolved from the provided hostname > as the {{ActorSystem}} address. However, then the function uses the > unresolved hostname to start the {{ActorSystem}}. Since Akka matches the > ActorSystem's address and the destination address of the incoming message we > should log the URL which is used to start the {{ActorSystem}} and not the > resolved IP (messages with the IP will usually be rejected). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #3213: [FLINK-5643] Fix NPE in StateUtil
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/3213 [FLINK-5643] Fix NPE in StateUtil Introduces a null check to deal with state futures which have a null value. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink stateFutureFixDiscardBackport Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3213.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3213 commit 99452d926a2cb210dbbabdd3c8eee8fdcf74058e Author: Till RohrmannDate: 2017-01-25T13:39:51Z [FLINK-5643] Fix NPE in StateUtil Introduces a null check to deal with state futures which have a null value. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5643) StateUtil.discardStateFuture fails when state future contains null value
[ https://issues.apache.org/jira/browse/FLINK-5643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837752#comment-15837752 ] ASF GitHub Bot commented on FLINK-5643: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/3213 [FLINK-5643] Fix NPE in StateUtil Introduces a null check to deal with state futures which have a null value. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink stateFutureFixDiscardBackport Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3213.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3213 commit 99452d926a2cb210dbbabdd3c8eee8fdcf74058e Author: Till RohrmannDate: 2017-01-25T13:39:51Z [FLINK-5643] Fix NPE in StateUtil Introduces a null check to deal with state futures which have a null value. > StateUtil.discardStateFuture fails when state future contains null value > > > Key: FLINK-5643 > URL: https://issues.apache.org/jira/browse/FLINK-5643 > Project: Flink > Issue Type: Bug >Affects Versions: 1.2.0, 1.3.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.2.0, 1.3.0 > > > When calling {{StateUtil.discardStateFuture}} with a state future which > contains a null value, then the operation fails with a NPE. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5229) Cleanup StreamTaskStates if a checkpoint operation of a subsequent operator fails
[ https://issues.apache.org/jira/browse/FLINK-5229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837750#comment-15837750 ] Robert Metzger commented on FLINK-5229: --- Thank you! > Cleanup StreamTaskStates if a checkpoint operation of a subsequent operator > fails > -- > > Key: FLINK-5229 > URL: https://issues.apache.org/jira/browse/FLINK-5229 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing, TaskManager >Affects Versions: 1.2.0, 1.1.3 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.2.0, 1.1.4, 1.3.0 > > > Due to chaining, a {{StreamTask}} needs to checkpoint multiple operators. If > the first operators succeed in creating a checkpoint but a subsequent > operator in the chain fails, the {{StreamTask}} has to clean up the already > completed checkpoints. Otherwise we might end up with orphaned state data. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5643) StateUtil.discardStateFuture fails when state future contains null value
[ https://issues.apache.org/jira/browse/FLINK-5643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837751#comment-15837751 ] ASF GitHub Bot commented on FLINK-5643: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/3212 [FLINK-5643] Fix NPE in StateUtil Introduces a null check to deal with state futures which have a null value. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink stateFutureFixDiscard Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3212.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3212 commit a3062af22b2be71f6e85a89bd554a87ad1ced2de Author: Till RohrmannDate: 2017-01-25T13:39:51Z [FLINK-5643] Fix NPE in StateUtil Introduces a null check to deal with state futures which have a null value. > StateUtil.discardStateFuture fails when state future contains null value > > > Key: FLINK-5643 > URL: https://issues.apache.org/jira/browse/FLINK-5643 > Project: Flink > Issue Type: Bug >Affects Versions: 1.2.0, 1.3.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.2.0, 1.3.0 > > > When calling {{StateUtil.discardStateFuture}} with a state future which > contains a null value, then the operation fails with a NPE. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #3212: [FLINK-5643] Fix NPE in StateUtil
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/3212 [FLINK-5643] Fix NPE in StateUtil Introduces a null check to deal with state futures which have a null value. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink stateFutureFixDiscard Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3212.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3212 commit a3062af22b2be71f6e85a89bd554a87ad1ced2de Author: Till RohrmannDate: 2017-01-25T13:39:51Z [FLINK-5643] Fix NPE in StateUtil Introduces a null check to deal with state futures which have a null value. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5229) Cleanup StreamTaskStates if a checkpoint operation of a subsequent operator fails
[ https://issues.apache.org/jira/browse/FLINK-5229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837748#comment-15837748 ] Till Rohrmann commented on FLINK-5229: -- Will be addressed with FLINK-5643. > Cleanup StreamTaskStates if a checkpoint operation of a subsequent operator > fails > -- > > Key: FLINK-5229 > URL: https://issues.apache.org/jira/browse/FLINK-5229 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing, TaskManager >Affects Versions: 1.2.0, 1.1.3 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.2.0, 1.1.4, 1.3.0 > > > Due to chaining, a {{StreamTask}} needs to checkpoint multiple operators. If > the first operators succeed in creating a checkpoint but a subsequent > operator in the chain fails, the {{StreamTask}} has to clean up the already > completed checkpoints. Otherwise we might end up with orphaned state data. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5643) StateUtil.discardStateFuture fails when state future contains null value
Till Rohrmann created FLINK-5643: Summary: StateUtil.discardStateFuture fails when state future contains null value Key: FLINK-5643 URL: https://issues.apache.org/jira/browse/FLINK-5643 Project: Flink Issue Type: Bug Affects Versions: 1.2.0, 1.3.0 Reporter: Till Rohrmann Assignee: Till Rohrmann Fix For: 1.2.0, 1.3.0 When calling {{StateUtil.discardStateFuture}} with a state future which contains a null value, then the operation fails with a NPE. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5516) Hardcoded paths in flink-python/.../PythonPlanBinder.java
[ https://issues.apache.org/jira/browse/FLINK-5516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837744#comment-15837744 ] Aljoscha Krettek commented on FLINK-5516: - [~Zentol] would this be possible? I think no one is currently working on these parts but we would be very happy about contributions. :-) > Hardcoded paths in flink-python/.../PythonPlanBinder.java > - > > Key: FLINK-5516 > URL: https://issues.apache.org/jira/browse/FLINK-5516 > Project: Flink > Issue Type: Improvement > Components: Python API >Reporter: Felix seibert > > The PythonPlanBinder.java contains three hardcoded filesystem paths: > {code:java} > public static final String FLINK_PYTHON_FILE_PATH = > System.getProperty("java.io.tmpdir") + File.separator + "flink_plan"; > private static String FLINK_HDFS_PATH = "hdfs:/tmp"; > public static final String FLINK_TMP_DATA_DIR = > System.getProperty("java.io.tmpdir") + File.separator + "flink_data"; > {code} > _FLINK_PYTHON_FILE_PATH_ and _FLINK_TMP_DATA_DIR_ are configurable by > modifying _java.io.tmpdir_. > For _FLINK_HDFS_PATH_, there is no way of configuring otherwise but modifying > the source. > Is it possible to make all three parameters configurable in the usual flink > configuration files (like flink-conf.yaml)? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-4905) Kafka test instability IllegalStateException: Client is not started
[ https://issues.apache.org/jira/browse/FLINK-4905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Efimov closed FLINK-4905. Resolution: Fixed Fix Version/s: 1.3.0 Fixed by Stephan Ewen > Kafka test instability IllegalStateException: Client is not started > --- > > Key: FLINK-4905 > URL: https://issues.apache.org/jira/browse/FLINK-4905 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Reporter: Robert Metzger >Assignee: Andrew Efimov > Labels: test-stability > Fix For: 1.3.0 > > Attachments: Kafka08Fetcher.png > > > The following travis build > (https://s3.amazonaws.com/archive.travis-ci.org/jobs/170365439/log.txt) > failed because of this error > {code} > 08:17:11,239 INFO org.apache.flink.runtime.jobmanager.JobManager >- Status of job 33ebdc0e7c91be186d80658ce3d17069 (Read some records to > commit offsets to Kafka) changed to FAILING. > java.lang.RuntimeException: Error while confirming checkpoint > at org.apache.flink.runtime.taskmanager.Task$4.run(Task.java:1040) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) > at java.util.concurrent.FutureTask.run(FutureTask.java:262) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.IllegalStateException: Client is not started > at > org.apache.flink.shaded.com.google.common.base.Preconditions.checkState(Preconditions.java:173) > at > org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:113) > at > org.apache.curator.utils.EnsurePath$InitialHelper$1.call(EnsurePath.java:148) > at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107) > at > org.apache.curator.utils.EnsurePath$InitialHelper.ensure(EnsurePath.java:141) > at org.apache.curator.utils.EnsurePath.ensure(EnsurePath.java:99) > at > 
org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:133) > at > org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.prepareAndCommitOffsets(ZookeeperOffsetHandler.java:93) > at > org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.commitInternalOffsetsToKafka(Kafka08Fetcher.java:341) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.notifyCheckpointComplete(FlinkKafkaConsumerBase.java:421) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:229) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:571) > at org.apache.flink.runtime.taskmanager.Task$4.run(Task.java:1035) > ... 5 more > 08:17:11,241 INFO org.apache.flink.runtime.taskmanager.Task >- Attempting to cancel task Source: Custom Source -> Map -> Map -> Sink: > Unnamed (1/3) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5229) Cleanup StreamTaskStates if a checkpoint operation of a subsequent operator fails
[ https://issues.apache.org/jira/browse/FLINK-5229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837740#comment-15837740 ] Till Rohrmann commented on FLINK-5229: -- Should be fixed. Apparently, there are state handle futures which contain {{null}} values. > Cleanup StreamTaskStates if a checkpoint operation of a subsequent operator > fails > -- > > Key: FLINK-5229 > URL: https://issues.apache.org/jira/browse/FLINK-5229 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing, TaskManager >Affects Versions: 1.2.0, 1.1.3 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.2.0, 1.1.4, 1.3.0 > > > Due to chaining, a {{StreamTask}} needs to checkpoint multiple operators. If > the first operators succeed in creating a checkpoint but a subsequent > operator in the chain fails, the {{StreamTask}} has to clean up the already > completed checkpoints. Otherwise we might end up with orphaned state data. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5641) Directories of expired checkpoints are not cleaned up
[ https://issues.apache.org/jira/browse/FLINK-5641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-5641: -- Attachment: application-1485249546281-0021-misbehaved-1.2.0-rc1 > Directories of expired checkpoints are not cleaned up > - > > Key: FLINK-5641 > URL: https://issues.apache.org/jira/browse/FLINK-5641 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Robert Metzger >Assignee: Till Rohrmann > Attachments: application-1485249546281-0021-misbehaved-1.2.0-rc1 > > > While testing Flink 1.2.0 RC1, some checkpoint files were not removed. > {code} > hadoop fs -lsr /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d > lsr: DEPRECATED: Please use 'ls -R' instead. > drwxr-xr-x - robert hadoop 0 2017-01-25 12:35 > /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-10 > -rw-r--r-- 3 robert hadoop 144375 2017-01-25 12:14 > /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-10/dbc9181b-e291-4935-9e5b-bd0e447cdc25 > drwxr-xr-x - robert hadoop 0 2017-01-25 12:46 > /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-12 > -rw-r--r-- 3 robert hadoop 146351 2017-01-25 12:30 > /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-12/6fcab0d8-c6e8-4744-8361-e8bedd90cf07 > drwxr-xr-x - robert hadoop 0 2017-01-25 12:44 > /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-13 > -rw-r--r-- 3 robert hadoop 148431 2017-01-25 12:44 > /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-13/64579d41-9895-49fd-8c76-06814778ab58 > drwxr-xr-x - robert hadoop 0 2017-01-25 11:45 > /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-6 > -rw-r--r-- 3 robert hadoop 37151 2017-01-25 11:31 > /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-6/6bddff32-257a-4890-8dcb-6397b4772deb > drwxr-xr-x - robert hadoop 0 2017-01-25 11:55 > 
/shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-7 > -rw-r--r-- 3 robert hadoop 142269 2017-01-25 11:41 > /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-7/bb7f0474-1661-4360-89d4-ed73b37cf6f0 > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5642) queryable state: race condition with HeapListState
Nico Kruber created FLINK-5642: -- Summary: queryable state: race condition with HeadListState Key: FLINK-5642 URL: https://issues.apache.org/jira/browse/FLINK-5642 Project: Flink Issue Type: Bug Components: Queryable State Affects Versions: 1.2.0 Reporter: Nico Kruber Assignee: Nico Kruber If queryable state accesses a HeapListState instance that is being modified during the value's serialisation, it may crash, e.g. with a NullPointerException during the serialisation or with an EOFException during de-serialisation. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5641) Directories of expired checkpoints are not cleaned up
[ https://issues.apache.org/jira/browse/FLINK-5641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837738#comment-15837738 ] Robert Metzger commented on FLINK-5641: --- I have a job that has expiring checkpoints. I'll attach the job's full logs. > Directories of expired checkpoints are not cleaned up > - > > Key: FLINK-5641 > URL: https://issues.apache.org/jira/browse/FLINK-5641 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Robert Metzger >Assignee: Till Rohrmann > > While testing Flink 1.2.0 RC1, some checkpoint files were not removed. > {code} > hadoop fs -lsr /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d > lsr: DEPRECATED: Please use 'ls -R' instead. > drwxr-xr-x - robert hadoop 0 2017-01-25 12:35 > /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-10 > -rw-r--r-- 3 robert hadoop 144375 2017-01-25 12:14 > /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-10/dbc9181b-e291-4935-9e5b-bd0e447cdc25 > drwxr-xr-x - robert hadoop 0 2017-01-25 12:46 > /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-12 > -rw-r--r-- 3 robert hadoop 146351 2017-01-25 12:30 > /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-12/6fcab0d8-c6e8-4744-8361-e8bedd90cf07 > drwxr-xr-x - robert hadoop 0 2017-01-25 12:44 > /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-13 > -rw-r--r-- 3 robert hadoop 148431 2017-01-25 12:44 > /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-13/64579d41-9895-49fd-8c76-06814778ab58 > drwxr-xr-x - robert hadoop 0 2017-01-25 11:45 > /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-6 > -rw-r--r-- 3 robert hadoop 37151 2017-01-25 11:31 > /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-6/6bddff32-257a-4890-8dcb-6397b4772deb > drwxr-xr-x - robert hadoop 0 2017-01-25 11:55 > 
/shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-7 > -rw-r--r-- 3 robert hadoop 142269 2017-01-25 11:41 > /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-7/bb7f0474-1661-4360-89d4-ed73b37cf6f0 > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5641) Directories of expired checkpoints are not cleaned up
[ https://issues.apache.org/jira/browse/FLINK-5641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837736#comment-15837736 ] Till Rohrmann commented on FLINK-5641: -- [~rmetzger] how have you produced this? > Directories of expired checkpoints are not cleaned up > - > > Key: FLINK-5641 > URL: https://issues.apache.org/jira/browse/FLINK-5641 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Robert Metzger >Assignee: Till Rohrmann > > While testing Flink 1.2.0 RC1, some checkpoint files were not removed. > {code} > hadoop fs -lsr /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d > lsr: DEPRECATED: Please use 'ls -R' instead. > drwxr-xr-x - robert hadoop 0 2017-01-25 12:35 > /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-10 > -rw-r--r-- 3 robert hadoop 144375 2017-01-25 12:14 > /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-10/dbc9181b-e291-4935-9e5b-bd0e447cdc25 > drwxr-xr-x - robert hadoop 0 2017-01-25 12:46 > /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-12 > -rw-r--r-- 3 robert hadoop 146351 2017-01-25 12:30 > /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-12/6fcab0d8-c6e8-4744-8361-e8bedd90cf07 > drwxr-xr-x - robert hadoop 0 2017-01-25 12:44 > /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-13 > -rw-r--r-- 3 robert hadoop 148431 2017-01-25 12:44 > /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-13/64579d41-9895-49fd-8c76-06814778ab58 > drwxr-xr-x - robert hadoop 0 2017-01-25 11:45 > /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-6 > -rw-r--r-- 3 robert hadoop 37151 2017-01-25 11:31 > /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-6/6bddff32-257a-4890-8dcb-6397b4772deb > drwxr-xr-x - robert hadoop 0 2017-01-25 11:55 > /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-7 > -rw-r--r-- 3 robert hadoop 
142269 2017-01-25 11:41 > /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-7/bb7f0474-1661-4360-89d4-ed73b37cf6f0 > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5229) Cleanup StreamTaskStates if a checkpoint operation of a subsequent operator fails
[ https://issues.apache.org/jira/browse/FLINK-5229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837735#comment-15837735 ] Robert Metzger commented on FLINK-5229: --- I'm seeing the following exceptions in my 1.2.0 RC1 code {code} 2017-01-25 12:47:48,775 WARN org.apache.flink.streaming.runtime.tasks.StreamTask - Could not properly clean up the async checkpoint runnable. java.lang.Exception: Could not properly cancel managed operator state future. at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:99) at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:992) at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.close(StreamTask.java:979) at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:224) at org.apache.flink.util.AbstractCloseableRegistry.close(AbstractCloseableRegistry.java:92) at org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:363) at org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1384) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.NullPointerException at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:81) at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:97) ... 7 more {code} I'm asking here because its code that has been touched while addressing this JIRA. How critical is this? > Cleanup StreamTaskStates if a checkpoint operation of a subsequent operator > fails > -- > > Key: FLINK-5229 > URL: https://issues.apache.org/jira/browse/FLINK-5229 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing, TaskManager >Affects Versions: 1.2.0, 1.1.3 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.2.0, 1.1.4, 1.3.0 > > > Due to chaining, a {{StreamTask}} needs to checkpoint multiple operators. 
If > the first operators succeed in creating a checkpoint but a subsequent > operator in the chain fails, the {{StreamTask}} has to clean up the already > completed checkpoints. Otherwise we might end up with orphaned state data. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #3207: [FLINK-5630] [streaming api] Followups to the Aggr...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3207#discussion_r97775252 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java --- @@ -279,6 +286,202 @@ public AllWindowedStream(DataStream input, return input.transform(opName, resultType, operator).forceNonParallel(); } + // + // AggregateFunction + // + + /** +* Applies the given fold function to each window. The window function is called for each +* evaluation of the window for each key individually. The output of the reduce function is +* interpreted as a regular non-windowed stream. +* +* @param function The fold function. --- End diff -- Typo: "fold function" --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5630) Followups to AggregationFunction
[ https://issues.apache.org/jira/browse/FLINK-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837730#comment-15837730 ] ASF GitHub Bot commented on FLINK-5630: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3207#discussion_r97775106 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java --- @@ -279,6 +286,202 @@ public AllWindowedStream(DataStream input, return input.transform(opName, resultType, operator).forceNonParallel(); } + // + // AggregateFunction + // + + /** +* Applies the given fold function to each window. The window function is called for each +* evaluation of the window for each key individually. The output of the reduce function is --- End diff -- Typo: "reduce function" > Followups to AggregationFunction > > > Key: FLINK-5630 > URL: https://issues.apache.org/jira/browse/FLINK-5630 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.3.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.3.0 > > > Various followup issues to the aggregation function, like > - Allowing different input/output types for the cases where an additional > window apply function is specified > - Adding the {{aggregate()}} methods to the Scala API > - Adding the window translation tests -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #3207: [FLINK-5630] [streaming api] Followups to the Aggr...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3207#discussion_r97774934 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java --- @@ -279,6 +286,202 @@ public AllWindowedStream(DataStream input, return input.transform(opName, resultType, operator).forceNonParallel(); } + // + // AggregateFunction + // + + /** +* Applies the given fold function to each window. The window function is called for each --- End diff -- Typo: "fold function" --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5630) Followups to AggregationFunction
[ https://issues.apache.org/jira/browse/FLINK-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837729#comment-15837729 ] ASF GitHub Bot commented on FLINK-5630: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3207#discussion_r97776428 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java --- @@ -101,6 +108,24 @@ public void testReduceWithRichReducerFails() throws Exception { } /** +* .aggregate() does not support RichAggregateFunction, since the reduce function is used internally --- End diff -- Typo: "aggregate function". > Followups to AggregationFunction > > > Key: FLINK-5630 > URL: https://issues.apache.org/jira/browse/FLINK-5630 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.3.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.3.0 > > > Various followup issues to the aggregation function, like > - Allowing different input/output types for the cases where an additional > window apply function is specified > - Adding the {{aggregate()}} methods to the Scala API > - Adding the window translation tests -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #3207: [FLINK-5630] [streaming api] Followups to the Aggr...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3207#discussion_r97776155 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java --- @@ -515,12 +515,16 @@ public WindowedStream(KeyedStreaminput, // /** -* Applies the given fold function to each window. The window function is called for each -* evaluation of the window for each key individually. The output of the reduce function is -* interpreted as a regular non-windowed stream. +* Applies the given aggregation function to each window. The aggregation function is called for +* each element, aggregating values incrementally and keeping the state to one accumulator +* per key and window. * * @param function The fold function. --- End diff -- Pre-existing typo: "fold function". --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5630) Followups to AggregationFunction
[ https://issues.apache.org/jira/browse/FLINK-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837731#comment-15837731 ] ASF GitHub Bot commented on FLINK-5630: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3207#discussion_r97775252 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java --- @@ -279,6 +286,202 @@ public AllWindowedStream(DataStream input, return input.transform(opName, resultType, operator).forceNonParallel(); } + // + // AggregateFunction + // + + /** +* Applies the given fold function to each window. The window function is called for each +* evaluation of the window for each key individually. The output of the reduce function is +* interpreted as a regular non-windowed stream. +* +* @param function The fold function. --- End diff -- Typo: "fold function" > Followups to AggregationFunction > > > Key: FLINK-5630 > URL: https://issues.apache.org/jira/browse/FLINK-5630 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.3.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.3.0 > > > Various followup issues to the aggregation function, like > - Allowing different input/output types for the cases where an additional > window apply function is specified > - Adding the {{aggregate()}} methods to the Scala API > - Adding the window translation tests -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #3207: [FLINK-5630] [streaming api] Followups to the Aggr...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3207#discussion_r97773952 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java --- @@ -18,14 +18,37 @@ package org.apache.flink.api.common.functions; +import org.apache.flink.annotation.PublicEvolving; + import java.io.Serializable; /** + * The {@code AggregateFunction} is a flexible aggregation function, characterized by the + * following features: + * + * + * The aggregates may use different types for input values, intermediate aggregates, + * and result type, to support a wide range of aggregation types. + * + * Support for distributive aggregations: Different intermediate aggregates can be + * merged together, to allow for pre-aggregation/final-aggregation optimizations. + * + * + * The {@code AggregateFunction}'s intermediate aggregate (in-progress aggregation state) + * in called the accumulator. Values are added to the accumulator, and final aggregates are --- End diff -- Typo: "is called" --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5630) Followups to AggregationFunction
[ https://issues.apache.org/jira/browse/FLINK-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837733#comment-15837733 ] ASF GitHub Bot commented on FLINK-5630: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3207#discussion_r97776155 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java --- @@ -515,12 +515,16 @@ public WindowedStream(KeyedStreaminput, // /** -* Applies the given fold function to each window. The window function is called for each -* evaluation of the window for each key individually. The output of the reduce function is -* interpreted as a regular non-windowed stream. +* Applies the given aggregation function to each window. The aggregation function is called for +* each element, aggregating values incrementally and keeping the state to one accumulator +* per key and window. * * @param function The fold function. --- End diff -- Pre-existing typo: "fold function". > Followups to AggregationFunction > > > Key: FLINK-5630 > URL: https://issues.apache.org/jira/browse/FLINK-5630 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.3.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.3.0 > > > Various followup issues to the aggregation function, like > - Allowing different input/output types for the cases where an additional > window apply function is specified > - Adding the {{aggregate()}} methods to the Scala API > - Adding the window translation tests -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #3207: [FLINK-5630] [streaming api] Followups to the Aggr...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3207#discussion_r97774041 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java --- @@ -81,14 +104,55 @@ * } * } * } + * + * @param The type of the values that are aggregated (input values) + * @param The type of the accumulator (intermediate aggregate state). + * @param The type of the aggregated result */ +@PublicEvolving public interface AggregateFunctionextends Function, Serializable { + /** +* Creates a new accumulator, starting a new aggregate. +* +* The new accumulator iy typically meaningless unless a value is added --- End diff -- Typo: "is typically" --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3207: [FLINK-5630] [streaming api] Followups to the Aggr...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3207#discussion_r97776428 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java --- @@ -101,6 +108,24 @@ public void testReduceWithRichReducerFails() throws Exception { } /** +* .aggregate() does not support RichAggregateFunction, since the reduce function is used internally --- End diff -- Typo: "aggregate function". --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3207: [FLINK-5630] [streaming api] Followups to the Aggr...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3207#discussion_r97775106 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java --- @@ -279,6 +286,202 @@ public AllWindowedStream(DataStream input, return input.transform(opName, resultType, operator).forceNonParallel(); } + // + // AggregateFunction + // + + /** +* Applies the given fold function to each window. The window function is called for each +* evaluation of the window for each key individually. The output of the reduce function is --- End diff -- Typo: "reduce function" --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5630) Followups to AggregationFunction
[ https://issues.apache.org/jira/browse/FLINK-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837732#comment-15837732 ] ASF GitHub Bot commented on FLINK-5630: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3207#discussion_r97774041 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java --- @@ -81,14 +104,55 @@ * } * } * } + * + * @param The type of the values that are aggregated (input values) + * @param The type of the accumulator (intermediate aggregate state). + * @param The type of the aggregated result */ +@PublicEvolving public interface AggregateFunctionextends Function, Serializable { + /** +* Creates a new accumulator, starting a new aggregate. +* +* The new accumulator iy typically meaningless unless a value is added --- End diff -- Typo: "is typically" > Followups to AggregationFunction > > > Key: FLINK-5630 > URL: https://issues.apache.org/jira/browse/FLINK-5630 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.3.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.3.0 > > > Various followup issues to the aggregation function, like > - Allowing different input/output types for the cases where an additional > window apply function is specified > - Adding the {{aggregate()}} methods to the Scala API > - Adding the window translation tests -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5630) Followups to AggregationFunction
[ https://issues.apache.org/jira/browse/FLINK-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837728#comment-15837728 ] ASF GitHub Bot commented on FLINK-5630: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3207#discussion_r97773952 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java --- @@ -18,14 +18,37 @@ package org.apache.flink.api.common.functions; +import org.apache.flink.annotation.PublicEvolving; + import java.io.Serializable; /** + * The {@code AggregateFunction} is a flexible aggregation function, characterized by the + * following features: + * + * + * The aggregates may use different types for input values, intermediate aggregates, + * and result type, to support a wide range of aggregation types. + * + * Support for distributive aggregations: Different intermediate aggregates can be + * merged together, to allow for pre-aggregation/final-aggregation optimizations. + * + * + * The {@code AggregateFunction}'s intermediate aggregate (in-progress aggregation state) + * in called the accumulator. Values are added to the accumulator, and final aggregates are --- End diff -- Typo: "is called" > Followups to AggregationFunction > > > Key: FLINK-5630 > URL: https://issues.apache.org/jira/browse/FLINK-5630 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.3.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.3.0 > > > Various followup issues to the aggregation function, like > - Allowing different input/output types for the cases where an additional > window apply function is specified > - Adding the {{aggregate()}} methods to the Scala API > - Adding the window translation tests -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5630) Followups to AggregationFunction
[ https://issues.apache.org/jira/browse/FLINK-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837734#comment-15837734 ] ASF GitHub Bot commented on FLINK-5630: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3207#discussion_r97774934 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java --- @@ -279,6 +286,202 @@ public AllWindowedStream(DataStream input, return input.transform(opName, resultType, operator).forceNonParallel(); } + // + // AggregateFunction + // + + /** +* Applies the given fold function to each window. The window function is called for each --- End diff -- Typo: "fold function" > Followups to AggregationFunction > > > Key: FLINK-5630 > URL: https://issues.apache.org/jira/browse/FLINK-5630 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.3.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.3.0 > > > Various followup issues to the aggregation function, like > - Allowing different input/output types for the cases where an additional > window apply function is specified > - Adding the {{aggregate()}} methods to the Scala API > - Adding the window translation tests -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-5641) Directories of expired checkpoints are not cleaned up
[ https://issues.apache.org/jira/browse/FLINK-5641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reassigned FLINK-5641: Assignee: Till Rohrmann > Directories of expired checkpoints are not cleaned up > - > > Key: FLINK-5641 > URL: https://issues.apache.org/jira/browse/FLINK-5641 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Robert Metzger >Assignee: Till Rohrmann > > While testing Flink 1.2.0 RC1, some checkpoint files were not removed. > {code} > hadoop fs -lsr /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d > lsr: DEPRECATED: Please use 'ls -R' instead. > drwxr-xr-x - robert hadoop 0 2017-01-25 12:35 > /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-10 > -rw-r--r-- 3 robert hadoop 144375 2017-01-25 12:14 > /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-10/dbc9181b-e291-4935-9e5b-bd0e447cdc25 > drwxr-xr-x - robert hadoop 0 2017-01-25 12:46 > /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-12 > -rw-r--r-- 3 robert hadoop 146351 2017-01-25 12:30 > /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-12/6fcab0d8-c6e8-4744-8361-e8bedd90cf07 > drwxr-xr-x - robert hadoop 0 2017-01-25 12:44 > /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-13 > -rw-r--r-- 3 robert hadoop 148431 2017-01-25 12:44 > /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-13/64579d41-9895-49fd-8c76-06814778ab58 > drwxr-xr-x - robert hadoop 0 2017-01-25 11:45 > /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-6 > -rw-r--r-- 3 robert hadoop 37151 2017-01-25 11:31 > /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-6/6bddff32-257a-4890-8dcb-6397b4772deb > drwxr-xr-x - robert hadoop 0 2017-01-25 11:55 > /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-7 > -rw-r--r-- 3 robert hadoop 142269 2017-01-25 11:41 > 
/shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-7/bb7f0474-1661-4360-89d4-ed73b37cf6f0 > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5492) BootstrapTools log wrong address of started ActorSystem
[ https://issues.apache.org/jira/browse/FLINK-5492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837726#comment-15837726 ] Till Rohrmann commented on FLINK-5492: -- Arg, you're right. The grizzled logger used in the Scala files does not support parameterized logging statements. Instead the idiomatic way is to use string interpolation. > BootstrapTools log wrong address of started ActorSystem > --- > > Key: FLINK-5492 > URL: https://issues.apache.org/jira/browse/FLINK-5492 > Project: Flink > Issue Type: Bug >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > > When starting an {{ActorSystem}} via the {{Bootstrap}} tools, then the > {{startActorSystem}} function logs the IP resolved from the provided hostname > as the {{ActorSystem}} address. However, then the function uses the > unresolved hostname to start the {{ActorSystem}}. Since Akka matches the > ActorSystem's address and the destination address of the incoming message we > should log the URL which is used to start the {{ActorSystem}} and not the > resolved IP (messages with the IP will usually be rejected). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4719) KryoSerializer random exception
[ https://issues.apache.org/jira/browse/FLINK-4719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber updated FLINK-4719: --- Assignee: (was: Nico Kruber) > KryoSerializer random exception > --- > > Key: FLINK-4719 > URL: https://issues.apache.org/jira/browse/FLINK-4719 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.1.1 >Reporter: Flavio Pompermaier > Labels: kryo, serialization > > There's a random exception that involves somehow the KryoSerializer when > using POJOs in Flink jobs reading large volumes of data. > It is usually thrown in several places, e.g. (the Exceptions reported here > can refer to previous versions of Flink...): > {code} > java.lang.Exception: The data preparation for task 'CHAIN GroupReduce > (GroupReduce at createResult(IndexMappingExecutor.java:42)) -> Map (Map at > main(Jsonizer.java:128))' , caused an error: Error obtaining the sorted > input: Thread 'SortMerger spilling thread' terminated due to an exception: > Unable to find class: java.ttil.HashSet > at > org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456) > at > org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.RuntimeException: Error obtaining the sorted input: > Thread 'SortMerger spilling thread' terminated due to an exception: Unable to > find class: java.ttil.HashSet > at > org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619) > at > org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079) > at > org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94) > at > org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450) > ... 
3 more > Caused by: java.io.IOException: Thread 'SortMerger spilling thread' > terminated due to an exception: Unable to find class: java.ttil.HashSet > at > org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800) > Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: > java.ttil.HashSet > at > com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138) > at > com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) > at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) > at > com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) > at > com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) > at > org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228) > at > org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242) > at > org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:252) > at > org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:556) > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75) > at > org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499) > at > org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344) > at > org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796) > Caused by: java.lang.ClassNotFoundException: java.ttil.HashSet > at java.net.URLClassLoader.findClass(URLClassLoader.java:381) > at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > at 
java.lang.ClassLoader.loadClass(ClassLoader.java:357) > at java.lang.Class.forName0(Native Method) > at java.lang.Class.forName(Class.java:348) > at > com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136) > {code} > {code} > Caused by: java.io.IOException: Serializer consumed more bytes than the > record had. This indicates broken serialization. If you are using custom > serialization types (Value or Writable), check their serialization methods. > If you are using a Kryo-serialized type, check the corresponding Kryo > serializer. > at >
[GitHub] flink issue #3210: [FLINK-5638] [asyncIO] Fix deadlock when closing two chai...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3210 It's a bit hacky to release the lock in scope via `wait()` but seems to work. One more reason to go for a post-box model... +1 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5638) Deadlock when closing two chained async I/O operators
[ https://issues.apache.org/jira/browse/FLINK-5638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837722#comment-15837722 ] ASF GitHub Bot commented on FLINK-5638: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3210 It's a bit hacky to release the lock in scope via `wait()` but seems to work. One more reason to go for a post-box model... +1 > Deadlock when closing two chained async I/O operators > - > > Key: FLINK-5638 > URL: https://issues.apache.org/jira/browse/FLINK-5638 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.2.0, 1.3.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.2.0, 1.3.0 > > > The {{AsyncWaitOperator}} can deadlock in a special cases when closing two > chained {{AsyncWaitOperator}} while there is still one element between these > two operators in flight. > The deadlock scenario is the following: Given two chained > {{AsyncWaitOperators}} {{a1}} and {{a2}}. {{a1}} has its last element > completed. This notifies {{a1's}} {{Emitter}}, {{e1}}, to remove the element > from the queue and output it to {{a2}}. This poll and output operation > happens under the checkpoint lock. Since {{a1}} and {{a2}} are chained, the > {{e1}} thread will directly call {{a2's}} {{processElement}} function. In > this function, we try to add the new element to the {{StreamElementQueue}}. > Now assume that this queue is full. Then the operation will release the > checkpoint lock and wait until it is notified again. > In the meantime, {{a1.close()}} is called by the {{StreamTask}}, because we > have consumed all input. The close operation also happens under the > checkpoint lock. First the close method waits until all elements from the > {{StreamElementQueue}} have been processed (== empty). This happens by > waiting on the checkpoint lock. Next the {{e1}} is interrupted and we join on > {{e1}}. 
When interrupting {{e1}}, it currently waits on the checkpoint lock. > Since the closing operation does not release the checkpoint lock, {{e1}} > cannot regain the synchronization lock and voila we have a deadlock. > There are two problems which cause the problem: > 1. We assume that the {{AsyncWaitOperator}} has processed all its elements if > the queue is empty. This is usually the case if the output operation is > atomic. However in the chained case it can happen that the emitter thread has > to wait to insert the element into the queue of the next > {{AsyncWaitOperator}}. Under these circumstances, we release the checkpoint > lock and, thus, the output operation is no longer atomic. We can solve this > problem by polling the last queue element after we have outputted it instead > of before. > 2. We interrupt the emitter thread while holding the checkpoint lock and not > freeing it again. Under these circumstances, the interrupt signal is > meaningless because the emitter thread also needs control over the checkpoint > lock. We should solve the problem by waiting on the checkpoint lock and > periodically checking whether the thread has already stopped or not. > https://s3.amazonaws.com/archive.travis-ci.org/jobs/194729330/log.txt -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4905) Kafka test instability IllegalStateException: Client is not started
[ https://issues.apache.org/jira/browse/FLINK-4905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837721#comment-15837721 ] ASF GitHub Bot commented on FLINK-4905: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3035 I pushed a fix for that to master in e7cda75b8594417559d6aac6229b5893f5459f0f > Kafka test instability IllegalStateException: Client is not started > --- > > Key: FLINK-4905 > URL: https://issues.apache.org/jira/browse/FLINK-4905 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Reporter: Robert Metzger >Assignee: Andrew Efimov > Labels: test-stability > Attachments: Kafka08Fetcher.png > > > The following travis build > (https://s3.amazonaws.com/archive.travis-ci.org/jobs/170365439/log.txt) > failed because of this error > {code} > 08:17:11,239 INFO org.apache.flink.runtime.jobmanager.JobManager >- Status of job 33ebdc0e7c91be186d80658ce3d17069 (Read some records to > commit offsets to Kafka) changed to FAILING. 
> java.lang.RuntimeException: Error while confirming checkpoint > at org.apache.flink.runtime.taskmanager.Task$4.run(Task.java:1040) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) > at java.util.concurrent.FutureTask.run(FutureTask.java:262) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.IllegalStateException: Client is not started > at > org.apache.flink.shaded.com.google.common.base.Preconditions.checkState(Preconditions.java:173) > at > org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:113) > at > org.apache.curator.utils.EnsurePath$InitialHelper$1.call(EnsurePath.java:148) > at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107) > at > org.apache.curator.utils.EnsurePath$InitialHelper.ensure(EnsurePath.java:141) > at org.apache.curator.utils.EnsurePath.ensure(EnsurePath.java:99) > at > org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:133) > at > org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.prepareAndCommitOffsets(ZookeeperOffsetHandler.java:93) > at > org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.commitInternalOffsetsToKafka(Kafka08Fetcher.java:341) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.notifyCheckpointComplete(FlinkKafkaConsumerBase.java:421) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:229) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:571) > at org.apache.flink.runtime.taskmanager.Task$4.run(Task.java:1035) > ... 
5 more > 08:17:11,241 INFO org.apache.flink.runtime.taskmanager.Task >- Attempting to cancel task Source: Custom Source -> Map -> Map -> Sink: > Unnamed (1/3) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5492) BootstrapTools log wrong address of started ActorSystem
[ https://issues.apache.org/jira/browse/FLINK-5492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837720#comment-15837720 ] Robert Metzger commented on FLINK-5492: --- I will provide a fix for the issue. > BootstrapTools log wrong address of started ActorSystem > --- > > Key: FLINK-5492 > URL: https://issues.apache.org/jira/browse/FLINK-5492 > Project: Flink > Issue Type: Bug >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > > When starting an {{ActorSystem}} via the {{Bootstrap}} tools, then the > {{startActorSystem}} function logs the IP resolved from the provided hostname > as the {{ActorSystem}} address. However, then the function uses the > unresolved hostname to start the {{ActorSystem}}. Since Akka matches the > ActorSystem's address and the destination address of the incoming message we > should log the URL which is used to start the {{ActorSystem}} and not the > resolved IP (messages with the IP will usually be rejected). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #3035: [FLINK-4905] Kafka test instability IllegalStateExceptio...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3035 I pushed a fix for that to master in e7cda75b8594417559d6aac6229b5893f5459f0f --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Updated] (FLINK-5640) configure the explicit Unit Test file suffix
[ https://issues.apache.org/jira/browse/FLINK-5640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shijinkui updated FLINK-5640: - Description: There are four types of Unit Test file: *ITCase.java, *Test.java, *ITSuite.scala, *Suite.scala File name ending with "IT.java" is integration test. File name ending with "Test.java" is unit test. It's clear for Surefire plugin of default-test execution to declare that "*Test.*" is Java Unit Test. The test file statistics below: * Suite total: 10 * ITCase total: 378 * Test total: 1008 * ITSuite total: 14 was: There are four types of Unit Test file: *ITCase.java, *Test.java, *ITSuite.scala, *Suite.scala File name ending with "IT.java" is integration test. File name ending with "Test.java" is unit test. It's clear for Surefire plugin of default-test execution to declare that "*Test.*" is Java Unit Test. * Suite total: 10 * ITCase total: 378 * Test total: 1008 * ITSuite total: 14 > configure the explicit Unit Test file suffix > > > Key: FLINK-5640 > URL: https://issues.apache.org/jira/browse/FLINK-5640 > Project: Flink > Issue Type: Test > Components: Tests >Reporter: shijinkui >Assignee: shijinkui > > There are four types of Unit Test file: *ITCase.java, *Test.java, > *ITSuite.scala, *Suite.scala > File name ending with "IT.java" is integration test. File name ending with > "Test.java" is unit test. > It's clear for Surefire plugin of default-test execution to declare that > "*Test.*" is Java Unit Test. > The test file statistics below: > * Suite total: 10 > * ITCase total: 378 > * Test total: 1008 > * ITSuite total: 14 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5640) configure the explicit Unit Test file suffix
[ https://issues.apache.org/jira/browse/FLINK-5640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837719#comment-15837719 ] ASF GitHub Bot commented on FLINK-5640: --- GitHub user shijinkui opened a pull request: https://github.com/apache/flink/pull/3211 c[FLINK-5640][test]onfigure the explicit Unit Test file suffix There are four types of Unit Test file: *ITCase.java, *Test.java, *ITSuite.scala, *Suite.scala File name ending with "IT.java" is integration test. File name ending with "Test.java" is unit test. It's clear for Surefire plugin of default-test execution to declare that "*Test.*" is Java Unit Test. The test file statistics: * Suite total: 10 * ITCase total: 378 * Test total: 1008 * ITSuite total: 14 You can merge this pull request into a Git repository by running: $ git pull https://github.com/shijinkui/flink defined_ut_suffix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3211.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3211 commit aeea3f0b4cd9de5a3ea6a7eb37ce7b70d66d4942 Author: shijinkuiDate: 2017-01-25T13:18:13Z c[FLINK-5640][test]onfigure the explicit Unit Test file suffix > configure the explicit Unit Test file suffix > > > Key: FLINK-5640 > URL: https://issues.apache.org/jira/browse/FLINK-5640 > Project: Flink > Issue Type: Test > Components: Tests >Reporter: shijinkui >Assignee: shijinkui > > There are four types of Unit Test file: *ITCase.java, *Test.java, > *ITSuite.scala, *Suite.scala > File name ending with "IT.java" is integration test. File name ending with > "Test.java" is unit test. > It's clear for Surefire plugin of default-test execution to declare that > "*Test.*" is Java Unit Test. > * Suite total: 10 > * ITCase total: 378 > * Test total: 1008 > * ITSuite total: 14 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #3211: [FLINK-5640][test] configure the explicit Unit Test...
GitHub user shijinkui opened a pull request: https://github.com/apache/flink/pull/3211 c[FLINK-5640][test]onfigure the explicit Unit Test file suffix There are four types of Unit Test file: *ITCase.java, *Test.java, *ITSuite.scala, *Suite.scala File name ending with "IT.java" is integration test. File name ending with "Test.java" is unit test. It's clear for Surefire plugin of default-test execution to declare that "*Test.*" is Java Unit Test. The test file statistics: * Suite total: 10 * ITCase total: 378 * Test total: 1008 * ITSuite total: 14 You can merge this pull request into a Git repository by running: $ git pull https://github.com/shijinkui/flink defined_ut_suffix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3211.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3211 commit aeea3f0b4cd9de5a3ea6a7eb37ce7b70d66d4942 Author: shijinkuiDate: 2017-01-25T13:18:13Z c[FLINK-5640][test]onfigure the explicit Unit Test file suffix --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5638) Deadlock when closing two chained async I/O operators
[ https://issues.apache.org/jira/browse/FLINK-5638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837716#comment-15837716 ] ASF GitHub Bot commented on FLINK-5638: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/3209 [FLINK-5638] [asyncIO] Fix deadlock when closing two chained AsyncWaitOperators This PR addresses the problem by changing the Emitter's behaviour to first output the element before removing it from the StreamElementQueue. That way the close method waits until also the Emitter has outputted the last completed element. Additionally, the stopResources method now frees the checkpoint lock in order to let the emitter thread react to the interrupt signal. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink asyncIOFixDeadlock Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3209.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3209 commit 30520d95786d630eb14ff613d0990ce03779dd3c Author: Till RohrmannDate: 2017-01-25T13:11:48Z [FLINK-5638] [asyncIO] Fix deadlock when closing two chained AsyncWaitOperators This PR addresses the problem by changing the Emitter's behaviour to first output the element before removing it from the StreamElementQueue. That way the close method waits until also the Emitter has outputted the last completed element. Additionally, the stopResources method now frees the checkpoint lock in order to let the emitter thread react to the interrupt signal. 
> Deadlock when closing two chained async I/O operators > - > > Key: FLINK-5638 > URL: https://issues.apache.org/jira/browse/FLINK-5638 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.2.0, 1.3.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.2.0, 1.3.0 > > > The {{AsyncWaitOperator}} can deadlock in a special cases when closing two > chained {{AsyncWaitOperator}} while there is still one element between these > two operators in flight. > The deadlock scenario is the following: Given two chained > {{AsyncWaitOperators}} {{a1}} and {{a2}}. {{a1}} has its last element > completed. This notifies {{a1's}} {{Emitter}}, {{e1}}, to remove the element > from the queue and output it to {{a2}}. This poll and output operation > happens under the checkpoint lock. Since {{a1}} and {{a2}} are chained, the > {{e1}} thread will directly call {{a2's}} {{processElement}} function. In > this function, we try to add the new element to the {{StreamElementQueue}}. > Now assume that this queue is full. Then the operation will release the > checkpoint lock and wait until it is notified again. > In the meantime, {{a1.close()}} is called by the {{StreamTask}}, because we > have consumed all input. The close operation also happens under the > checkpoint lock. First the close method waits until all elements from the > {{StreamElementQueue}} have been processed (== empty). This happens by > waiting on the checkpoint lock. Next the {{e1}} is interrupted and we join on > {{e1}}. When interrupting {{e1}}, it currently waits on the checkpoint lock. > Since the closing operation does not release the checkpoint lock, {{e1}} > cannot regain the synchronization lock and voila we have a deadlock. > There are two problems which cause the problem: > 1. We assume that the {{AsyncWaitOperator}} has processed all its elements if > the queue is empty. This is usually the case if the output operation is > atomic. 
However in the chained case it can happen that the emitter thread has > to wait to insert the element into the queue of the next > {{AsyncWaitOperator}}. Under these circumstances, we release the checkpoint > lock and, thus, the output operation is no longer atomic. We can solve this > problem by polling the last queue element after we have outputted it instead > of before. > 2. We interrupt the emitter thread while holding the checkpoint lock and not > freeing it again. Under these circumstances, the interrupt signal is > meaningless because the emitter thread also needs control over the checkpoint > lock. We should solve the problem by waiting on the checkpoint lock and > periodically checking whether the thread has already stopped or not. > https://s3.amazonaws.com/archive.travis-ci.org/jobs/194729330/log.txt -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5638) Deadlock when closing two chained async I/O operators
[ https://issues.apache.org/jira/browse/FLINK-5638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837717#comment-15837717 ] ASF GitHub Bot commented on FLINK-5638: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/3210 [FLINK-5638] [asyncIO] Fix deadlock when closing two chained AsyncWaitOperators This PR is a backport of #3209 onto `release-1.2.` This PR addresses the problem by changing the Emitter's behaviour to first output the element before removing it from the StreamElementQueue. That way the close method waits until also the Emitter has outputted the last completed element. Additionally, the stopResources method now frees the checkpoint lock in order to let the emitter thread react to the interrupt signal. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink asyncIOFixDeadlockBackport Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3210.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3210 commit 770589dd286b1bc30d9bf79d813bc30a371e1995 Author: Till RohrmannDate: 2017-01-25T13:11:48Z [FLINK-5638] [asyncIO] Fix deadlock when closing two chained AsyncWaitOperators This PR addresses the problem by changing the Emitter's behaviour to first output the element before removing it from the StreamElementQueue. That way the close method waits until also the Emitter has outputted the last completed element. Additionally, the stopResources method now frees the checkpoint lock in order to let the emitter thread react to the interrupt signal. 
> Deadlock when closing two chained async I/O operators > - > > Key: FLINK-5638 > URL: https://issues.apache.org/jira/browse/FLINK-5638 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.2.0, 1.3.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.2.0, 1.3.0 > > > The {{AsyncWaitOperator}} can deadlock in a special cases when closing two > chained {{AsyncWaitOperator}} while there is still one element between these > two operators in flight. > The deadlock scenario is the following: Given two chained > {{AsyncWaitOperators}} {{a1}} and {{a2}}. {{a1}} has its last element > completed. This notifies {{a1's}} {{Emitter}}, {{e1}}, to remove the element > from the queue and output it to {{a2}}. This poll and output operation > happens under the checkpoint lock. Since {{a1}} and {{a2}} are chained, the > {{e1}} thread will directly call {{a2's}} {{processElement}} function. In > this function, we try to add the new element to the {{StreamElementQueue}}. > Now assume that this queue is full. Then the operation will release the > checkpoint lock and wait until it is notified again. > In the meantime, {{a1.close()}} is called by the {{StreamTask}}, because we > have consumed all input. The close operation also happens under the > checkpoint lock. First the close method waits until all elements from the > {{StreamElementQueue}} have been processed (== empty). This happens by > waiting on the checkpoint lock. Next the {{e1}} is interrupted and we join on > {{e1}}. When interrupting {{e1}}, it currently waits on the checkpoint lock. > Since the closing operation does not release the checkpoint lock, {{e1}} > cannot regain the synchronization lock and voila we have a deadlock. > There are two problems which cause the problem: > 1. We assume that the {{AsyncWaitOperator}} has processed all its elements if > the queue is empty. This is usually the case if the output operation is > atomic. 
However in the chained case it can happen that the emitter thread has > to wait to insert the element into the queue of the next > {{AsyncWaitOperator}}. Under these circumstances, we release the checkpoint > lock and, thus, the output operation is no longer atomic. We can solve this > problem by polling the last queue element after we have outputted it instead > of before. > 2. We interrupt the emitter thread while holding the checkpoint lock and not > freeing it again. Under these circumstances, the interrupt signal is > meaningless because the emitter thread also needs control over the checkpoint > lock. We should solve the problem by waiting on the checkpoint lock and > periodically checking whether the thread has already stopped or not. > https://s3.amazonaws.com/archive.travis-ci.org/jobs/194729330/log.txt -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #3210: [FLINK-5638] [asyncIO] Fix deadlock when closing t...
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/3210 [FLINK-5638] [asyncIO] Fix deadlock when closing two chained AsyncWaitOperators This PR is a backport of #3209 onto `release-1.2.` This PR addresses the problem by changing the Emitter's behaviour to first output the element before removing it from the StreamElementQueue. That way the close method waits until also the Emitter has outputted the last completed element. Additionally, the stopResources method now frees the checkpoint lock in order to let the emitter thread react to the interrupt signal. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink asyncIOFixDeadlockBackport Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3210.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3210 commit 770589dd286b1bc30d9bf79d813bc30a371e1995 Author: Till RohrmannDate: 2017-01-25T13:11:48Z [FLINK-5638] [asyncIO] Fix deadlock when closing two chained AsyncWaitOperators This PR addresses the problem by changing the Emitter's behaviour to first output the element before removing it from the StreamElementQueue. That way the close method waits until also the Emitter has outputted the last completed element. Additionally, the stopResources method now frees the checkpoint lock in order to let the emitter thread react to the interrupt signal. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3209: [FLINK-5638] [asyncIO] Fix deadlock when closing t...
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/3209 [FLINK-5638] [asyncIO] Fix deadlock when closing two chained AsyncWaitOperators This PR addresses the problem by changing the Emitter's behaviour to first output the element before removing it from the StreamElementQueue. That way the close method waits until also the Emitter has outputted the last completed element. Additionally, the stopResources method now frees the checkpoint lock in order to let the emitter thread react to the interrupt signal. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink asyncIOFixDeadlock Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3209.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3209 commit 30520d95786d630eb14ff613d0990ce03779dd3c Author: Till RohrmannDate: 2017-01-25T13:11:48Z [FLINK-5638] [asyncIO] Fix deadlock when closing two chained AsyncWaitOperators This PR addresses the problem by changing the Emitter's behaviour to first output the element before removing it from the StreamElementQueue. That way the close method waits until also the Emitter has outputted the last completed element. Additionally, the stopResources method now frees the checkpoint lock in order to let the emitter thread react to the interrupt signal. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4905) Kafka test instability IllegalStateException: Client is not started
[ https://issues.apache.org/jira/browse/FLINK-4905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837713#comment-15837713 ] ASF GitHub Bot commented on FLINK-4905: --- Github user BrainLogic commented on the issue: https://github.com/apache/flink/pull/3035 I caught yours idea and together with the argument which I mentioned above - users will not extensively use 8 version of kafka connector, I agree with this proposal. Let me finish this jira, I can create fix based on yours proposal and try to implement test tonight. > Kafka test instability IllegalStateException: Client is not started > --- > > Key: FLINK-4905 > URL: https://issues.apache.org/jira/browse/FLINK-4905 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Reporter: Robert Metzger >Assignee: Andrew Efimov > Labels: test-stability > Attachments: Kafka08Fetcher.png > > > The following travis build > (https://s3.amazonaws.com/archive.travis-ci.org/jobs/170365439/log.txt) > failed because of this error > {code} > 08:17:11,239 INFO org.apache.flink.runtime.jobmanager.JobManager >- Status of job 33ebdc0e7c91be186d80658ce3d17069 (Read some records to > commit offsets to Kafka) changed to FAILING. 
> java.lang.RuntimeException: Error while confirming checkpoint > at org.apache.flink.runtime.taskmanager.Task$4.run(Task.java:1040) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) > at java.util.concurrent.FutureTask.run(FutureTask.java:262) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.IllegalStateException: Client is not started > at > org.apache.flink.shaded.com.google.common.base.Preconditions.checkState(Preconditions.java:173) > at > org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:113) > at > org.apache.curator.utils.EnsurePath$InitialHelper$1.call(EnsurePath.java:148) > at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107) > at > org.apache.curator.utils.EnsurePath$InitialHelper.ensure(EnsurePath.java:141) > at org.apache.curator.utils.EnsurePath.ensure(EnsurePath.java:99) > at > org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:133) > at > org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.prepareAndCommitOffsets(ZookeeperOffsetHandler.java:93) > at > org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.commitInternalOffsetsToKafka(Kafka08Fetcher.java:341) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.notifyCheckpointComplete(FlinkKafkaConsumerBase.java:421) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:229) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:571) > at org.apache.flink.runtime.taskmanager.Task$4.run(Task.java:1035) > ... 
5 more > 08:17:11,241 INFO org.apache.flink.runtime.taskmanager.Task >- Attempting to cancel task Source: Custom Source -> Map -> Map -> Sink: > Unnamed (1/3) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #3035: [FLINK-4905] Kafka test instability IllegalStateExceptio...
Github user BrainLogic commented on the issue: https://github.com/apache/flink/pull/3035 I caught your idea and, together with the argument which I mentioned above - users will not extensively use version 8 of the Kafka connector - I agree with this proposal. Let me finish this jira, I can create a fix based on your proposal and try to implement a test tonight. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Assigned] (FLINK-5628) CheckpointStatsTracker implements Serializable but isn't
[ https://issues.apache.org/jira/browse/FLINK-5628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi reassigned FLINK-5628: -- Assignee: Ufuk Celebi > CheckpointStatsTracker implements Serializable but isn't > > > Key: FLINK-5628 > URL: https://issues.apache.org/jira/browse/FLINK-5628 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.2.0, 1.3.0 >Reporter: Chesnay Schepler >Assignee: Ufuk Celebi >Priority: Blocker > Fix For: 1.3.0 > > > The CheckpointStatsTracker implements the Serializable interface, even though > it no longer is serializable as it contains a List. > This was introduced in 579bc96446d598a2cfe8237b4ebd62d8c9df3483 which > reworked the checkpoint stats tracking, > This does not affect 1.2 or 1.3 in any way (since these objects aren't > serialized there), but it blocks the implementation of the HistoryServer. > The AccessExecution*/ArchivedExecution* classes, which are supposed to be a > serializable form of the ExecutionGraph, are thus broken as they also make > use of the CheckpointStatsTracker. > (Note: * = Graph/ExecutionJobVertex/ExecutionVertex/Execution) . > This wasn't catched in tests since no ExecutionJobVertices were given to the > CheckpointStatsTracker in ArchivedExecutionGraphTest#setExecutionGraph. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5608) Cancel button not always visible
[ https://issues.apache.org/jira/browse/FLINK-5608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837708#comment-15837708 ] ASF GitHub Bot commented on FLINK-5608: --- Github user uce commented on the issue: https://github.com/apache/flink/pull/3189 Good point. @rehevkor5 is it possible to only hide the color boxes when the browser window is very narrow? > Cancel button not always visible > > > Key: FLINK-5608 > URL: https://issues.apache.org/jira/browse/FLINK-5608 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Affects Versions: 1.1.4 >Reporter: Shannon Carey >Assignee: Shannon Carey >Priority: Minor > > When the window is not wide enough, or when the job name is too long, the > "Cancel" button in the Job view of the web UI is not visible because it is > the first element that gets wrapped down and gets covered by the secondary > navbar (the tabs). This causes us to often need to resize the browser wider > than our monitor in order to use the cancel button. > In general, the use of Bootstrap's ".navbar-fixed-top" is problematic if the > content may wrap, especially if the content's horizontal width if not known & > fixed. The ".navbar-fixed-top" uses fixed positioning, and therefore any > unexpected change in height will result in overlap with the rest of the > normal-flow content in the page. The Bootstrap docs explain this in their > "Overflowing content" callout. > I am submitting a PR which does not attempt to resolve all issues with the > fixed navbar approach, but attempts to improve the situation by using less > horizontal space and by altering the layout approach of the Cancel button. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5641) Directories of expired checkpoints are not cleaned up
Robert Metzger created FLINK-5641: - Summary: Directories of expired checkpoints are not cleaned up Key: FLINK-5641 URL: https://issues.apache.org/jira/browse/FLINK-5641 Project: Flink Issue Type: Bug Components: State Backends, Checkpointing Affects Versions: 1.2.0 Reporter: Robert Metzger While testing Flink 1.2.0 RC1, some checkpoint files were not removed. {code} hadoop fs -lsr /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d lsr: DEPRECATED: Please use 'ls -R' instead. drwxr-xr-x - robert hadoop 0 2017-01-25 12:35 /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-10 -rw-r--r-- 3 robert hadoop 144375 2017-01-25 12:14 /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-10/dbc9181b-e291-4935-9e5b-bd0e447cdc25 drwxr-xr-x - robert hadoop 0 2017-01-25 12:46 /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-12 -rw-r--r-- 3 robert hadoop 146351 2017-01-25 12:30 /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-12/6fcab0d8-c6e8-4744-8361-e8bedd90cf07 drwxr-xr-x - robert hadoop 0 2017-01-25 12:44 /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-13 -rw-r--r-- 3 robert hadoop 148431 2017-01-25 12:44 /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-13/64579d41-9895-49fd-8c76-06814778ab58 drwxr-xr-x - robert hadoop 0 2017-01-25 11:45 /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-6 -rw-r--r-- 3 robert hadoop 37151 2017-01-25 11:31 /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-6/6bddff32-257a-4890-8dcb-6397b4772deb drwxr-xr-x - robert hadoop 0 2017-01-25 11:55 /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-7 -rw-r--r-- 3 robert hadoop 142269 2017-01-25 11:41 /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-7/bb7f0474-1661-4360-89d4-ed73b37cf6f0 {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #3189: [FLINK-5608] [webfrontend] Cancel button stays visible in...
Github user uce commented on the issue: https://github.com/apache/flink/pull/3189 Good point. @rehevkor5 is it possible to only hide the color boxes when the browser window is very narrow? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Updated] (FLINK-5638) Deadlock when closing two chained async I/O operators
[ https://issues.apache.org/jira/browse/FLINK-5638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-5638: - Fix Version/s: (was: 1.2.1) 1.2.0 > Deadlock when closing two chained async I/O operators > - > > Key: FLINK-5638 > URL: https://issues.apache.org/jira/browse/FLINK-5638 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.2.0, 1.3.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.2.0, 1.3.0 > > > The {{AsyncWaitOperator}} can deadlock in a special cases when closing two > chained {{AsyncWaitOperator}} while there is still one element between these > two operators in flight. > The deadlock scenario is the following: Given two chained > {{AsyncWaitOperators}} {{a1}} and {{a2}}. {{a1}} has its last element > completed. This notifies {{a1's}} {{Emitter}}, {{e1}}, to remove the element > from the queue and output it to {{a2}}. This poll and output operation > happens under the checkpoint lock. Since {{a1}} and {{a2}} are chained, the > {{e1}} thread will directly call {{a2's}} {{processElement}} function. In > this function, we try to add the new element to the {{StreamElementQueue}}. > Now assume that this queue is full. Then the operation will release the > checkpoint lock and wait until it is notified again. > In the meantime, {{a1.close()}} is called by the {{StreamTask}}, because we > have consumed all input. The close operation also happens under the > checkpoint lock. First the close method waits until all elements from the > {{StreamElementQueue}} have been processed (== empty). This happens by > waiting on the checkpoint lock. Next the {{e1}} is interrupted and we join on > {{e1}}. When interrupting {{e1}}, it currently waits on the checkpoint lock. > Since the closing operation does not release the checkpoint lock, {{e1}} > cannot regain the synchronization lock and voila we have a deadlock. > There are two problems which cause the problem: > 1. 
We assume that the {{AsyncWaitOperator}} has processed all its elements if > the queue is empty. This is usually the case if the output operation is > atomic. However in the chained case it can happen that the emitter thread has > to wait to insert the element into the queue of the next > {{AsyncWaitOperator}}. Under these circumstances, we release the checkpoint > lock and, thus, the output operation is no longer atomic. We can solve this > problem by polling the last queue element after we have outputted it instead > of before. > 2. We interrupt the emitter thread while holding the checkpoint lock and not > freeing it again. Under these circumstances, the interrupt signal is > meaningless because the emitter thread also needs control over the checkpoint > lock. We should solve the problem by waiting on the checkpoint lock and > periodically checking whether the thread has already stopped or not. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5638) Deadlock when closing two chained async I/O operators
[ https://issues.apache.org/jira/browse/FLINK-5638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-5638: - Description: The {{AsyncWaitOperator}} can deadlock in a special cases when closing two chained {{AsyncWaitOperator}} while there is still one element between these two operators in flight. The deadlock scenario is the following: Given two chained {{AsyncWaitOperators}} {{a1}} and {{a2}}. {{a1}} has its last element completed. This notifies {{a1's}} {{Emitter}}, {{e1}}, to remove the element from the queue and output it to {{a2}}. This poll and output operation happens under the checkpoint lock. Since {{a1}} and {{a2}} are chained, the {{e1}} thread will directly call {{a2's}} {{processElement}} function. In this function, we try to add the new element to the {{StreamElementQueue}}. Now assume that this queue is full. Then the operation will release the checkpoint lock and wait until it is notified again. In the meantime, {{a1.close()}} is called by the {{StreamTask}}, because we have consumed all input. The close operation also happens under the checkpoint lock. First the close method waits until all elements from the {{StreamElementQueue}} have been processed (== empty). This happens by waiting on the checkpoint lock. Next the {{e1}} is interrupted and we join on {{e1}}. When interrupting {{e1}}, it currently waits on the checkpoint lock. Since the closing operation does not release the checkpoint lock, {{e1}} cannot regain the synchronization lock and voila we have a deadlock. There are two problems which cause the problem: 1. We assume that the {{AsyncWaitOperator}} has processed all its elements if the queue is empty. This is usually the case if the output operation is atomic. However in the chained case it can happen that the emitter thread has to wait to insert the element into the queue of the next {{AsyncWaitOperator}}. 
Under these circumstances, we release the checkpoint lock and, thus, the output operation is no longer atomic. We can solve this problem by polling the last queue element after we have outputted it instead of before. 2. We interrupt the emitter thread while holding the checkpoint lock and not freeing it again. Under these circumstances, the interrupt signal is meaningless because the emitter thread also needs control over the checkpoint lock. We should solve the problem by waiting on the checkpoint lock and periodically checking whether the thread has already stopped or not. https://s3.amazonaws.com/archive.travis-ci.org/jobs/194729330/log.txt was: The {{AsyncWaitOperator}} can deadlock in a special cases when closing two chained {{AsyncWaitOperator}} while there is still one element between these two operators in flight. The deadlock scenario is the following: Given two chained {{AsyncWaitOperators}} {{a1}} and {{a2}}. {{a1}} has its last element completed. This notifies {{a1's}} {{Emitter}}, {{e1}}, to remove the element from the queue and output it to {{a2}}. This poll and output operation happens under the checkpoint lock. Since {{a1}} and {{a2}} are chained, the {{e1}} thread will directly call {{a2's}} {{processElement}} function. In this function, we try to add the new element to the {{StreamElementQueue}}. Now assume that this queue is full. Then the operation will release the checkpoint lock and wait until it is notified again. In the meantime, {{a1.close()}} is called by the {{StreamTask}}, because we have consumed all input. The close operation also happens under the checkpoint lock. First the close method waits until all elements from the {{StreamElementQueue}} have been processed (== empty). This happens by waiting on the checkpoint lock. Next the {{e1}} is interrupted and we join on {{e1}}. When interrupting {{e1}}, it currently waits on the checkpoint lock. 
Since the closing operation does not release the checkpoint lock, {{e1}} cannot regain the synchronization lock and voila we have a deadlock. There are two problems which cause the problem: 1. We assume that the {{AsyncWaitOperator}} has processed all its elements if the queue is empty. This is usually the case if the output operation is atomic. However in the chained case it can happen that the emitter thread has to wait to insert the element into the queue of the next {{AsyncWaitOperator}}. Under these circumstances, we release the checkpoint lock and, thus, the output operation is no longer atomic. We can solve this problem by polling the last queue element after we have outputted it instead of before. 2. We interrupt the emitter thread while holding the checkpoint lock and not freeing it again. Under these circumstances, the interrupt signal is meaningless because the emitter thread also needs control over the checkpoint lock. We should solve the problem by waiting on the checkpoint lock and periodically checking whether the thread has already
[GitHub] flink pull request #3208: [hotfix] [rat] Add exclusion for rolling-sink snap...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/3208 [hotfix] [rat] Add exclusion for rolling-sink snapshot Adds a RAT exclusion for the rolling sink snapshot used for testing backwards compatibility. Note that the RAT plugin only complains about these files on Windows. Which is weird. Anyway, this should go in master and 1.2. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink hotfix_rat Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3208.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3208 commit e397f2d3a3ec41662235e05ffa90f8f5b8e233cb Author: zentolDate: 2017-01-25T12:50:17Z [hotfix] [rat] Add exclusion for rolling-sink snapshot --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5634) Flink should not always redirect stdout to a file.
[ https://issues.apache.org/jira/browse/FLINK-5634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837691#comment-15837691 ] ASF GitHub Bot commented on FLINK-5634: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3204 Looks good to me, +1 @uce the ScriptMeister should probably give his blessing as well... > Flink should not always redirect stdout to a file. > -- > > Key: FLINK-5634 > URL: https://issues.apache.org/jira/browse/FLINK-5634 > Project: Flink > Issue Type: Bug > Components: Docker >Affects Versions: 1.2.0 >Reporter: Jamie Grier > Fix For: 1.2.0 > > > Flink always redirects stdout to a file. While often convenient this isn't > always what people want. The most obvious case of this is a Docker > deployment. > It should be possible to have Flink log to stdout. > Here is a PR for this: https://github.com/apache/flink/pull/3204 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #3204: [FLINK-5634] Flink should not always redirect stdout to a...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3204 Looks good to me, +1 @uce the ScriptMeister should probably give his blessing as well... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4905) Kafka test instability IllegalStateException: Client is not started
[ https://issues.apache.org/jira/browse/FLINK-4905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837684#comment-15837684 ] ASF GitHub Bot commented on FLINK-4905: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3035 I this case, the exception would not be logged, true. It is a very rare corner case that should not affect correctness, and not really distinguishable from the case where an exception is thrown after the source went into finishing mode. My feeling is to not log here. - The advantage is that we don't pollute the log with meaningless exception in the common case (many users would be led onto a false track). - Given that the corner case you described does not affect any expected behavior (the committing action might as well not have happened at all it it were started a few msecs later) it is okay to "swallow" the exception. - If it does in fact affect the offset committing in more cases, it will surely also occur at another committing attempt during execution. > Kafka test instability IllegalStateException: Client is not started > --- > > Key: FLINK-4905 > URL: https://issues.apache.org/jira/browse/FLINK-4905 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Reporter: Robert Metzger >Assignee: Andrew Efimov > Labels: test-stability > Attachments: Kafka08Fetcher.png > > > The following travis build > (https://s3.amazonaws.com/archive.travis-ci.org/jobs/170365439/log.txt) > failed because of this error > {code} > 08:17:11,239 INFO org.apache.flink.runtime.jobmanager.JobManager >- Status of job 33ebdc0e7c91be186d80658ce3d17069 (Read some records to > commit offsets to Kafka) changed to FAILING. 
> java.lang.RuntimeException: Error while confirming checkpoint > at org.apache.flink.runtime.taskmanager.Task$4.run(Task.java:1040) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) > at java.util.concurrent.FutureTask.run(FutureTask.java:262) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.IllegalStateException: Client is not started > at > org.apache.flink.shaded.com.google.common.base.Preconditions.checkState(Preconditions.java:173) > at > org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:113) > at > org.apache.curator.utils.EnsurePath$InitialHelper$1.call(EnsurePath.java:148) > at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107) > at > org.apache.curator.utils.EnsurePath$InitialHelper.ensure(EnsurePath.java:141) > at org.apache.curator.utils.EnsurePath.ensure(EnsurePath.java:99) > at > org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:133) > at > org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.prepareAndCommitOffsets(ZookeeperOffsetHandler.java:93) > at > org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.commitInternalOffsetsToKafka(Kafka08Fetcher.java:341) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.notifyCheckpointComplete(FlinkKafkaConsumerBase.java:421) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:229) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:571) > at org.apache.flink.runtime.taskmanager.Task$4.run(Task.java:1035) > ... 
5 more > 08:17:11,241 INFO org.apache.flink.runtime.taskmanager.Task >- Attempting to cancel task Source: Custom Source -> Map -> Map -> Sink: > Unnamed (1/3) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #3035: [FLINK-4905] Kafka test instability IllegalStateExceptio...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3035 In this case, the exception would not be logged, true. It is a very rare corner case that should not affect correctness, and not really distinguishable from the case where an exception is thrown after the source went into finishing mode. My feeling is to not log here. - The advantage is that we don't pollute the log with meaningless exception in the common case (many users would be led onto a false track). - Given that the corner case you described does not affect any expected behavior (the committing action might as well not have happened at all if it were started a few msecs later) it is okay to "swallow" the exception. - If it does in fact affect the offset committing in more cases, it will surely also occur at another committing attempt during execution. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-5640) configure the explicit Unit Test file suffix
shijinkui created FLINK-5640: Summary: configure the explicit Unit Test file suffix Key: FLINK-5640 URL: https://issues.apache.org/jira/browse/FLINK-5640 Project: Flink Issue Type: Test Components: Tests Reporter: shijinkui Assignee: shijinkui There are four types of Unit Test file: *ITCase.java, *Test.java, *ITSuite.scala, *Suite.scala File name ending with "IT.java" is an integration test. File name ending with "Test.java" is a unit test. It's clear for Surefire plugin of default-test execution to declare that "*Test.*" is Java Unit Test. * Suite total: 10 * ITCase total: 378 * Test total: 1008 * ITSuite total: 14 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4905) Kafka test instability IllegalStateException: Client is not started
[ https://issues.apache.org/jira/browse/FLINK-4905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837665#comment-15837665 ] ASF GitHub Bot commented on FLINK-4905: --- Github user BrainLogic commented on the issue: https://github.com/apache/flink/pull/3035 Distributed systems and multithreading environments make us think in term of logical clock, like Lamport clock, step by step: Thread1 - fetcher is running `running = true` Thread2 performs `zkHandler.prepareAndCommitOffsets(offsets)` Thread2 `running = true` and `zkHandler.prepareAndCommitOffsets(offsets)` throws an exception Thread1 stop fetcher and change flag `running = false` in normal way without any exception Thread2 read the flag `running = false` and `return` although, there is a reason of the commit failure that is different from "Client was closed and running= false" Thread1 fetcher is stopped successfully There is no exception or any information in log regarding the exception from `zkHandler.prepareAndCommitOffsets(offsets)` Again this is a rare case when we can lose a root cause of strange behavior - offset will not be committed although there are no any exceptions. Am I wrong? > Kafka test instability IllegalStateException: Client is not started > --- > > Key: FLINK-4905 > URL: https://issues.apache.org/jira/browse/FLINK-4905 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Reporter: Robert Metzger >Assignee: Andrew Efimov > Labels: test-stability > Attachments: Kafka08Fetcher.png > > > The following travis build > (https://s3.amazonaws.com/archive.travis-ci.org/jobs/170365439/log.txt) > failed because of this error > {code} > 08:17:11,239 INFO org.apache.flink.runtime.jobmanager.JobManager >- Status of job 33ebdc0e7c91be186d80658ce3d17069 (Read some records to > commit offsets to Kafka) changed to FAILING. 
> java.lang.RuntimeException: Error while confirming checkpoint > at org.apache.flink.runtime.taskmanager.Task$4.run(Task.java:1040) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) > at java.util.concurrent.FutureTask.run(FutureTask.java:262) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.IllegalStateException: Client is not started > at > org.apache.flink.shaded.com.google.common.base.Preconditions.checkState(Preconditions.java:173) > at > org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:113) > at > org.apache.curator.utils.EnsurePath$InitialHelper$1.call(EnsurePath.java:148) > at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107) > at > org.apache.curator.utils.EnsurePath$InitialHelper.ensure(EnsurePath.java:141) > at org.apache.curator.utils.EnsurePath.ensure(EnsurePath.java:99) > at > org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:133) > at > org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.prepareAndCommitOffsets(ZookeeperOffsetHandler.java:93) > at > org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.commitInternalOffsetsToKafka(Kafka08Fetcher.java:341) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.notifyCheckpointComplete(FlinkKafkaConsumerBase.java:421) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:229) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:571) > at org.apache.flink.runtime.taskmanager.Task$4.run(Task.java:1035) > ... 
5 more > 08:17:11,241 INFO org.apache.flink.runtime.taskmanager.Task >- Attempting to cancel task Source: Custom Source -> Map -> Map -> Sink: > Unnamed (1/3) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #3035: [FLINK-4905] Kafka test instability IllegalStateExceptio...
Github user BrainLogic commented on the issue: https://github.com/apache/flink/pull/3035 Distributed systems and multithreading environments make us think in terms of logical clocks, like Lamport clock, step by step: Thread1 - fetcher is running `running = true` Thread2 performs `zkHandler.prepareAndCommitOffsets(offsets)` Thread2 `running = true` and `zkHandler.prepareAndCommitOffsets(offsets)` throws an exception Thread1 stop fetcher and change flag `running = false` in normal way without any exception Thread2 read the flag `running = false` and `return` although, there is a reason of the commit failure that is different from "Client was closed and running= false" Thread1 fetcher is stopped successfully There is no exception or any information in log regarding the exception from `zkHandler.prepareAndCommitOffsets(offsets)` Again this is a rare case when we can lose a root cause of strange behavior - offset will not be committed although there are not any exceptions. Am I wrong? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3206: [FLINK-5612] Fix GlobPathFilter not-serializable e...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3206#discussion_r97768800 --- Diff: flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java --- @@ -138,4 +141,30 @@ public void doubleStarPattern() { assertFalse(matcher.filterPath(new Path("a/b"))); assertFalse(matcher.filterPath(new Path("a/b/c"))); } + + @Test(expected = NullPointerException.class) + public void testIncluePatternIsNull() { + new GlobFilePathFilter( + null, + Collections.emptyList()); + } + + @Test(expected = NullPointerException.class) + public void testExcludePatternIsNull() { + new GlobFilePathFilter( + Collections.singletonList("**"), + null); + } + + @Test + public void testGlobFilterSerializable() throws IOException { --- End diff -- How about using `CommonTestUtils.createCopySerializable()` here? That you can validate also that the serialized copy behaves valid. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---