Repository: incubator-reef Updated Branches: refs/heads/master cf4a460fd -> 9c5b9582a
[REEF-728] Complete implementation of ICompletedTask This addressed the issue by * Piping the completed task message from Java to C#. JIRA: [REEF-728](https://issues.apache.org/jira/browse/REEF-728) This closes #524 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/9c5b9582 Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/9c5b9582 Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/9c5b9582 Branch: refs/heads/master Commit: 9c5b9582ad7a309d63beef9a3e9285cce9ab3e63 Parents: cf4a460 Author: Andrew Chung <[email protected]> Authored: Thu Sep 24 16:52:10 2015 -0700 Committer: Julia Wang <[email protected]> Committed: Wed Sep 30 14:22:54 2015 -0700 ---------------------------------------------------------------------- lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h | 1 + .../CompletedTaskClr2Java.cpp | 14 +++++ .../Bridge/Clr2java/IClr2JavaTaskMessage.cs | 32 +++++++++++ .../Bridge/Clr2java/ICompletedTaskClr2Java.cs | 2 +- .../Bridge/Clr2java/ISuspendedTaskClr2Java.cs | 8 +-- .../Bridge/Events/CompletedTask.cs | 8 ++- .../Org.Apache.REEF.Driver.csproj | 1 + .../AllHandlers.cs | 1 + .../HelloTaskCompletedHandler.cs | 56 ++++++++++++++++++++ .../Org.Apache.REEF.Examples.AllHandlers.csproj | 1 + .../Bridge/HelloSimpleEventHandlers.cs | 8 ++- .../reef/javabridge/CompletedTaskBridge.java | 16 ++++-- 12 files changed, 134 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9c5b9582/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h index 45f2457..b1869eb 100644 --- a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h +++ b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h @@ -148,6 +148,7 @@ namespace Org { virtual void OnError(String^ message); virtual IActiveContextClr2Java^ GetActiveContext(); virtual String^ GetId(); + virtual array<byte>^ Get(); }; public ref class SuspendedTaskClr2Java : public ISuspendedTaskClr2Java { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9c5b9582/lang/cs/Org.Apache.REEF.Bridge/CompletedTaskClr2Java.cpp ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Bridge/CompletedTaskClr2Java.cpp b/lang/cs/Org.Apache.REEF.Bridge/CompletedTaskClr2Java.cpp index be9fe31..18e088d 100644 --- a/lang/cs/Org.Apache.REEF.Bridge/CompletedTaskClr2Java.cpp +++ b/lang/cs/Org.Apache.REEF.Bridge/CompletedTaskClr2Java.cpp @@ -65,6 +65,20 @@ namespace Org { JNIEnv *env = RetrieveEnv(_jvm); return ManagedStringFromJavaString(env, _jstringId); } + + array<byte>^ CompletedTaskClr2Java::Get() { + ManagedLog::LOGGER->Log("CompletedTaskClr2Java::GetMessage"); + JNIEnv *env = RetrieveEnv(_jvm); + jclass jclassCompletedTask = env->GetObjectClass(_jobjectCompletedTask); + jmethodID jmidGet = env->GetMethodID(jclassCompletedTask, "get", "()[B"); + + if (jmidGet == NULL) { + ManagedLog::LOGGER->Log("jmidGet is NULL"); + return nullptr; + } + jbyteArray jMessage = (jbyteArray)env->CallObjectMethod(_jobjectCompletedTask, jmidGet); + return ManagedByteArrayFromJavaByteArray(env, jMessage); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9c5b9582/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IClr2JavaTaskMessage.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IClr2JavaTaskMessage.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IClr2JavaTaskMessage.cs new file mode 100644 index 0000000..e1d1513 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IClr2JavaTaskMessage.cs @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +namespace Org.Apache.REEF.Driver.Bridge.Clr2java +{ + /// <summary> + /// An message sent from a Task to a Driver. + /// </summary> + public interface IClr2JavaTaskMessage + { + /// <summary> + /// Gets returns the byte array task message. + /// </summary> + byte[] Get(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9c5b9582/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/ICompletedTaskClr2Java.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/ICompletedTaskClr2Java.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/ICompletedTaskClr2Java.cs index 5eb2686..049bf30 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/ICompletedTaskClr2Java.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/ICompletedTaskClr2Java.cs @@ -19,7 +19,7 @@ namespace Org.Apache.REEF.Driver.Bridge.Clr2java { - public interface ICompletedTaskClr2Java : IClr2Java + public interface ICompletedTaskClr2Java : IClr2Java, IClr2JavaTaskMessage { IActiveContextClr2Java GetActiveContext(); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9c5b9582/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/ISuspendedTaskClr2Java.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/ISuspendedTaskClr2Java.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/ISuspendedTaskClr2Java.cs index 95bfa8b..7e146f4 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/ISuspendedTaskClr2Java.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/ISuspendedTaskClr2Java.cs @@ -19,7 +19,7 @@ namespace Org.Apache.REEF.Driver.Bridge.Clr2java { - public interface ISuspendedTaskClr2Java : IClr2Java + public interface ISuspendedTaskClr2Java : IClr2Java, IClr2JavaTaskMessage { /// <summary> /// get active context the task is running in @@ -32,11 +32,5 @@ namespace Org.Apache.REEF.Driver.Bridge.Clr2java /// </summary> /// <returns>suspsended task id</returns> string GetId(); - - /// <summary> - /// get the message - /// </summary> - /// <returns>suspended task message</returns> - byte[] Get(); } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9c5b9582/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/CompletedTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/CompletedTask.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/CompletedTask.cs index cb44f59..18ed52f 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/CompletedTask.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/CompletedTask.cs @@ -38,7 +38,13 @@ namespace Org.Apache.REEF.Driver.Bridge.Events [DataMember] public string InstanceId { get; set; } - public byte[] Message { get; set; } + public byte[] Message + { + get + { + return CompletedTaskClr2Java.Get(); + } + } public string Id { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9c5b9582/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj b/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj index e3e7d53..9fb3613 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj +++ b/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj @@ -44,6 +44,7 @@ under the License. <Compile Include="Bridge\Clr2java\IAllocatedEvaluaotrClr2Java.cs" /> <Compile Include="Bridge\Clr2java\IClosedContextClr2Java.cs" /> <Compile Include="Bridge\Clr2java\IClr2Java.cs" /> + <Compile Include="Bridge\Clr2java\IClr2JavaTaskMessage.cs" /> <Compile Include="Bridge\Clr2java\ICompletedEvaluatorClr2Java.cs" /> <Compile Include="Bridge\Clr2java\ICompletedTaskClr2Java.cs" /> <Compile Include="Bridge\Clr2java\IContextMessageClr2Java.cs" /> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9c5b9582/lang/cs/Org.Apache.REEF.Examples.AllHandlers/AllHandlers.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples.AllHandlers/AllHandlers.cs b/lang/cs/Org.Apache.REEF.Examples.AllHandlers/AllHandlers.cs index 3f7a444..330874c 100644 --- a/lang/cs/Org.Apache.REEF.Examples.AllHandlers/AllHandlers.cs +++ b/lang/cs/Org.Apache.REEF.Examples.AllHandlers/AllHandlers.cs @@ -61,6 +61,7 @@ namespace Org.Apache.REEF.Examples.AllHandlers .Set(DriverConfiguration.OnEvaluatorFailed, GenericType<HelloFailedEvaluatorHandler>.Class) .Set(DriverConfiguration.OnTaskFailed, GenericType<HelloFailedTaskHandler>.Class) .Set(DriverConfiguration.OnTaskRunning, GenericType<HelloRunningTaskHandler>.Class) + .Set(DriverConfiguration.OnTaskCompleted, GenericType<HelloTaskCompletedHandler>.Class) .Set(DriverConfiguration.OnDriverStarted, GenericType<HelloDriverStartHandler>.Class) .Set(DriverConfiguration.OnHttpEvent, GenericType<HelloHttpHandler>.Class) .Set(DriverConfiguration.OnEvaluatorCompleted, GenericType<HelloCompletedEvaluatorHandler>.Class) http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9c5b9582/lang/cs/Org.Apache.REEF.Examples.AllHandlers/HelloTaskCompletedHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples.AllHandlers/HelloTaskCompletedHandler.cs b/lang/cs/Org.Apache.REEF.Examples.AllHandlers/HelloTaskCompletedHandler.cs new file mode 100644 index 0000000..aa035b0 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Examples.AllHandlers/HelloTaskCompletedHandler.cs @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using Org.Apache.REEF.Driver.Task; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Utilities; + +namespace Org.Apache.REEF.Examples.AllHandlers +{ + /// <summary> + /// A sample implementation of TaskCompletedHandler + /// </summary> + public sealed class HelloTaskCompletedHandler : IObserver<ICompletedTask> + { + [Inject] + private HelloTaskCompletedHandler() + { + } + + /// <summary> + /// Sample code to print out a completed task's details. + /// </summary> + /// <param name="completedTask"></param> + public void OnNext(ICompletedTask completedTask) + { + Console.WriteLine("Received CompletedTask: {0}, with message [{1}].", completedTask.Id, ByteUtilities.ByteArrarysToString(completedTask.Message)); + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9c5b9582/lang/cs/Org.Apache.REEF.Examples.AllHandlers/Org.Apache.REEF.Examples.AllHandlers.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples.AllHandlers/Org.Apache.REEF.Examples.AllHandlers.csproj b/lang/cs/Org.Apache.REEF.Examples.AllHandlers/Org.Apache.REEF.Examples.AllHandlers.csproj index 3a69437..254627d 100644 --- a/lang/cs/Org.Apache.REEF.Examples.AllHandlers/Org.Apache.REEF.Examples.AllHandlers.csproj +++ b/lang/cs/Org.Apache.REEF.Examples.AllHandlers/Org.Apache.REEF.Examples.AllHandlers.csproj @@ -62,6 +62,7 @@ under the License. <Compile Include="HelloRunningTaskHandler.cs" /> <Compile Include="HelloTaskMessageHandler.cs" /> <Compile Include="HelloTraceListener.cs" /> + <Compile Include="HelloTaskCompletedHandler.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> </ItemGroup> <ItemGroup> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9c5b9582/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/HelloSimpleEventHandlers.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/HelloSimpleEventHandlers.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/HelloSimpleEventHandlers.cs index 7b3b27c..b5e5e06 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/HelloSimpleEventHandlers.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/HelloSimpleEventHandlers.cs @@ -20,6 +20,7 @@ using System; using System.Collections.Generic; using System.Globalization; +using System.Linq; using System.Net; using System.Text; using Org.Apache.REEF.Common.Tasks; @@ -174,7 +175,12 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge { using (Logger.LogFunction("HelloSimpleEventHandlers::CompletedTask received")) { - Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Received CompletedTask: {0}, task id: {1}", value.Id, _taskContext.CurrentTaskId())); + Logger.Log(Level.Info, "Received CompletedTask: {0}, task id: {1}.", value.Id, _taskContext.CurrentTaskId()); + + var messageStr = value.Message == null || value.Message.Length == 0 ? + string.Empty : ByteUtilities.ByteArrarysToString(value.Message); + Logger.Log(Level.Verbose, "Message received from CompletedTask {0} is: [{1}]", value.Id, messageStr); + _taskContext.UpdateTaskStatus(value.Id, TaskStatus.Completed); _taskContext.TaskCompleted++; SubmitNextTask(value.ActiveContext); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9c5b9582/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/CompletedTaskBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/CompletedTaskBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/CompletedTaskBridge.java index 0c993a9..03b616e 100644 --- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/CompletedTaskBridge.java +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/CompletedTaskBridge.java @@ -18,15 +18,18 @@ */ package org.apache.reef.javabridge; +import org.apache.reef.annotations.audience.Private; import org.apache.reef.driver.task.CompletedTask; +import org.apache.reef.io.Message; -public class CompletedTaskBridge extends NativeBridge { +@Private +public final class CompletedTaskBridge extends NativeBridge implements Message { + /** + * These fields are used by the C++ code. Please do not remove without testing. + */ private final CompletedTask jcompletedTask; - private final String taskId; - - // used by the C++ code private final ActiveContextBridge jactiveContext; public CompletedTaskBridge(final CompletedTask completedTask, final ActiveContextBridgeFactory factory) { @@ -38,4 +41,9 @@ public class CompletedTaskBridge extends NativeBridge { @Override public void close() { } + + @Override + public byte[] get() { + return jcompletedTask.get(); + } }
