Author: mpetria
Date: Tue Jul 14 13:15:17 2015
New Revision: 1690924

URL: http://svn.apache.org/r1690924
Log:
SLING-4651: delete remote package only after it is fetched

Added:
    
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/HttpTransportUtils.java
Modified:
    
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/servlet/DistributionPackageExporterServlet.java
    
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/SimpleHttpDistributionTransport.java

Modified: 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/servlet/DistributionPackageExporterServlet.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/servlet/DistributionPackageExporterServlet.java?rev=1690924&r1=1690923&r2=1690924&view=diff
==============================================================================
--- 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/servlet/DistributionPackageExporterServlet.java
 (original)
+++ 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/servlet/DistributionPackageExporterServlet.java
 Tue Jul 14 13:15:17 2015
@@ -34,6 +34,7 @@ import org.apache.sling.distribution.Dis
 import org.apache.sling.distribution.packaging.DistributionPackage;
 import org.apache.sling.distribution.packaging.DistributionPackageExporter;
 import org.apache.sling.distribution.resources.DistributionResourceTypes;
+import org.apache.sling.distribution.transport.impl.HttpTransportUtils;
 import org.apache.sling.distribution.util.RequestUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,10 +47,40 @@ public class DistributionPackageExporter
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
+    /**
+     * Handles a package exporter POST request, dispatching on the optional
+     * {@code operation} request parameter:
+     * <ul>
+     * <li>{@code delete}: deletes the package named by the {@code id} parameter</li>
+     * <li>{@code fetch}: exports one package and leaves it in place</li>
+     * <li>any other value, or no parameter: exports one package and deletes it afterwards</li>
+     * </ul>
+     * Exactly one of these operations is executed per request. Any failure is logged
+     * and reported to the client with status 503.
+     */
     @Override
     protected void doPost(SlingHttpServletRequest request, SlingHttpServletResponse response)
             throws ServletException, IOException {
 
+        String operation = request.getParameter("operation");
+
+        try {
+            if ("delete".equals(operation)) {
+
+                deletePackage(request, response);
+
+            } else if ("fetch".equals(operation)) {
+                // must be "else if": with a plain "if" a delete request would fall through
+                // to the final else branch and additionally export-and-delete a package
+
+                exportOnePackage(request, response, false);
+
+            } else {
+
+                exportOnePackage(request, response, true);
+            }
+
+        } catch (Throwable t) {
+            response.setStatus(503);
+            log.error("error while exporting package", t);
+        }
+    }
+
+
+    protected void exportOnePackage(SlingHttpServletRequest request, 
SlingHttpServletResponse response, boolean delete)
+            throws ServletException, IOException {
+
         DistributionPackageExporter distributionPackageExporter = request
                 .getResource()
                 .adaptTo(DistributionPackageExporter.class);
@@ -77,14 +108,20 @@ public class DistributionPackageExporter
                         InputStream inputStream = null;
                         int bytesCopied = -1;
                         try {
+                            
response.addHeader(HttpTransportUtils.HEADER_DISTRIBUTION_ORIGINAL_ID, 
distributionPackage.getId());
+
                             inputStream = 
distributionPackage.createInputStream();
+
                             bytesCopied = IOUtils.copy(inputStream, 
response.getOutputStream());
                         } finally {
                             IOUtils.closeQuietly(inputStream);
                         }
 
-                        // delete the package permanently
-                        distributionPackage.delete();
+                        if (delete) {
+                            // delete the package permanently
+                            distributionPackage.delete();
+                        }
+
 
                         // everything ok
                         response.setStatus(200);
@@ -107,4 +144,30 @@ public class DistributionPackageExporter
         }
     }
 
+
+
+    /**
+     * Deletes the package named by the {@code id} request parameter.
+     * Sets status 200 when the exporter returned a package and it was deleted, and
+     * status 204 when the exporter returned no package for that id.
+     *
+     * @param request the request carrying the {@code id} parameter; its resource is
+     *                adapted to the {@link DistributionPackageExporter} to query
+     * @param response the response whose status is set
+     */
+    void deletePackage(final SlingHttpServletRequest request, final SlingHttpServletResponse response) {
+        final DistributionPackageExporter exporter =
+                request.getResource().adaptTo(DistributionPackageExporter.class);
+        final ResourceResolver resolver = request.getResourceResolver();
+        final String id = request.getParameter("id");
+
+        final DistributionPackage target = exporter.getPackage(resolver, id);
+        if (target == null) {
+            response.setStatus(204);
+            log.debug("nothing to delete {}", id);
+            return;
+        }
+
+        target.delete();
+        response.setStatus(200);
+    }
+
 }

Added: 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/HttpTransportUtils.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/HttpTransportUtils.java?rev=1690924&view=auto
==============================================================================
--- 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/HttpTransportUtils.java
 (added)
+++ 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/HttpTransportUtils.java
 Tue Jul 14 13:15:17 2015
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.distribution.transport.impl;
+
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.fluent.Executor;
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.utils.URIBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+
+/**
+ * Static helpers for the fetch / delete HTTP round trips against a remote
+ * package exporter endpoint.
+ */
+public class HttpTransportUtils {
+
+    static final Logger log = LoggerFactory.getLogger(HttpTransportUtils.class);
+
+    /**
+     * Name of the response header in which the exporter endpoint returns the id
+     * of the fetched package.
+     */
+    public final static String HEADER_DISTRIBUTION_ORIGINAL_ID = "X-Distribution-OriginalId";
+
+
+    /**
+     * Requests one package from the remote endpoint using the {@code fetch} operation.
+     *
+     * @param executor the executor used to run the HTTP request
+     * @param distributionURI the URI of the remote exporter endpoint
+     * @param headers result map; it is always cleared first and, when the response carries
+     *                the {@link #HEADER_DISTRIBUTION_ORIGINAL_ID} header, receives its value
+     * @return the content stream of the response, or {@code null} when the response status
+     *         is not 200 or the response has no entity
+     * @throws URISyntaxException if the fetch URI cannot be built
+     * @throws IOException if the request fails
+     */
+    public static InputStream fetchNextPackage(Executor executor, URI distributionURI, Map<String, String> headers) throws URISyntaxException, IOException {
+
+        // always clear the result headers map
+        headers.clear();
+
+        URI fetchUri = getFetchUri(distributionURI);
+        Request fetchReq = Request.Post(fetchUri).useExpectContinue();
+        HttpResponse httpResponse = executor.execute(fetchReq).returnResponse();
+
+        if (httpResponse.getStatusLine().getStatusCode() != 200) {
+            return null;
+        }
+
+        HttpEntity entity = httpResponse.getEntity();
+        if (entity == null) {
+            // a 200 response without a body carries no package; treat it as "nothing to fetch"
+            // instead of failing with a NullPointerException on getContent()
+            log.info("no entity available");
+            return null;
+        }
+
+        Header header = httpResponse.getFirstHeader(HttpTransportUtils.HEADER_DISTRIBUTION_ORIGINAL_ID);
+        if (header != null && header.getValue() != null) {
+            String originalId = header.getValue();
+            headers.put(HEADER_DISTRIBUTION_ORIGINAL_ID, originalId);
+        } else {
+            log.warn("cannot retrieve original id header");
+        }
+
+        return entity.getContent();
+    }
+
+
+    /**
+     * Asks the remote endpoint to delete a package using the {@code delete} operation.
+     *
+     * @param executor the executor used to run the HTTP request
+     * @param distributionURI the URI of the remote exporter endpoint
+     * @param remotePackageId the id of the package on the remote side; when {@code null}
+     *                        no request is sent
+     * @return {@code true} only if the remote endpoint answered with status 200
+     * @throws URISyntaxException if the delete URI cannot be built
+     * @throws IOException if the request fails
+     */
+    public static boolean deletePackage(Executor executor, URI distributionURI, String remotePackageId) throws URISyntaxException, IOException {
+
+        if (remotePackageId == null) {
+            // happens when the fetch response had no original id header; there is nothing
+            // meaningful to ask the remote side to delete
+            log.warn("cannot delete remote package, no package id available");
+            return false;
+        }
+
+        URI deleteUri = getDeleteUri(distributionURI, remotePackageId);
+        Request deleteReq = Request.Post(deleteUri).useExpectContinue();
+        HttpResponse httpResponse = executor.execute(deleteReq).returnResponse();
+
+        return httpResponse.getStatusLine().getStatusCode() == 200;
+    }
+
+
+    /**
+     * Builds the URI for a fetch request.
+     *
+     * @param uri the base endpoint URI
+     * @return {@code uri} with the query parameter {@code operation=fetch} added
+     * @throws URISyntaxException if the resulting URI is invalid
+     */
+    public static URI getFetchUri(URI uri) throws URISyntaxException {
+        URIBuilder uriBuilder = new URIBuilder(uri);
+        uriBuilder.addParameter("operation", "fetch");
+
+        return uriBuilder.build();
+    }
+
+
+    /**
+     * Builds the URI for a delete request.
+     *
+     * @param uri the base endpoint URI
+     * @param id the id of the package to delete
+     * @return {@code uri} with the query parameters {@code operation=delete} and
+     *         {@code id=<id>} added
+     * @throws URISyntaxException if the resulting URI is invalid
+     */
+    public static URI getDeleteUri(URI uri, String id) throws URISyntaxException {
+        URIBuilder uriBuilder = new URIBuilder(uri);
+        uriBuilder.addParameter("operation", "delete");
+        uriBuilder.addParameter("id", id);
+
+        return uriBuilder.build();
+    }
+
+}

Modified: 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/SimpleHttpDistributionTransport.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/SimpleHttpDistributionTransport.java?rev=1690924&r1=1690923&r2=1690924&view=diff
==============================================================================
--- 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/SimpleHttpDistributionTransport.java
 (original)
+++ 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/SimpleHttpDistributionTransport.java
 Tue Jul 14 13:15:17 2015
@@ -23,10 +23,12 @@ import java.io.InputStream;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.http.Header;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpHost;
 import org.apache.http.HttpResponse;
@@ -42,6 +44,7 @@ import org.apache.sling.distribution.Dis
 import org.apache.sling.distribution.log.impl.DefaultDistributionLog;
 import org.apache.sling.distribution.packaging.DistributionPackage;
 import org.apache.sling.distribution.serialization.DistributionPackageBuilder;
+import org.apache.sling.distribution.servlet.ServletJsonUtils;
 import 
org.apache.sling.distribution.transport.DistributionTransportSecretProvider;
 import org.apache.sling.distribution.transport.core.DistributionTransport;
 import 
org.apache.sling.distribution.transport.core.DistributionTransportException;
@@ -145,29 +148,30 @@ public class SimpleHttpDistributionTrans
             // TODO : add queue parameter
 
             // continuously requests package streams as long as type header is 
received with the response (meaning there's a package of a certain type)
-            HttpResponse httpResponse;
+            InputStream inputStream;
+            final Map<String, String> headers = new HashMap<String, String>();
 
             int pulls = 0;
             int maxNumberOfPackages = 
DistributionRequestType.PULL.equals(distributionRequest.getRequestType()) ? 
maxPullItems : 1;
-            while (pulls < maxNumberOfPackages
-                    && (httpResponse = 
executor.execute(req).returnResponse()).getStatusLine().getStatusCode() == 200) 
{
-                HttpEntity entity = httpResponse.getEntity();
-                if (entity != null) {
-                    final DistributionPackage responsePackage = 
packageBuilder.readPackage(resourceResolver, entity.getContent());
-                    if (responsePackage != null) {
-                        responsePackage.getInfo().setOrigin(distributionURI);
-                        log.debug("pulled package no {} with info {}", pulls, 
responsePackage.getInfo());
-
-                        result.add(responsePackage);
-                    } else {
-                        log.warn("responsePackage is null");
-                    }
 
-                    pulls++;
+            while (pulls < maxNumberOfPackages && (inputStream = 
HttpTransportUtils.fetchNextPackage(executor, distributionURI, headers)) != 
null) {
+
+                final DistributionPackage responsePackage = 
packageBuilder.readPackage(resourceResolver, inputStream);
+                if (responsePackage != null) {
+                    responsePackage.getInfo().setOrigin(distributionURI);
+                    log.debug("pulled package no {} with info {}", pulls, 
responsePackage.getInfo());
+
+                    result.add(responsePackage);
+
+                    String originalId = 
headers.get(HttpTransportUtils.HEADER_DISTRIBUTION_ORIGINAL_ID);
+
+                    HttpTransportUtils.deletePackage(executor, 
distributionURI, originalId);
+
                 } else {
-                    log.info("no entity available");
-                    break;
+                    log.warn("responsePackage is null");
                 }
+
+                pulls++;
             }
 
         } catch (HttpHostConnectException e) {


Reply via email to