Author: tommaso
Date: Thu Aug 18 14:17:38 2016
New Revision: 1756791

URL: http://svn.apache.org/viewvc?rev=1756791&view=rev
Log:
SLING-5977 - moved SimpleDistributionAgent inner classes out, added tests, 
created proper class for agent authentication info

Added:
    
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/DistributionPackageExporterProcessor.java
   (with props)
    
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentAuthenticationInfo.java
   (with props)
    
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentQueueProcessor.java
   (with props)
    
sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/DistributionPackageExporterProcessorTest.java
   (with props)
    
sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentAuthenticationInfoTest.java
   (with props)
    
sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentQueueProcessorTest.java
   (with props)
Modified:
    
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java
    
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/util/impl/ByteBufferBackedInputStream.java
    
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/util/impl/DistributionUtils.java
    
sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/util/impl/FileBackedMemoryOutputStreamTest.java

Added: 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/DistributionPackageExporterProcessor.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/DistributionPackageExporterProcessor.java?rev=1756791&view=auto
==============================================================================
--- 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/DistributionPackageExporterProcessor.java
 (added)
+++ 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/DistributionPackageExporterProcessor.java
 Thu Aug 18 14:17:38 2016
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.agent.impl;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.sling.distribution.DistributionRequestState;
+import org.apache.sling.distribution.DistributionResponse;
+import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.component.impl.DistributionComponentKind;
+import org.apache.sling.distribution.event.DistributionEventTopics;
+import org.apache.sling.distribution.event.impl.DistributionEventFactory;
+import org.apache.sling.distribution.impl.SimpleDistributionResponse;
+import org.apache.sling.distribution.log.impl.DefaultDistributionLog;
+import org.apache.sling.distribution.packaging.DistributionPackage;
+import org.apache.sling.distribution.packaging.DistributionPackageProcessor;
+import org.apache.sling.distribution.packaging.impl.DistributionPackageUtils;
+import org.apache.sling.distribution.queue.DistributionQueueItemState;
+import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
+import org.apache.sling.distribution.queue.DistributionQueueProvider;
+import 
org.apache.sling.distribution.queue.impl.DistributionQueueDispatchingStrategy;
+
+/**
+ * The package exporter callback function is responsible to process the 
exported packages.
+ * The exported packages are scheduled for import by passing them to a {@link 
DistributionQueueDispatchingStrategy}.
+ */
+class DistributionPackageExporterProcessor implements 
DistributionPackageProcessor {
+
+    private final String callingUser;
+    private final String requestId;
+    private final long requestStartTime;
+    private final AtomicInteger packagesCount = new AtomicInteger();
+    private final AtomicLong packagesSize = new AtomicLong();
+    private final List<DistributionResponse> allResponses = new 
LinkedList<DistributionResponse>();
+
+    private final DistributionEventFactory distributionEventFactory;
+    private final DistributionQueueDispatchingStrategy scheduleQueueStrategy;
+    private final DistributionQueueProvider queueProvider;
+    private final DefaultDistributionLog log;
+    private final String agentName;
+
+    public List<DistributionResponse> getAllResponses() {
+        return allResponses;
+    }
+
+    public int getPackagesCount() {
+        return packagesCount.get();
+    }
+
+    public long getPackagesSize() {
+        return packagesSize.get();
+    }
+
+    DistributionPackageExporterProcessor(@Nullable String callingUser, 
@Nonnull String requestId, long requestStartTime,
+                                         @Nonnull DistributionEventFactory 
distributionEventFactory,
+                                         @Nonnull 
DistributionQueueDispatchingStrategy scheduleQueueStrategy,
+                                         @Nonnull DistributionQueueProvider 
queueProvider, @Nonnull DefaultDistributionLog log,
+                                         @Nonnull String agentName) {
+        this.callingUser = callingUser;
+        this.requestId = requestId;
+        this.requestStartTime = requestStartTime;
+        this.distributionEventFactory = distributionEventFactory;
+        this.scheduleQueueStrategy = scheduleQueueStrategy;
+        this.queueProvider = queueProvider;
+        this.log = log;
+        this.agentName = agentName;
+    }
+
+    @Override
+    public void process(DistributionPackage distributionPackage) {
+        final long startTime = System.currentTimeMillis();
+
+        Collection<SimpleDistributionResponse> responses = 
scheduleImportPackage(distributionPackage, callingUser,
+                requestId, requestStartTime);
+        packagesCount.incrementAndGet();
+        packagesSize.addAndGet(distributionPackage.getSize());
+        allResponses.addAll(responses);
+
+        final long endTime = System.currentTimeMillis();
+
+        log.debug("PACKAGE-QUEUED {}: packageId={}, paths={}, queueTime={}ms, 
responses={}", requestId, distributionPackage.getId(),
+                distributionPackage.getInfo().getPaths(), endTime - startTime, 
responses.size());
+    }
+
+    private Collection<SimpleDistributionResponse> 
scheduleImportPackage(DistributionPackage distributionPackage, String 
callingUser, String requestId, long startTime) {
+        Collection<SimpleDistributionResponse> distributionResponses = new 
LinkedList<SimpleDistributionResponse>();
+
+        // dispatch the distribution package to one or more queues
+        try {
+            // add metadata to the package
+            
distributionPackage.getInfo().put(DistributionPackageUtils.PACKAGE_INFO_PROPERTY_REQUEST_USER,
 callingUser);
+            
distributionPackage.getInfo().put(DistributionPackageUtils.PACKAGE_INFO_PROPERTY_REQUEST_ID,
 requestId);
+            
distributionPackage.getInfo().put(DistributionPackageUtils.PACKAGE_INFO_PROPERTY_REQUEST_START_TIME,
 startTime);
+
+            // put the package in the queue
+            Iterable<DistributionQueueItemStatus> states = 
scheduleQueueStrategy.add(distributionPackage, queueProvider);
+            for (DistributionQueueItemStatus state : states) {
+                DistributionRequestState requestState = 
getRequestStateFromQueueState(state.getItemState());
+                distributionResponses.add(new 
SimpleDistributionResponse(requestState, state.getItemState().toString()));
+            }
+
+            
distributionEventFactory.generatePackageEvent(DistributionEventTopics.AGENT_PACKAGE_QUEUED,
+                    DistributionComponentKind.AGENT, agentName, 
distributionPackage.getInfo());
+        } catch (DistributionException e) {
+            log.error("an error happened during dispatching items to the 
queue(s)", e);
+            distributionResponses.add(new 
SimpleDistributionResponse(DistributionRequestState.DROPPED, e.toString()));
+        }
+
+        return distributionResponses;
+    }
+
+    /* Convert the state of a certain item in the queue into a request state */
+    private DistributionRequestState 
getRequestStateFromQueueState(DistributionQueueItemState itemState) {
+        DistributionRequestState requestState;
+        switch (itemState) {
+            case QUEUED:
+                requestState = DistributionRequestState.ACCEPTED;
+                break;
+            case ERROR:
+                requestState = DistributionRequestState.DROPPED;
+                break;
+            default:
+                requestState = DistributionRequestState.DROPPED;
+                break;
+        }
+        return requestState;
+    }
+}

Propchange: 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/DistributionPackageExporterProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java?rev=1756791&r1=1756790&r2=1756791&view=diff
==============================================================================
--- 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java
 (original)
+++ 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java
 Thu Aug 18 14:17:38 2016
@@ -20,23 +20,12 @@ package org.apache.sling.distribution.ag
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
-import javax.jcr.RepositoryException;
-import javax.jcr.Session;
-import javax.jcr.SimpleCredentials;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.sling.api.resource.LoginException;
-import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.distribution.DistributionRequest;
@@ -45,38 +34,28 @@ import org.apache.sling.distribution.Dis
 import org.apache.sling.distribution.DistributionResponse;
 import org.apache.sling.distribution.agent.DistributionAgent;
 import org.apache.sling.distribution.agent.DistributionAgentState;
-import org.apache.sling.distribution.common.RecoverableDistributionException;
+import org.apache.sling.distribution.common.DistributionException;
 import org.apache.sling.distribution.component.impl.DistributionComponentKind;
 import org.apache.sling.distribution.component.impl.SettingsUtils;
 import org.apache.sling.distribution.event.DistributionEventTopics;
 import org.apache.sling.distribution.event.impl.DistributionEventFactory;
 import org.apache.sling.distribution.impl.CompositeDistributionResponse;
-import org.apache.sling.distribution.common.DistributionException;
 import org.apache.sling.distribution.impl.SimpleDistributionResponse;
 import org.apache.sling.distribution.log.DistributionLog;
 import org.apache.sling.distribution.log.impl.DefaultDistributionLog;
-import org.apache.sling.distribution.packaging.DistributionPackageProcessor;
-import org.apache.sling.distribution.queue.impl.SimpleAgentDistributionQueue;
 import org.apache.sling.distribution.packaging.DistributionPackage;
 import org.apache.sling.distribution.packaging.DistributionPackageExporter;
 import org.apache.sling.distribution.packaging.DistributionPackageImporter;
-import org.apache.sling.distribution.packaging.impl.DistributionPackageUtils;
 import org.apache.sling.distribution.queue.DistributionQueue;
-import org.apache.sling.distribution.queue.DistributionQueueEntry;
-
-import org.apache.sling.distribution.queue.DistributionQueueItem;
-import org.apache.sling.distribution.queue.DistributionQueueItemState;
-import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
 import org.apache.sling.distribution.queue.DistributionQueueProcessor;
 import org.apache.sling.distribution.queue.DistributionQueueProvider;
 import org.apache.sling.distribution.queue.DistributionQueueState;
 import 
org.apache.sling.distribution.queue.impl.DistributionQueueDispatchingStrategy;
+import org.apache.sling.distribution.queue.impl.SimpleAgentDistributionQueue;
 import org.apache.sling.distribution.trigger.DistributionRequestHandler;
 import org.apache.sling.distribution.trigger.DistributionTrigger;
 import org.apache.sling.distribution.util.impl.DistributionUtils;
 import org.apache.sling.jcr.api.SlingRepository;
-import org.apache.sling.jcr.resource.JcrResourceConstants;
-
 
 /**
  * Basic implementation of a {@link 
org.apache.sling.distribution.agent.DistributionAgent}
@@ -92,22 +71,19 @@ public class SimpleDistributionAgent imp
     private final DistributionRequestAuthorizationStrategy 
distributionRequestAuthorizationStrategy;
     private final DefaultDistributionLog log;
     private final DistributionEventFactory distributionEventFactory;
+    private final DistributionQueueProcessor queueProcessor;
     private AgentBasedRequestHandler agentBasedRequestHandler;
 
-    private final SlingRepository slingRepository;
-    private final ResourceResolverFactory resourceResolverFactory;
-
     private final String name;
     private final boolean queueProcessingEnabled;
     private final DistributionRequestType[] allowedRequests;
-    private final String subServiceName;
     private boolean active = false;
     private final Set<String> processingQueues;
-    private final int retryAttempts;
-    private final boolean impersonateUser;
     private final String[] allowedRoots;
     private final AtomicInteger nextRequestId = new AtomicInteger();
 
+    private final SimpleDistributionAgentAuthenticationInfo 
agentAuthenticationInfo;
+
     public SimpleDistributionAgent(String name,
                                    boolean queueProcessingEnabled,
                                    Set<String> processingQueues,
@@ -125,19 +101,14 @@ public class SimpleDistributionAgent imp
                                    DistributionRequestType[] allowedRequests,
                                    String[] allowedRoots,
                                    int retryAttempts) {
-        this.slingRepository = slingRepository;
         this.log = log;
         this.allowedRequests = allowedRequests;
         this.processingQueues = processingQueues;
-        this.retryAttempts = retryAttempts;
 
         validateConfiguration(name, queueProcessingEnabled, subServiceName, 
distributionPackageImporter, distributionPackageExporter, 
distributionRequestAuthorizationStrategy, queueProvider, scheduleQueueStrategy, 
distributionEventFactory, resourceResolverFactory);
 
         this.allowedRoots = SettingsUtils.removeEmptyEntries(allowedRoots);
-        this.subServiceName = SettingsUtils.removeEmptyEntry(subServiceName);
-        this.impersonateUser = this.subServiceName == null;
         this.distributionRequestAuthorizationStrategy = 
distributionRequestAuthorizationStrategy;
-        this.resourceResolverFactory = resourceResolverFactory;
         this.name = SettingsUtils.removeEmptyEntry(name);
         this.queueProcessingEnabled = queueProcessingEnabled;
         this.distributionPackageImporter = distributionPackageImporter;
@@ -146,6 +117,9 @@ public class SimpleDistributionAgent imp
         this.scheduleQueueStrategy = scheduleQueueStrategy;
         this.errorQueueStrategy = errorQueueStrategy;
         this.distributionEventFactory = distributionEventFactory;
+        this.agentAuthenticationInfo = new 
SimpleDistributionAgentAuthenticationInfo(slingRepository, 
DEFAULT_AGENT_SERVICE, resourceResolverFactory, subServiceName);
+        this.queueProcessor = new 
SimpleDistributionAgentQueueProcessor(distributionPackageExporter, 
distributionPackageImporter,
+                retryAttempts, errorQueueStrategy, log, queueProvider, 
distributionEventFactory, agentAuthenticationInfo, name);
     }
 
     private void validateConfiguration(String name, boolean 
queueProcessingEnabled, String subServiceName, DistributionPackageImporter 
distributionPackageImporter, DistributionPackageExporter 
distributionPackageExporter, DistributionRequestAuthorizationStrategy 
distributionRequestAuthorizationStrategy, DistributionQueueProvider 
queueProvider, DistributionQueueDispatchingStrategy scheduleQueueStrategy, 
DistributionEventFactory distributionEventFactory, ResourceResolverFactory 
resourceResolverFactory) {
@@ -178,7 +152,7 @@ public class SimpleDistributionAgent imp
 
         ResourceResolver agentResourceResolver = null;
 
-        final String requestId = "DSTRQ"+ nextRequestId.incrementAndGet();
+        final String requestId = "DSTRQ" + nextRequestId.incrementAndGet();
         String callingUser = resourceResolver.getUserID();
 
         try {
@@ -196,13 +170,15 @@ public class SimpleDistributionAgent imp
 
             boolean silent = 
DistributionRequestType.PULL.equals(distributionRequest.getRequestType());
 
-            log.info(silent, "REQUEST-START {}: {} paths={}, user={}",  new 
Object[]{ requestId,
+            log.info(silent, "REQUEST-START {}: {} paths={}, user={}", new 
Object[]{requestId,
                     distributionRequest.getRequestType(), 
distributionRequest.getPaths(), callingUser});
 
             // check permissions
             
distributionRequestAuthorizationStrategy.checkPermission(resourceResolver, 
distributionRequest);
 
-            agentResourceResolver = getAgentResourceResolver(callingUser);
+            agentResourceResolver = 
DistributionUtils.getResourceResolver(callingUser, 
agentAuthenticationInfo.getAgentService(),
+                    agentAuthenticationInfo.getSlingRepository(), 
agentAuthenticationInfo.getSubServiceName(),
+                    agentAuthenticationInfo.getResourceResolverFactory());
 
             // export packages
             CompositeDistributionResponse distributionResponse = 
exportPackages(agentResourceResolver, distributionRequest, callingUser, 
requestId);
@@ -222,7 +198,7 @@ public class SimpleDistributionAgent imp
                     distributionRequest.getRequestType(), 
distributionRequest.getPaths(), callingUser, e.getMessage()});
             throw e;
         } finally {
-            ungetAgentResourceResolver(agentResourceResolver);
+            DistributionUtils.ungetResourceResolver(agentResourceResolver);
         }
     }
 
@@ -233,7 +209,8 @@ public class SimpleDistributionAgent imp
     private CompositeDistributionResponse exportPackages(ResourceResolver 
agentResourceResolver, DistributionRequest distributionRequest, String 
callingUser, String requestId) throws DistributionException {
         final long startTime = System.currentTimeMillis();
         // callback function
-        PackageExporterProcessor packageProcessor = new 
PackageExporterProcessor(callingUser, requestId, startTime);
+        DistributionPackageExporterProcessor packageProcessor = new 
DistributionPackageExporterProcessor(callingUser, requestId, startTime,
+                distributionEventFactory, scheduleQueueStrategy, 
queueProvider, log, name);
 
         // export packages
         distributionPackageExporter.exportPackages(agentResourceResolver, 
distributionRequest, packageProcessor);
@@ -250,34 +227,6 @@ public class SimpleDistributionAgent imp
         return new CompositeDistributionResponse(distributionResponses, 
packagesCount, packagesSize, endTime - startTime);
     }
 
-
-    private Collection<SimpleDistributionResponse> 
scheduleImportPackage(DistributionPackage distributionPackage, String 
callingUser, String requestId, long startTime) {
-        Collection<SimpleDistributionResponse> distributionResponses = new 
LinkedList<SimpleDistributionResponse>();
-
-        // dispatch the distribution package to the queue distribution handler
-        try {
-            // add metadata to packages
-            
distributionPackage.getInfo().put(DistributionPackageUtils.PACKAGE_INFO_PROPERTY_REQUEST_USER,
 callingUser);
-            
distributionPackage.getInfo().put(DistributionPackageUtils.PACKAGE_INFO_PROPERTY_REQUEST_ID,
 requestId);
-            
distributionPackage.getInfo().put(DistributionPackageUtils.PACKAGE_INFO_PROPERTY_REQUEST_START_TIME,
 startTime);
-
-            // put the packages in the queue
-            Iterable<DistributionQueueItemStatus> states = 
scheduleQueueStrategy.add(distributionPackage, queueProvider);
-            for (DistributionQueueItemStatus state : states) {
-                DistributionRequestState requestState = 
getRequestStateFromQueueState(state.getItemState());
-                distributionResponses.add(new 
SimpleDistributionResponse(requestState, state.getItemState().toString()));
-            }
-
-            generatePackageEvent(DistributionEventTopics.AGENT_PACKAGE_QUEUED, 
distributionPackage);
-        } catch (DistributionException e) {
-            log.error("an error happened during dispatching items to the 
queue(s)", e);
-            distributionResponses.add(new 
SimpleDistributionResponse(DistributionRequestState.DROPPED, e.toString()));
-        }
-
-        return distributionResponses;
-    }
-
-
     @Nonnull
     public Set<String> getQueueNames() {
         Set<String> queueNames = new TreeSet<String>();
@@ -347,7 +296,7 @@ public class SimpleDistributionAgent imp
 
         if (!isPassive()) {
             try {
-                queueProvider.enableQueueProcessing(new 
PackageQueueProcessor(), processingQueues.toArray(new 
String[processingQueues.size()]));
+                queueProvider.enableQueueProcessing(queueProcessor, 
processingQueues.toArray(new String[processingQueues.size()]));
             } catch (DistributionException e) {
                 log.error("cannot enable queue processing", e);
             }
@@ -402,126 +351,6 @@ public class SimpleDistributionAgent imp
 
     }
 
-    private boolean processQueueItem(String queueName, DistributionQueueEntry 
queueEntry) throws DistributionException {
-        boolean removeItemFromQueue = false;
-        ResourceResolver agentResourceResolver = null;
-        DistributionPackage distributionPackage = null;
-        DistributionQueueItem queueItem = queueEntry.getItem();
-        DistributionQueueItemStatus queueItemStatus = queueEntry.getStatus();
-        try {
-
-            String callingUser = 
queueItem.get(DistributionPackageUtils.PACKAGE_INFO_PROPERTY_REQUEST_USER, 
String.class);
-            String requestId = 
queueItem.get(DistributionPackageUtils.PACKAGE_INFO_PROPERTY_REQUEST_ID, 
String.class);
-            Long globalStartTime = 
queueItem.get(DistributionPackageUtils.PACKAGE_INFO_PROPERTY_REQUEST_START_TIME,
 Long.class);
-
-            agentResourceResolver = getAgentResourceResolver(callingUser);
-
-            final long startTime = System.currentTimeMillis();
-
-            distributionPackage = 
distributionPackageExporter.getPackage(agentResourceResolver, 
queueItem.getPackageId());
-
-            if (distributionPackage != null) {
-                final long packageSize = distributionPackage.getSize();
-                
DistributionPackageUtils.mergeQueueEntry(distributionPackage.getInfo(), 
queueEntry);
-
-                final DistributionRequestType requestType = 
distributionPackage.getInfo().getRequestType();
-                final String[] paths = 
distributionPackage.getInfo().getPaths();
-
-                try {
-                    
distributionPackageImporter.importPackage(agentResourceResolver, 
distributionPackage);
-                    
generatePackageEvent(DistributionEventTopics.AGENT_PACKAGE_DISTRIBUTED, 
distributionPackage);
-                    removeItemFromQueue = true;
-                    final long endTime = System.currentTimeMillis();
-
-                    log.info("[{}] PACKAGE-DELIVERED {}: {} paths={}, 
importTime={}ms, execTime={}ms, size={}B", new Object[] {
-                            queueName, requestId,
-                            requestType, paths,
-                            endTime - startTime, endTime - globalStartTime,
-                            packageSize
-                    });
-                } catch (RecoverableDistributionException e) {
-                    log.error("[{}] PACKAGE-FAIL {}: could not deliver {}, 
{}", queueName, requestId, distributionPackage.getId(), e.getMessage());
-                    log.debug("could not deliver package {}", 
distributionPackage.getId(), e);
-                } catch (Throwable e) {
-                    log.error("[{}] PACKAGE-FAIL {}: could not deliver package 
{} {}", queueName, requestId, distributionPackage.getId(), e.getMessage(), e);
-
-                    if (errorQueueStrategy != null && 
queueItemStatus.getAttempts() > retryAttempts) {
-                        removeItemFromQueue = 
reEnqueuePackage(distributionPackage);
-                        log.info("[{}] PACKAGE-QUEUED {}: distribution package 
{} was enqueued to an error queue", queueName, requestId, 
distributionPackage.getId());
-                    }
-                }
-            } else {
-                removeItemFromQueue = true; // return success if package does 
not exist in order to clear the queue.
-                log.error("distribution package with id {} does not exist. the 
package will be skipped.", queueItem.getPackageId());
-            }
-        } finally {
-            if (removeItemFromQueue) {
-                DistributionPackageUtils.releaseOrDelete(distributionPackage, 
queueName);
-            } else {
-                DistributionPackageUtils.closeSafely(distributionPackage);
-            }
-            ungetAgentResourceResolver(agentResourceResolver);
-        }
-
-        // return true if item should be removed from queue
-        return removeItemFromQueue;
-    }
-
-
-    private boolean reEnqueuePackage(DistributionPackage distributionPackage) {
-
-        if (errorQueueStrategy == null) {
-            return false;
-        }
-
-        try {
-            errorQueueStrategy.add(distributionPackage, queueProvider);
-        } catch (DistributionException e) {
-            log.error("could not reenqueue package {}", 
distributionPackage.getId(), e);
-            return false;
-        }
-
-        return true;
-    }
-
-    private ResourceResolver getAgentResourceResolver(String user) throws 
DistributionException {
-        ResourceResolver resourceResolver;
-
-        try {
-            Map<String, Object> authenticationInfo = new HashMap<String, 
Object>();
-
-            if (impersonateUser && user != null) {
-                Session session = 
slingRepository.impersonateFromService(DEFAULT_AGENT_SERVICE, new 
SimpleCredentials(user, new char[0]), null);
-                
authenticationInfo.put(JcrResourceConstants.AUTHENTICATION_INFO_SESSION, 
session);
-                resourceResolver = 
resourceResolverFactory.getResourceResolver(authenticationInfo);
-            } else {
-                authenticationInfo.put(ResourceResolverFactory.SUBSERVICE, 
subServiceName);
-                resourceResolver = 
resourceResolverFactory.getServiceResourceResolver(authenticationInfo);
-            }
-
-            return resourceResolver;
-        } catch (LoginException le) {
-            throw new DistributionException(le);
-        } catch (RepositoryException re) {
-            throw new DistributionException(re);
-        }
-    }
-
-    private void ungetAgentResourceResolver(ResourceResolver resourceResolver) 
{
-
-        if (resourceResolver != null) {
-            try {
-                if (resourceResolver.hasChanges()) {
-                    resourceResolver.commit();
-                }
-            } catch (PersistenceException e) {
-                log.error("cannot commit changes to resource resolver", e);
-            } finally {
-                DistributionUtils.safelyLogout(resourceResolver);
-            }
-        }
-    }
-
     private void generatePackageEvent(String topic, DistributionPackage... 
distributionPackages) {
         for (DistributionPackage distributionPackage : distributionPackages) {
             distributionEventFactory.generatePackageEvent(topic, 
DistributionComponentKind.AGENT, name, distributionPackage.getInfo());
@@ -582,82 +411,6 @@ public class SimpleDistributionAgent imp
         return true;
     }
 
-    /**
-     * processor for items in the queue
-     */
-    class PackageQueueProcessor implements DistributionQueueProcessor {
-        public boolean process(@Nonnull String queueName, @Nonnull 
DistributionQueueEntry queueEntry) {
-            DistributionQueueItem queueItem = queueEntry.getItem();
-
-            try {
-                final long startTime = System.currentTimeMillis();
-
-                log.debug("[{}] ITEM-PROCESS processing item={}", queueName, 
queueItem);
-
-                boolean success = processQueueItem(queueName, queueEntry);
-
-                final long endTime = System.currentTimeMillis();
-
-
-                log.debug("[{}] ITEM-PROCESSED item={}, status={}, 
processingTime={}ms", queueName, queueItem, success, endTime - startTime);
-
-                return success;
-
-            } catch (Throwable t) {
-                log.error("[{}] ITEM-FAIL item={}", queueName, queueItem, t);
-                return false;
-            }
-        }
-    }
-
-    /**
-     * The package exporter callback function to process the exported packages.
-     * The exported packages are scheduled for import.
-     */
-    class PackageExporterProcessor implements DistributionPackageProcessor {
-
-        private final String callingUser;
-        private final String requestId;
-        private final long requestStartTime;
-        private final AtomicInteger packagesCount = new AtomicInteger();
-        private final AtomicLong packagesSize = new AtomicLong();
-        private final List<DistributionResponse> allResponses = new 
ArrayList<DistributionResponse>();
-
-        public List<DistributionResponse> getAllResponses() {
-            return allResponses;
-        }
-
-        public int getPackagesCount() {
-            return packagesCount.get();
-        }
-
-        public long getPackagesSize() {
-            return packagesSize.get();
-        }
-
-        PackageExporterProcessor(String callingUser, String requestId, long 
requestStartTime) {
-            this.callingUser = callingUser;
-            this.requestId = requestId;
-            this.requestStartTime = requestStartTime;
-        }
-
-        @Override
-        public void process(DistributionPackage distributionPackage) {
-            final long startTime = System.currentTimeMillis();
-
-            Collection<SimpleDistributionResponse> responses = 
scheduleImportPackage(distributionPackage, callingUser,
-                    requestId, requestStartTime);
-            packagesCount.incrementAndGet();
-            packagesSize.addAndGet(distributionPackage.getSize());
-            allResponses.addAll(responses);
-
-            final long endTime = System.currentTimeMillis();
-
-            log.debug("PACKAGE-QUEUED {}: packageId={}, paths={}, 
queueTime={}ms, responses={}", requestId, distributionPackage.getId(),
-                    distributionPackage.getInfo().getPaths(), endTime - 
startTime, responses.size());
-        }
-    }
-
     public class AgentBasedRequestHandler implements 
DistributionRequestHandler {
         private final DistributionAgent agent;
 
@@ -682,34 +435,20 @@ public class SimpleDistributionAgent imp
                 ResourceResolver agentResourceResolver = null;
 
                 try {
-                    agentResourceResolver = getAgentResourceResolver(null);
+                    agentResourceResolver = 
DistributionUtils.getResourceResolver(null, 
agentAuthenticationInfo.getAgentService(),
+                            agentAuthenticationInfo.getSlingRepository(), 
agentAuthenticationInfo.getSubServiceName(),
+                            
agentAuthenticationInfo.getResourceResolverFactory());
 
                     agent.execute(agentResourceResolver, request);
                 } catch (Throwable e) {
                     log.error("Error executing handler {}", request, e);
                 } finally {
-                    ungetAgentResourceResolver(agentResourceResolver);
+                    
DistributionUtils.ungetResourceResolver(agentResourceResolver);
                 }
             }
 
         }
     }
 
-    /* Convert the state of a certain item in the queue into a request state */
-    private DistributionRequestState 
getRequestStateFromQueueState(DistributionQueueItemState itemState) {
-        DistributionRequestState requestState;
-        switch (itemState) {
-            case QUEUED:
-                requestState = DistributionRequestState.ACCEPTED;
-                break;
-            case ERROR:
-                requestState = DistributionRequestState.DROPPED;
-                break;
-            default:
-                requestState = DistributionRequestState.DROPPED;
-                break;
-        }
-        return requestState;
-    }
 
 }

Added: 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentAuthenticationInfo.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentAuthenticationInfo.java?rev=1756791&view=auto
==============================================================================
--- 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentAuthenticationInfo.java
 (added)
+++ 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentAuthenticationInfo.java
 Thu Aug 18 14:17:38 2016
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.agent.impl;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.jcr.api.SlingRepository;
+
+/**
+ * Authentication information required by a {@link SimpleDistributionAgent} to 
perform its tasks.
+ */
+public class SimpleDistributionAgentAuthenticationInfo {
+
+    private final SlingRepository slingRepository;
+    private final String agentService;
+
+    private final ResourceResolverFactory resourceResolverFactory;
+    private final String subServiceName;
+
+    public SimpleDistributionAgentAuthenticationInfo(@Nonnull SlingRepository 
slingRepository, @Nonnull String agentService,
+                                                     @Nonnull 
ResourceResolverFactory resourceResolverFactory,
+                                                     @Nullable String 
subServiceName) {
+        this.slingRepository = slingRepository;
+        this.agentService = agentService;
+        this.resourceResolverFactory = resourceResolverFactory;
+        this.subServiceName = subServiceName;
+    }
+
+    public SlingRepository getSlingRepository() {
+        return slingRepository;
+    }
+
+    public String getAgentService() {
+        return agentService;
+    }
+
+    public ResourceResolverFactory getResourceResolverFactory() {
+        return resourceResolverFactory;
+    }
+
+    public String getSubServiceName() {
+        return subServiceName;
+    }
+}

Propchange: 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentAuthenticationInfo.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentQueueProcessor.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentQueueProcessor.java?rev=1756791&view=auto
==============================================================================
--- 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentQueueProcessor.java
 (added)
+++ 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentQueueProcessor.java
 Thu Aug 18 14:17:38 2016
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.agent.impl;
+
+import javax.annotation.Nonnull;
+
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.distribution.DistributionRequestType;
+import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.common.RecoverableDistributionException;
+import org.apache.sling.distribution.component.impl.DistributionComponentKind;
+import org.apache.sling.distribution.event.DistributionEventTopics;
+import org.apache.sling.distribution.event.impl.DistributionEventFactory;
+import org.apache.sling.distribution.log.impl.DefaultDistributionLog;
+import org.apache.sling.distribution.packaging.DistributionPackage;
+import org.apache.sling.distribution.packaging.DistributionPackageExporter;
+import org.apache.sling.distribution.packaging.DistributionPackageImporter;
+import org.apache.sling.distribution.packaging.impl.DistributionPackageUtils;
+import org.apache.sling.distribution.queue.DistributionQueueEntry;
+import org.apache.sling.distribution.queue.DistributionQueueItem;
+import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
+import org.apache.sling.distribution.queue.DistributionQueueProcessor;
+import org.apache.sling.distribution.queue.DistributionQueueProvider;
+import 
org.apache.sling.distribution.queue.impl.DistributionQueueDispatchingStrategy;
+import org.apache.sling.distribution.util.impl.DistributionUtils;
+
+/**
+ * A processor of agent queue entries, each entry's underlying package is 
fecthed and passed to the {@link DistributionPackageImporter} for import.
+ */
+class SimpleDistributionAgentQueueProcessor implements 
DistributionQueueProcessor {
+
+    private final DistributionPackageExporter distributionPackageExporter;
+    private final DistributionPackageImporter distributionPackageImporter;
+    private final int retryAttempts;
+    private final DistributionQueueDispatchingStrategy errorQueueStrategy;
+    private final DefaultDistributionLog log;
+    private final DistributionQueueProvider queueProvider;
+    private final DistributionEventFactory distributionEventFactory;
+    private final SimpleDistributionAgentAuthenticationInfo authenticationInfo;
+    private final String agentName;
+
+    public SimpleDistributionAgentQueueProcessor(DistributionPackageExporter 
distributionPackageExporter,
+                                                 DistributionPackageImporter 
distributionPackageImporter, int retryAttempts,
+                                                 
DistributionQueueDispatchingStrategy errorQueueStrategy, DefaultDistributionLog 
log,
+                                                 DistributionQueueProvider 
queueProvider, DistributionEventFactory distributionEventFactory,
+                                                 
SimpleDistributionAgentAuthenticationInfo authenticationInfo, String agentName) 
{
+        this.distributionPackageExporter = distributionPackageExporter;
+
+        this.distributionPackageImporter = distributionPackageImporter;
+        this.retryAttempts = retryAttempts;
+        this.errorQueueStrategy = errorQueueStrategy;
+        this.log = log;
+        this.queueProvider = queueProvider;
+        this.distributionEventFactory = distributionEventFactory;
+        this.authenticationInfo = authenticationInfo;
+        this.agentName = agentName;
+    }
+
+    @Override
+    public boolean process(@Nonnull String queueName, @Nonnull 
DistributionQueueEntry queueEntry) {
+        DistributionQueueItem queueItem = queueEntry.getItem();
+
+        try {
+            final long startTime = System.currentTimeMillis();
+
+            log.debug("[{}] ITEM-PROCESS processing item={}", queueName, 
queueItem);
+
+            boolean success = processQueueItem(queueName, queueEntry);
+
+            final long endTime = System.currentTimeMillis();
+
+            log.debug("[{}] ITEM-PROCESSED item={}, status={}, 
processingTime={}ms", queueName, queueItem, success, endTime - startTime);
+
+            return success;
+
+        } catch (Throwable t) {
+            log.error("[{}] ITEM-FAIL item={}", queueName, queueItem, t);
+            return false;
+        }
+    }
+
+    private boolean processQueueItem(String queueName, DistributionQueueEntry 
queueEntry) throws DistributionException {
+        boolean removeItemFromQueue = false;
+        ResourceResolver agentResourceResolver = null;
+        DistributionPackage distributionPackage = null;
+        DistributionQueueItem queueItem = queueEntry.getItem();
+        DistributionQueueItemStatus queueItemStatus = queueEntry.getStatus();
+        try {
+
+            String callingUser = 
queueItem.get(DistributionPackageUtils.PACKAGE_INFO_PROPERTY_REQUEST_USER, 
String.class);
+            String requestId = 
queueItem.get(DistributionPackageUtils.PACKAGE_INFO_PROPERTY_REQUEST_ID, 
String.class);
+            Long globalStartTime = 
queueItem.get(DistributionPackageUtils.PACKAGE_INFO_PROPERTY_REQUEST_START_TIME,
 Long.class);
+
+            agentResourceResolver = 
DistributionUtils.getResourceResolver(callingUser, 
authenticationInfo.getAgentService(),
+                    authenticationInfo.getSlingRepository(), 
authenticationInfo.getSubServiceName(),
+                    authenticationInfo.getResourceResolverFactory());
+
+            final long startTime = System.currentTimeMillis();
+
+            distributionPackage = 
distributionPackageExporter.getPackage(agentResourceResolver, 
queueItem.getPackageId());
+
+            if (distributionPackage != null) {
+                final long packageSize = distributionPackage.getSize();
+                
DistributionPackageUtils.mergeQueueEntry(distributionPackage.getInfo(), 
queueEntry);
+
+                final DistributionRequestType requestType = 
distributionPackage.getInfo().getRequestType();
+                final String[] paths = 
distributionPackage.getInfo().getPaths();
+
+                try {
+                    // import package
+                    
distributionPackageImporter.importPackage(agentResourceResolver, 
distributionPackage);
+
+                    // generated event
+                    
distributionEventFactory.generatePackageEvent(DistributionEventTopics.AGENT_PACKAGE_DISTRIBUTED,
+                            DistributionComponentKind.AGENT, agentName, 
distributionPackage.getInfo());
+
+                    removeItemFromQueue = true;
+                    final long endTime = System.currentTimeMillis();
+
+                    log.info("[{}] PACKAGE-DELIVERED {}: {} paths={}, 
importTime={}ms, execTime={}ms, size={}B", new Object[]{
+                            queueName, requestId,
+                            requestType, paths,
+                            endTime - startTime, endTime - globalStartTime,
+                            packageSize
+                    });
+                } catch (RecoverableDistributionException e) {
+                    log.error("[{}] PACKAGE-FAIL {}: could not deliver {}, 
{}", queueName, requestId, distributionPackage.getId(), e.getMessage());
+                    log.debug("could not deliver package {}", 
distributionPackage.getId(), e);
+                } catch (Throwable e) {
+                    log.error("[{}] PACKAGE-FAIL {}: could not deliver package 
{} {}", queueName, requestId, distributionPackage.getId(), e.getMessage(), e);
+
+                    if (errorQueueStrategy != null && 
queueItemStatus.getAttempts() > retryAttempts) {
+                        removeItemFromQueue = 
reEnqueuePackage(distributionPackage);
+                        log.info("[{}] PACKAGE-QUEUED {}: distribution package 
{} was enqueued to an error queue", queueName, requestId, 
distributionPackage.getId());
+                    }
+                }
+            } else {
+                removeItemFromQueue = true; // return success if package does 
not exist in order to clear the queue.
+                log.error("distribution package with id {} does not exist. the 
package will be skipped.", queueItem.getPackageId());
+            }
+        } finally {
+            if (removeItemFromQueue) {
+                DistributionPackageUtils.releaseOrDelete(distributionPackage, 
queueName);
+            } else {
+                DistributionPackageUtils.closeSafely(distributionPackage);
+            }
+            DistributionUtils.ungetResourceResolver(agentResourceResolver);
+        }
+
+        // return true if item should be removed from queue
+        return removeItemFromQueue;
+    }
+
+    private boolean reEnqueuePackage(DistributionPackage distributionPackage) {
+
+        if (errorQueueStrategy == null) {
+            return false;
+        }
+
+        try {
+            errorQueueStrategy.add(distributionPackage, queueProvider);
+        } catch (DistributionException e) {
+            log.error("could not reenqueue package {}", 
distributionPackage.getId(), e);
+            return false;
+        }
+
+        return true;
+    }
+
+}

Propchange: 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentQueueProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/util/impl/ByteBufferBackedInputStream.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/util/impl/ByteBufferBackedInputStream.java?rev=1756791&r1=1756790&r2=1756791&view=diff
==============================================================================
--- 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/util/impl/ByteBufferBackedInputStream.java
 (original)
+++ 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/util/impl/ByteBufferBackedInputStream.java
 Thu Aug 18 14:17:38 2016
@@ -18,6 +18,8 @@
  */
 package org.apache.sling.distribution.util.impl;
 
+import javax.annotation.Nonnull;
+
 import static java.lang.Math.min;
 
 import java.io.File;
@@ -52,7 +54,7 @@ final class ByteBufferBackedInputStream
         return memory.get() & 0xFF;
     }
 
-    public int read(byte[] bytes, int off, int len) throws IOException {
+    public int read(@Nonnull byte[] bytes, int off, int len) throws 
IOException {
         if (!memory.hasRemaining()) {
             if (fileInputStream != null) {
                 return fileInputStream.read(bytes, off, len);

Modified: 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/util/impl/DistributionUtils.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/util/impl/DistributionUtils.java?rev=1756791&r1=1756790&r2=1756791&view=diff
==============================================================================
--- 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/util/impl/DistributionUtils.java
 (original)
+++ 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/util/impl/DistributionUtils.java
 Thu Aug 18 14:17:38 2016
@@ -20,12 +20,18 @@
 package org.apache.sling.distribution.util.impl;
 
 import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.jcr.api.SlingRepository;
+import org.apache.sling.jcr.resource.JcrResourceConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.jcr.RepositoryException;
 import javax.jcr.Session;
+import javax.jcr.SimpleCredentials;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -55,4 +61,44 @@ public class DistributionUtils {
             log.error("cannot safely close resource resolver {}", 
resourceResolver);
         }
     }
+
+    public static void ungetResourceResolver(ResourceResolver 
resourceResolver) {
+
+        if (resourceResolver != null) {
+            try {
+                if (resourceResolver.hasChanges()) {
+                    resourceResolver.commit();
+                }
+            } catch (PersistenceException e) {
+                log.error("cannot commit changes to resource resolver", e);
+            } finally {
+                safelyLogout(resourceResolver);
+            }
+        }
+    }
+
+    public static ResourceResolver getResourceResolver(String user, String 
service, SlingRepository slingRepository,
+                                                       String subServiceName, 
ResourceResolverFactory resourceResolverFactory)
+            throws DistributionException {
+        ResourceResolver resourceResolver;
+
+        try {
+            Map<String, Object> authenticationInfo = new HashMap<String, 
Object>();
+
+            if (subServiceName == null && user != null) {
+                Session session = 
slingRepository.impersonateFromService(service, new SimpleCredentials(user, new 
char[0]), null);
+                
authenticationInfo.put(JcrResourceConstants.AUTHENTICATION_INFO_SESSION, 
session);
+                resourceResolver = 
resourceResolverFactory.getResourceResolver(authenticationInfo);
+            } else {
+                authenticationInfo.put(ResourceResolverFactory.SUBSERVICE, 
subServiceName);
+                resourceResolver = 
resourceResolverFactory.getServiceResourceResolver(authenticationInfo);
+            }
+
+            return resourceResolver;
+        } catch (LoginException le) {
+            throw new DistributionException(le);
+        } catch (RepositoryException re) {
+            throw new DistributionException(re);
+        }
+    }
 }

Added: 
sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/DistributionPackageExporterProcessorTest.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/DistributionPackageExporterProcessorTest.java?rev=1756791&view=auto
==============================================================================
--- 
sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/DistributionPackageExporterProcessorTest.java
 (added)
+++ 
sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/DistributionPackageExporterProcessorTest.java
 Thu Aug 18 14:17:38 2016
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.agent.impl;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.sling.distribution.DistributionResponse;
+import org.apache.sling.distribution.event.impl.DistributionEventFactory;
+import org.apache.sling.distribution.log.impl.DefaultDistributionLog;
+import org.apache.sling.distribution.packaging.DistributionPackage;
+import org.apache.sling.distribution.packaging.DistributionPackageInfo;
+import org.apache.sling.distribution.queue.DistributionQueueItemState;
+import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
+import org.apache.sling.distribution.queue.DistributionQueueProvider;
+import 
org.apache.sling.distribution.queue.impl.DistributionQueueDispatchingStrategy;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link DistributionPackageExporterProcessor}
+ */
+public class DistributionPackageExporterProcessorTest {
+
+    @Test
+    public void testGetAllResponses() throws Exception {
+        String callingUser = "mr-who-cares";
+        String requestId = "id231";
+        long startTime = System.currentTimeMillis();
+        DistributionEventFactory eventFactory = 
mock(DistributionEventFactory.class);
+        DistributionQueueDispatchingStrategy scheduleQueueStrategy = 
mock(DistributionQueueDispatchingStrategy.class);
+        DistributionQueueProvider queueProvider = 
mock(DistributionQueueProvider.class);
+        DefaultDistributionLog log = mock(DefaultDistributionLog.class);
+        String agentName = "dummy";
+        DistributionPackageExporterProcessor exporterProcessor = new 
DistributionPackageExporterProcessor(callingUser, requestId,
+                startTime, eventFactory, scheduleQueueStrategy, queueProvider, 
log, agentName);
+
+        List<DistributionResponse> allResponses = 
exporterProcessor.getAllResponses();
+        assertNotNull(allResponses);
+        assertEquals(0, allResponses.size());
+    }
+
+    @Test
+    public void testGetPackagesCount() throws Exception {
+        String callingUser = "mr-who-cares";
+        String requestId = "id231";
+        long startTime = System.currentTimeMillis();
+        DistributionEventFactory eventFactory = 
mock(DistributionEventFactory.class);
+        DistributionQueueDispatchingStrategy scheduleQueueStrategy = 
mock(DistributionQueueDispatchingStrategy.class);
+        DistributionQueueProvider queueProvider = 
mock(DistributionQueueProvider.class);
+        DefaultDistributionLog log = mock(DefaultDistributionLog.class);
+        String agentName = "dummy";
+        DistributionPackageExporterProcessor exporterProcessor = new 
DistributionPackageExporterProcessor(callingUser, requestId,
+                startTime, eventFactory, scheduleQueueStrategy, queueProvider, 
log, agentName);
+
+        int packagesCount = exporterProcessor.getPackagesCount();
+        assertEquals(0, packagesCount);
+
+    }
+
+    @Test
+    public void testGetPackagesSize() throws Exception {
+        String callingUser = "mr-who-cares";
+        String requestId = "id231";
+        long startTime = System.currentTimeMillis();
+        DistributionEventFactory eventFactory = 
mock(DistributionEventFactory.class);
+        DistributionQueueDispatchingStrategy scheduleQueueStrategy = 
mock(DistributionQueueDispatchingStrategy.class);
+        DistributionQueueProvider queueProvider = 
mock(DistributionQueueProvider.class);
+        DefaultDistributionLog log = mock(DefaultDistributionLog.class);
+        String agentName = "dummy";
+        DistributionPackageExporterProcessor exporterProcessor = new 
DistributionPackageExporterProcessor(callingUser, requestId,
+                startTime, eventFactory, scheduleQueueStrategy, queueProvider, 
log, agentName);
+
+        long packagesSize = exporterProcessor.getPackagesSize();
+        assertEquals(0L, packagesSize);
+    }
+
+    @Test
+    public void testProcess() throws Exception {
+        String callingUser = "mr-who-cares";
+        String requestId = "id231";
+        long startTime = System.currentTimeMillis();
+        DistributionEventFactory eventFactory = 
mock(DistributionEventFactory.class);
+        DistributionQueueProvider queueProvider = 
mock(DistributionQueueProvider.class);
+        DistributionPackage distributionPackage = 
mock(DistributionPackage.class);
+        DistributionQueueDispatchingStrategy scheduleQueueStrategy = 
mock(DistributionQueueDispatchingStrategy.class);
+        // assume scheduling works
+        List<DistributionQueueItemStatus> statuses = new 
LinkedList<DistributionQueueItemStatus>();
+        DistributionQueueItemStatus qis = new 
DistributionQueueItemStatus(DistributionQueueItemState.QUEUED, "queue-1");
+        statuses.add(qis);
+        when(scheduleQueueStrategy.add(distributionPackage, 
queueProvider)).thenReturn(statuses);
+
+        DefaultDistributionLog log = mock(DefaultDistributionLog.class);
+        String agentName = "dummy";
+        DistributionPackageExporterProcessor exporterProcessor = new 
DistributionPackageExporterProcessor(callingUser, requestId,
+                startTime, eventFactory, scheduleQueueStrategy, queueProvider, 
log, agentName);
+
+        DistributionPackageInfo info = new DistributionPackageInfo("type-a", 
new HashMap<String, Object>());
+        when(distributionPackage.getInfo()).thenReturn(info);
+        exporterProcessor.process(distributionPackage);
+    }
+}
\ No newline at end of file

Propchange: 
sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/DistributionPackageExporterProcessorTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentAuthenticationInfoTest.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentAuthenticationInfoTest.java?rev=1756791&view=auto
==============================================================================
--- 
sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentAuthenticationInfoTest.java
 (added)
+++ 
sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentAuthenticationInfoTest.java
 Thu Aug 18 14:17:38 2016
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.agent.impl;
+
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.jcr.api.SlingRepository;
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for {@link SimpleDistributionAgentAuthenticationInfo}
+ */
+public class SimpleDistributionAgentAuthenticationInfoTest {
+
+    @Test
+    public void testInfoNotNull() throws Exception {
+        SlingRepository slingRepository = mock(SlingRepository.class);
+        String agentService = "dumb";
+        ResourceResolverFactory resourceResolverFactory = 
mock(ResourceResolverFactory.class);
+        String subServiceName = "ssn";
+        SimpleDistributionAgentAuthenticationInfo authenticationInfo = new 
SimpleDistributionAgentAuthenticationInfo(slingRepository,
+                agentService, resourceResolverFactory, subServiceName);
+        assertNotNull(authenticationInfo.getAgentService());
+        assertNotNull(authenticationInfo.getResourceResolverFactory());
+        assertNotNull(authenticationInfo.getSlingRepository());
+        assertNotNull(authenticationInfo.getSubServiceName());
+    }
+
+}
\ No newline at end of file

Propchange: 
sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentAuthenticationInfoTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentQueueProcessorTest.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentQueueProcessorTest.java?rev=1756791&view=auto
==============================================================================
--- 
sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentQueueProcessorTest.java
 (added)
+++ 
sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentQueueProcessorTest.java
 Thu Aug 18 14:17:38 2016
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.agent.impl;
+
+import java.util.HashMap;
+
+import org.apache.sling.distribution.event.impl.DistributionEventFactory;
+import org.apache.sling.distribution.log.impl.DefaultDistributionLog;
+import org.apache.sling.distribution.packaging.DistributionPackageExporter;
+import org.apache.sling.distribution.packaging.DistributionPackageImporter;
+import org.apache.sling.distribution.queue.DistributionQueueEntry;
+import org.apache.sling.distribution.queue.DistributionQueueItem;
+import org.apache.sling.distribution.queue.DistributionQueueItemState;
+import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
+import org.apache.sling.distribution.queue.DistributionQueueProvider;
+import 
org.apache.sling.distribution.queue.impl.DistributionQueueDispatchingStrategy;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for {@link SimpleDistributionAgentQueueProcessor}
+ */
+public class SimpleDistributionAgentQueueProcessorTest {
+
+    @Test
+    public void testProcess() throws Exception {
+        DistributionPackageExporter packageExporter = 
mock(DistributionPackageExporter.class);
+        DistributionPackageImporter packageImporter = 
mock(DistributionPackageImporter.class);
+        int retryAttempts = 3;
+        DefaultDistributionLog log = mock(DefaultDistributionLog.class);
+        DistributionQueueProvider queueProvider = 
mock(DistributionQueueProvider.class);
+        DistributionEventFactory eventFactory = 
mock(DistributionEventFactory.class);
+        SimpleDistributionAgentAuthenticationInfo authenticationInfo = 
mock(SimpleDistributionAgentAuthenticationInfo.class);
+        String agentName = "dummy-a";
+        SimpleDistributionAgentQueueProcessor queueProcessor = new 
SimpleDistributionAgentQueueProcessor(packageExporter,
+                packageImporter, retryAttempts, null, log, queueProvider, 
eventFactory, authenticationInfo, agentName);
+
+        String id = "123-456";
+        DistributionQueueItem item = new DistributionQueueItem("pckg-123", new 
HashMap<String, Object>());
+        String queueName = "queue-1";
+        DistributionQueueItemStatus status = new 
DistributionQueueItemStatus(DistributionQueueItemState.QUEUED, queueName);
+        DistributionQueueEntry entry = new DistributionQueueEntry(id, item, 
status);
+        queueProcessor.process(queueName, entry);
+    }
+}
\ No newline at end of file

Propchange: 
sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentQueueProcessorTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/util/impl/FileBackedMemoryOutputStreamTest.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/util/impl/FileBackedMemoryOutputStreamTest.java?rev=1756791&r1=1756790&r2=1756791&view=diff
==============================================================================
--- 
sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/util/impl/FileBackedMemoryOutputStreamTest.java
 (original)
+++ 
sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/util/impl/FileBackedMemoryOutputStreamTest.java
 Thu Aug 18 14:17:38 2016
@@ -36,6 +36,9 @@ import java.util.Random;
 import 
org.apache.sling.distribution.util.impl.FileBackedMemoryOutputStream.MemoryUnit;
 import org.junit.Test;
 
+/**
+ * Tests for {@link 
org.apache.sling.distribution.util.impl.FileBackedMemoryOutputStream}
+ */
 public class FileBackedMemoryOutputStreamTest {
 
     @Test


Reply via email to