[GitHub] [kafka] junrao commented on a change in pull request #10173: KAFKA-9548 Added SPIs and public classes/interfaces introduced in KIP-405 for tiered storage feature in Kafka.

2021-03-02 Thread GitBox


junrao commented on a change in pull request #10173:
URL: https://github.com/apache/kafka/pull/10173#discussion_r585934097



##
File path: 
clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
##
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This interface provides storing and fetching remote log segment metadata 
with strongly consistent semantics.
+ * 
+ * This class can be plugged in to Kafka cluster by adding the implementation 
class as
+ * remote.log.metadata.manager.class.name property value. There 
is an inbuilt implementation backed by
+ * topic storage in the local cluster. This is used as the default 
implementation if
+ * remote.log.metadata.manager.class.name is not configured.
+ * 
+ * 
+ * remote.log.metadata.manager.class.path property is about the 
class path of the RemoteLogStorageManager
+ * implementation. If specified, the RemoteLogStorageManager implementation 
and its dependent libraries will be loaded
+ * by a dedicated classloader which searches this class path before the Kafka 
broker class path. The syntax of this
+ * parameter is same with the standard Java class path string.
+ * 
+ * 
+ * remote.log.metadata.manager.listener.name property is about 
listener name of the local broker to which
+ * it should get connected if needed by RemoteLogMetadataManager 
implementation. When this is configured all other
+ * required properties can be passed as properties with prefix of 
'remote.log.metadata.manager.listener.
+ * 
+ * "cluster.id", "broker.id" and all other properties prefixed with 
"remote.log.metadata." are passed when
+ * {@link #configure(Map)} is invoked on this instance.
+ * 
+ */
+@InterfaceStability.Evolving
+public interface RemoteLogMetadataManager extends Configurable, Closeable {

Review comment:
   The KIP has the following method and is missing in the PR.
   
   `void updateRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata 
remotePartitionDeleteMetadata)`

##
File path: 
clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java
##
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.io.Closeable;
+import java.io.InputStream;
+
+/**
+ * This interface provides the lifecycle of remote log segments that includes 
copy, fetch, and delete from remote
+ * storage.
+ * 
+ * Each upload or copy of a segment is initiated with {@link 
RemoteLogSegmentMetadata} containing {@link RemoteLogSegmentId}
+ * which is universally unique even for the same topic partition and offsets.
+ * 
+ * {@link RemoteLogSegmentMetadata} is stored in {@link 
RemoteLogMetadataManager} before and after copy/delete operations on
+ * {@link RemoteStorageManager} with the respective {@link 
RemoteLogSegmentState}. {@link RemoteLogMetadataManager} is
+ * responsible for storing and fetching metadata about the remote 

[GitHub] [kafka] junrao commented on a change in pull request #10173: KAFKA-9548 Added SPIs and public classes/interfaces introduced in KIP-405 for tiered storage feature in Kafka.

2021-03-01 Thread GitBox


junrao commented on a change in pull request #10173:
URL: https://github.com/apache/kafka/pull/10173#discussion_r585109788



##
File path: clients/src/main/java/org/apache/kafka/common/TopicIdPartition.java
##
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.UUID;
+
+/**
+ * This represents universally unique identifier with topic id for a topic 
partition. This makes sure that topics
+ * recreated with the same name will always have unique topic identifiers.
+ */
+public class TopicIdPartition implements Serializable {

Review comment:
   Could we consolidate this with the TopicIdPartition in BrokersToIsrs?

##
File path: 
clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
##
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+/**
+ * It describes the metadata about a topic partition's remote log segment in 
the remote storage. This is uniquely
+ * represented with {@link RemoteLogSegmentId}.
+ * 
+ * New instance is always created with the state as {@link 
RemoteLogSegmentState#COPY_SEGMENT_STARTED}. This can be
+ * updated by applying {@link RemoteLogSegmentMetadataUpdate} for the 
respective {@link RemoteLogSegmentId} of the
+ * {@code RemoteLogSegmentMetadata}.
+ */
+@InterfaceStability.Evolving
+public class RemoteLogSegmentMetadata implements Serializable {

Review comment:
   We typically don't use java serialization. Is Serializable needed? Ditto 
in a few other classes.

##
File path: 
clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentId.java
##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.UUID;
+
+/**
+ * This class represents a universally unique identifier associated to a topic 
partition's log segment. This will be
+ * regenerated for every attempt of copying a specific log segment in {@link 
RemoteStorageManager#copyLogSegmentData(RemoteLogSegmentMetadata, 
LogSegmentData)}.
+ * Once it is