[ https://issues.apache.org/jira/browse/FLINK-986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14240024#comment-14240024 ]
ASF GitHub Bot commented on FLINK-986: -------------------------------------- Github user uce commented on a diff in the pull request: https://github.com/apache/incubator-flink/pull/254#discussion_r21560216 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferFuture.java --- @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.buffer; + +import org.apache.flink.runtime.util.event.EventListener; + +import static com.google.common.base.Preconditions.checkState; + +public class BufferFuture { + + private final Object monitor = new Object(); + + private Buffer buffer; + + private EventListener<BufferFuture> listener; + + private boolean isCancelled; + + public BufferFuture() { + } + + public BufferFuture(Buffer buffer) { + synchronized (monitor) { + this.buffer = buffer; + } + } + + public Buffer getBuffer() { + synchronized (monitor) { + return buffer; + } + } + + public BufferFuture waitForBuffer() throws InterruptedException { + synchronized (monitor) { + while (buffer == null && !isCancelled) { + monitor.wait(2000); + } + + return this; + } + } + + public BufferFuture addListener(EventListener<BufferFuture> listenerToAdd) { + synchronized (monitor) { + checkState(!isCancelled, "Buffer future has already been cancelled."); + checkState(listener == null, "Listener has already been added."); + + // Late registration + if (buffer != null) { + listenerToAdd.onEvent(this); + } + else { + listener = listenerToAdd; + } + + return this; + } + } + + public void cancel() { + synchronized (monitor) { + checkState(buffer == null, "Too late to cancel. Already handed a buffer in for this future."); + + isCancelled = true; + monitor.notifyAll(); + + if (listener != null) { + listener.onEvent(this); + } + } + } + + public boolean isDone() { + synchronized (monitor) { --- End diff -- I agree :) > Add intermediate results to distributed runtime > ----------------------------------------------- > > Key: FLINK-986 > URL: https://issues.apache.org/jira/browse/FLINK-986 > Project: Flink > Issue Type: New Feature > Components: Distributed Runtime > Reporter: Ufuk Celebi > Assignee: Ufuk Celebi > Priority: Blocker > > Support for intermediate results in the runtime is currently blocking > different efforts like fault tolerance or result collection at the client. -- This message was sent by Atlassian JIRA (v6.3.4#6332)