Copilot commented on code in PR #13723: URL: https://github.com/apache/skywalking/pull/13723#discussion_r2883799809
########## oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/v2/dsl/counter/CounterWindow.java: ########## @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.meter.analyzer.v2.dsl.counter; + +import com.google.common.collect.ImmutableMap; +import io.vavr.Tuple; +import io.vavr.Tuple2; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import lombok.AccessLevel; +import lombok.EqualsAndHashCode; +import lombok.RequiredArgsConstructor; +import lombok.ToString; + +/** + * CounterWindow stores a series of counter samples in order to calculate the increase + * or instant rate of increase. 
+ * + */ +@RequiredArgsConstructor(access = AccessLevel.PRIVATE) +@ToString +@EqualsAndHashCode +public class CounterWindow { + + public static final CounterWindow INSTANCE = new CounterWindow(); + + private final Map<ID, Tuple2<Long, Double>> lastElementMap = new ConcurrentHashMap<>(); + private final Map<ID, Queue<Tuple2<Long, Double>>> windows = new ConcurrentHashMap<>(); + + public Tuple2<Long, Double> increase(String name, ImmutableMap<String, String> labels, Double value, long windowSize, long now) { + ID id = new ID(name, labels); + Queue<Tuple2<Long, Double>> window = windows.computeIfAbsent(id, unused -> new PriorityQueue<>()); + synchronized (window) { + window.offer(Tuple.of(now, value)); + long waterLevel = now - windowSize; + Tuple2<Long, Double> peek = window.peek(); + if (peek._1 > waterLevel) { + return peek; + } + + Tuple2<Long, Double> result = peek; + while (peek._1 < waterLevel) { + result = window.poll(); + peek = window.element(); + } Review Comment: The `PriorityQueue<>` is created without a comparator, so ordering silently relies on the natural ordering of vavr's `Tuple2` (it implements `Comparable` and compares `_1`, then `_2`). This works for `Tuple2<Long, Double>`, but an explicit comparator on the timestamp (`_1`) would make the intent clear and avoid ordering by the sample value on timestamp ties. Also, `window.element()` can throw `NoSuchElementException` if the queue becomes empty after `poll()` (e.g., a single stale element). Prefer `window.peek()` inside the loop and stop iterating when it returns `null`. ########## oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/v2/dsl/counter/CounterWindow.java: ########## @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.meter.analyzer.v2.dsl.counter; + +import com.google.common.collect.ImmutableMap; +import io.vavr.Tuple; +import io.vavr.Tuple2; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import lombok.AccessLevel; +import lombok.EqualsAndHashCode; +import lombok.RequiredArgsConstructor; +import lombok.ToString; + +/** + * CounterWindow stores a series of counter samples in order to calculate the increase + * or instant rate of increase. + * + */ +@RequiredArgsConstructor(access = AccessLevel.PRIVATE) +@ToString +@EqualsAndHashCode +public class CounterWindow { + + public static final CounterWindow INSTANCE = new CounterWindow(); + + private final Map<ID, Tuple2<Long, Double>> lastElementMap = new ConcurrentHashMap<>(); + private final Map<ID, Queue<Tuple2<Long, Double>>> windows = new ConcurrentHashMap<>(); + + public Tuple2<Long, Double> increase(String name, ImmutableMap<String, String> labels, Double value, long windowSize, long now) { + ID id = new ID(name, labels); + Queue<Tuple2<Long, Double>> window = windows.computeIfAbsent(id, unused -> new PriorityQueue<>()); + synchronized (window) { + window.offer(Tuple.of(now, value)); + long waterLevel = now - windowSize; + Tuple2<Long, Double> peek = window.peek(); + if (peek._1 > waterLevel) { + return peek; + } + + Tuple2<Long, Double> result = peek; + while (peek._1 < waterLevel) { + result = window.poll(); + peek = window.element(); + } + + // Choose the closed slot to the expected 
timestamp + if (waterLevel - result._1 <= peek._1 - waterLevel) { + return result; + } + + return peek; + } + } + + public Tuple2<Long, Double> pop(String name, ImmutableMap<String, String> labels, Double value, long now) { + ID id = new ID(name, labels); + + Tuple2<Long, Double> element = Tuple.of(now, value); + Tuple2<Long, Double> result = lastElementMap.put(id, element); + if (result == null) { + return element; + } + return result; + } + + public void reset() { + windows.clear(); Review Comment: `reset()` clears `windows` but leaves `lastElementMap` intact, so subsequent `pop()` calls can still see stale last elements from before reset. Clear both maps to fully reset the counter state. ```suggestion windows.clear(); lastElementMap.clear(); ``` ########## oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/v2/dsl/Expression.java: ########## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.oap.meter.analyzer.v2.dsl; + +import java.util.Map; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; + +/** + * Wraps a compiled {@link MalExpression} with runtime state management. 
+ * + * <p>Two-phase usage: + * <ul> + * <li>{@link #parse()} — returns compile-time {@link ExpressionMetadata} extracted from the AST. + * Called once at startup by {@link org.apache.skywalking.oap.meter.analyzer.v2.Analyzer#build} + * to discover sample names, scope type, aggregation labels, and metric type.</li> + * <li>{@link #run(Map)} — executes the compiled expression on actual sample data. + * Called at every ingestion cycle. Pure computation, no side effects.</li> + * </ul> + */ +@Slf4j +@ToString(of = {"literal"}) +public class Expression { + + private final String metricName; + private final String literal; + private final MalExpression expression; + + public Expression(final String metricName, final String literal, final MalExpression expression) { + this.metricName = metricName; + this.literal = literal; + this.expression = expression; + } + + /** + * Returns compile-time metadata extracted from the expression AST. + */ + public ExpressionMetadata parse() { + final ExpressionMetadata metadata = expression.metadata(); + if (metadata.getScopeType() == null) { + throw new ExpressionParsingException( + literal + ": one of service(), instance() or endpoint() should be invoke"); Review Comment: The error message has a grammatical issue (“should be invoke”). Consider changing it to something like “should be invoked” and (optionally) include the metric name to make startup failures easier to diagnose. ```suggestion literal + " (metric: " + metricName + "): one of service(), instance() or endpoint() should be invoked"); ``` ########## oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/v2/dsl/EntityDescription/EntityDescription.java: ########## @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.meter.analyzer.v2.dsl.EntityDescription; Review Comment: Java package names should be all-lowercase. Using `...dsl.EntityDescription` risks portability issues on case-insensitive filesystems and violates standard conventions. Prefer renaming the package (and folder) to a lowercase form (e.g., `...dsl.entity` or `...dsl.entitydescription`). ```suggestion package org.apache.skywalking.oap.meter.analyzer.v2.dsl.entitydescription; ``` ########## oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/v2/dsl/spec/sink/SamplerSpec.java: ########## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.log.analyzer.v2.dsl.spec.sink; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.skywalking.oap.log.analyzer.v2.dsl.ExecutionContext; +import org.apache.skywalking.oap.log.analyzer.v2.dsl.spec.AbstractSpec; +import org.apache.skywalking.oap.log.analyzer.v2.dsl.spec.sink.sampler.RateLimitingSampler; +import org.apache.skywalking.oap.log.analyzer.v2.dsl.spec.sink.sampler.Sampler; +import org.apache.skywalking.oap.log.analyzer.v2.provider.LogAnalyzerModuleConfig; +import org.apache.skywalking.oap.server.library.module.ModuleManager; + +public class SamplerSpec extends AbstractSpec { + private final Map<String, Sampler> rateLimitSamplersByString; + private final Map<Integer, Sampler> possibilitySamplers; Review Comment: `possibilitySamplers` is declared but never used in this class (and there is no corresponding `possibility(...)` method). Either implement the possibility sampler path (to match docs/DSL capabilities) or remove the unused field until the feature is added. ########## oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/v2/dsl/spec/sink/sampler/PossibilitySampler.java: ########## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.log.analyzer.v2.dsl.spec.sink.sampler; + +import io.netty.util.internal.ThreadLocalRandom; Review Comment: `io.netty.util.internal.ThreadLocalRandom` is an internal Netty API and not intended for application use; it can change without compatibility guarantees. Prefer `java.util.concurrent.ThreadLocalRandom` (or `java.util.Random`/`SplittableRandom` depending on requirements). ```suggestion import java.util.concurrent.ThreadLocalRandom; ``` ########## oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/v2/dsl/spec/sink/sampler/PossibilitySampler.java: ########## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.skywalking.oap.log.analyzer.v2.dsl.spec.sink.sampler; + +import io.netty.util.internal.ThreadLocalRandom; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.experimental.Accessors; + +@RequiredArgsConstructor +@Accessors(fluent = true) +@EqualsAndHashCode(of = {"percentage"}) +public class PossibilitySampler implements Sampler { + @Getter + private final int percentage; + + private final ThreadLocalRandom random = ThreadLocalRandom.current(); Review Comment: `io.netty.util.internal.ThreadLocalRandom` is an internal Netty API and not intended for application use; it can change without compatibility guarantees. Prefer `java.util.concurrent.ThreadLocalRandom` (or `java.util.Random`/`SplittableRandom` depending on requirements). Also avoid caching the result of `ThreadLocalRandom.current()` in an instance field: the JDK documentation says usages should be of the form `ThreadLocalRandom.current().nextX(...)` so that a generator is never accidentally shared across threads, so call `current()` at each use instead of storing it. ########## oap-server/analyzer/hierarchy/CLAUDE.md: ########## @@ -0,0 +1,118 @@ +# Hierarchy Rule Compiler + +Compiles hierarchy matching rule expressions into `BiFunction<Service, Service, Boolean>` implementation classes at runtime using ANTLR4 parsing and Javassist bytecode generation. + +## Compilation Workflow + +``` +Rule expression string (e.g., "{ (u, l) -> u.name == l.name }") + → HierarchyRuleScriptParser.parse(expression) [ANTLR4 lexer/parser → visitor] + → HierarchyRuleModel (immutable AST) + → HierarchyRuleClassGenerator.compile(ruleName, expression) + 1. classPool.makeClass() — create class implementing BiFunction + 2. generateApplyMethod(model) — emit Java source for apply(Object, Object) + 3. ctClass.toClass(HierarchyRulePackageHolder.class) — load via package anchor + → BiFunction<Service, Service, Boolean> instance +``` + +The generated class implements: +```java +Object apply(Object arg0, Object arg1) + // cast internally to Service and returns Boolean +``` + +No separate consumer/closure classes are needed — hierarchy rules are simple enough to compile into a single method body.
+ +## File Structure + +``` +oap-server/analyzer/hierarchy/ + src/main/antlr4/.../HierarchyRuleLexer.g4 — ANTLR4 lexer grammar + src/main/antlr4/.../HierarchyRuleParser.g4 — ANTLR4 parser grammar + + src/main/java/.../compiler/ + HierarchyRuleScriptParser.java — ANTLR4 facade: expression → AST + HierarchyRuleModel.java — Immutable AST model classes + HierarchyRuleClassGenerator.java — Javassist code generator + CompiledHierarchyRuleProvider.java — SPI provider: compiles rule expressions + hierarchy/rule/rt/ + HierarchyRulePackageHolder.java — Class loading anchor (empty marker) + + src/main/resources/META-INF/services/ + ...HierarchyDefinitionService$HierarchyRuleProvider — SPI registration + + src/test/java/.../compiler/ + HierarchyRuleScriptParserTest.java — 5 parser tests + HierarchyRuleClassGeneratorTest.java — 4 generator tests +``` + +## Package & Class Naming + +| Component | Package / Name | +|-----------|---------------| +| Parser/Model/Generator | `org.apache.skywalking.oap.server.core.config.compiler` | +| Generated classes | `org.apache.skywalking.oap.server.core.config.compiler.hierarchy.rule.rt.HierarchyRule_<N>` | +| Package holder | `org.apache.skywalking.oap.server.core.config.compiler.hierarchy.rule.rt.HierarchyRulePackageHolder` | +| SPI provider | `org.apache.skywalking.oap.server.core.config.compiler.CompiledHierarchyRuleProvider` | Review Comment: The documented packages don’t match the code in this PR (which uses `org.apache.skywalking.oap.server.core.config.v2.compiler...`). Updating these paths will help future maintainers locate the compiler/model/provider classes correctly. 
```suggestion | Parser/Model/Generator | `org.apache.skywalking.oap.server.core.config.v2.compiler` | | Generated classes | `org.apache.skywalking.oap.server.core.config.v2.compiler.hierarchy.rule.rt.HierarchyRule_<N>` | | Package holder | `org.apache.skywalking.oap.server.core.config.v2.compiler.hierarchy.rule.rt.HierarchyRulePackageHolder` | | SPI provider | `org.apache.skywalking.oap.server.core.config.v2.compiler.CompiledHierarchyRuleProvider` | ``` ########## docs/en/changes/changes.md: ########## @@ -1,6 +1,7 @@ ## 10.4.0 #### Project +* Remove Maven CI-friendly `${revision}` property and `flatten-maven-plugin`; use hardcoded version numbers in all POMs. Review Comment: This change log entry appears unrelated to the stated purpose of this PR (Groovy DSL runtime replacement with ANTLR4 + Javassist). If the Maven revision/flatten removal is not a deliberate, user-relevant change bundled with this feature, it should be moved to a separate PR or replaced with a change entry that describes the DSL runtime/compiler migration. ```suggestion * Migrate the Groovy-based MAL, LAL, and hierarchy-rule DSL runtimes to ANTLR4-based parsers with Javassist bytecode generation, removing the Groovy runtime dependency. ``` ########## oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/v2/dsl/ExpressionMetadata.java: ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.oap.meter.analyzer.v2.dsl; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import lombok.Getter; +import org.apache.skywalking.oap.server.core.analysis.meter.ScopeType; + +/** + * Immutable metadata extracted from a MAL expression at compile time. + * Replaces the ThreadLocal-based {@code ExpressionParsingContext} pattern. + */ +@Getter +public class ExpressionMetadata { + + private final List<String> samples; + private final ScopeType scopeType; + private final Set<String> scopeLabels; + private final Set<String> aggregationLabels; + private final DownsamplingType downsampling; + private final boolean isHistogram; + private final int[] percentiles; + + public ExpressionMetadata(final List<String> samples, + final ScopeType scopeType, + final Set<String> scopeLabels, + final Set<String> aggregationLabels, + final DownsamplingType downsampling, + final boolean isHistogram, + final int[] percentiles) { + this.samples = Collections.unmodifiableList(samples); + this.scopeType = scopeType; + this.scopeLabels = Collections.unmodifiableSet(scopeLabels); + this.aggregationLabels = Collections.unmodifiableSet(aggregationLabels); + this.downsampling = downsampling; + this.isHistogram = isHistogram; + this.percentiles = percentiles; + } Review Comment: This class is documented as “Immutable”, but it doesn’t make defensive copies of `samples`, `scopeLabels`, `aggregationLabels`, or the `percentiles` array. 
`Collections.unmodifiable*` only wraps the original collection, so external mutations will still affect this instance; arrays are always mutable. Copy inputs (e.g., `List.copyOf`, `Set.copyOf`, and `Arrays.copyOf`) to ensure immutability. ########## oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/v2/compiler/rt/MalRuntimeHelper.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.oap.meter.analyzer.v2.compiler.rt; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.skywalking.oap.meter.analyzer.v2.dsl.Sample; +import org.apache.skywalking.oap.meter.analyzer.v2.dsl.SampleFamily; +import org.apache.skywalking.oap.meter.analyzer.v2.dsl.SampleFamilyBuilder; + +/** + * Static helper methods called by v2-generated {@code MalExpression} classes. + * Keeps new runtime behaviour in the v2 compiler package, avoiding modifications + * to the shared {@link SampleFamily} class. 
+ */ +public final class MalRuntimeHelper { + + private MalRuntimeHelper() { + } + + /** + * Groovy regex match ({@code =~}): returns a {@code String[][]} where each row is + * one match with group 0 (full match) and capture groups 1..N. + * Returns {@code null} if the pattern does not match, so that Groovy-style + * truthiness checks ({@code matcher ? matcher[0][1] : "unknown"}) work via null check. + */ + public static String[][] regexMatch(final String input, final String regex) { + if (input == null) { + return null; + } + final Matcher m = Pattern.compile(regex).matcher(input); Review Comment: `Pattern.compile(regex)` on every call can be expensive if regex matching is on a hot path (generated code may call this frequently). Consider caching compiled `Pattern` instances (e.g., a bounded cache keyed by `regex`) or generating code that pre-compiles constant patterns into static fields. ########## oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/v2/dsl/spec/sink/sampler/RateLimitingSampler.java: ########## @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.skywalking.oap.log.analyzer.v2.dsl.spec.sink.sampler; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.experimental.Accessors; +import lombok.extern.slf4j.Slf4j; + +@Accessors(fluent = true) +@EqualsAndHashCode(of = {"rpm"}) +public class RateLimitingSampler implements Sampler { + @Getter + @Setter + private volatile int rpm; + + private final AtomicInteger factor = new AtomicInteger(); + + private final ResetHandler resetHandler; + + public RateLimitingSampler(final ResetHandler resetHandler) { + this.resetHandler = resetHandler; + } + + @Override + public RateLimitingSampler start() { + resetHandler.start(this); + return this; + } + + @Override + public void close() { + resetHandler.close(this); + } + + @Override + public boolean sample() { + return factor.getAndIncrement() < rpm; + } + + @Override + public RateLimitingSampler reset() { + factor.set(0); + return this; + } + + @Slf4j + public static class ResetHandler { + private final List<Sampler> samplers = new ArrayList<>(); + + private volatile ScheduledFuture<?> future; + + private volatile boolean started = false; + + private synchronized void start(final Sampler sampler) { + samplers.add(sampler); + + if (!started) { + future = Executors.newSingleThreadScheduledExecutor() + .scheduleAtFixedRate(this::reset, 1, 1, TimeUnit.MINUTES); + started = true; + } Review Comment: `newSingleThreadScheduledExecutor()` is created but never shut down. Cancelling the `ScheduledFuture` does not stop the executor thread, which can leak threads over time (especially if `ResetHandler` instances are created in multiple specs/tests). 
Store the `ScheduledExecutorService` and call `shutdown()`/`shutdownNow()` when `samplers` becomes empty, and recreate the executor on the next `start()`. ########## oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/v2/dsl/spec/sink/sampler/RateLimitingSampler.java: ########## @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.skywalking.oap.log.analyzer.v2.dsl.spec.sink.sampler; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.experimental.Accessors; +import lombok.extern.slf4j.Slf4j; + +@Accessors(fluent = true) +@EqualsAndHashCode(of = {"rpm"}) +public class RateLimitingSampler implements Sampler { + @Getter + @Setter + private volatile int rpm; + + private final AtomicInteger factor = new AtomicInteger(); + + private final ResetHandler resetHandler; + + public RateLimitingSampler(final ResetHandler resetHandler) { + this.resetHandler = resetHandler; + } + + @Override + public RateLimitingSampler start() { + resetHandler.start(this); + return this; + } + + @Override + public void close() { + resetHandler.close(this); + } + + @Override + public boolean sample() { + return factor.getAndIncrement() < rpm; + } + + @Override + public RateLimitingSampler reset() { + factor.set(0); + return this; + } + + @Slf4j + public static class ResetHandler { + private final List<Sampler> samplers = new ArrayList<>(); + + private volatile ScheduledFuture<?> future; + + private volatile boolean started = false; + + private synchronized void start(final Sampler sampler) { + samplers.add(sampler); + + if (!started) { + future = Executors.newSingleThreadScheduledExecutor() + .scheduleAtFixedRate(this::reset, 1, 1, TimeUnit.MINUTES); + started = true; + } + } + + private synchronized void close(final Sampler sampler) { + samplers.remove(sampler); + + if (samplers.isEmpty() && future != null) { + future.cancel(true); + started = false; + } Review Comment: `newSingleThreadScheduledExecutor()` is created but never shut down. 
Cancelling the `ScheduledFuture` does not stop the executor thread, which can leak threads over time (especially if `ResetHandler` instances are created in multiple specs/tests). Store the `ScheduledExecutorService` and call `shutdown()`/`shutdownNow()` when `samplers` becomes empty, and recreate the executor on the next `start()`. ########## oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/v2/provider/LALConfigs.java: ########## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.skywalking.oap.log.analyzer.v2.provider; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.IOException; +import java.io.Reader; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.oap.server.library.module.ModuleStartException; +import org.apache.skywalking.oap.server.library.util.ResourceUtils; +import org.yaml.snakeyaml.Yaml; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.io.Files.getNameWithoutExtension; +import static org.apache.skywalking.oap.server.library.util.StringUtil.isNotBlank; +import static org.apache.skywalking.oap.server.library.util.CollectionUtils.isEmpty; + +@Data +@Slf4j +public class LALConfigs { + private List<LALConfig> rules; + + public static List<LALConfigs> load(final String path, final List<String> files) throws Exception { + if (isEmpty(files)) { + return Collections.emptyList(); + } + + checkArgument(isNotBlank(path), "path cannot be blank"); + + try { + final File[] rules = ResourceUtils.getPathFiles(path); + + return Arrays.stream(rules) + .filter(File::isFile) + .filter(it -> { + //noinspection UnstableApiUsage + return files.contains(getNameWithoutExtension(it.getName())); + }) + .map(f -> { + try (final Reader r = new FileReader(f)) { + return new Yaml().<LALConfigs>loadAs(r, LALConfigs.class); + } catch (IOException e) { + log.debug("Failed to read file {}", f, e); + } + return null; Review Comment: If a config file can’t be read/parsed, this silently drops it (returns `null`) and continues, which can lead to missing rules with no startup failure. 
Since LAL compilation is intended to be fail-fast at boot, consider throwing a `ModuleStartException` (or rethrowing as a runtime exception) on `IOException` so configuration issues are not silently ignored. ```suggestion throw new RuntimeException("Failed to read LAL config file: " + f, e); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
