xtern commented on a change in pull request #9047: URL: https://github.com/apache/ignite/pull/9047#discussion_r622232211
########## File path: modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java ########## @@ -81,134 +90,125 @@ public DmsDataWriterWorker( super(igniteInstanceName, "dms-writer", log); this.lock = lock; this.errorHnd = errorHnd; + + pauseTask = new FutureTask<>(() -> AWAIT); + // Completed future task. + pauseTask.run(); + + // Put restore task to the queue, so it will be executed on worker start. + updateQueue.offer(newDmsTask(this::restore)); } /** */ public void setMetaStorage(ReadWriteMetastorage metastorage) { this.metastorage = metastorage; } - /** */ - public void update(DistributedMetaStorageHistoryItem histItem) { - updateQueue.offer(histItem); + /** + * @return Future which will be completed will all the tasks prior to the pause task completed. + */ + public Future<?> flush() { + return pauseTask; } - /** */ - public void update(DistributedMetaStorageClusterNodeData fullNodeData) { - assert fullNodeData.fullData != null; - assert fullNodeData.hist != null; - - updateQueue.clear(); + /** + * @param compFut Future which should be completed when worker may proceed with updates. + */ + public void pause(IgniteInternalFuture<?> compFut) { + latch = new CountDownLatch(1); - updateQueue.offer(fullNodeData); - } + updateQueue.offer(pauseTask = new FutureTask<>(() -> AWAIT)); - /** */ - public void removeHistItem(long ver) { - updateQueue.offer(ver); + compFut.listen(f -> latch.countDown()); } /** */ - public void cancel(boolean halt) throws InterruptedException { - if (halt) - updateQueue.clear(); + public void update(DistributedMetaStorageHistoryItem histItem) { + updateQueue.offer(newDmsTask(() -> { + metastorage.write(historyItemKey(workerDmsVer.id() + 1), histItem); - updateQueue.offer(status = halt ? 
HALT : CANCEL); + workerDmsVer = workerDmsVer.nextVersion(histItem); - Thread runner = runner(); + metastorage.write(versionKey(), workerDmsVer); - if (runner != null) - runner.join(); + for (int i = 0, len = histItem.keys().length; i < len; i++) + write(histItem.keys()[i], histItem.valuesBytesArray()[i]); + })); } - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { - status = CONTINUE; + /** */ + public void update(DistributedMetaStorageClusterNodeData fullNodeData) { + assert fullNodeData.fullData != null; + assert fullNodeData.hist != null; - try { - if (firstStart) { - firstStart = false; + updateQueue.clear(); - lock.lock(); + updateQueue.offer(newDmsTask(() -> { + metastorage.writeRaw(cleanupGuardKey(), DUMMY_VALUE); - try { - restore(); - } - finally { - lock.unlock(); - } - } + doCleanup(); - while (true) { - Object update = updateQueue.peek(); + for (DistributedMetaStorageKeyValuePair item : fullNodeData.fullData) + metastorage.writeRaw(localKey(item.key), item.valBytes); - try { - update = updateQueue.take(); - } - catch (InterruptedException ignore) { - } + for (int i = 0, len = fullNodeData.hist.length; i < len; i++) { + DistributedMetaStorageHistoryItem histItem = fullNodeData.hist[i]; - lock.lock(); + long histItemVer = fullNodeData.ver.id() + i - (len - 1); - try { - // process update - if (update instanceof DistributedMetaStorageHistoryItem) - applyUpdate((DistributedMetaStorageHistoryItem)update); - else if (update instanceof DistributedMetaStorageClusterNodeData) { - DistributedMetaStorageClusterNodeData fullNodeData = (DistributedMetaStorageClusterNodeData)update; + metastorage.write(historyItemKey(histItemVer), histItem); + } - metastorage.writeRaw(cleanupGuardKey(), DUMMY_VALUE); + metastorage.write(versionKey(), fullNodeData.ver); - doCleanup(); + workerDmsVer = fullNodeData.ver; - for (DistributedMetaStorageKeyValuePair item : fullNodeData.fullData) - 
metastorage.writeRaw(localKey(item.key), item.valBytes); + metastorage.remove(cleanupGuardKey()); + })); + } - for (int i = 0, len = fullNodeData.hist.length; i < len; i++) { - DistributedMetaStorageHistoryItem histItem = fullNodeData.hist[i]; + /** */ + public void removeHistItem(long ver) { + updateQueue.offer(newDmsTask(() -> metastorage.remove(historyItemKey(ver)))); + } - long histItemVer = fullNodeData.ver.id() + i - (len - 1); + /** */ + public void cancel(boolean halt) { + if (halt) + updateQueue.clear(); - metastorage.write(historyItemKey(histItemVer), histItem); - } + updateQueue.offer(new FutureTask<>(() -> STOP)); - metastorage.write(versionKey(), fullNodeData.ver); + U.join(runner(), log); + } - workerDmsVer = fullNodeData.ver; + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { + while (true) { + try { + curTask = updateQueue.take(); + } + catch (InterruptedException ignore) { + } - metastorage.remove(cleanupGuardKey()); - } - else if (update instanceof Long) { - long ver = (Long)update; + curTask.run(); Review comment: Since ``curTask`` is currently a class field, an InterruptedException thrown by ``take()`` leaves it unchanged, so we might either get an NPE (if no task has been taken yet) or run the previous task twice, and that's not good either. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org