[GitHub] [kafka] cadonna commented on a change in pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API

2021-06-25 Thread GitBox


cadonna commented on a change in pull request #10840:
URL: https://github.com/apache/kafka/pull/10840#discussion_r658722233



##
File path: streams/src/main/java/org/apache/kafka/streams/StreamsMetadata.java
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.state.HostInfo;
+
+import java.util.Set;
+
+/**
+ * Represents the state of the different a given Kafka Streams instance 
running within a {@link KafkaStreams} application.

Review comment:
   ```suggestion
* Metadata of a Kafka Streams client.
   ```

##
File path: streams/src/main/java/org/apache/kafka/streams/StreamsMetadata.java
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.state.HostInfo;
+
+import java.util.Set;
+
+/**
+ * Represents the state of the different a given Kafka Streams instance 
running within a {@link KafkaStreams} application.
+ */
+public interface StreamsMetadata {
+
+/**
+ * The value of {@link StreamsConfig#APPLICATION_SERVER_CONFIG} configured 
for the streams
+ * instance, which is typically host/port
+ *
+ * @return {@link HostInfo} corresponding to the streams instance
+ */
+HostInfo hostInfo();
+
+/**
+ * State stores owned by the instance as an active replica
+ *
+ * @return set of active state store names
+ */
+Set stateStoreNames();
+
+/**
+ * Topic partitions consumed by the instance as an active replica
+ *
+ * @return set of active topic partitions
+ */

Review comment:
   ```suggestion
   /**
* Source topic partitions of the active tasks of the Streams client.
*
* @return source topic partitions of the active tasks
*/
   ```

##
File path: streams/src/main/java/org/apache/kafka/streams/TaskMetadata.java
##
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+
+/**
+ * Represents the state of a single task running within a {@link KafkaStreams} 
application.
+ */
+public interface TaskMetadata {
+
+/**
+ * This function will return a {@link TaskId} with basic task metadata
+ *
+ * @return the basic task metadata such as subtopology and partition id
+ */

Review comment:
   

[GitHub] [kafka] cadonna commented on a change in pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API

2021-06-24 Thread GitBox


cadonna commented on a change in pull request #10840:
URL: https://github.com/apache/kafka/pull/10840#discussion_r657813645



##
File path: docs/streams/upgrade-guide.html
##
@@ -121,10 +121,23 @@ Streams API
 
 The public topicGroupId and partition fields 
on TaskId have been deprecated and replaced with getters. Please migrate to 
using the new TaskId.subtopology()
 (which replaces topicGroupId) and 
TaskId.partition() APIs instead. Also, the 
TaskId#readFrom and TaskId#writeTo methods have been 
deprecated
-and will be removed, as they were never intended for public use. 
Finally, we have deprecated the TaskMetadata.taskId() method as 
well as the TaskMetadata constructor.
-These have been replaced with APIs that better represent the task id 
as an actual TaskId object instead of a String. Please migrate to 
the new TaskMetadata#getTaskId
-method. See https://cwiki.apache.org/confluence/x/vYTOCg;>KIP-740 for more 
details.
+and will be removed, as they were never intended for public use. We 
have also deprecated the 
org.apache.kafka.streams.processsor.TaskMetadata class and 
introduced a new interface
+org.apache.kafka.streams.TaskMetadata to be used instead. 
This change was introduced to better reflect the fact that 
TaskMetadata was not meant to be instantiated outside
+of Kafka codebase.
+Please note that the new TaskMetadata offers APIs that 
better represent the task id as an actual TaskId object instead of 
a String. Please migrate to the new
+org.apache.kafka.streams.TaskMetadata which offers these 
better methods, for example, by using the new 
ThreadMetadata#activeTasks and 
ThreadMetadata#standbyTasks.
+org.apache.kafka.streams.processor.ThreadMetadata class 
is also now deprecated and the newly introduced interface 
org.apache.kafka.streams.ThreadMetadata is to be used instead. In 
this new ThreadMetadata
+interface, any reference to the deprecated TaskMetadata 
is replaced by the new interface.
+Finally, also 
org.apache.kafka.streams.state.StreamsMetadata has been 
deprecated. Please migrate to the new 
org.apache.kafka.streams.StreamsMetadata.
+We have deprecated several methods under 
org.apache.kafka.streams.KafkaStreams that returned the 
aforementioned deprecated classes:
 
+
+Users of KafkaStreams#allMetadata are meant to 
migrate to the new KafkaStreams#allRunningMetadata.
+Users of KafkaStreams#allMetadataForStore(String) are 
meant to migrate to the new 
KafkaStreams#allMetadataForGivenStore(String).
+Users of KafkaStreams#localThreadsMetadata are meant 
to migrate to the new KafkaStreams#threadsMetadata.

Review comment:
   You need to adapt this text to the new method names.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskMetadataImplTest.java
##
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.TaskMetadata;
+import org.apache.kafka.streams.processor.TaskId;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+
+public class TaskMetadataImplTest {
+
+public static final TaskId TASK_ID = new TaskId(1, 2);
+public static final TopicPartition TP_0 = new TopicPartition("t", 0);
+public static final TopicPartition TP_1 = new TopicPartition("t", 1);
+public static final Set TOPIC_PARTITIONS = mkSet(TP_0, 
TP_1);
+public static final Map COMMITTED_OFFSETS = 
mkMap(mkEntry(TP_1, 1L), mkEntry(TP_1, 2L));
+public static final Map END_OFFSETS = 
mkMap(mkEntry(TP_1, 1L), mkEntry(TP_1, 3L));
+public static 

[GitHub] [kafka] cadonna commented on a change in pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API

2021-06-23 Thread GitBox


cadonna commented on a change in pull request #10840:
URL: https://github.com/apache/kafka/pull/10840#discussion_r656890122



##
File path: streams/src/main/java/org/apache/kafka/streams/TaskMetadata.java
##
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+
+/**
+ * Represents the state of a single task running within a {@link KafkaStreams} 
application.
+ */
+public interface TaskMetadata {
+
+/**
+ * @return the basic task metadata such as subtopology and partition id
+ */
+TaskId taskId();
+
+/**
+ * This function will return a set of the current TopicPartitions
+ */
+Set topicPartitions();

Review comment:
   Could you please use javadoc mark-up like `@return` and `@param` for the 
docs? Here and for the other methods.

##
File path: streams/src/main/java/org/apache/kafka/streams/ThreadMetadata.java
##
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import java.util.Set;
+
+/**
+ * Represents the state of a single thread running within a {@link 
KafkaStreams} application.
+ */
+public interface ThreadMetadata {
+
+
+/**
+ * @return the state of the Thread
+ */
+String threadState();
+
+/**
+ * @return the name of the Thread
+ */
+String threadName();
+
+/**
+ * This function will return the set of the {@link TaskMetadata} for the 
current active tasks
+ */
+Set activeTasks();

Review comment:
   Could you use javadoc mark-up for the docs? Here and for the other 
methods.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ThreadMetadataImpl.java
##
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.TaskMetadata;
+import org.apache.kafka.streams.ThreadMetadata;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Represents the state of a single thread running within a {@link 
KafkaStreams} application.
+ */
+public class ThreadMetadataImpl implements ThreadMetadata {
+
+private final String threadName;
+
+private final String threadState;
+
+private final Set activeTasks;
+
+private final Set standbyTasks;
+
+private final String mainConsumerClientId;
+
+private final String