Author: tommaso
Date: Wed Aug 27 12:57:39 2014
New Revision: 1620884

URL: http://svn.apache.org/r1620884
Log:
SLING-3872 - fix configuration for event-based reverse replication; rename the 
remote trigger rule class (ReplicateOnQueueEventRule -> RemoteEventReplicationRule)

Added:
    
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/rule/impl/RemoteEventReplicationRule.java
      - copied, changed from r1620785, 
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/rule/impl/ReplicateOnQueueEventRule.java
Removed:
    
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/rule/impl/ReplicateOnQueueEventRule.java
    
sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/install.author/org.apache.sling.replication.rule.impl.RuleBasedReplicationTrigger-remote.json
Modified:
    
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationTriggerServlet.java
    
sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/install.author/org.apache.sling.replication.agent.impl.SimpleReplicationAgentFactory-publish-reverse.json
    
sling/trunk/contrib/extensions/replication/it/src/test/java/org/apache/sling/replication/it/ReplicationIntegrationTestBase.java

Copied: 
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/rule/impl/RemoteEventReplicationRule.java
 (from r1620785, 
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/rule/impl/ReplicateOnQueueEventRule.java)
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/rule/impl/RemoteEventReplicationRule.java?p2=sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/rule/impl/RemoteEventReplicationRule.java&p1=sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/rule/impl/ReplicateOnQueueEventRule.java&r1=1620785&r2=1620884&rev=1620884&view=diff
==============================================================================
--- 
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/rule/impl/ReplicateOnQueueEventRule.java
 (original)
+++ 
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/rule/impl/RemoteEventReplicationRule.java
 Wed Aug 27 12:57:39 2014
@@ -21,7 +21,6 @@ package org.apache.sling.replication.rul
 import java.io.IOException;
 import java.net.URI;
 import java.nio.ByteBuffer;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
@@ -43,37 +42,29 @@ import org.apache.http.nio.ContentDecode
 import org.apache.http.nio.IOControl;
 import org.apache.http.nio.protocol.BasicAsyncRequestProducer;
 import org.apache.http.nio.protocol.BasicAsyncResponseConsumer;
-import org.apache.sling.commons.osgi.PropertiesUtil;
 import org.apache.sling.commons.scheduler.ScheduleOptions;
 import org.apache.sling.commons.scheduler.Scheduler;
-import org.apache.sling.replication.agent.AgentReplicationException;
-import org.apache.sling.replication.agent.ReplicationAgent;
 import org.apache.sling.replication.communication.ReplicationActionType;
 import org.apache.sling.replication.communication.ReplicationRequest;
-import org.apache.sling.replication.resources.ReplicationConstants;
 import org.apache.sling.replication.rule.ReplicationRequestHandler;
 import org.apache.sling.replication.rule.ReplicationRule;
-import org.apache.sling.replication.transport.ReplicationTransportHandler;
-import 
org.apache.sling.replication.transport.impl.ReplicationTransportConstants;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceReference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * {@link org.apache.sling.replication.rule.ReplicationRule} to trigger 
replication upon reception of server sent events
- * on a certain queue
+ * on a certain URL
  */
-@Component(immediate = true, label = "Rule for listening on Server Sent Events 
for Queues")
+@Component(immediate = true, label = "Rule for listening on Server Sent Events 
on a certain URL")
 @Service(value = ReplicationRule.class)
-public class ReplicateOnQueueEventRule implements ReplicationRule {
+public class RemoteEventReplicationRule implements ReplicationRule {
 
     @Property(label = "Name", value = "remote", propertyPrivate = true)
     private static final String NAME = "name";
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
-    private static final String SIGNATURE = "remote trigger on {path} with 
user {user} and password {password}";
+    private static final String SIGNATURE = "remote trigger on {host} with 
user {user} and password {password}";
 
     private static final String SIGNATURE_REGEX = 
"remote\\strigger\\son\\s([^\\s]+)\\swith\\suser\\s([^\\s]+)\\sand\\spassword\\s([^\\s]+)";
 
@@ -82,13 +73,10 @@ public class ReplicateOnQueueEventRule i
     @Reference
     private Scheduler scheduler;
 
-    private BundleContext context;
-
     private Map<String, Future<HttpResponse>> requests;
 
     @Activate
-    protected void activate(BundleContext context, Map<String, ?> config) {
-        this.context = context;
+    protected void activate() {
         this.requests = new ConcurrentHashMap<String, Future<HttpResponse>>();
     }
 
@@ -103,24 +91,21 @@ public class ReplicateOnQueueEventRule i
     public void apply(String handleId, ReplicationRequestHandler agent, String 
ruleString) {
         Matcher matcher = signaturePattern.matcher(ruleString);
         if (matcher.find()) {
-            String remotePath = matcher.group(1);
+            String remoteHost = matcher.group(1);
             String user = matcher.group(2);
             String password = matcher.group(3);
 
-
-
             try {
-                log.info("applying queue event replication rule");
+                log.info("applying remote event replication rule");
 
                 ScheduleOptions options = scheduler.NOW();
                 options.name(handleId);
-                scheduler.schedule(new EventBasedReplication(handleId, agent, 
remotePath, user, password), options);
+                scheduler.schedule(new EventBasedReplication(handleId, agent, 
remoteHost, user, password), options);
 
             } catch (Exception e) {
-                log.error("{}", e);
                 log.error("cannot apply rule {} to agent {}", ruleString, 
agent);
+                log.error("{}", e);
             }
-
         }
     }
 
@@ -136,24 +121,22 @@ public class ReplicateOnQueueEventRule i
         private final String handleId;
         private final ReplicationRequestHandler agent;
 
-
         private SSEResponseConsumer(String handleId, ReplicationRequestHandler 
agent) {
             this.handleId = handleId;
             this.agent = agent;
-
         }
 
         @Override
         protected void onContentReceived(ContentDecoder decoder, IOControl 
ioctrl) throws IOException {
-            log.info("complete ? ", decoder.isCompleted());
+            log.debug("complete {}", decoder.isCompleted());
             ByteBuffer buffer = ByteBuffer.allocate(1024);
             decoder.read(buffer);
-            log.info("content {} received {},{}", new Object[]{buffer, 
decoder, ioctrl});
-            log.info("event received");
+            log.debug("content {} received {},{}", new Object[]{buffer, 
decoder, ioctrl});
 
-            ReplicationRequest replicationRequest = new 
ReplicationRequest(System.currentTimeMillis(), ReplicationActionType.POLL, 
null);
+            // TODO : currently it always triggers poll request on /, should 
this be configurable?
+            ReplicationRequest replicationRequest = new 
ReplicationRequest(System.currentTimeMillis(), ReplicationActionType.POLL, "/");
             agent.execute(replicationRequest);
-            log.info("replication request to agent {} sent ({} on {})", new 
Object[]{
+            log.info("replication request to agent {} sent ({} {})", new 
Object[]{
                     handleId,
                     replicationRequest.getAction(),
                     replicationRequest.getPaths()});
@@ -168,8 +151,6 @@ public class ReplicateOnQueueEventRule i
         }
     }
 
-
-
     private class EventBasedReplication implements Runnable {
         private final String handleId;
         private final ReplicationRequestHandler agent;
@@ -187,14 +168,11 @@ public class ReplicateOnQueueEventRule i
 
         public void run() {
             try {
-
-
-                log.info("getting event from {}", targetTransport + 
"?sec=600");
+                log.debug("getting events from {}", targetTransport);
 
                 URI eventEndpoint = URI.create(targetTransport);
 
-
-                log.info("preparing request");
+                log.debug("preparing request");
                 CredentialsProvider credentialsProvider = new 
BasicCredentialsProvider();
                 credentialsProvider.setCredentials(
                         new AuthScope(eventEndpoint.getHost(), 
eventEndpoint.getPort()),
@@ -207,7 +185,7 @@ public class ReplicateOnQueueEventRule i
                     HttpHost target = URIUtils.extractHost(get.getURI());
                     BasicAsyncRequestProducer basicAsyncRequestProducer = new 
BasicAsyncRequestProducer(target, get);
                     httpClient.start();
-                    log.info("sending request");
+                    log.debug("sending request");
                     Future<HttpResponse> futureResponse = httpClient.execute(
                             basicAsyncRequestProducer,
                             new SSEResponseConsumer(handleId, agent), null);
@@ -216,7 +194,7 @@ public class ReplicateOnQueueEventRule i
                 } finally {
                     httpClient.close();
                 }
-                log.info("request finished");
+                log.debug("request finished");
             } catch (Exception e) {
                 log.error("cannot execute event based replication");
             }

Modified: 
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationTriggerServlet.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationTriggerServlet.java?rev=1620884&r1=1620883&r2=1620884&view=diff
==============================================================================
--- 
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationTriggerServlet.java
 (original)
+++ 
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationTriggerServlet.java
 Wed Aug 27 12:57:39 2014
@@ -18,33 +18,26 @@
  */
 package org.apache.sling.replication.servlet;
 
-import org.apache.felix.scr.annotations.*;
+import javax.servlet.Servlet;
+import javax.servlet.ServletException;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Arrays;
+import java.util.UUID;
+
+import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Properties;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Service;
 import org.apache.sling.api.SlingHttpServletRequest;
 import org.apache.sling.api.SlingHttpServletResponse;
 import org.apache.sling.api.servlets.SlingAllMethodsServlet;
-import org.apache.sling.replication.agent.ReplicationAgent;
 import org.apache.sling.replication.communication.ReplicationRequest;
-import org.apache.sling.replication.event.ReplicationEvent;
-import org.apache.sling.replication.event.ReplicationEventType;
-import org.apache.sling.replication.resources.ReplicationConstants;
 import org.apache.sling.replication.rule.ReplicationRequestHandler;
 import org.apache.sling.replication.rule.ReplicationTrigger;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceRegistration;
-import org.osgi.service.event.Event;
-import org.osgi.service.event.EventConstants;
-import org.osgi.service.event.EventHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.servlet.Servlet;
-import javax.servlet.ServletException;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-
 /**
  * Triggers Server Sent Events servlet
  */
@@ -57,29 +50,23 @@ import java.util.concurrent.ConcurrentHa
 })
 public class ReplicationTriggerServlet extends SlingAllMethodsServlet {
 
-    private final int MAX_NUMBER_OF_SECONDS = 3600;
-    private final int DEFAULT_NUMBER_OF_SECONDS = 60;
-    private final String HANDLER_NAME = "ReplicationTriggerServlet";
-
     private final Logger log = LoggerFactory.getLogger(getClass());
 
+    private final static int DEFAULT_NUMBER_OF_SECONDS = 60;
+
     @Override
     protected void doGet(SlingHttpServletRequest request, 
SlingHttpServletResponse response)
             throws ServletException, IOException {
 
-        int seconds = DEFAULT_NUMBER_OF_SECONDS;
+        String secondsParameter = request.getParameter("sec");
 
-        try{
-            seconds = Integer.parseInt(request.getParameter("sec"));
-
-        } catch(Throwable e){
-
-        }
+        int seconds = secondsParameter != null && secondsParameter.length() > 
0 ? Integer.parseInt(secondsParameter) :
+                DEFAULT_NUMBER_OF_SECONDS;
 
+        int MAX_NUMBER_OF_SECONDS = 3600;
         if (seconds > MAX_NUMBER_OF_SECONDS) {
             seconds = MAX_NUMBER_OF_SECONDS;
-        }
-        else if (seconds < 0) {
+        } else if (seconds < 0) {
             seconds = DEFAULT_NUMBER_OF_SECONDS;
         }
 
@@ -96,7 +83,7 @@ public class ReplicationTriggerServlet e
 
         final PrintWriter writer = response.getWriter();
 
-        String handlerId = HANDLER_NAME + UUID.randomUUID().toString();
+        String handlerId = "ReplicationTriggerServlet" + 
UUID.randomUUID().toString();
 
         replicationTrigger.register(handlerId, new ReplicationRequestHandler() 
{
             public void execute(ReplicationRequest request) {
@@ -123,10 +110,11 @@ public class ReplicationTriggerServlet e
         // write the actual data
         // this could be simple text or could be JSON-encoded text that the
         // client then decodes
-        writer.write("data: " + replicationRequest.getAction() + "\n\n");
+        writer.write("data: " + replicationRequest.getAction() + " " + 
Arrays.toString(replicationRequest.getPaths()) + "\n\n");
 
         // flush the buffers to make sure the container sends the bytes
         writer.flush();
-        log.debug("SSE event {}: {}", replicationRequest.getTime(), 
replicationRequest.getAction());
+        log.debug("SSE event {}: {} {}", new 
Object[]{replicationRequest.getTime(), replicationRequest.getAction(),
+                replicationRequest.getPaths()});
     }
 }

Modified: 
sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/install.author/org.apache.sling.replication.agent.impl.SimpleReplicationAgentFactory-publish-reverse.json
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/install.author/org.apache.sling.replication.agent.impl.SimpleReplicationAgentFactory-publish-reverse.json?rev=1620884&r1=1620883&r2=1620884&view=diff
==============================================================================
--- 
sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/install.author/org.apache.sling.replication.agent.impl.SimpleReplicationAgentFactory-publish-reverse.json
 (original)
+++ 
sling/trunk/contrib/extensions/replication/core/src/main/resources/SLING-CONTENT/libs/sling/replication/install.author/org.apache.sling.replication.agent.impl.SimpleReplicationAgentFactory-publish-reverse.json
 Wed Aug 27 12:57:39 2014
@@ -6,6 +6,6 @@
     "ReplicationQueueProvider.target": "(name=sjh)",
     "ReplicationQueueDistributionStrategy.target": "(name=error)",
     "rules": ["scheduled poll every 30 sec",
-        "remote trigger on 
http://localhost:4503/libs/sling/replication/services/triggers/content-event 
with user admin and password admin"],
+        "remote trigger on 
http://localhost:4503/libs/sling/replication/services/triggers/content-event?3600000
 with user admin and password admin"],
     "runModes": ["author"]
 }

Modified: 
sling/trunk/contrib/extensions/replication/it/src/test/java/org/apache/sling/replication/it/ReplicationIntegrationTestBase.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/it/src/test/java/org/apache/sling/replication/it/ReplicationIntegrationTestBase.java?rev=1620884&r1=1620883&r2=1620884&view=diff
==============================================================================
--- 
sling/trunk/contrib/extensions/replication/it/src/test/java/org/apache/sling/replication/it/ReplicationIntegrationTestBase.java
 (original)
+++ 
sling/trunk/contrib/extensions/replication/it/src/test/java/org/apache/sling/replication/it/ReplicationIntegrationTestBase.java
 Wed Aug 27 12:57:39 2014
@@ -45,7 +45,6 @@ public abstract class ReplicationIntegra
         authorClient = new SlingClient(author.getServerBaseUrl(), 
author.getServerUsername(), author.getServerPassword());
         publishClient = new SlingClient(publish.getServerBaseUrl(), 
publish.getServerUsername(), publish.getServerPassword());
 
-
         try {
             // change the url for publish agent and wait for it to start
             String remoteImporterUrl = publish.getServerBaseUrl() + 
importerUrl("default");


Reply via email to