This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 4b5f94509c8 Update the Supervisor endpoint to not restart the
Supervisor if the spec was unmodified (#17707)
4b5f94509c8 is described below
commit 4b5f94509c88594baf5560ebb707aa85836c0a92
Author: aho135 <[email protected]>
AuthorDate: Tue Mar 4 19:33:18 2025 -0800
Update the Supervisor endpoint to not restart the Supervisor if the spec
was unmodified (#17707)
Add an optional query parameter called skipRestartIfUnmodified to the
/druid/indexer/v1/supervisor endpoint. Callers can set
skipRestartIfUnmodified=true to avoid restarting the supervisor when the
submitted spec is unchanged.
Example:
curl -X POST --header "Content-Type: application/json" -d @supervisor.json \
  "localhost:8888/druid/indexer/v1/supervisor?skipRestartIfUnmodified=true"
---
docs/api-reference/supervisor-api.md | 58 ++++++++++++++++++
.../overlord/supervisor/SupervisorManager.java | 54 ++++++++++++++--
.../overlord/supervisor/SupervisorResource.java | 9 ++-
.../overlord/supervisor/SupervisorManagerTest.java | 36 ++++++++---
.../supervisor/SupervisorResourceTest.java | 71 +++++++++++++++++++---
.../SeekableStreamIndexTaskTestBase.java | 2 +-
.../druid/metadata/SQLMetadataRuleManager.java | 4 ++
.../druid/metadata/SQLMetadataRuleManagerTest.java | 46 ++++++++++++++
8 files changed, 259 insertions(+), 21 deletions(-)
diff --git a/docs/api-reference/supervisor-api.md
b/docs/api-reference/supervisor-api.md
index 16afdd924f0..e43470f0e54 100644
--- a/docs/api-reference/supervisor-api.md
+++ b/docs/api-reference/supervisor-api.md
@@ -2353,6 +2353,64 @@ Content-Length: 1359
</TabItem>
</Tabs>
+#### Sample request with `skipRestartIfUnmodified`
+
+The following example sets the `skipRestartIfUnmodified` flag to true. With
this flag set to true, the Supervisor will only restart if there has been a
modification to the SupervisorSpec. If left unset, the flag defaults to false.
+```shell
+curl "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor?skipRestartIfUnmodified=true" \
+--header 'Content-Type: application/json' \
+--data '{
+ "type": "kafka",
+ "spec": {
+ "ioConfig": {
+ "type": "kafka",
+ "consumerProperties": {
+ "bootstrap.servers": "localhost:9094"
+ },
+ "topic": "social_media",
+ "inputFormat": {
+ "type": "json"
+ },
+ "useEarliestOffset": true
+ },
+ "tuningConfig": {
+ "type": "kafka"
+ },
+ "dataSchema": {
+ "dataSource": "social_media",
+ "timestampSpec": {
+ "column": "__time",
+ "format": "iso"
+ },
+ "dimensionsSpec": {
+ "dimensions": [
+ "username",
+ "post_title",
+ {
+ "type": "long",
+ "name": "views"
+ },
+ {
+ "type": "long",
+ "name": "upvotes"
+ },
+ {
+ "type": "long",
+ "name": "comments"
+ },
+ "edited"
+ ]
+ },
+ "granularitySpec": {
+ "queryGranularity": "none",
+ "rollup": false,
+ "segmentGranularity": "hour"
+ }
+ }
+ }
+}'
+```
+
#### Sample response
<details>
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index 731ddcaa136..85b6d392550 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -19,12 +19,15 @@
package org.apache.druid.indexing.overlord.supervisor;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.error.DruidException;
+import org.apache.druid.guice.annotations.Json;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
@@ -42,6 +45,7 @@ import
org.apache.druid.segment.incremental.ParseExceptionReport;
import javax.annotation.Nullable;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -62,10 +66,12 @@ public class SupervisorManager
private final Object lock = new Object();
private volatile boolean started = false;
+ private final ObjectMapper jsonMapper;
@Inject
- public SupervisorManager(MetadataSupervisorManager metadataSupervisorManager)
+ public SupervisorManager(@Json ObjectMapper jsonMapper,
MetadataSupervisorManager metadataSupervisorManager)
{
+ this.jsonMapper = jsonMapper;
this.metadataSupervisorManager = metadataSupervisorManager;
}
@@ -152,6 +158,12 @@ public class SupervisorManager
return true;
}
+ /**
+ * Creates or updates a supervisor and then starts it.
+ * If no change has been made to the supervisor spec, it is only restarted.
+ *
+ * @return true if the supervisor was updated, false otherwise
+ */
public boolean createOrUpdateAndStartSupervisor(SupervisorSpec spec)
{
Preconditions.checkState(started, "SupervisorManager not started");
@@ -161,11 +173,40 @@ public class SupervisorManager
synchronized (lock) {
Preconditions.checkState(started, "SupervisorManager not started");
+ final boolean shouldUpdateSpec = shouldUpdateSupervisor(spec);
possiblyStopAndRemoveSupervisorInternal(spec.getId(), false);
- return createAndStartSupervisorInternal(spec, true);
+ createAndStartSupervisorInternal(spec, shouldUpdateSpec);
+ return shouldUpdateSpec;
}
}
+ /**
+ * Checks whether the submitted SupervisorSpec differs from the current spec
in SupervisorManager's supervisor list.
+ * This is used in SupervisorResource specPost to determine whether the
Supervisor needs to be restarted
+ * @param spec The spec submitted
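+ * @return true if no supervisor with this id is currently registered, if the submitted spec serializes differently from the current one, or if serialization fails; false only if both specs serialize to identical JSON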
+ */
+ public boolean shouldUpdateSupervisor(SupervisorSpec spec)
+ {
+ Preconditions.checkState(started, "SupervisorManager not started");
+ Preconditions.checkNotNull(spec, "spec");
+ Preconditions.checkNotNull(spec.getId(), "spec.getId()");
+ Preconditions.checkNotNull(spec.getDataSources(), "spec.getDatasources()");
+ synchronized (lock) {
+ Preconditions.checkState(started, "SupervisorManager not started");
+ try {
+ byte[] specAsBytes = jsonMapper.writeValueAsBytes(spec);
+ Pair<Supervisor, SupervisorSpec> currentSupervisor =
supervisors.get(spec.getId());
+ return currentSupervisor == null
+ || !Arrays.equals(specAsBytes,
jsonMapper.writeValueAsBytes(currentSupervisor.rhs));
+ }
+ catch (JsonProcessingException ex) {
+ log.warn("Failed to write spec as bytes for spec_id[%s]",
spec.getId());
+ }
+ }
+ return true;
+ }
+
public boolean stopAndRemoveSupervisor(String id)
{
Preconditions.checkState(started, "SupervisorManager not started");
@@ -363,8 +404,13 @@ public class SupervisorManager
return true;
}
catch (Exception e) {
- log.error(e, "Failed to upgrade pending segment[%s] to new pending
segment[%s] on Supervisor[%s].",
- upgradedPendingSegment.getUpgradedFromSegmentId(),
upgradedPendingSegment.getId().getVersion(), supervisorId);
+ log.error(
+ e,
+ "Failed to upgrade pending segment[%s] to new pending segment[%s] on
Supervisor[%s].",
+ upgradedPendingSegment.getUpgradedFromSegmentId(),
+ upgradedPendingSegment.getId().getVersion(),
+ supervisorId
+ );
}
return false;
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
index 3190835c3e6..354804239f5 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
@@ -119,7 +119,11 @@ public class SupervisorResource
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
- public Response specPost(final SupervisorSpec spec, @Context final
HttpServletRequest req)
+ public Response specPost(
+ final SupervisorSpec spec,
+ @QueryParam("skipRestartIfUnmodified") Boolean skipRestartIfUnmodified,
+ @Context final HttpServletRequest req
+ )
{
return asLeaderWithSupervisorManager(
manager -> {
@@ -151,6 +155,9 @@ public class SupervisorResource
if (!authResult.allowAccessWithNoRestriction()) {
throw new ForbiddenException(authResult.getErrorMessage());
}
+ if (Boolean.TRUE.equals(skipRestartIfUnmodified) &&
!manager.shouldUpdateSupervisor(spec)) {
+ return Response.ok(ImmutableMap.of("id", spec.getId())).build();
+ }
manager.createOrUpdateAndStartSupervisor(spec);
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
index 97877052fa0..e7276c6aead 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
@@ -19,6 +19,7 @@
package org.apache.druid.indexing.overlord.supervisor;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -33,6 +34,7 @@ import
org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbe
import
org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
+import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.metadata.MetadataSupervisorManager;
@@ -60,6 +62,8 @@ import java.util.Map;
@RunWith(EasyMockRunner.class)
public class SupervisorManagerTest extends EasyMockSupport
{
+ private static final ObjectMapper MAPPER = new DefaultObjectMapper();
+
@Mock
private MetadataSupervisorManager metadataSupervisorManager;
@@ -80,7 +84,7 @@ public class SupervisorManagerTest extends EasyMockSupport
@Before
public void setUp()
{
- manager = new SupervisorManager(metadataSupervisorManager);
+ manager = new SupervisorManager(MAPPER, metadataSupervisorManager);
}
@Test
@@ -109,7 +113,6 @@ public class SupervisorManagerTest extends EasyMockSupport
verifyAll();
resetAll();
- metadataSupervisorManager.insert("id1", spec2);
supervisor2.start();
supervisor1.stop(true);
replayAll();
@@ -175,6 +178,23 @@ public class SupervisorManagerTest extends EasyMockSupport
verifyAll();
}
+ @Test
+ public void testShouldUpdateSupervisor()
+ {
+ SupervisorSpec spec = new TestSupervisorSpec("id1", supervisor1);
+ SupervisorSpec spec2 = new TestSupervisorSpec("id2", supervisor2);
+ Map<String, SupervisorSpec> existingSpecs = ImmutableMap.of(
+ "id1", spec
+ );
+
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
+ supervisor1.start();
+ replayAll();
+ manager.start();
+ Assert.assertFalse(manager.shouldUpdateSupervisor(spec));
+ Assert.assertTrue(manager.shouldUpdateSupervisor(spec2));
+ Assert.assertTrue(manager.shouldUpdateSupervisor(new
NoopSupervisorSpec("id1", null)));
+ }
+
@Test
public void testStopAndRemoveSupervisorNotStarted()
{
@@ -533,7 +553,7 @@ public class SupervisorManagerTest extends EasyMockSupport
NoopSupervisorSpec noopSupervisorSpec = new NoopSupervisorSpec("noop",
ImmutableList.of("noopDS"));
metadataSupervisorManager.insert(EasyMock.anyString(),
EasyMock.anyObject());
- SeekableStreamSupervisorSpec suspendedSpec =
EasyMock.mock(SeekableStreamSupervisorSpec.class);
+ SeekableStreamSupervisorSpec suspendedSpec =
EasyMock.createNiceMock(SeekableStreamSupervisorSpec.class);
Supervisor suspendedSupervisor =
EasyMock.mock(SeekableStreamSupervisor.class);
EasyMock.expect(suspendedSpec.getId()).andReturn("suspendedSpec").anyTimes();
EasyMock.expect(suspendedSpec.isSuspended()).andReturn(true).anyTimes();
@@ -544,7 +564,7 @@ public class SupervisorManagerTest extends EasyMockSupport
EasyMock.replay(suspendedSpec);
metadataSupervisorManager.insert(EasyMock.anyString(),
EasyMock.anyObject());
- SeekableStreamSupervisorSpec activeSpec =
EasyMock.mock(SeekableStreamSupervisorSpec.class);
+ SeekableStreamSupervisorSpec activeSpec =
EasyMock.createNiceMock(SeekableStreamSupervisorSpec.class);
Supervisor activeSupervisor =
EasyMock.mock(SeekableStreamSupervisor.class);
EasyMock.expect(activeSpec.getId()).andReturn("activeSpec").anyTimes();
EasyMock.expect(activeSpec.isSuspended()).andReturn(false).anyTimes();
@@ -555,7 +575,7 @@ public class SupervisorManagerTest extends EasyMockSupport
EasyMock.replay(activeSpec);
metadataSupervisorManager.insert(EasyMock.anyString(),
EasyMock.anyObject());
- SeekableStreamSupervisorSpec activeSpecWithConcurrentLocks =
EasyMock.mock(SeekableStreamSupervisorSpec.class);
+ SeekableStreamSupervisorSpec activeSpecWithConcurrentLocks =
EasyMock.createNiceMock(SeekableStreamSupervisorSpec.class);
Supervisor activeSupervisorWithConcurrentLocks =
EasyMock.mock(SeekableStreamSupervisor.class);
EasyMock.expect(activeSpecWithConcurrentLocks.getId()).andReturn("activeSpecWithConcurrentLocks").anyTimes();
EasyMock.expect(activeSpecWithConcurrentLocks.isSuspended()).andReturn(false).anyTimes();
@@ -570,7 +590,7 @@ public class SupervisorManagerTest extends EasyMockSupport
EasyMock.replay(activeSpecWithConcurrentLocks);
metadataSupervisorManager.insert(EasyMock.anyString(),
EasyMock.anyObject());
- SeekableStreamSupervisorSpec activeAppendSpec =
EasyMock.mock(SeekableStreamSupervisorSpec.class);
+ SeekableStreamSupervisorSpec activeAppendSpec =
EasyMock.createNiceMock(SeekableStreamSupervisorSpec.class);
Supervisor activeAppendSupervisor =
EasyMock.mock(SeekableStreamSupervisor.class);
EasyMock.expect(activeAppendSpec.getId()).andReturn("activeAppendSpec").anyTimes();
EasyMock.expect(activeAppendSpec.isSuspended()).andReturn(false).anyTimes();
@@ -585,7 +605,7 @@ public class SupervisorManagerTest extends EasyMockSupport
metadataSupervisorManager.insert(EasyMock.anyString(),
EasyMock.anyObject());
// A supervisor with useConcurrentLocks set to false explicitly must not
use an append lock
- SeekableStreamSupervisorSpec specWithUseConcurrentLocksFalse =
EasyMock.mock(SeekableStreamSupervisorSpec.class);
+ SeekableStreamSupervisorSpec specWithUseConcurrentLocksFalse =
EasyMock.createNiceMock(SeekableStreamSupervisorSpec.class);
Supervisor supervisorWithUseConcurrentLocksFalse =
EasyMock.mock(SeekableStreamSupervisor.class);
EasyMock.expect(specWithUseConcurrentLocksFalse.getId()).andReturn("useConcurrentLocksFalse").anyTimes();
EasyMock.expect(specWithUseConcurrentLocksFalse.isSuspended()).andReturn(false).anyTimes();
@@ -639,7 +659,7 @@ public class SupervisorManagerTest extends EasyMockSupport
NoopSupervisorSpec noopSpec = new NoopSupervisorSpec("noop",
ImmutableList.of("noopDS"));
metadataSupervisorManager.insert(EasyMock.anyString(),
EasyMock.anyObject());
- SeekableStreamSupervisorSpec streamingSpec =
EasyMock.mock(SeekableStreamSupervisorSpec.class);
+ SeekableStreamSupervisorSpec streamingSpec =
EasyMock.createNiceMock(SeekableStreamSupervisorSpec.class);
SeekableStreamSupervisor streamSupervisor =
EasyMock.mock(SeekableStreamSupervisor.class);
streamSupervisor.registerNewVersionOfPendingSegment(EasyMock.anyObject());
EasyMock.expectLastCall().once();
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
index f07c6c13ab8..febc959b0d1 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
@@ -161,7 +161,7 @@ public class SupervisorResourceTest extends EasyMockSupport
replayAll();
- Response response = supervisorResource.specPost(spec, request);
+ Response response = supervisorResource.specPost(spec, false, request);
verifyAll();
Assert.assertEquals(200, response.getStatus());
@@ -171,12 +171,61 @@ public class SupervisorResourceTest extends
EasyMockSupport
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent());
replayAll();
- response = supervisorResource.specPost(spec, request);
+ response = supervisorResource.specPost(spec, false, request);
verifyAll();
Assert.assertEquals(503, response.getStatus());
}
+ @Test
+ public void testSpecPost_whenSkipRestartIfUnmodifiedIsTrue()
+ {
+ SupervisorSpec spec = new TestSupervisorSpec("my-id", null, null)
+ {
+
+ @Override
+ public List<String> getDataSources()
+ {
+ return List.of("datasource1");
+ }
+ };
+
+
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
+
EasyMock.expect(supervisorManager.shouldUpdateSupervisor(spec)).andReturn(false);
+
+ setupMockRequest();
+
+ EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(true);
+ replayAll();
+
+ Response response = supervisorResource.specPost(spec, true, request);
+ verifyAll();
+
+ Assert.assertEquals(200, response.getStatus());
+ Assert.assertEquals(ImmutableMap.of("id", "my-id"), response.getEntity());
+
+ resetAll();
+
+
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
+
EasyMock.expect(supervisorManager.shouldUpdateSupervisor(spec)).andReturn(true);
+
EasyMock.expect(supervisorManager.createOrUpdateAndStartSupervisor(spec)).andReturn(true);
+
+ setupMockRequest();
+ setupMockRequestForAudit();
+
+ EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(true);
+ auditManager.doAudit(EasyMock.anyObject());
+ EasyMock.expectLastCall().once();
+
+ replayAll();
+
+ response = supervisorResource.specPost(spec, true, request);
+ verifyAll();
+
+ Assert.assertEquals(200, response.getStatus());
+ Assert.assertEquals(ImmutableMap.of("id", "my-id"), response.getEntity());
+ }
+
@Test
public void testSpecPostWithInputSourceSecurityEnabledAuthorized()
{
@@ -201,7 +250,7 @@ public class SupervisorResourceTest extends EasyMockSupport
EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(true);
replayAll();
- Response response = supervisorResource.specPost(spec, request);
+ Response response = supervisorResource.specPost(spec, false, request);
verifyAll();
Assert.assertEquals(200, response.getStatus());
@@ -211,10 +260,17 @@ public class SupervisorResourceTest extends
EasyMockSupport
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent());
replayAll();
- response = supervisorResource.specPost(spec, request);
+ response = supervisorResource.specPost(spec, false, request);
verifyAll();
Assert.assertEquals(503, response.getStatus());
+
+ resetAll();
+
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent());
+ replayAll();
+ response = supervisorResource.specPost(spec, null, request);
+ Assert.assertEquals(503, response.getStatus());
+ verifyAll();
}
@Test
@@ -230,7 +286,7 @@ public class SupervisorResourceTest extends EasyMockSupport
}
};
-
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
+
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)).times(2);
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).atLeastOnce();
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce();
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(
@@ -238,10 +294,11 @@ public class SupervisorResourceTest extends
EasyMockSupport
).atLeastOnce();
request.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, false);
EasyMock.expectLastCall().anyTimes();
- EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(true);
+
EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(true).times(2);
replayAll();
- Assert.assertThrows(ForbiddenException.class, () ->
supervisorResource.specPost(spec, request));
+ Assert.assertThrows(ForbiddenException.class, () ->
supervisorResource.specPost(spec, false, request));
+ Assert.assertThrows(ForbiddenException.class, () ->
supervisorResource.specPost(spec, null, request));
verifyAll();
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
index 1144a77de36..214c024b454 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
@@ -609,7 +609,7 @@ public abstract class SeekableStreamIndexTaskTestBase
extends EasyMockSupport
taskStorage,
metadataStorageCoordinator,
emitter,
- new SupervisorManager(null)
+ new SupervisorManager(OBJECT_MAPPER, null)
{
@Override
public boolean checkPointDataSourceMetadata(
diff --git
a/server/src/main/java/org/apache/druid/metadata/SQLMetadataRuleManager.java
b/server/src/main/java/org/apache/druid/metadata/SQLMetadataRuleManager.java
index b83f77183aa..847b1eba6fd 100644
--- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataRuleManager.java
+++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataRuleManager.java
@@ -336,6 +336,10 @@ public class SQLMetadataRuleManager implements
MetadataRuleManager
final String ruleString;
try {
ruleString = jsonMapper.writeValueAsString(newRules);
+ if
(ruleString.equals(jsonMapper.writeValueAsString(rules.get().get(dataSource))))
{
+ log.info("Retention rules unchanged for datasource[%s] with
rules[%s]", dataSource, ruleString);
+ return true;
+ }
log.info("Updating datasource[%s] with rules[%s] as per [%s]",
dataSource, ruleString, auditInfo);
}
catch (JsonProcessingException e) {
diff --git
a/server/src/test/java/org/apache/druid/metadata/SQLMetadataRuleManagerTest.java
b/server/src/test/java/org/apache/druid/metadata/SQLMetadataRuleManagerTest.java
index 82a87dd15a9..b3110912509 100644
---
a/server/src/test/java/org/apache/druid/metadata/SQLMetadataRuleManagerTest.java
+++
b/server/src/test/java/org/apache/druid/metadata/SQLMetadataRuleManagerTest.java
@@ -115,6 +115,52 @@ public class SQLMetadataRuleManagerTest
Assert.assertEquals(rules.get(0),
allRules.get(TestDataSource.WIKI).get(0));
}
+ @Test
+ public void testAuditEntryNotCreatedWhenRetentionRulesUnchanged()
+ {
+ List<Rule> rules = Collections.singletonList(
+ new IntervalLoadRule(
+ Intervals.of("2015-01-01/2015-02-01"),
+ ImmutableMap.of(DruidServer.DEFAULT_TIER,
DruidServer.DEFAULT_NUM_REPLICANTS),
+ null
+ )
+ );
+ ruleManager.overrideRule(TestDataSource.WIKI, rules,
createAuditInfo("override rule"));
+ ruleManager.overrideRule(TestDataSource.WIKI, rules,
createAuditInfo("override rule"));
+ ruleManager.overrideRule(TestDataSource.WIKI, rules,
createAuditInfo("override rule"));
+ Assert.assertEquals(1, auditManager.fetchAuditHistory(TestDataSource.WIKI,
"rules", 3).size());
+ }
+
+ @Test
+ public void testAuditEntryCreatedWhenRetentionRulesChanged()
+ {
+ List<Rule> rules = Collections.singletonList(
+ new IntervalLoadRule(
+ Intervals.of("2015-01-01/2015-02-01"),
+ ImmutableMap.of(DruidServer.DEFAULT_TIER,
DruidServer.DEFAULT_NUM_REPLICANTS),
+ null
+ )
+ );
+ ruleManager.overrideRule(TestDataSource.WIKI, rules,
createAuditInfo("override rule"));
+ rules = Collections.singletonList(
+ new IntervalLoadRule(
+ Intervals.of("2015-01-01/2015-02-02"),
+ ImmutableMap.of(DruidServer.DEFAULT_TIER,
DruidServer.DEFAULT_NUM_REPLICANTS),
+ null
+ )
+ );
+ ruleManager.overrideRule(TestDataSource.WIKI, rules,
createAuditInfo("override rule"));
+ rules = Collections.singletonList(
+ new IntervalLoadRule(
+ Intervals.of("2015-01-01/2015-02-03"),
+ ImmutableMap.of(DruidServer.DEFAULT_TIER,
DruidServer.DEFAULT_NUM_REPLICANTS),
+ null
+ )
+ );
+ ruleManager.overrideRule(TestDataSource.WIKI, rules,
createAuditInfo("override rule"));
+ Assert.assertEquals(3, auditManager.fetchAuditHistory(TestDataSource.WIKI,
"rules", 3).size());
+ }
+
@Test
public void testOverrideRuleWithNull()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]