Repository: incubator-reef Updated Branches: refs/heads/master 75f25a267 -> 5d5771456
[REEF-277] Remove unused code paths in the C# Driver This removes traces of a planned, but never executed, full Driver implementation in C#. JIRA: [REEF-277](https://issues.apache.org/jira/browse/REEF-277) Pull Request: This closes #192 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/5d577145 Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/5d577145 Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/5d577145 Branch: refs/heads/master Commit: 5d5771456bae780b4f0781cf6a2e0d75addc5694 Parents: 75f25a2 Author: Yingda Chen <[email protected]> Authored: Sat May 23 10:51:52 2015 -0700 Committer: Markus Weimer <[email protected]> Committed: Sun May 24 08:32:42 2015 -0700 ---------------------------------------------------------------------- .../Api/IResourceLaunchHandler.cs | 29 - .../Api/IResourceReleaseHandler.cs | 29 - .../Api/IResourceRequestHandler.cs | 29 - .../ClientJobStatusHandler.cs | 141 ---- .../EvaluatorHeartBeatSanityChecker.cs | 56 -- .../Org.Apache.REEF.Common.csproj | 5 - lang/cs/Org.Apache.REEF.Driver/ClientManager.cs | 44 -- .../Context/EvaluatorContext.cs | 148 ----- lang/cs/Org.Apache.REEF.Driver/DriverManager.cs | 537 --------------- .../DriverRuntimeConfiguration.cs | 62 -- .../DriverRuntimeConfigurationOptions.cs | 44 -- .../Org.Apache.REEF.Driver/EvaluatorManager.cs | 653 ------------------- .../Org.Apache.REEF.Driver.csproj | 7 - .../Task/RunningTaskImpl.cs | 114 ---- 14 files changed, 1898 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5d577145/lang/cs/Org.Apache.REEF.Common/Api/IResourceLaunchHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Api/IResourceLaunchHandler.cs b/lang/cs/Org.Apache.REEF.Common/Api/IResourceLaunchHandler.cs deleted file mode 100644 index 53c0b8b..0000000 --- a/lang/cs/Org.Apache.REEF.Common/Api/IResourceLaunchHandler.cs +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using Org.Apache.REEF.Common.Protobuf.ReefProtocol; - -namespace Org.Apache.REEF.Common.Api -{ - [Obsolete("Driver core logic no longer needed in.NET")] - public interface IResourceLaunchHandler : IObserver<ResourceLaunchProto> - { - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5d577145/lang/cs/Org.Apache.REEF.Common/Api/IResourceReleaseHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Api/IResourceReleaseHandler.cs b/lang/cs/Org.Apache.REEF.Common/Api/IResourceReleaseHandler.cs deleted file mode 100644 index 6712634..0000000 --- a/lang/cs/Org.Apache.REEF.Common/Api/IResourceReleaseHandler.cs +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using Org.Apache.REEF.Common.Protobuf.ReefProtocol; - -namespace Org.Apache.REEF.Common.Api -{ - [Obsolete("Driver core logic no longer needed in.NET")] - public interface IResourceReleaseHandler : IObserver<ResourceReleaseProto> - { - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5d577145/lang/cs/Org.Apache.REEF.Common/Api/IResourceRequestHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Api/IResourceRequestHandler.cs b/lang/cs/Org.Apache.REEF.Common/Api/IResourceRequestHandler.cs deleted file mode 100644 index 4668f7a..0000000 --- a/lang/cs/Org.Apache.REEF.Common/Api/IResourceRequestHandler.cs +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using Org.Apache.REEF.Common.Protobuf.ReefProtocol; - -namespace Org.Apache.REEF.Common.Api -{ - [Obsolete("Driver core logic no longer needed in.NET")] - public interface IResourceRequestHandler : IObserver<ResourceRequestProto> - { - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5d577145/lang/cs/Org.Apache.REEF.Common/ClientJobStatusHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/ClientJobStatusHandler.cs b/lang/cs/Org.Apache.REEF.Common/ClientJobStatusHandler.cs deleted file mode 100644 index 05e4669..0000000 --- a/lang/cs/Org.Apache.REEF.Common/ClientJobStatusHandler.cs +++ /dev/null @@ -1,141 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using Org.Apache.REEF.Common.Protobuf.ReefProtocol; -using Org.Apache.REEF.Utilities; -using Org.Apache.REEF.Utilities.Logging; -using Org.Apache.REEF.Wake.Remote; -using Org.Apache.REEF.Wake.Time; -using Org.Apache.REEF.Wake.Time.Event; - -namespace Org.Apache.REEF.Common -{ - [Obsolete("Driver core logic no longer needed in.NET")] - public class ClientJobStatusHandler : IJobMessageObserver, IObserver<StartTime> - { - private static readonly Logger LOGGER = Logger.GetLogger(typeof(ClientJobStatusHandler)); - - private readonly IClock _clock; - - private readonly string _jobId; - - private readonly IObserver<JobStatusProto> _jobStatusHandler; - - private readonly IDisposable _jobControlChannel; - - State _state = State.INIT; - - public ClientJobStatusHandler( - IRemoteManager<IRemoteMessage<REEFMessage>> remoteManager, - IClock clock, - IObserver<JobControlProto> jobControlHandler, - string jobId, - string clientRID) - { - _clock = clock; - _jobId = jobId; - _jobStatusHandler = null; - _jobControlChannel = null; - //_jobStatusHandler = remoteManager.GetRemoteObserver() - //_jobControlChannel = remoteManager.RegisterObserver() - } - - public void Dispose(Optional<Exception> e) - { - try - { - if (e.IsPresent()) - { - OnError(e.Value); - } - else - { - JobStatusProto proto = new JobStatusProto(); - proto.identifier = _jobId; - proto.state = State.DONE; - Send(proto); - } - } - catch (Exception ex) - { - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(ex, Level.Warning, "Error closing ClientJobStatusHandler", LOGGER); - } - - try - { - _jobControlChannel.Dispose(); - } - catch (Exception ex) - { - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(ex, Level.Warning, "Error closing jobControlChannel", LOGGER); - } - } - - public void OnNext(byte[] value) - { - LOGGER.Log(Level.Info, "Job message from {0}" + _jobId); - SendInit(); - JobStatusProto proto = new JobStatusProto(); - proto.identifier = _jobId; - proto.state = State.RUNNING; - proto.message = value; - Send(proto); - } - - public void OnNext(StartTime value) - { - LOGGER.Log(Level.Info, "StartTime:" + value); - SendInit(); - } - - public void OnError(Exception error) - { - LOGGER.Log(Level.Error, "job excemption", error); - JobStatusProto proto = new JobStatusProto(); - proto.identifier = _jobId; - proto.state = State.FAILED; - proto.exception = ByteUtilities.StringToByteArrays(error.Message); - _clock.Dispose(); - } - - public void OnCompleted() - { - throw new NotImplementedException(); - } - - private void Send(JobStatusProto status) - { - LOGGER.Log(Level.Info, "Sending job status " + status); - _jobStatusHandler.OnNext(status); - } - - private void SendInit() - { - if (_state == State.INIT) - { - JobStatusProto proto = new JobStatusProto(); - proto.identifier = _jobId; - proto.state = State.INIT; - Send(proto); - _state = State.RUNNING; - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5d577145/lang/cs/Org.Apache.REEF.Common/EvaluatorHeartBeatSanityChecker.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/EvaluatorHeartBeatSanityChecker.cs b/lang/cs/Org.Apache.REEF.Common/EvaluatorHeartBeatSanityChecker.cs deleted file mode 100644 index c8dea3b..0000000 --- a/lang/cs/Org.Apache.REEF.Common/EvaluatorHeartBeatSanityChecker.cs +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using System.Collections.Generic; -using System.Globalization; -using Org.Apache.REEF.Utilities.Logging; - -namespace Org.Apache.REEF.Common -{ - [Obsolete("Driver core logic no longer needed in.NET")] - public class EvaluatorHeartBeatSanityChecker - { - private static readonly Logger LOGGER = Logger.GetLogger(typeof(EvaluatorHeartBeatSanityChecker)); - - readonly Dictionary<string, long> _timeStamps = new Dictionary<string, long>(); - - public void check(string id, long timeStamp) - { - lock (this) - { - if (_timeStamps.ContainsKey(id)) - { - long oldTimeStamp = _timeStamps[id]; - LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "TIMESTAMP CHECKER: id [{0}], old timestamp [{1}], new timestamp [{2}]", id, oldTimeStamp, timeStamp)); - if (oldTimeStamp > timeStamp) - { - string msg = string.Format( - CultureInfo.InvariantCulture, - "Received an old heartbeat with timestamp [{0}] while timestamp [{1}] was received earlier", - oldTimeStamp, - timeStamp); - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException(msg), LOGGER); - } - } - _timeStamps.Add(id, timeStamp); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5d577145/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj index 7bd929a..63843c4 100644 --- a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj +++ b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj @@ -54,9 +54,6 @@ under the License. <Compile Include="Api\AbstractFailure.cs" /> <Compile Include="Api\IAbstractFailure.cs" /> <Compile Include="Api\IFailure.cs" /> - <Compile Include="Api\IResourceLaunchHandler.cs" /> - <Compile Include="Api\IResourceReleaseHandler.cs" /> - <Compile Include="Api\IResourceRequestHandler.cs" /> <Compile Include="Avro\AvroDriverInfo.cs" /> <Compile Include="Avro\AvroHttpRequest.cs" /> <Compile Include="Avro\AvroHttpSerializer.cs" /> @@ -72,13 +69,11 @@ under the License. <Compile Include="Catalog\NodeDescriptorImpl.cs" /> <Compile Include="Catalog\RackDescriptorImpl.cs" /> <Compile Include="Catalog\ResourceCatalogImpl.cs" /> - <Compile Include="ClientJobStatusHandler.cs" /> <Compile Include="Constants.cs" /> <Compile Include="Context\ContextMessage.cs" /> <Compile Include="Context\IContextMessage.cs" /> <Compile Include="Context\IContextMessageHandler.cs" /> <Compile Include="Context\IContextMessageSource.cs" /> - <Compile Include="EvaluatorHeartBeatSanityChecker.cs" /> <Compile Include="Evaluator\DefaultLocalHttpDriverConnection.cs" /> <Compile Include="Evaluator\DefaultYarnClusterHttpDriverConnection.cs" /> <Compile Include="Evaluator\DefaultYarnOneBoxHttpDriverConnection.cs" /> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5d577145/lang/cs/Org.Apache.REEF.Driver/ClientManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/ClientManager.cs b/lang/cs/Org.Apache.REEF.Driver/ClientManager.cs deleted file mode 100644 index 09f4e21..0000000 --- a/lang/cs/Org.Apache.REEF.Driver/ClientManager.cs +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using Org.Apache.REEF.Common.Protobuf.ReefProtocol; - -// TODO -namespace Org.Apache.REEF.Driver -{ - [Obsolete("Driver core logic no longer needed in.NET")] - public class ClientManager : IObserver<JobControlProto> - { - public void OnNext(JobControlProto value) - { - throw new NotImplementedException(); - } - - public void OnError(Exception error) - { - throw new NotImplementedException(); - } - - public void OnCompleted() - { - throw new NotImplementedException(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5d577145/lang/cs/Org.Apache.REEF.Driver/Context/EvaluatorContext.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Context/EvaluatorContext.cs b/lang/cs/Org.Apache.REEF.Driver/Context/EvaluatorContext.cs deleted file mode 100644 index 32009de..0000000 --- a/lang/cs/Org.Apache.REEF.Driver/Context/EvaluatorContext.cs +++ /dev/null @@ -1,148 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using System.Globalization; -using Org.Apache.REEF.Common.Protobuf.ReefProtocol; -using Org.Apache.REEF.Driver.Bridge.Events; -using Org.Apache.REEF.Driver.Evaluator; -using Org.Apache.REEF.Tang.Interface; -using Org.Apache.REEF.Utilities; -using Org.Apache.REEF.Utilities.Diagnostics; -using Org.Apache.REEF.Utilities.Logging; - -namespace Org.Apache.REEF.Driver.Context -{ - [Obsolete("Driver core logic no longer needed in.NET")] - public class EvaluatorContext : IActiveContext - { - private static readonly Logger LOGGER = Logger.GetLogger(typeof(EvaluatorContext)); - - private readonly string _identifier; - - private readonly Optional<string> _parentId; - - private readonly EvaluatorManager _evaluatorManager; - - private bool _disposed = false; - - public EvaluatorContext(EvaluatorManager evaluatorManager, string id, Optional<string> parentId) - { - _identifier = id; - _parentId = parentId; - _evaluatorManager = evaluatorManager; - } - - public string Id - { - get - { - return _identifier; - } - - set - { - } - } - - public string EvaluatorId - { - get - { - return _evaluatorManager.Id; - } - - set - { - } - } - - public Optional<string> ParentId - { - get - { - return _parentId; - } - - set - { - } - } - - public IEvaluatorDescriptor EvaluatorDescriptor - { - get - { - return _evaluatorManager.EvaluatorDescriptor; - } - - set - { - } - } - - public void Dispose() - { - if (_disposed) - { - var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Active context [{0}] already closed", _identifier)); - Exceptions.Throw(e, LOGGER); - } - LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Submit close context: RunningEvaluator id [{0}] for context id [{1}]", EvaluatorId, Id)); - RemoveContextProto removeContextProto = new RemoveContextProto(); - removeContextProto.context_id = Id; - ContextControlProto contextControlProto = new ContextControlProto(); - contextControlProto.remove_context = removeContextProto; - _evaluatorManager.Handle(contextControlProto); - _disposed = true; - } - - public ClosedContext GetClosedContext(IActiveContext parentContext) - { - //return new ClosedContext(parentContext, EvaluatorId, Id, ParentId, EvaluatorDescriptor); - throw new NotImplementedException(); - } - - public FailedContext GetFailedContext(Optional<IActiveContext> parentContext, Exception cause) - { - //return new FailedContext(parentContext, Id, cause, EvaluatorId, ParentId, EvaluatorDescriptor); - throw new NotImplementedException(); - } - - public void SubmitTask(IConfiguration taskConf) - { - throw new NotImplementedException(); - } - - public void SubmitContext(IConfiguration contextConfiguration) - { - throw new NotImplementedException(); - } - - public void SubmitContextAndService(IConfiguration contextConfiguration, IConfiguration serviceConfiguration) - { - throw new NotImplementedException(); - } - - public void SendMessage(byte[] message) - { - throw new NotImplementedException(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5d577145/lang/cs/Org.Apache.REEF.Driver/DriverManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/DriverManager.cs b/lang/cs/Org.Apache.REEF.Driver/DriverManager.cs deleted file mode 100644 index a3b53ed..0000000 --- a/lang/cs/Org.Apache.REEF.Driver/DriverManager.cs +++ /dev/null @@ -1,537 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using System.Collections.Generic; -using System.Globalization; -using Org.Apache.REEF.Common; -using Org.Apache.REEF.Common.Api; -using Org.Apache.REEF.Common.Catalog; -using Org.Apache.REEF.Common.Evaluator; -using Org.Apache.REEF.Common.Exceptions; -using Org.Apache.REEF.Common.Protobuf.ReefProtocol; -using Org.Apache.REEF.Driver.Evaluator; -using Org.Apache.REEF.Tang.Implementations.InjectionPlan; -using Org.Apache.REEF.Tang.Interface; -using Org.Apache.REEF.Utilities; -using Org.Apache.REEF.Utilities.Diagnostics; -using Org.Apache.REEF.Utilities.Logging; -using Org.Apache.REEF.Wake.Remote; -using Org.Apache.REEF.Wake.Time; -using Org.Apache.REEF.Wake.Time.Runtime.Event; - -namespace Org.Apache.REEF.Driver -{ - [Obsolete("Driver core logic no longer needed in.NET")] - public class DriverManager : - IEvaluatorRequestor, - IObserver<RuntimeStatusProto>, - IObserver<ResourceStatusProto>, - IObserver<ResourceAllocationProto>, - IObserver<NodeDescriptorProto>, - IObserver<RuntimeStart>, - IObserver<RuntimeStop>, - IObserver<IdleClock> - { - private static readonly Logger LOGGER = Logger.GetLogger(typeof(DriverManager)); - - private readonly IInjector _injector; - - private readonly IInjectionFuture<IClock> _clockFuture; - - private readonly ResourceCatalogImpl _resourceCatalog; - - private IInjectionFuture<IResourceRequestHandler> _futureResourceRequestHandler; - - private readonly Dictionary<string, EvaluatorManager> _evaluators = new Dictionary<string, EvaluatorManager>(); - - private readonly EvaluatorHeartBeatSanityChecker _sanityChecker = new EvaluatorHeartBeatSanityChecker(); - - private readonly ClientJobStatusHandler _clientJobStatusHandler; - - private readonly IDisposable _heartbeatConnectionChannel; - - private readonly IDisposable _errorChannel; - - private readonly IObserver<RuntimeErrorProto> _runtimeErrorHandler; - - public DriverManager( - IInjector injector, - ResourceCatalogImpl resourceCatalog, - IRemoteManager<REEFMessage> remoteManager, - IInjectionFuture<IClock> clockFuture, - IInjectionFuture<IResourceRequestHandler> futureResourceRequestHandler, - ClientJobStatusHandler clientJobStatusHandler, - string clientRId) - { - _injector = injector; - _clockFuture = clockFuture; - _resourceCatalog = resourceCatalog; - _futureResourceRequestHandler = futureResourceRequestHandler; - _clientJobStatusHandler = clientJobStatusHandler; - - _heartbeatConnectionChannel = null; - _errorChannel = null; - _runtimeErrorHandler = null; - LOGGER.Log(Level.Info, "DriverManager instantiated"); - } - - public IResourceCatalog ResourceCatalog - { - get - { - return _resourceCatalog; - } - - set - { - } - } - - private RuntimeStatusProto _runtimeStatusProto - { - get - { - RuntimeStatusProto proto = new RuntimeStatusProto(); - proto.state = State.INIT; - proto.name = "REEF"; - proto.outstanding_container_requests = 0; - return proto; - } - - set - { - _runtimeStatusProto = value; - } - } - - public void Submit(IEvaluatorRequest request) - { - LOGGER.Log(Level.Info, "Got an EvaluatorRequest"); - ResourceRequestProto proto = new ResourceRequestProto(); - //TODO: request.size deprecated should use megabytes instead - //switch (request.Size) - //{ - // case EvaluatorRequest.EvaluatorSize.SMALL: - // proto.resource_size = SIZE.SMALL; - // break; - // case EvaluatorRequest.EvaluatorSize.MEDIUM: - // proto.resource_size = SIZE.MEDIUM; - // break; - // case EvaluatorRequest.EvaluatorSize.LARGE: - // proto.resource_size = SIZE.LARGE; - // break; - // case EvaluatorRequest.EvaluatorSize.XLARGE: - // proto.resource_size = SIZE.XLARGE; - // break; - // default: - // throw new InvalidOperationException("invalid request size" + request.Size); - //} - proto.resource_count = request.Number; - if (request.MemoryMegaBytes > 0) - { - proto.memory_size = request.MemoryMegaBytes; - } - - //final ResourceCatalog.Descriptor descriptor = req.getDescriptor(); - //if (descriptor != null) { - // if (descriptor instanceof RackDescriptor) { - // request.addRackName(descriptor.getName()); - // } else if (descriptor instanceof NodeDescriptor) { - // request.addNodeName(descriptor.getName()); - // } - //} - - //_futureResourceRequestHandler.Get().OnNext(proto); - } - - public void Release(EvaluatorManager evaluatorManager) - { - lock (this) - { - string evaluatorManagerId = evaluatorManager.Id; - if (_evaluators.ContainsKey(evaluatorManagerId)) - { - _evaluators.Remove(evaluatorManagerId); - } - else - { - var e = new InvalidOperationException("Trying to remove an unknown evaluator manager with id " + evaluatorManagerId); - Exceptions.Throw(e, LOGGER); - } - } - } - - /// <summary> - /// This handles runtime error occurs on the evaluator - /// </summary> - /// <param name="runtimeErrorProto"></param> - public void Handle(RuntimeErrorProto runtimeErrorProto) - { - FailedRuntime error = new FailedRuntime(runtimeErrorProto); - LOGGER.Log(Level.Warning, "Runtime error:" + error); - - EvaluatorException evaluatorException = error.Cause != null - ? new EvaluatorException(error.Id, error.Cause.Value) - : new EvaluatorException(error.Id, "Runtime error"); - EvaluatorManager evaluatorManager = null; - lock (_evaluators) - { - if (_evaluators.ContainsKey(error.Id)) - { - evaluatorManager = _evaluators[error.Id]; - } - else - { - LOGGER.Log(Level.Warning, "Unknown evaluator runtime error: " + error.Cause); - } - } - if (null != evaluatorManager) - { - evaluatorManager.Handle(evaluatorException); - } - } - - /// <summary> - /// A RuntimeStatusProto comes from the ResourceManager layer indicating its current status - /// </summary> - /// <param name="runtimeStatusProto"></param> - public void OnNext(RuntimeStatusProto runtimeStatusProto) - { - Handle(runtimeStatusProto); - } - - /// <summary> - /// A ResourceStatusProto message comes from the ResourceManager layer to indicate what it thinks - /// about the current state of a given resource. Ideally, we should think the same thing. - /// </summary> - /// <param name="resourceStatusProto"></param> - public void OnNext(ResourceStatusProto resourceStatusProto) - { - Handle(resourceStatusProto); - } - - /// <summary> - /// A ResourceAllocationProto indicates a resource allocation given by the ResourceManager layer. - /// </summary> - /// <param name="resourceAllocationProto"></param> - public void OnNext(ResourceAllocationProto resourceAllocationProto) - { - Handle(resourceAllocationProto); - } - - /// <summary> - /// A NodeDescriptorProto defines a new node in the cluster. We should add this to the resource catalog - /// so that clients can make resource requests against it. - /// </summary> - /// <param name="nodeDescriptorProto"></param> - public void OnNext(NodeDescriptorProto nodeDescriptorProto) - { - _resourceCatalog.Handle(nodeDescriptorProto); - } - - /// <summary> - /// This EventHandler is subscribed to the StartTime event of the Clock statically. It therefore provides the entrance - /// point to REEF. - /// </summary> - /// <param name="runtimeStart"></param> - public void OnNext(RuntimeStart runtimeStart) - { - LOGGER.Log(Level.Info, "RuntimeStart: " + runtimeStart); - _runtimeStatusProto = new RuntimeStatusProto(); - _runtimeStatusProto.state = State.RUNNING; - _runtimeStatusProto.name = "REEF"; - _runtimeStatusProto.outstanding_container_requests = 0; - } - - /// <summary> - /// Handles RuntimeStop - /// </summary> - /// <param name="runtimeStop"></param> - public void OnNext(RuntimeStop runtimeStop) - { - LOGGER.Log(Level.Info, "RuntimeStop: " + runtimeStop); - if (runtimeStop.Exception != null) - { - string exceptionMessage = runtimeStop.Exception.Message; - LOGGER.Log(Level.Warning, "Sending runtime error:" + exceptionMessage); - RuntimeErrorProto runtimeErrorProto = new RuntimeErrorProto(); - runtimeErrorProto.message = exceptionMessage; - runtimeErrorProto.exception = ByteUtilities.StringToByteArrays(exceptionMessage); - runtimeErrorProto.name = "REEF"; - _runtimeErrorHandler.OnNext(runtimeErrorProto); - - LOGGER.Log(Level.Warning, "DONE Sending runtime error: " + exceptionMessage); - } - - lock (_evaluators) - { - foreach (EvaluatorManager evaluatorManager in _evaluators.Values) - { - LOGGER.Log(Level.Warning, "Unclean shutdown of evaluator: " + evaluatorManager.Id); - evaluatorManager.Dispose(); - } - } - - try - { - _heartbeatConnectionChannel.Dispose(); - _errorChannel.Dispose(); - Optional<Exception> e = runtimeStop.Exception != null ? - Optional<Exception>.Of(runtimeStop.Exception) : Optional<Exception>.Empty(); - _clientJobStatusHandler.Dispose(e); - - LOGGER.Log(Level.Info, "driver manager closed"); - } - catch (Exception e) - { - Exceptions.Caught(e, Level.Error, "Error disposing Driver manager", LOGGER); - Exceptions.Throw(new InvalidOperationException("Cannot dispose driver manager"), LOGGER); - } - } - - public void OnNext(IdleClock value) - { - string message = string.Format( - CultureInfo.InvariantCulture, - "IdleClock: [{0}], RuntimeState [{1}], Outstanding container requests [{2}], Container allocation count[{3}]", - value + Environment.NewLine, - _runtimeStatusProto.state + Environment.NewLine, - _runtimeStatusProto.outstanding_container_requests + Environment.NewLine, - _runtimeStatusProto.container_allocation.Count); - LOGGER.Log(Level.Info, message); - - lock (_evaluators) - { - if (_runtimeStatusProto.state == State.RUNNING - && _runtimeStatusProto.outstanding_container_requests == 0 - && _runtimeStatusProto.container_allocation.Count == 0) - { - LOGGER.Log(Level.Info, "Idle runtime shutdown"); - _clockFuture.Get().Dispose(); - } - } - } - - void IObserver<IdleClock>.OnError(Exception error) - { - throw new NotImplementedException(); - } - - void IObserver<IdleClock>.OnCompleted() - { - throw new NotImplementedException(); - } - - void IObserver<RuntimeStop>.OnError(Exception error) - { - throw new NotImplementedException(); - } - - void IObserver<RuntimeStop>.OnCompleted() - { - throw new NotImplementedException(); - } - - void IObserver<RuntimeStart>.OnError(Exception error) - { - throw new NotImplementedException(); - } - - void IObserver<RuntimeStart>.OnCompleted() - { - throw new NotImplementedException(); - } - - void IObserver<NodeDescriptorProto>.OnError(Exception error) - { - throw new NotImplementedException(); - } - - void IObserver<NodeDescriptorProto>.OnCompleted() - { - throw new NotImplementedException(); - } - - void IObserver<ResourceAllocationProto>.OnError(Exception error) - { - throw new NotImplementedException(); - } - - void IObserver<ResourceAllocationProto>.OnCompleted() - { - throw new NotImplementedException(); - } - - void IObserver<ResourceStatusProto>.OnError(Exception error) - { - throw new NotImplementedException(); - } - - void IObserver<ResourceStatusProto>.OnCompleted() - { - throw new NotImplementedException(); - } - - void IObserver<RuntimeStatusProto>.OnError(Exception error) - { - throw new NotImplementedException(); - } - - void IObserver<RuntimeStatusProto>.OnCompleted() - { - throw new NotImplementedException(); - } - - /// <summary> - /// Something went wrong at the runtime layer (either driver or evaluator). This - /// method simply forwards the RuntimeErrorProto to the client via the RuntimeErrorHandler. - /// </summary> - /// <param name="runtimeErrorProto"></param> - private void Fail(RuntimeErrorProto runtimeErrorProto) - { - _runtimeErrorHandler.OnNext(runtimeErrorProto); - _clockFuture.Get().Dispose(); - } - - /// <summary> - /// Helper method to create a new EvaluatorManager instance - /// </summary> - /// <param name="id">identifier of the Evaluator</param> - /// <param name="descriptor"> NodeDescriptor on which the Evaluator executes.</param> - /// <returns>new EvaluatorManager instance.</returns> - private EvaluatorManager GetNewEvaluatorManagerInstance(string id, EvaluatorDescriptorImpl descriptor) - { - LOGGER.Log(Level.Info, "Creating Evaluator Manager: " + id); - //TODO bindVolatieParameter - return (EvaluatorManager)_injector.GetInstance(typeof(EvaluatorManager)); - } - - /// <summary> - /// Receives and routes heartbeats from Evaluators. - /// </summary> - /// <param name="evaluatorHearBeatProto"></param> - private void Handle(IRemoteMessage<EvaluatorHeartbeatProto> evaluatorHearBeatProto) - { - EvaluatorHeartbeatProto heartbeat = evaluatorHearBeatProto.Message; - EvaluatorStatusProto status = heartbeat.evaluator_status; - string evaluatorId = status.evaluator_id; - LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Heartbeat from Evaluator {0} with state {1} timestamp {2}", evaluatorId, status.state, heartbeat.timestamp)); - _sanityChecker.check(evaluatorId, heartbeat.timestamp); - - lock (_evaluators) - { - if (_evaluators.ContainsKey(evaluatorId)) - { - EvaluatorManager evaluatorManager = _evaluators[evaluatorId]; - evaluatorManager.Handle(evaluatorHearBeatProto); - } - else - { - string msg = "Contact from unkonwn evaluator with id: " + evaluatorId; - if (heartbeat.evaluator_status != null) - { - msg += " with state" + status.state; - } - LOGGER.Log(Level.Error, msg); - Exceptions.Throw(new InvalidOperationException(msg), LOGGER); - } - } - } - - /// <summary> - /// This resource status message comes from the ResourceManager layer; telling me what it thinks - /// about the state of the resource executing an Evaluator; This method simply passes the message - /// off to the referenced EvaluatorManager - /// </summary> - /// <param name="resourceStatusProto"></param> - private void Handle(ResourceStatusProto resourceStatusProto) - { - lock (_evaluators) - { - if (_evaluators.ContainsKey(resourceStatusProto.identifier)) - { - EvaluatorManager evaluatorManager = _evaluators[resourceStatusProto.identifier]; - evaluatorManager.Handle(resourceStatusProto); - } - else - { - var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Unknown resource status from evaluator {0} with state {1}", resourceStatusProto.identifier, resourceStatusProto.state)); - Exceptions.Throw(e, LOGGER); - } - } - } - - /// <summary> - /// This method handles resource allocations by creating a new EvaluatorManager instance. - /// </summary> - /// <param name="resourceAllocationProto"></param> - private void Handle(ResourceAllocationProto resourceAllocationProto) - { - lock (_evaluators) - { - try - { - INodeDescriptor nodeDescriptor = _resourceCatalog.GetNode(resourceAllocationProto.node_id); - if (nodeDescriptor == null) - { - Exceptions.Throw(new InvalidOperationException("Unknown resurce: " + resourceAllocationProto.node_id), LOGGER); - } - EvaluatorDescriptorImpl evaluatorDescriptor = new EvaluatorDescriptorImpl(nodeDescriptor, EvaluatorType.UNDECIDED, resourceAllocationProto.resource_memory, resourceAllocationProto.virtual_cores); - LOGGER.Log(Level.Info, "Resource allocation: new evaluator id: " + resourceAllocationProto.identifier); - EvaluatorManager evaluatorManager = GetNewEvaluatorManagerInstance(resourceAllocationProto.identifier, evaluatorDescriptor); - _evaluators.Add(resourceAllocationProto.identifier, evaluatorManager); - } - catch (Exception e) - { - Exceptions.Caught(e, Level.Error, LOGGER); - Exceptions.Throw(new InvalidOperationException("Error handling resourceAllocationProto."), LOGGER); - } - } - } - - private void Handle(RuntimeStatusProto runtimeStatusProto) - { - State runtimeState = runtimeStatusProto.state; - LOGGER.Log(Level.Info, "Runtime status: " + runtimeStatusProto.state); - - switch (runtimeState) - { - case State.FAILED: - Fail(runtimeStatusProto.error); - break; - case State.DONE: - _clockFuture.Get().Dispose(); - break; - case State.RUNNING: - lock (_evaluators) - { - _runtimeStatusProto = runtimeStatusProto; - if (_clockFuture.Get().IsIdle() - && runtimeStatusProto.outstanding_container_requests == 0 - && runtimeStatusProto.container_allocation.Count == 0) - { - _clockFuture.Get().Dispose(); - } - } - break; - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5d577145/lang/cs/Org.Apache.REEF.Driver/DriverRuntimeConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/DriverRuntimeConfiguration.cs b/lang/cs/Org.Apache.REEF.Driver/DriverRuntimeConfiguration.cs deleted file mode 100644 index 9d7f8ad..0000000 --- a/lang/cs/Org.Apache.REEF.Driver/DriverRuntimeConfiguration.cs +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using Org.Apache.REEF.Common; -using Org.Apache.REEF.Common.Catalog; -using Org.Apache.REEF.Tang.Formats; -using Org.Apache.REEF.Tang.Util; - -namespace Org.Apache.REEF.Driver -{ - [Obsolete("Driver core logic no longer needed in.NET")] - public class DriverRuntimeConfiguration : ConfigurationModuleBuilder - { - public static ConfigurationModule ConfigurationModule - { - get - { - return new DriverRuntimeConfiguration() - // Resource Catalog - .BindImplementation(GenericType<IResourceCatalog>.Class, GenericType<ResourceCatalogImpl>.Class) - - // JobMessageObserver - //.BindImplementation(GenericType<IEvaluatorRequestor>.Class, GenericType<DriverManager>.Class) - .BindImplementation(GenericType<IJobMessageObserver>.Class, GenericType<ClientJobStatusHandler>.Class) - - // JobMessageObserver Wake event handler bindings - .BindNamedParameter(GenericType<DriverRuntimeConfigurationOptions.JobMessageHandler>.Class, GenericType<ClientJobStatusHandler>.Class) - .BindNamedParameter(GenericType<DriverRuntimeConfigurationOptions.JobExceptionHandler>.Class, GenericType<ClientJobStatusHandler>.Class) - - // Client manager - .BindNamedParameter(GenericType<DriverRuntimeConfigurationOptions.JobControlHandler>.Class, GenericType<ClientManager>.Class) - - // Bind the runtime parameters - //.BindNamedParameter(GenericType<RuntimeParameters.NodeDescriptorHandler>.Class, GenericType<DriverManager>.Class) - //.BindNamedParameter(GenericType<RuntimeParameters.ResourceAllocationHandler>.Class, GenericType<DriverManager>.Class) - //.BindNamedParameter(GenericType<RuntimeParameters.ResourceStatusHandler>.Class, GenericType<DriverManager>.Class) - //.BindNamedParameter(GenericType<RuntimeParameters.RuntimeStatusHandler>.Class, GenericType<DriverManager>.Class) - - // Bind to the Clock - //.BindSetEntry(GenericType<IClock.RuntimeStopHandler>.Class, GenericType<DriverManager>.Class) - .Build(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5d577145/lang/cs/Org.Apache.REEF.Driver/DriverRuntimeConfigurationOptions.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/DriverRuntimeConfigurationOptions.cs b/lang/cs/Org.Apache.REEF.Driver/DriverRuntimeConfigurationOptions.cs deleted file mode 100644 index 9ba1820..0000000 --- a/lang/cs/Org.Apache.REEF.Driver/DriverRuntimeConfigurationOptions.cs +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using Org.Apache.REEF.Common; -using Org.Apache.REEF.Tang.Annotations; - -namespace Org.Apache.REEF.Driver -{ - [Obsolete("Driver core logic no longer needed in.NET")] - public class DriverRuntimeConfigurationOptions - { - [NamedParameter(documentation: "Job message handler (see ClientJobStatusHandler)")] - public class JobMessageHandler : Name<ClientJobStatusHandler> - { - } - - [NamedParameter(documentation: "Job exception handler (see ClientJobStatusHandler)")] - public class JobExceptionHandler : Name<ClientJobStatusHandler> - { - } - - [NamedParameter(documentation: "Called when a job control message is received by the client.")] - public class JobControlHandler : Name<ClientManager> - { - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5d577145/lang/cs/Org.Apache.REEF.Driver/EvaluatorManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/EvaluatorManager.cs b/lang/cs/Org.Apache.REEF.Driver/EvaluatorManager.cs deleted file mode 100644 index e28373d..0000000 --- a/lang/cs/Org.Apache.REEF.Driver/EvaluatorManager.cs +++ /dev/null @@ -1,653 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using System.Collections.Generic; -using System.Globalization; -using System.Linq; -using System.Text; -using Org.Apache.REEF.Common.Api; -using Org.Apache.REEF.Common.Catalog; -using Org.Apache.REEF.Common.Evaluator; -using Org.Apache.REEF.Common.Exceptions; -using Org.Apache.REEF.Common.Protobuf.ReefProtocol; -using Org.Apache.REEF.Driver.Bridge.Events; -using Org.Apache.REEF.Driver.Context; -using Org.Apache.REEF.Driver.Evaluator; -using Org.Apache.REEF.Driver.Task; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Utilities; -using Org.Apache.REEF.Utilities.Diagnostics; -using Org.Apache.REEF.Utilities.Logging; -using Org.Apache.REEF.Wake.Remote; -using Org.Apache.REEF.Wake.Time; -using TaskMessage = Org.Apache.REEF.Common.Tasks.TaskMessage; - -namespace Org.Apache.REEF.Driver -{ - /// <summary> - /// Manages a single Evaluator instance including all lifecycle instances: - /// (AllocatedEvaluator, CompletedEvaluator, FailedEvaluator). - /// A (periodic) heartbeat channel is established EvaluatorRuntime -> EvaluatorManager. - /// The EvaluatorRuntime will (periodically) send (status) messages to the EvaluatorManager using this heartbeat channel. - /// A (push-based) EventHandler channel is established EvaluatorManager -> EvaluatorRuntime. - /// The EvaluatorManager uses this to forward Driver messages, launch Tasks, and initiate - /// control information (e.g., shutdown, suspend).* Manages a single Evaluator instance including all lifecycle instances: - /// (AllocatedEvaluator, CompletedEvaluator, FailedEvaluator). - /// A (periodic) heartbeat channel is established EvaluatorRuntime -> EvaluatorManager. - /// The EvaluatorRuntime will (periodically) send (status) messages to the EvaluatorManager using this heartbeat channel. - /// A (push-based) EventHandler channel is established EvaluatorManager -> EvaluatorRuntime. - /// The EvaluatorManager uses this to forward Driver messages, launch Tasks, and initiate control information (e.g., shutdown, suspend). - /// </summary> - [Obsolete("Driver core logic no longer needed in.NET")] - public class EvaluatorManager : IDisposable, IIdentifiable - { - private static readonly Logger LOGGER = Logger.GetLogger(typeof(EvaluatorManager)); - - private STATE _state = STATE.ALLOCATED; - - private IClock _clock; - - // TODO - // private final RemoteManager remoteManager; - private readonly DriverManager _driverManager; - - private readonly IResourceReleaseHandler _resourceReleaseHandler; - - private readonly IResourceLaunchHandler _resourceLaunchHandler; - - private readonly EvaluatorDescriptorImpl _evaluatorDescriptor; - - private readonly string _evaluatorId; - - private readonly IList<EvaluatorContext> _activeContexts = new List<EvaluatorContext>(); - - private readonly HashSet<string> _activeContextIds = new HashSet<string>(); - - private IRunningTask _runningTask = null; - - private readonly IObserver<EvaluatorControlProto> _evaluatorControlHandler = null; - - private bool _isResourceReleased = false; - - //TODO - //private final DispatchingEStage dispatcher; - private EvaluatorType _type = EvaluatorType.CLR; - - public EvaluatorManager( - IClock clock, - //RemoteManager remoteManager, - DriverManager driverManager, - IResourceReleaseHandler resourceReleaseHandler, - IResourceLaunchHandler resourceLaunchHandler, - //REEFErrorHandler errorHandler, - string evaluatorId, - EvaluatorDescriptorImpl evaluatorDescriptor, - ISet<IObservable<IActiveContext>> activeContextEventHandler, - ISet<IObservable<IClosedContext>> closedContextEventHandlers, - ISet<IObservable<FailedContext>> failedContextEventHandlers, - ISet<IObservable<ContextMessage>> contextMessageHandlers, - ISet<IObservable<IRunningTask>> runningTaskEventHandlers, - ISet<IObservable<ICompletedTask>> completedTaskEventHandlers, - ISet<IObservable<ISuspendedTask>> suspendedTaskEventHandlers, - ISet<IObservable<TaskMessage>> taskMessageEventHandlers, - ISet<IObservable<FailedTask>> taskExceptionEventHandlers, - ISet<IObservable<IAllocatedEvaluator>> allocatedEvaluatorEventHandlers, - ISet<IObservable<IFailedEvaluator>> failedEvaluatorHandlers, - ISet<IObservable<ICompletedEvaluator>> completedEvaluatorHandlers) - { - _clock = clock; - //_remoteManager = remoteManager; - _driverManager = driverManager; - _resourceReleaseHandler = resourceReleaseHandler; - _resourceLaunchHandler = resourceLaunchHandler; - _evaluatorId = evaluatorId; - _evaluatorDescriptor = evaluatorDescriptor; - - //this.dispatcher = new DispatchingEStage(errorHandler, 16); // 16 threads - - //this.dispatcher.register(ActiveContext.class, activeContextEventHandlers); - //this.dispatcher.register(ClosedContext.class, closedContextEventHandlers); - //this.dispatcher.register(FailedContext.class, failedContextEventHandlers); - //this.dispatcher.register(ContextMessage.class, contextMessageHandlers); - - //this.dispatcher.register(RunningTask.class, runningTaskEventHandlers); - //this.dispatcher.register(CompletedTask.class, completedTaskEventHandlers); - //this.dispatcher.register(SuspendedTask.class, suspendedTaskEventHandlers); - //this.dispatcher.register(TaskMessage.class, taskMessageEventHandlers); - //this.dispatcher.register(FailedTask.class, taskExceptionEventHandlers); - - //this.dispatcher.register(FailedEvaluator.class, failedEvaluatorHandlers); - //this.dispatcher.register(CompletedEvaluator.class, completedEvaluatorHandlers); - //this.dispatcher.register(AllocatedEvaluator.class, allocatedEvaluatorEventHandlers); - - //this.dispatcher.onNext(AllocatedEvaluator.class, - // new AllocatedEvaluatorImpl(this, remoteManager.getMyIdentifier())); - } - - /// <summary> - /// Various states that the EvaluatorManager could be in. The EvaluatorManager is created when a resource has been allocated by the ResourceManager. - /// </summary> - public enum STATE - { - ALLOCATED, // initial state - SUBMITTED, // client called AllocatedEvaluator.submitTask() and we're waiting for first contact - RUNNING, // first contact received, all communication channels established, Evaluator sent to client. - DONE, // clean shutdown - FAILED, // some failure occurred. - KILLED // unclean shutdown - } - - public IEvaluatorDescriptor EvaluatorDescriptor - { - get - { - return _evaluatorDescriptor; - } - } - - public INodeDescriptor NodeDescriptor - { - get - { - return EvaluatorDescriptor.NodeDescriptor; - } - } - - public IRunningTask RunningTask - { - get - { - lock (_evaluatorDescriptor) - { - return _runningTask; - } - } - } - - public string Id - { - get - { - return _evaluatorId; - } - - set - { - } - } - - public EvaluatorType Type - { - get - { - return _type; - } - - set - { - _type = value; - _evaluatorDescriptor.EvaluatorType = value; - } - } - - public void Dispose() - { - lock (_evaluatorDescriptor) - { - if (_state == STATE.RUNNING) - { - LOGGER.Log(Level.Warning, "Dirty shutdown of running evaluator :" + Id); - try - { - // Killing the evaluator means that it doesn't need to send a confirmation; it just dies. - EvaluatorControlProto proto = new EvaluatorControlProto(); - proto.timestamp = DateTime.Now.Ticks; - proto.identifier = Id; - proto.kill_evaluator = new KillEvaluatorProto(); - Handle(proto); - } - finally - { - _state = STATE.KILLED; - } - } - } - - if (!_isResourceReleased) - { - try - { - // We need to wait awhile before returning the container to the RM in order to - // give the EvaluatorRuntime (and Launcher) time to cleanly exit. - - // this.clock.scheduleAlarm(100, new EventHandler<Alarm>() { - //@Override - //public void onNext(final Alarm alarm) { - // EvaluatorManager.this.resourceReleaseHandler.onNext( - // DriverRuntimeProtocol.ResourceReleaseProto.newBuilder() - // .setIdentifier(EvaluatorManager.this.evaluatorId).build()); - } - catch (Exception e) - { - Exceptions.Caught(e, Level.Warning, "Force resource release because the client closed the clock.", LOGGER); - ResourceReleaseProto proto = new ResourceReleaseProto(); - proto.identifier = _evaluatorId; - _resourceReleaseHandler.OnNext(proto); - } - finally - { - _isResourceReleased = true; - _driverManager.Release(this); - } - } - } - - /// <summary> - /// EvaluatorException will trigger is FailedEvaluator and state transition to FAILED - /// </summary> - /// <param name="exception"></param> - public void Handle(EvaluatorException exception) - { - lock (_evaluatorDescriptor) - { - if (_state >= STATE.DONE) - { - return; - } - LOGGER.Log(Level.Warning, "Failed Evaluator: " + Id + exception.Message); - try - { - IList<FailedContext> failedContexts = new List<FailedContext>(); - IList<EvaluatorContext> activeContexts = new List<EvaluatorContext>(_activeContexts); - activeContexts = activeContexts.Reverse().ToList(); - foreach (EvaluatorContext context in activeContexts) - { - Optional<IActiveContext> parentContext = context.ParentId.IsPresent() - ? Optional<IActiveContext>.Of(GetEvaluatorContext(context.ParentId.Value)) - : Optional<IActiveContext>.Empty(); - failedContexts.Add(context.GetFailedContext(parentContext, exception)); - } - - //Optional<FailedTask> failedTask = _runningTask != null ? - // Optional<FailedTask>.Of(new FailedTask(_runningTask.Id, exception)) : Optional<FailedTask>.Empty(); - //LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + failedTask.ToString()); - //this.dispatcher.onNext(FailedEvaluator.class, new FailedEvaluatorImpl( - //exception, failedContextList, failedTaskOptional, this.evaluatorId)); - } - catch (Exception e) - { - Exceptions.CaughtAndThrow(e, Level.Error, "Exception while handling FailedEvaluator.", LOGGER); - } - finally - { - _state = STATE.FAILED; - Dispose(); - } - } - } - - public void Handle(IRemoteMessage<EvaluatorHeartbeatProto> evaluatorHearBeatProtoMessage) - { - lock (_evaluatorDescriptor) - { - EvaluatorHeartbeatProto heartbeatProto = evaluatorHearBeatProtoMessage.Message; - if (heartbeatProto.evaluator_status != null) - { - EvaluatorStatusProto status = heartbeatProto.evaluator_status; - if (status.error != null) - { - Handle(new EvaluatorException(Id, ByteUtilities.ByteArrarysToString(status.error))); - return; - } - else if (_state == STATE.SUBMITTED) - { - string evaluatorRId = evaluatorHearBeatProtoMessage.Identifier.ToString(); - LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + evaluatorRId); - // TODO - // _evaluatorControlHandler = _remoteManager.getHandler(evaluatorRID, EvaluatorRuntimeProtocol.EvaluatorControlProto.class); - _state = STATE.RUNNING; - LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Evaluator {0} is running", _evaluatorId)); - } - } - - LOGGER.Log(Level.Info, "Evaluator heartbeat: " + heartbeatProto); - - EvaluatorStatusProto evaluatorStatusProto = heartbeatProto.evaluator_status; - foreach (ContextStatusProto contextStatusProto in heartbeatProto.context_status) - { - Handle(contextStatusProto, heartbeatProto.task_status != null); - } - - if (heartbeatProto.task_status != null) - { - Handle(heartbeatProto.task_status); - } - - if (evaluatorStatusProto.state == State.FAILED) - { - _state = STATE.FAILED; - EvaluatorException e = evaluatorStatusProto.error != null ? - new EvaluatorException(_evaluatorId, ByteUtilities.ByteArrarysToString(evaluatorStatusProto.error)) : - new EvaluatorException(_evaluatorId, "unknown cause"); - LOGGER.Log(Level.Warning, "Failed evaluator: " + Id + e.Message); - Handle(e); - } - else if (evaluatorStatusProto.state == State.DONE) - { - LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Evaluator {0} done", Id)); - _state = STATE.DONE; - - // TODO - // dispatcher.onNext(CompletedEvaluator.class, new CompletedEvaluator() { - //@Override - //public String getId() { - // return EvaluatorManager.this.evaluatorId; - Dispose(); - } - } - LOGGER.Log(Level.Info, "DONE with evaluator heartbeat"); - } - - public void Handle(ResourceLaunchProto resourceLaunchProto) - { - lock (_evaluatorDescriptor) - { - if (_state == STATE.ALLOCATED) - { - _state = STATE.SUBMITTED; - _resourceLaunchHandler.OnNext(resourceLaunchProto); - } - else - { - var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Evaluator manager expected {0} state, but instead is in state {1}", STATE.ALLOCATED, _state)); - Exceptions.Throw(e, LOGGER); - } - } - } - - /// <summary> - /// Packages the TaskControlProto in an EvaluatorControlProto and forward it to the EvaluatorRuntime - /// </summary> - /// <param name="contextControlProto"></param> - public void Handle(ContextControlProto contextControlProto) - { - lock (_evaluatorDescriptor) - { - LOGGER.Log(Level.Info, "Task control message from " + _evaluatorId); - EvaluatorControlProto evaluatorControlProto = new EvaluatorControlProto(); - evaluatorControlProto.timestamp = DateTime.Now.Ticks; - evaluatorControlProto.identifier = Id; - evaluatorControlProto.context_control = contextControlProto; - - Handle(evaluatorControlProto); - } - } - - /// <summary> - /// Forward the EvaluatorControlProto to the EvaluatorRuntime - /// </summary> - /// <param name="proto"></param> - public void Handle(EvaluatorControlProto proto) - { - lock (_evaluatorDescriptor) - { - if (_state == STATE.RUNNING) - { - _evaluatorControlHandler.OnNext(proto); - } - else - { - var e = new InvalidOperationException( - string.Format( - CultureInfo.InvariantCulture, - "Evaluator manager expects to be in {0} state, but instead is in state {1}", - STATE.RUNNING, - _state)); - Exceptions.Throw(e, LOGGER); - } - } - } - - /// <summary> - /// Resource status information from the (actual) resource manager. - /// </summary> - /// <param name="resourceStatusProto"></param> - public void Handle(ResourceStatusProto resourceStatusProto) - { - lock (_evaluatorDescriptor) - { - State resourceState = resourceStatusProto.state; - LOGGER.Log(Level.Info, "Resource manager state update: " + resourceState); - - if (resourceState == State.DONE || resourceState == State.FAILED) - { - if (_state < STATE.DONE) - { - // something is wrong, I think I'm alive but the resource manager runtime says I'm dead - StringBuilder stringBuilder = new StringBuilder(); - stringBuilder.Append( - string.Format( - CultureInfo.InvariantCulture, - "The resource manager informed me that Evaluator {0} is in state {1} but I think I am in {2} state", - _evaluatorId, - resourceState, - _state)); - if (resourceStatusProto.diagnostics != null) - { - stringBuilder.Append("Cause: " + resourceStatusProto.diagnostics); - } - if (_runningTask != null) - { - stringBuilder.Append( - string.Format( - CultureInfo.InvariantCulture, - "Taskruntime {0} did not complete before this evaluator died.", - _runningTask.Id)); - } - - // RM is telling me its DONE/FAILED - assuming it has already released the resources - _isResourceReleased = true; - //Handle(new EvaluatorException(_evaluatorId, stringBuilder.ToString(), _runningTask)); - _state = STATE.KILLED; - } - } - } - } - - /// <summary> - /// Handle a context status update - /// </summary> - /// <param name="contextStatusProto"></param> - /// <param name="notifyClientOnNewActiveContext"></param> - private void Handle(ContextStatusProto contextStatusProto, bool notifyClientOnNewActiveContext) - { - string contextId = contextStatusProto.context_id; - Optional<string> parentId = contextStatusProto.parent_id != null ? - Optional<string>.Of(contextStatusProto.parent_id) : Optional<string>.Empty(); - if (ContextStatusProto.State.READY == contextStatusProto.context_state) - { - if (!_activeContextIds.Contains(contextId)) - { - EvaluatorContext evaluatorContext = new EvaluatorContext(this, contextId, parentId); - AddEvaluatorContext(evaluatorContext); - if (notifyClientOnNewActiveContext) - { - LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + evaluatorContext.ToString()); - //TODO - //dispatcher.onNext(ActiveContext.class, context); - } - } - foreach (ContextStatusProto.ContextMessageProto contextMessageProto in contextStatusProto.context_message) - { - byte[] message = contextMessageProto.message; - string sourceId = contextMessageProto.source_id; - LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + sourceId + message); - // this.dispatcher.onNext(ContextMessage.class, - //new ContextMessageImpl(theMessage, contextID, sourceID)); - } - } - else - { - if (!_activeContextIds.Contains(contextId)) - { - if (ContextStatusProto.State.FAIL == contextStatusProto.context_state) - { - AddEvaluatorContext(new EvaluatorContext(this, contextId, parentId)); - } - else - { - var e = new InvalidOperationException("unknown context signaling state " + contextStatusProto.context_state); - Exceptions.Throw(e, LOGGER); - } - } - } - - EvaluatorContext context = GetEvaluatorContext(contextId); - EvaluatorContext parentContext = context.ParentId.IsPresent() ? - GetEvaluatorContext(context.ParentId.Value) : null; - RemoveEvaluatorContext(context); - - if (ContextStatusProto.State.FAIL == contextStatusProto.context_state) - { - // TODO - Exception reason = new InvalidOperationException(ByteUtilities.ByteArrarysToString(contextStatusProto.error)); - Optional<IActiveContext> optionalParentContext = (null == parentContext) ? - Optional<IActiveContext>.Empty() : Optional<IActiveContext>.Of(parentContext); - LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + reason.ToString() + optionalParentContext); - // TODO - //this.dispatcher.onNext(FailedContext.class, - //context.getFailedContext(optionalParentContext, reason)); - } - else if (ContextStatusProto.State.DONE == contextStatusProto.context_state) - { - if (null != parentContext) - { - // TODO - //this.dispatcher.onNext(ClosedContext.class, context.getClosedContext(parentContext)); - } - else - { - LOGGER.Log(Level.Info, "Root context closed. Evaluator closed will trigger final shutdown."); - } - } - else - { - var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Unknown context state {0} for context {1}", contextStatusProto.context_state, contextId)); - Exceptions.Throw(e, LOGGER); - } - } - - /// <summary> - /// Handle task status messages. - /// </summary> - /// <param name="taskStatusProto"></param> - private void Handle(TaskStatusProto taskStatusProto) - { - LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Received task {0} status {1}", taskStatusProto.task_id, taskStatusProto.state)); - string taskId = taskStatusProto.task_id; - string contextId = taskStatusProto.context_id; - State taskState = taskStatusProto.state; - - if (taskState == State.INIT) - { - EvaluatorContext evaluatorContext = GetEvaluatorContext(contextId); - _runningTask = new RunningTaskImpl(taskId, evaluatorContext); - // this.dispatcher.onNext(RunningTask.class, this.runningTask); - } - else if (taskState == State.SUSPEND) - { - EvaluatorContext evaluatorContext = GetEvaluatorContext(contextId); - _runningTask = null; - byte[] message = taskStatusProto.result != null ? taskStatusProto.result : null; - LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + evaluatorContext + message.ToString()); - //this.dispatcher.onNext(SuspendedTask.class, new SuspendedTaskImpl(evaluatorContext, message, taskId)); - } - else if (taskState == State.DONE) - { - EvaluatorContext evaluatorContext = GetEvaluatorContext(contextId); - _runningTask = null; - byte[] message = taskStatusProto.result != null ? taskStatusProto.result : null; - LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + evaluatorContext + message.ToString()); - //this.dispatcher.onNext(CompletedTask.class, new CompletedTaskImpl(evaluatorContext, message, taskId)); - } - else if (taskState == State.FAILED) - { - _runningTask = null; - //EvaluatorContext evaluatorContext = GetEvaluatorContext(contextId); - //FailedTask failedTask = taskStatusProto.result != null ? - // new FailedTask(taskId, ByteUtilities.ByteArrarysToString(taskStatusProto.result), Optional<IActiveContext>.Of(evaluatorContext)) : - // new FailedTask(taskId, "Failed task: " + taskState, Optional<IActiveContext>.Of(evaluatorContext)); - //LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + failedTask.ToString()); - //this.dispatcher.onNext(FailedTask.class, taskException); - } - else if (taskStatusProto.task_message.Count > 0) - { - if (_runningTask != null) - { - var e = new InvalidOperationException("runningTask must be null when there are multiple task messages"); - Exceptions.Throw(e, LOGGER); - } - foreach (TaskStatusProto.TaskMessageProto taskMessageProto in taskStatusProto.task_message) - { - LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + taskMessageProto.ToString()); - // this.dispatcher.onNext(TaskMessage.class, - //new TaskMessageImpl(taskMessageProto.getMessage().toByteArray(), - // taskId, contextId, taskMessageProto.getSourceId())); - } - } - } - - private EvaluatorContext GetEvaluatorContext(string id) - { - foreach (EvaluatorContext context in _activeContexts) - { - if (context.Id.Equals(id)) - { - return context; - } - var e = new InvalidOperationException("Unknown evaluator context with id " + id); - Exceptions.Throw(e, LOGGER); - } - return null; - } - - private void AddEvaluatorContext(EvaluatorContext context) - { - _activeContexts.Add(context); - _activeContextIds.Add(context.Id); - } - - private void RemoveEvaluatorContext(EvaluatorContext context) - { - _activeContexts.Remove(context); - _activeContextIds.Remove(context.Id); - } - - [NamedParameter(documentation: "The Evaluator Identifier.")] - public class EvaluatorIdentifier : Name<string> - { - } - - [NamedParameter(documentation: "The Evaluator Host.")] - public class EvaluatorDescriptorName : Name<EvaluatorDescriptorImpl> - { - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5d577145/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj b/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj index 74fbf5f..692f39d 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj +++ b/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj @@ -84,14 +84,12 @@ under the License. <Compile Include="Bridge\ILogger.cs" /> <Compile Include="Bridge\ReefHttpRequest.cs" /> <Compile Include="Bridge\ReefHttpResponse.cs" /> - <Compile Include="ClientManager.cs" /> <Compile Include="Constants.cs" /> <Compile Include="Context\ContextConfiguration.cs" /> <Compile Include="Context\ContextConfigurationOptions.cs" /> <Compile Include="Context\Defaults\DefaultContextMessageSource.cs" /> <Compile Include="Context\Defaults\DefaultContextStartHandler.cs" /> <Compile Include="Context\Defaults\DefaultContextStopHandler.cs" /> - <Compile Include="Context\EvaluatorContext.cs" /> <Compile Include="Context\IActiveContext.cs" /> <Compile Include="Context\IClosedContext.cs" /> <Compile Include="Context\IContext.cs" /> @@ -120,11 +118,7 @@ under the License. <Compile Include="Defaults\DefaultTaskSuspensionHandler.cs" /> <Compile Include="DriverConfigGenerator.cs" /> <Compile Include="DriverConfigurationSettings.cs" /> - <Compile Include="DriverManager.cs" /> - <Compile Include="DriverRuntimeConfiguration.cs" /> - <Compile Include="DriverRuntimeConfigurationOptions.cs" /> <Compile Include="DriverSubmissionSettings.cs" /> - <Compile Include="EvaluatorManager.cs" /> <Compile Include="Evaluator\EvaluatorDescriptorImpl.cs" /> <Compile Include="Evaluator\EvaluatorRequest.cs" /> <Compile Include="Evaluator\EvaluatorRequestBuilder.cs" /> @@ -143,7 +137,6 @@ under the License. <Compile Include="Task\IRunningTask.cs" /> <Compile Include="Task\ISuspendedTask.cs" /> <Compile Include="Task\ITaskMessage.cs" /> - <Compile Include="Task\RunningTaskImpl.cs" /> </ItemGroup> <ItemGroup> <None Include="Org.Apache.REEF.Driver.nuspec" /> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5d577145/lang/cs/Org.Apache.REEF.Driver/Task/RunningTaskImpl.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Task/RunningTaskImpl.cs b/lang/cs/Org.Apache.REEF.Driver/Task/RunningTaskImpl.cs deleted file mode 100644 index 056c188..0000000 --- a/lang/cs/Org.Apache.REEF.Driver/Task/RunningTaskImpl.cs +++ /dev/null @@ -1,114 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using System.Globalization; -using Org.Apache.REEF.Common.Protobuf.ReefProtocol; -using Org.Apache.REEF.Driver.Context; -using Org.Apache.REEF.Utilities.Logging; - -namespace Org.Apache.REEF.Driver.Task -{ - [Obsolete("Driver core logic no longer needed in.NET")] - public class RunningTaskImpl : IRunningTask - { - private static readonly Logger LOGGER = Logger.GetLogger(typeof(RunningTaskImpl)); - - private readonly string _id; - - private readonly EvaluatorContext _evaluatorContext; - - public RunningTaskImpl(string taskId, EvaluatorContext evaluatorContext) - { - _id = taskId; - _evaluatorContext = evaluatorContext; - } - - public string Id - { - get - { - return _id; - } - - set - { - } - } - - public IActiveContext ActiveContext - { - get - { - return _evaluatorContext; - } - - set - { - } - } - - public void Dispose() - { - ContextControlProto contextControlProto = new ContextControlProto(); - contextControlProto.stop_task = new StopTaskProto(); - } - - public void Dispose(byte[] message) - { - ContextControlProto contextControlProto = new ContextControlProto(); - contextControlProto.stop_task = new StopTaskProto(); - contextControlProto.task_message = message; - } - - public void OnNext(byte[] message) - { - ContextControlProto contextControlProto = new ContextControlProto(); - contextControlProto.task_message = message; - } - - public void Suspend(byte[] message) - { - ContextControlProto contextControlProto = new ContextControlProto(); - contextControlProto.suspend_task = new SuspendTaskProto(); - contextControlProto.task_message = message; - } - - public void Suspend() - { - ContextControlProto contextControlProto = new ContextControlProto(); - contextControlProto.suspend_task = new SuspendTaskProto(); - } - - public override string ToString() - { - return "TaskRuntime with taskId = " + _id; - } - - public override int GetHashCode() - { - return _id.GetHashCode(); - } - - public void Send(byte[] message) - { - LOGGER.Log(Level.Info, "RunningTaskImpl.Send() is called"); - } - } -}
