This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b5f94509c8 Update the Supervisor endpoint to not restart the Supervisor if the spec was unmodified (#17707)
4b5f94509c8 is described below

commit 4b5f94509c88594baf5560ebb707aa85836c0a92
Author: aho135 <[email protected]>
AuthorDate: Tue Mar 4 19:33:18 2025 -0800

    Update the Supervisor endpoint to not restart the Supervisor if the spec was unmodified (#17707)
    
    Add an optional query parameter called skipRestartIfUnmodified to the
    /druid/indexer/v1/supervisor endpoint. Callers can set skipRestartIfUnmodified=true
    to not restart the supervisor if the spec is unchanged.
    
    Example:
    
    curl -X POST --header "Content-Type: application/json" -d @supervisor.json
    localhost:8888/druid/indexer/v1/supervisor?skipRestartIfUnmodified=true
---
 docs/api-reference/supervisor-api.md               | 58 ++++++++++++++++++
 .../overlord/supervisor/SupervisorManager.java     | 54 ++++++++++++++--
 .../overlord/supervisor/SupervisorResource.java    |  9 ++-
 .../overlord/supervisor/SupervisorManagerTest.java | 36 ++++++++---
 .../supervisor/SupervisorResourceTest.java         | 71 +++++++++++++++++++---
 .../SeekableStreamIndexTaskTestBase.java           |  2 +-
 .../druid/metadata/SQLMetadataRuleManager.java     |  4 ++
 .../druid/metadata/SQLMetadataRuleManagerTest.java | 46 ++++++++++++++
 8 files changed, 259 insertions(+), 21 deletions(-)

diff --git a/docs/api-reference/supervisor-api.md 
b/docs/api-reference/supervisor-api.md
index 16afdd924f0..e43470f0e54 100644
--- a/docs/api-reference/supervisor-api.md
+++ b/docs/api-reference/supervisor-api.md
@@ -2353,6 +2353,64 @@ Content-Length: 1359
 </TabItem>
 </Tabs>
 
+#### Sample request with `skipRestartIfUnmodified`
+
+The following example sets the `skipRestartIfUnmodified` flag to true. With this flag set to true, the Supervisor will only restart if there has been a modification to the SupervisorSpec. If left unset, the flag defaults to false.
+```shell
+curl "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor?skipRestartIfUnmodified=true" \
+--header 'Content-Type: application/json' \
+--data '{
+    "type": "kafka",
+    "spec": {
+        "ioConfig": {
+            "type": "kafka",
+            "consumerProperties": {
+                "bootstrap.servers": "localhost:9094"
+            },
+            "topic": "social_media",
+            "inputFormat": {
+                "type": "json"
+            },
+            "useEarliestOffset": true
+        },
+        "tuningConfig": {
+            "type": "kafka"
+        },
+        "dataSchema": {
+            "dataSource": "social_media",
+            "timestampSpec": {
+                "column": "__time",
+                "format": "iso"
+            },
+            "dimensionsSpec": {
+                "dimensions": [
+                    "username",
+                    "post_title",
+                    {
+                        "type": "long",
+                        "name": "views"
+                    },
+                    {
+                        "type": "long",
+                        "name": "upvotes"
+                    },
+                    {
+                        "type": "long",
+                        "name": "comments"
+                    },
+                    "edited"
+                ]
+            },
+            "granularitySpec": {
+                "queryGranularity": "none",
+                "rollup": false,
+                "segmentGranularity": "hour"
+            }
+        }
+    }
+}'
+```
+
 #### Sample response
 
 <details>
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index 731ddcaa136..85b6d392550 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -19,12 +19,15 @@
 
 package org.apache.druid.indexing.overlord.supervisor;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.inject.Inject;
 import org.apache.druid.common.guava.FutureUtils;
 import org.apache.druid.error.DruidException;
+import org.apache.druid.guice.annotations.Json;
 import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.task.Tasks;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
@@ -42,6 +45,7 @@ import 
org.apache.druid.segment.incremental.ParseExceptionReport;
 
 import javax.annotation.Nullable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -62,10 +66,12 @@ public class SupervisorManager
   private final Object lock = new Object();
 
   private volatile boolean started = false;
+  private final ObjectMapper jsonMapper;
 
   @Inject
-  public SupervisorManager(MetadataSupervisorManager metadataSupervisorManager)
+  public SupervisorManager(@Json ObjectMapper jsonMapper, 
MetadataSupervisorManager metadataSupervisorManager)
   {
+    this.jsonMapper = jsonMapper;
     this.metadataSupervisorManager = metadataSupervisorManager;
   }
 
@@ -152,6 +158,12 @@ public class SupervisorManager
     return true;
   }
 
+  /**
+   * Creates or updates a supervisor and then starts it.
+   * If no change has been made to the supervisor spec, it is only restarted.
+   *
+   * @return true if the supervisor was updated, false otherwise
+   */
   public boolean createOrUpdateAndStartSupervisor(SupervisorSpec spec)
   {
     Preconditions.checkState(started, "SupervisorManager not started");
@@ -161,11 +173,40 @@ public class SupervisorManager
 
     synchronized (lock) {
       Preconditions.checkState(started, "SupervisorManager not started");
+      final boolean shouldUpdateSpec = shouldUpdateSupervisor(spec);
       possiblyStopAndRemoveSupervisorInternal(spec.getId(), false);
-      return createAndStartSupervisorInternal(spec, true);
+      createAndStartSupervisorInternal(spec, shouldUpdateSpec);
+      return shouldUpdateSpec;
     }
   }
 
+  /**
+   * Checks whether the submitted SupervisorSpec differs from the current spec 
in SupervisorManager's supervisor list.
+   * This is used in SupervisorResource specPost to determine whether the 
Supervisor needs to be restarted
+   * @param spec The spec submitted
+   * @return boolean - true only if the spec has been modified, false otherwise
+   */
+  public boolean shouldUpdateSupervisor(SupervisorSpec spec)
+  {
+    Preconditions.checkState(started, "SupervisorManager not started");
+    Preconditions.checkNotNull(spec, "spec");
+    Preconditions.checkNotNull(spec.getId(), "spec.getId()");
+    Preconditions.checkNotNull(spec.getDataSources(), "spec.getDatasources()");
+    synchronized (lock) {
+      Preconditions.checkState(started, "SupervisorManager not started");
+      try {
+        byte[] specAsBytes = jsonMapper.writeValueAsBytes(spec);
+        Pair<Supervisor, SupervisorSpec> currentSupervisor = 
supervisors.get(spec.getId());
+        return currentSupervisor == null
+               || !Arrays.equals(specAsBytes, 
jsonMapper.writeValueAsBytes(currentSupervisor.rhs));
+      }
+      catch (JsonProcessingException ex) {
+        log.warn("Failed to write spec as bytes for spec_id[%s]", 
spec.getId());
+      }
+    }
+    return true;
+  }
+
   public boolean stopAndRemoveSupervisor(String id)
   {
     Preconditions.checkState(started, "SupervisorManager not started");
@@ -363,8 +404,13 @@ public class SupervisorManager
       return true;
     }
     catch (Exception e) {
-      log.error(e, "Failed to upgrade pending segment[%s] to new pending 
segment[%s] on Supervisor[%s].",
-                upgradedPendingSegment.getUpgradedFromSegmentId(), 
upgradedPendingSegment.getId().getVersion(), supervisorId);
+      log.error(
+          e,
+          "Failed to upgrade pending segment[%s] to new pending segment[%s] on 
Supervisor[%s].",
+          upgradedPendingSegment.getUpgradedFromSegmentId(),
+          upgradedPendingSegment.getId().getVersion(),
+          supervisorId
+      );
     }
     return false;
   }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
index 3190835c3e6..354804239f5 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
@@ -119,7 +119,11 @@ public class SupervisorResource
   @POST
   @Consumes(MediaType.APPLICATION_JSON)
   @Produces(MediaType.APPLICATION_JSON)
-  public Response specPost(final SupervisorSpec spec, @Context final 
HttpServletRequest req)
+  public Response specPost(
+      final SupervisorSpec spec,
+      @QueryParam("skipRestartIfUnmodified") Boolean skipRestartIfUnmodified,
+      @Context final HttpServletRequest req
+  )
   {
     return asLeaderWithSupervisorManager(
         manager -> {
@@ -151,6 +155,9 @@ public class SupervisorResource
           if (!authResult.allowAccessWithNoRestriction()) {
             throw new ForbiddenException(authResult.getErrorMessage());
           }
+          if (Boolean.TRUE.equals(skipRestartIfUnmodified) && 
!manager.shouldUpdateSupervisor(spec)) {
+            return Response.ok(ImmutableMap.of("id", spec.getId())).build();
+          }
 
           manager.createOrUpdateAndStartSupervisor(spec);
 
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
index 97877052fa0..e7276c6aead 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.indexing.overlord.supervisor;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -33,6 +34,7 @@ import 
org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbe
 import 
org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
+import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.metadata.MetadataSupervisorManager;
@@ -60,6 +62,8 @@ import java.util.Map;
 @RunWith(EasyMockRunner.class)
 public class SupervisorManagerTest extends EasyMockSupport
 {
+  private static final ObjectMapper MAPPER = new DefaultObjectMapper();
+
   @Mock
   private MetadataSupervisorManager metadataSupervisorManager;
 
@@ -80,7 +84,7 @@ public class SupervisorManagerTest extends EasyMockSupport
   @Before
   public void setUp()
   {
-    manager = new SupervisorManager(metadataSupervisorManager);
+    manager = new SupervisorManager(MAPPER, metadataSupervisorManager);
   }
 
   @Test
@@ -109,7 +113,6 @@ public class SupervisorManagerTest extends EasyMockSupport
     verifyAll();
 
     resetAll();
-    metadataSupervisorManager.insert("id1", spec2);
     supervisor2.start();
     supervisor1.stop(true);
     replayAll();
@@ -175,6 +178,23 @@ public class SupervisorManagerTest extends EasyMockSupport
     verifyAll();
   }
 
+  @Test
+  public void testShouldUpdateSupervisor()
+  {
+    SupervisorSpec spec = new TestSupervisorSpec("id1", supervisor1);
+    SupervisorSpec spec2 = new TestSupervisorSpec("id2", supervisor2);
+    Map<String, SupervisorSpec> existingSpecs = ImmutableMap.of(
+        "id1", spec
+    );
+    
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
+    supervisor1.start();
+    replayAll();
+    manager.start();
+    Assert.assertFalse(manager.shouldUpdateSupervisor(spec));
+    Assert.assertTrue(manager.shouldUpdateSupervisor(spec2));
+    Assert.assertTrue(manager.shouldUpdateSupervisor(new 
NoopSupervisorSpec("id1", null)));
+  }
+
   @Test
   public void testStopAndRemoveSupervisorNotStarted()
   {
@@ -533,7 +553,7 @@ public class SupervisorManagerTest extends EasyMockSupport
     NoopSupervisorSpec noopSupervisorSpec = new NoopSupervisorSpec("noop", 
ImmutableList.of("noopDS"));
     metadataSupervisorManager.insert(EasyMock.anyString(), 
EasyMock.anyObject());
 
-    SeekableStreamSupervisorSpec suspendedSpec = 
EasyMock.mock(SeekableStreamSupervisorSpec.class);
+    SeekableStreamSupervisorSpec suspendedSpec = 
EasyMock.createNiceMock(SeekableStreamSupervisorSpec.class);
     Supervisor suspendedSupervisor = 
EasyMock.mock(SeekableStreamSupervisor.class);
     
EasyMock.expect(suspendedSpec.getId()).andReturn("suspendedSpec").anyTimes();
     EasyMock.expect(suspendedSpec.isSuspended()).andReturn(true).anyTimes();
@@ -544,7 +564,7 @@ public class SupervisorManagerTest extends EasyMockSupport
     EasyMock.replay(suspendedSpec);
     metadataSupervisorManager.insert(EasyMock.anyString(), 
EasyMock.anyObject());
 
-    SeekableStreamSupervisorSpec activeSpec = 
EasyMock.mock(SeekableStreamSupervisorSpec.class);
+    SeekableStreamSupervisorSpec activeSpec = 
EasyMock.createNiceMock(SeekableStreamSupervisorSpec.class);
     Supervisor activeSupervisor = 
EasyMock.mock(SeekableStreamSupervisor.class);
     EasyMock.expect(activeSpec.getId()).andReturn("activeSpec").anyTimes();
     EasyMock.expect(activeSpec.isSuspended()).andReturn(false).anyTimes();
@@ -555,7 +575,7 @@ public class SupervisorManagerTest extends EasyMockSupport
     EasyMock.replay(activeSpec);
     metadataSupervisorManager.insert(EasyMock.anyString(), 
EasyMock.anyObject());
 
-    SeekableStreamSupervisorSpec activeSpecWithConcurrentLocks = 
EasyMock.mock(SeekableStreamSupervisorSpec.class);
+    SeekableStreamSupervisorSpec activeSpecWithConcurrentLocks = 
EasyMock.createNiceMock(SeekableStreamSupervisorSpec.class);
     Supervisor activeSupervisorWithConcurrentLocks = 
EasyMock.mock(SeekableStreamSupervisor.class);
     
EasyMock.expect(activeSpecWithConcurrentLocks.getId()).andReturn("activeSpecWithConcurrentLocks").anyTimes();
     
EasyMock.expect(activeSpecWithConcurrentLocks.isSuspended()).andReturn(false).anyTimes();
@@ -570,7 +590,7 @@ public class SupervisorManagerTest extends EasyMockSupport
     EasyMock.replay(activeSpecWithConcurrentLocks);
     metadataSupervisorManager.insert(EasyMock.anyString(), 
EasyMock.anyObject());
 
-    SeekableStreamSupervisorSpec activeAppendSpec = 
EasyMock.mock(SeekableStreamSupervisorSpec.class);
+    SeekableStreamSupervisorSpec activeAppendSpec = 
EasyMock.createNiceMock(SeekableStreamSupervisorSpec.class);
     Supervisor activeAppendSupervisor = 
EasyMock.mock(SeekableStreamSupervisor.class);
     
EasyMock.expect(activeAppendSpec.getId()).andReturn("activeAppendSpec").anyTimes();
     
EasyMock.expect(activeAppendSpec.isSuspended()).andReturn(false).anyTimes();
@@ -585,7 +605,7 @@ public class SupervisorManagerTest extends EasyMockSupport
     metadataSupervisorManager.insert(EasyMock.anyString(), 
EasyMock.anyObject());
 
     // A supervisor with useConcurrentLocks set to false explicitly must not 
use an append lock
-    SeekableStreamSupervisorSpec specWithUseConcurrentLocksFalse = 
EasyMock.mock(SeekableStreamSupervisorSpec.class);
+    SeekableStreamSupervisorSpec specWithUseConcurrentLocksFalse = 
EasyMock.createNiceMock(SeekableStreamSupervisorSpec.class);
     Supervisor supervisorWithUseConcurrentLocksFalse = 
EasyMock.mock(SeekableStreamSupervisor.class);
     
EasyMock.expect(specWithUseConcurrentLocksFalse.getId()).andReturn("useConcurrentLocksFalse").anyTimes();
     
EasyMock.expect(specWithUseConcurrentLocksFalse.isSuspended()).andReturn(false).anyTimes();
@@ -639,7 +659,7 @@ public class SupervisorManagerTest extends EasyMockSupport
     NoopSupervisorSpec noopSpec = new NoopSupervisorSpec("noop", 
ImmutableList.of("noopDS"));
     metadataSupervisorManager.insert(EasyMock.anyString(), 
EasyMock.anyObject());
 
-    SeekableStreamSupervisorSpec streamingSpec = 
EasyMock.mock(SeekableStreamSupervisorSpec.class);
+    SeekableStreamSupervisorSpec streamingSpec = 
EasyMock.createNiceMock(SeekableStreamSupervisorSpec.class);
     SeekableStreamSupervisor streamSupervisor = 
EasyMock.mock(SeekableStreamSupervisor.class);
     streamSupervisor.registerNewVersionOfPendingSegment(EasyMock.anyObject());
     EasyMock.expectLastCall().once();
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
index f07c6c13ab8..febc959b0d1 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
@@ -161,7 +161,7 @@ public class SupervisorResourceTest extends EasyMockSupport
 
     replayAll();
 
-    Response response = supervisorResource.specPost(spec, request);
+    Response response = supervisorResource.specPost(spec, false, request);
     verifyAll();
 
     Assert.assertEquals(200, response.getStatus());
@@ -171,12 +171,61 @@ public class SupervisorResourceTest extends 
EasyMockSupport
     
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent());
     replayAll();
 
-    response = supervisorResource.specPost(spec, request);
+    response = supervisorResource.specPost(spec, false, request);
     verifyAll();
 
     Assert.assertEquals(503, response.getStatus());
   }
 
+  @Test
+  public void testSpecPost_whenSkipRestartIfUnmodifiedIsTrue()
+  {
+    SupervisorSpec spec = new TestSupervisorSpec("my-id", null, null)
+    {
+
+      @Override
+      public List<String> getDataSources()
+      {
+        return List.of("datasource1");
+      }
+    };
+
+    
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
+    
EasyMock.expect(supervisorManager.shouldUpdateSupervisor(spec)).andReturn(false);
+
+    setupMockRequest();
+
+    EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(true);
+    replayAll();
+
+    Response response = supervisorResource.specPost(spec, true, request);
+    verifyAll();
+
+    Assert.assertEquals(200, response.getStatus());
+    Assert.assertEquals(ImmutableMap.of("id", "my-id"), response.getEntity());
+
+    resetAll();
+
+    
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
+    
EasyMock.expect(supervisorManager.shouldUpdateSupervisor(spec)).andReturn(true);
+    
EasyMock.expect(supervisorManager.createOrUpdateAndStartSupervisor(spec)).andReturn(true);
+
+    setupMockRequest();
+    setupMockRequestForAudit();
+
+    EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(true);
+    auditManager.doAudit(EasyMock.anyObject());
+    EasyMock.expectLastCall().once();
+
+    replayAll();
+
+    response = supervisorResource.specPost(spec, true, request);
+    verifyAll();
+
+    Assert.assertEquals(200, response.getStatus());
+    Assert.assertEquals(ImmutableMap.of("id", "my-id"), response.getEntity());
+  }
+
   @Test
   public void testSpecPostWithInputSourceSecurityEnabledAuthorized()
   {
@@ -201,7 +250,7 @@ public class SupervisorResourceTest extends EasyMockSupport
     EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(true);
     replayAll();
 
-    Response response = supervisorResource.specPost(spec, request);
+    Response response = supervisorResource.specPost(spec, false, request);
     verifyAll();
 
     Assert.assertEquals(200, response.getStatus());
@@ -211,10 +260,17 @@ public class SupervisorResourceTest extends 
EasyMockSupport
     
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent());
     replayAll();
 
-    response = supervisorResource.specPost(spec, request);
+    response = supervisorResource.specPost(spec, false, request);
     verifyAll();
 
     Assert.assertEquals(503, response.getStatus());
+
+    resetAll();
+    
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent());
+    replayAll();
+    response = supervisorResource.specPost(spec, null, request);
+    Assert.assertEquals(503, response.getStatus());
+    verifyAll();
   }
 
   @Test
@@ -230,7 +286,7 @@ public class SupervisorResourceTest extends EasyMockSupport
       }
     };
 
-    
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
+    
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)).times(2);
     
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).atLeastOnce();
     
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce();
     
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(
@@ -238,10 +294,11 @@ public class SupervisorResourceTest extends 
EasyMockSupport
     ).atLeastOnce();
     request.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, false);
     EasyMock.expectLastCall().anyTimes();
-    EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(true);
+    
EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(true).times(2);
     replayAll();
 
-    Assert.assertThrows(ForbiddenException.class, () -> 
supervisorResource.specPost(spec, request));
+    Assert.assertThrows(ForbiddenException.class, () -> 
supervisorResource.specPost(spec, false, request));
+    Assert.assertThrows(ForbiddenException.class, () -> 
supervisorResource.specPost(spec, null, request));
     verifyAll();
   }
 
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
index 1144a77de36..214c024b454 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
@@ -609,7 +609,7 @@ public abstract class SeekableStreamIndexTaskTestBase 
extends EasyMockSupport
         taskStorage,
         metadataStorageCoordinator,
         emitter,
-        new SupervisorManager(null)
+        new SupervisorManager(OBJECT_MAPPER, null)
         {
           @Override
           public boolean checkPointDataSourceMetadata(
diff --git 
a/server/src/main/java/org/apache/druid/metadata/SQLMetadataRuleManager.java 
b/server/src/main/java/org/apache/druid/metadata/SQLMetadataRuleManager.java
index b83f77183aa..847b1eba6fd 100644
--- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataRuleManager.java
+++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataRuleManager.java
@@ -336,6 +336,10 @@ public class SQLMetadataRuleManager implements 
MetadataRuleManager
     final String ruleString;
     try {
       ruleString = jsonMapper.writeValueAsString(newRules);
+      if 
(ruleString.equals(jsonMapper.writeValueAsString(rules.get().get(dataSource)))) 
{
+        log.info("Retention rules unchanged for datasource[%s] with 
rules[%s]", dataSource, ruleString);
+        return true;
+      }
       log.info("Updating datasource[%s] with rules[%s] as per [%s]", 
dataSource, ruleString, auditInfo);
     }
     catch (JsonProcessingException e) {
diff --git 
a/server/src/test/java/org/apache/druid/metadata/SQLMetadataRuleManagerTest.java
 
b/server/src/test/java/org/apache/druid/metadata/SQLMetadataRuleManagerTest.java
index 82a87dd15a9..b3110912509 100644
--- 
a/server/src/test/java/org/apache/druid/metadata/SQLMetadataRuleManagerTest.java
+++ 
b/server/src/test/java/org/apache/druid/metadata/SQLMetadataRuleManagerTest.java
@@ -115,6 +115,52 @@ public class SQLMetadataRuleManagerTest
     Assert.assertEquals(rules.get(0), 
allRules.get(TestDataSource.WIKI).get(0));
   }
 
+  @Test
+  public void testAuditEntryNotCreatedWhenRetentionRulesUnchanged()
+  {
+    List<Rule> rules = Collections.singletonList(
+        new IntervalLoadRule(
+            Intervals.of("2015-01-01/2015-02-01"),
+            ImmutableMap.of(DruidServer.DEFAULT_TIER, 
DruidServer.DEFAULT_NUM_REPLICANTS),
+            null
+        )
+    );
+    ruleManager.overrideRule(TestDataSource.WIKI, rules, 
createAuditInfo("override rule"));
+    ruleManager.overrideRule(TestDataSource.WIKI, rules, 
createAuditInfo("override rule"));
+    ruleManager.overrideRule(TestDataSource.WIKI, rules, 
createAuditInfo("override rule"));
+    Assert.assertEquals(1, auditManager.fetchAuditHistory(TestDataSource.WIKI, 
"rules", 3).size());
+  }
+
+  @Test
+  public void testAuditEntryCreatedWhenRetentionRulesChanged()
+  {
+    List<Rule> rules = Collections.singletonList(
+        new IntervalLoadRule(
+            Intervals.of("2015-01-01/2015-02-01"),
+            ImmutableMap.of(DruidServer.DEFAULT_TIER, 
DruidServer.DEFAULT_NUM_REPLICANTS),
+            null
+        )
+    );
+    ruleManager.overrideRule(TestDataSource.WIKI, rules, 
createAuditInfo("override rule"));
+    rules = Collections.singletonList(
+        new IntervalLoadRule(
+            Intervals.of("2015-01-01/2015-02-02"),
+            ImmutableMap.of(DruidServer.DEFAULT_TIER, 
DruidServer.DEFAULT_NUM_REPLICANTS),
+            null
+        )
+    );
+    ruleManager.overrideRule(TestDataSource.WIKI, rules, 
createAuditInfo("override rule"));
+    rules = Collections.singletonList(
+        new IntervalLoadRule(
+            Intervals.of("2015-01-01/2015-02-03"),
+            ImmutableMap.of(DruidServer.DEFAULT_TIER, 
DruidServer.DEFAULT_NUM_REPLICANTS),
+            null
+        )
+    );
+    ruleManager.overrideRule(TestDataSource.WIKI, rules, 
createAuditInfo("override rule"));
+    Assert.assertEquals(3, auditManager.fetchAuditHistory(TestDataSource.WIKI, 
"rules", 3).size());
+  }
+
   @Test
   public void testOverrideRuleWithNull()
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to