Author: frm Date: Thu Oct 26 13:39:55 2017 New Revision: 1813400 URL: http://svn.apache.org/viewvc?rev=1813400&view=rev Log: OAK-5521 - Make CommunicationObserver and CommunicationPartnerMBean thread safe
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserver.java jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationPartnerMBean.java jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserverTest.java Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserver.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserver.java?rev=1813400&r1=1813399&r2=1813400&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserver.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserver.java Thu Oct 26 13:39:55 2017 @@ -20,115 +20,170 @@ package org.apache.jackrabbit.oak.segment.standby.store; import java.lang.management.ManagementFactory; +import java.util.ArrayList; +import java.util.Collection; import java.util.Date; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.function.Consumer; import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; import javax.management.StandardMBean; +import io.netty.handler.codec.marshalling.DefaultMarshallerProvider; import org.apache.jackrabbit.oak.segment.standby.jmx.ObservablePartnerMBean; +import org.apache.jackrabbit.oak.segment.standby.jmx.StandbyStatusMBean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class CommunicationObserver { - static final int MAX_CLIENT_STATISTICS = 10; + private static final int DEFAULT_MAX_CLIENT_MBEANS = 10; private static final Logger log = LoggerFactory.getLogger(CommunicationObserver.class); - private final Map<String, CommunicationPartnerMBean> partnerDetails = new HashMap<>(); + private static ObjectName getMBeanName(CommunicationPartnerMBean bean) throws MalformedObjectNameException { + return new ObjectName(StandbyStatusMBean.JMX_NAME + ",id=\"Client " + bean.getName() + "\""); + } + + private static String oldest(Map<String, CommunicationPartnerMBean> beans) { + CommunicationPartnerMBean oldest = null; + + for (CommunicationPartnerMBean bean : beans.values()) { + if (oldest == null || oldest.getLastSeen().after(bean.getLastSeen())) { + oldest = bean; + } + } + + if (oldest == null) { + throw new IllegalArgumentException("no clients available"); + } + + return oldest.getName(); + } + + private final Map<String, CommunicationPartnerMBean> beans = new HashMap<>(); + + private final int maxClientMBeans; private final String id; public CommunicationObserver(String id) { - this.id = id; + this(id, DEFAULT_MAX_CLIENT_MBEANS); } - void unregisterCommunicationPartner(CommunicationPartnerMBean m) throws Exception { - ManagementFactory.getPlatformMBeanServer().unregisterMBean(m.getMBeanName()); + CommunicationObserver(String id, int maxClientMBeans) { + this.id = id; + this.maxClientMBeans = maxClientMBeans; } void registerCommunicationPartner(CommunicationPartnerMBean m) throws Exception { - ManagementFactory.getPlatformMBeanServer().registerMBean(new StandardMBean(m, ObservablePartnerMBean.class), m.getMBeanName()); + ManagementFactory.getPlatformMBeanServer().registerMBean(new StandardMBean(m, ObservablePartnerMBean.class), getMBeanName(m)); } - private void safeUnregisterCommunicationPartner(CommunicationPartnerMBean m) { + private void safeRegisterCommunicationPartner(CommunicationPartnerMBean m) { try { - unregisterCommunicationPartner(m); + registerCommunicationPartner(m); } catch (Exception e) { - log.error(String.format("Unable to unregister MBean for client %s", m.getName()), e); + log.error(String.format("Unable to register MBean for client %s", m.getName()), e); } } - private void safeRegisterCommunicationPartner(CommunicationPartnerMBean m) { + void unregisterCommunicationPartner(CommunicationPartnerMBean m) throws Exception { + ManagementFactory.getPlatformMBeanServer().unregisterMBean(getMBeanName(m)); + } + + private void safeUnregisterCommunicationPartner(CommunicationPartnerMBean m) { try { - registerCommunicationPartner(m); + unregisterCommunicationPartner(m); } catch (Exception e) { - log.error(String.format("Unable to register MBean for client %s", m.getName()), e); + log.error(String.format("Unable to unregister MBean for client %s", m.getName()), e); } } public void unregister() { - for (CommunicationPartnerMBean m : partnerDetails.values()) { - safeUnregisterCommunicationPartner(m); + Collection<CommunicationPartnerMBean> unregister; + + synchronized (beans) { + unregister = new ArrayList<>(beans.values()); + beans.clear(); + } + + for (CommunicationPartnerMBean bean : unregister) { + safeUnregisterCommunicationPartner(bean); } } public void gotMessageFrom(String client, String request, String address, int port) throws MalformedObjectNameException { log.debug("Message '{}' received from client {}", request, client); - CommunicationPartnerMBean m = partnerDetails.get(client); - boolean register = false; - if (m == null) { - cleanUp(); - m = new CommunicationPartnerMBean(client); - m.setRemoteAddress(address); - m.setRemotePort(port); - register = true; - } - m.setLastSeen(new Date()); - m.setLastRequest(request); - partnerDetails.put(client, m); - if (register) { - safeRegisterCommunicationPartner(m); - } + createOrUpdateClientMBean(client, address, port, m -> m.onMessageReceived(new Date(), request)); } public void didSendSegmentBytes(String client, int size) { log.debug("Segment with size {} sent to client {}", size, client); - CommunicationPartnerMBean m = partnerDetails.get(client); - m.onSegmentSent(size); - partnerDetails.put(client, m); + updateClientMBean(client, m -> m.onSegmentSent(size)); } public void didSendBinariesBytes(String client, long size) { log.debug("Binary with size {} sent to client {}", size, client); - CommunicationPartnerMBean m = partnerDetails.get(client); - m.onBinarySent(size); - partnerDetails.put(client, m); + updateClientMBean(client, m -> m.onBinarySent(size)); } public String getID() { return id; } - private void cleanUp() { - while (partnerDetails.size() >= MAX_CLIENT_STATISTICS) { - CommunicationPartnerMBean oldestEntry = oldestEntry(); - log.info("Housekeeping: Removing statistics for client " + oldestEntry.getName()); - safeUnregisterCommunicationPartner(oldestEntry); - partnerDetails.remove(oldestEntry.getName()); + private void createOrUpdateClientMBean(String clientName, String remoteAddress, int remotePort, Consumer<CommunicationPartnerMBean> update) throws MalformedObjectNameException { + List<CommunicationPartnerMBean> unregister = null; + boolean register = false; + CommunicationPartnerMBean bean; + + synchronized (beans) { + bean = beans.get(clientName); + + if (bean == null) { + bean = new CommunicationPartnerMBean(clientName, remoteAddress, remotePort); + + while (beans.size() >= maxClientMBeans) { + if (unregister == null) { + unregister = new ArrayList<>(); + } + unregister.add(beans.remove(oldest(beans))); + } + + beans.put(clientName, bean); + + register = true; + } } - } - private CommunicationPartnerMBean oldestEntry() { - CommunicationPartnerMBean ret = null; - for (CommunicationPartnerMBean m : partnerDetails.values()) { - if (ret == null || ret.getLastSeen().after(m.getLastSeen())) { - ret = m; + update.accept(bean); + + if (register) { + safeRegisterCommunicationPartner(bean); + } + + if (unregister != null) { + for (CommunicationPartnerMBean c : unregister) { + safeUnregisterCommunicationPartner(c); } } - return ret; + } + + private void updateClientMBean(String id, Consumer<CommunicationPartnerMBean> update) { + CommunicationPartnerMBean c; + + synchronized (beans) { + c = beans.get(id); + } + + if (c == null) { + throw new IllegalStateException("no client found with id " + id); + } + + update.accept(c); } } Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationPartnerMBean.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationPartnerMBean.java?rev=1813400&r1=1813399&r2=1813400&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationPartnerMBean.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationPartnerMBean.java Thu Oct 26 13:39:55 2017 @@ -18,45 +18,37 @@ package org.apache.jackrabbit.oak.segment.standby.store; import java.util.Date; +import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nonnull; import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; import org.apache.jackrabbit.oak.segment.standby.jmx.ObservablePartnerMBean; -import org.apache.jackrabbit.oak.segment.standby.jmx.StandbyStatusMBean; class CommunicationPartnerMBean implements ObservablePartnerMBean { - private final ObjectName mbeanName; - private final String clientName; - private String lastRequest; - - private Date lastSeen; + private final String remoteAddress; - private String lastSeenTimestamp; + private final int remotePort; - private String remoteAddress; + private final AtomicLong segmentsSent = new AtomicLong(); - private int remotePort; + private final AtomicLong segmentBytesSent = new AtomicLong(); - private long segmentsSent; + private final AtomicLong binariesSent = new AtomicLong(); - private long segmentBytesSent; + private final AtomicLong binariesBytesSent = new AtomicLong(); - private long binariesSent; + private volatile String lastRequest; - private long binariesBytesSent; + private volatile Date lastSeen; - CommunicationPartnerMBean(String clientName) throws MalformedObjectNameException { + CommunicationPartnerMBean(String clientName, String remoteAddress, int remotePort) throws MalformedObjectNameException { this.clientName = clientName; - this.mbeanName = new ObjectName(StandbyStatusMBean.JMX_NAME + ",id=\"Client " + clientName + "\""); - } - - ObjectName getMBeanName() { - return this.mbeanName; + this.remoteAddress = remoteAddress; + this.remotePort = remotePort; } @Nonnull @@ -82,58 +74,48 @@ class CommunicationPartnerMBean implemen @Override public String getLastSeenTimestamp() { - return this.lastSeenTimestamp; + return this.lastSeen.toString(); } @Override public long getTransferredSegments() { - return this.segmentsSent; + return this.segmentsSent.get(); } @Override public long getTransferredSegmentBytes() { - return this.segmentBytesSent; + return this.segmentBytesSent.get(); } @Override public long getTransferredBinaries() { - return this.binariesSent; + return this.binariesSent.get(); } @Override public long getTransferredBinariesBytes() { - return this.binariesBytesSent; - } - - void setRemoteAddress(String remoteAddress) { - this.remoteAddress = remoteAddress; - } - - void setRemotePort(int remotePort) { - this.remotePort = remotePort; - } - - Date getLastSeen() { - return lastSeen; + return this.binariesBytesSent.get(); } - void setLastSeen(Date lastSeen) { - this.lastSeen = lastSeen; - this.lastSeenTimestamp = lastSeen.toString(); - } - - void setLastRequest(String lastRequest) { - this.lastRequest = lastRequest; + void onMessageReceived(Date lastSeen, String lastRequest) { + synchronized (this) { + this.lastSeen = lastSeen; + this.lastRequest = lastRequest; + } } void onSegmentSent(long bytes) { - segmentsSent++; - segmentBytesSent += bytes; + this.segmentsSent.incrementAndGet(); + this.segmentBytesSent.addAndGet(bytes); } void onBinarySent(long bytes) { - binariesSent++; - binariesBytesSent += bytes; + this.binariesSent.incrementAndGet(); + this.binariesBytesSent.addAndGet(bytes); + } + + Date getLastSeen() { + return this.lastSeen; } } Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserverTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserverTest.java?rev=1813400&r1=1813399&r2=1813400&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserverTest.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserverTest.java Thu Oct 26 13:39:55 2017 @@ -36,8 +36,8 @@ public class CommunicationObserverTest { private final List<CommunicationPartnerMBean> communicationPartners = new ArrayList<>(); - TestCommunicationObserver(String id) { - super(id); + TestCommunicationObserver(String id, int maxClientMBeans) { + super(id, maxClientMBeans); } @Override @@ -58,7 +58,7 @@ public class CommunicationObserverTest { @Before public void before() { - observer = new TestCommunicationObserver("test"); + observer = new TestCommunicationObserver("test", 10); } @After @@ -80,10 +80,10 @@ public class CommunicationObserverTest { @Test public void shouldNotKeepManyObservablePartnerMBeans() throws Exception { - for (int i = 0; i < CommunicationObserver.MAX_CLIENT_STATISTICS * 2; i++) { + for (int i = 0; i < 20; i++) { observer.gotMessageFrom(randomUUID().toString(), "request", "127.0.0.1", 8080); } - assertEquals(CommunicationObserver.MAX_CLIENT_STATISTICS, observer.communicationPartners.size()); + assertEquals(10, observer.communicationPartners.size()); } @Test