This is an automated email from the ASF dual-hosted git repository. tkalkirill pushed a commit to branch ignite-25590 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 00ea001d6a34834b7258befaf577fd90ffcaddb9 Author: Kirill Tkalenko <[email protected]> AuthorDate: Wed Aug 20 13:18:20 2025 +0300 IGNITE-25590 wip --- .../internal/thread/IgniteThreadFactory.java | 15 +++++- .../checkpoint/CheckpointReadWriteLock.java | 2 +- .../persistence/checkpoint/CheckpointWorkflow.java | 9 ++-- .../persistence/checkpoint/Checkpointer.java | 4 +- .../checkpoint/IgniteCheckpointThread.java | 57 ++++++++++++++++++++ .../checkpoint/IgniteCheckpointThreadFactory.java | 61 ++++++++++++++++++++++ 6 files changed, 139 insertions(+), 9 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/thread/IgniteThreadFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/thread/IgniteThreadFactory.java index 72eb8b5da13..a40e6e36b35 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/thread/IgniteThreadFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/thread/IgniteThreadFactory.java @@ -44,7 +44,7 @@ public class IgniteThreadFactory implements ThreadFactory { /** * Constructor. */ - private IgniteThreadFactory(String nodeName, String poolName, boolean daemon, IgniteLogger log, ThreadOperation[] allowedOperations) { + protected IgniteThreadFactory(String nodeName, String poolName, boolean daemon, IgniteLogger log, ThreadOperation[] allowedOperations) { this(IgniteThread.threadPrefix(nodeName, poolName), daemon, log, allowedOperations); } @@ -60,7 +60,7 @@ public class IgniteThreadFactory implements ThreadFactory { @Override public IgniteThread newThread(Runnable r) { - IgniteThread t = new IgniteThread(prefix + counter.getAndIncrement(), r, allowedOperations); + IgniteThread t = createIgniteThread(prefix + counter.getAndIncrement(), r, allowedOperations); t.setDaemon(this.daemon); t.setUncaughtExceptionHandler(exHnd); @@ -68,6 +68,17 @@ public class IgniteThreadFactory implements ThreadFactory { return t; } + /** + * Creates ignite thread with given name. + * + * @param finalName Name of thread. + * @param r Runnable to execute. + * @param allowedOperations Operations which this thread allows to execute. + */ + protected IgniteThread createIgniteThread(String finalName, Runnable r, ThreadOperation... allowedOperations) { + return new IgniteThread(finalName, r, allowedOperations); + } + /** * Returns the prefix used for thread names. * diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointReadWriteLock.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointReadWriteLock.java index aec7425315a..9f609efcf8f 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointReadWriteLock.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointReadWriteLock.java @@ -125,7 +125,7 @@ public class CheckpointReadWriteLock { public boolean checkpointLockIsHeldByThread() { return isWriteLockHeldByCurrentThread() || checkpointReadLockHoldCount.get() > 0 - || Thread.currentThread().getName().startsWith(CHECKPOINT_RUNNER_THREAD_PREFIX); + || Thread.currentThread() instanceof IgniteCheckpointThread; } /** diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java index fc255fcfbdc..c1c95fe7fe1 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java @@ -59,7 +59,6 @@ import org.apache.ignite.internal.pagememory.PageMemory; import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId; import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory; import org.apache.ignite.internal.thread.IgniteThread; -import org.apache.ignite.internal.thread.IgniteThreadFactory; import org.apache.ignite.internal.util.CollectionUtils; import org.jetbrains.annotations.Nullable; @@ -146,8 +145,12 @@ class CheckpointWorkflow { if (checkpointThreads > 1) { callbackListenerThreadPool = Executors.newFixedThreadPool( checkpointThreads, - // TODO IGNITE-25590 Add node name. - IgniteThreadFactory.createWithFixedPrefix(CHECKPOINT_RUNNER_THREAD_PREFIX + "-io", false, LOG) + IgniteCheckpointThreadFactory.create( + igniteInstanceName, + CHECKPOINT_RUNNER_THREAD_PREFIX + "-callback-listener", + false, + LOG + ) ); } else { callbackListenerThreadPool = null; diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java index db6106d3b57..29fb968d8b9 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java @@ -69,7 +69,6 @@ import org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStor import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore; import org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager; import org.apache.ignite.internal.thread.IgniteThread; -import org.apache.ignite.internal.thread.IgniteThreadFactory; import org.apache.ignite.internal.util.IgniteConcurrentMultiPairQueue; import org.apache.ignite.internal.util.worker.IgniteWorker; import org.apache.ignite.internal.util.worker.WorkProgressDispatcher; @@ -235,8 +234,7 @@ public class Checkpointer extends IgniteWorker { 0L, MILLISECONDS, new LinkedBlockingQueue<>(), - // TODO IGNITE-25590 Add node name. - IgniteThreadFactory.createWithFixedPrefix(CHECKPOINT_RUNNER_THREAD_PREFIX + "-io", false, log) + IgniteCheckpointThreadFactory.create(igniteInstanceName, CHECKPOINT_RUNNER_THREAD_PREFIX + "-io", false, log) ); } else { checkpointWritePagesPool = null; diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/IgniteCheckpointThread.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/IgniteCheckpointThread.java new file mode 100644 index 00000000000..1b4336c9dad --- /dev/null +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/IgniteCheckpointThread.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.pagememory.persistence.checkpoint; + +import org.apache.ignite.internal.thread.IgniteThread; +import org.apache.ignite.internal.thread.ThreadOperation; +import org.apache.ignite.internal.util.worker.IgniteWorker; + +/** Extension for {@link CheckpointReadWriteLock}. */ +class IgniteCheckpointThread extends IgniteThread { + /** + * Creates thread with given worker. + * + * @param worker Worker to create thread with. + */ + IgniteCheckpointThread(IgniteWorker worker) { + super(worker); + } + + /** + * Creates thread with given name for a given Ignite instance. + * + * @param nodeName Name of the Ignite instance this thread is created for. + * @param threadName Name of thread (will be added to the node name to form final name). + * @param r Runnable to execute. + * @param allowedOperations Operations which this thread allows to execute. + */ + IgniteCheckpointThread(String nodeName, String threadName, Runnable r, ThreadOperation... allowedOperations) { + super(nodeName, threadName, r, allowedOperations); + } + + /** + * Creates thread with given name. + * + * @param finalName Name of thread. + * @param r Runnable to execute. + * @param allowedOperations Operations which this thread allows to execute. + */ + IgniteCheckpointThread(String finalName, Runnable r, ThreadOperation... allowedOperations) { + super(finalName, r, allowedOperations); + } +} diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/IgniteCheckpointThreadFactory.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/IgniteCheckpointThreadFactory.java new file mode 100644 index 00000000000..af51009c395 --- /dev/null +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/IgniteCheckpointThreadFactory.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.pagememory.persistence.checkpoint; + +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.thread.IgniteThreadFactory; +import org.apache.ignite.internal.thread.ThreadOperation; + +/** Extension for creating {@link IgniteCheckpointThread}. */ +class IgniteCheckpointThreadFactory extends IgniteThreadFactory { + /** Constructor. */ + IgniteCheckpointThreadFactory( + String nodeName, + String poolName, + boolean daemon, + IgniteLogger log, + ThreadOperation[] allowedOperations + ) { + super(nodeName, poolName, daemon, log, allowedOperations); + } + + /** + * Creates a thread factory based on a node's name and a name of the pool. + * + * @param nodeName Node name. + * @param poolName Pool name. + * @param daemon Whether threads created by the factory should be daemon or not. + * @param logger Logger. + * @param allowedOperations Operations that are allowed to be executed on threads produced by this factory. + * @return Thread factory. + */ + public static IgniteCheckpointThreadFactory create( + String nodeName, + String poolName, + boolean daemon, + IgniteLogger logger, + ThreadOperation... allowedOperations + ) { + return new IgniteCheckpointThreadFactory(nodeName, poolName, daemon, logger, allowedOperations); + } + + @Override + protected IgniteCheckpointThread createIgniteThread(String finalName, Runnable r, ThreadOperation... allowedOperations) { + return new IgniteCheckpointThread(finalName, r, allowedOperations); + } +}
