This is an automated email from the ASF dual-hosted git repository.

jbarrett pushed a commit to branch wip/BufferedResultSender
in repository https://gitbox.apache.org/repos/asf/geode.git

commit a8b5451d8133164669782fb9cb1349670d4f8ea8
Author: Jacob Barrett <jbarr...@pivotal.io>
AuthorDate: Fri Aug 23 00:50:29 2019 -0700

    BufferedResultSender to make function result sending feel better.
---
 .../geode/cache/execute/BufferedResultSender.java  | 156 +++++++++++++++++++
 .../cache/execute/BufferedResultSenderTest.java    | 168 +++++++++++++++++++++
 2 files changed, 324 insertions(+)

diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/execute/BufferedResultSender.java
 
b/geode-core/src/main/java/org/apache/geode/cache/execute/BufferedResultSender.java
new file mode 100644
index 0000000..73f8483
--- /dev/null
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/execute/BufferedResultSender.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.cache.execute;
+
+import java.util.Collection;
+
+/**
+ * Provides a more natural {@link ResultSender} behavior when whether a result 
is last or not is
+ * unknown. All {@link #sendResult(Object)} calls are buffered such that when 
{@link #close()} is
+ * called
+ * the last item in the buffer is sent via the underlying {@link 
ResultSender#lastResult(Object)}.
+ *
+ * <p>
+ * Example:
+ *
+ * <pre>
+ * {@code
+ *   try (BufferedResultSender<SomeType> resultSender = new 
BufferedResultSender<>(context.getResultSender()) {
+ *     for (SomeType result : results) {
+ *       resultSender.sendResult(result);
+ *     }
+ *   }
+ * }
+ * </pre>
+ *
+ * This snippet will send all the results in the results collection to the 
underlying
+ * {@link ResultSender}.
+ * </p>
+ *
+ * @param <T> type of result to send
+ */
+public class BufferedResultSender<T> implements ResultSender<T>, AutoCloseable 
{
+
+  private enum State {
+    EMPTY, BUFFERING, CLOSED
+  }
+
+  private final ResultSender<? super T> resultSender;
+  private T buffer;
+  private State state = State.EMPTY;
+
+  public static <T> BufferedResultSender<T> buffered(ResultSender<? super T> 
resultSender) {
+    return new BufferedResultSender<>(resultSender);
+  }
+
+  public BufferedResultSender(ResultSender<? super T> resultSender) {
+    this.resultSender = resultSender;
+  }
+
+  /**
+   * Sends buffered result by executing {@link 
ResultSender#lastResult(Object)} on underlying
+   * {@link ResultSender}. Since this method comes from {@link AutoCloseable} 
you may use this
+   * {@link ResultSender}
+   * in a try with resource block.
+   */
+  @Override
+  public void close() {
+    if (state == State.CLOSED) {
+      return;
+    }
+
+    sendLastAndClose();
+  }
+
+  /**
+   * Flush any buffered result by executing {@link 
ResultSender#sendResult(Object)} on underlying
+   * {@link ResultSender} and buffer new result.
+   *
+   * @param oneResult to buffer for sending.
+   */
+  @Override
+  public void sendResult(T oneResult) {
+    if (state == State.CLOSED) {
+      return;
+    }
+
+    bufferResult(oneResult);
+  }
+
+  /**
+   * Flushes any buffered result and executes {@link 
ResultSender#lastResult(Object)} on underlying
+   * {@link ResultSender}. Provided for backwards compatibility with {@link 
ResultSender} API but
+   * should not be used directly.
+   *
+   * @param lastResult to send
+   * @deprecated Use {@link #sendResult(Object)} and {@link #close()}
+   */
+  @Override
+  @Deprecated
+  public void lastResult(T lastResult) {
+    sendResult(lastResult);
+    sendLastAndClose();
+  }
+
+  /**
+   * Flushes any buffered result and executes {@link 
ResultSender#sendException(Throwable)} on
+   * underlying
+   * {@link ResultSender}.
+   *
+   * @param throwable to send
+   */
+  @Override
+  public void sendException(final Throwable throwable) {
+    if (state == State.CLOSED) {
+      return;
+    }
+
+    flushBuffer();
+    resultSender.sendException(throwable);
+    buffer = null;
+    state = State.CLOSED;
+  }
+
+  public void sendAllResults(final Collection<? extends T> results) {
+    if (state == State.CLOSED) {
+      return;
+    }
+
+    for (final T result : results) {
+      bufferResult(result);
+    }
+  }
+
+  private void bufferResult(final T oneResult) {
+    flushBuffer();
+    buffer = oneResult;
+  }
+
+  private void sendLastAndClose() {
+    resultSender.lastResult(buffer);
+    buffer = null;
+    state = State.CLOSED;
+  }
+
+  private void flushBuffer() {
+    if (state == State.EMPTY) {
+      state = State.BUFFERING;
+    } else {
+      resultSender.sendResult(buffer);
+    }
+  }
+
+}
diff --git 
a/geode-core/src/test/java/org/apache/geode/cache/execute/BufferedResultSenderTest.java
 
b/geode-core/src/test/java/org/apache/geode/cache/execute/BufferedResultSenderTest.java
new file mode 100644
index 0000000..457787d
--- /dev/null
+++ 
b/geode-core/src/test/java/org/apache/geode/cache/execute/BufferedResultSenderTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.cache.execute;
+
+import static java.util.Arrays.asList;
+import static org.apache.geode.cache.execute.BufferedResultSender.buffered;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
+
+import org.junit.Test;
+
+public class BufferedResultSenderTest {
+
+  @SuppressWarnings("unchecked")
+  private final ResultSender<Number> mockResultSender = 
mock(ResultSender.class);
+
+  @Test
+  public void closeInvokesLastResult() {
+    final BufferedResultSender<Integer> resultSender = new 
BufferedResultSender<>(mockResultSender);
+
+    resultSender.close();
+
+    verify(mockResultSender).lastResult(null);
+    verifyNoMoreInteractions(mockResultSender);
+  }
+
+  @Test
+  public void closeAfterCloseIsNoop() {
+    final BufferedResultSender<Integer> resultSender = new 
BufferedResultSender<>(mockResultSender);
+
+    resultSender.close();
+    resultSender.close();
+
+    verify(mockResultSender).lastResult(null);
+    verifyNoMoreInteractions(mockResultSender);
+  }
+
+  @Test
+  public void autoCloseInvokesLastResult() {
+    try (final BufferedResultSender<Integer> resultSender = 
buffered(mockResultSender)) {
+      // need to do something here to keep static analyzer happy
+      assertThat(resultSender).isNotNull();
+    }
+
+    verify(mockResultSender).lastResult(null);
+    verifyNoMoreInteractions(mockResultSender);
+  }
+
+  @Test
+  public void closeFlushesBuffer() {
+    try (final BufferedResultSender<Integer> resultSender = 
buffered(mockResultSender)) {
+      resultSender.sendResult(1);
+      resultSender.sendResult(2);
+    }
+
+    verify(mockResultSender).sendResult(1);
+    verify(mockResultSender).lastResult(2);
+    verifyNoMoreInteractions(mockResultSender);
+  }
+
+  @Test
+  public void sendResultBuffersFirstResult() {
+    final BufferedResultSender<Integer> resultSender = new 
BufferedResultSender<>(mockResultSender);
+
+    resultSender.sendResult(1);
+
+    verifyZeroInteractions(mockResultSender);
+  }
+
+  @Test
+  public void sendResultSendsFirstBuffersSecondResult() {
+    final BufferedResultSender<Integer> resultSender = new 
BufferedResultSender<>(mockResultSender);
+
+    resultSender.sendResult(1);
+    resultSender.sendResult(2);
+
+    verify(mockResultSender).sendResult(1);
+    verifyNoMoreInteractions(mockResultSender);
+  }
+
+  @Test
+  public void sendResultAfterCloseIsNoop() {
+    final BufferedResultSender<Integer> resultSender = new 
BufferedResultSender<>(mockResultSender);
+
+    resultSender.close();
+    resultSender.sendResult(1);
+
+    verify(mockResultSender).lastResult(null);
+    verifyNoMoreInteractions(mockResultSender);
+  }
+
+  @Test
+  @SuppressWarnings("deprecation")
+  public void lastResultFlushesAndCloses() {
+    final BufferedResultSender<Integer> resultSender = new 
BufferedResultSender<>(mockResultSender);
+
+    resultSender.lastResult(1);
+
+    verify(mockResultSender).lastResult(1);
+    verifyNoMoreInteractions(mockResultSender);
+  }
+
+  @Test
+  public void sendExceptionWithoutResultsSendsException() {
+    final Exception throwable = new Exception();
+
+    try (final BufferedResultSender<Integer> resultSender = 
buffered(mockResultSender)) {
+      resultSender.sendException(throwable);
+    }
+
+    verify(mockResultSender).sendException(throwable);
+    verifyNoMoreInteractions(mockResultSender);
+  }
+
+  @Test
+  public void sendExceptionWithoutResultFlushesAndSendsException() {
+    final Exception throwable = new Exception();
+
+    try (final BufferedResultSender<Integer> resultSender = 
buffered(mockResultSender)) {
+      resultSender.sendResult(1);
+      resultSender.sendException(throwable);
+    }
+
+    verify(mockResultSender).sendResult(1);
+    verify(mockResultSender).sendException(throwable);
+    verifyNoMoreInteractions(mockResultSender);
+  }
+
+  @Test
+  public void sendAllResults() {
+    try (final BufferedResultSender<Integer> resultSender = 
buffered(mockResultSender)) {
+      resultSender.sendAllResults(asList(1, 2, 3));
+    }
+
+    verify(mockResultSender).sendResult(1);
+    verify(mockResultSender).sendResult(2);
+    verify(mockResultSender).lastResult(3);
+    verifyNoMoreInteractions(mockResultSender);
+  }
+
+
+  @Test
+  public void sendAllResultsAfterCloseIsNoop() {
+    final BufferedResultSender<Integer> resultSender = new 
BufferedResultSender<>(mockResultSender);
+
+    resultSender.close();
+    resultSender.sendAllResults(asList(1, 2, 3));
+
+    verify(mockResultSender).lastResult(null);
+    verifyNoMoreInteractions(mockResultSender);
+  }
+}

Reply via email to