DRILL-5721: Update based on merge conflict with DRILL-3449
            Note: Resolved the merge conflict and added new tests

closes #919


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/6cb626d7
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/6cb626d7
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/6cb626d7

Branch: refs/heads/master
Commit: 6cb626d78de75a1061e1381f680478ed8844123a
Parents: b06a7bd
Author: Sorabh Hamirwasia <shamirwa...@maprtech.com>
Authored: Tue Sep 19 21:58:39 2017 -0700
Committer: Paul Rogers <prog...@maprtech.com>
Committed: Sun Sep 24 21:34:39 2017 -0700

----------------------------------------------------------------------
 .../work/fragment/FragmentStatusReporter.java   | 11 +--
 .../fragment/FragmentStatusReporterTest.java    | 83 +++++++++++++++++---
 2 files changed, 76 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/6cb626d7/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java
index c0ecb07..c095edf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java
@@ -26,7 +26,6 @@ import 
org.apache.drill.exec.proto.UserBitShared.FragmentState;
 import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.control.ControlTunnel;
-import org.apache.drill.exec.server.Drillbit;
 
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -100,8 +99,7 @@ public class FragmentStatusReporter implements AutoCloseable 
{
       case FAILED:
         // shouldn't get here since fail() should be called.
       default:
-        throw new IllegalStateException(String.format("Received state changed 
event for unexpected state of %s.",
-            newState));
+        throw new IllegalStateException(String.format("Received state changed 
event for unexpected state of %s.", newState));
     }
   }
 
@@ -109,6 +107,7 @@ public class FragmentStatusReporter implements 
AutoCloseable {
   /**
    * Sends status to remote Foreman node using Control Tunnel or to Local 
Foreman bypassing
    * Control Tunnel and using WorkEventBus.
+   *
    * @param status
    */
   void sendStatus(final FragmentStatus status) {
@@ -116,8 +115,7 @@ public class FragmentStatusReporter implements 
AutoCloseable {
     DrillbitEndpoint foremanNode = foremanDrillbit.get();
 
     if (foremanNode == null) {
-      logger.warn("{}: State {} is not reported as {} is closed", 
QueryIdHelper.getQueryIdentifier(context.getHandle()),
-          status.getProfile().getState(), this);
+      logger.warn("{}: State {} is not reported as {} is closed", 
QueryIdHelper.getQueryIdentifier(context.getHandle()), 
status.getProfile().getState(), this);
       return;
     }
 
@@ -144,8 +142,7 @@ public class FragmentStatusReporter implements 
AutoCloseable {
   }
 
   @Override
-  public void close()
-  {
+  public void close() {
     final DrillbitEndpoint foremanNode = foremanDrillbit.getAndSet(null);
     if (foremanNode != null) {
       logger.debug("Closing {}", this);

http://git-wip-us.apache.org/repos/asf/drill/blob/6cb626d7/exec/java-exec/src/test/java/org/apache/drill/exec/work/fragment/FragmentStatusReporterTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/fragment/FragmentStatusReporterTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/fragment/FragmentStatusReporterTest.java
index d4d198b..29cd6a2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/work/fragment/FragmentStatusReporterTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/fragment/FragmentStatusReporterTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.drill.exec.work.fragment;
 
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.rpc.control.WorkEventBus;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -29,9 +31,11 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.UserBitShared.FragmentState;
 import org.apache.drill.exec.rpc.control.ControlTunnel;
 
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyZeroInteractions;
@@ -43,19 +47,31 @@ import static 
org.apache.drill.exec.proto.UserBitShared.FragmentState.RUNNING;
 
 public class FragmentStatusReporterTest {
 
-  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(FragmentStatusReporterTest.class);
+  //private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(FragmentStatusReporterTest.class);
 
   private FragmentStatusReporter statusReporter;
-  private ControlTunnel tunnel;
+
+  private ControlTunnel foremanTunnel;
+
+  private FragmentContext context;
 
   @Before
   public void setUp() throws Exception {
-    FragmentContext context = mock(FragmentContext.class);
+    context = mock(FragmentContext.class);
     when(context.getStats()).thenReturn(mock(FragmentStats.class));
     when(context.getHandle()).thenReturn(FragmentHandle.getDefaultInstance());
     when(context.getAllocator()).thenReturn(mock(BufferAllocator.class));
-    tunnel = mock(ControlTunnel.class);
-    statusReporter = new FragmentStatusReporter(context, tunnel);
+
+    // Create two distinct endpoints so that foremanEndpoint differs from
+    // localEndpoint.
+    DrillbitEndpoint localEndpoint = 
DrillbitEndpoint.newBuilder().setAddress("10.0.0.1").build();
+    DrillbitEndpoint foremanEndpoint = 
DrillbitEndpoint.newBuilder().setAddress("10.0.0.2").build();
+    foremanTunnel = mock(ControlTunnel.class);
+
+    when(context.getForemanEndpoint()).thenReturn(foremanEndpoint);
+    when(context.getIdentity()).thenReturn(localEndpoint);
+    when(context.getControlTunnel(foremanEndpoint)).thenReturn(foremanTunnel);
+    statusReporter = new FragmentStatusReporter(context);
   }
 
   @Test
@@ -72,35 +88,80 @@ public class FragmentStatusReporterTest {
         }
       }
     }
-    verify(tunnel, times(FragmentState.values().length - 2)) /* exclude 
SENDING and FAILED */
+    verify(foremanTunnel, times(FragmentState.values().length - 2)) /* exclude 
SENDING and FAILED */
         .sendFragmentStatus(any(FragmentStatus.class));
   }
 
   @Test
   public void testFail() throws Exception {
     statusReporter.fail(null);
-    verify(tunnel).sendFragmentStatus(any(FragmentStatus.class));
+    verify(foremanTunnel).sendFragmentStatus(any(FragmentStatus.class));
   }
 
   @Test
   public void testClose() throws Exception {
     statusReporter.close();
-    verifyZeroInteractions(tunnel);
+    verifyZeroInteractions(foremanTunnel);
   }
 
   @Test
   public void testCloseClosed() throws Exception {
     statusReporter.close();
     statusReporter.close();
-    verifyZeroInteractions(tunnel);
+    verifyZeroInteractions(foremanTunnel);
   }
 
   @Test
   public void testStateChangedAfterClose() throws Exception {
     statusReporter.stateChanged(RUNNING);
-    verify(tunnel).sendFragmentStatus(any(FragmentStatus.class));
+    verify(foremanTunnel).sendFragmentStatus(any(FragmentStatus.class));
     statusReporter.close();
     statusReporter.stateChanged(CANCELLATION_REQUESTED);
-    verify(tunnel).sendFragmentStatus(any(FragmentStatus.class));
+    verify(foremanTunnel).sendFragmentStatus(any(FragmentStatus.class));
+  }
+
+
+  /**
+   * When the local endpoint and the Foreman endpoint are the same node, verifies that a status
+   * change is not sent via the Control Tunnel but is instead delivered locally via WorkEventBus.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testStateChangedLocalForeman() throws Exception {
+
+    DrillbitEndpoint localEndpoint = 
DrillbitEndpoint.newBuilder().setAddress("10.0.0.1").build();
+
+    when(context.getIdentity()).thenReturn(localEndpoint);
+    when(context.getForemanEndpoint()).thenReturn(localEndpoint);
+    when(context.getWorkEventbus()).thenReturn(mock(WorkEventBus.class));
+
+    statusReporter = new FragmentStatusReporter(context);
+    statusReporter.stateChanged(RUNNING);
+    verifyZeroInteractions(foremanTunnel);
+    verify(context.getWorkEventbus()).statusUpdate(any(FragmentStatus.class));
+  }
+
+  /**
+   * When the local endpoint and the Foreman endpoint are the same node, verifies that after the
+   * FragmentStatusReporter is closed, no status update is sent either through the Control Tunnel
+   * or through WorkEventBus.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testCloseLocalForeman() throws Exception {
+    DrillbitEndpoint localEndpoint = 
DrillbitEndpoint.newBuilder().setAddress("10.0.0.1").build();
+
+    when(context.getIdentity()).thenReturn(localEndpoint);
+    when(context.getForemanEndpoint()).thenReturn(localEndpoint);
+    when(context.getWorkEventbus()).thenReturn(mock(WorkEventBus.class));
+    statusReporter = new FragmentStatusReporter(context);
+
+    statusReporter.close();
+    assertTrue(statusReporter.foremanDrillbit.get() == null);
+    statusReporter.stateChanged(RUNNING);
+    verifyZeroInteractions(foremanTunnel);
+    verify(context.getWorkEventbus(), 
never()).statusUpdate(any(FragmentStatus.class));
   }
 }

Reply via email to