[
https://issues.apache.org/jira/browse/IGNITE-28662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Mikhail Petrov updated IGNITE-28662:
------------------------------------
Description:
Consider a cluster of 3 nodes - node0, node1, node2
1. node1 and node2 receive put request from a thin client. node1 is "primary"
for cache keys received by node2 and node2 is "primary" for cache keys received
by node1. Both of them begin operations execution and wait for them to complete.
2. node1 and node2 receive stop signal (Ignite#close). The stop procedure on
both nodes blocks on GridNioAsyncNotifyFilter#stop, which waits for the thin
client operations to complete.
3. node1 and node2 fail to process cache request for some reason (a cache
interceptor raised an exception)
4. node1 and node2 will not send GridNearAtomicUpdateResponse with failed keys
to each other because they are both stopping (see GridCacheIoManager#onSend).
This message is an indication to the "near" node that some keys could not be
processed and the operation should be terminated with an exception.
5. node1 and node2 are unable to complete the cache operations received from
the thin client (both of them will never receive GridNearAtomicUpdateResponse
or NODE_LEFT event for the primary node) -> they are unable to complete the
stop procedure
Reproducer:
{code:java}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import javax.cache.Cache;
import org.apache.ignite.cache.CacheInterceptor;
import org.apache.ignite.cache.CachePartialUpdateException;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import
org.apache.ignite.internal.processors.cache.distributed.GridCacheModuloAffinityFunction;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
import static java.util.Collections.singletonMap;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static
org.apache.ignite.internal.processors.cache.distributed.GridCacheModuloAffinityFunction.IDX_ATTR;
/** */
public class NodeStopForeverBlockedByAtomicCacheOperationsTest extends
GridCommonAbstractTest {
/** */
public static final int NODE_1_FIRST_KEY = 1;
/** */
public static final int NODE_1_SECOND_KEY = 4;
/** */
public static final int NODE_2_FIRST_KEY = 2;
/** */
public static final int NODE_2_SECOND_KEY = 5;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
return super.getConfiguration(igniteInstanceName)
.setCommunicationSpi(new TestRecordingCommunicationSpi())
.setUserAttributes(singletonMap(IDX_ATTR,
getTestIgniteInstanceIndex(igniteInstanceName)));
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();
stopAllGrids();
}
/** */
@Test
public void testCacheEntriesProcessingFailureCausedByNodeStop() throws
Exception {
startGridsMultiThreaded(3);
TestInterceptor.putStartedLatch = new CountDownLatch(2);
TestInterceptor.putUnblockedLatch = new CountDownLatch(1);
grid(0).createCache(createTestCacheConfiguration());
try (
IgniteClient cli1 = Ignition.startClient(new
ClientConfiguration().setClusterDiscoveryEnabled(false).setAddresses("127.0.0.1:10801"));
IgniteClient cli2 = Ignition.startClient(new
ClientConfiguration().setClusterDiscoveryEnabled(false).setAddresses("127.0.0.1:10802"))
) {
IgniteInternalFuture<Object> putFut1 = GridTestUtils.runAsync(() ->
cli1.cache(DEFAULT_CACHE_NAME).putAll(createKeysForNode(2)));
IgniteInternalFuture<Object> putFut2 = GridTestUtils.runAsync(() ->
cli2.cache(DEFAULT_CACHE_NAME).putAll(createKeysForNode(1)));
assertTrue(TestInterceptor.putStartedLatch.await(getTestTimeout(),
MILLISECONDS));
IgniteInternalFuture<Object> stopFut1 = GridTestUtils.runAsync(()
-> stopGrid(1));
IgniteInternalFuture<Object> stopFut2 = GridTestUtils.runAsync(()
-> stopGrid(2));
try {
TestInterceptor.putUnblockedLatch.countDown();
stopFut1.get(getTestTimeout());
stopFut2.get(getTestTimeout());
putFut1.get(getTestTimeout());
putFut2.get(getTestTimeout());
}
catch (CachePartialUpdateException e) {
assertTrue(e.getMessage().contains("Failed to update keys
(retry update if possible)"));
}
}
}
/** */
private CacheConfiguration<Integer, Integer> createTestCacheConfiguration()
{
return new CacheConfiguration<Integer, Integer>()
.setName(DEFAULT_CACHE_NAME)
.setAtomicityMode(ATOMIC)
.setWriteSynchronizationMode(FULL_SYNC)
.setBackups(2)
.setAffinity(new GridCacheModuloAffinityFunction(3, 2))
.setInterceptor(new TestInterceptor());
}
/** */
private Map<Integer, Integer> createKeysForNode(int nodeIdx) {
Map<Integer, Integer> data = new TreeMap<>();
if (nodeIdx == 2) {
data.put(NODE_2_FIRST_KEY, 2);
data.put(NODE_2_SECOND_KEY, 5);
}
else {
data.put(NODE_1_FIRST_KEY, 1);
data.put(NODE_1_SECOND_KEY, 4);
}
return data;
}
/** */
public static final class TestInterceptor implements
CacheInterceptor<Integer, Integer> {
/** */
public static CountDownLatch putStartedLatch;
/** */
public static CountDownLatch putUnblockedLatch;
/** {@inheritDoc} */
@Override public @Nullable Integer onGet(Integer key, @Nullable Integer
val) {
return val;
}
/** {@inheritDoc} */
@Override public @Nullable Integer onBeforePut(Cache.Entry<Integer,
Integer> entry, Integer newVal) {
if (entry.getKey() == NODE_1_FIRST_KEY || entry.getKey() ==
NODE_2_FIRST_KEY) {
putStartedLatch.countDown();
try {
assertTrue(putUnblockedLatch.await(10000, MILLISECONDS));
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IgniteException(e);
}
}
else
throw new IgniteException("expected");
return newVal;
}
/** {@inheritDoc} */
@Override public void onAfterPut(Cache.Entry<Integer, Integer> entry) {
// No-op.
}
/** {@inheritDoc} */
@Override public @Nullable IgniteBiTuple<Boolean, Integer>
onBeforeRemove(Cache.Entry<Integer, Integer> entry) {
return null;
}
/** {@inheritDoc} */
@Override public void onAfterRemove(Cache.Entry<Integer, Integer>
entry) {
// No-op.
}
}
}
{code}
was:
Consider a cluster of 3 nodes - node0, node1, node2
1. node1 and node2 receive put request from a thin client. node1 is "primary"
for cache keys received by node2 and node2 is "primary" for cache keys received
by node1. Both of them begin operations execution and wait for them to complete.
2. node1 and node2 receive stop signal (Ignite#close). The stop procedure on
both nodes blocks on GridNioAsyncNotifyFilter#stop, which waits for the thin
client operations to complete.
3. node1 and node2 fail to process cache request for some reason (a cache
interceptor raised an exception)
4. node1 and node2 will not send GridNearAtomicUpdateResponse with failed keys
to each other because they are both stopping (see GridCacheIoManager#onSend).
This message is an indication to the "near" node that some keys could not be
processed and the operation should be terminated with an exception.
5. node1 and node2 are unable to complete the cache operations received from
the thin client (both of them will never receive GridNearAtomicUpdateResponse
or NODE_LEFT event for the primary node) -> they are unable to complete the
stop procedure
> Node stop may be infinitely blocked by atomic cache operations invoked from
> thin client
> ---------------------------------------------------------------------------------------
>
> Key: IGNITE-28662
> URL: https://issues.apache.org/jira/browse/IGNITE-28662
> Project: Ignite
> Issue Type: Bug
> Reporter: Mikhail Petrov
> Priority: Major
>
> Consider a cluster of 3 nodes - node0, node1, node2
> 1. node1 and node2 receive put request from a thin client. node1 is "primary"
> for cache keys received by node2 and node2 is "primary" for cache keys
> received by node1. Both of them begin operations execution and wait for them
> to complete.
> 2. node1 and node2 receive stop signal (Ignite#close). The stop procedure on
> both nodes blocks on GridNioAsyncNotifyFilter#stop, which waits for the thin
> client operations to complete.
> 3. node1 and node2 fail to process cache request for some reason (a cache
> interceptor raised an exceception)
> 4. node1 and node 2 will not send GridNearAtomicUpdateResponse with failed
> keys to each other because they are both stopping (see
> GridCacheIoManager#onSend). This message is an indication to the "near" node
> that some keys could not be processed and the operation should be terminated
> with an exception.
> 5. node1 and node2 are unable to complete the cache operations received from
> the thin client (both of them will never receive GridNearAtomicUpdateResponse
> or NODE_LEFT event for the primary node ) -> they are unable to complete the
> stop procedure
> Reproducer:
> {code:java}
> /*
> * Licensed to the Apache Software Foundation (ASF) under one or more
> * contributor license agreements. See the NOTICE file distributed with
> * this work for additional information regarding copyright ownership.
> * The ASF licenses this file to You under the Apache License, Version 2.0
> * (the "License"); you may not use this file except in compliance with
> * the License. You may obtain a copy of the License at
> *
> * http://www.apache.org/licenses/LICENSE-2.0
> *
> * Unless required by applicable law or agreed to in writing, software
> * distributed under the License is distributed on an "AS IS" BASIS,
> * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> * See the License for the specific language governing permissions and
> * limitations under the License.
> */
> package org.apache.ignite;
> import java.util.Map;
> import java.util.TreeMap;
> import java.util.concurrent.CountDownLatch;
> import javax.cache.Cache;
> import org.apache.ignite.cache.CacheInterceptor;
> import org.apache.ignite.cache.CachePartialUpdateException;
> import org.apache.ignite.client.IgniteClient;
> import org.apache.ignite.configuration.CacheConfiguration;
> import org.apache.ignite.configuration.ClientConfiguration;
> import org.apache.ignite.configuration.IgniteConfiguration;
> import org.apache.ignite.internal.IgniteInternalFuture;
> import org.apache.ignite.internal.TestRecordingCommunicationSpi;
> import
> org.apache.ignite.internal.processors.cache.distributed.GridCacheModuloAffinityFunction;
> import org.apache.ignite.lang.IgniteBiTuple;
> import org.apache.ignite.testframework.GridTestUtils;
> import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
> import org.jetbrains.annotations.Nullable;
> import org.junit.Test;
> import static java.util.Collections.singletonMap;
> import static java.util.concurrent.TimeUnit.MILLISECONDS;
> import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
> import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
> import static
> org.apache.ignite.internal.processors.cache.distributed.GridCacheModuloAffinityFunction.IDX_ATTR;
> /** */
> public class NodeStopForeverBlockedByAtomicCacheOperationsTest extends
> GridCommonAbstractTest {
> /** */
> public static final int NODE_1_FIRST_KEY = 1;
> /** */
> public static final int NODE_1_SECOND_KEY = 4;
> /** */
> public static final int NODE_2_FIRST_KEY = 2;
> /** */
> public static final int NODE_2_SECOND_KEY = 5;
> /** {@inheritDoc} */
> @Override protected IgniteConfiguration getConfiguration(String
> igniteInstanceName) throws Exception {
> return super.getConfiguration(igniteInstanceName)
> .setCommunicationSpi(new TestRecordingCommunicationSpi())
> .setUserAttributes(singletonMap(IDX_ATTR,
> getTestIgniteInstanceIndex(igniteInstanceName)));
> }
> /** {@inheritDoc} */
> @Override protected void afterTest() throws Exception {
> super.afterTest();
> stopAllGrids();
> }
> /** */
> @Test
> public void testCacheEntriesProcessingFailureCausedByNodeStop() throws
> Exception {
> startGridsMultiThreaded(3);
> TestInterceptor.putStartedLatch = new CountDownLatch(2);
> TestInterceptor.putUnblockedLatch = new CountDownLatch(1);
> grid(0).createCache(createTestCacheConfiguration());
> try (
> IgniteClient cli1 = Ignition.startClient(new
> ClientConfiguration().setClusterDiscoveryEnabled(false).setAddresses("127.0.0.1:10801"));
> IgniteClient cli2 = Ignition.startClient(new
> ClientConfiguration().setClusterDiscoveryEnabled(false).setAddresses("127.0.0.1:10802"))
> ) {
> IgniteInternalFuture<Object> putFut1 = GridTestUtils.runAsync(()
> -> cli1.cache(DEFAULT_CACHE_NAME).putAll(createKeysForNode(2)));
> IgniteInternalFuture<Object> putFut2 = GridTestUtils.runAsync(()
> -> cli2.cache(DEFAULT_CACHE_NAME).putAll(createKeysForNode(1)));
>
> assertTrue(TestInterceptor.putStartedLatch.await(getTestTimeout(),
> MILLISECONDS));
> IgniteInternalFuture<Object> stopFut1 = GridTestUtils.runAsync(()
> -> stopGrid(1));
> IgniteInternalFuture<Object> stopFut2 = GridTestUtils.runAsync(()
> -> stopGrid(2));
> try {
> TestInterceptor.putUnblockedLatch.countDown();
> stopFut1.get(getTestTimeout());
> stopFut2.get(getTestTimeout());
> putFut1.get(getTestTimeout());
> putFut2.get(getTestTimeout());
> }
> catch (CachePartialUpdateException e) {
> assertTrue(e.getMessage().contains("Failed to update keys
> (retry update if possible)"));
> }
> }
> }
> /** */
> private CacheConfiguration<Integer, Integer>
> createTestCacheConfiguration() {
> return new CacheConfiguration<Integer, Integer>()
> .setName(DEFAULT_CACHE_NAME)
> .setAtomicityMode(ATOMIC)
> .setWriteSynchronizationMode(FULL_SYNC)
> .setBackups(2)
> .setAffinity(new GridCacheModuloAffinityFunction(3, 2))
> .setInterceptor(new TestInterceptor());
> }
> /** */
> private Map<Integer, Integer> createKeysForNode(int nodeIdx) {
> Map<Integer, Integer> data = new TreeMap<>();
> if (nodeIdx == 2) {
> data.put(NODE_2_FIRST_KEY, 2);
> data.put(NODE_2_SECOND_KEY, 5);
> }
> else {
> data.put(NODE_1_FIRST_KEY, 1);
> data.put(NODE_1_SECOND_KEY, 4);
> }
> return data;
> }
> /** */
> public static final class TestInterceptor implements
> CacheInterceptor<Integer, Integer> {
> /** */
> public static CountDownLatch putStartedLatch;
> /** */
> public static CountDownLatch putUnblockedLatch;
> /** {@inheritDoc} */
> @Override public @Nullable Integer onGet(Integer key, @Nullable
> Integer val) {
> return val;
> }
> /** {@inheritDoc} */
> @Override public @Nullable Integer onBeforePut(Cache.Entry<Integer,
> Integer> entry, Integer newVal) {
> if (entry.getKey() == NODE_1_FIRST_KEY || entry.getKey() ==
> NODE_2_FIRST_KEY) {
> putStartedLatch.countDown();
> try {
> assertTrue(putUnblockedLatch.await(10000, MILLISECONDS));
> }
> catch (InterruptedException e) {
> Thread.currentThread().interrupt();
> throw new IgniteException(e);
> }
> }
> else
> throw new IgniteException("expected");
> return newVal;
> }
> /** {@inheritDoc} */
> @Override public void onAfterPut(Cache.Entry<Integer, Integer> entry)
> {
> // No-op.
> }
> /** {@inheritDoc} */
> @Override public @Nullable IgniteBiTuple<Boolean, Integer>
> onBeforeRemove(Cache.Entry<Integer, Integer> entry) {
> return null;
> }
> /** {@inheritDoc} */
> @Override public void onAfterRemove(Cache.Entry<Integer, Integer>
> entry) {
> // No-op.
> }
> }
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)