Repository: incubator-reef
Updated Branches:
  refs/heads/master cf4a460fd -> 9c5b9582a


[REEF-728] Complete implementation of ICompletedTask

This addressed the issue by
  * Piping the completed task message from Java to C#.

JIRA:
  [REEF-728](https://issues.apache.org/jira/browse/REEF-728)

This closes #524


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/9c5b9582
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/9c5b9582
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/9c5b9582

Branch: refs/heads/master
Commit: 9c5b9582ad7a309d63beef9a3e9285cce9ab3e63
Parents: cf4a460
Author: Andrew Chung <[email protected]>
Authored: Thu Sep 24 16:52:10 2015 -0700
Committer: Julia Wang <[email protected]>
Committed: Wed Sep 30 14:22:54 2015 -0700

----------------------------------------------------------------------
 lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h   |  1 +
 .../CompletedTaskClr2Java.cpp                   | 14 +++++
 .../Bridge/Clr2java/IClr2JavaTaskMessage.cs     | 32 +++++++++++
 .../Bridge/Clr2java/ICompletedTaskClr2Java.cs   |  2 +-
 .../Bridge/Clr2java/ISuspendedTaskClr2Java.cs   |  8 +--
 .../Bridge/Events/CompletedTask.cs              |  8 ++-
 .../Org.Apache.REEF.Driver.csproj               |  1 +
 .../AllHandlers.cs                              |  1 +
 .../HelloTaskCompletedHandler.cs                | 56 ++++++++++++++++++++
 .../Org.Apache.REEF.Examples.AllHandlers.csproj |  1 +
 .../Bridge/HelloSimpleEventHandlers.cs          |  8 ++-
 .../reef/javabridge/CompletedTaskBridge.java    | 16 ++++--
 12 files changed, 134 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9c5b9582/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h 
b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
index 45f2457..b1869eb 100644
--- a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
+++ b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
@@ -148,6 +148,7 @@ namespace Org {
                                                  virtual void OnError(String^ 
message);
                                                  virtual 
IActiveContextClr2Java^ GetActiveContext();
                                                  virtual String^ GetId();
+                                                 virtual array<byte>^ Get();
                                          };
 
                                          public ref class 
SuspendedTaskClr2Java : public ISuspendedTaskClr2Java {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9c5b9582/lang/cs/Org.Apache.REEF.Bridge/CompletedTaskClr2Java.cpp
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Bridge/CompletedTaskClr2Java.cpp 
b/lang/cs/Org.Apache.REEF.Bridge/CompletedTaskClr2Java.cpp
index be9fe31..18e088d 100644
--- a/lang/cs/Org.Apache.REEF.Bridge/CompletedTaskClr2Java.cpp
+++ b/lang/cs/Org.Apache.REEF.Bridge/CompletedTaskClr2Java.cpp
@@ -65,6 +65,20 @@ namespace Org {
                                                  JNIEnv *env = 
RetrieveEnv(_jvm);
                                                  return 
ManagedStringFromJavaString(env, _jstringId);
                                          }
+
+                                         array<byte>^ 
CompletedTaskClr2Java::Get() {
+                                                 
ManagedLog::LOGGER->Log("CompletedTaskClr2Java::GetMessage");
+                                                 JNIEnv *env = 
RetrieveEnv(_jvm);
+                                                 jclass jclassCompletedTask = 
env->GetObjectClass(_jobjectCompletedTask);
+                                                 jmethodID jmidGet = 
env->GetMethodID(jclassCompletedTask, "get", "()[B");
+
+                                                 if (jmidGet == NULL) {
+                                                         
ManagedLog::LOGGER->Log("jmidGet is NULL");
+                                                         return nullptr;
+                                                 }
+                                                 jbyteArray jMessage = 
(jbyteArray)env->CallObjectMethod(_jobjectCompletedTask, jmidGet);
+                                                 return 
ManagedByteArrayFromJavaByteArray(env, jMessage);
+                                         }
                                  }
                          }
                  }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9c5b9582/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IClr2JavaTaskMessage.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IClr2JavaTaskMessage.cs 
b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IClr2JavaTaskMessage.cs
new file mode 100644
index 0000000..e1d1513
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IClr2JavaTaskMessage.cs
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Driver.Bridge.Clr2java
+{
+    /// <summary>
+    /// An message sent from a Task to a Driver.
+    /// </summary>
+    public interface IClr2JavaTaskMessage
+    {
+        /// <summary>
+        /// Gets returns the byte array task message.
+        /// </summary>
+        byte[] Get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9c5b9582/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/ICompletedTaskClr2Java.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/ICompletedTaskClr2Java.cs 
b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/ICompletedTaskClr2Java.cs
index 5eb2686..049bf30 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/ICompletedTaskClr2Java.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/ICompletedTaskClr2Java.cs
@@ -19,7 +19,7 @@
 
 namespace Org.Apache.REEF.Driver.Bridge.Clr2java
 {
-    public interface ICompletedTaskClr2Java : IClr2Java
+    public interface ICompletedTaskClr2Java : IClr2Java, IClr2JavaTaskMessage
     {
         IActiveContextClr2Java GetActiveContext();
         

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9c5b9582/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/ISuspendedTaskClr2Java.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/ISuspendedTaskClr2Java.cs 
b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/ISuspendedTaskClr2Java.cs
index 95bfa8b..7e146f4 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/ISuspendedTaskClr2Java.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/ISuspendedTaskClr2Java.cs
@@ -19,7 +19,7 @@
 
 namespace Org.Apache.REEF.Driver.Bridge.Clr2java
 {
-    public interface ISuspendedTaskClr2Java : IClr2Java
+    public interface ISuspendedTaskClr2Java : IClr2Java, IClr2JavaTaskMessage
     {
         /// <summary>
         /// get active context the task is running in
@@ -32,11 +32,5 @@ namespace Org.Apache.REEF.Driver.Bridge.Clr2java
         /// </summary>
         /// <returns>suspsended task id</returns>
         string GetId();
-
-        /// <summary>
-        /// get the message
-        /// </summary>
-        /// <returns>suspended task message</returns>
-        byte[] Get();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9c5b9582/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/CompletedTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/CompletedTask.cs 
b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/CompletedTask.cs
index cb44f59..18ed52f 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/CompletedTask.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/CompletedTask.cs
@@ -38,7 +38,13 @@ namespace Org.Apache.REEF.Driver.Bridge.Events
         [DataMember]
         public string InstanceId { get; set; }
 
-        public byte[] Message { get; set; }
+        public byte[] Message
+        {
+            get
+            {
+                return CompletedTaskClr2Java.Get();
+            }
+        }
 
         public string Id
         {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9c5b9582/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj 
b/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj
index e3e7d53..9fb3613 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj
+++ b/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj
@@ -44,6 +44,7 @@ under the License.
     <Compile Include="Bridge\Clr2java\IAllocatedEvaluaotrClr2Java.cs" />
     <Compile Include="Bridge\Clr2java\IClosedContextClr2Java.cs" />
     <Compile Include="Bridge\Clr2java\IClr2Java.cs" />
+    <Compile Include="Bridge\Clr2java\IClr2JavaTaskMessage.cs" />
     <Compile Include="Bridge\Clr2java\ICompletedEvaluatorClr2Java.cs" />
     <Compile Include="Bridge\Clr2java\ICompletedTaskClr2Java.cs" />
     <Compile Include="Bridge\Clr2java\IContextMessageClr2Java.cs" />

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9c5b9582/lang/cs/Org.Apache.REEF.Examples.AllHandlers/AllHandlers.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples.AllHandlers/AllHandlers.cs 
b/lang/cs/Org.Apache.REEF.Examples.AllHandlers/AllHandlers.cs
index 3f7a444..330874c 100644
--- a/lang/cs/Org.Apache.REEF.Examples.AllHandlers/AllHandlers.cs
+++ b/lang/cs/Org.Apache.REEF.Examples.AllHandlers/AllHandlers.cs
@@ -61,6 +61,7 @@ namespace Org.Apache.REEF.Examples.AllHandlers
                 .Set(DriverConfiguration.OnEvaluatorFailed, 
GenericType<HelloFailedEvaluatorHandler>.Class)
                 .Set(DriverConfiguration.OnTaskFailed, 
GenericType<HelloFailedTaskHandler>.Class)
                 .Set(DriverConfiguration.OnTaskRunning, 
GenericType<HelloRunningTaskHandler>.Class)
+                .Set(DriverConfiguration.OnTaskCompleted, 
GenericType<HelloTaskCompletedHandler>.Class)
                 .Set(DriverConfiguration.OnDriverStarted, 
GenericType<HelloDriverStartHandler>.Class)
                 .Set(DriverConfiguration.OnHttpEvent, 
GenericType<HelloHttpHandler>.Class)
                 .Set(DriverConfiguration.OnEvaluatorCompleted, 
GenericType<HelloCompletedEvaluatorHandler>.Class)

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9c5b9582/lang/cs/Org.Apache.REEF.Examples.AllHandlers/HelloTaskCompletedHandler.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Examples.AllHandlers/HelloTaskCompletedHandler.cs 
b/lang/cs/Org.Apache.REEF.Examples.AllHandlers/HelloTaskCompletedHandler.cs
new file mode 100644
index 0000000..aa035b0
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Examples.AllHandlers/HelloTaskCompletedHandler.cs
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using Org.Apache.REEF.Driver.Task;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities;
+
+namespace Org.Apache.REEF.Examples.AllHandlers
+{
+    /// <summary>
+    /// A sample implementation of TaskCompletedHandler
+    /// </summary>
+    public sealed class HelloTaskCompletedHandler : IObserver<ICompletedTask>
+    {
+        [Inject]
+        private HelloTaskCompletedHandler()
+        {
+        }
+
+        /// <summary>
+        /// Sample code to print out a completed task's details.
+        /// </summary>
+        /// <param name="completedTask"></param>
+        public void OnNext(ICompletedTask completedTask)
+        {
+            Console.WriteLine("Received CompletedTask: {0}, with message 
[{1}].", completedTask.Id, 
ByteUtilities.ByteArrarysToString(completedTask.Message));
+        }
+
+        public void OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        public void OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9c5b9582/lang/cs/Org.Apache.REEF.Examples.AllHandlers/Org.Apache.REEF.Examples.AllHandlers.csproj
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Examples.AllHandlers/Org.Apache.REEF.Examples.AllHandlers.csproj
 
b/lang/cs/Org.Apache.REEF.Examples.AllHandlers/Org.Apache.REEF.Examples.AllHandlers.csproj
index 3a69437..254627d 100644
--- 
a/lang/cs/Org.Apache.REEF.Examples.AllHandlers/Org.Apache.REEF.Examples.AllHandlers.csproj
+++ 
b/lang/cs/Org.Apache.REEF.Examples.AllHandlers/Org.Apache.REEF.Examples.AllHandlers.csproj
@@ -62,6 +62,7 @@ under the License.
     <Compile Include="HelloRunningTaskHandler.cs" />
     <Compile Include="HelloTaskMessageHandler.cs" />
     <Compile Include="HelloTraceListener.cs" />
+    <Compile Include="HelloTaskCompletedHandler.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
   </ItemGroup>
   <ItemGroup>

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9c5b9582/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/HelloSimpleEventHandlers.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/HelloSimpleEventHandlers.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/HelloSimpleEventHandlers.cs
index 7b3b27c..b5e5e06 100644
--- 
a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/HelloSimpleEventHandlers.cs
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/HelloSimpleEventHandlers.cs
@@ -20,6 +20,7 @@
 using System;
 using System.Collections.Generic;
 using System.Globalization;
+using System.Linq;
 using System.Net;
 using System.Text;
 using Org.Apache.REEF.Common.Tasks;
@@ -174,7 +175,12 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
         {
             using (Logger.LogFunction("HelloSimpleEventHandlers::CompletedTask 
received"))
             {
-                Logger.Log(Level.Info, 
string.Format(CultureInfo.InvariantCulture, "Received CompletedTask: {0}, task 
id: {1}", value.Id, _taskContext.CurrentTaskId()));
+                Logger.Log(Level.Info, "Received CompletedTask: {0}, task id: 
{1}.", value.Id, _taskContext.CurrentTaskId());
+                
+                var messageStr = value.Message == null || value.Message.Length 
== 0 ?
+                    string.Empty : 
ByteUtilities.ByteArrarysToString(value.Message);
+                Logger.Log(Level.Verbose, "Message received from CompletedTask 
{0} is: [{1}]", value.Id, messageStr);
+
                 _taskContext.UpdateTaskStatus(value.Id, TaskStatus.Completed);
                 _taskContext.TaskCompleted++;
                 SubmitNextTask(value.ActiveContext);

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9c5b9582/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/CompletedTaskBridge.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/CompletedTaskBridge.java
 
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/CompletedTaskBridge.java
index 0c993a9..03b616e 100644
--- 
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/CompletedTaskBridge.java
+++ 
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/CompletedTaskBridge.java
@@ -18,15 +18,18 @@
  */
 package org.apache.reef.javabridge;
 
+import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.driver.task.CompletedTask;
+import org.apache.reef.io.Message;
 
-public class CompletedTaskBridge extends NativeBridge {
+@Private
+public final class CompletedTaskBridge extends NativeBridge implements Message 
{
 
+  /**
+   *  These fields are used by the C++ code. Please do not remove without 
testing.
+   */
   private final CompletedTask jcompletedTask;
-
   private final String taskId;
-
-  // used by the C++ code
   private final ActiveContextBridge jactiveContext;
 
   public CompletedTaskBridge(final CompletedTask completedTask, final 
ActiveContextBridgeFactory factory) {
@@ -38,4 +41,9 @@ public class CompletedTaskBridge extends NativeBridge {
   @Override
   public void close() {
   }
+
+  @Override
+  public byte[] get() {
+    return jcompletedTask.get();
+  }
 }

Reply via email to