xintongsong commented on code in PR #74: URL: https://github.com/apache/flink-agents/pull/74#discussion_r2225092506
########## runtime/src/main/java/org/apache/flink/agents/runtime/metrics/ActionMetricGroup.java: ########## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.agents.runtime.metrics; + +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.clock.SystemClock; + +/** + * ActionMetricGroup class extends FlinkAgentsMetricGroupImpl and is used to monitor and measure the + * performance metrics of executing actions. It provides metrics for the number of actions currently + * executing, the number of actions executed per second, and the execution time of actions. + */ +public class ActionMetricGroup extends FlinkAgentsMetricGroupImpl { + + private final Counter numOfActionsExecuting; Review Comment: Why do we need this? ########## api/src/main/java/org/apache/flink/agents/api/metrics/StringGauge.java: ########## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.agents.api.metrics; + +import org.apache.flink.metrics.Gauge; + +/** + * StringGauge interface is designed to measure and update string values. It extends the Gauge + * interface with a generic type of String. This interface is used to define standard operations for + * components that need to monitor and update string information. + */ +public interface StringGauge extends Gauge<String> { Review Comment: What is this and why do we need it? ########## runtime/src/main/java/org/apache/flink/agents/runtime/metrics/ActionMetricGroup.java: ########## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.agents.runtime.metrics; + +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.clock.SystemClock; + +/** + * ActionMetricGroup class extends FlinkAgentsMetricGroupImpl and is used to monitor and measure the + * performance metrics of executing actions. It provides metrics for the number of actions currently + * executing, the number of actions executed per second, and the execution time of actions. + */ +public class ActionMetricGroup extends FlinkAgentsMetricGroupImpl { + + private final Counter numOfActionsExecuting; + + private final Meter numOfActionsExecutedPerSec; + + private final Histogram actionExecutionTime; + + private long startTime; + + public ActionMetricGroup(MetricGroup parentMetricGroup) { + super(parentMetricGroup); + + this.numOfActionsExecuting = getCounter("numOfActionsExecuting"); + Counter numOfActionsExecuted = getCounter("numOfActionsExecuted"); + this.numOfActionsExecutedPerSec = + getMeter("numOfActionsExecutedPerSec", numOfActionsExecuted); + this.actionExecutionTime = getHistogram("actionExecutionTime"); + } + + /** + * Marks that an action has started executing. Increments the executing actions counter and + * records the start time. + */ + public void markActionExecuting() { + numOfActionsExecuting.inc(); + startTime = SystemClock.getInstance().relativeTimeMillis(); + } + + /** + * Marks that an action has finished executing. Decrements the executing actions counter, marks + * an event on the executed meter, and records the execution time in the histogram. + */ + public void markActionExecuted() { + long endTime = SystemClock.getInstance().relativeTimeMillis(); + numOfActionsExecuting.dec(); + numOfActionsExecutedPerSec.markEvent(); + actionExecutionTime.update(endTime - startTime); Review Comment: @codenohup Can these calculations work with async execution? ########## runtime/src/main/java/org/apache/flink/agents/runtime/metrics/ActionMetricGroup.java: ########## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.agents.runtime.metrics; + +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.clock.SystemClock; + +/** + * ActionMetricGroup class extends FlinkAgentsMetricGroupImpl and is used to monitor and measure the + * performance metrics of executing actions. It provides metrics for the number of actions currently + * executing, the number of actions executed per second, and the execution time of actions. + */ +public class ActionMetricGroup extends FlinkAgentsMetricGroupImpl { Review Comment: 1. Should not use inherit here. 2. Maybe name it `BuiltInActionMetrics` ########## runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java: ########## @@ -38,8 +40,30 @@ public class RunnerContextImpl implements RunnerContext { protected final List<Event> pendingEvents = new ArrayList<>(); protected final MapState<String, MemoryObjectImpl.MemoryItem> store; - public RunnerContextImpl(MapState<String, MemoryObjectImpl.MemoryItem> store) { + protected final FlinkAgentsMetricGroupImpl agentMetricGroup; + + protected ActionMetricGroup actionMetricGroup; + + public RunnerContextImpl( + MapState<String, MemoryObjectImpl.MemoryItem> store, + FlinkAgentsMetricGroupImpl agentMetricGroup) { this.store = store; + this.agentMetricGroup = agentMetricGroup; + } + + public void setActionMetricGroup(ActionMetricGroup actionMetricGroup) { + this.actionMetricGroup = actionMetricGroup; + } Review Comment: It might be easier to set the current action, rather than the metric group bound to the action. We may have more and more action related things to be switched. ########## runtime/src/main/java/org/apache/flink/agents/runtime/metrics/BuiltInMetricGroup.java: ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.agents.runtime.metrics; + +import org.apache.flink.agents.plan.AgentPlan; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Meter; + +import java.util.HashMap; + +/** + * Represents a group of built-in metrics for monitoring the performance and behavior of a flink + * agent job. This class is responsible for collecting and managing various metrics such as the + * number of events processed, the number of actions being executed, and the number of actions + * executed per second. + */ +public class BuiltInMetricGroup { + + private final Meter numOfEventProcessedPerSec; + + private final Counter numOfActionsExecuting; Review Comment: Same here. ########## runtime/src/main/java/org/apache/flink/agents/runtime/metrics/BuiltInMetricGroup.java: ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.agents.runtime.metrics; + +import org.apache.flink.agents.plan.AgentPlan; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Meter; + +import java.util.HashMap; + +/** + * Represents a group of built-in metrics for monitoring the performance and behavior of a flink + * agent job. This class is responsible for collecting and managing various metrics such as the + * number of events processed, the number of actions being executed, and the number of actions + * executed per second. + */ +public class BuiltInMetricGroup { Review Comment: ```suggestion public class BuiltInMetrics { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
