[ 
https://issues.apache.org/jira/browse/IGNITE-12349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pavel Pereslegin resolved IGNITE-12349.
---------------------------------------
    Resolution: Cannot Reproduce

I think this has been fixed in IGNITE-13098.

> File transmission can cause the cluster to freeze.
> --------------------------------------------------
>
>                 Key: IGNITE-12349
>                 URL: https://issues.apache.org/jira/browse/IGNITE-12349
>             Project: Ignite
>          Issue Type: Bug
>    Affects Versions: 2.8
>            Reporter: Pavel Pereslegin
>            Assignee: Maxim Muzafarov
>            Priority: Critical
>
> When we initiate a file transmission, a timeout object with a mutable endTime
> is added to the timeout processor "queue" (see
> TcpCommunicationSpi#openChannel).
> Because endTime is mutable, the timeout for this object never fires.
> Moreover, at some point this object becomes the first in the "queue", and the
> TimeoutProcessor stops working altogether: it hangs on this object, so no
> timeout object queued after it is ever processed.
> Reproducer
> {code:java}
> public class FileTransmissionTimeoutProcessorTest extends 
> GridCommonAbstractTest {
>     @After
>     public void after() throws Exception {
>         cleanPersistenceDir();
>         stopAllGrids();
>     }
>     /** {@inheritDoc} */
>     @Override protected IgniteConfiguration getConfiguration(String 
> igniteInstanceName) throws Exception {
>         return super.getConfiguration(igniteInstanceName)
>             .setDataStorageConfiguration(new DataStorageConfiguration()
>                 .setDefaultDataRegionConfiguration(new 
> DataRegionConfiguration()
>                     .setPersistenceEnabled(true)
>                     .setMaxSize(500L * 1024 * 1024)))
>             .setCacheConfiguration(new CacheConfiguration<Integer, 
> Integer>(DEFAULT_CACHE_NAME));
>     }
>     @Test
>     public void testChannelTimeoutObject() throws Exception {
>         IgniteEx snd = startGrid(0);
>         IgniteEx rcv = startGrid(1);
>         // Do some transfer between nodes.
>         initiateFileTransfer(snd, rcv);
>         GridFutureAdapter<Boolean> fut = new GridFutureAdapter<>();
>         // Add new timeout object after file transmission timeout object.
>         snd.context().timeout().addTimeoutObject(new 
> GridTimeoutObjectAdapter(DFLT_CONN_TIMEOUT + 1_000) {
>             @Override public void onTimeout() {
>                 fut.onDone(true);
>             }
>         });
>         // The timeout processor will hang on the file transfer timeout 
> object and will never complete the remaining tasks.
>         boolean success = fut.get(DFLT_CONN_TIMEOUT + 30_000);
>         assertTrue(success);
>     }
>     /** */
>     private void initiateFileTransfer(IgniteEx snd, IgniteEx rcv) throws 
> IOException, IgniteCheckedException, InterruptedException {
>         snd.cluster().active(true);
>         awaitPartitionMapExchange();
>         try (IgniteDataStreamer<Integer, Integer> dataStreamer = 
> snd.dataStreamer(DEFAULT_CACHE_NAME)) {
>             dataStreamer.allowOverwrite(true);
>             for (int i = 0; i < 10_000; i++)
>                 dataStreamer.addData(i, i + DEFAULT_CACHE_NAME.hashCode());
>         }
>         Map<String, Long> fileSizes = new HashMap<>();
>         Map<String, Integer> fileCrcs = new HashMap<>();
>         Map<String, Serializable> fileParams = new HashMap<>();
>         
> assertTrue(snd.context().io().fileTransmissionSupported(rcv.localNode()));
>         File tempStore = U.resolveWorkDirectory(U.defaultWorkDirectory(), 
> "ctmp", true);
>         
> rcv.context().io().addTransmissionHandler(GridTopic.TOPIC_CACHE.topic("test", 
> 0), new TransmissionHandler() {
>             @Override public void onException(UUID nodeId, Throwable err) {
>                 // No-op.
>             }
>             @Override public String filePath(UUID nodeId, TransmissionMeta 
> fileMeta) {
>                 return new File(tempStore, fileMeta.name()).getAbsolutePath();
>             }
>             @Override public Consumer<ByteBuffer> chunkHandler(UUID nodeId, 
> TransmissionMeta initMeta) {
>                 return null;
>             }
>             @Override public Consumer<File> fileHandler(UUID nodeId, 
> TransmissionMeta initMeta) {
>                 return new Consumer<File>() {
>                     @Override public void accept(File file) {
>                         assertTrue(fileSizes.containsKey(file.getName()));
>                         // Save all params.
>                         fileParams.putAll(initMeta.params());
>                     }
>                 };
>             }
>         });
>         IgniteInternalCache<Object, Object> defCache = 
> snd.cachex(DEFAULT_CACHE_NAME);
>         File cacheDirIg0 = ((FilePageStoreManager)(defCache).context()
>             .shared()
>             .pageStore()).cacheWorkDir(defCache.configuration());
>         File[] cacheParts = cacheDirIg0.listFiles(new FilenameFilter() {
>             @Override public boolean accept(File dir, String name) {
>                 return name.endsWith(FILE_SUFFIX);
>             }
>         });
>         for (File file : cacheParts) {
>             fileSizes.put(file.getName(), file.length());
>             fileCrcs.put(file.getName(), FastCrc.calcCrc(file));
>         }
>         try (GridIoManager.TransmissionSender sender = snd.context()
>             .io()
>             .openTransmissionSender(rcv.localNode().id(), 
> GridTopic.TOPIC_CACHE.topic("test", 0))) {
>             // Iterate over cache partition cacheParts.
>             for (File file : cacheParts) {
>                 Map<String, Serializable> params = new HashMap<>();
>                 params.put(file.getName(), file.hashCode());
>                 sender.send(file, params, TransmissionPolicy.FILE);
>             }
>         }
>     }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to