ignite-2982 Unwrap offheap object before pass it to indexing (this closes #693)
(cherry picked from commit b4a5d376608fdc36ab0a5dd8157834771a006d37)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/847399a8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/847399a8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/847399a8

Branch: refs/heads/ignite-3163
Commit: 847399a8db0ef2a1cae596c51dae2c55a501dca9
Parents: 0428018
Author: sboikov <[email protected]>
Authored: Fri May 13 10:20:00 2016 +0300
Committer: sboikov <[email protected]>
Committed: Mon May 16 10:19:35 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheMapEntry.java     |   2 +-
 .../processors/cache/GridCacheProcessor.java    |   2 +-
 .../processors/query/h2/opt/GridH2Table.java    |   1 -
 .../CacheOffheapBatchIndexingBaseTest.java      | 313 +++++++++++++++++++
 .../CacheOffheapBatchIndexingMultiTypeTest.java | 108 +++++++
 ...CacheOffheapBatchIndexingSingleTypeTest.java | 161 ++++++++++
 .../IgniteCacheQuerySelfTestSuite2.java         |   4 +
 7 files changed, 588 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/847399a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 0f7482a..8262113 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -3886,7 +3886,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
             GridCacheQueryManager<?, ?> qryMgr = cctx.queries();
 
             if (qryMgr.enabled())
-                qryMgr.remove(key(), prevVal);
+                qryMgr.remove(key(), 
(CacheObject)cctx.unwrapTemporary(prevVal));
         }
         catch (IgniteCheckedException e) {
             throw new GridCacheIndexUpdateException(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/847399a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 2b5a718..414a915 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -325,7 +325,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
             perf.add("Disable near cache (set 'nearConfiguration' to null)", 
cfg.getNearConfiguration() == null);
 
             if (cfg.getAffinity() != null)
-                perf.add("Decrease number of backups (set 'keyBackups' to 0)", 
cfg.getBackups() == 0);
+                perf.add("Decrease number of backups (set 'backups' to 0)", 
cfg.getBackups() == 0);
         }
 
         // Suppress warning if at least one ATOMIC cache found.

http://git-wip-us.apache.org/repos/asf/ignite/blob/847399a8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
index f1e5b16..bea4dd8 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
@@ -25,7 +25,6 @@ import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;

http://git-wip-us.apache.org/repos/asf/ignite/blob/847399a8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapBatchIndexingBaseTest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapBatchIndexingBaseTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapBatchIndexingBaseTest.java
new file mode 100644
index 0000000..b550b1e
--- /dev/null
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapBatchIndexingBaseTest.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.BinaryReader;
+import org.apache.ignite.binary.BinaryWriter;
+import org.apache.ignite.binary.Binarylizable;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Tests various cache operations with indexing enabled.
+ */
+public abstract class CacheOffheapBatchIndexingBaseTest extends 
GridCommonAbstractTest {
+    /** */
+    private final TcpDiscoveryIpFinder ipFinder = new 
TcpDiscoveryVmIpFinder(true);
+
+    /**
+     * Loads data into the cache using a data streamer.
+     *
+     * @param name Cache name.
+     */
+    protected void preload(String name) {
+        try (IgniteDataStreamer<Object, Object> streamer = 
ignite(0).dataStreamer(name)) {
+            for (int i = 0; i < 30_000; i++) {
+                if (i % 2 == 0)
+                    streamer.addData(i, new Person(i, i + 1, 
String.valueOf(i), String.valueOf(i + 1), salary(i)));
+                else
+                    streamer.addData(i, new Organization(i, 
String.valueOf(i)));
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setPeerClassLoadingEnabled(false);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGrid(0);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @param base Base.
+     * @return Salary.
+     */
+    protected double salary(int base) {
+        return base * 100.;
+    }
+
+    /**
+     * @param onHeapRowCacheSize on heap row cache size.
+     * @param indexedTypes indexed types for cache.
+     * @return Cache configuration.
+     */
+    protected CacheConfiguration<Object, Object> cacheConfiguration(int 
onHeapRowCacheSize, Class<?>[] indexedTypes) {
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+        ccfg.setAtomicityMode(ATOMIC);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setMemoryMode(OFFHEAP_TIERED);
+        ccfg.setSqlOnheapRowCacheSize(onHeapRowCacheSize);
+        ccfg.setIndexedTypes(indexedTypes);
+
+        return ccfg;
+    }
+
+    /**
+     * Ignite cache value class.
+     */
+    protected static class Person implements Binarylizable {
+        /** Person ID. */
+        @QuerySqlField(index = true)
+        private int id;
+
+        /** Organization ID. */
+        @QuerySqlField(index = true)
+        private int orgId;
+
+        /** First name (not indexed). */
+        @QuerySqlField
+        private String firstName;
+
+        /** Last name (not indexed). */
+        @QuerySqlField
+        private String lastName;
+
+        /** Salary. */
+        @QuerySqlField(index = true)
+        private double salary;
+
+        /**
+         * Constructs empty person.
+         */
+        public Person() {
+            // No-op.
+        }
+
+        /**
+         * Constructs person record.
+         *
+         * @param id Person ID.
+         * @param orgId Organization ID.
+         * @param firstName First name.
+         * @param lastName Last name.
+         * @param salary Salary.
+         */
+        public Person(int id, int orgId, String firstName, String lastName, 
double salary) {
+            this.id = id;
+            this.orgId = orgId;
+            this.firstName = firstName;
+            this.lastName = lastName;
+            this.salary = salary;
+        }
+
+        /**
+         * @return Person id.
+         */
+        public int getId() {
+            return id;
+        }
+
+        /**
+         * @param id Person id.
+         */
+        public void setId(int id) {
+            this.id = id;
+        }
+
+        /**
+         * @return Organization id.
+         */
+        public int getOrganizationId() {
+            return orgId;
+        }
+
+        /**
+         * @param orgId Organization id.
+         */
+        public void setOrganizationId(int orgId) {
+            this.orgId = orgId;
+        }
+
+        /**
+         * @return Person first name.
+         */
+        public String getFirstName() {
+            return firstName;
+        }
+
+        /**
+         * @param firstName Person first name.
+         */
+        public void setFirstName(String firstName) {
+            this.firstName = firstName;
+        }
+
+        /**
+         * @return Person last name.
+         */
+        public String getLastName() {
+            return lastName;
+        }
+
+        /**
+         * @param lastName Person last name.
+         */
+        public void setLastName(String lastName) {
+            this.lastName = lastName;
+        }
+
+        /**
+         * @return Salary.
+         */
+        public double getSalary() {
+            return salary;
+        }
+
+        /**
+         * @param salary Salary.
+         */
+        public void setSalary(double salary) {
+            this.salary = salary;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeBinary(BinaryWriter writer) throws 
BinaryObjectException {
+            writer.writeInt("id", id);
+            writer.writeInt("orgId", orgId);
+            writer.writeString("firstName", firstName);
+            writer.writeString("lastName", lastName);
+            writer.writeDouble("salary", salary);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readBinary(BinaryReader reader) throws 
BinaryObjectException {
+            id = reader.readInt("id");
+            orgId = reader.readInt("orgId");
+            firstName = reader.readString("firstName");
+            lastName = reader.readString("lastName");
+            salary = reader.readDouble("salary");
+        }
+    }
+
+    /**
+     * Ignite cache value class with indexed field.
+     */
+    protected static class Organization implements Binarylizable {
+        /** Organization ID. */
+        @QuerySqlField(index = true)
+        private int id;
+
+        /** Organization name. */
+        @QuerySqlField(index = true)
+        private String name;
+
+        /**
+         * Constructs empty organization.
+         */
+        public Organization() {
+            // No-op.
+        }
+
+        /**
+         * Constructs organization with given ID.
+         *
+         * @param id Organization ID.
+         * @param name Organization name.
+         */
+        public Organization(int id, String name) {
+            this.id = id;
+            this.name = name;
+        }
+
+        /**
+         * @return Organization id.
+         */
+        public int getId() {
+            return id;
+        }
+
+        /**
+         * @param id Organization id.
+         */
+        public void setId(int id) {
+            this.id = id;
+        }
+
+        /**
+         * @return Organization name.
+         */
+        public String getName() {
+            return name;
+        }
+
+        /**
+         * @param name Organization name.
+         */
+        public void setName(String name) {
+            this.name = name;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeBinary(BinaryWriter writer) throws 
BinaryObjectException {
+            writer.writeInt("id", id);
+            writer.writeString("name", name);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readBinary(BinaryReader reader) throws 
BinaryObjectException {
+            id = reader.readInt("id");
+            name = reader.readString("name");
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/847399a8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapBatchIndexingMultiTypeTest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapBatchIndexingMultiTypeTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapBatchIndexingMultiTypeTest.java
new file mode 100644
index 0000000..602fc1c
--- /dev/null
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapBatchIndexingMultiTypeTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.CacheConfiguration;
+
+/**
+ * Tests various cache operations with indexing enabled.
+ * Cache contains multiple types.
+ */
+public class CacheOffheapBatchIndexingMultiTypeTest extends 
CacheOffheapBatchIndexingBaseTest {
+    /**
+     * Tests putAll with multiple indexed entities and streamer pre-loading 
with low on-heap row cache size.
+     */
+    public void testPutAllMultupleEntitiesAndStreamer() {
+        doStreamerBatchTest(50, 1_000, new Class<?>[] {
+            Integer.class, CacheOffheapBatchIndexingBaseTest.Person.class,
+            Integer.class, 
CacheOffheapBatchIndexingBaseTest.Organization.class},
+            1,
+            true);
+    }
+
+    /**
+     * Tests putAll with multiple indexed entities and streamer pre-loading 
with default on-heap row cache size.
+     */
+    public void testPutAllMultupleEntitiesAndStreamerDfltOffHeapRowCacheSize() 
{
+        doStreamerBatchTest(50, 1_000, new Class<?>[] {
+            Integer.class, CacheOffheapBatchIndexingBaseTest.Person.class,
+            Integer.class, 
CacheOffheapBatchIndexingBaseTest.Organization.class},
+            CacheConfiguration.DFLT_SQL_ONHEAP_ROW_CACHE_SIZE,
+            true);
+    }
+
+    /**
+     * Tests putAll with a single indexed entity type and without streamer pre-loading.
+     */
+    public void testPuAllSingleEntity() {
+        doStreamerBatchTest(50,
+            1_000,
+            new Class<?>[] {Integer.class, 
CacheOffheapBatchIndexingBaseTest.Organization.class},
+            1,
+            false);
+    }
+
+    /**
+     * @param iterations Number of iterations.
+     * @param entitiesCnt Number of entities to put.
+     * @param entityClasses Entity classes.
+     * @param onHeapRowCacheSize On-heap SQL row cache size.
+     * @param preloadInStreamer Data preload flag.
+     */
+    private void doStreamerBatchTest(int iterations,
+        int entitiesCnt,
+        Class<?>[] entityClasses,
+        int onHeapRowCacheSize,
+        boolean preloadInStreamer) {
+        Ignite ignite = grid(0);
+
+        final IgniteCache<Object, Object> cache =
+            ignite.createCache(cacheConfiguration(onHeapRowCacheSize, 
entityClasses));
+
+        try {
+            if (preloadInStreamer)
+                preload(cache.getName());
+
+            while (iterations-- >= 0) {
+                Map<Integer, Person> putMap1 = new TreeMap<>();
+
+                for (int i = 0; i < entitiesCnt; i++)
+                    putMap1.put(i, new Person(i, i + 1, String.valueOf(i), 
String.valueOf(i + 1), salary(i)));
+
+                cache.putAll(putMap1);
+
+                Map<Integer, Organization> putMap2 = new TreeMap<>();
+
+                for (int i = entitiesCnt / 2; i < entitiesCnt * 3 / 2; i++) {
+                    cache.remove(i);
+
+                    putMap2.put(i, new Organization(i, String.valueOf(i)));
+                }
+
+                cache.putAll(putMap2);
+            }
+        }
+        finally {
+            cache.destroy();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/847399a8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapBatchIndexingSingleTypeTest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapBatchIndexingSingleTypeTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapBatchIndexingSingleTypeTest.java
new file mode 100644
index 0000000..c59e5fe
--- /dev/null
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapBatchIndexingSingleTypeTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+
+/**
+ * Tests various cache operations with indexing enabled.
+ * Cache contains single type.
+ */
+public class CacheOffheapBatchIndexingSingleTypeTest extends 
CacheOffheapBatchIndexingBaseTest {
+    /**
+     * Tests removal using EntryProcessor.
+     *
+     * @throws Exception If failed.
+     */
+    public void testBatchRemove() throws Exception {
+        Ignite ignite = grid(0);
+
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(1,
+            new Class<?>[] {Integer.class, 
CacheOffheapBatchIndexingBaseTest.Organization.class});
+
+        final IgniteCache<Object, Object> cache = ignite.createCache(ccfg);
+
+        try {
+            int iterations = 50;
+
+            while (iterations-- >= 0) {
+                int total = 1000;
+
+                for (int id = 0; id < total; id++)
+                    cache.put(id, new 
CacheOffheapBatchIndexingBaseTest.Organization(id, "Organization " + id));
+
+                cache.invoke(0, new CacheEntryProcessor<Object, Object, 
Object>() {
+                    @Override public Object process(MutableEntry<Object, 
Object> entry, Object... args) {
+                        entry.remove();
+
+                        return null;
+                    }
+                });
+
+                QueryCursor<List<?>> q = cache.query(new 
SqlFieldsQuery("select _key,_val from Organization where id=0"));
+
+                assertEquals(0, q.getAll().size());
+
+                q = cache.query(new SqlFieldsQuery("select _key,_val from 
Organization where id=1"));
+
+                assertEquals(1, q.getAll().size());
+
+                assertEquals(total - 1, cache.size());
+
+                cache.removeAll();
+            }
+        }
+        finally {
+            cache.destroy();
+        }
+    }
+
+    /**
+     * Tests putAll with streamer pre-loading and an on-heap row cache size of 1.
+     */
+    public void testPutAllAndStreamer() {
+        doStreamerBatchTest(50,
+            1_000,
+            new Class<?>[] {Integer.class, 
CacheOffheapBatchIndexingBaseTest.Organization.class},
+            1,
+            true);
+    }
+
+    /**
+     * Tests putAll with streamer pre-loading and the default on-heap row cache size.
+     */
+    public void testPutAllAndStreamerDfltOffHeapRowCacheSize() {
+        doStreamerBatchTest(50,
+            1_000,
+            new Class<?>[] {Integer.class, 
CacheOffheapBatchIndexingBaseTest.Organization.class},
+            CacheConfiguration.DFLT_SQL_ONHEAP_ROW_CACHE_SIZE,
+            true);
+    }
+
+    /**
+     * Tests putAll without streamer pre-loading and with an on-heap row cache size of 1.
+     */
+    public void testPuAllSingleEntity() {
+        doStreamerBatchTest(50,
+            1_000,
+            new Class<?>[] {Integer.class, 
CacheOffheapBatchIndexingBaseTest.Organization.class},
+            1,
+            false);
+    }
+
+    /**
+     * @param iterations Number of iterations.
+     * @param entitiesCnt Number of entities to put.
+     * @param entityClasses Entity classes.
+     * @param onHeapRowCacheSize On-heap SQL row cache size.
+     * @param preloadInStreamer Data preload flag.
+     */
+    private void doStreamerBatchTest(int iterations,
+        int entitiesCnt,
+        Class<?>[] entityClasses,
+        int onHeapRowCacheSize,
+        boolean preloadInStreamer) {
+        Ignite ignite = grid(0);
+
+        final IgniteCache<Object, Object> cache =
+            ignite.createCache(cacheConfiguration(onHeapRowCacheSize, 
entityClasses));
+
+        try {
+            if (preloadInStreamer)
+                preload(cache.getName());
+
+            while (iterations-- >= 0) {
+                Map<Integer, Organization> putMap1 = new TreeMap<>();
+
+                for (int i = 0; i < entitiesCnt; i++)
+                    putMap1.put(i, new Organization(i, String.valueOf(i)));
+
+                cache.putAll(putMap1);
+
+                Map<Integer, Organization> putMap2 = new TreeMap<>();
+
+                for (int i = entitiesCnt / 2; i < entitiesCnt * 3 / 2; i++) {
+                    cache.remove(i);
+
+                    putMap2.put(i, new Organization(i, String.valueOf(i)));
+                }
+
+                cache.putAll(putMap2);
+            }
+        }
+        finally {
+            cache.destroy();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/847399a8/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java
 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java
index 4b83f69..e48467e 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java
@@ -19,6 +19,7 @@ package org.apache.ignite.testsuites;
 
 import junit.framework.TestSuite;
 import 
org.apache.ignite.internal.processors.cache.CacheLocalQueryMetricsSelfTest;
+import 
org.apache.ignite.internal.processors.cache.CacheOffheapBatchIndexingSingleTypeTest;
 import 
org.apache.ignite.internal.processors.cache.CachePartitionedQueryMetricsDistributedSelfTest;
 import 
org.apache.ignite.internal.processors.cache.CachePartitionedQueryMetricsLocalSelfTest;
 import 
org.apache.ignite.internal.processors.cache.CacheQueryOffheapEvictDataLostTest;
@@ -100,6 +101,9 @@ public class IgniteCacheQuerySelfTestSuite2 extends 
TestSuite {
         suite.addTestSuite(IgniteCacheP2pUnmarshallingQueryErrorTest.class);
         suite.addTestSuite(IgniteCacheNoClassQuerySelfTest.class);
 
+        // Other.
+        suite.addTestSuite(CacheOffheapBatchIndexingSingleTypeTest.class);
+
         return suite;
     }
 }

Reply via email to