This is an automated email from the ASF dual-hosted git repository. jbarrett pushed a commit to branch wip/BufferedResultSender in repository https://gitbox.apache.org/repos/asf/geode.git
commit a8b5451d8133164669782fb9cb1349670d4f8ea8 Author: Jacob Barrett <jbarr...@pivotal.io> AuthorDate: Fri Aug 23 00:50:29 2019 -0700 BufferedResultSender to make function result sending feel better. --- .../geode/cache/execute/BufferedResultSender.java | 156 +++++++++++++++++++ .../cache/execute/BufferedResultSenderTest.java | 168 +++++++++++++++++++++ 2 files changed, 324 insertions(+) diff --git a/geode-core/src/main/java/org/apache/geode/cache/execute/BufferedResultSender.java b/geode-core/src/main/java/org/apache/geode/cache/execute/BufferedResultSender.java new file mode 100644 index 0000000..73f8483 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/cache/execute/BufferedResultSender.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.cache.execute; + +import java.util.Collection; + +/** + * Provides a more natural {@link ResultSender} behavior when whether a result is last or not is + * unknown. All {@link #sendResult(Object)} calls are buffered such that when {@link #close()} is + * called + * the last item in the buffer is sent via the underlying {@link ResultSender#lastResult(Object)}. + * + * <p> + * Example: + * + * <pre> + * {@code + * try (BufferedResultSender<SomeType> resultSender = new BufferedResultSender<>(context.getResultSender()) { + * for (SomeType result : results) { + * resultSender.sendResult(result); + * } + * } + * } + * </pre> + * + * This snippet will send all the results in the results collection to the underlying + * {@link ResultSender}. + * </p> + * + * @param <T> type of result to send + */ +public class BufferedResultSender<T> implements ResultSender<T>, AutoCloseable { + + private enum State { + EMPTY, BUFFERING, CLOSED + } + + private final ResultSender<? super T> resultSender; + private T buffer; + private State state = State.EMPTY; + + public static <T> BufferedResultSender<T> buffered(ResultSender<? super T> resultSender) { + return new BufferedResultSender<>(resultSender); + } + + public BufferedResultSender(ResultSender<? super T> resultSender) { + this.resultSender = resultSender; + } + + /** + * Sends buffered result by executing {@link ResultSender#lastResult(Object)} on underlying + * {@link ResultSender}. Since this method comes from {@link AutoCloseable} you may use this + * {@link ResultSender} + * in a try with resource block. + */ + @Override + public void close() { + if (state == State.CLOSED) { + return; + } + + sendLastAndClose(); + } + + /** + * Flush any buffered result by executing {@link ResultSender#sendResult(Object)} on underlying + * {@link ResultSender} and buffer new result. + * + * @param oneResult to buffer for sending. + */ + @Override + public void sendResult(T oneResult) { + if (state == State.CLOSED) { + return; + } + + bufferResult(oneResult); + } + + /** + * Flushes any buffered result and executes {@link ResultSender#lastResult(Object)} on underlying + * {@link ResultSender}. Provided for backwards compatibility with {@link ResultSender} API but + * should not be used directly. + * + * @param lastResult to send + * @deprecated Use {@link #sendResult(Object)} and {@link #close()} + */ + @Override + @Deprecated + public void lastResult(T lastResult) { + sendResult(lastResult); + sendLastAndClose(); + } + + /** + * Flushes any buffered result and executes {@link ResultSender#sendException(Throwable)} on + * underlying + * {@link ResultSender}. + * + * @param throwable to send + */ + @Override + public void sendException(final Throwable throwable) { + if (state == State.CLOSED) { + return; + } + + flushBuffer(); + resultSender.sendException(throwable); + buffer = null; + state = State.CLOSED; + } + + public void sendAllResults(final Collection<? extends T> results) { + if (state == State.CLOSED) { + return; + } + + for (final T result : results) { + bufferResult(result); + } + } + + private void bufferResult(final T oneResult) { + flushBuffer(); + buffer = oneResult; + } + + private void sendLastAndClose() { + resultSender.lastResult(buffer); + buffer = null; + state = State.CLOSED; + } + + private void flushBuffer() { + if (state == State.EMPTY) { + state = State.BUFFERING; + } else { + resultSender.sendResult(buffer); + } + } + +} diff --git a/geode-core/src/test/java/org/apache/geode/cache/execute/BufferedResultSenderTest.java b/geode-core/src/test/java/org/apache/geode/cache/execute/BufferedResultSenderTest.java new file mode 100644 index 0000000..457787d --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/cache/execute/BufferedResultSenderTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.cache.execute; + +import static java.util.Arrays.asList; +import static org.apache.geode.cache.execute.BufferedResultSender.buffered; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.verifyZeroInteractions; + +import org.junit.Test; + +public class BufferedResultSenderTest { + + @SuppressWarnings("unchecked") + private final ResultSender<Number> mockResultSender = mock(ResultSender.class); + + @Test + public void closeInvokesLastResult() { + final BufferedResultSender<Integer> resultSender = new BufferedResultSender<>(mockResultSender); + + resultSender.close(); + + verify(mockResultSender).lastResult(null); + verifyNoMoreInteractions(mockResultSender); + } + + @Test + public void closeAfterCloseIsNoop() { + final BufferedResultSender<Integer> resultSender = new BufferedResultSender<>(mockResultSender); + + resultSender.close(); + resultSender.close(); + + verify(mockResultSender).lastResult(null); + verifyNoMoreInteractions(mockResultSender); + } + + @Test + public void autoCloseInvokesLastResult() { + try (final BufferedResultSender<Integer> resultSender = buffered(mockResultSender)) { + // need to do something here to keep static analyzer happy + assertThat(resultSender).isNotNull(); + } + + verify(mockResultSender).lastResult(null); + verifyNoMoreInteractions(mockResultSender); + } + + @Test + public void closeFlushesBuffer() { + try (final BufferedResultSender<Integer> resultSender = buffered(mockResultSender)) { + resultSender.sendResult(1); + resultSender.sendResult(2); + } + + verify(mockResultSender).sendResult(1); + verify(mockResultSender).lastResult(2); + verifyNoMoreInteractions(mockResultSender); + } + + @Test + public void sendResultBuffersFirstResult() { + final BufferedResultSender<Integer> resultSender = new BufferedResultSender<>(mockResultSender); + + resultSender.sendResult(1); + + verifyZeroInteractions(mockResultSender); + } + + @Test + public void sendResultSendsFirstBuffersSecondResult() { + final BufferedResultSender<Integer> resultSender = new BufferedResultSender<>(mockResultSender); + + resultSender.sendResult(1); + resultSender.sendResult(2); + + verify(mockResultSender).sendResult(1); + verifyNoMoreInteractions(mockResultSender); + } + + @Test + public void sendResultAfterCloseIsNoop() { + final BufferedResultSender<Integer> resultSender = new BufferedResultSender<>(mockResultSender); + + resultSender.close(); + resultSender.sendResult(1); + + verify(mockResultSender).lastResult(null); + verifyNoMoreInteractions(mockResultSender); + } + + @Test + @SuppressWarnings("deprecation") + public void lastResultFlushesAndCloses() { + final BufferedResultSender<Integer> resultSender = new BufferedResultSender<>(mockResultSender); + + resultSender.lastResult(1); + + verify(mockResultSender).lastResult(1); + verifyNoMoreInteractions(mockResultSender); + } + + @Test + public void sendExceptionWithoutResultsSendsException() { + final Exception throwable = new Exception(); + + try (final BufferedResultSender<Integer> resultSender = buffered(mockResultSender)) { + resultSender.sendException(throwable); + } + + verify(mockResultSender).sendException(throwable); + verifyNoMoreInteractions(mockResultSender); + } + + @Test + public void sendExceptionWithoutResultFlushesAndSendsException() { + final Exception throwable = new Exception(); + + try (final BufferedResultSender<Integer> resultSender = buffered(mockResultSender)) { + resultSender.sendResult(1); + resultSender.sendException(throwable); + } + + verify(mockResultSender).sendResult(1); + verify(mockResultSender).sendException(throwable); + verifyNoMoreInteractions(mockResultSender); + } + + @Test + public void sendAllResults() { + try (final BufferedResultSender<Integer> resultSender = buffered(mockResultSender)) { + resultSender.sendAllResults(asList(1, 2, 3)); + } + + verify(mockResultSender).sendResult(1); + verify(mockResultSender).sendResult(2); + verify(mockResultSender).lastResult(3); + verifyNoMoreInteractions(mockResultSender); + } + + + @Test + public void sendAllResultsAfterCloseIsNoop() { + final BufferedResultSender<Integer> resultSender = new BufferedResultSender<>(mockResultSender); + + resultSender.close(); + resultSender.sendAllResults(asList(1, 2, 3)); + + verify(mockResultSender).lastResult(null); + verifyNoMoreInteractions(mockResultSender); + } +}