kowshik commented on a change in pull request #10271:
URL: https://github.com/apache/kafka/pull/10271#discussion_r621909685



##########
File path: raft/src/main/java/org/apache/kafka/raft/metadata/AbstractApiMessageSerde.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.metadata;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.ObjectSerializationCache;
+import org.apache.kafka.common.protocol.Readable;
+import org.apache.kafka.common.protocol.Writable;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.raft.RecordSerde;
+
+/**
+ * This is an implementation of {@code RecordSerde} with {@link ApiMessageAndVersion} but implementors need to implement
+ * {@link #apiMessageFor(short)} to return a {@code ApiMessage} instance for the given {@code apiKey}.
+ *
+ * This can be used as the underlying serialization mechanism for any metadata kind of log storage.
+ * <p></p>
+ * Serialization format for the given {@code ApiMessageAndVersion} is like below:

Review comment:
       Could we drop the word `like`?

##########
File path: raft/src/main/java/org/apache/kafka/raft/metadata/AbstractApiMessageSerde.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.metadata;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.ObjectSerializationCache;
+import org.apache.kafka.common.protocol.Readable;
+import org.apache.kafka.common.protocol.Writable;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.raft.RecordSerde;
+
+/**
+ * This is an implementation of {@code RecordSerde} with {@link ApiMessageAndVersion} but implementors need to implement
+ * {@link #apiMessageFor(short)} to return a {@code ApiMessage} instance for the given {@code apiKey}.
+ *
+ * This can be used as the underlying serialization mechanism for any metadata kind of log storage.

Review comment:
       The grammar looks a bit off here: `... mechanism for any metadata kind of log storage.`

##########
File path: storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemotePartitionDeleteMetadata.java
##########
@@ -22,32 +22,30 @@
 import java.util.Objects;
 
 /**
- * This class represents the metadata about the remote partition. It can be updated with {@link RemoteLogMetadataManager#putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata)}.
+ * This class represents the metadata about the remote partition. It can be created/updated with {@link RemoteLogMetadataManager#putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata)}.
  * Possible state transitions are mentioned at {@link RemotePartitionDeleteState}.
  */
 @InterfaceStability.Evolving
-public class RemotePartitionDeleteMetadata {
+public class RemotePartitionDeleteMetadata extends RemoteLogMetadata {
 
     private final TopicIdPartition topicIdPartition;
     private final RemotePartitionDeleteState state;
-    private final long eventTimestamp;
-    private final int brokerId;
 
     /**
+     * Creates an instance of this class with the given metadata.

Review comment:
       We could drop this line as the intent looks obvious to me.

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/BytesApiMessageSerde.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage.serialization;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.ObjectSerializationCache;
+import org.apache.kafka.common.protocol.Readable;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.raft.metadata.AbstractApiMessageSerde;
+import java.nio.ByteBuffer;
+
+/**
+ * This class class provides conversion of {@code ApiMessageAndVersion} to bytes and vice versa.. This can be used as serialization protocol for any

Review comment:
       typo: `class class`

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage.serialization;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class RemoteLogSegmentMetadataTransform implements RemoteLogMetadataTransform<RemoteLogSegmentMetadata> {

Review comment:
       Does this class have unit test coverage?

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransform.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage.serialization;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataUpdateRecord;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+
+public class RemoteLogSegmentMetadataUpdateTransform implements RemoteLogMetadataTransform<RemoteLogSegmentMetadataUpdate> {

Review comment:
       Does this class have unit test coverage?

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage.serialization;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.server.log.remote.metadata.storage.generated.MetadataRecordType;
+import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord;
+import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataUpdateRecord;
+import org.apache.kafka.server.log.remote.metadata.storage.generated.RemotePartitionDeleteMetadataRecord;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class provides serialization and deserialization for {@link RemoteLogMetadata}. This is the root serde
+ * for the messages that are stored in internal remote log metadata topic.
+ */
+public class RemoteLogMetadataSerde {
+    private static final short REMOTE_LOG_SEGMENT_METADATA_API_KEY = new RemoteLogSegmentMetadataRecord().apiKey();
+    private static final short REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY = new RemoteLogSegmentMetadataUpdateRecord().apiKey();
+    private static final short REMOTE_PARTITION_DELETE_API_KEY = new RemotePartitionDeleteMetadataRecord().apiKey();
+
+    private final Map<String, Short> remoteLogStorageClassToApiKey;
+    private final Map<Short, RemoteLogMetadataTransform> keyToTransform;
+    private final BytesApiMessageSerde bytesApiMessageSerde;
+
+    public RemoteLogMetadataSerde() {
+        remoteLogStorageClassToApiKey = createRemoteLogStorageClassToApiKeyMap();
+        keyToTransform = createRemoteLogMetadataTransforms();
+        bytesApiMessageSerde = new BytesApiMessageSerde() {
+            @Override
+            public ApiMessage apiMessageFor(short apiKey) {
+                return newApiMessage(apiKey);
+            }
+        };
+    }
+
+    protected ApiMessage newApiMessage(short apiKey) {
+        return MetadataRecordType.fromId(apiKey).newMetadataRecord();
+    }
+
+    protected Map<Short, RemoteLogMetadataTransform> createRemoteLogMetadataTransforms() {
+        Map<Short, RemoteLogMetadataTransform> map = new HashMap<>();
+        map.put(REMOTE_LOG_SEGMENT_METADATA_API_KEY, new RemoteLogSegmentMetadataTransform());
+        map.put(REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY, new RemoteLogSegmentMetadataUpdateTransform());
+        map.put(REMOTE_PARTITION_DELETE_API_KEY, new RemotePartitionDeleteMetadataTransform());
+        return map;
+    }
+
+    protected Map<String, Short> createRemoteLogStorageClassToApiKeyMap() {
+        Map<String, Short> map = new HashMap<>();
+        map.put(RemoteLogSegmentMetadata.class.getName(), REMOTE_LOG_SEGMENT_METADATA_API_KEY);
+        map.put(RemoteLogSegmentMetadataUpdate.class.getName(), REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY);
+        map.put(RemotePartitionDeleteMetadata.class.getName(), REMOTE_PARTITION_DELETE_API_KEY);
+        return map;
+    }
+
+    public byte[] serialize(RemoteLogMetadata remoteLogMetadata) {
        Short apiKey = remoteLogStorageClassToApiKey.get(remoteLogMetadata.getClass().getName());
+        if (apiKey == null) {
+            throw new IllegalArgumentException("ApiKey for given RemoteStorageMetadata class: " + remoteLogMetadata.getClass()
+                                                       + " does not exist.");
+        }
+
+        @SuppressWarnings("unchecked")
        ApiMessageAndVersion apiMessageAndVersion = remoteLogMetadataTransform(apiKey).toApiMessageAndVersion(remoteLogMetadata);
+
+        return bytesApiMessageSerde.serialize(apiMessageAndVersion);
+    }
+
+    public RemoteLogMetadata deserialize(byte[] data) {
+        ApiMessageAndVersion apiMessageAndVersion = bytesApiMessageSerde.deserialize(data);
+
+        return remoteLogMetadataTransform(apiMessageAndVersion.message().apiKey()).fromApiMessageAndVersion(apiMessageAndVersion);
+    }
+
+    private RemoteLogMetadataTransform remoteLogMetadataTransform(short apiKey) {
+        RemoteLogMetadataTransform metadataTransform = keyToTransform.get(apiKey);
+        if (metadataTransform == null) {
+            throw new IllegalArgumentException("RemoteLogMetadataTransform for apikey: " + apiKey + " does not exist.");
+        }
+
+        return metadataTransform;
+    }
+

Review comment:
       nit: can remove extra line

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage.serialization;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.server.log.remote.metadata.storage.generated.MetadataRecordType;
+import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord;
+import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataUpdateRecord;
+import org.apache.kafka.server.log.remote.metadata.storage.generated.RemotePartitionDeleteMetadataRecord;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class provides serialization and deserialization for {@link RemoteLogMetadata}. This is the root serde
+ * for the messages that are stored in internal remote log metadata topic.
+ */
+public class RemoteLogMetadataSerde {
+    private static final short REMOTE_LOG_SEGMENT_METADATA_API_KEY = new RemoteLogSegmentMetadataRecord().apiKey();
+    private static final short REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY = new RemoteLogSegmentMetadataUpdateRecord().apiKey();
+    private static final short REMOTE_PARTITION_DELETE_API_KEY = new RemotePartitionDeleteMetadataRecord().apiKey();
+
+    private final Map<String, Short> remoteLogStorageClassToApiKey;
+    private final Map<Short, RemoteLogMetadataTransform> keyToTransform;
+    private final BytesApiMessageSerde bytesApiMessageSerde;
+
+    public RemoteLogMetadataSerde() {
+        remoteLogStorageClassToApiKey = createRemoteLogStorageClassToApiKeyMap();

Review comment:
       Hmm, why not instead allow the two maps: `remoteLogStorageClassToApiKey` and `keyToTransform` to be injected via an overloaded version of the constructor? As it stands, it feels overkill to me to ask the user to specialize this class just to inject these. A rough sketch of the idea is below.
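
       To illustrate, here is a rough, untested sketch of what the overloaded constructor could look like (the `default...` static helper names are assumptions, not part of this PR):

```java
public class RemoteLogMetadataSerde {
    private final Map<String, Short> remoteLogStorageClassToApiKey;
    private final Map<Short, RemoteLogMetadataTransform> keyToTransform;
    private final BytesApiMessageSerde bytesApiMessageSerde;

    // Default wiring keeps today's behaviour.
    public RemoteLogMetadataSerde() {
        this(defaultRemoteLogStorageClassToApiKeyMap(), defaultRemoteLogMetadataTransforms());
    }

    // Callers inject the two maps directly instead of subclassing this class
    // and overriding protected factory methods.
    public RemoteLogMetadataSerde(Map<String, Short> remoteLogStorageClassToApiKey,
                                  Map<Short, RemoteLogMetadataTransform> keyToTransform) {
        this.remoteLogStorageClassToApiKey = remoteLogStorageClassToApiKey;
        this.keyToTransform = keyToTransform;
        this.bytesApiMessageSerde = new BytesApiMessageSerde() {
            @Override
            public ApiMessage apiMessageFor(short apiKey) {
                return MetadataRecordType.fromId(apiKey).newMetadataRecord();
            }
        };
    }

    // defaultRemoteLogStorageClassToApiKeyMap() and defaultRemoteLogMetadataTransforms() would be
    // static versions of the existing createXxx() methods; serialize()/deserialize() stay unchanged.
}
```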

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage.serialization;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.server.log.remote.metadata.storage.generated.MetadataRecordType;
+import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord;
+import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataUpdateRecord;
+import org.apache.kafka.server.log.remote.metadata.storage.generated.RemotePartitionDeleteMetadataRecord;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class provides serialization and deserialization for {@link RemoteLogMetadata}. This is the root serde
+ * for the messages that are stored in internal remote log metadata topic.
+ */
+public class RemoteLogMetadataSerde {
+    private static final short REMOTE_LOG_SEGMENT_METADATA_API_KEY = new RemoteLogSegmentMetadataRecord().apiKey();
+    private static final short REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY = new RemoteLogSegmentMetadataUpdateRecord().apiKey();
+    private static final short REMOTE_PARTITION_DELETE_API_KEY = new RemotePartitionDeleteMetadataRecord().apiKey();
+
+    private final Map<String, Short> remoteLogStorageClassToApiKey;
+    private final Map<Short, RemoteLogMetadataTransform> keyToTransform;
+    private final BytesApiMessageSerde bytesApiMessageSerde;
+
+    public RemoteLogMetadataSerde() {
+        remoteLogStorageClassToApiKey = createRemoteLogStorageClassToApiKeyMap();
+        keyToTransform = createRemoteLogMetadataTransforms();
+        bytesApiMessageSerde = new BytesApiMessageSerde() {
+            @Override
+            public ApiMessage apiMessageFor(short apiKey) {
+                return newApiMessage(apiKey);
+            }
+        };
+    }
+
+    protected ApiMessage newApiMessage(short apiKey) {
+        return MetadataRecordType.fromId(apiKey).newMetadataRecord();
+    }
+
+    protected Map<Short, RemoteLogMetadataTransform> createRemoteLogMetadataTransforms() {
+        Map<Short, RemoteLogMetadataTransform> map = new HashMap<>();
+        map.put(REMOTE_LOG_SEGMENT_METADATA_API_KEY, new RemoteLogSegmentMetadataTransform());
+        map.put(REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY, new RemoteLogSegmentMetadataUpdateTransform());
+        map.put(REMOTE_PARTITION_DELETE_API_KEY, new RemotePartitionDeleteMetadataTransform());
+        return map;
+    }
+
+    protected Map<String, Short> createRemoteLogStorageClassToApiKeyMap() {
+        Map<String, Short> map = new HashMap<>();
+        map.put(RemoteLogSegmentMetadata.class.getName(), REMOTE_LOG_SEGMENT_METADATA_API_KEY);
+        map.put(RemoteLogSegmentMetadataUpdate.class.getName(), REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY);
+        map.put(RemotePartitionDeleteMetadata.class.getName(), REMOTE_PARTITION_DELETE_API_KEY);
+        return map;
+    }
+
+    public byte[] serialize(RemoteLogMetadata remoteLogMetadata) {
+        Short apiKey = remoteLogStorageClassToApiKey.get(remoteLogMetadata.getClass().getName());

Review comment:
       This 2-way map lookup feels a bit complex to me. It appears that the requirement here is that you need the `apiKey` corresponding to the provided `RemoteLogMetadata`. To make it simpler, why not provide a `short apiKey()` abstract method in the `RemoteLogMetadata` base class and then ask the specializing classes to implement it? You will then use the `remoteLogMetadata.apiKey()` method to get the apiKey here. This will avoid the need to maintain 2 maps within this class; you will only need the `remoteLogMetadataTransform` map. A sketch of the suggestion follows.
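
       To sketch the suggestion (the `apiKey()` method and where each subclass sources its key from are assumptions, not something in this PR):

```java
// In the RemoteLogMetadata base class:
public abstract class RemoteLogMetadata {
    // Each concrete metadata type reports the api key of the record type it serializes to,
    // e.g. RemoteLogSegmentMetadata would return RemoteLogSegmentMetadataRecord's api key.
    public abstract short apiKey();

    // ... existing fields and accessors (brokerId, eventTimestamp) stay as they are ...
}

// serialize() then no longer needs the remoteLogStorageClassToApiKey map:
public byte[] serialize(RemoteLogMetadata remoteLogMetadata) {
    @SuppressWarnings("unchecked")
    ApiMessageAndVersion apiMessageAndVersion =
            remoteLogMetadataTransform(remoteLogMetadata.apiKey()).toApiMessageAndVersion(remoteLogMetadata);
    return bytesApiMessageSerde.serialize(apiMessageAndVersion);
}
```

       One caveat: the generated record classes live under `storage/src/...generated` while `RemoteLogMetadata` lives under `storage/api`, so the subclasses would need some way to obtain the api key values without depending on the generated records directly.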

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/BytesApiMessageSerde.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage.serialization;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.ObjectSerializationCache;
+import org.apache.kafka.common.protocol.Readable;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.raft.metadata.AbstractApiMessageSerde;
+import java.nio.ByteBuffer;
+
+/**
+ * This class class provides conversion of {@code ApiMessageAndVersion} to bytes and vice versa.. This can be used as serialization protocol for any
+ * metadata records derived of {@code ApiMessage}s. It internally uses {@link AbstractApiMessageSerde} for serialization/deserialization
+ * mechanism.
+ * <p></p>
+ * Implementors need to extend this class and implement {@link #apiMessageFor(short)} method to return a respective
+ * {@code ApiMessage} for the given {@code apiKey}. This is required to deserialize the bytes to build the respective
+ * {@code ApiMessage} instance.
+ */
+public abstract class BytesApiMessageSerde {

Review comment:
       Why do we need this functionality in a separate class? Currently the only user of this class seems to be `RemoteLogMetadataSerde`. Instead, if we just merge the implementation of this class into `RemoteLogMetadataSerde`, it looks simpler to me (see the sketch below).
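
       For what it's worth, a rough, untested sketch of the merged shape; it assumes the byte conversion goes through the `RecordSerde` write/read path that `AbstractApiMessageSerde` implements, and `toBytes`/`fromBytes` are hypothetical private helpers:

```java
public class RemoteLogMetadataSerde {
    // The ApiMessageAndVersion <-> byte[] conversion is kept as a private member
    // instead of living in a separate public abstract class.
    private final AbstractApiMessageSerde metadataRecordSerde = new AbstractApiMessageSerde() {
        @Override
        public ApiMessage apiMessageFor(short apiKey) {
            return MetadataRecordType.fromId(apiKey).newMetadataRecord();
        }
    };

    private byte[] toBytes(ApiMessageAndVersion messageAndVersion) {
        ObjectSerializationCache cache = new ObjectSerializationCache();
        int size = metadataRecordSerde.recordSize(messageAndVersion, cache);
        ByteBufferAccessor writable = new ByteBufferAccessor(ByteBuffer.allocate(size));
        metadataRecordSerde.write(messageAndVersion, cache, writable);
        return writable.buffer().array();
    }

    private ApiMessageAndVersion fromBytes(byte[] data) {
        ByteBuffer buffer = ByteBuffer.wrap(data);
        return metadataRecordSerde.read(new ByteBufferAccessor(buffer), buffer.remaining());
    }

    // serialize()/deserialize() would call toBytes()/fromBytes() instead of bytesApiMessageSerde.
}
```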

##########
File path: storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
##########
@@ -270,9 +247,9 @@ public String toString() {
                "remoteLogSegmentId=" + remoteLogSegmentId +
                ", startOffset=" + startOffset +
                ", endOffset=" + endOffset +
-               ", brokerId=" + brokerId +
+               ", brokerId=" + brokerId() +

Review comment:
       Perhaps it is worth considering using `StringBuilder` here given that we are doing a number of string concatenations (~10) now. A sketch is included below.
   
   https://docs.oracle.com/javase/7/docs/api/java/lang/StringBuilder.html 
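
       For example, a sketch of how that could look (field names follow the existing `toString()`; the class-name prefix is assumed, since the line that carries it is not shown in this hunk):

```java
@Override
public String toString() {
    StringBuilder sb = new StringBuilder("RemoteLogSegmentMetadata{");
    sb.append("remoteLogSegmentId=").append(remoteLogSegmentId)
      .append(", startOffset=").append(startOffset)
      .append(", endOffset=").append(endOffset)
      .append(", brokerId=").append(brokerId());
    // ... remaining fields appended the same way ...
    return sb.append('}').toString();
}
```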




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

