Repository: reef
Updated Branches:
  refs/heads/master 6f482d508 -> 641cb59c0


[REEF-1431] Validate Task Message Receive failure => FailedEvaluator Event

This addressed the issue by
  * Changing the Evaluator to fail when throwing an Exception
    in the Driver message handler and adding a test.

JIRA:
  [REEF-1431](https://issues.apache.org/jira/browse/REEF-1431)

Pull request:
  This closes #1057


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/641cb59c
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/641cb59c
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/641cb59c

Branch: refs/heads/master
Commit: 641cb59c0ce4f1a7b05b467e792bd3f28433baa1
Parents: 6f482d5
Author: Andrew Chung <[email protected]>
Authored: Mon Jun 27 13:41:33 2016 -0700
Committer: Mariia Mykhailova <[email protected]>
Committed: Thu Jun 30 15:33:31 2016 -0700

----------------------------------------------------------------------
 .../Runtime/Evaluator/Task/TaskRuntime.cs       |  30 ++--
 .../Functional/Common/EventMonitor.cs           |  59 ++++++++
 .../Functional/Common/Task/WaitingTask.cs       |  67 +++++++++
 .../User/ReceiveTaskMessageExceptionTest.cs     | 147 +++++++++++++++++++
 .../Org.Apache.REEF.Tests.csproj                |   3 +
 5 files changed, 286 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/641cb59c/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
index 7c94615..a6480e8 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
@@ -230,16 +230,8 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
                 Logger.Log(Level.Warning, 
string.Format(CultureInfo.InvariantCulture, "Trying to send a message to an 
task that is in {0} state. Ignored.", _currentStatus.State));
                 return;
             }
-            try
-            {
-                OnNext(new DriverMessageImpl(message));
-            }
-            catch (Exception e)
-            {
-                Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error 
during message delivery.", Logger);
-                _currentStatus.SetException(
-                    TaskClientCodeException.Create(TaskId, ContextId, "Error 
during message delivery.", e));
-            }
+
+            OnNext(new DriverMessageImpl(message));
         }
 
         public void OnNext(ICloseEvent value)
@@ -262,23 +254,21 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
             }
         }
 
+        /// <summary>
+        /// Call Handle on the user's DriverMessageHandler.
+        /// If the user's handler throws an Exception, the Exception will 
bubble up as
+        /// an Evaluator Exception and fail the Evaluator.
+        /// </summary>
         public void OnNext(IDriverMessage value)
         {
-            Logger.Log(Level.Info, "TaskRuntime::OnNext(IDriverMessage 
value)");
+            Logger.Log(Level.Verbose, "TaskRuntime::OnNext(IDriverMessage 
value)");
 
             if (!_driverMessageHandler.IsPresent())
             {
                 return;
             }
-            try
-            {
-                _driverMessageHandler.Value.Handle(value);
-            }
-            catch (Exception e)
-            {
-                Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, 
"Exception throw when handling driver message: " + e, Logger);
-                _currentStatus.SetException(e);
-            }
+            
+            _driverMessageHandler.Value.Handle(value);
         }
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/reef/blob/641cb59c/lang/cs/Org.Apache.REEF.Tests/Functional/Common/EventMonitor.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Common/EventMonitor.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/Common/EventMonitor.cs
new file mode 100644
index 0000000..598ef3a
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Common/EventMonitor.cs
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Threading;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Tests.Functional.Common
+{
+    /// <summary>
+    /// An test EventMonitor that simply wraps around a <see 
cref="ManualResetEventSlim"/>.
+    /// </summary>
+    public sealed class EventMonitor
+    {
+        private readonly ManualResetEventSlim _eventHandle = new 
ManualResetEventSlim();
+
+        [Inject]
+        private EventMonitor()
+        {
+        }
+
+        /// <summary>
+        /// Sets the Event.
+        /// </summary>
+        public void Signal()
+        {
+            _eventHandle.Set();
+        }
+
+        /// <summary>
+        /// Waits for the event signal.
+        /// </summary>
+        public void Wait()
+        {
+            _eventHandle.Wait();
+        }
+
+        /// <summary>
+        /// Resets the event signal.
+        /// </summary>
+        public void Reset()
+        {
+            _eventHandle.Reset();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/641cb59c/lang/cs/Org.Apache.REEF.Tests/Functional/Common/Task/WaitingTask.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/Common/Task/WaitingTask.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/Common/Task/WaitingTask.cs
new file mode 100644
index 0000000..4894cfd
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Common/Task/WaitingTask.cs
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Tests.Functional.Common.Task
+{
+    /// <summary>
+    /// A helper test class that implements <see cref="ITask"/>, which logs
+    /// messages provided by the caller of the constructor and waits for an 
+    /// <see cref="EventMonitor"/> to be signaled.
+    /// </summary>
+    public abstract class WaitingTask : ITask
+    {
+        private static readonly Logger Logger = 
Logger.GetLogger(typeof(WaitingTask));
+
+        private readonly string _messageToLogPreWait;
+        private readonly string _messageToLogPostWait;
+        private readonly EventMonitor _eventMonitor;
+
+        protected WaitingTask(
+            EventMonitor eventMonitor,
+            string messageToLogPreWait = null,
+            string messageToLogPostWait = null)
+        {
+            _eventMonitor = eventMonitor;
+            _messageToLogPreWait = messageToLogPreWait;
+            _messageToLogPostWait = messageToLogPostWait;
+        }
+
+        public byte[] Call(byte[] memento)
+        {
+            if (!string.IsNullOrWhiteSpace(_messageToLogPreWait))
+            {
+                Logger.Log(Level.Info, _messageToLogPreWait);
+            }
+
+            _eventMonitor.Wait();
+
+            if (!string.IsNullOrWhiteSpace(_messageToLogPostWait))
+            {
+                Logger.Log(Level.Info, _messageToLogPostWait);
+            }
+
+            return null;
+        }
+
+        public void Dispose()
+        {
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/641cb59c/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/ReceiveTaskMessageExceptionTest.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/ReceiveTaskMessageExceptionTest.cs
 
b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/ReceiveTaskMessageExceptionTest.cs
new file mode 100644
index 0000000..59c57b0
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/ReceiveTaskMessageExceptionTest.cs
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Text;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Common.Tasks.Events;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Driver.Task;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Tests.Functional.Bridge.Exceptions;
+using Org.Apache.REEF.Tests.Functional.Common;
+using Org.Apache.REEF.Tests.Functional.Common.Task;
+using Org.Apache.REEF.Utilities.Logging;
+using Xunit;
+
+namespace Org.Apache.REEF.Tests.Functional.Failure.User
+{
+    [Collection("FunctionalTests")]
+    public sealed class ReceiveTaskMessageExceptionTest : ReefFunctionalTest
+    {
+        private static readonly Logger Logger = 
Logger.GetLogger(typeof(ReceiveTaskMessageExceptionTest));
+
+        private static readonly string TaskId = "TaskId";
+        private static readonly string ExpectedExceptionMessage = 
"ExpectedExceptionMessage";
+        private static readonly string ReceivedFailedEvaluator = 
"ReceivedFailedEvaluator";
+
+        /// <summary>
+        /// This test validates that a failure in the IDriverMessageHandler 
results in a FailedEvaluator.
+        /// </summary>
+        [Fact]
+        public void TestReceiveTaskMessageException()
+        {
+            string testFolder = DefaultRuntimeFolder + TestId;
+
+            TestRun(DriverConfiguration.ConfigurationModule
+                .Set(DriverConfiguration.OnDriverStarted, 
GenericType<TestReceiveTaskMessageExceptionDriver>.Class)
+                .Set(DriverConfiguration.OnEvaluatorAllocated, 
GenericType<TestReceiveTaskMessageExceptionDriver>.Class)
+                .Set(DriverConfiguration.OnEvaluatorFailed, 
GenericType<TestReceiveTaskMessageExceptionDriver>.Class)
+                .Set(DriverConfiguration.OnTaskRunning, 
GenericType<TestReceiveTaskMessageExceptionDriver>.Class)
+                .Build(),
+                typeof(TestReceiveTaskMessageExceptionDriver), 1, 
"ReceiveTaskMessageExceptionTest", "local", testFolder);
+
+            ValidateSuccessForLocalRuntime(0, 0, 1, testFolder);
+            
ValidateMessageSuccessfullyLoggedForDriver(ReceivedFailedEvaluator, testFolder);
+            CleanUp(testFolder);
+        }
+
+        private sealed class TestReceiveTaskMessageExceptionDriver :
+            IObserver<IDriverStarted>,
+            IObserver<IAllocatedEvaluator>,
+            IObserver<IFailedEvaluator>,
+            IObserver<IRunningTask>
+        {
+            private readonly IEvaluatorRequestor _requestor;
+
+            [Inject]
+            private TestReceiveTaskMessageExceptionDriver(IEvaluatorRequestor 
requestor)
+            {
+                _requestor = requestor;
+            }
+
+            public void OnNext(IDriverStarted value)
+            {
+                _requestor.Submit(_requestor.NewBuilder().Build());
+            }
+
+            public void OnNext(IAllocatedEvaluator value)
+            {
+                value.SubmitTask(
+                    TaskConfiguration.ConfigurationModule
+                        .Set(TaskConfiguration.Identifier, TaskId)
+                        .Set(TaskConfiguration.Task, 
GenericType<TestTask>.Class)
+                        .Set(TaskConfiguration.OnMessage, 
GenericType<ReceiveTaskMessageExceptionHandler>.Class)
+                        .Build());
+            }
+
+            /// <summary>
+            /// Throwing an Exception in a Driver message handler will result 
in a Failed Evaluator.
+            /// We check for the Task ID and Exception type here.
+            /// </summary>
+            public void OnNext(IFailedEvaluator value)
+            {
+                Assert.Equal(1, value.FailedContexts.Count);
+                Assert.True(value.FailedTask.IsPresent());
+                Assert.Equal(TaskId, value.FailedTask.Value.Id);
+                Assert.NotNull(value.EvaluatorException.InnerException);
+                Assert.True(value.EvaluatorException.InnerException is 
TestSerializableException);
+                Assert.Equal(ExpectedExceptionMessage, 
value.EvaluatorException.InnerException.Message);
+
+                Logger.Log(Level.Info, ReceivedFailedEvaluator);
+            }
+
+            public void OnNext(IRunningTask value)
+            {
+                value.Send(Encoding.UTF8.GetBytes("Hello from Driver!"));
+            }
+
+            public void OnError(Exception error)
+            {
+                throw new NotImplementedException();
+            }
+
+            public void OnCompleted()
+            {
+                throw new NotImplementedException();
+            }
+        }
+
+        private sealed class ReceiveTaskMessageExceptionHandler : 
IDriverMessageHandler
+        {
+            [Inject]
+            private ReceiveTaskMessageExceptionHandler()
+            {
+            }
+
+            public void Handle(IDriverMessage message)
+            {
+                throw new TestSerializableException(ExpectedExceptionMessage);
+            }
+        }
+
+        private sealed class TestTask : WaitingTask
+        {
+            [Inject]
+            private TestTask(EventMonitor eventMonitor) : base(eventMonitor)
+            {
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/641cb59c/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj 
b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
index c76a0f4..aed5d5c 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
@@ -79,10 +79,13 @@ under the License.
     <Compile Include="Functional\Bridge\TestCloseTask.cs" />
     <Compile Include="Functional\Bridge\TestContextStack.cs" />
     <Compile Include="Functional\Bridge\TestFailedEvaluatorEventHandler.cs" />
+    <Compile Include="Functional\Common\EventMonitor.cs" />
     <Compile Include="Functional\Common\Task\ExceptionTask.cs" />
     <Compile 
Include="Functional\Failure\User\ServiceConstructorExceptionTest.cs" />
     <Compile 
Include="Functional\Failure\User\ReceiveContextMessageExceptionTest.cs" />
     <Compile Include="Functional\Failure\User\ContextStartExceptionTest.cs" />
+    <Compile Include="Functional\Common\Task\WaitingTask.cs" />
+    <Compile 
Include="Functional\Failure\User\ReceiveTaskMessageExceptionTest.cs" />
     <Compile Include="Functional\Failure\User\TaskCallExceptionTest.cs" />
     <Compile 
Include="Functional\Bridge\Exceptions\TestNonSerializableException.cs" />
     <Compile 
Include="Functional\Bridge\Exceptions\TestSerializableException.cs" />

Reply via email to