Author: ppoddar
Date: Wed Apr  3 15:51:52 2013
New Revision: 1464082

URL: http://svn.apache.org/r1464082
Log:
OPENJPA-2365: Support for delete-by-query. Correction for hint processing

Added:
    
openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBulkDelete.java
   (with props)
    
openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/policy/UniformDistributionPolicy.java
   (with props)
Modified:
    
openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/JDBCStoreQuery.java
    
openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/BrokerImpl.java
    
openjpa/trunk/openjpa-persistence/src/main/java/org/apache/openjpa/persistence/QueryImpl.java
    
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java
    
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCStoreManager.java
    
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java
    openjpa/trunk/scripts/test.bat

Modified: 
openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/JDBCStoreQuery.java
URL: 
http://svn.apache.org/viewvc/openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/JDBCStoreQuery.java?rev=1464082&r1=1464081&r2=1464082&view=diff
==============================================================================
--- 
openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/JDBCStoreQuery.java
 (original)
+++ 
openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/JDBCStoreQuery.java
 Wed Apr  3 15:51:52 2013
@@ -584,7 +584,12 @@ public class JDBCStoreQuery 
                 }
             }
         } finally {
-            try { conn.close(); } catch (SQLException se) {}
+            try { 
+               if (conn.getAutoCommit())
+                       conn.close(); 
+            } catch (SQLException se) {
+               
+            }
         }
 
         localContext.remove();

Modified: 
openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/BrokerImpl.java
URL: 
http://svn.apache.org/viewvc/openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/BrokerImpl.java?rev=1464082&r1=1464081&r2=1464082&view=diff
==============================================================================
--- 
openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/BrokerImpl.java
 (original)
+++ 
openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/BrokerImpl.java
 Wed Apr  3 15:51:52 2013
@@ -1829,6 +1829,25 @@ public class BrokerImpl implements Broke
             endOperation();
         }
     }
+    
+    /**
+     * Sets the given flag to the status.
+     * 
+     * @since 2.3.0
+     */
+    protected void setStatusFlag(int flag) {
+       _flags |= flag;
+    }
+    
+    /**
+     * Clears the given flag from the status.
+     * 
+     * @since 2.3.0
+     */
+    protected void clearStatusFlag(int flag) {
+       _flags &= ~flag;
+    }
+
 
     public void flush() {
         beginOperation(true);

Modified: 
openjpa/trunk/openjpa-persistence/src/main/java/org/apache/openjpa/persistence/QueryImpl.java
URL: 
http://svn.apache.org/viewvc/openjpa/trunk/openjpa-persistence/src/main/java/org/apache/openjpa/persistence/QueryImpl.java?rev=1464082&r1=1464081&r2=1464082&view=diff
==============================================================================
--- 
openjpa/trunk/openjpa-persistence/src/main/java/org/apache/openjpa/persistence/QueryImpl.java
 (original)
+++ 
openjpa/trunk/openjpa-persistence/src/main/java/org/apache/openjpa/persistence/QueryImpl.java
 Wed Apr  3 15:51:52 2013
@@ -347,6 +347,11 @@ public class QueryImpl<X> extends Abstra
 
        private boolean pushQueryFetchPlan() {
                boolean fcPushed = false;
+               if (_hintHandler != null) {
+                       FetchConfiguration fc = _fetch == null ? null : 
((FetchPlanImpl)_fetch).getDelegate();
+                       _em.pushFetchPlan(fc);
+                       return true;
+               }
                if (_fetch != null && _hintHandler != null) {
                        switch (_fetch.getReadLockMode()) {
                        case PESSIMISTIC_READ:
@@ -528,6 +533,7 @@ public class QueryImpl<X> extends Abstra
      * cache. 
      */
     private boolean preExecute(Map params) {
+       
         PreparedQueryCache cache = _em.getPreparedQueryCache();
         if (cache == null) {
             return false;

Modified: 
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java
URL: 
http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java?rev=1464082&r1=1464081&r2=1464082&view=diff
==============================================================================
--- 
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java
 (original)
+++ 
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java
 Wed Apr  3 15:51:52 2013
@@ -152,6 +152,12 @@ public class DistributedBrokerImpl exten
     @Override
     public void beginStore() {
     }
+    
+    @Override
+    protected void flush(int reason) {
+       setStatusFlag(2 << 8);
+       super.flush(reason);
+    }
 
     /**
      * Overrides to target specific slices for find() calls.

Modified: 
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCStoreManager.java
URL: 
http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCStoreManager.java?rev=1464082&r1=1464081&r2=1464082&view=diff
==============================================================================
--- 
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCStoreManager.java
 (original)
+++ 
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCStoreManager.java
 Wed Apr  3 15:51:52 2013
@@ -293,6 +293,20 @@ class DistributedJDBCStoreManager extend
         }
     }
     
+    @Override
+    public void commit() {
+       for (SliceStoreManager slice : _slices) {
+               slice.commit();
+       }
+    }
+    
+    @Override
+    public void rollback() {
+       for (SliceStoreManager slice : _slices) {
+               slice.rollback();
+       }
+    }
+    
     /**
      * Collect the current versions of the given StateManagers.
      */

Modified: 
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java
URL: 
http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java?rev=1464082&r1=1464081&r2=1464082&view=diff
==============================================================================
--- 
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java
 (original)
+++ 
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java
 Wed Apr  3 15:51:52 2013
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.openjpa.jdbc.kernel.JDBCStore;
 import org.apache.openjpa.jdbc.kernel.JDBCStoreQuery;
+import org.apache.openjpa.kernel.Broker;
 import org.apache.openjpa.kernel.BrokerImpl;
 import org.apache.openjpa.kernel.ExpressionStoreQuery;
 import org.apache.openjpa.kernel.FetchConfiguration;
@@ -118,15 +119,14 @@ class DistributedStoreQuery extends JDBC
                 */
                public ResultObjectProvider executeQuery(StoreQuery q,
                                final Object[] params, final Range range) {
-                       List<Future<ResultObjectProvider>> futures = 
-                               new ArrayList<Future<ResultObjectProvider>>();
+                       List<Future<ResultObjectProvider>> futures = new 
ArrayList<Future<ResultObjectProvider>>();
             final List<Executor> usedExecutors = new ArrayList<Executor>();
-                       final List<ResultObjectProvider> rops = 
-                               new ArrayList<ResultObjectProvider>();
+                       final List<ResultObjectProvider> rops = new 
ArrayList<ResultObjectProvider>();
                        List<SliceStoreManager> targets = findTargets();
                        QueryContext ctx = q.getContext();
                        boolean isReplicated = containsReplicated(ctx);
             ExecutorService threadPool = SliceThread.getPool();
+           
                        for (int i = 0; i < owner._queries.size(); i++) {
                 // if replicated, then execute only on single slice
                                if (isReplicated && !usedExecutors.isEmpty()) {
@@ -135,16 +135,12 @@ class DistributedStoreQuery extends JDBC
                 StoreManager sm = owner.getDistributedStore().getSlice(i);
                                if (!targets.contains(sm))
                                        continue;
-                               StoreQuery query = owner._queries.get(i);
-                               Executor executor = executors.get(i);
-                               if (!targets.contains(sm))
-                                       continue;
-                               usedExecutors.add(executor);
                 QueryExecutor call = new QueryExecutor();
-                call.executor = executor;
-                call.query = query;
+                call.executor = executors.get(i);
+                call.query = owner._queries.get(i);
                 call.params = params;
                 call.range = range;
+                               usedExecutors.add(call.executor);
                 futures.add(threadPool.submit(call));
                        }
                        for (Future<ResultObjectProvider> future : futures) {
@@ -157,16 +153,14 @@ class DistributedStoreQuery extends JDBC
                                }
                        }
                        
-                       ResultObjectProvider[] tmp = rops
-                    .toArray(new ResultObjectProvider[rops.size()]);
+                       ResultObjectProvider[] tmp = rops.toArray(new 
ResultObjectProvider[rops.size()]);
                        ResultObjectProvider result = null;
                        boolean[] ascending = getAscending(q);
                        boolean isAscending = ascending.length > 0;
                        boolean isAggregate = ctx.isAggregate();
                        boolean hasRange = ctx.getEndRange() != Long.MAX_VALUE;
                        if (isAggregate) {
-                               result = new UniqueResultObjectProvider(tmp, q,
-                                               getQueryExpressions());
+                               result = new UniqueResultObjectProvider(tmp, q, 
getQueryExpressions());
                        } else if (isAscending) {
                 result = new OrderingMergedResultObjectProvider(tmp, ascending,
                     usedExecutors.toArray(new Executor[usedExecutors.size()]),
@@ -175,8 +169,7 @@ class DistributedStoreQuery extends JDBC
                                result = new MergedResultObjectProvider(tmp);
                        }
                        if (hasRange) {
-                result = new RangeResultObjectProvider(result,
-                        ctx.getStartRange(), ctx.getEndRange());
+                result = new RangeResultObjectProvider(result, 
ctx.getStartRange(), ctx.getEndRange());
                        }
                        return result;
                }
@@ -201,16 +194,18 @@ class DistributedStoreQuery extends JDBC
                }
 
                public Number executeDelete(StoreQuery q, Object[] params) {
-                       Iterator<StoreQuery> qs = owner._queries.iterator();
-                       List<Future<Number>> futures = null;
+                       List<Future<Number>> futures = new 
ArrayList<Future<Number>>();
                        int result = 0;
             ExecutorService threadPool = SliceThread.getPool();
-                       for (Executor ex : executors) {
-                               if (futures == null)
-                    futures = new ArrayList<Future<Number>>();
+                       List<SliceStoreManager> targets = findTargets();
+                       for (int i = 0; i < owner._queries.size(); i++) {
+                StoreManager sm = owner.getDistributedStore().getSlice(i);
+                               if (!targets.contains(sm))
+                                       continue;
+                               
                                DeleteExecutor call = new DeleteExecutor();
-                               call.executor = ex;
-                               call.query = qs.next();
+                               call.executor = executors.get(i);
+                               call.query = owner._queries.get(i);
                                call.params = params;
                                futures.add(threadPool.submit(call));
                        }
@@ -256,8 +251,7 @@ class DistributedStoreQuery extends JDBC
                }
 
                List<SliceStoreManager> findTargets() {
-                       FetchConfiguration fetch = owner.getContext()
-                                       .getFetchConfiguration();
+                   FetchConfiguration fetch = 
owner.getContext().getFetchConfiguration();
                        return owner.getDistributedStore().getTargets(fetch);
                }
                

Added: 
openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBulkDelete.java
URL: 
http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBulkDelete.java?rev=1464082&view=auto
==============================================================================
--- 
openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBulkDelete.java
 (added)
+++ 
openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBulkDelete.java
 Wed Apr  3 15:51:52 2013
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.    
+ */
+package org.apache.openjpa.slice;
+
+import java.util.List;
+
+import javax.persistence.EntityManager;
+import javax.persistence.EntityManagerFactory;
+import javax.persistence.Persistence;
+
+
+import org.apache.openjpa.kernel.BrokerFactory;
+import org.apache.openjpa.persistence.JPAFacadeHelper;
+import org.apache.openjpa.slice.DistributedBrokerFactory;
+import org.apache.openjpa.slice.SlicePersistence;
+import org.apache.openjpa.slice.policy.UniformDistributionPolicy;
+
+/**
+ * Tests delete-by-query.
+ * 
+ * @author Pinaki Poddar
+ *
+ */
+public class TestBulkDelete extends SliceTestCase {
+       private static int SLICES = 3;
+       private static List<String> SLICE_NAMES;
+       
+       @Override
+       protected String getPersistenceUnitName() {
+               return "slice";
+       }
+       public void setUp() throws Exception {
+               super.setUp(PObject.class, CLEAR_TABLES,
+                               "openjpa.slice.DistributionPolicy", 
UniformDistributionPolicy.class.getName());
+               
+       }
+
+       public void tearDown() throws Exception {
+               System.err.println("Delete all instances from all slices");
+               EntityManager em = emf.createEntityManager();
+               em.getTransaction().begin();
+               String delete = "delete from PObject p";
+               int m = em.createQuery(delete).executeUpdate();
+               em.getTransaction().commit();
+               super.tearDown();
+       }
+       
+       /**
+        * Creates N instances that are distributed in 3 slices.
+        * Deletes all instances from only one slice.
+        */
+       public void testBulkDelete() {
+               EntityManager em = emf.createEntityManager();
+               em.getTransaction().begin();
+               DistributedConfiguration conf = 
(DistributedConfiguration)emf.getConfiguration();
+               SLICE_NAMES = conf.getActiveSliceNames();
+               SLICES = SLICE_NAMES.size();
+               assertTrue(SLICES > 1);
+               int M = 4; // no of instances in each slice
+               int N = SLICES*M; // total number of instances in all 3 slices
+               
+               for (int i = 0; i < N; i++) {
+                       PObject pc = new PObject();
+                       em.persist(pc);
+               }
+               em.getTransaction().commit();
+               String jpql = "select count(p) from PObject p";
+               long total = em.createQuery(jpql, Long.class).getSingleResult();
+               assertEquals(N, total);
+               
+               for (int i = 0; i < SLICES; i++) {
+                       System.err.println("Query only on slice [" + 
SLICE_NAMES.get(i) + "]");
+                       long count = em.createQuery(jpql,Long.class)
+                                      .setHint(SlicePersistence.HINT_TARGET, 
SLICE_NAMES.get(i))
+                                      .getSingleResult();
+                       assertEquals(M, count);
+               }
+               
+               em.getTransaction().begin();
+               System.err.println("Delete only from slice [" + 
SLICE_NAMES.get(0) + "]");
+               String delete = "delete from PObject p";
+               int m = em.createQuery(delete)
+                 .setHint(SlicePersistence.HINT_TARGET, SLICE_NAMES.get(0))
+                 .executeUpdate();
+               assertEquals(M, m);
+               em.getTransaction().commit();
+       }
+}

Propchange: 
openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBulkDelete.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/policy/UniformDistributionPolicy.java
URL: 
http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/policy/UniformDistributionPolicy.java?rev=1464082&view=auto
==============================================================================
--- 
openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/policy/UniformDistributionPolicy.java
 (added)
+++ 
openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/policy/UniformDistributionPolicy.java
 Wed Apr  3 15:51:52 2013
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.    
+ */
+package org.apache.openjpa.slice.policy;
+
+import java.util.List;
+
+import org.apache.openjpa.slice.DistributionPolicy;
+import org.apache.openjpa.slice.PObject;
+
+
+/**
+ * Distributes the instances uniformly among the available slices
+ * based on the integral value of the persistence identifier.
+ * <br>
+ * Given {@code M} slices and {@code N} instances whose identity
+ * value is uniformly distributed, this policy will persist these
+ * instances such that
+ * <LI>each slice will have N/M instances
+ * <LI>the identity of the instances in the {@code i}-th slice 
+ * will be divisible by {@code i}.
+ * 
+ * @author Pinaki Poddar
+ *
+ */
+public class UniformDistributionPolicy implements DistributionPolicy {
+
+       @Override
+       public String distribute(Object pc, List<String> slices, Object 
context) {
+               int N = slices.size();
+               for (int i = N; i > 0; i--) {
+                       PObject p = (PObject)pc;
+                       if (p.getId()%i == 0) return slices.get(i-1);
+               }
+               return slices.get(0);
+       }
+
+}

Propchange: 
openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/policy/UniformDistributionPolicy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: openjpa/trunk/scripts/test.bat
URL: 
http://svn.apache.org/viewvc/openjpa/trunk/scripts/test.bat?rev=1464082&r1=1464081&r2=1464082&view=diff
==============================================================================
--- openjpa/trunk/scripts/test.bat (original)
+++ openjpa/trunk/scripts/test.bat Wed Apr  3 15:51:52 2013
@@ -18,7 +18,6 @@
 @rem
 
 @setlocal
-pushd openjpa-persistence-jdbc
-mvn test -Dtest=%1 %2 %3 %4
+mvn test -DfailIfNoTests=false -Dbuild.enhance=false -Dtest=%1 %2 %3 %4
 popd
 @endlocal


Reply via email to