[GitHub] [kafka] junrao commented on a change in pull request #10173: KAFKA-9548 Added SPIs and public classes/interfaces introduced in KIP-405 for tiered storage feature in Kafka.
junrao commented on a change in pull request #10173:
URL: https://github.com/apache/kafka/pull/10173#discussion_r585934097

## File path: clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java ##
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This interface provides storing and fetching remote log segment metadata with strongly consistent semantics.
+ *
+ * This class can be plugged in to Kafka cluster by adding the implementation class as
+ * remote.log.metadata.manager.class.name property value. There is an inbuilt implementation backed by
+ * topic storage in the local cluster. This is used as the default implementation if
+ * remote.log.metadata.manager.class.name is not configured.
+ *
+ *
+ * remote.log.metadata.manager.class.path property is about the class path of the RemoteLogStorageManager
+ * implementation. If specified, the RemoteLogStorageManager implementation and its dependent libraries will be loaded
+ * by a dedicated classloader which searches this class path before the Kafka broker class path. The syntax of this
+ * parameter is same with the standard Java class path string.
+ *
+ *
+ * remote.log.metadata.manager.listener.name property is about listener name of the local broker to which
+ * it should get connected if needed by RemoteLogMetadataManager implementation. When this is configured all other
+ * required properties can be passed as properties with prefix of 'remote.log.metadata.manager.listener.
+ *
+ * "cluster.id", "broker.id" and all other properties prefixed with "remote.log.metadata." are passed when
+ * {@link #configure(Map)} is invoked on this instance.
+ *
+ */
+@InterfaceStability.Evolving
+public interface RemoteLogMetadataManager extends Configurable, Closeable {

Review comment:
The KIP has the following method and is missing in the PR.
`void updateRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata)`

## File path: clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java ##
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.io.Closeable;
+import java.io.InputStream;
+
+/**
+ * This interface provides the lifecycle of remote log segments that includes copy, fetch, and delete from remote
+ * storage.
+ *
+ * Each upload or copy of a segment is initiated with {@link RemoteLogSegmentMetadata} containing {@link RemoteLogSegmentId}
+ * which is universally unique even for the same topic partition and offsets.
+ *
+ * {@link RemoteLogSegmentMetadata} is stored in {@link RemoteLogMetadataManager} before and after copy/delete operations on
+ * {@link RemoteStorageManager} with the respective {@link RemoteLogSegmentState}. {@link RemoteLogMetadataManager} is
+ * responsible for storing and fetching metadata about the remote
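
Editor's sketch: the RemoteLogMetadataManager Javadoc quoted in this message says that "cluster.id", "broker.id" and every property prefixed with "remote.log.metadata." are passed to `configure(Map)`. The snippet below is only a minimal illustration of that selection rule under stated assumptions. The class `MetadataManagerConfigSketch` and the helper `selectConfigs` are hypothetical names, not part of the PR or of Kafka, and the sketch assumes keys are passed through unchanged (the quoted Javadoc does not say whether the prefix is stripped).

```java
import java.util.HashMap;
import java.util.Map;

public class MetadataManagerConfigSketch {
    // Prefix named in the quoted Javadoc.
    static final String PREFIX = "remote.log.metadata.";

    // Hypothetical helper: picks the entries that, per the Javadoc, would be
    // handed to configure(Map). Keys are kept as-is (an assumption).
    static Map<String, Object> selectConfigs(Map<String, ?> brokerProps) {
        Map<String, Object> selected = new HashMap<>();
        for (Map.Entry<String, ?> entry : brokerProps.entrySet()) {
            String key = entry.getKey();
            if (key.equals("cluster.id") || key.equals("broker.id") || key.startsWith(PREFIX)) {
                selected.put(key, entry.getValue());
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("broker.id", 1);
        props.put("cluster.id", "example-cluster");
        props.put("remote.log.metadata.manager.listener.name", "PLAINTEXT");
        props.put("log.retention.hours", 168); // unrelated broker property, filtered out
        System.out.println(selectConfigs(props).keySet());
    }
}
```

An implementation of the interface would then read its own settings out of the map it receives in `configure(Map)`; how the broker actually assembles that map is not shown in the quoted diff.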
[GitHub] [kafka] junrao commented on a change in pull request #10173: KAFKA-9548 Added SPIs and public classes/interfaces introduced in KIP-405 for tiered storage feature in Kafka.
junrao commented on a change in pull request #10173:
URL: https://github.com/apache/kafka/pull/10173#discussion_r585109788

## File path: clients/src/main/java/org/apache/kafka/common/TopicIdPartition.java ##
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.UUID;
+
+/**
+ * This represents universally unique identifier with topic id for a topic partition. This makes sure that topics
+ * recreated with the same name will always have unique topic identifiers.
+ */
+public class TopicIdPartition implements Serializable {

Review comment:
Could we consolidate this with the TopicIdPartition in BrokersToIsrs?

## File path: clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java ##
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+/**
+ * It describes the metadata about a topic partition's remote log segment in the remote storage. This is uniquely
+ * represented with {@link RemoteLogSegmentId}.
+ *
+ * New instance is always created with the state as {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}. This can be
+ * updated by applying {@link RemoteLogSegmentMetadataUpdate} for the respective {@link RemoteLogSegmentId} of the
+ * {@code RemoteLogSegmentMetadata}.
+ */
+@InterfaceStability.Evolving
+public class RemoteLogSegmentMetadata implements Serializable {

Review comment:
We typically don't use java serialization. Is Serializable needed? Ditto in a few other classes.

## File path: clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentId.java ##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.UUID;
+
+/**
+ * This class represents a universally unique identifier associated to a topic partition's log segment. This will be
+ * regenerated for every attempt of copying a specific log segment in {@link RemoteStorageManager#copyLogSegmentData(RemoteLogSegmentMetadata, LogSegmentData)}.
+ * Once it is
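
Editor's sketch: the RemoteLogSegmentId Javadoc quoted above states that the identifier is regenerated for every attempt at copying a given log segment. The snippet below illustrates that idea only. `SegmentIdSketch`, its nested `SegmentId` class and `newCopyAttempt` are hypothetical stand-ins, not the classes from the PR, and a plain `String` takes the place of `TopicIdPartition` to keep the example self-contained.

```java
import java.util.Objects;
import java.util.UUID;

public class SegmentIdSketch {
    // Hypothetical stand-in for RemoteLogSegmentId: a partition label plus a random UUID.
    static final class SegmentId {
        final String topicPartition;
        final UUID id;

        SegmentId(String topicPartition, UUID id) {
            this.topicPartition = Objects.requireNonNull(topicPartition);
            this.id = Objects.requireNonNull(id);
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof SegmentId)) return false;
            SegmentId other = (SegmentId) o;
            return topicPartition.equals(other.topicPartition) && id.equals(other.id);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topicPartition, id);
        }
    }

    // Each copy attempt gets a fresh random UUID, so a retry of the same
    // segment never reuses the identifier of an earlier attempt.
    static SegmentId newCopyAttempt(String topicPartition) {
        return new SegmentId(topicPartition, UUID.randomUUID());
    }

    public static void main(String[] args) {
        SegmentId first = newCopyAttempt("orders-0");
        SegmentId retry = newCopyAttempt("orders-0");
        // Same partition, different identifiers.
        System.out.println(first.equals(retry));
    }
}
```

Because equality includes the UUID, metadata recorded for a failed attempt and for its retry stay distinguishable even though both refer to the same partition and offsets.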