kowshik commented on a change in pull request #10271: URL: https://github.com/apache/kafka/pull/10271#discussion_r621909685
########## File path: raft/src/main/java/org/apache/kafka/raft/metadata/AbstractApiMessageSerde.java ########## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.metadata; + +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.ObjectSerializationCache; +import org.apache.kafka.common.protocol.Readable; +import org.apache.kafka.common.protocol.Writable; +import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.raft.RecordSerde; + +/** + * This is an implementation of {@code RecordSerde} with {@link ApiMessageAndVersion} but implementors need to implement + * {@link #apiMessageFor(short)} to return a {@code ApiMessage} instance for the given {@code apiKey}. + * + * This can be used as the underlying serialization mechanism for any metadata kind of log storage. + * <p></p> + * Serialization format for the given {@code ApiMessageAndVersion} is like below: Review comment: Could we drop the word `like` ? 
########## File path: raft/src/main/java/org/apache/kafka/raft/metadata/AbstractApiMessageSerde.java ########## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.metadata; + +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.ObjectSerializationCache; +import org.apache.kafka.common.protocol.Readable; +import org.apache.kafka.common.protocol.Writable; +import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.raft.RecordSerde; + +/** + * This is an implementation of {@code RecordSerde} with {@link ApiMessageAndVersion} but implementors need to implement + * {@link #apiMessageFor(short)} to return a {@code ApiMessage} instance for the given {@code apiKey}. + * + * This can be used as the underlying serialization mechanism for any metadata kind of log storage. Review comment: The grammar looks a bit off here: `... 
mechanism for any metadata kind of log storage.` ########## File path: storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemotePartitionDeleteMetadata.java ########## @@ -22,32 +22,30 @@ import java.util.Objects; /** - * This class represents the metadata about the remote partition. It can be updated with {@link RemoteLogMetadataManager#putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata)}. + * This class represents the metadata about the remote partition. It can be created/updated with {@link RemoteLogMetadataManager#putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata)}. * Possible state transitions are mentioned at {@link RemotePartitionDeleteState}. */ @InterfaceStability.Evolving -public class RemotePartitionDeleteMetadata { +public class RemotePartitionDeleteMetadata extends RemoteLogMetadata { private final TopicIdPartition topicIdPartition; private final RemotePartitionDeleteState state; - private final long eventTimestamp; - private final int brokerId; /** + * Creates an instance of this class with the given metadata. Review comment: We could drop this line as the intent looks obvious to me. ########## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/BytesApiMessageSerde.java ########## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.metadata.storage.serialization; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.ObjectSerializationCache; +import org.apache.kafka.common.protocol.Readable; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.raft.metadata.AbstractApiMessageSerde; +import java.nio.ByteBuffer; + +/** + * This class class provides conversion of {@code ApiMessageAndVersion} to bytes and vice versa.. This can be used as serialization protocol for any Review comment: typo: `class class` ########## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.metadata.storage.serialization; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class RemoteLogSegmentMetadataTransform implements RemoteLogMetadataTransform<RemoteLogSegmentMetadata> { Review comment: Does this class have unit test coverage? ########## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransform.java ########## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.metadata.storage.serialization; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataUpdateRecord; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState; + +public class RemoteLogSegmentMetadataUpdateTransform implements RemoteLogMetadataTransform<RemoteLogSegmentMetadataUpdate> { Review comment: Does this class have unit test coverage? ########## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.metadata.storage.serialization; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.log.remote.metadata.storage.generated.MetadataRecordType; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataUpdateRecord; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemotePartitionDeleteMetadataRecord; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata; + +import java.util.HashMap; +import java.util.Map; + +/** + * This class provides serialization and deserialization for {@link RemoteLogMetadata}. This is the root serde + * for the messages that are stored in internal remote log metadata topic. 
+ */ +public class RemoteLogMetadataSerde { + private static final short REMOTE_LOG_SEGMENT_METADATA_API_KEY = new RemoteLogSegmentMetadataRecord().apiKey(); + private static final short REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY = new RemoteLogSegmentMetadataUpdateRecord().apiKey(); + private static final short REMOTE_PARTITION_DELETE_API_KEY = new RemotePartitionDeleteMetadataRecord().apiKey(); + + private final Map<String, Short> remoteLogStorageClassToApiKey; + private final Map<Short, RemoteLogMetadataTransform> keyToTransform; + private final BytesApiMessageSerde bytesApiMessageSerde; + + public RemoteLogMetadataSerde() { + remoteLogStorageClassToApiKey = createRemoteLogStorageClassToApiKeyMap(); + keyToTransform = createRemoteLogMetadataTransforms(); + bytesApiMessageSerde = new BytesApiMessageSerde() { + @Override + public ApiMessage apiMessageFor(short apiKey) { + return newApiMessage(apiKey); + } + }; + } + + protected ApiMessage newApiMessage(short apiKey) { + return MetadataRecordType.fromId(apiKey).newMetadataRecord(); + } + + protected Map<Short, RemoteLogMetadataTransform> createRemoteLogMetadataTransforms() { + Map<Short, RemoteLogMetadataTransform> map = new HashMap<>(); + map.put(REMOTE_LOG_SEGMENT_METADATA_API_KEY, new RemoteLogSegmentMetadataTransform()); + map.put(REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY, new RemoteLogSegmentMetadataUpdateTransform()); + map.put(REMOTE_PARTITION_DELETE_API_KEY, new RemotePartitionDeleteMetadataTransform()); + return map; + } + + protected Map<String, Short> createRemoteLogStorageClassToApiKeyMap() { + Map<String, Short> map = new HashMap<>(); + map.put(RemoteLogSegmentMetadata.class.getName(), REMOTE_LOG_SEGMENT_METADATA_API_KEY); + map.put(RemoteLogSegmentMetadataUpdate.class.getName(), REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY); + map.put(RemotePartitionDeleteMetadata.class.getName(), REMOTE_PARTITION_DELETE_API_KEY); + return map; + } + + public byte[] serialize(RemoteLogMetadata remoteLogMetadata) { + 
Short apiKey = remoteLogStorageClassToApiKey.get(remoteLogMetadata.getClass().getName()); + if (apiKey == null) { + throw new IllegalArgumentException("ApiKey for given RemoteStorageMetadata class: " + remoteLogMetadata.getClass() + + " does not exist."); + } + + @SuppressWarnings("unchecked") + ApiMessageAndVersion apiMessageAndVersion = remoteLogMetadataTransform(apiKey).toApiMessageAndVersion(remoteLogMetadata); + + return bytesApiMessageSerde.serialize(apiMessageAndVersion); + } + + public RemoteLogMetadata deserialize(byte[] data) { + ApiMessageAndVersion apiMessageAndVersion = bytesApiMessageSerde.deserialize(data); + + return remoteLogMetadataTransform(apiMessageAndVersion.message().apiKey()).fromApiMessageAndVersion(apiMessageAndVersion); + } + + private RemoteLogMetadataTransform remoteLogMetadataTransform(short apiKey) { + RemoteLogMetadataTransform metadataTransform = keyToTransform.get(apiKey); + if (metadataTransform == null) { + throw new IllegalArgumentException("RemoteLogMetadataTransform for apikey: " + apiKey + " does not exist."); + } + + return metadataTransform; + } + Review comment: nit: can remove extra line ########## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.metadata.storage.serialization; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.log.remote.metadata.storage.generated.MetadataRecordType; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataUpdateRecord; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemotePartitionDeleteMetadataRecord; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata; + +import java.util.HashMap; +import java.util.Map; + +/** + * This class provides serialization and deserialization for {@link RemoteLogMetadata}. This is the root serde + * for the messages that are stored in internal remote log metadata topic. 
+ */ +public class RemoteLogMetadataSerde { + private static final short REMOTE_LOG_SEGMENT_METADATA_API_KEY = new RemoteLogSegmentMetadataRecord().apiKey(); + private static final short REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY = new RemoteLogSegmentMetadataUpdateRecord().apiKey(); + private static final short REMOTE_PARTITION_DELETE_API_KEY = new RemotePartitionDeleteMetadataRecord().apiKey(); + + private final Map<String, Short> remoteLogStorageClassToApiKey; + private final Map<Short, RemoteLogMetadataTransform> keyToTransform; + private final BytesApiMessageSerde bytesApiMessageSerde; + + public RemoteLogMetadataSerde() { + remoteLogStorageClassToApiKey = createRemoteLogStorageClassToApiKeyMap(); Review comment: Hmm, why not instead allow the two maps: `remoteLogStorageClassToApiKey ` and `keyToTransform` to be injected via an overloaded version of the constructor? As it stands, it feels overkill to me to ask the user to specialize this class just to inject these. ########## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.metadata.storage.serialization; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.log.remote.metadata.storage.generated.MetadataRecordType; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataUpdateRecord; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemotePartitionDeleteMetadataRecord; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata; + +import java.util.HashMap; +import java.util.Map; + +/** + * This class provides serialization and deserialization for {@link RemoteLogMetadata}. This is the root serde + * for the messages that are stored in internal remote log metadata topic. 
+ */ +public class RemoteLogMetadataSerde { + private static final short REMOTE_LOG_SEGMENT_METADATA_API_KEY = new RemoteLogSegmentMetadataRecord().apiKey(); + private static final short REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY = new RemoteLogSegmentMetadataUpdateRecord().apiKey(); + private static final short REMOTE_PARTITION_DELETE_API_KEY = new RemotePartitionDeleteMetadataRecord().apiKey(); + + private final Map<String, Short> remoteLogStorageClassToApiKey; + private final Map<Short, RemoteLogMetadataTransform> keyToTransform; + private final BytesApiMessageSerde bytesApiMessageSerde; + + public RemoteLogMetadataSerde() { + remoteLogStorageClassToApiKey = createRemoteLogStorageClassToApiKeyMap(); + keyToTransform = createRemoteLogMetadataTransforms(); + bytesApiMessageSerde = new BytesApiMessageSerde() { + @Override + public ApiMessage apiMessageFor(short apiKey) { + return newApiMessage(apiKey); + } + }; + } + + protected ApiMessage newApiMessage(short apiKey) { + return MetadataRecordType.fromId(apiKey).newMetadataRecord(); + } + + protected Map<Short, RemoteLogMetadataTransform> createRemoteLogMetadataTransforms() { + Map<Short, RemoteLogMetadataTransform> map = new HashMap<>(); + map.put(REMOTE_LOG_SEGMENT_METADATA_API_KEY, new RemoteLogSegmentMetadataTransform()); + map.put(REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY, new RemoteLogSegmentMetadataUpdateTransform()); + map.put(REMOTE_PARTITION_DELETE_API_KEY, new RemotePartitionDeleteMetadataTransform()); + return map; + } + + protected Map<String, Short> createRemoteLogStorageClassToApiKeyMap() { + Map<String, Short> map = new HashMap<>(); + map.put(RemoteLogSegmentMetadata.class.getName(), REMOTE_LOG_SEGMENT_METADATA_API_KEY); + map.put(RemoteLogSegmentMetadataUpdate.class.getName(), REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY); + map.put(RemotePartitionDeleteMetadata.class.getName(), REMOTE_PARTITION_DELETE_API_KEY); + return map; + } + + public byte[] serialize(RemoteLogMetadata remoteLogMetadata) { + 
Short apiKey = remoteLogStorageClassToApiKey.get(remoteLogMetadata.getClass().getName()); Review comment: This 2-way map lookup feels a bit complex to me. It appears here the requirement is that you need the `apiKey` corresponding to the provided `RemoteLogMetadata`. To make it simpler, why not provide a `short apiKey()` abstract method in the `RemoteLogMetadata` base class and then ask the specializing classes to implement it? You will then use the `remoteLogMetadata.apiKey()` method to get the apiKey here. This will avoid the need to maintain 2 maps within this class, you will only need the `remoteLogMetadataTransform` map. ########## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/BytesApiMessageSerde.java ########## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.metadata.storage.serialization; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.ObjectSerializationCache; +import org.apache.kafka.common.protocol.Readable; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.raft.metadata.AbstractApiMessageSerde; +import java.nio.ByteBuffer; + +/** + * This class class provides conversion of {@code ApiMessageAndVersion} to bytes and vice versa.. This can be used as serialization protocol for any + * metadata records derived of {@code ApiMessage}s. It internally uses {@link AbstractApiMessageSerde} for serialization/deserialization + * mechanism. + * <p></p> + * Implementors need to extend this class and implement {@link #apiMessageFor(short)} method to return a respective + * {@code ApiMessage} for the given {@code apiKey}. This is required to deserialize the bytes to build the respective + * {@code ApiMessage} instance. + */ +public abstract class BytesApiMessageSerde { Review comment: Why do we need this functionality in a separate class? Currently the only user of this class seems to be `RemoteLogMetadataSerde`. Instead if we just merge the implementation of this class into `RemoteLogMetadataSerde`, it looks simpler to me. ########## File path: storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java ########## @@ -270,9 +247,9 @@ public String toString() { "remoteLogSegmentId=" + remoteLogSegmentId + ", startOffset=" + startOffset + ", endOffset=" + endOffset + - ", brokerId=" + brokerId + + ", brokerId=" + brokerId() + Review comment: Perhaps it is worth considering using `StringBuilder` here given that we are doing a number of string concatenations (~10) now. https://docs.oracle.com/javase/7/docs/api/java/lang/StringBuilder.html -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org