abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1953

Change subject: [ASTERIXDB-2049][ING] Fix hang in Start Feed
......................................................................

[ASTERIXDB-2049][ING] Fix hang in Start Feed

- user model changes: no
- storage format changes: no
- interface changes: no

details:
- The hang occurs when one runtime finishes and deregisters
  before another runtime registers. When that happens, the number
  of registered runtimes never reaches the total number of runtimes,
  so the start feed statement never completes.
- To avoid the situation described above, we use different counters
  for registration and deregistration.

Change-Id: I0019f5634009bf924fb37acc78eb796842eef492
---
M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
A asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActionSubscriber.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
A asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/RuntimeRegistration.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java
M hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
M hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
9 files changed, 252 insertions(+), 22 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/53/1953/1

diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
index c0717b9..eaaa85c 100644
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
@@ -153,7 +153,8 @@
         ActiveRuntimeId runtimeId = (ActiveRuntimeId) message.getPayload();
         IActiveRuntime runtime = runtimes.get(runtimeId);
         if (runtime == null) {
-            LOGGER.warning("Request to stop a runtime that is not registered " 
+ runtimeId);
+            LOGGER.warning("Request to stop runtime: " + runtimeId
+                    + " that is not registered. Could be that it finished 
running locally ");
         } else {
             executor.execute(() -> {
                 try {
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index c6f41bf..79b1cd5 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -93,6 +93,7 @@
     protected String stats;
     protected boolean isFetchingStats;
     protected int numRegistered;
+    protected int numDeRegistered;
     protected volatile Future<Void> recoveryTask;
     protected volatile boolean cancelRecovery;
     protected volatile boolean suspended = false;
@@ -123,6 +124,7 @@
         this.runtimeName = runtimeName;
         this.locations = locations;
         this.numRegistered = 0;
+        this.numDeRegistered = 0;
         this.handler =
                 (ActiveNotificationHandler) 
metadataProvider.getApplicationContext().getActiveNotificationHandler();
         handler.registerListener(this);
@@ -177,13 +179,17 @@
                 setState(ActivityState.RUNNING);
             }
         } else if (message.getEvent() == Event.RUNTIME_DEREGISTERED) {
-            numRegistered--;
+            numDeRegistered++;
         }
     }
 
     @SuppressWarnings("unchecked")
     protected void finish(ActiveEvent event) throws HyracksDataException {
         LOGGER.log(level, "the job " + jobId + " finished");
+        if (numRegistered != numDeRegistered) {
+            LOGGER.log(Level.WARNING, "the job " + jobId + " finished with 
registrations = " + numRegistered
+                    + " and deregistrations = " + numDeRegistered);
+        }
         jobId = null;
         Pair<JobStatus, List<Exception>> status = (Pair<JobStatus, 
List<Exception>>) event.getEventObject();
         JobStatus jobStatus = status.getLeft();
@@ -202,8 +208,9 @@
     }
 
     protected void start(ActiveEvent event) {
-        this.jobId = event.getJobId();
+        jobId = event.getJobId();
         numRegistered = 0;
+        numDeRegistered = 0;
     }
 
     @Override
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActionSubscriber.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActionSubscriber.java
new file mode 100644
index 0000000..111f984
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActionSubscriber.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.active;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ActionSubscriber {
+    private final List<Action> actions = new ArrayList<>();
+    private boolean stop = false;
+
+    public synchronized void beforeSchedule(Action a) {
+        actions.add(a);
+        notifyAll();
+    }
+
+    public synchronized void beforeExecute() throws InterruptedException {
+        while (stop) {
+            wait();
+        }
+    }
+
+    synchronized void stop() {
+        stop = true;
+    }
+
+    synchronized void resume() {
+        stop = false;
+        notifyAll();
+    }
+
+    public List<Action> getActions() {
+        return actions;
+    }
+
+    public synchronized Action get(int i) throws InterruptedException {
+        while (actions.size() <= i) {
+            wait();
+        }
+        return actions.get(i);
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
index d38a363..c5465b1 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
@@ -159,6 +159,36 @@
     }
 
     @Test
+    public void testStartWhenOneNodeFinishesBeforeOtherNodeStarts() throws 
Exception {
+        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
+        listener.onStart(Behavior.SUCCEED);
+        listener.onStop(Behavior.SUCCEED);
+        ActionSubscriber fastSubscriber = new ActionSubscriber();
+        nodeControllers[0].subscribe(fastSubscriber);
+        ActionSubscriber slowSubscriber = new ActionSubscriber();
+        slowSubscriber.stop();
+        nodeControllers[1].subscribe(slowSubscriber);
+        Action startActivityAction = users[0].startActivity(listener);
+        RuntimeRegistration registration = (RuntimeRegistration) 
fastSubscriber.get(0);
+        registration.sync();
+        registration.deregister();
+        Action deregistration = fastSubscriber.get(1);
+        deregistration.sync();
+        // Node 0 has completed registration and deregistration.. unblock node 
1
+        slowSubscriber.resume();
+        registration = (RuntimeRegistration) slowSubscriber.get(0);
+        registration.sync();
+        // now that node 1 is unblocked and completed registration, ensure 
that start has completed
+        startActivityAction.sync();
+        assertSuccess(startActivityAction);
+        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
+        Action stopAction = users[0].stopActivity(listener);
+        stopAction.sync();
+        assertSuccess(stopAction);
+        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
+    }
+
+    @Test
     public void testStopWhenStopSucceed() throws Exception {
         testStartWhenStartSucceed();
         Assert.assertEquals(ActivityState.RUNNING, listener.getState());
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/RuntimeRegistration.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/RuntimeRegistration.java
new file mode 100644
index 0000000..0c4b806
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/RuntimeRegistration.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.active;
+
+import java.util.Objects;
+
+import org.apache.asterix.active.ActiveEvent;
+import org.apache.asterix.active.ActiveEvent.Kind;
+import org.apache.asterix.active.ActiveRuntimeId;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.active.message.ActivePartitionMessage;
+import org.apache.asterix.active.message.ActivePartitionMessage.Event;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.hyracks.api.job.JobId;
+
+public class RuntimeRegistration extends Action {
+
+    private final TestNodeControllerActor nc;
+    private final JobId jobId;
+    private final EntityId entityId;
+    private final int partition;
+
+    public RuntimeRegistration(TestNodeControllerActor nc, JobId jobId, 
EntityId entityId, int partition) {
+        this.nc = nc;
+        this.jobId = jobId;
+        this.entityId = entityId;
+        this.partition = partition;
+    }
+
+    @Override
+    protected void doExecute(MetadataProvider mdProvider) throws Exception {
+        for (ActionSubscriber subscriber : nc.getSubscribers()) {
+            subscriber.beforeExecute();
+        }
+        ActiveEvent event = new ActiveEvent(jobId, Kind.PARTITION_EVENT, 
entityId, new ActivePartitionMessage(
+                new ActiveRuntimeId(entityId, nc.getId(), partition), jobId, 
Event.RUNTIME_REGISTERED, null));
+        nc.getClusterController().activeEvent(event);
+    }
+
+    public Action deregister() {
+        return nc.doDeRegisterRuntime(jobId, entityId, partition);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(jobId, entityId, partition);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == null) {
+            return false;
+        }
+        if (!(obj instanceof RuntimeRegistration)) {
+            return false;
+        }
+        RuntimeRegistration o = (RuntimeRegistration) obj;
+        return Objects.equals(jobId, o.jobId) && Objects.equals(entityId, 
o.entityId) && partition == o.partition;
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
index d896995..905df72 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
@@ -122,7 +122,7 @@
                     Collections.singletonList(new 
HyracksDataException("RuntimeFailure")));
         } else {
             for (int i = 0; i < nodeControllers.length; i++) {
-                TestNodeControllerActor nodeController = nodeControllers[0];
+                TestNodeControllerActor nodeController = nodeControllers[i];
                 nodeController.registerRuntime(jobId, entityId, i);
             }
         }
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java
index 99499a3..a40bf86 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java
@@ -18,6 +18,12 @@
  */
 package org.apache.asterix.test.active;
 
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.logging.Logger;
+
 import org.apache.asterix.active.ActiveEvent;
 import org.apache.asterix.active.ActiveEvent.Kind;
 import org.apache.asterix.active.ActiveRuntimeId;
@@ -28,9 +34,12 @@
 import org.apache.hyracks.api.job.JobId;
 
 public class TestNodeControllerActor extends Actor {
+    private static final Logger LOGGER = 
Logger.getLogger(TestNodeControllerActor.class.getName());
 
     private final String id;
     private final TestClusterControllerActor clusterController;
+    private final Set<RuntimeRegistration> registrations = new HashSet<>();
+    private final List<ActionSubscriber> subscribers = new ArrayList<>();
 
     public TestNodeControllerActor(String name, TestClusterControllerActor 
clusterController) {
         super("NC: " + name, null);
@@ -39,28 +48,76 @@
     }
 
     public Action registerRuntime(JobId jobId, EntityId entityId, int 
partition) {
-        Action registration = new Action() {
-            @Override
-            protected void doExecute(MetadataProvider actorMdProvider) throws 
Exception {
-                ActiveEvent event = new ActiveEvent(jobId, 
Kind.PARTITION_EVENT, entityId, new ActivePartitionMessage(
-                        new ActiveRuntimeId(entityId, id, partition), jobId, 
Event.RUNTIME_REGISTERED, null));
-                clusterController.activeEvent(event);
-            }
-        };
+        RuntimeRegistration registration = new RuntimeRegistration(this, 
jobId, entityId, partition);
+        for (ActionSubscriber subscriber : subscribers) {
+            subscriber.beforeSchedule(registration);
+        }
+        registrations.add(registration);
         add(registration);
         return registration;
     }
 
     public Action deRegisterRuntime(JobId jobId, EntityId entityId, int 
partition) {
-        Action registration = new Action() {
+        RuntimeRegistration registration = new RuntimeRegistration(this, 
jobId, entityId, partition);
+        if (registrations.remove(registration)) {
+            return registration.deregister();
+        } else {
+            LOGGER.warning("Request to stop runtime: " + new 
ActiveRuntimeId(entityId, "Test", partition)
+                    + " that is not registered. Could be that it finished 
running locally ");
+            return new Action() {
+                @Override
+                protected void doExecute(MetadataProvider mdProvider) throws 
Exception {
+                }
+
+                @Override
+                public void sync() throws InterruptedException {
+                    return;
+                }
+
+                @Override
+                public boolean isDone() {
+                    return true;
+                }
+            };
+        }
+    }
+
+    public Action doDeRegisterRuntime(JobId jobId, EntityId entityId, int 
partition) {
+        Action deregistration = new Action() {
             @Override
             protected void doExecute(MetadataProvider actorMdProvider) throws 
Exception {
+                for (ActionSubscriber subscriber : subscribers) {
+                    subscriber.beforeExecute();
+                }
                 ActiveEvent event = new ActiveEvent(jobId, 
Kind.PARTITION_EVENT, entityId, new ActivePartitionMessage(
                         new ActiveRuntimeId(entityId, id, partition), jobId, 
Event.RUNTIME_DEREGISTERED, null));
                 clusterController.activeEvent(event);
             }
         };
-        add(registration);
-        return registration;
+        for (ActionSubscriber subscriber : subscribers) {
+            subscriber.beforeSchedule(deregistration);
+        }
+        add(deregistration);
+        return deregistration;
+    }
+
+    public void subscribe(ActionSubscriber subscriber) {
+        subscribers.add(subscriber);
+    }
+
+    public void unsubscribe() {
+        subscribers.clear();
+    }
+
+    public List<ActionSubscriber> getSubscribers() {
+        return subscribers;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public TestClusterControllerActor getClusterController() {
+        return clusterController;
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
index e190bfa..6ceafc6 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
@@ -84,7 +84,8 @@
         executor = new ThreadPoolExecutor(numExecutorThreads, 
numExecutorThreads, 0L, TimeUnit.MILLISECONDS,
                 new LinkedBlockingQueue<>(requestQueueSize),
                 runnable -> new Thread(runnable, "HttpExecutor(port:" + port + 
")-" + threadId.getAndIncrement()));
-        long directMemoryBudget = numExecutorThreads * (long) 
HIGH_WRITE_BUFFER_WATER_MARK;
+        long directMemoryBudget = numExecutorThreads * (long) 
HIGH_WRITE_BUFFER_WATER_MARK
+                + numExecutorThreads * 
HttpServerInitializer.RESPONSE_CHUNK_SIZE;
         LOGGER.log(Level.INFO, "The direct memory budget for this server is " 
+ directMemoryBudget + " bytes");
     }
 
@@ -258,8 +259,8 @@
         return b && (path.length() == cpl || '/' == path.charAt(cpl));
     }
 
-    protected HttpServerHandler createHttpHandler(int chunkSize) {
-        return new HttpServerHandler(this, chunkSize);
+    protected HttpServerHandler<HttpServer> createHttpHandler(int chunkSize) {
+        return new HttpServerHandler<>(this, chunkSize);
     }
 
     public ExecutorService getExecutor() {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
index 4f8655f..a32da39 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
@@ -27,10 +27,10 @@
 
 public class HttpServerInitializer extends ChannelInitializer<SocketChannel> {
 
-    private static final int MAX_REQUEST_CHUNK_SIZE = 262144;
-    private static final int MAX_REQUEST_HEADER_SIZE = 262144;
-    private static final int MAX_REQUEST_INITIAL_LINE_LENGTH = 131072;
-    private static final int RESPONSE_CHUNK_SIZE = 4096;
+    public static final int MAX_REQUEST_CHUNK_SIZE = 262144;
+    public static final int MAX_REQUEST_HEADER_SIZE = 262144;
+    public static final int MAX_REQUEST_INITIAL_LINE_LENGTH = 131072;
+    public static final int RESPONSE_CHUNK_SIZE = 4096;
     private HttpServer server;
 
     public HttpServerInitializer(HttpServer server) {

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1953
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I0019f5634009bf924fb37acc78eb796842eef492
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>

Reply via email to