[jira] [Created] (FLINK-4863) states of merging window and trigger are set to different TimeWindows on merge
Manu Zhang created FLINK-4863:
---------------------------------

             Summary: states of merging window and trigger are set to different TimeWindows on merge
                 Key: FLINK-4863
                 URL: https://issues.apache.org/jira/browse/FLINK-4863
             Project: Flink
          Issue Type: Bug
          Components: Streaming, Windowing Operators
            Reporter: Manu Zhang

While window state is set to the mergeResult's stateWindow (one of the original windows), trigger state is set to the mergeResult itself. This breaks the {{Timer}} of {{ContinuousEventTimeTrigger}}, because its window cannot be found in the window state.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
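The mismatch described in the report can be illustrated with a toy model. This is only a sketch under stated assumptions, not Flink code: `Window` and `timerFindsState` are hypothetical names, and a plain `HashMap` stands in for per-window state. It shows why a timer that carries the merge result as its window cannot find state that was stored under one of the original windows.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

final class MergeWindowStateSketch {

    /** Hypothetical stand-in for a time window; not Flink's TimeWindow class. */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Window)) {
                return false;
            }
            Window w = (Window) o;
            return start == w.start && end == w.end;
        }

        @Override
        public int hashCode() {
            return Objects.hash(start, end);
        }
    }

    /**
     * Stores state under stateNamespace, then checks whether a timer that
     * carries timerNamespace as its window can find that state again.
     */
    static boolean timerFindsState(Window stateNamespace, Window timerNamespace) {
        Map<Window, String> stateByNamespace = new HashMap<>();
        stateByNamespace.put(stateNamespace, "merged window contents");
        return stateByNamespace.containsKey(timerNamespace);
    }

    public static void main(String[] args) {
        Window stateWindow = new Window(0, 10);  // one of the original windows
        Window mergeResult = new Window(0, 15);  // the window produced by the merge

        // Timer registered against the merge result: the lookup misses.
        System.out.println(timerFindsState(stateWindow, mergeResult)); // false
        // Timer registered against the state window: the lookup succeeds.
        System.out.println(timerFindsState(stateWindow, stateWindow)); // true
    }
}
```

In this model the fix amounts to using the same window for both the state and the timer, which is what the issue title asks for.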
[GitHub] flink pull request #2666: [FLINK-4863] fix trigger context window on merge
GitHub user manuzhang opened a pull request:

    https://github.com/apache/flink/pull/2666

    [FLINK-4863] fix trigger context window on merge

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.

    - [ ] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)

    - [ ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added

    - [ ] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/manuzhang/flink fix_merge_window

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2666.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2666

----
commit ce1aaba4e0208505e88ce1507533eaeab1f62321
Author: manuzhang
Date:   2016-10-20T07:06:01Z

    [FLINK-4863] fix trigger context window on merge

----

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…
GitHub user sedgewickmm18 opened a pull request:

    https://github.com/apache/flink/pull/2667

    README.md -Description of the bluemix specif…

    Hi,

    as per the discussion with @mxm, I'm proposing the following change. It uses container linking (which exposes a container-specific hostname in /etc/hosts so that taskmanagers can find the jobmanager) instead of shared volumes, and it handles fully qualified image names (including the URL of the private registry plus namespace).

    Best Regards,
    Markus

    List of changed files:

    build.sh - modify permissions to make script executable by default
    docker-compose-bluemix.yml - allow to specify image names with path to private BM registry
    docker-compose.sh - find out path to docker registry, extend build time to 120 secs, then call docker-compose
    docker-compose.yml - make use of container linking instead of shared volumes
    docker-entrypoint.sh - use 'jobmanager' entry in /etc/hosts arising from container linking so that taskmanager can register itself with jobmanager

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sedgewickmm18/flink master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2667.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2667

----
commit 735ed871b528664afaac2ee46e8727b873b1ee97
Author: Markus Müller
Date:   2016-10-20T08:08:34Z

    README.md - Description of the bluemix specific steps
    build.sh - modify permissions to make script executable by default
    docker-compose-bluemix.yml - allow to specify image names with path to private BM registry
    docker-compose.sh - find out path to docker registry, extend build time to 120 secs, then call docker-compose
    docker-compose.yml - make use of container linking instead of shared volumes
    docker-entrypoint.sh - use 'jobmanager' entry in /etc/hosts arising from container linking so that taskmanager can register itself with jobmanager

----
[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions
[ https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591201#comment-15591201 ]

ASF GitHub Bot commented on FLINK-3674:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r84234419

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+
+/**
+ * Base interface for timely flatMap functions. FlatMap functions take elements and transform them,
+ * into zero, one, or more elements. Typical applications can be splitting elements, or unnesting lists
+ * and arrays.
+ *
+ * A {@code TimelyFlatMapFunction} can, in addition to the functionality of a normal
+ * {@link org.apache.flink.api.common.functions.FlatMapFunction}, also set timers and react
+ * to them firing.
+ *
+ * {@code
+ * DataStream input = ...;
+ *
+ * DataStream result = input.flatMap(new MyTimelyFlatMapFunction());
+ * }
+ *
+ * @param <I> Type of the input elements.
+ * @param <O> Type of the returned elements.
+ */
+@PublicEvolving
+public interface TimelyFlatMapFunction<I, O> extends Function, Serializable {
+
+    /**
+     * The core method of the {@code TimelyFlatMapFunction}. Takes an element from the input data set and transforms
+     * it into zero, one, or more elements.
+     *
+     * @param value The input value.
+     * @param timerService A {@link TimerService} that allows setting timers and querying the
+     *                        current time.
+     * @param out The collector for returning result values.
+     *
+     * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+     *                   to fail and may trigger recovery.
+     */
+    void flatMap(I value, TimerService timerService, Collector<O> out) throws Exception;
+
+    /**
+     * Called when a timer set using {@link TimerService} fires.
+     *
+     * @param timestamp The timestamp of the firing timer.
+     * @param timeDomain The {@link TimeDomain} of the firing timer.
+     * @param timerService A {@link TimerService} that allows setting timers and querying the
+     *                        current time.
+     * @param out The collector for returning result values.
+     *
+     * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+     *                   to fail and may trigger recovery.
+     */
+    void onTimer(long timestamp, TimeDomain timeDomain, TimerService timerService, Collector<O> out) throws Exception ;
--- End diff --

    Why not have two methods, `onEventTime` and `onProcessingTime`? That way we avoid the `TimeDomain` argument, and we also prompt users to think about what they are doing in each case. In addition, this matches the APIs we expose for the Triggers, and I think it is good to have uniform APIs.

    Another solution would be to change the Trigger APIs to this shape, but that would break user code.

> Add an interface for Time aware User Functions
> ----------------------------------------------
>
>                 Key: FLINK-3674
>                 URL: https://issues.apache.org/jira/browse/FLINK-3674
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming
>    Affects Versions: 1.0.0
>            Reporter: Stephan Ewen
>            Assignee: Aljoscha Krettek
>
> I suggest to add an i
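The two API shapes compared in the review comment can be put side by side in a small, dependency-free sketch. Everything here is an assumption made for illustration: `SingleCallback`, `SplitCallbacks`, and `adapt` are hypothetical names, the local `TimeDomain` enum only mimics the one referenced in the diff, and a `List<String>` stands in for the `TimerService` and `Collector` parameters.

```java
import java.util.ArrayList;
import java.util.List;

final class SplitTimerCallbacksSketch {

    /** Hypothetical stand-in for the TimeDomain type referenced in the diff. */
    enum TimeDomain { EVENT_TIME, PROCESSING_TIME }

    /** Shape under review: one callback, the time domain is passed as an argument. */
    interface SingleCallback {
        void onTimer(long timestamp, TimeDomain timeDomain, List<String> out);
    }

    /** Shape suggested in the comment: one callback per time domain. */
    interface SplitCallbacks {
        void onEventTime(long timestamp, List<String> out);

        void onProcessingTime(long timestamp, List<String> out);
    }

    /** Adapter showing that both shapes carry the same information. */
    static SingleCallback adapt(SplitCallbacks target) {
        return (timestamp, timeDomain, out) -> {
            switch (timeDomain) {
                case EVENT_TIME:
                    target.onEventTime(timestamp, out);
                    break;
                case PROCESSING_TIME:
                    target.onProcessingTime(timestamp, out);
                    break;
                default:
                    throw new IllegalArgumentException("Unknown domain: " + timeDomain);
            }
        };
    }

    /** Fires one timer per domain through the adapter and returns what was emitted. */
    static List<String> demo() {
        List<String> out = new ArrayList<>();
        SingleCallback callback = adapt(new SplitCallbacks() {
            @Override
            public void onEventTime(long timestamp, List<String> o) {
                o.add("event@" + timestamp);
            }

            @Override
            public void onProcessingTime(long timestamp, List<String> o) {
                o.add("proc@" + timestamp);
            }
        });
        callback.onTimer(10L, TimeDomain.EVENT_TIME, out);
        callback.onTimer(20L, TimeDomain.PROCESSING_TIME, out);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [event@10, proc@20]
    }
}
```

With the split shape, the dispatch on the time domain lives in one place (here the adapter) rather than in every user function, which is the trade-off the comment raises.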
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r84237459

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java ---
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link InternalTimerService} that stores timers on the Java heap.
+ */
+public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, ProcessingTimeCallback {
+
+    private final TypeSerializer<K> keySerializer;
+
+    private final TypeSerializer<N> namespaceSerializer;
+
+    private final ProcessingTimeService processingTimeService;
+
+    private long currentWatermark = Long.MIN_VALUE;
+
+    private final org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget;
+
+    private final KeyContext keyContext;
+
+    /**
+     * Processing time timers that are currently in-flight.
+     */
+    private final PriorityQueue<InternalTimer<K, N>> processingTimeTimersQueue;
+    private final Set<InternalTimer<K, N>> processingTimeTimers;
+
+    protected ScheduledFuture<?> nextTimer = null;
+
+    /**
+     * Currently waiting watermark callbacks.
+     */
+    private final Set<InternalTimer<K, N>> watermarkTimers;
+    private final PriorityQueue<InternalTimer<K, N>> watermarkTimersQueue;
+
+    public HeapInternalTimerService(
+            TypeSerializer<K> keySerializer,
+            TypeSerializer<N> namespaceSerializer,
+            org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
+            KeyContext keyContext,
+            ProcessingTimeService processingTimeService) {
+        this.keySerializer = checkNotNull(keySerializer);
+        this.namespaceSerializer = checkNotNull(namespaceSerializer);
+        this.triggerTarget = checkNotNull(triggerTarget);
+        this.keyContext = keyContext;
+        this.processingTimeService = checkNotNull(processingTimeService);
+
+        watermarkTimers = new HashSet<>();
--- End diff --

    I am wondering whether it would be better to rename this to `EventTimeTimers`. This pairs well with `processingTimeTimers`, and it also indicates what we are talking about: we have event time, whose clock ticks are the watermarks, and processing time, whose clock ticks are the wall-clock ones.
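The idea behind the suggested name, that watermarks act as the clock ticks of event time, can be sketched without any Flink dependency. This is an illustration under stated assumptions rather than the code from the pull request: the class and method names are hypothetical, timers are reduced to bare timestamps (no key or namespace), and the set-plus-priority-queue pairing only mirrors the field layout visible in the diff.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;

final class EventTimeTimersSketch {

    // The set deduplicates registrations; the queue keeps timers ordered by timestamp.
    private final Set<Long> eventTimeTimers = new HashSet<>();
    private final PriorityQueue<Long> eventTimeTimersQueue = new PriorityQueue<>();

    private long currentWatermark = Long.MIN_VALUE;

    /** Registers a timer; registering the same timestamp twice has no extra effect. */
    void registerEventTimeTimer(long timestamp) {
        if (eventTimeTimers.add(timestamp)) {
            eventTimeTimersQueue.add(timestamp);
        }
    }

    /** A watermark is one tick of the event-time clock: fire every timer at or before it. */
    List<Long> advanceWatermark(long watermark) {
        currentWatermark = watermark;
        List<Long> fired = new ArrayList<>();
        while (!eventTimeTimersQueue.isEmpty() && eventTimeTimersQueue.peek() <= watermark) {
            long timestamp = eventTimeTimersQueue.poll();
            eventTimeTimers.remove(timestamp);
            fired.add(timestamp);
        }
        return fired;
    }

    long currentWatermark() {
        return currentWatermark;
    }

    public static void main(String[] args) {
        EventTimeTimersSketch service = new EventTimeTimersSketch();
        service.registerEventTimeTimer(5L);
        service.registerEventTimeTimer(10L);
        service.registerEventTimeTimer(5L); // duplicate, ignored
        service.registerEventTimeTimer(20L);

        System.out.println(service.advanceWatermark(10L)); // [5, 10]
        System.out.println(service.advanceWatermark(15L)); // []
        System.out.println(service.advanceWatermark(20L)); // [20]
    }
}
```

Processing-time timers would follow the same pattern, except that the tick comes from the wall clock instead of from a watermark.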
[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions
[ https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591262#comment-15591262 ]

ASF GitHub Bot commented on FLINK-3674:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r84235862

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
@@ -390,4 +425,141 @@ public void close() {
             output.close();
         }
     }
+
+    //
+    //  Watermark handling
+    //
+
+    /**
+     * Returns a {@link InternalTimerService} that can be used to query current processing time
+     * and event time and to set timers. An operator can have several timer services, where
+     * each has its own namespace serializer. Timer services are differentiated by the string
+     * key that is given when requesting them, if you call this method with the same key
+     * multiple times you will get the same timer service instance in subsequent requests.
+     *
+     * Timers are always scoped to a key, the currently active key of a keyed stream operation.
+     * When a timer fires, this key will also be set as the currently active key.
+     *
+     * Each timer has attached metadata, the namespace. Different timer services
+     * can have a different namespace type. If you don't need namespace differentiation you
+     * can use {@link VoidNamespaceSerializer} as the namespace serializer.
+     *
+     * @param name The name of the requested timer service. If no service exists under the given
+     *             name a new one will be created and returned.
+     * @param keySerializer {@code TypeSerializer} for the keys of the timers.
+     * @param namespaceSerializer {@code TypeSerializer} for the timer namespace.
+     * @param triggerable The {@link Triggerable} that should be invoked when timers fire
+     *
+     * @param <K> The type of the timer keys.
+     * @param <N> The type of the timer namespace.
+     */
+    public <K, N> InternalTimerService<N> getInternalTimerService(
+            String name,
+            TypeSerializer<K> keySerializer,
+            TypeSerializer<N> namespaceSerializer,
+            Triggerable<K, N> triggerable) {
+
+        @SuppressWarnings("unchecked")
+        HeapInternalTimerService<K, N> service = (HeapInternalTimerService<K, N>) timerServices.get(name);
+
+        if (service == null) {
+            if (restoredServices != null && restoredServices.containsKey(name)) {
+                @SuppressWarnings("unchecked")
+                HeapInternalTimerService.RestoredTimers<K, N> restoredService =
+                    (HeapInternalTimerService.RestoredTimers<K, N>) restoredServices.remove(name);
+
+                service = new HeapInternalTimerService<>(
+                    keySerializer,
+                    namespaceSerializer,
+                    triggerable,
+                    this,
+                    getRuntimeContext().getProcessingTimeService(),
+                    restoredService);
+
+            } else {
+                service = new HeapInternalTimerService<>(
+                    keySerializer,
+                    namespaceSerializer,
+                    triggerable,
+                    this,
+                    getRuntimeContext().getProcessingTimeService());
+            }
+            timerServices.put(name, service);
+        }
+
+        return service;
+    }
+
+    public void processWatermark(Watermark mark) throws Exception {
+        for (HeapInternalTimerService<?, ?> service : timerServices.values()) {
+            service.advanceWatermark(mark.getTimestamp());
+        }
+        output.emitWatermark(mark);
+    }
+
+    public void processWatermark1(Watermark mark) throws Exception {
+        input1Watermark = mark.getTimestamp();
+        long newMin = Math.min(input1Watermark, input2Watermark);
--- End diff --

    I agree with @StefanRRichter's comment below and I just have to add that for `processWatermark1` and `processWatermark2` much of the code is r
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r84236659

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java ---
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link InternalTimerService} that stores timers on the Java heap.
+ */
+public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, ProcessingTimeCallback {
+
+    private final TypeSerializer<K> keySerializer;
+
+    private final TypeSerializer<N> namespaceSerializer;
+
+    private final ProcessingTimeService processingTimeService;
+
+    private long currentWatermark = Long.MIN_VALUE;
+
+    private final org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget;
+
+    private final KeyContext keyContext;
+
+    /**
+     * Processing time timers that are currently in-flight.
+     */
+    private final PriorityQueue<InternalTimer<K, N>> processingTimeTimersQueue;
+    private final Set<InternalTimer<K, N>> processingTimeTimers;
+
+    protected ScheduledFuture<?> nextTimer = null;
+
+    /**
+     * Currently waiting watermark callbacks.
+     */
+    private final Set<InternalTimer<K, N>> watermarkTimers;
+    private final PriorityQueue<InternalTimer<K, N>> watermarkTimersQueue;
+
+    public HeapInternalTimerService(
+            TypeSerializer<K> keySerializer,
+            TypeSerializer<N> namespaceSerializer,
+            org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
+            KeyContext keyContext,
+            ProcessingTimeService processingTimeService) {
+        this.keySerializer = checkNotNull(keySerializer);
+        this.namespaceSerializer = checkNotNull(namespaceSerializer);
+        this.triggerTarget = checkNotNull(triggerTarget);
+        this.keyContext = keyContext;
+        this.processingTimeService = checkNotNull(processingTimeService);
+
+        watermarkTimers = new HashSet<>();
+        watermarkTimersQueue = new PriorityQueue<>(100);
+
+        processingTimeTimers = new HashSet<>();
+        processingTimeTimersQueue = new PriorityQueue<>(100);
+    }
+
+    public HeapInternalTimerService(
+            TypeSerializer<K> keySerializer,
+            TypeSerializer<N> namespaceSerializer,
+            org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
+            KeyContext keyContext,
+            ProcessingTimeService processingTimeService,
+            RestoredTimers<K, N> restoredTimers) {
+
+        this.keySerializer = checkNotNull(keySerializer);
+        this.namespaceSerializer = checkNotNull(namespaceSerializer);
+        this.triggerTarget = checkNotNull(triggerTarget);
+        this.keyContext = keyContext;
+        this.processingTimeService = checkNotNull(processingTimeService);
+
+        watermarkTimers = restoredTimers.watermarkTimers;
+        watermarkTimersQueue = restoredTimers.watermarkTimersQueue;
+
+        processingTimeTimers = restoredTimers.processingTimeTimers;
+
[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions
[ https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591263#comment-15591263 ] ASF GitHub Bot commented on FLINK-3674: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r84237459 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java --- @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.HashSet; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.concurrent.ScheduledFuture; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link InternalTimerService} that stores timers on the Java heap. + */ +public class HeapInternalTimerService implements InternalTimerService, ProcessingTimeCallback { + + private final TypeSerializer keySerializer; + + private final TypeSerializer namespaceSerializer; + + private final ProcessingTimeService processingTimeService; + + private long currentWatermark = Long.MIN_VALUE; + + private final org.apache.flink.streaming.api.operators.Triggerable triggerTarget; + + private final KeyContext keyContext; + + /** +* Processing time timers that are currently in-flight. +*/ + private final PriorityQueue> processingTimeTimersQueue; + private final Set> processingTimeTimers; + + protected ScheduledFuture nextTimer = null; + + /** +* Currently waiting watermark callbacks. 
+*/ + private final Set> watermarkTimers; + private final PriorityQueue> watermarkTimersQueue; + + public HeapInternalTimerService( + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, + org.apache.flink.streaming.api.operators.Triggerable triggerTarget, + KeyContext keyContext, + ProcessingTimeService processingTimeService) { + this.keySerializer = checkNotNull(keySerializer); + this.namespaceSerializer = checkNotNull(namespaceSerializer); + this.triggerTarget = checkNotNull(triggerTarget); + this.keyContext = keyContext; + this.processingTimeService = checkNotNull(processingTimeService); + + watermarkTimers = new HashSet<>(); --- End diff -- I am wondering if it would be better to rename this to `EventTimeTimers`. This pairs well with `processingTimeTimers` and also indicates what we are talking about: event time, whose "clock ticks" are the watermarks, and processing time, whose clock ticks come from the wall clock. > Add an interface for Time aware User Functions > -- > > Key: FLINK-3674 > URL: https://issues.apache.org/jira/browse/FLINK-3674 > Project: Flink > Issue Type: New Feature > Components: Streaming >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Aljoscha Krettek > > I suggest to add an interface that UDFs can implement, which will let them be > notified upon watermark updates. > Example usage: > {code} > public interface EventTimeFunction { > void onWatermark(Watermark watermark);
[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions
[ https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591264#comment-15591264 ] ASF GitHub Bot commented on FLINK-3674: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r84236439 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java --- @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.HashSet; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.concurrent.ScheduledFuture; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link InternalTimerService} that stores timers on the Java heap. + */ +public class HeapInternalTimerService implements InternalTimerService, ProcessingTimeCallback { + + private final TypeSerializer keySerializer; + + private final TypeSerializer namespaceSerializer; + + private final ProcessingTimeService processingTimeService; + + private long currentWatermark = Long.MIN_VALUE; + + private final org.apache.flink.streaming.api.operators.Triggerable triggerTarget; + + private final KeyContext keyContext; + + /** +* Processing time timers that are currently in-flight. +*/ + private final PriorityQueue> processingTimeTimersQueue; + private final Set> processingTimeTimers; + + protected ScheduledFuture nextTimer = null; + + /** +* Currently waiting watermark callbacks. 
+*/ + private final Set> watermarkTimers; + private final PriorityQueue> watermarkTimersQueue; + + public HeapInternalTimerService( + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, + org.apache.flink.streaming.api.operators.Triggerable triggerTarget, + KeyContext keyContext, + ProcessingTimeService processingTimeService) { + this.keySerializer = checkNotNull(keySerializer); + this.namespaceSerializer = checkNotNull(namespaceSerializer); + this.triggerTarget = checkNotNull(triggerTarget); + this.keyContext = keyContext; + this.processingTimeService = checkNotNull(processingTimeService); + + watermarkTimers = new HashSet<>(); + watermarkTimersQueue = new PriorityQueue<>(100); + + processingTimeTimers = new HashSet<>(); + processingTimeTimersQueue = new PriorityQueue<>(100); + } + + public HeapInternalTimerService( + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, + org.apache.flink.streaming.api.operators.Triggerable triggerTarget, + KeyContext keyContext, + ProcessingTimeService processingTimeService, + RestoredTimers restoredTimers) { + + this.keySerializer = checkNotNull(keySerializer); + this.namespaceSerializer = checkNotNull(namespaceSerializer); + this.triggerTarget = checkNotNull(triggerTarget); + this.keyContext = keyContext; + this.processingTimeService = checkNotNull(pro
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r84237685 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultProcessingTimeService.java --- @@ -35,10 +34,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** - * A {@link TimeServiceProvider} which assigns as current processing time the result of calling + * A {@link ProcessingTimeService} which assigns as current processing time the result of calling * {@link System#currentTimeMillis()} and registers timers using a {@link ScheduledThreadPoolExecutor}. */ -public class DefaultTimeServiceProvider extends TimeServiceProvider { +public class DefaultProcessingTimeService extends ProcessingTimeService { --- End diff -- We could also rename it to `SystemProcessingTimeService` or something more indicative of where the clock ticks come from. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r84235862 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java --- @@ -390,4 +425,141 @@ public void close() { output.close(); } } + + // + // Watermark handling + // + + /** +* Returns a {@link InternalTimerService} that can be used to query current processing time +* and event time and to set timers. An operator can have several timer services, where +* each has its own namespace serializer. Timer services are differentiated by the string +* key that is given when requesting them, if you call this method with the same key +* multiple times you will get the same timer service instance in subsequent requests. +* +* Timers are always scoped to a key, the currently active key of a keyed stream operation. +* When a timer fires, this key will also be set as the currently active key. +* +* Each timer has attached metadata, the namespace. Different timer services +* can have a different namespace type. If you don't need namespace differentiation you +* can use {@link VoidNamespaceSerializer} as the namespace serializer. +* +* @param name The name of the requested timer service. If no service exists under the given +* name a new one will be created and returned. +* @param keySerializer {@code TypeSerializer} for the keys of the timers. +* @param namespaceSerializer {@code TypeSerializer} for the timer namespace. +* @param triggerable The {@link Triggerable} that should be invoked when timers fire +* +* @param The type of the timer keys. +* @param The type of the timer namespace. 
+*/ + public InternalTimerService getInternalTimerService( + String name, + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, + Triggerable triggerable) { + + @SuppressWarnings("unchecked") + HeapInternalTimerService service = (HeapInternalTimerService) timerServices.get(name); + + if (service == null) { + if (restoredServices != null && restoredServices.containsKey(name)) { + @SuppressWarnings("unchecked") + HeapInternalTimerService.RestoredTimers restoredService = + (HeapInternalTimerService.RestoredTimers) restoredServices.remove(name); + + service = new HeapInternalTimerService<>( + keySerializer, + namespaceSerializer, + triggerable, + this, + getRuntimeContext().getProcessingTimeService(), + restoredService); + + } else { + service = new HeapInternalTimerService<>( + keySerializer, + namespaceSerializer, + triggerable, + this, + getRuntimeContext().getProcessingTimeService()); + } + timerServices.put(name, service); + } + + return service; + } + + public void processWatermark(Watermark mark) throws Exception { + for (HeapInternalTimerService service : timerServices.values()) { + service.advanceWatermark(mark.getTimestamp()); + } + output.emitWatermark(mark); + } + + public void processWatermark1(Watermark mark) throws Exception { + input1Watermark = mark.getTimestamp(); + long newMin = Math.min(input1Watermark, input2Watermark); --- End diff -- I agree with @StefanRRichter's comment below. I would add that `processWatermark1` and `processWatermark2` repeat much of the same code, so the common part can be extracted into a private method that both of them call.
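The refactoring suggested in this review comment could look roughly like the sketch below. It is a simplified stand-in, not the operator code from the pull request: the class name `TwoInputWatermarkTracker`, the helper `advanceCombinedWatermark`, the `combinedWatermark` field and the "only move forward" check are assumptions made for illustration. Only the `Math.min(input1Watermark, input2Watermark)` step is taken from the quoted diff.

```java
import java.util.OptionalLong;

// Hypothetical, simplified stand-in for a two-input operator (not Flink's class).
class TwoInputWatermarkTracker {
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    // Assumed field: the last watermark that was forwarded downstream.
    private long combinedWatermark = Long.MIN_VALUE;

    OptionalLong processWatermark1(long timestamp) {
        input1Watermark = timestamp;
        return advanceCombinedWatermark();
    }

    OptionalLong processWatermark2(long timestamp) {
        input2Watermark = timestamp;
        return advanceCombinedWatermark();
    }

    // Shared logic: the combined watermark is the minimum over both inputs.
    // It is reported only when it moves forward (an assumption of this sketch).
    private OptionalLong advanceCombinedWatermark() {
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            return OptionalLong.of(newMin);
        }
        return OptionalLong.empty();
    }
}
```

In a real operator the shared method would presumably also advance the timer services and emit the watermark, as `processWatermark` does in the quoted diff; the sketch returns the value instead so that it stays self-contained.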
[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions
[ https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591260#comment-15591260 ] ASF GitHub Bot commented on FLINK-3674: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r84237685 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultProcessingTimeService.java --- @@ -35,10 +34,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** - * A {@link TimeServiceProvider} which assigns as current processing time the result of calling + * A {@link ProcessingTimeService} which assigns as current processing time the result of calling * {@link System#currentTimeMillis()} and registers timers using a {@link ScheduledThreadPoolExecutor}. */ -public class DefaultTimeServiceProvider extends TimeServiceProvider { +public class DefaultProcessingTimeService extends ProcessingTimeService { --- End diff -- We could also rename it to `SystemProcessingTimeService` or something more indicative of where the clock ticks come from. > Add an interface for Time aware User Functions > -- > > Key: FLINK-3674 > URL: https://issues.apache.org/jira/browse/FLINK-3674 > Project: Flink > Issue Type: New Feature > Components: Streaming >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Aljoscha Krettek > > I suggest to add an interface that UDFs can implement, which will let them be > notified upon watermark updates. > Example usage: > {code} > public interface EventTimeFunction { > void onWatermark(Watermark watermark); > } > public class MyMapper implements MapFunction, > EventTimeFunction { > private long currentEventTime = Long.MIN_VALUE; > public String map(String value) { > return value + " @ " + currentEventTime; > } > public void onWatermark(Watermark watermark) { > currentEventTime = watermark.getTimestamp(); > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
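The idea that watermarks act as the clock of event time, which runs through this review thread, can be illustrated with a minimal sketch. It mirrors the set-plus-priority-queue layout visible in the quoted `HeapInternalTimerService` fields, but everything else is an assumption: the class `EventTimeTimerSketch` and its methods are hypothetical, and timers are plain timestamps here, whereas the timers in the pull request are also scoped to a key and a namespace.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;

// Hypothetical sketch: event-time timers that fire when the watermark passes them.
class EventTimeTimerSketch {
    // The set de-duplicates timers; the queue keeps them ordered by timestamp.
    private final Set<Long> eventTimeTimers = new HashSet<>();
    private final PriorityQueue<Long> eventTimeTimersQueue = new PriorityQueue<>();
    private long currentWatermark = Long.MIN_VALUE;

    void registerEventTimeTimer(long time) {
        if (eventTimeTimers.add(time)) {
            eventTimeTimersQueue.add(time);
        }
    }

    // A watermark is one "tick" of the event-time clock: every timer whose
    // timestamp is at or before the watermark fires, in timestamp order.
    List<Long> advanceWatermark(long watermark) {
        currentWatermark = watermark;
        List<Long> fired = new ArrayList<>();
        while (!eventTimeTimersQueue.isEmpty() && eventTimeTimersQueue.peek() <= watermark) {
            long time = eventTimeTimersQueue.poll();
            eventTimeTimers.remove(time);
            fired.add(time);
        }
        return fired;
    }

    long currentWatermark() {
        return currentWatermark;
    }
}
```

Processing-time timers would follow the same shape, except that the "tick" comes from the wall clock (for example a scheduled callback) rather than from an incoming watermark.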
[jira] [Commented] (FLINK-4824) CliFrontend shows misleading error message when main() method returns before env.execute()
[ https://issues.apache.org/jira/browse/FLINK-4824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591288#comment-15591288 ] ASF GitHub Bot commented on FLINK-4824: --- Github user mxm commented on the issue: https://github.com/apache/flink/pull/2662 Thanks for the PR @greghogan! Having a custom exception for missing arguments to a user program is a good approach. However, it requires the author of the program to use the custom exception. At least, we would have to adapt all the included examples. Additionally, it would be nice to throw another custom exception when no Flink job was generated during execution of the jar (which might be because of missing arguments). Currently, we simply throw a `ProgramInvocationException` which could look like a serious error to the user when merely arguments are missing. So +1 but we might do some follow-ups to fully solve the issue. > CliFrontend shows misleading error message when main() method returns before > env.execute() > -- > > Key: FLINK-4824 > URL: https://issues.apache.org/jira/browse/FLINK-4824 > Project: Flink > Issue Type: Bug > Components: Client >Affects Versions: 1.2.0 >Reporter: Robert Metzger >Assignee: Greg Hogan > > While testing Flink by running the > {{./examples/streaming/SocketWindowWordCount.jar}} example, I got the > following error message: > {code} > ./bin/flink run ./examples/streaming/SocketWindowWordCount.jar > Cluster configuration: Standalone cluster with JobManager at /127.0.0.1:6123 > Using address 127.0.0.1:6123 to connect to JobManager. > JobManager web interface address http://127.0.0.1:8081 > Starting execution of program > No port specified. 
Please run 'SocketWindowWordCount --port ', where > port is the address of the text server > To start a simple text server, run 'netcat -l ' and type the input text > into the command line > > The program finished with the following exception: > org.apache.flink.client.program.ProgramInvocationException: The program > didn't contain Flink jobs. Perhaps you forgot to call execute() on the > execution environment. > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:324) > at > org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:774) > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:250) > at > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:985) > at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:1032) > at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:1029) > at > org.apache.flink.runtime.security.SecurityContext$1.run(SecurityContext.java:82) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548) > at > org.apache.flink.runtime.security.SecurityContext.runSecured(SecurityContext.java:79) > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1029) > {code} > I think the error message is misleading, because I tried executing a valid > Flink job. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
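The approach discussed for this issue, a dedicated exception that a user program throws when its arguments are missing so that the client can print a short hint instead of a stack trace, might be sketched as follows. All names here (`MissingArgumentsException`, `PortArgumentSketch`, `describeFailure`) are hypothetical and chosen only for illustration; they are not the classes added by the pull request.

```java
// Hypothetical exception type; the name is an assumption, not Flink's API.
class MissingArgumentsException extends RuntimeException {
    MissingArgumentsException(String message) {
        super(message);
    }
}

class PortArgumentSketch {
    // Parses "--port <n>" from the arguments or fails with the dedicated exception.
    static int parsePort(String[] args) {
        for (int i = 0; i + 1 < args.length; i++) {
            if ("--port".equals(args[i])) {
                return Integer.parseInt(args[i + 1]);
            }
        }
        throw new MissingArgumentsException("No port specified. Please pass '--port <port>'.");
    }

    // What a client could print: a short hint for usage errors,
    // the generic failure message for everything else.
    static String describeFailure(Throwable t) {
        if (t instanceof MissingArgumentsException) {
            return "Invalid program arguments: " + t.getMessage();
        }
        return "The program finished with the following exception: " + t;
    }
}
```

As noted in the review, this only helps if the program author actually throws the dedicated exception, which is why the bundled examples would have to be adapted as well.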
[jira] [Commented] (FLINK-4784) MetricQueryService actor name collision
[ https://issues.apache.org/jira/browse/FLINK-4784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591287#comment-15591287 ] ASF GitHub Bot commented on FLINK-4784: --- Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2636 I tested the change this morning, and it's not working: ``` 2016-10-20 11:08:28,216 WARN org.apache.flink.runtime.metrics.MetricRegistry - Could not start MetricDumpActor. No metrics will be submitted to the WebInterface. akka.actor.InvalidActorNameException: illegal actor name [MetricQueryService_ResourceID{resourceId='e29f9e7d096c562054a1caa29be21a36'}], must conform to (?:[-\w:@&=+,.!~*'_;]|%\p{XDigit}{2})(?:[-\w:@&=+,.!~*'$_;]|%\p{XDigit}{2})* at akka.actor.dungeon.Children$class.checkName(Children.scala:182) at akka.actor.dungeon.Children$class.attachChild(Children.scala:42) at akka.actor.ActorCell.attachChild(ActorCell.scala:369) at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:553) ``` > MetricQueryService actor name collision > --- > > Key: FLINK-4784 > URL: https://issues.apache.org/jira/browse/FLINK-4784 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.1.2 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: 1.2.0 > > > The name of the MetricQueryService actor is currently fixed, which can lead > to collisions if multiple TaskManagers run in the same actor system, as is > the case for the MiniCluster. > We can append the TaskManager ResourceID to make the name unique for every > TaskManager. The MetricFetcher would still be able to reliably infer the > correct actor name. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
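The failure in the log above can be reproduced without Akka by checking candidate names against the pattern quoted in the exception message. In the sketch below the pattern is copied from that message; the class `ActorNameCheck` and the helper `metricQueryServiceName` are hypothetical, and appending only the raw ID string is an assumption about one possible fix, not a description of what the pull request ended up doing.

```java
import java.util.regex.Pattern;

class ActorNameCheck {
    // Pattern copied from the InvalidActorNameException message in the log.
    static final Pattern VALID_ACTOR_NAME = Pattern.compile(
        "(?:[-\\w:@&=+,.!~*'_;]|%\\p{XDigit}{2})(?:[-\\w:@&=+,.!~*'$_;]|%\\p{XDigit}{2})*");

    static boolean isValid(String name) {
        return VALID_ACTOR_NAME.matcher(name).matches();
    }

    // Hypothetical helper: builds the actor name from the plain ID string rather
    // than from a toString() form, which contains '{' and '}'.
    static String metricQueryServiceName(String resourceIdString) {
        return "MetricQueryService_" + resourceIdString;
    }
}
```

The rejected name fails only because of the curly braces introduced by the `ResourceID{resourceId='...'}` rendering; `=` and `'` are both permitted by the pattern.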
[GitHub] flink issue #2662: [FLINK-4824] [client] CliFrontend shows misleading error ...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2662 Thanks for the PR @greghogan! Having a custom exception for missing arguments to a user program is a good approach. However, it requires the author of the program to use the custom exception. At least, we would have to adapt all the included examples. Additionally, it would be nice to throw another custom exception when no Flink job was generated during execution of the jar (which might be because of missing arguments). Currently, we simply throw a `ProgramInvocationException` which could look like a serious error to the user when merely arguments are missing. So +1 but we might do some follow-ups to fully solve the issue.
[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2667#discussion_r84241679 --- Diff: flink-contrib/docker-flink/docker-entrypoint.sh --- @@ -20,11 +20,19 @@ if [ "$1" = "jobmanager" ]; then echo "Starting Job Manager" -sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: `hostname -f`/g" $FLINK_HOME/conf/flink-conf.yaml +#sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: `hostname -f`/g" $FLINK_HOME/conf/flink-conf.yaml + +# make use of container linking and exploit the jobmanager entry in /etc/hosts +sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: jobmanager/g" $FLINK_HOME/conf/flink-conf.yaml + sed -i -e "s/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: `grep -c ^processor /proc/cpuinfo`/g" $FLINK_HOME/conf/flink-conf.yaml --- End diff -- This line has to go to the `taskmanager` section. Before, it didn't really matter because the config was shared but now this setting will just be configured for the `JobManager` when, in fact, it is only used by the `TaskManager`.
[jira] [Commented] (FLINK-4784) MetricQueryService actor name collision
[ https://issues.apache.org/jira/browse/FLINK-4784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591353#comment-15591353 ] ASF GitHub Bot commented on FLINK-4784: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/2636 should be fixed now > MetricQueryService actor name collision > --- > > Key: FLINK-4784 > URL: https://issues.apache.org/jira/browse/FLINK-4784 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.1.2 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: 1.2.0 > > > The name of the MetricQueryService actor is currently fixed, which can lead > to collisions if multiple TaskManagers run in the same actor system, as is > the case for the MiniCluster. > We can append the TaskManager ResourceID to make the name unique for every > TaskManager. The MetricFetcher would still be able to reliably infer the > correct actor name. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2636: [FLINK-4784] Unique MetricQueryService actor names
Github user zentol commented on the issue: https://github.com/apache/flink/pull/2636 should be fixed now
[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions
[ https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591419#comment-15591419 ] ASF GitHub Bot commented on FLINK-3674: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r84251732 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java --- @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.HashSet; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.concurrent.ScheduledFuture; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link InternalTimerService} that stores timers on the Java heap. + */ +public class HeapInternalTimerService implements InternalTimerService, ProcessingTimeCallback { + + private final TypeSerializer keySerializer; + + private final TypeSerializer namespaceSerializer; + + private final ProcessingTimeService processingTimeService; + + private long currentWatermark = Long.MIN_VALUE; + + private final org.apache.flink.streaming.api.operators.Triggerable triggerTarget; + + private final KeyContext keyContext; + + /** +* Processing time timers that are currently in-flight. +*/ + private final PriorityQueue> processingTimeTimersQueue; + private final Set> processingTimeTimers; + + protected ScheduledFuture nextTimer = null; + + /** +* Currently waiting watermark callbacks. 
+*/ + private final Set> watermarkTimers; + private final PriorityQueue> watermarkTimersQueue; + + public HeapInternalTimerService( + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, + org.apache.flink.streaming.api.operators.Triggerable triggerTarget, + KeyContext keyContext, + ProcessingTimeService processingTimeService) { + this.keySerializer = checkNotNull(keySerializer); + this.namespaceSerializer = checkNotNull(namespaceSerializer); + this.triggerTarget = checkNotNull(triggerTarget); + this.keyContext = keyContext; + this.processingTimeService = checkNotNull(processingTimeService); + + watermarkTimers = new HashSet<>(); --- End diff -- I'm renaming > Add an interface for Time aware User Functions > -- > > Key: FLINK-3674 > URL: https://issues.apache.org/jira/browse/FLINK-3674 > Project: Flink > Issue Type: New Feature > Components: Streaming >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Aljoscha Krettek > > I suggest to add an interface that UDFs can implement, which will let them be > notified upon watermark updates. > Example usage: > {code} > public interface EventTimeFunction { > void onWatermark(Watermark watermark); > } > public class MyMapper implements MapFunction, > EventTimeFunction { > private long currentEventTime = Long.MIN_VALUE; > public String map(String value) { > return value + " @ " + currentEventTime; > } > public void onWatermark(Watermark water
[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions
[ https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591422#comment-15591422 ] ASF GitHub Bot commented on FLINK-3674: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r84251918 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java --- @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.HashSet; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.concurrent.ScheduledFuture; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link InternalTimerService} that stores timers on the Java heap. + */ +public class HeapInternalTimerService implements InternalTimerService, ProcessingTimeCallback { + + private final TypeSerializer keySerializer; + + private final TypeSerializer namespaceSerializer; + + private final ProcessingTimeService processingTimeService; + + private long currentWatermark = Long.MIN_VALUE; + + private final org.apache.flink.streaming.api.operators.Triggerable triggerTarget; + + private final KeyContext keyContext; + + /** +* Processing time timers that are currently in-flight. +*/ + private final PriorityQueue> processingTimeTimersQueue; + private final Set> processingTimeTimers; + + protected ScheduledFuture nextTimer = null; + + /** +* Currently waiting watermark callbacks. 
+*/ + private final Set> watermarkTimers; + private final PriorityQueue> watermarkTimersQueue; + + public HeapInternalTimerService( + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, + org.apache.flink.streaming.api.operators.Triggerable triggerTarget, + KeyContext keyContext, + ProcessingTimeService processingTimeService) { + this.keySerializer = checkNotNull(keySerializer); + this.namespaceSerializer = checkNotNull(namespaceSerializer); + this.triggerTarget = checkNotNull(triggerTarget); + this.keyContext = keyContext; + this.processingTimeService = checkNotNull(processingTimeService); + + watermarkTimers = new HashSet<>(); + watermarkTimersQueue = new PriorityQueue<>(100); + + processingTimeTimers = new HashSet<>(); + processingTimeTimersQueue = new PriorityQueue<>(100); + } + + public HeapInternalTimerService( + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, + org.apache.flink.streaming.api.operators.Triggerable triggerTarget, + KeyContext keyContext, + ProcessingTimeService processingTimeService, + RestoredTimers restoredTimers) { + + this.keySerializer = checkNotNull(keySerializer); + this.namespaceSerializer = checkNotNull(namespaceSerializer); + this.triggerTarget = checkNotNull(triggerTarget); + this.keyContext = keyContext; + this.processingTimeService = checkNotNull
[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions
[ https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591424#comment-15591424 ] ASF GitHub Bot commented on FLINK-3674: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r84251996 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java --- @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.HashSet; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.concurrent.ScheduledFuture; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link InternalTimerService} that stores timers on the Java heap. + */ +public class HeapInternalTimerService implements InternalTimerService, ProcessingTimeCallback { + + private final TypeSerializer keySerializer; + + private final TypeSerializer namespaceSerializer; + + private final ProcessingTimeService processingTimeService; + + private long currentWatermark = Long.MIN_VALUE; + + private final org.apache.flink.streaming.api.operators.Triggerable triggerTarget; + + private final KeyContext keyContext; + + /** +* Processing time timers that are currently in-flight. +*/ + private final PriorityQueue> processingTimeTimersQueue; + private final Set> processingTimeTimers; + + protected ScheduledFuture nextTimer = null; + + /** +* Currently waiting watermark callbacks. 
+*/ + private final Set> watermarkTimers; + private final PriorityQueue> watermarkTimersQueue; + + public HeapInternalTimerService( + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, + org.apache.flink.streaming.api.operators.Triggerable triggerTarget, + KeyContext keyContext, + ProcessingTimeService processingTimeService) { + this.keySerializer = checkNotNull(keySerializer); + this.namespaceSerializer = checkNotNull(namespaceSerializer); + this.triggerTarget = checkNotNull(triggerTarget); + this.keyContext = keyContext; + this.processingTimeService = checkNotNull(processingTimeService); + + watermarkTimers = new HashSet<>(); + watermarkTimersQueue = new PriorityQueue<>(100); + + processingTimeTimers = new HashSet<>(); + processingTimeTimersQueue = new PriorityQueue<>(100); + } + + public HeapInternalTimerService( + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, + org.apache.flink.streaming.api.operators.Triggerable triggerTarget, + KeyContext keyContext, + ProcessingTimeService processingTimeService, + RestoredTimers restoredTimers) { + + this.keySerializer = checkNotNull(keySerializer); + this.namespaceSerializer = checkNotNull(namespaceSerializer); + this.triggerTarget = checkNotNull(triggerTarget); + this.keyContext = keyContext; + this.processingTimeService = checkNotNull
[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions
[ https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591462#comment-15591462 ] ASF GitHub Bot commented on FLINK-3674: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2570 Thanks for your comments @StefanRRichter and @kl0u. I incorporated most of them by now. > Add an interface for Time aware User Functions > -- > > Key: FLINK-3674 > URL: https://issues.apache.org/jira/browse/FLINK-3674 > Project: Flink > Issue Type: New Feature > Components: Streaming >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Aljoscha Krettek > > I suggest to add an interface that UDFs can implement, which will let them be > notified upon watermark updates. > Example usage: > {code} > public interface EventTimeFunction { > void onWatermark(Watermark watermark); > } > public class MyMapper implements MapFunction, > EventTimeFunction { > private long currentEventTime = Long.MIN_VALUE; > public String map(String value) { > return value + " @ " + currentEventTime; > } > public void onWatermark(Watermark watermark) { > currentEventTime = watermark.getTimestamp(); > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
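The usage example quoted in the issue description lost its generic type parameters in this archive. The sketch below restates the same idea in a form that compiles on its own. `Watermark` and `MapFunction` are minimal stand-ins written for this example, not Flink's real classes, and the `String` type arguments are an assumption.

```java
// Minimal stand-ins so the sketch compiles on its own; these are NOT Flink's classes.
final class Watermark {
    private final long timestamp;

    Watermark(long timestamp) {
        this.timestamp = timestamp;
    }

    long getTimestamp() {
        return timestamp;
    }
}

interface MapFunction<IN, OUT> {
    OUT map(IN value);
}

// The interface suggested in the issue: a UDF implements it to be told about watermark updates.
interface EventTimeFunction {
    void onWatermark(Watermark watermark);
}

// A mapper that tags each value with the most recently observed event time.
class MyMapper implements MapFunction<String, String>, EventTimeFunction {
    private long currentEventTime = Long.MIN_VALUE;

    @Override
    public String map(String value) {
        return value + " @ " + currentEventTime;
    }

    @Override
    public void onWatermark(Watermark watermark) {
        currentEventTime = watermark.getTimestamp();
    }
}
```

Until the first watermark arrives, the mapper reports `Long.MIN_VALUE` as its event time, which matches the initial value used in the quoted example.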
[GitHub] flink issue #2570: [FLINK-3674] Add an interface for Time aware User Functio...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2570 Thanks for your comments @StefanRRichter and @kl0u. I incorporated most of them by now. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions
[ https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591460#comment-15591460 ] ASF GitHub Bot commented on FLINK-3674: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r84255151 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultProcessingTimeService.java --- @@ -35,10 +34,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** - * A {@link TimeServiceProvider} which assigns as current processing time the result of calling + * A {@link ProcessingTimeService} which assigns as current processing time the result of calling * {@link System#currentTimeMillis()} and registers timers using a {@link ScheduledThreadPoolExecutor}. */ -public class DefaultTimeServiceProvider extends TimeServiceProvider { +public class DefaultProcessingTimeService extends ProcessingTimeService { --- End diff -- Done > Add an interface for Time aware User Functions > -- > > Key: FLINK-3674 > URL: https://issues.apache.org/jira/browse/FLINK-3674 > Project: Flink > Issue Type: New Feature > Components: Streaming >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Aljoscha Krettek > > I suggest to add an interface that UDFs can implement, which will let them be > notified upon watermark updates. > Example usage: > {code} > public interface EventTimeFunction { > void onWatermark(Watermark watermark); > } > public class MyMapper implements MapFunction, > EventTimeFunction { > private long currentEventTime = Long.MIN_VALUE; > public String map(String value) { > return value + " @ " + currentEventTime; > } > public void onWatermark(Watermark watermark) { > currentEventTime = watermark.getTimestamp(); > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r84251996 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java --- @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.HashSet; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.concurrent.ScheduledFuture; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link InternalTimerService} that stores timers on the Java heap. 
+ */ +public class HeapInternalTimerService implements InternalTimerService, ProcessingTimeCallback { + + private final TypeSerializer keySerializer; + + private final TypeSerializer namespaceSerializer; + + private final ProcessingTimeService processingTimeService; + + private long currentWatermark = Long.MIN_VALUE; + + private final org.apache.flink.streaming.api.operators.Triggerable triggerTarget; + + private final KeyContext keyContext; + + /** +* Processing time timers that are currently in-flight. +*/ + private final PriorityQueue> processingTimeTimersQueue; + private final Set> processingTimeTimers; + + protected ScheduledFuture nextTimer = null; + + /** +* Currently waiting watermark callbacks. +*/ + private final Set> watermarkTimers; + private final PriorityQueue> watermarkTimersQueue; + + public HeapInternalTimerService( + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, + org.apache.flink.streaming.api.operators.Triggerable triggerTarget, + KeyContext keyContext, + ProcessingTimeService processingTimeService) { + this.keySerializer = checkNotNull(keySerializer); + this.namespaceSerializer = checkNotNull(namespaceSerializer); + this.triggerTarget = checkNotNull(triggerTarget); + this.keyContext = keyContext; + this.processingTimeService = checkNotNull(processingTimeService); + + watermarkTimers = new HashSet<>(); + watermarkTimersQueue = new PriorityQueue<>(100); + + processingTimeTimers = new HashSet<>(); + processingTimeTimersQueue = new PriorityQueue<>(100); + } + + public HeapInternalTimerService( + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, + org.apache.flink.streaming.api.operators.Triggerable triggerTarget, + KeyContext keyContext, + ProcessingTimeService processingTimeService, + RestoredTimers restoredTimers) { + + this.keySerializer = checkNotNull(keySerializer); + this.namespaceSerializer = checkNotNull(namespaceSerializer); + this.triggerTarget = checkNotNull(triggerTarget); + this.keyContext = 
keyContext; + this.processingTimeService = checkNotNull(processingTimeService); + + watermarkTimers = restoredTimers.watermarkTimers; + watermarkTimersQueue = restoredTimers.watermarkTimersQueue; + + processingTimeTimers = restoredTimers.processingTimeTimers;
[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure
[ https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591567#comment-15591567 ] ASF GitHub Bot commented on FLINK-4715: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2652 +1 go ahead > TaskManager should commit suicide after cancellation failure > > > Key: FLINK-4715 > URL: https://issues.apache.org/jira/browse/FLINK-4715 > Project: Flink > Issue Type: Improvement > Components: TaskManager >Affects Versions: 1.2.0 >Reporter: Till Rohrmann >Assignee: Ufuk Celebi > Fix For: 1.2.0 > > > In case of a failed cancellation, e.g. the task cannot be cancelled after a > given time, the {{TaskManager}} should kill itself. That way we guarantee > that there is no resource leak. > This behaviour acts as a safety-net against faulty user code. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
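The safety net described in the issue can be pictured as a small watchdog: interrupt the task, wait a bounded time, and escalate if the task thread is still alive. The following is only a sketch of that idea using plain Java threads. `CancellationWatchdog`, `FatalErrorHandler`, and `cancelOrEscalate` are hypothetical names, and this is not the code from the pull request.

```java
final class CancellationWatchdog {

    /** Hypothetical callback; stands in for whatever would terminate the process. */
    interface FatalErrorHandler {
        void onFatalError(String message);
    }

    /**
     * Interrupts the task thread and waits up to timeoutMillis for it to finish.
     * Returns true if the thread terminated in time; otherwise notifies the handler
     * and returns false.
     */
    static boolean cancelOrEscalate(Thread taskThread, long timeoutMillis, FatalErrorHandler handler) {
        if (timeoutMillis <= 0) {
            // Thread.join(0) would wait forever, which defeats the purpose of a watchdog.
            throw new IllegalArgumentException("timeoutMillis must be positive");
        }
        taskThread.interrupt();
        try {
            taskThread.join(timeoutMillis);
        } catch (InterruptedException e) {
            // Preserve the caller's interrupt status and fall through to the liveness check.
            Thread.currentThread().interrupt();
        }
        if (taskThread.isAlive()) {
            handler.onFatalError("Task did not react to cancellation within " + timeoutMillis + " ms");
            return false;
        }
        return true;
    }
}
```

A handler that exits the process would release all resources held by user code that ignores interrupts, which is the guarantee the issue asks for.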
[GitHub] flink issue #2652: [FLINK-4715] Fail TaskManager with fatal error if task ca...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2652 +1 go ahead
[GitHub] flink issue #2642: [FLINK-3999]: Rename the `running` flag in the drivers to...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2642 @nssalian Thanks for understanding. We as a community together have actually gone into that mode (more conservative and stability focused) not too long ago. So it is still a bit of a learning process for everyone here (committers and contributors).
[jira] [Commented] (FLINK-3999) Rename the `running` flag in the drivers to `canceled`
[ https://issues.apache.org/jira/browse/FLINK-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591577#comment-15591577 ] ASF GitHub Bot commented on FLINK-3999: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2642 @nssalian Thanks for understanding. We as a community together have actually gone into that mode (more conservative and stability focused) not too long ago. So it is still a bit of a learning process for everyone here (committers and contributors). > Rename the `running` flag in the drivers to `canceled` > -- > > Key: FLINK-3999 > URL: https://issues.apache.org/jira/browse/FLINK-3999 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Reporter: Gabor Gevay >Assignee: Neelesh Srinivas Salian >Priority: Trivial > > The name of the {{running}} flag in the drivers doesn't reflect its usage: > when the operator just stops normally, then it is not running anymore, but > the {{running}} flag will still be true, since the {{running}} flag is only > set when cancelling. > It should be renamed, and the value inverted. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
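To illustrate the rename the issue asks for, here is a minimal sketch of a driver whose flag is called `canceled` and starts out `false`. `SketchDriver` and its methods are hypothetical and are not one of Flink's actual driver classes.

```java
final class SketchDriver {
    // Set only by cancel(); stays false while running and after a normal finish,
    // so the name now matches what the flag actually records.
    private volatile boolean canceled = false;

    private int processed = 0;

    /** Processes up to the given number of records, stopping early if canceled. */
    void run(int records) {
        for (int i = 0; i < records && !canceled; i++) {
            processed++;
        }
    }

    void cancel() {
        canceled = true;
    }

    boolean isCanceled() {
        return canceled;
    }

    int getProcessed() {
        return processed;
    }
}
```

With the old `running` name, the flag would still read `true` after `run` returned normally; with `canceled`, a driver that finished on its own simply reports `false`.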
[jira] [Commented] (FLINK-3030) Enhance Dashboard to show Execution Attempts
[ https://issues.apache.org/jira/browse/FLINK-3030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591587#comment-15591587 ] ASF GitHub Bot commented on FLINK-3030: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2448 Good to go from my side. @rmetzger and @iampeter - since you are working on the web UI currently, can you try and merge this? > Enhance Dashboard to show Execution Attempts > > > Key: FLINK-3030 > URL: https://issues.apache.org/jira/browse/FLINK-3030 > Project: Flink > Issue Type: Improvement > Components: Webfrontend >Affects Versions: 0.10.0 >Reporter: Stephan Ewen >Assignee: Ivan Mushketyk > Fix For: 1.0.0 > > > Currently, the web dashboard shows only the latest execution attempt. We > should make all execution attempts and their accumulators available for > inspection. > The REST monitoring API supports this, so it should be a change only to the > frontend part. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r84236439 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java --- @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.HashSet; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.concurrent.ScheduledFuture; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link InternalTimerService} that stores timers on the Java heap. 
+ */ +public class HeapInternalTimerService implements InternalTimerService, ProcessingTimeCallback { + + private final TypeSerializer keySerializer; + + private final TypeSerializer namespaceSerializer; + + private final ProcessingTimeService processingTimeService; + + private long currentWatermark = Long.MIN_VALUE; + + private final org.apache.flink.streaming.api.operators.Triggerable triggerTarget; + + private final KeyContext keyContext; + + /** +* Processing time timers that are currently in-flight. +*/ + private final PriorityQueue> processingTimeTimersQueue; + private final Set> processingTimeTimers; + + protected ScheduledFuture nextTimer = null; + + /** +* Currently waiting watermark callbacks. +*/ + private final Set> watermarkTimers; + private final PriorityQueue> watermarkTimersQueue; + + public HeapInternalTimerService( + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, + org.apache.flink.streaming.api.operators.Triggerable triggerTarget, + KeyContext keyContext, + ProcessingTimeService processingTimeService) { + this.keySerializer = checkNotNull(keySerializer); + this.namespaceSerializer = checkNotNull(namespaceSerializer); + this.triggerTarget = checkNotNull(triggerTarget); + this.keyContext = keyContext; + this.processingTimeService = checkNotNull(processingTimeService); + + watermarkTimers = new HashSet<>(); + watermarkTimersQueue = new PriorityQueue<>(100); + + processingTimeTimers = new HashSet<>(); + processingTimeTimersQueue = new PriorityQueue<>(100); + } + + public HeapInternalTimerService( + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, + org.apache.flink.streaming.api.operators.Triggerable triggerTarget, + KeyContext keyContext, + ProcessingTimeService processingTimeService, + RestoredTimers restoredTimers) { + + this.keySerializer = checkNotNull(keySerializer); + this.namespaceSerializer = checkNotNull(namespaceSerializer); + this.triggerTarget = checkNotNull(triggerTarget); + this.keyContext = 
keyContext; + this.processingTimeService = checkNotNull(processingTimeService); + + watermarkTimers = restoredTimers.watermarkTimers; + watermarkTimersQueue = restoredTimers.watermarkTimersQueue; + + processingTimeTimers = restoredTimers.processingTimeTimers; +
[GitHub] flink issue #2448: [FLINK-3030][web frontend] Enhance dashboard to show exec...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2448 Good to go from my side. @rmetzger and @iampeter - since you are working on the web UI currently, can you try and merge this?
[GitHub] flink issue #2636: [FLINK-4784] Unique MetricQueryService actor names
Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2636 I tested the change this morning, and it's not working:
```
2016-10-20 11:08:28,216 WARN  org.apache.flink.runtime.metrics.MetricRegistry - Could not start MetricDumpActor. No metrics will be submitted to the WebInterface.
akka.actor.InvalidActorNameException: illegal actor name [MetricQueryService_ResourceID{resourceId='e29f9e7d096c562054a1caa29be21a36'}], must conform to (?:[-\w:@&=+,.!~*'_;]|%\p{XDigit}{2})(?:[-\w:@&=+,.!~*'$_;]|%\p{XDigit}{2})*
	at akka.actor.dungeon.Children$class.checkName(Children.scala:182)
	at akka.actor.dungeon.Children$class.attachChild(Children.scala:42)
	at akka.actor.ActorCell.attachChild(ActorCell.scala:369)
	at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:553)
```
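The exception in the message above is raised because the generated name contains `{` and `}`, which the quoted pattern does not allow. A small check with `java.util.regex` makes this visible. `ActorNameCheck` is a hypothetical helper for illustration only, and the pattern is copied from the log on the assumption that the trailing `*` belongs to the second group.

```java
import java.util.regex.Pattern;

final class ActorNameCheck {
    // Pattern taken from the exception message; the final '*' is assumed to close the second group.
    private static final Pattern VALID_ACTOR_NAME = Pattern.compile(
        "(?:[-\\w:@&=+,.!~*'_;]|%\\p{XDigit}{2})(?:[-\\w:@&=+,.!~*'$_;]|%\\p{XDigit}{2})*");

    /** Returns true if the whole name conforms to the pattern. */
    static boolean isValid(String name) {
        return VALID_ACTOR_NAME.matcher(name).matches();
    }

    public static void main(String[] args) {
        // Name from the log: rejected because of the braces.
        System.out.println(isValid(
            "MetricQueryService_ResourceID{resourceId='e29f9e7d096c562054a1caa29be21a36'}"));
        // Name built from the raw ID string only: accepted.
        System.out.println(isValid("MetricQueryService_e29f9e7d096c562054a1caa29be21a36"));
    }
}
```

One possible direction, stated here only as an assumption, is to build the actor name from the plain resource ID string rather than from the `toString()` output of the ID object.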
[jira] [Commented] (FLINK-4850) FlinkML - SVM predict Operation for Vector and not LaveledVector
[ https://issues.apache.org/jira/browse/FLINK-4850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591622#comment-15591622 ] Theodore Vasiloudis commented on FLINK-4850: Just to note, the PR I opened only fixes the documentation part; I recommend opening a separate issue for adding an {EvaluateDataSet} operation for {LabeledVector}. > FlinkML - SVM predict Operation for Vector and not LaveledVector > > > Key: FLINK-4850 > URL: https://issues.apache.org/jira/browse/FLINK-4850 > Project: Flink > Issue Type: Bug >Reporter: Thomas FOURNIER >Assignee: Theodore Vasiloudis > > It seems that evaluate operation is defined for Vector and not LabeledVector. > It impacts QuickStart guide for FlinkML when using SVM. > 1- We need to update the documentation as follows: > val astroTest:DataSet[(Vector,Double)] = MLUtils > .readLibSVM(env, "src/main/resources/svmguide1.t") > .map(l => (l.vector, l.label)) > val predictionPairs = svm.evaluate(astroTest) > 2- Update code such that LabeledVector can be used with evaluate method -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (FLINK-4850) FlinkML - SVM predict Operation for Vector and not LaveledVector
[ https://issues.apache.org/jira/browse/FLINK-4850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591622#comment-15591622 ] Theodore Vasiloudis edited comment on FLINK-4850 at 10/20/16 11:57 AM: --- Just to note, the PR I opened only fixes the documentation part, I recommend having a different issue for the adding an {{EvaluateDataSet}} operation for {{LabeledVector}} was (Author: tvas): Just to note, the PR I opened only fixes the documentation part, I recommend having a different issue for the adding an {EvaluateDataSet} operation for {LabeledVector} > FlinkML - SVM predict Operation for Vector and not LaveledVector > > > Key: FLINK-4850 > URL: https://issues.apache.org/jira/browse/FLINK-4850 > Project: Flink > Issue Type: Bug >Reporter: Thomas FOURNIER >Assignee: Theodore Vasiloudis > > It seems that evaluate operation is defined for Vector and not LabeledVector. > It impacts QuickStart guide for FlinkML when using SVM. > 1- We need to update the documentation as follows: > val astroTest:DataSet[(Vector,Double)] = MLUtils > .readLibSVM(env, "src/main/resources/svmguide1.t") > .map(l => (l.vector, l.label)) > val predictionPairs = svm.evaluate(astroTest) > 2- Update code such that LabeledVector can be used with evaluate method -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r84251732 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java --- @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.HashSet; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.concurrent.ScheduledFuture; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link InternalTimerService} that stores timers on the Java heap. 
+ */ +public class HeapInternalTimerService implements InternalTimerService, ProcessingTimeCallback { + + private final TypeSerializer keySerializer; + + private final TypeSerializer namespaceSerializer; + + private final ProcessingTimeService processingTimeService; + + private long currentWatermark = Long.MIN_VALUE; + + private final org.apache.flink.streaming.api.operators.Triggerable triggerTarget; + + private final KeyContext keyContext; + + /** +* Processing time timers that are currently in-flight. +*/ + private final PriorityQueue> processingTimeTimersQueue; + private final Set> processingTimeTimers; + + protected ScheduledFuture nextTimer = null; + + /** +* Currently waiting watermark callbacks. +*/ + private final Set> watermarkTimers; + private final PriorityQueue> watermarkTimersQueue; + + public HeapInternalTimerService( + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, + org.apache.flink.streaming.api.operators.Triggerable triggerTarget, + KeyContext keyContext, + ProcessingTimeService processingTimeService) { + this.keySerializer = checkNotNull(keySerializer); + this.namespaceSerializer = checkNotNull(namespaceSerializer); + this.triggerTarget = checkNotNull(triggerTarget); + this.keyContext = keyContext; + this.processingTimeService = checkNotNull(processingTimeService); + + watermarkTimers = new HashSet<>(); --- End diff -- I'm renaming --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r84251918 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java --- @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.HashSet; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.concurrent.ScheduledFuture; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link InternalTimerService} that stores timers on the Java heap. 
+ */ +public class HeapInternalTimerService implements InternalTimerService, ProcessingTimeCallback { + + private final TypeSerializer keySerializer; + + private final TypeSerializer namespaceSerializer; + + private final ProcessingTimeService processingTimeService; + + private long currentWatermark = Long.MIN_VALUE; + + private final org.apache.flink.streaming.api.operators.Triggerable triggerTarget; + + private final KeyContext keyContext; + + /** +* Processing time timers that are currently in-flight. +*/ + private final PriorityQueue> processingTimeTimersQueue; + private final Set> processingTimeTimers; + + protected ScheduledFuture nextTimer = null; + + /** +* Currently waiting watermark callbacks. +*/ + private final Set> watermarkTimers; + private final PriorityQueue> watermarkTimersQueue; + + public HeapInternalTimerService( + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, + org.apache.flink.streaming.api.operators.Triggerable triggerTarget, + KeyContext keyContext, + ProcessingTimeService processingTimeService) { + this.keySerializer = checkNotNull(keySerializer); + this.namespaceSerializer = checkNotNull(namespaceSerializer); + this.triggerTarget = checkNotNull(triggerTarget); + this.keyContext = keyContext; + this.processingTimeService = checkNotNull(processingTimeService); + + watermarkTimers = new HashSet<>(); + watermarkTimersQueue = new PriorityQueue<>(100); + + processingTimeTimers = new HashSet<>(); + processingTimeTimersQueue = new PriorityQueue<>(100); + } + + public HeapInternalTimerService( + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, + org.apache.flink.streaming.api.operators.Triggerable triggerTarget, + KeyContext keyContext, + ProcessingTimeService processingTimeService, + RestoredTimers restoredTimers) { + + this.keySerializer = checkNotNull(keySerializer); + this.namespaceSerializer = checkNotNull(namespaceSerializer); + this.triggerTarget = checkNotNull(triggerTarget); + this.keyContext = 
keyContext; + this.processingTimeService = checkNotNull(processingTimeService); + + watermarkTimers = restoredTimers.watermarkTimers; + watermarkTimersQueue = restoredTimers.watermarkTimersQueue; + + processingTimeTimers = restoredTimers.processingTimeTimers;
[jira] [Updated] (FLINK-4850) FlinkML - SVM predict Operation for Vector and not LaveledVector
[ https://issues.apache.org/jira/browse/FLINK-4850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas FOURNIER updated FLINK-4850: --- Description: It seems that evaluate operation is defined for Vector and not LabeledVector. It impacts QuickStart guide for FlinkML when using SVM. We need to update the documentation as follows: val astroTest:DataSet[(Vector,Double)] = MLUtils .readLibSVM(env, "src/main/resources/svmguide1.t") .map(l => (l.vector, l.label)) val predictionPairs = svm.evaluate(astroTest) was: It seems that evaluate operation is defined for Vector and not LabeledVector. It impacts QuickStart guide for FlinkML when using SVM. 1- We need to update the documentation as follows: val astroTest:DataSet[(Vector,Double)] = MLUtils .readLibSVM(env, "src/main/resources/svmguide1.t") .map(l => (l.vector, l.label)) val predictionPairs = svm.evaluate(astroTest) 2- Update code such that LabeledVector can be used with evaluate method > FlinkML - SVM predict Operation for Vector and not LaveledVector > > > Key: FLINK-4850 > URL: https://issues.apache.org/jira/browse/FLINK-4850 > Project: Flink > Issue Type: Bug >Reporter: Thomas FOURNIER >Assignee: Theodore Vasiloudis > > It seems that evaluate operation is defined for Vector and not LabeledVector. > It impacts QuickStart guide for FlinkML when using SVM. > We need to update the documentation as follows: > val astroTest:DataSet[(Vector,Double)] = MLUtils > .readLibSVM(env, "src/main/resources/svmguide1.t") > .map(l => (l.vector, l.label)) > val predictionPairs = svm.evaluate(astroTest) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4850) FlinkML - SVM predict Operation for Vector and not LaveledVector
[ https://issues.apache.org/jira/browse/FLINK-4850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591644#comment-15591644 ] Thomas FOURNIER commented on FLINK-4850: Ok I'm creating a specific JIRA issue related to adding EvaluateDataSet operation for LabeledVector. > FlinkML - SVM predict Operation for Vector and not LaveledVector > > > Key: FLINK-4850 > URL: https://issues.apache.org/jira/browse/FLINK-4850 > Project: Flink > Issue Type: Bug >Reporter: Thomas FOURNIER >Assignee: Theodore Vasiloudis > > It seems that evaluate operation is defined for Vector and not LabeledVector. > It impacts QuickStart guide for FlinkML when using SVM. > We need to update the documentation as follows: > val astroTest:DataSet[(Vector,Double)] = MLUtils > .readLibSVM(env, "src/main/resources/svmguide1.t") > .map(l => (l.vector, l.label)) > val predictionPairs = svm.evaluate(astroTest) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4864) Shade Calcite dependency in flink-table
Fabian Hueske created FLINK-4864: Summary: Shade Calcite dependency in flink-table Key: FLINK-4864 URL: https://issues.apache.org/jira/browse/FLINK-4864 Project: Flink Issue Type: Improvement Components: Table API & SQL Affects Versions: 1.2.0 Reporter: Fabian Hueske The Table API has a dependency on Apache Calcite. A user reported version conflicts when having their own Calcite dependency on the classpath. The solution would be to shade away the Calcite dependency (Calcite's transitive dependencies are already shaded). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2667#discussion_r84242416 --- Diff: flink-contrib/docker-flink/docker-compose.yml --- @@ -20,16 +20,20 @@ version: "2" services: jobmanager: image: flink +container_name: "jobmanager" +expose: + - "6123" ports: - "48081:8081" command: jobmanager -volumes: - - /opt/flink/conf taskmanager: image: flink +expose: + - "6121" + - "6122" depends_on: - jobmanager command: taskmanager -volumes_from: - - jobmanager:ro +links: --- End diff -- `links` are now a legacy feature of Docker 1.9.0 but probably fine to stick with it for now. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2667#discussion_r84242735 --- Diff: flink-contrib/docker-flink/docker-compose.sh --- @@ -0,0 +1,4 @@ +#!/bin/sh --- End diff -- Could we name this file `bluemix-docker-compose.sh`?
[jira] [Created] (FLINK-4865) FlinkML - Add EvaluateDataSet operation for LabeledVector
Thomas FOURNIER created FLINK-4865: -- Summary: FlinkML - Add EvaluateDataSet operation for LabeledVector Key: FLINK-4865 URL: https://issues.apache.org/jira/browse/FLINK-4865 Project: Flink Issue Type: New Feature Reporter: Thomas FOURNIER Priority: Minor We can only call "evaluate" method on a DataSet[(Double,Vector)]. Eg: svm/evaluate(test) where test: DataSet[(Double,Vector)] We want also to call this method on DataSet[LabeledVector] -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4865) FlinkML - Add EvaluateDataSet operation for LabeledVector
[ https://issues.apache.org/jira/browse/FLINK-4865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas FOURNIER updated FLINK-4865: --- Description: We can only call "evaluate" method on a DataSet[(Double,Vector)]. Eg: If our model is an SVM svm.evaluate(test) with test has type DataSet[(Double,Vector)] We would like to call it on DataSet[LabeledVector] also. was: We can only call "evaluate" method on a DataSet[(Double,Vector)]. Eg: svm/evaluate(test) where test: DataSet[(Double,Vector)] We want also to call this method on DataSet[LabeledVector] > FlinkML - Add EvaluateDataSet operation for LabeledVector > - > > Key: FLINK-4865 > URL: https://issues.apache.org/jira/browse/FLINK-4865 > Project: Flink > Issue Type: New Feature >Reporter: Thomas FOURNIER >Priority: Minor > > We can only call "evaluate" method on a DataSet[(Double,Vector)]. > Eg: If our model is an SVM > svm.evaluate(test) with test has type DataSet[(Double,Vector)] > We would like to call it on DataSet[LabeledVector] also. > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4865) FlinkML - Add EvaluateDataSet operation for LabeledVector
[ https://issues.apache.org/jira/browse/FLINK-4865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas FOURNIER updated FLINK-4865: --- Description: We can only call "evaluate" method on a DataSet[(Vector,Double)]. Eg: If our model is an SVM svm.evaluate(test) with test has type DataSet[(Vector,Double)] We would like to call it on DataSet[LabeledVector] also. was: We can only call "evaluate" method on a DataSet[(Double,Vector)]. Eg: If our model is an SVM svm.evaluate(test) with test has type DataSet[(Double,Vector)] We would like to call it on DataSet[LabeledVector] also. > FlinkML - Add EvaluateDataSet operation for LabeledVector > - > > Key: FLINK-4865 > URL: https://issues.apache.org/jira/browse/FLINK-4865 > Project: Flink > Issue Type: New Feature >Reporter: Thomas FOURNIER >Priority: Minor > > We can only call "evaluate" method on a DataSet[(Vector,Double)]. > Eg: If our model is an SVM > svm.evaluate(test) with test has type DataSet[(Vector,Double)] > We would like to call it on DataSet[LabeledVector] also. > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4866) Make Trigger.clear() Abstract to Enforce Implementation
Aljoscha Krettek created FLINK-4866: --- Summary: Make Trigger.clear() Abstract to Enforce Implementation Key: FLINK-4866 URL: https://issues.apache.org/jira/browse/FLINK-4866 Project: Flink Issue Type: Bug Components: Streaming Reporter: Aljoscha Krettek If the method is not abstract, implementors of custom triggers will not realise that it could be necessary, and they will likely not clean up their state/timers properly. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
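The point of FLINK-4866 can be illustrated with a minimal sketch. The classes below are hypothetical stand-ins (`SketchTrigger`, `EndOfWindowTrigger`, and a plain `Set<Long>` playing the role of the timer registry); they are not Flink's actual `Trigger` API. They only show that an abstract `clear()` turns a forgotten cleanup into a compile error.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified trigger base class (an assumption for illustration,
// not Flink's real Trigger signature).
abstract class SketchTrigger {
    // Called for every element; may register timers.
    abstract void onElement(long timestamp, Set<Long> timers);

    // Abstract on purpose: a subclass that omits this method does not compile,
    // so the implementor has to decide how state and timers are cleaned up.
    abstract void clear(Set<Long> timers);
}

// Hypothetical trigger that registers a single timer at the end of its window.
final class EndOfWindowTrigger extends SketchTrigger {
    private final long windowEnd;

    EndOfWindowTrigger(long windowEnd) {
        this.windowEnd = windowEnd;
    }

    @Override
    void onElement(long timestamp, Set<Long> timers) {
        timers.add(windowEnd);
    }

    @Override
    void clear(Set<Long> timers) {
        // Remove exactly what onElement registered.
        timers.remove(windowEnd);
    }
}

final class TriggerClearDemo {
    public static void main(String[] args) {
        Set<Long> timers = new HashSet<>();
        SketchTrigger trigger = new EndOfWindowTrigger(1000L);
        trigger.onElement(42L, timers);
        trigger.clear(timers);
        System.out.println("timers left after clear: " + timers.size());
    }
}
```

With a non-abstract, empty default `clear()`, the same subclass would compile without the override and silently leave its timer registered.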
[jira] [Commented] (FLINK-3722) The divisions in the InMemorySorters' swap/compare methods hurt performance
[ https://issues.apache.org/jira/browse/FLINK-3722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591734#comment-15591734 ] ASF GitHub Bot commented on FLINK-3722: --- Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2628#discussion_r84248344 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/QuickSort.java --- @@ -49,20 +49,40 @@ protected static int getMaxDepth(int x) { * then switch to {@link HeapSort}. */ public void sort(final IndexedSortable s, int p, int r) { - sortInternal(s, p, r, getMaxDepth(r - p)); + int recordsPerSegment = s.recordsPerSegment(); + int recordSize = s.recordSize(); + + int maxOffset = recordSize * (recordsPerSegment - 1); + + int size = s.size(); + int sizeN = size / recordsPerSegment; + int sizeO = (size % recordsPerSegment) * recordSize; + + sortInternal(s, recordsPerSegment, recordSize, maxOffset, 0, 0, 0, size, sizeN, sizeO, getMaxDepth(r - p)); } public void sort(IndexedSortable s) { sort(s, 0, s.size()); } - private static void sortInternal(final IndexedSortable s, int p, int r, int depth) { + private static void sortInternal(final IndexedSortable s, int recordsPerSegment, int recordSize, int maxOffset, + int p, int pN, int pO, int r, int rN, int rO, int depth) { --- End diff -- Could you please add a comment that explains all these parameters? (I understand them only because I know the original code and also what you are trying to achieve, but for someone who sees the code for the first time this will be quite scary.) 
> The divisions in the InMemorySorters' swap/compare methods hurt performance > --- > > Key: FLINK-3722 > URL: https://issues.apache.org/jira/browse/FLINK-3722 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: Gabor Gevay >Assignee: Greg Hogan >Priority: Minor > Labels: performance > > NormalizedKeySorter's and FixedLengthRecordSorter's swap and compare methods > use divisions (which take a lot of time \[1\]) to calculate the index of the > MemorySegment and the offset inside the segment. [~greghogan] reported on the > mailing list \[2\] measuring a ~12-14% performance effect in one case. > A possibility to improve the situation is the following: > The way that QuickSort mostly uses these compare and swap methods is that it > maintains two indices, and uses them to call compare and swap. The key > observation is that these indices are mostly stepped by one, and > _incrementally_ calculating the quotient and modulo is actually easy when the > index changes only by one: increment/decrement the modulo, and check whether > the modulo has reached 0 or the divisor, and if it did, then wrap-around the > modulo and increment/decrement the quotient. > To implement this, InMemorySorter would have to expose an iterator that would > have the divisor and the current modulo and quotient as state, and have > increment/decrement methods. Compare and swap could then have overloads that > take these iterators as arguments. > \[1\] http://www.agner.org/optimize/instruction_tables.pdf > \[2\] > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Macro-benchmarking-for-performance-tuning-and-regression-detection-td11078.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
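The incremental quotient/modulo idea described in the issue can be sketched as follows. `SegmentCursor` is a hypothetical name and not part of Flink; the field names mirror the `recordsPerSegment`, `recordSize` and `maxOffset` variables from the pull request diff, but the class itself is only an illustration of the technique under those assumptions.

```java
// Hypothetical cursor that tracks (segment, offset) for a record index
// without dividing on every step. Not Flink API.
final class SegmentCursor {
    private final int recordSize;
    private final int maxOffset; // byte offset of the last record in a segment

    private int index;   // logical record index
    private int segment; // equals index / recordsPerSegment
    private int offset;  // equals (index % recordsPerSegment) * recordSize

    SegmentCursor(int recordsPerSegment, int recordSize, int startIndex) {
        this.recordSize = recordSize;
        this.maxOffset = recordSize * (recordsPerSegment - 1);
        this.index = startIndex;
        // The only division and modulo happen once, at construction.
        this.segment = startIndex / recordsPerSegment;
        this.offset = (startIndex % recordsPerSegment) * recordSize;
    }

    void increment() {
        index++;
        if (offset == maxOffset) {
            // Stepped past the last record of the segment: wrap around.
            offset = 0;
            segment++;
        } else {
            offset += recordSize;
        }
    }

    void decrement() {
        index--;
        if (offset == 0) {
            // Stepped before the first record of the segment: wrap around.
            offset = maxOffset;
            segment--;
        } else {
            offset -= recordSize;
        }
    }

    int index() { return index; }
    int segment() { return segment; }
    int offset() { return offset; }
}

final class SegmentCursorDemo {
    public static void main(String[] args) {
        SegmentCursor cursor = new SegmentCursor(4, 8, 0);
        for (int i = 0; i < 6; i++) {
            System.out.println(cursor.index() + " -> segment " + cursor.segment()
                    + ", offset " + cursor.offset());
            cursor.increment();
        }
    }
}
```

Each step costs one comparison and one addition instead of a division and a modulo, which is the saving the issue is after. Whether the JVM inlines such small methods when they live on a separate object is a separate question.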
[GitHub] flink issue #2657: [FLINK-4853] [rm] Harden job manager registration at the ...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2657 This doesn't compile currently. Do you prefer if I review the PRs individually or review the commits in this PR?
[jira] [Commented] (FLINK-4853) Clean up JobManager registration at the ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-4853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591741#comment-15591741 ] ASF GitHub Bot commented on FLINK-4853: --- Github user mxm commented on the issue: https://github.com/apache/flink/pull/2657 This doesn't compile currently. Do you prefer if I review the PRs individually or review the commits in this PR? > Clean up JobManager registration at the ResourceManager > --- > > Key: FLINK-4853 > URL: https://issues.apache.org/jira/browse/FLINK-4853 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Reporter: Till Rohrmann >Assignee: Till Rohrmann > > The current {{JobManager}} registration at the {{ResourceManager}} blocks > threads in the {{RpcService.execute}} pool. This is not ideal and can be > avoided by not waiting on a {{Future}} in this call. > I propose to encapsulate the leader id retrieval operation in a distinct > service so that it can be separated from the {{ResourceManager}}. This will > reduce the complexity of the {{ResourceManager}} and make the individual > components easier to test. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
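The difference between waiting on a future inside the call and composing on it can be sketched as below. This is an illustration only: it uses `java.util.concurrent.CompletableFuture` as a stand-in for whatever future type the code base uses, and `RegistrationSketch`, `registerBlocking` and `registerAsync` are hypothetical names, not the `ResourceManager` API.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the two registration styles; not Flink code.
final class RegistrationSketch {

    // Blocking style: the calling thread is parked until the leader id arrives,
    // so a pool thread stays occupied for the whole wait.
    static String registerBlocking(CompletableFuture<UUID> leaderId, String jobId) {
        UUID id = leaderId.join();
        return "registered " + jobId + " with leader " + id;
    }

    // Non-blocking style: the continuation runs once the leader id is available,
    // and the caller gets a future back immediately.
    static CompletableFuture<String> registerAsync(CompletableFuture<UUID> leaderId, String jobId) {
        return leaderId.thenApply(id -> "registered " + jobId + " with leader " + id);
    }
}

final class RegistrationDemo {
    public static void main(String[] args) {
        CompletableFuture<UUID> leaderId = new CompletableFuture<>();
        CompletableFuture<String> result = RegistrationSketch.registerAsync(leaderId, "job-1");
        System.out.println("done before leader id is known: " + result.isDone());
        leaderId.complete(UUID.randomUUID());
        System.out.println(result.join());
    }
}
```

In the non-blocking variant no thread waits while the leader id is being retrieved, which is the behaviour the issue asks for.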
[GitHub] flink issue #2618: Refactoring the Continuous File Monitoring Function.
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2618 Thanks for updating the description. Let me take a look at the changes.
[GitHub] flink issue #2618: Refactoring the Continuous File Monitoring Function.
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/2618 Thanks a lot @mxm !
[GitHub] flink pull request #2628: [FLINK-3722] [runtime] Don't / and % when sorting
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2628#discussion_r84248344 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/QuickSort.java --- @@ -49,20 +49,40 @@ protected static int getMaxDepth(int x) { * then switch to {@link HeapSort}. */ public void sort(final IndexedSortable s, int p, int r) { - sortInternal(s, p, r, getMaxDepth(r - p)); + int recordsPerSegment = s.recordsPerSegment(); + int recordSize = s.recordSize(); + + int maxOffset = recordSize * (recordsPerSegment - 1); + + int size = s.size(); + int sizeN = size / recordsPerSegment; + int sizeO = (size % recordsPerSegment) * recordSize; + + sortInternal(s, recordsPerSegment, recordSize, maxOffset, 0, 0, 0, size, sizeN, sizeO, getMaxDepth(r - p)); } public void sort(IndexedSortable s) { sort(s, 0, s.size()); } - private static void sortInternal(final IndexedSortable s, int p, int r, int depth) { + private static void sortInternal(final IndexedSortable s, int recordsPerSegment, int recordSize, int maxOffset, + int p, int pN, int pO, int r, int rN, int rO, int depth) { --- End diff -- Could you please add a comment that explains all these parameters? (I understand them only because I know the original code and also what you are trying to achieve, but for someone who sees the code for the first time this will be quite scary.)
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r84255151 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultProcessingTimeService.java --- @@ -35,10 +34,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** - * A {@link TimeServiceProvider} which assigns as current processing time the result of calling + * A {@link ProcessingTimeService} which assigns as current processing time the result of calling * {@link System#currentTimeMillis()} and registers timers using a {@link ScheduledThreadPoolExecutor}. */ -public class DefaultTimeServiceProvider extends TimeServiceProvider { +public class DefaultProcessingTimeService extends ProcessingTimeService { --- End diff -- Done
[jira] [Commented] (FLINK-3902) Discarded FileSystem checkpoints are lingering around
[ https://issues.apache.org/jira/browse/FLINK-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591808#comment-15591808 ] Jan Zajíc commented on FLINK-3902: -- WE have same issue on Linux in docker, but *both* ! RocksDBBackend and FSStateBackend. On Windows developer local machine it works just fine. > Discarded FileSystem checkpoints are lingering around > - > > Key: FLINK-3902 > URL: https://issues.apache.org/jira/browse/FLINK-3902 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.0.2 >Reporter: Ufuk Celebi > > A user reported that checkpoints with {{FSStateBackend}} are not properly > cleaned up. > {code} > 2016-05-10 12:21:06,559 INFO BlockStateChange: BLOCK* addToInvalidates: > blk_1084791727_11053122 10.10.113.10:50010 > 2016-05-10 12:21:06,559 INFO org.apache.hadoop.ipc.Server: IPC Server handler > 9 on 8020, call org.apache.hadoop.hdfs.protocol.ClientProtocol.delete from > 10.10.113.9:49233 Call#12337 Retry#0 > org.apache.hadoop.fs.PathIsNotEmptyDirectoryException: > `/flink/checkpoints_test/570d6e67d571c109daab468e5678402b/chk-62 is non > empty': Directory is not empty > at > org.apache.hadoop.hdfs.server.namenode.FSDirDeleteOp.delete(FSDirDeleteOp.java:85) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.delete(FSNamesystem.java:3712) > {code} > {code} > 2016-05-10 12:20:22,636 [Checkpoint Timer] INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 62 @ 1462875622636 > 2016-05-10 12:20:32,507 [flink-akka.actor.default-dispatcher-240088] INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed > checkpoint 62 (in 9843 ms) > 2016-05-10 12:20:52,637 [Checkpoint Timer] INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 63 @ 1462875652637 > 2016-05-10 12:21:06,563 [flink-akka.actor.default-dispatcher-240028] INFO > 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed > checkpoint 63 (in 13909 ms) > 2016-05-10 12:21:22,636 [Checkpoint Timer] INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 64 @ 1462875682636 > {code} > Running the same program with the {{RocksDBBackend}} works as expected and > clears the old checkpoints properly. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (FLINK-3902) Discarded FileSystem checkpoints are lingering around
[ https://issues.apache.org/jira/browse/FLINK-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591808#comment-15591808 ] Jan Zajíc edited comment on FLINK-3902 at 10/20/16 1:24 PM: WE have same issue on Linux in docker, but with *both* ! RocksDBBackend and FSStateBackend. On Windows developer local machine it works just fine. was (Author: jan-zajic): WE have same issue on Linux in docker, but *both* ! RocksDBBackend and FSStateBackend. On Windows developer local machine it works just fine. > Discarded FileSystem checkpoints are lingering around > - > > Key: FLINK-3902 > URL: https://issues.apache.org/jira/browse/FLINK-3902 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.0.2 >Reporter: Ufuk Celebi > > A user reported that checkpoints with {{FSStateBackend}} are not properly > cleaned up. > {code} > 2016-05-10 12:21:06,559 INFO BlockStateChange: BLOCK* addToInvalidates: > blk_1084791727_11053122 10.10.113.10:50010 > 2016-05-10 12:21:06,559 INFO org.apache.hadoop.ipc.Server: IPC Server handler > 9 on 8020, call org.apache.hadoop.hdfs.protocol.ClientProtocol.delete from > 10.10.113.9:49233 Call#12337 Retry#0 > org.apache.hadoop.fs.PathIsNotEmptyDirectoryException: > `/flink/checkpoints_test/570d6e67d571c109daab468e5678402b/chk-62 is non > empty': Directory is not empty > at > org.apache.hadoop.hdfs.server.namenode.FSDirDeleteOp.delete(FSDirDeleteOp.java:85) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.delete(FSNamesystem.java:3712) > {code} > {code} > 2016-05-10 12:20:22,636 [Checkpoint Timer] INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 62 @ 1462875622636 > 2016-05-10 12:20:32,507 [flink-akka.actor.default-dispatcher-240088] INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed > checkpoint 62 (in 9843 ms) > 2016-05-10 12:20:52,637 [Checkpoint Timer] INFO > 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 63 @ 1462875652637 > 2016-05-10 12:21:06,563 [flink-akka.actor.default-dispatcher-240028] INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed > checkpoint 63 (in 13909 ms) > 2016-05-10 12:21:22,636 [Checkpoint Timer] INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 64 @ 1462875682636 > {code} > Running the same program with the {{RocksDBBackend}} works as expected and > clears the old checkpoints properly. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (FLINK-3902) Discarded FileSystem checkpoints are lingering around
[ https://issues.apache.org/jira/browse/FLINK-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591808#comment-15591808 ] Jan Zajíc edited comment on FLINK-3902 at 10/20/16 1:24 PM: We have same issue on Linux in docker, but with *both* ! RocksDBBackend and FSStateBackend. On Windows developer local machine it works just fine. was (Author: jan-zajic): WE have same issue on Linux in docker, but with *both* ! RocksDBBackend and FSStateBackend. On Windows developer local machine it works just fine. > Discarded FileSystem checkpoints are lingering around > - > > Key: FLINK-3902 > URL: https://issues.apache.org/jira/browse/FLINK-3902 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.0.2 >Reporter: Ufuk Celebi > > A user reported that checkpoints with {{FSStateBackend}} are not properly > cleaned up. > {code} > 2016-05-10 12:21:06,559 INFO BlockStateChange: BLOCK* addToInvalidates: > blk_1084791727_11053122 10.10.113.10:50010 > 2016-05-10 12:21:06,559 INFO org.apache.hadoop.ipc.Server: IPC Server handler > 9 on 8020, call org.apache.hadoop.hdfs.protocol.ClientProtocol.delete from > 10.10.113.9:49233 Call#12337 Retry#0 > org.apache.hadoop.fs.PathIsNotEmptyDirectoryException: > `/flink/checkpoints_test/570d6e67d571c109daab468e5678402b/chk-62 is non > empty': Directory is not empty > at > org.apache.hadoop.hdfs.server.namenode.FSDirDeleteOp.delete(FSDirDeleteOp.java:85) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.delete(FSNamesystem.java:3712) > {code} > {code} > 2016-05-10 12:20:22,636 [Checkpoint Timer] INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 62 @ 1462875622636 > 2016-05-10 12:20:32,507 [flink-akka.actor.default-dispatcher-240088] INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed > checkpoint 62 (in 9843 ms) > 2016-05-10 12:20:52,637 [Checkpoint Timer] INFO > 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 63 @ 1462875652637 > 2016-05-10 12:21:06,563 [flink-akka.actor.default-dispatcher-240028] INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed > checkpoint 63 (in 13909 ms) > 2016-05-10 12:21:22,636 [Checkpoint Timer] INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 64 @ 1462875682636 > {code} > Running the same program with the {{RocksDBBackend}} works as expected and > clears the old checkpoints properly. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2667#discussion_r84242539 --- Diff: flink-contrib/docker-flink/docker-compose.yml --- @@ -20,16 +20,20 @@ version: "2" services: jobmanager: image: flink +container_name: "jobmanager" +expose: + - "6123" ports: - "48081:8081" command: jobmanager -volumes: - - /opt/flink/conf taskmanager: image: flink +expose: + - "6121" + - "6122" --- End diff -- What are these ports needed for? The TaskManager will always initiate the connection to the JobManager.
[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…
Github user sedgewickmm18 commented on a diff in the pull request: https://github.com/apache/flink/pull/2667#discussion_r84286390 --- Diff: flink-contrib/docker-flink/docker-entrypoint.sh --- @@ -20,11 +20,19 @@ if [ "$1" = "jobmanager" ]; then echo "Starting Job Manager" -sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: `hostname -f`/g" $FLINK_HOME/conf/flink-conf.yaml +#sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: `hostname -f`/g" $FLINK_HOME/conf/flink-conf.yaml + +# make use of container linking and exploit the jobmanager entry in /etc/hosts +sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: jobmanager/g" $FLINK_HOME/conf/flink-conf.yaml + sed -i -e "s/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: `grep -c ^processor /proc/cpuinfo`/g" $FLINK_HOME/conf/flink-conf.yaml --- End diff -- agree - makes much more sense that way - otherwise it's always one slot by default.
[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…
Github user sedgewickmm18 commented on a diff in the pull request: https://github.com/apache/flink/pull/2667#discussion_r84287069 --- Diff: flink-contrib/docker-flink/docker-compose.yml --- @@ -20,16 +20,20 @@ version: "2" services: jobmanager: image: flink +container_name: "jobmanager" +expose: + - "6123" ports: - "48081:8081" command: jobmanager -volumes: - - /opt/flink/conf taskmanager: image: flink +expose: + - "6121" + - "6122" --- End diff -- These lines expose taskmanager's RPC and data ports to make them accessible in the private subnet, please see https://docs.docker.com/docker-cloud/apps/ports/
[jira] [Commented] (FLINK-3722) The divisions in the InMemorySorters' swap/compare methods hurt performance
[ https://issues.apache.org/jira/browse/FLINK-3722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591881#comment-15591881 ] ASF GitHub Bot commented on FLINK-3722: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/2628 Thanks @ggevay for reviewing. I added a commit with additional comments. I transcribed `Quicksort` so as to remove considerations of Java performance and inlining. It was not clear to me that if we encapsulated the index, page number, and page offset into an object that Java would inline the various increment and decrement functions. Also, I don't think this looks too bad. I'm happy to reformat if that is preferred. I think this is the best time to investigate alternative methods. I'm not seeing how one would sort on top of `InMemorySorter` without deserializing records. > The divisions in the InMemorySorters' swap/compare methods hurt performance > --- > > Key: FLINK-3722 > URL: https://issues.apache.org/jira/browse/FLINK-3722 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: Gabor Gevay >Assignee: Greg Hogan >Priority: Minor > Labels: performance > > NormalizedKeySorter's and FixedLengthRecordSorter's swap and compare methods > use divisions (which take a lot of time \[1\]) to calculate the index of the > MemorySegment and the offset inside the segment. [~greghogan] reported on the > mailing list \[2\] measuring a ~12-14% performance effect in one case. > A possibility to improve the situation is the following: > The way that QuickSort mostly uses these compare and swap methods is that it > maintains two indices, and uses them to call compare and swap. 
The key > observation is that these indices are mostly stepped by one, and > _incrementally_ calculating the quotient and modulo is actually easy when the > index changes only by one: increment/decrement the modulo, and check whether > the modulo has reached 0 or the divisor, and if it did, then wrap-around the > modulo and increment/decrement the quotient. > To implement this, InMemorySorter would have to expose an iterator that would > have the divisor and the current modulo and quotient as state, and have > increment/decrement methods. Compare and swap could then have overloads that > take these iterators as arguments. > \[1\] http://www.agner.org/optimize/instruction_tables.pdf > \[2\] > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Macro-benchmarking-for-performance-tuning-and-regression-detection-td11078.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r84285533 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichFileInputSplit.java --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * An extended {@link FileInputSplit} that also includes information about: + * + * The modification time of the file this split belongs to. + * When checkpointing, the state of the split at the moment of the checkpoint. + * + * This class is used by the {@link ContinuousFileMonitoringFunction} and the + * {@link ContinuousFileReaderOperator} to perform continuous file processing. + * */ +public class RichFileInputSplit + extends FileInputSplit implements Comparable{ + + /** The modification time of the file this split belongs to. */ + private final long modificationTime; + + /** +* The state of the split. 
This information is used when +* restoring from a checkpoint and allows to resume reading the +* underlying file from the point we left off. +* */ + private S splitState; + + /** A special {@link RichFileInputSplit} signaling the end of the stream of splits.*/ + public static final RichFileInputSplit EOS = + new RichFileInputSplit<>(Long.MIN_VALUE, -1, null, -1, -1, null); + + /** +* Creates a {@link RichFileInputSplit} based on the file modification time and +* the rest of the information of the {@link FileInputSplit}, as returned by the +* underlying filesystem. +* +* @param modificationTime the modification file of the file this split belongs to +* @param split the rest of the information about the split +*/ + public RichFileInputSplit(long modificationTime, FileInputSplit split) { --- End diff -- Not sure about this constructor. I think I'd prefer something spelling out the parameters. This also avoids to create a regular FileInputSplit every time. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r84288480 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichFileInputSplit.java --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * An extended {@link FileInputSplit} that also includes information about: + * + * The modification time of the file this split belongs to. + * When checkpointing, the state of the split at the moment of the checkpoint. + * + * This class is used by the {@link ContinuousFileMonitoringFunction} and the + * {@link ContinuousFileReaderOperator} to perform continuous file processing. + * */ +public class RichFileInputSplit --- End diff -- I think you can drop the type parameter here since you don't gain any type safety from the parameter. It is never used in any argument which would make it meaningful. Instead just use `Serializable` for the state type. 
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r84280372 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java --- @@ -199,44 +196,39 @@ public void close() throws Exception { private final Object checkpointLock; private final SourceFunction.SourceContext readerContext; - private final Queue pendingSplits; - - private FileInputSplit currentSplit = null; + private final Queue> pendingSplits; - private S restoredFormatState = null; + private RichFileInputSplit currentSplit; - private volatile boolean isSplitOpen = false; + private volatile boolean isSplitOpen; private SplitReader(FileInputFormat format, TypeSerializer serializer, SourceFunction.SourceContext readerContext, Object checkpointLock, - Tuple3, FileInputSplit, S> restoredState) { + List> restoredState) { this.format = checkNotNull(format, "Unspecified FileInputFormat."); this.serializer = checkNotNull(serializer, "Unspecified Serializer."); this.readerContext = checkNotNull(readerContext, "Unspecified Reader Context."); this.checkpointLock = checkNotNull(checkpointLock, "Unspecified checkpoint lock."); - this.pendingSplits = new ArrayDeque<>(); this.isRunning = true; - // this is the case where a task recovers from a previous failed attempt - if (restoredState != null) { - List pending = restoredState.f0; - FileInputSplit current = restoredState.f1; - S formatState = restoredState.f2; - - for (FileInputSplit split : pending) { - pendingSplits.add(split); + this.pendingSplits = new PriorityQueue<>(100, new Comparator>() { --- End diff -- Why did you choose 100 as the initial size? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
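On the question about the initial size: in `java.util.PriorityQueue` the first constructor argument is only an initial capacity, and the queue grows as elements are added. The sketch below illustrates that, together with an ordering by modification time, then path, then split number, as the tests in this pull request describe it. `SplitOrderingSketch` and its `Split` class are hypothetical stand-ins, not the classes from the pull request.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

final class SplitOrderingSketch {

    /** Hypothetical stand-in for a timestamped split; not the class from the pull request. */
    static final class Split {
        final long modificationTime;
        final String path;
        final int splitNumber;

        Split(long modificationTime, String path, int splitNumber) {
            this.modificationTime = modificationTime;
            this.path = path;
            this.splitNumber = splitNumber;
        }
    }

    /** Order: modification time first, then path (lexicographic), then split number. */
    static final Comparator<Split> ORDER =
        Comparator.comparingLong((Split s) -> s.modificationTime)
            .thenComparing(s -> s.path)
            .thenComparingInt(s -> s.splitNumber);

    /** initialCapacity must be at least 1; it is a sizing hint, not a limit. */
    static PriorityQueue<Split> newQueue(int initialCapacity) {
        return new PriorityQueue<>(initialCapacity, ORDER);
    }
}
```

A queue created with `newQueue(1)` still accepts any number of splits, so the value passed mostly affects how often the backing array is resized.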
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r84285924 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichFileInputSplit.java --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * An extended {@link FileInputSplit} that also includes information about: + * + * The modification time of the file this split belongs to. + * When checkpointing, the state of the split at the moment of the checkpoint. + * + * This class is used by the {@link ContinuousFileMonitoringFunction} and the + * {@link ContinuousFileReaderOperator} to perform continuous file processing. + * */ +public class RichFileInputSplit --- End diff -- The name rich :) I'd be happy if we could find another name. Rich doesn't really mean anything. How about `TimestampedFileInputSplit`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r84288567 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichFileInputSplit.java --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * An extended {@link FileInputSplit} that also includes information about: + * + * The modification time of the file this split belongs to. + * When checkpointing, the state of the split at the moment of the checkpoint. + * + * This class is used by the {@link ContinuousFileMonitoringFunction} and the + * {@link ContinuousFileReaderOperator} to perform continuous file processing. + * */ +public class RichFileInputSplit + extends FileInputSplit implements Comparable{ + + /** The modification time of the file this split belongs to. */ + private final long modificationTime; + + /** +* The state of the split. 
This information is used when +* restoring from a checkpoint and allows to resume reading the +* underlying file from the point we left off. +* */ + private S splitState; --- End diff -- ```java private Serializable splitState; ``` should be sufficient. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r84288776 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichFileInputSplit.java --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * An extended {@link FileInputSplit} that also includes information about: + * + * The modification time of the file this split belongs to. + * When checkpointing, the state of the split at the moment of the checkpoint. + * + * This class is used by the {@link ContinuousFileMonitoringFunction} and the + * {@link ContinuousFileReaderOperator} to perform continuous file processing. + * */ +public class RichFileInputSplit + extends FileInputSplit implements Comparable{ + + /** The modification time of the file this split belongs to. */ + private final long modificationTime; + + /** +* The state of the split. 
This information is used when +* restoring from a checkpoint and allows to resume reading the +* underlying file from the point we left off. +* */ + private S splitState; + + /** A special {@link RichFileInputSplit} signaling the end of the stream of splits.*/ + public static final RichFileInputSplit EOS = + new RichFileInputSplit<>(Long.MIN_VALUE, -1, null, -1, -1, null); + + /** +* Creates a {@link RichFileInputSplit} based on the file modification time and +* the rest of the information of the {@link FileInputSplit}, as returned by the +* underlying filesystem. +* +* @param modificationTime the modification file of the file this split belongs to +* @param split the rest of the information about the split +*/ + public RichFileInputSplit(long modificationTime, FileInputSplit split) { + this(modificationTime, + split.getSplitNumber(), + split.getPath(), + split.getStart(), + split.getLength(), + split.getHostnames()); + } + + /** +* Constructor with the raw split information. +* +* @param modificationTime the modification file of the file this split belongs to +* @param numthe number of this input split +* @param file the file name +* @param start the position of the first byte in the file to process +* @param length the number of bytes in the file to process (-1 is flag for "read whole file") +* @param hosts the list of hosts containing the block, possibly null +*/ + private RichFileInputSplit(long modificationTime, int num, Path file, long start, long length, String[] hosts) { + super(num, file, start, length, hosts); + + Preconditions.checkArgument(modificationTime >= 0 || modificationTime == Long.MIN_VALUE, + "Invalid File Split Modification Time: "+ modificationTime +"."); + + this.modificationTime = modificationTime; + } + + /** +* Sets the state of the split. This information is used when +* restoring from a checkpoint and allows to resume reading the +* underlying file from the point we left off. 
+* +* This is applicable to {@link org.apache.flink.api.common.io.FileInputFormat FileInputFormats} +* that implement the {@link org.apache.flink.api.common.io.CheckpointableInputFormat +* CheckpointableInputFormat} interface. +* */ + public void setSplitState(S state) { + this.splitState = state; + } + + /** +* Sets the state of the split to {@
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r84284953 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/RichFileInputSplitTest.java --- @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.test.checkpointing; + +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.streaming.api.functions.source.RichFileInputSplit; +import org.junit.Assert; +import org.junit.Test; + +public class RichFileInputSplitTest { + + @Test + public void testSplitEquality() { + + RichFileInputSplit eos1 = RichFileInputSplit.EOS; + RichFileInputSplit eos2 = RichFileInputSplit.EOS; + + Assert.assertEquals(eos1, eos2); + + FileInputSplit firstSplit = new FileInputSplit(2, new Path("test"), 0, 100, null); + RichFileInputSplit richFirstSplit = new RichFileInputSplit(10, firstSplit); + Assert.assertNotEquals(eos1, richFirstSplit); + Assert.assertNotEquals(richFirstSplit, firstSplit); + + FileInputSplit secondSplit = new FileInputSplit(2, new Path("test"), 0, 100, null); + RichFileInputSplit richSecondSplit = new RichFileInputSplit(10, secondSplit); + Assert.assertEquals(richFirstSplit, richSecondSplit); + Assert.assertNotEquals(richFirstSplit, firstSplit); + + FileInputSplit modSecondSplit = new FileInputSplit(2, new Path("test"), 0, 100, null); + RichFileInputSplit richModSecondSplit = new RichFileInputSplit(11, modSecondSplit); + Assert.assertNotEquals(richSecondSplit, richModSecondSplit); + + FileInputSplit thirdSplit = new FileInputSplit(2, new Path("test/test1"), 0, 100, null); + RichFileInputSplit richThirdSplit = new RichFileInputSplit(10, thirdSplit); + Assert.assertEquals(richThirdSplit.getModificationTime(), 10); + Assert.assertNotEquals(richFirstSplit, richThirdSplit); + + FileInputSplit thirdSplitCopy = new FileInputSplit(2, new Path("test/test1"), 0, 100, null); + RichFileInputSplit richThirdSplitCopy = new RichFileInputSplit(10, thirdSplitCopy); + Assert.assertEquals(richThirdSplitCopy, richThirdSplit); + } + + @Test + public void testSplitComparison() { + FileInputSplit firstSplit = new FileInputSplit(3, new Path("test/test1"), 0, 100, null); + RichFileInputSplit richFirstSplit = 
new RichFileInputSplit(10, firstSplit); + + FileInputSplit secondSplit = new FileInputSplit(2, new Path("test/test2"), 0, 100, null); + RichFileInputSplit richSecondSplit = new RichFileInputSplit(10, secondSplit); + + FileInputSplit thirdSplit = new FileInputSplit(1, new Path("test/test2"), 0, 100, null); + RichFileInputSplit richThirdSplit = new RichFileInputSplit(10, thirdSplit); + + FileInputSplit forthSplit = new FileInputSplit(0, new Path("test/test3"), 0, 100, null); + RichFileInputSplit richForthSplit = new RichFileInputSplit(11, forthSplit); + + // lexicographically on the path order + Assert.assertTrue(richFirstSplit.compareTo(richSecondSplit) < 0); + Assert.assertTrue(richFirstSplit.compareTo(richThirdSplit) < 0); + + // same mod time, same file so smaller split number first + Assert.assertTrue(richThirdSplit.compareTo(richSecondSplit) < 0); + + // smaller modification time first + Assert.assertTrue(richThirdSplit.compareTo(richForthSplit) < 0); + } + + @Test + public void testIllegalArgument() { + try { + FileInputSplit firstSplit = new FileInputSplit(2, new Path("test"), 0, 100, null); + new Ric
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r84281147 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichFileInputSplit.java --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * An extended {@link FileInputSplit} that also includes information about: + * + * The modification time of the file this split belongs to. + * When checkpointing, the state of the split at the moment of the checkpoint. + * + * This class is used by the {@link ContinuousFileMonitoringFunction} and the + * {@link ContinuousFileReaderOperator} to perform continuous file processing. + * */ +public class RichFileInputSplit + extends FileInputSplit implements Comparable{ + + /** The modification time of the file this split belongs to. */ + private final long modificationTime; + + /** +* The state of the split. 
This information is used when +* restoring from a checkpoint and allows to resume reading the +* underlying file from the point we left off. +* */ + private S splitState; + + /** A special {@link RichFileInputSplit} signaling the end of the stream of splits.*/ + public static final RichFileInputSplit EOS = + new RichFileInputSplit<>(Long.MIN_VALUE, -1, null, -1, -1, null); --- End diff -- Is it really necessary to have this special split? Couldn't you just have a `reader.stop()` method which stops the reader after the current split has been processed? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
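The alternative suggested here, a stop method instead of a sentinel split, could look roughly like the sketch below. All names are hypothetical and this is not the `SplitReader` from the pull request. It implements `Runnable` rather than extending `Thread`, because `Thread.stop()` is final and cannot be redefined; splits are modeled as plain strings to keep the example self-contained.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

/** Hypothetical reader loop that ends via a flag instead of an end-of-stream split. */
final class StoppableReaderSketch implements Runnable {
    private final Queue<String> pendingSplits = new ConcurrentLinkedQueue<>();
    private final Consumer<String> splitProcessor;
    private volatile boolean running = true;

    StoppableReaderSketch(Consumer<String> splitProcessor) {
        this.splitProcessor = splitProcessor;
    }

    void addSplit(String split) {
        pendingSplits.add(split);
    }

    /** Requests termination; the split being processed is finished first. */
    void stop() {
        running = false;
    }

    int pendingCount() {
        return pendingSplits.size();
    }

    @Override
    public void run() {
        // The flag is checked only between splits, never in the middle of one.
        while (running) {
            String split = pendingSplits.poll();
            if (split == null) {
                Thread.yield(); // simplistic wait; a real reader would block instead
                continue;
            }
            splitProcessor.accept(split);
        }
    }
}
```

One difference worth noting: a sentinel placed in the queue lets the reader drain the splits queued ahead of it, while a flag like this one leaves the remaining splits unprocessed, so the two approaches are not interchangeable without deciding which behavior is wanted.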
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2618#discussion_r84279968

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---
    @@ -189,7 +186,7 @@ public void close() throws Exception {
     		output.close();
     	}

    -	private class SplitReader extends Thread {
    +	private final class SplitReader extends Thread {
    --- End diff --

    Making private classes final is not really necessary.
[GitHub] flink issue #2628: [FLINK-3722] [runtime] Don't / and % when sorting
Github user greghogan commented on the issue:

    https://github.com/apache/flink/pull/2628

    Thanks @ggevay for reviewing. I added a commit with additional comments. I transcribed `Quicksort` so as to remove considerations of Java performance and inlining. It was not clear to me that if we encapsulated the index, page number, and page offset into an object that Java would inline the various increment and decrement functions. Also, I don't think this looks too bad. I'm happy to reformat if that is preferred. I think this is the best time to investigate alternative methods. I'm not seeing how one would sort on top of `InMemorySorter` without deserializing records.
[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…
Github user sedgewickmm18 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2667#discussion_r84291047

    --- Diff: flink-contrib/docker-flink/docker-compose.sh ---
    @@ -0,0 +1,4 @@
    +#!/bin/sh
    --- End diff --

    that's fine
[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2667#discussion_r84290827

    --- Diff: flink-contrib/docker-flink/docker-compose.yml ---
    @@ -20,16 +20,20 @@ version: "2"
     services:
       jobmanager:
         image: flink
    +    container_name: "jobmanager"
    +    expose:
    +      - "6123"
         ports:
           - "48081:8081"
         command: jobmanager
    -    volumes:
    -      - /opt/flink/conf

       taskmanager:
         image: flink
    +    expose:
    +      - "6121"
    +      - "6122"
    --- End diff --

    Sure, makes sense since those ports are not reachable by TaskManagers running in different containers.
[jira] [Commented] (FLINK-4832) Count/Sum 0 elements
[ https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591955#comment-15591955 ]

Anton Mushin commented on FLINK-4832:
-------------------------------------

Hello,

I think {{org.apache.flink.api.common.operators.base.MapOperatorBase#executeOnCollections}} also needs to change, because {{org.apache.flink.api.table.runtime.aggregate.AggregateMapFunction#map}} is called only if inputData contains elements.
{code}
TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig);
TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
for (IN element : inputData) {
	IN inCopy = inSerializer.copy(element);
	OUT out = function.map(inCopy);
	result.add(outSerializer.copy(out));
}
{code}
And if {{org.apache.flink.api.table.runtime.aggregate.SumAggregate#initiate}} is edited, for example, as follows
{code}
override def initiate(partial: Row): Unit = {
  partial.setField(sumIndex, 0.asInstanceOf[T]) // cast 0 to the type T of the class that extends SumAggregate[T]
}
{code}
then the following tests pass:
{code}
@Test
def testDataSetAggregation(): Unit = {
  val env = ExecutionEnvironment.getExecutionEnvironment
  val tEnv = TableEnvironment.getTableEnvironment(env, config)

  val sqlQuery = "SELECT sum(_1) FROM MyTable"

  val ds = CollectionDataSets.get3TupleDataSet(env)
  tEnv.registerDataSet("MyTable", ds)

  val result = tEnv.sql(sqlQuery)

  val expected = "231"
  val results = result.toDataSet[Row].collect()
  TestBaseUtils.compareResultAsText(results.asJava, expected)
}

@Test
def testSumNullElements(): Unit = {
  val env = ExecutionEnvironment.getExecutionEnvironment
  val tEnv = TableEnvironment.getTableEnvironment(env, config)

  val sqlQuery = "SELECT sum(_1),sum(_2),sum(_3),sum(_4),sum(_5),sum(_6) " +
    "FROM (select * from MyTable where _1 = 4)"

  val ds = env.fromElements(
    (1: Byte, 2l,1D,1f,1,1:Short ),
    (2: Byte, 2l,1D,1f,1,1:Short ))
  tEnv.registerDataSet("MyTable", ds)

  val result = tEnv.sql(sqlQuery)

  val expected = "0,0,0.0,0.0,0,0"
  val results = result.toDataSet[Row].collect()
  TestBaseUtils.compareResultAsText(results.asJava, expected)
}
{code}

> Count/Sum 0 elements
> --------------------
>
>                 Key: FLINK-4832
>                 URL: https://issues.apache.org/jira/browse/FLINK-4832
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: Timo Walther
>
> Currently, the Table API is unable to count or sum up 0 elements. We should
> improve DataSet aggregations for this. Maybe by union the original DataSet
> with a dummy record or by using a MapPartition function. Coming up with a
> good design for this is also part of this issue.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
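The reason empty input is a special case can be shown without any Flink classes. The sketch below uses hypothetical names (`EmptyInputSketch`, `mapEach`, `sumWithInitialValue`) and is not the Table API implementation. It only illustrates that a per-element loop like the one quoted above never invokes the map function when the input is empty, so a sum needs an explicit initial value to report 0.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class EmptyInputSketch {

    /** Same shape as the quoted per-element loop: empty input means the mapper never runs. */
    static <IN, OUT> List<OUT> mapEach(List<IN> input, Function<IN, OUT> mapper) {
        List<OUT> result = new ArrayList<>(input.size());
        for (IN element : input) {
            result.add(mapper.apply(element));
        }
        return result;
    }

    /** A sum seeded with 0 still produces a value when there are no partial results. */
    static long sumWithInitialValue(List<Long> partials) {
        long sum = 0L; // plays the role of the initial value set in initiate()
        for (long partial : partials) {
            sum += partial;
        }
        return sum;
    }
}
```

With an empty list, `mapEach` returns an empty list while `sumWithInitialValue` returns 0, which is the difference between producing no row at all and producing a row of zeros.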
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r84288975 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java --- @@ -347,34 +328,17 @@ public void run() { } } - private Tuple3, FileInputSplit, S> getReaderState() throws IOException { - List snapshot = new ArrayList<>(this.pendingSplits.size()); - for (FileInputSplit split: this.pendingSplits) { - snapshot.add(split); - } - - // remove the current split from the list if inside. - if (this.currentSplit != null && this.currentSplit.equals(pendingSplits.peek())) { - this.pendingSplits.remove(); - } - - if (this.currentSplit != null) { - if (this.format instanceof CheckpointableInputFormat) { - @SuppressWarnings("unchecked") - CheckpointableInputFormat checkpointableFormat = - (CheckpointableInputFormat) this.format; - - S formatState = this.isSplitOpen ? - checkpointableFormat.getCurrentState() : - restoredFormatState; - return new Tuple3<>(snapshot, currentSplit, formatState); - } else { - LOG.info("The format does not support checkpointing. The current input split will be re-read from start upon recovery."); - return new Tuple3<>(snapshot, currentSplit, null); + private List> getReaderState() throws IOException { + List> snapshot = new ArrayList<>(this.pendingSplits.size()); + if (currentSplit != null ) { + if (this.format instanceof CheckpointableInputFormat && this.isSplitOpen) { + S formatState = ((CheckpointableInputFormat, S>) this.format).getCurrentState(); --- End diff -- ```java Serializable formatState = ((CheckpointableInputFormat) this.format).getCurrentState(); ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r84276796 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java --- @@ -43,16 +41,18 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; -import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Comparator; import java.util.List; +import java.util.PriorityQueue; import java.util.Queue; +import static org.apache.flink.streaming.api.functions.source.RichFileInputSplit.EOS; import static org.apache.flink.util.Preconditions.checkState; import static org.apache.flink.util.Preconditions.checkNotNull; /** - * The operator that reads the {@link FileInputSplit splits} received from the preceding + * The operator that reads the {@link RichFileInputSplit splits} received from the preceding * {@link ContinuousFileMonitoringFunction}. Contrary to the {@link ContinuousFileMonitoringFunction} * which has a parallelism of 1, this operator can have DOP > 1. * --- End diff -- Generic types are not documented in the JavaDoc.
[jira] [Comment Edited] (FLINK-4832) Count/Sum 0 elements
[ https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591955#comment-15591955 ] Anton Mushin edited comment on FLINK-4832 at 10/20/16 2:31 PM: --- Hello, I think {{org.apache.flink.api.common.operators.base.MapOperatorBase#executeOnCollections}} also needs to change, because {{org.apache.flink.api.table.runtime.aggregate.AggregateMapFunction#map}} is called only when inputData contains elements. {code} TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig); TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig); for (IN element : inputData) { IN inCopy = inSerializer.copy(element); OUT out = function.map(inCopy); result.add(outSerializer.copy(out)); } {code} And if {{org.apache.flink.api.table.runtime.aggregate.SumAggregate#initiate}} is edited, for example, as {code} override def initiate(partial: Row): Unit = { partial.setField(sumIndex, 0.asInstanceOf[T]) // cast 0 to the type T of the class that extends SumAggregate[T] } {code} then the following tests pass {code} @Test def testSumNullElements(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) val sqlQuery = "SELECT sum(_1),sum(_2),sum(_3),sum(_4),sum(_5),sum(_6) " + "FROM (select * from MyTable where _1 = 4)" val ds = env.fromElements( (1: Byte, 2l,1D,1f,1,1:Short ), (2: Byte, 2l,1D,1f,1,1:Short )) tEnv.registerDataSet("MyTable", ds) val result = tEnv.sql(sqlQuery) val expected = "0,0,0.0,0.0,0,0" val results = result.toDataSet[Row].collect() TestBaseUtils.compareResultAsText(results.asJava, expected) } @Test def testCountNullElements(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) val sqlQuery = "SELECT count(_1),count(_2),count(_3),count(_4),count(_5),count(_6) " + "FROM (select * from 
MyTable where _1 = 4)" val ds = env.fromElements( (1: Byte, 2l,1D,1f,1,1:Short ), (2: Byte, 2l,1D,1f,1,1:Short )) tEnv.registerDataSet("MyTable", ds) val result = tEnv.sql(sqlQuery) val expected = "0,0,0,0,0,0" val results = result.toDataSet[Row].collect() TestBaseUtils.compareResultAsText(results.asJava, expected) } {code} was (Author: anmu): Hello I think that it needs to change {{org.apache.flink.api.common.operators.base.MapOperatorBase#executeOnCollections}} also, because {{org.apache.flink.api.table.runtime.aggregate.AggregateMapFunction#map}} will be called if elements are in inputData. {code} TypeSerializer inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig); TypeSerializer outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig); for (IN element : inputData) { IN inCopy = inSerializer.copy(element); OUT out = function.map(inCopy); result.add(outSerializer.copy(out)); } {code} And if {{org.apache.flink.api.table.runtime.aggregate.SumAggregate#initiate}} will be edited for examle as {code} override def initiate(partial: Row): Unit = { partial.setField(sumIndex, 0.asInstanceOf[T]) //cast 0 to type for sum class is extends SumAggregate[T] } {code} then next test will be passed {code} @Test def testDataSetAggregation(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) val sqlQuery = "SELECT sum(_1) FROM MyTable" val ds = CollectionDataSets.get3TupleDataSet(env) tEnv.registerDataSet("MyTable", ds) val result = tEnv.sql(sqlQuery) val expected = "231" val results = result.toDataSet[Row].collect() TestBaseUtils.compareResultAsText(results.asJava, expected) } @Test def testSumNullElements(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) val sqlQuery = "SELECT sum(_1),sum(_2),sum(_3),sum(_4),sum(_5),sum(_6) " + "FROM (select * from MyTable where _1 = 4)" val 
ds = env.fromElements( (1: Byte, 2l,1D,1f,1,1:Short ), (2: Byte, 2l,1D,1f,1,1:Short )) tEnv.registerDataSet("MyTable", ds) val result = tEnv.sql(sqlQuery) val expected = "0,0,0.0,0.0,0,0" val results = result.toDataSet[Row].collect() TestBaseUtils.compareResultAsText(results.asJava, expected) } {code} > Count/Sum 0 elements > > > Key: FLINK-4832 > URL: https://issues.apache.org/jira/browse/FLINK-4832 > Project
[jira] [Commented] (FLINK-4824) CliFrontend shows misleading error message when main() method returns before env.execute()
[ https://issues.apache.org/jira/browse/FLINK-4824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15592027#comment-15592027 ] ASF GitHub Bot commented on FLINK-4824: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/2662 @mxm thanks for the review. I added a second commit which I think satisfies your request. When no job is executed then the message is printed to stderr without a stacktrace. > CliFrontend shows misleading error message when main() method returns before > env.execute() > -- > > Key: FLINK-4824 > URL: https://issues.apache.org/jira/browse/FLINK-4824 > Project: Flink > Issue Type: Bug > Components: Client >Affects Versions: 1.2.0 >Reporter: Robert Metzger >Assignee: Greg Hogan > > While testing Flink by running the > {{./examples/streaming/SocketWindowWordCount.jar}} example, I got the > following error message: > {code} > ./bin/flink run ./examples/streaming/SocketWindowWordCount.jar > Cluster configuration: Standalone cluster with JobManager at /127.0.0.1:6123 > Using address 127.0.0.1:6123 to connect to JobManager. > JobManager web interface address http://127.0.0.1:8081 > Starting execution of program > No port specified. Please run 'SocketWindowWordCount --port ', where > port is the address of the text server > To start a simple text server, run 'netcat -l ' and type the input text > into the command line > > The program finished with the following exception: > org.apache.flink.client.program.ProgramInvocationException: The program > didn't contain Flink jobs. Perhaps you forgot to call execute() on the > execution environment. 
> at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:324) > at > org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:774) > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:250) > at > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:985) > at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:1032) > at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:1029) > at > org.apache.flink.runtime.security.SecurityContext$1.run(SecurityContext.java:82) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548) > at > org.apache.flink.runtime.security.SecurityContext.runSecured(SecurityContext.java:79) > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1029) > {code} > I think the error message is misleading, because I tried executing a valid > Flink job. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions
[ https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15592052#comment-15592052 ] ASF GitHub Bot commented on FLINK-3674: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2570 I updated this on top of the latest master with @StefanRRichter's state changes. Please take another look, @StefanRRichter. > Add an interface for Time aware User Functions > -- > > Key: FLINK-3674 > URL: https://issues.apache.org/jira/browse/FLINK-3674 > Project: Flink > Issue Type: New Feature > Components: Streaming >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Aljoscha Krettek > > I suggest to add an interface that UDFs can implement, which will let them be > notified upon watermark updates. > Example usage: > {code} > public interface EventTimeFunction { > void onWatermark(Watermark watermark); > } > public class MyMapper implements MapFunction<String, String>, > EventTimeFunction { > private long currentEventTime = Long.MIN_VALUE; > public String map(String value) { > return value + " @ " + currentEventTime; > } > public void onWatermark(Watermark watermark) { > currentEventTime = watermark.getTimestamp(); > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15592051#comment-15592051 ] ASF GitHub Bot commented on FLINK-4844: --- Github user StefanRRichter closed the pull request at: https://github.com/apache/flink/pull/2648 > Partitionable Raw Keyed/Operator State > -- > > Key: FLINK-4844 > URL: https://issues.apache.org/jira/browse/FLINK-4844 > Project: Flink > Issue Type: New Feature >Reporter: Stefan Richter >Assignee: Stefan Richter > > Partitionable operator and keyed state are currently only available by using > backends. However, the serialization code for many operators is build around > reading/writing their state to a stream for checkpointing. We want to provide > partitionable states also through streams, so that migrating existing > operators becomes more easy. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...
Github user StefanRRichter closed the pull request at: https://github.com/apache/flink/pull/2648
[GitHub] flink issue #2662: [FLINK-4824] [client] CliFrontend shows misleading error ...
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/2662 @mxm thanks for the review. I added a second commit which I think satisfies your request. When no job is executed then the message is printed to stderr without a stacktrace.
[jira] [Closed] (FLINK-4842) Introduce test to enforce order of operator / udf lifecycles
[ https://issues.apache.org/jira/browse/FLINK-4842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter closed FLINK-4842. - Resolution: Implemented Fix Version/s: 1.2.0 > Introduce test to enforce order of operator / udf lifecycles > - > > Key: FLINK-4842 > URL: https://issues.apache.org/jira/browse/FLINK-4842 > Project: Flink > Issue Type: Test >Reporter: Stefan Richter >Assignee: Stefan Richter > Fix For: 1.2.0 > > > We should introduce a test that enforces a certain order in which life cycle > methods of operators and udfs are called, so that they are not easily changed > by accident. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter closed FLINK-4844. - Resolution: Implemented Fix Version/s: 1.2.0 > Partitionable Raw Keyed/Operator State > -- > > Key: FLINK-4844 > URL: https://issues.apache.org/jira/browse/FLINK-4844 > Project: Flink > Issue Type: New Feature >Reporter: Stefan Richter >Assignee: Stefan Richter > Fix For: 1.2.0 > > > Partitionable operator and keyed state are currently only available by using > backends. However, the serialization code for many operators is build around > reading/writing their state to a stream for checkpointing. We want to provide > partitionable states also through streams, so that migrating existing > operators becomes more easy. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2570: [FLINK-3674] Add an interface for Time aware User Functio...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2570 I updated this on top of the latest master with @StefanRRichter's state changes. Please take another look, @StefanRRichter.
[jira] [Created] (FLINK-4867) Investigate code generation for improving sort performance
Gabor Gevay created FLINK-4867: -- Summary: Investigate code generation for improving sort performance Key: FLINK-4867 URL: https://issues.apache.org/jira/browse/FLINK-4867 Project: Flink Issue Type: Sub-task Components: Local Runtime Reporter: Gabor Gevay Priority: Minor This issue is for investigating whether code generation could speed up sorting. We should make some performance measurements on hand-written code that is similar to what we could generate, to see whether investing more time into this is worth it. If we find that it is worth it, we can open a second Jira for the actual implementation of the code generation. I think we could generate one class at places where we currently instantiate {{QuickSort}}. This generated class would include the functionality of {{QuickSort}}, {{NormalizedKeySorter}} or {{FixedLengthRecordSorter}}, {{MemorySegment.compare}}, and {{MemorySegment.swap}}. Btw. I'm planning to give this as a student project at a TU Berlin course in the next few months. Some concrete ideas about how a generated sorter could be faster than the current sorting code: * {{MemorySegment.compare}} could be specialized for ** Length: for small records, the loop could be unrolled ** Endianness (currently it is optimized for big endian; and in the little endian case (e.g. x86) it does a Long.reverseBytes for each long read) * {{MemorySegment.swapBytes}} ** In case of small records, using three {{UNSAFE.copyMemory}} calls is probably not as efficient as a specialized swap, because *** We could use total loop unrolling in generated code (because we know the exact record size) *** {{UNSAFE.copyMemory}} checks for alignment first \[6,9\] *** We will only need 2/3 the memory bandwidth, because the temporary storage could be a register if we swap one byte (or one {{long}}) at a time ** several checks might be eliminated * Better inlining behaviour could be achieved ** Virtual function calls to the methods of {{InMemorySorter}} could be eliminated. 
(Note, that these are problematic to devirtualize by the JVM if there are different derived classes used in a single Flink job (see \[8,7\]).) ** {{MemorySegment.swapBytes}} is probably not inlined currently, because the excessive checks make it too large ** {{MemorySegment.compare}} is probably also not inlined currently, because those two while loops are too large \[6\] http://www.docjar.com/docs/api/sun/misc/Unsafe.html#copyMemory(Object, long, Object, long, long) \[7\] https://shipilev.net/blog/2015/black-magic-method-dispatch/ \[8\] http://insightfullogic.com/2014/May/12/fast-and-megamorphic-what-influences-method-invoca/ \[9\] http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/87ee5ee27509/src/cpu/x86/vm/stubGenerator_x86_64.cpp#l2409 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
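Editor's note on the endianness point in FLINK-4867: the sketch below is not Flink code. It only illustrates, on plain byte arrays, why a lexicographic compare that reads eight bytes at a time needs a byte reversal on little-endian hardware. The class name {{FixedKeyCompare}} and both method names are hypothetical, and the key length of 8 bytes is an assumption chosen to keep the example small.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical illustration, not part of Flink.
final class FixedKeyCompare {

    /** Lexicographic (unsigned, byte-wise) comparison of two 8-byte keys, done with one long read per key. */
    static int compare8(byte[] a, int offA, byte[] b, int offB) {
        // Read in native byte order, as a raw memory access would.
        long x = ByteBuffer.wrap(a, offA, 8).order(ByteOrder.nativeOrder()).getLong();
        long y = ByteBuffer.wrap(b, offB, 8).order(ByteOrder.nativeOrder()).getLong();
        if (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN) {
            // On little-endian machines the first byte in memory ends up least
            // significant, so the bytes are reversed before comparing.
            x = Long.reverseBytes(x);
            y = Long.reverseBytes(y);
        }
        // Unsigned comparison, because key bytes are compared as values 0..255.
        return Long.compareUnsigned(x, y);
    }

    /** Reference implementation: the generic byte-by-byte loop. */
    static int compareBytewise(byte[] a, int offA, byte[] b, int offB, int len) {
        for (int i = 0; i < len; i++) {
            int d = (a[offA + i] & 0xff) - (b[offB + i] & 0xff);
            if (d != 0) {
                return d;
            }
        }
        return 0;
    }
}
```

A generated sorter that knows both the key length and the target byte order could emit only the branch it needs, which is the kind of specialization the issue proposes to measure.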
[jira] [Commented] (FLINK-3722) The divisions in the InMemorySorters' swap/compare methods hurt performance
[ https://issues.apache.org/jira/browse/FLINK-3722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15592186#comment-15592186 ] ASF GitHub Bot commented on FLINK-3722: --- Github user ggevay commented on the issue: https://github.com/apache/flink/pull/2628 > I transcribed Quicksort so as to remove considerations of Java performance and inlining. It was not clear to me that if we encapsulated the index, page number, and page offset into an object that Java would inline the various increment and decrement functions. Also, I don't think this looks too bad. I'm happy to reformat if that is preferred. OK, I would say that it is OK like this, but let's see what the others will say. > The divisions in the InMemorySorters' swap/compare methods hurt performance > --- > > Key: FLINK-3722 > URL: https://issues.apache.org/jira/browse/FLINK-3722 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: Gabor Gevay >Assignee: Greg Hogan >Priority: Minor > Labels: performance > > NormalizedKeySorter's and FixedLengthRecordSorter's swap and compare methods > use divisions (which take a lot of time \[1\]) to calculate the index of the > MemorySegment and the offset inside the segment. [~greghogan] reported on the > mailing list \[2\] measuring a ~12-14% performance effect in one case. > A possibility to improve the situation is the following: > The way that QuickSort mostly uses these compare and swap methods is that it > maintains two indices, and uses them to call compare and swap. The key > observation is that these indices are mostly stepped by one, and > _incrementally_ calculating the quotient and modulo is actually easy when the > index changes only by one: increment/decrement the modulo, and check whether > the modulo has reached 0 or the divisor, and if it did, then wrap-around the > modulo and increment/decrement the quotient. 
> To implement this, InMemorySorter would have to expose an iterator that would > have the divisor and the current modulo and quotient as state, and have > increment/decrement methods. Compare and swap could then have overloads that > take these iterators as arguments. > \[1\] http://www.agner.org/optimize/instruction_tables.pdf > \[2\] > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Macro-benchmarking-for-performance-tuning-and-regression-detection-td11078.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
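Editor's note on the incremental quotient/modulo idea in FLINK-3722: a minimal sketch of such an iterator follows. {{SegmentCursor}} and its members are hypothetical names used for illustration, not an existing Flink API, and the sketch assumes a fixed number of records per memory segment.

```java
// Hypothetical illustration, not part of Flink.
final class SegmentCursor {
    private final int recordsPerSegment; // the divisor
    private int segment;                 // quotient: index / recordsPerSegment
    private int offset;                  // modulo:   index % recordsPerSegment

    SegmentCursor(int recordsPerSegment, int startIndex) {
        this.recordsPerSegment = recordsPerSegment;
        // One division when the cursor is created; later steps only add and compare.
        this.segment = startIndex / recordsPerSegment;
        this.offset = startIndex % recordsPerSegment;
    }

    /** Moves to index + 1, wrapping the offset into the next segment when it reaches the divisor. */
    void increment() {
        if (++offset == recordsPerSegment) {
            offset = 0;
            segment++;
        }
    }

    /** Moves to index - 1, wrapping the offset into the previous segment when it drops below zero. */
    void decrement() {
        if (--offset < 0) {
            offset = recordsPerSegment - 1;
            segment--;
        }
    }

    int segment() {
        return segment;
    }

    int offset() {
        return offset;
    }
}
```

With two such cursors standing in for the two QuickSort indices, compare and swap overloads could take the cursors directly and read the segment and offset without dividing.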
[jira] [Created] (FLINK-4868) Insertion sort could avoid the swaps
Gabor Gevay created FLINK-4868: -- Summary: Insertion sort could avoid the swaps Key: FLINK-4868 URL: https://issues.apache.org/jira/browse/FLINK-4868 Project: Flink Issue Type: Sub-task Components: Local Runtime Reporter: Gabor Gevay Priority: Minor This is about the fallback to insertion sort at the beginning of {{QuickSort.sortInternal}}. This is quite hot code, as it runs every time we reach the bottom of the quick sort recursion tree. The inner loop does a series of swaps on adjacent elements for moving a block of several elements one slot to the right and inserting the ith element into the hole. However, it would be faster to first copy the ith element to a temp location, and then move the block of elements to the right without swaps, and then copy the original ith element to the hole. Moving the block of elements without swaps could be achieved by calling {{UNSAFE.copyMemory}} only once for every element (as opposed to the three calls in {{MemorySegment.swap}} currently being done). (Note that I'm not sure whether {{UNSAFE.copyMemory}} is like memmove or like memcpy, so I'm not sure if we can do the entire block of elements with maybe even one {{UNSAFE.copyMemory}}.) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
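Editor's note on FLINK-4868: the swap-free insertion step can be sketched on a plain {{int[]}} as below. This is an illustration under simplifying assumptions, not the Flink sorter: {{ShiftInsertionSort}} is a hypothetical class, records are single ints, and the block move uses {{System.arraycopy}}, which is specified to behave correctly for overlapping ranges in the same array. The sketch does not settle the open question in the issue about whether {{UNSAFE.copyMemory}} has memmove or memcpy semantics.

```java
// Hypothetical illustration, not part of Flink.
final class ShiftInsertionSort {

    /** Sorts a[from, to) in ascending order. */
    static void sort(int[] a, int from, int to) {
        for (int i = from + 1; i < to; i++) {
            int tmp = a[i]; // copy the i-th element to a temporary location

            // Find the insertion point j: first position whose left neighbour is <= tmp.
            int j = i;
            while (j > from && a[j - 1] > tmp) {
                j--;
            }

            // Move the block [j, i) one slot to the right with a single copy
            // instead of a chain of adjacent swaps.
            System.arraycopy(a, j, a, j + 1, i - j);

            a[j] = tmp; // place the saved element into the hole
        }
    }
}
```

For example, calling `ShiftInsertionSort.sort(a, 0, a.length)` sorts the whole array; each outer iteration performs one block move where the swap-based loop would perform `i - j` swaps.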
[jira] [Updated] (FLINK-4868) Insertion sort could avoid the swaps
[ https://issues.apache.org/jira/browse/FLINK-4868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Gevay updated FLINK-4868: --- Description: This is about the fallback to insertion sort at the beginning of {{QuickSort.sortInternal}}. It is quite a hot code as it runs every time when we are at the bottom of the quick sort recursion tree. The inner loop does a series of swaps on adjacent elements for moving a block of several elements one slot to the right and inserting the ith element at the hole. However, it would be faster to first copy the ith element to a temp location, and then move the block of elements to the right without swaps, and then copy the original ith element to the hole. Moving the block of elements without swaps could be achieved by calling {{UNSAFE.copyMemory}} only once for every element (as opposed to the three calls in {{MemorySegment.swap}} currently being done). (Note that I'm not sure whether {{UNSAFE.copyMemory}} is like memmove or like memcpy, so I'm not sure if we can do the entire block of elements with maybe even one {{UNSAFE.copyMemory}}.) Note that the threshold for switching to the insertion sort could probably be increased after this. was: This is about the fallback to insertion sort at the beginning of {{QuickSort.sortInternal}}. It is quite a hot code as it runs every time when we are at the bottom of the quick sort recursion tree. The inner loop does a series of swaps on adjacent elements for moving a block of several elements one slot to the right and inserting the ith element at the hole. However, it would be faster to first copy the ith element to a temp location, and then move the block of elements to the right without swaps, and then copy the original ith element to the hole. Moving the block of elements without swaps could be achieved by calling {{UNSAFE.copyMemory}} only once for every element (as opposed to the three calls in {{MemorySegment.swap}} currently being done). 
(Note that I'm not sure whether {{UNSAFE.copyMemory}} is like memmove or like memcpy, so I'm not sure if we can do the entire block of elements with maybe even one {{UNSAFE.copyMemory}}.) > Insertion sort could avoid the swaps > > > Key: FLINK-4868 > URL: https://issues.apache.org/jira/browse/FLINK-4868 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: Gabor Gevay >Priority: Minor > Labels: performance > > This is about the fallback to insertion sort at the beginning of > {{QuickSort.sortInternal}}. It is quite a hot code as it runs every time when > we are at the bottom of the quick sort recursion tree. > The inner loop does a series of swaps on adjacent elements for moving a block > of several elements one slot to the right and inserting the ith element at > the hole. However, it would be faster to first copy the ith element to a temp > location, and then move the block of elements to the right without swaps, and > then copy the original ith element to the hole. > Moving the block of elements without swaps could be achieved by calling > {{UNSAFE.copyMemory}} only once for every element (as opposed to the three > calls in {{MemorySegment.swap}} currently being done). > (Note that I'm not sure whether {{UNSAFE.copyMemory}} is like memmove or like > memcpy, so I'm not sure if we can do the entire block of elements with maybe > even one {{UNSAFE.copyMemory}}.) > Note that the threshold for switching to the insertion sort could probably be > increased after this. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4867) Investigate code generation for improving sort performance
[ https://issues.apache.org/jira/browse/FLINK-4867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15592239#comment-15592239 ] Greg Hogan commented on FLINK-4867: --- It will be very interesting to see the results of this project. Perhaps you should self-assign the ticket until it can be handed over? Inline status is logged with the JVM arguments {{-XX:+UnlockDiagnosticVMOptions -XX:+PrintInlining}}. > Investigate code generation for improving sort performance > -- > > Key: FLINK-4867 > URL: https://issues.apache.org/jira/browse/FLINK-4867 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: Gabor Gevay >Priority: Minor > Labels: performance > > This issue is for investigating whether code generation could speed up > sorting. We should make some performance measurements on hand-written code > that is similar to what we could generate, to see whether investing more time > into this is worth it. If we find that it is worth it, we can open a second > Jira for the actual implementation of the code generation. > I think we could generate one class at places where we currently instantiate > {{QuickSort}}. This generated class would include the functionality of > {{QuickSort}}, {{NormalizedKeySorter}} or {{FixedLengthRecordSorter}}, > {{MemorySegment.compare}}, and {{MemorySegment.swap}}. > Btw. I'm planning to give this as a student project at a TU Berlin course in > the next few months. > Some concrete ideas about how could a generated sorter be faster than the > current sorting code: > * {{MemorySegment.compare}} could be specialized for > ** Length: for small records, the loop could be unrolled > ** Endiannes (currently it is optimized for big endian; and in the little > endian case (e.g. 
x86) it does a Long.reverseBytes for each long read) > * {{MemorySegment.swapBytes}} > ** In case of small records, using three {{UNSAFE.copyMemory}} is probably > not as efficient as a specialized swap, because > *** We could use total loop unrolling in generated code (because we know the > exact record size) > *** {{UNSAFE.copyMemory}} checks for alignment first \[6,9\] > *** We will only need 2/3 the memory bandwidth, because the temporary storage > could be a register if we swap one byte (or one {{long}}) at a time > ** several checks might be eliminated > * Better inlining behaviour could be achieved > ** Virtual function calls to the methods of {{InMemorySorter}} could be > eliminated. (Note, that these are problematic to devirtualize by the JVM if > there are different derived classes used in a single Flink job (see \[8,7\]).) > ** {{MemorySegment.swapBytes}} is probably not inlined currently, because the > excessive checks make it too large > ** {{MemorySegment.compare}} is probably also not inlined currently, because > those two while loops are too large > \[6\] http://www.docjar.com/docs/api/sun/misc/Unsafe.html#copyMemory(Object, > long, Object, long, long) > \[7\] https://shipilev.net/blog/2015/black-magic-method-dispatch/ > \[8\] > http://insightfullogic.com/2014/May/12/fast-and-megamorphic-what-influences-method-invoca/ > \[9\] > http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/87ee5ee27509/src/cpu/x86/vm/stubGenerator_x86_64.cpp#l2409 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
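Editor's note on the specialized swap mentioned in the quoted issue: the sketch below contrasts a generic three-copy swap with a swap specialized for a known record size. It is an illustration only. {{RecordSwap}} and its methods are hypothetical, the memory is a heap {{byte[]}} wrapped in a {{ByteBuffer}} rather than a Flink {{MemorySegment}}, and the fixed record size of 8 bytes is an assumption. Whether the JIT actually keeps the temporaries in registers would have to be checked, for instance with the {{PrintInlining}} diagnostics mentioned in the comment above and a benchmark.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration, not part of Flink.
final class RecordSwap {

    /** Generic swap for any record length: three copies through a scratch buffer. */
    static void swapGeneric(byte[] mem, int i, int j, int len, byte[] scratch) {
        System.arraycopy(mem, i, scratch, 0, len); // record i -> scratch
        System.arraycopy(mem, j, mem, i, len);     // record j -> slot i
        System.arraycopy(scratch, 0, mem, j, len); // scratch  -> slot j
    }

    /** Swap specialized for records of exactly 8 bytes: the temporaries are plain local longs. */
    static void swap8(ByteBuffer mem, int i, int j) {
        long a = mem.getLong(i);
        long b = mem.getLong(j);
        mem.putLong(i, b);
        mem.putLong(j, a);
    }
}
```

The specialized version touches memory four times (two reads, two writes) where the generic one touches it six times, which matches the 2/3 memory bandwidth estimate in the issue.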
[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions
[ https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15592246#comment-15592246 ] ASF GitHub Bot commented on FLINK-3674: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/2570 Rebased version looks good to me. > Add an interface for Time aware User Functions > -- > > Key: FLINK-3674 > URL: https://issues.apache.org/jira/browse/FLINK-3674 > Project: Flink > Issue Type: New Feature > Components: Streaming >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Aljoscha Krettek > > I suggest to add an interface that UDFs can implement, which will let them be > notified upon watermark updates. > Example usage: > {code} > public interface EventTimeFunction { > void onWatermark(Watermark watermark); > } > public class MyMapper implements MapFunction<String, String>, > EventTimeFunction { > private long currentEventTime = Long.MIN_VALUE; > public String map(String value) { > return value + " @ " + currentEventTime; > } > public void onWatermark(Watermark watermark) { > currentEventTime = watermark.getTimestamp(); > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (FLINK-4867) Investigate code generation for improving sort performance
[ https://issues.apache.org/jira/browse/FLINK-4867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15592292#comment-15592292 ] Stefan Richter edited comment on FLINK-4867 at 10/20/16 4:32 PM: - If sort performance is crucial to you, I wrote an in-place radix sort algorithm that was extremely fast for me in a previous project. On primitive types and serialized byte strings I found it typically 2-3x faster than the alternatives (JDK's Arrays.sort() for primitives, multikey quicksort for Strings). I was considering porting it to Flink, but have not found the time yet. As it is radix based and not comparison based, it would require some way to expose partial sort keys instead of a compareTo method. If that is interesting to you, let me know and I can share the original code. was (Author: srichter): If you sort performance is crucial to you, I wrote some inplace radix sort algorithm that was extremely fast for me in a precious project. On primitives types and serialized byte strings I found it typically factor 2-3x faster than JDK's Arrays.sort() for primitives or multikey quicksort for Strings). I was considering to port it onto Flink, but did not find the time yet. As it is radix based and not comparison based, it would require some way to expose partial sort keys instead of a compareTo method . If that is interesting to you let me know and I can share the original code. > Investigate code generation for improving sort performance > -- > > Key: FLINK-4867 > URL: https://issues.apache.org/jira/browse/FLINK-4867 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: Gabor Gevay >Priority: Minor > Labels: performance > > This issue is for investigating whether code generation could speed up > sorting. We should make some performance measurements on hand-written code > that is similar to what we could generate, to see whether investing more time > into this is worth it. 
If we find that it is worth it, we can open a second > Jira for the actual implementation of the code generation. > I think we could generate one class at places where we currently instantiate > {{QuickSort}}. This generated class would include the functionality of > {{QuickSort}}, {{NormalizedKeySorter}} or {{FixedLengthRecordSorter}}, > {{MemorySegment.compare}}, and {{MemorySegment.swap}}. > Btw. I'm planning to give this as a student project at a TU Berlin course in > the next few months. > Some concrete ideas about how could a generated sorter be faster than the > current sorting code: > * {{MemorySegment.compare}} could be specialized for > ** Length: for small records, the loop could be unrolled > ** Endiannes (currently it is optimized for big endian; and in the little > endian case (e.g. x86) it does a Long.reverseBytes for each long read) > * {{MemorySegment.swapBytes}} > ** In case of small records, using three {{UNSAFE.copyMemory}} is probably > not as efficient as a specialized swap, because > *** We could use total loop unrolling in generated code (because we know the > exact record size) > *** {{UNSAFE.copyMemory}} checks for alignment first \[6,9\] > *** We will only need 2/3 the memory bandwidth, because the temporary storage > could be a register if we swap one byte (or one {{long}}) at a time > ** several checks might be eliminated > * Better inlining behaviour could be achieved > ** Virtual function calls to the methods of {{InMemorySorter}} could be > eliminated. (Note, that these are problematic to devirtualize by the JVM if > there are different derived classes used in a single Flink job (see \[8,7\]).) 
> ** {{MemorySegment.swapBytes}} is probably not inlined currently, because the > excessive checks make it too large > ** {{MemorySegment.compare}} is probably also not inlined currently, because > those two while loops are too large > \[6\] http://www.docjar.com/docs/api/sun/misc/Unsafe.html#copyMemory(Object, > long, Object, long, long) > \[7\] https://shipilev.net/blog/2015/black-magic-method-dispatch/ > \[8\] > http://insightfullogic.com/2014/May/12/fast-and-megamorphic-what-influences-method-invoca/ > \[9\] > http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/87ee5ee27509/src/cpu/x86/vm/stubGenerator_x86_64.cpp#l2409 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
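To make the swap and compare ideas in the description a bit more concrete, here is a minimal Java sketch. It is not Flink code: it uses a plain ByteBuffer and byte[] instead of {{MemorySegment}} and {{UNSAFE}}, and the class name, the method names, and the fixed 16-byte record size are all assumptions made for illustration. It contrasts a generic swap (three bulk copies through a temporary buffer) with a fully unrolled swap whose temporaries are plain locals, and shows an unrolled unsigned compare over big-endian longs.

```java
import java.nio.ByteBuffer;

/** Hypothetical illustration of the "specialized swap/compare" idea; not Flink code. */
final class SwapSketch {

    /** Generic swap: three bulk copies through a caller-supplied temporary buffer. */
    static void genericSwap(byte[] data, int off1, int off2, int len, byte[] tmp) {
        System.arraycopy(data, off1, tmp, 0, len);     // record 1 -> tmp
        System.arraycopy(data, off2, data, off1, len); // record 2 -> slot 1
        System.arraycopy(tmp, 0, data, off2, len);     // tmp      -> slot 2
    }

    /**
     * Swap specialized for an assumed record size of exactly 16 bytes.
     * The loop is fully unrolled and the temporary storage is two locals,
     * which the JIT may keep in registers. The two records must not overlap.
     */
    static void swap16(ByteBuffer buf, int off1, int off2) {
        long a0 = buf.getLong(off1);
        long a1 = buf.getLong(off1 + 8);
        buf.putLong(off1, buf.getLong(off2));
        buf.putLong(off1 + 8, buf.getLong(off2 + 8));
        buf.putLong(off2, a0);
        buf.putLong(off2 + 8, a1);
    }

    /**
     * Unrolled lexicographic compare of two 16-byte keys, read as unsigned
     * big-endian longs (big endian is the default ByteBuffer byte order).
     */
    static int compare16(ByteBuffer buf, int off1, int off2) {
        int c = Long.compareUnsigned(buf.getLong(off1), buf.getLong(off2));
        return c != 0 ? c : Long.compareUnsigned(buf.getLong(off1 + 8), buf.getLong(off2 + 8));
    }
}
```

Whether the specialized versions actually beat the generic ones on a real JVM would have to be measured, which is exactly what this issue proposes to investigate.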
[jira] [Commented] (FLINK-4867) Investigate code generation for improving sort performance
[ https://issues.apache.org/jira/browse/FLINK-4867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15592292#comment-15592292 ]

Stefan Richter commented on FLINK-4867:
---------------------------------------

If sort performance is crucial to you, I wrote an in-place radix sort algorithm that was extremely fast for me in a previous project. On primitive types and serialized byte strings I found it typically 2-3x faster than the JDK's Arrays.sort() (for primitives) or multikey quicksort (for Strings). I was considering porting it to Flink, but have not found the time yet. As it is radix based and not comparison based, it would require some way to expose partial sort keys instead of a compareTo method. If that is interesting to you, let me know and I can share the original code.

> Investigate code generation for improving sort performance
> ----------------------------------------------------------
>
> Key: FLINK-4867
> URL: https://issues.apache.org/jira/browse/FLINK-4867
> Project: Flink
> Issue Type: Sub-task
> Components: Local Runtime
> Reporter: Gabor Gevay
> Priority: Minor
> Labels: performance
>
> This issue is for investigating whether code generation could speed up sorting. We should make some performance measurements on hand-written code that is similar to what we could generate, to see whether investing more time into this is worth it. If we find that it is worth it, we can open a second Jira for the actual implementation of the code generation.
> I think we could generate one class at places where we currently instantiate {{QuickSort}}. This generated class would include the functionality of {{QuickSort}}, {{NormalizedKeySorter}} or {{FixedLengthRecordSorter}}, {{MemorySegment.compare}}, and {{MemorySegment.swap}}.
> Btw. I'm planning to give this as a student project at a TU Berlin course in the next few months.
> Some concrete ideas about how a generated sorter could be faster than the current sorting code:
> * {{MemorySegment.compare}} could be specialized for
> ** Length: for small records, the loop could be unrolled
> ** Endianness (currently it is optimized for big endian; in the little endian case (e.g. x86) it does a Long.reverseBytes for each long read)
> * {{MemorySegment.swapBytes}}
> ** In case of small records, using three {{UNSAFE.copyMemory}} calls is probably not as efficient as a specialized swap, because
> *** We could use total loop unrolling in generated code (because we know the exact record size)
> *** {{UNSAFE.copyMemory}} checks for alignment first [6,9]
> *** We will only need 2/3 the memory bandwidth, because the temporary storage could be a register if we swap one byte (or one {{long}}) at a time
> ** Several checks might be eliminated
> * Better inlining behaviour could be achieved
> ** Virtual function calls to the methods of {{InMemorySorter}} could be eliminated. (Note that these are problematic for the JVM to devirtualize if different derived classes are used in a single Flink job (see [8,7]).)
> ** {{MemorySegment.swapBytes}} is probably not inlined currently, because the excessive checks make it too large
> ** {{MemorySegment.compare}} is probably also not inlined currently, because those two while loops are too large
>
> [6] http://www.docjar.com/docs/api/sun/misc/Unsafe.html#copyMemory(Object, long, Object, long, long)
> [7] https://shipilev.net/blog/2015/black-magic-method-dispatch/
> [8] http://insightfullogic.com/2014/May/12/fast-and-megamorphic-what-influences-method-invoca/
> [9] http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/87ee5ee27509/src/cpu/x86/vm/stubGenerator_x86_64.cpp#l2409
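For readers wondering what "expose partial sort keys instead of a compareTo method" in the comment above could look like, here is a minimal Java sketch. It is not the code offered in the comment (which was not posted) and not Flink code: it is a generic in-place MSD radix sort in the American-flag style, and the PartialKey interface, the class name, and the INT_KEY extractor are hypothetical names introduced only for this illustration.

```java
/** Hypothetical sketch of an in-place radix sort driven by partial sort keys. */
final class RadixSketch {

    /** Assumed stand-in for compareTo: exposes one key byte at a given depth. */
    interface PartialKey<T> {
        /** Returns the key byte (0..255) at position depth, or -1 if the key has ended. */
        int keyByte(T record, int depth);
    }

    /** Example extractor: signed ints as 4 big-endian bytes, sign bit flipped for ordering. */
    static final PartialKey<Integer> INT_KEY =
            (v, d) -> d < 4 ? ((v ^ 0x80000000) >>> (24 - 8 * d)) & 0xFF : -1;

    static <T> void sort(T[] a, PartialKey<T> key) {
        sort(a, 0, a.length, 0, key);
    }

    private static <T> void sort(T[] a, int lo, int hi, int depth, PartialKey<T> key) {
        if (hi - lo < 2) {
            return;
        }
        // Bucket 0 collects records whose key has ended; bucket b + 1 collects byte value b.
        int[] count = new int[257];
        for (int i = lo; i < hi; i++) {
            count[key.keyByte(a[i], depth) + 1]++;
        }
        int[] start = new int[257];
        int[] end = new int[257];
        int pos = lo;
        for (int b = 0; b < 257; b++) {
            start[b] = pos;
            pos += count[b];
            end[b] = pos;
        }
        // Permute in place: move each record into the next free slot of its bucket.
        int[] next = start.clone();
        for (int b = 0; b < 257; b++) {
            while (next[b] < end[b]) {
                T item = a[next[b]];
                int target = key.keyByte(item, depth) + 1;
                if (target == b) {
                    next[b]++;
                } else {
                    a[next[b]] = a[next[target]];
                    a[next[target]++] = item;
                }
            }
        }
        // Recurse on the next key byte; bucket 0 holds only equal (ended) keys.
        for (int b = 1; b < 257; b++) {
            if (end[b] - start[b] > 1) {
                sort(a, start[b], end[b], depth + 1, key);
            }
        }
    }
}
```

A call such as RadixSketch.sort(values, RadixSketch.INT_KEY) sorts an Integer[] without ever comparing two records directly. A production version would avoid allocating the bucket arrays on every recursion step and would likely fall back to a comparison sort for small ranges; the sketch leaves that out for brevity.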
[jira] [Created] (FLINK-4869) Store record pointer after record keys
Greg Hogan created FLINK-4869:
------------------------------

Summary: Store record pointer after record keys
Key: FLINK-4869
URL: https://issues.apache.org/jira/browse/FLINK-4869
Project: Flink
Issue Type: Sub-task
Components: Core
Affects Versions: 1.2.0
Reporter: Greg Hogan
Assignee: Greg Hogan
Priority: Minor

{{NormalizedKeySorter}} serializes records into a {{RandomAccessInputView}} separate from the memory segments used for the sort keys. By storing the pointer after the sort keys, the addition of the offset is moved from {{NormalizedKeySorter.compare}}, which is invoked O(n log n) times, to other methods which are invoked O(n) times.

Will run a performance comparison before submitting a PR to see how significant a performance improvement this would yield.
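The layout change can be illustrated with a small Java sketch. This is not the {{NormalizedKeySorter}} implementation: the class and method names, the 8-byte pointer width, and the 8-byte key width are assumptions picked for the example, and a plain ByteBuffer stands in for Flink's memory segments. It shows that with a pointer-first entry every key comparison has to add the pointer length to reach the key, while with a key-first entry the comparison reads at the entry offset directly and only the (less frequent) pointer reads pay for the extra addition.

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch of the two sort-index entry layouts discussed in FLINK-4869. */
final class IndexEntryLayoutSketch {
    static final int POINTER_LEN = 8; // assumed width of the record pointer
    static final int KEY_LEN = 8;     // assumed width of the normalized key
    static final int ENTRY_LEN = POINTER_LEN + KEY_LEN;

    // Layout A (pointer first): [pointer][key]

    static void writePointerFirst(ByteBuffer index, int i, long key, long pointer) {
        index.putLong(i * ENTRY_LEN, pointer);
        index.putLong(i * ENTRY_LEN + POINTER_LEN, key);
    }

    /** Called O(n log n) times; adds POINTER_LEN on every key read. */
    static int comparePointerFirst(ByteBuffer index, int i, int j) {
        return Long.compareUnsigned(
                index.getLong(i * ENTRY_LEN + POINTER_LEN),
                index.getLong(j * ENTRY_LEN + POINTER_LEN));
    }

    // Layout B (key first): [key][pointer]

    static void writeKeyFirst(ByteBuffer index, int i, long key, long pointer) {
        index.putLong(i * ENTRY_LEN, key);
        index.putLong(i * ENTRY_LEN + KEY_LEN, pointer);
    }

    /** Called O(n log n) times; the key sits at the start of the entry. */
    static int compareKeyFirst(ByteBuffer index, int i, int j) {
        return Long.compareUnsigned(index.getLong(i * ENTRY_LEN), index.getLong(j * ENTRY_LEN));
    }

    /** Called O(n) times (e.g. when emitting records); now pays the extra addition. */
    static long pointerKeyFirst(ByteBuffer index, int i) {
        return index.getLong(i * ENTRY_LEN + KEY_LEN);
    }
}
```

Both layouts order entries identically; the only difference is where the constant offset is added. Whether removing one addition from the comparison is measurable is the open question the planned performance comparison is meant to answer.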
[jira] [Updated] (FLINK-4860) Sort performance
[ https://issues.apache.org/jira/browse/FLINK-4860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gabor Gevay updated FLINK-4860:
-------------------------------
Component/s: Local Runtime

> Sort performance
> ----------------
>
> Key: FLINK-4860
> URL: https://issues.apache.org/jira/browse/FLINK-4860
> Project: Flink
> Issue Type: Improvement
> Components: Local Runtime
> Reporter: Greg Hogan
> Labels: performance
>
> A super-task for improvements to Flink's sort performance.
[jira] [Updated] (FLINK-4860) Sort performance
[ https://issues.apache.org/jira/browse/FLINK-4860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gabor Gevay updated FLINK-4860:
-------------------------------
Labels: performance (was: )

> Sort performance
> ----------------
>
> Key: FLINK-4860
> URL: https://issues.apache.org/jira/browse/FLINK-4860
> Project: Flink
> Issue Type: Improvement
> Components: Local Runtime
> Reporter: Greg Hogan
> Labels: performance
>
> A super-task for improvements to Flink's sort performance.
[jira] [Updated] (FLINK-4869) Store record pointer after record keys
[ https://issues.apache.org/jira/browse/FLINK-4869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gabor Gevay updated FLINK-4869:
-------------------------------
Labels: performance (was: )

> Store record pointer after record keys
> --------------------------------------
>
> Key: FLINK-4869
> URL: https://issues.apache.org/jira/browse/FLINK-4869
> Project: Flink
> Issue Type: Sub-task
> Components: Core
> Affects Versions: 1.2.0
> Reporter: Greg Hogan
> Assignee: Greg Hogan
> Priority: Minor
> Labels: performance
>
> {{NormalizedKeySorter}} serializes records into a {{RandomAccessInputView}} separate from the memory segments used for the sort keys. By storing the pointer after the sort keys, the addition of the offset is moved from {{NormalizedKeySorter.compare}}, which is invoked O(n log n) times, to other methods which are invoked O(n) times.
> Will run a performance comparison before submitting a PR to see how significant a performance improvement this would yield.
[jira] [Created] (FLINK-4870) ContinuousFileMonitoringFunction does not properly handle absolut Windows paths
Chesnay Schepler created FLINK-4870:
------------------------------------

Summary: ContinuousFileMonitoringFunction does not properly handle absolut Windows paths
Key: FLINK-4870
URL: https://issues.apache.org/jira/browse/FLINK-4870
Project: Flink
Issue Type: Bug
Components: Streaming
Affects Versions: 1.1.2
Reporter: Chesnay Schepler
Priority: Minor
Fix For: 1.2.0

The ContinuousFileMonitoringFunction fails for absolute Windows paths without a dedicated scheme (e.g. "C:\\tmp\\test.csv"), since the String path is fed directly into the URI constructor (which doesn't handle it properly) instead of first creating a Flink Path and converting that into a URI.
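The failure mode can be reproduced with the JDK alone. The following Java sketch is an illustration, not the Flink fix: the class name and both helper methods are hypothetical, and toFileUri is only a simplified stand-in for the normalization a path abstraction such as Flink's Path would perform before producing a URI. It shows that java.net.URI rejects the raw Windows path (the backslash is not a legal URI character and "C" is parsed as a scheme), while a normalized path converts cleanly.

```java
import java.net.URI;
import java.net.URISyntaxException;

/** Hypothetical sketch of the Windows-path problem described in FLINK-4870. */
final class WindowsPathUriSketch {

    /** Returns true if the raw string is accepted by the single-argument URI constructor. */
    static boolean parsesAsUri(String raw) {
        try {
            new URI(raw);
            return true;
        } catch (URISyntaxException e) {
            return false;
        }
    }

    /**
     * Simplified stand-in for a path abstraction: turns backslashes into slashes,
     * prefixes a drive-letter path with "/", and builds a file URI from the parts.
     */
    static URI toFileUri(String raw) {
        String path = raw.replace('\\', '/');
        boolean hasDrive = path.length() >= 2
                && Character.isLetter(path.charAt(0))
                && path.charAt(1) == ':';
        if (hasDrive) {
            path = "/" + path;
        }
        try {
            // The multi-argument constructor quotes illegal characters in the path.
            return new URI("file", null, path, null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Cannot convert path: " + raw, e);
        }
    }
}
```

With the path from the issue, parsesAsUri("C:\\tmp\\test.csv") is false, whereas toFileUri("C:\\tmp\\test.csv") yields a URI with scheme "file" and path "/C:/tmp/test.csv".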