[FLINK-7505] Use lambdas in suppressed exception idiom
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5456cf9f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5456cf9f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5456cf9f Branch: refs/heads/master Commit: 5456cf9f8fc9156fd10e7542e8a2497a285cbff7 Parents: ca87bec Author: Stefan Richter <s.rich...@data-artisans.com> Authored: Thu Aug 24 17:27:29 2017 +0200 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Thu Aug 24 20:17:08 2017 +0200 ---------------------------------------------------------------------- .../java/org/apache/flink/util/LambdaUtil.java | 63 ++++++++++++++++++++ .../org/apache/flink/util/ThrowingConsumer.java | 37 ++++++++++++ .../apache/flink/runtime/state/StateUtil.java | 25 +------- 3 files changed, 103 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5456cf9f/flink-core/src/main/java/org/apache/flink/util/LambdaUtil.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/LambdaUtil.java b/flink-core/src/main/java/org/apache/flink/util/LambdaUtil.java new file mode 100644 index 0000000..8ac0f0e --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/util/LambdaUtil.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +/** + * This class offers utility functions for Java's lambda features. + */ +public final class LambdaUtil { + + private LambdaUtil() { + throw new AssertionError(); + } + + /** + * This method supplies all elements from the input to the consumer. Exceptions that happen on elements are + * suppressed until all elements are processed. If exceptions happened for one or more of the inputs, they are + * reported in a combining suppressed exception. + * + * @param inputs iterator for all inputs to the throwingConsumer. + * @param throwingConsumer this consumer will be called for all elements delivered by the input iterator. + * @param <T> the type of input. + * @throws Exception collected exceptions that happened during the invocation of the consumer on the input elements. + */ + public static <T> void applyToAllWhileSuppressingExceptions( + Iterable<T> inputs, + ThrowingConsumer<T> throwingConsumer) throws Exception { + + if (inputs != null && throwingConsumer != null) { + Exception exception = null; + + for (T input : inputs) { + + if (input != null) { + try { + throwingConsumer.accept(input); + } catch (Exception ex) { + exception = ExceptionUtils.firstOrSuppressed(ex, exception); + } + } + } + + if (exception != null) { + throw exception; + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/5456cf9f/flink-core/src/main/java/org/apache/flink/util/ThrowingConsumer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/ThrowingConsumer.java b/flink-core/src/main/java/org/apache/flink/util/ThrowingConsumer.java new file mode 100644 index 0000000..a180a12 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/util/ThrowingConsumer.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +/** + * This interface is basically Java's {@link java.util.function.Consumer} interface enhanced with the ability to throw + * an exception. + * + * @param <T> type of the consumed elements. + */ +@FunctionalInterface +public interface ThrowingConsumer<T> { + + /** + * Performs this operation on the given argument. + * + * @param t the input argument + * @throws Exception on errors during consumption + */ + void accept(T t) throws Exception; +} http://git-wip-us.apache.org/repos/asf/flink/blob/5456cf9f/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java index 6f231e4..09d195a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java @@ -18,8 +18,8 @@ package org.apache.flink.runtime.state; -import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FutureUtil; +import org.apache.flink.util.LambdaUtil; import java.util.concurrent.RunnableFuture; @@ -49,27 +49,8 @@ public class StateUtil { * @throws Exception exception that is a collection of all suppressed exceptions that were caught during iteration */ public static void bestEffortDiscardAllStateObjects( - Iterable<? extends StateObject> handlesToDiscard) throws Exception { - - if (handlesToDiscard != null) { - Exception exception = null; - - for (StateObject state : handlesToDiscard) { - - if (state != null) { - try { - state.discardState(); - } - catch (Exception ex) { - exception = ExceptionUtils.firstOrSuppressed(ex, exception); - } - } - } - - if (exception != null) { - throw exception; - } - } + Iterable<? extends StateObject> handlesToDiscard) throws Exception { + LambdaUtil.applyToAllWhileSuppressingExceptions(handlesToDiscard, StateObject::discardState); } /**