[jira] [Created] (FLINK-4863) states of merging window and trigger are set to different TimeWindows on merge

2016-10-20 Thread Manu Zhang (JIRA)
Manu Zhang created FLINK-4863:
----------------------------------

 Summary: states of merging window and trigger are set to different 
TimeWindows on merge
 Key: FLINK-4863
 URL: https://issues.apache.org/jira/browse/FLINK-4863
 Project: Flink
  Issue Type: Bug
  Components: Streaming, Windowing Operators
Reporter: Manu Zhang


While the window state is set to the mergeResult's stateWindow (one of the 
original windows), the trigger state is set to the mergeResult itself. This 
causes the {{Timer}} of {{ContinuousEventTimeTrigger}} to fail, since its 
window cannot be found in the window state.
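
Here is a minimal, self-contained model of the mismatch, using plain stand-ins for Flink's window and trigger state rather than the actual {{WindowOperator}} code:

{code}
import java.util.HashMap;
import java.util.Map;

public class MergeStateMismatch {

	// Simplified stand-in for org.apache.flink.streaming.api.windowing.windows.TimeWindow
	static class TimeWindow {
		final long start;
		final long end;

		TimeWindow(long start, long end) {
			this.start = start;
			this.end = end;
		}

		@Override
		public boolean equals(Object o) {
			return o instanceof TimeWindow
				&& ((TimeWindow) o).start == start
				&& ((TimeWindow) o).end == end;
		}

		@Override
		public int hashCode() {
			return (int) (31 * start + end);
		}
	}

	public static void main(String[] args) {
		Map<TimeWindow, String> windowState = new HashMap<>();

		TimeWindow w1 = new TimeWindow(0, 10);     // original window, reused as the state window
		TimeWindow merged = new TimeWindow(0, 15); // the merge result

		// The merged contents are kept under the state window (one of the originals)...
		windowState.put(w1, "merged contents");

		// ...but the trigger is scoped to the merge result itself, so when its
		// timer fires it looks up state under `merged` and finds nothing.
		System.out.println(windowState.get(merged)); // prints: null
	}
}
{code}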



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2666: [FLINK-4863] fix trigger context window on merge

2016-10-20 Thread manuzhang
GitHub user manuzhang opened a pull request:

https://github.com/apache/flink/pull/2666

[FLINK-4863] fix trigger context window on merge

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following checklist into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/manuzhang/flink fix_merge_window

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2666.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2666


commit ce1aaba4e0208505e88ce1507533eaeab1f62321
Author: manuzhang 
Date:   2016-10-20T07:06:01Z

[FLINK-4863] fix trigger context window on merge




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---



[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…

2016-10-20 Thread sedgewickmm18
GitHub user sedgewickmm18 opened a pull request:

https://github.com/apache/flink/pull/2667

README.md  -Description of the bluemix specif…

Hi,

as per discussion with @mxm, I'm proposing the following change: it uses 
container linking (exposing a container-specific hostname in /etc/hosts so 
that taskmanagers can find the jobmanager) instead of shared volumes, and it 
handles fully qualified image names (including the URL of the private 
registry + namespace). A minimal sketch of the linking setup follows the file 
list below.

Best Regards,
Markus


List of changed files:

build.sh                   - modify permissions to make the script executable by default
docker-compose-bluemix.yml - allow specifying image names with the path to the private BM registry
docker-compose.sh          - find out the path to the docker registry, extend the build time to 120 secs, then call docker-compose
docker-compose.yml         - make use of container linking instead of shared volumes
docker-entrypoint.sh       - use the 'jobmanager' entry in /etc/hosts arising from container linking so that the taskmanager can register itself with the jobmanager
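
For illustration, a minimal docker-compose sketch of the linking approach (service names and the image name are assumptions, not the exact contents of this PR):

```
# Hypothetical excerpt: taskmanagers reach the jobmanager through the
# 'jobmanager' hostname that container linking writes into /etc/hosts.
version: "2"
services:
  jobmanager:
    image: flink
    command: jobmanager
    ports:
      - "8081:8081"
  taskmanager:
    image: flink
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
```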

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sedgewickmm18/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2667.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2667


commit 735ed871b528664afaac2ee46e8727b873b1ee97
Author: Markus Müller 
Date:   2016-10-20T08:08:34Z

README.md                  - Description of the bluemix specific steps
build.sh                   - modify permissions to make the script executable by default
docker-compose-bluemix.yml - allow specifying image names with the path to the private BM registry
docker-compose.sh          - find out the path to the docker registry, extend the build time to 120 secs, then call docker-compose
docker-compose.yml         - make use of container linking instead of shared volumes
docker-entrypoint.sh       - use the 'jobmanager' entry in /etc/hosts arising from container linking so that the taskmanager can register itself with the jobmanager






[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-20 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r84234419
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java
 ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+
+/**
+ * Base interface for timely flatMap functions. FlatMap functions take elements and transform them
+ * into zero, one, or more elements. Typical applications can be splitting elements, or unnesting lists
+ * and arrays.
+ *
+ * A {@code TimelyFlatMapFunction} can, in addition to the functionality of a normal
+ * {@link org.apache.flink.api.common.functions.FlatMapFunction}, also set timers and react
+ * to them firing.
+ *
+ * {@code
+ * DataStream<I> input = ...;
+ *
+ * DataStream<O> result = input.flatMap(new MyTimelyFlatMapFunction());
+ * }
+ *
+ * @param <I> Type of the input elements.
+ * @param <O> Type of the returned elements.
+ */
+@PublicEvolving
+public interface TimelyFlatMapFunction<I, O> extends Function, Serializable {
+
+	/**
+	 * The core method of the {@code TimelyFlatMapFunction}. Takes an element from the input
+	 * data set and transforms it into zero, one, or more elements.
+	 *
+	 * @param value The input value.
+	 * @param timerService A {@link TimerService} that allows setting timers and querying the
+	 *                     current time.
+	 * @param out The collector for returning result values.
+	 *
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the
+	 *                   operation to fail and may trigger recovery.
+	 */
+	void flatMap(I value, TimerService timerService, Collector<O> out) throws Exception;
+
+	/**
+	 * Called when a timer set using {@link TimerService} fires.
+	 *
+	 * @param timestamp The timestamp of the firing timer.
+	 * @param timeDomain The {@link TimeDomain} of the firing timer.
+	 * @param timerService A {@link TimerService} that allows setting timers and querying the
+	 *                     current time.
+	 * @param out The collector for returning result values.
+	 *
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the
+	 *                   operation to fail and may trigger recovery.
+	 */
+	void onTimer(long timestamp, TimeDomain timeDomain, TimerService timerService, Collector<O> out) throws Exception;
--- End diff --

Why not have two methods, `onEventTime` and `onProcessingTime`? This way we 
avoid the `TimeDomain` argument, and we also make users think about what they 
are doing in each case.

In addition, this way is similar to the APIs we expose for the Triggers, and 
I think it is good to have uniform APIs. Another solution would be to change 
the Trigger APIs to match this one, but that would break user code.
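
For concreteness, the suggested shape would look roughly like this (a sketch only: the callback signatures simply mirror `flatMap` above, they are not from the PR):

```
package org.apache.flink.streaming.api.functions;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.util.Collector;

import java.io.Serializable;

// Sketch: one callback per time domain instead of a single onTimer with a
// TimeDomain argument, mirroring Trigger's onEventTime/onProcessingTime.
@PublicEvolving
public interface TimelyFlatMapFunction<I, O> extends Function, Serializable {

	void flatMap(I value, TimerService timerService, Collector<O> out) throws Exception;

	// Called when an event-time timer fires, i.e. the watermark passes its timestamp.
	void onEventTime(long timestamp, TimerService timerService, Collector<O> out) throws Exception;

	// Called when a processing-time timer fires, i.e. the wall clock passes its timestamp.
	void onProcessingTime(long timestamp, TimerService timerService, Collector<O> out) throws Exception;
}
```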




[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-20 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r84237459
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
 ---
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link InternalTimerService} that stores timers on the Java heap.
+ */
+public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, ProcessingTimeCallback {
+
+	private final TypeSerializer<K> keySerializer;
+
+	private final TypeSerializer<N> namespaceSerializer;
+
+	private final ProcessingTimeService processingTimeService;
+
+	private long currentWatermark = Long.MIN_VALUE;
+
+	private final org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget;
+
+	private final KeyContext keyContext;
+
+	/**
+	 * Processing time timers that are currently in-flight.
+	 */
+	private final PriorityQueue<InternalTimer<K, N>> processingTimeTimersQueue;
+	private final Set<InternalTimer<K, N>> processingTimeTimers;
+
+	protected ScheduledFuture<?> nextTimer = null;
+
+	/**
+	 * Currently waiting watermark callbacks.
+	 */
+	private final Set<InternalTimer<K, N>> watermarkTimers;
+	private final PriorityQueue<InternalTimer<K, N>> watermarkTimersQueue;
+
+	public HeapInternalTimerService(
+			TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
+			KeyContext keyContext,
+			ProcessingTimeService processingTimeService) {
+		this.keySerializer = checkNotNull(keySerializer);
+		this.namespaceSerializer = checkNotNull(namespaceSerializer);
+		this.triggerTarget = checkNotNull(triggerTarget);
+		this.keyContext = keyContext;
+		this.processingTimeService = checkNotNull(processingTimeService);
+
+		watermarkTimers = new HashSet<>();
--- End diff --

I am wondering if it would be better to rename this to `eventTimeTimers`. 
This plays well with `processingTimeTimers` and also indicates what we are 
talking about. We have event time, whose "clock ticks" are the watermarks, 
and processing time, whose clock ticks are the wall-clock ones.




[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-20 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r84236659
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
 ---
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link InternalTimerService} that stores timers on the Java heap.
+ */
+public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, ProcessingTimeCallback {
+
+	private final TypeSerializer<K> keySerializer;
+
+	private final TypeSerializer<N> namespaceSerializer;
+
+	private final ProcessingTimeService processingTimeService;
+
+	private long currentWatermark = Long.MIN_VALUE;
+
+	private final org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget;
+
+	private final KeyContext keyContext;
+
+	/**
+	 * Processing time timers that are currently in-flight.
+	 */
+	private final PriorityQueue<InternalTimer<K, N>> processingTimeTimersQueue;
+	private final Set<InternalTimer<K, N>> processingTimeTimers;
+
+	protected ScheduledFuture<?> nextTimer = null;
+
+	/**
+	 * Currently waiting watermark callbacks.
+	 */
+	private final Set<InternalTimer<K, N>> watermarkTimers;
+	private final PriorityQueue<InternalTimer<K, N>> watermarkTimersQueue;
+
+	public HeapInternalTimerService(
+			TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
+			KeyContext keyContext,
+			ProcessingTimeService processingTimeService) {
+		this.keySerializer = checkNotNull(keySerializer);
+		this.namespaceSerializer = checkNotNull(namespaceSerializer);
+		this.triggerTarget = checkNotNull(triggerTarget);
+		this.keyContext = keyContext;
+		this.processingTimeService = checkNotNull(processingTimeService);
+
+		watermarkTimers = new HashSet<>();
+		watermarkTimersQueue = new PriorityQueue<>(100);
+
+		processingTimeTimers = new HashSet<>();
+		processingTimeTimersQueue = new PriorityQueue<>(100);
+	}
+
+	public HeapInternalTimerService(
+			TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
+			KeyContext keyContext,
+			ProcessingTimeService processingTimeService,
+			RestoredTimers<K, N> restoredTimers) {
+
+		this.keySerializer = checkNotNull(keySerializer);
+		this.namespaceSerializer = checkNotNull(namespaceSerializer);
+		this.triggerTarget = checkNotNull(triggerTarget);
+		this.keyContext = keyContext;
+		this.processingTimeService = checkNotNull(processingTimeService);
+
+		watermarkTimers = restoredTimers.watermarkTimers;
+		watermarkTimersQueue = restoredTimers.watermarkTimersQueue;
+
+		processingTimeTimers = restoredTimers.processingTimeTimers;
+


[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-20 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r84235862
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ---
@@ -390,4 +425,141 @@ public void close() {
output.close();
}
}
+
+	// ------------------------------------------------------------------------
+	//  Watermark handling
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Returns an {@link InternalTimerService} that can be used to query current processing time
+	 * and event time and to set timers. An operator can have several timer services, where
+	 * each has its own namespace serializer. Timer services are differentiated by the string
+	 * key that is given when requesting them; if you call this method with the same key
+	 * multiple times you will get the same timer service instance in subsequent requests.
+	 *
+	 * Timers are always scoped to a key, the currently active key of a keyed stream operation.
+	 * When a timer fires, this key will also be set as the currently active key.
+	 *
+	 * Each timer has attached metadata, the namespace. Different timer services
+	 * can have a different namespace type. If you don't need namespace differentiation you
+	 * can use {@link VoidNamespaceSerializer} as the namespace serializer.
+	 *
+	 * @param name The name of the requested timer service. If no service exists under the given
+	 *             name a new one will be created and returned.
+	 * @param keySerializer {@code TypeSerializer} for the keys of the timers.
+	 * @param namespaceSerializer {@code TypeSerializer} for the timer namespace.
+	 * @param triggerable The {@link Triggerable} that should be invoked when timers fire.
+	 *
+	 * @param <K> The type of the timer keys.
+	 * @param <N> The type of the timer namespace.
+	 */
+	public <K, N> InternalTimerService<N> getInternalTimerService(
+			String name,
+			TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			Triggerable<K, N> triggerable) {
+
+		@SuppressWarnings("unchecked")
+		HeapInternalTimerService<K, N> service =
+			(HeapInternalTimerService<K, N>) timerServices.get(name);
+
+		if (service == null) {
+			if (restoredServices != null && restoredServices.containsKey(name)) {
+				@SuppressWarnings("unchecked")
+				HeapInternalTimerService.RestoredTimers<K, N> restoredService =
+					(HeapInternalTimerService.RestoredTimers<K, N>) restoredServices.remove(name);
+
+				service = new HeapInternalTimerService<>(
+					keySerializer,
+					namespaceSerializer,
+					triggerable,
+					this,
+					getRuntimeContext().getProcessingTimeService(),
+					restoredService);
+
+			} else {
+				service = new HeapInternalTimerService<>(
+					keySerializer,
+					namespaceSerializer,
+					triggerable,
+					this,
+					getRuntimeContext().getProcessingTimeService());
+			}
+			timerServices.put(name, service);
+		}
+
+		return service;
+	}
+
+	public void processWatermark(Watermark mark) throws Exception {
+		for (HeapInternalTimerService<?, ?> service : timerServices.values()) {
+			service.advanceWatermark(mark.getTimestamp());
+		}
+		output.emitWatermark(mark);
+	}
+
+	public void processWatermark1(Watermark mark) throws Exception {
+		input1Watermark = mark.getTimestamp();
+		long newMin = Math.min(input1Watermark, input2Watermark);
--- End diff --

I agree with @StefanRRichter's comment below, and I just have to add that for 
`processWatermark1` and `processWatermark2` much of the code is repeated, so 
the common part can become a private method that is called by both of these 
methods.
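
A sketch of that refactoring (`input1Watermark`/`input2Watermark` are the fields from the diff; `combinedWatermark` is an assumed name for illustration):

```
public void processWatermark1(Watermark mark) throws Exception {
	input1Watermark = mark.getTimestamp();
	advanceCombinedWatermark();
}

public void processWatermark2(Watermark mark) throws Exception {
	input2Watermark = mark.getTimestamp();
	advanceCombinedWatermark();
}

// Common part of processWatermark1/2: advance timers and forward the
// watermark only when the minimum over both inputs actually moves.
private void advanceCombinedWatermark() throws Exception {
	long newMin = Math.min(input1Watermark, input2Watermark);
	if (newMin > combinedWatermark) {
		combinedWatermark = newMin;
		processWatermark(new Watermark(combinedWatermark));
	}
}
```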



[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions

2016-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591260#comment-15591260
 ] 

ASF GitHub Bot commented on FLINK-3674:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r84237685
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultProcessingTimeService.java
 ---
@@ -35,10 +34,10 @@
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A {@link TimeServiceProvider} which assigns as current processing time the result of calling
+ * A {@link ProcessingTimeService} which assigns as current processing time the result of calling
  * {@link System#currentTimeMillis()} and registers timers using a {@link ScheduledThreadPoolExecutor}.
  */
-public class DefaultTimeServiceProvider extends TimeServiceProvider {
+public class DefaultProcessingTimeService extends ProcessingTimeService {
 
--- End diff --

We could also rename it to `SystemProcessingTimeService` or something more 
indicative of where the clock ticks come from.


> Add an interface for Time aware User Functions
> --
>
> Key: FLINK-3674
> URL: https://issues.apache.org/jira/browse/FLINK-3674
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
>Assignee: Aljoscha Krettek
>
> I suggest to add an interface that UDFs can implement, which will let them be 
> notified upon watermark updates.
> Example usage:
> {code}
> public interface EventTimeFunction {
> void onWatermark(Watermark watermark);
> }
> public class MyMapper implements MapFunction<String, String>, 
> EventTimeFunction {
> private long currentEventTime = Long.MIN_VALUE;
> public String map(String value) {
> return value + " @ " + currentEventTime;
> }
> public void onWatermark(Watermark watermark) {
> currentEventTime = watermark.getTimestamp();
> }
> }
> {code}





[jira] [Commented] (FLINK-4824) CliFrontend shows misleading error message when main() method returns before env.execute()

2016-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591288#comment-15591288
 ] 

ASF GitHub Bot commented on FLINK-4824:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2662
  
Thanks for the PR @greghogan! Having a custom exception for missing 
arguments to a user program is a good approach. However, it requires the author 
of the program to use the custom exception; at the least, we would have to adapt 
all the included examples. Additionally, it would be nice to throw another 
custom exception when no Flink job was generated during execution of the jar 
(which might be because of missing arguments). Currently, we simply throw a 
`ProgramInvocationException`, which can look like a serious error to the user 
when arguments are merely missing. 

So +1, but we might do some follow-ups to fully solve the issue.
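
For illustration, a user program using such an exception might look like this (the exception name and package are assumptions about the PR, not confirmed in this thread):

```
import org.apache.flink.client.program.ProgramParametrizationException;

public class SocketWindowWordCount {

	public static void main(String[] args) {
		if (args.length == 0) {
			// A dedicated exception type lets the CLI print a usage hint
			// instead of treating the missing arguments as a job failure.
			throw new ProgramParametrizationException(
				"Usage: SocketWindowWordCount --port <port>");
		}
		// ... build and execute the actual Flink job here ...
	}
}
```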


> CliFrontend shows misleading error message when main() method returns before 
> env.execute()
> --
>
> Key: FLINK-4824
> URL: https://issues.apache.org/jira/browse/FLINK-4824
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>Assignee: Greg Hogan
>
> While testing Flink by running the 
> {{./examples/streaming/SocketWindowWordCount.jar}} example, I got the 
> following error message:
> {code}
> ./bin/flink run ./examples/streaming/SocketWindowWordCount.jar 
> Cluster configuration: Standalone cluster with JobManager at /127.0.0.1:6123
> Using address 127.0.0.1:6123 to connect to JobManager.
> JobManager web interface address http://127.0.0.1:8081
> Starting execution of program
> No port specified. Please run 'SocketWindowWordCount --port <port>', where 
> port is the address of the text server
> To start a simple text server, run 'netcat -l <port>' and type the input text 
> into the command line
> 
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The program 
> didn't contain Flink jobs. Perhaps you forgot to call execute() on the 
> execution environment.
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:324)
>   at 
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:774)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:250)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:985)
>   at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:1032)
>   at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:1029)
>   at 
> org.apache.flink.runtime.security.SecurityContext$1.run(SecurityContext.java:82)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
>   at 
> org.apache.flink.runtime.security.SecurityContext.runSecured(SecurityContext.java:79)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1029)
> {code}
> I think the error message is misleading, because I tried executing a valid 
> Flink job.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4784) MetricQueryService actor name collision

2016-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591287#comment-15591287
 ] 

ASF GitHub Bot commented on FLINK-4784:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2636
  
I tested the change this morning, and it's not working:

```
2016-10-20 11:08:28,216 WARN  org.apache.flink.runtime.metrics.MetricRegistry - Could not start MetricDumpActor. No metrics will be submitted to the WebInterface.
akka.actor.InvalidActorNameException: illegal actor name [MetricQueryService_ResourceID{resourceId='e29f9e7d096c562054a1caa29be21a36'}], must conform to (?:[-\w:@&=+,.!~*'_;]|%\p{XDigit}{2})(?:[-\w:@&=+,.!~*'$_;]|%\p{XDigit}{2})*
        at akka.actor.dungeon.Children$class.checkName(Children.scala:182)
        at akka.actor.dungeon.Children$class.attachChild(Children.scala:42)
        at akka.actor.ActorCell.attachChild(ActorCell.scala:369)
        at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:553)
```


> MetricQueryService actor name collision
> ---
>
> Key: FLINK-4784
> URL: https://issues.apache.org/jira/browse/FLINK-4784
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.1.2
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.2.0
>
>
> The name of the MetricQueryService actor is currently fixed, which can lead 
> to collisions if multiple TaskManagers run in the same actor system, as is 
> the case for the MiniCluster.
> We can append the TaskManager ResourceID to make the name unique for every 
> TaskManager. The MetricFetcher would still be able to reliably infer the 
> correct actor name.
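
For illustration, a sketch of the naming scheme described above (helper and class names are assumptions, not the actual patch). The failure above came from embedding `ResourceID`'s `toString()`, whose braces and quotes are illegal in Akka actor names, so the raw ID string has to be used instead:

```
public final class MetricQueryServiceNaming {

	// Derive a per-TaskManager actor name from the resource ID. The plain
	// hex ID avoids the braces and quotes that ResourceID.toString()
	// introduced, which are illegal characters in Akka actor names.
	static String actorName(String resourceIdString) {
		return "MetricQueryService_" + resourceIdString;
	}

	public static void main(String[] args) {
		// prints: MetricQueryService_e29f9e7d096c562054a1caa29be21a36
		System.out.println(actorName("e29f9e7d096c562054a1caa29be21a36"));
	}
}
```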







[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2667#discussion_r84241679
  
--- Diff: flink-contrib/docker-flink/docker-entrypoint.sh ---
@@ -20,11 +20,19 @@
 
 if [ "$1" = "jobmanager" ]; then
     echo "Starting Job Manager"
-    sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: `hostname -f`/g" $FLINK_HOME/conf/flink-conf.yaml
+    #sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: `hostname -f`/g" $FLINK_HOME/conf/flink-conf.yaml
+
+    # make use of container linking and exploit the jobmanager entry in /etc/hosts
+    sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: jobmanager/g" $FLINK_HOME/conf/flink-conf.yaml
+
     sed -i -e "s/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: `grep -c ^processor /proc/cpuinfo`/g" $FLINK_HOME/conf/flink-conf.yaml
--- End diff --
--- End diff --

This line has to go into the `taskmanager` section. Before, it didn't really 
matter because the config was shared, but now this setting will only be 
configured for the `JobManager` when, in fact, it is only used by the 
`TaskManager`.




[jira] [Commented] (FLINK-4784) MetricQueryService actor name collision

2016-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591353#comment-15591353
 ] 

ASF GitHub Bot commented on FLINK-4784:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2636
  
should be fixed now


> MetricQueryService actor name collision
> ---
>
> Key: FLINK-4784
> URL: https://issues.apache.org/jira/browse/FLINK-4784
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.1.2
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.2.0
>
>
> The name of the MetricQueryService actor is currently fixed, which can lead 
> to collisions if multiple TaskManagers run in the same actor system, as is 
> the case for the MiniCluster.
> We can append the TaskManager ResourceID to make the name unique for every 
> TaskManager. The MetricFetcher would still be able to reliably infer the 
> correct actor name.





[GitHub] flink issue #2636: [FLINK-4784] Unique MetricQueryService actor names

2016-10-20 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2636
  
should be fixed now




[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions

2016-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591419#comment-15591419
 ] 

ASF GitHub Bot commented on FLINK-3674:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r84251732
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
 ---
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link InternalTimerService} that stores timers on the Java heap.
+ */
+public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, ProcessingTimeCallback {
+
+   private final TypeSerializer<K> keySerializer;
+
+   private final TypeSerializer<N> namespaceSerializer;
+
+   private final ProcessingTimeService processingTimeService;
+
+   private long currentWatermark = Long.MIN_VALUE;
+
+   private final org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget;
+
+   private final KeyContext keyContext;
+
+   /**
+* Processing time timers that are currently in-flight.
+*/
+   private final PriorityQueue<InternalTimer<K, N>> processingTimeTimersQueue;
+   private final Set<InternalTimer<K, N>> processingTimeTimers;
+
+   protected ScheduledFuture<?> nextTimer = null;
+
+   /**
+* Currently waiting watermark callbacks.
+*/
+   private final Set<InternalTimer<K, N>> watermarkTimers;
+   private final PriorityQueue<InternalTimer<K, N>> watermarkTimersQueue;
+
+   public HeapInternalTimerService(
+   TypeSerializer<K> keySerializer,
+   TypeSerializer<N> namespaceSerializer,
+   org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
+   KeyContext keyContext,
+   ProcessingTimeService processingTimeService) {
+   this.keySerializer = checkNotNull(keySerializer);
+   this.namespaceSerializer = checkNotNull(namespaceSerializer);
+   this.triggerTarget = checkNotNull(triggerTarget);
+   this.keyContext = keyContext;
+   this.processingTimeService = checkNotNull(processingTimeService);
+
+   watermarkTimers = new HashSet<>();
--- End diff --

I'm renaming


> Add an interface for Time aware User Functions
> --
>
> Key: FLINK-3674
> URL: https://issues.apache.org/jira/browse/FLINK-3674
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
>Assignee: Aljoscha Krettek
>
> I suggest to add an interface that UDFs can implement, which will let them be 
> notified upon watermark updates.
> Example usage:
> {code}
> public interface EventTimeFunction {
> void onWatermark(Watermark watermark);
> }
> public class MyMapper implements MapFunction<String, String>, EventTimeFunction {
> private long currentEventTime = Long.MIN_VALUE;
> public String map(String value) {
> return value + " @ " + currentEventTime;
> }
> public void onWatermark(Watermark watermark) {
> currentEventTime = watermark.getTimestamp();
> }
> }
> {code}

[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions

2016-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591422#comment-15591422
 ] 

ASF GitHub Bot commented on FLINK-3674:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r84251918
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
 ---
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link InternalTimerService} that stores timers on the Java heap.
+ */
+public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, ProcessingTimeCallback {
+
+   private final TypeSerializer<K> keySerializer;
+
+   private final TypeSerializer<N> namespaceSerializer;
+
+   private final ProcessingTimeService processingTimeService;
+
+   private long currentWatermark = Long.MIN_VALUE;
+
+   private final org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget;
+
+   private final KeyContext keyContext;
+
+   /**
+* Processing time timers that are currently in-flight.
+*/
+   private final PriorityQueue<InternalTimer<K, N>> processingTimeTimersQueue;
+   private final Set<InternalTimer<K, N>> processingTimeTimers;
+
+   protected ScheduledFuture<?> nextTimer = null;
+
+   /**
+* Currently waiting watermark callbacks.
+*/
+   private final Set<InternalTimer<K, N>> watermarkTimers;
+   private final PriorityQueue<InternalTimer<K, N>> watermarkTimersQueue;
+
+   public HeapInternalTimerService(
+   TypeSerializer<K> keySerializer,
+   TypeSerializer<N> namespaceSerializer,
+   org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
+   KeyContext keyContext,
+   ProcessingTimeService processingTimeService) {
+   this.keySerializer = checkNotNull(keySerializer);
+   this.namespaceSerializer = checkNotNull(namespaceSerializer);
+   this.triggerTarget = checkNotNull(triggerTarget);
+   this.keyContext = keyContext;
+   this.processingTimeService = checkNotNull(processingTimeService);
+
+   watermarkTimers = new HashSet<>();
+   watermarkTimersQueue = new PriorityQueue<>(100);
+
+   processingTimeTimers = new HashSet<>();
+   processingTimeTimersQueue = new PriorityQueue<>(100);
+   }
+
+   public HeapInternalTimerService(
+   TypeSerializer<K> keySerializer,
+   TypeSerializer<N> namespaceSerializer,
+   org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
+   KeyContext keyContext,
+   ProcessingTimeService processingTimeService,
+   RestoredTimers restoredTimers) {
+
+   this.keySerializer = checkNotNull(keySerializer);
+   this.namespaceSerializer = checkNotNull(namespaceSerializer);
+   this.triggerTarget = checkNotNull(triggerTarget);
+   this.keyContext = keyContext;
+   this.processingTimeService = checkNotNull(processingTimeService);

[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions

2016-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591424#comment-15591424
 ] 

ASF GitHub Bot commented on FLINK-3674:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r84251996
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
 ---
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link InternalTimerService} that stores timers on the Java heap.
+ */
+public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, ProcessingTimeCallback {
+
+   private final TypeSerializer<K> keySerializer;
+
+   private final TypeSerializer<N> namespaceSerializer;
+
+   private final ProcessingTimeService processingTimeService;
+
+   private long currentWatermark = Long.MIN_VALUE;
+
+   private final org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget;
+
+   private final KeyContext keyContext;
+
+   /**
+* Processing time timers that are currently in-flight.
+*/
+   private final PriorityQueue<InternalTimer<K, N>> processingTimeTimersQueue;
+   private final Set<InternalTimer<K, N>> processingTimeTimers;
+
+   protected ScheduledFuture<?> nextTimer = null;
+
+   /**
+* Currently waiting watermark callbacks.
+*/
+   private final Set<InternalTimer<K, N>> watermarkTimers;
+   private final PriorityQueue<InternalTimer<K, N>> watermarkTimersQueue;
+
+   public HeapInternalTimerService(
+   TypeSerializer<K> keySerializer,
+   TypeSerializer<N> namespaceSerializer,
+   org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
+   KeyContext keyContext,
+   ProcessingTimeService processingTimeService) {
+   this.keySerializer = checkNotNull(keySerializer);
+   this.namespaceSerializer = checkNotNull(namespaceSerializer);
+   this.triggerTarget = checkNotNull(triggerTarget);
+   this.keyContext = keyContext;
+   this.processingTimeService = checkNotNull(processingTimeService);
+
+   watermarkTimers = new HashSet<>();
+   watermarkTimersQueue = new PriorityQueue<>(100);
+
+   processingTimeTimers = new HashSet<>();
+   processingTimeTimersQueue = new PriorityQueue<>(100);
+   }
+
+   public HeapInternalTimerService(
+   TypeSerializer<K> keySerializer,
+   TypeSerializer<N> namespaceSerializer,
+   org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
+   KeyContext keyContext,
+   ProcessingTimeService processingTimeService,
+   RestoredTimers restoredTimers) {
+
+   this.keySerializer = checkNotNull(keySerializer);
+   this.namespaceSerializer = checkNotNull(namespaceSerializer);
+   this.triggerTarget = checkNotNull(triggerTarget);
+   this.keyContext = keyContext;
+   this.processingTimeService = checkNotNull(processingTimeService);

[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions

2016-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591462#comment-15591462
 ] 

ASF GitHub Bot commented on FLINK-3674:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2570
  
Thanks for your comments @StefanRRichter and @kl0u. I have incorporated most of 
them by now.


> Add an interface for Time aware User Functions
> --
>
> Key: FLINK-3674
> URL: https://issues.apache.org/jira/browse/FLINK-3674
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
>Assignee: Aljoscha Krettek
>
> I suggest to add an interface that UDFs can implement, which will let them be 
> notified upon watermark updates.
> Example usage:
> {code}
> public interface EventTimeFunction {
> void onWatermark(Watermark watermark);
> }
> public class MyMapper implements MapFunction<String, String>, EventTimeFunction {
> private long currentEventTime = Long.MIN_VALUE;
> public String map(String value) {
> return value + " @ " + currentEventTime;
> }
> public void onWatermark(Watermark watermark) {
> currentEventTime = watermark.getTimestamp();
> }
> }
> {code}





[GitHub] flink issue #2570: [FLINK-3674] Add an interface for Time aware User Functio...

2016-10-20 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2570
  
Thanks for your comments @StefanRRichter and @kl0u. I have incorporated most of 
them by now.




[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions

2016-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591460#comment-15591460
 ] 

ASF GitHub Bot commented on FLINK-3674:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r84255151
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultProcessingTimeService.java
 ---
@@ -35,10 +34,10 @@
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A {@link TimeServiceProvider} which assigns as current processing time 
the result of calling
+ * A {@link ProcessingTimeService} which assigns as current processing 
time the result of calling
  * {@link System#currentTimeMillis()} and registers timers using a {@link 
ScheduledThreadPoolExecutor}.
  */
-public class DefaultTimeServiceProvider extends TimeServiceProvider {
+public class DefaultProcessingTimeService extends ProcessingTimeService {
 
--- End diff --

Done


> Add an interface for Time aware User Functions
> --
>
> Key: FLINK-3674
> URL: https://issues.apache.org/jira/browse/FLINK-3674
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
>Assignee: Aljoscha Krettek
>
> I suggest to add an interface that UDFs can implement, which will let them be 
> notified upon watermark updates.
> Example usage:
> {code}
> public interface EventTimeFunction {
> void onWatermark(Watermark watermark);
> }
> public class MyMapper implements MapFunction<String, String>, EventTimeFunction {
> private long currentEventTime = Long.MIN_VALUE;
> public String map(String value) {
> return value + " @ " + currentEventTime;
> }
> public void onWatermark(Watermark watermark) {
> currentEventTime = watermark.getTimestamp();
> }
> }
> {code}





[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-20 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r84251996
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
 ---
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link InternalTimerService} that stores timers on the Java heap.
+ */
+public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, ProcessingTimeCallback {
+
+   private final TypeSerializer<K> keySerializer;
+
+   private final TypeSerializer<N> namespaceSerializer;
+
+   private final ProcessingTimeService processingTimeService;
+
+   private long currentWatermark = Long.MIN_VALUE;
+
+   private final org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget;
+
+   private final KeyContext keyContext;
+
+   /**
+* Processing time timers that are currently in-flight.
+*/
+   private final PriorityQueue<InternalTimer<K, N>> processingTimeTimersQueue;
+   private final Set<InternalTimer<K, N>> processingTimeTimers;
+
+   protected ScheduledFuture<?> nextTimer = null;
+
+   /**
+* Currently waiting watermark callbacks.
+*/
+   private final Set<InternalTimer<K, N>> watermarkTimers;
+   private final PriorityQueue<InternalTimer<K, N>> watermarkTimersQueue;
+
+   public HeapInternalTimerService(
+   TypeSerializer<K> keySerializer,
+   TypeSerializer<N> namespaceSerializer,
+   org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
+   KeyContext keyContext,
+   ProcessingTimeService processingTimeService) {
+   this.keySerializer = checkNotNull(keySerializer);
+   this.namespaceSerializer = checkNotNull(namespaceSerializer);
+   this.triggerTarget = checkNotNull(triggerTarget);
+   this.keyContext = keyContext;
+   this.processingTimeService = checkNotNull(processingTimeService);
+
+   watermarkTimers = new HashSet<>();
+   watermarkTimersQueue = new PriorityQueue<>(100);
+
+   processingTimeTimers = new HashSet<>();
+   processingTimeTimersQueue = new PriorityQueue<>(100);
+   }
+
+   public HeapInternalTimerService(
+   TypeSerializer<K> keySerializer,
+   TypeSerializer<N> namespaceSerializer,
+   org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
+   KeyContext keyContext,
+   ProcessingTimeService processingTimeService,
+   RestoredTimers restoredTimers) {
+
+   this.keySerializer = checkNotNull(keySerializer);
+   this.namespaceSerializer = checkNotNull(namespaceSerializer);
+   this.triggerTarget = checkNotNull(triggerTarget);
+   this.keyContext = keyContext;
+   this.processingTimeService = checkNotNull(processingTimeService);
+
+   watermarkTimers = restoredTimers.watermarkTimers;
+   watermarkTimersQueue = restoredTimers.watermarkTimersQueue;
+
+   processingTimeTimers = restoredTimers.processingTimeTimers;
 

[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591567#comment-15591567
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2652
  
+1 go ahead


> TaskManager should commit suicide after cancellation failure
> 
>
> Key: FLINK-4715
> URL: https://issues.apache.org/jira/browse/FLINK-4715
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Ufuk Celebi
> Fix For: 1.2.0
>
>
> In case of a failed cancellation, e.g. when the task cannot be cancelled within a 
> given time, the {{TaskManager}} should kill itself. That way we guarantee 
> that there is no resource leak. 
> This behaviour acts as a safety net against faulty user code.
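As a loose illustration of such a safety net (class name, timeout handling, and 
log message are invented for this sketch and do not reflect the actual 
implementation):

{code}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class TaskCancellationWatchdog {

    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();

    // If the task thread is still alive when the timeout fires, there is no safe
    // way to reclaim its resources, so terminate the whole process and let the
    // cluster framework restart the TaskManager.
    void watch(Thread taskThread, long timeoutMillis) {
        timer.schedule(() -> {
            if (taskThread.isAlive()) {
                System.err.println("Task could not be cancelled within " + timeoutMillis
                        + " ms; killing the TaskManager process as a safety net.");
                Runtime.getRuntime().halt(-1);
            }
        }, timeoutMillis, TimeUnit.MILLISECONDS);
    }
}
{code}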





[GitHub] flink issue #2652: [FLINK-4715] Fail TaskManager with fatal error if task ca...

2016-10-20 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2652
  
+1 go ahead




[GitHub] flink issue #2642: [FLINK-3999]: Rename the `running` flag in the drivers to...

2016-10-20 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2642
  
@nssalian Thanks for understanding.

We as a community have actually gone into that mode (more conservative and 
stability-focused) not too long ago, so it is still a bit of a learning process 
for everyone here (committers and contributors).




[jira] [Commented] (FLINK-3999) Rename the `running` flag in the drivers to `canceled`

2016-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591577#comment-15591577
 ] 

ASF GitHub Bot commented on FLINK-3999:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2642
  
@nssalian Thanks for understanding.

We as a community have actually gone into that mode (more conservative and 
stability-focused) not too long ago, so it is still a bit of a learning process 
for everyone here (committers and contributors).


> Rename the `running` flag in the drivers to `canceled`
> --
>
> Key: FLINK-3999
> URL: https://issues.apache.org/jira/browse/FLINK-3999
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Assignee: Neelesh Srinivas Salian
>Priority: Trivial
>
> The name of the {{running}} flag in the drivers doesn't reflect its usage: 
> when the operator just stops normally, then it is not running anymore, but 
> the {{running}}  flag will still be true, since the {{running}} flag is only 
> set when cancelling.
> It should be renamed, and the value inverted.
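A minimal sketch of the proposed rename (field and method names are 
illustrative; each driver has its own variant of this flag):

{code}
// Before: 'running' starts true and is only flipped on cancel(), so it stays
// true after a normal stop even though the driver is no longer running.
private volatile boolean running = true;

// After: the flag names what it actually tracks, with the value inverted.
private volatile boolean canceled = false;

public void cancel() {
    this.canceled = true;
}

private void runLoop() {
    while (!canceled /* && more input is available */) {
        // process the next record ...
    }
}
{code}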





[jira] [Commented] (FLINK-3030) Enhance Dashboard to show Execution Attempts

2016-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591587#comment-15591587
 ] 

ASF GitHub Bot commented on FLINK-3030:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2448
  
Good to go from my side.
@rmetzger and @iampeter - since you are working on the web UI currently, 
can you try and merge this?


> Enhance Dashboard to show Execution Attempts
> 
>
> Key: FLINK-3030
> URL: https://issues.apache.org/jira/browse/FLINK-3030
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Affects Versions: 0.10.0
>Reporter: Stephan Ewen
>Assignee: Ivan Mushketyk
> Fix For: 1.0.0
>
>
> Currently, the web dashboard shows only the latest execution attempt. We 
> should make all execution attempts and their accumulators available for 
> inspection.
> The REST monitoring API supports this, so it should be a change only to the 
> frontend part.





[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-20 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r84236439
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
 ---
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link InternalTimerService} that stores timers on the Java heap.
+ */
+public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, ProcessingTimeCallback {
+
+   private final TypeSerializer<K> keySerializer;
+
+   private final TypeSerializer<N> namespaceSerializer;
+
+   private final ProcessingTimeService processingTimeService;
+
+   private long currentWatermark = Long.MIN_VALUE;
+
+   private final org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget;
+
+   private final KeyContext keyContext;
+
+   /**
+* Processing time timers that are currently in-flight.
+*/
+   private final PriorityQueue<InternalTimer<K, N>> processingTimeTimersQueue;
+   private final Set<InternalTimer<K, N>> processingTimeTimers;
+
+   protected ScheduledFuture<?> nextTimer = null;
+
+   /**
+* Currently waiting watermark callbacks.
+*/
+   private final Set<InternalTimer<K, N>> watermarkTimers;
+   private final PriorityQueue<InternalTimer<K, N>> watermarkTimersQueue;
+
+   public HeapInternalTimerService(
+   TypeSerializer<K> keySerializer,
+   TypeSerializer<N> namespaceSerializer,
+   org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
+   KeyContext keyContext,
+   ProcessingTimeService processingTimeService) {
+   this.keySerializer = checkNotNull(keySerializer);
+   this.namespaceSerializer = checkNotNull(namespaceSerializer);
+   this.triggerTarget = checkNotNull(triggerTarget);
+   this.keyContext = keyContext;
+   this.processingTimeService = checkNotNull(processingTimeService);
+
+   watermarkTimers = new HashSet<>();
+   watermarkTimersQueue = new PriorityQueue<>(100);
+
+   processingTimeTimers = new HashSet<>();
+   processingTimeTimersQueue = new PriorityQueue<>(100);
+   }
+
+   public HeapInternalTimerService(
+   TypeSerializer<K> keySerializer,
+   TypeSerializer<N> namespaceSerializer,
+   org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
+   KeyContext keyContext,
+   ProcessingTimeService processingTimeService,
+   RestoredTimers restoredTimers) {
+
+   this.keySerializer = checkNotNull(keySerializer);
+   this.namespaceSerializer = checkNotNull(namespaceSerializer);
+   this.triggerTarget = checkNotNull(triggerTarget);
+   this.keyContext = keyContext;
+   this.processingTimeService = checkNotNull(processingTimeService);
+
+   watermarkTimers = restoredTimers.watermarkTimers;
+   watermarkTimersQueue = restoredTimers.watermarkTimersQueue;
+
+   processingTimeTimers = restoredTimers.processingTimeTimers;
+

[GitHub] flink issue #2448: [FLINK-3030][web frontend] Enhance dashboard to show exec...

2016-10-20 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2448
  
Good to go from my side.
@rmetzger and @iampeter - since you are working on the web UI currently, 
can you try and merge this?




[GitHub] flink issue #2636: [FLINK-4784] Unique MetricQueryService actor names

2016-10-20 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2636
  
I tested the change this morning, and it's not working:

```
2016-10-20 11:08:28,216 WARN  org.apache.flink.runtime.metrics.MetricRegistry - Could not start MetricDumpActor. No metrics will be submitted to the WebInterface.
akka.actor.InvalidActorNameException: illegal actor name [MetricQueryService_ResourceID{resourceId='e29f9e7d096c562054a1caa29be21a36'}], must conform to (?:[-\w:@&=+,.!~*'_;]|%\p{XDigit}{2})(?:[-\w:@&=+,.!~*'$_;]|%\p{XDigit}{2})*
at akka.actor.dungeon.Children$class.checkName(Children.scala:182)
at akka.actor.dungeon.Children$class.attachChild(Children.scala:42)
at akka.actor.ActorCell.attachChild(ActorCell.scala:369)
at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:553)
```
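The name in the error contains '{', '}' and quote characters coming from 
ResourceID.toString(), which Akka's actor-name pattern does not allow. A sketch 
of the direction a fix could take (building the name from the raw id string is 
an assumption of this sketch, not necessarily the actual change):

```java
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;

final class MetricQueryServiceNames {

    // Build the actor name from the raw hex id string, which contains only
    // characters that are legal in Akka actor names, instead of using
    // ResourceID.toString() with its braces and quotes.
    static String actorName(String resourceIdString) {
        return "MetricQueryService_" + resourceIdString;
    }

    static ActorRef startActor(ActorSystem system, Props props, String resourceIdString) {
        return system.actorOf(props, actorName(resourceIdString));
    }
}
```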




[jira] [Commented] (FLINK-4850) FlinkML - SVM predict Operation for Vector and not LabeledVector

2016-10-20 Thread Theodore Vasiloudis (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591622#comment-15591622
 ] 

Theodore Vasiloudis commented on FLINK-4850:


Just to note, the PR I opened only fixes the documentation part; I recommend 
opening a separate issue for adding an {EvaluateDataSet} operation for 
{LabeledVector}

> FlinkML - SVM predict Operation for Vector and not LabeledVector
> 
>
> Key: FLINK-4850
> URL: https://issues.apache.org/jira/browse/FLINK-4850
> Project: Flink
>  Issue Type: Bug
>Reporter: Thomas FOURNIER
>Assignee: Theodore Vasiloudis
>
> It seems that the evaluate operation is defined for Vector and not LabeledVector.
> It impacts the QuickStart guide for FlinkML when using SVM.
> 1- We need to update the documentation as follows:
>  val astroTest:DataSet[(Vector,Double)] = MLUtils
>   .readLibSVM(env, "src/main/resources/svmguide1.t")
>   .map(l => (l.vector, l.label))
> val predictionPairs = svm.evaluate(astroTest)
> 2- Update code such that LabeledVector can be used with evaluate method





[jira] [Comment Edited] (FLINK-4850) FlinkML - SVM predict Operation for Vector and not LabeledVector

2016-10-20 Thread Theodore Vasiloudis (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591622#comment-15591622
 ] 

Theodore Vasiloudis edited comment on FLINK-4850 at 10/20/16 11:57 AM:
---

Just to note, the PR I opened only fixes the documentation part; I recommend 
opening a separate issue for adding an {{EvaluateDataSet}} operation for 
{{LabeledVector}}


was (Author: tvas):
Just to note, the PR I opened only fixes the documentation part, I recommend 
having a different issue for the adding an {EvaluateDataSet} operation for 
{LabeledVector}

> FlinkML - SVM predict Operation for Vector and not LabeledVector
> 
>
> Key: FLINK-4850
> URL: https://issues.apache.org/jira/browse/FLINK-4850
> Project: Flink
>  Issue Type: Bug
>Reporter: Thomas FOURNIER
>Assignee: Theodore Vasiloudis
>
> It seems that the evaluate operation is defined for Vector and not LabeledVector.
> It impacts the QuickStart guide for FlinkML when using SVM.
> 1- We need to update the documentation as follows:
>  val astroTest:DataSet[(Vector,Double)] = MLUtils
>   .readLibSVM(env, "src/main/resources/svmguide1.t")
>   .map(l => (l.vector, l.label))
> val predictionPairs = svm.evaluate(astroTest)
> 2- Update code such that LabeledVector can be used with evaluate method





[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-20 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r84251732
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
 ---
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link InternalTimerService} that stores timers on the Java heap.
+ */
+public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, ProcessingTimeCallback {
+
+   private final TypeSerializer<K> keySerializer;
+
+   private final TypeSerializer<N> namespaceSerializer;
+
+   private final ProcessingTimeService processingTimeService;
+
+   private long currentWatermark = Long.MIN_VALUE;
+
+   private final org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget;
+
+   private final KeyContext keyContext;
+
+   /**
+* Processing time timers that are currently in-flight.
+*/
+   private final PriorityQueue<InternalTimer<K, N>> processingTimeTimersQueue;
+   private final Set<InternalTimer<K, N>> processingTimeTimers;
+
+   protected ScheduledFuture<?> nextTimer = null;
+
+   /**
+* Currently waiting watermark callbacks.
+*/
+   private final Set<InternalTimer<K, N>> watermarkTimers;
+   private final PriorityQueue<InternalTimer<K, N>> watermarkTimersQueue;
+
+   public HeapInternalTimerService(
+   TypeSerializer<K> keySerializer,
+   TypeSerializer<N> namespaceSerializer,
+   org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
+   KeyContext keyContext,
+   ProcessingTimeService processingTimeService) {
+   this.keySerializer = checkNotNull(keySerializer);
+   this.namespaceSerializer = checkNotNull(namespaceSerializer);
+   this.triggerTarget = checkNotNull(triggerTarget);
+   this.keyContext = keyContext;
+   this.processingTimeService = checkNotNull(processingTimeService);
+
+   watermarkTimers = new HashSet<>();
--- End diff --

I'm renaming




[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-20 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r84251918
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
 ---
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link InternalTimerService} that stores timers on the Java heap.
+ */
+public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, ProcessingTimeCallback {
+
+   private final TypeSerializer<K> keySerializer;
+
+   private final TypeSerializer<N> namespaceSerializer;
+
+   private final ProcessingTimeService processingTimeService;
+
+   private long currentWatermark = Long.MIN_VALUE;
+
+   private final org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget;
+
+   private final KeyContext keyContext;
+
+   /**
+* Processing time timers that are currently in-flight.
+*/
+   private final PriorityQueue<InternalTimer<K, N>> processingTimeTimersQueue;
+   private final Set<InternalTimer<K, N>> processingTimeTimers;
+
+   protected ScheduledFuture<?> nextTimer = null;
+
+   /**
+* Currently waiting watermark callbacks.
+*/
+   private final Set<InternalTimer<K, N>> watermarkTimers;
+   private final PriorityQueue<InternalTimer<K, N>> watermarkTimersQueue;
+
+   public HeapInternalTimerService(
+   TypeSerializer<K> keySerializer,
+   TypeSerializer<N> namespaceSerializer,
+   org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
+   KeyContext keyContext,
+   ProcessingTimeService processingTimeService) {
+   this.keySerializer = checkNotNull(keySerializer);
+   this.namespaceSerializer = checkNotNull(namespaceSerializer);
+   this.triggerTarget = checkNotNull(triggerTarget);
+   this.keyContext = keyContext;
+   this.processingTimeService = checkNotNull(processingTimeService);
+
+   watermarkTimers = new HashSet<>();
+   watermarkTimersQueue = new PriorityQueue<>(100);
+
+   processingTimeTimers = new HashSet<>();
+   processingTimeTimersQueue = new PriorityQueue<>(100);
+   }
+
+   public HeapInternalTimerService(
+   TypeSerializer<K> keySerializer,
+   TypeSerializer<N> namespaceSerializer,
+   org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
+   KeyContext keyContext,
+   ProcessingTimeService processingTimeService,
+   RestoredTimers restoredTimers) {
+
+   this.keySerializer = checkNotNull(keySerializer);
+   this.namespaceSerializer = checkNotNull(namespaceSerializer);
+   this.triggerTarget = checkNotNull(triggerTarget);
+   this.keyContext = keyContext;
+   this.processingTimeService = checkNotNull(processingTimeService);
+
+   watermarkTimers = restoredTimers.watermarkTimers;
+   watermarkTimersQueue = restoredTimers.watermarkTimersQueue;
+
+   processingTimeTimers = restoredTimers.processingTimeTimers;
 

[jira] [Updated] (FLINK-4850) FlinkML - SVM predict Operation for Vector and not LabeledVector

2016-10-20 Thread Thomas FOURNIER (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas FOURNIER updated FLINK-4850:
---
Description: 
It seems that the evaluate operation is defined for Vector and not LabeledVector.
It impacts the QuickStart guide for FlinkML when using SVM.

We need to update the documentation as follows:

val astroTest:DataSet[(Vector,Double)] = MLUtils
  .readLibSVM(env, "src/main/resources/svmguide1.t")
  .map(l => (l.vector, l.label))

val predictionPairs = svm.evaluate(astroTest)




  was:
It seems that evaluate operation is defined for Vector and not LabeledVector.
It impacts QuickStart guide for FlinkML when using SVM.

1- We need to update the documentation as follows:

 val astroTest:DataSet[(Vector,Double)] = MLUtils
  .readLibSVM(env, "src/main/resources/svmguide1.t")
  .map(l => (l.vector, l.label))

val predictionPairs = svm.evaluate(astroTest)


2- Update code such that LabeledVector can be used with evaluate method









> FlinkML - SVM predict Operation for Vector and not LabeledVector
> 
>
> Key: FLINK-4850
> URL: https://issues.apache.org/jira/browse/FLINK-4850
> Project: Flink
>  Issue Type: Bug
>Reporter: Thomas FOURNIER
>Assignee: Theodore Vasiloudis
>
> It seems that the evaluate operation is defined for Vector and not LabeledVector.
> It impacts the QuickStart guide for FlinkML when using SVM.
> We need to update the documentation as follows:
> val astroTest:DataSet[(Vector,Double)] = MLUtils
>   .readLibSVM(env, "src/main/resources/svmguide1.t")
>   .map(l => (l.vector, l.label))
> val predictionPairs = svm.evaluate(astroTest)





[jira] [Commented] (FLINK-4850) FlinkML - SVM predict Operation for Vector and not LabeledVector

2016-10-20 Thread Thomas FOURNIER (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591644#comment-15591644
 ] 

Thomas FOURNIER commented on FLINK-4850:


OK, I'm creating a specific JIRA issue related to adding an EvaluateDataSet 
operation for LabeledVector.

> FlinkML - SVM predict Operation for Vector and not LabeledVector
> 
>
> Key: FLINK-4850
> URL: https://issues.apache.org/jira/browse/FLINK-4850
> Project: Flink
>  Issue Type: Bug
>Reporter: Thomas FOURNIER
>Assignee: Theodore Vasiloudis
>
> It seems that the evaluate operation is defined for Vector and not LabeledVector.
> It impacts the QuickStart guide for FlinkML when using SVM.
> We need to update the documentation as follows:
> val astroTest:DataSet[(Vector,Double)] = MLUtils
>   .readLibSVM(env, "src/main/resources/svmguide1.t")
>   .map(l => (l.vector, l.label))
> val predictionPairs = svm.evaluate(astroTest)





[jira] [Created] (FLINK-4864) Shade Calcite dependency in flink-table

2016-10-20 Thread Fabian Hueske (JIRA)
Fabian Hueske created FLINK-4864:


 Summary: Shade Calcite dependency in flink-table
 Key: FLINK-4864
 URL: https://issues.apache.org/jira/browse/FLINK-4864
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Affects Versions: 1.2.0
Reporter: Fabian Hueske


The Table API has a dependency on Apache Calcite.
A user reported version conflicts when having their own Calcite dependency 
on the classpath.

The solution would be to shade away the Calcite dependency (Calcite's 
transitive dependencies are already shaded).





[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2667#discussion_r84242416
  
--- Diff: flink-contrib/docker-flink/docker-compose.yml ---
@@ -20,16 +20,20 @@ version: "2"
 services:
   jobmanager:
 image: flink
+container_name: "jobmanager"
+expose:
+  - "6123"
 ports:
   - "48081:8081"
 command: jobmanager
-volumes:
-  - /opt/flink/conf
 
   taskmanager:
 image: flink
+expose:
+  - "6121"
+  - "6122"
 depends_on:
   - jobmanager
 command: taskmanager
-volumes_from:
-  - jobmanager:ro
+links:
--- End diff --

`links` are a legacy feature as of Docker 1.9.0, but it is probably fine to 
stick with them for now.




[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2667#discussion_r84242735
  
--- Diff: flink-contrib/docker-flink/docker-compose.sh ---
@@ -0,0 +1,4 @@
+#!/bin/sh
--- End diff --

Could we name this file `bluemix-docker-compose.sh`?




[jira] [Created] (FLINK-4865) FlinkML - Add EvaluateDataSet operation for LabeledVector

2016-10-20 Thread Thomas FOURNIER (JIRA)
Thomas FOURNIER created FLINK-4865:
--

 Summary: FlinkML - Add EvaluateDataSet operation for LabeledVector
 Key: FLINK-4865
 URL: https://issues.apache.org/jira/browse/FLINK-4865
 Project: Flink
  Issue Type: New Feature
Reporter: Thomas FOURNIER
Priority: Minor


We can only call "evaluate" method on a DataSet[(Double,Vector)].

Eg: svm/evaluate(test) where test: DataSet[(Double,Vector)]

We want also to call this method on DataSet[LabeledVector]

 






[jira] [Updated] (FLINK-4865) FlinkML - Add EvaluateDataSet operation for LabeledVector

2016-10-20 Thread Thomas FOURNIER (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas FOURNIER updated FLINK-4865:
---
Description: 
We can only call "evaluate" method on a DataSet[(Double,Vector)].

Eg: If our model is an SVM 

svm.evaluate(test) with test has type DataSet[(Double,Vector)]

We would like to call it on DataSet[LabeledVector] also.

 


  was:
We can only call "evaluate" method on a DataSet[(Double,Vector)].

Eg: svm/evaluate(test) where test: DataSet[(Double,Vector)]

We want also to call this method on DataSet[LabeledVector]

 



> FlinkML - Add EvaluateDataSet operation for LabeledVector
> -
>
> Key: FLINK-4865
> URL: https://issues.apache.org/jira/browse/FLINK-4865
> Project: Flink
>  Issue Type: New Feature
>Reporter: Thomas FOURNIER
>Priority: Minor
>
> We can only call "evaluate" method on a DataSet[(Double,Vector)].
> Eg: If our model is an SVM 
> svm.evaluate(test) with test has type DataSet[(Double,Vector)]
> We would like to call it on DataSet[LabeledVector] also.
>  





[jira] [Updated] (FLINK-4865) FlinkML - Add EvaluateDataSet operation for LabeledVector

2016-10-20 Thread Thomas FOURNIER (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas FOURNIER updated FLINK-4865:
---
Description: 
We can only call "evaluate" method on a DataSet[(Vector,Double)].

Eg: If our model is an SVM 

svm.evaluate(test) with test has type DataSet[(Vector,Double)]

We would like to call it on DataSet[LabeledVector] also.

 


  was:
We can only call "evaluate" method on a DataSet[(Double,Vector)].

Eg: If our model is an SVM 

svm.evaluate(test) with test has type DataSet[(Double,Vector)]

We would like to call it on DataSet[LabeledVector] also.

 



> FlinkML - Add EvaluateDataSet operation for LabeledVector
> -
>
> Key: FLINK-4865
> URL: https://issues.apache.org/jira/browse/FLINK-4865
> Project: Flink
>  Issue Type: New Feature
>Reporter: Thomas FOURNIER
>Priority: Minor
>
> We can only call "evaluate" method on a DataSet[(Vector,Double)].
> Eg: If our model is an SVM 
> svm.evaluate(test) with test has type DataSet[(Vector,Double)]
> We would like to call it on DataSet[LabeledVector] also.
>  





[jira] [Created] (FLINK-4866) Make Trigger.clear() Abstract to Enforce Implementation

2016-10-20 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-4866:
---

 Summary: Make Trigger.clear() Abstract to Enforce Implementation
 Key: FLINK-4866
 URL: https://issues.apache.org/jira/browse/FLINK-4866
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Reporter: Aljoscha Krettek


If the method is not abstract, implementors of custom triggers will not realise 
that it could be necessary, and they will likely not clean up their state/timers 
properly.
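A minimal sketch of the intended effect (simplified; the real Trigger class has 
more methods and a richer TriggerContext):

{code}
public abstract class Trigger<T, W> {

    // Abstract instead of a no-op default: every author of a custom trigger is
    // forced to decide how state and timers are cleaned up when a window is purged.
    public abstract void clear(W window, TriggerContext ctx) throws Exception;

    public interface TriggerContext {
        void deleteProcessingTimeTimer(long time);
        void deleteEventTimeTimer(long time);
        // ... state access, timer registration, etc.
    }
}
{code}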





[jira] [Commented] (FLINK-3722) The divisions in the InMemorySorters' swap/compare methods hurt performance

2016-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591734#comment-15591734
 ] 

ASF GitHub Bot commented on FLINK-3722:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2628#discussion_r84248344
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/QuickSort.java
 ---
@@ -49,20 +49,40 @@ protected static int getMaxDepth(int x) {
 * then switch to {@link HeapSort}.
 */
public void sort(final IndexedSortable s, int p, int r) {
-   sortInternal(s, p, r, getMaxDepth(r - p));
+   int recordsPerSegment = s.recordsPerSegment();
+   int recordSize = s.recordSize();
+
+   int maxOffset = recordSize * (recordsPerSegment - 1);
+
+   int size = s.size();
+   int sizeN = size / recordsPerSegment;
+   int sizeO = (size % recordsPerSegment) * recordSize;
+
+   sortInternal(s, recordsPerSegment, recordSize, maxOffset, 0, 0, 0, size, sizeN, sizeO, getMaxDepth(r - p));
}
 
public void sort(IndexedSortable s) {
sort(s, 0, s.size());
}
 
-   private static void sortInternal(final IndexedSortable s, int p, int r, int depth) {
+   private static void sortInternal(final IndexedSortable s, int recordsPerSegment, int recordSize, int maxOffset,
+   int p, int pN, int pO, int r, int rN, int rO, int depth) {
--- End diff --

Could you please add a comment that explains all these parameters? (I 
understand them only because I know the original code and also what you are 
trying to achieve, but for someone who sees the code for the first time this 
will be quite scary.)


> The divisions in the InMemorySorters' swap/compare methods hurt performance
> ---
>
> Key: FLINK-3722
> URL: https://issues.apache.org/jira/browse/FLINK-3722
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Assignee: Greg Hogan
>Priority: Minor
>  Labels: performance
>
> NormalizedKeySorter's and FixedLengthRecordSorter's swap and compare methods 
> use divisions (which take a lot of time \[1\]) to calculate the index of the 
> MemorySegment and the offset inside the segment. [~greghogan] reported on the 
> mailing list \[2\] measuring a ~12-14% performance effect in one case.
> A possibility to improve the situation is the following:
> The way that QuickSort mostly uses these compare and swap methods is that it 
> maintains two indices, and uses them to call compare and swap. The key 
> observation is that these indices are mostly stepped by one, and 
> _incrementally_ calculating the quotient and modulo is actually easy when the 
> index changes only by one: increment/decrement the modulo, and check whether 
> the modulo has reached 0 or the divisor, and if it did, then wrap-around the 
> modulo and increment/decrement the quotient.
> To implement this, InMemorySorter would have to expose an iterator that would 
> have the divisor and the current modulo and quotient as state, and have 
> increment/decrement methods. Compare and swap could then have overloads that 
> take these iterators as arguments.
> \[1\] http://www.agner.org/optimize/instruction_tables.pdf
> \[2\] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Macro-benchmarking-for-performance-tuning-and-regression-detection-td11078.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2657: [FLINK-4853] [rm] Harden job manager registration at the ...

2016-10-20 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2657
  
This doesn't compile currently. Do you prefer that I review the PRs 
individually, or that I review the commits in this PR?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4853) Clean up JobManager registration at the ResourceManager

2016-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591741#comment-15591741
 ] 

ASF GitHub Bot commented on FLINK-4853:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2657
  
This doesn't compile currently. Do you prefer that I review the PRs 
individually, or that I review the commits in this PR?


> Clean up JobManager registration at the ResourceManager
> ---
>
> Key: FLINK-4853
> URL: https://issues.apache.org/jira/browse/FLINK-4853
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The current {{JobManager}} registration at the {{ResourceManager}} blocks 
> threads in the {{RpcService.execute}} pool. This is not ideal and can be 
> avoided by not waiting on a {{Future}} in this call.
> I propose to encapsulate the leader id retrieval operation in a distinct 
> service so that it can be separated from the {{ResourceManager}}. This will 
> reduce the complexity of the {{ResourceManager}} and make the individual 
> components easier to test.
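
As a rough illustration of the non-blocking style meant here, with 
java.util.concurrent.CompletableFuture and made-up method names:

{code}
CompletableFuture<UUID> leaderIdFuture = retrieveJobManagerLeaderId(jobId);

// Blocking variant: ties up a thread of the pool until the leader id arrives.
//   UUID leaderId = leaderIdFuture.get();
//   registerJobManager(jobManagerAddress, leaderId);

// Non-blocking variant: attach the registration as a callback instead.
leaderIdFuture.thenAccept(leaderId -> registerJobManager(jobManagerAddress, leaderId));
{code}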



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2618: Refactoring the Continuous File Monitoring Function.

2016-10-20 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2618
  
Thanks for updating the description. Let me take a look at the changes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2618: Refactoring the Continuous File Monitoring Function.

2016-10-20 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/2618
  
Thanks a lot @mxm !


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2628: [FLINK-3722] [runtime] Don't / and % when sorting

2016-10-20 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2628#discussion_r84248344
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/QuickSort.java
 ---
@@ -49,20 +49,40 @@ protected static int getMaxDepth(int x) {
 * then switch to {@link HeapSort}.
 */
public void sort(final IndexedSortable s, int p, int r) {
-   sortInternal(s, p, r, getMaxDepth(r - p));
+   int recordsPerSegment = s.recordsPerSegment();
+   int recordSize = s.recordSize();
+
+   int maxOffset = recordSize * (recordsPerSegment - 1);
+
+   int size = s.size();
+   int sizeN = size / recordsPerSegment;
+   int sizeO = (size % recordsPerSegment) * recordSize;
+
+   sortInternal(s, recordsPerSegment, recordSize, maxOffset, 0, 0, 0, size, sizeN, sizeO, getMaxDepth(r - p));
}
 
public void sort(IndexedSortable s) {
sort(s, 0, s.size());
}
 
-   private static void sortInternal(final IndexedSortable s, int p, int r, int depth) {
+   private static void sortInternal(final IndexedSortable s, int recordsPerSegment, int recordSize, int maxOffset,
+   int p, int pN, int pO, int r, int rN, int rO, int depth) {
--- End diff --

Could you please add a comment that explains all these parameters? (I 
understand them only because I know the original code and also what you are 
trying to achieve, but for someone who sees the code for the first time this 
will be quite scary.)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-20 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r84255151
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultProcessingTimeService.java
 ---
@@ -35,10 +34,10 @@
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A {@link TimeServiceProvider} which assigns as current processing time 
the result of calling
+ * A {@link ProcessingTimeService} which assigns as current processing 
time the result of calling
  * {@link System#currentTimeMillis()} and registers timers using a {@link 
ScheduledThreadPoolExecutor}.
  */
-public class DefaultTimeServiceProvider extends TimeServiceProvider {
+public class DefaultProcessingTimeService extends ProcessingTimeService {
 
--- End diff --

Done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3902) Discarded FileSystem checkpoints are lingering around

2016-10-20 Thread JIRA

[ 
https://issues.apache.org/jira/browse/FLINK-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591808#comment-15591808
 ] 

Jan Zajíc commented on FLINK-3902:
--

WE have same issue on Linux in docker, but *both* ! RocksDBBackend and 
FSStateBackend. On Windows developer local machine it works just fine. 

> Discarded FileSystem checkpoints are lingering around
> -
>
> Key: FLINK-3902
> URL: https://issues.apache.org/jira/browse/FLINK-3902
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.0.2
>Reporter: Ufuk Celebi
>
> A user reported that checkpoints with {{FSStateBackend}} are not properly 
> cleaned up.
> {code}
> 2016-05-10 12:21:06,559 INFO BlockStateChange: BLOCK* addToInvalidates: 
> blk_1084791727_11053122 10.10.113.10:50010
> 2016-05-10 12:21:06,559 INFO org.apache.hadoop.ipc.Server: IPC Server handler 
> 9 on 8020, call org.apache.hadoop.hdfs.protocol.ClientProtocol.delete from 
> 10.10.113.9:49233 Call#12337 Retry#0
> org.apache.hadoop.fs.PathIsNotEmptyDirectoryException: 
> `/flink/checkpoints_test/570d6e67d571c109daab468e5678402b/chk-62 is non 
> empty': Directory is not empty
> at 
> org.apache.hadoop.hdfs.server.namenode.FSDirDeleteOp.delete(FSDirDeleteOp.java:85)
> at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.delete(FSNamesystem.java:3712)
> {code}
> {code}
> 2016-05-10 12:20:22,636 [Checkpoint Timer] INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 62 @ 1462875622636
> 2016-05-10 12:20:32,507 [flink-akka.actor.default-dispatcher-240088] INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
> checkpoint 62 (in 9843 ms)
> 2016-05-10 12:20:52,637 [Checkpoint Timer] INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 63 @ 1462875652637
> 2016-05-10 12:21:06,563 [flink-akka.actor.default-dispatcher-240028] INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
> checkpoint 63 (in 13909 ms)
> 2016-05-10 12:21:22,636 [Checkpoint Timer] INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 64 @ 1462875682636
> {code}
> Running the same program with the {{RocksDBBackend}} works as expected and 
> clears the old checkpoints properly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-3902) Discarded FileSystem checkpoints are lingering around

2016-10-20 Thread JIRA

[ 
https://issues.apache.org/jira/browse/FLINK-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591808#comment-15591808
 ] 

Jan Zajíc edited comment on FLINK-3902 at 10/20/16 1:24 PM:


WE have same issue on Linux in docker, but with *both* ! RocksDBBackend and 
FSStateBackend. On Windows developer local machine it works just fine. 


was (Author: jan-zajic):
WE have same issue on Linux in docker, but *both* ! RocksDBBackend and 
FSStateBackend. On Windows developer local machine it works just fine. 

> Discarded FileSystem checkpoints are lingering around
> -
>
> Key: FLINK-3902
> URL: https://issues.apache.org/jira/browse/FLINK-3902
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.0.2
>Reporter: Ufuk Celebi
>
> A user reported that checkpoints with {{FSStateBackend}} are not properly 
> cleaned up.
> {code}
> 2016-05-10 12:21:06,559 INFO BlockStateChange: BLOCK* addToInvalidates: 
> blk_1084791727_11053122 10.10.113.10:50010
> 2016-05-10 12:21:06,559 INFO org.apache.hadoop.ipc.Server: IPC Server handler 
> 9 on 8020, call org.apache.hadoop.hdfs.protocol.ClientProtocol.delete from 
> 10.10.113.9:49233 Call#12337 Retry#0
> org.apache.hadoop.fs.PathIsNotEmptyDirectoryException: 
> `/flink/checkpoints_test/570d6e67d571c109daab468e5678402b/chk-62 is non 
> empty': Directory is not empty
> at 
> org.apache.hadoop.hdfs.server.namenode.FSDirDeleteOp.delete(FSDirDeleteOp.java:85)
> at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.delete(FSNamesystem.java:3712)
> {code}
> {code}
> 2016-05-10 12:20:22,636 [Checkpoint Timer] INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 62 @ 1462875622636
> 2016-05-10 12:20:32,507 [flink-akka.actor.default-dispatcher-240088] INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
> checkpoint 62 (in 9843 ms)
> 2016-05-10 12:20:52,637 [Checkpoint Timer] INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 63 @ 1462875652637
> 2016-05-10 12:21:06,563 [flink-akka.actor.default-dispatcher-240028] INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
> checkpoint 63 (in 13909 ms)
> 2016-05-10 12:21:22,636 [Checkpoint Timer] INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 64 @ 1462875682636
> {code}
> Running the same program with the {{RocksDBBackend}} works as expected and 
> clears the old checkpoints properly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-3902) Discarded FileSystem checkpoints are lingering around

2016-10-20 Thread JIRA

[ 
https://issues.apache.org/jira/browse/FLINK-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591808#comment-15591808
 ] 

Jan Zajíc edited comment on FLINK-3902 at 10/20/16 1:24 PM:


We have the same issue on Linux in Docker, but with *both* RocksDBBackend and 
FSStateBackend! On a Windows developer machine it works just fine. 


was (Author: jan-zajic):
WE have same issue on Linux in docker, but with *both* ! RocksDBBackend and 
FSStateBackend. On Windows developer local machine it works just fine. 

> Discarded FileSystem checkpoints are lingering around
> -
>
> Key: FLINK-3902
> URL: https://issues.apache.org/jira/browse/FLINK-3902
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.0.2
>Reporter: Ufuk Celebi
>
> A user reported that checkpoints with {{FSStateBackend}} are not properly 
> cleaned up.
> {code}
> 2016-05-10 12:21:06,559 INFO BlockStateChange: BLOCK* addToInvalidates: 
> blk_1084791727_11053122 10.10.113.10:50010
> 2016-05-10 12:21:06,559 INFO org.apache.hadoop.ipc.Server: IPC Server handler 
> 9 on 8020, call org.apache.hadoop.hdfs.protocol.ClientProtocol.delete from 
> 10.10.113.9:49233 Call#12337 Retry#0
> org.apache.hadoop.fs.PathIsNotEmptyDirectoryException: 
> `/flink/checkpoints_test/570d6e67d571c109daab468e5678402b/chk-62 is non 
> empty': Directory is not empty
> at 
> org.apache.hadoop.hdfs.server.namenode.FSDirDeleteOp.delete(FSDirDeleteOp.java:85)
> at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.delete(FSNamesystem.java:3712)
> {code}
> {code}
> 2016-05-10 12:20:22,636 [Checkpoint Timer] INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 62 @ 1462875622636
> 2016-05-10 12:20:32,507 [flink-akka.actor.default-dispatcher-240088] INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
> checkpoint 62 (in 9843 ms)
> 2016-05-10 12:20:52,637 [Checkpoint Timer] INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 63 @ 1462875652637
> 2016-05-10 12:21:06,563 [flink-akka.actor.default-dispatcher-240028] INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
> checkpoint 63 (in 13909 ms)
> 2016-05-10 12:21:22,636 [Checkpoint Timer] INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 64 @ 1462875682636
> {code}
> Running the same program with the {{RocksDBBackend}} works as expected and 
> clears the old checkpoints properly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2667#discussion_r84242539
  
--- Diff: flink-contrib/docker-flink/docker-compose.yml ---
@@ -20,16 +20,20 @@ version: "2"
 services:
   jobmanager:
 image: flink
+container_name: "jobmanager"
+expose:
+  - "6123"
 ports:
   - "48081:8081"
 command: jobmanager
-volumes:
-  - /opt/flink/conf
 
   taskmanager:
 image: flink
+expose:
+  - "6121"
+  - "6122"
--- End diff --

What are these ports needed for? The TaskManager will always initiate the 
connection to the JobManager.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…

2016-10-20 Thread sedgewickmm18
Github user sedgewickmm18 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2667#discussion_r84286390
  
--- Diff: flink-contrib/docker-flink/docker-entrypoint.sh ---
@@ -20,11 +20,19 @@
 
 if [ "$1" = "jobmanager" ]; then
 echo "Starting Job Manager"
-sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: 
`hostname -f`/g" $FLINK_HOME/conf/flink-conf.yaml
+#sed -i -e "s/jobmanager.rpc.address: 
localhost/jobmanager.rpc.address: `hostname -f`/g" 
$FLINK_HOME/conf/flink-conf.yaml
+
+# make use of container linking and exploit the jobmanager entry in 
/etc/hosts
+sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: 
jobmanager/g" $FLINK_HOME/conf/flink-conf.yaml
+
 sed -i -e "s/taskmanager.numberOfTaskSlots: 
1/taskmanager.numberOfTaskSlots: `grep -c ^processor /proc/cpuinfo`/g" 
$FLINK_HOME/conf/flink-conf.yaml
--- End diff --

Agree, makes much more sense that way; otherwise it's always one slot by 
default.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…

2016-10-20 Thread sedgewickmm18
Github user sedgewickmm18 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2667#discussion_r84287069
  
--- Diff: flink-contrib/docker-flink/docker-compose.yml ---
@@ -20,16 +20,20 @@ version: "2"
 services:
   jobmanager:
 image: flink
+container_name: "jobmanager"
+expose:
+  - "6123"
 ports:
   - "48081:8081"
 command: jobmanager
-volumes:
-  - /opt/flink/conf
 
   taskmanager:
 image: flink
+expose:
+  - "6121"
+  - "6122"
--- End diff --

These lines expose the TaskManager's RPC and data ports to make them accessible 
in the private subnet; please see 
https://docs.docker.com/docker-cloud/apps/ports/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3722) The divisions in the InMemorySorters' swap/compare methods hurt performance

2016-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591881#comment-15591881
 ] 

ASF GitHub Bot commented on FLINK-3722:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2628
  
Thanks @ggevay for reviewing. I added a commit with additional comments.

I transcribed `Quicksort` so as to remove considerations of Java 
performance and inlining. It was not clear to me that, if we encapsulated the 
index, page number, and page offset into an object, Java would inline the 
various increment and decrement functions. Also, I don't think this looks too 
bad. I'm happy to reformat if that is preferred.

I think this is the best time to investigate alternative methods. I'm not 
seeing how one would sort on top of `InMemorySorter` without deserializing 
records.
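
For context, the iterator-style bookkeeping proposed in the issue description 
(quoted below) could look roughly like this; the class and member names are 
made up, not from the patch:

{code}
final class SegmentIndex {

	private final int recordsPerSegment; // the divisor
	private int segment;                 // the quotient: which MemorySegment
	private int record;                  // the modulo: record position inside the segment

	SegmentIndex(int recordsPerSegment) {
		this.recordsPerSegment = recordsPerSegment;
	}

	// Step to the next record: no division, only a wrap-around check.
	void increment() {
		if (++record == recordsPerSegment) {
			record = 0;
			segment++;
		}
	}

	// Step to the previous record.
	void decrement() {
		if (--record < 0) {
			record = recordsPerSegment - 1;
			segment--;
		}
	}
}
{code}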


> The divisions in the InMemorySorters' swap/compare methods hurt performance
> ---
>
> Key: FLINK-3722
> URL: https://issues.apache.org/jira/browse/FLINK-3722
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Assignee: Greg Hogan
>Priority: Minor
>  Labels: performance
>
> NormalizedKeySorter's and FixedLengthRecordSorter's swap and compare methods 
> use divisions (which take a lot of time \[1\]) to calculate the index of the 
> MemorySegment and the offset inside the segment. [~greghogan] reported on the 
> mailing list \[2\] measuring a ~12-14% performance effect in one case.
> A possibility to improve the situation is the following:
> The way that QuickSort mostly uses these compare and swap methods is that it 
> maintains two indices, and uses them to call compare and swap. The key 
> observation is that these indices are mostly stepped by one, and 
> _incrementally_ calculating the quotient and modulo is actually easy when the 
> index changes only by one: increment/decrement the modulo, and check whether 
> the modulo has reached 0 or the divisor, and if it did, then wrap-around the 
> modulo and increment/decrement the quotient.
> To implement this, InMemorySorter would have to expose an iterator that would 
> have the divisor and the current modulo and quotient as state, and have 
> increment/decrement methods. Compare and swap could then have overloads that 
> take these iterators as arguments.
> \[1\] http://www.agner.org/optimize/instruction_tables.pdf
> \[2\] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Macro-benchmarking-for-performance-tuning-and-regression-detection-td11078.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r84285533
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichFileInputSplit.java
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * An extended {@link FileInputSplit} that also includes information about:
+ * 
+ * The modification time of the file this split belongs to.
+ * When checkpointing, the state of the split at the moment of the 
checkpoint.
+ * 
+ * This class is used by the {@link ContinuousFileMonitoringFunction} and 
the
+ * {@link ContinuousFileReaderOperator} to perform continuous file 
processing.
+ * */
+public class RichFileInputSplit<S extends Serializable>
+   extends FileInputSplit implements Comparable<RichFileInputSplit<S>> {
+
+   /** The modification time of the file this split belongs to. */
+   private final long modificationTime;
+
+   /**
+* The state of the split. This information is used when
+* restoring from a checkpoint and allows to resume reading the
+* underlying file from the point we left off.
+* */
+   private S splitState;
+
+   /** A special {@link RichFileInputSplit} signaling the end of the 
stream of splits.*/
+   public static final RichFileInputSplit EOS =
+   new RichFileInputSplit<>(Long.MIN_VALUE, -1, null, -1, -1, 
null);
+
+   /**
+* Creates a {@link RichFileInputSplit} based on the file modification 
time and
+* the rest of the information of the {@link FileInputSplit}, as 
returned by the
+* underlying filesystem.
+*
+* @param modificationTime  the modification time of the file this split belongs to
+* @param split the rest of the information about the split
+*/
+   public RichFileInputSplit(long modificationTime, FileInputSplit split) {
--- End diff --

Not sure about this constructor. I think I'd prefer something spelling out 
the parameters. This also avoids creating a regular FileInputSplit every time.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r84288480
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichFileInputSplit.java
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * An extended {@link FileInputSplit} that also includes information about:
+ * 
+ * The modification time of the file this split belongs to.
+ * When checkpointing, the state of the split at the moment of the 
checkpoint.
+ * 
+ * This class is used by the {@link ContinuousFileMonitoringFunction} and 
the
+ * {@link ContinuousFileReaderOperator} to perform continuous file 
processing.
+ * */
+public class RichFileInputSplit<S extends Serializable>
--- End diff --

I think you can drop the type parameter here since you don't gain any type 
safety from it. It is never used in an argument position, which is what would 
make it meaningful. Instead just use `Serializable` for the state type.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r84280372
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
 ---
@@ -199,44 +196,39 @@ public void close() throws Exception {
private final Object checkpointLock;
private final SourceFunction.SourceContext readerContext;
 
-   private final Queue<FileInputSplit> pendingSplits;
-
-   private FileInputSplit currentSplit = null;
+   private final Queue<RichFileInputSplit<S>> pendingSplits;
 
-   private S restoredFormatState = null;
+   private RichFileInputSplit<S> currentSplit;
 
-   private volatile boolean isSplitOpen = false;
+   private volatile boolean isSplitOpen;
 
private SplitReader(FileInputFormat format,
TypeSerializer serializer,
SourceFunction.SourceContext readerContext,
Object checkpointLock,
-   Tuple3<List<FileInputSplit>, FileInputSplit, S> restoredState) {
+   List<RichFileInputSplit<S>> restoredState) {
 
this.format = checkNotNull(format, "Unspecified 
FileInputFormat.");
this.serializer = checkNotNull(serializer, "Unspecified 
Serializer.");
this.readerContext = checkNotNull(readerContext, 
"Unspecified Reader Context.");
this.checkpointLock = checkNotNull(checkpointLock, 
"Unspecified checkpoint lock.");
 
-   this.pendingSplits = new ArrayDeque<>();
this.isRunning = true;
 
-   // this is the case where a task recovers from a previous failed attempt
-   if (restoredState != null) {
-   List<FileInputSplit> pending = restoredState.f0;
-   FileInputSplit current = restoredState.f1;
-   S formatState = restoredState.f2;
-
-   for (FileInputSplit split : pending) {
-   pendingSplits.add(split);
+   this.pendingSplits = new PriorityQueue<>(100, new Comparator<RichFileInputSplit<S>>() {
--- End diff --

Why did you choose 100 as the initial size?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r84285924
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichFileInputSplit.java
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * An extended {@link FileInputSplit} that also includes information about:
+ * 
+ * The modification time of the file this split belongs to.
+ * When checkpointing, the state of the split at the moment of the 
checkpoint.
+ * 
+ * This class is used by the {@link ContinuousFileMonitoringFunction} and 
the
+ * {@link ContinuousFileReaderOperator} to perform continuous file 
processing.
+ * */
+public class RichFileInputSplit<S extends Serializable>
--- End diff --

The name rich :) I'd be happy if we could find another name. Rich doesn't 
really mean anything. How about `TimestampedFileInputSplit`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r84288567
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichFileInputSplit.java
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * An extended {@link FileInputSplit} that also includes information about:
+ * 
+ * The modification time of the file this split belongs to.
+ * When checkpointing, the state of the split at the moment of the 
checkpoint.
+ * 
+ * This class is used by the {@link ContinuousFileMonitoringFunction} and 
the
+ * {@link ContinuousFileReaderOperator} to perform continuous file 
processing.
+ * */
+public class RichFileInputSplit<S extends Serializable>
+   extends FileInputSplit implements Comparable<RichFileInputSplit<S>> {
+
+   /** The modification time of the file this split belongs to. */
+   private final long modificationTime;
+
+   /**
+* The state of the split. This information is used when
+* restoring from a checkpoint and allows to resume reading the
+* underlying file from the point we left off.
+* */
+   private S splitState;
--- End diff --

```java
private Serializable splitState;
```
should be sufficient.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r84288776
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichFileInputSplit.java
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * An extended {@link FileInputSplit} that also includes information about:
+ * 
+ * The modification time of the file this split belongs to.
+ * When checkpointing, the state of the split at the moment of the 
checkpoint.
+ * 
+ * This class is used by the {@link ContinuousFileMonitoringFunction} and 
the
+ * {@link ContinuousFileReaderOperator} to perform continuous file 
processing.
+ * */
+public class RichFileInputSplit<S extends Serializable>
+   extends FileInputSplit implements Comparable<RichFileInputSplit<S>> {
+
+   /** The modification time of the file this split belongs to. */
+   private final long modificationTime;
+
+   /**
+* The state of the split. This information is used when
+* restoring from a checkpoint and allows to resume reading the
+* underlying file from the point we left off.
+* */
+   private S splitState;
+
+   /** A special {@link RichFileInputSplit} signaling the end of the 
stream of splits.*/
+   public static final RichFileInputSplit EOS =
+   new RichFileInputSplit<>(Long.MIN_VALUE, -1, null, -1, -1, 
null);
+
+   /**
+* Creates a {@link RichFileInputSplit} based on the file modification 
time and
+* the rest of the information of the {@link FileInputSplit}, as 
returned by the
+* underlying filesystem.
+*
+* @param modificationTime  the modification time of the file this split belongs to
+* @param split the rest of the information about the split
+*/
+   public RichFileInputSplit(long modificationTime, FileInputSplit split) {
+   this(modificationTime,
+   split.getSplitNumber(),
+   split.getPath(),
+   split.getStart(),
+   split.getLength(),
+   split.getHostnames());
+   }
+
+   /**
+* Constructor with the raw split information.
+*
+* @param modificationTime the modification time of the file this split belongs to
+* @param numthe number of this input split
+* @param file   the file name
+* @param start  the position of the first byte in the file to process
+* @param length the number of bytes in the file to process (-1 is flag 
for "read whole file")
+* @param hosts  the list of hosts containing the block, possibly 
null
+*/
+   private RichFileInputSplit(long modificationTime, int num, Path file, 
long start, long length, String[] hosts) {
+   super(num, file, start, length, hosts);
+
+   Preconditions.checkArgument(modificationTime >= 0 || 
modificationTime == Long.MIN_VALUE,
+   "Invalid File Split Modification Time: "+ 
modificationTime +".");
+
+   this.modificationTime = modificationTime;
+   }
+
+   /**
+* Sets the state of the split. This information is used when
+* restoring from a checkpoint and allows to resume reading the
+* underlying file from the point we left off.
+* 
+* This is applicable to {@link 
org.apache.flink.api.common.io.FileInputFormat FileInputFormats}
+* that implement the {@link 
org.apache.flink.api.common.io.CheckpointableInputFormat
+* CheckpointableInputFormat} interface.
+* */
+   public void setSplitState(S state) {
+   this.splitState = state;
+   }
+
+   /**
+* Sets the state of the split to {@

[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r84284953
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/RichFileInputSplitTest.java
 ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.source.RichFileInputSplit;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RichFileInputSplitTest {
+
+   @Test
+   public void testSplitEquality() {
+
+   RichFileInputSplit eos1 = RichFileInputSplit.EOS;
+   RichFileInputSplit eos2 = RichFileInputSplit.EOS;
+
+   Assert.assertEquals(eos1, eos2);
+
+   FileInputSplit firstSplit = new FileInputSplit(2, new 
Path("test"), 0, 100, null);
+   RichFileInputSplit richFirstSplit = new RichFileInputSplit(10, 
firstSplit);
+   Assert.assertNotEquals(eos1, richFirstSplit);
+   Assert.assertNotEquals(richFirstSplit, firstSplit);
+
+   FileInputSplit secondSplit = new FileInputSplit(2, new 
Path("test"), 0, 100, null);
+   RichFileInputSplit richSecondSplit = new RichFileInputSplit(10, 
secondSplit);
+   Assert.assertEquals(richFirstSplit, richSecondSplit);
+   Assert.assertNotEquals(richFirstSplit, firstSplit);
+
+   FileInputSplit modSecondSplit = new FileInputSplit(2, new 
Path("test"), 0, 100, null);
+   RichFileInputSplit richModSecondSplit = new 
RichFileInputSplit(11, modSecondSplit);
+   Assert.assertNotEquals(richSecondSplit, richModSecondSplit);
+
+   FileInputSplit thirdSplit = new FileInputSplit(2, new 
Path("test/test1"), 0, 100, null);
+   RichFileInputSplit richThirdSplit = new RichFileInputSplit(10, 
thirdSplit);
+   Assert.assertEquals(richThirdSplit.getModificationTime(), 10);
+   Assert.assertNotEquals(richFirstSplit, richThirdSplit);
+
+   FileInputSplit thirdSplitCopy = new FileInputSplit(2, new 
Path("test/test1"), 0, 100, null);
+   RichFileInputSplit richThirdSplitCopy = new 
RichFileInputSplit(10, thirdSplitCopy);
+   Assert.assertEquals(richThirdSplitCopy, richThirdSplit);
+   }
+
+   @Test
+   public void testSplitComparison() {
+   FileInputSplit firstSplit = new FileInputSplit(3, new 
Path("test/test1"), 0, 100, null);
+   RichFileInputSplit richFirstSplit = new RichFileInputSplit(10, 
firstSplit);
+
+   FileInputSplit secondSplit = new FileInputSplit(2, new 
Path("test/test2"), 0, 100, null);
+   RichFileInputSplit richSecondSplit = new RichFileInputSplit(10, 
secondSplit);
+
+   FileInputSplit thirdSplit = new FileInputSplit(1, new 
Path("test/test2"), 0, 100, null);
+   RichFileInputSplit richThirdSplit = new RichFileInputSplit(10, 
thirdSplit);
+
+   FileInputSplit forthSplit = new FileInputSplit(0, new 
Path("test/test3"), 0, 100, null);
+   RichFileInputSplit richForthSplit = new RichFileInputSplit(11, 
forthSplit);
+
+   // lexicographically on the path order
+   Assert.assertTrue(richFirstSplit.compareTo(richSecondSplit) < 
0);
+   Assert.assertTrue(richFirstSplit.compareTo(richThirdSplit) < 0);
+
+   // same mod time, same file so smaller split number first
+   Assert.assertTrue(richThirdSplit.compareTo(richSecondSplit) < 
0);
+
+   // smaller modification time first
+   Assert.assertTrue(richThirdSplit.compareTo(richForthSplit) < 0);
+   }
+
+   @Test
+   public void testIllegalArgument() {
+   try {
+   FileInputSplit firstSplit = new FileInputSplit(2, new 
Path("test"), 0, 100, null);
+   new Ric

[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r84281147
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichFileInputSplit.java
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * An extended {@link FileInputSplit} that also includes information about:
+ * 
+ * The modification time of the file this split belongs to.
+ * When checkpointing, the state of the split at the moment of the 
checkpoint.
+ * 
+ * This class is used by the {@link ContinuousFileMonitoringFunction} and 
the
+ * {@link ContinuousFileReaderOperator} to perform continuous file 
processing.
+ * */
+public class RichFileInputSplit<S extends Serializable>
+   extends FileInputSplit implements Comparable<RichFileInputSplit<S>> {
+
+   /** The modification time of the file this split belongs to. */
+   private final long modificationTime;
+
+   /**
+* The state of the split. This information is used when
+* restoring from a checkpoint and allows to resume reading the
+* underlying file from the point we left off.
+* */
+   private S splitState;
+
+   /** A special {@link RichFileInputSplit} signaling the end of the 
stream of splits.*/
+   public static final RichFileInputSplit EOS =
+   new RichFileInputSplit<>(Long.MIN_VALUE, -1, null, -1, -1, 
null);
--- End diff --

Is it really necessary to have this special split? Couldn't you just have a 
`reader.stop()` method which stops the reader after the current split has been 
processed?
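
A heavily simplified sketch of that alternative (hypothetical, not code from 
the PR): a volatile flag checked between splits instead of a special EOS split.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class SplitReader implements Runnable {

    private final Queue<String> pendingSplits = new ArrayDeque<>();
    private volatile boolean running = true;

    /** Lets the reader finish the split it is currently on, then exit. */
    public void stop() {
        running = false;
    }

    @Override
    public void run() {
        while (running) {
            // a real implementation would block on an empty queue instead of spinning
            String split = pendingSplits.poll();
            if (split != null) {
                process(split); // read the current split to the end
            }
        }
    }

    private void process(String split) {
        // deserialize and emit the records of the split ...
    }
}
```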


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r84279968
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
 ---
@@ -189,7 +186,7 @@ public void close() throws Exception {
output.close();
}
 
-   private class SplitReader extends Thread {
+   private final class SplitReader extends Thread {
--- End diff --

Making private classes final is not really necessary.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2628: [FLINK-3722] [runtime] Don't / and % when sorting

2016-10-20 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2628
  
Thanks @ggevay for reviewing. I added a commit with additional comments.

I transcribed `Quicksort` so as to remove considerations of Java 
performance and inlining. It was not clear to me that, if we encapsulated the 
index, page number, and page offset into an object, Java would inline the 
various increment and decrement functions. Also, I don't think this looks too 
bad. I'm happy to reformat if that is preferred.

I think this is the best time to investigate alternative methods. I'm not 
seeing how one would sort on top of `InMemorySorter` without deserializing 
records.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…

2016-10-20 Thread sedgewickmm18
Github user sedgewickmm18 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2667#discussion_r84291047
  
--- Diff: flink-contrib/docker-flink/docker-compose.sh ---
@@ -0,0 +1,4 @@
+#!/bin/sh
--- End diff --

that's fine


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2667#discussion_r84290827
  
--- Diff: flink-contrib/docker-flink/docker-compose.yml ---
@@ -20,16 +20,20 @@ version: "2"
 services:
   jobmanager:
 image: flink
+container_name: "jobmanager"
+expose:
+  - "6123"
 ports:
   - "48081:8081"
 command: jobmanager
-volumes:
-  - /opt/flink/conf
 
   taskmanager:
 image: flink
+expose:
+  - "6121"
+  - "6122"
--- End diff --

Sure, makes sense since those ports are not reachable by TaskManagers 
running in different containers.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4832) Count/Sum 0 elements

2016-10-20 Thread Anton Mushin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591955#comment-15591955
 ] 

Anton Mushin commented on FLINK-4832:
-

Hello,
I think 
{{org.apache.flink.api.common.operators.base.MapOperatorBase#executeOnCollections}}
 also needs to change, because 
{{org.apache.flink.api.table.runtime.aggregate.AggregateMapFunction#map}} is 
only called when there are elements in {{inputData}}:
{code}
TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig);
TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
for (IN element : inputData) {
	IN inCopy = inSerializer.copy(element);
	OUT out = function.map(inCopy);
	result.add(outSerializer.copy(out));
}
{code}
And if {{org.apache.flink.api.table.runtime.aggregate.SumAggregate#initiate}} 
is edited, for example, as
{code}
override def initiate(partial: Row): Unit = {
	partial.setField(sumIndex, 0.asInstanceOf[T]) // cast 0 to the element type T of the concrete SumAggregate[T]
}
{code}
then the following tests pass:
{code}
@Test
  def testDataSetAggregation(): Unit = {

val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)

val sqlQuery = "SELECT sum(_1) FROM MyTable"

val ds = CollectionDataSets.get3TupleDataSet(env)
tEnv.registerDataSet("MyTable", ds)

val result = tEnv.sql(sqlQuery)

val expected = "231"
val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
  }

  @Test
  def testSumNullElements(): Unit = {

val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)

val sqlQuery =
  "SELECT sum(_1),sum(_2),sum(_3),sum(_4),sum(_5),sum(_6) " +
"FROM (select * from MyTable where _1 = 4)"

val ds = env.fromElements(
  (1: Byte, 2l,1D,1f,1,1:Short ),
  (2: Byte, 2l,1D,1f,1,1:Short ))
tEnv.registerDataSet("MyTable", ds)

val result = tEnv.sql(sqlQuery)

val expected = "0,0,0.0,0.0,0,0"
val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
  }
{code}

> Count/Sum 0 elements
> 
>
> Key: FLINK-4832
> URL: https://issues.apache.org/jira/browse/FLINK-4832
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>
> Currently, the Table API is unable to count or sum up 0 elements. We should 
> improve DataSet aggregations for this, maybe by unioning the original DataSet 
> with a dummy record or by using a MapPartition function (see the sketch below). 
> Coming up with a good design for this is also part of this issue.
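
As a rough sketch of the MapPartition idea (illustrative only; {{input}} stands 
for the DataSet being aggregated):

{code}
DataSet<Long> count = input
	.mapPartition(new MapPartitionFunction<Row, Long>() {
		@Override
		public void mapPartition(Iterable<Row> values, Collector<Long> out) {
			long cnt = 0L;
			for (Row ignored : values) {
				cnt++;
			}
			// emitted even when the partition is empty, so a count of 0 is produced
			out.collect(cnt);
		}
	})
	.setParallelism(1); // a single partition, hence exactly one global count
{code}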



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r84288975
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
 ---
@@ -347,34 +328,17 @@ public void run() {
}
}
 
-   private Tuple3<List<FileInputSplit>, FileInputSplit, S> getReaderState() throws IOException {
-   List<FileInputSplit> snapshot = new ArrayList<>(this.pendingSplits.size());
-   for (FileInputSplit split: this.pendingSplits) {
-   snapshot.add(split);
-   }
-
-   // remove the current split from the list if inside.
-   if (this.currentSplit != null && this.currentSplit.equals(pendingSplits.peek())) {
-   this.pendingSplits.remove();
-   }
-
-   if (this.currentSplit != null) {
-   if (this.format instanceof CheckpointableInputFormat) {
-   @SuppressWarnings("unchecked")
-   CheckpointableInputFormat<FileInputSplit, S> checkpointableFormat =
-   (CheckpointableInputFormat<FileInputSplit, S>) this.format;
-
-   S formatState = this.isSplitOpen ?
-   checkpointableFormat.getCurrentState() :
-   restoredFormatState;
-   return new Tuple3<>(snapshot, currentSplit, formatState);
-   } else {
-   LOG.info("The format does not support checkpointing. The current input split will be re-read from start upon recovery.");
-   return new Tuple3<>(snapshot, currentSplit, null);
+   private List<RichFileInputSplit<S>> getReaderState() throws IOException {
+   List<RichFileInputSplit<S>> snapshot = new ArrayList<>(this.pendingSplits.size());
+   if (currentSplit != null ) {
+   if (this.format instanceof CheckpointableInputFormat && this.isSplitOpen) {
+   S formatState = ((CheckpointableInputFormat<RichFileInputSplit<S>, S>) this.format).getCurrentState();
--- End diff --

```java
Serializable formatState = ((CheckpointableInputFormat) 
this.format).getCurrentState();
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r84276796
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---
@@ -43,16 +41,18 @@
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
-import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
+import java.util.PriorityQueue;
 import java.util.Queue;
 
+import static org.apache.flink.streaming.api.functions.source.RichFileInputSplit.EOS;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * The operator that reads the {@link FileInputSplit splits} received from the preceding
+ * The operator that reads the {@link RichFileInputSplit splits} received from the preceding
  * {@link ContinuousFileMonitoringFunction}. Contrary to the {@link ContinuousFileMonitoringFunction}
  * which has a parallelism of 1, this operator can have DOP > 1.
  * 
--- End diff --

Generic types are not documented in the JavaDoc.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Comment Edited] (FLINK-4832) Count/Sum 0 elements

2016-10-20 Thread Anton Mushin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591955#comment-15591955
 ] 

Anton Mushin edited comment on FLINK-4832 at 10/20/16 2:31 PM:
---

Hello,
I think 
{{org.apache.flink.api.common.operators.base.MapOperatorBase#executeOnCollections}}
 also needs to change, because 
{{org.apache.flink.api.table.runtime.aggregate.AggregateMapFunction#map}} is 
only called if there are elements in {{inputData}}.
{code}
TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig);
TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
for (IN element : inputData) {
IN inCopy = inSerializer.copy(element);
OUT out = function.map(inCopy);
result.add(outSerializer.copy(out));
}
{code}
And if {{org.apache.flink.api.table.runtime.aggregate.SumAggregate#initiate}} 
is edited, for example, as
{code}
override def initiate(partial: Row): Unit = {
    partial.setField(sumIndex, 0.asInstanceOf[T]) // cast 0 to T, the type parameter of SumAggregate[T]
  }
{code}
then the following tests pass:
{code}
 @Test
  def testSumNullElements(): Unit = {

val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)

val sqlQuery =
  "SELECT sum(_1),sum(_2),sum(_3),sum(_4),sum(_5),sum(_6) " +
"FROM (select * from MyTable where _1 = 4)"

val ds = env.fromElements(
  (1: Byte, 2l,1D,1f,1,1:Short ),
  (2: Byte, 2l,1D,1f,1,1:Short ))
tEnv.registerDataSet("MyTable", ds)

val result = tEnv.sql(sqlQuery)

val expected = "0,0,0.0,0.0,0,0"
val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
  }

@Test
  def testCountNullElements(): Unit = {

val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)

val sqlQuery =
  "SELECT count(_1),count(_2),count(_3),count(_4),count(_5),count(_6) " +
"FROM (select * from MyTable where _1 = 4)"

val ds = env.fromElements(
  (1: Byte, 2l,1D,1f,1,1:Short ),
  (2: Byte, 2l,1D,1f,1,1:Short ))
tEnv.registerDataSet("MyTable", ds)

val result = tEnv.sql(sqlQuery)

val expected = "0,0,0,0,0,0"
val results = result.toDataSet[Row].collect()

TestBaseUtils.compareResultAsText(results.asJava, expected)
  }
{code}


was (Author: anmu):
Hello,
I think 
{{org.apache.flink.api.common.operators.base.MapOperatorBase#executeOnCollections}}
 also needs to change, because 
{{org.apache.flink.api.table.runtime.aggregate.AggregateMapFunction#map}} is 
only called if there are elements in {{inputData}}.
{code}
TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig);
TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
for (IN element : inputData) {
IN inCopy = inSerializer.copy(element);
OUT out = function.map(inCopy);
result.add(outSerializer.copy(out));
}
{code}
And if {{org.apache.flink.api.table.runtime.aggregate.SumAggregate#initiate}} 
is edited, for example, as
{code}
override def initiate(partial: Row): Unit = {
    partial.setField(sumIndex, 0.asInstanceOf[T]) // cast 0 to T, the type parameter of SumAggregate[T]
  }
{code}
then the following tests pass:
{code}
@Test
  def testDataSetAggregation(): Unit = {

val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)

val sqlQuery = "SELECT sum(_1) FROM MyTable"

val ds = CollectionDataSets.get3TupleDataSet(env)
tEnv.registerDataSet("MyTable", ds)

val result = tEnv.sql(sqlQuery)

val expected = "231"
val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
  }

  @Test
  def testSumNullElements(): Unit = {

val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)

val sqlQuery =
  "SELECT sum(_1),sum(_2),sum(_3),sum(_4),sum(_5),sum(_6) " +
"FROM (select * from MyTable where _1 = 4)"

val ds = env.fromElements(
  (1: Byte, 2l,1D,1f,1,1:Short ),
  (2: Byte, 2l,1D,1f,1,1:Short ))
tEnv.registerDataSet("MyTable", ds)

val result = tEnv.sql(sqlQuery)

val expected = "0,0,0.0,0.0,0,0"
val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
  }
{code}

> Count/Sum 0 elements
> 
>
> Key: FLINK-4832
> URL: https://issues.apache.org/jira/browse/FLINK-4832
> Project: Flink

[jira] [Commented] (FLINK-4824) CliFrontend shows misleading error message when main() method returns before env.execute()

2016-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15592027#comment-15592027
 ] 

ASF GitHub Bot commented on FLINK-4824:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2662
  
@mxm thanks for the review. I added a second commit which I think satisfies 
your request. When no job is executed then the message is printed to stderr 
without a stacktrace.


> CliFrontend shows misleading error message when main() method returns before 
> env.execute()
> --
>
> Key: FLINK-4824
> URL: https://issues.apache.org/jira/browse/FLINK-4824
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>Assignee: Greg Hogan
>
> While testing Flink by running the 
> {{./examples/streaming/SocketWindowWordCount.jar}} example, I got the 
> following error message:
> {code}
> ./bin/flink run ./examples/streaming/SocketWindowWordCount.jar 
> Cluster configuration: Standalone cluster with JobManager at /127.0.0.1:6123
> Using address 127.0.0.1:6123 to connect to JobManager.
> JobManager web interface address http://127.0.0.1:8081
> Starting execution of program
> No port specified. Please run 'SocketWindowWordCount --port <port>', where 
> port is the address of the text server
> To start a simple text server, run 'netcat -l <port>' and type the input text 
> into the command line
> 
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The program 
> didn't contain Flink jobs. Perhaps you forgot to call execute() on the 
> execution environment.
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:324)
>   at 
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:774)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:250)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:985)
>   at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:1032)
>   at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:1029)
>   at 
> org.apache.flink.runtime.security.SecurityContext$1.run(SecurityContext.java:82)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
>   at 
> org.apache.flink.runtime.security.SecurityContext.runSecured(SecurityContext.java:79)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1029)
> {code}
> I think the error message is misleading, because I tried executing a valid 
> Flink job.
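
A minimal sketch of the scenario (hypothetical code, not the example's actual 
source): main() returns before any job graph is built, so the client has 
nothing to submit and reports the "didn't contain Flink jobs" error.
{code}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SocketJobSketch {
	public static void main(String[] args) throws Exception {
		if (args.length == 0) {
			System.err.println("No port specified.");
			return; // exits before execute(), triggering the misleading error above
		}
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.socketTextStream("localhost", Integer.parseInt(args[0])).print();
		env.execute("SocketWindowWordCount"); // only this call actually submits a job
	}
}
{code}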



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions

2016-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15592052#comment-15592052
 ] 

ASF GitHub Bot commented on FLINK-3674:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2570
  
I updated this on top of the latest master with @StefanRRichter's state 
changes.

Please take another look, @StefanRRichter.


> Add an interface for Time aware User Functions
> --
>
> Key: FLINK-3674
> URL: https://issues.apache.org/jira/browse/FLINK-3674
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
>Assignee: Aljoscha Krettek
>
> I suggest to add an interface that UDFs can implement, which will let them be 
> notified upon watermark updates.
> Example usage:
> {code}
> public interface EventTimeFunction {
> void onWatermark(Watermark watermark);
> }
> public class MyMapper implements MapFunction<String, String>, EventTimeFunction {
> private long currentEventTime = Long.MIN_VALUE;
> public String map(String value) {
> return value + " @ " + currentEventTime;
> }
> public void onWatermark(Watermark watermark) {
> currentEventTime = watermark.getTimestamp();
> }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State

2016-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15592051#comment-15592051
 ] 

ASF GitHub Bot commented on FLINK-4844:
---

Github user StefanRRichter closed the pull request at:

https://github.com/apache/flink/pull/2648


> Partitionable Raw Keyed/Operator State
> --
>
> Key: FLINK-4844
> URL: https://issues.apache.org/jira/browse/FLINK-4844
> Project: Flink
>  Issue Type: New Feature
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Partitionable operator and keyed state are currently only available by using 
> backends. However, the serialization code for many operators is built around 
> reading/writing their state to a stream for checkpointing. We want to provide 
> partitionable states also through streams, so that migrating existing 
> operators becomes easier.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-20 Thread StefanRRichter
Github user StefanRRichter closed the pull request at:

https://github.com/apache/flink/pull/2648


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2662: [FLINK-4824] [client] CliFrontend shows misleading error ...

2016-10-20 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2662
  
@mxm thanks for the review. I added a second commit which I think satisfies 
your request. When no job is executed then the message is printed to stderr 
without a stacktrace.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-4842) Introduce test to enforce order of operator / udf lifecycles

2016-10-20 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-4842.
-
   Resolution: Implemented
Fix Version/s: 1.2.0

> Introduce test to enforce order of operator / udf lifecycles 
> -
>
> Key: FLINK-4842
> URL: https://issues.apache.org/jira/browse/FLINK-4842
> Project: Flink
>  Issue Type: Test
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.2.0
>
>
> We should introduce a test that enforces a certain order in which lifecycle 
> methods of operators and UDFs are called, so that the order is not easily 
> changed by accident.
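
One possible shape for such a test, as a minimal sketch (the class and method 
names are assumptions, not the actual test code): every lifecycle method 
records its name, and the test asserts the exact sequence at the end.
{code}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class LifecycleOrderProbe {
	final List<String> calls = new ArrayList<>();

	void setup()   { calls.add("setup"); }
	void open()    { calls.add("open"); }
	void close()   { calls.add("close"); }
	void dispose() { calls.add("dispose"); }

	// invoked at the end of the test; fails if any step ran out of order
	void assertOrder() {
		List<String> expected = Arrays.asList("setup", "open", "close", "dispose");
		if (!calls.equals(expected)) {
			throw new AssertionError("unexpected lifecycle order: " + calls);
		}
	}
}
{code}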



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-4844) Partitionable Raw Keyed/Operator State

2016-10-20 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-4844.
-
   Resolution: Implemented
Fix Version/s: 1.2.0

> Partitionable Raw Keyed/Operator State
> --
>
> Key: FLINK-4844
> URL: https://issues.apache.org/jira/browse/FLINK-4844
> Project: Flink
>  Issue Type: New Feature
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.2.0
>
>
> Partitionable operator and keyed state are currently only available by using 
> backends. However, the serialization code for many operators is built around 
> reading/writing their state to a stream for checkpointing. We want to provide 
> partitionable states also through streams, so that migrating existing 
> operators becomes easier.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2570: [FLINK-3674] Add an interface for Time aware User Functio...

2016-10-20 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2570
  
I updated this on top of the latest master with @StefanRRichter's state 
changes.

Please take another look, @StefanRRichter.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-4867) Investigate code generation for improving sort performance

2016-10-20 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-4867:
--

 Summary: Investigate code generation for improving sort performance
 Key: FLINK-4867
 URL: https://issues.apache.org/jira/browse/FLINK-4867
 Project: Flink
  Issue Type: Sub-task
  Components: Local Runtime
Reporter: Gabor Gevay
Priority: Minor


This issue is for investigating whether code generation could speed up sorting. 
We should make some performance measurements on hand-written code that is 
similar to what we could generate, to see whether investing more time into this 
is worth it. If we find that it is worth it, we can open a second Jira for the 
actual implementation of the code generation.

I think we could generate one class at places where we currently instantiate 
{{QuickSort}}. This generated class would include the functionality of 
{{QuickSort}}, {{NormalizedKeySorter}} or {{FixedLengthRecordSorter}}, 
{{MemorySegment.compare}}, and {{MemorySegment.swap}}.

Btw. I'm planning to give this as a student project at a TU Berlin course in 
the next few months.

Some concrete ideas about how a generated sorter could be faster than the 
current sorting code:
* {{MemorySegment.compare}} could be specialized for
** Length: for small records, the loop could be unrolled
** Endianness (currently it is optimized for big endian; in the little 
endian case (e.g. x86) it does a Long.reverseBytes for each long read)
* {{MemorySegment.swapBytes}} (a sketch follows below)
** In case of small records, using three {{UNSAFE.copyMemory}} calls is probably 
not as efficient as a specialized swap, because
*** We could use total loop unrolling in generated code (because we know the 
exact record size)
*** {{UNSAFE.copyMemory}} checks for alignment first \[6,9\]
*** We would only need 2/3 of the memory bandwidth, because the temporary 
storage could be a register if we swap one byte (or one {{long}}) at a time
** Several checks might be eliminated
* Better inlining behaviour could be achieved
** Virtual function calls to the methods of {{InMemorySorter}} could be 
eliminated. (Note that these are problematic for the JVM to devirtualize if 
there are different derived classes used in a single Flink job (see \[8,7\]).)
** {{MemorySegment.swapBytes}} is probably not inlined currently, because the 
excessive checks make it too large
** {{MemorySegment.compare}} is probably also not inlined currently, because 
those two while loops are too large

\[6\] http://www.docjar.com/docs/api/sun/misc/Unsafe.html#copyMemory(Object, 
long, Object, long, long)
\[7\] https://shipilev.net/blog/2015/black-magic-method-dispatch/
\[8\] 
http://insightfullogic.com/2014/May/12/fast-and-megamorphic-what-influences-method-invoca/
\[9\] 
http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/87ee5ee27509/src/cpu/x86/vm/stubGenerator_x86_64.cpp#l2409
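
To make the swap idea concrete, here is a sketch of what a generated, fully 
unrolled swap could look like for a known 16-byte record ({{swap16}} and the 
fixed size are assumptions for illustration; {{getLong}}/{{putLong}} are the 
existing {{MemorySegment}} accessors):
{code}
// Hypothetical generated swap for a fixed 16-byte record: fully unrolled,
// using registers as temporary storage instead of three UNSAFE.copyMemory calls.
static void swap16(MemorySegment segA, int offA, MemorySegment segB, int offB) {
	long a0 = segA.getLong(offA);
	long a1 = segA.getLong(offA + 8);
	segA.putLong(offA, segB.getLong(offB));
	segA.putLong(offA + 8, segB.getLong(offB + 8));
	segB.putLong(offB, a0);
	segB.putLong(offB + 8, a1);
}
{code}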



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3722) The divisions in the InMemorySorters' swap/compare methods hurt performance

2016-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15592186#comment-15592186
 ] 

ASF GitHub Bot commented on FLINK-3722:
---

Github user ggevay commented on the issue:

https://github.com/apache/flink/pull/2628
  
> I transcribed Quicksort so as to remove considerations of Java 
performance and inlining. It was not clear to me that if we encapsulated the 
index, page number, and page offset into an object that Java would inline the 
various increment and decrement functions. Also, I don't think this looks too 
bad. I'm happy to reformat if that is preferred.

OK, I would say that it is OK like this, but let's see what the others will 
say.


> The divisions in the InMemorySorters' swap/compare methods hurt performance
> ---
>
> Key: FLINK-3722
> URL: https://issues.apache.org/jira/browse/FLINK-3722
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Assignee: Greg Hogan
>Priority: Minor
>  Labels: performance
>
> NormalizedKeySorter's and FixedLengthRecordSorter's swap and compare methods 
> use divisions (which take a lot of time \[1\]) to calculate the index of the 
> MemorySegment and the offset inside the segment. [~greghogan] reported on the 
> mailing list \[2\] measuring a ~12-14% performance effect in one case.
> A possibility to improve the situation is the following:
> The way that QuickSort mostly uses these compare and swap methods is that it 
> maintains two indices, and uses them to call compare and swap. The key 
> observation is that these indices are mostly stepped by one, and 
> _incrementally_ calculating the quotient and modulo is actually easy when the 
> index changes only by one: increment/decrement the modulo, and check whether 
> the modulo has reached 0 or the divisor, and if it did, then wrap-around the 
> modulo and increment/decrement the quotient.
> To implement this, InMemorySorter would have to expose an iterator that would 
> have the divisor and the current modulo and quotient as state, and have 
> increment/decrement methods. Compare and swap could then have overloads that 
> take these iterators as arguments.
> \[1\] http://www.agner.org/optimize/instruction_tables.pdf
> \[2\] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Macro-benchmarking-for-performance-tuning-and-regression-detection-td11078.html
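
For concreteness, a sketch of the iterator described above (the names are 
assumptions, not the PR's actual code): stepping the element index by one 
keeps the segment index (quotient) and in-segment offset (modulo) up to date 
without any division, assuming the segment size is a multiple of the record 
size.
{code}
// Incremental replacement for (index * recordSize) / segmentSize and
// (index * recordSize) % segmentSize when the index only moves by +/-1.
final class SortIndexIterator {
	private final int recordSize;
	private final int segmentSize;
	private int segmentIndex;   // the quotient
	private int segmentOffset;  // the "modulo", in bytes

	SortIndexIterator(int recordSize, int segmentSize) {
		this.recordSize = recordSize;
		this.segmentSize = segmentSize;
	}

	void increment() {
		segmentOffset += recordSize;
		if (segmentOffset == segmentSize) { // wrap around into the next segment
			segmentOffset = 0;
			segmentIndex++;
		}
	}

	void decrement() {
		if (segmentOffset == 0) {           // wrap around into the previous segment
			segmentOffset = segmentSize;
			segmentIndex--;
		}
		segmentOffset -= recordSize;
	}
}
{code}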



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4868) Insertion sort could avoid the swaps

2016-10-20 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-4868:
--

 Summary: Insertion sort could avoid the swaps
 Key: FLINK-4868
 URL: https://issues.apache.org/jira/browse/FLINK-4868
 Project: Flink
  Issue Type: Sub-task
  Components: Local Runtime
Reporter: Gabor Gevay
Priority: Minor


This is about the fallback to insertion sort at the beginning of 
{{QuickSort.sortInternal}}. It is quite hot code, as it runs every time we 
reach the bottom of the quicksort recursion tree.

The inner loop does a series of swaps on adjacent elements for moving a block 
of several elements one slot to the right and inserting the ith element at the 
hole. However, it would be faster to first copy the ith element to a temp 
location, and then move the block of elements to the right without swaps, and 
then copy the original ith element to the hole.

Moving the block of elements without swaps could be achieved by calling 
{{UNSAFE.copyMemory}} only once for every element (as opposed to the three 
calls in {{MemorySegment.swap}} currently being done).

(Note that I'm not sure whether {{UNSAFE.copyMemory}} is like memmove or like 
memcpy, so I'm not sure if we can do the entire block of elements with maybe 
even one {{UNSAFE.copyMemory}}.)
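
A sketch of the proposed inner loop (copyToTemp, copy, copyFromTemp and 
compareWithTemp are hypothetical helper names for the copy operations the 
description implies): one copy out to a temp slot, one copy per shifted 
element, and one copy back into the hole, instead of a three-copy swap per 
element.
{code}
// Insert element i into the already sorted prefix [0, i) without swaps.
// The four helpers below are hypothetical names, not existing methods.
void insert(int i) {
	copyToTemp(i);                            // copy the ith element out once
	int j = i;
	while (j > 0 && compareWithTemp(j - 1) > 0) {
		copy(j - 1, j);                       // one copy instead of a three-copy swap
		j--;
	}
	copyFromTemp(j);                          // place the original ith element into the hole
}
{code}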



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4868) Insertion sort could avoid the swaps

2016-10-20 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-4868:
---
Description: 
This is about the fallback to insertion sort at the beginning of 
{{QuickSort.sortInternal}}. It is quite hot code, as it runs every time we 
reach the bottom of the quicksort recursion tree.

The inner loop does a series of swaps on adjacent elements for moving a block 
of several elements one slot to the right and inserting the ith element at the 
hole. However, it would be faster to first copy the ith element to a temp 
location, and then move the block of elements to the right without swaps, and 
then copy the original ith element to the hole.

Moving the block of elements without swaps could be achieved by calling 
{{UNSAFE.copyMemory}} only once for every element (as opposed to the three 
calls in {{MemorySegment.swap}} currently being done).

(Note that I'm not sure whether {{UNSAFE.copyMemory}} is like memmove or like 
memcpy, so I'm not sure if we can do the entire block of elements with maybe 
even one {{UNSAFE.copyMemory}}.)

Note that the threshold for switching to the insertion sort could probably be 
increased after this.

  was:
This is about the fallback to insertion sort at the beginning of 
{{QuickSort.sortInternal}}. It is quite a hot code as it runs every time when 
we are at the bottom of the quick sort recursion tree.

The inner loop does a series of swaps on adjacent elements for moving a block 
of several elements one slot to the right and inserting the ith element at the 
hole. However, it would be faster to first copy the ith element to a temp 
location, and then move the block of elements to the right without swaps, and 
then copy the original ith element to the hole.

Moving the block of elements without swaps could be achieved by calling 
{{UNSAFE.copyMemory}} only once for every element (as opposed to the three 
calls in {{MemorySegment.swap}} currently being done).

(Note that I'm not sure whether {{UNSAFE.copyMemory}} is like memmove or like 
memcpy, so I'm not sure if we can do the entire block of elements with maybe 
even one {{UNSAFE.copyMemory}}.)


> Insertion sort could avoid the swaps
> 
>
> Key: FLINK-4868
> URL: https://issues.apache.org/jira/browse/FLINK-4868
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Priority: Minor
>  Labels: performance
>
> This is about the fallback to insertion sort at the beginning of 
> {{QuickSort.sortInternal}}. It is quite a hot code as it runs every time when 
> we are at the bottom of the quick sort recursion tree.
> The inner loop does a series of swaps on adjacent elements for moving a block 
> of several elements one slot to the right and inserting the ith element at 
> the hole. However, it would be faster to first copy the ith element to a temp 
> location, and then move the block of elements to the right without swaps, and 
> then copy the original ith element to the hole.
> Moving the block of elements without swaps could be achieved by calling 
> {{UNSAFE.copyMemory}} only once for every element (as opposed to the three 
> calls in {{MemorySegment.swap}} currently being done).
> (Note that I'm not sure whether {{UNSAFE.copyMemory}} is like memmove or like 
> memcpy, so I'm not sure if we can do the entire block of elements with maybe 
> even one {{UNSAFE.copyMemory}}.)
> Note that the threshold for switching to the insertion sort could probably be 
> increased after this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4867) Investigate code generation for improving sort performance

2016-10-20 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15592239#comment-15592239
 ] 

Greg Hogan commented on FLINK-4867:
---

It will be very interesting to see the results of this project. Perhaps you 
should self-assign the ticket until it can be handed over?

Inline status is logged with the JVM arguments {{-XX:+UnlockDiagnosticVMOptions 
-XX:+PrintInlining}}.

> Investigate code generation for improving sort performance
> --
>
> Key: FLINK-4867
> URL: https://issues.apache.org/jira/browse/FLINK-4867
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Priority: Minor
>  Labels: performance
>
> This issue is for investigating whether code generation could speed up 
> sorting. We should make some performance measurements on hand-written code 
> that is similar to what we could generate, to see whether investing more time 
> into this is worth it. If we find that it is worth it, we can open a second 
> Jira for the actual implementation of the code generation.
> I think we could generate one class at places where we currently instantiate 
> {{QuickSort}}. This generated class would include the functionality of 
> {{QuickSort}}, {{NormalizedKeySorter}} or {{FixedLengthRecordSorter}}, 
> {{MemorySegment.compare}}, and {{MemorySegment.swap}}.
> Btw. I'm planning to give this as a student project at a TU Berlin course in 
> the next few months.
> Some concrete ideas about how could a generated sorter be faster than the 
> current sorting code:
> * {{MemorySegment.compare}} could be specialized for
> ** Length: for small records, the loop could be unrolled
> ** Endiannes (currently it is optimized for big endian; and in the little 
> endian case (e.g. x86) it does a Long.reverseBytes for each long read)
> * {{MemorySegment.swapBytes}}
> ** In case of small records, using three {{UNSAFE.copyMemory}} is probably 
> not as efficient as a specialized swap, because
> *** We could use total loop unrolling in generated code (because we know the 
> exact record size)
> *** {{UNSAFE.copyMemory}} checks for alignment first \[6,9\]
> *** We will only need 2/3 the memory bandwidth, because the temporary storage 
> could be a register if we swap one byte (or one {{long}}) at a time
> ** several checks might be eliminated
> * Better inlining behaviour could be achieved 
> ** Virtual function calls to the methods of {{InMemorySorter}} could be 
> eliminated. (Note, that these are problematic to devirtualize by the JVM if 
> there are different derived classes used in a single Flink job (see \[8,7\]).)
> ** {{MemorySegment.swapBytes}} is probably not inlined currently, because the 
> excessive checks make it too large
> ** {{MemorySegment.compare}} is probably also not inlined currently, because 
> those two while loops are too large
> \[6\] http://www.docjar.com/docs/api/sun/misc/Unsafe.html#copyMemory(Object, 
> long, Object, long, long)
> \[7\] https://shipilev.net/blog/2015/black-magic-method-dispatch/
> \[8\] 
> http://insightfullogic.com/2014/May/12/fast-and-megamorphic-what-influences-method-invoca/
> \[9\] 
> http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/87ee5ee27509/src/cpu/x86/vm/stubGenerator_x86_64.cpp#l2409



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions

2016-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15592246#comment-15592246
 ] 

ASF GitHub Bot commented on FLINK-3674:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/2570
  
Rebased version looks good to me.


> Add an interface for Time aware User Functions
> --
>
> Key: FLINK-3674
> URL: https://issues.apache.org/jira/browse/FLINK-3674
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
>Assignee: Aljoscha Krettek
>
> I suggest to add an interface that UDFs can implement, which will let them be 
> notified upon watermark updates.
> Example usage:
> {code}
> public interface EventTimeFunction {
> void onWatermark(Watermark watermark);
> }
> public class MyMapper implements MapFunction, 
> EventTimeFunction {
> private long currentEventTime = Long.MIN_VALUE;
> public String map(String value) {
> return value + " @ " + currentEventTime;
> }
> public void onWatermark(Watermark watermark) {
> currentEventTime = watermark.getTimestamp();
> }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-4867) Investigate code generation for improving sort performance

2016-10-20 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15592292#comment-15592292
 ] 

Stefan Richter edited comment on FLINK-4867 at 10/20/16 4:32 PM:
-

If sort performance is crucial to you, I wrote an in-place radix sort 
algorithm that was extremely fast for me in a previous project. On primitive 
types and serialized byte strings I found it typically a factor of 2-3x faster 
than the JDK's Arrays.sort() for primitives or multikey quicksort for Strings. 
I was considering porting it to Flink, but have not found the time yet. As it 
is radix based and not comparison based, it would require some way to expose 
partial sort keys instead of a compareTo method. If that is interesting to you, 
let me know and I can share the original code.
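
For reference, a compact sketch of an in-place MSD radix sort over fixed-width 
byte keys (my own illustration of the general approach, not the code referred 
to above): each pass reads a single key byte (a "partial sort key") instead of 
calling a compareTo.
{code}
// In-place MSD radix sort ("American flag sort") on fixed-width byte[] keys.
static void msdRadixSort(byte[][] keys, int lo, int hi, int d) {
	if (hi - lo < 2 || d >= keys[lo].length) {
		return;
	}
	// 1) histogram of the d-th byte of every key in [lo, hi)
	int[] count = new int[256];
	for (int i = lo; i < hi; i++) {
		count[keys[i][d] & 0xFF]++;
	}
	// 2) bucket boundaries derived from the histogram
	int[] start = new int[256];
	int[] end = new int[256];
	int[] next = new int[256];
	for (int b = 0, s = lo; b < 256; b++) {
		start[b] = s;
		next[b] = s;
		s += count[b];
		end[b] = s;
	}
	// 3) permute in place: every swap moves one element into its final bucket
	for (int b = 0; b < 256; b++) {
		while (next[b] < end[b]) {
			int t = keys[next[b]][d] & 0xFF;
			if (t == b) {
				next[b]++;                  // already in the right bucket
			} else {
				byte[] tmp = keys[next[b]];
				keys[next[b]] = keys[next[t]];
				keys[next[t]] = tmp;
				next[t]++;
			}
		}
	}
	// 4) recurse into each bucket on the next key byte
	for (int b = 0; b < 256; b++) {
		if (end[b] - start[b] > 1) {
			msdRadixSort(keys, start[b], end[b], d + 1);
		}
	}
}
{code}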


was (Author: srichter):
If you sort performance is crucial to you, I wrote some inplace radix sort 
algorithm that was extremely fast for me in a precious project. On primitives 
types and serialized byte strings I found it typically factor 2-3x faster than 
JDK's Arrays.sort() for primitives or multikey quicksort for Strings). I was 
considering to port it onto Flink, but did not find the time yet. As it is 
radix based and not comparison based, it would require some way to expose 
partial sort keys instead of a compareTo method . If that is interesting to you 
let me know and I can share the original code.

> Investigate code generation for improving sort performance
> --
>
> Key: FLINK-4867
> URL: https://issues.apache.org/jira/browse/FLINK-4867
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Priority: Minor
>  Labels: performance
>
> This issue is for investigating whether code generation could speed up 
> sorting. We should make some performance measurements on hand-written code 
> that is similar to what we could generate, to see whether investing more time 
> into this is worth it. If we find that it is worth it, we can open a second 
> Jira for the actual implementation of the code generation.
> I think we could generate one class at places where we currently instantiate 
> {{QuickSort}}. This generated class would include the functionality of 
> {{QuickSort}}, {{NormalizedKeySorter}} or {{FixedLengthRecordSorter}}, 
> {{MemorySegment.compare}}, and {{MemorySegment.swap}}.
> Btw. I'm planning to give this as a student project at a TU Berlin course in 
> the next few months.
> Some concrete ideas about how could a generated sorter be faster than the 
> current sorting code:
> * {{MemorySegment.compare}} could be specialized for
> ** Length: for small records, the loop could be unrolled
> ** Endiannes (currently it is optimized for big endian; and in the little 
> endian case (e.g. x86) it does a Long.reverseBytes for each long read)
> * {{MemorySegment.swapBytes}}
> ** In case of small records, using three {{UNSAFE.copyMemory}} is probably 
> not as efficient as a specialized swap, because
> *** We could use total loop unrolling in generated code (because we know the 
> exact record size)
> *** {{UNSAFE.copyMemory}} checks for alignment first \[6,9\]
> *** We will only need 2/3 the memory bandwidth, because the temporary storage 
> could be a register if we swap one byte (or one {{long}}) at a time
> ** several checks might be eliminated
> * Better inlining behaviour could be achieved 
> ** Virtual function calls to the methods of {{InMemorySorter}} could be 
> eliminated. (Note, that these are problematic to devirtualize by the JVM if 
> there are different derived classes used in a single Flink job (see \[8,7\]).)
> ** {{MemorySegment.swapBytes}} is probably not inlined currently, because the 
> excessive checks make it too large
> ** {{MemorySegment.compare}} is probably also not inlined currently, because 
> those two while loops are too large
> \[6\] http://www.docjar.com/docs/api/sun/misc/Unsafe.html#copyMemory(Object, 
> long, Object, long, long)
> \[7\] https://shipilev.net/blog/2015/black-magic-method-dispatch/
> \[8\] 
> http://insightfullogic.com/2014/May/12/fast-and-megamorphic-what-influences-method-invoca/
> \[9\] 
> http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/87ee5ee27509/src/cpu/x86/vm/stubGenerator_x86_64.cpp#l2409



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4867) Investigate code generation for improving sort performance

2016-10-20 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15592292#comment-15592292
 ] 

Stefan Richter commented on FLINK-4867:
---

If sort performance is crucial to you, I wrote an in-place radix sort 
algorithm that was extremely fast for me in a previous project. On primitive 
types and serialized byte strings I found it typically a factor of 2-3x faster 
than the JDK's Arrays.sort() for primitives or multikey quicksort for Strings. 
I was considering porting it to Flink, but have not found the time yet. As it 
is radix based and not comparison based, it would require some way to expose 
partial sort keys instead of a compareTo method. If that is interesting to you, 
let me know and I can share the original code.

> Investigate code generation for improving sort performance
> --
>
> Key: FLINK-4867
> URL: https://issues.apache.org/jira/browse/FLINK-4867
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Priority: Minor
>  Labels: performance
>
> This issue is for investigating whether code generation could speed up 
> sorting. We should make some performance measurements on hand-written code 
> that is similar to what we could generate, to see whether investing more time 
> into this is worth it. If we find that it is worth it, we can open a second 
> Jira for the actual implementation of the code generation.
> I think we could generate one class at places where we currently instantiate 
> {{QuickSort}}. This generated class would include the functionality of 
> {{QuickSort}}, {{NormalizedKeySorter}} or {{FixedLengthRecordSorter}}, 
> {{MemorySegment.compare}}, and {{MemorySegment.swap}}.
> Btw. I'm planning to give this as a student project at a TU Berlin course in 
> the next few months.
> Some concrete ideas about how could a generated sorter be faster than the 
> current sorting code:
> * {{MemorySegment.compare}} could be specialized for
> ** Length: for small records, the loop could be unrolled
> ** Endiannes (currently it is optimized for big endian; and in the little 
> endian case (e.g. x86) it does a Long.reverseBytes for each long read)
> * {{MemorySegment.swapBytes}}
> ** In case of small records, using three {{UNSAFE.copyMemory}} is probably 
> not as efficient as a specialized swap, because
> *** We could use total loop unrolling in generated code (because we know the 
> exact record size)
> *** {{UNSAFE.copyMemory}} checks for alignment first \[6,9\]
> *** We will only need 2/3 the memory bandwidth, because the temporary storage 
> could be a register if we swap one byte (or one {{long}}) at a time
> ** several checks might be eliminated
> * Better inlining behaviour could be achieved 
> ** Virtual function calls to the methods of {{InMemorySorter}} could be 
> eliminated. (Note, that these are problematic to devirtualize by the JVM if 
> there are different derived classes used in a single Flink job (see \[8,7\]).)
> ** {{MemorySegment.swapBytes}} is probably not inlined currently, because the 
> excessive checks make it too large
> ** {{MemorySegment.compare}} is probably also not inlined currently, because 
> those two while loops are too large
> \[6\] http://www.docjar.com/docs/api/sun/misc/Unsafe.html#copyMemory(Object, 
> long, Object, long, long)
> \[7\] https://shipilev.net/blog/2015/black-magic-method-dispatch/
> \[8\] 
> http://insightfullogic.com/2014/May/12/fast-and-megamorphic-what-influences-method-invoca/
> \[9\] 
> http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/87ee5ee27509/src/cpu/x86/vm/stubGenerator_x86_64.cpp#l2409



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4869) Store record pointer after record keys

2016-10-20 Thread Greg Hogan (JIRA)
Greg Hogan created FLINK-4869:
-

 Summary: Store record pointer after record keys
 Key: FLINK-4869
 URL: https://issues.apache.org/jira/browse/FLINK-4869
 Project: Flink
  Issue Type: Sub-task
  Components: Core
Affects Versions: 1.2.0
Reporter: Greg Hogan
Assignee: Greg Hogan
Priority: Minor


{{NormalizedKeySorter}} serializes records into a {{RandomAccessInputView}} 
separate from the memory segments used for the sort keys. By storing the 
pointer after the sort keys, the addition of the offset moves from 
{{NormalizedKeySorter.compare}}, which is called O(n log n) times, to other 
methods which are called O\(n) times.

Will run a performance comparison before submitting a PR to see how significant 
a performance improvement this would yield.
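
Roughly, the intended layout change, as a sketch (POINTER_LEN, numKeyBytes and 
the method names are illustrative, not the actual code):
{code}
// Current layout of a sort entry:  | 8-byte pointer | N-byte normalized key |
// Proposed layout:                 | N-byte normalized key | 8-byte pointer |
static final int POINTER_LEN = 8;

// Called O(n log n) times while sorting: today every call pays two additions
// to skip the pointer before reaching the key bytes.
static int compareToday(MemorySegment s1, MemorySegment s2, int off1, int off2, int numKeyBytes) {
	return s1.compare(s2, off1 + POINTER_LEN, off2 + POINTER_LEN, numKeyBytes);
}

// With the pointer stored after the keys, compare reads at the raw offset...
static int compareProposed(MemorySegment s1, MemorySegment s2, int off1, int off2, int numKeyBytes) {
	return s1.compare(s2, off1, off2, numKeyBytes);
}

// ...and only the O(n) pointer accesses pay the added offset.
static long readPointer(MemorySegment s, int off, int numKeyBytes) {
	return s.getLong(off + numKeyBytes);
}
{code}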



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4860) Sort performance

2016-10-20 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-4860:
---
Component/s: Local Runtime

> Sort performance
> 
>
> Key: FLINK-4860
> URL: https://issues.apache.org/jira/browse/FLINK-4860
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Greg Hogan
>  Labels: performance
>
> A super-task for improvements to Flink's sort performance.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4860) Sort performance

2016-10-20 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-4860:
---
Labels: performance  (was: )

> Sort performance
> 
>
> Key: FLINK-4860
> URL: https://issues.apache.org/jira/browse/FLINK-4860
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Greg Hogan
>  Labels: performance
>
> A super-task for improvements to Flink's sort performance.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4869) Store record pointer after record keys

2016-10-20 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-4869:
---
Labels: performance  (was: )

> Store record pointer after record keys
> --
>
> Key: FLINK-4869
> URL: https://issues.apache.org/jira/browse/FLINK-4869
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Affects Versions: 1.2.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
>  Labels: performance
>
> {{NormalizedKeySorter}} serializes records into a {{RandomAccessInputView}} 
> separate from the memory segments used for the sort keys. By storing the 
> pointer after the sort keys, the addition of the offset moves from 
> {{NormalizedKeySorter.compare}}, which is called O(n log n) times, to other 
> methods which are called O\(n) times.
> Will run a performance comparison before submitting a PR to see how 
> significant a performance improvement this would yield.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4870) ContinuousFileMonitoringFunction does not properly handle absolute Windows paths

2016-10-20 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-4870:
---

 Summary: ContinuousFileMonitoringFunction does not properly handle 
absolute Windows paths
 Key: FLINK-4870
 URL: https://issues.apache.org/jira/browse/FLINK-4870
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.1.2
Reporter: Chesnay Schepler
Priority: Minor
 Fix For: 1.2.0


The ContinuousFileMonitoringFunction fails for absolute Windows paths without 
a dedicated scheme (e.g. "C:\\tmp\\test.csv"), since the String path is directly 
fed into the URI constructor (which doesn't handle it properly) instead of 
first creating a Flink Path and converting that into a URI.
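
A small sketch of the failure mode and the suggested fix (illustrative 
snippet, not the actual patch):
{code}
// Creating a Flink Path first normalizes the Windows path; its URI is usable:
java.net.URI ok = new org.apache.flink.core.fs.Path("C:\\tmp\\test.csv").toUri();

// Feeding the raw String into java.net.URI instead throws a URISyntaxException,
// because "C" parses as the scheme and the backslashes are illegal characters:
java.net.URI broken = new java.net.URI("C:\\tmp\\test.csv");
{code}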



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

