This is an automated email from the ASF dual-hosted git repository.
abhishekrb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 75fb57ed6e3 Update error messages when supervisor's checkpoint state
is invalid (#16208)
75fb57ed6e3 is described below
commit 75fb57ed6e36f2c30fc8aec070dca4762823a9c2
Author: Abhishek Radhakrishnan <[email protected]>
AuthorDate: Wed Apr 3 10:34:17 2024 -0700
Update error messages when supervisor's checkpoint state is invalid (#16208)
* Update the error messages shown when the topic changes.
Suggest resetting the supervisor when the topic changes instead of changing
the supervisor name, which actually creates a new supervisor.
* Update
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
Co-authored-by: Kashif Faraz <[email protected]>
* Cleanup
* Remove log and include oldCommitMetadataFromDb
* Fix test
---------
Co-authored-by: Kashif Faraz <[email protected]>
---
.../SegmentTransactionalInsertActionTest.java | 8 +-
.../IndexerSQLMetadataStorageCoordinator.java | 88 ++++++++++------------
.../IndexerSQLMetadataStorageCoordinatorTest.java | 35 ++++++---
3 files changed, 72 insertions(+), 59 deletions(-)
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
index 6f8e827c705..847354706ba 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.common.actions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
@@ -151,7 +152,12 @@ public class SegmentTransactionalInsertActionTest
);
Assert.assertEquals(
- SegmentPublishResult.fail("java.lang.RuntimeException: Failed to
update the metadata Store. The new start metadata is ahead of last commited end
state."),
+ SegmentPublishResult.fail(
+ InvalidInput.exception(
+ "The new start metadata state[ObjectMetadata{theObject=[1]}]
is ahead of the last commited end"
+ + " state[null]. Try resetting the supervisor."
+ ).toString()
+ ),
result
);
}
diff --git
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index e2addccbcb9..d364299d21d 100644
---
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -33,6 +33,7 @@ import com.google.common.hash.Hashing;
import com.google.common.io.BaseEncoding;
import com.google.inject.Inject;
import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.SegmentCreateRequest;
@@ -445,41 +446,33 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
try {
return connector.retryTransaction(
- new TransactionCallback<SegmentPublishResult>()
- {
- @Override
- public SegmentPublishResult inTransaction(
- final Handle handle,
- final TransactionStatus transactionStatus
- ) throws Exception
- {
- // Set definitelyNotUpdated back to false upon retrying.
- definitelyNotUpdated.set(false);
+ (handle, transactionStatus) -> {
+ // Set definitelyNotUpdated back to false upon retrying.
+ definitelyNotUpdated.set(false);
- if (startMetadata != null) {
- final DataStoreMetadataUpdateResult result =
updateDataSourceMetadataWithHandle(
- handle,
- dataSource,
- startMetadata,
- endMetadata
- );
-
- if (result.isFailed()) {
- // Metadata was definitely not updated.
- transactionStatus.setRollbackOnly();
- definitelyNotUpdated.set(true);
-
- if (result.canRetry()) {
- throw new RetryTransactionException(result.getErrorMsg());
- } else {
- throw new RuntimeException(result.getErrorMsg());
- }
+ if (startMetadata != null) {
+ final DataStoreMetadataUpdateResult result =
updateDataSourceMetadataWithHandle(
+ handle,
+ dataSource,
+ startMetadata,
+ endMetadata
+ );
+
+ if (result.isFailed()) {
+ // Metadata was definitely not updated.
+ transactionStatus.setRollbackOnly();
+ definitelyNotUpdated.set(true);
+
+ if (result.canRetry()) {
+ throw new RetryTransactionException(result.getErrorMsg());
+ } else {
+ throw InvalidInput.exception(result.getErrorMsg());
}
}
-
- final Set<DataSegment> inserted =
announceHistoricalSegmentBatch(handle, segments, usedSegments);
- return SegmentPublishResult.ok(ImmutableSet.copyOf(inserted));
}
+
+ final Set<DataSegment> inserted =
announceHistoricalSegmentBatch(handle, segments, usedSegments);
+ return SegmentPublishResult.ok(ImmutableSet.copyOf(inserted));
},
3,
getSqlMetadataMaxRetry()
@@ -2395,17 +2388,19 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
}
final boolean startMetadataMatchesExisting;
- int startMetadataGreaterThanExisting = 0;
+ boolean startMetadataGreaterThanExisting = false;
if (oldCommitMetadataFromDb == null) {
startMetadataMatchesExisting = startMetadata.isValidStart();
- startMetadataGreaterThanExisting = 1;
+ startMetadataGreaterThanExisting = true;
} else {
// Checking against the last committed metadata.
- // If the new start sequence number is greater than the end sequence
number of last commit compareTo() function will return 1,
- // 0 in all other cases. It might be because multiple tasks are
publishing the sequence at around same time.
+ // If the new start sequence number is greater than the end sequence
number of the last commit,
+ // compareTo() will return 1 and 0 in all other cases. This can happen
if multiple tasks are publishing the
+ // sequence around the same time.
if (startMetadata instanceof Comparable) {
- startMetadataGreaterThanExisting = ((Comparable)
startMetadata.asStartMetadata()).compareTo(oldCommitMetadataFromDb.asStartMetadata());
+ startMetadataGreaterThanExisting = ((Comparable)
startMetadata.asStartMetadata())
+
.compareTo(oldCommitMetadataFromDb.asStartMetadata()) > 0;
}
// Converting the last one into start metadata for checking since only
the same type of metadata can be matched.
@@ -2415,25 +2410,20 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
startMetadataMatchesExisting =
startMetadata.asStartMetadata().matches(oldCommitMetadataFromDb.asStartMetadata());
}
- if (startMetadataGreaterThanExisting == 1 &&
!startMetadataMatchesExisting) {
- // Offset stored in StartMetadata is Greater than the last commited
metadata,
- // Then retry multiple task might be trying to publish the segment for
same partitions.
- log.info("Failed to update the metadata Store. The new start metadata:
[%s] is ahead of last commited end state: [%s].",
- startMetadata,
- oldCommitMetadataFromDb);
+ if (startMetadataGreaterThanExisting && !startMetadataMatchesExisting) {
+ // Offsets stored in startMetadata is greater than the last commited
metadata.
return new DataStoreMetadataUpdateResult(true, false,
- "Failed to update the metadata Store. The new start metadata is
ahead of last commited end state."
+ "The new start metadata state[%s] is ahead of the last commited"
+ + " end state[%s]. Try resetting the supervisor.", startMetadata,
oldCommitMetadataFromDb
);
}
if (!startMetadataMatchesExisting) {
// Not in the desired start state.
- return new DataStoreMetadataUpdateResult(true, false, StringUtils.format(
- "Inconsistent metadata state. This can happen if you update input
topic in a spec without changing " +
- "the supervisor name. Stored state: [%s], Target state: [%s].",
- oldCommitMetadataFromDb,
- startMetadata
- ));
+ return new DataStoreMetadataUpdateResult(true, false,
+ "Inconsistency between stored metadata state[%s] and target
state[%s]. Try resetting the supervisor.",
+ oldCommitMetadataFromDb, startMetadata
+ );
}
// Only endOffsets should be stored in metadata store
diff --git
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 03de72b96fb..7b6fb4d11a2 100644
---
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Iterables;
import com.google.common.hash.Hashing;
import com.google.common.io.BaseEncoding;
import org.apache.druid.data.input.StringTuple;
+import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.ObjectMetadata;
import org.apache.druid.indexing.overlord.SegmentCreateRequest;
@@ -935,7 +936,14 @@ public class IndexerSQLMetadataStorageCoordinatorTest
new ObjectMetadata(ImmutableMap.of("foo", "bar")),
new ObjectMetadata(ImmutableMap.of("foo", "baz"))
);
- Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException:
Failed to update the metadata Store. The new start metadata is ahead of last
commited end state."), result1);
+ Assert.assertEquals(
+ SegmentPublishResult.fail(
+ InvalidInput.exception(
+ "The new start metadata
state[ObjectMetadata{theObject={foo=bar}}] is ahead of the last commited"
+ + " end state[null]. Try resetting the supervisor."
+ ).toString()),
+ result1
+ );
// Should only be tried once.
Assert.assertEquals(1, metadataUpdateCounter.get());
@@ -956,10 +964,15 @@ public class IndexerSQLMetadataStorageCoordinatorTest
new ObjectMetadata(null),
new ObjectMetadata(ImmutableMap.of("foo", "baz"))
);
- Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException:
Inconsistent metadata state. This can " +
- "happen if you update input topic in a spec without changing the
supervisor name. " +
- "Stored state: [ObjectMetadata{theObject={foo=baz}}], " +
- "Target state: [ObjectMetadata{theObject=null}]."), result2);
+ Assert.assertEquals(
+ SegmentPublishResult.fail(
+ InvalidInput.exception(
+ "Inconsistency between stored metadata
state[ObjectMetadata{theObject={foo=baz}}]"
+ + " and target state[ObjectMetadata{theObject=null}]. Try
resetting the supervisor."
+ ).toString()
+ ),
+ result2
+ );
// Should only be tried once per call.
Assert.assertEquals(2, metadataUpdateCounter.get());
@@ -1026,10 +1039,14 @@ public class IndexerSQLMetadataStorageCoordinatorTest
new ObjectMetadata(ImmutableMap.of("foo", "qux")),
new ObjectMetadata(ImmutableMap.of("foo", "baz"))
);
- Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException:
Inconsistent metadata state. This can " +
- "happen if you update input topic in a spec without changing the
supervisor name. " +
- "Stored state: [ObjectMetadata{theObject={foo=baz}}], " +
- "Target state: [ObjectMetadata{theObject={foo=qux}}]."), result2);
+ Assert.assertEquals(
+ SegmentPublishResult.fail(
+ InvalidInput.exception(
+ "Inconsistency between stored metadata
state[ObjectMetadata{theObject={foo=baz}}] and "
+ + "target state[ObjectMetadata{theObject={foo=qux}}]. Try
resetting the supervisor."
+ ).toString()),
+ result2
+ );
// Should only be tried once per call.
Assert.assertEquals(2, metadataUpdateCounter.get());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]