This is an automated email from the ASF dual-hosted git repository.

abhishekrb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 75fb57ed6e3 Update error messages when supervisor's checkpoint state is invalid (#16208)
75fb57ed6e3 is described below

commit 75fb57ed6e36f2c30fc8aec070dca4762823a9c2
Author: Abhishek Radhakrishnan <[email protected]>
AuthorDate: Wed Apr 3 10:34:17 2024 -0700

    Update error messages when supervisor's checkpoint state is invalid (#16208)
    
    * Update error message when the topic changes.
    
    Suggest resetting the supervisor when the topic changes instead of changing
    the supervisor name, which actually creates a new supervisor.
    
    * Update server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
    
    Co-authored-by: Kashif Faraz <[email protected]>
    
    * Cleanup
    
    * Remove log and include oldCommitMetadataFromDb in the error message
    
    * Fix test
    
    ---------
    
    Co-authored-by: Kashif Faraz <[email protected]>
---
 .../SegmentTransactionalInsertActionTest.java      |  8 +-
 .../IndexerSQLMetadataStorageCoordinator.java      | 88 ++++++++++------------
 .../IndexerSQLMetadataStorageCoordinatorTest.java  | 35 ++++++---
 3 files changed, 72 insertions(+), 59 deletions(-)

diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
index 6f8e827c705..847354706ba 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.common.actions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import org.apache.druid.error.InvalidInput;
 import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.Task;
@@ -151,7 +152,12 @@ public class SegmentTransactionalInsertActionTest
     );
 
     Assert.assertEquals(
-        SegmentPublishResult.fail("java.lang.RuntimeException: Failed to 
update the metadata Store. The new start metadata is ahead of last commited end 
state."),
+        SegmentPublishResult.fail(
+            InvalidInput.exception(
+                "The new start metadata state[ObjectMetadata{theObject=[1]}] 
is ahead of the last commited end"
+                + " state[null]. Try resetting the supervisor."
+            ).toString()
+        ),
         result
     );
   }
diff --git 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index e2addccbcb9..d364299d21d 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -33,6 +33,7 @@ import com.google.common.hash.Hashing;
 import com.google.common.io.BaseEncoding;
 import com.google.inject.Inject;
 import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.druid.error.InvalidInput;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import org.apache.druid.indexing.overlord.SegmentCreateRequest;
@@ -445,41 +446,33 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
 
     try {
       return connector.retryTransaction(
-          new TransactionCallback<SegmentPublishResult>()
-          {
-            @Override
-            public SegmentPublishResult inTransaction(
-                final Handle handle,
-                final TransactionStatus transactionStatus
-            ) throws Exception
-            {
-              // Set definitelyNotUpdated back to false upon retrying.
-              definitelyNotUpdated.set(false);
+          (handle, transactionStatus) -> {
+            // Set definitelyNotUpdated back to false upon retrying.
+            definitelyNotUpdated.set(false);
 
-              if (startMetadata != null) {
-                final DataStoreMetadataUpdateResult result = 
updateDataSourceMetadataWithHandle(
-                    handle,
-                    dataSource,
-                    startMetadata,
-                    endMetadata
-                );
-
-                if (result.isFailed()) {
-                  // Metadata was definitely not updated.
-                  transactionStatus.setRollbackOnly();
-                  definitelyNotUpdated.set(true);
-
-                  if (result.canRetry()) {
-                    throw new RetryTransactionException(result.getErrorMsg());
-                  } else {
-                    throw new RuntimeException(result.getErrorMsg());
-                  }
+            if (startMetadata != null) {
+              final DataStoreMetadataUpdateResult result = 
updateDataSourceMetadataWithHandle(
+                  handle,
+                  dataSource,
+                  startMetadata,
+                  endMetadata
+              );
+
+              if (result.isFailed()) {
+                // Metadata was definitely not updated.
+                transactionStatus.setRollbackOnly();
+                definitelyNotUpdated.set(true);
+
+                if (result.canRetry()) {
+                  throw new RetryTransactionException(result.getErrorMsg());
+                } else {
+                  throw InvalidInput.exception(result.getErrorMsg());
                 }
               }
-
-              final Set<DataSegment> inserted = 
announceHistoricalSegmentBatch(handle, segments, usedSegments);
-              return SegmentPublishResult.ok(ImmutableSet.copyOf(inserted));
             }
+
+            final Set<DataSegment> inserted = 
announceHistoricalSegmentBatch(handle, segments, usedSegments);
+            return SegmentPublishResult.ok(ImmutableSet.copyOf(inserted));
           },
           3,
           getSqlMetadataMaxRetry()
@@ -2395,17 +2388,19 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     }
 
     final boolean startMetadataMatchesExisting;
-    int startMetadataGreaterThanExisting = 0;
+    boolean startMetadataGreaterThanExisting = false;
 
     if (oldCommitMetadataFromDb == null) {
       startMetadataMatchesExisting = startMetadata.isValidStart();
-      startMetadataGreaterThanExisting = 1;
+      startMetadataGreaterThanExisting = true;
     } else {
       // Checking against the last committed metadata.
-      // If the new start sequence number is greater than the end sequence 
number of last commit compareTo() function will return 1,
-      // 0 in all other cases. It might be because multiple tasks are 
publishing the sequence at around same time.
+      // If the new start sequence number is greater than the end sequence 
number of the last commit,
+      // compareTo() will return 1 and 0 in all other cases. This can happen 
if multiple tasks are publishing the
+      // sequence around the same time.
       if (startMetadata instanceof Comparable) {
-        startMetadataGreaterThanExisting = ((Comparable) 
startMetadata.asStartMetadata()).compareTo(oldCommitMetadataFromDb.asStartMetadata());
+        startMetadataGreaterThanExisting = ((Comparable) 
startMetadata.asStartMetadata())
+                                               
.compareTo(oldCommitMetadataFromDb.asStartMetadata()) > 0;
       }
 
       // Converting the last one into start metadata for checking since only 
the same type of metadata can be matched.
@@ -2415,25 +2410,20 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
       startMetadataMatchesExisting = 
startMetadata.asStartMetadata().matches(oldCommitMetadataFromDb.asStartMetadata());
     }
 
-    if (startMetadataGreaterThanExisting == 1 && 
!startMetadataMatchesExisting) {
-      // Offset stored in StartMetadata is Greater than the last commited 
metadata,
-      // Then retry multiple task might be trying to publish the segment for 
same partitions.
-      log.info("Failed to update the metadata Store. The new start metadata: 
[%s] is ahead of last commited end state: [%s].",
-          startMetadata,
-          oldCommitMetadataFromDb);
+    if (startMetadataGreaterThanExisting && !startMetadataMatchesExisting) {
+      // Offsets stored in startMetadata is greater than the last commited 
metadata.
       return new DataStoreMetadataUpdateResult(true, false,
-          "Failed to update the metadata Store. The new start metadata is 
ahead of last commited end state."
+          "The new start metadata state[%s] is ahead of the last commited"
+          + " end state[%s]. Try resetting the supervisor.", startMetadata, 
oldCommitMetadataFromDb
       );
     }
 
     if (!startMetadataMatchesExisting) {
       // Not in the desired start state.
-      return new DataStoreMetadataUpdateResult(true, false, StringUtils.format(
-          "Inconsistent metadata state. This can happen if you update input 
topic in a spec without changing " +
-          "the supervisor name. Stored state: [%s], Target state: [%s].",
-          oldCommitMetadataFromDb,
-          startMetadata
-      ));
+      return new DataStoreMetadataUpdateResult(true, false,
+          "Inconsistency between stored metadata state[%s] and target 
state[%s]. Try resetting the supervisor.",
+          oldCommitMetadataFromDb, startMetadata
+      );
     }
 
     // Only endOffsets should be stored in metadata store
diff --git 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 03de72b96fb..7b6fb4d11a2 100644
--- 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Iterables;
 import com.google.common.hash.Hashing;
 import com.google.common.io.BaseEncoding;
 import org.apache.druid.data.input.StringTuple;
+import org.apache.druid.error.InvalidInput;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.ObjectMetadata;
 import org.apache.druid.indexing.overlord.SegmentCreateRequest;
@@ -935,7 +936,14 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         new ObjectMetadata(ImmutableMap.of("foo", "bar")),
         new ObjectMetadata(ImmutableMap.of("foo", "baz"))
     );
-    Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: 
Failed to update the metadata Store. The new start metadata is ahead of last 
commited end state."), result1);
+    Assert.assertEquals(
+        SegmentPublishResult.fail(
+            InvalidInput.exception(
+                "The new start metadata 
state[ObjectMetadata{theObject={foo=bar}}] is ahead of the last commited"
+                + " end state[null]. Try resetting the supervisor."
+            ).toString()),
+        result1
+    );
 
     // Should only be tried once.
     Assert.assertEquals(1, metadataUpdateCounter.get());
@@ -956,10 +964,15 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         new ObjectMetadata(null),
         new ObjectMetadata(ImmutableMap.of("foo", "baz"))
     );
-    Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: 
Inconsistent metadata state. This can " +
-        "happen if you update input topic in a spec without changing the 
supervisor name. " +
-        "Stored state: [ObjectMetadata{theObject={foo=baz}}], " +
-        "Target state: [ObjectMetadata{theObject=null}]."), result2);
+    Assert.assertEquals(
+        SegmentPublishResult.fail(
+            InvalidInput.exception(
+                "Inconsistency between stored metadata 
state[ObjectMetadata{theObject={foo=baz}}]"
+                + " and target state[ObjectMetadata{theObject=null}]. Try 
resetting the supervisor."
+            ).toString()
+        ),
+        result2
+    );
 
     // Should only be tried once per call.
     Assert.assertEquals(2, metadataUpdateCounter.get());
@@ -1026,10 +1039,14 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         new ObjectMetadata(ImmutableMap.of("foo", "qux")),
         new ObjectMetadata(ImmutableMap.of("foo", "baz"))
     );
-    Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: 
Inconsistent metadata state. This can " +
-        "happen if you update input topic in a spec without changing the 
supervisor name. " +
-        "Stored state: [ObjectMetadata{theObject={foo=baz}}], " +
-        "Target state: [ObjectMetadata{theObject={foo=qux}}]."), result2);
+    Assert.assertEquals(
+        SegmentPublishResult.fail(
+            InvalidInput.exception(
+                "Inconsistency between stored metadata 
state[ObjectMetadata{theObject={foo=baz}}] and "
+                + "target state[ObjectMetadata{theObject={foo=qux}}]. Try 
resetting the supervisor."
+            ).toString()),
+        result2
+    );
 
     // Should only be tried once per call.
     Assert.assertEquals(2, metadataUpdateCounter.get());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to