[ https://issues.apache.org/jira/browse/IGNITE-12349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Pavel Pereslegin resolved IGNITE-12349. --------------------------------------- Resolution: Cannot Reproduce I think this has been fixed in IGNITE-13098. > File transmission can cause the cluster to freeze. > -------------------------------------------------- > > Key: IGNITE-12349 > URL: https://issues.apache.org/jira/browse/IGNITE-12349 > Project: Ignite > Issue Type: Bug > Affects Versions: 2.8 > Reporter: Pavel Pereslegin > Assignee: Maxim Muzafarov > Priority: Critical > > When we initiate a file transmission, a timeout object with a mutable endTime > is added to the timeout processor "queue" (see > TcpCommunicationSpi#openChannel). > Since endTime is mutable, a timeout for this object will never occur; > moreover, at some point, this object will be the first in the "queue" and > TimeoutProcessor will stop working altogether. > Reproducer: > {code:java} > public class FileTransmissionTimeoutProcessorTest extends > GridCommonAbstractTest { > @After > public void after() throws Exception { > cleanPersistenceDir(); > stopAllGrids(); > } > /** {@inheritDoc} */ > @Override protected IgniteConfiguration getConfiguration(String > igniteInstanceName) throws Exception { > return super.getConfiguration(igniteInstanceName) > .setDataStorageConfiguration(new DataStorageConfiguration() > .setDefaultDataRegionConfiguration(new > DataRegionConfiguration() > .setPersistenceEnabled(true) > .setMaxSize(500L * 1024 * 1024))) > .setCacheConfiguration(new CacheConfiguration<Integer, > Integer>(DEFAULT_CACHE_NAME)); > } > @Test > public void testChannelTimeoutObject() throws Exception { > IgniteEx snd = startGrid(0); > IgniteEx rcv = startGrid(1); > // Do some transfer between nodes. > initiateFileTransfer(snd, rcv); > GridFutureAdapter<Boolean> fut = new GridFutureAdapter<>(); > // Add new timeout object after file transmission timeout object. 
> snd.context().timeout().addTimeoutObject(new > GridTimeoutObjectAdapter(DFLT_CONN_TIMEOUT + 1_000) { > @Override public void onTimeout() { > fut.onDone(true); > } > }); > // The timeout processor will hang on the file transfer timeout > object and will never complete the remaining tasks. > boolean success = fut.get(DFLT_CONN_TIMEOUT + 30_000); > assertTrue(success); > } > /** */ > private void initiateFileTransfer(IgniteEx snd, IgniteEx rcv) throws > IOException, IgniteCheckedException, InterruptedException { > snd.cluster().active(true); > awaitPartitionMapExchange(); > try (IgniteDataStreamer<Integer, Integer> dataStreamer = > snd.dataStreamer(DEFAULT_CACHE_NAME)) { > dataStreamer.allowOverwrite(true); > for (int i = 0; i < 10_000; i++) > dataStreamer.addData(i, i + DEFAULT_CACHE_NAME.hashCode()); > } > Map<String, Long> fileSizes = new HashMap<>(); > Map<String, Integer> fileCrcs = new HashMap<>(); > Map<String, Serializable> fileParams = new HashMap<>(); > > assertTrue(snd.context().io().fileTransmissionSupported(rcv.localNode())); > File tempStore = U.resolveWorkDirectory(U.defaultWorkDirectory(), > "ctmp", true); > > rcv.context().io().addTransmissionHandler(GridTopic.TOPIC_CACHE.topic("test", > 0), new TransmissionHandler() { > @Override public void onException(UUID nodeId, Throwable err) { > // No-op. > } > @Override public String filePath(UUID nodeId, TransmissionMeta > fileMeta) { > return new File(tempStore, fileMeta.name()).getAbsolutePath(); > } > @Override public Consumer<ByteBuffer> chunkHandler(UUID nodeId, > TransmissionMeta initMeta) { > return null; > } > @Override public Consumer<File> fileHandler(UUID nodeId, > TransmissionMeta initMeta) { > return new Consumer<File>() { > @Override public void accept(File file) { > assertTrue(fileSizes.containsKey(file.getName())); > // Save all params. 
> fileParams.putAll(initMeta.params()); > } > }; > } > }); > IgniteInternalCache<Object, Object> defCache = > snd.cachex(DEFAULT_CACHE_NAME); > File cacheDirIg0 = ((FilePageStoreManager)(defCache).context() > .shared() > .pageStore()).cacheWorkDir(defCache.configuration()); > File[] cacheParts = cacheDirIg0.listFiles(new FilenameFilter() { > @Override public boolean accept(File dir, String name) { > return name.endsWith(FILE_SUFFIX); > } > }); > for (File file : cacheParts) { > fileSizes.put(file.getName(), file.length()); > fileCrcs.put(file.getName(), FastCrc.calcCrc(file)); > } > try (GridIoManager.TransmissionSender sender = snd.context() > .io() > .openTransmissionSender(rcv.localNode().id(), > GridTopic.TOPIC_CACHE.topic("test", 0))) { > // Iterate over cache partition cacheParts. > for (File file : cacheParts) { > Map<String, Serializable> params = new HashMap<>(); > params.put(file.getName(), file.hashCode()); > sender.send(file, params, TransmissionPolicy.FILE); > } > } > } > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)