[GitHub] [kafka] cadonna commented on a change in pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API
cadonna commented on a change in pull request #10840: URL: https://github.com/apache/kafka/pull/10840#discussion_r658722233 ## File path: streams/src/main/java/org/apache/kafka/streams/StreamsMetadata.java ## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.state.HostInfo; + +import java.util.Set; + +/** + * Represents the state of the different a given Kafka Streams instance running within a {@link KafkaStreams} application. Review comment: ```suggestion * Metadata of a Kafka Streams client. ``` ## File path: streams/src/main/java/org/apache/kafka/streams/StreamsMetadata.java ## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.state.HostInfo; + +import java.util.Set; + +/** + * Represents the state of the different a given Kafka Streams instance running within a {@link KafkaStreams} application. + */ +public interface StreamsMetadata { + +/** + * The value of {@link StreamsConfig#APPLICATION_SERVER_CONFIG} configured for the streams + * instance, which is typically host/port + * + * @return {@link HostInfo} corresponding to the streams instance + */ +HostInfo hostInfo(); + +/** + * State stores owned by the instance as an active replica + * + * @return set of active state store names + */ +Set stateStoreNames(); + +/** + * Topic partitions consumed by the instance as an active replica + * + * @return set of active topic partitions + */ Review comment: ```suggestion /** * Source topic partitions of the active tasks of the Streams client. * * @return source topic partitions of the active tasks */ ``` ## File path: streams/src/main/java/org/apache/kafka/streams/TaskMetadata.java ## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.processor.TaskId; + +import java.util.Map; +import java.util.Optional; +import java.util.Set; + + +/** + * Represents the state of a single task running within a {@link KafkaStreams} application. + */ +public interface TaskMetadata { + +/** + * This function will return a {@link TaskId} with basic task metadata + * + * @return the basic task metadata such as subtopology and partition id + */ Review comment:
[GitHub] [kafka] cadonna commented on a change in pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API
cadonna commented on a change in pull request #10840: URL: https://github.com/apache/kafka/pull/10840#discussion_r657813645 ## File path: docs/streams/upgrade-guide.html ## @@ -121,10 +121,23 @@ Streams API The public topicGroupId and partition fields on TaskId have been deprecated and replaced with getters. Please migrate to using the new TaskId.subtopology() (which replaces topicGroupId) and TaskId.partition() APIs instead. Also, the TaskId#readFrom and TaskId#writeTo methods have been deprecated -and will be removed, as they were never intended for public use. Finally, we have deprecated the TaskMetadata.taskId() method as well as the TaskMetadata constructor. -These have been replaced with APIs that better represent the task id as an actual TaskId object instead of a String. Please migrate to the new TaskMetadata#getTaskId -method. See https://cwiki.apache.org/confluence/x/vYTOCg;>KIP-740 for more details. +and will be removed, as they were never intended for public use. We have also deprecated the org.apache.kafka.streams.processsor.TaskMetadata class and introduced a new interface +org.apache.kafka.streams.TaskMetadata to be used instead. This change was introduced to better reflect the fact that TaskMetadata was not meant to be instantiated outside +of Kafka codebase. +Please note that the new TaskMetadata offers APIs that better represent the task id as an actual TaskId object instead of a String. Please migrate to the new +org.apache.kafka.streams.TaskMetadata which offers these better methods, for example, by using the new ThreadMetadata#activeTasks and ThreadMetadata#standbyTasks. +org.apache.kafka.streams.processor.ThreadMetadata class is also now deprecated and the newly introduced interface org.apache.kafka.streams.ThreadMetadata is to be used instead. In this new ThreadMetadata +interface, any reference to the deprecated TaskMetadata is replaced by the new interface. +Finally, also org.apache.kafka.streams.state.StreamsMetadata has been deprecated. Please migrate to the new org.apache.kafka.streams.StreamsMetadata. +We have deprecated several methods under org.apache.kafka.streams.KafkaStreams that returned the aforementioned deprecated classes: + +Users of KafkaStreams#allMetadata are meant to migrate to the new KafkaStreams#allRunningMetadata. +Users of KafkaStreams#allMetadataForStore(String) are meant to migrate to the new KafkaStreams#allMetadataForGivenStore(String). +Users of KafkaStreams#localThreadsMetadata are meant to migrate to the new KafkaStreams#threadsMetadata. Review comment: You need to adapt this text to the new method names. ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskMetadataImplTest.java ## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.TaskMetadata; +import org.apache.kafka.streams.processor.TaskId; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collection; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + + +public class TaskMetadataImplTest { + +public static final TaskId TASK_ID = new TaskId(1, 2); +public static final TopicPartition TP_0 = new TopicPartition("t", 0); +public static final TopicPartition TP_1 = new TopicPartition("t", 1); +public static final Set TOPIC_PARTITIONS = mkSet(TP_0, TP_1); +public static final Map COMMITTED_OFFSETS = mkMap(mkEntry(TP_1, 1L), mkEntry(TP_1, 2L)); +public static final Map END_OFFSETS = mkMap(mkEntry(TP_1, 1L), mkEntry(TP_1, 3L)); +public static
[GitHub] [kafka] cadonna commented on a change in pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API
cadonna commented on a change in pull request #10840: URL: https://github.com/apache/kafka/pull/10840#discussion_r656890122 ## File path: streams/src/main/java/org/apache/kafka/streams/TaskMetadata.java ## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.processor.TaskId; + +import java.util.Map; +import java.util.Optional; +import java.util.Set; + + +/** + * Represents the state of a single task running within a {@link KafkaStreams} application. + */ +public interface TaskMetadata { + +/** + * @return the basic task metadata such as subtopology and partition id + */ +TaskId taskId(); + +/** + * This function will return a set of the current TopicPartitions + */ +Set topicPartitions(); Review comment: Could you please use javadoc mark-up like `@return` and `@param` for the docs? Here and for the other methods. ## File path: streams/src/main/java/org/apache/kafka/streams/ThreadMetadata.java ## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams; + +import java.util.Set; + +/** + * Represents the state of a single thread running within a {@link KafkaStreams} application. + */ +public interface ThreadMetadata { + + +/** + * @return the state of the Thread + */ +String threadState(); + +/** + * @return the name of the Thread + */ +String threadName(); + +/** + * This function will return the set of the {@link TaskMetadata} for the current active tasks + */ +Set activeTasks(); Review comment: Could you use javadoc mark-up for the docs? Here and for the other methods. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ThreadMetadataImpl.java ## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.TaskMetadata; +import org.apache.kafka.streams.ThreadMetadata; + +import java.util.Collections; +import java.util.Objects; +import java.util.Set; + +/** + * Represents the state of a single thread running within a {@link KafkaStreams} application. + */ +public class ThreadMetadataImpl implements ThreadMetadata { + +private final String threadName; + +private final String threadState; + +private final Set activeTasks; + +private final Set standbyTasks; + +private final String mainConsumerClientId; + +private final String