luoluoyuyu commented on code in PR #3796: URL: https://github.com/apache/streampipes/pull/3796#discussion_r2450167845
########## streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/Limiter/SpRateLimiter.java: ########## @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.extensions.api.Limiter; + +import com.google.common.util.concurrent.RateLimiter; +import org.apache.streampipes.commons.prometheus.spRateLimiter.SpRateLimiterStats; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * A singleton rate limiter implementation for StreamPipes extensions. + * This class provides rate limiting functionality using Google Guava's RateLimiter. + * It supports configurable permits per second and warmup periods. 
+ */ +public enum SpRateLimiter { + + INSTANCE; + + private static final Logger LOG = LoggerFactory.getLogger(SpRateLimiter.class); + + // Configuration constants + private static final double DEFAULT_PERMITS_PER_SECOND = 100.0; + private static final long DEFAULT_WARMUP_PERIOD = 1000L; + private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS; + private static final int SCHEDULER_INITIAL_DELAY_SECONDS = 0; + private static final int SCHEDULER_PERIOD_SECONDS = 15; + private static final long ZERO_RATE_WAIT_TIME_MS = 1000L; + private static final int STATS_RESET_THRESHOLD = 1000; + private static final int STATS_RESET_FACTOR = 999; + private static final int STATS_RESET_DIVISOR = 1000; + private static final int SHUTDOWN_TIMEOUT_SECONDS = 5; + + private RateLimiter rateLimiter; + + private double rateLimiterQueueSize = 0.0; + private double rateLimiterAverageWaitTime = 0.0; + + private long totalWaitTime = 0L; + private int waitTimeCount = 0; + + private volatile int currentQueueSize = 0; Review Comment: Please use AtomicInteger type to complete the counting here ########## streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/spRateLimiter/SpRateLimiterMetrics.java: ########## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.commons.prometheus.spRateLimiter; + +import io.prometheus.client.Gauge; +import org.apache.streampipes.commons.prometheus.StreamPipesCollectorRegistry; + +/** + * Rate Limiter Metrics Manager + */ +public class SpRateLimiterMetrics { + + public static final Gauge RATE_LIMITER_QUEUE_SIZE = StreamPipesCollectorRegistry.registerGauge( + "sp_rate_limiter_queue_size", + "Current size of the waiting queue" + ); + + public static final Gauge RATE_LIMITER_AVERAGE_WAIT_TIME = StreamPipesCollectorRegistry.registerGauge( + "sp_rate_limiter_average_wait_time_seconds", + "Average wait time for permit acquisition in seconds" + ); + + public static void updateCoreMetrics(double queueSize, double averageWaitTime) { + double safeQueueSize = Math.max(0.0, Math.min(queueSize, 10000.0)); + double safeWaitTime = Math.max(0.0, Math.min(averageWaitTime, 3600.0)); Review Comment: Is there really any need to set boundaries here? ########## streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/Limiter/SpRateLimiter.java: ########## @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.extensions.api.Limiter; + +import com.google.common.util.concurrent.RateLimiter; +import org.apache.streampipes.commons.prometheus.spRateLimiter.SpRateLimiterStats; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * A singleton rate limiter implementation for StreamPipes extensions. + * This class provides rate limiting functionality using Google Guava's RateLimiter. + * It supports configurable permits per second and warmup periods. + */ +public enum SpRateLimiter { + + INSTANCE; + + private static final Logger LOG = LoggerFactory.getLogger(SpRateLimiter.class); + + // Configuration constants + private static final double DEFAULT_PERMITS_PER_SECOND = 100.0; + private static final long DEFAULT_WARMUP_PERIOD = 1000L; + private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS; + private static final int SCHEDULER_INITIAL_DELAY_SECONDS = 0; + private static final int SCHEDULER_PERIOD_SECONDS = 15; + private static final long ZERO_RATE_WAIT_TIME_MS = 1000L; + private static final int STATS_RESET_THRESHOLD = 1000; + private static final int STATS_RESET_FACTOR = 999; + private static final int STATS_RESET_DIVISOR = 1000; + private static final int SHUTDOWN_TIMEOUT_SECONDS = 5; + + private RateLimiter rateLimiter; + + private double rateLimiterQueueSize = 0.0; + private double rateLimiterAverageWaitTime = 0.0; + + private long totalWaitTime = 0L; + private int waitTimeCount = 0; + + private volatile int currentQueueSize = 0; + + private SpRateLimiterStats stats; + private static volatile boolean schedulerInitialized = false; + private static ScheduledExecutorService scheduler; + + /** + * Creates a rate limiter with default parameters. 
+ * Default: 100 permits per second, 1000ms warmup period. + */ + public void createRateLimiter() { + createRateLimiter(DEFAULT_PERMITS_PER_SECOND, DEFAULT_WARMUP_PERIOD, DEFAULT_TIME_UNIT); + initScheduledTasks(); + LOG.info("RateLimiter created and scheduler initialized"); + } + + /** + * Creates a rate limiter with the specified permits per second. + * Uses default warmup period of 1000ms. + * + * @param permitsPerSecond The number of permits per second + */ + public void createRateLimiter(double permitsPerSecond) { + createRateLimiter(permitsPerSecond, DEFAULT_WARMUP_PERIOD, DEFAULT_TIME_UNIT); + initScheduledTasks(); + LOG.info("RateLimiter created with {} permits/sec and scheduler initialized", permitsPerSecond); + } + + public void initScheduledTasks() { + if (!schedulerInitialized) { + synchronized (SpRateLimiter.class) { + if (!schedulerInitialized) { + scheduler = Executors.newSingleThreadScheduledExecutor(); + scheduler.scheduleAtFixedRate(this::ScheduledTask, SCHEDULER_INITIAL_DELAY_SECONDS, SCHEDULER_PERIOD_SECONDS, TimeUnit.SECONDS); + schedulerInitialized = true; + } + } + } + } + + public void ScheduledTask() { + this.stats = new SpRateLimiterStats(); + stats.setAverageWaitTime(this.rateLimiterAverageWaitTime); + stats.setQueueSize(this.rateLimiterQueueSize); + stats.updateAllMetrics(); + } + + /** + * Creates a rate limiter with the specified parameters. 
+ * + * @param permitsPerSecond The number of permits per second + * @param warmupPeriod The warmup period + * @param unit The time unit for the warmup period + * @throws IllegalArgumentException if parameters are invalid + */ + public void createRateLimiter(double permitsPerSecond, long warmupPeriod, TimeUnit unit) { + if (this.rateLimiter == null) { + validateParameters(permitsPerSecond, warmupPeriod, unit); + this.rateLimiter = RateLimiter.create(permitsPerSecond, warmupPeriod, unit); + LOG.info("RateLimiter created with {} permits per second, warmup period: {} {}", + permitsPerSecond, warmupPeriod, unit); + } else { + LOG.warn("RateLimiter already exists. Use setRate() to modify the rate instead."); + } + } + + /** + * Acquires a permit from the rate limiter, blocking if necessary. + * If the rate limiter is not initialized, logs a warning and returns immediately. + * If the rate is zero or negative, logs a warning and sleeps for 1 second. + * + * @throws InterruptedException if the current thread is interrupted while waiting + */ + public void limit() throws InterruptedException { + if (this.rateLimiter == null) { + LOG.warn("RateLimiter has not been initialized. Please call createRateLimiter() first."); + return; + } + + long startTime = System.currentTimeMillis(); + + synchronized (this) { + currentQueueSize++; + rateLimiterQueueSize = currentQueueSize; + } + + try { + if (rateLimiter.getRate() <= 0) { + LOG.warn("RateLimiter is set to zero or negative rate. No permits will be acquired."); + updateAverageWaitTime(ZERO_RATE_WAIT_TIME_MS); + Thread.sleep(ZERO_RATE_WAIT_TIME_MS); + } else { + this.rateLimiter.acquire(); + long waitTime = System.currentTimeMillis() - startTime; Review Comment: The rate here is what enforces the speed limit, so it should be the value set at initialization. Also, should ```tryAcquire(int permits, long timeout, TimeUnit unit)``` be used here instead? The permits argument would then be the byte length at runtime. 
########## streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/spMemoryManager/SpMemoryManagerMetrics.java: ########## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.commons.prometheus.spMemoryManager; + +import io.prometheus.client.Gauge; +import org.apache.streampipes.commons.prometheus.StreamPipesCollectorRegistry; + +/** + * Memory Manager Metrics Manager + */ +public class SpMemoryManagerMetrics { + + public static final Gauge MEMORY_USED_BYTES = StreamPipesCollectorRegistry.registerGauge( + "sp_memory_used_bytes", + "Amount of memory used in bytes" + ); + + public static final Gauge MEMORY_ALLOCATION_RATE = StreamPipesCollectorRegistry.registerGauge( + "sp_memory_allocation_rate_bytes_per_second", + "Memory allocation rate in bytes per second" + ); + + public static void updateCoreMetrics(double usedMemory, double totalMemory, double allocationRate) { + double safeUsedMemory = Math.max(0.0, Math.min(usedMemory, totalMemory)); + double safeAllocationRate = Math.max(0.0, Math.min(allocationRate, 1e12)); Review Comment: Please check whether the logic here is usedMemory/totalMemory, and whether there are any problems with other logic 
########## streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/Limiter/SpRateLimiter.java: ########## @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.extensions.api.Limiter; + +import com.google.common.util.concurrent.RateLimiter; +import org.apache.streampipes.commons.prometheus.spRateLimiter.SpRateLimiterStats; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * A singleton rate limiter implementation for StreamPipes extensions. + * This class provides rate limiting functionality using Google Guava's RateLimiter. + * It supports configurable permits per second and warmup periods. 
+ */ +public enum SpRateLimiter { + + INSTANCE; + + private static final Logger LOG = LoggerFactory.getLogger(SpRateLimiter.class); + + // Configuration constants + private static final double DEFAULT_PERMITS_PER_SECOND = 100.0; + private static final long DEFAULT_WARMUP_PERIOD = 1000L; + private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS; + private static final int SCHEDULER_INITIAL_DELAY_SECONDS = 0; + private static final int SCHEDULER_PERIOD_SECONDS = 15; + private static final long ZERO_RATE_WAIT_TIME_MS = 1000L; + private static final int STATS_RESET_THRESHOLD = 1000; + private static final int STATS_RESET_FACTOR = 999; + private static final int STATS_RESET_DIVISOR = 1000; + private static final int SHUTDOWN_TIMEOUT_SECONDS = 5; + + private RateLimiter rateLimiter; + + private double rateLimiterQueueSize = 0.0; + private double rateLimiterAverageWaitTime = 0.0; + + private long totalWaitTime = 0L; + private int waitTimeCount = 0; + + private volatile int currentQueueSize = 0; + + private SpRateLimiterStats stats; + private static volatile boolean schedulerInitialized = false; + private static ScheduledExecutorService scheduler; + + /** + * Creates a rate limiter with default parameters. + * Default: 100 permits per second, 1000ms warmup period. + */ + public void createRateLimiter() { + createRateLimiter(DEFAULT_PERMITS_PER_SECOND, DEFAULT_WARMUP_PERIOD, DEFAULT_TIME_UNIT); + initScheduledTasks(); + LOG.info("RateLimiter created and scheduler initialized"); + } + + /** + * Creates a rate limiter with the specified permits per second. + * Uses default warmup period of 1000ms. 
+ * + * @param permitsPerSecond The number of permits per second + */ + public void createRateLimiter(double permitsPerSecond) { + createRateLimiter(permitsPerSecond, DEFAULT_WARMUP_PERIOD, DEFAULT_TIME_UNIT); + initScheduledTasks(); + LOG.info("RateLimiter created with {} permits/sec and scheduler initialized", permitsPerSecond); + } + + public void initScheduledTasks() { + if (!schedulerInitialized) { + synchronized (SpRateLimiter.class) { + if (!schedulerInitialized) { + scheduler = Executors.newSingleThreadScheduledExecutor(); + scheduler.scheduleAtFixedRate(this::ScheduledTask, SCHEDULER_INITIAL_DELAY_SECONDS, SCHEDULER_PERIOD_SECONDS, TimeUnit.SECONDS); + schedulerInitialized = true; + } + } + } + } + + public void ScheduledTask() { + this.stats = new SpRateLimiterStats(); + stats.setAverageWaitTime(this.rateLimiterAverageWaitTime); + stats.setQueueSize(this.rateLimiterQueueSize); + stats.updateAllMetrics(); + } + + /** + * Creates a rate limiter with the specified parameters. + * + * @param permitsPerSecond The number of permits per second + * @param warmupPeriod The warmup period + * @param unit The time unit for the warmup period + * @throws IllegalArgumentException if parameters are invalid + */ + public void createRateLimiter(double permitsPerSecond, long warmupPeriod, TimeUnit unit) { + if (this.rateLimiter == null) { + validateParameters(permitsPerSecond, warmupPeriod, unit); + this.rateLimiter = RateLimiter.create(permitsPerSecond, warmupPeriod, unit); + LOG.info("RateLimiter created with {} permits per second, warmup period: {} {}", + permitsPerSecond, warmupPeriod, unit); + } else { + LOG.warn("RateLimiter already exists. Use setRate() to modify the rate instead."); + } + } + + /** + * Acquires a permit from the rate limiter, blocking if necessary. + * If the rate limiter is not initialized, logs a warning and returns immediately. + * If the rate is zero or negative, logs a warning and sleeps for 1 second. 
+ * + * @throws InterruptedException if the current thread is interrupted while waiting + */ + public void limit() throws InterruptedException { + if (this.rateLimiter == null) { + LOG.warn("RateLimiter has not been initialized. Please call createRateLimiter() first."); + return; + } + + long startTime = System.currentTimeMillis(); + + synchronized (this) { + currentQueueSize++; + rateLimiterQueueSize = currentQueueSize; Review Comment: Should only one variable be used to calculate the size? The other one might be redundant. ########## streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQConsumer.java: ########## @@ -52,7 +52,11 @@ private void initListener() { consumer.setMessageListener(message -> { if (message instanceof BytesMessage) { ByteSequence bs = ((ActiveMQBytesMessage) message).getContent(); - eventProcessor.onEvent(bs.getData()); + try { + eventProcessor.onEvent(bs.getData()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } Review Comment: Please refer to ```https://docs.oracle.com/javase/tutorial/essential/concurrency/interrupt.html``` to handle interrupt exceptions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
