Author: mpetria
Date: Mon Apr 11 09:17:33 2016
New Revision: 1738558

URL: http://svn.apache.org/viewvc?rev=1738558&view=rev
Log:
SLING-5577: refactor content serialization to be independent of persistence

Added:
    
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/DistributionContentSerializer.java
    
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/DistributionPackageBuilderFactory.java
    
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/FileDistributionPackage.java
    
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/FileDistributionPackageBuilder.java
    
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/ResourceDistributionPackage.java
    
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/ResourceDistributionPackageBuilder.java
    
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/vlt/FileVaultContentSerializer.java
    
sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtilsTest.java
    
sling/trunk/contrib/extensions/distribution/extensions/src/main/java/org/apache/sling/distribution/serialization/impl/avro/AvroContentSerializer.java
    
sling/trunk/contrib/extensions/distribution/extensions/src/main/java/org/apache/sling/distribution/serialization/impl/avro/AvroDistributionContentSerializerFactory.java
    
sling/trunk/contrib/extensions/distribution/extensions/src/main/java/org/apache/sling/distribution/serialization/impl/kryo/KryoContentSerializer.java
    
sling/trunk/contrib/extensions/distribution/extensions/src/main/java/org/apache/sling/distribution/serialization/impl/kryo/KryoDistributionContentSerializerFactory.java
Removed:
    
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/vlt/FileVaultDistributionPackage.java
    
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/vlt/FileVaultDistributionPackageBuilder.java
    
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/vlt/JcrVaultDistributionPackage.java
    
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/vlt/JcrVaultDistributionPackageBuilder.java
    
sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/serialization/impl/vlt/
    
sling/trunk/contrib/extensions/distribution/extensions/src/main/java/org/apache/sling/distribution/serialization/impl/FileDistributionPackage.java
    
sling/trunk/contrib/extensions/distribution/extensions/src/main/java/org/apache/sling/distribution/serialization/impl/avro/AvroDistributionPackageBuilder.java
    
sling/trunk/contrib/extensions/distribution/extensions/src/main/java/org/apache/sling/distribution/serialization/impl/avro/AvroDistributionPackageBuilderFactory.java
    
sling/trunk/contrib/extensions/distribution/extensions/src/main/java/org/apache/sling/distribution/serialization/impl/kryo/KryoDistributionPackageBuilderFactory.java
    
sling/trunk/contrib/extensions/distribution/extensions/src/main/java/org/apache/sling/distribution/serialization/impl/kryo/KryoResourceDistributionPackageBuilder.java
    sling/trunk/contrib/extensions/distribution/extensions/src/test/java/
Modified:
    sling/trunk/contrib/extensions/distribution/core/pom.xml
    
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java
    
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtils.java
    
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/exporter/AgentDistributionPackageExporter.java
    
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/AbstractDistributionPackageBuilder.java
    
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/vlt/VaultDistributionPackageBuilderFactory.java
    
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/servlet/DistributionPackageExporterServlet.java
    
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/SimpleHttpDistributionTransport.java
    
sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/serialization/impl/SimpleDistributionPackageTest.java
    sling/trunk/contrib/extensions/distribution/extensions/pom.xml
    
sling/trunk/contrib/extensions/distribution/sample/src/main/resources/SLING-CONTENT/libs/sling/distribution/install.publish/cache-flush/org.apache.sling.distribution.agent.impl.SimpleDistributionAgentFactory-cache-flush.json

Modified: sling/trunk/contrib/extensions/distribution/core/pom.xml
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/pom.xml?rev=1738558&r1=1738557&r2=1738558&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/pom.xml (original)
+++ sling/trunk/contrib/extensions/distribution/core/pom.xml Mon Apr 11 
09:17:33 2016
@@ -71,7 +71,7 @@
                             org.apache.sling.distribution.util
                         </Export-Package>
                         <Import-Package>
-                            
org.apache.sling.event.jobs;version="[1.5.0,3.0)",org.apache.sling.event.jobs.consumer;version="[1.1,2)",org.apache.sling.jcr.api;version="[2.2.0,2.4.0)",*
+                            
org.apache.sling.event.jobs;version="[1.5.0,3.0)",org.apache.sling.event.jobs.consumer;version="[1.1,2)",org.apache.sling.jcr.api;version="2.2.0",*
                         </Import-Package>
                         <Embed-Dependency>httpasyncclient</Embed-Dependency>
                     </instructions>
@@ -110,7 +110,7 @@
         <dependency>
             <groupId>org.apache.sling</groupId>
             <artifactId>org.apache.sling.api</artifactId>
-            <version>2.5.0</version>
+            <version>2.7.0</version>
         </dependency>
         <dependency>
             <groupId>org.apache.sling</groupId>

Modified: 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java?rev=1738558&r1=1738557&r2=1738558&view=diff
==============================================================================
--- 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java
 (original)
+++ 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java
 Mon Apr 11 09:17:33 2016
@@ -433,14 +433,12 @@ public class SimpleDistributionAgent imp
             distributionPackage = 
distributionPackageExporter.getPackage(agentResourceResolver, 
queueItem.getId());
 
             if (distributionPackage != null) {
-                final long getTime = System.currentTimeMillis();
+                final long packageSize = distributionPackage.getSize();
+                
DistributionPackageUtils.mergeQueueEntry(distributionPackage.getInfo(), 
queueEntry);
 
                 final DistributionRequestType requestType = 
distributionPackage.getInfo().getRequestType();
-                final long packageSize = distributionPackage.getSize();
                 final String[] paths = 
distributionPackage.getInfo().getPaths();
 
-                
DistributionPackageUtils.mergeQueueEntry(distributionPackage.getInfo(), 
queueEntry);
-
                 try {
                     
distributionPackageImporter.importPackage(agentResourceResolver, 
distributionPackage);
                     
generatePackageEvent(DistributionEventTopics.AGENT_PACKAGE_DISTRIBUTED, 
distributionPackage);
@@ -668,7 +666,8 @@ public class SimpleDistributionAgent imp
 
             final long endTime = System.currentTimeMillis();
 
-            log.debug("PACKAGE-QUEUED {}: packageId={}, queueTime={}ms, 
responses={}", requestId, distributionPackage.getId(), endTime - startTime, 
responses.size());
+            log.debug("PACKAGE-QUEUED {}: packageId={}, paths={}, 
queueTime={}ms, responses={}", requestId, distributionPackage.getId(),
+                    distributionPackage.getInfo().getPaths(), endTime - 
startTime, responses.size());
         }
     }
 

Modified: 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtils.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtils.java?rev=1738558&r1=1738557&r2=1738558&view=diff
==============================================================================
--- 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtils.java
 (original)
+++ 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtils.java
 Mon Apr 11 09:17:33 2016
@@ -19,6 +19,11 @@
 
 package org.apache.sling.distribution.packaging.impl;
 
+import org.apache.jackrabbit.commons.JcrUtils;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceUtil;
 import org.apache.sling.distribution.DistributionRequest;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.serialization.DistributionPackage;
@@ -28,8 +33,23 @@ import org.apache.sling.distribution.que
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.jcr.Binary;
+import javax.jcr.Node;
+import javax.jcr.Property;
+import javax.jcr.RepositoryException;
+import javax.jcr.nodetype.NodeType;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.io.SequenceInputStream;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Package related utility methods
@@ -38,6 +58,12 @@ public class DistributionPackageUtils {
 
     private static final Logger log = 
LoggerFactory.getLogger(DistributionPackageUtils.class);
 
+    private final static String META_START = "DSTRPACKMETA";
+
+    private static final Object repolock = new Object(); // monitor for synchronized (repolock) in getPackagesRoot; final so it cannot be swapped
+
+
+
     /**
      * distribution package origin queue
      */
@@ -174,4 +200,90 @@ public class DistributionPackageUtils {
         return deepPaths.toArray(new String[deepPaths.size()]);
     }
 
+    public static InputStream createStreamWithHeader(DistributionPackage 
distributionPackage) throws IOException {
+
+        DistributionPackageInfo packageInfo = distributionPackage.getInfo();
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        Map<String, Object> headerInfo = new HashMap<String, Object>();
+        headerInfo.put(DistributionPackageInfo.PROPERTY_REQUEST_TYPE, 
packageInfo.getRequestType());
+        headerInfo.put(DistributionPackageInfo.PROPERTY_REQUEST_PATHS, 
packageInfo.getPaths());
+        writeInfo(outputStream, headerInfo);
+
+        InputStream headerStream = new 
ByteArrayInputStream(outputStream.toByteArray());
+        InputStream bodyStream = distributionPackage.createInputStream();
+        return new SequenceInputStream(headerStream, bodyStream);
+    }
+
+
+    /** Reads the optional META_START header from inputStream into info; resets the stream to its mark when no header is found. */
+    public static void readInfo(InputStream inputStream, Map<String, Object> info) {
+        try {
+            int size = META_START.getBytes("UTF-8").length;
+            inputStream.mark(size);
+            byte[] buffer = new byte[size];
+            int total = 0;
+            // a single read() may return fewer bytes than requested, so keep reading until the marker is complete or EOF
+            for (int n; total < size && (n = inputStream.read(buffer, total, size - total)) > 0; ) {
+                total += n;
+            }
+            if (total == size && META_START.equals(new String(buffer, "UTF-8"))) {
+                ObjectInputStream stream = new ObjectInputStream(inputStream);
+                info.putAll((HashMap<String, Object>) stream.readObject());
+            } else {
+                inputStream.reset();
+            }
+        } catch (IOException e) {
+            log.error("Cannot read stream info", e);
+        } catch (ClassNotFoundException e) {
+            log.error("Cannot read stream info", e);
+        }
+    }
+
+    /** Writes the META_START marker followed by a Java-serialized copy of info to outputStream. */
+    public static void writeInfo(OutputStream outputStream, Map<String, Object> info) {
+        // copy into a HashMap because readInfo casts the deserialized object to HashMap
+        HashMap<String, Object> map = new HashMap<String, Object>(info);
+
+        try {
+            outputStream.write(META_START.getBytes("UTF-8"));
+            ObjectOutputStream stream = new ObjectOutputStream(outputStream);
+            stream.writeObject(map);
+            // push buffered object data through without closing the caller's stream
+            stream.flush();
+        } catch (IOException e) {
+            // failures are logged rather than propagated, as before
+            log.error("Cannot write stream info", e);
+        }
+    }
+
+
+    public static Resource getPackagesRoot(ResourceResolver resourceResolver, 
String packagesRootPath) throws PersistenceException {
+        Resource packagesRoot = resourceResolver.getResource(packagesRootPath);
+
+        if (packagesRoot != null) {
+            return packagesRoot;
+        }
+
+        synchronized (repolock) {
+            resourceResolver.refresh();
+            packagesRoot = ResourceUtil.getOrCreateResource(resourceResolver, 
packagesRootPath, "sling:Folder", "sling:Folder", true);
+        }
+
+        return packagesRoot;
+    }
+
+    public static InputStream getStream(Resource resource) throws 
RepositoryException {
+        Node parent = resource.adaptTo(Node.class);
+        InputStream in = 
parent.getProperty("bin/jcr:content/jcr:data").getBinary().getStream();
+        return in;
+    }
+
+    public static void uploadStream(Resource resource, InputStream stream) 
throws RepositoryException {
+        Node parent = resource.adaptTo(Node.class);
+        Node file = JcrUtils.getOrAddNode(parent, "bin", NodeType.NT_FILE);
+        Node content = JcrUtils.getOrAddNode(file, Node.JCR_CONTENT, 
NodeType.NT_RESOURCE);
+        Binary binary = 
parent.getSession().getValueFactory().createBinary(stream);
+        content.setProperty(Property.JCR_DATA, binary);
+    }
+
 }

Modified: 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/exporter/AgentDistributionPackageExporter.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/exporter/AgentDistributionPackageExporter.java?rev=1738558&r1=1738557&r2=1738558&view=diff
==============================================================================
--- 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/exporter/AgentDistributionPackageExporter.java
 (original)
+++ 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/exporter/AgentDistributionPackageExporter.java
 Mon Apr 11 09:17:33 2016
@@ -88,6 +88,7 @@ public class AgentDistributionPackageExp
 
                 if (packageBuilder != null) {
                     distributionPackage = 
packageBuilder.getPackage(resourceResolver, queueItem.getId());
+                    distributionPackage.getInfo().putAll(info);
 
                     log.debug("item {} fetched from the queue", info);
                     if (distributionPackage != null) {
@@ -113,7 +114,8 @@ public class AgentDistributionPackageExp
             log.debug("getting package from queue {}", queueName);
 
             DistributionQueue queue = agent.getQueue(queueName);
-            DistributionQueueEntry entry = queue.getHead();
+            String itemId = distributionPackageId;
+            DistributionQueueEntry entry = queue.getItem(itemId);
             DistributionPackage distributionPackage;
 
             if (entry != null) {
@@ -124,6 +126,8 @@ public class AgentDistributionPackageExp
 
                 if (packageBuilder != null) {
                     distributionPackage = 
packageBuilder.getPackage(resourceResolver, queueItem.getId());
+                    distributionPackage.getInfo().putAll(info);
+
                     log.debug("item {} fetched from the queue", info);
                     if (distributionPackage != null) {
                         return new 
AgentDistributionPackage(distributionPackage, queue);

Added: 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/DistributionContentSerializer.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/DistributionContentSerializer.java?rev=1738558&view=auto
==============================================================================
--- 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/DistributionContentSerializer.java
 (added)
+++ 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/DistributionContentSerializer.java
 Mon Apr 11 09:17:33 2016
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.serialization;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.distribution.DistributionRequest;
+import org.apache.sling.distribution.common.DistributionException;
+
+/**
+ * A content serializer used to extract and import distribution packages.
+ */
+public interface DistributionContentSerializer {
+
+    /**
+     * extracts the resources identified by the given request into the given 
outputStream
+     * @param resourceResolver the user resource resolver
+     * @param request a distribution request
+     * @param outputStream the output stream
+     * @throws DistributionException if extraction fails for some reason
+     */
+    void exportToStream(ResourceResolver resourceResolver, DistributionRequest 
request, OutputStream outputStream) throws DistributionException;
+
+    /**
+     * imports the given stream
+     * @param resourceResolver the user resource resolver
+     * @param stream the stream to import
+     * @throws DistributionException if importing fails for some reason
+     */
+    void importFromStream(ResourceResolver resourceResolver, InputStream 
stream) throws DistributionException;
+
+    /**
+     * retrieve the name of this content serializer
+     * @return the name of this content serializer
+     */
+    String getName();
+}

Modified: 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/AbstractDistributionPackageBuilder.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/AbstractDistributionPackageBuilder.java?rev=1738558&r1=1738557&r2=1738558&view=diff
==============================================================================
--- 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/AbstractDistributionPackageBuilder.java
 (original)
+++ 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/AbstractDistributionPackageBuilder.java
 Mon Apr 11 09:17:33 2016
@@ -24,6 +24,8 @@ import javax.jcr.RepositoryException;
 import javax.jcr.Session;
 import java.io.BufferedInputStream;
 import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.distribution.DistributionRequest;
@@ -85,6 +87,9 @@ public abstract class AbstractDistributi
         if (!stream.markSupported()) {
             stream = new BufferedInputStream(stream);
         }
+        Map<String, Object> headerInfo = new HashMap<String, Object>();
+        DistributionPackageUtils.readInfo(stream, headerInfo);
+
         DistributionPackage distributionPackage = 
SimpleDistributionPackage.fromStream(stream, type);
 
         stream.mark(-1);
@@ -93,6 +98,8 @@ public abstract class AbstractDistributi
         if (distributionPackage == null) {
             distributionPackage = readPackageInternal(resourceResolver, 
stream);
         }
+
+        distributionPackage.getInfo().putAll(headerInfo);
         return distributionPackage;
     }
 

Added: 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/DistributionPackageBuilderFactory.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/DistributionPackageBuilderFactory.java?rev=1738558&view=auto
==============================================================================
--- 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/DistributionPackageBuilderFactory.java
 (added)
+++ 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/DistributionPackageBuilderFactory.java
 Mon Apr 11 09:17:33 2016
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.serialization.impl;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+import java.io.InputStream;
+import java.util.Map;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.ConfigurationPolicy;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.PropertyOption;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.apache.sling.distribution.DistributionRequest;
+import org.apache.sling.distribution.common.DistributionException;
+import 
org.apache.sling.distribution.component.impl.DistributionComponentConstants;
+import org.apache.sling.distribution.component.impl.SettingsUtils;
+import org.apache.sling.distribution.serialization.DistributionPackage;
+import org.apache.sling.distribution.serialization.DistributionPackageBuilder;
+import 
org.apache.sling.distribution.serialization.DistributionContentSerializer;
+
+/**
+ * A factory for package builders
+ */
+@Component(metatype = true,
+        label = "Apache Sling Distribution Packaging - Package Builder 
Factory",
+        description = "OSGi configuration for package builders",
+        configurationFactory = true,
+        specVersion = "1.1",
+        policy = ConfigurationPolicy.REQUIRE
+)
+@Service(DistributionPackageBuilder.class)
+@Property(name = "webconsole.configurationFactory.nameHint", value = "Builder 
name: {name}")
+public class DistributionPackageBuilderFactory implements 
DistributionPackageBuilder {
+
+    /**
+     * name of this package builder.
+     */
+    @Property(label = "Name", description = "The name of the package builder.")
+    private static final String NAME = DistributionComponentConstants.PN_NAME;
+
+    /**
+     * type of this package builder.
+     */
+    @Property(options = {
+            @PropertyOption(name = "resource",
+                    value = "resource packages"
+            ),
+            @PropertyOption(name = "file",
+                    value = "file packages"
+            )},
+            value = "resource", label = "type", description = "The persistence 
type used by this package builder")
+    private static final String PERSISTENCE = 
DistributionComponentConstants.PN_TYPE;
+
+    /** content serializer handed to the package builder created in activate; bound through the "format" reference. */
+    @Property(name = "format.target", label = "Content Serializer", description = "The target reference for the DistributionContentSerializer used to (de)serialize packages, " +
+            "e.g. use target=(name=...) to bind to services by name.", value = SettingsUtils.COMPONENT_NAME_DEFAULT)
+    @Reference(name = "format")
+    private DistributionContentSerializer contentSerializer;
+
+    /**
+     * Temp file folder
+     */
+    @Property(label = "Temp Filesystem Folder", description = "The filesystem 
folder where the temporary files should be saved.")
+    private static final String TEMP_FS_FOLDER = "tempFsFolder";
+
+    private DistributionPackageBuilder packageBuilder;
+
+    @Activate
+    public void activate(Map<String, Object> config) {
+
+        String persistenceType = 
PropertiesUtil.toString(config.get(PERSISTENCE), null);
+        String tempFsFolder = 
SettingsUtils.removeEmptyEntry(PropertiesUtil.toString(config.get(TEMP_FS_FOLDER),
 null));
+
+
+        if ("file".equals(persistenceType)) {
+            packageBuilder = new 
FileDistributionPackageBuilder(contentSerializer.getName(), contentSerializer, 
tempFsFolder);
+        } else {
+            packageBuilder = new 
ResourceDistributionPackageBuilder(contentSerializer.getName(), 
contentSerializer, tempFsFolder);
+        }
+
+
+    }
+
+    public String getType() {
+        return packageBuilder.getType();
+    }
+
+    @Nonnull
+    public DistributionPackage createPackage(@Nonnull ResourceResolver 
resourceResolver, @Nonnull DistributionRequest request) throws 
DistributionException {
+        return packageBuilder.createPackage(resourceResolver, request);
+    }
+
+    @Nonnull
+    public DistributionPackage readPackage(@Nonnull ResourceResolver 
resourceResolver, @Nonnull InputStream stream) throws DistributionException {
+        return packageBuilder.readPackage(resourceResolver, stream);
+    }
+
+    @CheckForNull
+    public DistributionPackage getPackage(@Nonnull ResourceResolver 
resourceResolver, @Nonnull String id) throws DistributionException {
+        return packageBuilder.getPackage(resourceResolver, id);
+    }
+
+    public boolean installPackage(@Nonnull ResourceResolver resourceResolver, 
@Nonnull DistributionPackage distributionPackage) throws DistributionException {
+        return packageBuilder.installPackage(resourceResolver, 
distributionPackage);
+    }
+}

Added: 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/FileDistributionPackage.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/FileDistributionPackage.java?rev=1738558&view=auto
==============================================================================
--- 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/FileDistributionPackage.java
 (added)
+++ 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/FileDistributionPackage.java
 Mon Apr 11 09:17:33 2016
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.serialization.impl;
+
+import javax.annotation.Nonnull;
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.sling.distribution.DistributionRequestType;
+import org.apache.sling.distribution.serialization.DistributionPackage;
+import org.apache.sling.distribution.serialization.DistributionPackageInfo;
+
+/**
+ * A {@link DistributionPackage} based on a {@link File}.
+ */
+public class FileDistributionPackage implements DistributionPackage {
+
+    private final File file;
+    private final String type;
+    private final DistributionPackageInfo info;
+
+    public FileDistributionPackage(@Nonnull File file, @Nonnull String type) {
+        this.info = new DistributionPackageInfo(type);
+        this.file = file;
+        this.type = type;
+
+        this.getInfo().put(DistributionPackageInfo.PROPERTY_REQUEST_TYPE, 
DistributionRequestType.ADD);
+    }
+
+    @Nonnull
+    public String getId() {
+        return file.getAbsolutePath();
+    }
+
+    @Nonnull
+    public String getType() {
+        return type;
+    }
+
+    @Nonnull
+    public InputStream createInputStream() throws IOException {
+        return new PackageInputStream(file);
+    }
+
+    @Override
+    public long getSize() {
+        return file.length();
+    }
+
+    public void close() {
+        // do nothing
+    }
+
+    public void delete() {
+        FileUtils.deleteQuietly(file); // not wrapped in assert: assert bodies are skipped unless the JVM runs with -ea
+    }
+
+    @Nonnull
+    @Override
+    public DistributionPackageInfo getInfo() {
+        return info;
+    }
+
+    public File getFile() {
+        return file;
+    }
+
+
+    public class PackageInputStream extends BufferedInputStream {
+        private final File file;
+
+        public PackageInputStream(File file) throws IOException {
+            super(FileUtils.openInputStream(file));
+
+            this.file = file;
+        }
+
+
+        public File getFile() {
+            return file;
+        }
+    }
+
+}

Added: 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/FileDistributionPackageBuilder.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/FileDistributionPackageBuilder.java?rev=1738558&view=auto
==============================================================================
--- 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/FileDistributionPackageBuilder.java
 (added)
+++ 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/FileDistributionPackageBuilder.java
 Mon Apr 11 09:17:33 2016
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.distribution.serialization.impl;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.distribution.DistributionRequest;
+import org.apache.sling.distribution.common.DistributionException;
+import 
org.apache.sling.distribution.serialization.DistributionContentSerializer;
+import org.apache.sling.distribution.serialization.DistributionPackage;
+import org.apache.sling.distribution.serialization.impl.vlt.VltUtils;
+
+import javax.annotation.Nonnull;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+
+public class FileDistributionPackageBuilder extends 
AbstractDistributionPackageBuilder  {
+
+    private final File tempDirectory;
+    private final DistributionContentSerializer distributionContentSerializer;
+
+    public FileDistributionPackageBuilder(String type, 
DistributionContentSerializer distributionContentSerializer, String 
tempFilesFolder) {
+        super(type);
+        this.distributionContentSerializer = distributionContentSerializer;
+        this.tempDirectory = VltUtils.getTempFolder(tempFilesFolder);
+
+    }
+
+    @Override
+    protected DistributionPackage createPackageForAdd(@Nonnull 
ResourceResolver resourceResolver, @Nonnull DistributionRequest request) throws 
DistributionException {
+        DistributionPackage distributionPackage;
+        OutputStream outputStream = null;
+
+        try {
+            File file = File.createTempFile("distrpck-create-" + 
System.nanoTime(),  "." + getType(), tempDirectory);
+            outputStream = new BufferedOutputStream(new 
FileOutputStream(file));
+
+            distributionContentSerializer.exportToStream(resourceResolver, 
request, outputStream);
+            outputStream.flush();
+
+            distributionPackage = new FileDistributionPackage(file, getType());
+        } catch (IOException e) {
+            throw new DistributionException(e);
+        } finally {
+            IOUtils.closeQuietly(outputStream);
+        }
+
+        return distributionPackage;
+    }
+
+    @Override
+    protected DistributionPackage readPackageInternal(@Nonnull 
ResourceResolver resourceResolver, @Nonnull InputStream stream)
+            throws DistributionException {
+        DistributionPackage distributionPackage;
+
+        OutputStream outputStream = null;
+        try {
+            File file = File.createTempFile("distrpck-read-" + 
System.nanoTime(), "." + getType(), tempDirectory);
+            outputStream = new BufferedOutputStream(new 
FileOutputStream(file));
+
+            IOUtils.copy(stream, outputStream);
+            outputStream.flush();
+
+            distributionPackage = new FileDistributionPackage(file, getType());
+        } catch (Exception e) {
+            throw new DistributionException(e);
+        } finally {
+            IOUtils.closeQuietly(outputStream);
+        }
+
+        return distributionPackage;
+    }
+
+    @Override
+    protected boolean installPackageInternal(@Nonnull ResourceResolver 
resourceResolver, @Nonnull DistributionPackage distributionPackage)
+            throws DistributionException {
+        InputStream inputStream = null;
+        try {
+            inputStream = distributionPackage.createInputStream();
+            distributionContentSerializer.importFromStream(resourceResolver, 
inputStream);
+            return true;
+        } catch (IOException e) {
+            throw new DistributionException(e);
+        } finally {
+            IOUtils.closeQuietly(inputStream);
+        }
+    }
+
+    @Override
+    protected DistributionPackage getPackageInternal(@Nonnull ResourceResolver 
resourceResolver, @Nonnull String id) {
+        return new FileDistributionPackage(new File(id), getType());
+    }
+}

Added: 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/ResourceDistributionPackage.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/ResourceDistributionPackage.java?rev=1738558&view=auto
==============================================================================
--- 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/ResourceDistributionPackage.java
 (added)
+++ 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/ResourceDistributionPackage.java
 Mon Apr 11 09:17:33 2016
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.serialization.impl;
+
+import javax.annotation.Nonnull;
+import javax.jcr.RepositoryException;
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ValueMap;
+import org.apache.sling.distribution.DistributionRequestType;
+import org.apache.sling.distribution.packaging.impl.DistributionPackageUtils;
+import org.apache.sling.distribution.serialization.DistributionPackage;
+import org.apache.sling.distribution.serialization.DistributionPackageInfo;
+
+/**
+ * A {@link DistributionPackage} whose content and metadata live in a {@link Resource}.
+ */
+public class ResourceDistributionPackage implements DistributionPackage {
+
+    private final String type;
+    private final Resource resource;
+    private final ResourceResolver resourceResolver;
+    private final DistributionPackageInfo info;
+
+    /**
+     * @param resource the resource holding the package
+     * @param type the package type; asserted to equal the resource's "type" property
+     * @param resourceResolver resolver used later on to delete the resource
+     */
+    ResourceDistributionPackage(Resource resource, String type, ResourceResolver resourceResolver) {
+        this.info = new DistributionPackageInfo(type);
+        this.resourceResolver = resourceResolver;
+        this.type = type;
+
+        ValueMap properties = resource.getValueMap();
+        assert type.equals(properties.get("type")) : "wrong resource type";
+        this.resource = resource;
+
+        // every package of this kind is flagged as an ADD request
+        getInfo().put(DistributionPackageInfo.PROPERTY_REQUEST_TYPE, DistributionRequestType.ADD);
+    }
+
+    /**
+     * @return the path of the backing resource
+     */
+    @Nonnull
+    @Override
+    public String getId() {
+        final String path = resource.getPath();
+        return path;
+    }
+
+    /**
+     * @return the package type given at construction
+     */
+    @Nonnull
+    @Override
+    public String getType() {
+        return this.type;
+    }
+
+    /**
+     * Opens a buffered stream over the content obtained from the backing resource.
+     *
+     * @throws IOException if the stream cannot be obtained; a {@link RepositoryException}
+     *                     is rethrown wrapped in an {@link IOException}
+     */
+    @Nonnull
+    @Override
+    public InputStream createInputStream() throws IOException {
+        final InputStream rawStream;
+        try {
+            rawStream = DistributionPackageUtils.getStream(resource);
+        } catch (RepositoryException e) {
+            throw new IOException("Cannot create stream", e);
+        }
+        return new BufferedInputStream(rawStream);
+    }
+
+    /**
+     * @return the value of the resource's "size" property parsed as a long, or -1 when
+     *         the property is absent
+     */
+    @Override
+    public long getSize() {
+        Object storedSize = resource.getValueMap().get("size");
+        if (storedSize == null) {
+            return -1;
+        }
+        return Long.parseLong(storedSize.toString());
+    }
+
+    /**
+     * No-op: this method releases nothing.
+     */
+    @Override
+    public void close() {
+        // intentionally empty
+    }
+
+    /**
+     * Deletes the backing resource and commits the change.
+     *
+     * @throws RuntimeException wrapping the {@link PersistenceException} if that fails
+     */
+    @Override
+    public void delete() {
+        try {
+            resourceResolver.delete(resource);
+            resourceResolver.commit();
+        } catch (PersistenceException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * @return the info object created for this package in the constructor
+     */
+    @Nonnull
+    @Override
+    public DistributionPackageInfo getInfo() {
+        return this.info;
+    }
+}
\ No newline at end of file

Added: 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/ResourceDistributionPackageBuilder.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/ResourceDistributionPackageBuilder.java?rev=1738558&view=auto
==============================================================================
--- 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/ResourceDistributionPackageBuilder.java
 (added)
+++ 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/ResourceDistributionPackageBuilder.java
 Mon Apr 11 09:17:33 2016
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.distribution.serialization.impl;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.distribution.DistributionRequest;
+import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.packaging.impl.DistributionPackageUtils;
+import 
org.apache.sling.distribution.serialization.DistributionContentSerializer;
+import org.apache.sling.distribution.serialization.DistributionPackage;
+import org.apache.sling.distribution.serialization.impl.vlt.VltUtils;
+
+import javax.annotation.Nonnull;
+import javax.jcr.RepositoryException;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+
+/**
+ * Package builder that stores every package as a resource below
+ * {@code /var/sling/distribution/packages/<type>/data}. Content is exported and imported
+ * through the configured {@link DistributionContentSerializer}.
+ */
+public class ResourceDistributionPackageBuilder extends AbstractDistributionPackageBuilder {
+    private static final String PREFIX_PATH = "/var/sling/distribution/packages/";
+
+    private final String packagesPath;
+    private final File tempDirectory;
+    private final DistributionContentSerializer distributionContentSerializer;
+
+    /**
+     * @param type the package type, used in the storage path and as temp file extension
+     * @param distributionContentSerializer serializer used to export and import content
+     * @param tempFilesFolder folder setting handed to {@code VltUtils.getTempFolder} to
+     *                        resolve the directory used for intermediate files
+     */
+    public ResourceDistributionPackageBuilder(String type, DistributionContentSerializer distributionContentSerializer, String tempFilesFolder) {
+        super(type);
+        this.distributionContentSerializer = distributionContentSerializer;
+        this.packagesPath = PREFIX_PATH + type + "/data";
+        this.tempDirectory = VltUtils.getTempFolder(tempFilesFolder);
+    }
+
+    /**
+     * Exports the requested content to an intermediate temp file, uploads that file into
+     * a new package resource and always removes the temp file afterwards.
+     *
+     * @throws DistributionException if the export fails or an I/O error occurs
+     */
+    @Override
+    protected DistributionPackage createPackageForAdd(@Nonnull ResourceResolver resourceResolver, @Nonnull DistributionRequest request) throws DistributionException {
+        DistributionPackage distributionPackage;
+        // TODO : write to file if size > threshold
+
+        File file = null;
+        try {
+            file = File.createTempFile("distrpck-create-" + System.nanoTime(), "." + getType(), tempDirectory);
+
+            OutputStream outputStream = null;
+            try {
+                outputStream = new BufferedOutputStream(new FileOutputStream(file));
+                distributionContentSerializer.exportToStream(resourceResolver, request, outputStream);
+                outputStream.flush();
+            } finally {
+                IOUtils.closeQuietly(outputStream);
+            }
+
+            Resource packagesRoot = DistributionPackageUtils.getPackagesRoot(resourceResolver, packagesPath);
+
+            InputStream inputStream = null;
+            Resource packageResource = null;
+            try {
+                inputStream = new BufferedInputStream(new FileInputStream(file));
+                // the size is known here, so it is recorded on the package resource
+                packageResource = uploadStream(packagesRoot, inputStream, file.length());
+            } finally {
+                IOUtils.closeQuietly(inputStream);
+            }
+
+            distributionPackage = new ResourceDistributionPackage(packageResource, getType(), resourceResolver);
+        } catch (IOException e) {
+            throw new DistributionException(e);
+        } finally {
+            FileUtils.deleteQuietly(file);
+        }
+        return distributionPackage;
+    }
+
+    /**
+     * Uploads the given stream into a new package resource. The size is unknown at this
+     * point, so no "size" property is stored.
+     *
+     * @throws DistributionException wrapping any {@link PersistenceException}
+     */
+    @Override
+    protected DistributionPackage readPackageInternal(@Nonnull ResourceResolver resourceResolver, @Nonnull InputStream inputStream)
+            throws DistributionException {
+        try {
+            Resource packagesRoot = DistributionPackageUtils.getPackagesRoot(resourceResolver, packagesPath);
+
+            Resource packageResource = uploadStream(packagesRoot, inputStream, -1);
+            return new ResourceDistributionPackage(packageResource, getType(), resourceResolver);
+        } catch (PersistenceException e) {
+            throw new DistributionException(e);
+        }
+    }
+
+    /**
+     * Streams the package content into the serializer's import.
+     *
+     * @return {@code true} when the import completed without an exception
+     * @throws DistributionException if the package stream cannot be opened or the import fails
+     */
+    @Override
+    protected boolean installPackageInternal(@Nonnull ResourceResolver resourceResolver, @Nonnull DistributionPackage distributionPackage)
+            throws DistributionException {
+        InputStream inputStream = null;
+        try {
+            inputStream = distributionPackage.createInputStream();
+            distributionContentSerializer.importFromStream(resourceResolver, inputStream);
+            return true;
+        } catch (IOException e) {
+            throw new DistributionException(e);
+        } finally {
+            IOUtils.closeQuietly(inputStream);
+        }
+    }
+
+    /**
+     * Looks up the package stored at the given resource path.
+     *
+     * @return the package, or {@code null} when no resource exists at {@code id}
+     */
+    @Override
+    protected DistributionPackage getPackageInternal(@Nonnull ResourceResolver resourceResolver, @Nonnull String id) {
+        Resource packageResource = resourceResolver.getResource(id);
+        if (packageResource == null) {
+            // unknown id: the package constructor would otherwise fail with a NullPointerException
+            return null;
+        }
+        return new ResourceDistributionPackage(packageResource, getType(), resourceResolver);
+    }
+
+
+    /**
+     * Creates a uniquely named child of {@code parent}, uploads the stream into it and
+     * commits the parent's resource resolver.
+     *
+     * @param parent resource under which the package resource is created
+     * @param stream content to upload
+     * @param size content length stored as the "size" property, or -1 to store none
+     * @return the newly created package resource
+     * @throws PersistenceException if the resource cannot be created or committed, or if
+     *                              the upload fails with a {@link RepositoryException}
+     */
+    Resource uploadStream(Resource parent, InputStream stream, long size) throws PersistenceException {
+        String name = "dstrpck-" + System.currentTimeMillis() + "-" + UUID.randomUUID().toString();
+        Map<String, Object> props = new HashMap<String, Object>();
+        props.put(ResourceResolver.PROPERTY_RESOURCE_TYPE, "sling:Folder");
+        props.put("type", getType());
+
+        if (size != -1) {
+            props.put("size", size);
+        }
+
+        Resource resource = parent.getResourceResolver().create(parent, name, props);
+        try {
+            DistributionPackageUtils.uploadStream(resource, stream);
+        } catch (RepositoryException e) {
+            throw new PersistenceException("cannot upload stream", e);
+        }
+
+        parent.getResourceResolver().commit();
+
+        return resource;
+    }
+}

Added: 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/vlt/FileVaultContentSerializer.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/vlt/FileVaultContentSerializer.java?rev=1738558&view=auto
==============================================================================
--- 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/vlt/FileVaultContentSerializer.java
 (added)
+++ 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/vlt/FileVaultContentSerializer.java
 Mon Apr 11 09:17:33 2016
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.serialization.impl.vlt;
+
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.jackrabbit.vault.fs.api.ImportMode;
+import org.apache.jackrabbit.vault.fs.api.WorkspaceFilter;
+import org.apache.jackrabbit.vault.fs.io.AccessControlHandling;
+import org.apache.jackrabbit.vault.fs.io.ImportOptions;
+import org.apache.jackrabbit.vault.packaging.ExportOptions;
+import org.apache.jackrabbit.vault.packaging.PackageManager;
+import org.apache.jackrabbit.vault.packaging.Packaging;
+import org.apache.jackrabbit.vault.packaging.VaultPackage;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.distribution.DistributionRequest;
+import org.apache.sling.distribution.common.DistributionException;
+import 
org.apache.sling.distribution.serialization.DistributionContentSerializer;
+import 
org.apache.sling.distribution.serialization.impl.FileDistributionPackage;
+import org.apache.sling.distribution.util.DistributionJcrUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link DistributionContentSerializer} based on Apache Jackrabbit FileVault
+ */
+public class FileVaultContentSerializer implements DistributionContentSerializer {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    static final String TYPE = "filevault";
+    private static final String VERSION = "0.0.1";
+    private static final String PACKAGE_GROUP = "sling/distribution";
+
+    private final Packaging packaging;
+    private final ImportMode importMode;
+    private final AccessControlHandling aclHandling;
+    private final String[] packageRoots;
+    private final int autosaveThreshold;
+    private final TreeMap<String, List<String>> filters;
+    private final boolean useBinaryReferences;
+    private final String name;
+
+    /**
+     * @param name the serializer name returned by {@link #getName()}
+     * @param packaging provides the FileVault package manager
+     * @param importMode import mode passed to the import options
+     * @param aclHandling access control handling passed to the import options
+     * @param packageRoots package roots passed to the export options
+     * @param filters filter definitions, parsed with {@code VltUtils.parseFilters}
+     * @param useBinaryReferences passed to the export options
+     * @param autosaveThreshold passed to the import options
+     */
+    public FileVaultContentSerializer(String name, Packaging packaging, ImportMode importMode, AccessControlHandling aclHandling, String[] packageRoots,
+                                      String[] filters, boolean useBinaryReferences, int autosaveThreshold) {
+        this.name = name;
+        this.packaging = packaging;
+        this.importMode = importMode;
+        this.aclHandling = aclHandling;
+        this.packageRoots = packageRoots;
+        this.autosaveThreshold = autosaveThreshold;
+        this.filters = VltUtils.parseFilters(filters);
+        this.useBinaryReferences = useBinaryReferences;
+    }
+
+    /**
+     * Assembles a FileVault package for the request and writes it to the given stream.
+     *
+     * @throws DistributionException wrapping any failure during assembly
+     */
+    @Override
+    public void exportToStream(ResourceResolver resourceResolver, DistributionRequest request, OutputStream outputStream) throws DistributionException {
+        Session session = null;
+        try {
+            session = getSession(resourceResolver);
+            String packageGroup = PACKAGE_GROUP;
+            String packageName = TYPE + "_" + System.currentTimeMillis() + "_" + UUID.randomUUID();
+
+            WorkspaceFilter filter = VltUtils.createFilter(request, filters);
+            ExportOptions opts = VltUtils.getExportOptions(filter, packageRoots, packageGroup, packageName, VERSION, useBinaryReferences);
+
+            log.debug("assembling package {} user {}", packageGroup + '/' + packageName + "-" + VERSION, resourceResolver.getUserID());
+
+            packaging.getPackageManager().assemble(session, opts, outputStream);
+        } catch (Exception e) {
+            throw new DistributionException(e);
+        } finally {
+            ungetSession(session);
+        }
+
+    }
+
+    /**
+     * Extracts the FileVault package contained in the given stream into the repository.
+     * <p>
+     * The package manager opens packages from files. A stream that is a
+     * {@link FileDistributionPackage.PackageInputStream} already has a backing file, which
+     * is opened directly and left untouched. Any other stream is first copied to a
+     * temporary file, which is removed once the import is over.
+     *
+     * @throws DistributionException wrapping any failure during the import
+     */
+    @Override
+    public void importFromStream(ResourceResolver resourceResolver, InputStream inputStream) throws DistributionException {
+
+        Session session = null;
+        OutputStream outputStream = null;
+        VaultPackage vaultPackage = null;
+        File file = null;
+        boolean isTmp = false;
+        try {
+            session = getSession(resourceResolver);
+            ImportOptions importOptions = VltUtils.getImportOptions(aclHandling, importMode, autosaveThreshold);
+
+            if (inputStream instanceof FileDistributionPackage.PackageInputStream) {
+                // Never open an output stream on this file: that would truncate the very
+                // package the input stream is reading from.
+                file = ((FileDistributionPackage.PackageInputStream) inputStream).getFile();
+            } else {
+                file = File.createTempFile("distrpck-tmp-" + System.nanoTime(), "." + TYPE);
+                isTmp = true;
+
+                outputStream = new BufferedOutputStream(new FileOutputStream(file));
+                IOUtils.copy(inputStream, outputStream);
+                // close explicitly so that a failing final flush is reported, not swallowed
+                outputStream.close();
+            }
+
+            vaultPackage = packaging.getPackageManager().open(file);
+            vaultPackage.extract(session, importOptions);
+        } catch (Exception e) {
+            throw new DistributionException(e);
+        } finally {
+            IOUtils.closeQuietly(outputStream);
+            if (vaultPackage != null) {
+                // release the package before its file is deleted, also when extraction failed
+                vaultPackage.close();
+            }
+            if (isTmp) {
+                FileUtils.deleteQuietly(file);
+            }
+
+            ungetSession(session);
+        }
+
+    }
+
+    /**
+     * Adapts the resolver to a JCR session and marks it via
+     * {@code DistributionJcrUtils.setDoNotDistribute}.
+     *
+     * @throws RepositoryException if the resolver cannot be adapted to a session
+     */
+    protected Session getSession(ResourceResolver resourceResolver) throws RepositoryException {
+        Session session = resourceResolver.adaptTo(Session.class);
+        if (session != null) {
+            DistributionJcrUtils.setDoNotDistribute(session);
+        } else {
+            throw new RepositoryException("could not obtain a session from calling user " + resourceResolver.getUserID());
+        }
+        return session;
+    }
+
+    /**
+     * Saves pending changes of the session, if any. A failing save is logged, not rethrown.
+     */
+    protected void ungetSession(Session session) {
+        if (session != null) {
+            try {
+                if (session.hasPendingChanges()) {
+                    session.save();
+                }
+            } catch (RepositoryException e) {
+                log.error("Cannot save session", e);
+            }
+        }
+    }
+
+    /**
+     * @return the name given at construction
+     */
+    @Override
+    public String getName() {
+        return name;
+    }
+}

Modified: 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/vlt/VaultDistributionPackageBuilderFactory.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/vlt/VaultDistributionPackageBuilderFactory.java?rev=1738558&r1=1738557&r2=1738558&view=diff
==============================================================================
--- 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/vlt/VaultDistributionPackageBuilderFactory.java
 (original)
+++ 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/vlt/VaultDistributionPackageBuilderFactory.java
 Mon Apr 11 09:17:33 2016
@@ -36,12 +36,14 @@ import org.apache.jackrabbit.vault.packa
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.commons.osgi.PropertiesUtil;
 import org.apache.sling.distribution.DistributionRequest;
+import org.apache.sling.distribution.common.DistributionException;
 import 
org.apache.sling.distribution.component.impl.DistributionComponentConstants;
 import org.apache.sling.distribution.component.impl.SettingsUtils;
-import org.apache.sling.distribution.common.DistributionException;
+import 
org.apache.sling.distribution.serialization.DistributionContentSerializer;
 import org.apache.sling.distribution.serialization.DistributionPackage;
 import org.apache.sling.distribution.serialization.DistributionPackageBuilder;
-import 
org.apache.sling.distribution.serialization.impl.DefaultSharedDistributionPackageBuilder;
+import 
org.apache.sling.distribution.serialization.impl.FileDistributionPackageBuilder;
+import 
org.apache.sling.distribution.serialization.impl.ResourceDistributionPackageBuilder;
 
 /**
  * A package builder for Apache Jackrabbit FileVault based implementations.
@@ -147,10 +149,13 @@ public class VaultDistributionPackageBui
             aclHandling = 
AccessControlHandling.valueOf(aclHandlingString.trim());
         }
 
+        DistributionContentSerializer contentSerializer = new 
FileVaultContentSerializer(name, packaging, importMode, aclHandling,
+                packageRoots, packageFilters, useBinaryReferences, 
autosaveThreshold);
+
         if ("filevlt".equals(type)) {
-            packageBuilder = new FileVaultDistributionPackageBuilder(name, 
packaging, importMode, aclHandling, packageRoots, packageFilters, tempFsFolder, 
useBinaryReferences, autosaveThreshold);
+            packageBuilder = new FileDistributionPackageBuilder(name, 
contentSerializer, tempFsFolder);
         } else {
-            packageBuilder = new JcrVaultDistributionPackageBuilder(name, 
packaging, importMode, aclHandling, packageRoots, packageFilters, tempFsFolder, 
useBinaryReferences, autosaveThreshold);
+            packageBuilder = new ResourceDistributionPackageBuilder(name, 
contentSerializer, tempFsFolder);
         }
     }
 

Modified: 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/servlet/DistributionPackageExporterServlet.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/servlet/DistributionPackageExporterServlet.java?rev=1738558&r1=1738557&r2=1738558&view=diff
==============================================================================
--- 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/servlet/DistributionPackageExporterServlet.java
 (original)
+++ 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/servlet/DistributionPackageExporterServlet.java
 Mon Apr 11 09:17:33 2016
@@ -33,6 +33,7 @@ import org.apache.sling.api.servlets.Sli
 import org.apache.sling.distribution.DistributionRequest;
 import org.apache.sling.distribution.common.DistributionException;
 import org.apache.sling.distribution.packaging.DistributionPackageProcessor;
+import org.apache.sling.distribution.packaging.impl.DistributionPackageUtils;
 import org.apache.sling.distribution.serialization.DistributionPackage;
 import org.apache.sling.distribution.packaging.DistributionPackageExporter;
 import org.apache.sling.distribution.resources.DistributionResourceTypes;
@@ -103,7 +104,7 @@ public class DistributionPackageExporter
                     try {
                         
response.addHeader(HttpTransportUtils.HEADER_DISTRIBUTION_ORIGINAL_ID, 
distributionPackage.getId());
 
-                        inputStream = distributionPackage.createInputStream();
+                        inputStream = 
DistributionPackageUtils.createStreamWithHeader(distributionPackage);
 
                         bytesCopied = IOUtils.copy(inputStream, 
response.getOutputStream());
                     } catch (IOException e) {

Modified: 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/SimpleHttpDistributionTransport.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/SimpleHttpDistributionTransport.java?rev=1738558&r1=1738557&r2=1738558&view=diff
==============================================================================
--- 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/SimpleHttpDistributionTransport.java
 (original)
+++ 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/SimpleHttpDistributionTransport.java
 Mon Apr 11 09:17:33 2016
@@ -30,8 +30,6 @@ import java.util.UUID;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.http.HttpHost;
-import org.apache.http.HttpResponse;
-import org.apache.http.StatusLine;
 import org.apache.http.client.HttpResponseException;
 import org.apache.http.client.fluent.Executor;
 import org.apache.http.client.fluent.Request;
@@ -43,6 +41,7 @@ import org.apache.sling.distribution.Dis
 import org.apache.sling.distribution.common.DistributionException;
 import org.apache.sling.distribution.common.RecoverableDistributionException;
 import org.apache.sling.distribution.log.impl.DefaultDistributionLog;
+import org.apache.sling.distribution.packaging.impl.DistributionPackageUtils;
 import org.apache.sling.distribution.serialization.DistributionPackage;
 import org.apache.sling.distribution.serialization.DistributionPackageBuilder;
 import org.apache.sling.distribution.transport.DistributionTransportSecret;
@@ -96,7 +95,7 @@ public class SimpleHttpDistributionTrans
 
                 InputStream inputStream = null;
                 try {
-                    inputStream = distributionPackage.createInputStream();
+                    inputStream = 
DistributionPackageUtils.createStreamWithHeader(distributionPackage);
 
                     req = req.bodyStream(inputStream, 
ContentType.APPLICATION_OCTET_STREAM);
 

Added: 
sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtilsTest.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtilsTest.java?rev=1738558&view=auto
==============================================================================
--- 
sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtilsTest.java
 (added)
+++ 
sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtilsTest.java
 Mon Apr 11 09:17:33 2016
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.distribution.packaging.impl;
+
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.junit.Test;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the info header helpers {@code writeInfo} / {@code readInfo} of
+ * {@link DistributionPackageUtils}.
+ */
+public class DistributionPackageUtilsTest {
+
+    /**
+     * An empty info map written with {@code writeInfo} must be read back as an empty map.
+     */
+    @Test
+    public void testInfoEmptyStreams(){
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+
+        Map<String, Object> info = new HashMap<String, Object>();
+        DistributionPackageUtils.writeInfo(outputStream, info);
+
+        InputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
+
+        Map<String, Object> resultInfo = new HashMap<String, Object>();
+        DistributionPackageUtils.readInfo(inputStream, resultInfo);
+
+        assertEquals(info.size(), resultInfo.size());
+
+    }
+
+    /**
+     * String and String[] values written with {@code writeInfo} must be read back unchanged.
+     */
+    @Test
+    public void testInfoFullStreams(){
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+
+        Map<String, Object> info = new HashMap<String, Object>();
+        info.put("test1", "value1");
+        info.put("test2", "value2");
+        info.put("test3", new String[] { "value1", "value2" });
+
+        DistributionPackageUtils.writeInfo(outputStream, info);
+
+        InputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
+
+        Map<String, Object> resultInfo = new HashMap<String, Object>();
+        DistributionPackageUtils.readInfo(inputStream, resultInfo);
+
+        assertEquals(info.size(), resultInfo.size());
+        assertEquals("value1", resultInfo.get("test1"));
+        assertEquals("value2", resultInfo.get("test2"));
+        String[] array = (String[]) resultInfo.get("test3");
+
+        // make sure no element was dropped or added on the way
+        assertEquals(2, array.length);
+        assertEquals("value1", array[0]);
+        assertEquals("value2", array[1]);
+    }
+
+    /**
+     * A stream that carries no info header must yield an empty info map and must still
+     * deliver all of its original bytes afterwards.
+     */
+    @Test
+    public void testStreamsWithoutInfo() throws IOException {
+
+        byte[] bytes = new byte[100];
+        for (int i = 0; i < bytes.length; i++) {
+            bytes[i] = (byte) i;
+        }
+
+        InputStream inputStream = new BufferedInputStream(new ByteArrayInputStream(bytes));
+
+        Map<String, Object> resultInfo = new HashMap<String, Object>();
+        DistributionPackageUtils.readInfo(inputStream, resultInfo);
+
+        assertEquals(0, resultInfo.size());
+
+        byte[] resultBytes = new byte[bytes.length];
+
+        // read() may legally return fewer bytes than requested, so loop until the buffer
+        // is full or the stream ends, then verify that nothing was consumed by readInfo
+        int offset = 0;
+        while (offset < resultBytes.length) {
+            int count = inputStream.read(resultBytes, offset, resultBytes.length - offset);
+            if (count < 0) {
+                break;
+            }
+            offset += count;
+        }
+        assertEquals(bytes.length, offset);
+
+        assertEquals(-1, inputStream.read());
+
+        for (int i = 0; i < bytes.length; i++) {
+            assertEquals(bytes[i], resultBytes[i]);
+        }
+
+    }
+}

Modified: 
sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/serialization/impl/SimpleDistributionPackageTest.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/serialization/impl/SimpleDistributionPackageTest.java?rev=1738558&r1=1738557&r2=1738558&view=diff
==============================================================================
--- 
sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/serialization/impl/SimpleDistributionPackageTest.java
 (original)
+++ 
sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/serialization/impl/SimpleDistributionPackageTest.java
 Mon Apr 11 09:17:33 2016
@@ -28,6 +28,7 @@ import org.apache.sling.distribution.Sim
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -40,6 +41,7 @@ public class SimpleDistributionPackageTe
         DistributionRequest request = new 
SimpleDistributionRequest(DistributionRequestType.DELETE, "/abc");
         SimpleDistributionPackage createdPackage = new 
SimpleDistributionPackage(request, "VOID");
         SimpleDistributionPackage readPackage = 
SimpleDistributionPackage.fromStream(new 
ByteArrayInputStream(("DSTRPCK:DELETE|/abc").getBytes()), "VOID");
+        assertNotNull(readPackage);
         assertEquals(createdPackage.getType(), readPackage.getType());
         assertEquals(createdPackage.getInfo().getRequestType(), 
readPackage.getInfo().getRequestType());
         assertEquals(Arrays.toString(createdPackage.getInfo().getPaths()), 
Arrays.toString(readPackage.getInfo().getPaths()));

Modified: sling/trunk/contrib/extensions/distribution/extensions/pom.xml
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/extensions/pom.xml?rev=1738558&r1=1738557&r2=1738558&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/extensions/pom.xml (original)
+++ sling/trunk/contrib/extensions/distribution/extensions/pom.xml Mon Apr 11 
09:17:33 2016
@@ -188,7 +188,7 @@
         <dependency>
             <groupId>org.apache.sling</groupId>
             <artifactId>org.apache.sling.distribution.core</artifactId>
-            <version>0.1.15-SNAPSHOT</version>
+            <version>0.1.17-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.sling</groupId>

Added: 
sling/trunk/contrib/extensions/distribution/extensions/src/main/java/org/apache/sling/distribution/serialization/impl/avro/AvroContentSerializer.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/extensions/src/main/java/org/apache/sling/distribution/serialization/impl/avro/AvroContentSerializer.java?rev=1738558&view=auto
==============================================================================
--- 
sling/trunk/contrib/extensions/distribution/extensions/src/main/java/org/apache/sling/distribution/serialization/impl/avro/AvroContentSerializer.java
 (added)
+++ 
sling/trunk/contrib/extensions/distribution/extensions/src/main/java/org/apache/sling/distribution/serialization/impl/avro/AvroContentSerializer.java
 Mon Apr 11 09:17:33 2016
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.serialization.impl.avro;
+
+import javax.annotation.Nonnull;
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.GregorianCalendar;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.SeekableByteArrayInput;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.util.Utf8;
+import org.apache.commons.io.IOUtils;
+import org.apache.jackrabbit.JcrConstants;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ValueMap;
+import org.apache.sling.distribution.DistributionRequest;
+import org.apache.sling.distribution.common.DistributionException;
+import 
org.apache.sling.distribution.serialization.DistributionContentSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Apache Avro based {@link DistributionContentSerializer}.
+ * <p>
+ * Resources are exported as {@code AvroShallowResource} records (path, resource type,
+ * properties and, for deep requests, children) written in the Avro data file format, and
+ * imported by re-creating those resources through the given {@link ResourceResolver}.
+ * <p>
+ * No mutable helper (Avro writer, date formatter) is kept as instance state, so a single
+ * instance can serve concurrent export / import calls.
+ */
+public class AvroContentSerializer implements DistributionContentSerializer {
+
+    /** Classpath location of the Avro schema describing {@code AvroShallowResource}. */
+    private static final String SCHEMA_RESOURCE = "/shallowresource.avsc";
+
+    /** 24h clock, milliseconds and RFC 822 offset; the offset colon is inserted by {@link #formatDate}. */
+    private static final String DATE_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSSZ";
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final String name;
+    private final Schema schema;
+    private final Set<String> ignoredProperties;
+    private final Set<String> ignoredNodeNames;
+
+    /**
+     * Creates a serializer and loads the Avro schema from the classpath.
+     *
+     * @param name the name reported by {@link #getName()}
+     * @throws IllegalStateException if the schema resource cannot be found
+     * @throws RuntimeException if the schema cannot be read
+     */
+    public AvroContentSerializer(String name) {
+        InputStream schemaStream = getClass().getResourceAsStream(SCHEMA_RESOURCE);
+        if (schemaStream == null) {
+            throw new IllegalStateException("Avro schema " + SCHEMA_RESOURCE + " not found on the classpath");
+        }
+        try {
+            schema = new Schema.Parser().parse(schemaStream);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            IOUtils.closeQuietly(schemaStream);
+        }
+        Set<String> iProps = new HashSet<String>();
+        iProps.add(JcrConstants.JCR_FROZENMIXINTYPES);
+        iProps.add(JcrConstants.JCR_FROZENPRIMARYTYPE);
+        iProps.add(JcrConstants.JCR_FROZENUUID);
+        iProps.add(JcrConstants.JCR_VERSIONHISTORY);
+        iProps.add(JcrConstants.JCR_BASEVERSION);
+        iProps.add(JcrConstants.JCR_PREDECESSORS);
+        iProps.add(JcrConstants.JCR_SUCCESSORS);
+        iProps.add(JcrConstants.JCR_ISCHECKEDOUT);
+        iProps.add(JcrConstants.JCR_UUID);
+        ignoredProperties = Collections.unmodifiableSet(iProps);
+
+        Set<String> iNames = new HashSet<String>();
+        iNames.add("rep:policy");
+        ignoredNodeNames = Collections.unmodifiableSet(iNames);
+        this.name = name;
+    }
+
+    /**
+     * Writes one Avro record per request path to the given stream and closes the stream.
+     *
+     * @param resourceResolver resolver used to look up the requested paths
+     * @param request the request whose paths (and per-path deep flag) are exported
+     * @param outputStream target stream; it is closed by this method
+     * @throws DistributionException if a path cannot be resolved or the records cannot be written
+     */
+    @Override
+    public void exportToStream(ResourceResolver resourceResolver, DistributionRequest request, OutputStream outputStream) throws DistributionException {
+
+        // a DataFileWriter refuses create() while it is open, so every export gets its own writer
+        DatumWriter<AvroShallowResource> datumWriter = new SpecificDatumWriter<AvroShallowResource>(AvroShallowResource.class);
+        DataFileWriter<AvroShallowResource> writer = new DataFileWriter<AvroShallowResource>(datumWriter);
+
+        try {
+            writer.create(schema, outputStream);
+
+            for (String path : request.getPaths()) {
+                Resource resource = resourceResolver.getResource(path);
+                if (resource == null) {
+                    throw new IllegalArgumentException("no resource found at path " + path);
+                }
+                writer.append(getAvroShallowResource(request.isDeep(path), path, resource));
+            }
+            // flush the writer (not only the raw stream): appended records are buffered inside
+            // the writer, and a failure to push them out must be reported to the caller
+            writer.flush();
+
+        } catch (Exception e) {
+            throw new DistributionException(e);
+        } finally {
+            // close the writer first; closing the underlying stream first would make the
+            // writer's final flush fail and silently truncate the output
+            try {
+                writer.close();
+            } catch (IOException e) {
+                log.warn("could not close avro writer", e);
+            }
+            IOUtils.closeQuietly(outputStream);
+        }
+
+    }
+
+    /**
+     * Reads all Avro records from the stream, re-creates the corresponding resources and
+     * commits the resolver.
+     *
+     * @param resourceResolver resolver used to create the resources; committed on success
+     * @param stream stream holding an Avro data file as produced by {@link #exportToStream}
+     * @throws DistributionException if reading, persisting or committing fails
+     */
+    @Override
+    public void importFromStream(ResourceResolver resourceResolver, InputStream stream) throws DistributionException {
+        try {
+            byte[] bin = IOUtils.toByteArray(stream); // TODO : avoid byte[] conversion
+            Collection<AvroShallowResource> avroShallowResources = readAvroResources(bin);
+            for (AvroShallowResource ar : avroShallowResources) {
+                persistResource(resourceResolver, ar);
+            }
+            resourceResolver.commit();
+        } catch (Exception e) {
+            throw new DistributionException(e);
+        }
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * Converts a resource (and, if {@code deep}, its children that are not in the ignored
+     * node names) into an Avro record. Ignored properties are left out; calendars become
+     * strings, arrays become lists and streams become byte buffers.
+     */
+    private AvroShallowResource getAvroShallowResource(boolean deep, String path, Resource resource) throws IOException {
+        AvroShallowResource avroShallowResource = new AvroShallowResource();
+        avroShallowResource.setName("avro_" + System.nanoTime());
+        avroShallowResource.setPath(path);
+        avroShallowResource.setResourceType(resource.getResourceType());
+        ValueMap valueMap = resource.getValueMap();
+        Map<CharSequence, Object> map = new HashMap<CharSequence, Object>();
+        for (Map.Entry<String, Object> entry : valueMap.entrySet()) {
+            if (!ignoredProperties.contains(entry.getKey())) {
+                Object value = entry.getValue();
+                if (value instanceof GregorianCalendar) {
+                    value = formatDate((GregorianCalendar) value);
+                } else if (value instanceof Object[]) {
+                    Object[] ar = (Object[]) value;
+                    value = Arrays.asList(ar);
+                } else if (value instanceof InputStream) {
+                    value = ByteBuffer.wrap(IOUtils.toByteArray(((InputStream) value)));
+                }
+                map.put(entry.getKey(), value);
+            }
+        }
+        avroShallowResource.setValueMap(map);
+        List<AvroShallowResource> children = new LinkedList<AvroShallowResource>();
+        if (deep) {
+            for (Resource child : resource.getChildren()) {
+                String childPath = child.getPath();
+                if (!ignoredNodeNames.contains(child.getName())) {
+                    children.add(getAvroShallowResource(true, childPath, child));
+                }
+            }
+        }
+        avroShallowResource.setChildren(children);
+        return avroShallowResource;
+    }
+
+    /**
+     * Formats a calendar as {@code yyyy-MM-dd'T'HH:mm:ss.SSS+hh:mm} using the calendar's own
+     * time zone. A new formatter is created per call because {@link SimpleDateFormat} is not
+     * thread safe.
+     */
+    private String formatDate(GregorianCalendar calendar) {
+        SimpleDateFormat format = new SimpleDateFormat(DATE_PATTERN, java.util.Locale.ROOT);
+        format.setTimeZone(calendar.getTimeZone());
+        String formatted = format.format(calendar.getTime());
+        // 'Z' renders the offset as +hhmm; insert the colon to obtain +hh:mm
+        int split = formatted.length() - 2;
+        return formatted.substring(0, split) + ':' + formatted.substring(split);
+    }
+
+    /**
+     * Decodes all records contained in the given Avro data file bytes.
+     */
+    private Collection<AvroShallowResource> readAvroResources(byte[] bytes) throws IOException {
+        DatumReader<AvroShallowResource> datumReader = new SpecificDatumReader<AvroShallowResource>(AvroShallowResource.class);
+        DataFileReader<AvroShallowResource> dataFileReader = new DataFileReader<AvroShallowResource>(new SeekableByteArrayInput(bytes), datumReader);
+        Collection<AvroShallowResource> avroResources = new LinkedList<AvroShallowResource>();
+        try {
+            while (dataFileReader.hasNext()) {
+                // every record is kept in the collection, so it must be a fresh instance:
+                // passing the previous record to next(reuse) would overwrite it in place and
+                // leave the collection holding the same (last) record several times
+                avroResources.add(dataFileReader.next());
+            }
+        } finally {
+            dataFileReader.close();
+        }
+        return avroResources;
+    }
+
+    /**
+     * Re-creates the resource described by the record (replacing an existing one at the same
+     * path, creating missing ancestors) and then recurses into the record's children.
+     */
+    private void persistResource(@Nonnull ResourceResolver resourceResolver, AvroShallowResource r) throws PersistenceException {
+        String path = r.getPath().toString().trim();
+        String name = path.substring(path.lastIndexOf('/') + 1);
+        String parentPath = parentPathOf(path);
+        Map<String, Object> map = new HashMap<String, Object>();
+        Map<CharSequence, Object> valueMap = r.getValueMap();
+        for (Map.Entry<CharSequence, Object> entry : valueMap.entrySet()) {
+            Object value = entry.getValue();
+            if (value instanceof GenericData.Array) {
+                GenericData.Array<?> array = (GenericData.Array<?>) value;
+                String[] s = new String[array.size()];
+                for (int i = 0; i < s.length; i++) {
+                    Object gd = array.get(i);
+                    s[i] = gd.toString();
+                }
+                value = s;
+            } else if (value instanceof Utf8) {
+                value = value.toString();
+            } else if (value instanceof ByteBuffer) {
+                // copy only the readable part; array() would expose the whole backing array
+                // regardless of the buffer's position and limit
+                ByteBuffer buffer = ((ByteBuffer) value).duplicate();
+                byte[] bytes = new byte[buffer.remaining()];
+                buffer.get(bytes);
+                value = new BufferedInputStream(new ByteArrayInputStream(bytes));
+            }
+            map.put(entry.getKey().toString(), value);
+        }
+        Resource existingResource = resourceResolver.getResource(path);
+        if (existingResource != null) {
+            resourceResolver.delete(existingResource);
+        }
+        Resource parent = resourceResolver.getResource(parentPath);
+        if (parent == null) {
+            parent = createParent(resourceResolver, parentPath);
+        }
+        Resource createdResource = resourceResolver.create(parent, name, map);
+        log.info("created resource {}", createdResource);
+        for (AvroShallowResource child : r.getChildren()) {
+            persistResource(createdResource.getResourceResolver(), child);
+        }
+    }
+
+    /**
+     * Creates the (property-less) resource at {@code path}, creating missing ancestors first.
+     *
+     * @throws PersistenceException if the root resource itself cannot be resolved
+     */
+    private Resource createParent(ResourceResolver resourceResolver, String path) throws PersistenceException {
+        if ("/".equals(path)) {
+            // the root cannot be created; without this guard the recursion would never find a parent
+            throw new PersistenceException("root resource is not available");
+        }
+        String parentPath = parentPathOf(path);
+        String name = path.substring(path.lastIndexOf('/') + 1);
+        Resource parentResource = resourceResolver.getResource(parentPath);
+        if (parentResource == null) {
+            parentResource = createParent(resourceResolver, parentPath);
+        }
+        Map<String, Object> properties = new HashMap<String, Object>();
+        return resourceResolver.create(parentResource, name, properties);
+    }
+
+    /**
+     * Returns the parent of the given path, which is {@code "/"} for top level paths.
+     */
+    private static String parentPathOf(String path) {
+        int idx = path.lastIndexOf('/');
+        return idx <= 0 ? "/" : path.substring(0, idx);
+    }
+}

Added: 
sling/trunk/contrib/extensions/distribution/extensions/src/main/java/org/apache/sling/distribution/serialization/impl/avro/AvroDistributionContentSerializerFactory.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/extensions/src/main/java/org/apache/sling/distribution/serialization/impl/avro/AvroDistributionContentSerializerFactory.java?rev=1738558&view=auto
==============================================================================
--- 
sling/trunk/contrib/extensions/distribution/extensions/src/main/java/org/apache/sling/distribution/serialization/impl/avro/AvroDistributionContentSerializerFactory.java
 (added)
+++ 
sling/trunk/contrib/extensions/distribution/extensions/src/main/java/org/apache/sling/distribution/serialization/impl/avro/AvroDistributionContentSerializerFactory.java
 Mon Apr 11 09:17:33 2016
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.serialization.impl.avro;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.ConfigurationPolicy;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.apache.sling.distribution.DistributionRequest;
+import org.apache.sling.distribution.common.DistributionException;
+import 
org.apache.sling.distribution.serialization.DistributionContentSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * OSGi configuration factory component that registers an Apache Avro based
+ * {@link DistributionContentSerializer}. Every call is delegated to an
+ * {@link AvroContentSerializer} created on activation.
+ */
+@Component(metatype = true,
+        label = "Apache Sling Distribution Packaging - Avro Serialization Format Factory",
+        description = "OSGi configuration for Avro serialization formats",
+        configurationFactory = true,
+        specVersion = "1.1",
+        policy = ConfigurationPolicy.REQUIRE
+)
+@Service(DistributionContentSerializer.class)
+public class AvroDistributionContentSerializerFactory implements DistributionContentSerializer {
+
+    /**
+     * Configuration key holding the name of the serialization format.
+     */
+    @Property(label = "Name", description = "The name of the avro format.")
+    public static final String NAME = "name";
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    /** Delegate doing the actual work; assigned in {@link #activate(Map)}. */
+    private DistributionContentSerializer format;
+
+    /**
+     * Reads the {@link #NAME} property from the component configuration and creates the
+     * delegate serializer with it.
+     *
+     * @param config the component configuration
+     */
+    @Activate
+    public void activate(Map<String, Object> config) {
+        final Object configuredName = config.get(NAME);
+        final String serializerName = PropertiesUtil.toString(configuredName, null);
+        log.info("starting avro format {}", serializerName);
+
+        this.format = new AvroContentSerializer(serializerName);
+        log.info("started avro resource package builder");
+    }
+
+    /** Delegates the export to the Avro serializer created on activation. */
+    @Override
+    public void exportToStream(ResourceResolver resourceResolver, DistributionRequest request, OutputStream outputStream) throws DistributionException {
+        this.format.exportToStream(resourceResolver, request, outputStream);
+    }
+
+    /** Delegates the import to the Avro serializer created on activation. */
+    @Override
+    public void importFromStream(ResourceResolver resourceResolver, InputStream stream) throws DistributionException {
+        this.format.importFromStream(resourceResolver, stream);
+    }
+
+    /** Returns the name of the delegate serializer. */
+    @Override
+    public String getName() {
+        return this.format.getName();
+    }
+}


Reply via email to