Author: ppoddar Date: Wed Apr 3 15:51:52 2013 New Revision: 1464082 URL: http://svn.apache.org/r1464082 Log: OPENJPA-2365: Support for delete-by-query. Correction for hint processing
Added: openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBulkDelete.java (with props) openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/policy/UniformDistributionPolicy.java (with props) Modified: openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/JDBCStoreQuery.java openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/BrokerImpl.java openjpa/trunk/openjpa-persistence/src/main/java/org/apache/openjpa/persistence/QueryImpl.java openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCStoreManager.java openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java openjpa/trunk/scripts/test.bat Modified: openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/JDBCStoreQuery.java URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/JDBCStoreQuery.java?rev=1464082&r1=1464081&r2=1464082&view=diff ============================================================================== --- openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/JDBCStoreQuery.java (original) +++ openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/JDBCStoreQuery.java Wed Apr 3 15:51:52 2013 @@ -584,7 +584,12 @@ public class JDBCStoreQuery } } } finally { - try { conn.close(); } catch (SQLException se) {} + try { + if (conn.getAutoCommit()) + conn.close(); + } catch (SQLException se) { + + } } localContext.remove(); Modified: openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/BrokerImpl.java URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/BrokerImpl.java?rev=1464082&r1=1464081&r2=1464082&view=diff ============================================================================== --- openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/BrokerImpl.java (original) +++ openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/BrokerImpl.java Wed Apr 3 15:51:52 2013 @@ -1829,6 +1829,25 @@ public class BrokerImpl implements Broke endOperation(); } } + + /** + * Sets the given flag to the status. + * + * @since 2.3.0 + */ + protected void setStatusFlag(int flag) { + _flags |= flag; + } + + /** + * Clears the given flag from the status. + * + * @since 2.3.0 + */ + protected void clearStatusFlag(int flag) { + _flags &= ~flag; + } + public void flush() { beginOperation(true); Modified: openjpa/trunk/openjpa-persistence/src/main/java/org/apache/openjpa/persistence/QueryImpl.java URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-persistence/src/main/java/org/apache/openjpa/persistence/QueryImpl.java?rev=1464082&r1=1464081&r2=1464082&view=diff ============================================================================== --- openjpa/trunk/openjpa-persistence/src/main/java/org/apache/openjpa/persistence/QueryImpl.java (original) +++ openjpa/trunk/openjpa-persistence/src/main/java/org/apache/openjpa/persistence/QueryImpl.java Wed Apr 3 15:51:52 2013 @@ -347,6 +347,11 @@ public class QueryImpl<X> extends Abstra private boolean pushQueryFetchPlan() { boolean fcPushed = false; + if (_hintHandler != null) { + FetchConfiguration fc = _fetch == null ? null : ((FetchPlanImpl)_fetch).getDelegate(); + _em.pushFetchPlan(fc); + return true; + } if (_fetch != null && _hintHandler != null) { switch (_fetch.getReadLockMode()) { case PESSIMISTIC_READ: @@ -528,6 +533,7 @@ public class QueryImpl<X> extends Abstra * cache. */ private boolean preExecute(Map params) { + PreparedQueryCache cache = _em.getPreparedQueryCache(); if (cache == null) { return false; Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java?rev=1464082&r1=1464081&r2=1464082&view=diff ============================================================================== --- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java (original) +++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java Wed Apr 3 15:51:52 2013 @@ -152,6 +152,12 @@ public class DistributedBrokerImpl exten @Override public void beginStore() { } + + @Override + protected void flush(int reason) { + setStatusFlag(2 << 8); + super.flush(reason); + } /** * Overrides to target specific slices for find() calls. Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCStoreManager.java URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCStoreManager.java?rev=1464082&r1=1464081&r2=1464082&view=diff ============================================================================== --- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCStoreManager.java (original) +++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCStoreManager.java Wed Apr 3 15:51:52 2013 @@ -293,6 +293,20 @@ class DistributedJDBCStoreManager extend } } + @Override + public void commit() { + for (SliceStoreManager slice : _slices) { + slice.commit(); + } + } + + @Override + public void rollback() { + for (SliceStoreManager slice : _slices) { + slice.rollback(); + } + } + /** * Collect the current versions of the given StateManagers. */ Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java?rev=1464082&r1=1464081&r2=1464082&view=diff ============================================================================== --- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java (original) +++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java Wed Apr 3 15:51:52 2013 @@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit; import org.apache.openjpa.jdbc.kernel.JDBCStore; import org.apache.openjpa.jdbc.kernel.JDBCStoreQuery; +import org.apache.openjpa.kernel.Broker; import org.apache.openjpa.kernel.BrokerImpl; import org.apache.openjpa.kernel.ExpressionStoreQuery; import org.apache.openjpa.kernel.FetchConfiguration; @@ -118,15 +119,14 @@ class DistributedStoreQuery extends JDBC */ public ResultObjectProvider executeQuery(StoreQuery q, final Object[] params, final Range range) { - List<Future<ResultObjectProvider>> futures = - new ArrayList<Future<ResultObjectProvider>>(); + List<Future<ResultObjectProvider>> futures = new ArrayList<Future<ResultObjectProvider>>(); final List<Executor> usedExecutors = new ArrayList<Executor>(); - final List<ResultObjectProvider> rops = - new ArrayList<ResultObjectProvider>(); + final List<ResultObjectProvider> rops = new ArrayList<ResultObjectProvider>(); List<SliceStoreManager> targets = findTargets(); QueryContext ctx = q.getContext(); boolean isReplicated = containsReplicated(ctx); ExecutorService threadPool = SliceThread.getPool(); + for (int i = 0; i < owner._queries.size(); i++) { // if replicated, then execute only on single slice if (isReplicated && !usedExecutors.isEmpty()) { @@ -135,16 +135,12 @@ class DistributedStoreQuery extends JDBC StoreManager sm = owner.getDistributedStore().getSlice(i); if (!targets.contains(sm)) continue; - StoreQuery query = owner._queries.get(i); - Executor executor = executors.get(i); - if (!targets.contains(sm)) - continue; - usedExecutors.add(executor); QueryExecutor call = new QueryExecutor(); - call.executor = executor; - call.query = query; + call.executor = executors.get(i); + call.query = owner._queries.get(i); call.params = params; call.range = range; + usedExecutors.add(call.executor); futures.add(threadPool.submit(call)); } for (Future<ResultObjectProvider> future : futures) { @@ -157,16 +153,14 @@ class DistributedStoreQuery extends JDBC } } - ResultObjectProvider[] tmp = rops - .toArray(new ResultObjectProvider[rops.size()]); + ResultObjectProvider[] tmp = rops.toArray(new ResultObjectProvider[rops.size()]); ResultObjectProvider result = null; boolean[] ascending = getAscending(q); boolean isAscending = ascending.length > 0; boolean isAggregate = ctx.isAggregate(); boolean hasRange = ctx.getEndRange() != Long.MAX_VALUE; if (isAggregate) { - result = new UniqueResultObjectProvider(tmp, q, - getQueryExpressions()); + result = new UniqueResultObjectProvider(tmp, q, getQueryExpressions()); } else if (isAscending) { result = new OrderingMergedResultObjectProvider(tmp, ascending, usedExecutors.toArray(new Executor[usedExecutors.size()]), @@ -175,8 +169,7 @@ class DistributedStoreQuery extends JDBC result = new MergedResultObjectProvider(tmp); } if (hasRange) { - result = new RangeResultObjectProvider(result, - ctx.getStartRange(), ctx.getEndRange()); + result = new RangeResultObjectProvider(result, ctx.getStartRange(), ctx.getEndRange()); } return result; } @@ -201,16 +194,18 @@ class DistributedStoreQuery extends JDBC } public Number executeDelete(StoreQuery q, Object[] params) { - Iterator<StoreQuery> qs = owner._queries.iterator(); - List<Future<Number>> futures = null; + List<Future<Number>> futures = new ArrayList<Future<Number>>(); int result = 0; ExecutorService threadPool = SliceThread.getPool(); - for (Executor ex : executors) { - if (futures == null) - futures = new ArrayList<Future<Number>>(); + List<SliceStoreManager> targets = findTargets(); + for (int i = 0; i < owner._queries.size(); i++) { + StoreManager sm = owner.getDistributedStore().getSlice(i); + if (!targets.contains(sm)) + continue; + DeleteExecutor call = new DeleteExecutor(); - call.executor = ex; - call.query = qs.next(); + call.executor = executors.get(i); + call.query = owner._queries.get(i); call.params = params; futures.add(threadPool.submit(call)); } @@ -256,8 +251,7 @@ class DistributedStoreQuery extends JDBC } List<SliceStoreManager> findTargets() { - FetchConfiguration fetch = owner.getContext() - .getFetchConfiguration(); + FetchConfiguration fetch = owner.getContext().getFetchConfiguration(); return owner.getDistributedStore().getTargets(fetch); } Added: openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBulkDelete.java URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBulkDelete.java?rev=1464082&view=auto ============================================================================== --- openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBulkDelete.java (added) +++ openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBulkDelete.java Wed Apr 3 15:51:52 2013 @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.openjpa.slice; + +import java.util.List; + +import javax.persistence.EntityManager; +import javax.persistence.EntityManagerFactory; +import javax.persistence.Persistence; + + +import org.apache.openjpa.kernel.BrokerFactory; +import org.apache.openjpa.persistence.JPAFacadeHelper; +import org.apache.openjpa.slice.DistributedBrokerFactory; +import org.apache.openjpa.slice.SlicePersistence; +import org.apache.openjpa.slice.policy.UniformDistributionPolicy; + +/** + * Tests delete-by-query. + * + * @author Pinaki Poddar + * + */ +public class TestBulkDelete extends SliceTestCase { + private static int SLICES = 3; + private static List<String> SLICE_NAMES; + + @Override + protected String getPersistenceUnitName() { + return "slice"; + } + public void setUp() throws Exception { + super.setUp(PObject.class, CLEAR_TABLES, + "openjpa.slice.DistributionPolicy", UniformDistributionPolicy.class.getName()); + + } + + public void tearDown() throws Exception { + System.err.println("Delete all instances from all slices"); + EntityManager em = emf.createEntityManager(); + em.getTransaction().begin(); + String delete = "delete from PObject p"; + int m = em.createQuery(delete).executeUpdate(); + em.getTransaction().commit(); + super.tearDown(); + } + + /** + * Creates N instances that are distributed in 3 slices. + * Deletes all instances from only one slice. + */ + public void testBulkDelete() { + EntityManager em = emf.createEntityManager(); + em.getTransaction().begin(); + DistributedConfiguration conf = (DistributedConfiguration)emf.getConfiguration(); + SLICE_NAMES = conf.getActiveSliceNames(); + SLICES = SLICE_NAMES.size(); + assertTrue(SLICES > 1); + int M = 4; // no of instances in each slice + int N = SLICES*M; // total number of instances in all 3 slices + + for (int i = 0; i < N; i++) { + PObject pc = new PObject(); + em.persist(pc); + } + em.getTransaction().commit(); + String jpql = "select count(p) from PObject p"; + long total = em.createQuery(jpql, Long.class).getSingleResult(); + assertEquals(N, total); + + for (int i = 0; i < SLICES; i++) { + System.err.println("Query only on slice [" + SLICE_NAMES.get(i) + "]"); + long count = em.createQuery(jpql,Long.class) + .setHint(SlicePersistence.HINT_TARGET, SLICE_NAMES.get(i)) + .getSingleResult(); + assertEquals(M, count); + } + + em.getTransaction().begin(); + System.err.println("Delete only from slice [" + SLICE_NAMES.get(0) + "]"); + String delete = "delete from PObject p"; + int m = em.createQuery(delete) + .setHint(SlicePersistence.HINT_TARGET, SLICE_NAMES.get(0)) + .executeUpdate(); + assertEquals(M, m); + em.getTransaction().commit(); + } +} Propchange: openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBulkDelete.java ------------------------------------------------------------------------------ svn:eol-style = native Added: openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/policy/UniformDistributionPolicy.java URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/policy/UniformDistributionPolicy.java?rev=1464082&view=auto ============================================================================== --- openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/policy/UniformDistributionPolicy.java (added) +++ openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/policy/UniformDistributionPolicy.java Wed Apr 3 15:51:52 2013 @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.openjpa.slice.policy; + +import java.util.List; + +import org.apache.openjpa.slice.DistributionPolicy; +import org.apache.openjpa.slice.PObject; + + +/** + * Distributes the instances uniformly among the available slices + * based on the integral value of the persistence identifier. + * <br> + * Given {@code M} slices and {@code N} instances whose identity + * value is uniformly distributed, this policy will persist these + * instances such that + * <LI>each slice will have N/M instances + * <LI>the identity of the instances in the {@code i}-th slice + * will be divisible by {@code i}. + * + * @author Pinaki Poddar + * + */ +public class UniformDistributionPolicy implements DistributionPolicy { + + @Override + public String distribute(Object pc, List<String> slices, Object context) { + int N = slices.size(); + for (int i = N; i > 0; i--) { + PObject p = (PObject)pc; + if (p.getId()%i == 0) return slices.get(i-1); + } + return slices.get(0); + } + +} Propchange: openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/policy/UniformDistributionPolicy.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: openjpa/trunk/scripts/test.bat URL: http://svn.apache.org/viewvc/openjpa/trunk/scripts/test.bat?rev=1464082&r1=1464081&r2=1464082&view=diff ============================================================================== --- openjpa/trunk/scripts/test.bat (original) +++ openjpa/trunk/scripts/test.bat Wed Apr 3 15:51:52 2013 @@ -18,7 +18,6 @@ @rem @setlocal -pushd openjpa-persistence-jdbc -mvn test -Dtest=%1 %2 %3 %4 +mvn test -DfailIfNoTests=false -Dbuild.enhance=false -Dtest=%1 %2 %3 %4 popd @endlocal