IGNITE-5686 Endless partition eviction during node shutdown
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a668a224 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a668a224 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a668a224 Branch: refs/heads/ignite-2.1 Commit: a668a224a8c69afd618667fa19309f1b7a0c8ca3 Parents: 4f3b69c Author: Igor Seliverstov <gvvinbl...@gmail.com> Authored: Tue Jul 11 10:40:56 2017 +0200 Committer: Igor Seliverstov <gvvinbl...@gmail.com> Committed: Tue Jul 11 10:40:56 2017 +0200 ---------------------------------------------------------------------- .../src/main/scala/org/apache/ignite/spark/IgniteRDD.scala | 6 +++--- .../main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala | 6 +++++- .../org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java | 5 ----- 3 files changed, 8 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a668a224/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala index 81509d0..78e2223 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala @@ -60,11 +60,11 @@ class IgniteRDD[K, V] ( val qry: ScanQuery[K, V] = new ScanQuery[K, V](part.index) - val partNodes = ic.ignite().affinity(cache.getName).mapPartitionToPrimaryAndBackups(part.index) + val cur = cache.query(qry) - val it: java.util.Iterator[Cache.Entry[K, V]] = cache.query(qry).iterator() + TaskContext.get().addTaskCompletionListener((_) â cur.close()) - new IgniteQueryIterator[Cache.Entry[K, V], (K, V)](it, entry â { + new IgniteQueryIterator[Cache.Entry[K, V], (K, V)](cur.iterator(), entry â { (entry.getKey, entry.getValue) }) } http://git-wip-us.apache.org/repos/asf/ignite/blob/a668a224/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala index f074572..f386f26 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala @@ -33,7 +33,11 @@ class IgniteSqlRDD[R: ClassTag, T, K, V]( keepBinary: Boolean ) extends IgniteAbstractRDD[R, K, V](ic, cacheName, cacheCfg, keepBinary) { override def compute(split: Partition, context: TaskContext): Iterator[R] = { - new IgniteQueryIterator[T, R](ensureCache().query(qry).iterator(), conv) + val cur = ensureCache().query(qry) + + TaskContext.get().addTaskCompletionListener((_) â cur.close()) + + new IgniteQueryIterator[T, R](cur.iterator(), conv) } override protected def getPartitions: Array[Partition] = { http://git-wip-us.apache.org/repos/asf/ignite/blob/a668a224/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java b/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java index 5477d43..49bb1ac 100644 --- a/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java +++ b/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java @@ -100,11 +100,6 @@ public class JavaEmbeddedIgniteRDDSelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-5690"); - } - - /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { stopAllGrids(); }