This is an automated email from the ASF dual-hosted git repository. av pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new de01de1 IGNITE-13540 Fix: exchange worker, waiting for new task from queue, considered as blocked. (#8324) de01de1 is described below commit de01de1e7ea791e004043bc54e0dd89c54d9e28d Author: Ivan Daschinskiy <ivanda...@gmail.com> AuthorDate: Wed Oct 7 16:57:10 2020 +0300 IGNITE-13540 Fix: exchange worker, waiting for new task from queue, considered as blocked. (#8324) --- .../cache/GridCachePartitionExchangeManager.java | 4 +- .../failure/ExchangeWorkerWaitingForTaskTest.java | 77 ++++++++++++++++++++++ .../ignite/testsuites/IgniteBasicTestSuite.java | 2 + 3 files changed, 81 insertions(+), 2 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index fde6895..2a5cdfb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -3253,11 +3253,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (isCancelled()) Thread.currentThread().interrupt(); - updateHeartbeat(); + blockingSectionBegin(); task = futQ.poll(timeout, MILLISECONDS); - updateHeartbeat(); + blockingSectionEnd(); if (task == null) continue; // Main while loop. 
diff --git a/modules/core/src/test/java/org/apache/ignite/failure/ExchangeWorkerWaitingForTaskTest.java b/modules/core/src/test/java/org/apache/ignite/failure/ExchangeWorkerWaitingForTaskTest.java new file mode 100644 index 0000000..ae15980 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/failure/ExchangeWorkerWaitingForTaskTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.failure; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.ignite.Ignite; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +/** + * Tests that taking a new exchange task isn't interpreted as a blocked exchange thread. + * If the network timeout, used as the default poll timeout in the exchange worker queue, is greater than + * the system thread worker blocked timeout and the exchange task is fast (a few milliseconds), + * the exchange thread, waiting for tasks from the queue, should not be considered blocked. 
+ */ +public class ExchangeWorkerWaitingForTaskTest extends GridCommonAbstractTest { + /** */ + private static final long SYSTEM_WORKER_BLOCKED_TIMEOUT = 3000; + + /** */ + private final CompletableFuture<Void> falseBlockedExchangeFuture = new CompletableFuture<>(); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setNetworkTimeout(2 * SYSTEM_WORKER_BLOCKED_TIMEOUT); + cfg.setSystemWorkerBlockedTimeout(SYSTEM_WORKER_BLOCKED_TIMEOUT); + + cfg.setFailureHandler(new FailureHandler() { + @Override public boolean onFailure(Ignite ignite, FailureContext ctx) { + if (ctx.type() == FailureType.SYSTEM_WORKER_BLOCKED) { + String msg = ctx.error().getMessage(); + + if (msg.contains("partition-exchanger")) + falseBlockedExchangeFuture.complete(null); + } + + return false; + } + }); + + return cfg; + } + + /** */ + @Test + public void testHandlerNotReportFalseBlocking() throws Exception { + startGrid(1); + startGrid(2); + + GridTestUtils.assertThrows( + log(), + () -> falseBlockedExchangeFuture.get(2 * SYSTEM_WORKER_BLOCKED_TIMEOUT, TimeUnit.MILLISECONDS), + TimeoutException.class, + null); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java index 0cffd3f..d548e38 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java @@ -20,6 +20,7 @@ package org.apache.ignite.testsuites; import org.apache.ignite.ClassPathContentLoggingTest; import org.apache.ignite.GridSuppressedExceptionSelfTest; import org.apache.ignite.events.ClusterActivationStartedEventTest; +import org.apache.ignite.failure.ExchangeWorkerWaitingForTaskTest; import 
org.apache.ignite.failure.FailureHandlerTriggeredTest; import org.apache.ignite.failure.OomFailureHandlerTest; import org.apache.ignite.failure.StopNodeFailureHandlerTest; @@ -248,6 +249,7 @@ import org.junit.runners.Suite; TransactionIntegrityWithSystemWorkerDeathTest.class, FailureProcessorLoggingTest.class, FailureProcessorThreadDumpThrottlingTest.class, + ExchangeWorkerWaitingForTaskTest.class, AtomicOperationsInTxTest.class,