Author: tommaso Date: Wed Aug 27 12:57:39 2014 New Revision: 1620884 URL: http://svn.apache.org/r1620884 Log: SLING-3872 - fixing configuration for event based reverse replication, changed remote trigger rule class name
Added: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/rule/impl/RemoteEventReplicationRule.java - copied, changed from r1620785, sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/rule/impl/ReplicateOnQueueEventRule.java Removed: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/rule/impl/ReplicateOnQueueEventRule.java sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/install.author/org.apache.sling.replication.rule.impl.RuleBasedReplicationTrigger-remote.json Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationTriggerServlet.java sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/install.author/org.apache.sling.replication.agent.impl.SimpleReplicationAgentFactory-publish-reverse.json sling/trunk/contrib/extensions/replication/it/src/test/java/org/apache/sling/replication/it/ReplicationIntegrationTestBase.java Copied: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/rule/impl/RemoteEventReplicationRule.java (from r1620785, sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/rule/impl/ReplicateOnQueueEventRule.java) URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/rule/impl/RemoteEventReplicationRule.java?p2=sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/rule/impl/RemoteEventReplicationRule.java&p1=sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/rule/impl/ReplicateOnQueueEventRule.java&r1=1620785&r2=1620884&rev=1620884&view=diff ============================================================================== --- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/rule/impl/ReplicateOnQueueEventRule.java (original) +++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/rule/impl/RemoteEventReplicationRule.java Wed Aug 27 12:57:39 2014 @@ -21,7 +21,6 @@ package org.apache.sling.replication.rul import java.io.IOException; import java.net.URI; import java.nio.ByteBuffer; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; @@ -43,37 +42,29 @@ import org.apache.http.nio.ContentDecode import org.apache.http.nio.IOControl; import org.apache.http.nio.protocol.BasicAsyncRequestProducer; import org.apache.http.nio.protocol.BasicAsyncResponseConsumer; -import org.apache.sling.commons.osgi.PropertiesUtil; import org.apache.sling.commons.scheduler.ScheduleOptions; import org.apache.sling.commons.scheduler.Scheduler; -import org.apache.sling.replication.agent.AgentReplicationException; -import org.apache.sling.replication.agent.ReplicationAgent; import org.apache.sling.replication.communication.ReplicationActionType; import org.apache.sling.replication.communication.ReplicationRequest; -import org.apache.sling.replication.resources.ReplicationConstants; import org.apache.sling.replication.rule.ReplicationRequestHandler; import org.apache.sling.replication.rule.ReplicationRule; -import org.apache.sling.replication.transport.ReplicationTransportHandler; -import org.apache.sling.replication.transport.impl.ReplicationTransportConstants; -import org.osgi.framework.BundleContext; -import org.osgi.framework.ServiceReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * {@link org.apache.sling.replication.rule.ReplicationRule} to trigger replication upon reception of server sent events - * on a certain queue + * on a certain URL */ -@Component(immediate = true, label = "Rule for listening on Server Sent Events for Queues") +@Component(immediate = true, label = "Rule for listening on Server Sent Events on a certain URL") @Service(value = ReplicationRule.class) -public class ReplicateOnQueueEventRule implements ReplicationRule { +public class RemoteEventReplicationRule implements ReplicationRule { @Property(label = "Name", value = "remote", propertyPrivate = true) private static final String NAME = "name"; private final Logger log = LoggerFactory.getLogger(getClass()); - private static final String SIGNATURE = "remote trigger on {path} with user {user} and password {password}"; + private static final String SIGNATURE = "remote trigger on {host} with user {user} and password {password}"; private static final String SIGNATURE_REGEX = "remote\\strigger\\son\\s([^\\s]+)\\swith\\suser\\s([^\\s]+)\\sand\\spassword\\s([^\\s]+)"; @@ -82,13 +73,10 @@ public class ReplicateOnQueueEventRule i @Reference private Scheduler scheduler; - private BundleContext context; - private Map<String, Future<HttpResponse>> requests; @Activate - protected void activate(BundleContext context, Map<String, ?> config) { - this.context = context; + protected void activate() { this.requests = new ConcurrentHashMap<String, Future<HttpResponse>>(); } @@ -103,24 +91,21 @@ public class ReplicateOnQueueEventRule i public void apply(String handleId, ReplicationRequestHandler agent, String ruleString) { Matcher matcher = signaturePattern.matcher(ruleString); if (matcher.find()) { - String remotePath = matcher.group(1); + String remoteHost = matcher.group(1); String user = matcher.group(2); String password = matcher.group(3); - - try { - log.info("applying queue event replication rule"); + log.info("applying remote event replication rule"); ScheduleOptions options = scheduler.NOW(); options.name(handleId); - scheduler.schedule(new EventBasedReplication(handleId, agent, remotePath, user, password), options); + scheduler.schedule(new EventBasedReplication(handleId, agent, remoteHost, user, password), options); } catch (Exception e) { - log.error("{}", e); log.error("cannot apply rule {} to agent {}", ruleString, agent); + log.error("{}", e); } - } } @@ -136,24 +121,22 @@ public class ReplicateOnQueueEventRule i private final String handleId; private final ReplicationRequestHandler agent; - private SSEResponseConsumer(String handleId, ReplicationRequestHandler agent) { this.handleId = handleId; this.agent = agent; - } @Override protected void onContentReceived(ContentDecoder decoder, IOControl ioctrl) throws IOException { - log.info("complete ? ", decoder.isCompleted()); + log.debug("complete {}", decoder.isCompleted()); ByteBuffer buffer = ByteBuffer.allocate(1024); decoder.read(buffer); - log.info("content {} received {},{}", new Object[]{buffer, decoder, ioctrl}); - log.info("event received"); + log.debug("content {} received {},{}", new Object[]{buffer, decoder, ioctrl}); - ReplicationRequest replicationRequest = new ReplicationRequest(System.currentTimeMillis(), ReplicationActionType.POLL, null); + // TODO : currently it always triggers poll request on /, should this be configurable? + ReplicationRequest replicationRequest = new ReplicationRequest(System.currentTimeMillis(), ReplicationActionType.POLL, "/"); agent.execute(replicationRequest); - log.info("replication request to agent {} sent ({} on {})", new Object[]{ + log.info("replication request to agent {} sent ({} {})", new Object[]{ handleId, replicationRequest.getAction(), replicationRequest.getPaths()}); @@ -168,8 +151,6 @@ public class ReplicateOnQueueEventRule i } } - - private class EventBasedReplication implements Runnable { private final String handleId; private final ReplicationRequestHandler agent; @@ -187,14 +168,11 @@ public class ReplicateOnQueueEventRule i public void run() { try { - - - log.info("getting event from {}", targetTransport + "?sec=600"); + log.debug("getting events from {}", targetTransport); URI eventEndpoint = URI.create(targetTransport); - - log.info("preparing request"); + log.debug("preparing request"); CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials( new AuthScope(eventEndpoint.getHost(), eventEndpoint.getPort()), @@ -207,7 +185,7 @@ public class ReplicateOnQueueEventRule i HttpHost target = URIUtils.extractHost(get.getURI()); BasicAsyncRequestProducer basicAsyncRequestProducer = new BasicAsyncRequestProducer(target, get); httpClient.start(); - log.info("sending request"); + log.debug("sending request"); Future<HttpResponse> futureResponse = httpClient.execute( basicAsyncRequestProducer, new SSEResponseConsumer(handleId, agent), null); @@ -216,7 +194,7 @@ public class ReplicateOnQueueEventRule i } finally { httpClient.close(); } - log.info("request finished"); + log.debug("request finished"); } catch (Exception e) { log.error("cannot execute event based replication"); } Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationTriggerServlet.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationTriggerServlet.java?rev=1620884&r1=1620883&r2=1620884&view=diff ============================================================================== --- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationTriggerServlet.java (original) +++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationTriggerServlet.java Wed Aug 27 12:57:39 2014 @@ -18,33 +18,26 @@ */ package org.apache.sling.replication.servlet; -import org.apache.felix.scr.annotations.*; +import javax.servlet.Servlet; +import javax.servlet.ServletException; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.Arrays; +import java.util.UUID; + +import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Properties; +import org.apache.felix.scr.annotations.Property; +import org.apache.felix.scr.annotations.Service; import org.apache.sling.api.SlingHttpServletRequest; import org.apache.sling.api.SlingHttpServletResponse; import org.apache.sling.api.servlets.SlingAllMethodsServlet; -import org.apache.sling.replication.agent.ReplicationAgent; import org.apache.sling.replication.communication.ReplicationRequest; -import org.apache.sling.replication.event.ReplicationEvent; -import org.apache.sling.replication.event.ReplicationEventType; -import org.apache.sling.replication.resources.ReplicationConstants; import org.apache.sling.replication.rule.ReplicationRequestHandler; import org.apache.sling.replication.rule.ReplicationTrigger; -import org.osgi.framework.BundleContext; -import org.osgi.framework.ServiceRegistration; -import org.osgi.service.event.Event; -import org.osgi.service.event.EventConstants; -import org.osgi.service.event.EventHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.servlet.Servlet; -import javax.servlet.ServletException; -import java.io.IOException; -import java.io.PrintWriter; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; - /** * Triggers Server Sent Events servlet */ @@ -57,29 +50,23 @@ import java.util.concurrent.ConcurrentHa }) public class ReplicationTriggerServlet extends SlingAllMethodsServlet { - private final int MAX_NUMBER_OF_SECONDS = 3600; - private final int DEFAULT_NUMBER_OF_SECONDS = 60; - private final String HANDLER_NAME = "ReplicationTriggerServlet"; - private final Logger log = LoggerFactory.getLogger(getClass()); + private final static int DEFAULT_NUMBER_OF_SECONDS = 60; + @Override protected void doGet(SlingHttpServletRequest request, SlingHttpServletResponse response) throws ServletException, IOException { - int seconds = DEFAULT_NUMBER_OF_SECONDS; + String secondsParameter = request.getParameter("sec"); - try{ - seconds = Integer.parseInt(request.getParameter("sec")); - - } catch(Throwable e){ - - } + int seconds = secondsParameter != null && secondsParameter.length() > 0 ? Integer.parseInt(secondsParameter) : + DEFAULT_NUMBER_OF_SECONDS; + int MAX_NUMBER_OF_SECONDS = 3600; if (seconds > MAX_NUMBER_OF_SECONDS) { seconds = MAX_NUMBER_OF_SECONDS; - } - else if (seconds < 0) { + } else if (seconds < 0) { seconds = DEFAULT_NUMBER_OF_SECONDS; } @@ -96,7 +83,7 @@ public class ReplicationTriggerServlet e final PrintWriter writer = response.getWriter(); - String handlerId = HANDLER_NAME + UUID.randomUUID().toString(); + String handlerId = "ReplicationTriggerServlet" + UUID.randomUUID().toString(); replicationTrigger.register(handlerId, new ReplicationRequestHandler() { public void execute(ReplicationRequest request) { @@ -123,10 +110,11 @@ public class ReplicationTriggerServlet e // write the actual data // this could be simple text or could be JSON-encoded text that the // client then decodes - writer.write("data: " + replicationRequest.getAction() + "\n\n"); + writer.write("data: " + replicationRequest.getAction() + " " + Arrays.toString(replicationRequest.getPaths()) + "\n\n"); // flush the buffers to make sure the container sends the bytes writer.flush(); - log.debug("SSE event {}: {}", replicationRequest.getTime(), replicationRequest.getAction()); + log.debug("SSE event {}: {} {}", new Object[]{replicationRequest.getTime(), replicationRequest.getAction(), + replicationRequest.getPaths()}); } } Modified: sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/install.author/org.apache.sling.replication.agent.impl.SimpleReplicationAgentFactory-publish-reverse.json URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/install.author/org.apache.sling.replication.agent.impl.SimpleReplicationAgentFactory-publish-reverse.json?rev=1620884&r1=1620883&r2=1620884&view=diff ============================================================================== --- sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/install.author/org.apache.sling.replication.agent.impl.SimpleReplicationAgentFactory-publish-reverse.json (original) +++ sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/install.author/org.apache.sling.replication.agent.impl.SimpleReplicationAgentFactory-publish-reverse.json Wed Aug 27 12:57:39 2014 @@ -6,6 +6,6 @@ "ReplicationQueueProvider.target": "(name=sjh)", "ReplicationQueueDistributionStrategy.target": "(name=error)", "rules": ["scheduled poll every 30 sec", - "remote trigger on http://localhost:4503/libs/sling/replication/services/triggers/content-event with user admin and password admin"], + "remote trigger on http://localhost:4503/libs/sling/replication/services/triggers/content-event?3600000 with user admin and password admin"], "runModes": ["author"] } Modified: sling/trunk/contrib/extensions/replication/it/src/test/java/org/apache/sling/replication/it/ReplicationIntegrationTestBase.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/it/src/test/java/org/apache/sling/replication/it/ReplicationIntegrationTestBase.java?rev=1620884&r1=1620883&r2=1620884&view=diff ============================================================================== --- sling/trunk/contrib/extensions/replication/it/src/test/java/org/apache/sling/replication/it/ReplicationIntegrationTestBase.java (original) +++ sling/trunk/contrib/extensions/replication/it/src/test/java/org/apache/sling/replication/it/ReplicationIntegrationTestBase.java Wed Aug 27 12:57:39 2014 @@ -45,7 +45,6 @@ public abstract class ReplicationIntegra authorClient = new SlingClient(author.getServerBaseUrl(), author.getServerUsername(), author.getServerPassword()); publishClient = new SlingClient(publish.getServerBaseUrl(), publish.getServerUsername(), publish.getServerPassword()); - try { // change the url for publish agent and wait for it to start String remoteImporterUrl = publish.getServerBaseUrl() + importerUrl("default");