This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4630628701a KAFKA-18144 Move the storage exceptions out of the core module (#18021)
4630628701a is described below

commit 4630628701a5f2b153aa9d87bbbae26a18743200
Author: Mickael Maison <[email protected]>
AuthorDate: Sat Dec 7 15:43:00 2024 +0100

    KAFKA-18144 Move the storage exceptions out of the core module (#18021)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../java/kafka/log/remote/RemoteLogManager.java    |  1 +
 core/src/main/scala/kafka/cluster/Partition.scala  |  3 +--
 .../common/UnexpectedAppendOffsetException.scala   | 29 ----------------------
 core/src/main/scala/kafka/log/UnifiedLog.scala     |  5 ++--
 .../scala/unit/kafka/cluster/PartitionTest.scala   |  3 +--
 .../test/scala/unit/kafka/log/UnifiedLogTest.scala |  5 ++--
 .../log/UnexpectedAppendOffsetException.java       | 19 ++++++++++++--
 .../CustomMetadataSizeLimitExceededException.java  |  4 +--
 .../internals/log/OffsetsOutOfOrderException.java  | 18 ++++++++------
 9 files changed, 36 insertions(+), 51 deletions(-)

diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index fbe26e8a673..6e95e36020b 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -55,6 +55,7 @@ import 
org.apache.kafka.server.log.remote.quota.RLMQuotaManager;
 import org.apache.kafka.server.log.remote.quota.RLMQuotaManagerConfig;
 import org.apache.kafka.server.log.remote.quota.RLMQuotaMetrics;
 import 
org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager;
+import 
org.apache.kafka.server.log.remote.storage.CustomMetadataSizeLimitExceededException;
 import org.apache.kafka.server.log.remote.storage.LogSegmentData;
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
 import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala 
b/core/src/main/scala/kafka/cluster/Partition.scala
index a76104c79ce..5de2ebb4667 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -19,7 +19,6 @@ package kafka.cluster
 import java.util.concurrent.locks.ReentrantReadWriteLock
 import java.util.Optional
 import java.util.concurrent.{CompletableFuture, CopyOnWriteArrayList}
-import kafka.common.UnexpectedAppendOffsetException
 import kafka.controller.{KafkaController, StateChangeLogger}
 import kafka.log._
 import kafka.log.remote.RemoteLogManager
@@ -47,7 +46,7 @@ import org.apache.kafka.storage.internals.log.{AppendOrigin, 
FetchDataInfo, Lead
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, 
TopicPartitionOperationKey}
 import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey
-import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams}
+import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, 
UnexpectedAppendOffsetException}
 import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
 import org.slf4j.event.Level
 
diff --git 
a/core/src/main/scala/kafka/common/UnexpectedAppendOffsetException.scala 
b/core/src/main/scala/kafka/common/UnexpectedAppendOffsetException.scala
deleted file mode 100644
index e719a93006d..00000000000
--- a/core/src/main/scala/kafka/common/UnexpectedAppendOffsetException.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.common
-
-/**
- * Indicates the follower or the future replica received records from the leader (or current
- * replica) with first offset less than expected next offset. 
- * @param firstOffset The first offset of the records to append
- * @param lastOffset  The last offset of the records to append
- */
-class UnexpectedAppendOffsetException(val message: String,
-                                      val firstOffset: Long,
-                                      val lastOffset: Long) extends RuntimeException(message) {
-}
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala 
b/core/src/main/scala/kafka/log/UnifiedLog.scala
index db3fe936312..925c9d057c4 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -17,7 +17,6 @@
 
 package kafka.log
 
-import kafka.common.{OffsetsOutOfOrderException, 
UnexpectedAppendOffsetException}
 import kafka.log.remote.RemoteLogManager
 import kafka.utils._
 import org.apache.kafka.common.errors._
@@ -35,12 +34,12 @@ import 
org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV0
 import 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.record.BrokerCompressionType
-import org.apache.kafka.server.storage.log.FetchIsolation
+import org.apache.kafka.server.storage.log.{FetchIsolation, 
UnexpectedAppendOffsetException}
 import org.apache.kafka.server.util.Scheduler
 import 
org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, 
PartitionMetadataFile}
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
 import org.apache.kafka.storage.internals.log.LocalLog.SplitSegmentResult
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, 
BatchMetadata, CompletedTxn, FetchDataInfo, LastRecord, LeaderHwChange, 
LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, 
LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, 
LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, 
ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, 
RollParams, SegmentDeletionReason, VerificationGuard, UnifiedLog = [...]
+import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, 
BatchMetadata, CompletedTxn, FetchDataInfo, LastRecord, LeaderHwChange, 
LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, 
LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, 
LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, 
OffsetsOutOfOrderException, ProducerAppendInfo, ProducerStateManager, 
ProducerStateManagerConfig, RollParams, SegmentDeletionReason, Ver [...]
 import org.apache.kafka.storage.log.metrics.{BrokerTopicMetrics, 
BrokerTopicStats}
 
 import java.io.{File, IOException}
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala 
b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 0cb298eda1c..2ad21666dd0 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -18,7 +18,6 @@ package kafka.cluster
 
 import java.net.InetAddress
 import com.yammer.metrics.core.Metric
-import kafka.common.UnexpectedAppendOffsetException
 import kafka.log._
 import kafka.server._
 import kafka.utils._
@@ -60,7 +59,7 @@ import 
org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, 
TopicPartitionOperationKey}
 import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey
-import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams}
+import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, 
UnexpectedAppendOffsetException}
 import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
 import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala 
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 819f4a2533a..4f188e25b1c 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -17,7 +17,6 @@
 
 package kafka.log
 
-import kafka.common.{OffsetsOutOfOrderException, 
UnexpectedAppendOffsetException}
 import kafka.log.remote.RemoteLogManager
 import kafka.server.{DelayedRemoteListOffsets, KafkaConfig}
 import kafka.utils.TestUtils
@@ -38,11 +37,11 @@ import 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMe
 import 
org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, 
NoOpRemoteStorageManager, RemoteLogManagerConfig}
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.purgatory.DelayedOperationPurgatory
-import org.apache.kafka.server.storage.log.FetchIsolation
+import org.apache.kafka.server.storage.log.{FetchIsolation, 
UnexpectedAppendOffsetException}
 import org.apache.kafka.server.util.{KafkaScheduler, MockTime, Scheduler}
 import 
org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, 
PartitionMetadataFile}
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, 
EpochEntry, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, 
LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, 
ProducerStateManager, ProducerStateManagerConfig, RecordValidationException, 
VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, 
EpochEntry, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, 
LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, 
OffsetsOutOfOrderException, ProducerStateManager, ProducerStateManagerConfig, 
RecordValidationException, VerificationGuard}
 import org.apache.kafka.storage.internals.utils.Throttler
 import org.apache.kafka.storage.log.metrics.{BrokerTopicMetrics, 
BrokerTopicStats}
 import org.junit.jupiter.api.Assertions._
diff --git 
a/core/src/main/java/kafka/log/remote/CustomMetadataSizeLimitExceededException.java
 
b/server-common/src/main/java/org/apache/kafka/server/storage/log/UnexpectedAppendOffsetException.java
similarity index 53%
copy from 
core/src/main/java/kafka/log/remote/CustomMetadataSizeLimitExceededException.java
copy to 
server-common/src/main/java/org/apache/kafka/server/storage/log/UnexpectedAppendOffsetException.java
index c893f3488de..652b6745ca0 100644
--- 
a/core/src/main/java/kafka/log/remote/CustomMetadataSizeLimitExceededException.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/storage/log/UnexpectedAppendOffsetException.java
@@ -14,7 +14,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package kafka.log.remote;
+package org.apache.kafka.server.storage.log;
 
-class CustomMetadataSizeLimitExceededException extends Exception {
+public class UnexpectedAppendOffsetException extends RuntimeException {
+
+    public final long firstOffset;
+    public final long lastOffset;
+
+    /**
+     * Indicates the follower or the future replica received records from the leader (or current
+     * replica) with first offset less than expected next offset.
+     * @param firstOffset The first offset of the records to append
+     * @param lastOffset  The last offset of the records to append
+     */
+    public UnexpectedAppendOffsetException(String message, long firstOffset, long lastOffset) {
+        super(message);
+        this.firstOffset = firstOffset;
+        this.lastOffset = lastOffset;
+    }
 }
diff --git 
a/core/src/main/java/kafka/log/remote/CustomMetadataSizeLimitExceededException.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/CustomMetadataSizeLimitExceededException.java
similarity index 86%
rename from 
core/src/main/java/kafka/log/remote/CustomMetadataSizeLimitExceededException.java
rename to 
storage/src/main/java/org/apache/kafka/server/log/remote/storage/CustomMetadataSizeLimitExceededException.java
index c893f3488de..98462d93b09 100644
--- 
a/core/src/main/java/kafka/log/remote/CustomMetadataSizeLimitExceededException.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/CustomMetadataSizeLimitExceededException.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package kafka.log.remote;
+package org.apache.kafka.server.log.remote.storage;
 
-class CustomMetadataSizeLimitExceededException extends Exception {
+public class CustomMetadataSizeLimitExceededException extends Exception {
 }
diff --git a/core/src/main/scala/kafka/common/OffsetsOutOfOrderException.scala 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetsOutOfOrderException.java
similarity index 69%
rename from core/src/main/scala/kafka/common/OffsetsOutOfOrderException.scala
rename to 
storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetsOutOfOrderException.java
index f8daaa4a181..39f8d494979 100644
--- a/core/src/main/scala/kafka/common/OffsetsOutOfOrderException.scala
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetsOutOfOrderException.java
@@ -1,11 +1,11 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
+ * the License. You may obtain a copy of the License at
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -14,12 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package kafka.common
+package org.apache.kafka.storage.internals.log;
 
 /**
  * Indicates the follower received records with non-monotonically increasing offsets
  */
-class OffsetsOutOfOrderException(message: String) extends RuntimeException(message) {
-}
+public class OffsetsOutOfOrderException extends RuntimeException {
 
+    public OffsetsOutOfOrderException(String message) {
+        super(message);
+    }
+}

Reply via email to