[jira] [Commented] (FLINK-4913) Per-job Yarn clusters: include user jar in system class loader

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15666468#comment-15666468
 ] 

ASF GitHub Bot commented on FLINK-4913:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/2795
  
Hey Max! I was also not aware of this issue but Stephan pointed it out to 
me. You can access the user code loader in rich functions via
```
getRuntimeContext().getUserCodeClassLoader()
```
I think it's very good that we changed it for 1.2, but let's revert it for 1.1 just 
to make sure that there are no surprises (although unlikely) for users upgrading 
within the 1.1 series.
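
For reference, a minimal sketch (not from the PR; the class and the looked-up name are made up) of resolving a class through the user code class loader from inside a rich function:

```java
import org.apache.flink.api.common.functions.RichMapFunction;

// Hypothetical example: resolve a class that ships inside the user jar via the
// user code class loader instead of relying on the system class loader.
public class ReflectiveMapper extends RichMapFunction<String, String> {

    @Override
    public String map(String className) throws Exception {
        ClassLoader userLoader = getRuntimeContext().getUserCodeClassLoader();
        Class<?> clazz = Class.forName(className, true, userLoader);
        return clazz.getName();
    }
}
```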


> Per-job Yarn clusters: include user jar in system class loader 
> ---
>
> Key: FLINK-4913
> URL: https://issues.apache.org/jira/browse/FLINK-4913
> Project: Flink
>  Issue Type: Improvement
>  Components: Client, YARN Client
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 1.2.0, 1.1.4
>
>
> Including the jar directly in the system classloader avoids loading it for 
> every instantiation of the ExecutionGraph and every Task execution. Note, 
> this is only possible for per-job clusters (i.e. Yarn/Mesos).





[GitHub] flink issue #2795: Revert "[FLINK-4913][yarn] include user jars in system cl...

2016-11-15 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/2795
  
Hey Max! I was also not aware of this issue but Stephan pointed it out to 
me. You can access the user code loader in rich functions via
```
getRuntimeContext().getUserCodeClassLoader()
```
I think it's very good that we changed it for 1.2, but let's revert it for 1.1 just 
to make sure that there are no surprises (although unlikely) for users upgrading 
within the 1.1 series.




[jira] [Commented] (FLINK-5059) only serialise events once in RecordWriter#broadcastEvent

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15666474#comment-15666474
 ] 

ASF GitHub Bot commented on FLINK-5059:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/2805#discussion_r87965585
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---
@@ -586,7 +588,18 @@ private boolean performCheckpoint(CheckpointMetaData checkpointMetaData) throws
			// yet be created
			final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
			for (ResultPartitionWriter output : getEnvironment().getAllWriters()) {
-				output.writeEventToAllChannels(message);
+				final Buffer eventBuffer = EventSerializer.toBuffer(message);
--- End diff --

That was the case before and I could have adapted 
`ResultPartitionWriter#writeEventToAllChannels()` accordingly. The question is, 
however, whether we want `ResultPartitionWriter` to be aware of the difference 
between events and buffers or offer a cleaner API that is based on buffers 
only...


> only serialise events once in RecordWriter#broadcastEvent
> -
>
> Key: FLINK-5059
> URL: https://issues.apache.org/jira/browse/FLINK-5059
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> Currently, 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter#broadcastEvent 
> serialises the event once per target channel. Instead, it could serialise the 
> event only once and use the serialised form for every channel and thus save 
> resources.





[GitHub] flink pull request #2805: [FLINK-5059] only serialise events once in RecordW...

2016-11-15 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/2805#discussion_r87965585
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---
@@ -586,7 +588,18 @@ private boolean performCheckpoint(CheckpointMetaData checkpointMetaData) throws
			// yet be created
			final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
			for (ResultPartitionWriter output : getEnvironment().getAllWriters()) {
-				output.writeEventToAllChannels(message);
+				final Buffer eventBuffer = EventSerializer.toBuffer(message);
--- End diff --

That was the case before and I could have adapted 
`ResultPartitionWriter#writeEventToAllChannels()` accordingly. The question is, 
however, whether we want `ResultPartitionWriter` to be aware of the difference 
between events and buffers or offer a cleaner API that is based on buffers 
only...




[GitHub] flink issue #2800: Update README.md

2016-11-15 Thread bitchelov
Github user bitchelov commented on the issue:

https://github.com/apache/flink/pull/2800
  
Okay @greghogan,  I understand. Thanks.




[GitHub] flink pull request #2805: [FLINK-5059] only serialise events once in RecordW...

2016-11-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2805#discussion_r87967007
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---
@@ -586,7 +588,18 @@ private boolean performCheckpoint(CheckpointMetaData checkpointMetaData) throws
			// yet be created
			final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
			for (ResultPartitionWriter output : getEnvironment().getAllWriters()) {
-				output.writeEventToAllChannels(message);
+				final Buffer eventBuffer = EventSerializer.toBuffer(message);
--- End diff --

In any case, the entire try/finally block could be moved into the 
ResultPartitionWriter, correct?




[jira] [Commented] (FLINK-5059) only serialise events once in RecordWriter#broadcastEvent

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15666496#comment-15666496
 ] 

ASF GitHub Bot commented on FLINK-5059:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2805#discussion_r87967007
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---
@@ -586,7 +588,18 @@ private boolean performCheckpoint(CheckpointMetaData checkpointMetaData) throws
			// yet be created
			final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
			for (ResultPartitionWriter output : getEnvironment().getAllWriters()) {
-				output.writeEventToAllChannels(message);
+				final Buffer eventBuffer = EventSerializer.toBuffer(message);
--- End diff --

In any case, the entire try/finally block could be moved into the 
ResultPartitionWriter, correct?


> only serialise events once in RecordWriter#broadcastEvent
> -
>
> Key: FLINK-5059
> URL: https://issues.apache.org/jira/browse/FLINK-5059
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> Currently, 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter#broadcastEvent 
> serialises the event once per target channel. Instead, it could serialise the 
> event only once and use the serialised form for every channel and thus save 
> resources.





[jira] [Created] (FLINK-5068) YARN HA: Job scheduled before TM is up leading to "Not enough free slots" error

2016-11-15 Thread Gyula Fora (JIRA)
Gyula Fora created FLINK-5068:
-

 Summary: YARN HA: Job scheduled before TM is up leading to "Not 
enough free slots" error
 Key: FLINK-5068
 URL: https://issues.apache.org/jira/browse/FLINK-5068
 Project: Flink
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.1.3
Reporter: Gyula Fora


On one occasion after a job manager failure, the job could not be recovered because 
it was scheduled before the TM was up, causing an unrecoverable failure.

So I ended up with a running YARN JM + TM with enough slots, but the job had failed 
(and was not restarting).

Please drop me a mail for the logs :)





[GitHub] flink pull request #2805: [FLINK-5059] only serialise events once in RecordW...

2016-11-15 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/2805#discussion_r87968119
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---
@@ -586,7 +588,18 @@ private boolean performCheckpoint(CheckpointMetaData checkpointMetaData) throws
			// yet be created
			final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
			for (ResultPartitionWriter output : getEnvironment().getAllWriters()) {
-				output.writeEventToAllChannels(message);
+				final Buffer eventBuffer = EventSerializer.toBuffer(message);
--- End diff --

Yes, that's right. Let's go with `writeBufferToAllChannels(Buffer)` instead.
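
A rough sketch of the resulting pattern, reusing the variables from the diff above and the `writeBufferToAllChannels(Buffer)` method proposed here (buffer release handling is simplified and may differ from the actual implementation):

```java
for (ResultPartitionWriter output : getEnvironment().getAllWriters()) {
    // Serialise the event once per writer instead of once per target channel.
    final Buffer eventBuffer = EventSerializer.toBuffer(message);
    try {
        output.writeBufferToAllChannels(eventBuffer);
    } finally {
        // Release the creator's reference once all channels have taken theirs.
        eventBuffer.recycle();
    }
}
```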




[jira] [Commented] (FLINK-5059) only serialise events once in RecordWriter#broadcastEvent

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15666510#comment-15666510
 ] 

ASF GitHub Bot commented on FLINK-5059:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/2805#discussion_r87968119
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---
@@ -586,7 +588,18 @@ private boolean performCheckpoint(CheckpointMetaData checkpointMetaData) throws
			// yet be created
			final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
			for (ResultPartitionWriter output : getEnvironment().getAllWriters()) {
-				output.writeEventToAllChannels(message);
+				final Buffer eventBuffer = EventSerializer.toBuffer(message);
--- End diff --

Yes, that's right. Let's go with `writeBufferToAllChannels(Buffer)` instead.


> only serialise events once in RecordWriter#broadcastEvent
> -
>
> Key: FLINK-5059
> URL: https://issues.apache.org/jira/browse/FLINK-5059
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> Currently, 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter#broadcastEvent 
> serialises the event once per target channel. Instead, it could serialise the 
> event only once and use the serialised form for every channel and thus save 
> resources.





[jira] [Commented] (FLINK-5006) SystemProcessingTimeServiceTest.testTimerSorting fails

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15666568#comment-15666568
 ] 

ASF GitHub Bot commented on FLINK-5006:
---

Github user BorisOsipov commented on the issue:

https://github.com/apache/flink/pull/2785
  
@aljoscha thank you for the additional research and comments.
@StephanEwen I think we can remove it, as I proposed earlier.

I've removed the test and added Javadoc to the registerTimer method. Please take a look.
Is it OK?


> SystemProcessingTimeServiceTest.testTimerSorting fails
> --
>
> Key: FLINK-5006
> URL: https://issues.apache.org/jira/browse/FLINK-5006
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Ufuk Celebi
>  Labels: test-stability
>
> {code}
> testTimerSorting(org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeServiceTest)
>   Time elapsed: 0.023 sec  <<< FAILURE!
> java.lang.AssertionError: expected:<1478173518115> but was:<1478173518122>
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:834)
>   at org.junit.Assert.assertEquals(Assert.java:645)
>   at org.junit.Assert.assertEquals(Assert.java:631)
>   at 
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeServiceTest.testTimerSorting(SystemProcessingTimeServiceTest.java:298)
> {code}
> Failed in a private branch with unrelated changes (the test is very much self 
> contained).
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/172910645/log.txt





[GitHub] flink issue #2785: [FLINK-5006] SystemProcessingTimeServiceTest.testTimerSor...

2016-11-15 Thread BorisOsipov
Github user BorisOsipov commented on the issue:

https://github.com/apache/flink/pull/2785
  
@aljoscha thank you for the additional research and comments.
@StephanEwen I think we can remove it, as I proposed earlier.

I've removed the test and added Javadoc to the registerTimer method. Please take a look.
Is it OK?




[GitHub] flink issue #2736: [FLINK-4174] Enhance evictor functionality

2016-11-15 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2736
  
I merged it. Thanks for your work and for working with me on this 😃

Could you please close this PR and the two Jira issues that it solves?




[jira] [Commented] (FLINK-4174) Enhance Window Evictor

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15666587#comment-15666587
 ] 

ASF GitHub Bot commented on FLINK-4174:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2736
  
I merged it. Thanks for your work and for working with me on this 😃

Could you please close this PR and the two Jira issues that it solves?


> Enhance Window Evictor
> --
>
> Key: FLINK-4174
> URL: https://issues.apache.org/jira/browse/FLINK-4174
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: vishnu viswanath
>Assignee: vishnu viswanath
>
> Enhance the current functionality of Evictor as per this [design 
> document|https://docs.google.com/document/d/1Rr7xzlItYqvFXLyyy-Yv0vvw8f29QYAjm5i9E4A_JlU/edit].
> This includes:
> - Allow eviction of elements from the window in any order (not only from the 
> beginning). To do this, the Evictor must go through the list of elements and 
> remove those that have to be evicted, instead of the current approach of 
> returning the count of elements to be removed from the beginning (see the 
> sketch below).
> - Allow eviction to be done before/after applying the window function.
> FLIP page for this enhancement : 
> [FLIP-4|https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor]
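
A minimal sketch of what the enhanced contract could look like (type parameters and method names follow the FLIP-4 discussion above and may differ from the merged API):

{code:java}
import java.io.Serializable;

// Hedged sketch: the evictor iterates over the window contents and may remove
// arbitrary elements, both before and after the window function is applied.
public interface Evictor<T, W> extends Serializable {

    // Called before the window function; may remove any elements, not just a prefix.
    void evictBefore(Iterable<T> elements, int size, W window);

    // Called after the window function has been applied.
    void evictAfter(Iterable<T> elements, int size, W window);
}
{code}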





[GitHub] flink pull request #2756: [FLINK-4997] Extending Window Function Metadata

2016-11-15 Thread VenturaDelMonte
Github user VenturaDelMonte commented on a diff in the pull request:

https://github.com/apache/flink/pull/2756#discussion_r87974146
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java ---
@@ -800,4 +907,39 @@ public StreamExecutionEnvironment getExecutionEnvironment() {
	public TypeInformation getInputType() {
		return input.getType();
	}
+
+	private static  ProcessWindowFunction wrapWindowFunction(final WindowFunction cleanedFunction) {
--- End diff --

Yes, I know about it, but using a WrappingFunction would not be possible here 
because we need to create a ProcessWindowFunction. Anyway, as I will get rid of 
the triple nesting InternalWindowFunction(ProcessWindowFunction(WindowFunction)), 
this is no longer an issue. By the way, I added this triple nesting because I 
thought you wanted to completely decouple the WindowFunction interface from the 
runtime internals.
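
For readers following the diff above, a rough sketch of what such a `wrapWindowFunction` helper does inside WindowedStream (a sketch only; the generics and the `Context#window()` accessor follow the proposed ProcessWindowFunction shape and may differ from the final API):

```java
private static <IN, OUT, KEY, W extends Window> ProcessWindowFunction<IN, OUT, KEY, W> wrapWindowFunction(
        final WindowFunction<IN, OUT, KEY, W> wrapped) {

    return new ProcessWindowFunction<IN, OUT, KEY, W>() {
        @Override
        public void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception {
            // Delegate to the plain WindowFunction and ignore the additional metadata.
            wrapped.apply(key, context.window(), elements, out);
        }
    };
}
```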




[jira] [Commented] (FLINK-4997) Extending Window Function Metadata

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1502#comment-1502
 ] 

ASF GitHub Bot commented on FLINK-4997:
---

Github user VenturaDelMonte commented on a diff in the pull request:

https://github.com/apache/flink/pull/2756#discussion_r87974146
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java ---
@@ -800,4 +907,39 @@ public StreamExecutionEnvironment getExecutionEnvironment() {
	public TypeInformation getInputType() {
		return input.getType();
	}
+
+	private static  ProcessWindowFunction wrapWindowFunction(final WindowFunction cleanedFunction) {
--- End diff --

Yes, I know about it, but using a WrappingFunction would not be possible here 
because we need to create a ProcessWindowFunction. Anyway, as I will get rid of 
the triple nesting InternalWindowFunction(ProcessWindowFunction(WindowFunction)), 
this is no longer an issue. By the way, I added this triple nesting because I 
thought you wanted to completely decouple the WindowFunction interface from the 
runtime internals.


> Extending Window Function Metadata
> --
>
> Key: FLINK-4997
> URL: https://issues.apache.org/jira/browse/FLINK-4997
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, Streaming, Windowing Operators
>Reporter: Ventura Del Monte
>Assignee: Ventura Del Monte
> Fix For: 1.2.0
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata





[jira] [Commented] (FLINK-5068) YARN HA: Job scheduled before TM is up leading to "Not enough free slots" error

2016-11-15 Thread Ufuk Celebi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1520#comment-1520
 ] 

Ufuk Celebi commented on FLINK-5068:


Do you have a restart strategy configured? Shouldn't this be recovered 
eventually when the TM is up and the job restarts? Feel free to send me the 
logs to uce at apache.

> YARN HA: Job scheduled before TM is up leading to "Not enough free slots" 
> error
> ---
>
> Key: FLINK-5068
> URL: https://issues.apache.org/jira/browse/FLINK-5068
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.1.3
>Reporter: Gyula Fora
>
> On one occasion after a job manager failure, the job could not be recovered 
> because it was scheduled before the TM was up, causing an unrecoverable failure.
> So I ended up with a running YARN JM + TM with enough slots, but the job had 
> failed (and was not restarting).
> Please drop me a mail for the logs :)





[jira] [Commented] (FLINK-4997) Extending Window Function Metadata

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1527#comment-1527
 ] 

ASF GitHub Bot commented on FLINK-4997:
---

Github user VenturaDelMonte commented on a diff in the pull request:

https://github.com/apache/flink/pull/2756#discussion_r87975777
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java ---
@@ -459,7 +545,28 @@ public WindowedStream(KeyedStream input,
	 * @param resultType Type information for the result type of the window function
	 * @return The data stream that is the result of applying the window function to the window.
	 */
-	public  SingleOutputStreamOperator apply(R initialValue, FoldFunction foldFunction, WindowFunction function, TypeInformation resultType) {
+	public  SingleOutputStreamOperator apply(R initialValue, FoldFunction foldFunction,
--- End diff --

Thanks for the review, first of all. I see your point here. I will introduce 
these ProcessWindowFunction-specific fold/reduce methods, without altering the old 
WindowFunction API, of course.


> Extending Window Function Metadata
> --
>
> Key: FLINK-4997
> URL: https://issues.apache.org/jira/browse/FLINK-4997
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, Streaming, Windowing Operators
>Reporter: Ventura Del Monte
>Assignee: Ventura Del Monte
> Fix For: 1.2.0
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata





[GitHub] flink pull request #2756: [FLINK-4997] Extending Window Function Metadata

2016-11-15 Thread VenturaDelMonte
Github user VenturaDelMonte commented on a diff in the pull request:

https://github.com/apache/flink/pull/2756#discussion_r87975777
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java ---
@@ -459,7 +545,28 @@ public WindowedStream(KeyedStream input,
	 * @param resultType Type information for the result type of the window function
	 * @return The data stream that is the result of applying the window function to the window.
	 */
-	public  SingleOutputStreamOperator apply(R initialValue, FoldFunction foldFunction, WindowFunction function, TypeInformation resultType) {
+	public  SingleOutputStreamOperator apply(R initialValue, FoldFunction foldFunction,
--- End diff --

Thanks for the review, first of all. I see your point here. I will introduce 
these ProcessWindowFunction-specific fold/reduce methods, without altering the old 
WindowFunction API, of course.




[GitHub] flink issue #2802: Minor fixes

2016-11-15 Thread bitchelov
Github user bitchelov commented on the issue:

https://github.com/apache/flink/pull/2802
  
Hi @greghogan. I made some updates in "sidenav.html". But let's find out 
what's going on: the first div opens on line 74 and is closed on line 76. The div 
that we forgot to close opens on line 110, and I'm going to close it on line 129.

![sidenav](https://cloud.githubusercontent.com/assets/5806063/20299321/813ef298-ab2b-11e6-8d53-300cacc182db.PNG)





[jira] [Created] (FLINK-5069) Pending checkpoint statistics gauge

2016-11-15 Thread zhuhaifeng (JIRA)
zhuhaifeng created FLINK-5069:
-

 Summary: Pending checkpoint statistics gauge
 Key: FLINK-5069
 URL: https://issues.apache.org/jira/browse/FLINK-5069
 Project: Flink
  Issue Type: Improvement
Reporter: zhuhaifeng
Assignee: zhuhaifeng
Priority: Minor
 Fix For: 1.2.0


Add statistics about pending checkpoints as a gauge metric. When a checkpoint 
appears not to complete, this metric would help to inspect the state of the 
pending checkpoint, e.g. which task did not complete the checkpoint.
The following statistics will be collected:
- checkpoint ID,
- number of acknowledged tasks,
- number of not yet acknowledged tasks,
- the JobVertexID and task ID of each not yet acknowledged task.
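
As a rough illustration (none of this is from the PR; the snapshot holder and its fields are made-up placeholders), such a gauge could expose the listed information like this:

{code:java}
import org.apache.flink.metrics.Gauge;

// Hypothetical snapshot of a pending checkpoint, matching the fields listed above.
class PendingCheckpointSnapshot {
    long checkpointId;
    int numAcknowledgedTasks;
    int numNotYetAcknowledgedTasks;
    String notYetAcknowledgedTasks; // e.g. "jobVertexId/taskId, ..."
}

// A gauge that renders the pending checkpoint state as a string for metric reporters.
class PendingCheckpointGauge implements Gauge<String> {

    private final PendingCheckpointSnapshot snapshot;

    PendingCheckpointGauge(PendingCheckpointSnapshot snapshot) {
        this.snapshot = snapshot;
    }

    @Override
    public String getValue() {
        return "checkpoint " + snapshot.checkpointId
                + ": acknowledged=" + snapshot.numAcknowledgedTasks
                + ", pending=" + snapshot.numNotYetAcknowledgedTasks
                + ", pendingTasks=[" + snapshot.notYetAcknowledgedTasks + "]";
    }
}
{code}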





[jira] [Updated] (FLINK-5069) Pending checkpoint statistics gauge

2016-11-15 Thread zhuhaifeng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhuhaifeng updated FLINK-5069:
--
Component/s: Metrics

> Pending checkpoint statistics gauge
> ---
>
> Key: FLINK-5069
> URL: https://issues.apache.org/jira/browse/FLINK-5069
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: zhuhaifeng
>Assignee: zhuhaifeng
>Priority: Minor
> Fix For: 1.2.0
>
>
> Add statistics about pending checkpoints as a gauge metric. When a checkpoint 
> appears not to complete, this metric would help to inspect the state of the 
> pending checkpoint, e.g. which task did not complete the checkpoint.
> The following statistics will be collected:
> - checkpoint ID,
> - number of acknowledged tasks,
> - number of not yet acknowledged tasks,
> - the JobVertexID and task ID of each not yet acknowledged task.





[GitHub] flink pull request #2807: [FLINK-4631] Prevent some possible NPEs.

2016-11-15 Thread chermenin
GitHub user chermenin opened a pull request:

https://github.com/apache/flink/pull/2807

[FLINK-4631] Prevent some possible NPEs.

Added additional conditions in several places to guard against possible NPEs. This 
PR should completely solve 
[FLINK-4631](https://issues.apache.org/jira/browse/FLINK-4631).

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chermenin/flink flink-4631

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2807.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2807


commit b8e8a4507de1c4a101863116b92edefc12fb14f1
Author: Aleksandr Chermenin 
Date:   2016-10-28T09:36:01Z

[FLINK-4631] Avoided NPE in OneInputStreamTask.

commit 9a8ec134900eda502539448e8ecde42dc019fe7a
Author: Aleksandr Chermenin 
Date:   2016-11-14T09:49:48Z

[FLINK-4631] Fixed sink functions.

commit 78176dae7168006a8430f27d9df2abc8e5a9f364
Author: Aleksandr Chermenin 
Date:   2016-11-14T14:06:21Z

[FLINK-4631] Fixed sources and stream tasks.

commit dd70279a1a0b31f5c49c96efebed7399a77811f5
Author: Aleksandr Chermenin 
Date:   2016-11-15T09:27:09Z

[FLINK-4631] Some streaming fixes.






[jira] [Commented] (FLINK-4631) NullPointerException during stream task cleanup

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1553#comment-1553
 ] 

ASF GitHub Bot commented on FLINK-4631:
---

GitHub user chermenin opened a pull request:

https://github.com/apache/flink/pull/2807

[FLINK-4631] Prevent some possible NPEs.

Added additional conditions in several places to guard against possible NPEs. This 
PR should completely solve 
[FLINK-4631](https://issues.apache.org/jira/browse/FLINK-4631).

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chermenin/flink flink-4631

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2807.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2807


commit b8e8a4507de1c4a101863116b92edefc12fb14f1
Author: Aleksandr Chermenin 
Date:   2016-10-28T09:36:01Z

[FLINK-4631] Avoided NPE in OneInputStreamTask.

commit 9a8ec134900eda502539448e8ecde42dc019fe7a
Author: Aleksandr Chermenin 
Date:   2016-11-14T09:49:48Z

[FLINK-4631] Fixed sink functions.

commit 78176dae7168006a8430f27d9df2abc8e5a9f364
Author: Aleksandr Chermenin 
Date:   2016-11-14T14:06:21Z

[FLINK-4631] Fixed sources and stream tasks.

commit dd70279a1a0b31f5c49c96efebed7399a77811f5
Author: Aleksandr Chermenin 
Date:   2016-11-15T09:27:09Z

[FLINK-4631] Some streaming fixes.




> NullPointerException during stream task cleanup
> ---
>
> Key: FLINK-4631
> URL: https://issues.apache.org/jira/browse/FLINK-4631
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.1.2
> Environment: Ubuntu server 12.04.5 64 bit
> java version "1.8.0_40"
> Java(TM) SE Runtime Environment (build 1.8.0_40-b26)
> Java HotSpot(TM) 64-Bit Server VM (build 25.40-b25, mixed mode)
>Reporter: Avihai Berkovitz
> Fix For: 1.2.0
>
>
> If a streaming job fails during startup (in my case, due to a lack of network 
> buffers), all the tasks are cancelled before they have started. This causes 
> many instances of the following exception:
> {noformat}
> 2016-09-18 14:17:12,177 ERROR 
> org.apache.flink.streaming.runtime.tasks.StreamTask   - Error during 
> cleanup of stream task
> java.lang.NullPointerException
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.cleanup(OneInputStreamTask.java:73)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:323)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>   at java.lang.Thread.run(Thread.java:745)
> {noformat}
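
A minimal sketch of the kind of guard such a fix needs (not the actual patch): cleanup has to tolerate fields that were never initialised because the task was cancelled before they were set up.

{code:java}
@Override
protected void cleanup() throws Exception {
	// inputProcessor may still be null if the task was cancelled during startup.
	if (inputProcessor != null) {
		inputProcessor.cleanup();
	}
}
{code}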





[jira] [Assigned] (FLINK-4693) Add session group-windows for batch tables

2016-11-15 Thread sunjincheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng reassigned FLINK-4693:
--

Assignee: sunjincheng

> Add session group-windows for batch tables
> ---
>
> Key: FLINK-4693
> URL: https://issues.apache.org/jira/browse/FLINK-4693
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: sunjincheng
>
> Add Session group-windows for batch tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>  





[GitHub] flink pull request #2808: [FLINK-4927] [yarn]Implement FLI-6 YARN Resource M...

2016-11-15 Thread shuai-xu
GitHub user shuai-xu opened a pull request:

https://github.com/apache/flink/pull/2808

[FLINK-4927] [yarn]Implement FLI-6 YARN Resource Manager

This PR is for issue [FLINK-4927](https://issues.apache.org/jira/browse/FLINK-4927).

The Flink YARN Resource Manager communicates with YARN's Resource Manager to 
acquire and release containers, and it will start the allocated containers.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shuai-xu/flink jira-4927

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2808.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2808


commit 180d5e9bc6dbb0d669975e73c2c8b880f3074a07
Author: shuai.xus 
Date:   2016-11-03T08:24:47Z

[FLINK-4928] [yarn] Implement FLIP-6 YARN Application Master Runner

commit 9c1910cd12546582f1ef3549485bedff9a153e11
Author: shuai.xus 
Date:   2016-11-03T08:37:19Z

[FLINK-4928] [yarn] Implement FLIP-6 YARN Application Master Runner

commit d9d099601f31684b40c3215b6056d783117ca426
Author: shuai.xus 
Date:   2016-11-15T09:16:41Z

[FLINK-4928] [yarn] start yarn cluster pass and change file format

commit 09318c98bc8210305ffda2dc639cd15ac2b0aaf6
Author: shuai.xus 
Date:   2016-11-15T09:35:55Z

[FLINK-4928] [yarn]Implement FLI-6 YARN Resource Manager

commit 7ca8cbe88de0e775ba7239423eb7b9ec1cbfe721
Author: shuai.xus 
Date:   2016-11-15T09:43:28Z

remove YarnFlinkApplicationMasterRunner from branch jira-4927






[jira] [Commented] (FLINK-4927) Implement FLI-6 YARN Resource Manager

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1588#comment-1588
 ] 

ASF GitHub Bot commented on FLINK-4927:
---

GitHub user shuai-xu opened a pull request:

https://github.com/apache/flink/pull/2808

[FLINK-4927] [yarn]Implement FLI-6 YARN Resource Manager

This PR is for issue [FLINK-4927](https://issues.apache.org/jira/browse/FLINK-4927).

The Flink YARN Resource Manager communicates with YARN's Resource Manager to 
acquire and release containers, and it will start the allocated containers.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shuai-xu/flink jira-4927

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2808.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2808


commit 180d5e9bc6dbb0d669975e73c2c8b880f3074a07
Author: shuai.xus 
Date:   2016-11-03T08:24:47Z

[FLINK-4928] [yarn] Implement FLIP-6 YARN Application Master Runner

commit 9c1910cd12546582f1ef3549485bedff9a153e11
Author: shuai.xus 
Date:   2016-11-03T08:37:19Z

[FLINK-4928] [yarn] Implement FLIP-6 YARN Application Master Runner

commit d9d099601f31684b40c3215b6056d783117ca426
Author: shuai.xus 
Date:   2016-11-15T09:16:41Z

[FLINK-4928] [yarn] start yarn cluster pass and change file format

commit 09318c98bc8210305ffda2dc639cd15ac2b0aaf6
Author: shuai.xus 
Date:   2016-11-15T09:35:55Z

[FLINK-4928] [yarn]Implement FLI-6 YARN Resource Manager

commit 7ca8cbe88de0e775ba7239423eb7b9ec1cbfe721
Author: shuai.xus 
Date:   2016-11-15T09:43:28Z

remove YarnFlinkApplicationMasterRunner from branch jira-4927




> Implement FLI-6 YARN Resource Manager
> -
>
> Key: FLINK-4927
> URL: https://issues.apache.org/jira/browse/FLINK-4927
> Project: Flink
>  Issue Type: Sub-task
>  Components: YARN
> Environment: {{flip-6}} feature branch
>Reporter: Stephan Ewen
>Assignee: shuai.xu
>
> The Flink YARN Resource Manager communicates with YARN's Resource Manager to 
> acquire and release containers.
> It is also responsible to notify the JobManager eagerly about container 
> failures.





[GitHub] flink pull request #2809: [FLINK-5069] [Metrics] Pending checkpoint statisti...

2016-11-15 Thread zhuhaifengleon
GitHub user zhuhaifengleon opened a pull request:

https://github.com/apache/flink/pull/2809

[FLINK-5069] [Metrics] Pending checkpoint statistics gauge

This PR introduces a pending checkpoint statistics gauge, which helps to check 
the state when checkpoints cannot complete.
The following statistics will be collected:
- checkpoint ID,
- number of acknowledged tasks,
- number of not yet acknowledged tasks,
- the JobVertexID and task ID of each not yet acknowledged task.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhuhaifengleon/flink FLINK-5069

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2809.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2809


commit f94e28ed2276b470e408b7116c4ef5b227858df3
Author: zhuhaifengleon 
Date:   2016-11-15T09:29:09Z

[FLINK-5069] [Metrics] Pending checkpoint statistics gauge






[jira] [Commented] (FLINK-5069) Pending checkpoint statistics gauge

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1594#comment-1594
 ] 

ASF GitHub Bot commented on FLINK-5069:
---

GitHub user zhuhaifengleon opened a pull request:

https://github.com/apache/flink/pull/2809

[FLINK-5069] [Metrics] Pending checkpoint statistics gauge

This PR introduces a pending checkpoint statistics gauge, which helps to check 
the state when checkpoints cannot complete.
The following statistics will be collected:
- checkpoint ID,
- number of acknowledged tasks,
- number of not yet acknowledged tasks,
- the JobVertexID and task ID of each not yet acknowledged task.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhuhaifengleon/flink FLINK-5069

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2809.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2809


commit f94e28ed2276b470e408b7116c4ef5b227858df3
Author: zhuhaifengleon 
Date:   2016-11-15T09:29:09Z

[FLINK-5069] [Metrics] Pending checkpoint statistics gauge




> Pending checkpoint statistics gauge
> ---
>
> Key: FLINK-5069
> URL: https://issues.apache.org/jira/browse/FLINK-5069
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: zhuhaifeng
>Assignee: zhuhaifeng
>Priority: Minor
> Fix For: 1.2.0
>
>
> Add statistics about pending checkpoints as a gauge metric. When a checkpoint 
> appears not to complete, this metric would help to inspect the state of the 
> pending checkpoint, e.g. which task did not complete the checkpoint.
> The following statistics will be collected:
> - checkpoint ID,
> - number of acknowledged tasks,
> - number of not yet acknowledged tasks,
> - the JobVertexID and task ID of each not yet acknowledged task.





[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15666724#comment-15666724
 ] 

ASF GitHub Bot commented on FLINK-3848:
---

GitHub user tonycox opened a pull request:

https://github.com/apache/flink/pull/2810

[FLINK-3848] Add ProjectableTableSource interface and translation rule.

Extend CsvTableSource to implement projection

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tonycox/flink FLINK-3848

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2810.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2810


commit 95f55a3783ff849357b58d06f3b8c99b20edbe5e
Author: tonycox 
Date:   2016-11-07T16:18:31Z

[FLINK-3848] Add ProjectableTableSource interface and translation rule. 
Extend CsvTableSource to implement projection




> Add ProjectableTableSource interface and translation rule
> -
>
> Key: FLINK-3848
> URL: https://issues.apache.org/jira/browse/FLINK-3848
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Anton Solovev
>
> Add a {{ProjectableTableSource}} interface for {{TableSource}} implementations 
> that support projection push-down.
> The interface could look as follows:
> {code}
> trait ProjectableTableSource {
>   def setProjection(fields: Array[String]): Unit
> }
> {code}
> In addition we need Calcite rules to push a projection into a TableScan that 
> refers to a {{ProjectableTableSource}}. We might need to tweak the cost model 
> as well to push the optimizer in the right direction.
> Moreover, the {{CsvTableSource}} could be extended to implement 
> {{ProjectableTableSource}}.





[GitHub] flink pull request #2810: [FLINK-3848] Add ProjectableTableSource interface ...

2016-11-15 Thread tonycox
GitHub user tonycox opened a pull request:

https://github.com/apache/flink/pull/2810

[FLINK-3848] Add ProjectableTableSource interface and translation rule.

Extend CsvTableSource to implement projection

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tonycox/flink FLINK-3848

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2810.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2810


commit 95f55a3783ff849357b58d06f3b8c99b20edbe5e
Author: tonycox 
Date:   2016-11-07T16:18:31Z

[FLINK-3848] Add ProjectableTableSource interface and translation rule. 
Extend CsvTableSource to implement projection






[jira] [Commented] (FLINK-3703) Add sequence matching semantics to discard matched events

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15666744#comment-15666744
 ] 

ASF GitHub Bot commented on FLINK-3703:
---

Github user LordFB commented on the issue:

https://github.com/apache/flink/pull/2367
  
Hi @mushketyk and @tillrohrmann,

are there any updates on this Pull Request? Or is there already a way to 
change the matching behaviour in FlinkCEP?


> Add sequence matching semantics to discard matched events
> -
>
> Key: FLINK-3703
> URL: https://issues.apache.org/jira/browse/FLINK-3703
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.0.0, 1.1.0
>Reporter: Till Rohrmann
>Assignee: Ivan Mushketyk
>Priority: Minor
>
> There is no easy way to decide whether events can be part of multiple 
> matching sequences or not. Currently, the default is that an event can 
> participate in multiple matching sequences. E.g. if you have the pattern 
> {{Pattern.begin("a").followedBy("b")}} and the input event stream 
> {{Event("A"), Event("B"), Event("C")}}, then you will generate the following 
> matching sequences: {{Event("A"), Event("B")}}, {{Event("A"), Event("C")}} 
> and {{Event("B"), Event("C")}}. 
> It would be useful to allow the user to define where the matching algorithm 
> should continue after a matching sequence has been found. Possible option 
> values could be 
>  * {{from first}} - continue keeping all events for future matches (that is 
> the current behaviour) 
>  * {{after first}} -  continue after the first element (remove first matching 
> event and continue with the second event)
>  * {{after last}} - continue after the last element (effectively discarding 
> all elements of the matching sequence)





[GitHub] flink issue #2367: [FLINK-3703][cep] Add sequence matching semantics to disc...

2016-11-15 Thread LordFB
Github user LordFB commented on the issue:

https://github.com/apache/flink/pull/2367
  
Hi @mushketyk and @tillrohrmann,

are there any updates on this Pull Request? Or is there already a way to 
change the matching behaviour in FlinkCEP?




[GitHub] flink issue #2653: [FLINK-4469] [table] Add support for user defined table f...

2016-11-15 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/2653
  
@wuchong thanks for the PR. I will also review it in the next few days.




[jira] [Commented] (FLINK-4498) Better Cassandra sink documentation

2016-11-15 Thread Jakub Nowacki (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15666756#comment-15666756
 ] 

Jakub Nowacki commented on FLINK-4498:
--

Adding to the above, the Scala example in the documentation is in fact Java code 
and does not work. The Scala code to create a sink looks as follows:

{code:scala}
CassandraSink.addSink(input.javaStream)
  .setClusterBuilder(new ClusterBuilder() {
    override def buildCluster(builder: Cluster.Builder): Cluster = {
      builder.addContactPoint("127.0.0.1").build()
    }
  })
  .build()
{code}

> Better Cassandra sink documentation
> ---
>
> Key: FLINK-4498
> URL: https://issues.apache.org/jira/browse/FLINK-4498
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector, Documentation
>Affects Versions: 1.1.0
>Reporter: Elias Levy
>
> The Cassandra sink documentation is somewhat muddled and could be improved.  
> For instance, the fact that it only supports tuples and POJOs that use 
> DataStax Mapper annotations is only mentioned in passing, and it is not clear 
> that the reference to tuples only applies to Flink Java tuples and not Scala 
> tuples.  
> The documentation also does not mention that setQuery() is only necessary for 
> tuple streams. 
> The explanation of the write ahead log could use some cleaning up to clarify 
> when it is appropriate to use, ideally with an example.  Maybe this would be 
> best as a blog post to expand on the type of non-deterministic streams this 
> applies to.
> It would also be useful to mention that tuple elements will be mapped to 
> Cassandra columns using the Datastax Java driver's default encoders, which 
> are somewhat limited (e.g. to write to a blob column the type in the tuple 
> must be a java.nio.ByteBuffer and not just a byte[]).





[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15666754#comment-15666754
 ] 

ASF GitHub Bot commented on FLINK-4469:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/2653
  
@wuchong thanks for the PR. I will also review it in the next few days.


> Add support for user defined table function in Table API & SQL
> --
>
> Key: FLINK-4469
> URL: https://issues.apache.org/jira/browse/FLINK-4469
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>
> Normal user-defined functions, such as concat(), take in a single input row 
> and output a single output row. In contrast, table-generating functions 
> transform a single input row into multiple output rows. This is very useful in 
> some cases, such as looking up rows in HBase by row key and returning one or more rows.
> Adding a user defined table function should:
> 1. inherit from the UDTF class with a specific generic type T
> 2. define one or more eval methods.
> NOTE: 
> 1. the eval method must be public and non-static.
> 2. the generic type T is the row type returned by the table function. Because of 
> Java type erasure, we can’t extract T from the Iterable.
> 3. use {{collect(T)}} to emit a table row
> 4. the eval method can be overloaded. Blink will choose the best matching eval method 
> to call according to parameter types and number.
> {code}
> public class Word {
>   public String word;
>   public Integer length;
> }
> public class SplitStringUDTF extends UDTF<Word> {
> public Iterable<Word> eval(String str) {
> if (str != null) {
> for (String s : str.split(",")) {
> collect(new Word(s, s.length()));
> }
> }
> }
> }
> // in SQL
> tableEnv.registerFunction("split", new SplitStringUDTF())
> tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS 
> t(w,l)")
> // in Java Table API
> tableEnv.registerFunction("split", new SplitStringUDTF())
> // rename split table columns to “w” and “l”
> table.crossApply("split(c) as (w, l)")
>  .select("a, b, w, l")
> // without renaming, we will use the origin field names in the POJO/case/...
> table.crossApply("split(c)")
>  .select("a, b, word, length")
> // in Scala Table API
> val split = new SplitStringUDTF()
> table.crossApply(split('c) as ('w, 'l))
>  .select('a, 'b, 'w, 'l)
> // outerApply for outer join to a UDTF
> table.outerApply(split('c))
>  .select('a, 'b, 'word, 'length)
> {code}
> See [1] for more information about UDTF design.
> [1] 
> https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit#





[jira] [Commented] (FLINK-4832) Count/Sum 0 elements

2016-11-15 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15666782#comment-15666782
 ] 

Timo Walther commented on FLINK-4832:
-

We should first fix FLINK-4263 before we implement this issue. I will assign it 
to myself.

> Count/Sum 0 elements
> 
>
> Key: FLINK-4832
> URL: https://issues.apache.org/jira/browse/FLINK-4832
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Anton Mushin
>
> Currently, the Table API is unable to count or sum up 0 elements. We should 
> improve DataSet aggregations for this, maybe by unioning the original DataSet 
> with a dummy record or by using a MapPartition function. Coming up with a 
> good design for this is also part of this issue.





[jira] [Assigned] (FLINK-4263) SQL's VALUES does not work properly

2016-11-15 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4263?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther reassigned FLINK-4263:
---

Assignee: Timo Walther  (was: Jark Wu)

> SQL's VALUES does not work properly
> ---
>
> Key: FLINK-4263
> URL: https://issues.apache.org/jira/browse/FLINK-4263
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Executing the following SQL leads to very strange output:
> {code}
> SELECT *
> FROM (
>   VALUES
>     (1, 2),
>     (3, 4)
> ) AS q (col1, col2)
> {code}
> {code}
> org.apache.flink.optimizer.CompilerException: Error translating node 'Data 
> Source "at translateToPlan(DataSetValues.scala:88) 
> (org.apache.flink.api.table.runtime.ValuesInputFormat)" : NONE [[ 
> GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties 
> [ordering=null, grouped=null, unique=null] ]]': Could not write the user code 
> wrapper class 
> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : 
> java.io.NotSerializableException: org.apache.flink.api.table.Row
>   at 
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:381)
>   at 
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:106)
>   at 
> org.apache.flink.optimizer.plan.SourcePlanNode.accept(SourcePlanNode.java:86)
>   at 
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>   at 
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>   at 
> org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
>   at 
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:192)
>   at 
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:170)
>   at 
> org.apache.flink.test.util.TestEnvironment.execute(TestEnvironment.java:76)
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:896)
>   at 
> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:637)
>   at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:547)
>   at 
> org.apache.flink.api.scala.batch.sql.SortITCase.testOrderByMultipleFieldsWithSql(SortITCase.scala:56)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
> Caused by: 
> org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could 
> not write the user code wrapper class 
> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : 
> java.io.NotSerializableException: org.apache.flink.api.table.Row
>   at 
> org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:279)
>   at 
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createDataSourceVertex(JobGraphGenerator.java:888)
>   at 
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:281)
>   ... 51 more
> Caused by: java.io.NotSerializableException: org.apache.flink.api.table.Row
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>   at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>   at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>   at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>   at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>   at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>   at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>   at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>   at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>   at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>   at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>   at java.io.ObjectO

[GitHub] flink pull request #2811: [Flink-4541] Support for SQL NOT IN operator

2016-11-15 Thread AlexanderShoshin
GitHub user AlexanderShoshin opened a pull request:

https://github.com/apache/flink/pull/2811

[Flink-4541] Support for SQL NOT IN operator

NOT IN was not working with nested queries because of a missing DataSet cross 
join rule.

I added DataSetSingleRowCrossRule, which converts a cross join into a 
DataSetSingleRowCross only if one of the inputs is a global aggregation (so it 
contains only one row). This is enough to make NOT IN work, and it won't enable 
queries with classical cross joins.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/AlexanderShoshin/flink FLINK-4541

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2811.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2811


commit ca24a3a55fb2c55280386eecaaaf4d640350f62a
Author: Alexander Shoshin 
Date:   2016-11-09T08:27:57Z

Add unit tests for a cross join

commit 193eedd378df87391a41345a0f9a9cb6d5d35232
Author: Alexander Shoshin 
Date:   2016-11-03T16:24:25Z

[FLINK-4541] Add support for DataSet cross join operation




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2758: [FLINK-4260] Support specifying ESCAPE character i...

2016-11-15 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2758#discussion_r87988814
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/SqlITCase.java
 ---
@@ -118,4 +119,39 @@ public void testJoin() throws Exception {
String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello 
world,Hallo Welt\n";
compareResultAsText(results, expected);
}
+
+
+   @Test
+   public void testLikeWithEscapeFromDataSet() throws Exception {
--- End diff --

IMHO we don't need it for the Table API. There are very few use cases for 
it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4260) Allow SQL's LIKE ESCAPE

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15666791#comment-15666791
 ] 

ASF GitHub Bot commented on FLINK-4260:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2758#discussion_r87988814
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/SqlITCase.java
 ---
@@ -118,4 +119,39 @@ public void testJoin() throws Exception {
String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello 
world,Hallo Welt\n";
compareResultAsText(results, expected);
}
+
+
+   @Test
+   public void testLikeWithEscapeFromDataSet() throws Exception {
--- End diff --

IMHO we don't need it for the Table API. There are very few use cases for 
it.


> Allow SQL's LIKE ESCAPE
> ---
>
> Key: FLINK-4260
> URL: https://issues.apache.org/jira/browse/FLINK-4260
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Timo Walther
>Assignee: Leo Deng
>Priority: Minor
>
> Currently, the SQL API does not support specifying an ESCAPE character in a 
> LIKE expression. The SIMILAR TO should also support that.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2740: [FLINK-4964] [ml]

2016-11-15 Thread thvasilo
Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/2740
  
Hello @tfournier314, I tested your code and it does seem that partitions are sorted 
only internally, which is expected. `zipWithIndex` is AFAIK unaware of 
the sorted (as in key range) order of partitions, so it's not guaranteed that 
the "first" partition will get the `[0, partitionSize-1]` indices, the second 
`[partitionSize, 2*partitionSize]`, etc. Maybe @greghogan knows a solution for 
global sorting?

If it's not possible I think we can take a step back and see what we are 
trying to achieve here.

The task is to count the frequencies of labels and assign integer ids to 
them in order of frequency. The labels should either be categorical variables 
(e.g. {Male, Female, Unknown}) or class labels. The case with the most unique 
values might be vocabulary words, which will reach a few million unique 
values at worst.

I would argue then that after we have performed the frequency count in a 
distributed manner, there is no need to do the last step, assigning 
ordered indices, in a distributed manner as well; we can make the assumption 
that all the (label -> frequency) values should fit into the memory of one 
machine.

So I would recommend gathering all the data into one partition after getting 
the counts; that way we guarantee a global ordering:

```{Scala}
fitData.map(s => (s,1))
  .groupBy(0)
  .sum(1)
  .partitionByRange(x => 0)
  .sortPartition(1, Order.DESCENDING)
  .zipWithIndex
  .print()
```

Of course we would need to clarify this restriction in the docstrings and 
documentation.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15666809#comment-15666809
 ] 

ASF GitHub Bot commented on FLINK-4964:
---

Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/2740
  
Hello @tfournier314, I tested your code and it does seem that partitions are sorted 
only internally, which is expected. `zipWithIndex` is AFAIK unaware of 
the sorted (as in key range) order of partitions, so it's not guaranteed that 
the "first" partition will get the `[0, partitionSize-1]` indices, the second 
`[partitionSize, 2*partitionSize]`, etc. Maybe @greghogan knows a solution for 
global sorting?

If it's not possible I think we can take a step back and see what we are 
trying to achieve here.

The task is to count the frequencies of labels and assign integer ids to 
them in order of frequency. The labels should either be categorical variables 
(e.g. {Male, Female, Unknown}) or class labels. The case with the most unique 
values might be vocabulary words, which will reach a few million unique 
values at worst.

I would argue then that after we have performed the frequency count in a 
distributed manner, there is no need to do the last step, assigning 
ordered indices, in a distributed manner as well; we can make the assumption 
that all the (label -> frequency) values should fit into the memory of one 
machine.

So I would recommend gathering all the data into one partition after getting 
the counts; that way we guarantee a global ordering:

```{Scala}
fitData.map(s => (s,1))
  .groupBy(0)
  .sum(1)
  .partitionByRange(x => 0)
  .sortPartition(1, Order.DESCENDING)
  .zipWithIndex
  .print()
```

Of course we would need to clarify this restriction in the docstrings and 
documentation.


> FlinkML - Add StringIndexer
> ---
>
> Key: FLINK-4964
> URL: https://issues.apache.org/jira/browse/FLINK-4964
> Project: Flink
>  Issue Type: New Feature
>Reporter: Thomas FOURNIER
>Priority: Minor
>
> Add StringIndexer as described here:
> http://spark.apache.org/docs/latest/ml-features.html#stringindexer
> This will be added in package preprocessing of FlinkML



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2758: [FLINK-4260] Support specifying ESCAPE character i...

2016-11-15 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2758#discussion_r87993759
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/SqlITCase.java
 ---
@@ -118,4 +119,39 @@ public void testJoin() throws Exception {
String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello 
world,Hallo Welt\n";
compareResultAsText(results, expected);
}
+
+
+   @Test
+   public void testLikeWithEscapeFromDataSet() throws Exception {
--- End diff --

OK


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4260) Allow SQL's LIKE ESCAPE

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15666848#comment-15666848
 ] 

ASF GitHub Bot commented on FLINK-4260:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2758#discussion_r87993759
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/SqlITCase.java
 ---
@@ -118,4 +119,39 @@ public void testJoin() throws Exception {
String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello 
world,Hallo Welt\n";
compareResultAsText(results, expected);
}
+
+
+   @Test
+   public void testLikeWithEscapeFromDataSet() throws Exception {
--- End diff --

OK


> Allow SQL's LIKE ESCAPE
> ---
>
> Key: FLINK-4260
> URL: https://issues.apache.org/jira/browse/FLINK-4260
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Timo Walther
>Assignee: Leo Deng
>Priority: Minor
>
> Currently, the SQL API does not support specifying an ESCAPE character in a 
> LIKE expression. The SIMILAR TO should also support that.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3703) Add sequence matching semantics to discard matched events

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15666860#comment-15666860
 ] 

ASF GitHub Bot commented on FLINK-3703:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2367
  
Hi @LordFB 

I don't think there is currently a way to do it in flink-cep. I am still 
waiting for Till's review for this PR, but he seems to be really busy with 
other work.

Maybe together we will be able to convince him to spend some time on 
reviewing this and similar CEP PRs.


> Add sequence matching semantics to discard matched events
> -
>
> Key: FLINK-3703
> URL: https://issues.apache.org/jira/browse/FLINK-3703
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.0.0, 1.1.0
>Reporter: Till Rohrmann
>Assignee: Ivan Mushketyk
>Priority: Minor
>
> There is no easy way to decide whether events can be part of multiple 
> matching sequences or not. Currently, the default is that an event can 
> participate in multiple matching sequences. E.g. if you have the pattern 
> {{Pattern.begin("a").followedBy("b")}} and the input event stream 
> {{Event("A"), Event("B"), Event("C")}}, then you will generate the following 
> matching sequences: {{Event("A"), Event("B")}}, {{Event("A"), Event("C")}} 
> and {{Event("B"), Event("C")}}. 
> It would be useful to allow the user to define where the matching algorithm 
> should continue after a matching sequence has been found. Possible option 
> values could be 
>  * {{from first}} - continue keeping all events for future matches (that is 
> the current behaviour) 
>  * {{after first}} -  continue after the first element (remove first matching 
> event and continue with the second event)
>  * {{after last}} - continue after the last element (effectively discarding 
> all elements of the matching sequence)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2367: [FLINK-3703][cep] Add sequence matching semantics to disc...

2016-11-15 Thread mushketyk
Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2367
  
Hi @LordFB 

I don't think there is currently a way to do it in flink-cep. I am still 
waiting for Till's review for this PR, but he seems to be really busy with 
other work.

Maybe together we will be able to convince him to spend some time on 
reviewing this and similar CEP PRs.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4604) Add support for standard deviation/variance

2016-11-15 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15666872#comment-15666872
 ] 

Timo Walther commented on FLINK-4604:
-

I think the missing type for {{NULL}} is a Calcite issue. Usually, a {{NULL}} 
in a {{CASE(=($f2, 0), null, EXPR$1)}} should have the same type as {{EXPR$1}}. 
I will have a look at it today.

> Add support for standard deviation/variance
> ---
>
> Key: FLINK-4604
> URL: https://issues.apache.org/jira/browse/FLINK-4604
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Anton Mushin
> Attachments: 1.jpg
>
>
> Calcite's {{AggregateReduceFunctionsRule}} can convert SQL {{AVG, STDDEV_POP, 
> STDDEV_SAMP, VAR_POP, VAR_SAMP}} to sum/count functions. We should add, test 
> and document this rule. 
> Whether we also want to add these aggregates to the Table API is up for discussion.
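
For reference, the sum/count rewrite mentioned above corresponds to the standard identities below (a sketch of the math, not taken from the Calcite rule itself):

{code}
AVG(x)         = SUM(x) / COUNT(x)
VAR_POP(x)     = ( SUM(x*x) - SUM(x)*SUM(x) / COUNT(x) ) / COUNT(x)
VAR_SAMP(x)    = ( SUM(x*x) - SUM(x)*SUM(x) / COUNT(x) ) / ( COUNT(x) - 1 )
STDDEV_POP(x)  = SQRT( VAR_POP(x) )
STDDEV_SAMP(x) = SQRT( VAR_SAMP(x) )
{code}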



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5070) Unable to use Scala's BeanProperty with classes

2016-11-15 Thread Jakub Nowacki (JIRA)
Jakub Nowacki created FLINK-5070:


 Summary: Unable to use Scala's BeanProperty with classes
 Key: FLINK-5070
 URL: https://issues.apache.org/jira/browse/FLINK-5070
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.1.3
Reporter: Jakub Nowacki


When using a Scala class with a property (both var and val) annotated as 
BeanProperty, Flink throws an exception {{java.lang.IllegalStateException: Detected 
more than one getter}}.

The simple code which follows throws that exception:

{code:java}
class SomeClass(@BeanProperty var prop: Int)

object SampleBeanProperty {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.createLocalEnvironment()

// Create a DataStream from a list of elements
env.fromElements(1,2)
.map(new SomeClass(_))
.print

env.execute()
}
}
{code}

Full exception:
{code}
Exception in thread "main" java.lang.IllegalStateException: Detected more than 
one setter
at 
org.apache.flink.api.java.typeutils.TypeExtractor.isValidPojoField(TypeExtractor.java:1646)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1692)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1580)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1479)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:737)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:543)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:497)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:493)
at SampleBeanProperty$.main(SampleBeanProperty.scala:18)
at SampleBeanProperty.main(SampleBeanProperty.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
{code}

If the class is changed into a case class, the code with BeanProperty works fine.





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4997) Extending Window Function Metadata

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15666951#comment-15666951
 ] 

ASF GitHub Bot commented on FLINK-4997:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2756#discussion_r88001538
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
 ---
@@ -800,4 +907,39 @@ public StreamExecutionEnvironment 
getExecutionEnvironment() {
public TypeInformation getInputType() {
return input.getType();
}
+
+   private static  ProcessWindowFunction wrapWindowFunction(final WindowFunction cleanedFunction) {
--- End diff --

I see, I'm afraid I didn't communicate that very well.

For `WindowOperator` the user-facing `WindowFunction` is already abstracted 
from the actual `WindowOperator` by the `InternalWindowFunction`. I just 
realised that these "AlignedProcessingTimeWindowOperators" don't have that 
abstraction and directly take a `WindowFunction`; there we would have to 
introduce some level of nesting, either by wrapping a `WindowFunction` in a 
`ProcessWindowFunction` or by also using `InternalWindowFunction` there. Thing 
is, though, that these operators will be removed because their state cannot 
easily be made repartitionable (see the work that we did on key-groups and 
repartitionable state).
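
A rough sketch of the wrapping option mentioned above; the `ProcessWindowFunction` signature and the `Context#window()` accessor are assumed from the FLIP-2 design and are not taken from this PR:

```java
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;

// Adapts a plain WindowFunction to the richer ProcessWindowFunction interface
// by forwarding the window obtained from the per-invocation Context.
class WindowFunctionWrapper<IN, OUT, KEY, W extends Window>
        extends ProcessWindowFunction<IN, OUT, KEY, W> {

    private final WindowFunction<IN, OUT, KEY, W> wrapped;

    WindowFunctionWrapper(WindowFunction<IN, OUT, KEY, W> wrapped) {
        this.wrapped = wrapped;
    }

    @Override
    public void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out)
            throws Exception {
        wrapped.apply(key, context.window(), elements, out);
    }
}
```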


> Extending Window Function Metadata
> --
>
> Key: FLINK-4997
> URL: https://issues.apache.org/jira/browse/FLINK-4997
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, Streaming, Windowing Operators
>Reporter: Ventura Del Monte
>Assignee: Ventura Del Monte
> Fix For: 1.2.0
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2756: [FLINK-4997] Extending Window Function Metadata

2016-11-15 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2756#discussion_r88001538
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
 ---
@@ -800,4 +907,39 @@ public StreamExecutionEnvironment 
getExecutionEnvironment() {
public TypeInformation getInputType() {
return input.getType();
}
+
+   private static  ProcessWindowFunction wrapWindowFunction(final WindowFunction cleanedFunction) {
--- End diff --

I see, I'm afraid I didn't communicate that very well.

For `WindowOperator` the user-facing `WindowFunction` is already abstracted 
from the actual `WindowOperator` by the `InternalWindowFunction`. I just 
realised that these "AlignedProcessingTimeWindowOperators" don't have that 
abstraction and directly take a `WindowFunction`; there we would have to 
introduce some level of nesting, either by wrapping a `WindowFunction` in a 
`ProcessWindowFunction` or by also using `InternalWindowFunction` there. Thing 
is, though, that these operators will be removed because their state cannot 
easily be made repartitionable (see the work that we did on key-groups and 
repartitionable state).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Assigned] (FLINK-2186) Rework CSV import to support very wide files

2016-11-15 Thread Anton Solovev (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Solovev reassigned FLINK-2186:


Assignee: Anton Solovev

> Rework CSV import to support very wide files
> 
>
> Key: FLINK-2186
> URL: https://issues.apache.org/jira/browse/FLINK-2186
> Project: Flink
>  Issue Type: Improvement
>  Components: Machine Learning Library, Scala API
>Reporter: Theodore Vasiloudis
>Assignee: Anton Solovev
>
> In the current readCsvFile implementation, importing CSV files with many 
> columns can range from cumbersome to impossible.
> For example to import an 11 column file we need to write:
> {code}
> val cancer = env.readCsvFile[(String, String, String, String, String, String, 
> String, String, String, String, 
> String)]("/path/to/breast-cancer-wisconsin.data")
> {code}
> For many use cases in Machine Learning we might have CSV files with thousands 
> or millions of columns that we want to import as vectors.
> In that case using the current readCsvFile method becomes impossible.
> We therefore need to rework the current function, or create a new one that 
> will allow us to import CSV files with an arbitrary number of columns.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2186) Rework CSV import to support very wide files

2016-11-15 Thread Anton Solovev (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Solovev updated FLINK-2186:
-
Assignee: (was: Anton Solovev)

> Rework CSV import to support very wide files
> 
>
> Key: FLINK-2186
> URL: https://issues.apache.org/jira/browse/FLINK-2186
> Project: Flink
>  Issue Type: Improvement
>  Components: Machine Learning Library, Scala API
>Reporter: Theodore Vasiloudis
>
> In the current readCsvFile implementation, importing CSV files with many 
> columns can range from cumbersome to impossible.
> For example to import an 11 column file we need to write:
> {code}
> val cancer = env.readCsvFile[(String, String, String, String, String, String, 
> String, String, String, String, 
> String)]("/path/to/breast-cancer-wisconsin.data")
> {code}
> For many use cases in Machine Learning we might have CSV files with thousands 
> or millions of columns that we want to import as vectors.
> In that case using the current readCsvFile method becomes impossible.
> We therefore need to rework the current function, or create a new one that 
> will allow us to import CSV files with an arbitrary number of columns.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-3849) Add FilterableTableSource interface and translation rule

2016-11-15 Thread Anton Solovev (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Solovev reassigned FLINK-3849:


Assignee: Anton Solovev

> Add FilterableTableSource interface and translation rule
> 
>
> Key: FLINK-3849
> URL: https://issues.apache.org/jira/browse/FLINK-3849
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Anton Solovev
>
> Add a {{FilterableTableSource}} interface for {{TableSource}} implementations 
> which support filter push-down.
> The interface could look as follows
> {code}
> trait FilterableTableSource {
>   // returns unsupported predicate expression
>   def setPredicate(predicate: Expression): Expression
> }
> {code}
> In addition we need Calcite rules to push a predicate (or parts of it) into a 
> TableScan that refers to a {{FilterableTableSource}}. We might need to tweak 
> the cost model as well to push the optimizer in the right direction.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2650: [FLINK-4563] [metrics] scope caching not adjusted ...

2016-11-15 Thread ex00
Github user ex00 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2650#discussion_r88007685
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
 ---
@@ -44,4 +52,132 @@ protected QueryScopeInfo 
createQueryServiceMetricInfo(CharacterFilter filter) {

registry.shutdown();
}
+
+   // for test case: one filter for different reporters with different of 
scope delimiter
+   protected static CharacterFilter staticCharacterFilter = new 
CharacterFilter() {
+   @Override
+   public String filterCharacters(String input) {
+   return input.replace("C", "RR");
+   }
+   };
+
+   @Test
+   public void filteringForMultipleReporters() {
+   TestReporter1.countSuccessChecks = 0;
+   Configuration config = new Configuration();
+   config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, 
"A.B.C.D");
+   config.setString(ConfigConstants.METRICS_REPORTERS_LIST, 
"test1,test2");
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter1.class.getName());
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter2.class.getName());
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "!");
+
+
+   MetricRegistry testRegistry = new 
MetricRegistryTest(MetricRegistryConfiguration.fromConfiguration(config));
+   TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(testRegistry, "host", "id");
+   tmGroup.counter(1);
+   testRegistry.shutdown();
+   assert TestReporter1.countSuccessChecks == 4;
+   }
+
+   @Test
+   public void filteringForNullReporters() {
+   MetricRegistryTest.countSuccessChecks = 0;
+   Configuration config = new Configuration();
+   config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, 
"A.B.C.D");
+   MetricRegistry testRegistry = new 
MetricRegistryTest(MetricRegistryConfiguration.fromConfiguration(config));
+   TaskManagerMetricGroup tmGroupForTestRegistry = new 
TaskManagerMetricGroup(testRegistry, "host", "id");
+   assert testRegistry.getReporters().size() == 0;
+   tmGroupForTestRegistry.counter(1);
+   testRegistry.shutdown();
+   assert MetricRegistryTest.countSuccessChecks == 1;
+   }
+
+   public static class TestReporter1 extends TestReporter {
+   protected static int countSuccessChecks = 0;
+   @Override
+   public String filterCharacters(String input) {
+   return input.replace("A", "RR");
+   }
+   @Override
+   public void notifyOfAddedMetric(Metric metric, String 
metricName, MetricGroup group) {
+   assertEquals("A-B-C-D-1", 
group.getMetricIdentifier(metricName));
+   // ignore all next filters for scope -  because 
scopeString cached with only first filter
+   assertEquals("A-B-C-D-1", 
group.getMetricIdentifier(metricName, staticCharacterFilter));
+   assertEquals("A-B-C-D-1", 
group.getMetricIdentifier(metricName, this));
+   assertEquals("A-B-C-D-4", 
group.getMetricIdentifier(metricName, new CharacterFilter() {
+   @Override
+   public String filterCharacters(String input) {
+   return input.replace("B", 
"RR").replace("1", "4");
+   }
+   }));
+   countSuccessChecks++;
+   }
+   }
+   public static class TestReporter2 extends TestReporter1 {
+   @Override
+   public String filterCharacters(String input) {
+   return input.replace("B", "RR");
+   }
+   @Override
+   public void notifyOfAddedMetric(Metric metric, String 
metricName, MetricGroup group) {
+   assertEquals("A!RR!C!D!1", 
group.getMetricIdentifier(metricName, this));
+   // ignore all next filters -  because scopeString 
cached with only first filter
+   assertEquals("A!RR!C!D!1", 
group.getMetricIdentifier(metricName));
+   assertEquals("A!RR!C!D!1", 
group.getMetricIdentifier(metricName, staticCharacterFilter));
+  

[jira] [Commented] (FLINK-4563) [metrics] scope caching not adjusted for multiple reporters

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15667036#comment-15667036
 ] 

ASF GitHub Bot commented on FLINK-4563:
---

Github user ex00 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2650#discussion_r88007685
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
 ---
@@ -44,4 +52,132 @@ protected QueryScopeInfo 
createQueryServiceMetricInfo(CharacterFilter filter) {

registry.shutdown();
}
+
+   // for test case: one filter for different reporters with different of 
scope delimiter
+   protected static CharacterFilter staticCharacterFilter = new 
CharacterFilter() {
+   @Override
+   public String filterCharacters(String input) {
+   return input.replace("C", "RR");
+   }
+   };
+
+   @Test
+   public void filteringForMultipleReporters() {
+   TestReporter1.countSuccessChecks = 0;
+   Configuration config = new Configuration();
+   config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, 
"A.B.C.D");
+   config.setString(ConfigConstants.METRICS_REPORTERS_LIST, 
"test1,test2");
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter1.class.getName());
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter2.class.getName());
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "!");
+
+
+   MetricRegistry testRegistry = new 
MetricRegistryTest(MetricRegistryConfiguration.fromConfiguration(config));
+   TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(testRegistry, "host", "id");
+   tmGroup.counter(1);
+   testRegistry.shutdown();
+   assert TestReporter1.countSuccessChecks == 4;
+   }
+
+   @Test
+   public void filteringForNullReporters() {
+   MetricRegistryTest.countSuccessChecks = 0;
+   Configuration config = new Configuration();
+   config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, 
"A.B.C.D");
+   MetricRegistry testRegistry = new 
MetricRegistryTest(MetricRegistryConfiguration.fromConfiguration(config));
+   TaskManagerMetricGroup tmGroupForTestRegistry = new 
TaskManagerMetricGroup(testRegistry, "host", "id");
+   assert testRegistry.getReporters().size() == 0;
+   tmGroupForTestRegistry.counter(1);
+   testRegistry.shutdown();
+   assert MetricRegistryTest.countSuccessChecks == 1;
+   }
+
+   public static class TestReporter1 extends TestReporter {
+   protected static int countSuccessChecks = 0;
+   @Override
+   public String filterCharacters(String input) {
+   return input.replace("A", "RR");
+   }
+   @Override
+   public void notifyOfAddedMetric(Metric metric, String 
metricName, MetricGroup group) {
+   assertEquals("A-B-C-D-1", 
group.getMetricIdentifier(metricName));
+   // ignore all next filters for scope -  because 
scopeString cached with only first filter
+   assertEquals("A-B-C-D-1", 
group.getMetricIdentifier(metricName, staticCharacterFilter));
+   assertEquals("A-B-C-D-1", 
group.getMetricIdentifier(metricName, this));
+   assertEquals("A-B-C-D-4", 
group.getMetricIdentifier(metricName, new CharacterFilter() {
+   @Override
+   public String filterCharacters(String input) {
+   return input.replace("B", 
"RR").replace("1", "4");
+   }
+   }));
+   countSuccessChecks++;
+   }
+   }
+   public static class TestReporter2 extends TestReporter1 {
+   @Override
+   public String filterCharacters(String input) {
+   return input.replace("B", "RR");
+   }
+   @Override
+   public void notifyOfAddedMetric(Metric metric, String 
metricName, MetricGroup group) {
+   assertEquals("A!RR!C!D!1", 
group.getMetricIdentifier(metricName, this));
+   // ignore all next filters -  because s

[jira] [Commented] (FLINK-4969) TypeInfoParser needs to be extended to parse the java.sql.* types into the corresponding TypeInfos.

2016-11-15 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15667040#comment-15667040
 ] 

Timo Walther commented on FLINK-4969:
-

The definition of type information using Strings is deprecated. You should use 
a type hint:
https://ci.apache.org/projects/flink/flink-docs-master/dev/types_serialization.html#creating-a-typeinformation-or-typeserializer
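
A minimal sketch of the type-hint approach for a java.sql.* field; the element types and the map function below are illustrative assumptions, not taken from the original report:

{code:java}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class TypeHintExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<Long, java.sql.Timestamp>> withTimestamps = env
            .fromElements(1L, 2L, 3L)
            .map(new MapFunction<Long, Tuple2<Long, java.sql.Timestamp>>() {
                @Override
                public Tuple2<Long, java.sql.Timestamp> map(Long id) {
                    return Tuple2.of(id, new java.sql.Timestamp(id));
                }
            })
            // The anonymous TypeHint keeps the full generic type, so the
            // Timestamp field is not degraded to a GenericType.
            .returns(new TypeHint<Tuple2<Long, java.sql.Timestamp>>() {});

        withTimestamps.print();
    }
}
{code}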

> TypeInfoParser needs to be extended to parse the java.sql.* types into the 
> corresponding TypeInfos.
> ---
>
> Key: FLINK-4969
> URL: https://issues.apache.org/jira/browse/FLINK-4969
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API, DataStream API
>Affects Versions: 1.1.3
>Reporter: radu
>Priority: Minor
>  Labels: features
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Using the "returns" option from the API in order to enforce the type for the 
> data stream will create incorrect types for java.sql.* types. The 
> TypeInfoParser, used to parse the types, needs to be extended to parse the 
> java.sql.* types into the corresponding TypeInfos.
>  
> Example
> = ds.map(new mapFunction(){
> ...   
> }}) .returns(“Tuple#”);
>  
>  
> The problem is that if you rely on the type extraction mechanism called 
> within "returns()" to recognize TIMESTAMP as SqlTimeTypeInfo, it will 
> not happen; instead a GenericType will be created. It is the 
> same for the other sql types (e.g. TIME).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-2186) Rework CSV import to support very wide files

2016-11-15 Thread Anton Solovev (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Solovev reassigned FLINK-2186:


Assignee: Anton Solovev

> Rework CSV import to support very wide files
> 
>
> Key: FLINK-2186
> URL: https://issues.apache.org/jira/browse/FLINK-2186
> Project: Flink
>  Issue Type: Improvement
>  Components: Machine Learning Library, Scala API
>Reporter: Theodore Vasiloudis
>Assignee: Anton Solovev
>
> In the current readCsvFile implementation, importing CSV files with many 
> columns can range from cumbersome to impossible.
> For example to import an 11 column file we need to write:
> {code}
> val cancer = env.readCsvFile[(String, String, String, String, String, String, 
> String, String, String, String, 
> String)]("/path/to/breast-cancer-wisconsin.data")
> {code}
> For many use cases in Machine Learning we might have CSV files with thousands 
> or millions of columns that we want to import as vectors.
> In that case using the current readCsvFile method becomes impossible.
> We therefore need to rework the current function, or create a new one that 
> will allow us to import CSV files with an arbitrary number of columns.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4832) Count/Sum 0 elements

2016-11-15 Thread Anton Mushin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15667131#comment-15667131
 ] 

Anton Mushin commented on FLINK-4832:
-

Ok.
Could you check this 
[commit|https://github.com/ex00/flink/commit/c93071585ebb21453b22c9c9d102964af06bf45a]?
Is it the correct idea for implementing this issue?

> Count/Sum 0 elements
> 
>
> Key: FLINK-4832
> URL: https://issues.apache.org/jira/browse/FLINK-4832
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Anton Mushin
>
> Currently, the Table API is unable to count or sum up 0 elements. We should 
> improve DataSet aggregations for this. Maybe by unioning the original DataSet 
> with a dummy record or by using a MapPartition function. Coming up with a 
> good design for this is also part of this issue.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-3133) Introduce collect()/count()/print() methods in DataStream API

2016-11-15 Thread Alexander Shoshin (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexander Shoshin reassigned FLINK-3133:


Assignee: Alexander Shoshin  (was: Evgeny Kincharov)

> Introduce collect()/count()/print() methods in DataStream API
> 
>
> Key: FLINK-3133
> URL: https://issues.apache.org/jira/browse/FLINK-3133
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Streaming
>Affects Versions: 0.10.0, 1.0.0, 0.10.1
>Reporter: Maximilian Michels
>Assignee: Alexander Shoshin
> Fix For: 1.0.0
>
>
> The DataSet API's methods {{collect()}}, {{count()}}, and {{print()}} should 
> be mirrored to the DataStream API. 
> The semantics of the calls are different. We need to be able to sample parts 
> of a stream, e.g. by supplying a time period in the arguments to the methods. 
> Collect/count/print should be lazily evaluated. Users should use the 
> {{StreamEnvironment}} to retrieve the results.
> {code:java}
> StreamExecutionEnvironment env = 
> StreamEnvironment.getStreamExecutionEnvironment();
> DataStream printSink = env.addSource(..).print();
> ResultQueryable queryObject = env.executeWithResultQueryable();
> List sampled = queryObject.retrieve(printSink, Time.seconds(5));
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2650: [FLINK-4563] [metrics] scope caching not adjusted for mul...

2016-11-15 Thread ex00
Github user ex00 commented on the issue:

https://github.com/apache/flink/pull/2650
  
@zentol, thanks for your review and comments!
I updated tests.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4563) [metrics] scope caching not adjusted for multiple reporters

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15667153#comment-15667153
 ] 

ASF GitHub Bot commented on FLINK-4563:
---

Github user ex00 commented on the issue:

https://github.com/apache/flink/pull/2650
  
@zentol, thanks for your review and comments!
I updated tests.



> [metrics] scope caching not adjusted for multiple reporters
> ---
>
> Key: FLINK-4563
> URL: https://issues.apache.org/jira/browse/FLINK-4563
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Anton Mushin
>
> Every metric group contains a scope string, representing what entities 
> (job/task/etc.) a given metric belongs to, which is calculated on demand. 
> Before this string is cached, a CharacterFilter is applied to it, which is 
> provided by the callee, usually a reporter. This was done since different 
> reporters have different requirements with regard to valid characters. The 
> filtered string is cached so that we don't have to refilter the string every 
> time.
> This all works fine with a single reporter; with multiple reporters, however, it is 
> completely broken, as only the first filter is ever applied.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2625: [FLINK-4801] [types] Input type inference is faulty with ...

2016-11-15 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/2625
  
Merging this...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4801) Input type inference is faulty with custom Tuples and RichFunctions

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15667158#comment-15667158
 ] 

ASF GitHub Bot commented on FLINK-4801:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/2625
  
Merging this...


> Input type inference is faulty with custom Tuples and RichFunctions
> ---
>
> Key: FLINK-4801
> URL: https://issues.apache.org/jira/browse/FLINK-4801
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> This issue has been discussed on the ML:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Type-problem-in-RichFlatMapFunction-when-using-GenericArray-type-td13929.html
> This returns the wrong type:
> {code}
>   public static class Foo extends Tuple2 {
> public Foo() {
> }
> public Foo(K[] value0, K value1) {
>   super(value0, value1);
> }
>   }
> DataSource> fooDataSource = env.fromElements(foo);
> DataSet> ds = fooDataSource.join(fooDataSource)
>   .where(field).equalTo(field)
>   .with(new RichFlatJoinFunction, Foo, Foo>() {
> @Override
> public void join(Foo first, Foo second,
>   Collector> out) throws Exception {
>   out.collect(first);
> }
>   });
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2625: [FLINK-4801] [types] Input type inference is fault...

2016-11-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2625


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Resolved] (FLINK-4801) Input type inference is faulty with custom Tuples and RichFunctions

2016-11-15 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-4801.
-
   Resolution: Fixed
Fix Version/s: 1.2.0

Fixed in 1.2.0 with 6f09ecded9e22a5eaa548ebbddb9b28dad4207c2.

> Input type inference is faulty with custom Tuples and RichFunctions
> ---
>
> Key: FLINK-4801
> URL: https://issues.apache.org/jira/browse/FLINK-4801
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Reporter: Timo Walther
>Assignee: Timo Walther
> Fix For: 1.2.0
>
>
> This issue has been discussed on the ML:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Type-problem-in-RichFlatMapFunction-when-using-GenericArray-type-td13929.html
> This returns the wrong type:
> {code}
>   public static class Foo extends Tuple2 {
> public Foo() {
> }
> public Foo(K[] value0, K value1) {
>   super(value0, value1);
> }
>   }
> DataSource> fooDataSource = env.fromElements(foo);
> DataSet> ds = fooDataSource.join(fooDataSource)
>   .where(field).equalTo(field)
>   .with(new RichFlatJoinFunction, Foo, Foo>() {
> @Override
> public void join(Foo first, Foo second,
>   Collector> out) throws Exception {
>   out.collect(first);
> }
>   });
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4801) Input type inference is faulty with custom Tuples and RichFunctions

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15667210#comment-15667210
 ] 

ASF GitHub Bot commented on FLINK-4801:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2625


> Input type inference is faulty with custom Tuples and RichFunctions
> ---
>
> Key: FLINK-4801
> URL: https://issues.apache.org/jira/browse/FLINK-4801
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Reporter: Timo Walther
>Assignee: Timo Walther
> Fix For: 1.2.0
>
>
> This issue has been discussed on the ML:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Type-problem-in-RichFlatMapFunction-when-using-GenericArray-type-td13929.html
> This returns the wrong type:
> {code}
>   public static class Foo extends Tuple2 {
> public Foo() {
> }
> public Foo(K[] value0, K value1) {
>   super(value0, value1);
> }
>   }
> DataSource> fooDataSource = env.fromElements(foo);
> DataSet> ds = fooDataSource.join(fooDataSource)
>   .where(field).equalTo(field)
>   .with(new RichFlatJoinFunction, Foo, Foo>() {
> @Override
> public void join(Foo first, Foo second,
>   Collector> out) throws Exception {
>   out.collect(first);
> }
>   });
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4872) Type erasure problem exclusively on cluster execution

2016-11-15 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15667216#comment-15667216
 ] 

Timo Walther commented on FLINK-4872:
-

I will look into this issue.

> Type erasure problem exclusively on cluster execution
> -
>
> Key: FLINK-4872
> URL: https://issues.apache.org/jira/browse/FLINK-4872
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.1.2
>Reporter: Martin Junghanns
>
> The following code runs fine in the local and collection execution environments 
> but fails when executed on a cluster.
> {code:title=Problem.java}
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple1;
> import java.lang.reflect.Array;
> public class Problem {
>   public static class Pojo {
>   }
>   public static class Foo extends Tuple1 {
>   }
>   public static class Bar extends Tuple1 {
>   }
>   public static class UDF implements MapFunction, Bar> {
> private final Class clazz;
> public UDF(Class clazz) {
>   this.clazz = clazz;
> }
> @Override
> public Bar map(Foo value) throws Exception {
>   Bar bar = new Bar<>();
>   //noinspection unchecked
>   bar.f0 = (T[]) Array.newInstance(clazz, 10);
>   return bar;
> }
>   }
>   public static void main(String[] args) throws Exception {
> // runs in local, collection and cluster execution
> withLong();
> // runs in local and collection execution, fails on cluster execution
> withPojo();
>   }
>   public static void withLong() throws Exception {
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> Foo foo = new Foo<>();
> foo.f0 = 42L;
> DataSet> barDataSource = env.fromElements(foo);
> DataSet> map = barDataSource.map(new UDF<>(Long.class));
> map.print();
>   }
>   public static void withPojo() throws Exception {
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> Foo foo = new Foo<>();
> foo.f0 = new Pojo();
> DataSet> barDataSource = env.fromElements(foo);
> DataSet> map = barDataSource.map(new UDF<>(Pojo.class));
> map.print();
>   }
> }
> {code}
> {code:title=ProblemTest.java}
> import org.apache.flink.test.util.MultipleProgramsTestBase;
> import org.junit.Test;
> import org.junit.runner.RunWith;
> import org.junit.runners.Parameterized;
> @RunWith(Parameterized.class)
> public class ProblemTest extends MultipleProgramsTestBase {
>   public ProblemTest(TestExecutionMode mode) {
> super(mode);
>   }
>   @Test
>   public void testWithLong() throws Exception {
> Problem.withLong();
>   }
>   @Test
>   public void testWithPOJO() throws Exception {
> Problem.withPojo();
>   }
> }
> {code}
> Exception:
> {code}
> The return type of function 'withPojo(Problem.java:58)' could not be 
> determined automatically, due to type erasure. You can give type information 
> hints by using the returns(...) method on the result of the transformation 
> call, or by letting your function implement the 'ResultTypeQueryable' 
> interface.
> org.apache.flink.api.java.DataSet.getType(DataSet.java:178)
> org.apache.flink.api.java.DataSet.collect(DataSet.java:407)
> org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
> Problem.withPojo(Problem.java:60)
> Problem.main(Problem.java:38) 
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4997) Extending Window Function Metadata

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15667222#comment-15667222
 ] 

ASF GitHub Bot commented on FLINK-4997:
---

Github user VenturaDelMonte commented on a diff in the pull request:

https://github.com/apache/flink/pull/2756#discussion_r88021488
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
 ---
@@ -800,4 +907,39 @@ public StreamExecutionEnvironment 
getExecutionEnvironment() {
public TypeInformation getInputType() {
return input.getType();
}
+
+   private static  ProcessWindowFunction wrapWindowFunction(final WindowFunction cleanedFunction) {
--- End diff --

Aljoscha you read my mind, I was just introducing InternalWindowFunction in 
these other WindowOperators in order to have a flatter implementation. 
Regarding their removal, I checked that new work and I agree with you.


> Extending Window Function Metadata
> --
>
> Key: FLINK-4997
> URL: https://issues.apache.org/jira/browse/FLINK-4997
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, Streaming, Windowing Operators
>Reporter: Ventura Del Monte
>Assignee: Ventura Del Monte
> Fix For: 1.2.0
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2756: [FLINK-4997] Extending Window Function Metadata

2016-11-15 Thread VenturaDelMonte
Github user VenturaDelMonte commented on a diff in the pull request:

https://github.com/apache/flink/pull/2756#discussion_r88021488
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
 ---
@@ -800,4 +907,39 @@ public StreamExecutionEnvironment 
getExecutionEnvironment() {
public TypeInformation getInputType() {
return input.getType();
}
+
+   private static  ProcessWindowFunction wrapWindowFunction(final WindowFunction cleanedFunction) {
--- End diff --

Aljoscha you read my mind, I was just introducing InternalWindowFunction in 
these other WindowOperators in order to have a flatter implementation. 
Regarding their removal, I checked that new work and I agree with you.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4997) Extending Window Function Metadata

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15667225#comment-15667225
 ] 

ASF GitHub Bot commented on FLINK-4997:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2756#discussion_r88021648
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
 ---
@@ -800,4 +907,39 @@ public StreamExecutionEnvironment 
getExecutionEnvironment() {
public TypeInformation getInputType() {
return input.getType();
}
+
+   private static  ProcessWindowFunction wrapWindowFunction(final WindowFunction cleanedFunction) {
--- End diff --

😃 


> Extending Window Function Metadata
> --
>
> Key: FLINK-4997
> URL: https://issues.apache.org/jira/browse/FLINK-4997
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, Streaming, Windowing Operators
>Reporter: Ventura Del Monte
>Assignee: Ventura Del Monte
> Fix For: 1.2.0
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2756: [FLINK-4997] Extending Window Function Metadata

2016-11-15 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2756#discussion_r88021648
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
 ---
@@ -800,4 +907,39 @@ public StreamExecutionEnvironment 
getExecutionEnvironment() {
public TypeInformation getInputType() {
return input.getType();
}
+
+   private static  ProcessWindowFunction wrapWindowFunction(final WindowFunction cleanedFunction) {
--- End diff --

😃 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4872) Type erasure problem exclusively on cluster execution

2016-11-15 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15667250#comment-15667250
 ] 

Timo Walther commented on FLINK-4872:
-

[~mju] can you also give an example for the class {{UDF}}? Does it implement 
{{ResultTypeQueryable}}? If not, this exception might be correct. Either the 
function must give information about the type parameter of {{Bar}}, you supply it 
via {{.map(...).returns(Pojo.class)}}, or the function's output is defined by 
the function's input.
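
One possible sketch of the {{ResultTypeQueryable}} route. Since the archived report has its generics stripped, this assumes the classes were {{Foo<T> extends Tuple1<T>}}, {{Bar<T> extends Tuple1<T[]>}} and {{UDF<T> implements MapFunction<Foo<T>, Bar<T>>}}; the type-info construction below is only an assumption about how the produced type could be described explicitly:

{code:java}
import java.lang.reflect.Array;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;

public class UDF<T> implements MapFunction<Foo<T>, Bar<T>>, ResultTypeQueryable<Bar<T>> {

    private final Class<T> clazz;

    public UDF(Class<T> clazz) {
        this.clazz = clazz;
    }

    @Override
    public Bar<T> map(Foo<T> value) throws Exception {
        Bar<T> bar = new Bar<>();
        @SuppressWarnings("unchecked")
        T[] array = (T[]) Array.newInstance(clazz, 10);
        bar.f0 = array;
        return bar;
    }

    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public TypeInformation<Bar<T>> getProducedType() {
        // Describe Bar<T> explicitly: a Tuple1 subclass whose single field is T[].
        // The element type comes from the runtime class, so no generic signature
        // needs to survive type erasure.
        return new TupleTypeInfo(Bar.class,
                ObjectArrayTypeInfo.getInfoFor(TypeExtractor.getForClass(clazz)));
    }
}
{code}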

> Type erasure problem exclusively on cluster execution
> -
>
> Key: FLINK-4872
> URL: https://issues.apache.org/jira/browse/FLINK-4872
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.1.2
>Reporter: Martin Junghanns
>
> The following code runs fine in the local and collection execution environments 
> but fails when executed on a cluster.
> {code:title=Problem.java}
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple1;
> import java.lang.reflect.Array;
> public class Problem {
>   public static class Pojo {
>   }
>   public static class Foo<T> extends Tuple1<T> {
>   }
>   public static class Bar<T> extends Tuple1<T[]> {
>   }
>   public static class UDF<T> implements MapFunction<Foo<T>, Bar<T>> {
>     private final Class<T> clazz;
>     public UDF(Class<T> clazz) {
>       this.clazz = clazz;
>     }
>     @Override
>     public Bar<T> map(Foo<T> value) throws Exception {
>       Bar<T> bar = new Bar<>();
>       //noinspection unchecked
>       bar.f0 = (T[]) Array.newInstance(clazz, 10);
>       return bar;
>     }
>   }
>   public static void main(String[] args) throws Exception {
>     // runs in local, collection and cluster execution
>     withLong();
>     // runs in local and collection execution, fails on cluster execution
>     withPojo();
>   }
>   public static void withLong() throws Exception {
>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>     Foo<Long> foo = new Foo<>();
>     foo.f0 = 42L;
>     DataSet<Foo<Long>> barDataSource = env.fromElements(foo);
>     DataSet<Bar<Long>> map = barDataSource.map(new UDF<>(Long.class));
>     map.print();
>   }
>   public static void withPojo() throws Exception {
>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>     Foo<Pojo> foo = new Foo<>();
>     foo.f0 = new Pojo();
>     DataSet<Foo<Pojo>> barDataSource = env.fromElements(foo);
>     DataSet<Bar<Pojo>> map = barDataSource.map(new UDF<>(Pojo.class));
>     map.print();
>   }
> }
> {code}
> {code:title=ProblemTest.java}
> import org.apache.flink.test.util.MultipleProgramsTestBase;
> import org.junit.Test;
> import org.junit.runner.RunWith;
> import org.junit.runners.Parameterized;
> @RunWith(Parameterized.class)
> public class ProblemTest extends MultipleProgramsTestBase {
>   public ProblemTest(TestExecutionMode mode) {
> super(mode);
>   }
>   @Test
>   public void testWithLong() throws Exception {
> Problem.withLong();
>   }
>   @Test
>   public void testWithPOJO() throws Exception {
> Problem.withPojo();
>   }
> }
> {code}
> Exception:
> {code}
> The return type of function 'withPojo(Problem.java:58)' could not be 
> determined automatically, due to type erasure. You can give type information 
> hints by using the returns(...) method on the result of the transformation 
> call, or by letting your function implement the 'ResultTypeQueryable' 
> interface.
> org.apache.flink.api.java.DataSet.getType(DataSet.java:178)
> org.apache.flink.api.java.DataSet.collect(DataSet.java:407)
> org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
> Problem.withPojo(Problem.java:60)
> Problem.main(Problem.java:38) 
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5071) YARN: yarn.containers.vcores config not respected when checking for vcores

2016-11-15 Thread Gyula Fora (JIRA)
Gyula Fora created FLINK-5071:
-

 Summary: YARN: yarn.containers.vcores config not respected when 
checking for vcores
 Key: FLINK-5071
 URL: https://issues.apache.org/jira/browse/FLINK-5071
 Project: Flink
  Issue Type: Bug
  Components: YARN Client
Reporter: Gyula Fora
 Fix For: 1.1.3


The YarnClient validates whether the number of task slots is less than the maximum 
number of vcores available in YARN, but it seems to ignore the yarn.containers.vcores 
Flink config option, which should be used instead of the slot count.
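
For illustration, a rough sketch of what the corrected check could look like. The method and the surrounding flow are illustrative only, not the actual YARN client code, and the {{yarn.containers.vcores}} key is read here as a plain string:

{code:java}
import java.util.List;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.client.api.YarnClient;

public class VcoresCheckSketch {

	static void validateVcores(Configuration flinkConfig, YarnClient yarnClient) throws Exception {
		// The requested vcores should come from yarn.containers.vcores,
		// falling back to the number of task slots only if it is not set.
		int slots = flinkConfig.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
		int requestedVcores = flinkConfig.getInteger("yarn.containers.vcores", slots);

		// Largest vcore capacity reported by any running node.
		int maxVcores = 0;
		List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
		for (NodeReport node : nodes) {
			maxVcores = Math.max(maxVcores, node.getCapability().getVirtualCores());
		}

		if (requestedVcores > maxVcores) {
			throw new IllegalConfigurationException(
				"The number of requested virtual cores per node (" + requestedVcores + ")"
					+ " exceeds the maximum of " + maxVcores + " available in the YARN cluster.");
		}
	}
}
{code}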



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4182) HA recovery not working properly under ApplicationMaster failures.

2016-11-15 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15667283#comment-15667283
 ] 

Gyula Fora commented on FLINK-4182:
---

Ah, sorry, those jobs were on 1.1.3 (I am not sure whether the error has been fixed since then)

> HA recovery not working properly under ApplicationMaster failures.
> --
>
> Key: FLINK-4182
> URL: https://issues.apache.org/jira/browse/FLINK-4182
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, State Backends, Checkpointing
>Affects Versions: 1.0.3
>Reporter: Stefan Richter
>Priority: Blocker
>
> When randomly killing TaskManager and ApplicationMaster, a job sometimes does 
> not properly recover in HA mode.
> There can be different symptoms for this. For example, in one case the job is 
> dying with the following exception:
> {code}
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Cannot set up the user code libraries: Cannot get library 
> with hash 7fafffe9595cd06aff213b81b5da7b1682e1d6b0
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:413)
>   at 
> org.apache.flink.yarn.YarnClusterClient.submitJob(YarnClusterClient.java:208)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
>   at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1381)
>   at 
> da.testing.StreamingStateMachineJob.main(StreamingStateMachineJob.java:61)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
>   at 
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:738)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:251)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:966)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1009)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Cannot set 
> up the user code libraries: Cannot get library with hash 
> 7fafffe9595cd06aff213b81b5da7b1682e1d6b0
>   at 
> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1089)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:506)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at 
> org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:105)
>   at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
>   at 
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:118)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>   at akka.dispatch

[GitHub] flink pull request #2736: [FLINK-4174] Enhance evictor functionality

2016-11-15 Thread soniclavier
Github user soniclavier closed the pull request at:

https://github.com/apache/flink/pull/2736


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2736: [FLINK-4174] Enhance evictor functionality

2016-11-15 Thread soniclavier
Github user soniclavier commented on the issue:

https://github.com/apache/flink/pull/2736
  
Thank you for your guidance @aljoscha 😄. Could you please tell me what 
Fix version I should keep for these JIRAs?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4174) Enhance Window Evictor

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15667290#comment-15667290
 ] 

ASF GitHub Bot commented on FLINK-4174:
---

Github user soniclavier closed the pull request at:

https://github.com/apache/flink/pull/2736


> Enhance Window Evictor
> --
>
> Key: FLINK-4174
> URL: https://issues.apache.org/jira/browse/FLINK-4174
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: vishnu viswanath
>Assignee: vishnu viswanath
>
> Enhance the current functionality of Evictor as per this [design 
> document|https://docs.google.com/document/d/1Rr7xzlItYqvFXLyyy-Yv0vvw8f29QYAjm5i9E4A_JlU/edit].
> This includes:
> - Allow eviction of elements from the window in any order (not only from the 
> beginning). To do this, the Evictor must go through the list of elements and 
> remove the ones that have to be evicted, instead of the current approach of 
> returning the count of elements to be removed from the beginning.
> - Allow eviction to be done before/after applying the window function.
> FLIP page for this enhancement: 
> [FLIP-4|https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor]
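
To make the proposed change concrete, here is a minimal, self-contained sketch of the new eviction style. The interfaces below are deliberately simplified stand-ins, not the actual Flink {{Evictor}} API:

{code:java}
import java.util.Iterator;

/**
 * Sketch of the FLIP-4 idea with made-up, simplified interfaces: instead of
 * returning how many elements to drop from the beginning, the evictor walks
 * the window contents and removes arbitrary elements, and it can be invoked
 * before or after the window function.
 */
class EvictorSketch {

	interface Evictor<T> {
		void evictBefore(Iterable<T> elements);
		void evictAfter(Iterable<T> elements);
	}

	/** Removes all values below a threshold, regardless of their position. */
	static class ThresholdEvictor implements Evictor<Long> {
		private final long threshold;

		ThresholdEvictor(long threshold) {
			this.threshold = threshold;
		}

		@Override
		public void evictBefore(Iterable<Long> elements) {
			for (Iterator<Long> it = elements.iterator(); it.hasNext(); ) {
				if (it.next() < threshold) {
					it.remove(); // eviction happens by mutating the backing contents
				}
			}
		}

		@Override
		public void evictAfter(Iterable<Long> elements) {
			// no eviction after the window function in this example
		}
	}
}
{code}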



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4174) Enhance Window Evictor

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15667289#comment-15667289
 ] 

ASF GitHub Bot commented on FLINK-4174:
---

Github user soniclavier commented on the issue:

https://github.com/apache/flink/pull/2736
  
Thank you for your guidance @aljoscha 😄. Could you please tell me what Fix 
version I should keep for these JIRAs?


> Enhance Window Evictor
> --
>
> Key: FLINK-4174
> URL: https://issues.apache.org/jira/browse/FLINK-4174
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: vishnu viswanath
>Assignee: vishnu viswanath
>
> Enhance the current functionality of Evictor as per this [design 
> document|https://docs.google.com/document/d/1Rr7xzlItYqvFXLyyy-Yv0vvw8f29QYAjm5i9E4A_JlU/edit].
> This includes:
> - Allow eviction of elements from the window in any order (not only from the 
> beginning). To do this, the Evictor must go through the list of elements and 
> remove the ones that have to be evicted, instead of the current approach of 
> returning the count of elements to be removed from the beginning.
> - Allow eviction to be done before/after applying the window function.
> FLIP page for this enhancement: 
> [FLIP-4|https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4182) HA recovery not working properly under ApplicationMaster failures.

2016-11-15 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15667277#comment-15667277
 ] 

Gyula Fora commented on FLINK-4182:
---

Hi, 

I have observed the same scenario in many of my jobs under AM failures; I 
have saved some logs.

I am running 1.1.4

> HA recovery not working properly under ApplicationMaster failures.
> --
>
> Key: FLINK-4182
> URL: https://issues.apache.org/jira/browse/FLINK-4182
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, State Backends, Checkpointing
>Affects Versions: 1.0.3
>Reporter: Stefan Richter
>Priority: Blocker
>
> When randomly killing TaskManager and ApplicationMaster, a job sometimes does 
> not properly recover in HA mode.
> There can be different symptoms for this. For example, in one case the job is 
> dying with the following exception:
> {code}
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Cannot set up the user code libraries: Cannot get library 
> with hash 7fafffe9595cd06aff213b81b5da7b1682e1d6b0
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:413)
>   at 
> org.apache.flink.yarn.YarnClusterClient.submitJob(YarnClusterClient.java:208)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
>   at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1381)
>   at 
> da.testing.StreamingStateMachineJob.main(StreamingStateMachineJob.java:61)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
>   at 
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:738)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:251)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:966)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1009)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Cannot set 
> up the user code libraries: Cannot get library with hash 
> 7fafffe9595cd06aff213b81b5da7b1682e1d6b0
>   at 
> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1089)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:506)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at 
> org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:105)
>   at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
>   at 
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:118)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>   at akka.dispatch.Mailbox.processMailbox(M

[jira] [Closed] (FLINK-4369) EvictingWindowOperator Must Actually Evict Elements

2016-11-15 Thread vishnu viswanath (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vishnu viswanath closed FLINK-4369.
---
Resolution: Fixed

> EvictingWindowOperator Must Actually Evict Elements
> ---
>
> Key: FLINK-4369
> URL: https://issues.apache.org/jira/browse/FLINK-4369
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.1.0
>Reporter: Aljoscha Krettek
>Assignee: vishnu viswanath
>Priority: Blocker
>
> {{EvictingWindowOperator}} does not actually remove evicted elements from the 
> state. They are only filtered from the Iterable that is given to the 
> WindowFunction.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-4174) Enhance Window Evictor

2016-11-15 Thread vishnu viswanath (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4174?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vishnu viswanath closed FLINK-4174.
---
Resolution: Fixed

> Enhance Window Evictor
> --
>
> Key: FLINK-4174
> URL: https://issues.apache.org/jira/browse/FLINK-4174
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: vishnu viswanath
>Assignee: vishnu viswanath
>
> Enhance the current functionality of Evictor as per this [design 
> document|https://docs.google.com/document/d/1Rr7xzlItYqvFXLyyy-Yv0vvw8f29QYAjm5i9E4A_JlU/edit].
> This includes:
> - Allow eviction of elements from the window in any order (not only from the 
> beginning). To do this, the Evictor must go through the list of elements and 
> remove the ones that have to be evicted, instead of the current approach of 
> returning the count of elements to be removed from the beginning.
> - Allow eviction to be done before/after applying the window function.
> FLIP page for this enhancement: 
> [FLIP-4|https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4369) EvictingWindowOperator Must Actually Evict Elements

2016-11-15 Thread vishnu viswanath (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15667309#comment-15667309
 ] 

vishnu viswanath commented on FLINK-4369:
-

Fixed by [FLINK-4174]

> EvictingWindowOperator Must Actually Evict Elements
> ---
>
> Key: FLINK-4369
> URL: https://issues.apache.org/jira/browse/FLINK-4369
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.1.0
>Reporter: Aljoscha Krettek
>Assignee: vishnu viswanath
>Priority: Blocker
>
> {{EvictingWindowOperator}} does not actually remove evicted elements from the 
> state. They are only filtered from the Iterable that is given to the 
> WindowFunction.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5063) State handles are not properly cleaned up for declined or expired checkpoints

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15667321#comment-15667321
 ] 

ASF GitHub Bot commented on FLINK-5063:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/2812

[FLINK-5063] Discard state handles of declined or expired checkpoints

Whenever the checkpoint coordinator receives an acknowledge checkpoint 
message which belongs
to the job maintained by the checkpoint coordinator, it should either 
record the state handles
for later processing or discard them to free the resources. The latter case can 
happen if a checkpoint has expired and late acknowledge checkpoint messages 
arrive. Furthermore, it can happen if a Task sent a decline checkpoint message 
while other Tasks were still drawing
a checkpoint. This PR changes the behaviour such that state handles 
belonging to the job of
the checkpoint coordinator are discarded if they could not be added to the 
PendingCheckpoint.

Review @uce, @StephanEwen 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink fixStateHandleCleanup

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2812.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2812


commit c4c000d1b39de5617b6796eed524ce2a449100d3
Author: Till Rohrmann 
Date:   2016-11-14T17:33:55Z

[FLINK-5063] Discard state handles of declined or expired checkpoints

Whenever the checkpoint coordinator receives an acknowledge checkpoint 
message which belongs
to the job maintained by the checkpoint coordinator, it should either 
record the state handles
for later processing or discard them to free the resources. The latter case can 
happen if a checkpoint has expired and late acknowledge checkpoint messages 
arrive. Furthermore, it can happen if a Task sent a decline checkpoint message 
while other Tasks were still drawing
a checkpoint. This PR changes the behaviour such that state handles 
belonging to the job of
the checkpoint coordinator are discarded if they could not be added to the 
PendingCheckpoint.
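
As an editor's illustration of the clean-up rule described above, here is a minimal sketch; the types below are stand-ins, not the actual CheckpointCoordinator classes:

{code:java}
import java.util.Map;

class AckHandlingSketch {

	interface StateHandle {
		void discardState() throws Exception; // releases the backing files
	}

	interface PendingCheckpoint {
		boolean isDiscarded();
		void acknowledgeTask(StateHandle handle);
	}

	static void handleAck(
			String coordinatorJobId,
			String ackJobId,
			long checkpointId,
			StateHandle handle,
			Map<Long, PendingCheckpoint> pendingCheckpoints) throws Exception {

		if (!coordinatorJobId.equals(ackJobId)) {
			return; // acknowledge belongs to another job: leave its state alone
		}

		PendingCheckpoint pending = pendingCheckpoints.get(checkpointId);
		if (pending != null && !pending.isDiscarded()) {
			pending.acknowledgeTask(handle); // normal path: keep the handle for completion
		} else if (handle != null) {
			handle.discardState(); // late or unknown ack for this job: free the resources
		}
	}
}
{code}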




> State handles are not properly cleaned up for declined or expired checkpoints
> -
>
> Key: FLINK-5063
> URL: https://issues.apache.org/jira/browse/FLINK-5063
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.2.0, 1.1.4
>
>
> In case that a {{Checkpoint}} is declined or expires, the 
> {{CheckpointCoordinator}} will dispose the {{PendingCheckpoint}}. Disposing 
> the {{PendingCheckpoint}} entails that all so far registered 
> {{SubtaskStates}} of the acknowledged {{Tasks}} are discarded. However, all 
> late arriving acknowledge messages are simply ignored without properly 
> discarding the transmitted state handles. This can lead to a cluttering of 
> the checkpoint directory since the checkpoint files of late or unknown 
> acknowledge checkpoint messages are never deleted.
> I propose to properly discard the state handles at the 
> {{CheckpointCoordinator}} if receiving a late acknowledge message or an 
> acknowledge message for an unknown {{ExecutionAttemptID}} belonging to the 
> job of the {{CheckpointCoordinator}}. However, checkpoint messages belonging 
> to a different job won't be handled and simply ignored.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2812: [FLINK-5063] Discard state handles of declined or ...

2016-11-15 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/2812

[FLINK-5063] Discard state handles of declined or expired checkpoints

Whenever the checkpoint coordinator receives an acknowledge checkpoint 
message which belongs
to the job maintained by the checkpoint coordinator, it should either 
record the state handles
for later processing or discard them to free the resources. The latter case can 
happen if a checkpoint has expired and late acknowledge checkpoint messages 
arrive. Furthermore, it can happen if a Task sent a decline checkpoint message 
while other Tasks were still drawing
a checkpoint. This PR changes the behaviour such that state handles 
belonging to the job of
the checkpoint coordinator are discarded if they could not be added to the 
PendingCheckpoint.

Review @uce, @StephanEwen 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink fixStateHandleCleanup

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2812.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2812


commit c4c000d1b39de5617b6796eed524ce2a449100d3
Author: Till Rohrmann 
Date:   2016-11-14T17:33:55Z

[FLINK-5063] Discard state handles of declined or expired checkpoints

Whenever the checkpoint coordinator receives an acknowledge checkpoint 
message which belongs
to the job maintained by the checkpoint coordinator, it should either 
record the state handles
for later processing or discard them to free the resources. The latter case can 
happen if a checkpoint has expired and late acknowledge checkpoint messages 
arrive. Furthermore, it can happen if a Task sent a decline checkpoint message 
while other Tasks were still drawing
a checkpoint. This PR changes the behaviour such that state handles 
belonging to the job of
the checkpoint coordinator are discarded if they could not be added to the 
PendingCheckpoint.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2802: Minor fixes

2016-11-15 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2802
  
Line 110 is closed by `` on line 85.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2797: [FLINK-5056] Makes the BucketingSink rescalable.

2016-11-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88019886
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -341,23 +355,49 @@ public void setInputType(TypeInformation type, 
ExecutionConfig executionConfi
}
 
@Override
-   public void open(Configuration parameters) throws Exception {
-   super.open(parameters);
+   public void initializeState(FunctionInitializationContext context) 
throws Exception {
+   Preconditions.checkArgument(this.restoredBucketStates == null, 
"The operator has already been initialized.");
 
-   subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+   initFileSystem();
 
-   state = new State();
--- End diff --

If you undo this formatting change and the one in L355, the diff will 
be a lot clearer (i.e., less prone to conflicts).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2797: [FLINK-5056] Makes the BucketingSink rescalable.

2016-11-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88019498
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
 ---
@@ -118,48 +146,290 @@ public static void destroyHDFS() {
}
 
@Test
-   public void testCheckpointWithoutNotify() throws Exception {
-   File dataDir = tempFolder.newFolder();
+   public void testInactivityPeriodWithLateNotify() throws Exception {
+   final File outDir = tempFolder.newFolder();
+
+   OneInputStreamOperatorTestHarness testHarness = 
createRescalingTestSink(outDir, 1, 0, 100);
+   testHarness.setup();
+   testHarness.open();
+
+   testHarness.setProcessingTime(0L);
+
+   testHarness.processElement(new StreamRecord<>("test1", 1L));
+   testHarness.processElement(new StreamRecord<>("test2", 1L));
+   checkFs(outDir, 2, 0 ,0, 0);
+
+   testHarness.setProcessingTime(101L);// put some in pending
+   checkFs(outDir, 0, 2, 0, 0);
+
+   testHarness.snapshot(0, 0); // put 
them in pending for 0
+   checkFs(outDir, 0, 2, 0, 0);
+
+   testHarness.processElement(new StreamRecord<>("test3", 1L));
+   testHarness.processElement(new StreamRecord<>("test4", 1L));
 
-   OneInputStreamOperatorTestHarness testHarness = 
createTestSink(dataDir);
+   testHarness.setProcessingTime(202L);// put some in pending
 
+   testHarness.snapshot(1, 0); // put 
them in pending for 1
+   checkFs(outDir, 0, 4, 0, 0);
+
+   testHarness.notifyOfCompletedCheckpoint(0); // put the 
pending for 0 to the "committed" state
+   checkFs(outDir, 0, 2, 2, 0);
+
+   testHarness.notifyOfCompletedCheckpoint(1); // put the pending 
for 1 to the "committed" state
+   checkFs(outDir, 0, 0, 4, 0);
+   }
+
+   @Test
+   public void testBucketStateTransitions() throws Exception {
+   final File outDir = tempFolder.newFolder();
+
+   OneInputStreamOperatorTestHarness testHarness = 
createRescalingTestSink(outDir, 1, 0, 100);
testHarness.setup();
testHarness.open();
 
-   testHarness.processElement(new StreamRecord<>("Hello"));
-   testHarness.processElement(new StreamRecord<>("Hello"));
-   testHarness.processElement(new StreamRecord<>("Hello"));
+   testHarness.setProcessingTime(0L);
+
+   testHarness.processElement(new StreamRecord<>("test1", 1L));
+   testHarness.processElement(new StreamRecord<>("test2", 1L));
+   checkFs(outDir, 2, 0 ,0, 0);
+
+   // this is to check the inactivity threshold
+   testHarness.setProcessingTime(101L);
+   checkFs(outDir, 0, 2, 0, 0);
+
+   testHarness.processElement(new StreamRecord<>("test3", 1L));
+   checkFs(outDir, 1, 2, 0, 0);
+
+   testHarness.snapshot(0, 0);
+   checkFs(outDir, 1, 2, 0, 0);
+
+   testHarness.notifyOfCompletedCheckpoint(0);
+   checkFs(outDir, 1, 0, 2, 0);
 
-   testHarness.setProcessingTime(1L);
+   OperatorStateHandles snapshot = testHarness.snapshot(1, 0);
 
-   // snapshot but don't call notify to simulate a notify that 
never
-   // arrives, the sink should move pending files in restore() in 
that case
-   StreamStateHandle snapshot1 = testHarness.snapshotLegacy(0, 0);
+   testHarness.close();
+   checkFs(outDir, 0, 1, 2, 0);
+
+   testHarness = createRescalingTestSink(outDir, 1, 0, 100);
+   testHarness.setup();
+   testHarness.initializeState(snapshot);
+   testHarness.open();
+   checkFs(outDir, 0, 0, 3, 1);
+
+   snapshot = testHarness.snapshot(2, 0);
+
+   testHarness.processElement(new StreamRecord<>("test4", 10));
+   checkFs(outDir, 1, 0, 3, 1);
 
-   testHarness = createTestSink(dataDir);
+   testHarness = createRescalingTestSink(outDir, 1, 0, 100);
testHarness.setup();
-   testHarness.restore(snapshot1);
+   testHarness.initializeState(snapshot);
testHarness.open();
 
-   testHarness.processElement(new StreamRecord<>("Hello"));
+   // the in-progress file remains as we do not clean up now
+   checkFs(outDir, 1, 0, 3, 1);
 
testHarness.close();
 
 

[GitHub] flink pull request #2797: [FLINK-5056] Makes the BucketingSink rescalable.

2016-11-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88018720
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java
 ---
@@ -130,7 +130,10 @@ public void postSubmit() throws Exception {
// the NUM_STRINGS. If numRead is bigger than the size of the 
set we have seen some
// elements twice.
Set readNumbers = Sets.newHashSet();
+
int numRead = 0;
--- End diff --

numRead is no longer used.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2797: [FLINK-5056] Makes the BucketingSink rescalable.

2016-11-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88016998
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -264,27 +282,23 @@
// ------------------------------------------------------------------------
 
/**
-* Our subtask index, retrieved from the {@code RuntimeContext} in 
{@link #open}.
-*/
-   private transient int subtaskIndex;
-
-   /**
-* We use reflection to get the .truncate() method, this is only 
available starting with
-* Hadoop 2.7
+* We use reflection to get the .truncate() method, this is only 
available starting with Hadoop 2.7
 */
private transient Method refTruncate;
 
/**
-* The state object that is handled by flink from snapshot/restore. In 
there we store state for
-* every open bucket: the current part file path, the valid length of 
the in-progress files and
-* pending part files.
+* The state object that is handled by Flink from snapshot/restore. 
This contains state for
+* every open bucket: the current {@code in-progress} part file path, 
its valid length and
+* the {@code pending} part files.
--- End diff --

why the annotation around pending?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2797: [FLINK-5056] Makes the BucketingSink rescalable.

2016-11-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88021378
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -234,7 +251,9 @@
 * 
 * This should only be set to false if using the sink without 
checkpoints, to not remove
 * the files already in the directory.
+* 
 */
+   @Deprecated
private boolean cleanupOnOpen = true;
--- End diff --

this field is unused and can be removed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2797: [FLINK-5056] Makes the BucketingSink rescalable.

2016-11-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88025970
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -570,284 +583,278 @@ private void closeCurrentPartFile(BucketState 
bucketState) throws Exception {
 
/**
 * Gets the truncate() call using reflection.
-*
 * 
-* Note: This code comes from Flume
+* NOTE: This code comes from Flume.
 */
private Method reflectTruncate(FileSystem fs) {
-   Method m = null;
-   if(fs != null) {
-   Class fsClass = fs.getClass();
-   try {
-   m = fsClass.getMethod("truncate", Path.class, 
long.class);
-   } catch (NoSuchMethodException ex) {
-   LOG.debug("Truncate not found. Will write a 
file with suffix '{}' " +
-   " and prefix '{}' to specify how many 
bytes in a bucket are valid.", validLengthSuffix, validLengthPrefix);
-   return null;
-   }
+   if (this.refTruncate == null) {
--- End diff --

Why did you change this method? We can check for null before calling 
truncate instead.
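
A small sketch of that suggestion (names are made up, this is not the BucketingSink code): keep the reflective lookup free of side effects and guard for null at the call site.

```
import java.lang.reflect.Method;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class TruncateGuardSketch {

	/** Returns the truncate() method if the FileSystem offers it, otherwise null. */
	static Method lookupTruncate(FileSystem fs) {
		try {
			return fs.getClass().getMethod("truncate", Path.class, long.class);
		} catch (NoSuchMethodException e) {
			return null; // Hadoop < 2.7: truncate is not available
		}
	}

	static void restoreTo(FileSystem fs, Path partFile, long validLength) throws Exception {
		Method truncate = lookupTruncate(fs);
		if (truncate != null) {
			truncate.invoke(fs, partFile, validLength);
		} else {
			// fall back to writing a valid-length marker file next to the part file
		}
	}
}
```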


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2797: [FLINK-5056] Makes the BucketingSink rescalable.

2016-11-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88016560
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -150,53 +162,59 @@
 
/**
 * The default maximum size of part files.
-*
-* 6 times the default block size
+* 
+* By default, {@code 6 X} the default block size.
--- End diff --

what is block size? This term is not used anywhere else in this class.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2797: [FLINK-5056] Makes the BucketingSink rescalable.

2016-11-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88022818
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -481,27 +494,27 @@ public void onProcessingTime(long timestamp) throws 
Exception {
}
 
/**
-* Checks for inactive buckets, and closes them. This enables 
in-progress files to be moved to
-* the pending state and finalised on the next checkpoint.
+* Checks for inactive buckets, and closes them. Inactive are buckets 
that have not been written to
--- End diff --

Rewording: "Buckets are considered inactive if they have not been ..."


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2797: [FLINK-5056] Makes the BucketingSink rescalable.

2016-11-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88019203
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java
 ---
@@ -161,6 +164,15 @@ public void postSubmit() throws Exception {
Matcher matcher = 
messageRegex.matcher(line);
if (matcher.matches()) {
numRead++;
+   uniqMessagesRead.add(line);
+
+   // check that in the committed 
files there are no duplicates
+   if 
(!file.getPath().toString().endsWith(".in-progress") && 
!file.getPath().toString().endsWith(".pending")) {
--- End diff --

".in-progress" and ".pending" should use the constants defined in the 
BucketingSink.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2797: [FLINK-5056] Makes the BucketingSink rescalable.

2016-11-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88028071
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -570,284 +583,278 @@ private void closeCurrentPartFile(BucketState 
bucketState) throws Exception {
 
/**
 * Gets the truncate() call using reflection.
-*
 * 
-* Note: This code comes from Flume
+* NOTE: This code comes from Flume.
 */
private Method reflectTruncate(FileSystem fs) {
-   Method m = null;
-   if(fs != null) {
-   Class fsClass = fs.getClass();
-   try {
-   m = fsClass.getMethod("truncate", Path.class, 
long.class);
-   } catch (NoSuchMethodException ex) {
-   LOG.debug("Truncate not found. Will write a 
file with suffix '{}' " +
-   " and prefix '{}' to specify how many 
bytes in a bucket are valid.", validLengthSuffix, validLengthPrefix);
-   return null;
-   }
+   if (this.refTruncate == null) {
+   Method m = null;
+   if (fs != null) {
+   Class fsClass = fs.getClass();
+   try {
+   m = fsClass.getMethod("truncate", 
Path.class, long.class);
+   } catch (NoSuchMethodException ex) {
+   LOG.debug("Truncate not found. Will 
write a file with suffix '{}' " +
+   " and prefix '{}' to specify 
how many bytes in a bucket are valid.",
+   validLengthSuffix, 
validLengthPrefix);
+   return null;
+   }
 
+   // verify that truncate actually works
+   FSDataOutputStream outputStream;
+   Path testPath = new 
Path(UUID.randomUUID().toString());
+   try {
+   outputStream = fs.create(testPath);
+   outputStream.writeUTF("hello");
+   outputStream.close();
+   } catch (IOException e) {
+   LOG.error("Could not create file for 
checking if truncate works.", e);
+   throw new RuntimeException("Could not 
create file for checking if truncate works.", e);
+   }
 
-   // verify that truncate actually works
-   FSDataOutputStream outputStream;
-   Path testPath = new Path(UUID.randomUUID().toString());
-   try {
-   outputStream = fs.create(testPath);
-   outputStream.writeUTF("hello");
-   outputStream.close();
-   } catch (IOException e) {
-   LOG.error("Could not create file for checking 
if truncate works.", e);
-   throw new RuntimeException("Could not create 
file for checking if truncate works.", e);
+   try {
+   m.invoke(fs, testPath, 2);
+   } catch (IllegalAccessException | 
InvocationTargetException e) {
+   LOG.debug("Truncate is not supported.", 
e);
+   m = null;
+   }
+
+   try {
+   fs.delete(testPath, false);
+   } catch (IOException e) {
+   LOG.error("Could not delete truncate 
test file.", e);
+   throw new RuntimeException("Could not 
delete truncate test file.", e);
+   }
}
+   this.refTruncate = m;
+   }
+   return this.refTruncate;
+   }
 
+   private Path getPendingPathFor(Path path) {
+   return new Path(path.getParent(), pendingPrefix + 
path.getName()).suffix(pendingSuffix);
+   }
 
-   try {
-   m.invoke(fs, testPath, 2);
-   } catch (IllegalAccessException | 
InvocationTargetException e) {
-   LOG.debug("Truncate is not supported.", e);
-   m = null;
-

[jira] [Commented] (FLINK-5056) BucketingSink deletes valid data when checkpoint notification is slow.

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15667331#comment-15667331
 ] 

ASF GitHub Bot commented on FLINK-5056:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88021378
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -234,7 +251,9 @@
 * 
 * This should only be set to false if using the sink without 
checkpoints, to not remove
 * the files already in the directory.
+* 
 */
+   @Deprecated
private boolean cleanupOnOpen = true;
--- End diff --

this field is unused and can be removed


> BucketingSink deletes valid data when checkpoint notification is slow.
> --
>
> Key: FLINK-5056
> URL: https://issues.apache.org/jira/browse/FLINK-5056
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.1.3
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> Currently if BucketingSink receives no data after a checkpoint and then a 
> notification about a previous checkpoint arrives, it clears its state. This 
> can 
> lead to not committing valid data about intermediate checkpoints for whom
> a notification has not arrived yet. As a simple sequence that illustrates the 
> problem:
> -> input data 
> -> snapshot(0) 
> -> input data
> -> snapshot(1)
> -> no data
> -> notifyCheckpointComplete(0)
> the last will clear the state of the Sink without committing as final the 
> data 
> that arrived for checkpoint 1.
>  
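
For illustration, a minimal, self-contained sketch of the fix idea (not the actual BucketingSink code): pending files are tracked per checkpoint id, and a notification only commits the checkpoints it covers, leaving later ones untouched.

{code:java}
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class PendingFilesPerCheckpointSketch {

	// pending part files, keyed by the checkpoint that made them pending
	private final Map<Long, List<String>> pendingPerCheckpoint = new TreeMap<>();

	void onSnapshot(long checkpointId, List<String> pendingFiles) {
		pendingPerCheckpoint.put(checkpointId, new ArrayList<>(pendingFiles));
	}

	List<String> onNotifyComplete(long completedCheckpointId) {
		List<String> toCommit = new ArrayList<>();
		Iterator<Map.Entry<Long, List<String>>> it = pendingPerCheckpoint.entrySet().iterator();
		while (it.hasNext()) {
			Map.Entry<Long, List<String>> entry = it.next();
			if (entry.getKey() <= completedCheckpointId) {
				toCommit.addAll(entry.getValue()); // commit only the covered checkpoints
				it.remove();
			}
		}
		return toCommit; // entries for later checkpoints stay pending
	}
}
{code}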



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5056) BucketingSink deletes valid data when checkpoint notification is slow.

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15667327#comment-15667327
 ] 

ASF GitHub Bot commented on FLINK-5056:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88016998
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -264,27 +282,23 @@
// ------------------------------------------------------------------------
 
/**
-* Our subtask index, retrieved from the {@code RuntimeContext} in 
{@link #open}.
-*/
-   private transient int subtaskIndex;
-
-   /**
-* We use reflection to get the .truncate() method, this is only 
available starting with
-* Hadoop 2.7
+* We use reflection to get the .truncate() method, this is only 
available starting with Hadoop 2.7
 */
private transient Method refTruncate;
 
/**
-* The state object that is handled by flink from snapshot/restore. In 
there we store state for
-* every open bucket: the current part file path, the valid length of 
the in-progress files and
-* pending part files.
+* The state object that is handled by Flink from snapshot/restore. 
This contains state for
+* every open bucket: the current {@code in-progress} part file path, 
its valid length and
+* the {@code pending} part files.
--- End diff --

why the annotation around pending?


> BucketingSink deletes valid data when checkpoint notification is slow.
> --
>
> Key: FLINK-5056
> URL: https://issues.apache.org/jira/browse/FLINK-5056
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.1.3
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> Currently if BucketingSink receives no data after a checkpoint and then a 
> notification about a previous checkpoint arrives, it clears its state. This 
> can 
> lead to not committing valid data about intermediate checkpoints for which
> a notification has not arrived yet. As a simple sequence that illustrates the 
> problem:
> -> input data 
> -> snapshot(0) 
> -> input data
> -> snapshot(1)
> -> no data
> -> notifyCheckpointComplete(0)
> the last will clear the state of the Sink without committing as final the 
> data 
> that arrived for checkpoint 1.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5056) BucketingSink deletes valid data when checkpoint notification is slow.

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15667328#comment-15667328
 ] 

ASF GitHub Bot commented on FLINK-5056:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88019203
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java
 ---
@@ -161,6 +164,15 @@ public void postSubmit() throws Exception {
Matcher matcher = 
messageRegex.matcher(line);
if (matcher.matches()) {
numRead++;
+   uniqMessagesRead.add(line);
+
+   // check that in the committed 
files there are no duplicates
+   if 
(!file.getPath().toString().endsWith(".in-progress") && 
!file.getPath().toString().endsWith(".pending")) {
--- End diff --

".in-progress" and ".pending" should use the constants defined in the 
BucketingSink.


> BucketingSink deletes valid data when checkpoint notification is slow.
> --
>
> Key: FLINK-5056
> URL: https://issues.apache.org/jira/browse/FLINK-5056
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.1.3
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> Currently if BucketingSink receives no data after a checkpoint and then a 
> notification about a previous checkpoint arrives, it clears its state. This 
> can 
> lead to not committing valid data about intermediate checkpoints for which
> a notification has not arrived yet. As a simple sequence that illustrates the 
> problem:
> -> input data 
> -> snapshot(0) 
> -> input data
> -> snapshot(1)
> -> no data
> -> notifyCheckpointComplete(0)
> the last will clear the state of the Sink without committing as final the 
> data 
> that arrived for checkpoint 1.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5056) BucketingSink deletes valid data when checkpoint notification is slow.

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15667334#comment-15667334
 ] 

ASF GitHub Bot commented on FLINK-5056:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88019886
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -341,23 +355,49 @@ public void setInputType(TypeInformation type, 
ExecutionConfig executionConfi
}
 
@Override
-   public void open(Configuration parameters) throws Exception {
-   super.open(parameters);
+   public void initializeState(FunctionInitializationContext context) 
throws Exception {
+   Preconditions.checkArgument(this.restoredBucketStates == null, 
"The operator has already been initialized.");
 
-   subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+   initFileSystem();
 
-   state = new State();
--- End diff --

If you undo this formatting change and the one in L355, the diff will 
be a lot clearer (i.e., less prone to conflicts).


> BucketingSink deletes valid data when checkpoint notification is slow.
> --
>
> Key: FLINK-5056
> URL: https://issues.apache.org/jira/browse/FLINK-5056
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.1.3
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> Currently if BucketingSink receives no data after a checkpoint and then a 
> notification about a previous checkpoint arrives, it clears its state. This 
> can 
> lead to not committing valid data about intermediate checkpoints for which
> a notification has not arrived yet. As a simple sequence that illustrates the 
> problem:
> -> input data 
> -> snapshot(0) 
> -> input data
> -> snapshot(1)
> -> no data
> -> notifyCheckpointComplete(0)
> the last will clear the state of the Sink without committing as final the 
> data 
> that arrived for checkpoint 1.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

