This is an automated email from the ASF dual-hosted git repository. amichair pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/aries-rsa.git
commit 3b8165d4505d450f5bc7cc529b884c4149d29dd5 Author: Amichai Rothman <[email protected]> AuthorDate: Sat May 9 09:02:07 2026 +0300 Fix whitespace style inconsistencies --- .../aries/rsa/discovery/config/Activator.java | 1 - .../apache/aries/rsa/discovery/mdns/Interest.java | 26 +++-- .../aries/rsa/discovery/mdns/InterestManager.java | 61 ++++++------ .../aries/rsa/discovery/mdns/MdnsDiscovery.java | 106 ++++++++++----------- .../discovery/mdns/PublishingEndpointListener.java | 46 ++++----- .../examples/fastbin/consumer/EchoConsumer.java | 2 +- .../examples/echotcp/consumer/EchoConsumer.java | 2 +- .../apache/aries/rsa/itests/felix/RsaTestBase.java | 2 +- .../rsa/itests/felix/TwoContainerPaxExam.java | 2 - .../rsa/provider/fastbin/FastBinProvider.java | 4 +- .../fastbin/api/ObjectSerializationStrategy.java | 2 +- .../fastbin/api/ProtobufSerializationStrategy.java | 22 ++--- .../provider/fastbin/streams/InputStreamProxy.java | 18 ++-- .../fastbin/streams/OutputStreamProxy.java | 20 ++-- .../fastbin/streams/StreamProviderImpl.java | 12 +-- .../fastbin/tcp/AbstractInvocationStrategy.java | 17 ++-- .../fastbin/tcp/AsyncFutureInvocationStrategy.java | 13 ++- .../fastbin/tcp/AsyncInvocationStrategy.java | 5 +- .../tcp/AsyncPromiseInvocationStrategy.java | 7 +- .../fastbin/tcp/BlockingInvocationStrategy.java | 5 +- .../provider/fastbin/tcp/ClientInvokerImpl.java | 25 +++-- .../rsa/provider/fastbin/tcp/InvocationType.java | 10 +- .../provider/fastbin/tcp/LengthPrefixedCodec.java | 14 +-- .../provider/fastbin/tcp/ServerInvokerImpl.java | 28 +++--- .../rsa/provider/fastbin/tcp/TcpTransport.java | 96 +++++++++---------- .../provider/fastbin/tcp/TcpTransportFactory.java | 4 +- .../provider/fastbin/tcp/TcpTransportServer.java | 10 +- .../rsa/provider/fastbin/tcp/TransportPool.java | 5 +- .../fastbin/util/ClassLoaderObjectInputStream.java | 2 +- .../fastbin/util/IntrospectionSupport.java | 17 ++-- .../rsa/provider/fastbin/util/StringSupport.java | 2 +- .../rsa/provider/fastbin/util/URISupport.java | 1 - .../aries/rsa/provider/fastbin/InvocationTest.java | 26 +++-- .../rsa/provider/fastbin/StreamInvocationTest.java | 20 ++-- .../rsa/provider/fastbin/TransportFailureTest.java | 1 - .../fastbin/streams/InputStreamProxyTest.java | 4 +- .../fastbin/tcp/LengthPrefixedCodecTest.java | 4 +- .../apache/aries/rsa/provider/tcp/TcpProvider.java | 4 +- .../provider/tcp/ser/BasicObjectOutputStream.java | 2 +- .../apache/aries/rsa/provider/tcp/ConfigTest.java | 2 +- .../aries/rsa/provider/tcp/TcpProviderTest.java | 2 +- .../rsa/provider/tcp/myservice/MyServiceImpl.java | 3 - .../aries/rsa/core/ExportRegistrationImpl.java | 1 - .../aries/rsa/core/event/EventAdminSender.java | 2 +- .../rsa/annotations/RSADiscoveryProvider.java | 4 +- .../rsa/annotations/RSADistributionProvider.java | 4 +- .../importer/TopologyManagerImport.java | 2 +- 47 files changed, 308 insertions(+), 360 deletions(-) diff --git a/discovery/config/src/main/java/org/apache/aries/rsa/discovery/config/Activator.java b/discovery/config/src/main/java/org/apache/aries/rsa/discovery/config/Activator.java index 1d4e1bc3..04fed288 100644 --- a/discovery/config/src/main/java/org/apache/aries/rsa/discovery/config/Activator.java +++ b/discovery/config/src/main/java/org/apache/aries/rsa/discovery/config/Activator.java @@ -30,7 +30,6 @@ import org.osgi.service.cm.ManagedServiceFactory; import org.osgi.service.remoteserviceadmin.EndpointEventListener; import org.osgi.util.tracker.ServiceTracker; - @Header(name = Constants.BUNDLE_ACTIVATOR, value = "${@class}") @org.osgi.annotation.bundle.Capability( // namespace = "osgi.remoteserviceadmin.discovery", // diff --git a/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/Interest.java b/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/Interest.java index 93793668..1fa851c5 100644 --- a/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/Interest.java +++ b/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/Interest.java @@ -47,7 +47,6 @@ public class Interest { private final AtomicReference<List<String>> scopes = new AtomicReference<>(); private final EndpointEventListener listener; - public Interest(Long id, EndpointEventListener listener, Map<String, Object> props) { this.id = id; this.scopes.set(StringPlus.normalize(props.get(ENDPOINT_LISTENER_SCOPE))); @@ -55,20 +54,19 @@ public class Interest { } public void update(Map<String, Object> props) { - List<String> newScopes = StringPlus.normalize(props.get(ENDPOINT_LISTENER_SCOPE)); List<String> oldScopes = this.scopes.getAndSet(newScopes); - + added.values().removeIf(ed -> { Optional<String> newScope = getFirstMatch(ed, newScopes); Optional<String> oldScope = getFirstMatch(ed, oldScopes); EndpointEvent event; boolean remove; String filter; - if(newScope.isPresent()) { + if (newScope.isPresent()) { remove = false; filter = newScope.get(); - if(oldScope.isPresent() && oldScope.get().equals(filter)) { + if (oldScope.isPresent() && oldScope.get().equals(filter)) { event = null; } else { event = new EndpointEvent(MODIFIED, ed); @@ -81,7 +79,7 @@ public class Interest { if (event != null) notifyListener(event, filter); - + return remove; }); } @@ -93,33 +91,33 @@ public class Interest { EndpointEvent event; String filter; if (currentScope.isPresent()) { - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("Listener {} is interested in endpoint {}. It will be {}", id, ed, alreadyAdded ? "MODIFIED" : "ADDED"); } added.put(ed.getId(), ed); event = new EndpointEvent(alreadyAdded ? MODIFIED : ADDED, ed); filter = currentScope.get(); - } else if(alreadyAdded) { - if(LOG.isDebugEnabled()) { + } else if (alreadyAdded) { + if (LOG.isDebugEnabled()) { LOG.debug("Listener {} is no longer interested in endpoint {}. It will be {}", id, ed, "MODIFIED"); } EndpointDescription previous = added.remove(ed.getId()); event = new EndpointEvent(MODIFIED_ENDMATCH, ed); filter = getFirstMatch(previous, scopes).orElse(null); } else { - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("Listener {} not interested in endpoint {}", id, ed); } return; } - + notifyListener(event, filter); } public void endpointRemoved(String id) { EndpointDescription previous = added.remove(id); - if(previous != null) { - if(LOG.isDebugEnabled()) { + if (previous != null) { + if (LOG.isDebugEnabled()) { LOG.debug("Endpoint {} is no longer available for listener {}", id, this.id); } notifyListener(new EndpointEvent(REMOVED, previous), getFirstMatch(previous, scopes.get()).orElse(null)); @@ -132,7 +130,7 @@ public class Interest { listener, filter, event.getType(), endpoint); listener.endpointChanged(event, filter); } - + private Optional<String> getFirstMatch(EndpointDescription endpoint, List<String> scopes) { return scopes.stream().filter(endpoint::matches).findFirst(); } diff --git a/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/InterestManager.java b/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/InterestManager.java index 9f285923..cc2143d7 100644 --- a/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/InterestManager.java +++ b/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/InterestManager.java @@ -54,65 +54,61 @@ public class InterestManager { private final ConcurrentMap<Long, Interest> interests = new ConcurrentHashMap<>(); private final SseEventSourceFactory eventSourceFactory; - + private final EndpointDescriptionParser parser; - + private final Client client; - + private final ConcurrentMap<String, Set<EndpointDescription>> endpointsBySource = new ConcurrentHashMap<>(); - + private final ConcurrentMap<String, SseEventSource> streams = new ConcurrentHashMap<>(); - + public InterestManager(SseEventSourceFactory factory, EndpointDescriptionParser parser, Client client) { - this.eventSourceFactory = factory; this.parser = parser; this.client = client; - } public void deactivate() { - streams.values().forEach(SseEventSource::close); streams.clear(); - interests.clear(); } public void remoteAdded(String uri) { - if(streams.containsKey(uri)) { + if (streams.containsKey(uri)) { return; } - - if(LOG.isInfoEnabled()) { + + if (LOG.isInfoEnabled()) { LOG.info("Discovered a remote at {}", uri); } - + SseEventSource sse = eventSourceFactory.newBuilder(client.target(uri)).build(); sse.register(i -> onEndpointEvent(uri, i), t -> lostRemoteStream(uri, t), () -> lostRemoteStream(uri, null)); streams.put(uri, sse); sse.open(); } - + public void remoteRemoved(String uri) { - if(LOG.isInfoEnabled()) { + if (LOG.isInfoEnabled()) { LOG.info("Remote at {} is no longer present", uri); } - + SseEventSource sseEventSource = streams.remove(uri); - if(sseEventSource != null) { + if (sseEventSource != null) { sseEventSource.close(); } } - + private void onEndpointEvent(String source, InboundSseEvent event) { String name = event.getName(); - - if(LOG.isDebugEnabled()) { + + if (LOG.isDebugEnabled()) { LOG.debug("Received a {} notification from {}", name, source); } - - if(ENDPOINT_UPDATED.equals(name)) { + + if (ENDPOINT_UPDATED.equals(name)) { EndpointDescription ed = parser.readEndpoint(event.readData(InputStream.class)); endpointsBySource.compute(source, (a,b) -> { return b == null ? singleton(ed) : concat(b.stream(), Stream.of(ed)).collect(toSet()); @@ -121,7 +117,7 @@ public class InterestManager { } else if (ENDPOINT_REVOKED.equals(name)) { String id = event.readData(); endpointsBySource.compute(source, (a,b) -> { - if(b == null) { + if (b == null) { return null; } else { Set<EndpointDescription> set = b.stream().filter(ed -> !ed.getId().equals(id)).collect(toSet()); @@ -131,21 +127,20 @@ public class InterestManager { interests.values().forEach(i -> i.endpointRemoved(id)); } } - + private void lostRemoteStream(String source, Throwable t) { - - if(t != null) { - if(LOG.isWarnEnabled()) { + if (t != null) { + if (LOG.isWarnEnabled()) { LOG.warn("The remote {} had a failure", source, t); } } else { - if(LOG.isInfoEnabled()) { + if (LOG.isInfoEnabled()) { LOG.info("The remote {} has disconnected", source); } } - + Set<EndpointDescription> remove = endpointsBySource.remove(source); - if(remove != null) { + if (remove != null) { remove.forEach(ed -> interests.values().forEach(i -> i.endpointRemoved(ed.getId()))); } } @@ -155,10 +150,9 @@ public class InterestManager { } public void addInterest(EndpointEventListener epListener, Map<String, Object> props) { - Long id = getServiceId(props); - if(LOG.isInfoEnabled()) { + if (LOG.isInfoEnabled()) { LOG.info("Service {} has registered an interest in endpoint events", id); } @@ -171,10 +165,9 @@ public class InterestManager { } public void updateInterest(Map<String, Object> props) { - Long id = getServiceId(props); - if(LOG.isInfoEnabled()) { + if (LOG.isInfoEnabled()) { LOG.info("Service {} has changed its interest in endpoint events", id); } diff --git a/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/MdnsDiscovery.java b/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/MdnsDiscovery.java index 3493da58..2327ee7b 100644 --- a/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/MdnsDiscovery.java +++ b/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/MdnsDiscovery.java @@ -62,19 +62,18 @@ public class MdnsDiscovery { private static final String _ARIES_DISCOVERY_HTTP_TCP_LOCAL = "_aries-discovery._tcp.local."; private static final Logger LOG = LoggerFactory.getLogger(MdnsDiscovery.class); - + private final Client client; - + private final String fwUuid; - + private final InterestManager interestManager; - + private final PublishingEndpointListener publishingListener; - + private JaxrsServiceRuntime runtime; - - private JmDNS jmdns; + private JmDNS jmdns; @Activate public MdnsDiscovery(BundleContext ctx, @Reference SseEventSourceFactory eventSourceFactory, @@ -84,7 +83,7 @@ public class MdnsDiscovery { fwUuid = ctx.getProperty(FRAMEWORK_UUID); this.publishingListener = new PublishingEndpointListener(parser, ctx, fwUuid); } - + @Reference(cardinality = ReferenceCardinality.MULTIPLE, policy = ReferencePolicy.DYNAMIC) public void bindEndpointEventListener(EndpointEventListener epListener, Map<String, Object> props) { interestManager.addInterest(epListener, props); @@ -110,13 +109,13 @@ public class MdnsDiscovery { public void unbindJaxrsServiceRuntime(JaxrsServiceRuntime runtime) { JmDNS jmdns = null; synchronized (this) { - if(runtime == this.runtime) { + if (runtime == this.runtime) { jmdns = this.jmdns; this.runtime = null; } } - - if(jmdns != null) { + + if (jmdns != null) { jmdns.unregisterAllServices(); } } @@ -127,36 +126,36 @@ public class MdnsDiscovery { this.runtime = runtime; jmdns = this.jmdns; } - - if(jmdns != null) { + + if (jmdns != null) { RuntimeDTO runtimeDTO = runtime.getRuntimeDTO(); List<String> uris = StringPlus.normalize(runtimeDTO.serviceDTO.properties.get(JAX_RS_SERVICE_ENDPOINT)); - - if(uris == null || uris.isEmpty()) { + + if (uris == null || uris.isEmpty()) { LOG.warn("Unable to advertise discovery as there are no endpoint URIs"); return; } - + String base = runtimeDTO.defaultApplication.base; - if(base == null) { + if (base == null) { base = ""; } - + base += "/aries/rsa/discovery"; - + URI uri = uris.stream() .filter(s -> s.matches(".*(?:[0-9]{1,3}\\.){3}[0-9]{1,3}.*")) .findFirst() .map(URI::create) .orElseGet(() -> URI.create(uris.get(0))); - + Map<String, Object> props = new HashMap<>(); props.put("scheme", uri.getScheme() == null ? "" : uri.getScheme()); props.put("path", uri.getPath() == null ? base : uri.getPath() + base); props.put("frameworkUuid", fwUuid); - + ServiceInfo info = ServiceInfo.create(_ARIES_DISCOVERY_HTTP_TCP_LOCAL, fwUuid, uri.getPort(), 0, 0, props); - + try { jmdns.registerService(info); } catch (IOException ioe) { @@ -164,91 +163,90 @@ public class MdnsDiscovery { } } } - + public @interface Config { String bind_address(); } - + @Activate public void start(Config config) throws UnknownHostException, IOException { String bind = config.bind_address(); - + JmDNS jmdns = JmDNS.create(bind == null ? null : InetAddress.getByName(bind)); - + JaxrsServiceRuntime runtime; synchronized (this) { this.jmdns = jmdns; runtime = this.runtime; } - - if(runtime != null) { + + if (runtime != null) { updateAndRegister(runtime); } - + // Add a service listener jmdns.addServiceListener(_ARIES_DISCOVERY_HTTP_TCP_LOCAL, new MdnsListener()); - } - + @Deactivate - public void stop () { + public void stop() { try { jmdns.close(); } catch (IOException e) { LOG.warn("An exception occurred closing the mdns discovery"); } - + interestManager.deactivate(); publishingListener.stop(); } private class MdnsListener implements ServiceListener { - + private final ConcurrentMap<String, String> namesToUris = new ConcurrentHashMap<>(); - + @Override public void serviceAdded(ServiceEvent event) { } - + @Override public void serviceRemoved(ServiceEvent event) { ServiceInfo info = event.getInfo(); - if(info != null) { + if (info != null) { String removed = namesToUris.remove(info.getKey()); - if(removed != null) { + if (removed != null) { interestManager.remoteRemoved(removed); } } } - + @Override public void serviceResolved(ServiceEvent event) { ServiceInfo info = event.getInfo(); - + String infoUuid = info.getPropertyString("frameworkUuid"); - - if(infoUuid == null || infoUuid.equals(fwUuid)) { + + if (infoUuid == null || infoUuid.equals(fwUuid)) { // Ignore until we can see if this is for our own endpoint return; } - + String scheme = info.getPropertyString("scheme"); - if(scheme == null) { + if (scheme == null) { scheme = "http"; } - + String path = info.getPropertyString("path"); - if(path == null) { + if (path == null) { // Not a complete record yet return; } if (path.startsWith("/")) { path = path.substring(1); } - + int port = info.getPort(); - if(port == -1) { + if (port == -1) { switch(scheme) { case "http": port = 80; @@ -257,19 +255,19 @@ public class MdnsDiscovery { port = 443; break; default: - LOG.error("Unknown URI scheme advertised {} by framework {} on host {}", + LOG.error("Unknown URI scheme advertised {} by framework {} on host {}", scheme, info.getName(), info.getDomain()); } } - + String address = info.getInetAddresses()[0].getHostAddress(); - + String uri = String.format("%s://%s:%d/%s", scheme, address, port, path); - + LOG.info("Discovered remote at {}", uri); - + namesToUris.put(info.getKey(), uri); - + interestManager.remoteAdded(uri); } } diff --git a/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/PublishingEndpointListener.java b/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/PublishingEndpointListener.java index 9794c903..0f7049fe 100644 --- a/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/PublishingEndpointListener.java +++ b/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/PublishingEndpointListener.java @@ -62,16 +62,16 @@ import org.slf4j.LoggerFactory; public class PublishingEndpointListener { private static final Logger LOG = LoggerFactory.getLogger(PublishingEndpointListener.class); - + private final String uuid; private final EndpointDescriptionParser parser; - + private final ServiceRegistration<?> listenerReg; private final ServiceRegistration<?> resourceReg; - + private final ConcurrentMap<String, SponsoredEndpoint> localEndpoints = new ConcurrentHashMap<>(); - + private final Set<Subscription> listeners = ConcurrentHashMap.newKeySet(); @SuppressWarnings("serial") @@ -81,7 +81,7 @@ public class PublishingEndpointListener { String[] ifAr = { EndpointEventListener.class.getName() }; Dictionary<String, Object> props = serviceProperties(uuid); listenerReg = bctx.registerService(ifAr, new ListenerFactory(), props); - resourceReg = bctx.registerService(PublishingEndpointListener.class, this, + resourceReg = bctx.registerService(PublishingEndpointListener.class, this, new Hashtable<>() {{put("osgi.jaxrs.resource", Boolean.TRUE);}}); } @@ -94,7 +94,7 @@ public class PublishingEndpointListener { private void endpointUpdate(Long bundleId, EndpointDescription ed, int type) { String edFwUuid = ed.getFrameworkUUID(); - if(edFwUuid == null || !edFwUuid.equals(uuid)) { + if (edFwUuid == null || !edFwUuid.equals(uuid)) { LOG.warn("This listener has been called with an endpoint {} for a remote framework {}", ed.getId(), edFwUuid); return; } @@ -112,7 +112,7 @@ public class PublishingEndpointListener { case EndpointEvent.MODIFIED_ENDMATCH: case EndpointEvent.REMOVED: boolean act = localEndpoints.compute(id, (k,v) -> { - if(v == null) { + if (v == null) { return null; } else { Set<Long> updated = v.sponsors.stream().filter(l -> !bundleId.equals(l)).collect(toSet()); @@ -120,7 +120,7 @@ public class PublishingEndpointListener { } }) == null; - if(act) { + if (act) { listeners.forEach(s -> s.revoke(id)); } break; @@ -147,14 +147,14 @@ public class PublishingEndpointListener { throw new RuntimeException(e); } } - + @GET @Produces(SERVER_SENT_EVENTS) @Path("aries/rsa/discovery") public void listen(@Context Sse sse, @Context SseEventSink sink) { Subscription subscription = new Subscription(sse, sink); listeners.add(subscription); - + localEndpoints.values().stream() .map(s -> toEndpointData(s.ed)) .forEach(subscription::update); @@ -176,13 +176,13 @@ public class PublishingEndpointListener { .filter(s -> s.sponsors.contains(bundleId)) .forEach(s -> endpointUpdate(bundleId, s.ed, EndpointEvent.REMOVED)); } - + } - + private class PerClientEndpointEventListener implements EndpointEventListener { - + private final Long bundleId; - + public PerClientEndpointEventListener(Long bundleId) { super(); this.bundleId = bundleId; @@ -195,13 +195,13 @@ public class PublishingEndpointListener { } class Subscription { - + static final String ENDPOINT_UPDATED = "UPDATED"; static final String ENDPOINT_REVOKED = "REVOKED"; - + Sse sse; SseEventSink eventSink; - + public Subscription(Sse sse, SseEventSink eventSink) { this.sse = sse; this.eventSink = eventSink; @@ -211,30 +211,30 @@ public class PublishingEndpointListener { eventSink.send(sse.newEvent(ENDPOINT_UPDATED, endpointData)) .whenComplete(this::sendFailure); } - + public void revoke(String endpointId) { eventSink.send(sse.newEvent(ENDPOINT_REVOKED, endpointId)) .whenComplete(this::sendFailure); } - + public void close() { eventSink.close(); listeners.remove(this); } - + private void sendFailure(Object o, Throwable t) { - if(t != null) { + if (t != null) { LOG.error("Failed to send endpoint message, closing"); listeners.remove(this); eventSink.close(); } } } - + private static class SponsoredEndpoint { private final EndpointDescription ed; private final Set<Long> sponsors; - + public SponsoredEndpoint(EndpointDescription ed, Set<Long> sponsors) { this.ed = ed; this.sponsors = sponsors; diff --git a/examples/echofastbin/fbconsumer/src/main/java/org/apache/aries/rsa/examples/fastbin/consumer/EchoConsumer.java b/examples/echofastbin/fbconsumer/src/main/java/org/apache/aries/rsa/examples/fastbin/consumer/EchoConsumer.java index b82b8274..77afd205 100644 --- a/examples/echofastbin/fbconsumer/src/main/java/org/apache/aries/rsa/examples/fastbin/consumer/EchoConsumer.java +++ b/examples/echofastbin/fbconsumer/src/main/java/org/apache/aries/rsa/examples/fastbin/consumer/EchoConsumer.java @@ -30,7 +30,7 @@ import org.osgi.service.component.annotations.Activate; import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Reference; -@Component(immediate=true) +@Component(immediate = true) public class EchoConsumer { EchoService echoService; diff --git a/examples/echotcp/consumer/src/main/java/org/apache/aries/rsa/examples/echotcp/consumer/EchoConsumer.java b/examples/echotcp/consumer/src/main/java/org/apache/aries/rsa/examples/echotcp/consumer/EchoConsumer.java index 8603060c..4fd0409b 100644 --- a/examples/echotcp/consumer/src/main/java/org/apache/aries/rsa/examples/echotcp/consumer/EchoConsumer.java +++ b/examples/echotcp/consumer/src/main/java/org/apache/aries/rsa/examples/echotcp/consumer/EchoConsumer.java @@ -23,7 +23,7 @@ import org.osgi.service.component.annotations.Activate; import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Reference; -@Component(immediate=true) +@Component(immediate = true) public class EchoConsumer { EchoService echoService; diff --git a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/RsaTestBase.java b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/RsaTestBase.java index d7b486d2..09af4933 100644 --- a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/RsaTestBase.java +++ b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/RsaTestBase.java @@ -81,7 +81,7 @@ public class RsaTestBase { Bundle serviceBundle = null; Bundle[] bundles = bundleContext.getBundles(); for (Bundle bundle : bundles) { - if(symName.equals(bundle.getSymbolicName())) { + if (symName.equals(bundle.getSymbolicName())) { serviceBundle = bundle; break; } diff --git a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/TwoContainerPaxExam.java b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/TwoContainerPaxExam.java index a3e2277f..b41cb7d4 100644 --- a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/TwoContainerPaxExam.java +++ b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/TwoContainerPaxExam.java @@ -47,12 +47,10 @@ public class TwoContainerPaxExam extends PaxExam { public void run(RunNotifier notifier) { TestContainer remoteContainer = null; try { - ExamSystem testSystem = PaxExamRuntime.createTestSystem(remoteConfig()); remoteContainer = PaxExamRuntime.createContainer(testSystem); remoteContainer.start(); super.run(notifier); - } catch (Exception e) { throw new RuntimeException(e); } finally { diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/FastBinProvider.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/FastBinProvider.java index 997dcf42..200a8e35 100644 --- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/FastBinProvider.java +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/FastBinProvider.java @@ -80,10 +80,10 @@ public class FastBinProvider implements DistributionProvider { final Semaphore counter = new Semaphore(0); server.stop(() -> counter.release(1)); try { - if(!counter.tryAcquire(1, 30, TimeUnit.SECONDS)) { + if (!counter.tryAcquire(1, 30, TimeUnit.SECONDS)) { LOG.warn("Server/Client failed to shut down in time. Proceeding shutdown anyway..."); } - } catch(InterruptedException e) { + } catch (InterruptedException e) { LOG.warn("Interrupted while waiting for Server/Client shutdown"); } } diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/ObjectSerializationStrategy.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/ObjectSerializationStrategy.java index 3167c430..ea3d474a 100644 --- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/ObjectSerializationStrategy.java +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/ObjectSerializationStrategy.java @@ -61,7 +61,7 @@ public class ObjectSerializationStrategy implements SerializationStrategy { final ClassLoaderObjectInputStream ois = new ClassLoaderObjectInputStream(source); ois.setClassLoader(loader); final Object[] args = (Object[]) ois.readObject(); - if( args!=null ) { + if (args != null) { System.arraycopy(args, 0, target, 0, args.length); } } diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/ProtobufSerializationStrategy.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/ProtobufSerializationStrategy.java index edb07a5b..b16867cf 100644 --- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/ProtobufSerializationStrategy.java +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/ProtobufSerializationStrategy.java @@ -42,18 +42,18 @@ public class ProtobufSerializationStrategy implements SerializationStrategy { } private void encodeProtobuf(Class<?> type, Object arg, DataByteArrayOutputStream target) throws IOException { - if( !PBMessage.class.isAssignableFrom(type) ) { + if (!PBMessage.class.isAssignableFrom(type)) { throw new IllegalArgumentException("Invalid "+name()+" serialization method: method argument not a "+PBMessage.class.getName()); } PBMessage msg = (PBMessage) arg; - if( msg==null ) { + if (msg == null) { return; } msg.freeze().writeUnframed(target); } private Object decodeProtobuf(Class<?> type, DataByteArrayInputStream source) throws IllegalAccessException, NoSuchFieldException, IOException { - if( !PBMessage.class.isAssignableFrom(type) ) { + if (!PBMessage.class.isAssignableFrom(type)) { throw new IllegalArgumentException("Invalid "+name()+" serialization method: method argument not a "+PBMessage.class.getName()); } @@ -62,7 +62,7 @@ public class ProtobufSerializationStrategy implements SerializationStrategy { PBMessage msg = factory.parseUnframed(source); String name = type.getName(); Object rc; - if( name.endsWith("$Getter") || name.endsWith("$Buffer") ) { + if (name.endsWith("$Getter") || name.endsWith("$Buffer")) { // Interface is ok we us giving them a read only impl. rc = msg; } else { @@ -73,9 +73,9 @@ public class ProtobufSerializationStrategy implements SerializationStrategy { } public void encodeRequest(ClassLoader loader, Class<?>[] types, Object[] args, DataByteArrayOutputStream target) throws IOException { - if( types.length == 0 ) { + if (types.length == 0) { return; - } else if( types.length == 1 ) { + } else if (types.length == 1) { encodeProtobuf(types[0], args[0], target); } else { throw new IllegalArgumentException("Invalid "+name()+" serialization method: methods must have zero or one argument."); @@ -83,9 +83,9 @@ public class ProtobufSerializationStrategy implements SerializationStrategy { } public void decodeRequest(ClassLoader loader, Class<?>[] types, DataByteArrayInputStream source, Object[] target) throws IOException, ClassNotFoundException, NoSuchFieldException, IllegalAccessException { - if( types.length == 0 ) { + if (types.length == 0) { return; - } else if( types.length == 1 ) { + } else if (types.length == 1) { target[0] = decodeProtobuf(types[0], source); } else { throw new IllegalArgumentException("Invalid "+name()+" serialization method: methods must have zero or one argument."); @@ -93,7 +93,7 @@ public class ProtobufSerializationStrategy implements SerializationStrategy { } public void encodeResponse(ClassLoader loader, Class<?> type, Object value, Throwable error, DataByteArrayOutputStream target) throws IOException, ClassNotFoundException { - if( error!=null ) { + if (error != null) { target.writeBoolean(true); target.writeUTF(error.getClass().getName()); target.writeUTF(error.getMessage()); @@ -105,7 +105,7 @@ public class ProtobufSerializationStrategy implements SerializationStrategy { @SuppressWarnings("unchecked") public void decodeResponse(ClassLoader loader, Class<?> type, DataByteArrayInputStream source, AsyncCallback result) throws IOException, ClassNotFoundException, NoSuchFieldException, IllegalAccessException, NoSuchMethodException, InvocationTargetException, InstantiationException { - if( source.readBoolean() ) { + if (source.readBoolean()) { String className = source.readUTF(); String message = source.readUTF(); @@ -119,11 +119,9 @@ public class ProtobufSerializationStrategy implements SerializationStrategy { error = new RuntimeException(className+": "+message); } result.onFailure(error); - } else { result.onSuccess(decodeProtobuf(type, source)); } - } } diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/InputStreamProxy.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/InputStreamProxy.java index 6e62f8c8..ca0bc5be 100644 --- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/InputStreamProxy.java +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/InputStreamProxy.java @@ -73,12 +73,12 @@ public class InputStreamProxy extends InputStream implements Serializable { * @see java.io.InputStream#read() */ public int readInternal() throws IOException { - if(buffer == null || position==buffer.length) + if (buffer == null || position == buffer.length) fillBuffer(); - if(position==buffer.length) { + if (position == buffer.length) { //still no data. - if(reachedEnd) + if (reachedEnd) return -1; //try again return read(); @@ -87,12 +87,12 @@ public class InputStreamProxy extends InputStream implements Serializable { } private void fillBuffer() throws IOException { - if(reachedEnd) { + if (reachedEnd) { return; } position = 0; Chunk chunk = streamProvider.read(streamID); - if(expectedChunkNumber!=chunk.getChunkNumber()) + if (expectedChunkNumber != chunk.getChunkNumber()) throw new IOException("Stream corrupted. Received Chunk "+chunk.getChunkNumber()+" but expected "+expectedChunkNumber); expectedChunkNumber++; buffer = chunk.getData(); @@ -100,11 +100,11 @@ public class InputStreamProxy extends InputStream implements Serializable { } public int readInternal(byte[] b, int off, int len) throws IOException { - if(len==0) + if (len == 0) return 0; int available = available(); - if(available <= 0) { - if(reachedEnd) + if (available <= 0) { + if (reachedEnd) return -1; fillBuffer(); return read(b, off, len); @@ -124,7 +124,7 @@ public class InputStreamProxy extends InputStream implements Serializable { @Override public int available() throws IOException { - if(buffer == null) + if (buffer == null) return 0; return buffer.length - position; } diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/OutputStreamProxy.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/OutputStreamProxy.java index 099c6cd6..23a7bcb9 100644 --- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/OutputStreamProxy.java +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/OutputStreamProxy.java @@ -80,41 +80,39 @@ public class OutputStreamProxy extends OutputStream implements Serializable { public void write(int b) throws IOException { try{ writeInternal(b); - } catch(IOException e) { + } catch (IOException e) { closeSilent(); throw e; } - } @Override public void write(byte[] b, int off, int len) throws IOException { try{ writeInternal(b, off, len); - } catch(IOException e) { + } catch (IOException e) { closeSilent(); throw e; } } public void writeInternal(int b) throws IOException { - if(position == buffer.length) + if (position == buffer.length) flush(); buffer[position++] = (byte)b; - } public void writeInternal(byte[] b, int off, int len) throws IOException { - if(len <= 0) + if (len <= 0) return; int processed = 0; - while(processed < len) { + while (processed < len) { int available = buffer.length - position; int chunkLength = Math.min(len - processed, available); System.arraycopy(b, off, buffer, position, chunkLength); position += chunkLength; processed += chunkLength; - if(processed < len) { + if (processed < len) { //there is more to go, but now the buffer is full -> flush it flush(); } @@ -125,17 +123,17 @@ public class OutputStreamProxy extends OutputStream implements Serializable { public void flush() throws IOException { try{ flushInternal(); - } catch(IOException e) { + } catch (IOException e) { closeSilent(); throw e; } } public void flushInternal() throws IOException { - if(position==0) + if (position == 0) return; byte[] toSend = buffer; - if(position < buffer.length) { + if (position < buffer.length) { toSend = new byte[position]; System.arraycopy(buffer, 0, toSend, 0, position); } diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/StreamProviderImpl.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/StreamProviderImpl.java index 60cf76ac..43fc3373 100644 --- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/StreamProviderImpl.java +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/StreamProviderImpl.java @@ -33,7 +33,7 @@ public class StreamProviderImpl implements StreamProvider { protected static final int CHUNK_SIZE = 4096 * 16; //64k private static final byte[] EMPTY = new byte[0]; - ThreadLocal<byte[]> buffer = new ThreadLocal<byte[]>(){ + ThreadLocal<byte[]> buffer = new ThreadLocal<byte[]>() { @Override protected byte[] initialValue() { return new byte[CHUNK_SIZE]; @@ -59,7 +59,7 @@ public class StreamProviderImpl implements StreamProvider { public void close(int streamID) throws IOException { Closeable stream = streams.remove(streamID); chunks.remove(streamID); - if(stream != null) { + if (stream != null) { stream.close(); } } @@ -70,11 +70,11 @@ public class StreamProviderImpl implements StreamProvider { AtomicInteger chunkNumber = chunks.get(streamID); byte[] result = buffer.get(); int read = inputStream.read(result); - if(read<0) { + if (read < 0) { close(streamID); //we are finished, best clean it up right away return new Chunk(EMPTY, chunkNumber.incrementAndGet(), true); } - if(read!=result.length) { + if (read != result.length) { byte[] tmp = new byte[read]; System.arraycopy(result, 0, tmp, 0, read); result = tmp; @@ -86,7 +86,7 @@ public class StreamProviderImpl implements StreamProvider { public void write(int streamID, Chunk chunk) throws IOException { OutputStream out = getStream(streamID); int nextChunkNumber = chunks.get(streamID).incrementAndGet(); - if(chunk.getChunkNumber() != nextChunkNumber) { + if (chunk.getChunkNumber() != nextChunkNumber) { throw new IOException("Stream corrupted. Received Chunk "+chunk.getChunkNumber()+" but expected "+nextChunkNumber); } out.write(chunk.getData()); @@ -95,7 +95,7 @@ public class StreamProviderImpl implements StreamProvider { @SuppressWarnings({"unchecked"}) private <T extends Closeable> T getStream(int id) throws IOException { Closeable closeable = streams.get(id); - if(closeable == null) + if (closeable == null) throw new IOException("No Stream with id " + id + "available"); try { return (T)closeable; diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AbstractInvocationStrategy.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AbstractInvocationStrategy.java index 0dd38eb6..6bfeb77c 100644 --- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AbstractInvocationStrategy.java +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AbstractInvocationStrategy.java @@ -47,10 +47,10 @@ public abstract class AbstractInvocationStrategy implements InvocationStrategy { protected void replaceStreamParameters(Method method, Object[] args) { Class< ? >[] types = method.getParameterTypes(); - if(args==null) + if (args == null) return; for (int i = 0; i < args.length; i++) { - if(isStream(types[i])) { + if (isStream(types[i])) { args[i] = replaceStream(args[i]); } } @@ -71,7 +71,7 @@ public abstract class AbstractInvocationStrategy implements InvocationStrategy { } protected boolean isStream(Class<?> clazz) { - return clazz==InputStream.class || clazz==OutputStream.class; + return clazz == InputStream.class || clazz == OutputStream.class; } /** @@ -100,24 +100,20 @@ public abstract class AbstractInvocationStrategy implements InvocationStrategy { @Override public final void service(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object target, DataByteArrayInputStream requestStream, DataByteArrayOutputStream responseStream, Runnable onComplete) { - if(method==null && target instanceof ServiceException) { + if (method == null && target instanceof ServiceException) { handleInvalidRequest(serializationStrategy, loader, method, target, responseStream, onComplete); return; } doService(serializationStrategy, loader, method, target, requestStream, responseStream, onComplete); - } protected void handleInvalidRequest(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object target, DataByteArrayOutputStream responseStream, Runnable onComplete) { //client made an invalid request int pos = responseStream.position(); try { - Throwable error = (Throwable)target; serializationStrategy.encodeResponse(loader, null, null, error, responseStream); - - } catch(Exception e) { - + } catch (Exception e) { LOG.warn("Initial Encoding response for method {} failed. Retrying", method, e); // we failed to encode the response... reposition and write that error try { @@ -126,7 +122,6 @@ public abstract class AbstractInvocationStrategy implements InvocationStrategy { } catch (Exception unexpected) { LOG.error("Error while servicing {}", method, unexpected); } - } finally { onComplete.run(); } @@ -169,7 +164,7 @@ public abstract class AbstractInvocationStrategy implements InvocationStrategy { } public void send(Throwable error, Object value) { - if( responded.compareAndSet(false, true) ) { + if (responded.compareAndSet(false, true)) { Class resultType = getResultType(method); try { serializationStrategy.encodeResponse(loader, resultType, value, error, responseStream); diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncFutureInvocationStrategy.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncFutureInvocationStrategy.java index e520ed62..a523355d 100644 --- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncFutureInvocationStrategy.java +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncFutureInvocationStrategy.java @@ -55,7 +55,7 @@ public class AsyncFutureInvocationStrategy extends AbstractInvocationStrategy { serializationStrategy.decodeRequest(loader, types, requestStream, args); Future<Object> future = (Future<Object>)method.invoke(target, args); CompletableFuture<Object> completable; - if(future instanceof CompletableFuture) { + if (future instanceof CompletableFuture) { completable = (CompletableFuture<Object>)future; } else { @@ -66,7 +66,6 @@ public class AsyncFutureInvocationStrategy extends AbstractInvocationStrategy { helper.send(exception, returnValue); } }); - } catch (Throwable t) { helper.send(t, null); } @@ -105,7 +104,7 @@ public class AsyncFutureInvocationStrategy extends AbstractInvocationStrategy { } public void set(final DataByteArrayInputStream source) { - if( queue != null ) { + if (queue != null) { queue.execute(new Runnable() { public void run() { decodeIt(source); @@ -167,7 +166,7 @@ public class AsyncFutureInvocationStrategy extends AbstractInvocationStrategy { @Override public void run() { - while(true) { + while (true) { // all currently available entries will be processed int takenPermits = Math.max(1, counter.availablePermits()); try { @@ -179,12 +178,12 @@ public class AsyncFutureInvocationStrategy extends AbstractInvocationStrategy { Set<Entry<Future<Object>, CompletableFuture<Object >>> entrySet = futures.entrySet(); int processed = 0; for (Entry<Future<Object>, CompletableFuture<Object>> entry : entrySet) { - if(processed == takenPermits) { + if (processed == takenPermits) { //we only release as many as we took permits. The remainder will be handled in the next iteration break; } Future< ? > future = entry.getKey(); - if(future.isDone()) { + if (future.isDone()) { try { Object object = future.get(); entry.getValue().complete(object); @@ -213,7 +212,7 @@ public class AsyncFutureInvocationStrategy extends AbstractInvocationStrategy { } public CompletableFuture<Object> complete(Future<Object> future) { - if(started.compareAndSet(false, true)) { + if (started.compareAndSet(false, true)) { start(); } CompletableFuture<Object> completable = new CompletableFuture<>(); diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncInvocationStrategy.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncInvocationStrategy.java index 2ee1a113..db3ce7f1 100644 --- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncInvocationStrategy.java +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncInvocationStrategy.java @@ -62,7 +62,7 @@ public class AsyncInvocationStrategy extends AbstractInvocationStrategy { } public void set(final DataByteArrayInputStream source) { - if( queue!=null ) { + if (queue != null) { queue.execute(new Runnable() { public void run() { decodeIt(source); @@ -122,7 +122,6 @@ public class AsyncInvocationStrategy extends AbstractInvocationStrategy { final AsyncServiceResponse helper = new AsyncServiceResponse(loader, method, responseStream, onComplete, serializationStrategy); try { - Object[] new_args = new Object[method.getParameterTypes().length]; serializationStrategy.decodeRequest(loader, payloadTypes(method), requestStream, new_args); new_args[new_args.length - 1] = new AsyncCallback<Object>() { @@ -134,10 +133,8 @@ public class AsyncInvocationStrategy extends AbstractInvocationStrategy { } }; method.invoke(target, new_args); - } catch (Throwable t) { helper.send(t, null); } - } } diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncPromiseInvocationStrategy.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncPromiseInvocationStrategy.java index 793bb0b0..27cae452 100644 --- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncPromiseInvocationStrategy.java +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncPromiseInvocationStrategy.java @@ -46,13 +46,12 @@ public class AsyncPromiseInvocationStrategy extends AbstractInvocationStrategy { final Promise<Object> promise = (Promise<Object>)method.invoke(target, args); promise.onResolve(() -> { try{ - helper.send(promise.getFailure(), promise.getFailure()==null ? promise.getValue() : null); + helper.send(promise.getFailure(), promise.getFailure() == null ? promise.getValue() : null); } - catch (Exception e){ + catch (Exception e) { helper.send(e, null); } }); - } catch (Throwable t) { helper.send(t, null); } @@ -91,7 +90,7 @@ public class AsyncPromiseInvocationStrategy extends AbstractInvocationStrategy { } public void set(final DataByteArrayInputStream source) { - if( queue != null ) { + if (queue != null) { queue.execute(new Runnable() { public void run() { decodeIt(source); diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/BlockingInvocationStrategy.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/BlockingInvocationStrategy.java index a67e312e..6415e101 100644 --- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/BlockingInvocationStrategy.java +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/BlockingInvocationStrategy.java @@ -100,7 +100,7 @@ public class BlockingInvocationStrategy extends AbstractInvocationStrategy { final Object[] args = new Object[types.length]; serializationStrategy.decodeRequest(loader, types, requestStream, args); value = method.invoke(target, args); - if(isStream(method.getReturnType())) { + if (isStream(method.getReturnType())) { value = replaceStream(value); } } catch (Throwable t) { @@ -113,7 +113,7 @@ public class BlockingInvocationStrategy extends AbstractInvocationStrategy { serializationStrategy.encodeResponse(loader, method.getReturnType(), value, error, responseStream); - } catch(Exception e) { + } catch (Exception e) { LOG.warn("Initial Encoding response for method {} failed. Retrying", method, e); // we failed to encode the response... reposition and write that error. @@ -123,7 +123,6 @@ public class BlockingInvocationStrategy extends AbstractInvocationStrategy { } catch (Exception unexpected) { LOG.error("Error while servicing {}", method, unexpected); } - } finally { onComplete.run(); } diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ClientInvokerImpl.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ClientInvokerImpl.java index 8366472b..d9d988c9 100644 --- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ClientInvokerImpl.java +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ClientInvokerImpl.java @@ -136,12 +136,12 @@ public class ClientInvokerImpl implements ClientInvoker, Dispatched { protected void onCommand(TransportPool pool, Object data) { try { - DataByteArrayInputStream bais = new DataByteArrayInputStream( (Buffer) data); + DataByteArrayInputStream bais = new DataByteArrayInputStream((Buffer) data); bais.readInt(); long correlation = bais.readVarLong(); pool.onDone(correlation); ResponseFuture response = requests.remove(correlation); - if( response!=null ) { + if (response != null) { response.set(bais); } } catch (Exception e) { @@ -151,7 +151,7 @@ public class ClientInvokerImpl implements ClientInvoker, Dispatched { protected void onFailure(Object id, Throwable throwable) { ResponseFuture response = requests.remove(id); - if( response!=null ) { + if (response != null) { response.fail(throwable); } } @@ -175,13 +175,13 @@ public class ClientInvokerImpl implements ClientInvoker, Dispatched { synchronized (method_cache) { rc = method_cache.get(method); } - if( rc==null ) { + if (rc == null) { StringBuilder sb = new StringBuilder(); sb.append(method.getName()); sb.append(","); Class<?>[] types = method.getParameterTypes(); - for(int i = 0; i < types.length; i++) { - if( i != 0 ) { + for (int i = 0; i < types.length; i++) { + if (i != 0) { sb.append(","); } sb.append(encodeClassName(types[i])); @@ -190,9 +190,9 @@ public class ClientInvokerImpl implements ClientInvoker, Dispatched { Serialization annotation = method.getAnnotation(Serialization.class); SerializationStrategy serializationStrategy; - if( annotation!=null ) { + if (annotation != null) { serializationStrategy = serializationStrategies.get(annotation.value()); - if( serializationStrategy==null ) { + if (serializationStrategy == null) { throw new RuntimeException("Could not find the serialization strategy named: "+annotation.value()); } } else { @@ -210,10 +210,10 @@ public class ClientInvokerImpl implements ClientInvoker, Dispatched { } String encodeClassName(Class<?> type) { - if( type.getComponentType()!=null ) { + if (type.getComponentType() != null) { return "["+ encodeClassName(type.getComponentType()); } - if( type.isPrimitive() ) { + if (type.isPrimitive()) { return CLASS_TO_PRIMITIVE.get(type); } else { return "L"+type.getName(); @@ -293,7 +293,7 @@ public class ClientInvokerImpl implements ClientInvoker, Dispatched { public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { try { - if(method.getDeclaringClass()==Object.class) { + if (method.getDeclaringClass() == Object.class) { if (args != null && args.length == 1 && "equals".equals(method.getName())) { //special treatment for equals to make sure proxy.equals(proxy) -> true @@ -320,13 +320,12 @@ public class ClientInvokerImpl implements ClientInvoker, Dispatched { } Class< ? >[] exceptionTypes = method.getExceptionTypes(); for (Class< ? > exceptionType : exceptionTypes) { - if(exceptionType.isAssignableFrom(e.getClass())) + if (exceptionType.isAssignableFrom(e.getClass())) throw e; } throw new ServiceException(e.getMessage(), e); } } - } protected class InvokerTransportPool extends TransportPool { diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/InvocationType.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/InvocationType.java index 1012a0fe..e8fbef28 100644 --- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/InvocationType.java +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/InvocationType.java @@ -25,14 +25,14 @@ import org.apache.aries.rsa.provider.fastbin.api.AsyncCallback; import org.osgi.util.promise.Promise; public enum InvocationType { - ASYNC_FUTURE(new AsyncFutureInvocationStrategy()){ + ASYNC_FUTURE(new AsyncFutureInvocationStrategy()) { @Override protected boolean applies(Method method) { return Future.class.isAssignableFrom(method.getReturnType()); } - }, ASYNC_CALLBACK(new AsyncInvocationStrategy()){ + }, ASYNC_CALLBACK(new AsyncInvocationStrategy()) { @Override protected boolean applies(Method method) { @@ -40,14 +40,14 @@ public enum InvocationType { return types.length != 0 && types[types.length - 1] == AsyncCallback.class; } - }, PROMISE(new AsyncPromiseInvocationStrategy()){ + }, PROMISE(new AsyncPromiseInvocationStrategy()) { @Override protected boolean applies(Method method) { return promiseAvailable && Promise.class.isAssignableFrom(method.getReturnType()); } - }, BLOCKING(new BlockingInvocationStrategy()){ + }, BLOCKING(new BlockingInvocationStrategy()) { @Override protected boolean applies(Method method) { @@ -69,7 +69,7 @@ public enum InvocationType { public static InvocationStrategy forMethod(Method method) { InvocationType[] values = values(); for (InvocationType invocationType : values) { - if(invocationType.applies(method)) { + if (invocationType.applies(method)) { return invocationType.strategy; } } diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/LengthPrefixedCodec.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/LengthPrefixedCodec.java index c2a2523e..cbcd3bd7 100644 --- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/LengthPrefixedCodec.java +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/LengthPrefixedCodec.java @@ -86,7 +86,7 @@ public class LengthPrefixedCodec implements ProtocolCodec { public BufferState flush() throws IOException { final long writeCounterBeforeFlush = write_counter; - while(!next_write_buffers.isEmpty()) { + while (!next_write_buffers.isEmpty()) { final ByteBuffer nextBuffer = next_write_buffers.peek(); if (nextBuffer.remaining() < 1) { next_write_buffers.remove(); @@ -130,8 +130,8 @@ public class LengthPrefixedCodec implements ProtocolCodec { } public Object read() throws IOException { - while(true) { - if( read_buffer.remaining()!=0 ) { + while (true) { + if (read_buffer.remaining() != 0) { // keep reading from the channel until we fill the read buffer int count = read_channel.read(read_buffer); if (count == -1) { @@ -144,16 +144,16 @@ public class LengthPrefixedCodec implements ProtocolCodec { //read buffer is full... interpret it read_buffer.flip(); - if( read_buffer.capacity() == 4 ) { + if (read_buffer.capacity() == 4) { // Finding out the int size = read_buffer.getInt(0); - if( size < 4 ) { + if (size < 4) { throw new ProtocolException("Expecting a size greater than 3"); } - else if( size > MAX_PACKET_SIZE ) { + else if (size > MAX_PACKET_SIZE) { throw new ProtocolException("Packet length was declared as " + size + " but at most " + MAX_PACKET_SIZE + "is allowed. You can configure this limit with the system property aries.fastbin.max.packet.bytes"); } - if( size == 4 ) { + if (size == 4) { // weird... empty frame... guess it could happen. Buffer rc = new Buffer(read_buffer); read_buffer = ByteBuffer.allocate(4); diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java index b4906648..e7479bd8 100644 --- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java @@ -103,20 +103,20 @@ public class ServerInvokerImpl implements ServerInvoker, Dispatched { private MethodData getMethodData(Buffer data) throws IOException, NoSuchMethodException, ClassNotFoundException { MethodData rc = method_cache.get(data); - if( rc == null ) { + if (rc == null) { String[] parts = data.utf8().toString().split(","); String name = parts[0]; Class[] params = new Class[parts.length - 1]; - for( int i = 0; i < params.length; i++) { + for (int i = 0; i < params.length; i++) { params[i] = decodeClass(parts[i + 1]); } Method method = clazz.getMethod(name, params); Serialization annotation = method.getAnnotation(Serialization.class); SerializationStrategy serializationStrategy; - if( annotation!=null ) { + if (annotation != null) { serializationStrategy = serializationStrategies.get(annotation.value()); - if( serializationStrategy==null ) { + if (serializationStrategy == null) { throw new RuntimeException("Could not find the serialization strategy named: "+annotation.value()); } } else { @@ -132,18 +132,17 @@ public class ServerInvokerImpl implements ServerInvoker, Dispatched { } private Class<?> decodeClass(String s) throws ClassNotFoundException { - if( s.startsWith("[")) { + if (s.startsWith("[")) { Class<?> nested = decodeClass(s.substring(1)); return Array.newInstance(nested, 0).getClass(); } String c = s.substring(0, 1); - if( c.equals("L") ) { + if (c.equals("L")) { return loader.loadClass(s.substring(1)); } else { return PRIMITIVE_TO_CLASS.get(c); } } - } public ServerInvokerImpl(String address, DispatchQueue queue, Map<String, SerializationStrategy> serializationStrategies) throws Exception { @@ -206,7 +205,7 @@ public class ServerInvokerImpl implements ServerInvoker, Dispatched { } @Override - public void unget(){ + public void unget() { // nothing to do } }, getClass().getClassLoader()); @@ -241,13 +240,13 @@ public class ServerInvokerImpl implements ServerInvoker, Dispatched { final ServiceFactoryHolder holder = holders.get(service); Runnable task = null; - if(holder==null) { + if (holder == null) { String message = "The requested service {"+service+"} is not available"; LOG.warn(message); task = new SendTask(bais, correlation, transport, message); } - final Object svc = holder==null ? null : holder.factory.get(); - if(holder!=null) { + final Object svc = holder == null ? null : holder.factory.get(); + if (holder != null) { try { final MethodData methodData = holder.getMethodData(encoded_method); task = new SendTask(svc, bais, holder, correlation, methodData, transport); @@ -261,13 +260,12 @@ public class ServerInvokerImpl implements ServerInvoker, Dispatched { } Executor executor; - if( svc instanceof Dispatched ) { + if (svc instanceof Dispatched) { executor = ((Dispatched)svc).queue(); } else { executor = blockingExecutor; } executor.execute(task); - } catch (Exception e) { LOG.info("Error while reading request", e); } @@ -351,10 +349,10 @@ public class ServerInvokerImpl implements ServerInvoker, Dispatched { // Let's decode the remaining args on the target's executor // to take cpu load off the - ClassLoader loader = holder==null ? getClass().getClassLoader() : holder.loader; + ClassLoader loader = holder == null ? getClass().getClassLoader() : holder.loader; methodData.invocationStrategy.service(methodData.serializationStrategy, loader, methodData.method, svc, bais, baos, new Runnable() { public void run() { - if(holder!=null) + if (holder != null) holder.factory.unget(); final Buffer command = baos.toBuffer(); diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/TcpTransport.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/TcpTransport.java index 0573a2a9..1db0c394 100644 --- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/TcpTransport.java +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/TcpTransport.java @@ -150,7 +150,7 @@ public class TcpTransport implements Transport { public void connected(SocketChannel channel) throws IOException, Exception { this.channel = channel; - if( codec !=null ) { + if (codec != null) { initializeCodec(); } @@ -218,7 +218,6 @@ public class TcpTransport implements Transport { }); readSource.setCancelHandler(CANCEL_HANDLER); readSource.resume(); - } else if (socketState.isConnected()) { dispatchQueue.execute(new Runnable() { public void run() { @@ -234,7 +233,7 @@ public class TcpTransport implements Transport { System.err.println("cannot be started. socket state is: "+socketState); } } finally { - if( onCompleted!=null ) { + if (onCompleted != null) { onCompleted.run(); } } @@ -247,7 +246,7 @@ public class TcpTransport implements Transport { protected String resolveHostName(String host) throws UnknownHostException { try { - if(isUseLocalHost()) { + if (isUseLocalHost()) { String localName = InetAddress.getLocalHost().getHostName(); if (localName != null) { if (localName.equals(host)) { @@ -280,7 +279,7 @@ public class TcpTransport implements Transport { } }); - if( max_read_rate!=0 || max_write_rate!=0 ) { + if (max_read_rate != 0 || max_write_rate != 0) { rateLimitingChannel = new RateLimitingChannel(); scheduleRateAllowanceReset(); } @@ -290,7 +289,7 @@ public class TcpTransport implements Transport { } private void scheduleRateAllowanceReset() { - dispatchQueue.executeAfter(1, TimeUnit.SECONDS, new Runnable(){ + dispatchQueue.executeAfter(1, TimeUnit.SECONDS, new Runnable() { public void run() { if (!socketState.isConnected()) { return; @@ -302,12 +301,12 @@ public class TcpTransport implements Transport { } private void dispose() { - if( readSource != null ) { + if (readSource != null) { readSource.cancel(); readSource = null; } - if( writeSource != null ) { + if (writeSource != null) { writeSource.cancel(); writeSource = null; } @@ -334,11 +333,11 @@ public class TcpTransport implements Transport { } ProtocolCodec.BufferState rc = codec.write(command); - switch (rc ) { + switch (rc) { case FULL: return false; default: - if( drained ) { + if (drained) { drained = false; resumeWrite(); } @@ -359,8 +358,8 @@ public class TcpTransport implements Transport { return; } try { - if( codec.flush() == ProtocolCodec.BufferState.WAS_EMPTY && flush() ) { - if( !drained ) { + if (codec.flush() == ProtocolCodec.BufferState.WAS_EMPTY && flush()) { + if (!drained) { drained = true; suspendWrite(); listener.onRefill(this); @@ -383,9 +382,9 @@ public class TcpTransport implements Transport { long initial = codec.getReadCounter(); // Only process up to 64k worth of data at a time, so we can give // other connections a chance to process their requests. - while( codec.getReadCounter() - initial < 1024 * 64 ) { + while (codec.getReadCounter() - initial < 1024 * 64) { Object command = codec.read(); - if ( command!=null ) { + if (command != null) { try { listener.onTransportCommand(this, command); } catch (Throwable e) { @@ -410,14 +409,14 @@ public class TcpTransport implements Transport { } public void suspendRead() { - if( isConnected() && readSource!=null ) { + if (isConnected() && readSource != null) { readSource.suspend(); } } public void resumeRead() { - if( isConnected() && readSource!=null ) { - if( rateLimitingChannel!=null ) { + if (isConnected() && readSource != null) { + if (rateLimitingChannel != null) { rateLimitingChannel.resumeRead(); } else { _resumeRead(); @@ -426,7 +425,7 @@ public class TcpTransport implements Transport { } private void _resumeRead() { readSource.resume(); - dispatchQueue.execute(new Runnable(){ + dispatchQueue.execute(new Runnable() { public void run() { drainInbound(); } @@ -434,14 +433,14 @@ public class TcpTransport implements Transport { } protected void suspendWrite() { - if( isConnected() && writeSource!=null ) { + if (isConnected() && writeSource != null) { writeSource.suspend(); } } protected void resumeWrite() { - if( isConnected() && writeSource!=null ) { + if (isConnected() && writeSource != null) { writeSource.resume(); - dispatchQueue.execute(new Runnable(){ + dispatchQueue.execute(new Runnable() { public void run() { drainOutbound(); } @@ -463,7 +462,7 @@ public class TcpTransport implements Transport { public void setProtocolCodec(ProtocolCodec protocolCodec) { this.codec = protocolCodec; - if( channel!=null && codec!=null ) { + if (channel != null && codec != null) { initializeCodec(); } } @@ -494,9 +493,9 @@ public class TcpTransport implements Transport { } private void trace(String message) { - if( LOG.isTraceEnabled() ) { + if (LOG.isTraceEnabled()) { final String label = dispatchQueue.getLabel(); - if( label !=null ) { + if (label != null) { LOG.trace("{} | {}", label, message); } else { LOG.trace(message); @@ -509,7 +508,7 @@ public class TcpTransport implements Transport { } public ReadableByteChannel readChannel() { - if(rateLimitingChannel!=null) { + if (rateLimitingChannel != null) { return rateLimitingChannel; } else { return channel; @@ -517,7 +516,7 @@ public class TcpTransport implements Transport { } public WritableByteChannel writeChannel() { - if(rateLimitingChannel!=null) { + if (rateLimitingChannel != null) { return rateLimitingChannel; } else { return channel; @@ -549,17 +548,17 @@ public class TcpTransport implements Transport { boolean write_suspended = false; public void resetAllowance() { - if( read_allowance != max_read_rate || write_allowance != max_write_rate) { + if (read_allowance != max_read_rate || write_allowance != max_write_rate) { read_allowance = max_read_rate; write_allowance = max_write_rate; - if( write_suspended ) { + if (write_suspended) { write_suspended = false; resumeWrite(); } - if( read_suspended ) { + if (read_suspended) { read_suspended = false; resumeRead(); - for( int i = 0; i < read_resume_counter ; i++ ) { + for (int i = 0; i < read_resume_counter ; i++) { resumeRead(); } } @@ -567,16 +566,16 @@ public class TcpTransport implements Transport { } public int read(ByteBuffer dst) throws IOException { - if( max_read_rate==0 ) { + if (max_read_rate == 0) { return channel.read(dst); } else { int remaining = dst.remaining(); - if( read_allowance ==0 || remaining ==0 ) { + if (read_allowance == 0 || remaining == 0) { return 0; } int reduction = 0; - if( remaining > read_allowance) { + if (remaining > read_allowance) { reduction = remaining - read_allowance; dst.limit(dst.limit() - reduction); } @@ -585,8 +584,8 @@ public class TcpTransport implements Transport { rc = channel.read(dst); read_allowance -= rc; } finally { - if( reduction!=0 ) { - if( dst.remaining() == 0 ) { + if (reduction != 0) { + if (dst.remaining() == 0) { // we need to suspend the read now until we get // a new allowance... readSource.suspend(); @@ -600,16 +599,16 @@ public class TcpTransport implements Transport { } public int write(ByteBuffer src) throws IOException { - if( max_write_rate==0 ) { + if (max_write_rate == 0) { return channel.write(src); } else { int remaining = src.remaining(); - if( write_allowance ==0 || remaining ==0 ) { + if (write_allowance == 0 || remaining == 0) { return 0; } int reduction = 0; - if( remaining > write_allowance) { + if (remaining > write_allowance) { reduction = remaining - write_allowance; src.limit(src.limit() - reduction); } @@ -618,8 +617,8 @@ public class TcpTransport implements Transport { rc = channel.write(src); write_allowance -= rc; } finally { - if( reduction!=0 ) { - if( src.remaining() == 0 ) { + if (reduction != 0) { + if (src.remaining() == 0) { // we need to suspend the read now until we get // a new allowance... write_suspended = true; @@ -641,13 +640,12 @@ public class TcpTransport implements Transport { } public void resumeRead() { - if( read_suspended ) { + if (read_suspended) { read_resume_counter += 1; } else { _resumeRead(); } } - } // @@ -723,7 +721,7 @@ public class TcpTransport implements Transport { void onCanceled() { } boolean is(Class<? extends SocketState> clazz) { - return getClass()==clazz; + return getClass() == clazz; } boolean isConnecting() { return is(CONNECTING.class); @@ -767,7 +765,7 @@ public class TcpTransport implements Transport { state.onCanceled(); } Runnable createDisconnectTask() { - return new Runnable(){ + return new Runnable() { public void run() { listener.onTransportDisconnected(TcpTransport.this); } @@ -781,11 +779,11 @@ public class TcpTransport implements Transport { private boolean dispose; public CANCELING() { - if( readSource!=null ) { + if (readSource != null) { remaining++; readSource.cancel(); } - if( writeSource!=null ) { + if (writeSource != null) { remaining++; writeSource.cancel(); } @@ -796,14 +794,14 @@ public class TcpTransport implements Transport { dispose = true; } void add(Runnable onCompleted) { - if( onCompleted!=null ) { + if (onCompleted != null) { runnables.add(onCompleted); } } void onCanceled() { trace("CANCELING.onCanceled"); remaining--; - if( remaining!=0 ) { + if (remaining != 0) { return; } try { @@ -829,7 +827,7 @@ public class TcpTransport implements Transport { void onStop(Runnable onCompleted) { trace("CANCELED.onStop"); - if( !disposed ) { + if (!disposed) { disposed = true; dispose(); } diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/TcpTransportFactory.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/TcpTransportFactory.java index 6a7b59d7..edfd0a06 100644 --- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/TcpTransportFactory.java +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/TcpTransportFactory.java @@ -79,7 +79,7 @@ public class TcpTransportFactory { * TcpTransportServer. */ protected TcpTransportServer createTcpTransportServer(final URI location) throws IOException, URISyntaxException, Exception { - if( !location.getScheme().equals("tcp") ) { + if (!location.getScheme().equals("tcp")) { return null; } return new TcpTransportServer(location); @@ -112,7 +112,7 @@ public class TcpTransportFactory { protected String getOption(Map options, String key, String def) { String rc = (String) options.remove(key); - if( rc == null ) { + if (rc == null) { rc = def; } return rc; diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/TcpTransportServer.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/TcpTransportServer.java index a95adad1..a1475c5c 100644 --- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/TcpTransportServer.java +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/TcpTransportServer.java @@ -104,7 +104,7 @@ public class TcpTransportServer implements TransportServer { public void run() { try { SocketChannel client = channel.accept(); - while( client!=null ) { + while (client != null) { handleSocket(client); client = channel.accept(); } @@ -122,7 +122,7 @@ public class TcpTransportServer implements TransportServer { } }); acceptSource.resume(); - if( onCompleted!=null ) { + if (onCompleted != null) { dispatchQueue.execute(onCompleted); } } @@ -162,7 +162,7 @@ public class TcpTransportServer implements TransportServer { stop(null); } public void stop(final Runnable onCompleted) { - if( acceptSource.isCanceled() ) { + if (acceptSource.isCanceled()) { onCompleted.run(); } else { acceptSource.setCancelHandler(new Runnable() { @@ -171,7 +171,7 @@ public class TcpTransportServer implements TransportServer { channel.close(); } catch (IOException e) { } - if( onCompleted!=null ) { + if (onCompleted != null) { onCompleted.run(); } } @@ -206,7 +206,7 @@ public class TcpTransportServer implements TransportServer { protected TcpTransport createTransport(SocketChannel socketChannel, HashMap<String, Object> options) throws Exception { TcpTransport transport = createTransport(); transport.connected(socketChannel); - if( options!=null ) { + if (options != null) { IntrospectionSupport.setProperties(transport, options); } if (transportOptions != null) { diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/TransportPool.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/TransportPool.java index 7a270185..3453225e 100644 --- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/TransportPool.java +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/TransportPool.java @@ -89,7 +89,7 @@ public abstract class TransportPool implements Service { Transport transport = getIdleTransport(); if (transport != null) { doOffer(transport, data, id); - if( transport.full() ) { + if (transport.full()) { transports.get(transport).time = 0L; } } else { @@ -208,7 +208,7 @@ public abstract class TransportPool implements Service { assert accepted: "Should have been accepted since the transport was not full"; } - if( transport.full() ) { + if (transport.full()) { transports.get(transport).time = 0L; } else { final long time = System.currentTimeMillis(); @@ -225,7 +225,6 @@ public abstract class TransportPool implements Service { }); } } - } public void onTransportFailure(Transport transport, IOException error) { diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/util/ClassLoaderObjectInputStream.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/util/ClassLoaderObjectInputStream.java index 17474175..e571916f 100644 --- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/util/ClassLoaderObjectInputStream.java +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/util/ClassLoaderObjectInputStream.java @@ -74,7 +74,7 @@ public class ClassLoaderObjectInputStream extends ObjectInputStream { try{ //try to load it with our own classloader (could be e.g. a service exception) return Class.forName(className, false, getClass().getClassLoader()); - } catch(ClassNotFoundException e2) { + } catch (ClassNotFoundException e2) { //ignore } throw e; diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/util/IntrospectionSupport.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/util/IntrospectionSupport.java index 087ae386..a0f98698 100755 --- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/util/IntrospectionSupport.java +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/util/IntrospectionSupport.java @@ -79,10 +79,8 @@ public final class IntrospectionSupport { } props.put(optionPrefix + name, strValue); rc = true; - } catch (Throwable ignore) { } - } } @@ -185,8 +183,8 @@ public final class IntrospectionSupport { } private static Object convert(Object value, Class<?> type) { - if( type.isArray() ) { - if( value.getClass().isArray() ) { + if (type.isArray()) { + if (value.getClass().isArray()) { int length = Array.getLength(value); Class<?> componentType = type.getComponentType(); Object rc = Array.newInstance(componentType, length); @@ -252,13 +250,13 @@ public final class IntrospectionSupport { LinkedHashMap<String, Object> map = new LinkedHashMap<>(); addFields(target, target.getClass(), stopClass, map); if (overrideFields != null) { - for(String key : overrideFields.keySet()) { + for (String key : overrideFields.keySet()) { Object value = overrideFields.get(key); map.put(key, value); } } - if( fields!=null ) { + if (fields != null) { map.keySet().retainAll(Arrays.asList(fields)); } @@ -267,9 +265,9 @@ public final class IntrospectionSupport { for (Entry<String, Object> entry : map.entrySet()) { String key = entry.getKey(); String value = null; - if( entry.getValue() !=null ) { + if (entry.getValue() != null) { value = entry.getValue().toString(); - if( value!=null && ( value.indexOf('\n')>=0 || (key.length()+value.length())>70 ) ) { + if (value != null && (value.indexOf('\n') >= 0 || (key.length() + value.length()) > 70)) { useMultiLine = true; } } @@ -277,7 +275,7 @@ public final class IntrospectionSupport { } StringBuilder buffer = new StringBuilder(); - if( useMultiLine) { + if (useMultiLine) { buffer.append("{\n"); boolean first = true; for (Entry<String, String> entry : props.entrySet()) { @@ -350,7 +348,6 @@ public final class IntrospectionSupport { e.printStackTrace(); } } - } } diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/util/StringSupport.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/util/StringSupport.java index efea3823..47abc4e0 100644 --- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/util/StringSupport.java +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/util/StringSupport.java @@ -27,7 +27,7 @@ import java.util.Arrays; public class StringSupport { public static String indent(String value, int spaces) { - if( value == null ) { + if (value == null) { return null; } String indent = fillString(spaces, ' '); diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/util/URISupport.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/util/URISupport.java index 97ceffe1..aadb8553 100755 --- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/util/URISupport.java +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/util/URISupport.java @@ -185,7 +185,6 @@ public class URISupport { p = ssp.lastIndexOf(")"); componentString = ssp.substring(initialParen + 1, p); params = ssp.substring(p + 1).trim(); - } else { componentString = ssp; params = ""; diff --git a/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/InvocationTest.java b/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/InvocationTest.java index db9eae12..2235aefa 100644 --- a/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/InvocationTest.java +++ b/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/InvocationTest.java @@ -57,7 +57,7 @@ public class InvocationTest { final int BENCHMARK_CLIENTS = 100; final int BENCHMARK_INVOCATIONS_PER_CLIENT = 1000; - @Test(timeout=30*1000) + @Test(timeout = 30 * 1000) public void testInvoke() throws Exception { DispatchQueue queue = Dispatch.createQueue(); @@ -103,7 +103,6 @@ public class InvocationTest { AsyncCallbackFuture<StringValue.Getter> future2 = new AsyncCallbackFuture<>(); hello.protobuf(stringValue("Hiram Async"), future2); assertEquals("Hello Hiram Async!", future2.get(2, TimeUnit.SECONDS).getValue()); - } finally { server.stop(); @@ -115,7 +114,7 @@ public class InvocationTest { * tests that requests to an unknown ID throw an exception instead of deadlocking the request * @throws Exception */ - @Test(timeout=30*1000) + @Test(timeout = 30 * 1000) public void testInvokeInvalidServiceID() throws Exception { DispatchQueue queue = Dispatch.createQueue(); @@ -196,7 +195,7 @@ public class InvocationTest { } } - @Test(timeout=30*1000) + @Test(timeout = 30 * 1000) public void testOverflowAsync() throws Exception { DispatchQueue queue = Dispatch.createQueue(); @@ -225,13 +224,13 @@ public class InvocationTest { String payload = new String(chars); final List<AsyncCallbackFuture<String>> futures = new ArrayList<>(); - for(int i = 0; i < 100; i++) { + for (int i = 0; i < 100; i++) { AsyncCallbackFuture<String> future = new AsyncCallbackFuture<>(); hello.hello(payload, future); futures.add(future); } - for(Future<String> f : futures) { + for (Future<String> f : futures) { f.get(3, TimeUnit.SECONDS); } //future2.get(2, TimeUnit.SECONDS); @@ -245,7 +244,7 @@ public class InvocationTest { } } - @Test(timeout=30*1000) + @Test(timeout = 30 * 1000) public void testOverflow() throws Exception { DispatchQueue queue = Dispatch.createQueue(); @@ -305,7 +304,6 @@ public class InvocationTest { assertEquals(BENCHMARK_CLIENTS, requests.get()); assertEquals(BENCHMARK_CLIENTS, responses.get()); assertEquals(0, failures.get()); - } finally { server.stop(); @@ -313,7 +311,7 @@ public class InvocationTest { } } - @Test(timeout=30*1000) + @Test(timeout = 30 * 1000) public void testNoOverflow() throws Exception { DispatchQueue queue = Dispatch.createQueue(); @@ -341,7 +339,7 @@ public class InvocationTest { char[] chars = new char[65 * 1024]; String payload = new String(chars); - for(int i = 0; i < 100; i++) { + for (int i = 0; i < 100; i++) { hello.hello(payload); } } @@ -351,7 +349,7 @@ public class InvocationTest { } } - @Test(timeout=30*1000) + @Test(timeout = 30 * 1000) public void testUnderLoadSyncObject() throws Exception { HashMap<String, SerializationStrategy> map = new HashMap<>(); @@ -474,7 +472,7 @@ public class InvocationTest { } private void sendNext() { - if( i < nbInvocationsPerThread ) { + if (i < nbInvocationsPerThread) { requests.incrementAndGet(); start = System.nanoTime(); hello.protobuf(msg, this); @@ -501,7 +499,7 @@ public class InvocationTest { } } - @Test(timeout=30*1000) + @Test(timeout = 30 * 1000) public void testUnderLoadAsyncProto() throws Exception { HashMap<String, SerializationStrategy> map = new HashMap<>(); map.put("protobuf", new ProtobufSerializationStrategy()); @@ -600,7 +598,7 @@ public class InvocationTest { } private void queueCheck() { - if( !queue.isExecuting() ) { + if (!queue.isExecuting()) { throw new IllegalStateException("Not executing on our dispatch queue"); } } diff --git a/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/StreamInvocationTest.java b/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/StreamInvocationTest.java index 836bb33a..958b68ac 100644 --- a/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/StreamInvocationTest.java +++ b/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/StreamInvocationTest.java @@ -88,29 +88,26 @@ public class StreamInvocationTest { @Test public void testToString() throws IOException { assertEquals("Test", testService.toString(new ByteArrayInputStream("Test".getBytes()))); - } - @Test(timeout=5000) + @Test(timeout = 5000) public void testToStringLarge() throws IOException { InputStream in = fillStream('a', 1000000); long time = System.currentTimeMillis(); String result = testService.toString(in); //roughly 1 MB of data System.out.println("Transferred 1MB of data in "+(System.currentTimeMillis()-time)+"ms"); assertEquals(1000000, result.length()); - for(int i = 0; i < result.length(); i++) { + for (int i = 0; i < result.length(); i++) { assertEquals('a', result.charAt(i)); } - } @Test public void testToStream() throws IOException { assertEquals("Test", new BufferedReader(new InputStreamReader(testService.toStream("Test"))).readLine()); - } - @Test(timeout=5000) + @Test(timeout = 5000) public void testToStreamLarge() throws IOException { String string = fillBuffer('a', 1000000); long time = System.currentTimeMillis(); @@ -119,10 +116,9 @@ public class StreamInvocationTest { String result = reader.readLine(); System.out.println("Transferred 1MB of data in "+(System.currentTimeMillis()-time)+"ms"); assertEquals(1000000, result.length()); - for(int i = 0;i < result.length(); i++) { + for (int i = 0;i < result.length(); i++) { assertEquals('a', result.charAt(i)); } - } @Test @@ -131,7 +127,6 @@ public class StreamInvocationTest { testService.intoStream(result, "Test"); Thread.sleep(100); assertEquals("Test", result.toString()); - } @Test @@ -141,7 +136,6 @@ public class StreamInvocationTest { byte[] digest = digester.digest(testString.getBytes()); Future<byte[]> future = testService.digest(new ByteArrayInputStream(testString.getBytes())); assertArrayEquals(digest, future.get()); - } public interface TestService { @@ -175,7 +169,7 @@ public class StreamInvocationTest { try{ out.write(string.getBytes()); out.close(); - } catch(Exception e) { + } catch (Exception e) { e.printStackTrace(); } }).start(); @@ -188,7 +182,7 @@ public class StreamInvocationTest { MessageDigest digest = MessageDigest.getInstance("MD5"); ByteArrayOutputStream out = new ByteArrayOutputStream(); int i; - while((i = in.read()) != -1) { + while ((i = in.read()) != -1) { out.write(i); } return digest.digest(out.toByteArray()); @@ -203,7 +197,7 @@ public class StreamInvocationTest { protected InputStream fillStream(char c, int repetitions) { ByteArrayOutputStream out = new ByteArrayOutputStream(); - for (int i = 0; i < repetitions; i++){ + for (int i = 0; i < repetitions; i++) { out.write(c); } return new ByteArrayInputStream(out.toByteArray()); diff --git a/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/TransportFailureTest.java b/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/TransportFailureTest.java index c33f2854..cdef07de 100644 --- a/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/TransportFailureTest.java +++ b/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/TransportFailureTest.java @@ -84,7 +84,6 @@ public class TransportFailureTest { assertTrue(t1 - t0 > SLEEP_TIME / 2); assertTrue(t1 - t0 < MAX_DELAY / 2); } - } finally { server.stop(); diff --git a/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/streams/InputStreamProxyTest.java b/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/streams/InputStreamProxyTest.java index a3a6bdf9..577dc962 100644 --- a/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/streams/InputStreamProxyTest.java +++ b/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/streams/InputStreamProxyTest.java @@ -42,7 +42,7 @@ public class InputStreamProxyTest { public void testUnsignedBytes() throws IOException { int length = 1024; ByteArrayOutputStream out = new ByteArrayOutputStream(length); - for(int i = 0; i < length; i++) { + for (int i = 0; i < length; i++) { out.write((byte)i); } byte[] data = out.toByteArray(); @@ -134,7 +134,7 @@ public class InputStreamProxyTest { try{ streamProvider.read(id); fail("must have been closed already"); - } catch(IOException e) {} + } catch (IOException e) {} } private OwnInputStream fillStream(char c, int repetitions) { diff --git a/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/tcp/LengthPrefixedCodecTest.java b/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/tcp/LengthPrefixedCodecTest.java index 7a5ceddb..2f18cc58 100644 --- a/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/tcp/LengthPrefixedCodecTest.java +++ b/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/tcp/LengthPrefixedCodecTest.java @@ -140,7 +140,7 @@ public class LengthPrefixedCodecTest { assertEquals(bytesThatWillBeWritten, codec.getWriteCounter()); } - @Test(expected=ProtocolException.class) + @Test(expected = ProtocolException.class) public void testReadEvilPackage() throws Exception { expect(readableByteChannel.read(EasyMock.anyObject())).andAnswer(new IAnswer<Integer>() { @@ -162,7 +162,7 @@ public class LengthPrefixedCodecTest { @Override public Integer answer() throws Throwable { final ByteBuffer buffer = (ByteBuffer) getCurrentArguments()[0]; - if(buffer.remaining() < length) + if (buffer.remaining() < length) throw new BufferUnderflowException(); buffer.position(buffer.position() + length); return length; diff --git a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpProvider.java b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpProvider.java index 4f305c70..9f6e9fca 100644 --- a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpProvider.java +++ b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpProvider.java @@ -59,7 +59,7 @@ import javax.net.ssl.SSLContext; * method invocations to the service instance.) */ @SuppressWarnings("rawtypes") -@RSADistributionProvider(configs=TcpProvider.TCP_CONFIG_TYPE) +@RSADistributionProvider(configs = TcpProvider.TCP_CONFIG_TYPE) @Component(property = { // RemoteConstants.REMOTE_INTENTS_SUPPORTED + "=osgi.basic", RemoteConstants.REMOTE_INTENTS_SUPPORTED + "=osgi.async", @@ -69,7 +69,7 @@ import javax.net.ssl.SSLContext; public class TcpProvider implements DistributionProvider { public static final String DISTRIBUTION_PROVIDER_TCP_PID = "org.apache.aries.rsa.provider.tcp"; public static final String TCP_CONFIG_TYPE = "aries.tcp"; - private static final String[] SUPPORTED_INTENTS = { "osgi.basic", "osgi.async"}; + private static final String[] SUPPORTED_INTENTS = { "osgi.basic", "osgi.async" }; private static final Logger LOG = LoggerFactory.getLogger(TcpProvider.class); diff --git a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/ser/BasicObjectOutputStream.java b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/ser/BasicObjectOutputStream.java index dc614758..eff82905 100644 --- a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/ser/BasicObjectOutputStream.java +++ b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/ser/BasicObjectOutputStream.java @@ -45,7 +45,7 @@ public class BasicObjectOutputStream extends ObjectOutputStream { return obj; } else if (obj instanceof Version) { return new VersionMarker((Version) obj); - } else if (DTOUtil.isDTOType(obj.getClass())){ + } else if (DTOUtil.isDTOType(obj.getClass())) { return new DTOMarker(obj); } else { return obj; diff --git a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/ConfigTest.java b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/ConfigTest.java index f0d07a3f..3373f70d 100644 --- a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/ConfigTest.java +++ b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/ConfigTest.java @@ -71,7 +71,7 @@ public class ConfigTest { Assert.assertEquals(11111, getParser().getPort()); } - @Test(expected=IllegalArgumentException.class) + @Test(expected = IllegalArgumentException.class) public void testTimeoutInvalid() { props.put(Config.TIMEOUT, new Date()); getParser().getTimeoutMillis(); diff --git a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java index 2ceb8955..a12acd85 100644 --- a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java +++ b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java @@ -143,7 +143,7 @@ public class TcpProviderTest { assertEquals("test", myServiceProxy.echo("test")); } - @Test(expected=ExpectedTestException.class) + @Test(expected = ExpectedTestException.class) public void testCallException() { myServiceProxy.callException(); } diff --git a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyServiceImpl.java b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyServiceImpl.java index 41ba08c5..489e8f14 100644 --- a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyServiceImpl.java +++ b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyServiceImpl.java @@ -58,7 +58,6 @@ public class MyServiceImpl implements MyService { @Override public void callWithList(List<String> msg) { - } @Override @@ -71,7 +70,6 @@ public class MyServiceImpl implements MyService { sleep(delay); return "Finished"; } - }); } @@ -85,7 +83,6 @@ public class MyServiceImpl implements MyService { sleep(delay); return "Finished"; } - }); } diff --git a/rsa/src/main/java/org/apache/aries/rsa/core/ExportRegistrationImpl.java b/rsa/src/main/java/org/apache/aries/rsa/core/ExportRegistrationImpl.java index e7240aaf..3c401e4e 100644 --- a/rsa/src/main/java/org/apache/aries/rsa/core/ExportRegistrationImpl.java +++ b/rsa/src/main/java/org/apache/aries/rsa/core/ExportRegistrationImpl.java @@ -104,7 +104,6 @@ public class ExportRegistrationImpl implements ExportRegistration, ExportReferen private AtomicBoolean closing = new AtomicBoolean(); private volatile boolean closed; - /** * Constructs an export registration that is linked * (shares state) with the given export registration. diff --git a/rsa/src/main/java/org/apache/aries/rsa/core/event/EventAdminSender.java b/rsa/src/main/java/org/apache/aries/rsa/core/event/EventAdminSender.java index dec33487..164d1145 100644 --- a/rsa/src/main/java/org/apache/aries/rsa/core/event/EventAdminSender.java +++ b/rsa/src/main/java/org/apache/aries/rsa/core/event/EventAdminSender.java @@ -75,7 +75,7 @@ public class EventAdminSender { Event event = new Event(topic, props); notifyEventAdmins(type, event); } - + private static String getTypeName(int type) { switch (type) { case RemoteServiceAdminEvent.EXPORT_ERROR: return "EXPORT_ERROR"; diff --git a/spi/src/main/java/org/apache/aries/rsa/annotations/RSADiscoveryProvider.java b/spi/src/main/java/org/apache/aries/rsa/annotations/RSADiscoveryProvider.java index e2b92700..a8cf8f43 100644 --- a/spi/src/main/java/org/apache/aries/rsa/annotations/RSADiscoveryProvider.java +++ b/spi/src/main/java/org/apache/aries/rsa/annotations/RSADiscoveryProvider.java @@ -2,8 +2,8 @@ package org.apache.aries.rsa.annotations; import org.osgi.annotation.bundle.Attribute; [email protected]( - namespace = "osgi.remoteserviceadmin.discovery", [email protected]( + namespace = "osgi.remoteserviceadmin.discovery", version = "1.1.0" ) public @interface RSADiscoveryProvider { diff --git a/spi/src/main/java/org/apache/aries/rsa/annotations/RSADistributionProvider.java b/spi/src/main/java/org/apache/aries/rsa/annotations/RSADistributionProvider.java index a3b08536..cb732afe 100644 --- a/spi/src/main/java/org/apache/aries/rsa/annotations/RSADistributionProvider.java +++ b/spi/src/main/java/org/apache/aries/rsa/annotations/RSADistributionProvider.java @@ -2,8 +2,8 @@ package org.apache.aries.rsa.annotations; import org.osgi.annotation.bundle.Attribute; [email protected]( - namespace = "osgi.remoteserviceadmin.distribution", [email protected]( + namespace = "osgi.remoteserviceadmin.distribution", version = "1.1.0" ) public @interface RSADistributionProvider { diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java index 07cd0e8e..fa7c1d78 100644 --- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java +++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java @@ -203,7 +203,7 @@ public class TopologyManagerImport implements EndpointEventListener, RemoteServi } } } - + private void unimportRegistration(ImportRegistration reg) { importedServices.remove(reg); reg.close();
