DRILL-5721: Update based on merge conflict with DRILL-3449. Note: Resolved the merge conflict and added new tests.
closes #919 Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/6cb626d7 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/6cb626d7 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/6cb626d7 Branch: refs/heads/master Commit: 6cb626d78de75a1061e1381f680478ed8844123a Parents: b06a7bd Author: Sorabh Hamirwasia <shamirwa...@maprtech.com> Authored: Tue Sep 19 21:58:39 2017 -0700 Committer: Paul Rogers <prog...@maprtech.com> Committed: Sun Sep 24 21:34:39 2017 -0700 ---------------------------------------------------------------------- .../work/fragment/FragmentStatusReporter.java | 11 +-- .../fragment/FragmentStatusReporterTest.java | 83 +++++++++++++++++--- 2 files changed, 76 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/6cb626d7/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java index c0ecb07..c095edf 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java @@ -26,7 +26,6 @@ import org.apache.drill.exec.proto.UserBitShared.FragmentState; import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile; import org.apache.drill.exec.proto.helper.QueryIdHelper; import org.apache.drill.exec.rpc.control.ControlTunnel; -import org.apache.drill.exec.server.Drillbit; import java.util.concurrent.atomic.AtomicReference; @@ -100,8 +99,7 @@ public class FragmentStatusReporter implements AutoCloseable { case FAILED: // shouldn't get here 
since fail() should be called. default: - throw new IllegalStateException(String.format("Received state changed event for unexpected state of %s.", - newState)); + throw new IllegalStateException(String.format("Received state changed event for unexpected state of %s.", newState)); } } @@ -109,6 +107,7 @@ public class FragmentStatusReporter implements AutoCloseable { /** * Sends status to remote Foreman node using Control Tunnel or to Local Foreman bypassing * Control Tunnel and using WorkEventBus. + * * @param status */ void sendStatus(final FragmentStatus status) { @@ -116,8 +115,7 @@ public class FragmentStatusReporter implements AutoCloseable { DrillbitEndpoint foremanNode = foremanDrillbit.get(); if (foremanNode == null) { - logger.warn("{}: State {} is not reported as {} is closed", QueryIdHelper.getQueryIdentifier(context.getHandle()), - status.getProfile().getState(), this); + logger.warn("{}: State {} is not reported as {} is closed", QueryIdHelper.getQueryIdentifier(context.getHandle()), status.getProfile().getState(), this); return; } @@ -144,8 +142,7 @@ public class FragmentStatusReporter implements AutoCloseable { } @Override - public void close() - { + public void close() { final DrillbitEndpoint foremanNode = foremanDrillbit.getAndSet(null); if (foremanNode != null) { logger.debug("Closing {}", this); http://git-wip-us.apache.org/repos/asf/drill/blob/6cb626d7/exec/java-exec/src/test/java/org/apache/drill/exec/work/fragment/FragmentStatusReporterTest.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/fragment/FragmentStatusReporterTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/fragment/FragmentStatusReporterTest.java index d4d198b..29cd6a2 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/work/fragment/FragmentStatusReporterTest.java +++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/work/fragment/FragmentStatusReporterTest.java @@ -18,6 +18,8 @@ */ package org.apache.drill.exec.work.fragment; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.rpc.control.WorkEventBus; import org.junit.Before; import org.junit.Test; @@ -29,9 +31,11 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.UserBitShared.FragmentState; import org.apache.drill.exec.rpc.control.ControlTunnel; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyZeroInteractions; @@ -43,19 +47,31 @@ import static org.apache.drill.exec.proto.UserBitShared.FragmentState.RUNNING; public class FragmentStatusReporterTest { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentStatusReporterTest.class); + //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentStatusReporterTest.class); private FragmentStatusReporter statusReporter; - private ControlTunnel tunnel; + + private ControlTunnel foremanTunnel; + + private FragmentContext context; @Before public void setUp() throws Exception { - FragmentContext context = mock(FragmentContext.class); + context = mock(FragmentContext.class); when(context.getStats()).thenReturn(mock(FragmentStats.class)); when(context.getHandle()).thenReturn(FragmentHandle.getDefaultInstance()); when(context.getAllocator()).thenReturn(mock(BufferAllocator.class)); - tunnel = mock(ControlTunnel.class); - statusReporter = new FragmentStatusReporter(context, tunnel); + + // Create 2 different endpoint such that foremanEndpoint is different than + // localEndpoint + DrillbitEndpoint 
localEndpoint = DrillbitEndpoint.newBuilder().setAddress("10.0.0.1").build(); + DrillbitEndpoint foremanEndpoint = DrillbitEndpoint.newBuilder().setAddress("10.0.0.2").build(); + foremanTunnel = mock(ControlTunnel.class); + + when(context.getForemanEndpoint()).thenReturn(foremanEndpoint); + when(context.getIdentity()).thenReturn(localEndpoint); + when(context.getControlTunnel(foremanEndpoint)).thenReturn(foremanTunnel); + statusReporter = new FragmentStatusReporter(context); } @Test @@ -72,35 +88,80 @@ public class FragmentStatusReporterTest { } } } - verify(tunnel, times(FragmentState.values().length - 2)) /* exclude SENDING and FAILED */ + verify(foremanTunnel, times(FragmentState.values().length - 2)) /* exclude SENDING and FAILED */ .sendFragmentStatus(any(FragmentStatus.class)); } @Test public void testFail() throws Exception { statusReporter.fail(null); - verify(tunnel).sendFragmentStatus(any(FragmentStatus.class)); + verify(foremanTunnel).sendFragmentStatus(any(FragmentStatus.class)); } @Test public void testClose() throws Exception { statusReporter.close(); - verifyZeroInteractions(tunnel); + verifyZeroInteractions(foremanTunnel); } @Test public void testCloseClosed() throws Exception { statusReporter.close(); statusReporter.close(); - verifyZeroInteractions(tunnel); + verifyZeroInteractions(foremanTunnel); } @Test public void testStateChangedAfterClose() throws Exception { statusReporter.stateChanged(RUNNING); - verify(tunnel).sendFragmentStatus(any(FragmentStatus.class)); + verify(foremanTunnel).sendFragmentStatus(any(FragmentStatus.class)); statusReporter.close(); statusReporter.stateChanged(CANCELLATION_REQUESTED); - verify(tunnel).sendFragmentStatus(any(FragmentStatus.class)); + verify(foremanTunnel).sendFragmentStatus(any(FragmentStatus.class)); + } + + + /** + * With LocalEndpoint and Foreman Endpoint being same node, test that status change + * message doesn't happen via Control Tunnel instead it happens locally via WorkEventBus + * + * @throws 
Exception + */ + @Test + public void testStateChangedLocalForeman() throws Exception { + + DrillbitEndpoint localEndpoint = DrillbitEndpoint.newBuilder().setAddress("10.0.0.1").build(); + + when(context.getIdentity()).thenReturn(localEndpoint); + when(context.getForemanEndpoint()).thenReturn(localEndpoint); + when(context.getWorkEventbus()).thenReturn(mock(WorkEventBus.class)); + + statusReporter = new FragmentStatusReporter(context); + statusReporter.stateChanged(RUNNING); + verifyZeroInteractions(foremanTunnel); + verify(context.getWorkEventbus()).statusUpdate(any(FragmentStatus.class)); + } + + /** + * With LocalEndpoint and Foreman Endpoint being same node, test that after close of + * FragmentStatusReporter, status update doesn't happen either through Control Tunnel + * or through WorkEventBus. + * + * @throws Exception + */ + @Test + public void testCloseLocalForeman() throws Exception { + DrillbitEndpoint localEndpoint = DrillbitEndpoint.newBuilder().setAddress("10.0.0.1").build(); + + when(context.getIdentity()).thenReturn(localEndpoint); + when(context.getForemanEndpoint()).thenReturn(localEndpoint); + when(context.getWorkEventbus()).thenReturn(mock(WorkEventBus.class)); + statusReporter = new FragmentStatusReporter(context); + + statusReporter.close(); + assertTrue(statusReporter.foremanDrillbit.get() == null); + statusReporter.stateChanged(RUNNING); + verifyZeroInteractions(foremanTunnel); + verify(context.getWorkEventbus(), never()).statusUpdate(any(FragmentStatus.class)); } }