This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4630628701a KAFKA-18144 Move the storage exceptions out of the core module (#18021)
4630628701a is described below
commit 4630628701a5f2b153aa9d87bbbae26a18743200
Author: Mickael Maison <[email protected]>
AuthorDate: Sat Dec 7 15:43:00 2024 +0100
KAFKA-18144 Move the storage exceptions out of the core module (#18021)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../java/kafka/log/remote/RemoteLogManager.java | 1 +
core/src/main/scala/kafka/cluster/Partition.scala | 3 +--
.../common/UnexpectedAppendOffsetException.scala | 29 ----------------------
core/src/main/scala/kafka/log/UnifiedLog.scala | 5 ++--
.../scala/unit/kafka/cluster/PartitionTest.scala | 3 +--
.../test/scala/unit/kafka/log/UnifiedLogTest.scala | 5 ++--
.../log/UnexpectedAppendOffsetException.java | 19 ++++++++++++--
.../CustomMetadataSizeLimitExceededException.java | 4 +--
.../internals/log/OffsetsOutOfOrderException.java | 18 ++++++++------
9 files changed, 36 insertions(+), 51 deletions(-)
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index fbe26e8a673..6e95e36020b 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -55,6 +55,7 @@ import
org.apache.kafka.server.log.remote.quota.RLMQuotaManager;
import org.apache.kafka.server.log.remote.quota.RLMQuotaManagerConfig;
import org.apache.kafka.server.log.remote.quota.RLMQuotaMetrics;
import
org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager;
+import
org.apache.kafka.server.log.remote.storage.CustomMetadataSizeLimitExceededException;
import org.apache.kafka.server.log.remote.storage.LogSegmentData;
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala
b/core/src/main/scala/kafka/cluster/Partition.scala
index a76104c79ce..5de2ebb4667 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -19,7 +19,6 @@ package kafka.cluster
import java.util.concurrent.locks.ReentrantReadWriteLock
import java.util.Optional
import java.util.concurrent.{CompletableFuture, CopyOnWriteArrayList}
-import kafka.common.UnexpectedAppendOffsetException
import kafka.controller.{KafkaController, StateChangeLogger}
import kafka.log._
import kafka.log.remote.RemoteLogManager
@@ -47,7 +46,7 @@ import org.apache.kafka.storage.internals.log.{AppendOrigin,
FetchDataInfo, Lead
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory,
TopicPartitionOperationKey}
import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey
-import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams}
+import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams,
UnexpectedAppendOffsetException}
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
import org.slf4j.event.Level
diff --git
a/core/src/main/scala/kafka/common/UnexpectedAppendOffsetException.scala
b/core/src/main/scala/kafka/common/UnexpectedAppendOffsetException.scala
deleted file mode 100644
index e719a93006d..00000000000
--- a/core/src/main/scala/kafka/common/UnexpectedAppendOffsetException.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.common
-
-/**
- * Indicates the follower or the future replica received records from the leader (or current
- * replica) with first offset less than expected next offset.
- * @param firstOffset The first offset of the records to append
- * @param lastOffset The last offset of the records to append
- */
-class UnexpectedAppendOffsetException(val message: String,
- val firstOffset: Long,
- val lastOffset: Long) extends RuntimeException(message) {
-}
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala
b/core/src/main/scala/kafka/log/UnifiedLog.scala
index db3fe936312..925c9d057c4 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -17,7 +17,6 @@
package kafka.log
-import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
import kafka.log.remote.RemoteLogManager
import kafka.utils._
import org.apache.kafka.common.errors._
@@ -35,12 +34,12 @@ import
org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV0
import
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.record.BrokerCompressionType
-import org.apache.kafka.server.storage.log.FetchIsolation
+import org.apache.kafka.server.storage.log.{FetchIsolation,
UnexpectedAppendOffsetException}
import org.apache.kafka.server.util.Scheduler
import
org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile,
PartitionMetadataFile}
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.LocalLog.SplitSegmentResult
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin,
BatchMetadata, CompletedTxn, FetchDataInfo, LastRecord, LeaderHwChange,
LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils,
LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener,
LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator,
ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig,
RollParams, SegmentDeletionReason, VerificationGuard, UnifiedLog = [...]
+import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin,
BatchMetadata, CompletedTxn, FetchDataInfo, LastRecord, LeaderHwChange,
LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils,
LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener,
LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator,
OffsetsOutOfOrderException, ProducerAppendInfo, ProducerStateManager,
ProducerStateManagerConfig, RollParams, SegmentDeletionReason, Ver [...]
import org.apache.kafka.storage.log.metrics.{BrokerTopicMetrics,
BrokerTopicStats}
import java.io.{File, IOException}
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 0cb298eda1c..2ad21666dd0 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -18,7 +18,6 @@ package kafka.cluster
import java.net.InetAddress
import com.yammer.metrics.core.Metric
-import kafka.common.UnexpectedAppendOffsetException
import kafka.log._
import kafka.server._
import kafka.utils._
@@ -60,7 +59,7 @@ import
org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory,
TopicPartitionOperationKey}
import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey
-import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams}
+import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams,
UnexpectedAppendOffsetException}
import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 819f4a2533a..4f188e25b1c 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -17,7 +17,6 @@
package kafka.log
-import kafka.common.{OffsetsOutOfOrderException,
UnexpectedAppendOffsetException}
import kafka.log.remote.RemoteLogManager
import kafka.server.{DelayedRemoteListOffsets, KafkaConfig}
import kafka.utils.TestUtils
@@ -38,11 +37,11 @@ import
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMe
import
org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager,
NoOpRemoteStorageManager, RemoteLogManagerConfig}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.purgatory.DelayedOperationPurgatory
-import org.apache.kafka.server.storage.log.FetchIsolation
+import org.apache.kafka.server.storage.log.{FetchIsolation,
UnexpectedAppendOffsetException}
import org.apache.kafka.server.util.{KafkaScheduler, MockTime, Scheduler}
import
org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile,
PartitionMetadataFile}
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin,
EpochEntry, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot,
LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason,
ProducerStateManager, ProducerStateManagerConfig, RecordValidationException,
VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin,
EpochEntry, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot,
LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason,
OffsetsOutOfOrderException, ProducerStateManager, ProducerStateManagerConfig,
RecordValidationException, VerificationGuard}
import org.apache.kafka.storage.internals.utils.Throttler
import org.apache.kafka.storage.log.metrics.{BrokerTopicMetrics,
BrokerTopicStats}
import org.junit.jupiter.api.Assertions._
diff --git
a/core/src/main/java/kafka/log/remote/CustomMetadataSizeLimitExceededException.java
b/server-common/src/main/java/org/apache/kafka/server/storage/log/UnexpectedAppendOffsetException.java
similarity index 53%
copy from
core/src/main/java/kafka/log/remote/CustomMetadataSizeLimitExceededException.java
copy to
server-common/src/main/java/org/apache/kafka/server/storage/log/UnexpectedAppendOffsetException.java
index c893f3488de..652b6745ca0 100644
---
a/core/src/main/java/kafka/log/remote/CustomMetadataSizeLimitExceededException.java
+++
b/server-common/src/main/java/org/apache/kafka/server/storage/log/UnexpectedAppendOffsetException.java
@@ -14,7 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package kafka.log.remote;
+package org.apache.kafka.server.storage.log;
-class CustomMetadataSizeLimitExceededException extends Exception {
+public class UnexpectedAppendOffsetException extends RuntimeException {
+
+ public final long firstOffset;
+ public final long lastOffset;
+
+ /**
+ * Indicates the follower or the future replica received records from the leader (or current
+ * replica) with first offset less than expected next offset.
+ * @param firstOffset The first offset of the records to append
+ * @param lastOffset The last offset of the records to append
+ */
+ public UnexpectedAppendOffsetException(String message, long firstOffset, long lastOffset) {
+ super(message);
+ this.firstOffset = firstOffset;
+ this.lastOffset = lastOffset;
+ }
}
diff --git
a/core/src/main/java/kafka/log/remote/CustomMetadataSizeLimitExceededException.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/CustomMetadataSizeLimitExceededException.java
similarity index 86%
rename from
core/src/main/java/kafka/log/remote/CustomMetadataSizeLimitExceededException.java
rename to
storage/src/main/java/org/apache/kafka/server/log/remote/storage/CustomMetadataSizeLimitExceededException.java
index c893f3488de..98462d93b09 100644
---
a/core/src/main/java/kafka/log/remote/CustomMetadataSizeLimitExceededException.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/CustomMetadataSizeLimitExceededException.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package kafka.log.remote;
+package org.apache.kafka.server.log.remote.storage;
-class CustomMetadataSizeLimitExceededException extends Exception {
+public class CustomMetadataSizeLimitExceededException extends Exception {
}
diff --git a/core/src/main/scala/kafka/common/OffsetsOutOfOrderException.scala
b/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetsOutOfOrderException.java
similarity index 69%
rename from core/src/main/scala/kafka/common/OffsetsOutOfOrderException.scala
rename to
storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetsOutOfOrderException.java
index f8daaa4a181..39f8d494979 100644
--- a/core/src/main/scala/kafka/common/OffsetsOutOfOrderException.scala
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetsOutOfOrderException.java
@@ -1,11 +1,11 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
+ * the License. You may obtain a copy of the License at
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -14,12 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-package kafka.common
+package org.apache.kafka.storage.internals.log;
/**
* Indicates the follower received records with non-monotonically increasing offsets
*/
-class OffsetsOutOfOrderException(message: String) extends RuntimeException(message) {
-}
+public class OffsetsOutOfOrderException extends RuntimeException {
+ public OffsetsOutOfOrderException(String message) {
+ super(message);
+ }
+}