[GitHub] flink pull request #2472: [FLINK-4361] Introduce Flink's own future abstract...
Github user tillrohrmann closed the pull request at: https://github.com/apache/flink/pull/2472 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2472: [FLINK-4361] Introduce Flink's own future abstract...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2472#discussion_r79656257

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java ---
    @@ -0,0 +1,156 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.concurrent;
    +
    +import java.util.concurrent.CancellationException;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +
    +/**
    + * Flink's basic future abstraction. A future represents an asynchronous operation whose result
    + * will be contained in this instance upon completion.
    + *
    + * @param <T> type of the future's result
    + */
    +public interface Future<T> {
    +
    +    /**
    +     * Checks if the future has been completed. A future is completed, if the result has been
    +     * delivered.
    +     *
    +     * @return true if the future is completed; otherwise false
    +     */
    +    boolean isDone();
    --- End diff --

    I see. After all, it's a minor naming issue.
[GitHub] flink pull request #2472: [FLINK-4361] Introduce Flink's own future abstract...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2472#discussion_r79623763

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java ---
    @@ -0,0 +1,156 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.concurrent;
    +
    +import java.util.concurrent.CancellationException;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +
    +/**
    + * Flink's basic future abstraction. A future represents an asynchronous operation whose result
    + * will be contained in this instance upon completion.
    + *
    + * @param <T> type of the future's result
    + */
    +public interface Future<T> {
    +
    +    /**
    +     * Checks if the future has been completed. A future is completed, if the result has been
    +     * delivered.
    +     *
    +     * @return true if the future is completed; otherwise false
    +     */
    +    boolean isDone();
    +
    +    /**
    +     * Tries to cancel the future's operation. Note that not all future operations can be canceled.
    +     * The result of the cancelling will be returned.
    +     *
    +     * @param mayInterruptIfRunning true iff the future operation may be interrupted
    +     * @return true if the cancelling was successful; otherwise false
    +     */
    +    boolean cancel(boolean mayInterruptIfRunning);
    +
    +    /**
    +     * Gets the result value of the future. If the future has not been completed, then this
    +     * operation will block indefinitely until the result has been delivered.
    +     *
    +     * @return the result value
    +     * @throws CancellationException if the future has been cancelled
    +     * @throws InterruptedException if the current thread was interrupted while waiting for the result
    +     * @throws ExecutionException if the future has been completed with an exception
    +     */
    +    T get() throws InterruptedException, ExecutionException;
    +
    +    /**
    +     * Gets the result value of the future. If the future has not been done, then this operation
    +     * will block the given timeout value. If the result has not been delivered within the timeout,
    +     * then the method throws an {@link TimeoutException}.
    +     *
    +     * @param timeout the time to wait for the future to be done
    +     * @param unit time unit for the timeout argument
    +     * @return the result value
    +     * @throws CancellationException if the future has been cancelled
    +     * @throws InterruptedException if the current thread was interrupted while waiting for the result
    +     * @throws ExecutionException if the future has been completed with an exception
    +     * @throws TimeoutException if the future has not been completed within the given timeout
    +     */
    +    T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
    +
    +    /**
    +     * Gets the value of the future. If the future has not been completed when calling this
    +     * function, the given value is returned.
    +     *
    +     * @param valueIfAbsent value which is returned if the future has not been completed
    +     * @return value of the future or the given value if the future has not been completed
    +     * @throws ExecutionException if the future has been completed with an exception
    +     */
    +    T getNow(T valueIfAbsent) throws ExecutionException;
    +
    +    /**
    +     * Applies the given function to the value of the future. The result of the apply function is
    +     * the value of the newly returned future.
    +     *
    +     * The apply function is executed asynchronously by the given executor.
    +     *
    +     * @param applyFunction function to apply to the future's value
    +     * @param executor used to execute the given apply function asynchronously
    +     * @param type of the apply function's return value
    +     * @return future r
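The blocking and non-blocking accessors documented in the quoted diff (`get()`, `get(timeout, unit)` and `getNow(valueIfAbsent)`) can be illustrated with a small sketch. It does not use the `Future` interface from this pull request; it assumes Java 8's `java.util.concurrent.CompletableFuture` as a stand-in, since the pull request states that the API is modelled on it. The class name `GetSemanticsSketch` and the helper `getWithTimeout` are made up for this illustration. One difference worth noting: Java 8's `getNow` reports failures through the unchecked `CompletionException`, whereas the interface under review declares the checked `ExecutionException`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustration only: uses Java 8's CompletableFuture, not the Future from this pull request.
final class GetSemanticsSketch {

    /** Waits up to the given number of milliseconds and describes the outcome as a string. */
    static String getWithTimeout(CompletableFuture<String> future, long millis) {
        try {
            return future.get(millis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // The result was not delivered within the timeout.
            return "timeout";
        } catch (InterruptedException e) {
            // Restore the interrupt flag for callers further up the stack.
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException e) {
            // The future was completed with an exception; the cause is the original error.
            return "failed: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = new CompletableFuture<>();

        // Not completed yet: getNow returns the fallback without blocking.
        System.out.println(future.getNow("fallback"));
        // Not completed yet: the timed get gives up after the timeout.
        System.out.println(getWithTimeout(future, 10));

        future.complete("value");
        // Completed: both accessors now return the delivered value.
        System.out.println(future.getNow("fallback"));
        System.out.println(getWithTimeout(future, 10));

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        System.out.println(getWithTimeout(failed, 10));
    }
}
```

The untimed `get()` is left out on purpose, because on a future that is never completed it would block indefinitely, as the Javadoc in the diff describes.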
[GitHub] flink pull request #2472: [FLINK-4361] Introduce Flink's own future abstract...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2472#discussion_r79622945

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java ---
    @@ -0,0 +1,156 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.concurrent;
    +
    +import java.util.concurrent.CancellationException;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +
    +/**
    + * Flink's basic future abstraction. A future represents an asynchronous operation whose result
    + * will be contained in this instance upon completion.
    + *
    + * @param <T> type of the future's result
    + */
    +public interface Future<T> {
    +
    +    /**
    +     * Checks if the future has been completed. A future is completed, if the result has been
    +     * delivered.
    +     *
    +     * @return true if the future is completed; otherwise false
    +     */
    +    boolean isDone();
    --- End diff --

    That's because I stuck to Java 8's `CompletableFuture` implementation, where the method has the same name. I'm not super happy with the naming either. But I also see the benefit of sticking to Java 8's `CompletableFuture` interface: it will allow us to easily replace our class once we switch to Java 8. I think we have to decide whether we want to stick to Java 8's `CompletableFuture` or not. In the latter case we can rename other methods as well.
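The trade-off discussed in this comment can be sketched as follows. The code is only an illustration under stated assumptions: `MiniCompletableFuture` is a hypothetical, cut-down interface and `Java8Backed` a hypothetical adapter, neither taken from the pull request. The sketch shows that when the method names match Java 8's `java.util.concurrent.CompletableFuture`, a later switch could amount to one-line delegations.

```java
import java.util.concurrent.CompletableFuture;

// Illustration only: all types in this block are hypothetical, not Flink's actual code.
final class NamingSketch {

    /** Cut-down interface that keeps Java 8's method names (isDone, complete, ...). */
    interface MiniCompletableFuture<T> {
        boolean isDone();
        boolean complete(T value);
        boolean completeExceptionally(Throwable error);
    }

    /** Adapter over Java 8's class; matching names make every method a plain delegation. */
    static final class Java8Backed<T> implements MiniCompletableFuture<T> {
        private final CompletableFuture<T> delegate = new CompletableFuture<>();

        @Override
        public boolean isDone() {
            return delegate.isDone();
        }

        @Override
        public boolean complete(T value) {
            return delegate.complete(value);
        }

        @Override
        public boolean completeExceptionally(Throwable error) {
            return delegate.completeExceptionally(error);
        }
    }

    public static void main(String[] args) {
        MiniCompletableFuture<Integer> future = new Java8Backed<>();
        System.out.println(future.isDone());    // false: nothing delivered yet
        System.out.println(future.complete(42)); // true: this call completed the future
        System.out.println(future.isDone());    // true
        System.out.println(future.complete(7));  // false: already completed
    }
}
```

Renaming `isDone()` to `isCompleted()` would read more naturally next to `complete()`, but every such rename would have to be undone or bridged in an adapter like this one.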
[GitHub] flink pull request #2472: [FLINK-4361] Introduce Flink's own future abstract...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2472#discussion_r79597647

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java ---
    @@ -0,0 +1,156 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.concurrent;
    +
    +import java.util.concurrent.CancellationException;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +
    +/**
    + * Flink's basic future abstraction. A future represents an asynchronous operation whose result
    + * will be contained in this instance upon completion.
    + *
    + * @param <T> type of the future's result
    + */
    +public interface Future<T> {
    +
    +    /**
    +     * Checks if the future has been completed. A future is completed, if the result has been
    +     * delivered.
    +     *
    +     * @return true if the future is completed; otherwise false
    +     */
    +    boolean isDone();
    --- End diff --

    Why is this not named `isCompleted()`? That would be analogous to the `complete()` function.
[GitHub] flink pull request #2472: [FLINK-4361] Introduce Flink's own future abstract...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2472#discussion_r79599026

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java ---
    @@ -0,0 +1,156 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.concurrent;
    +
    +import java.util.concurrent.CancellationException;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +
    +/**
    + * Flink's basic future abstraction. A future represents an asynchronous operation whose result
    + * will be contained in this instance upon completion.
    + *
    + * @param <T> type of the future's result
    + */
    +public interface Future<T> {
    +
    +    /**
    +     * Checks if the future has been completed. A future is completed, if the result has been
    +     * delivered.
    +     *
    +     * @return true if the future is completed; otherwise false
    +     */
    +    boolean isDone();
    +
    +    /**
    +     * Tries to cancel the future's operation. Note that not all future operations can be canceled.
    +     * The result of the cancelling will be returned.
    +     *
    +     * @param mayInterruptIfRunning true iff the future operation may be interrupted
    +     * @return true if the cancelling was successful; otherwise false
    +     */
    +    boolean cancel(boolean mayInterruptIfRunning);
    +
    +    /**
    +     * Gets the result value of the future. If the future has not been completed, then this
    +     * operation will block indefinitely until the result has been delivered.
    +     *
    +     * @return the result value
    +     * @throws CancellationException if the future has been cancelled
    +     * @throws InterruptedException if the current thread was interrupted while waiting for the result
    +     * @throws ExecutionException if the future has been completed with an exception
    +     */
    +    T get() throws InterruptedException, ExecutionException;
    +
    +    /**
    +     * Gets the result value of the future. If the future has not been done, then this operation
    +     * will block the given timeout value. If the result has not been delivered within the timeout,
    +     * then the method throws an {@link TimeoutException}.
    +     *
    +     * @param timeout the time to wait for the future to be done
    +     * @param unit time unit for the timeout argument
    +     * @return the result value
    +     * @throws CancellationException if the future has been cancelled
    +     * @throws InterruptedException if the current thread was interrupted while waiting for the result
    +     * @throws ExecutionException if the future has been completed with an exception
    +     * @throws TimeoutException if the future has not been completed within the given timeout
    +     */
    +    T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
    +
    +    /**
    +     * Gets the value of the future. If the future has not been completed when calling this
    +     * function, the given value is returned.
    +     *
    +     * @param valueIfAbsent value which is returned if the future has not been completed
    +     * @return value of the future or the given value if the future has not been completed
    +     * @throws ExecutionException if the future has been completed with an exception
    +     */
    +    T getNow(T valueIfAbsent) throws ExecutionException;
    +
    +    /**
    +     * Applies the given function to the value of the future. The result of the apply function is
    +     * the value of the newly returned future.
    +     *
    +     * The apply function is executed asynchronously by the given executor.
    +     *
    +     * @param applyFunction function to apply to the future's value
    +     * @param executor used to execute the given apply function asynchronously
    +     * @param type of the apply function's return value
    +     * @return future representi
[GitHub] flink pull request #2472: [FLINK-4361] Introduce Flink's own future abstract...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2472#discussion_r79565983

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java ---
    @@ -0,0 +1,65 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.concurrent.impl;
    +
    +import org.apache.flink.runtime.concurrent.CompletableFuture;
    +import org.apache.flink.util.Preconditions;
    +import scala.concurrent.Promise;
    +
    +/**
    + * Implementation of {@link CompletableFuture} which is backed by {@link Promise}.
    + *
    + * @param <T> type of the future's value
    + */
    +public class FlinkCompletableFuture<T> extends FlinkFuture<T> implements CompletableFuture<T> {
    --- End diff --

    True, you're right. Will change it.
[GitHub] flink pull request #2472: [FLINK-4361] Introduce Flink's own future abstract...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2472#discussion_r79565763

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java ---
    @@ -0,0 +1,65 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.concurrent.impl;
    +
    +import org.apache.flink.runtime.concurrent.CompletableFuture;
    +import org.apache.flink.util.Preconditions;
    +import scala.concurrent.Promise;
    +
    +/**
    + * Implementation of {@link CompletableFuture} which is backed by {@link Promise}.
    + *
    + * @param <T> type of the future's value
    + */
    +public class FlinkCompletableFuture<T> extends FlinkFuture<T> implements CompletableFuture<T> {
    +
    +    private final Promise<T> promise;
    +
    +    public FlinkCompletableFuture() {
    +        promise = new scala.concurrent.impl.Promise.DefaultPromise<>();
    +        scalaFuture = promise.future();
    +
    --- End diff --

    Good point, will remove it.
[GitHub] flink pull request #2472: [FLINK-4361] Introduce Flink's own future abstract...
Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2472#discussion_r77971053

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java ---
    @@ -0,0 +1,65 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.concurrent.impl;
    +
    +import org.apache.flink.runtime.concurrent.CompletableFuture;
    +import org.apache.flink.util.Preconditions;
    +import scala.concurrent.Promise;
    +
    +/**
    + * Implementation of {@link CompletableFuture} which is backed by {@link Promise}.
    + *
    + * @param <T> type of the future's value
    + */
    +public class FlinkCompletableFuture<T> extends FlinkFuture<T> implements CompletableFuture<T> {
    --- End diff --

    How about letting this class override the cancel() method and treat a cancellation as completeExceptionally with a CancellationException?
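The suggestion above can be sketched in a few lines. This is not the `FlinkCompletableFuture` from the pull request: `CancellableSketch` is a hypothetical class that, for brevity, assumes Java 8's `java.util.concurrent.CompletableFuture` as its backing store instead of a Scala `Promise`. For comparison, the Java 8 Javadoc documents `CompletableFuture.cancel` as completing the future with a `CancellationException` if it is not already completed, which is the same idea.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Illustration only: hypothetical class, backed by Java 8's CompletableFuture for brevity.
final class CancellableSketch<T> {

    private final CompletableFuture<T> delegate = new CompletableFuture<>();

    boolean complete(T value) {
        return delegate.complete(value);
    }

    boolean completeExceptionally(Throwable error) {
        return delegate.completeExceptionally(error);
    }

    /**
     * Models cancellation as an exceptional completion with a CancellationException.
     * Returns false if the future was already completed, so cancelling is best effort.
     * The flag is ignored here because no running computation is attached to this sketch.
     */
    boolean cancel(boolean mayInterruptIfRunning) {
        return completeExceptionally(new CancellationException("future was cancelled"));
    }

    boolean isDone() {
        return delegate.isDone();
    }

    /** Describes the current state as a string instead of throwing, for demonstration. */
    String describe() {
        if (!delegate.isDone()) {
            return "pending";
        }
        try {
            return "value: " + delegate.get();
        } catch (CancellationException e) {
            return "cancelled";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        CancellableSketch<String> future = new CancellableSketch<>();
        System.out.println(future.describe());
        System.out.println(future.cancel(false));
        System.out.println(future.describe());
        // Completing after a cancellation has no effect.
        System.out.println(future.complete("late"));
    }
}
```

With this approach, callers waiting in `get()` observe a `CancellationException`, which matches the `@throws CancellationException` clauses in the `Future` Javadoc quoted elsewhere in this thread.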
[GitHub] flink pull request #2472: [FLINK-4361] Introduce Flink's own future abstract...
Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2472#discussion_r77970498

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java ---
    @@ -0,0 +1,65 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.concurrent.impl;
    +
    +import org.apache.flink.runtime.concurrent.CompletableFuture;
    +import org.apache.flink.util.Preconditions;
    +import scala.concurrent.Promise;
    +
    +/**
    + * Implementation of {@link CompletableFuture} which is backed by {@link Promise}.
    + *
    + * @param <T> type of the future's value
    + */
    +public class FlinkCompletableFuture<T> extends FlinkFuture<T> implements CompletableFuture<T> {
    +
    +    private final Promise<T> promise;
    +
    +    public FlinkCompletableFuture() {
    +        promise = new scala.concurrent.impl.Promise.DefaultPromise<>();
    +        scalaFuture = promise.future();
    +
    --- End diff --

    unnecessary new line
[GitHub] flink pull request #2472: [FLINK-4361] Introduce Flink's own future abstract...
GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/2472

    [FLINK-4361] Introduce Flink's own future abstraction

    This introduces Flink's own future abstraction, whose API is similar to Java 8's
    CompletableFuture. The similarity is meant to ease a later transition to that class
    once we ditch Java 7.

    The current set of operations comprises:

    - isDone to check the completion of the future
    - get/getNow to obtain the future's value
    - cancel to cancel the future (best effort basis)
    - thenApplyAsync to transform the future's value into another value
    - thenAcceptAsync to register a callback for a successful completion of the future
    - exceptionallyAsync to register a callback for an exceptional completion of the future
    - thenComposeAsync to transform the future's value and flatten the returned future
    - handleAsync to register a callback which is called either with the regular result
      or the exceptional result

    Additionally, Flink offers a CompletableFuture which can be completed with a regular
    value or an exception:

    - complete/completeExceptionally

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink futures

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2472.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2472

commit 14023bb0df8e1a263d4082fb0e747a831e82f4cd
Author: Till Rohrmann
Date:   2016-09-02T19:13:34Z

    [FLINK-4361] Introduce Flink's own future abstraction
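To give a feel for the operations listed in the pull request description, here is a minimal sketch that chains their Java 8 counterparts on `java.util.concurrent.CompletableFuture`. It is an assumption-laden illustration, not code from the pull request: the class `OperationsSketch`, its methods and the `DIRECT` executor are invented for the example, and Flink's own interfaces may differ in their signatures. Java 8 offers only the non-async `exceptionally` (an `exceptionallyAsync` variant was added to the JDK in Java 12), so the sketch uses that in place of the `exceptionallyAsync` named above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

// Illustration only: Java 8's CompletableFuture stands in for the abstraction in the pull request.
final class OperationsSketch {

    // Runs every callback on the calling thread; chosen only to keep the demo simple.
    static final Executor DIRECT = Runnable::run;

    /** Chains thenApplyAsync, thenAcceptAsync, thenComposeAsync and handleAsync on one source. */
    static String run() {
        CompletableFuture<Integer> source = new CompletableFuture<>();
        AtomicReference<String> seen = new AtomicReference<>("nothing");

        // thenApplyAsync: transform the value into another value.
        CompletableFuture<Integer> doubled = source.thenApplyAsync(v -> v * 2, DIRECT);
        // thenAcceptAsync: callback for a successful completion.
        CompletableFuture<Void> accepted =
                doubled.thenAcceptAsync(v -> seen.set("accepted " + v), DIRECT);
        // thenComposeAsync: the function returns a future, which is flattened.
        CompletableFuture<String> composed = doubled.thenComposeAsync(
                v -> CompletableFuture.completedFuture("composed " + v), DIRECT);
        // handleAsync: called with either the regular result or the exception.
        CompletableFuture<String> handled =
                composed.handleAsync((v, error) -> error == null ? v : "error", DIRECT);

        source.complete(21);

        // Wait for both branches before reading their results.
        accepted.join();
        return seen.get() + ", " + handled.join();
    }

    /** Shows the exceptional path: the callback supplies a replacement value. */
    static String runFailing() {
        CompletableFuture<Integer> source = new CompletableFuture<>();
        CompletableFuture<Integer> recovered = source.exceptionally(error -> -1);
        source.completeExceptionally(new IllegalStateException("boom"));
        return "recovered " + recovered.join();
    }

    public static void main(String[] args) {
        System.out.println(run());        // accepted 42, composed 42
        System.out.println(runFailing()); // recovered -1
    }
}
```

Passing an explicit `Executor` to each `*Async` call mirrors the quoted Javadoc of the apply operation, where the function is executed asynchronously by the given executor; in real code this would typically be a thread pool rather than a same-thread executor.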