This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch 0.12.2
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/0.12.2 by this push:
new 4ad7e96 add 'stopped' check and handling to HttpLoadQueuePeon load
and drop segment methods (#5555) (#5960)
4ad7e96 is described below
commit 4ad7e965c5b66ccf9bcca18202803ca73b6d8383
Author: Jihoon Son <[email protected]>
AuthorDate: Mon Jul 9 11:23:21 2018 -0700
add 'stopped' check and handling to HttpLoadQueuePeon load and drop segment
methods (#5555) (#5960)
* add stopped check and handling to HttpLoadQueuePeon load and drop segment
methods
* fix unrelated timeout :(
* revert unintended change
* PR feedback: change logging
* fix dumb
---
.../server/coordinator/HttpLoadQueuePeon.java | 50 ++++---
.../coordinator/CuratorDruidCoordinatorTest.java | 2 +-
.../server/coordinator/HttpLoadQueuePeonTest.java | 151 ++++++++++++---------
3 files changed, 121 insertions(+), 82 deletions(-)
diff --git a/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java b/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java
index dbeeb73..ece1d48 100644
--- a/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java
+++ b/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java
@@ -28,16 +28,16 @@ import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import io.druid.java.util.common.ISE;
import io.druid.java.util.common.RE;
import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.common.concurrent.ScheduledExecutors;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.java.util.http.client.HttpClient;
import io.druid.java.util.http.client.Request;
import io.druid.java.util.http.client.io.AppendableByteArrayInputStream;
import io.druid.java.util.http.client.response.ClientResponse;
import io.druid.java.util.http.client.response.InputStreamResponseHandler;
-import io.druid.java.util.common.ISE;
-import io.druid.java.util.common.concurrent.ScheduledExecutors;
import io.druid.server.coordination.DataSegmentChangeCallback;
import io.druid.server.coordination.DataSegmentChangeHandler;
import io.druid.server.coordination.DataSegmentChangeRequest;
@@ -61,7 +61,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
@@ -261,6 +260,7 @@ public class HttpLoadQueuePeon extends LoadQueuePeon
public void onFailure(Throwable t)
{
try {
+ responseHandler.description = t.toString();
logRequestFailure(t);
}
finally {
@@ -333,20 +333,15 @@ public class HttpLoadQueuePeon extends LoadQueuePeon
ScheduledExecutors.scheduleAtFixedRate(
processingExecutor,
new Duration(config.getHttpLoadQueuePeonRepeatDelay()),
- new Callable<ScheduledExecutors.Signal>()
- {
- @Override
- public ScheduledExecutors.Signal call()
- {
- if (!stopped) {
- doSegmentManagement();
- }
+ () -> {
+ if (!stopped) {
+ doSegmentManagement();
+ }
- if (stopped) {
- return ScheduledExecutors.Signal.STOP;
- } else {
- return ScheduledExecutors.Signal.REPEAT;
- }
+ if (stopped) {
+ return ScheduledExecutors.Signal.STOP;
+ } else {
+ return ScheduledExecutors.Signal.REPEAT;
}
}
);
@@ -364,11 +359,11 @@ public class HttpLoadQueuePeon extends LoadQueuePeon
stopped = true;
for (SegmentHolder holder : segmentsToDrop.values()) {
- holder.requestSucceeded();
+ holder.requestFailed("Stopping load queue peon.");
}
for (SegmentHolder holder : segmentsToLoad.values()) {
- holder.requestSucceeded();
+ holder.requestFailed("Stopping load queue peon.");
}
segmentsToDrop.clear();
@@ -382,6 +377,16 @@ public class HttpLoadQueuePeon extends LoadQueuePeon
public void loadSegment(DataSegment segment, LoadPeonCallback callback)
{
synchronized (lock) {
+ if (stopped) {
+ log.warn(
+ "Server[%s] cannot load segment[%s] because load queue peon is stopped.",
+ serverId,
+ segment.getIdentifier()
+ );
+ callback.execute();
+ return;
+ }
+
SegmentHolder holder = segmentsToLoad.get(segment);
if (holder == null) {
@@ -398,6 +403,15 @@ public class HttpLoadQueuePeon extends LoadQueuePeon
public void dropSegment(DataSegment segment, LoadPeonCallback callback)
{
synchronized (lock) {
+ if (stopped) {
+ log.warn(
+ "Server[%s] cannot drop segment[%s] because load queue peon is stopped.",
+ serverId,
+ segment.getIdentifier()
+ );
+ callback.execute();
+ return;
+ }
SegmentHolder holder = segmentsToDrop.get(segment);
if (holder == null) {
diff --git a/server/src/test/java/io/druid/server/coordinator/CuratorDruidCoordinatorTest.java b/server/src/test/java/io/druid/server/coordinator/CuratorDruidCoordinatorTest.java
index 9447c7b..0ed48ac 100644
--- a/server/src/test/java/io/druid/server/coordinator/CuratorDruidCoordinatorTest.java
+++ b/server/src/test/java/io/druid/server/coordinator/CuratorDruidCoordinatorTest.java
@@ -233,7 +233,7 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
tearDownServerAndCurator();
}
- @Test(timeout = 5_000)
+ @Test(timeout = 10_000)
public void testMoveSegment() throws Exception
{
segmentViewInitLatch = new CountDownLatch(1);
diff --git a/server/src/test/java/io/druid/server/coordinator/HttpLoadQueuePeonTest.java b/server/src/test/java/io/druid/server/coordinator/HttpLoadQueuePeonTest.java
index 72fb9a3..c238835 100644
--- a/server/src/test/java/io/druid/server/coordinator/HttpLoadQueuePeonTest.java
+++ b/server/src/test/java/io/druid/server/coordinator/HttpLoadQueuePeonTest.java
@@ -24,14 +24,14 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import io.druid.java.util.http.client.HttpClient;
-import io.druid.java.util.http.client.Request;
-import io.druid.java.util.http.client.response.HttpResponseHandler;
import io.druid.discovery.DiscoveryDruidNode;
import io.druid.discovery.DruidNodeDiscovery;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.RE;
import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.http.client.HttpClient;
+import io.druid.java.util.http.client.Request;
+import io.druid.java.util.http.client.response.HttpResponseHandler;
import io.druid.server.ServerTestHelper;
import io.druid.server.coordination.DataSegmentChangeRequest;
import io.druid.server.coordination.SegmentLoadDropHandler;
@@ -57,40 +57,92 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public class HttpLoadQueuePeonTest
{
+ final DataSegment segment1 = new DataSegment(
+ "test1", Intervals.of("2014/2015"), "v1",
+ null, null, null, null, 0, 0
+ );
+
+ final DataSegment segment2 = new DataSegment(
+ "test2", Intervals.of("2014/2015"), "v1",
+ null, null, null, null, 0, 0
+ );
+
+ final DataSegment segment3 = new DataSegment(
+ "test3", Intervals.of("2014/2015"), "v1",
+ null, null, null, null, 0, 0
+ );
+
+ final DataSegment segment4 = new DataSegment(
+ "test4", Intervals.of("2014/2015"), "v1",
+ null, null, null, null, 0, 0
+ );
+
+ final TestDruidCoordinatorConfig config = new TestDruidCoordinatorConfig(
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ 10,
+ null,
+ false,
+ false,
+ Duration.ZERO
+ )
+ {
+ @Override
+ public int getHttpLoadQueuePeonBatchSize()
+ {
+ return 2;
+ }
+ };
+
@Test(timeout = 10000)
public void testSimple() throws Exception
{
- final DataSegment segment1 = new DataSegment(
- "test1", Intervals.of("2014/2015"), "v1",
- null, null, null, null, 0, 0
+ HttpLoadQueuePeon httpLoadQueuePeon = new HttpLoadQueuePeon(
+ "http://dummy:4000",
+ ServerTestHelper.MAPPER,
+ new TestHttpClient(),
+ config,
+ Executors.newScheduledThreadPool(
+ 2,
+ Execs.makeThreadFactory("HttpLoadQueuePeonTest-%s")
+ ),
+ Execs.singleThreaded("HttpLoadQueuePeonTest")
);
- final DataSegment segment2 = new DataSegment(
- "test2", Intervals.of("2014/2015"), "v1",
- null, null, null, null, 0, 0
- );
+ httpLoadQueuePeon.start();
- final DataSegment segment3 = new DataSegment(
- "test3", Intervals.of("2014/2015"), "v1",
- null, null, null, null, 0, 0
+ Map<String, CountDownLatch> latches = ImmutableMap.of(
+ segment1.getIdentifier(), new CountDownLatch(1),
+ segment2.getIdentifier(), new CountDownLatch(1),
+ segment3.getIdentifier(), new CountDownLatch(1),
+ segment4.getIdentifier(), new CountDownLatch(1)
);
- final DataSegment segment4 = new DataSegment(
- "test4", Intervals.of("2014/2015"), "v1",
- null, null, null, null, 0, 0
- );
+ httpLoadQueuePeon.dropSegment(segment1, () -> latches.get(segment1.getIdentifier()).countDown());
+ httpLoadQueuePeon.loadSegment(segment2, () -> latches.get(segment2.getIdentifier()).countDown());
+ httpLoadQueuePeon.dropSegment(segment3, () -> latches.get(segment3.getIdentifier()).countDown());
+ httpLoadQueuePeon.loadSegment(segment4, () -> latches.get(segment4.getIdentifier()).countDown());
+
+ latches.get(segment1.getIdentifier()).await();
+ latches.get(segment2.getIdentifier()).await();
+ latches.get(segment3.getIdentifier()).await();
+ latches.get(segment4.getIdentifier()).await();
+ httpLoadQueuePeon.stop();
+ }
+
+ @Test(timeout = 10000)
+ public void testLoadDropAfterStop() throws Exception
+ {
HttpLoadQueuePeon httpLoadQueuePeon = new HttpLoadQueuePeon(
"http://dummy:4000",
ServerTestHelper.MAPPER,
new TestHttpClient(),
- new TestDruidCoordinatorConfig(null, null, null, null, null, null, 10, null, false, false, Duration.ZERO) {
- @Override
- public int getHttpLoadQueuePeonBatchSize()
- {
- return 2;
- }
- },
+ config,
Executors.newScheduledThreadPool(
2,
Execs.makeThreadFactory("HttpLoadQueuePeonTest-%s")
@@ -107,48 +159,16 @@ public class HttpLoadQueuePeonTest
segment4.getIdentifier(), new CountDownLatch(1)
);
- httpLoadQueuePeon.dropSegment(segment1, new LoadPeonCallback()
- {
- @Override
- public void execute()
- {
- latches.get(segment1.getIdentifier()).countDown();
- }
- });
-
- httpLoadQueuePeon.loadSegment(segment2, new LoadPeonCallback()
- {
- @Override
- public void execute()
- {
- latches.get(segment2.getIdentifier()).countDown();
- }
- });
-
- httpLoadQueuePeon.dropSegment(segment3, new LoadPeonCallback()
- {
- @Override
- public void execute()
- {
- latches.get(segment3.getIdentifier()).countDown();
- }
- });
-
- httpLoadQueuePeon.loadSegment(segment4, new LoadPeonCallback()
- {
- @Override
- public void execute()
- {
- latches.get(segment4.getIdentifier()).countDown();
- }
- });
-
+ httpLoadQueuePeon.dropSegment(segment1, () -> latches.get(segment1.getIdentifier()).countDown());
+ httpLoadQueuePeon.loadSegment(segment2, () -> latches.get(segment2.getIdentifier()).countDown());
latches.get(segment1.getIdentifier()).await();
latches.get(segment2.getIdentifier()).await();
+ httpLoadQueuePeon.stop();
+ httpLoadQueuePeon.dropSegment(segment3, () -> latches.get(segment3.getIdentifier()).countDown());
+ httpLoadQueuePeon.loadSegment(segment4, () -> latches.get(segment4.getIdentifier()).countDown());
latches.get(segment3.getIdentifier()).await();
latches.get(segment4.getIdentifier()).await();
- httpLoadQueuePeon.stop();
}
private static class TestDruidNodeDiscovery implements DruidNodeDiscovery
@@ -191,12 +211,17 @@ public class HttpLoadQueuePeonTest
httpResponseHandler.handleResponse(httpResponse);
try {
List<DataSegmentChangeRequest> changeRequests = ServerTestHelper.MAPPER.readValue(
- request.getContent().array(), new TypeReference<List<DataSegmentChangeRequest>>() {}
+ request.getContent().array(), new TypeReference<List<DataSegmentChangeRequest>>()
+ {
+ }
);
List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus> statuses = new ArrayList<>(changeRequests.size());
for (DataSegmentChangeRequest cr : changeRequests) {
- statuses.add(new SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus(cr, SegmentLoadDropHandler.Status.SUCCESS));
+ statuses.add(new SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus(
+ cr,
+ SegmentLoadDropHandler.Status.SUCCESS
+ ));
}
return (ListenableFuture) Futures.immediateFuture(
new ByteArrayInputStream(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]