Sergey Antonov created IGNITE-12774:
---------------------------------------
Summary: Transaction hangs after "Too many open files" NIO exception
Key: IGNITE-12774
URL: https://issues.apache.org/jira/browse/IGNITE-12774
Project: Ignite
Issue Type: Bug
Reporter: Sergey Antonov
Assignee: Sergey Antonov
A transaction hangs after a “Too many open files” error and never finishes.
{code:java}
import java.net.SocketException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.StopNodeOrHaltFailureHandler;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
public class TooManyOpenFilesTest extends GridCommonAbstractTest {
@Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
return super.getConfiguration(igniteInstanceName)
.setFailureHandler(new StopNodeOrHaltFailureHandler())
.setCommunicationSpi(new TooManyOpenFilesTcpCommunicationSpi())
.setConsistentId(igniteInstanceName);
}
@Override protected void beforeTest() throws Exception {
super.beforeTest();
stopAllGrids();
cleanPersistenceDir();
}
@Override protected void afterTest() throws Exception {
stopAllGrids();
cleanPersistenceDir();
super.afterTest();
}
public void test() throws Exception {
IgniteEx crd = startGrids(3);
crd.cluster().active(true);
crd.getOrCreateCache(new
CacheConfiguration<>().setName(DEFAULT_CACHE_NAME).setAtomicityMode(TRANSACTIONAL).setBackups(1).setCacheMode(PARTITIONED));
TooManyOpenFilesTcpCommunicationSpi spi =
(TooManyOpenFilesTcpCommunicationSpi)grid(2).context().config().getCommunicationSpi();
try (Transaction tx =
grid(1).transactions().txStart(TransactionConcurrency.PESSIMISTIC,
TransactionIsolation.REPEATABLE_READ)) {
IgniteCache<Object, Object> cache =
grid(1).cache(DEFAULT_CACHE_NAME);
cache.put(1, 1);
spi.throwException.set(true);
cache.put(2, 2);
cache.put(3, 2);
cache.put(4, 2);
// hungs here.
tx.commit();
}
for (int i=0; i < 3 ; i++) {
assertEquals(1, grid(i).cache(DEFAULT_CACHE_NAME).get(1));
assertEquals(2, grid(i).cache(DEFAULT_CACHE_NAME).get(2));
}
}
private static class TooManyOpenFilesTcpCommunicationSpi extends
TcpCommunicationSpi {
private final AtomicBoolean throwException = new AtomicBoolean();
/** {@inheritDoc} */
@Override public void sendMessage(ClusterNode node, Message msg) throws
IgniteSpiException {
if (throwException.get())
throw getException(node);
super.sendMessage(node, msg);
}
/** {@inheritDoc} */
@Override public void sendMessage(
ClusterNode node,
Message msg,
IgniteInClosure<IgniteException> ackC
) throws IgniteSpiException {
if (throwException.get())
throw getException(node);
super.sendMessage(node, msg, ackC);
}
private IgniteSpiException getException(ClusterNode node) {
String checkedExceptionMsg = "Failed to connect to node (is node
still alive?). " +
"Make sure that each ComputeTask and cache Transaction has a
timeout set " +
"in order to prevent parties from waiting forever in case of
network issues " +
"[nodeId=" + node.id() + ", addrs=null]";
return new IgniteSpiException("Failed to send message to remote
node: " + node.id(), new IgniteCheckedException(checkedExceptionMsg, new
SocketException("Too many open files")));
}
}
}
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)