Repository: incubator-reef Updated Branches: refs/heads/master 0850674bd -> 8a6e65585
[REEF-445] Remove unused fields from Wake.Remote.Impl.RemoteEvent This removes fields sink and source from C# and Java code. JIRA: [REEF-445](https://issues.apache.org/jira/browse/REEF-445) Pull Request: This closes #403 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/8a6e6558 Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/8a6e6558 Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/8a6e6558 Branch: refs/heads/master Commit: 8a6e655858dfefbfa883aebf57a53de57b20be08 Parents: 0850674 Author: Mariia Mykhailova <[email protected]> Authored: Fri Aug 21 13:47:20 2015 -0700 Committer: Markus Weimer <[email protected]> Committed: Fri Aug 21 15:59:07 2015 -0700 ---------------------------------------------------------------------- .../Org.Apache.REEF.Wake/Remote/IRemoteEvent.cs | 4 -- .../Remote/Impl/DefaultRemoteManager.cs | 1 - .../Remote/Impl/RemoteEvent.cs | 12 +---- .../Remote/Impl/RemoteEventDecoder.cs | 2 +- .../Remote/Impl/RemoteEventEncoder.cs | 2 - .../wake/remote/impl/ProxyEventHandler.java | 3 +- .../reef/wake/remote/impl/RemoteEvent.java | 49 +------------------- .../wake/remote/impl/RemoteEventDecoder.java | 3 +- .../wake/remote/impl/RemoteEventEncoder.java | 6 --- .../wake/src/main/proto/RemoteProtocol.proto | 2 - .../reef/wake/test/remote/RemoteTest.java | 3 +- 11 files changed, 7 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8a6e6558/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteEvent.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteEvent.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteEvent.cs index de3b747..165e4d9 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteEvent.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteEvent.cs @@ -27,10 +27,6 @@ namespace Org.Apache.REEF.Wake.Remote IPEndPoint RemoteEndPoint { get; set; } - string Source { get; } - - string Sink { get; } - T Value { get; } long Sequence { get; } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8a6e6558/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs index 0290029..6da0d98 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs @@ -316,7 +316,6 @@ namespace Org.Apache.REEF.Wake.Remote.Impl { IRemoteEvent<T> remoteEvent = new RemoteEvent<T>(_client.Link.LocalEndpoint, _client.Link.RemoteEndpoint, message) { - Sink = "default", Sequence = _messageCount }; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8a6e6558/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEvent.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEvent.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEvent.cs index bfed7f9..88b7104 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEvent.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEvent.cs @@ -25,12 +25,10 @@ namespace Org.Apache.REEF.Wake.Remote.Impl { public class RemoteEvent<T> : IRemoteEvent<T> { - public RemoteEvent(IPEndPoint localEndPoint, IPEndPoint remoteEndPoint, string source, string sink, long seq, T value) + public RemoteEvent(IPEndPoint localEndPoint, IPEndPoint remoteEndPoint, long seq, T value) { LocalEndPoint = localEndPoint; RemoteEndPoint = remoteEndPoint; - Source = source; - Sink = sink; Value = value; Sequence = seq; } @@ -51,15 +49,9 @@ namespace Org.Apache.REEF.Wake.Remote.Impl public IPEndPoint RemoteEndPoint { get; set; } - [Obsolete("This field is never used and will be removed as part of 0.13. See [REEF-445]", false)] - public string Source { get; set; } - - [Obsolete("This field is never used and will be removed as part of 0.13. See [REEF-445]", false)] - public string Sink { get; set; } - public T Value { get; set; } - [Obsolete("This field is never used and will be removed as part of 0.13. See [REEF-445]", false)] + [Obsolete("This field is used in Java code only. See [REEF-445]", false)] public long Sequence { get; set; } } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8a6e6558/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEventDecoder.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEventDecoder.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEventDecoder.cs index 683256c..2fd1cd9 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEventDecoder.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEventDecoder.cs @@ -33,7 +33,7 @@ namespace Org.Apache.REEF.Wake.Remote.Impl public IRemoteEvent<T> Decode(byte[] data) { WakeMessagePBuf pbuf = WakeMessagePBuf.Deserialize(data); - return new RemoteEvent<T>(null, null, pbuf.source, pbuf.sink, pbuf.seq, _decoder.Decode(pbuf.data)); + return new RemoteEvent<T>(null, null, pbuf.seq, _decoder.Decode(pbuf.data)); } } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8a6e6558/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEventEncoder.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEventEncoder.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEventEncoder.cs index 76f0590..b9ade13 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEventEncoder.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEventEncoder.cs @@ -33,8 +33,6 @@ namespace Org.Apache.REEF.Wake.Remote.Impl public byte[] Encode(IRemoteEvent<T> obj) { WakeMessagePBuf pbuf = new WakeMessagePBuf(); - pbuf.sink = obj.Sink; - pbuf.source = obj.Source; pbuf.data = _encoder.Encode(obj.Value); pbuf.seq = obj.Sequence; return pbuf.Serialize(); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8a6e6558/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/ProxyEventHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/ProxyEventHandler.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/ProxyEventHandler.java index 72ba6d7..a5982e5 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/ProxyEventHandler.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/ProxyEventHandler.java @@ -45,7 +45,6 @@ public class ProxyEventHandler<T> implements EventHandler<T> { * @param myId my identifier * @param remoteId the remote identifier * @param remoteSinkName the remote sink name - * @param reStage the sender stage * @throws RemoteRuntimeException */ public ProxyEventHandler(final RemoteIdentifier myId, final RemoteIdentifier remoteId, final String remoteSinkName, @@ -73,7 +72,7 @@ public class ProxyEventHandler<T> implements EventHandler<T> { if (LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "remoteid: {0}\n{1}", new Object[]{remoteId.getSocketAddress(), event.toString()}); } - handler.onNext(new RemoteEvent<T>(myId.getSocketAddress(), remoteId.getSocketAddress(), "", remoteSinkName, + handler.onNext(new RemoteEvent<T>(myId.getSocketAddress(), remoteId.getSocketAddress(), seqGen.getNextSeq(remoteId.getSocketAddress()), event)); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8a6e6558/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEvent.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEvent.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEvent.java index b7e6dc3..55adbc6 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEvent.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEvent.java @@ -32,25 +32,18 @@ public class RemoteEvent<T> { //private static final AtomicLong curSeq = new AtomicLong(0); private SocketAddress localAddr; private SocketAddress remoteAddr; - private String src; - private String sink; /** * Constructs a remote event. * * @param localAddr the local socket address * @param remoteAddr the remote socket address - * @param src the source - * @param sink the remote sink * @param seq the sequence number * @param event the event */ - public RemoteEvent(final SocketAddress localAddr, final SocketAddress remoteAddr, final String src, - final String sink, final long seq, final T event) { + public RemoteEvent(final SocketAddress localAddr, final SocketAddress remoteAddr, final long seq, final T event) { this.localAddr = localAddr; this.remoteAddr = remoteAddr; - this.src = src; - this.sink = sink; this.event = event; this.seq = seq; } @@ -74,42 +67,6 @@ public class RemoteEvent<T> { } /** - * Gets the source. - * - * @return the source - */ - public String getSource() { - return src; - } - - /** - * Sets the source. - * - * @param name the source name - */ - public void setSource(final String name) { - src = name; - } - - /** - * Gets the sink. - * - * @return the sink - */ - public String getSink() { - return sink; - } - - /** - * Sets the sink. - * - * @param name the sink name - */ - public void setSink(final String name) { - sink = name; - } - - /** * Gets the actual event. * * @return the event @@ -157,10 +114,6 @@ public class RemoteEvent<T> { builder.append(localAddr); builder.append(" remoteAddr="); builder.append(remoteAddr); - builder.append(" sourceName="); - builder.append(src); - builder.append(" sinkName="); - builder.append(sink); builder.append(" seq="); builder.append(seq); builder.append(" event="); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8a6e6558/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventDecoder.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventDecoder.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventDecoder.java index 3ef0a27..fdf10b2 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventDecoder.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventDecoder.java @@ -53,8 +53,7 @@ public class RemoteEventDecoder<T> implements Decoder<RemoteEvent<T>> { final WakeMessagePBuf pbuf; try { pbuf = WakeMessagePBuf.parseFrom(data); - return new RemoteEvent<T>(null, null, pbuf.getSource(), pbuf.getSink(), pbuf.getSeq(), - decoder.decode(pbuf.getData().toByteArray())); + return new RemoteEvent<T>(null, null, pbuf.getSeq(), decoder.decode(pbuf.getData().toByteArray())); } catch (final InvalidProtocolBufferException e) { throw new RemoteRuntimeException(e); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8a6e6558/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventEncoder.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventEncoder.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventEncoder.java index db0c78a..29e3be6 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventEncoder.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventEncoder.java @@ -50,17 +50,11 @@ public class RemoteEventEncoder<T> implements Encoder<RemoteEvent<T>> { */ @Override public byte[] encode(final RemoteEvent<T> obj) { - if (obj.getSink() == null) { - throw new RemoteRuntimeException("Sink stage is null"); - } if (obj.getEvent() == null) { throw new RemoteRuntimeException("Event is null"); } final WakeMessagePBuf.Builder builder = WakeMessagePBuf.newBuilder(); - final String source = obj.getSource() == null ? "" : obj.getSource(); - builder.setSource(source); - builder.setSink(obj.getSink()); builder.setSeq(obj.getSeq()); builder.setData(ByteString.copyFrom(encoder.encode(obj.getEvent()))); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8a6e6558/lang/java/reef-wake/wake/src/main/proto/RemoteProtocol.proto ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/proto/RemoteProtocol.proto b/lang/java/reef-wake/wake/src/main/proto/RemoteProtocol.proto index 7e6735e..1885513 100644 --- a/lang/java/reef-wake/wake/src/main/proto/RemoteProtocol.proto +++ b/lang/java/reef-wake/wake/src/main/proto/RemoteProtocol.proto @@ -24,8 +24,6 @@ option java_generate_equals_and_hash = true; message WakeMessagePBuf { required bytes data = 1; required int64 seq = 2; - optional string source = 3; - optional string sink = 4; } message WakeTuplePBuf { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8a6e6558/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteTest.java index e2eacc9..c6b6579 100644 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteTest.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteTest.java @@ -73,14 +73,13 @@ public class RemoteTest { final SocketAddress remoteAddr = new InetSocketAddress(this.localAddressProvider.getLocalAddress(), 9000); final RemoteEvent<TestEvent> e1 = new RemoteEvent<TestEvent>( - localAddr, remoteAddr, "stage1", "stage2", 1, new TestEvent("hello", 0.0)); + localAddr, remoteAddr, 1, new TestEvent("hello", 0.0)); System.out.println(e1); final byte[] data = reCodec.encode(e1); final RemoteEvent<TestEvent> e2 = reCodec.decode(data); System.out.println(e2); - Assert.assertEquals(e1.getSink(), e2.getSink()); Assert.assertEquals(e1.getEvent().getMessage(), e2.getEvent().getMessage()); }
