http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/io/network/naming/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/naming/package-info.java b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/naming/package-info.java new file mode 100644 index 0000000..f315d02 --- /dev/null +++ b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/naming/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Tests for naming. + */ +package org.apache.reef.io.network.naming;
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/io/network/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/package-info.java b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/package-info.java new file mode 100644 index 0000000..b5c06d5 --- /dev/null +++ b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Tests for the network package.
+ */ +package org.apache.reef.io.network; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/DeprecatedNetworkMessagingTestService.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/DeprecatedNetworkMessagingTestService.java b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/DeprecatedNetworkMessagingTestService.java new file mode 100644 index 0000000..8c3506f --- /dev/null +++ b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/DeprecatedNetworkMessagingTestService.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.io.network.util; + +import org.apache.reef.exception.evaluator.NetworkException; +import org.apache.reef.io.network.Connection; +import org.apache.reef.io.network.Message; +import org.apache.reef.io.network.NetworkConnectionService; +import org.apache.reef.io.network.impl.config.NetworkConnectionServiceIdFactory; +import org.apache.reef.io.network.naming.NameResolver; +import org.apache.reef.io.network.naming.NameResolverConfiguration; +import org.apache.reef.io.network.naming.NameServer; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.Injector; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.exceptions.InjectionException; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.Identifier; +import org.apache.reef.wake.IdentifierFactory; +import org.apache.reef.wake.remote.Codec; +import org.apache.reef.wake.remote.transport.LinkListener; + +import java.net.SocketAddress; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; + +// TODO[JIRA REEF-637] Remove the deprecated class. +/** + * Helper class for DeprecatedNetworkConnectionService test, deprecated in 0.13. 
+ */ +@Deprecated +public final class DeprecatedNetworkMessagingTestService implements AutoCloseable { + private static final Logger LOG = Logger.getLogger(DeprecatedNetworkMessagingTestService.class.getName()); + + private final IdentifierFactory factory; + private final NetworkConnectionService receiverNetworkConnService; + private final NetworkConnectionService senderNetworkConnService; + private final String receiver; + private final String sender; + private final NameServer nameServer; + private final NameResolver receiverResolver; + private final NameResolver senderResolver; + + public DeprecatedNetworkMessagingTestService(final String localAddress) throws InjectionException { + // name server + final Injector injector = Tang.Factory.getTang().newInjector(); + this.nameServer = injector.getInstance(NameServer.class); + final Configuration netConf = NameResolverConfiguration.CONF + .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, localAddress) + .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServer.getPort()) + .build(); + + LOG.log(Level.FINEST, "=== Test network connection service receiver start"); + // network service for receiver + this.receiver = "receiver"; + final Injector injectorReceiver = injector.forkInjector(netConf); + this.receiverNetworkConnService = injectorReceiver.getInstance(NetworkConnectionService.class); + this.receiverResolver = injectorReceiver.getInstance(NameResolver.class); + this.factory = injectorReceiver.getNamedInstance(NetworkConnectionServiceIdFactory.class); + this.receiverNetworkConnService.registerId(this.factory.getNewInstance(receiver)); + + // network service for sender + this.sender = "sender"; + LOG.log(Level.FINEST, "=== Test network connection service sender start"); + final Injector injectorSender = injector.forkInjector(netConf); + senderNetworkConnService = injectorSender.getInstance(NetworkConnectionService.class); + senderNetworkConnService.registerId(this.factory.getNewInstance(this.sender)); + 
this.senderResolver = injectorSender.getInstance(NameResolver.class); + } + + public <T> void registerTestConnectionFactory(final Identifier connFactoryId, + final int numMessages, final Monitor monitor, + final Codec<T> codec) throws NetworkException { + receiverNetworkConnService.registerConnectionFactory(connFactoryId, codec, + new MessageHandler<T>(monitor, numMessages), new TestListener<T>()); + senderNetworkConnService.registerConnectionFactory(connFactoryId, codec, + new MessageHandler<T>(monitor, numMessages), new TestListener<T>()); + } + + public <T> Connection<T> getConnectionFromSenderToReceiver(final Identifier connFactoryId) { + final Identifier destId = factory.getNewInstance(receiver); + return (Connection<T>)senderNetworkConnService.getConnectionFactory(connFactoryId).newConnection(destId); + } + + public void close() throws Exception { + senderNetworkConnService.close(); + receiverNetworkConnService.close(); + nameServer.close(); + receiverResolver.close(); + senderResolver.close(); + } + + public static final class MessageHandler<T> implements EventHandler<Message<T>> { + private final int expected; + private final Monitor monitor; + private AtomicInteger count = new AtomicInteger(0); + + public MessageHandler(final Monitor monitor, + final int expected) { + this.monitor = monitor; + this.expected = expected; + } + + @Override + public void onNext(final Message<T> value) { + count.incrementAndGet(); + LOG.log(Level.FINE, "Count: {0}", count.get()); + LOG.log(Level.FINE, + "OUT: {0} received {1} from {2} to {3}", + new Object[]{value, value.getSrcId(), value.getDestId()}); + + for (final T obj : value.getData()) { + LOG.log(Level.FINE, "OUT: data: {0}", obj); + } + + if (count.get() == expected) { + monitor.mnotify(); + } + } + } + + public static final class TestListener<T> implements LinkListener<Message<T>> { + @Override + public void onSuccess(final Message<T> message) { + LOG.log(Level.FINE, "success: " + message); + } + @Override + public 
void onException(final Throwable cause, final SocketAddress remoteAddress, final Message<T> message) { + LOG.log(Level.WARNING, "exception: " + cause + message); + throw new RuntimeException(cause); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/LoggingUtils.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/LoggingUtils.java b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/LoggingUtils.java new file mode 100644 index 0000000..e2cf7f9 --- /dev/null +++ b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/LoggingUtils.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
/**
 * Test utility for adjusting the verbosity of {@code java.util.logging}.
 */
public final class LoggingUtils {

  /**
   * Utility class; not meant to be instantiated.
   */
  private LoggingUtils() {
  }

  /**
   * Applies {@code level} to the root logger and to a console handler attached to it.
   * The first {@link ConsoleHandler} already present on the root logger is reused;
   * if there is none, a new one is created and attached.
   *
   * @param level the level to apply
   */
  public static void setLoggingLevel(final Level level) {
    final Logger rootLogger = Logger.getLogger("");
    ConsoleHandler console = firstConsoleHandler(rootLogger);
    if (console == null) {
      console = new ConsoleHandler();
      rootLogger.addHandler(console);
    }
    // Handler first, logger second.
    console.setLevel(level);
    rootLogger.setLevel(level);
  }

  /**
   * @return the first console handler attached to {@code logger}, or {@code null} if none
   */
  private static ConsoleHandler firstConsoleHandler(final Logger logger) {
    for (final Handler candidate : logger.getHandlers()) {
      if (candidate instanceof ConsoleHandler) {
        return (ConsoleHandler) candidate;
      }
    }
    return null;
  }
}
/**
 * One-shot latch built on the object's intrinsic lock: {@link #mwait()} blocks until
 * {@link #mnotify()} has been called at least once.
 */
public class Monitor {
  /** Becomes {@code true} on the first {@link #mnotify()} and is never reset. */
  private final AtomicBoolean done = new AtomicBoolean(false);

  /**
   * Blocks the calling thread until the monitor has been notified.
   * Returns immediately if {@link #mnotify()} was already called.
   *
   * @throws InterruptedException if the thread is interrupted while waiting
   */
  public synchronized void mwait() throws InterruptedException {
    // Loop so that a wake-up without a preceding mnotify() does not end the wait.
    while (!done.get()) {
      wait();
    }
  }

  /**
   * Marks the monitor as finished and wakes up every thread blocked in {@link #mwait()}.
   */
  public synchronized void mnotify() {
    done.set(true);
    notifyAll();
  }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.util; + +import org.apache.reef.exception.evaluator.NetworkException; +import org.apache.reef.io.network.Connection; +import org.apache.reef.io.network.Message; +import org.apache.reef.io.network.NetworkConnectionService; +import org.apache.reef.io.network.impl.config.NetworkConnectionServiceIdFactory; +import org.apache.reef.io.network.naming.NameResolver; +import org.apache.reef.io.network.naming.NameResolverConfiguration; +import org.apache.reef.io.network.naming.NameServer; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.Injector; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.exceptions.InjectionException; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.Identifier; +import org.apache.reef.wake.IdentifierFactory; +import org.apache.reef.wake.remote.Codec; +import org.apache.reef.wake.remote.transport.LinkListener; + +import java.net.SocketAddress; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Helper class for NetworkConnectionService test. 
+ */ +public final class NetworkMessagingTestService implements AutoCloseable { + private static final Logger LOG = Logger.getLogger(NetworkMessagingTestService.class.getName()); + + private final IdentifierFactory factory; + private final NetworkConnectionService receiverNetworkConnService; + private final NetworkConnectionService senderNetworkConnService; + private final NameServer nameServer; + private final NameResolver receiverResolver; + private final NameResolver senderResolver; + + public NetworkMessagingTestService(final String localAddress) throws InjectionException { + // name server + final Injector injector = Tang.Factory.getTang().newInjector(); + this.nameServer = injector.getInstance(NameServer.class); + final Configuration netConf = NameResolverConfiguration.CONF + .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, localAddress) + .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServer.getPort()) + .build(); + + LOG.log(Level.FINEST, "=== Test network connection service receiver start"); + // network service for receiver + final Injector injectorReceiver = injector.forkInjector(netConf); + this.receiverNetworkConnService = injectorReceiver.getInstance(NetworkConnectionService.class); + this.receiverResolver = injectorReceiver.getInstance(NameResolver.class); + this.factory = injectorReceiver.getNamedInstance(NetworkConnectionServiceIdFactory.class); + + // network service for sender + LOG.log(Level.FINEST, "=== Test network connection service sender start"); + final Injector injectorSender = injector.forkInjector(netConf); + senderNetworkConnService = injectorSender.getInstance(NetworkConnectionService.class); + this.senderResolver = injectorSender.getInstance(NameResolver.class); + } + + public <T> void registerTestConnectionFactory(final Identifier connFactoryId, + final int numMessages, final Monitor monitor, + final Codec<T> codec) throws NetworkException { + final Identifier receiverEndPointId = factory.getNewInstance("receiver"); + 
final Identifier senderEndPointId = factory.getNewInstance("sender"); + receiverNetworkConnService.registerConnectionFactory(connFactoryId, codec, + new MessageHandler<T>(monitor, numMessages, senderEndPointId, receiverEndPointId), + new TestListener<T>(), receiverEndPointId); + senderNetworkConnService.registerConnectionFactory(connFactoryId, codec, + new MessageHandler<T>(monitor, numMessages, receiverEndPointId, senderEndPointId), + new TestListener<T>(), senderEndPointId); + } + + public <T> Connection<T> getConnectionFromSenderToReceiver(final Identifier connFactoryId) { + final Identifier receiverEndPointId = factory.getNewInstance("receiver"); + return (Connection<T>)senderNetworkConnService + .getConnectionFactory(connFactoryId) + .newConnection(receiverEndPointId); + } + + public void close() throws Exception { + senderNetworkConnService.close(); + receiverNetworkConnService.close(); + nameServer.close(); + receiverResolver.close(); + senderResolver.close(); + } + + public static final class MessageHandler<T> implements EventHandler<Message<T>> { + private final int expected; + private final Monitor monitor; + private final Identifier expectedSrcId; + private final Identifier expectedDestId; + private AtomicInteger count = new AtomicInteger(0); + + public MessageHandler(final Monitor monitor, + final int expected, + final Identifier expectedSrcId, + final Identifier expectedDestId) { + this.monitor = monitor; + this.expected = expected; + this.expectedSrcId = expectedSrcId; + this.expectedDestId = expectedDestId; + } + + @Override + public void onNext(final Message<T> value) { + count.incrementAndGet(); + LOG.log(Level.FINE, "Count: {0}", count.get()); + LOG.log(Level.FINE, + "OUT: {0} received {1} from {2} to {3}", + new Object[]{value, value.getSrcId(), value.getDestId()}); + + for (final T obj : value.getData()) { + LOG.log(Level.FINE, "OUT: data: {0}", obj); + } + + assert value.getSrcId().equals(expectedSrcId); + assert 
value.getDestId().equals(expectedDestId); + + if (count.get() == expected) { + monitor.mnotify(); + } + } + } + + public static final class TestListener<T> implements LinkListener<Message<T>> { + @Override + public void onSuccess(final Message<T> message) { + LOG.log(Level.FINE, "success: " + message); + } + @Override + public void onException(final Throwable cause, final SocketAddress remoteAddress, final Message<T> message) { + LOG.log(Level.WARNING, "exception: " + cause + message); + throw new RuntimeException(cause); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/StreamingIntegerCodec.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/StreamingIntegerCodec.java b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/StreamingIntegerCodec.java new file mode 100644 index 0000000..75ff0b6 --- /dev/null +++ b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/StreamingIntegerCodec.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.io.network.util; + +import org.apache.reef.io.network.impl.StreamingCodec; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + + +public class StreamingIntegerCodec implements StreamingCodec<Integer> { + + @Override + public void encodeToStream(final Integer obj, final DataOutputStream stream) { + try { + stream.writeInt(obj); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public Integer decodeFromStream(final DataInputStream stream) { + try { + return stream.readInt(); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public Integer decode(final byte[] data) { + return null; + } + + @Override + public byte[] encode(final Integer obj) { + return new byte[0]; + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/StreamingStringCodec.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/StreamingStringCodec.java b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/StreamingStringCodec.java new file mode 100644 index 0000000..3e4edfc --- /dev/null +++ b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/StreamingStringCodec.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.util; + +import org.apache.reef.io.network.impl.StreamingCodec; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + + +public class StreamingStringCodec implements StreamingCodec<String> { + @Override + public byte[] encode(final String obj) { + return obj.getBytes(); + } + + @Override + public String decode(final byte[] buf) { + return new String(buf); + } + + @Override + public void encodeToStream(final String obj, final DataOutputStream stream) { + try { + stream.writeUTF(obj); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public String decodeFromStream(final DataInputStream stream) { + try { + return stream.readUTF(); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/TimeoutHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/TimeoutHandler.java b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/TimeoutHandler.java new file mode 100644 index 0000000..6dc69a4 --- /dev/null +++ b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/TimeoutHandler.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.util; + +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.impl.PeriodicEvent; + +public class TimeoutHandler implements EventHandler<PeriodicEvent> { + + private final Monitor monitor; + + public TimeoutHandler(final Monitor monitor) { + this.monitor = monitor; + } + + @Override + public void onNext(final PeriodicEvent event) { + monitor.mnotify(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/ExternalMapTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/ExternalMapTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/ExternalMapTest.java new file mode 100644 index 0000000..2437717 --- /dev/null +++ b/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/ExternalMapTest.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.storage; + +import org.apache.reef.io.ExternalMap; +import org.apache.reef.io.serialization.Codec; +import org.apache.reef.io.storage.ram.CodecRamMap; +import org.apache.reef.io.storage.ram.RamMap; +import org.apache.reef.io.storage.ram.RamStorageService; +import org.apache.reef.io.storage.util.IntegerCodec; +import org.junit.Assert; +import org.junit.Test; + +import java.util.*; + + +public class ExternalMapTest { + @Test + public void testCodecRamMap() { + final RamStorageService ramStore = new RamStorageService(); + final Codec<Integer> c = new IntegerCodec(); + final ExternalMap<Integer> m = new CodecRamMap<>(ramStore, c); + genericTest(m); + } + + @Test + public void testRamMap() { + final RamStorageService ramStore = new RamStorageService(); + final ExternalMap<Integer> m = new RamMap<>(ramStore); + genericTest(m); + } + + + void genericTest(final ExternalMap<Integer> m) { + m.put("foo", 42); + final Map<String, Integer> smallMap = new HashMap<>(); + smallMap.put("bar", 43); + smallMap.put("baz", 44); + + m.putAll(smallMap); + + Assert.assertEquals(44, (int) m.get("baz")); + Assert.assertEquals(43, (int) m.get("bar")); + Assert.assertEquals(42, (int) m.get("foo")); + Assert.assertNull(m.get("quuz")); + + Assert.assertTrue(m.containsKey("bar")); + Assert.assertFalse(m.containsKey("quuz")); + + final Set<String> barBaz = new HashSet<>(); + 
barBaz.add("bar"); + barBaz.add("baz"); + barBaz.add("quuz"); + + final Iterable<Map.Entry<CharSequence, Integer>> it = m.getAll(barBaz); + + final Map<CharSequence, Integer> found = new TreeMap<>(); + + for (final Map.Entry<CharSequence, Integer> e : it) { + found.put(e.getKey(), e.getValue()); + } + final Iterator<CharSequence> it2 = found.keySet().iterator(); + Assert.assertTrue(it2.hasNext()); + CharSequence s = it2.next(); + Assert.assertEquals(s, "bar"); + Assert.assertEquals((int) found.get(s), 43); + Assert.assertTrue(it2.hasNext()); + s = it2.next(); + Assert.assertEquals(s, "baz"); + Assert.assertEquals((int) found.get(s), 44); + Assert.assertFalse(it2.hasNext()); + + Assert.assertEquals(44, (int) m.remove("baz")); + Assert.assertFalse(m.containsKey("baz")); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/FramingTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/FramingTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/FramingTest.java new file mode 100644 index 0000000..33caa7c --- /dev/null +++ b/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/FramingTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.storage; + +import org.apache.reef.exception.evaluator.ServiceException; +import org.apache.reef.io.Accumulator; +import org.junit.Assert; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Arrays; + +public class FramingTest { + + @Test + public void frameRoundTripTest() throws IOException, ServiceException { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final ByteArrayOutputStream baos2 = new ByteArrayOutputStream(); + final FramingOutputStream o = new FramingOutputStream(baos); + final FramingOutputStream o2 = new FramingOutputStream(baos2); + final Accumulator<byte[]> a = o2.accumulator(); + int offset = 0; + for (int i = 0; i < 256; i++) { + final byte[] b = new byte[i]; + Arrays.fill(b, (byte) i); + o.write(b); + if (i == 255) { + o.close(); + } else { + o.nextFrame(); + } + offset += (4 + i); + Assert.assertEquals(offset, o.getCurrentOffset()); + a.add(b); + Assert.assertEquals(offset, o2.getCurrentOffset()); + } + a.close(); + o2.close(); + final byte[] b1 = baos.toByteArray(); + final byte[] b2 = baos2.toByteArray(); + Assert.assertArrayEquals(b1, b2); + final FramingInputStream inA1 = new FramingInputStream(new ByteArrayInputStream(b1)); + final FramingInputStream inA2 = new FramingInputStream(new ByteArrayInputStream(b2)); + for (int i = 0; i <= 256; i++) { + final byte[] b = new byte[i]; + Arrays.fill(b, (byte) i); + final byte[] f = inA1.readFrame(); + final byte[] g = 
inA2.readFrame(); + if (i == 256) { + Assert.assertNull(f); + Assert.assertNull(g); + } else { + Assert.assertArrayEquals(b, f); + Assert.assertArrayEquals(b, g); + } + } + inA2.close(); + inA1.close(); + + final FramingInputStream inB1 = new FramingInputStream(new ByteArrayInputStream(b1)); + int i = 0; + for (final byte[] bin : inB1) { + final byte[] b = new byte[i]; + Arrays.fill(b, (byte) i); + Assert.assertArrayEquals(b, bin); + i++; + } + Assert.assertEquals(256, i); + inB1.close(); + + final FramingInputStream inB2 = new FramingInputStream(new ByteArrayInputStream(b2)); + i = 0; + for (final byte[] bin : inB2) { + final byte[] b = new byte[i]; + Arrays.fill(b, (byte) i); + Assert.assertArrayEquals(b, bin); + i++; + } + Assert.assertEquals(256, i); + inB2.close(); + Assert.assertArrayEquals(b1, b2); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/MergingIteratorTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/MergingIteratorTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/MergingIteratorTest.java new file mode 100644 index 0000000..072c0b6 --- /dev/null +++ b/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/MergingIteratorTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.storage; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Comparator; +import java.util.Iterator; + +public class MergingIteratorTest { + + @Test + public void testMergingIterator() { + Comparator<Integer> cmp = new Comparator<Integer>() { + + @Override + public int compare(final Integer o1, final Integer o2) { + return Integer.compare(o1, o2); + } + }; + @SuppressWarnings("unchecked") + Iterator<Integer>[] its = new Iterator[]{ + Arrays.asList(new Integer[]{1, 4, 7, 10}).iterator(), + Arrays.asList(new Integer[]{2, 5, 8, 11}).iterator(), + Arrays.asList(new Integer[]{3, 6, 9, 12}).iterator() + }; + MergingIterator<Integer> merge = new MergingIterator<Integer>(cmp, its); + int i = 1; + while (merge.hasNext()) { + Assert.assertEquals(i, (int) merge.next()); + i++; + } + Assert.assertEquals(13, i); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/SortingSpoolTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/SortingSpoolTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/SortingSpoolTest.java new file mode 100644 index 0000000..68b7879 --- /dev/null +++ b/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/SortingSpoolTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.storage; + +import org.apache.reef.exception.evaluator.ServiceException; +import org.apache.reef.io.Accumulator; +import org.apache.reef.io.Spool; +import org.apache.reef.io.storage.ram.SortingRamSpool; +import org.junit.Assert; +import org.junit.Test; + +import java.util.*; + +public class SortingSpoolTest { + + @Test + public void testRamSpool() throws ServiceException { + genericTest(new SortingRamSpool<Integer>(), new Comparator<Integer>() { + + @Override + public int compare(final Integer o1, final Integer o2) { + return Integer.compare(o1, o2); + } + + }); + } + + @Test + public void testRamSpoolComparator() throws ServiceException { + final Comparator<Integer> backwards = new Comparator<Integer>() { + + @Override + public int compare(final Integer o1, final Integer o2) { + return -1 * o1.compareTo(o2); + } + + }; + genericTest(new SortingRamSpool<Integer>(backwards), backwards); + } + + @Test(expected = IllegalStateException.class) + public void testRamSpoolAddAfterClose() throws ServiceException { + final Spool<Integer> s = new SortingRamSpool<>(); + genericAddAfterCloseTest(s); + } + + @Test(expected = UnsupportedOperationException.class) + public void testRamSpoolCantRemove() throws ServiceException { + final Spool<Integer> s = new 
SortingRamSpool<>(); + genericCantRemove(s); + } + + @Test(expected = IllegalStateException.class) + public void testIteratorBeforeClose() throws ServiceException { + final Spool<Integer> s = new SortingRamSpool<>(); + genericIteratorBeforeClose(s); + } + + void genericTest(final Spool<Integer> s, final Comparator<Integer> comparator) + throws ServiceException { + final List<Integer> l = new ArrayList<Integer>(); + final Random r = new Random(42); + while (l.size() < 100) { + l.add(r.nextInt(75)); + } + final Accumulator<Integer> a = s.accumulator(); + for (int i = 0; i < 100; i++) { + a.add(l.get(i)); + } + a.close(); + final List<Integer> m = new ArrayList<Integer>(); + for (final int i : s) { + m.add(i); + } + final Integer[] sorted = l.toArray(new Integer[0]); + Arrays.sort(sorted, 0, sorted.length, comparator); + final Integer[] shouldBeSorted = m.toArray(new Integer[0]); + Assert.assertArrayEquals(sorted, shouldBeSorted); + } + + void genericAddAfterCloseTest(final Spool<?> s) throws ServiceException { + final Accumulator<?> a = s.accumulator(); + a.close(); + a.add(null); + } + + void genericCantRemove(final Spool<Integer> s) throws ServiceException { + final Accumulator<Integer> a = s.accumulator(); + a.add(10); + a.close(); + final Iterator<?> it = s.iterator(); + it.remove(); + } + + void genericIteratorBeforeClose(final Spool<Integer> s) throws ServiceException { + final Accumulator<Integer> a = s.accumulator(); + a.add(10); + s.iterator(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/SpoolFileTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/SpoolFileTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/SpoolFileTest.java new file mode 100644 index 0000000..61bf349 --- /dev/null +++ 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/SpoolFileTest.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.storage; + +import org.apache.reef.exception.evaluator.ServiceException; +import org.apache.reef.io.Accumulable; +import org.apache.reef.io.Accumulator; +import org.apache.reef.io.Spool; +import org.apache.reef.io.serialization.Codec; +import org.apache.reef.io.serialization.Deserializer; +import org.apache.reef.io.serialization.Serializer; +import org.apache.reef.io.storage.local.CodecFileAccumulable; +import org.apache.reef.io.storage.local.CodecFileIterable; +import org.apache.reef.io.storage.local.LocalStorageService; +import org.apache.reef.io.storage.local.SerializerFileSpool; +import org.apache.reef.io.storage.ram.RamSpool; +import org.apache.reef.io.storage.ram.RamStorageService; +import org.apache.reef.io.storage.util.IntegerCodec; +import org.apache.reef.tang.ConfigurationBuilder; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.exceptions.BindException; +import org.apache.reef.tang.exceptions.InjectionException; +import org.apache.reef.tang.formats.AvroConfigurationSerializer; +import 
org.apache.reef.tang.formats.ConfigurationModule; +import org.apache.reef.tang.formats.ConfigurationModuleBuilder; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Iterator; + +public class SpoolFileTest { + private final Serializer<Integer, OutputStream> serializer = new Serializer<Integer, OutputStream>() { + @Override + public Accumulable<Integer> create(final OutputStream out) { + return new Accumulable<Integer>() { + + @Override + public Accumulator<Integer> accumulator() { + return new Accumulator<Integer>() { + + @Override + public void add(final Integer datum) { + try { + final int d = datum; + out.write(new byte[]{(byte) (d >>> 24), (byte) (d >>> 16), + (byte) (d >>> 8), (byte) d}); + } catch (final IOException e) { + throw new IllegalStateException(e); + } + } + + @Override + public void close() { + try { + out.flush(); + } catch (final IOException e) { + throw new IllegalStateException(e); + } + } + }; + } + }; + } + }; + private final Deserializer<Integer, InputStream> deserializer = new Deserializer<Integer, InputStream>() { + @Override + public Iterable<Integer> create(final InputStream in) { + return new Iterable<Integer>() { + @Override + public Iterator<Integer> iterator() { + final Iterator<Integer> it = new Iterator<Integer>() { + private final byte[] inb = new byte[4]; + private Integer nextInt; + + @Override + public boolean hasNext() { + return nextInt != null; + } + + private void prime() { + final int read; + try { + read = in.read(inb); + } catch (final IOException e) { + throw new IllegalStateException(e); + } + if (read != 4) { + nextInt = null; + } else { + nextInt = ((inb[0] & 0xFF) << 24) + ((inb[1] & 0xFF) << 16) + + ((inb[2] & 0xFF) << 8) + (inb[3] & 0xFF); + } + + } + + @Override + public Integer next() { + final Integer ret = nextInt; + prime(); + return ret; + } + + @Override + public void remove() { + 
throw new UnsupportedOperationException(); + } + }; + it.next(); // calls prime + return it; + } + }; + } + }; + + @Test + public void testRam() throws BindException, InjectionException, ServiceException, IOException { + final Tang t = Tang.Factory.getTang(); + final ConfigurationBuilder configurationBuilderOne = t.newConfigurationBuilder(RamConf.CONF.build()); + + final AvroConfigurationSerializer avroSerializer = new AvroConfigurationSerializer(); + final String serializedConfiguration = avroSerializer.toString(configurationBuilderOne.build()); + final ConfigurationBuilder configurationBuilderTwo = + t.newConfigurationBuilder(avroSerializer.fromString(serializedConfiguration)); + + @SuppressWarnings("unchecked") + final Spool<Integer> f = (Spool<Integer>) t.newInjector(configurationBuilderTwo.build()).getInstance( + Spool.class); + test(f); + } + + @Test + public void testFile() throws ServiceException { + final LocalStorageService service = new LocalStorageService("spoolTest", "file"); + final Spool<Integer> f = new SerializerFileSpool<Integer>(service, serializer, + deserializer); + test(f); + service.getScratchSpace().delete(); + } + + @Test + public void testInterop() throws ServiceException { + final LocalStorageService service = new LocalStorageService("spoolTest", "file"); + final Codec<Integer> c = new IntegerCodec(); + + + final CodecFileAccumulable<Integer, Codec<Integer>> f = new CodecFileAccumulable<Integer, Codec<Integer>>( + service, c); + final CodecFileIterable<Integer, Codec<Integer>> g = new CodecFileIterable<Integer, Codec<Integer>>( + new File(f.getName()), c); + test(f, g); + service.getScratchSpace().delete(); + } + + protected void test(final Spool<Integer> f) throws ServiceException { + test(f, f); + } + + protected void test(final Accumulable<Integer> f, final Iterable<Integer> g) throws ServiceException { + + try (Accumulator<Integer> acc = f.accumulator()) { + for (int i = 0; i < 1000; i++) { + acc.add(i); + } + } + int i = 0; + for 
(final int j : g) { + Assert.assertEquals(i, j); + i++; + } + final Iterator<Integer> itA = g.iterator(); + final Iterator<Integer> itB = g.iterator(); + + for (i = 0; i < 1000; i++) { + Assert.assertEquals((int) itA.next(), i); + Assert.assertEquals((int) itB.next(), i); + } + Assert.assertFalse(itA.hasNext()); + Assert.assertFalse(itB.hasNext()); + } + + public static final class RamConf extends ConfigurationModuleBuilder { + public static final ConfigurationModule CONF = new RamConf() + .bindImplementation(RamStorageService.class, RamStorageService.class) + .bindImplementation(Spool.class, RamSpool.class) + .build(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/TupleSerializerTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/TupleSerializerTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/TupleSerializerTest.java new file mode 100644 index 0000000..07cea31 --- /dev/null +++ b/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/TupleSerializerTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.storage; + +import org.apache.reef.exception.evaluator.ServiceException; +import org.apache.reef.io.Accumulator; +import org.apache.reef.io.Tuple; +import org.apache.reef.io.serialization.Deserializer; +import org.apache.reef.io.serialization.Serializer; +import org.apache.reef.io.storage.util.IntegerDeserializer; +import org.apache.reef.io.storage.util.IntegerSerializer; +import org.apache.reef.io.storage.util.StringDeserializer; +import org.apache.reef.io.storage.util.StringSerializer; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.*; +import java.util.Iterator; +import java.util.NoSuchElementException; + +public class TupleSerializerTest { + + private Serializer<Integer, OutputStream> keySerializer; + private Serializer<String, OutputStream> valSerializer; + private Deserializer<Integer, InputStream> keyDeserializer; + private Deserializer<String, InputStream> valDeserializer; + private FramingTupleSerializer<Integer, String> fts; + private ByteArrayOutputStream baos; + private FramingTupleDeserializer<Integer, String> ftd; + private Iterable<Tuple<Integer, String>> iterable; + + @Before + public void setup() throws ServiceException { + + keySerializer = new IntegerSerializer(); + valSerializer = new StringSerializer(); + keyDeserializer = new IntegerDeserializer(); + valDeserializer = new StringDeserializer(); + + fts = new FramingTupleSerializer<Integer, String>( + keySerializer, valSerializer); + + baos = new ByteArrayOutputStream(); + final Accumulator<Tuple<Integer, String>> acc = fts.create(baos).accumulator(); + for (int i = 0; i < 100; i++) { + acc.add(new Tuple<>(i, i + "")); + } + acc.close(); + + ftd = new FramingTupleDeserializer<Integer, String>( + keyDeserializer, valDeserializer); + iterable = ftd.create(new ByteArrayInputStream(baos.toByteArray())); + } + + @Test 
+ public void testFramingSerializer() throws ServiceException, IOException { + int i = 0; + for (final Tuple<Integer, String> t : iterable) { + final Tuple<Integer, String> u = new Tuple<>(i, i + ""); + Assert.assertEquals(u, t); + i++; + } + Assert.assertEquals(100, i); + } + + @Test(expected = NoSuchElementException.class) + public void testReadOffEnd() { + final Iterator<Tuple<Integer, String>> it = iterable.iterator(); + try { + while (it.hasNext()) { + it.next(); + it.hasNext(); + } + } catch (final NoSuchElementException e) { + throw new IllegalStateException("Errored out too early!", e); + } + it.next(); + } + + @Test(expected = UnsupportedOperationException.class) + public void testCantRemove() { + final Iterator<Tuple<Integer, String>> it = iterable.iterator(); + it.next(); + it.remove(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/package-info.java b/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/package-info.java new file mode 100644 index 0000000..b5207f3 --- /dev/null +++ b/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * TODO: Document. + */ +package org.apache.reef.io.storage; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/services/network/DeprecatedNetworkConnectionServiceTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/DeprecatedNetworkConnectionServiceTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/DeprecatedNetworkConnectionServiceTest.java deleted file mode 100644 index 76ccbc9..0000000 --- a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/DeprecatedNetworkConnectionServiceTest.java +++ /dev/null @@ -1,407 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.reef.services.network; - -import org.apache.reef.exception.evaluator.NetworkException; -import org.apache.reef.io.network.Connection; -import org.apache.reef.io.network.util.StringIdentifierFactory; -import org.apache.reef.services.network.util.*; -import org.apache.reef.tang.exceptions.InjectionException; -import org.apache.reef.wake.Identifier; -import org.apache.reef.wake.IdentifierFactory; -import org.apache.reef.wake.remote.Codec; -import org.apache.reef.wake.remote.address.LocalAddressProvider; -import org.apache.reef.wake.remote.address.LocalAddressProviderFactory; -import org.apache.reef.wake.remote.impl.ObjectSerializableCodec; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestName; - -import java.util.concurrent.*; -import java.util.logging.Level; -import java.util.logging.Logger; - -// TODO[JIRA REEF-637] Remove the deprecated class. -/** - * Test for deprecated methods, which are deprecated in 0.13, of NetworkConnectionService. 
- */ -@Deprecated -public final class DeprecatedNetworkConnectionServiceTest { - private static final Logger LOG = Logger.getLogger(DeprecatedNetworkConnectionServiceTest.class.getName()); - - private final LocalAddressProvider localAddressProvider; - private final String localAddress; - private final Identifier groupCommClientId; - private final Identifier shuffleClientId; - - public DeprecatedNetworkConnectionServiceTest() throws InjectionException { - localAddressProvider = LocalAddressProviderFactory.getInstance(); - localAddress = localAddressProvider.getLocalAddress(); - - final IdentifierFactory idFac = new StringIdentifierFactory(); - this.groupCommClientId = idFac.getNewInstance("groupComm"); - this.shuffleClientId = idFac.getNewInstance("shuffle"); - } - - @Rule - public TestName name = new TestName(); - - private void runMessagingNetworkConnectionService(final Codec<String> codec) throws Exception { - final int numMessages = 2000; - final Monitor monitor = new Monitor(); - try (final DeprecatedNetworkMessagingTestService messagingTestService - = new DeprecatedNetworkMessagingTestService(localAddress)) { - messagingTestService.registerTestConnectionFactory(groupCommClientId, numMessages, monitor, codec); - - try (final Connection<String> conn = messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) { - try { - conn.open(); - for (int count = 0; count < numMessages; ++count) { - // send messages to the receiver. - conn.write("hello" + count); - } - monitor.mwait(); - } catch (final NetworkException e) { - e.printStackTrace(); - throw new RuntimeException(e); - } - } - } - } - - /** - * NetworkConnectionService messaging test. - */ - @Test - public void testMessagingNetworkConnectionService() throws Exception { - LOG.log(Level.FINEST, name.getMethodName()); - runMessagingNetworkConnectionService(new StringCodec()); - } - - /** - * NetworkConnectionService streaming messaging test. 
- */ - @Test - public void testStreamingMessagingNetworkConnectionService() throws Exception { - LOG.log(Level.FINEST, name.getMethodName()); - runMessagingNetworkConnectionService(new StreamingStringCodec()); - } - - public void runNetworkConnServiceWithMultipleConnFactories(final Codec<String> stringCodec, - final Codec<Integer> integerCodec) - throws Exception { - final ExecutorService executor = Executors.newFixedThreadPool(5); - - final int groupcommMessages = 1000; - final Monitor monitor = new Monitor(); - try (final DeprecatedNetworkMessagingTestService messagingTestService - = new DeprecatedNetworkMessagingTestService(localAddress)) { - - messagingTestService.registerTestConnectionFactory(groupCommClientId, groupcommMessages, monitor, stringCodec); - - final int shuffleMessages = 2000; - final Monitor monitor2 = new Monitor(); - messagingTestService.registerTestConnectionFactory(shuffleClientId, shuffleMessages, monitor2, integerCodec); - - executor.submit(new Runnable() { - @Override - public void run() { - try (final Connection<String> conn = - messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) { - conn.open(); - for (int count = 0; count < groupcommMessages; ++count) { - // send messages to the receiver. - conn.write("hello" + count); - } - monitor.mwait(); - } catch (final Exception e) { - e.printStackTrace(); - throw new RuntimeException(e); - } - } - }); - - executor.submit(new Runnable() { - @Override - public void run() { - try (final Connection<Integer> conn = - messagingTestService.getConnectionFromSenderToReceiver(shuffleClientId)) { - conn.open(); - for (int count = 0; count < shuffleMessages; ++count) { - // send messages to the receiver. 
- conn.write(count); - } - monitor2.mwait(); - } catch (final Exception e) { - e.printStackTrace(); - throw new RuntimeException(e); - } - } - }); - - monitor.mwait(); - monitor2.mwait(); - executor.shutdown(); - } - } - - /** - * Test NetworkService registering multiple connection factories. - */ - @Test - public void testMultipleConnectionFactoriesTest() throws Exception { - LOG.log(Level.FINEST, name.getMethodName()); - runNetworkConnServiceWithMultipleConnFactories(new StringCodec(), new ObjectSerializableCodec<Integer>()); - } - - /** - * Test NetworkService registering multiple connection factories with Streamingcodec. - */ - @Test - public void testMultipleConnectionFactoriesStreamingTest() throws Exception { - LOG.log(Level.FINEST, name.getMethodName()); - runNetworkConnServiceWithMultipleConnFactories(new StreamingStringCodec(), new StreamingIntegerCodec()); - } - - /** - * NetworkService messaging rate benchmark. - */ - @Test - public void testMessagingNetworkConnServiceRate() throws Exception { - LOG.log(Level.FINEST, name.getMethodName()); - final int[] messageSizes = {1, 16, 32, 64, 512, 64 * 1024, 1024 * 1024}; - - for (final int size : messageSizes) { - final int numMessages = 300000 / (Math.max(1, size / 512)); - final Monitor monitor = new Monitor(); - final Codec<String> codec = new StringCodec(); - try (final DeprecatedNetworkMessagingTestService messagingTestService - = new DeprecatedNetworkMessagingTestService(localAddress)) { - messagingTestService.registerTestConnectionFactory(groupCommClientId, numMessages, monitor, codec); - - // build the message - final StringBuilder msb = new StringBuilder(); - for (int i = 0; i < size; i++) { - msb.append("1"); - } - final String message = msb.toString(); - - try (final Connection<String> conn = - messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) { - - final long start = System.currentTimeMillis(); - try { - conn.open(); - for (int count = 0; count < numMessages; ++count) { - // 
send messages to the receiver. - conn.write(message); - } - monitor.mwait(); - } catch (final NetworkException e) { - e.printStackTrace(); - throw new RuntimeException(e); - } - final long end = System.currentTimeMillis(); - - final double runtime = ((double) end - start) / 1000; - LOG.log(Level.INFO, "size: " + size + "; messages/s: " + numMessages / runtime + - " bandwidth(bytes/s): " + ((double) numMessages * 2 * size) / runtime); // x2 for unicode chars - } - } - } - } - - /** - * NetworkService messaging rate benchmark. - */ - @Test - public void testMessagingNetworkConnServiceRateDisjoint() throws Exception { - LOG.log(Level.FINEST, name.getMethodName()); - final BlockingQueue<Object> barrier = new LinkedBlockingQueue<Object>(); - - final int numThreads = 4; - final int size = 2000; - final int numMessages = 300000 / (Math.max(1, size / 512)); - final int totalNumMessages = numMessages * numThreads; - - final ExecutorService e = Executors.newCachedThreadPool(); - for (int t = 0; t < numThreads; t++) { - final int tt = t; - - e.submit(new Runnable() { - public void run() { - try (final DeprecatedNetworkMessagingTestService messagingTestService - = new DeprecatedNetworkMessagingTestService(localAddress)) { - final Monitor monitor = new Monitor(); - final Codec<String> codec = new StringCodec(); - - messagingTestService.registerTestConnectionFactory(groupCommClientId, numMessages, monitor, codec); - try (final Connection<String> conn = - messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) { - - // build the message - final StringBuilder msb = new StringBuilder(); - for (int i = 0; i < size; i++) { - msb.append("1"); - } - final String message = msb.toString(); - - try { - conn.open(); - for (int count = 0; count < numMessages; ++count) { - // send messages to the receiver. 
- conn.write(message); - } - monitor.mwait(); - } catch (final NetworkException e) { - e.printStackTrace(); - throw new RuntimeException(e); - } - } - } catch (final Exception e) { - throw new RuntimeException(e); - } - } - }); - } - - // start and time - final long start = System.currentTimeMillis(); - final Object ignore = new Object(); - for (int i = 0; i < numThreads; i++) { - barrier.add(ignore); - } - e.shutdown(); - e.awaitTermination(100, TimeUnit.SECONDS); - final long end = System.currentTimeMillis(); - final double runtime = ((double) end - start) / 1000; - LOG.log(Level.INFO, "size: " + size + "; messages/s: " + totalNumMessages / runtime + - " bandwidth(bytes/s): " + ((double) totalNumMessages * 2 * size) / runtime); // x2 for unicode chars - } - - @Test - public void testMultithreadedSharedConnMessagingNetworkConnServiceRate() throws Exception { - LOG.log(Level.FINEST, name.getMethodName()); - final int[] messageSizes = {2000}; // {1,16,32,64,512,64*1024,1024*1024}; - - for (final int size : messageSizes) { - final int numMessages = 300000 / (Math.max(1, size / 512)); - final int numThreads = 2; - final int totalNumMessages = numMessages * numThreads; - final Monitor monitor = new Monitor(); - final Codec<String> codec = new StringCodec(); - try (final DeprecatedNetworkMessagingTestService messagingTestService - = new DeprecatedNetworkMessagingTestService(localAddress)) { - messagingTestService.registerTestConnectionFactory(groupCommClientId, totalNumMessages, monitor, codec); - - final ExecutorService e = Executors.newCachedThreadPool(); - - // build the message - final StringBuilder msb = new StringBuilder(); - for (int i = 0; i < size; i++) { - msb.append("1"); - } - final String message = msb.toString(); - try (final Connection<String> conn = - messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) { - - final long start = System.currentTimeMillis(); - for (int i = 0; i < numThreads; i++) { - e.submit(new Runnable() { - 
@Override - public void run() { - - try { - conn.open(); - for (int count = 0; count < numMessages; ++count) { - // send messages to the receiver. - conn.write(message); - } - } catch (final Exception e) { - throw new RuntimeException(e); - } - } - }); - } - - e.shutdown(); - e.awaitTermination(30, TimeUnit.SECONDS); - monitor.mwait(); - final long end = System.currentTimeMillis(); - final double runtime = ((double) end - start) / 1000; - LOG.log(Level.INFO, "size: " + size + "; messages/s: " + totalNumMessages / runtime + - " bandwidth(bytes/s): " + ((double) totalNumMessages * 2 * size) / runtime); // x2 for unicode chars - } - } - } - } - - /** - * NetworkService messaging rate benchmark. - */ - @Test - public void testMessagingNetworkConnServiceBatchingRate() throws Exception { - LOG.log(Level.FINEST, name.getMethodName()); - - final int batchSize = 1024 * 1024; - final int[] messageSizes = {32, 64, 512}; - - for (final int size : messageSizes) { - final int numMessages = 300 / (Math.max(1, size / 512)); - final Monitor monitor = new Monitor(); - final Codec<String> codec = new StringCodec(); - try (final DeprecatedNetworkMessagingTestService messagingTestService - = new DeprecatedNetworkMessagingTestService(localAddress)) { - messagingTestService.registerTestConnectionFactory(groupCommClientId, numMessages, monitor, codec); - try (final Connection<String> conn = - messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) { - - // build the message - final StringBuilder msb = new StringBuilder(); - for (int i = 0; i < size; i++) { - msb.append("1"); - } - final String message = msb.toString(); - - final long start = System.currentTimeMillis(); - try { - for (int i = 0; i < numMessages; i++) { - final StringBuilder sb = new StringBuilder(); - for (int j = 0; j < batchSize / size; j++) { - sb.append(message); - } - conn.open(); - conn.write(sb.toString()); - } - monitor.mwait(); - } catch (final NetworkException e) { - e.printStackTrace(); - 
throw new RuntimeException(e); - } - - final long end = System.currentTimeMillis(); - final double runtime = ((double) end - start) / 1000; - final long numAppMessages = numMessages * batchSize / size; - LOG.log(Level.INFO, "size: " + size + "; messages/s: " + numAppMessages / runtime + - " bandwidth(bytes/s): " + ((double) numAppMessages * 2 * size) / runtime); // x2 for unicode chars - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/services/network/LocalNameResolverTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/LocalNameResolverTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/LocalNameResolverTest.java deleted file mode 100644 index 47c8700..0000000 --- a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/LocalNameResolverTest.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.reef.services.network; - -import org.apache.reef.io.network.naming.LocalNameResolverConfiguration; -import org.apache.reef.io.network.naming.NameResolver; -import org.apache.reef.io.network.naming.exception.NamingException; -import org.apache.reef.io.network.util.StringIdentifierFactory; -import org.apache.reef.tang.Tang; -import org.apache.reef.tang.exceptions.InjectionException; -import org.apache.reef.wake.Identifier; -import org.apache.reef.wake.IdentifierFactory; -import org.apache.reef.wake.remote.address.LocalAddressProvider; -import org.apache.reef.wake.remote.address.LocalAddressProviderFactory; -import org.junit.Assert; -import org.junit.Test; - -import java.net.InetSocketAddress; -import java.util.concurrent.ExecutionException; - -public class LocalNameResolverTest { - - private final LocalAddressProvider localAddressProvider; - - public LocalNameResolverTest() throws InjectionException { - this.localAddressProvider = LocalAddressProviderFactory.getInstance(); - } - - /** - * Test method for {@link org.apache.reef.io.network.naming.LocalNameResolverImpl#close()}. - * - * @throws Exception - */ - @Test - public final void testClose() throws Exception { - final String localAddress = localAddressProvider.getLocalAddress(); - final IdentifierFactory factory = new StringIdentifierFactory(); - try (final NameResolver resolver = Tang.Factory.getTang().newInjector(LocalNameResolverConfiguration.CONF - .set(LocalNameResolverConfiguration.CACHE_TIMEOUT, 10000) - .build()).getInstance(NameResolver.class)) { - final Identifier id = factory.getNewInstance("Task1"); - resolver.register(id, new InetSocketAddress(localAddress, 7001)); - resolver.unregister(id); - Thread.sleep(100); - } - } - - /** - * Test method for {@link org.apache.reef.io.network.naming.LocalNameResolverImpl#lookup(Identifier id)}. 
- * To check caching behavior with expireAfterAccess & expireAfterWrite - * Changing NameCache's pattern to expireAfterAccess causes this test to fail - * - * @throws Exception - */ - @Test - public final void testLookup() throws Exception { - final IdentifierFactory factory = new StringIdentifierFactory(); - final String localAddress = localAddressProvider.getLocalAddress(); - try (final NameResolver resolver = Tang.Factory.getTang().newInjector(LocalNameResolverConfiguration.CONF - .set(LocalNameResolverConfiguration.CACHE_TIMEOUT, 150) - .build()).getInstance(NameResolver.class)) { - final Identifier id = factory.getNewInstance("Task1"); - final InetSocketAddress socketAddr = new InetSocketAddress(localAddress, 7001); - resolver.register(id, socketAddr); - InetSocketAddress lookupAddr = resolver.lookup(id); // caches the entry - Assert.assertTrue(socketAddr.equals(lookupAddr)); - resolver.unregister(id); - Thread.sleep(100); - try { - lookupAddr = resolver.lookup(id); - Thread.sleep(100); - //With expireAfterAccess, the previous lookup would reset expiry to 150ms - //more and 100ms wait will not expire the item and will return the cached value - //With expireAfterWrite, the extra wait of 100 ms will expire the item - //resulting in NamingException and the test passes - lookupAddr = resolver.lookup(id); - Assert.assertNull("resolver.lookup(id)", lookupAddr); - } catch (final Exception e) { - if (e instanceof ExecutionException) { - Assert.assertTrue("Execution Exception cause is instanceof NamingException", - e.getCause() instanceof NamingException); - } else { - throw e; - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NameClientTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NameClientTest.java 
b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NameClientTest.java deleted file mode 100644 index b66f3da..0000000 --- a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NameClientTest.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.reef.services.network; - -import org.apache.reef.io.network.naming.*; -import org.apache.reef.io.network.naming.exception.NamingException; -import org.apache.reef.io.network.naming.parameters.NameResolverRetryCount; -import org.apache.reef.io.network.naming.parameters.NameResolverRetryTimeout; -import org.apache.reef.io.network.util.StringIdentifierFactory; -import org.apache.reef.tang.Configuration; -import org.apache.reef.tang.Injector; -import org.apache.reef.tang.Tang; -import org.apache.reef.tang.exceptions.InjectionException; -import org.apache.reef.wake.Identifier; -import org.apache.reef.wake.IdentifierFactory; -import org.apache.reef.wake.remote.address.LocalAddressProvider; -import org.apache.reef.wake.remote.address.LocalAddressProviderFactory; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.net.InetSocketAddress; -import java.util.concurrent.ExecutionException; - -public class NameClientTest { - - private final LocalAddressProvider localAddressProvider; - - public NameClientTest() throws InjectionException { - this.localAddressProvider = LocalAddressProviderFactory.getInstance(); - } - - private static final int RETRY_COUNT, RETRY_TIMEOUT; - - static { - final Tang tang = Tang.Factory.getTang(); - try { - RETRY_COUNT = tang.newInjector().getNamedInstance(NameResolverRetryCount.class); - RETRY_TIMEOUT = tang.newInjector().getNamedInstance(NameResolverRetryTimeout.class); - } catch (final InjectionException e1) { - throw new RuntimeException("Exception while trying to find default values for retryCount & Timeout", e1); - } - } - - /** - * @throws java.lang.Exception - */ - @BeforeClass - public static void setUpBeforeClass() throws Exception { - } - - /** - * @throws java.lang.Exception - */ - @AfterClass - public static void tearDownAfterClass() throws Exception { - } - - /** - * Test method for {@link 
org.apache.reef.io.network.naming.NameClient#close()}. - * - * @throws Exception - */ - @Test - public final void testClose() throws Exception { - final String localAddress = localAddressProvider.getLocalAddress(); - final IdentifierFactory factory = new StringIdentifierFactory(); - final Injector injector = Tang.Factory.getTang().newInjector(); - injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class, factory); - injector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider); - - try (final NameServer server = injector.getInstance(NameServer.class)) { - final int serverPort = server.getPort(); - final Configuration nameResolverConf = NameResolverConfiguration.CONF - .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, localAddress) - .set(NameResolverConfiguration.NAME_SERVICE_PORT, serverPort) - .set(NameResolverConfiguration.CACHE_TIMEOUT, 10000) - .set(NameResolverConfiguration.RETRY_TIMEOUT, RETRY_TIMEOUT) - .set(NameResolverConfiguration.RETRY_COUNT, RETRY_COUNT) - .build(); - - try (final NameResolver client = - Tang.Factory.getTang().newInjector(nameResolverConf).getInstance(NameClient.class)) { - final Identifier id = factory.getNewInstance("Task1"); - client.register(id, new InetSocketAddress(localAddress, 7001)); - client.unregister(id); - Thread.sleep(100); - } - } - } - - /** - * Test method for {@link org.apache.reef.io.network.naming.NameClient#lookup()}. 
- * To check caching behavior with expireAfterAccess & expireAfterWrite - * Changing NameCache's pattern to expireAfterAccess causes this test to fail - * - * @throws Exception - */ - @Test - public final void testLookup() throws Exception { - final String localAddress = localAddressProvider.getLocalAddress(); - final IdentifierFactory factory = new StringIdentifierFactory(); - final Injector injector = Tang.Factory.getTang().newInjector(); - injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class, factory); - injector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider); - - try (final NameServer server = injector.getInstance(NameServer.class)) { - final int serverPort = server.getPort(); - final Configuration nameResolverConf = NameResolverConfiguration.CONF - .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, localAddress) - .set(NameResolverConfiguration.NAME_SERVICE_PORT, serverPort) - .set(NameResolverConfiguration.CACHE_TIMEOUT, 150) - .set(NameResolverConfiguration.RETRY_TIMEOUT, RETRY_TIMEOUT) - .set(NameResolverConfiguration.RETRY_COUNT, RETRY_COUNT) - .build(); - - try (final NameResolver client = - Tang.Factory.getTang().newInjector(nameResolverConf).getInstance(NameClient.class)) { - final Identifier id = factory.getNewInstance("Task1"); - client.register(id, new InetSocketAddress(localAddress, 7001)); - client.lookup(id); // caches the entry - client.unregister(id); - Thread.sleep(100); - try { - InetSocketAddress addr = client.lookup(id); - Thread.sleep(100); - //With expireAfterAccess, the previous lookup would reset expiry to 150ms - //more and 100ms wait will not expire the item and will return the cached value - //With expireAfterWrite, the extra wait of 100 ms will expire the item - //resulting in NamingException and the test passes - addr = client.lookup(id); - Assert.assertNull("client.lookup(id)", addr); - } catch (final Exception e) { - if (e instanceof ExecutionException) { - 
Assert.assertTrue("Execution Exception cause is instanceof NamingException", - e.getCause() instanceof NamingException); - } else { - throw e; - } - } - } - } - } - -}
