Github user michaelandrepearce commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/2467#discussion_r242787638
--- Diff:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
---
@@ -359,59 +371,77 @@ public boolean flowControl(ReadyListener
readyListener) {
@Override
public void onRemoteOpen(Connection connection) throws Exception {
- lock();
+ handler.requireHandler();
try {
- try {
- initInternal();
- } catch (Exception e) {
- log.error("Error init connection", e);
- }
- if (!validateConnection(connection)) {
- connection.close();
- } else {
- connection.setContext(AMQPConnectionContext.this);
- connection.setContainer(containerId);
- connection.setProperties(connectionProperties);
-
connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
- connection.open();
- }
- } finally {
- unlock();
+ initInternal();
+ } catch (Exception e) {
+ log.error("Error init connection", e);
+ }
+ if (!validateConnection(connection)) {
+ connection.close();
+ } else {
+ connection.setContext(AMQPConnectionContext.this);
+ connection.setContainer(containerId);
+ connection.setProperties(connectionProperties);
+
connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
+ connection.open();
}
initialise();
- /*
- * This can be null which is in effect an empty map, also we
really don't need to check this for in bound connections
- * but its here in case we add support for outbound connections.
- * */
+ /*
+ * This can be null which is in effect an empty map, also we really
don't need to check this for in bound connections
+ * but its here in case we add support for outbound connections.
+ * */
if (connection.getRemoteProperties() == null ||
!connection.getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) {
long nextKeepAliveTime = handler.tick(true);
if (nextKeepAliveTime != 0 && scheduledPool != null) {
- scheduledPool.schedule(new Runnable() {
- @Override
- public void run() {
- Long rescheduleAt = handler.tick(false);
- if (rescheduleAt == null) {
- // this mean tick could not acquire a lock, we will
just retry in 10 milliseconds.
- scheduledPool.schedule(this, 10,
TimeUnit.MILLISECONDS);
- } else if (rescheduleAt != 0) {
- scheduledPool.schedule(this, rescheduleAt -
TimeUnit.NANOSECONDS.toMillis(System.nanoTime()), TimeUnit.MILLISECONDS);
- }
- }
- }, (nextKeepAliveTime -
TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), TimeUnit.MILLISECONDS);
+ scheduledPool.schedule(new ScheduleRunnable(),
(nextKeepAliveTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())),
TimeUnit.MILLISECONDS);
}
}
}
+ class TickerRunnable implements Runnable {
+
+ final ScheduleRunnable scheduleRunnable;
+
+ TickerRunnable(ScheduleRunnable scheduleRunnable) {
+ this.scheduleRunnable = scheduleRunnable;
+ }
+
+ @Override
+ public void run() {
+ try {
+ Long rescheduleAt = handler.tick(false);
+ if (rescheduleAt == null) {
+ // this mean tick could not acquire a lock, we will just
retry in 10 milliseconds.
+ scheduledPool.schedule(scheduleRunnable, 10,
TimeUnit.MILLISECONDS);
+ } else if (rescheduleAt != 0) {
+ scheduledPool.schedule(scheduleRunnable, rescheduleAt -
TimeUnit.NANOSECONDS.toMillis(System.nanoTime()), TimeUnit.MILLISECONDS);
+ }
+ } catch (Exception e) {
+ log.warn(e.getMessage(), e);
+ }
--- End diff --
This should use a static logger method with a message code, rather than
calling log.warn(e.getMessage(), e) directly.
---