[jira] [Commented] (FLINK-3318) Add support for quantifiers to CEP's pattern API

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837834#comment-15837834
 ] 

ASF GitHub Bot commented on FLINK-3318:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2361#discussion_r97789775
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
@@ -80,25 +82,17 @@
			// return a factory for empty NFAs
			return new NFAFactoryImpl<>(inputTypeSerializer, 0, Collections.<State<T>>emptyList(), timeoutHandling);
		} else {
+			ArrayList<Pattern<T, ?>> patterns = createPatternsList(pattern);
--- End diff --

The `patterns` can become a `List` instead of `ArrayList`. It is good to 
have the most generic type possible as argument or return type, as 
implementations may change.
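To illustrate the advice with a small, hypothetical sketch (not code from the PR): declaring the return type against the `List` interface keeps callers decoupled from the concrete collection, so the implementation can change later without touching any signatures.

{code}
import java.util.ArrayList;
import java.util.List;

class PatternsListExample {

    // Callers depend only on the List contract; the ArrayList is an
    // implementation detail that can be swapped (e.g. for LinkedList)
    // without changing this signature.
    static List<String> createPatternsList() {
        List<String> patterns = new ArrayList<>();
        patterns.add("begin");
        patterns.add("next");
        return patterns;
    }
}
{code}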


> Add support for quantifiers to CEP's pattern API
> 
>
> Key: FLINK-3318
> URL: https://issues.apache.org/jira/browse/FLINK-3318
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.0.0
>Reporter: Till Rohrmann
>Assignee: Ivan Mushketyk
>Priority: Minor
>
> It would be a good addition to extend the pattern API to support quantifiers 
> known from regular expressions (e.g. Kleene star, ?, +, or count bounds). 
> This would considerably enrich the set of supported patterns.
> Implementing the count bounds could be done by unrolling the pattern state. 
> In order to support the Kleene star operator, the {{NFACompiler}} has to be 
> extended to insert an epsilon-transition between a Kleene star state and the 
> succeeding pattern state. In order to support {{?}}, one could insert two 
> paths from the preceding state, one which accepts the event and another which 
> directly goes into the next pattern state.
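A minimal sketch of the count-bound unrolling idea, using hypothetical stand-in types rather than the PR's actual classes:

{code}
import java.util.ArrayList;
import java.util.List;

class UnrollSketch {

    // count(n) on a state "A" is unrolled into a chain A -> A#1 -> ... -> A#(n-1),
    // mirroring how an NFA compiler can expand a count-bounded pattern.
    static List<String> unroll(String name, int count) {
        List<String> chain = new ArrayList<>();
        chain.add(name);
        for (int i = 1; i < count; i++) {
            chain.add(name + "#" + i);
        }
        return chain;
    }

    public static void main(String[] args) {
        System.out.println(unroll("A", 3)); // prints [A, A#1, A#2]
    }
}
{code}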



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2361: [FLINK-3318][cep] Add support for quantifiers to C...

2017-01-25 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2361#discussion_r97790271
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
@@ -130,26 +114,166 @@
			}

			// add the beginning state
-			final State<T> beginningState;
+			State<T> beginningState = states.get(BEGINNING_STATE_NAME);
+			addTransitions(beginningState, -1, patterns, states);
+			return new NFAFactoryImpl<>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
+		}
+	}

-			if (states.containsKey(BEGINNING_STATE_NAME)) {
-				beginningState = states.get(BEGINNING_STATE_NAME);
-			} else {
-				beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start);
-				states.put(BEGINNING_STATE_NAME, beginningState);
-			}
+	private static <T> void addTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
+		Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
+		State<T> succeedingState = states.get(succeedingPattern.getName());

-			beginningState.addStateTransition(new StateTransition<T>(
+		if (shouldRepeatPattern(patternPos, patterns)) {
+			expandRepeatingPattern(currentState, patternPos, patterns, states);
+		} else {
+			currentState.addStateTransition(new StateTransition<T>(
				StateTransitionAction.TAKE,
-				currentState,
-				(FilterFunction<T>) currentPattern.getFilterFunction()
+				succeedingState,
+				(FilterFunction<T>) succeedingPattern.getFilterFunction()
			));

-			return new NFAFactoryImpl<>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
+			if (shouldAddSelfTransition(succeedingPattern)) {
+				addTransitionToSelf(succeedingPattern, succeedingState);
+			}
+			if (isPatternOptional(succeedingPattern)) {
+				addOptionalTransitions(currentState, patternPos, patterns, states);
+			}
+		}
+	}
+
+	private static <T> void addOptionalTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
+		int firstNonOptionalPattern = findFirstNonOptionalPattern(patterns, patternPos + 1);
+
+		for (int optionalPatternPos = patternPos + 2;
+				optionalPatternPos < Math.min(firstNonOptionalPattern + 1, patterns.size());
+				optionalPatternPos++) {

+			Pattern<T, ?> optionalPattern = patterns.get(optionalPatternPos);
+			State<T> optionalState = states.get(optionalPattern.getName());
+			currentState.addStateTransition(new StateTransition<>(
+				StateTransitionAction.TAKE,
+				optionalState,
+				(FilterFunction<T>) optionalPattern.getFilterFunction()));
		}
	}

	/**
+	 * Expands a pattern a number of times and connects the expanded states. E.g. count(3) will result in:
+	 *
+	 * +-----+   +-------+   +-------+
+	 * |State+-->|State#1+-->|State#2|
+	 * +-----+   +-------+   +-------+
+	 */
+	private static <T> void expandRepeatingPattern(State<T> currentState, int patternPos,
+			ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
+		Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
+		State<T> succeedingState = states.get(succeedingPattern.getName());
+		Pattern<T, ?> currentPattern = patterns.get(patternPos);
+
+		State<T> currentRepeatingState = null;
+		State<T> nextRepeatingState = currentState;
+		for (int i = 1; i < currentPattern.getMaxCount(); i++) {
+			currentRepeatingState = nextRepeatingState;
+			nextRepeatingState = new State<>(
+				currentState.getName() + "#" + i,
+				State.StateType.Normal);
+			states.put(nextRepeatingState.getName(), nextRepeatingState);

[jira] [Commented] (FLINK-3318) Add support for quantifiers to CEP's pattern API

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837835#comment-15837835
 ] 

ASF GitHub Bot commented on FLINK-3318:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2361#discussion_r97790228
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
@@ -130,26 +114,166 @@
			}

			// add the beginning state
-			final State<T> beginningState;
+			State<T> beginningState = states.get(BEGINNING_STATE_NAME);
+			addTransitions(beginningState, -1, patterns, states);
+			return new NFAFactoryImpl<>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
+		}
+	}

-			if (states.containsKey(BEGINNING_STATE_NAME)) {
-				beginningState = states.get(BEGINNING_STATE_NAME);
-			} else {
-				beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start);
-				states.put(BEGINNING_STATE_NAME, beginningState);
-			}
+	private static <T> void addTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
+		Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
+		State<T> succeedingState = states.get(succeedingPattern.getName());

-			beginningState.addStateTransition(new StateTransition<T>(
+		if (shouldRepeatPattern(patternPos, patterns)) {
+			expandRepeatingPattern(currentState, patternPos, patterns, states);
+		} else {
+			currentState.addStateTransition(new StateTransition<T>(
				StateTransitionAction.TAKE,
-				currentState,
-				(FilterFunction<T>) currentPattern.getFilterFunction()
+				succeedingState,
+				(FilterFunction<T>) succeedingPattern.getFilterFunction()
			));

-			return new NFAFactoryImpl<>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
+			if (shouldAddSelfTransition(succeedingPattern)) {
+				addTransitionToSelf(succeedingPattern, succeedingState);
+			}
+			if (isPatternOptional(succeedingPattern)) {
+				addOptionalTransitions(currentState, patternPos, patterns, states);
+			}
+		}
+	}
+
+	private static <T> void addOptionalTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
+		int firstNonOptionalPattern = findFirstNonOptionalPattern(patterns, patternPos + 1);
+
+		for (int optionalPatternPos = patternPos + 2;
+				optionalPatternPos < Math.min(firstNonOptionalPattern + 1, patterns.size());
+				optionalPatternPos++) {

+			Pattern<T, ?> optionalPattern = patterns.get(optionalPatternPos);
+			State<T> optionalState = states.get(optionalPattern.getName());
+			currentState.addStateTransition(new StateTransition<>(
+				StateTransitionAction.TAKE,
+				optionalState,
+				(FilterFunction<T>) optionalPattern.getFilterFunction()));
		}
	}

	/**
+	 * Expands a pattern a number of times and connects the expanded states. E.g. count(3) will result in:
+	 *
+	 * +-----+   +-------+   +-------+
+	 * |State+-->|State#1+-->|State#2|
+	 * +-----+   +-------+   +-------+
+	 */
+	private static <T> void expandRepeatingPattern(State<T> currentState, int patternPos,
+			ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
+		Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
+		State<T> succeedingState = states.get(succeedingPattern.getName());
+		Pattern<T, ?> currentPattern = patterns.get(patternPos);
+
+		State<T> currentRepeatingState = null;
+		State<T> nextRepeatingState = currentState;
+		for (int i = 1; i < currentPattern.getMaxCount(); i++) {
+			currentRepeatingState = nextRepeatingState;
+			nextRepeatingState = new State<>(

[jira] [Commented] (FLINK-3318) Add support for quantifiers to CEP's pattern API

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837831#comment-15837831
 ] 

ASF GitHub Bot commented on FLINK-3318:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2361#discussion_r97790026
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
@@ -130,26 +114,166 @@
			}

			// add the beginning state
-			final State<T> beginningState;
+			State<T> beginningState = states.get(BEGINNING_STATE_NAME);
+			addTransitions(beginningState, -1, patterns, states);
+			return new NFAFactoryImpl<>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
+		}
+	}

-			if (states.containsKey(BEGINNING_STATE_NAME)) {
-				beginningState = states.get(BEGINNING_STATE_NAME);
-			} else {
-				beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start);
-				states.put(BEGINNING_STATE_NAME, beginningState);
-			}
+	private static <T> void addTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
+		Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
+		State<T> succeedingState = states.get(succeedingPattern.getName());

-			beginningState.addStateTransition(new StateTransition<T>(
+		if (shouldRepeatPattern(patternPos, patterns)) {
+			expandRepeatingPattern(currentState, patternPos, patterns, states);
+		} else {
+			currentState.addStateTransition(new StateTransition<T>(
				StateTransitionAction.TAKE,
-				currentState,
-				(FilterFunction<T>) currentPattern.getFilterFunction()
+				succeedingState,
+				(FilterFunction<T>) succeedingPattern.getFilterFunction()
			));

-			return new NFAFactoryImpl<>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
+			if (shouldAddSelfTransition(succeedingPattern)) {
+				addTransitionToSelf(succeedingPattern, succeedingState);
+			}
+			if (isPatternOptional(succeedingPattern)) {
+				addOptionalTransitions(currentState, patternPos, patterns, states);
+			}
+		}
+	}
+
+	private static <T> void addOptionalTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
+		int firstNonOptionalPattern = findFirstNonOptionalPattern(patterns, patternPos + 1);
+
+		for (int optionalPatternPos = patternPos + 2;
+				optionalPatternPos < Math.min(firstNonOptionalPattern + 1, patterns.size());
+				optionalPatternPos++) {

+			Pattern<T, ?> optionalPattern = patterns.get(optionalPatternPos);
+			State<T> optionalState = states.get(optionalPattern.getName());
+			currentState.addStateTransition(new StateTransition<>(
+				StateTransitionAction.TAKE,
+				optionalState,
+				(FilterFunction<T>) optionalPattern.getFilterFunction()));
		}
	}

	/**
+	 * Expands a pattern a number of times and connects the expanded states. E.g. count(3) will result in:
+	 *
+	 * +-----+   +-------+   +-------+
+	 * |State+-->|State#1+-->|State#2|
+	 * +-----+   +-------+   +-------+
+	 */
+	private static <T> void expandRepeatingPattern(State<T> currentState, int patternPos,
+			ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
--- End diff --

The `patterns` can become a `List` instead of `ArrayList`.


> Add support for quantifiers to CEP's pattern API
> 
>
> Key: FLINK-3318
> URL: https://issues.apache.org/jira/browse/FLINK-3318
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.0.0
>Reporter: Till Rohrmann
>Assignee: Ivan Mushketyk
>Priority: Minor
>
> It would be a good 

[GitHub] flink pull request #2361: [FLINK-3318][cep] Add support for quantifiers to C...

2017-01-25 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2361#discussion_r97790228
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
@@ -130,26 +114,166 @@
			}

			// add the beginning state
-			final State<T> beginningState;
+			State<T> beginningState = states.get(BEGINNING_STATE_NAME);
+			addTransitions(beginningState, -1, patterns, states);
+			return new NFAFactoryImpl<>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
+		}
+	}

-			if (states.containsKey(BEGINNING_STATE_NAME)) {
-				beginningState = states.get(BEGINNING_STATE_NAME);
-			} else {
-				beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start);
-				states.put(BEGINNING_STATE_NAME, beginningState);
-			}
+	private static <T> void addTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
+		Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
+		State<T> succeedingState = states.get(succeedingPattern.getName());

-			beginningState.addStateTransition(new StateTransition<T>(
+		if (shouldRepeatPattern(patternPos, patterns)) {
+			expandRepeatingPattern(currentState, patternPos, patterns, states);
+		} else {
+			currentState.addStateTransition(new StateTransition<T>(
				StateTransitionAction.TAKE,
-				currentState,
-				(FilterFunction<T>) currentPattern.getFilterFunction()
+				succeedingState,
+				(FilterFunction<T>) succeedingPattern.getFilterFunction()
			));

-			return new NFAFactoryImpl<>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
+			if (shouldAddSelfTransition(succeedingPattern)) {
+				addTransitionToSelf(succeedingPattern, succeedingState);
+			}
+			if (isPatternOptional(succeedingPattern)) {
+				addOptionalTransitions(currentState, patternPos, patterns, states);
+			}
+		}
+	}
+
+	private static <T> void addOptionalTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
+		int firstNonOptionalPattern = findFirstNonOptionalPattern(patterns, patternPos + 1);
+
+		for (int optionalPatternPos = patternPos + 2;
+				optionalPatternPos < Math.min(firstNonOptionalPattern + 1, patterns.size());
+				optionalPatternPos++) {

+			Pattern<T, ?> optionalPattern = patterns.get(optionalPatternPos);
+			State<T> optionalState = states.get(optionalPattern.getName());
+			currentState.addStateTransition(new StateTransition<>(
+				StateTransitionAction.TAKE,
+				optionalState,
+				(FilterFunction<T>) optionalPattern.getFilterFunction()));
		}
	}

	/**
+	 * Expands a pattern a number of times and connects the expanded states. E.g. count(3) will result in:
+	 *
+	 * +-----+   +-------+   +-------+
+	 * |State+-->|State#1+-->|State#2|
+	 * +-----+   +-------+   +-------+
+	 */
+	private static <T> void expandRepeatingPattern(State<T> currentState, int patternPos,
+			ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
+		Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
+		State<T> succeedingState = states.get(succeedingPattern.getName());
+		Pattern<T, ?> currentPattern = patterns.get(patternPos);
+
+		State<T> currentRepeatingState = null;
+		State<T> nextRepeatingState = currentState;
+		for (int i = 1; i < currentPattern.getMaxCount(); i++) {
+			currentRepeatingState = nextRepeatingState;
+			nextRepeatingState = new State<>(
+				currentState.getName() + "#" + i,
+				State.StateType.Normal);
+			states.put(nextRepeatingState.getName(), nextRepeatingState);

[jira] [Commented] (FLINK-3318) Add support for quantifiers to CEP's pattern API

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837828#comment-15837828
 ] 

ASF GitHub Bot commented on FLINK-3318:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2361#discussion_r97790105
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
@@ -130,26 +114,166 @@
			}

			// add the beginning state
-			final State<T> beginningState;
+			State<T> beginningState = states.get(BEGINNING_STATE_NAME);
+			addTransitions(beginningState, -1, patterns, states);
+			return new NFAFactoryImpl<>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
+		}
+	}

-			if (states.containsKey(BEGINNING_STATE_NAME)) {
-				beginningState = states.get(BEGINNING_STATE_NAME);
-			} else {
-				beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start);
-				states.put(BEGINNING_STATE_NAME, beginningState);
-			}
+	private static <T> void addTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
+		Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
+		State<T> succeedingState = states.get(succeedingPattern.getName());

-			beginningState.addStateTransition(new StateTransition<T>(
+		if (shouldRepeatPattern(patternPos, patterns)) {
+			expandRepeatingPattern(currentState, patternPos, patterns, states);
+		} else {
+			currentState.addStateTransition(new StateTransition<T>(
				StateTransitionAction.TAKE,
-				currentState,
-				(FilterFunction<T>) currentPattern.getFilterFunction()
+				succeedingState,
+				(FilterFunction<T>) succeedingPattern.getFilterFunction()
			));

-			return new NFAFactoryImpl<>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
+			if (shouldAddSelfTransition(succeedingPattern)) {
+				addTransitionToSelf(succeedingPattern, succeedingState);
+			}
+			if (isPatternOptional(succeedingPattern)) {
+				addOptionalTransitions(currentState, patternPos, patterns, states);
+			}
+		}
+	}
+
+	private static <T> void addOptionalTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
+		int firstNonOptionalPattern = findFirstNonOptionalPattern(patterns, patternPos + 1);
+
+		for (int optionalPatternPos = patternPos + 2;
+				optionalPatternPos < Math.min(firstNonOptionalPattern + 1, patterns.size());
+				optionalPatternPos++) {

+			Pattern<T, ?> optionalPattern = patterns.get(optionalPatternPos);
+			State<T> optionalState = states.get(optionalPattern.getName());
+			currentState.addStateTransition(new StateTransition<>(
+				StateTransitionAction.TAKE,
+				optionalState,
+				(FilterFunction<T>) optionalPattern.getFilterFunction()));
		}
	}

	/**
+	 * Expands a pattern a number of times and connects the expanded states. E.g. count(3) will result in:
+	 *
+	 * +-----+   +-------+   +-------+
+	 * |State+-->|State#1+-->|State#2|
+	 * +-----+   +-------+   +-------+
+	 */
+	private static <T> void expandRepeatingPattern(State<T> currentState, int patternPos,
+			ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
+		Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
+		State<T> succeedingState = states.get(succeedingPattern.getName());
+		Pattern<T, ?> currentPattern = patterns.get(patternPos);
+
+		State<T> currentRepeatingState = null;
+		State<T> nextRepeatingState = currentState;
+		for (int i = 1; i < currentPattern.getMaxCount(); i++) {
+			currentRepeatingState = nextRepeatingState;
+			nextRepeatingState = new State<>(

[jira] [Commented] (FLINK-3318) Add support for quantifiers to CEP's pattern API

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837830#comment-15837830
 ] 

ASF GitHub Bot commented on FLINK-3318:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2361#discussion_r97789988
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
@@ -130,26 +114,166 @@
			}

			// add the beginning state
-			final State<T> beginningState;
+			State<T> beginningState = states.get(BEGINNING_STATE_NAME);
+			addTransitions(beginningState, -1, patterns, states);
+			return new NFAFactoryImpl<>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
+		}
+	}

-			if (states.containsKey(BEGINNING_STATE_NAME)) {
-				beginningState = states.get(BEGINNING_STATE_NAME);
-			} else {
-				beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start);
-				states.put(BEGINNING_STATE_NAME, beginningState);
-			}
+	private static <T> void addTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
+		Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
+		State<T> succeedingState = states.get(succeedingPattern.getName());

-			beginningState.addStateTransition(new StateTransition<T>(
+		if (shouldRepeatPattern(patternPos, patterns)) {
+			expandRepeatingPattern(currentState, patternPos, patterns, states);
+		} else {
+			currentState.addStateTransition(new StateTransition<T>(
				StateTransitionAction.TAKE,
-				currentState,
-				(FilterFunction<T>) currentPattern.getFilterFunction()
+				succeedingState,
+				(FilterFunction<T>) succeedingPattern.getFilterFunction()
			));

-			return new NFAFactoryImpl<>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
+			if (shouldAddSelfTransition(succeedingPattern)) {
+				addTransitionToSelf(succeedingPattern, succeedingState);
+			}
+			if (isPatternOptional(succeedingPattern)) {
+				addOptionalTransitions(currentState, patternPos, patterns, states);
+			}
+		}
+	}
+
+	private static <T> void addOptionalTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
--- End diff --

The `patterns` can become a `List` instead of `ArrayList`.


> Add support for quantifiers to CEP's pattern API
> 
>
> Key: FLINK-3318
> URL: https://issues.apache.org/jira/browse/FLINK-3318
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.0.0
>Reporter: Till Rohrmann
>Assignee: Ivan Mushketyk
>Priority: Minor
>
> It would be a good addition to extend the pattern API to support quantifiers 
> known from regular expressions (e.g. Kleene star, ?, +, or count bounds). 
> This would considerably enrich the set of supported patterns.
> Implementing the count bounds could be done by unrolling the pattern state. 
> In order to support the Kleene star operator, the {{NFACompiler}} has to be 
> extended to insert an epsilon-transition between a Kleene star state and the 
> succeeding pattern state. In order to support {{?}}, one could insert two 
> paths from the preceding state, one which accepts the event and another which 
> directly goes into the next pattern state.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3318) Add support for quantifiers to CEP's pattern API

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837832#comment-15837832
 ] 

ASF GitHub Bot commented on FLINK-3318:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2361#discussion_r97790271
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
@@ -130,26 +114,166 @@
			}

			// add the beginning state
-			final State<T> beginningState;
+			State<T> beginningState = states.get(BEGINNING_STATE_NAME);
+			addTransitions(beginningState, -1, patterns, states);
+			return new NFAFactoryImpl<>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
+		}
+	}

-			if (states.containsKey(BEGINNING_STATE_NAME)) {
-				beginningState = states.get(BEGINNING_STATE_NAME);
-			} else {
-				beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start);
-				states.put(BEGINNING_STATE_NAME, beginningState);
-			}
+	private static <T> void addTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
+		Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
+		State<T> succeedingState = states.get(succeedingPattern.getName());

-			beginningState.addStateTransition(new StateTransition<T>(
+		if (shouldRepeatPattern(patternPos, patterns)) {
+			expandRepeatingPattern(currentState, patternPos, patterns, states);
+		} else {
+			currentState.addStateTransition(new StateTransition<T>(
				StateTransitionAction.TAKE,
-				currentState,
-				(FilterFunction<T>) currentPattern.getFilterFunction()
+				succeedingState,
+				(FilterFunction<T>) succeedingPattern.getFilterFunction()
			));

-			return new NFAFactoryImpl<>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
+			if (shouldAddSelfTransition(succeedingPattern)) {
+				addTransitionToSelf(succeedingPattern, succeedingState);
+			}
+			if (isPatternOptional(succeedingPattern)) {
+				addOptionalTransitions(currentState, patternPos, patterns, states);
+			}
+		}
+	}
+
+	private static <T> void addOptionalTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
+		int firstNonOptionalPattern = findFirstNonOptionalPattern(patterns, patternPos + 1);
+
+		for (int optionalPatternPos = patternPos + 2;
+				optionalPatternPos < Math.min(firstNonOptionalPattern + 1, patterns.size());
+				optionalPatternPos++) {

+			Pattern<T, ?> optionalPattern = patterns.get(optionalPatternPos);
+			State<T> optionalState = states.get(optionalPattern.getName());
+			currentState.addStateTransition(new StateTransition<>(
+				StateTransitionAction.TAKE,
+				optionalState,
+				(FilterFunction<T>) optionalPattern.getFilterFunction()));
		}
	}

	/**
+	 * Expands a pattern a number of times and connects the expanded states. E.g. count(3) will result in:
+	 *
+	 * +-----+   +-------+   +-------+
+	 * |State+-->|State#1+-->|State#2|
+	 * +-----+   +-------+   +-------+
+	 */
+	private static <T> void expandRepeatingPattern(State<T> currentState, int patternPos,
+			ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
+		Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
+		State<T> succeedingState = states.get(succeedingPattern.getName());
+		Pattern<T, ?> currentPattern = patterns.get(patternPos);
+
+		State<T> currentRepeatingState = null;
+		State<T> nextRepeatingState = currentState;
+		for (int i = 1; i < currentPattern.getMaxCount(); i++) {
+			currentRepeatingState = nextRepeatingState;
+			nextRepeatingState = new State<>(

[jira] [Commented] (FLINK-3318) Add support for quantifiers to CEP's pattern API

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837833#comment-15837833
 ] 

ASF GitHub Bot commented on FLINK-3318:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2361#discussion_r97789351
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java ---
@@ -43,7 +43,14 @@ public State(final String name, final StateType stateType) {
		this.name = name;
		this.stateType = stateType;

-		stateTransitions = new ArrayList<StateTransition<T>>();
+		stateTransitions = new ArrayList<>();
+	}
+
+	public State(String name, StateType stateType, Collection<StateTransition<T>> stateTransitions) {
--- End diff --

I agree with @chermenin and this class can be reverted to its previous 
state. In general, PRs should have the smallest diff possible in order to be 
easier to review.


> Add support for quantifiers to CEP's pattern API
> 
>
> Key: FLINK-3318
> URL: https://issues.apache.org/jira/browse/FLINK-3318
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.0.0
>Reporter: Till Rohrmann
>Assignee: Ivan Mushketyk
>Priority: Minor
>
> It would be a good addition to extend the pattern API to support quantifiers 
> known from regular expressions (e.g. Kleene star, ?, +, or count bounds). 
> This would considerably enrich the set of supported patterns.
> Implementing the count bounds could be done by unrolling the pattern state. 
> In order to support the Kleene star operator, the {{NFACompiler}} has to be 
> extended to insert an epsilon-transition between a Kleene star state and the 
> succeeding pattern state. In order to support {{?}}, one could insert two 
> paths from the preceding state, one which accepts the event and another which 
> directly goes into the next pattern state.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2361: [FLINK-3318][cep] Add support for quantifiers to C...

2017-01-25 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2361#discussion_r97790385
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
@@ -130,26 +114,166 @@
			}

			// add the beginning state
-			final State<T> beginningState;
+			State<T> beginningState = states.get(BEGINNING_STATE_NAME);
+			addTransitions(beginningState, -1, patterns, states);
+			return new NFAFactoryImpl<>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
+		}
+	}

-			if (states.containsKey(BEGINNING_STATE_NAME)) {
-				beginningState = states.get(BEGINNING_STATE_NAME);
-			} else {
-				beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start);
-				states.put(BEGINNING_STATE_NAME, beginningState);
-			}
+	private static <T> void addTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
+		Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
+		State<T> succeedingState = states.get(succeedingPattern.getName());

-			beginningState.addStateTransition(new StateTransition<T>(
+		if (shouldRepeatPattern(patternPos, patterns)) {
+			expandRepeatingPattern(currentState, patternPos, patterns, states);
+		} else {
+			currentState.addStateTransition(new StateTransition<T>(
				StateTransitionAction.TAKE,
-				currentState,
-				(FilterFunction<T>) currentPattern.getFilterFunction()
+				succeedingState,
+				(FilterFunction<T>) succeedingPattern.getFilterFunction()
			));

-			return new NFAFactoryImpl<>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
+			if (shouldAddSelfTransition(succeedingPattern)) {
+				addTransitionToSelf(succeedingPattern, succeedingState);
+			}
+			if (isPatternOptional(succeedingPattern)) {
+				addOptionalTransitions(currentState, patternPos, patterns, states);
+			}
+		}
+	}
+
+	private static <T> void addOptionalTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
+		int firstNonOptionalPattern = findFirstNonOptionalPattern(patterns, patternPos + 1);
+
+		for (int optionalPatternPos = patternPos + 2;
+				optionalPatternPos < Math.min(firstNonOptionalPattern + 1, patterns.size());
+				optionalPatternPos++) {

+			Pattern<T, ?> optionalPattern = patterns.get(optionalPatternPos);
+			State<T> optionalState = states.get(optionalPattern.getName());
+			currentState.addStateTransition(new StateTransition<>(
+				StateTransitionAction.TAKE,
+				optionalState,
+				(FilterFunction<T>) optionalPattern.getFilterFunction()));
		}
	}

	/**
+	 * Expands a pattern a number of times and connects the expanded states. E.g. count(3) will result in:
+	 *
+	 * +-----+   +-------+   +-------+
+	 * |State+-->|State#1+-->|State#2|
+	 * +-----+   +-------+   +-------+
+	 */
+	private static <T> void expandRepeatingPattern(State<T> currentState, int patternPos,
+			ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
+		Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
+		State<T> succeedingState = states.get(succeedingPattern.getName());
+		Pattern<T, ?> currentPattern = patterns.get(patternPos);
+
+		State<T> currentRepeatingState = null;
+		State<T> nextRepeatingState = currentState;
+		for (int i = 1; i < currentPattern.getMaxCount(); i++) {
+			currentRepeatingState = nextRepeatingState;
+			nextRepeatingState = new State<>(
+				currentState.getName() + "#" + i,
+				State.StateType.Normal);
+			states.put(nextRepeatingState.getName(), nextRepeatingState);

[GitHub] flink pull request #2361: [FLINK-3318][cep] Add support for quantifiers to C...

2017-01-25 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2361#discussion_r97789988
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
@@ -130,26 +114,166 @@
			}

			// add the beginning state
-			final State<T> beginningState;
+			State<T> beginningState = states.get(BEGINNING_STATE_NAME);
+			addTransitions(beginningState, -1, patterns, states);
+			return new NFAFactoryImpl<>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
+		}
+	}

-			if (states.containsKey(BEGINNING_STATE_NAME)) {
-				beginningState = states.get(BEGINNING_STATE_NAME);
-			} else {
-				beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start);
-				states.put(BEGINNING_STATE_NAME, beginningState);
-			}
+	private static <T> void addTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
+		Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
+		State<T> succeedingState = states.get(succeedingPattern.getName());

-			beginningState.addStateTransition(new StateTransition<T>(
+		if (shouldRepeatPattern(patternPos, patterns)) {
+			expandRepeatingPattern(currentState, patternPos, patterns, states);
+		} else {
+			currentState.addStateTransition(new StateTransition<T>(
				StateTransitionAction.TAKE,
-				currentState,
-				(FilterFunction<T>) currentPattern.getFilterFunction()
+				succeedingState,
+				(FilterFunction<T>) succeedingPattern.getFilterFunction()
			));

-			return new NFAFactoryImpl<>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
+			if (shouldAddSelfTransition(succeedingPattern)) {
+				addTransitionToSelf(succeedingPattern, succeedingState);
+			}
+			if (isPatternOptional(succeedingPattern)) {
+				addOptionalTransitions(currentState, patternPos, patterns, states);
+			}
+		}
+	}
+
+	private static <T> void addOptionalTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
--- End diff --

The `patterns` can become a `List` instead of `ArrayList`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2361: [FLINK-3318][cep] Add support for quantifiers to C...

2017-01-25 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2361#discussion_r97789900
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
@@ -130,26 +114,166 @@
			}

			// add the beginning state
-			final State<T> beginningState;
+			State<T> beginningState = states.get(BEGINNING_STATE_NAME);
+			addTransitions(beginningState, -1, patterns, states);
+			return new NFAFactoryImpl<>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
+		}
+	}

-			if (states.containsKey(BEGINNING_STATE_NAME)) {
-				beginningState = states.get(BEGINNING_STATE_NAME);
-			} else {
-				beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start);
-				states.put(BEGINNING_STATE_NAME, beginningState);
-			}
+	private static <T> void addTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
--- End diff --

The `patterns` can become a `List` instead of `ArrayList`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2361: [FLINK-3318][cep] Add support for quantifiers to C...

2017-01-25 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2361#discussion_r97789775
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
@@ -80,25 +82,17 @@
			// return a factory for empty NFAs
			return new NFAFactoryImpl<>(inputTypeSerializer, 0, Collections.<State<T>>emptyList(), timeoutHandling);
		} else {
+			ArrayList<Pattern<T, ?>> patterns = createPatternsList(pattern);
--- End diff --

The `patterns` can become a `List` instead of `ArrayList`. It is good to 
have the most generic type possible as argument or return type, as 
implementations may change.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5599) State interface docs often refer to keyed state only

2017-01-25 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837815#comment-15837815
 ] 

Aljoscha Krettek commented on FLINK-5599:
-

Definite +1, yes.

> State interface docs often refer to keyed state only
> 
>
> Key: FLINK-5599
> URL: https://issues.apache.org/jira/browse/FLINK-5599
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Priority: Minor
>
> The JavaDocs of the {{State}} interface (and related classes) often mention 
> keyed state only as the state interface was only exposed for keyed state 
> until Flink 1.1. With the new {{CheckpointedFunction}} interface, this has 
> changed and the docs should be adjusted accordingly.
> Would be nice to address this with 1.2.0 so that the JavaDocs are updated for 
> users. [~stefanrichte...@gmail.com] or [~aljoscha] maybe you can have a look 
> at this briefly?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5092) Add integration with Sonarqube and code coverage

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837809#comment-15837809
 ] 

ASF GitHub Bot commented on FLINK-5092:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2836#discussion_r97788511
  
--- Diff: flink-contrib/flink-storm-examples/pom.xml ---
@@ -364,6 +363,18 @@ under the License.
+				<execution>
+					<id>shade-flink</id>
--- End diff --

With this we execute the shade-plugin also for other build-profiles, 
correct? (same for `flink-examples/flink-examples-streaming`) It would be cool 
if this would only be done when creating the coverage report, so that the 
normal build-time is unaffected.


> Add integration with Sonarqube and code coverage
> 
>
> Key: FLINK-5092
> URL: https://issues.apache.org/jira/browse/FLINK-5092
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Reporter: Boris Osipov
>Assignee: Boris Osipov
>
> It would be good to have the opportunity to generate test coverage reports 
> for Flink and analyze code by SonarQube.
> Parts of tasks:
> - add generate test coverage reports for Flink with new maven profile
> - implement integration with https://analysis.apache.org/



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2836: [FLINK-5092] Add maven profile with code coverage ...

2017-01-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2836#discussion_r97788511
  
--- Diff: flink-contrib/flink-storm-examples/pom.xml ---
@@ -364,6 +363,18 @@ under the License.
+				<execution>
+					<id>shade-flink</id>
--- End diff --

With this we execute the shade-plugin also for other build-profiles, 
correct? (same for `flink-examples/flink-examples-streaming`) It would be cool 
if this would only be done when creating the coverage report, so that the 
normal build-time is unaffected.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5638) Deadlock when closing two chained async I/O operators

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837807#comment-15837807
 ] 

ASF GitHub Bot commented on FLINK-5638:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/3210
  
Agreed, the problem is a little bit that you don't control from where the 
`close` method is called... Thanks for the review @StephanEwen.

Merging once Travis has passed.


> Deadlock when closing two chained async I/O operators
> -
>
> Key: FLINK-5638
> URL: https://issues.apache.org/jira/browse/FLINK-5638
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0, 1.3.0
>
>
> The {{AsyncWaitOperator}} can deadlock in a special case when closing two 
> chained {{AsyncWaitOperators}} while there is still one element between these 
> two operators in flight.
> The deadlock scenario is the following: Given two chained 
> {{AsyncWaitOperators}} {{a1}} and {{a2}}. {{a1}} has its last element 
> completed. This notifies {{a1's}} {{Emitter}}, {{e1}}, to remove the element 
> from the queue and output it to {{a2}}. This poll and output operation 
> happens under the checkpoint lock. Since {{a1}} and {{a2}} are chained, the 
> {{e1}} thread will directly call {{a2's}} {{processElement}} function. In 
> this function, we try to add the new element to the {{StreamElementQueue}}. 
> Now assume that this queue is full. Then the operation will release the 
> checkpoint lock and wait until it is notified again.
> In the meantime, {{a1.close()}} is called by the {{StreamTask}}, because we 
> have consumed all input. The close operation also happens under the 
> checkpoint lock. First the close method waits until all elements from the 
> {{StreamElementQueue}} have been processed (== empty). This happens by 
> waiting on the checkpoint lock. Next the {{e1}} is interrupted and we join on 
> {{e1}}. When interrupting {{e1}}, it currently waits on the checkpoint lock. 
> Since the closing operation does not release the checkpoint lock, {{e1}} 
> cannot regain the synchronization lock and voila we have a deadlock.
> There are two problems which cause the deadlock:
> 1. We assume that the {{AsyncWaitOperator}} has processed all its elements if 
> the queue is empty. This is usually the case if the output operation is 
> atomic. However in the chained case it can happen that the emitter thread has 
> to wait to insert the element into the queue of the next 
> {{AsyncWaitOperator}}. Under these circumstances, we release the checkpoint 
> lock and, thus, the output operation is no longer atomic. We can solve this 
> problem by polling the last queue element after we have outputted it instead 
> of before.
> 2. We interrupt the emitter thread while holding the checkpoint lock and not 
> freeing it again. Under these circumstances, the interrupt signal is 
> meaningless because the emitter thread also needs control over the checkpoint 
> lock. We should solve the problem by waiting on the checkpoint lock and 
> periodically checking whether the thread has already stopped or not.
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/194729330/log.txt
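A minimal sketch of the second fix, with hypothetical names rather than the actual Flink code: instead of joining on the emitter while holding the checkpoint lock, wait on the lock in small intervals and re-check whether the emitter has terminated. {{Object.wait()}} releases the monitor, so the emitter can still acquire the lock, observe the interrupt, and shut down.

{code}
class CloseSketch {

    // Called while holding checkpointLock; wait() releases the monitor
    // periodically so the emitter thread can grab it and terminate.
    static void joinUnderLock(Object checkpointLock, Thread emitter) throws InterruptedException {
        synchronized (checkpointLock) {
            emitter.interrupt();
            while (emitter.isAlive()) {
                checkpointLock.wait(100L); // re-check every 100 ms
            }
        }
    }
}
{code}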



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #3210: [backport] [FLINK-5638] [asyncIO] Fix deadlock when closi...

2017-01-25 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/3210
  
Agreed, the problem is a little bit that you don't control from where the 
`close` method is called... Thanks for the review @StephanEwen.

Merging once Travis has passed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5643) StateUtil.discardStateFuture fails when state future contains null value

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837805#comment-15837805
 ] 

ASF GitHub Bot commented on FLINK-5643:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3212


> StateUtil.discardStateFuture fails when state future contains null value
> 
>
> Key: FLINK-5643
> URL: https://issues.apache.org/jira/browse/FLINK-5643
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0, 1.3.0
>
>
> When calling {{StateUtil.discardStateFuture}} with a state future which 
> contains a null value, the operation fails with an NPE.
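A minimal sketch of the kind of null guard that avoids the NPE, with hypothetical types rather than the actual fix:

{code}
import java.util.concurrent.Future;

class DiscardSketch {

    interface StateObject {
        void discardState() throws Exception;
    }

    // Tolerates both a null future and a future that completed with null.
    static void discardStateFuture(Future<? extends StateObject> stateFuture) throws Exception {
        if (stateFuture == null) {
            return;
        }
        StateObject state = stateFuture.get(); // the value may legitimately be null
        if (state != null) {
            state.discardState();
        }
    }
}
{code}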



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-5643) StateUtil.discardStateFuture fails when state future contains null value

2017-01-25 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-5643.

Resolution: Fixed

1.3.0: cf9f4c77c8e785567f036cc2fbb97c0cd16979a7
1.2.0: f61f331ff04426e8b07117e2a568749fb036e1cc

> StateUtil.discardStateFuture fails when state future contains null value
> 
>
> Key: FLINK-5643
> URL: https://issues.apache.org/jira/browse/FLINK-5643
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0, 1.3.0
>
>
> When calling {{StateUtil.discardStateFuture}} with a state future which 
> contains a null value, the operation fails with an NPE.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3212: [FLINK-5643] Fix NPE in StateUtil

2017-01-25 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3212


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5643) StateUtil.discardStateFuture fails when state future contains null value

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837800#comment-15837800
 ] 

ASF GitHub Bot commented on FLINK-5643:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/3212
  
Forwarding review from #3213. Merging this PR.


> StateUtil.discardStateFuture fails when state future contains null value
> 
>
> Key: FLINK-5643
> URL: https://issues.apache.org/jira/browse/FLINK-5643
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0, 1.3.0
>
>
> When calling {{StateUtil.discardStateFuture}} with a state future which 
> contains a null value, the operation fails with an NPE.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #3212: [FLINK-5643] Fix NPE in StateUtil

2017-01-25 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/3212
  
Forwarding review from #3213. Merging this PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5643) StateUtil.discardStateFuture fails when state future contains null value

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837798#comment-15837798
 ] 

ASF GitHub Bot commented on FLINK-5643:
---

Github user tillrohrmann closed the pull request at:

https://github.com/apache/flink/pull/3213


> StateUtil.discardStateFuture fails when state future contains null value
> 
>
> Key: FLINK-5643
> URL: https://issues.apache.org/jira/browse/FLINK-5643
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0, 1.3.0
>
>
> When calling {{StateUtil.discardStateFuture}} with a state future which 
> contains a null value, the operation fails with an NPE.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3213: [backport] [FLINK-5643] Fix NPE in StateUtil

2017-01-25 Thread tillrohrmann
Github user tillrohrmann closed the pull request at:

https://github.com/apache/flink/pull/3213


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3213: [backport] [FLINK-5643] Fix NPE in StateUtil

2017-01-25 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/3213
  
Thanks for the review @rmetzger. Merging this PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5643) StateUtil.discardStateFuture fails when state future contains null value

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837795#comment-15837795
 ] 

ASF GitHub Bot commented on FLINK-5643:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/3213
  
Thanks for the review @rmetzger. Merging this PR.


> StateUtil.discardStateFuture fails when state future contains null value
> 
>
> Key: FLINK-5643
> URL: https://issues.apache.org/jira/browse/FLINK-5643
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0, 1.3.0
>
>
> When calling {{StateUtil.discardStateFuture}} with a state future which 
> contains a null value, the operation fails with an NPE.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5546) java.io.tmpdir setted as project build directory in surefire plugin

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837776#comment-15837776
 ] 

ASF GitHub Bot commented on FLINK-5546:
---

Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/3190
  
@greghogan That makes sense. Let JUnit create the root dir and delete the 
temporary dir recursively.
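For illustration, a minimal sketch using JUnit 4's {{TemporaryFolder}} rule, which creates a fresh root directory per test and deletes it recursively afterwards:

{code}
import java.io.File;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class TempDirExampleTest {

    // A per-test directory avoids permission clashes between Linux users
    // sharing /tmp; JUnit deletes it recursively after each test.
    @Rule
    public final TemporaryFolder tempFolder = new TemporaryFolder();

    @Test
    public void usesIsolatedCacheFile() throws Exception {
        File cacheFile = tempFolder.newFile("cacheFile");
        // ... exercise the code that needs a scratch file ...
    }
}
{code}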


> java.io.tmpdir setted as project build directory in surefire plugin
> ---
>
> Key: FLINK-5546
> URL: https://issues.apache.org/jira/browse/FLINK-5546
> Project: Flink
>  Issue Type: Test
>Affects Versions: 1.2.0, 1.3.0
> Environment: CentOS 7.2
>Reporter: Syinchwun Leo
>Assignee: shijinkui
>
> When multiple Linux users run tests at the same time, the flink-runtime module 
> may fail. User A creates /tmp/cacheFile, and User B will have no permission to 
> access the folder.
> Failed tests: 
> FileCacheDeleteValidationTest.setup:79 Error initializing the test: 
> /tmp/cacheFile (Permission denied)
> Tests in error: 
> IOManagerTest.channelEnumerator:54 » Runtime Could not create storage 
> director...
> Tests run: 1385, Failures: 1, Errors: 1, Skipped: 8



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #3190: [FLINK-5546][build] java.io.tmpdir setted as project buil...

2017-01-25 Thread shijinkui
Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/3190
  
@greghogan That makes sense. Let JUnit create the root dir and delete the 
temporary dir recursively.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3214: [FLINK-5644] Remove metric: Task#lastCheckpointSiz...

2017-01-25 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/3214

[FLINK-5644] Remove metric: Task#lastCheckpointSize

This PR removes the lastCheckpointSize metric that was broken when the 
key-groups were introduced. I couldn't find an easy way to fix it instead.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 5644_metric_chsize

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3214.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3214






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5644) Task#lastCheckpointSize metric broken

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837773#comment-15837773
 ] 

ASF GitHub Bot commented on FLINK-5644:
---

GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/3214

[FLINK-5644] Remove metric: Task#lastCheckpointSize

This PR removes the lastCheckpointSize metric that was broken when the 
key-groups were introduced. I couldn't find an easy way to fix it instead.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 5644_metric_chsize

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3214.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3214






> Task#lastCheckpointSize metric broken
> -
>
> Key: FLINK-5644
> URL: https://issues.apache.org/jira/browse/FLINK-5644
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics, Streaming
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.2.0, 1.3.0
>
>
> The lastCheckpointSize metric was broken when we introduced the key-groups. I 
> couldn't find an easy way to fix the metric, as such I propose to remove it.





[jira] [Created] (FLINK-5644) Task#lastCheckpointSize metric broken

2017-01-25 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-5644:
---

 Summary: Task#lastCheckpointSize metric broken
 Key: FLINK-5644
 URL: https://issues.apache.org/jira/browse/FLINK-5644
 Project: Flink
  Issue Type: Bug
  Components: Metrics, Streaming
Affects Versions: 1.2.0, 1.3.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.2.0, 1.3.0


The lastCheckpointSize metric was broken when we introduced the key-groups. I 
couldn't find an easy way to fix the metric, as such I propose to remove it.





[jira] [Commented] (FLINK-5551) NPE at SourceStreamTask

2017-01-25 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837769#comment-15837769
 ] 

Aljoscha Krettek commented on FLINK-5551:
-

I think this can happen when canceling happens before the Task/Operator is 
properly set up? The exception is a follow-up exception that happens because 
of the canceling. Do we need to address this, [~StephanEwen]?
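
For context, a hedged sketch of the kind of null guard that would avoid the 
NPE in the stack trace quoted below; the types and names are illustrative 
stand-ins, not the actual Flink code:

{code:java}
public class CancelGuardSketch {

    /** Stand-in for the source; only the cancel() part matters here. */
    interface CancellableSource {
        void cancel();
    }

    // Assigned during task setup; may still be null if cancellation
    // races with the setup of the Task/Operator.
    private volatile CancellableSource source;

    void cancelTask() {
        CancellableSource s = source;
        if (s != null) {
            // only forward the cancel once setup has installed the source
            s.cancel();
        }
    }
}
{code}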

> NPE at SourceStreamTask
> ---
>
> Key: FLINK-5551
> URL: https://issues.apache.org/jira/browse/FLINK-5551
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.0
>Reporter: Andrey
>
> Prerequisites:
>  * Configure hdfs-based backend for the job
>  * cancel it using webapp admin
> In logs during job cancel:
> {code}
> 2017-01-17 16:22:08,756 INFO  
> org.apache.flink.streaming.runtime.tasks.StreamTask   - Using 
> user-defined state backend: File State Backend @ 
> hdfs://host:port/flink-checkpoints
> 2017-01-17 16:22:08,756 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to cancel task Source: Custom Source -> Filter 
> (3/8)
> 2017-01-17 16:22:08,756 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Source: Custom Source -> Filter (3/8) switched to CANCELING
> 2017-01-17 16:22:08,756 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Triggering cancellation of task code Source: Custom Source -> 
> Filter (3/8) (559a224c8ef78884db727dab0b2d5e99).
> 2017-01-17 16:22:08,756 ERROR org.apache.flink.runtime.taskmanager.Task   
>   - Error while canceling the task
> java.lang.NullPointerException
> at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:61)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:349)
> at 
> org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1214)
> at java.lang.Thread.run(Thread.java:745)
> {code}





[jira] [Assigned] (FLINK-5637) Default Flink configuration contains whitespace characters, causing parser WARNings

2017-01-25 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger reassigned FLINK-5637:
-

Assignee: Robert Metzger

> Default Flink configuration contains whitespace characters, causing parser 
> WARNings
> ---
>
> Key: FLINK-5637
> URL: https://issues.apache.org/jira/browse/FLINK-5637
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Robert Metzger
>Assignee: Robert Metzger
>  Labels: starter
>
> {code}
> 2017-01-25 09:45:30,670 WARN  
> org.apache.flink.configuration.GlobalConfiguration- Error while 
> trying to split key and value in configuration file 
> /yarn/nm/usercache/robert/appcache/application_1485249546281_0018/container_1485249546281_0018_01_01/flink-conf.yaml:
>   
> {code}
> The whitespace is currently in line 67:
> {code}
> #==
>  
> # The address under which the web-based runtime monitor listens.
> {code}
> I think we should add a test to the {{GlobalConfigurationTest}} that ensures 
> the configuration file we are shipping doesn't produce any WARNings by 
> default.
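
A self-contained sketch of such a test; it deliberately sidesteps the 
GlobalConfiguration API, and the path to the shipped file is an assumption:

{code:java}
import static org.junit.Assert.assertTrue;

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import org.junit.Test;

public class ConfigWhitespaceTest {

    @Test
    public void shippedConfigHasNoWhitespaceOnlyLines() throws Exception {
        // path is an assumption for illustration; adjust to the module layout
        List<String> lines = Files.readAllLines(
                Paths.get("src/main/resources/flink-conf.yaml"));

        List<Integer> offending = new ArrayList<>();
        for (int i = 0; i < lines.size(); i++) {
            String line = lines.get(i);
            // non-empty but whitespace-only lines trigger the
            // "Error while trying to split key and value" WARN
            if (!line.isEmpty() && line.trim().isEmpty()) {
                offending.add(i + 1);
            }
        }
        assertTrue("Whitespace-only lines at: " + offending, offending.isEmpty());
    }
}
{code}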





[jira] [Commented] (FLINK-5643) StateUtil.discardStateFuture fails when state future contains null value

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837761#comment-15837761
 ] 

ASF GitHub Bot commented on FLINK-5643:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/3213
  
+1


> StateUtil.discardStateFuture fails when state future contains null value
> 
>
> Key: FLINK-5643
> URL: https://issues.apache.org/jira/browse/FLINK-5643
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0, 1.3.0
>
>
> When calling {{StateUtil.discardStateFuture}} with a state future which 
> contains a null value, then the operation fails with a NPE.





[jira] [Commented] (FLINK-5516) Hardcoded paths in flink-python/.../PythonPlanBinder.java

2017-01-25 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837760#comment-15837760
 ] 

Chesnay Schepler commented on FLINK-5516:
-

Yeah this is easy to implement; it is pretty much a one-liner in the 
PythonPlanBinder.

There is only one small thing to be wary of:

The path we are talking about here is where we upload the python library, to 
then register it in the DistributedCache. The default for this is "hdfs:/tmp". 
However, if you execute in a local environment (i.e. the tests) then this is 
changed to "file:/flink".

So...we could change the default to "file:..." and force the user to configure 
a path. Or keep the current behavior, but introduce a flag so that we don't 
override the user-specified location.
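
A hedged sketch of that "one-liner plus a flag" idea; the config key name and 
the local-mode default are assumptions made for illustration:

{code:java}
import org.apache.flink.configuration.Configuration;

public class PythonPathResolverSketch {

    // hypothetical key; the real option name would need to be agreed on
    static final String DC_TMP_DIR_KEY = "python.dc.tmp.dir";
    static final String DEFAULT_HDFS_PATH = "hdfs:/tmp";

    /**
     * Returns the upload path for the python library. A user-specified value
     * always wins; only the built-in default is replaced in local execution.
     */
    static String resolveUploadPath(Configuration config, boolean localExecution) {
        boolean userSpecified = config.containsKey(DC_TMP_DIR_KEY);
        if (localExecution && !userSpecified) {
            return "file:/flink";
        }
        return config.getString(DC_TMP_DIR_KEY, DEFAULT_HDFS_PATH);
    }
}
{code}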

> Hardcoded paths in flink-python/.../PythonPlanBinder.java
> -
>
> Key: FLINK-5516
> URL: https://issues.apache.org/jira/browse/FLINK-5516
> Project: Flink
>  Issue Type: Improvement
>  Components: Python API
>Reporter: Felix seibert
>
> The PythonPlanBinder.java contains three hardcoded filesystem paths:
> {code:java}
> public static final String FLINK_PYTHON_FILE_PATH = 
> System.getProperty("java.io.tmpdir") + File.separator + "flink_plan";
> private static String FLINK_HDFS_PATH = "hdfs:/tmp";
> public static final String FLINK_TMP_DATA_DIR = 
> System.getProperty("java.io.tmpdir") + File.separator + "flink_data";
> {code}
> _FLINK_PYTHON_FILE_PATH_ and _FLINK_TMP_DATA_DIR_ are configurable by 
> modifying _java.io.tmpdir_.
> For _FLINK_HDFS_PATH_, there is no way of configuring otherwise but modifying 
> the source. 
> Is it possible to make all three parameters configurable in the usual flink 
> configuration files (like flink-conf.yaml)?





[GitHub] flink issue #3213: [FLINK-5643] Fix NPE in StateUtil

2017-01-25 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/3213
  
+1




[jira] [Resolved] (FLINK-5492) BootstrapTools log wrong address of started ActorSystem

2017-01-25 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger resolved FLINK-5492.
---
   Resolution: Fixed
Fix Version/s: 1.3.0
   1.2.0

> BootstrapTools log wrong address of started ActorSystem
> ---
>
> Key: FLINK-5492
> URL: https://issues.apache.org/jira/browse/FLINK-5492
> Project: Flink
>  Issue Type: Bug
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
> Fix For: 1.2.0, 1.3.0
>
>
> When starting an {{ActorSystem}} via the {{Bootstrap}} tools, then the 
> {{startActorSystem}} function logs the IP resolved from the provided hostname 
> as the {{ActorSystem}} address. However, then the function uses the 
> unresolved hostname to start the {{ActorSystem}}. Since Akka matches the 
> ActorSystem's address and the destination address of the incoming message we 
> should log the URL which is used to start the {{ActorSystem}} and not the 
> resolved IP (messages with the IP will usually be rejected).
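
A hedged sketch of the distinction, using plain stand-ins rather than the 
actual BootstrapTools code:

{code:java}
import java.net.InetAddress;

public class BindAddressLoggingSketch {

    static void startActorSystem(String hostname, int port) throws Exception {
        String resolvedIp = InetAddress.getByName(hostname).getHostAddress();

        // misleading: logs the resolved IP, which is not the address the
        // ActorSystem is started with, so messages sent to it are rejected:
        // System.out.println("Starting actor system at " + resolvedIp + ":" + port);

        // log the unresolved hostname that is actually used for the start
        System.out.println("Starting actor system at " + hostname + ":" + port);

        // ... start the ActorSystem with `hostname`, not `resolvedIp` ...
    }
}
{code}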





[jira] [Commented] (FLINK-5492) BootstrapTools log wrong address of started ActorSystem

2017-01-25 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837756#comment-15837756
 ] 

Robert Metzger commented on FLINK-5492:
---

Fixed for 1.2 in http://git-wip-us.apache.org/repos/asf/flink/commit/887c074a
Fixed for 1.3 in http://git-wip-us.apache.org/repos/asf/flink/commit/cf6b3fb2

> BootstrapTools log wrong address of started ActorSystem
> ---
>
> Key: FLINK-5492
> URL: https://issues.apache.org/jira/browse/FLINK-5492
> Project: Flink
>  Issue Type: Bug
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
> Fix For: 1.2.0, 1.3.0
>
>
> When starting an {{ActorSystem}} via the {{Bootstrap}} tools, then the 
> {{startActorSystem}} function logs the IP resolved from the provided hostname 
> as the {{ActorSystem}} address. However, then the function uses the 
> unresolved hostname to start the {{ActorSystem}}. Since Akka matches the 
> ActorSystem's address and the destination address of the incoming message we 
> should log the URL which is used to start the {{ActorSystem}} and not the 
> resolved IP (messages with the IP will usually be rejected).





[GitHub] flink pull request #3213: [FLINK-5643] Fix NPE in StateUtil

2017-01-25 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/3213

[FLINK-5643] Fix NPE in StateUtil

Introduces a null check to deal with state futures which have a null value.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink 
stateFutureFixDiscardBackport

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3213.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3213


commit 99452d926a2cb210dbbabdd3c8eee8fdcf74058e
Author: Till Rohrmann 
Date:   2017-01-25T13:39:51Z

[FLINK-5643] Fix NPE in StateUtil

Introduces a null check to deal with state futures which have a null value.






[jira] [Commented] (FLINK-5643) StateUtil.discardStateFuture fails when state future contains null value

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837752#comment-15837752
 ] 

ASF GitHub Bot commented on FLINK-5643:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/3213

[FLINK-5643] Fix NPE in StateUtil

Introduces a null check to deal with state futures which have a null value.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink 
stateFutureFixDiscardBackport

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3213.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3213


commit 99452d926a2cb210dbbabdd3c8eee8fdcf74058e
Author: Till Rohrmann 
Date:   2017-01-25T13:39:51Z

[FLINK-5643] Fix NPE in StateUtil

Introduces a null check to deal with state futures which have a null value.




> StateUtil.discardStateFuture fails when state future contains null value
> 
>
> Key: FLINK-5643
> URL: https://issues.apache.org/jira/browse/FLINK-5643
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0, 1.3.0
>
>
> When calling {{StateUtil.discardStateFuture}} with a state future which 
> contains a null value, then the operation fails with a NPE.





[jira] [Commented] (FLINK-5229) Cleanup StreamTaskStates if a checkpoint operation of a subsequent operator fails

2017-01-25 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837750#comment-15837750
 ] 

Robert Metzger commented on FLINK-5229:
---

Thank you!

> Cleanup StreamTaskStates if a checkpoint operation of a subsequent operator 
> fails 
> --
>
> Key: FLINK-5229
> URL: https://issues.apache.org/jira/browse/FLINK-5229
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing, TaskManager
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0, 1.1.4, 1.3.0
>
>
> Due to chaining, a {{StreamTask}} needs to checkpoint multiple operators. If 
> the first operators succeed in creating a checkpoint but a subsequent 
> operator in the chain fails, the {{StreamTask}} has to clean up the already 
> completed checkpoints. Otherwise we might end up with orphaned state data.
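
A hedged sketch of the cleanup pattern this describes; the types are 
illustrative stand-ins, not the actual StreamTask code:

{code:java}
import java.util.ArrayList;
import java.util.List;

public class ChainedSnapshotSketch {

    interface OperatorSnapshot {
        void discard() throws Exception;   // stand-in for releasing state handles
    }

    interface ChainedOperator {
        OperatorSnapshot snapshot() throws Exception;
    }

    /** Snapshots the whole chain; on failure, discards what already completed. */
    static List<OperatorSnapshot> snapshotChain(List<ChainedOperator> chain)
            throws Exception {
        List<OperatorSnapshot> completed = new ArrayList<>();
        try {
            for (ChainedOperator op : chain) {
                completed.add(op.snapshot());
            }
            return completed;
        } catch (Exception e) {
            for (OperatorSnapshot snapshot : completed) {
                try {
                    snapshot.discard();   // avoid orphaned state data
                } catch (Exception suppressed) {
                    e.addSuppressed(suppressed);
                }
            }
            throw e;
        }
    }
}
{code}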





[jira] [Commented] (FLINK-5643) StateUtil.discardStateFuture fails when state future contains null value

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837751#comment-15837751
 ] 

ASF GitHub Bot commented on FLINK-5643:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/3212

[FLINK-5643] Fix NPE in StateUtil

Introduces a null check to deal with state futures which have a null value.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink stateFutureFixDiscard

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3212.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3212


commit a3062af22b2be71f6e85a89bd554a87ad1ced2de
Author: Till Rohrmann 
Date:   2017-01-25T13:39:51Z

[FLINK-5643] Fix NPE in StateUtil

Introduces a null check to deal with state futures which have a null value.




> StateUtil.discardStateFuture fails when state future contains null value
> 
>
> Key: FLINK-5643
> URL: https://issues.apache.org/jira/browse/FLINK-5643
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0, 1.3.0
>
>
> When calling {{StateUtil.discardStateFuture}} with a state future which 
> contains a null value, then the operation fails with a NPE.





[GitHub] flink pull request #3212: [FLINK-5643] Fix NPE in StateUtil

2017-01-25 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/3212

[FLINK-5643] Fix NPE in StateUtil

Introduces a null check to deal with state futures which have a null value.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink stateFutureFixDiscard

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3212.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3212


commit a3062af22b2be71f6e85a89bd554a87ad1ced2de
Author: Till Rohrmann 
Date:   2017-01-25T13:39:51Z

[FLINK-5643] Fix NPE in StateUtil

Introduces a null check to deal with state futures which have a null value.






[jira] [Commented] (FLINK-5229) Cleanup StreamTaskStates if a checkpoint operation of a subsequent operator fails

2017-01-25 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837748#comment-15837748
 ] 

Till Rohrmann commented on FLINK-5229:
--

Will be addressed with FLINK-5643.

> Cleanup StreamTaskStates if a checkpoint operation of a subsequent operator 
> fails 
> --
>
> Key: FLINK-5229
> URL: https://issues.apache.org/jira/browse/FLINK-5229
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing, TaskManager
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0, 1.1.4, 1.3.0
>
>
> Due to chaining, a {{StreamTask}} needs to checkpoint multiple operators. If 
> the first operators succeed in creating a checkpoint but a subsequent 
> operator in the chain fails, the {{StreamTask}} has to clean up the already 
> completed checkpoints. Otherwise we might end up with orphaned state data.





[jira] [Created] (FLINK-5643) StateUtil.discardStateFuture fails when state future contains null value

2017-01-25 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-5643:


 Summary: StateUtil.discardStateFuture fails when state future 
contains null value
 Key: FLINK-5643
 URL: https://issues.apache.org/jira/browse/FLINK-5643
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.2.0, 1.3.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.2.0, 1.3.0


When calling {{StateUtil.discardStateFuture}} with a state future which 
contains a null value, then the operation fails with a NPE.
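
A minimal sketch of such a null check, written against a plain 
java.util.concurrent.Future stand-in rather than the actual StateUtil code:

{code:java}
import java.util.concurrent.Future;

public class DiscardSketch {

    interface StateObject {
        void discardState() throws Exception;
    }

    /** Discards the state behind the future, tolerating null results. */
    static void discardStateFuture(Future<? extends StateObject> stateFuture)
            throws Exception {
        if (stateFuture == null) {
            return;
        }
        StateObject state = stateFuture.get();
        if (state != null) {   // the future may legitimately complete with null
            state.discardState();
        }
    }
}
{code}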





[jira] [Commented] (FLINK-5516) Hardcoded paths in flink-python/.../PythonPlanBinder.java

2017-01-25 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837744#comment-15837744
 ] 

Aljoscha Krettek commented on FLINK-5516:
-

[~Zentol] would this be possible?

I think no one is currently working on these parts but we would be very happy 
about contributions. :-)

> Hardcoded paths in flink-python/.../PythonPlanBinder.java
> -
>
> Key: FLINK-5516
> URL: https://issues.apache.org/jira/browse/FLINK-5516
> Project: Flink
>  Issue Type: Improvement
>  Components: Python API
>Reporter: Felix seibert
>
> The PythonPlanBinder.java contains three hardcoded filesystem paths:
> {code:java}
> public static final String FLINK_PYTHON_FILE_PATH = 
> System.getProperty("java.io.tmpdir") + File.separator + "flink_plan";
> private static String FLINK_HDFS_PATH = "hdfs:/tmp";
> public static final String FLINK_TMP_DATA_DIR = 
> System.getProperty("java.io.tmpdir") + File.separator + "flink_data";
> {code}
> _FLINK_PYTHON_FILE_PATH_ and _FLINK_TMP_DATA_DIR_ are configurable by 
> modifying _java.io.tmpdir_.
> For _FLINK_HDFS_PATH_, there is no way of configuring otherwise but modifying 
> the source. 
> Is it possible to make all three parameters configurable in the usual flink 
> configuration files (like flink-conf.yaml)?





[jira] [Closed] (FLINK-4905) Kafka test instability IllegalStateException: Client is not started

2017-01-25 Thread Andrew Efimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Efimov closed FLINK-4905.

   Resolution: Fixed
Fix Version/s: 1.3.0

Fixed by Stephan Ewen

> Kafka test instability IllegalStateException: Client is not started
> ---
>
> Key: FLINK-4905
> URL: https://issues.apache.org/jira/browse/FLINK-4905
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Robert Metzger
>Assignee: Andrew Efimov
>  Labels: test-stability
> Fix For: 1.3.0
>
> Attachments: Kafka08Fetcher.png
>
>
> The following travis build 
> (https://s3.amazonaws.com/archive.travis-ci.org/jobs/170365439/log.txt)  
> failed because of this error
> {code}
> 08:17:11,239 INFO  org.apache.flink.runtime.jobmanager.JobManager 
>- Status of job 33ebdc0e7c91be186d80658ce3d17069 (Read some records to 
> commit offsets to Kafka) changed to FAILING.
> java.lang.RuntimeException: Error while confirming checkpoint
>   at org.apache.flink.runtime.taskmanager.Task$4.run(Task.java:1040)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IllegalStateException: Client is not started
>   at 
> org.apache.flink.shaded.com.google.common.base.Preconditions.checkState(Preconditions.java:173)
>   at 
> org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:113)
>   at 
> org.apache.curator.utils.EnsurePath$InitialHelper$1.call(EnsurePath.java:148)
>   at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107)
>   at 
> org.apache.curator.utils.EnsurePath$InitialHelper.ensure(EnsurePath.java:141)
>   at org.apache.curator.utils.EnsurePath.ensure(EnsurePath.java:99)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:133)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.prepareAndCommitOffsets(ZookeeperOffsetHandler.java:93)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.commitInternalOffsetsToKafka(Kafka08Fetcher.java:341)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.notifyCheckpointComplete(FlinkKafkaConsumerBase.java:421)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:229)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:571)
>   at org.apache.flink.runtime.taskmanager.Task$4.run(Task.java:1035)
>   ... 5 more
> 08:17:11,241 INFO  org.apache.flink.runtime.taskmanager.Task  
>- Attempting to cancel task Source: Custom Source -> Map -> Map -> Sink: 
> Unnamed (1/3)
> {code}





[jira] [Commented] (FLINK-5229) Cleanup StreamTaskStates if a checkpoint operation of a subsequent operator fails

2017-01-25 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837740#comment-15837740
 ] 

Till Rohrmann commented on FLINK-5229:
--

Should be fixed. Apparently, there are state handle futures which contain 
{{null}} values.

> Cleanup StreamTaskStates if a checkpoint operation of a subsequent operator 
> fails 
> --
>
> Key: FLINK-5229
> URL: https://issues.apache.org/jira/browse/FLINK-5229
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing, TaskManager
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0, 1.1.4, 1.3.0
>
>
> Due to chaining, a {{StreamTask}} needs to checkpoint multiple operators. If 
> the first operators succeed in creating a checkpoint but a subsequent 
> operator in the chain fails, the {{StreamTask}} has to clean up the already 
> completed checkpoints. Otherwise we might end up with orphaned state data.





[jira] [Updated] (FLINK-5641) Directories of expired checkpoints are not cleaned up

2017-01-25 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-5641:
--
Attachment: application-1485249546281-0021-misbehaved-1.2.0-rc1

> Directories of expired checkpoints are not cleaned up
> -
>
> Key: FLINK-5641
> URL: https://issues.apache.org/jira/browse/FLINK-5641
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>Assignee: Till Rohrmann
> Attachments: application-1485249546281-0021-misbehaved-1.2.0-rc1
>
>
> While testing Flink 1.2.0 RC1, some checkpoint files were not removed.
> {code}
> hadoop fs -lsr /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d
> lsr: DEPRECATED: Please use 'ls -R' instead.
> drwxr-xr-x   - robert hadoop  0 2017-01-25 12:35 
> /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-10
> -rw-r--r--   3 robert hadoop 144375 2017-01-25 12:14 
> /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-10/dbc9181b-e291-4935-9e5b-bd0e447cdc25
> drwxr-xr-x   - robert hadoop  0 2017-01-25 12:46 
> /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-12
> -rw-r--r--   3 robert hadoop 146351 2017-01-25 12:30 
> /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-12/6fcab0d8-c6e8-4744-8361-e8bedd90cf07
> drwxr-xr-x   - robert hadoop  0 2017-01-25 12:44 
> /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-13
> -rw-r--r--   3 robert hadoop 148431 2017-01-25 12:44 
> /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-13/64579d41-9895-49fd-8c76-06814778ab58
> drwxr-xr-x   - robert hadoop  0 2017-01-25 11:45 
> /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-6
> -rw-r--r--   3 robert hadoop  37151 2017-01-25 11:31 
> /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-6/6bddff32-257a-4890-8dcb-6397b4772deb
> drwxr-xr-x   - robert hadoop  0 2017-01-25 11:55 
> /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-7
> -rw-r--r--   3 robert hadoop 142269 2017-01-25 11:41 
> /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-7/bb7f0474-1661-4360-89d4-ed73b37cf6f0
> {code}





[jira] [Created] (FLINK-5642) queryable state: race condition with HeapListState

2017-01-25 Thread Nico Kruber (JIRA)
Nico Kruber created FLINK-5642:
--

 Summary: queryable state: race condition with HeapListState
 Key: FLINK-5642
 URL: https://issues.apache.org/jira/browse/FLINK-5642
 Project: Flink
  Issue Type: Bug
  Components: Queryable State
Affects Versions: 1.2.0
Reporter: Nico Kruber
Assignee: Nico Kruber


If queryable state accesses a HeapListState instance that is being modified 
during the value's serialisation, it may crash, e.g. with a 
NullPointerException during the serialisation or with an EOFException during 
de-serialisation.
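
One common way to avoid this class of crash is to serialise from a stable 
snapshot rather than the live list; a hedged sketch of that idea, which does 
not claim to be how the actual fix works:

{code:java}
import java.util.ArrayList;
import java.util.List;

public class SnapshotBeforeSerializeSketch<V> {

    interface ListSerializer<V> {
        byte[] serialize(List<V> values);
    }

    private final Object lock = new Object();
    private final List<V> values = new ArrayList<>();

    public void add(V value) {
        synchronized (lock) {
            values.add(value);
        }
    }

    /** Serialise a defensive copy so concurrent add() calls cannot corrupt it. */
    public byte[] serialize(ListSerializer<V> serializer) {
        List<V> snapshot;
        synchronized (lock) {
            snapshot = new ArrayList<>(values);
        }
        return serializer.serialize(snapshot);
    }
}
{code}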





[jira] [Commented] (FLINK-5641) Directories of expired checkpoints are not cleaned up

2017-01-25 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837738#comment-15837738
 ] 

Robert Metzger commented on FLINK-5641:
---

I have a job that has expiring checkpoints.
I'll attach the job's full logs.

> Directories of expired checkpoints are not cleaned up
> -
>
> Key: FLINK-5641
> URL: https://issues.apache.org/jira/browse/FLINK-5641
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>Assignee: Till Rohrmann
>
> While testing Flink 1.2.0 RC1, some checkpoint files were not removed.
> {code}
> hadoop fs -lsr /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d
> lsr: DEPRECATED: Please use 'ls -R' instead.
> drwxr-xr-x   - robert hadoop  0 2017-01-25 12:35 
> /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-10
> -rw-r--r--   3 robert hadoop 144375 2017-01-25 12:14 
> /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-10/dbc9181b-e291-4935-9e5b-bd0e447cdc25
> drwxr-xr-x   - robert hadoop  0 2017-01-25 12:46 
> /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-12
> -rw-r--r--   3 robert hadoop 146351 2017-01-25 12:30 
> /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-12/6fcab0d8-c6e8-4744-8361-e8bedd90cf07
> drwxr-xr-x   - robert hadoop  0 2017-01-25 12:44 
> /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-13
> -rw-r--r--   3 robert hadoop 148431 2017-01-25 12:44 
> /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-13/64579d41-9895-49fd-8c76-06814778ab58
> drwxr-xr-x   - robert hadoop  0 2017-01-25 11:45 
> /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-6
> -rw-r--r--   3 robert hadoop  37151 2017-01-25 11:31 
> /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-6/6bddff32-257a-4890-8dcb-6397b4772deb
> drwxr-xr-x   - robert hadoop  0 2017-01-25 11:55 
> /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-7
> -rw-r--r--   3 robert hadoop 142269 2017-01-25 11:41 
> /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-7/bb7f0474-1661-4360-89d4-ed73b37cf6f0
> {code}





[jira] [Commented] (FLINK-5641) Directories of expired checkpoints are not cleaned up

2017-01-25 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837736#comment-15837736
 ] 

Till Rohrmann commented on FLINK-5641:
--

[~rmetzger] how have you produced this?

> Directories of expired checkpoints are not cleaned up
> -
>
> Key: FLINK-5641
> URL: https://issues.apache.org/jira/browse/FLINK-5641
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>Assignee: Till Rohrmann
>
> While testing Flink 1.2.0 RC1, some checkpoint files were not removed.
> {code}
> hadoop fs -lsr /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d
> lsr: DEPRECATED: Please use 'ls -R' instead.
> drwxr-xr-x   - robert hadoop  0 2017-01-25 12:35 
> /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-10
> -rw-r--r--   3 robert hadoop 144375 2017-01-25 12:14 
> /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-10/dbc9181b-e291-4935-9e5b-bd0e447cdc25
> drwxr-xr-x   - robert hadoop  0 2017-01-25 12:46 
> /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-12
> -rw-r--r--   3 robert hadoop 146351 2017-01-25 12:30 
> /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-12/6fcab0d8-c6e8-4744-8361-e8bedd90cf07
> drwxr-xr-x   - robert hadoop  0 2017-01-25 12:44 
> /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-13
> -rw-r--r--   3 robert hadoop 148431 2017-01-25 12:44 
> /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-13/64579d41-9895-49fd-8c76-06814778ab58
> drwxr-xr-x   - robert hadoop  0 2017-01-25 11:45 
> /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-6
> -rw-r--r--   3 robert hadoop  37151 2017-01-25 11:31 
> /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-6/6bddff32-257a-4890-8dcb-6397b4772deb
> drwxr-xr-x   - robert hadoop  0 2017-01-25 11:55 
> /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-7
> -rw-r--r--   3 robert hadoop 142269 2017-01-25 11:41 
> /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-7/bb7f0474-1661-4360-89d4-ed73b37cf6f0
> {code}





[jira] [Commented] (FLINK-5229) Cleanup StreamTaskStates if a checkpoint operation of a subsequent operator fails

2017-01-25 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837735#comment-15837735
 ] 

Robert Metzger commented on FLINK-5229:
---

I'm seeing the following exceptions in my 1.2.0 RC1 code

{code}
2017-01-25 12:47:48,775 WARN  
org.apache.flink.streaming.runtime.tasks.StreamTask   - Could not 
properly clean up the async checkpoint runnable.
java.lang.Exception: Could not properly cancel managed operator state future.
at 
org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:99)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:992)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.close(StreamTask.java:979)
at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:224)
at 
org.apache.flink.util.AbstractCloseableRegistry.close(AbstractCloseableRegistry.java:92)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:363)
at 
org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1384)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at 
org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:81)
at 
org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:97)
... 7 more
{code}

I'm asking here because it's code that was touched while addressing this 
JIRA.

How critical is this?

> Cleanup StreamTaskStates if a checkpoint operation of a subsequent operator 
> fails 
> --
>
> Key: FLINK-5229
> URL: https://issues.apache.org/jira/browse/FLINK-5229
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing, TaskManager
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0, 1.1.4, 1.3.0
>
>
> Due to chaining, a {{StreamTask}} needs to checkpoint multiple operators. If 
> the first operators succeed in creating a checkpoint but a subsequent 
> operator in the chain fails, the {{StreamTask}} has to clean up the already 
> completed checkpoints. Otherwise we might end up with orphaned state data.





[GitHub] flink pull request #3207: [FLINK-5630] [streaming api] Followups to the Aggr...

2017-01-25 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3207#discussion_r97775252
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
 ---
@@ -279,6 +286,202 @@ public AllWindowedStream(DataStream<T> input,
return input.transform(opName, resultType, 
operator).forceNonParallel();
}
 
+   // 

+   //  AggregateFunction
+   // 

+
+   /**
+* Applies the given fold function to each window. The window function 
is called for each
+* evaluation of the window for each key individually. The output of 
the reduce function is
+* interpreted as a regular non-windowed stream.
+*
+* @param function The fold function.
--- End diff --

Typo: "fold function"




[jira] [Commented] (FLINK-5630) Followups to AggregationFunction

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837730#comment-15837730
 ] 

ASF GitHub Bot commented on FLINK-5630:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3207#discussion_r97775106
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
 ---
@@ -279,6 +286,202 @@ public AllWindowedStream(DataStream<T> input,
return input.transform(opName, resultType, 
operator).forceNonParallel();
}
 
+   // 

+   //  AggregateFunction
+   // 

+
+   /**
+* Applies the given fold function to each window. The window function 
is called for each
+* evaluation of the window for each key individually. The output of 
the reduce function is
--- End diff --

Typo: "reduce function"


> Followups to AggregationFunction
> 
>
> Key: FLINK-5630
> URL: https://issues.apache.org/jira/browse/FLINK-5630
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.3.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> Various followup issues to the aggregation function, like
>   - Allowing different input/output types for the cases where an additional 
> window apply function is specified
>   - Adding the {{aggregate()}} methods to the Scala API
>   - Adding the window translation tests





[GitHub] flink pull request #3207: [FLINK-5630] [streaming api] Followups to the Aggr...

2017-01-25 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3207#discussion_r97774934
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
 ---
@@ -279,6 +286,202 @@ public AllWindowedStream(DataStream<T> input,
return input.transform(opName, resultType, 
operator).forceNonParallel();
}
 
+   // 

+   //  AggregateFunction
+   // 

+
+   /**
+* Applies the given fold function to each window. The window function 
is called for each
--- End diff --

Typo: "fold function"




[jira] [Commented] (FLINK-5630) Followups to AggregationFunction

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837729#comment-15837729
 ] 

ASF GitHub Bot commented on FLINK-5630:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3207#discussion_r97776428
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
 ---
@@ -101,6 +108,24 @@ public void testReduceWithRichReducerFails() throws 
Exception {
}
 
/**
+* .aggregate() does not support RichAggregateFunction, since the 
reduce function is used internally
--- End diff --

Typo: "aggregate function".


> Followups to AggregationFunction
> 
>
> Key: FLINK-5630
> URL: https://issues.apache.org/jira/browse/FLINK-5630
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.3.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> Various followup issues to the aggregation function, like
>   - Allowing different input/output types for the cases where an additional 
> window apply function is specified
>   - Adding the {{aggregate()}} methods to the Scala API
>   - Adding the window translation tests





[GitHub] flink pull request #3207: [FLINK-5630] [streaming api] Followups to the Aggr...

2017-01-25 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3207#discussion_r97776155
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
 ---
@@ -515,12 +515,16 @@ public WindowedStream(KeyedStream<T, K> input,
// 

 
/**
-* Applies the given fold function to each window. The window function 
is called for each
-* evaluation of the window for each key individually. The output of 
the reduce function is
-* interpreted as a regular non-windowed stream.
+* Applies the given aggregation function to each window. The 
aggregation function is called for
+* each element, aggregating values incrementally and keeping the state 
to one accumulator
+* per key and window.
 *
 * @param function The fold function.
--- End diff --

Pre-existing typo: "fold function".




[jira] [Commented] (FLINK-5630) Followups to AggregationFunction

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837731#comment-15837731
 ] 

ASF GitHub Bot commented on FLINK-5630:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3207#discussion_r97775252
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
 ---
@@ -279,6 +286,202 @@ public AllWindowedStream(DataStream<T> input,
return input.transform(opName, resultType, 
operator).forceNonParallel();
}
 
+   // 

+   //  AggregateFunction
+   // 

+
+   /**
+* Applies the given fold function to each window. The window function 
is called for each
+* evaluation of the window for each key individually. The output of 
the reduce function is
+* interpreted as a regular non-windowed stream.
+*
+* @param function The fold function.
--- End diff --

Typo: "fold function"


> Followups to AggregationFunction
> 
>
> Key: FLINK-5630
> URL: https://issues.apache.org/jira/browse/FLINK-5630
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.3.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> Various followup issues to the aggregation function, like
>   - Allowing different input/output types for the cases where an additional 
> window apply function is specified
>   - Adding the {{aggregate()}} methods to the Scala API
>   - Adding the window translation tests





[GitHub] flink pull request #3207: [FLINK-5630] [streaming api] Followups to the Aggr...

2017-01-25 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3207#discussion_r97773952
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
 ---
@@ -18,14 +18,37 @@
 
 package org.apache.flink.api.common.functions;
 
+import org.apache.flink.annotation.PublicEvolving;
+
 import java.io.Serializable;
 
 /**
+ * The {@code AggregateFunction} is a flexible aggregation function, 
characterized by the
+ * following features:
+ * 
+ * 
+ * The aggregates may use different types for input values, 
intermediate aggregates,
+ * and result type, to support a wide range of aggregation 
types.
+ * 
+ * Support for distributive aggregations: Different intermediate 
aggregates can be
+ * merged together, to allow for pre-aggregation/final-aggregation 
optimizations.
+ * 
+ *
+ * The {@code AggregateFunction}'s intermediate aggregate (in-progress 
aggregation state)
+ * in called the accumulator. Values are added to the accumulator, 
and final aggregates are
--- End diff --

Typo: "is called"




[jira] [Commented] (FLINK-5630) Followups to AggregationFunction

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837733#comment-15837733
 ] 

ASF GitHub Bot commented on FLINK-5630:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3207#discussion_r97776155
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
 ---
@@ -515,12 +515,16 @@ public WindowedStream(KeyedStream<T, K> input,
// 

 
/**
-* Applies the given fold function to each window. The window function 
is called for each
-* evaluation of the window for each key individually. The output of 
the reduce function is
-* interpreted as a regular non-windowed stream.
+* Applies the given aggregation function to each window. The 
aggregation function is called for
+* each element, aggregating values incrementally and keeping the state 
to one accumulator
+* per key and window.
 *
 * @param function The fold function.
--- End diff --

Pre-existing typo: "fold function".


> Followups to AggregationFunction
> 
>
> Key: FLINK-5630
> URL: https://issues.apache.org/jira/browse/FLINK-5630
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.3.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> Various followup issues to the aggregation function, like
>   - Allowing different input/output types for the cases where an additional 
> window apply function is specified
>   - Adding the {{aggregate()}} methods to the Scala API
>   - Adding the window translation tests





[GitHub] flink pull request #3207: [FLINK-5630] [streaming api] Followups to the Aggr...

2017-01-25 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3207#discussion_r97774041
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
 ---
@@ -81,14 +104,55 @@
  * }
  * }
  * }
+ * 
+ * @param <IN>  The type of the values that are aggregated (input values)
+ * @param <ACC> The type of the accumulator (intermediate aggregate state).
+ * @param <OUT> The type of the aggregated result
  */
+@PublicEvolving
 public interface AggregateFunction<IN, ACC, OUT> extends Function, 
Serializable {
 
+   /**
+* Creates a new accumulator, starting a new aggregate.
+* 
+* The new accumulator iy typically meaningless unless a value is 
added
--- End diff --

Typo: "is typically"




[GitHub] flink pull request #3207: [FLINK-5630] [streaming api] Followups to the Aggr...

2017-01-25 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3207#discussion_r97776428
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
 ---
@@ -101,6 +108,24 @@ public void testReduceWithRichReducerFails() throws 
Exception {
}
 
/**
+* .aggregate() does not support RichAggregateFunction, since the 
reduce function is used internally
--- End diff --

Typo: "aggregate function".




[GitHub] flink pull request #3207: [FLINK-5630] [streaming api] Followups to the Aggr...

2017-01-25 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3207#discussion_r97775106
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
 ---
@@ -279,6 +286,202 @@ public AllWindowedStream(DataStream<T> input,
return input.transform(opName, resultType, 
operator).forceNonParallel();
}
 
+   // 

+   //  AggregateFunction
+   // 

+
+   /**
+* Applies the given fold function to each window. The window function 
is called for each
+* evaluation of the window for each key individually. The output of 
the reduce function is
--- End diff --

Typo: "reduce function"




[jira] [Commented] (FLINK-5630) Followups to AggregationFunction

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837732#comment-15837732
 ] 

ASF GitHub Bot commented on FLINK-5630:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3207#discussion_r97774041
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
 ---
@@ -81,14 +104,55 @@
  * }
  * }
  * }
+ * 
+ * @param <IN>  The type of the values that are aggregated (input values)
+ * @param <ACC> The type of the accumulator (intermediate aggregate state).
+ * @param <OUT> The type of the aggregated result
  */
+@PublicEvolving
 public interface AggregateFunction<IN, ACC, OUT> extends Function, 
Serializable {
 
+   /**
+* Creates a new accumulator, starting a new aggregate.
+* 
+* The new accumulator iy typically meaningless unless a value is 
added
--- End diff --

Typo: "is typically"


> Followups to AggregationFunction
> 
>
> Key: FLINK-5630
> URL: https://issues.apache.org/jira/browse/FLINK-5630
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.3.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> Various followup issues to the aggregation function, like
>   - Allowing different input/output types for the cases where an additional 
> window apply function is specified
>   - Adding the {{aggregate()}} methods to the Scala API
>   - Adding the window translation tests





[jira] [Commented] (FLINK-5630) Followups to AggregationFunction

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837728#comment-15837728
 ] 

ASF GitHub Bot commented on FLINK-5630:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3207#discussion_r97773952
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
 ---
@@ -18,14 +18,37 @@
 
 package org.apache.flink.api.common.functions;
 
+import org.apache.flink.annotation.PublicEvolving;
+
 import java.io.Serializable;
 
 /**
+ * The {@code AggregateFunction} is a flexible aggregation function, 
characterized by the
+ * following features:
+ * 
+ * 
+ * The aggregates may use different types for input values, 
intermediate aggregates,
+ * and result type, to support a wide range of aggregation 
types.
+ * 
+ * Support for distributive aggregations: Different intermediate 
aggregates can be
+ * merged together, to allow for pre-aggregation/final-aggregation 
optimizations.
+ * 
+ *
+ * The {@code AggregateFunction}'s intermediate aggregate (in-progress 
aggregation state)
+ * in called the accumulator. Values are added to the accumulator, 
and final aggregates are
--- End diff --

Typo: "is called"


> Followups to AggregationFunction
> 
>
> Key: FLINK-5630
> URL: https://issues.apache.org/jira/browse/FLINK-5630
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.3.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> Various followup issues to the aggregation function, like
>   - Allowing different input/output types for the cases where an additional 
> window apply function is specified
>   - Adding the {{aggregate()}} methods to the Scala API
>   - Adding the window translation tests





[jira] [Commented] (FLINK-5630) Followups to AggregationFunction

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837734#comment-15837734
 ] 

ASF GitHub Bot commented on FLINK-5630:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3207#discussion_r97774934
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
 ---
@@ -279,6 +286,202 @@ public AllWindowedStream(DataStream<T> input,
return input.transform(opName, resultType, 
operator).forceNonParallel();
}
 
+   // 

+   //  AggregateFunction
+   // 

+
+   /**
+* Applies the given fold function to each window. The window function 
is called for each
--- End diff --

Typo: "fold function"


> Followups to AggregationFunction
> 
>
> Key: FLINK-5630
> URL: https://issues.apache.org/jira/browse/FLINK-5630
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.3.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> Various followup issues to the aggregation function, like
>   - Allowing different input/output types for the cases where an additional 
> window apply function is specified
>   - Adding the {{aggregate()}} methods to the Scala API
>   - Adding the window translation tests





[jira] [Assigned] (FLINK-5641) Directories of expired checkpoints are not cleaned up

2017-01-25 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-5641:


Assignee: Till Rohrmann

> Directories of expired checkpoints are not cleaned up
> -
>
> Key: FLINK-5641
> URL: https://issues.apache.org/jira/browse/FLINK-5641
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>Assignee: Till Rohrmann
>
> While testing Flink 1.2.0 RC1, some checkpoint files were not removed.
> {code}
> hadoop fs -lsr /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d
> lsr: DEPRECATED: Please use 'ls -R' instead.
> drwxr-xr-x   - robert hadoop  0 2017-01-25 12:35 
> /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-10
> -rw-r--r--   3 robert hadoop 144375 2017-01-25 12:14 
> /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-10/dbc9181b-e291-4935-9e5b-bd0e447cdc25
> drwxr-xr-x   - robert hadoop  0 2017-01-25 12:46 
> /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-12
> -rw-r--r--   3 robert hadoop 146351 2017-01-25 12:30 
> /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-12/6fcab0d8-c6e8-4744-8361-e8bedd90cf07
> drwxr-xr-x   - robert hadoop  0 2017-01-25 12:44 
> /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-13
> -rw-r--r--   3 robert hadoop 148431 2017-01-25 12:44 
> /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-13/64579d41-9895-49fd-8c76-06814778ab58
> drwxr-xr-x   - robert hadoop  0 2017-01-25 11:45 
> /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-6
> -rw-r--r--   3 robert hadoop  37151 2017-01-25 11:31 
> /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-6/6bddff32-257a-4890-8dcb-6397b4772deb
> drwxr-xr-x   - robert hadoop  0 2017-01-25 11:55 
> /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-7
> -rw-r--r--   3 robert hadoop 142269 2017-01-25 11:41 
> /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-7/bb7f0474-1661-4360-89d4-ed73b37cf6f0
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5492) BootstrapTools log wrong address of started ActorSystem

2017-01-25 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837726#comment-15837726
 ] 

Till Rohrmann commented on FLINK-5492:
--

Arg, you're right. The grizzled logger used in the Scala files does not support 
parameterized logging statements. Instead, the idiomatic way is to use string 
interpolation.
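
A minimal sketch of the idiomatic form, assuming grizzled-slf4j's by-name message parameter (the object and message below are hypothetical, not the actual BootstrapTools code):

{code}
import grizzled.slf4j.Logger

object AddressLogging {
  private val log = Logger(getClass)

  def logStarted(hostname: String, port: Int): Unit = {
    // SLF4J-style "{}" placeholders are not supported by the grizzled API.
    // The message is passed by name, so the interpolated string is only
    // built when INFO logging is enabled:
    log.info(s"Actor system started at $hostname:$port")
  }
}
{code}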

> BootstrapTools log wrong address of started ActorSystem
> ---
>
> Key: FLINK-5492
> URL: https://issues.apache.org/jira/browse/FLINK-5492
> Project: Flink
>  Issue Type: Bug
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> When starting an {{ActorSystem}} via the {{Bootstrap}} tools, then the 
> {{startActorSystem}} function logs the IP resolved from the provided hostname 
> as the {{ActorSystem}} address. However, then the function uses the 
> unresolved hostname to start the {{ActorSystem}}. Since Akka matches the 
> ActorSystem's address and the destination address of the incoming message we 
> should log the URL which is used to start the {{ActorSystem}} and not the 
> resolved IP (messages with the IP will usually be rejected).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4719) KryoSerializer random exception

2017-01-25 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-4719:
---
Assignee: (was: Nico Kruber)

> KryoSerializer random exception
> ---
>
> Key: FLINK-4719
> URL: https://issues.apache.org/jira/browse/FLINK-4719
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.1.1
>Reporter: Flavio Pompermaier
>  Labels: kryo, serialization
>
> There's a random exception that involves somehow the KryoSerializer when 
> using POJOs in Flink jobs reading large volumes of data.
> It is usually thrown in several places, e.g. (the Exceptions reported here 
> can refer to previous versions of Flink...):
> {code}
> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce 
> (GroupReduce at createResult(IndexMappingExecutor.java:42)) -> Map (Map at 
> main(Jsonizer.java:128))' , caused an error: Error obtaining the sorted 
> input: Thread 'SortMerger spilling thread' terminated due to an exception: 
> Unable to find class: java.ttil.HashSet
> at 
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
> at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: 
> Thread 'SortMerger spilling thread' terminated due to an exception: Unable to 
> find class: java.ttil.HashSet
> at 
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
> at 
> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
> at 
> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
> at 
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
> ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' 
> terminated due to an exception: Unable to find class: java.ttil.HashSet
> at 
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
> Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: 
> java.ttil.HashSet
> at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
> at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
> at 
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
> at 
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
> at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
> at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:252)
> at 
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:556)
> at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
> at 
> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
> at 
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
> at 
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
> Caused by: java.lang.ClassNotFoundException: java.ttil.HashSet
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
> {code}
> {code}
> Caused by: java.io.IOException: Serializer consumed more bytes than the 
> record had. This indicates broken serialization. If you are using custom 
> serialization types (Value or Writable), check their serialization methods. 
> If you are using a Kryo-serialized type, check the corresponding Kryo 
> serializer.
> at 
> 

[GitHub] flink issue #3210: [FLINK-5638] [asyncIO] Fix deadlock when closing two chai...

2017-01-25 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3210
  
It's a bit hacky to release the lock in scope via `wait()`, but it seems to 
work.

One more reason to go for a post-box model...

+1
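
For readers following along, a minimal sketch of what releasing the lock in scope via `wait()` means on a plain Java monitor (all names are illustrative, not the actual operator code):

{code}
// Illustrative only: Object.wait() atomically releases the monitor it is
// called on, so another thread can acquire the same lock while we wait.
class LockReleaseSketch {
    private final Object checkpointLock = new Object();

    void waitForEmitterToStop(Thread emitter) throws InterruptedException {
        emitter.interrupt();
        synchronized (checkpointLock) {
            while (emitter.isAlive()) {
                // wait() releases checkpointLock for up to 100 ms, letting the
                // emitter thread acquire it, observe the interrupt, and exit.
                checkpointLock.wait(100L);
            }
        }
    }
}
{code}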


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5638) Deadlock when closing two chained async I/O operators

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837722#comment-15837722
 ] 

ASF GitHub Bot commented on FLINK-5638:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3210
  
It's a bit hacky to release the lock in scope via `wait()`, but it seems to 
work.

One more reason to go for a post-box model...

+1


> Deadlock when closing two chained async I/O operators
> -
>
> Key: FLINK-5638
> URL: https://issues.apache.org/jira/browse/FLINK-5638
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0, 1.3.0
>
>
> The {{AsyncWaitOperator}} can deadlock in a special case when closing two 
> chained {{AsyncWaitOperators}} while there is still one element in flight 
> between these two operators.
> The deadlock scenario is the following: Given two chained 
> {{AsyncWaitOperators}} {{a1}} and {{a2}}. {{a1}} has its last element 
> completed. This notifies {{a1's}} {{Emitter}}, {{e1}}, to remove the element 
> from the queue and output it to {{a2}}. This poll and output operation 
> happens under the checkpoint lock. Since {{a1}} and {{a2}} are chained, the 
> {{e1}} thread will directly call {{a2's}} {{processElement}} function. In 
> this function, we try to add the new element to the {{StreamElementQueue}}. 
> Now assume that this queue is full. Then the operation will release the 
> checkpoint lock and wait until it is notified again.
> In the meantime, {{a1.close()}} is called by the {{StreamTask}}, because we 
> have consumed all input. The close operation also happens under the 
> checkpoint lock. First the close method waits until all elements from the 
> {{StreamElementQueue}} have been processed (== empty). This happens by 
> waiting on the checkpoint lock. Next the {{e1}} is interrupted and we join on 
> {{e1}}. When interrupting {{e1}}, it currently waits on the checkpoint lock. 
> Since the closing operation does not release the checkpoint lock, {{e1}} 
> cannot regain the synchronization lock and voila we have a deadlock.
> There are two problems which cause this:
> 1. We assume that the {{AsyncWaitOperator}} has processed all its elements if 
> the queue is empty. This is usually the case if the output operation is 
> atomic. However in the chained case it can happen that the emitter thread has 
> to wait to insert the element into the queue of the next 
> {{AsyncWaitOperator}}. Under these circumstances, we release the checkpoint 
> lock and, thus, the output operation is no longer atomic. We can solve this 
> problem by polling the last queue element after we have outputted it instead 
> of before.
> 2. We interrupt the emitter thread while holding the checkpoint lock and not 
> freeing it again. Under these circumstances, the interrupt signal is 
> meaningless because the emitter thread also needs control over the checkpoint 
> lock. We should solve the problem by waiting on the checkpoint lock and 
> periodically checking whether the thread has already stopped or not.
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/194729330/log.txt
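
A minimal sketch of the first fix described above, with all names hypothetical rather than the actual AsyncWaitOperator code: emit the head element first and remove it from the queue only afterwards, so that an empty queue really means every element has been output.

{code}
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical sketch of "poll after output"; assumes a non-empty queue.
class EmitterSketch<T> {
    private final Object checkpointLock;
    private final Queue<T> completedQueue;
    private final Consumer<T> output;

    EmitterSketch(Object checkpointLock, Queue<T> completedQueue, Consumer<T> output) {
        this.checkpointLock = checkpointLock;
        this.completedQueue = completedQueue;
        this.output = output;
    }

    void emitHead() {
        synchronized (checkpointLock) {
            T head = completedQueue.peek();  // inspect without removing
            output.accept(head);             // may block; the lock can be released downstream
            completedQueue.poll();           // remove only after a successful emit
            checkpointLock.notifyAll();      // wake a close() waiting for an empty queue
        }
    }
}
{code}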



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4905) Kafka test instability IllegalStateException: Client is not started

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837721#comment-15837721
 ] 

ASF GitHub Bot commented on FLINK-4905:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3035
  
I pushed a fix for that to master in 
e7cda75b8594417559d6aac6229b5893f5459f0f


> Kafka test instability IllegalStateException: Client is not started
> ---
>
> Key: FLINK-4905
> URL: https://issues.apache.org/jira/browse/FLINK-4905
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Robert Metzger
>Assignee: Andrew Efimov
>  Labels: test-stability
> Attachments: Kafka08Fetcher.png
>
>
> The following travis build 
> (https://s3.amazonaws.com/archive.travis-ci.org/jobs/170365439/log.txt)  
> failed because of this error
> {code}
> 08:17:11,239 INFO  org.apache.flink.runtime.jobmanager.JobManager 
>- Status of job 33ebdc0e7c91be186d80658ce3d17069 (Read some records to 
> commit offsets to Kafka) changed to FAILING.
> java.lang.RuntimeException: Error while confirming checkpoint
>   at org.apache.flink.runtime.taskmanager.Task$4.run(Task.java:1040)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IllegalStateException: Client is not started
>   at 
> org.apache.flink.shaded.com.google.common.base.Preconditions.checkState(Preconditions.java:173)
>   at 
> org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:113)
>   at 
> org.apache.curator.utils.EnsurePath$InitialHelper$1.call(EnsurePath.java:148)
>   at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107)
>   at 
> org.apache.curator.utils.EnsurePath$InitialHelper.ensure(EnsurePath.java:141)
>   at org.apache.curator.utils.EnsurePath.ensure(EnsurePath.java:99)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:133)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.prepareAndCommitOffsets(ZookeeperOffsetHandler.java:93)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.commitInternalOffsetsToKafka(Kafka08Fetcher.java:341)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.notifyCheckpointComplete(FlinkKafkaConsumerBase.java:421)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:229)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:571)
>   at org.apache.flink.runtime.taskmanager.Task$4.run(Task.java:1035)
>   ... 5 more
> 08:17:11,241 INFO  org.apache.flink.runtime.taskmanager.Task  
>- Attempting to cancel task Source: Custom Source -> Map -> Map -> Sink: 
> Unnamed (1/3)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5492) BootstrapTools log wrong address of started ActorSystem

2017-01-25 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837720#comment-15837720
 ] 

Robert Metzger commented on FLINK-5492:
---

I will provide a fix for the issue.

> BootstrapTools log wrong address of started ActorSystem
> ---
>
> Key: FLINK-5492
> URL: https://issues.apache.org/jira/browse/FLINK-5492
> Project: Flink
>  Issue Type: Bug
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> When starting an {{ActorSystem}} via the {{Bootstrap}} tools, then the 
> {{startActorSystem}} function logs the IP resolved from the provided hostname 
> as the {{ActorSystem}} address. However, then the function uses the 
> unresolved hostname to start the {{ActorSystem}}. Since Akka matches the 
> ActorSystem's address and the destination address of the incoming message we 
> should log the URL which is used to start the {{ActorSystem}} and not the 
> resolved IP (messages with the IP will usually be rejected).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #3035: [ FLINK-4905] Kafka test instability IllegalStateExceptio...

2017-01-25 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3035
  
I pushed a fix for that to master in 
e7cda75b8594417559d6aac6229b5893f5459f0f


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-5640) configure the explicit Unit Test file suffix

2017-01-25 Thread shijinkui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shijinkui updated FLINK-5640:
-
Description: 
There are four types of test file: *ITCase.java, *Test.java, 
*ITSuite.scala, *Suite.scala.
A file name ending with "IT.java" indicates an integration test; a file name 
ending with "Test.java" indicates a unit test.

It would be clearer for the Surefire plugin's default-test execution to 
declare explicitly that "*Test.*" is a Java unit test.

The test file statistics are below:
* Suite  total: 10
* ITCase  total: 378
* Test  total: 1008
* ITSuite  total: 14

  was:

There are four types of test file: *ITCase.java, *Test.java, 
*ITSuite.scala, *Suite.scala.
A file name ending with "IT.java" indicates an integration test; a file name 
ending with "Test.java" indicates a unit test.

It would be clearer for the Surefire plugin's default-test execution to 
declare explicitly that "*Test.*" is a Java unit test.

* Suite  total: 10
* ITCase  total: 378
* Test  total: 1008
* ITSuite  total: 14


> configure the explicit Unit Test file suffix
> 
>
> Key: FLINK-5640
> URL: https://issues.apache.org/jira/browse/FLINK-5640
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Reporter: shijinkui
>Assignee: shijinkui
>
> There are four types of test file: *ITCase.java, *Test.java, 
> *ITSuite.scala, *Suite.scala.
> A file name ending with "IT.java" indicates an integration test; a file name 
> ending with "Test.java" indicates a unit test.
> It would be clearer for the Surefire plugin's default-test execution to 
> declare explicitly that "*Test.*" is a Java unit test.
> The test file statistics are below:
> * Suite  total: 10
> * ITCase  total: 378
> * Test  total: 1008
> * ITSuite  total: 14
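
As an illustration, the Surefire side of such a convention might look like the following hypothetical pom.xml fragment (a sketch, not the actual change in the PR):

{code}
<!-- Sketch: run only *Test classes in the default-test execution and leave
     *ITCase integration tests to a separate execution or plugin. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-surefire-plugin</artifactId>
  <configuration>
    <includes>
      <include>**/*Test.*</include>
    </includes>
    <excludes>
      <exclude>**/*ITCase.*</exclude>
    </excludes>
  </configuration>
</plugin>
{code}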



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5640) configure the explicit Unit Test file suffix

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837719#comment-15837719
 ] 

ASF GitHub Bot commented on FLINK-5640:
---

GitHub user shijinkui opened a pull request:

https://github.com/apache/flink/pull/3211

[FLINK-5640][test] configure the explicit Unit Test file suffix

There are four types of test file: *ITCase.java, *Test.java, 
*ITSuite.scala, *Suite.scala.
A file name ending with "IT.java" indicates an integration test; a file name 
ending with "Test.java" indicates a unit test.

It would be clearer for the Surefire plugin's default-test execution to 
declare explicitly that "*Test.*" is a Java unit test.

The test file statistics:
* Suite  total: 10
* ITCase  total: 378
* Test  total: 1008
* ITSuite  total: 14

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shijinkui/flink defined_ut_suffix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3211.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3211


commit aeea3f0b4cd9de5a3ea6a7eb37ce7b70d66d4942
Author: shijinkui 
Date:   2017-01-25T13:18:13Z

[FLINK-5640][test] configure the explicit Unit Test file suffix




> configure the explicit Unit Test file suffix
> 
>
> Key: FLINK-5640
> URL: https://issues.apache.org/jira/browse/FLINK-5640
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Reporter: shijinkui
>Assignee: shijinkui
>
> There are four types of test file: *ITCase.java, *Test.java, 
> *ITSuite.scala, *Suite.scala.
> A file name ending with "IT.java" indicates an integration test; a file name 
> ending with "Test.java" indicates a unit test.
> It would be clearer for the Surefire plugin's default-test execution to 
> declare explicitly that "*Test.*" is a Java unit test.
> * Suite  total: 10
> * ITCase  total: 378
> * Test  total: 1008
> * ITSuite  total: 14



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3211: [FLINK-5640][test] configure the explicit Unit Test...

2017-01-25 Thread shijinkui
GitHub user shijinkui opened a pull request:

https://github.com/apache/flink/pull/3211

[FLINK-5640][test] configure the explicit Unit Test file suffix

There are four types of test file: *ITCase.java, *Test.java, 
*ITSuite.scala, *Suite.scala.
A file name ending with "IT.java" indicates an integration test; a file name 
ending with "Test.java" indicates a unit test.

It would be clearer for the Surefire plugin's default-test execution to 
declare explicitly that "*Test.*" is a Java unit test.

The test file statistics:
* Suite  total: 10
* ITCase  total: 378
* Test  total: 1008
* ITSuite  total: 14

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shijinkui/flink defined_ut_suffix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3211.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3211


commit aeea3f0b4cd9de5a3ea6a7eb37ce7b70d66d4942
Author: shijinkui 
Date:   2017-01-25T13:18:13Z

[FLINK-5640][test] configure the explicit Unit Test file suffix




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5638) Deadlock when closing two chained async I/O operators

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837716#comment-15837716
 ] 

ASF GitHub Bot commented on FLINK-5638:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/3209

[FLINK-5638] [asyncIO] Fix deadlock when closing two chained 
AsyncWaitOperators

This PR addresses the problem by changing the Emitter's behaviour to output the 
element first, before removing it from the StreamElementQueue. That way, the 
close method also waits until the Emitter has output the last completed element. 
Additionally, the stopResources method now releases the checkpoint lock in order 
to let the emitter thread react to the interrupt signal.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink asyncIOFixDeadlock

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3209.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3209


commit 30520d95786d630eb14ff613d0990ce03779dd3c
Author: Till Rohrmann 
Date:   2017-01-25T13:11:48Z

[FLINK-5638] [asyncIO] Fix deadlock when closing two chained 
AsyncWaitOperators

This PR addresses the problem by changing the Emitter's behaviour to output the 
element first, before removing it from the StreamElementQueue. That way, the 
close method also waits until the Emitter has output the last completed element. 
Additionally, the stopResources method now releases the checkpoint lock in order 
to let the emitter thread react to the interrupt signal.




> Deadlock when closing two chained async I/O operators
> -
>
> Key: FLINK-5638
> URL: https://issues.apache.org/jira/browse/FLINK-5638
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0, 1.3.0
>
>
> The {{AsyncWaitOperator}} can deadlock in a special case when closing two 
> chained {{AsyncWaitOperators}} while there is still one element in flight 
> between these two operators.
> The deadlock scenario is the following: Given two chained 
> {{AsyncWaitOperators}} {{a1}} and {{a2}}. {{a1}} has its last element 
> completed. This notifies {{a1's}} {{Emitter}}, {{e1}}, to remove the element 
> from the queue and output it to {{a2}}. This poll and output operation 
> happens under the checkpoint lock. Since {{a1}} and {{a2}} are chained, the 
> {{e1}} thread will directly call {{a2's}} {{processElement}} function. In 
> this function, we try to add the new element to the {{StreamElementQueue}}. 
> Now assume that this queue is full. Then the operation will release the 
> checkpoint lock and wait until it is notified again.
> In the meantime, {{a1.close()}} is called by the {{StreamTask}}, because we 
> have consumed all input. The close operation also happens under the 
> checkpoint lock. First the close method waits until all elements from the 
> {{StreamElementQueue}} have been processed (== empty). This happens by 
> waiting on the checkpoint lock. Next the {{e1}} is interrupted and we join on 
> {{e1}}. When interrupting {{e1}}, it currently waits on the checkpoint lock. 
> Since the closing operation does not release the checkpoint lock, {{e1}} 
> cannot regain the synchronization lock and voila we have a deadlock.
> There are two problems which cause this:
> 1. We assume that the {{AsyncWaitOperator}} has processed all its elements if 
> the queue is empty. This is usually the case if the output operation is 
> atomic. However in the chained case it can happen that the emitter thread has 
> to wait to insert the element into the queue of the next 
> {{AsyncWaitOperator}}. Under these circumstances, we release the checkpoint 
> lock and, thus, the output operation is no longer atomic. We can solve this 
> problem by polling the last queue element after we have outputted it instead 
> of before.
> 2. We interrupt the emitter thread while holding the checkpoint lock and not 
> freeing it again. Under these circumstances, the interrupt signal is 
> meaningless because the emitter thread also needs control over the checkpoint 
> lock. We should solve the problem by waiting on the checkpoint lock and 
> periodically checking whether the thread has already stopped or not.
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/194729330/log.txt



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5638) Deadlock when closing two chained async I/O operators

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837717#comment-15837717
 ] 

ASF GitHub Bot commented on FLINK-5638:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/3210

[FLINK-5638] [asyncIO] Fix deadlock when closing two chained 
AsyncWaitOperators

This PR is a backport of #3209 onto `release-1.2`.

This PR addresses the problem by changing the Emitter's behaviour to output the 
element first, before removing it from the StreamElementQueue. That way, the 
close method also waits until the Emitter has output the last completed element. 
Additionally, the stopResources method now releases the checkpoint lock in order 
to let the emitter thread react to the interrupt signal.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink asyncIOFixDeadlockBackport

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3210.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3210


commit 770589dd286b1bc30d9bf79d813bc30a371e1995
Author: Till Rohrmann 
Date:   2017-01-25T13:11:48Z

[FLINK-5638] [asyncIO] Fix deadlock when closing two chained 
AsyncWaitOperators

This PR addresses the problem by changing the Emitter's behaviour to output the 
element first, before removing it from the StreamElementQueue. That way, the 
close method also waits until the Emitter has output the last completed element. 
Additionally, the stopResources method now releases the checkpoint lock in order 
to let the emitter thread react to the interrupt signal.




> Deadlock when closing two chained async I/O operators
> -
>
> Key: FLINK-5638
> URL: https://issues.apache.org/jira/browse/FLINK-5638
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0, 1.3.0
>
>
> The {{AsyncWaitOperator}} can deadlock in a special case when closing two 
> chained {{AsyncWaitOperators}} while there is still one element in flight 
> between these two operators.
> The deadlock scenario is the following: Given two chained 
> {{AsyncWaitOperators}} {{a1}} and {{a2}}. {{a1}} has its last element 
> completed. This notifies {{a1's}} {{Emitter}}, {{e1}}, to remove the element 
> from the queue and output it to {{a2}}. This poll and output operation 
> happens under the checkpoint lock. Since {{a1}} and {{a2}} are chained, the 
> {{e1}} thread will directly call {{a2's}} {{processElement}} function. In 
> this function, we try to add the new element to the {{StreamElementQueue}}. 
> Now assume that this queue is full. Then the operation will release the 
> checkpoint lock and wait until it is notified again.
> In the meantime, {{a1.close()}} is called by the {{StreamTask}}, because we 
> have consumed all input. The close operation also happens under the 
> checkpoint lock. First the close method waits until all elements from the 
> {{StreamElementQueue}} have been processed (== empty). This happens by 
> waiting on the checkpoint lock. Next the {{e1}} is interrupted and we join on 
> {{e1}}. When interrupting {{e1}}, it currently waits on the checkpoint lock. 
> Since the closing operation does not release the checkpoint lock, {{e1}} 
> cannot regain the synchronization lock and voila we have a deadlock.
> There are two problems which cause this:
> 1. We assume that the {{AsyncWaitOperator}} has processed all its elements if 
> the queue is empty. This is usually the case if the output operation is 
> atomic. However in the chained case it can happen that the emitter thread has 
> to wait to insert the element into the queue of the next 
> {{AsyncWaitOperator}}. Under these circumstances, we release the checkpoint 
> lock and, thus, the output operation is no longer atomic. We can solve this 
> problem by polling the last queue element after we have outputted it instead 
> of before.
> 2. We interrupt the emitter thread while holding the checkpoint lock and not 
> freeing it again. Under these circumstances, the interrupt signal is 
> meaningless because the emitter thread also needs control over the checkpoint 
> lock. We should solve the problem by waiting on the checkpoint lock and 
> periodically checking whether the thread has already stopped or not.
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/194729330/log.txt



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3210: [FLINK-5638] [asyncIO] Fix deadlock when closing t...

2017-01-25 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/3210

[FLINK-5638] [asyncIO] Fix deadlock when closing two chained 
AsyncWaitOperators

This PR is a backport of #3209 onto `release-1.2`.

This PR addresses the problem by changing the Emitter's behaviour to output the 
element first, before removing it from the StreamElementQueue. That way, the 
close method also waits until the Emitter has output the last completed element. 
Additionally, the stopResources method now releases the checkpoint lock in order 
to let the emitter thread react to the interrupt signal.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink asyncIOFixDeadlockBackport

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3210.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3210


commit 770589dd286b1bc30d9bf79d813bc30a371e1995
Author: Till Rohrmann 
Date:   2017-01-25T13:11:48Z

[FLINK-5638] [asyncIO] Fix deadlock when closing two chained 
AsyncWaitOperators

This PR addresses the problem by changing the Emitter's behaviour to output the 
element first, before removing it from the StreamElementQueue. That way, the 
close method also waits until the Emitter has output the last completed element. 
Additionally, the stopResources method now releases the checkpoint lock in order 
to let the emitter thread react to the interrupt signal.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3209: [FLINK-5638] [asyncIO] Fix deadlock when closing t...

2017-01-25 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/3209

[FLINK-5638] [asyncIO] Fix deadlock when closing two chained 
AsyncWaitOperators

This PR addresses the problem by changing the Emitter's behaviour to output the 
element first, before removing it from the StreamElementQueue. That way, the 
close method also waits until the Emitter has output the last completed element. 
Additionally, the stopResources method now releases the checkpoint lock in order 
to let the emitter thread react to the interrupt signal.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink asyncIOFixDeadlock

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3209.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3209


commit 30520d95786d630eb14ff613d0990ce03779dd3c
Author: Till Rohrmann 
Date:   2017-01-25T13:11:48Z

[FLINK-5638] [asyncIO] Fix deadlock when closing two chained 
AsyncWaitOperators

This PR addresses the problem by changing the Emitter's behaviour to output the 
element first, before removing it from the StreamElementQueue. That way, the 
close method also waits until the Emitter has output the last completed element. 
Additionally, the stopResources method now releases the checkpoint lock in order 
to let the emitter thread react to the interrupt signal.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4905) Kafka test instability IllegalStateException: Client is not started

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837713#comment-15837713
 ] 

ASF GitHub Bot commented on FLINK-4905:
---

Github user BrainLogic commented on the issue:

https://github.com/apache/flink/pull/3035
  
I got your idea, and together with the argument I mentioned above - that 
users will not use version 8 of the Kafka connector extensively - I agree with 
this proposal.
Let me finish this JIRA; I can create a fix based on your proposal and try 
to implement a test tonight.


> Kafka test instability IllegalStateException: Client is not started
> ---
>
> Key: FLINK-4905
> URL: https://issues.apache.org/jira/browse/FLINK-4905
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Robert Metzger
>Assignee: Andrew Efimov
>  Labels: test-stability
> Attachments: Kafka08Fetcher.png
>
>
> The following travis build 
> (https://s3.amazonaws.com/archive.travis-ci.org/jobs/170365439/log.txt)  
> failed because of this error
> {code}
> 08:17:11,239 INFO  org.apache.flink.runtime.jobmanager.JobManager 
>- Status of job 33ebdc0e7c91be186d80658ce3d17069 (Read some records to 
> commit offsets to Kafka) changed to FAILING.
> java.lang.RuntimeException: Error while confirming checkpoint
>   at org.apache.flink.runtime.taskmanager.Task$4.run(Task.java:1040)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IllegalStateException: Client is not started
>   at 
> org.apache.flink.shaded.com.google.common.base.Preconditions.checkState(Preconditions.java:173)
>   at 
> org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:113)
>   at 
> org.apache.curator.utils.EnsurePath$InitialHelper$1.call(EnsurePath.java:148)
>   at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107)
>   at 
> org.apache.curator.utils.EnsurePath$InitialHelper.ensure(EnsurePath.java:141)
>   at org.apache.curator.utils.EnsurePath.ensure(EnsurePath.java:99)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:133)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.prepareAndCommitOffsets(ZookeeperOffsetHandler.java:93)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.commitInternalOffsetsToKafka(Kafka08Fetcher.java:341)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.notifyCheckpointComplete(FlinkKafkaConsumerBase.java:421)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:229)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:571)
>   at org.apache.flink.runtime.taskmanager.Task$4.run(Task.java:1035)
>   ... 5 more
> 08:17:11,241 INFO  org.apache.flink.runtime.taskmanager.Task  
>- Attempting to cancel task Source: Custom Source -> Map -> Map -> Sink: 
> Unnamed (1/3)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #3035: [ FLINK-4905] Kafka test instability IllegalStateExceptio...

2017-01-25 Thread BrainLogic
Github user BrainLogic commented on the issue:

https://github.com/apache/flink/pull/3035
  
I got your idea, and together with the argument I mentioned above - that 
users will not use version 8 of the Kafka connector extensively - I agree with 
this proposal.
Let me finish this JIRA; I can create a fix based on your proposal and try 
to implement a test tonight.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Assigned] (FLINK-5628) CheckpointStatsTracker implements Serializable but isn't

2017-01-25 Thread Ufuk Celebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ufuk Celebi reassigned FLINK-5628:
--

Assignee: Ufuk Celebi

> CheckpointStatsTracker implements Serializable but isn't
> 
>
> Key: FLINK-5628
> URL: https://issues.apache.org/jira/browse/FLINK-5628
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Chesnay Schepler
>Assignee: Ufuk Celebi
>Priority: Blocker
> Fix For: 1.3.0
>
>
> The CheckpointStatsTracker implements the Serializable interface, even though 
> it is no longer serializable, as it contains a List.
> This was introduced in 579bc96446d598a2cfe8237b4ebd62d8c9df3483, which 
> reworked the checkpoint stats tracking.
> This does not affect 1.2 or 1.3 in any way (since these objects aren't 
> serialized there), but it blocks the implementation of the HistoryServer.
> The AccessExecution*/ArchivedExecution* classes, which are supposed to be a 
> serializable form of the ExecutionGraph, are thus broken, as they also make 
> use of the CheckpointStatsTracker.
> (Note: * = Graph/ExecutionJobVertex/ExecutionVertex/Execution)
> This wasn't caught in tests since no ExecutionJobVertices were given to the 
> CheckpointStatsTracker in ArchivedExecutionGraphTest#setExecutionGraph.
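
As a general illustration of the pitfall (a hypothetical class, not Flink's actual field layout), implementing Serializable is not enough when a contained object is not serializable itself:

{code}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Sketch: the class "is" Serializable, but serializing it can still fail.
class Tracker implements Serializable {
    private final List<Object> entries = new ArrayList<>();

    void add(Object o) {
        entries.add(o);
    }
}

public class SerializationCheck {
    public static void main(String[] args) throws IOException {
        Tracker tracker = new Tracker();
        tracker.add(new Object());  // java.lang.Object is not Serializable

        ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream());
        out.writeObject(tracker);   // throws java.io.NotSerializableException
    }
}
{code}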



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5608) Cancel button not always visible

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837708#comment-15837708
 ] 

ASF GitHub Bot commented on FLINK-5608:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/3189
  
Good point. @rehevkor5 is it possible to only hide the color boxes when the 
browser window is very narrow?


> Cancel button not always visible
> 
>
> Key: FLINK-5608
> URL: https://issues.apache.org/jira/browse/FLINK-5608
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.1.4
>Reporter: Shannon Carey
>Assignee: Shannon Carey
>Priority: Minor
>
> When the window is not wide enough, or when the job name is too long, the 
> "Cancel" button in the Job view of the web UI is not visible because it is 
> the first element that gets wrapped down and gets covered by the secondary 
> navbar (the tabs). This causes us to often need to resize the browser wider 
> than our monitor in order to use the cancel button.
> In general, the use of Bootstrap's ".navbar-fixed-top" is problematic if the 
> content may wrap, especially if the content's horizontal width is not known and 
> fixed. The ".navbar-fixed-top" uses fixed positioning, and therefore any 
> unexpected change in height will result in overlap with the rest of the 
> normal-flow content in the page. The Bootstrap docs explain this in their 
> "Overflowing content" callout.
> I am submitting a PR which does not attempt to resolve all issues with the 
> fixed navbar approach, but attempts to improve the situation by using less 
> horizontal space and by altering the layout approach of the Cancel button.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5641) Directories of expired checkpoints are not cleaned up

2017-01-25 Thread Robert Metzger (JIRA)
Robert Metzger created FLINK-5641:
-

 Summary: Directories of expired checkpoints are not cleaned up
 Key: FLINK-5641
 URL: https://issues.apache.org/jira/browse/FLINK-5641
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.2.0
Reporter: Robert Metzger


While testing Flink 1.2.0 RC1, I noticed that some checkpoint files were not removed.

{code}
hadoop fs -lsr /shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d
lsr: DEPRECATED: Please use 'ls -R' instead.
drwxr-xr-x   - robert hadoop  0 2017-01-25 12:35 
/shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-10
-rw-r--r--   3 robert hadoop 144375 2017-01-25 12:14 
/shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-10/dbc9181b-e291-4935-9e5b-bd0e447cdc25
drwxr-xr-x   - robert hadoop  0 2017-01-25 12:46 
/shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-12
-rw-r--r--   3 robert hadoop 146351 2017-01-25 12:30 
/shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-12/6fcab0d8-c6e8-4744-8361-e8bedd90cf07
drwxr-xr-x   - robert hadoop  0 2017-01-25 12:44 
/shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-13
-rw-r--r--   3 robert hadoop 148431 2017-01-25 12:44 
/shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-13/64579d41-9895-49fd-8c76-06814778ab58
drwxr-xr-x   - robert hadoop  0 2017-01-25 11:45 
/shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-6
-rw-r--r--   3 robert hadoop  37151 2017-01-25 11:31 
/shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-6/6bddff32-257a-4890-8dcb-6397b4772deb
drwxr-xr-x   - robert hadoop  0 2017-01-25 11:55 
/shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-7
-rw-r--r--   3 robert hadoop 142269 2017-01-25 11:41 
/shared/checkpoint-dir-rocks/8e414a5dc3f863d85e6eb68e599c127d/chk-7/bb7f0474-1661-4360-89d4-ed73b37cf6f0
{code}




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #3189: [FLINK-5608] [webfrontend] Cancel button stays visible in...

2017-01-25 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/3189
  
Good point. @rehevkor5 is it possible to only hide the color boxes when the 
browser window is very narrow?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-5638) Deadlock when closing two chained async I/O operators

2017-01-25 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-5638:
-
Fix Version/s: (was: 1.2.1)
   1.2.0

> Deadlock when closing two chained async I/O operators
> -
>
> Key: FLINK-5638
> URL: https://issues.apache.org/jira/browse/FLINK-5638
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0, 1.3.0
>
>
> The {{AsyncWaitOperator}} can deadlock in a special case when closing two 
> chained {{AsyncWaitOperators}} while there is still one element in flight 
> between these two operators.
> The deadlock scenario is the following: Given two chained 
> {{AsyncWaitOperators}} {{a1}} and {{a2}}. {{a1}} has its last element 
> completed. This notifies {{a1's}} {{Emitter}}, {{e1}}, to remove the element 
> from the queue and output it to {{a2}}. This poll and output operation 
> happens under the checkpoint lock. Since {{a1}} and {{a2}} are chained, the 
> {{e1}} thread will directly call {{a2's}} {{processElement}} function. In 
> this function, we try to add the new element to the {{StreamElementQueue}}. 
> Now assume that this queue is full. Then the operation will release the 
> checkpoint lock and wait until it is notified again.
> In the meantime, {{a1.close()}} is called by the {{StreamTask}}, because we 
> have consumed all input. The close operation also happens under the 
> checkpoint lock. First the close method waits until all elements from the 
> {{StreamElementQueue}} have been processed (== empty). This happens by 
> waiting on the checkpoint lock. Next the {{e1}} is interrupted and we join on 
> {{e1}}. When interrupting {{e1}}, it currently waits on the checkpoint lock. 
> Since the closing operation does not release the checkpoint lock, {{e1}} 
> cannot regain the synchronization lock and voila we have a deadlock.
> There are two problems which cause this:
> 1. We assume that the {{AsyncWaitOperator}} has processed all its elements if 
> the queue is empty. This is usually the case if the output operation is 
> atomic. However in the chained case it can happen that the emitter thread has 
> to wait to insert the element into the queue of the next 
> {{AsyncWaitOperator}}. Under these circumstances, we release the checkpoint 
> lock and, thus, the output operation is no longer atomic. We can solve this 
> problem by polling the last queue element after we have outputted it instead 
> of before.
> 2. We interrupt the emitter thread while holding the checkpoint lock and not 
> freeing it again. Under these circumstances, the interrupt signal is 
> meaningless because the emitter thread also needs control over the checkpoint 
> lock. We should solve the problem by waiting on the checkpoint lock and 
> periodically checking whether the thread has already stopped or not.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5638) Deadlock when closing two chained async I/O operators

2017-01-25 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-5638:
-
Description: 
The {{AsyncWaitOperator}} can deadlock in a special case when closing two 
chained {{AsyncWaitOperators}} while there is still one element in flight 
between these two operators.

The deadlock scenario is the following: Given two chained 
{{AsyncWaitOperators}} {{a1}} and {{a2}}. {{a1}} has its last element 
completed. This notifies {{a1's}} {{Emitter}}, {{e1}}, to remove the element 
from the queue and output it to {{a2}}. This poll and output operation happens 
under the checkpoint lock. Since {{a1}} and {{a2}} are chained, the {{e1}} 
thread will directly call {{a2's}} {{processElement}} function. In this 
function, we try to add the new element to the {{StreamElementQueue}}. Now 
assume that this queue is full. Then the operation will release the checkpoint 
lock and wait until it is notified again.

In the meantime, {{a1.close()}} is called by the {{StreamTask}}, because we 
have consumed all input. The close operation also happens under the checkpoint 
lock. First the close method waits until all elements from the 
{{StreamElementQueue}} have been processed (== empty). This happens by waiting 
on the checkpoint lock. Next the {{e1}} is interrupted and we join on {{e1}}. 
When interrupting {{e1}}, it currently waits on the checkpoint lock. Since the 
closing operation does not release the checkpoint lock, {{e1}} cannot regain 
the synchronization lock and voila we have a deadlock.

There are two problems which cause this:

1. We assume that the {{AsyncWaitOperator}} has processed all its elements if 
the queue is empty. This is usually the case if the output operation is atomic. 
However in the chained case it can happen that the emitter thread has to wait 
to insert the element into the queue of the next {{AsyncWaitOperator}}. Under 
these circumstances, we release the checkpoint lock and, thus, the output 
operation is no longer atomic. We can solve this problem by polling the last 
queue element after we have outputted it instead of before.

2. We interrupt the emitter thread while holding the checkpoint lock and not 
freeing it again. Under these circumstances, the interrupt signal is 
meaningless because the emitter thread also needs control over the checkpoint 
lock. We should solve the problem by waiting on the checkpoint lock and 
periodically checking whether the thread has already stopped or not.

https://s3.amazonaws.com/archive.travis-ci.org/jobs/194729330/log.txt

  was:
The {{AsyncWaitOperator}} can deadlock in a special case when closing two 
chained {{AsyncWaitOperators}} while there is still one element in flight 
between these two operators.

The deadlock scenario is the following: Given two chained 
{{AsyncWaitOperators}} {{a1}} and {{a2}}. {{a1}} has its last element 
completed. This notifies {{a1's}} {{Emitter}}, {{e1}}, to remove the element 
from the queue and output it to {{a2}}. This poll and output operation happens 
under the checkpoint lock. Since {{a1}} and {{a2}} are chained, the {{e1}} 
thread will directly call {{a2's}} {{processElement}} function. In this 
function, we try to add the new element to the {{StreamElementQueue}}. Now 
assume that this queue is full. Then the operation will release the checkpoint 
lock and wait until it is notified again.

In the meantime, {{a1.close()}} is called by the {{StreamTask}}, because we 
have consumed all input. The close operation also happens under the checkpoint 
lock. First the close method waits until all elements from the 
{{StreamElementQueue}} have been processed (== empty). This happens by waiting 
on the checkpoint lock. Next the {{e1}} is interrupted and we join on {{e1}}. 
When interrupting {{e1}}, it currently waits on the checkpoint lock. Since the 
closing operation does not release the checkpoint lock, {{e1}} cannot regain 
the synchronization lock and voila we have a deadlock.

There are two problems which cause this:

1. We assume that the {{AsyncWaitOperator}} has processed all its elements if 
the queue is empty. This is usually the case if the output operation is atomic. 
However in the chained case it can happen that the emitter thread has to wait 
to insert the element into the queue of the next {{AsyncWaitOperator}}. Under 
these circumstances, we release the checkpoint lock and, thus, the output 
operation is no longer atomic. We can solve this problem by polling the last 
queue element after we have outputted it instead of before.

2. We interrupt the emitter thread while holding the checkpoint lock and not 
freeing it again. Under these circumstances, the interrupt signal is 
meaningless because the emitter thread also needs control over the checkpoint 
lock. We should solve the problem by waiting on the checkpoint lock and 
periodically checking whether the thread has already stopped or not.

[GitHub] flink pull request #3208: [hotfix] [rat] Add exclusion for rolling-sink snap...

2017-01-25 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/3208

[hotfix] [rat] Add exclusion for rolling-sink snapshot

Adds a RAT exclusion for the rolling sink snapshot used for testing 
backwards compatibility. Note that the RAT plugin only complains about these 
files on Windows, which is weird.

Anyway, this should go in master and 1.2.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink hotfix_rat

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3208.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3208


commit e397f2d3a3ec41662235e05ffa90f8f5b8e233cb
Author: zentol 
Date:   2017-01-25T12:50:17Z

[hotfix] [rat] Add exclusion for rolling-sink snapshot




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5634) Flink should not always redirect stdout to a file.

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837691#comment-15837691
 ] 

ASF GitHub Bot commented on FLINK-5634:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3204
  
Looks good to me, +1

@uce the ScriptMeister should probably give his blessing as well...


> Flink should not always redirect stdout to a file.
> --
>
> Key: FLINK-5634
> URL: https://issues.apache.org/jira/browse/FLINK-5634
> Project: Flink
>  Issue Type: Bug
>  Components: Docker
>Affects Versions: 1.2.0
>Reporter: Jamie Grier
> Fix For: 1.2.0
>
>
> Flink always redirects stdout to a file.  While often convenient, this isn't 
> always what people want.  The most obvious case of this is a Docker 
> deployment.
> It should be possible to have Flink log to stdout.
> Here is a PR for this:  https://github.com/apache/flink/pull/3204



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #3204: [FLINK-5634] Flink should not always redirect stdout to a...

2017-01-25 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3204
  
Looks good to me, +1

@uce the ScriptMeister should probably give his blessing as well...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4905) Kafka test instability IllegalStateException: Client is not started

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837684#comment-15837684
 ] 

ASF GitHub Bot commented on FLINK-4905:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3035
  
In this case, the exception would not be logged, true.
It is a very rare corner case that should not affect correctness, and it is not 
really distinguishable from the case where an exception is thrown after the 
source went into finishing mode.

My feeling is to not log here.
  - The advantage is that we don't pollute the log with meaningless 
exceptions in the common case (many users would be led onto a false track).
  - Given that the corner case you described does not affect any expected 
behavior (the committing action might as well not have happened at all if it 
were started a few msecs later), it is okay to "swallow" the exception.
  - If it does in fact affect the offset committing in more cases, it will 
surely also occur at another committing attempt during execution.


> Kafka test instability IllegalStateException: Client is not started
> ---
>
> Key: FLINK-4905
> URL: https://issues.apache.org/jira/browse/FLINK-4905
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Robert Metzger
>Assignee: Andrew Efimov
>  Labels: test-stability
> Attachments: Kafka08Fetcher.png
>
>
> The following travis build 
> (https://s3.amazonaws.com/archive.travis-ci.org/jobs/170365439/log.txt)  
> failed because of this error
> {code}
> 08:17:11,239 INFO  org.apache.flink.runtime.jobmanager.JobManager 
>- Status of job 33ebdc0e7c91be186d80658ce3d17069 (Read some records to 
> commit offsets to Kafka) changed to FAILING.
> java.lang.RuntimeException: Error while confirming checkpoint
>   at org.apache.flink.runtime.taskmanager.Task$4.run(Task.java:1040)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IllegalStateException: Client is not started
>   at 
> org.apache.flink.shaded.com.google.common.base.Preconditions.checkState(Preconditions.java:173)
>   at 
> org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:113)
>   at 
> org.apache.curator.utils.EnsurePath$InitialHelper$1.call(EnsurePath.java:148)
>   at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107)
>   at 
> org.apache.curator.utils.EnsurePath$InitialHelper.ensure(EnsurePath.java:141)
>   at org.apache.curator.utils.EnsurePath.ensure(EnsurePath.java:99)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:133)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.prepareAndCommitOffsets(ZookeeperOffsetHandler.java:93)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.commitInternalOffsetsToKafka(Kafka08Fetcher.java:341)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.notifyCheckpointComplete(FlinkKafkaConsumerBase.java:421)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:229)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:571)
>   at org.apache.flink.runtime.taskmanager.Task$4.run(Task.java:1035)
>   ... 5 more
> 08:17:11,241 INFO  org.apache.flink.runtime.taskmanager.Task  
>- Attempting to cancel task Source: Custom Source -> Map -> Map -> Sink: 
> Unnamed (1/3)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #3035: [FLINK-4905] Kafka test instability IllegalStateExceptio...

2017-01-25 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3035
  
In this case, the exception would not be logged, true.
It is a very rare corner case that should not affect correctness, and it is 
not really distinguishable from the case where an exception is thrown after 
the source went into finishing mode.

My feeling is to not log here.
  - The advantage is that we don't pollute the log with a meaningless 
exception in the common case (many users would be led onto a false track).
  - Given that the corner case you described does not affect any expected 
behavior (the committing action might as well not have happened at all if it 
were started a few msecs later), it is okay to "swallow" the exception.
  - If it does in fact affect the offset committing in more cases, it will 
surely also occur at another committing attempt during execution.




[jira] [Created] (FLINK-5640) configure the explicit Unit Test file suffix

2017-01-25 Thread shijinkui (JIRA)
shijinkui created FLINK-5640:


 Summary: configure the explicit Unit Test file suffix
 Key: FLINK-5640
 URL: https://issues.apache.org/jira/browse/FLINK-5640
 Project: Flink
  Issue Type: Test
  Components: Tests
Reporter: shijinkui
Assignee: shijinkui



There are four types of unit test files: *ITCase.java, *Test.java, 
*ITSuite.scala, and *Suite.scala.
A file name ending in "ITCase.java" denotes an integration test; a file name 
ending in "Test.java" denotes a unit test.

It would be clearer for the Surefire plugin's default-test execution to 
declare explicitly that "*Test.*" files are the Java unit tests (a sketch of 
such a configuration follows the counts below).

* Suite  total: 10
* ITCase  total: 378
* Test  total: 1008
* ITSuite  total: 14
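
A minimal sketch of what that explicit declaration could look like with the 
standard maven-surefire-plugin; this is illustrative only, and Flink's actual 
build setup may differ:

{code}
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-surefire-plugin</artifactId>
  <configuration>
    <includes>
      <!-- declare plain unit tests explicitly; *ITCase files would be bound
           to a separate (integration-test) execution instead -->
      <include>**/*Test.java</include>
    </includes>
  </configuration>
</plugin>
{code}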



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4905) Kafka test instability IllegalStateException: Client is not started

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837665#comment-15837665
 ] 

ASF GitHub Bot commented on FLINK-4905:
---

Github user BrainLogic commented on the issue:

https://github.com/apache/flink/pull/3035
  
Distributed systems and multithreaded environments make us think in terms 
of a logical clock, like a Lamport clock. Step by step:
1. Thread1: the fetcher is running (`running = true`).
2. Thread2 performs `zkHandler.prepareAndCommitOffsets(offsets)`.
3. Thread2: `running` is still `true` when 
`zkHandler.prepareAndCommitOffsets(offsets)` throws an exception.
4. Thread1 stops the fetcher and sets the flag `running = false` in the 
normal way, without any exception.
5. Thread2 reads the flag `running = false` and returns, although the reason 
for the commit failure is different from "the client was closed and 
`running = false`".
6. Thread1: the fetcher is stopped successfully.
7. There is no exception or any information in the log regarding the 
exception from `zkHandler.prepareAndCommitOffsets(offsets)`.

Again, this is a rare case in which we can lose the root cause of strange 
behavior: the offset will not be committed although there are no exceptions 
at all. Am I wrong?
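
To make the interleaving concrete, here is a minimal, hypothetical sketch of 
the pattern being discussed. It is not the actual `Kafka08Fetcher` code; all 
names are illustrative:

{code}
// Hypothetical sketch: the committing thread swallows its failure whenever
// the fetcher happens to have shut down in the meantime.
public class FetcherRaceSketch {

    private volatile boolean running = true;

    // Thread1: normal shutdown path, no exception involved.
    void shutdown() {
        running = false;
    }

    // Thread2: offset commit triggered by a completed checkpoint.
    void commitOffsets(Runnable zkCommit) {
        try {
            zkCommit.run(); // may throw while running is still true
        } catch (RuntimeException e) {
            if (!running) {
                // Thread1 flipped the flag between the throw and this check,
                // so the real failure cause is silently dropped here.
                return;
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        FetcherRaceSketch fetcher = new FetcherRaceSketch();
        fetcher.shutdown(); // Thread1 has finished normally
        fetcher.commitOffsets(() -> {
            throw new IllegalStateException("Client is not started");
        });
        System.out.println("commit failure swallowed, nothing was logged");
    }
}
{code}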


> Kafka test instability IllegalStateException: Client is not started
> ---
>
> Key: FLINK-4905
> URL: https://issues.apache.org/jira/browse/FLINK-4905
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Robert Metzger
>Assignee: Andrew Efimov
>  Labels: test-stability
> Attachments: Kafka08Fetcher.png
>
>
> The following travis build 
> (https://s3.amazonaws.com/archive.travis-ci.org/jobs/170365439/log.txt)  
> failed because of this error
> {code}
> 08:17:11,239 INFO  org.apache.flink.runtime.jobmanager.JobManager 
>- Status of job 33ebdc0e7c91be186d80658ce3d17069 (Read some records to 
> commit offsets to Kafka) changed to FAILING.
> java.lang.RuntimeException: Error while confirming checkpoint
>   at org.apache.flink.runtime.taskmanager.Task$4.run(Task.java:1040)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IllegalStateException: Client is not started
>   at 
> org.apache.flink.shaded.com.google.common.base.Preconditions.checkState(Preconditions.java:173)
>   at 
> org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:113)
>   at 
> org.apache.curator.utils.EnsurePath$InitialHelper$1.call(EnsurePath.java:148)
>   at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107)
>   at 
> org.apache.curator.utils.EnsurePath$InitialHelper.ensure(EnsurePath.java:141)
>   at org.apache.curator.utils.EnsurePath.ensure(EnsurePath.java:99)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:133)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.prepareAndCommitOffsets(ZookeeperOffsetHandler.java:93)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.commitInternalOffsetsToKafka(Kafka08Fetcher.java:341)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.notifyCheckpointComplete(FlinkKafkaConsumerBase.java:421)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:229)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:571)
>   at org.apache.flink.runtime.taskmanager.Task$4.run(Task.java:1035)
>   ... 5 more
> 08:17:11,241 INFO  org.apache.flink.runtime.taskmanager.Task  
>- Attempting to cancel task Source: Custom Source -> Map -> Map -> Sink: 
> Unnamed (1/3)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #3035: [FLINK-4905] Kafka test instability IllegalStateExceptio...

2017-01-25 Thread BrainLogic
Github user BrainLogic commented on the issue:

https://github.com/apache/flink/pull/3035
  
Distributed systems and multithreaded environments make us think in terms 
of a logical clock, like a Lamport clock. Step by step:
1. Thread1: the fetcher is running (`running = true`).
2. Thread2 performs `zkHandler.prepareAndCommitOffsets(offsets)`.
3. Thread2: `running` is still `true` when 
`zkHandler.prepareAndCommitOffsets(offsets)` throws an exception.
4. Thread1 stops the fetcher and sets the flag `running = false` in the 
normal way, without any exception.
5. Thread2 reads the flag `running = false` and returns, although the reason 
for the commit failure is different from "the client was closed and 
`running = false`".
6. Thread1: the fetcher is stopped successfully.
7. There is no exception or any information in the log regarding the 
exception from `zkHandler.prepareAndCommitOffsets(offsets)`.

Again, this is a rare case in which we can lose the root cause of strange 
behavior: the offset will not be committed although there are no exceptions 
at all. Am I wrong?




[GitHub] flink pull request #3206: [FLINK-5612] Fix GlobPathFilter not-serializable e...

2017-01-25 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3206#discussion_r97768800
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java
 ---
@@ -138,4 +141,30 @@ public void doubleStarPattern() {
assertFalse(matcher.filterPath(new Path("a/b")));
assertFalse(matcher.filterPath(new Path("a/b/c")));
}
+
+   @Test(expected = NullPointerException.class)
+   public void testIncludePatternIsNull() {
+   new GlobFilePathFilter(
+   null,
+   Collections.emptyList());
+   }
+
+   @Test(expected = NullPointerException.class)
+   public void testExcludePatternIsNull() {
+   new GlobFilePathFilter(
+   Collections.singletonList("**"),
+   null);
+   }
+
+   @Test
+   public void testGlobFilterSerializable() throws IOException {
--- End diff --

How about using `CommonTestUtils.createCopySerializable()` here? That way you 
can also validate that the serialized copy behaves correctly.
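
A rough sketch of that suggestion, assuming 
`CommonTestUtils.createCopySerializable()` round-trips the object through 
Java serialization and returns the copy (imports as in the surrounding test 
class):

{code}
@Test
public void testGlobFilterSerializable() throws Exception {
    GlobFilePathFilter matcher = new GlobFilePathFilter(
        Collections.singletonList("**"),
        Collections.<String>emptyList());

    // Round-trip through Java serialization and verify that the copy
    // filters paths the same way as the original instance.
    GlobFilePathFilter copy = CommonTestUtils.createCopySerializable(matcher);
    assertFalse(copy.filterPath(new Path("a/b")));
    assertFalse(copy.filterPath(new Path("a/b/c")));
}
{code}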



