luoluoyuyu commented on code in PR #3796: URL: https://github.com/apache/streampipes/pull/3796#discussion_r2455091844
########## streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/limiter/SpRateLimiter.java: ########## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.extensions.api.limiter; + +import org.apache.streampipes.commons.prometheus.spratelimiter.SpRateLimiterStats; + +import com.google.common.util.concurrent.RateLimiter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.ObjectOutputStream; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A singleton rate limiter implementation for StreamPipes extensions. + * This class provides rate limiting functionality using Google Guava's RateLimiter. + * It supports configurable permits per second and warmup periods. 
+ */ +public enum SpRateLimiter { + + INSTANCE; + + private static final Logger LOG = LoggerFactory.getLogger(SpRateLimiter.class); + + // Configuration constants + private static final double DEFAULT_PERMITS_PER_SECOND = 100.0; + private static final long DEFAULT_WARMUP_PERIOD = 1000L; + private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS; + private static final int SCHEDULER_INITIAL_DELAY_SECONDS = 0; + private static final int SCHEDULER_PERIOD_SECONDS = 15; + private static final long ZERO_RATE_WAIT_TIME_MS = 1000L; + private static final int STATS_RESET_THRESHOLD = 1000; + private static final int STATS_RESET_FACTOR = 999; + private static final int STATS_RESET_DIVISOR = 1000; + private static final int SHUTDOWN_TIMEOUT_SECONDS = 5; + private static final long TIMEOUT_MS = 1000; + private static final int PERMITS_PER_REQUEST = 1; + + private RateLimiter rateLimiter; + + private double rateLimiterAverageWaitTime = 0.0; + + private long totalWaitTime = 0L; + private AtomicInteger waitTimeCount = new AtomicInteger(0); + + private AtomicInteger currentQueueSize = new AtomicInteger(0); + + private SpRateLimiterStats stats; + private static volatile boolean schedulerInitialized = false; + private static ScheduledExecutorService scheduler; + + /** + * Creates a rate limiter with default parameters. + * Default: 100 permits per second, 1000ms warmup period. + */ + public void createRateLimiter() { + createRateLimiter(DEFAULT_PERMITS_PER_SECOND, DEFAULT_WARMUP_PERIOD, DEFAULT_TIME_UNIT); + initScheduledTasks(); + LOG.info("RateLimiter created and scheduler initialized"); + } + + /** + * Creates a rate limiter with the specified permits per second. + * Uses default warmup period of 1000ms. 
+ * + * @param permitsPerSecond The number of permits per second + */ + public void createRateLimiter(double permitsPerSecond) { + createRateLimiter(permitsPerSecond, DEFAULT_WARMUP_PERIOD, DEFAULT_TIME_UNIT); + initScheduledTasks(); + LOG.info("RateLimiter created with {} permits/sec and scheduler initialized", permitsPerSecond); + } + + public void initScheduledTasks() { + if (!schedulerInitialized) { + synchronized (SpRateLimiter.class) { + if (!schedulerInitialized) { + scheduler = Executors.newSingleThreadScheduledExecutor(); + scheduler.scheduleAtFixedRate(this::scheduledTask, SCHEDULER_INITIAL_DELAY_SECONDS, SCHEDULER_PERIOD_SECONDS, TimeUnit.SECONDS); + schedulerInitialized = true; + } + } + } + } + + public void scheduledTask() { + this.stats = new SpRateLimiterStats(); + stats.setAverageWaitTime(this.rateLimiterAverageWaitTime); + stats.setQueueSize(this.currentQueueSize.get()); + stats.updateAllMetrics(); + } + + /** + * Creates a rate limiter with the specified parameters. + * + * @param permitsPerSecond The number of permits per second + * @param warmupPeriod The warmup period + * @param unit The time unit for the warmup period + * @throws IllegalArgumentException if parameters are invalid + */ + public void createRateLimiter(double permitsPerSecond, long warmupPeriod, TimeUnit unit) { + if (this.rateLimiter == null) { + validateParameters(permitsPerSecond, warmupPeriod, unit); + this.rateLimiter = RateLimiter.create(permitsPerSecond, warmupPeriod, unit); + LOG.info("RateLimiter created with {} permits per second, warmup period: {} {}", + permitsPerSecond, warmupPeriod, unit); + } else { + LOG.warn("RateLimiter already exists. Use setRate() to modify the rate instead."); + } + } + + /** + * Acquires a permit from the rate limiter for processing data, with timeout. + * Each request consumes exactly 1 permit regardless of data size. + * This provides simple and fair rate limiting based on request count. 
+ * + * @param bytes The number of bytes to process (for logging purposes only) + * @return true if permit was acquired successfully, false if timeout occurred + * @throws InterruptedException if the current thread is interrupted while waiting + */ + public boolean limit(long bytes) throws InterruptedException { + if (this.rateLimiter == null) { + LOG.warn("RateLimiter has not been initialized. Please call createRateLimiter() first."); + return false; + } + + long startTime = System.currentTimeMillis(); + + synchronized (this) { + currentQueueSize.incrementAndGet(); + } + + try { + if (rateLimiter.getRate() <= 0) { + LOG.warn("RateLimiter is set to zero or negative rate. No permits will be acquired."); + updateAverageWaitTime(ZERO_RATE_WAIT_TIME_MS); + try { + Thread.sleep(ZERO_RATE_WAIT_TIME_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } + return false; + } else { + // Each request consumes exactly 1 permit regardless of data size + long timeoutMs = TIMEOUT_MS; + boolean acquired = rateLimiter.tryAcquire(PERMITS_PER_REQUEST, timeoutMs, TimeUnit.MILLISECONDS); + + long waitTime = System.currentTimeMillis() - startTime; + updateAverageWaitTime(waitTime); + + if (!acquired) { + LOG.warn("Failed to acquire permit for {} bytes within {} ms timeout (rate: {} requests/sec)", + bytes, timeoutMs, rateLimiter.getRate()); + } else { + LOG.debug("Successfully acquired permit for {} bytes in {} ms (rate: {} requests/sec)", + bytes, waitTime, rateLimiter.getRate()); + } + + return acquired; + } + } finally { + synchronized (this) { Review Comment: delete? ########## streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/limiter/SpRateLimiter.java: ########## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.extensions.api.limiter; + +import org.apache.streampipes.commons.prometheus.spratelimiter.SpRateLimiterStats; + +import com.google.common.util.concurrent.RateLimiter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.ObjectOutputStream; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A singleton rate limiter implementation for StreamPipes extensions. + * This class provides rate limiting functionality using Google Guava's RateLimiter. + * It supports configurable permits per second and warmup periods. 
+ */ +public enum SpRateLimiter { + + INSTANCE; + + private static final Logger LOG = LoggerFactory.getLogger(SpRateLimiter.class); + + // Configuration constants + private static final double DEFAULT_PERMITS_PER_SECOND = 100.0; + private static final long DEFAULT_WARMUP_PERIOD = 1000L; + private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS; + private static final int SCHEDULER_INITIAL_DELAY_SECONDS = 0; + private static final int SCHEDULER_PERIOD_SECONDS = 15; + private static final long ZERO_RATE_WAIT_TIME_MS = 1000L; + private static final int STATS_RESET_THRESHOLD = 1000; + private static final int STATS_RESET_FACTOR = 999; + private static final int STATS_RESET_DIVISOR = 1000; + private static final int SHUTDOWN_TIMEOUT_SECONDS = 5; + private static final long TIMEOUT_MS = 1000; + private static final int PERMITS_PER_REQUEST = 1; + + private RateLimiter rateLimiter; + + private double rateLimiterAverageWaitTime = 0.0; + + private long totalWaitTime = 0L; + private AtomicInteger waitTimeCount = new AtomicInteger(0); + + private AtomicInteger currentQueueSize = new AtomicInteger(0); + + private SpRateLimiterStats stats; + private static volatile boolean schedulerInitialized = false; + private static ScheduledExecutorService scheduler; + + /** + * Creates a rate limiter with default parameters. + * Default: 100 permits per second, 1000ms warmup period. + */ + public void createRateLimiter() { + createRateLimiter(DEFAULT_PERMITS_PER_SECOND, DEFAULT_WARMUP_PERIOD, DEFAULT_TIME_UNIT); + initScheduledTasks(); + LOG.info("RateLimiter created and scheduler initialized"); + } + + /** + * Creates a rate limiter with the specified permits per second. + * Uses default warmup period of 1000ms. 
+ * + * @param permitsPerSecond The number of permits per second + */ + public void createRateLimiter(double permitsPerSecond) { + createRateLimiter(permitsPerSecond, DEFAULT_WARMUP_PERIOD, DEFAULT_TIME_UNIT); + initScheduledTasks(); + LOG.info("RateLimiter created with {} permits/sec and scheduler initialized", permitsPerSecond); + } + + public void initScheduledTasks() { + if (!schedulerInitialized) { + synchronized (SpRateLimiter.class) { + if (!schedulerInitialized) { + scheduler = Executors.newSingleThreadScheduledExecutor(); + scheduler.scheduleAtFixedRate(this::scheduledTask, SCHEDULER_INITIAL_DELAY_SECONDS, SCHEDULER_PERIOD_SECONDS, TimeUnit.SECONDS); + schedulerInitialized = true; + } + } + } + } + + public void scheduledTask() { + this.stats = new SpRateLimiterStats(); + stats.setAverageWaitTime(this.rateLimiterAverageWaitTime); + stats.setQueueSize(this.currentQueueSize.get()); + stats.updateAllMetrics(); + } + + /** + * Creates a rate limiter with the specified parameters. + * + * @param permitsPerSecond The number of permits per second + * @param warmupPeriod The warmup period + * @param unit The time unit for the warmup period + * @throws IllegalArgumentException if parameters are invalid + */ + public void createRateLimiter(double permitsPerSecond, long warmupPeriod, TimeUnit unit) { + if (this.rateLimiter == null) { + validateParameters(permitsPerSecond, warmupPeriod, unit); + this.rateLimiter = RateLimiter.create(permitsPerSecond, warmupPeriod, unit); + LOG.info("RateLimiter created with {} permits per second, warmup period: {} {}", + permitsPerSecond, warmupPeriod, unit); + } else { + LOG.warn("RateLimiter already exists. Use setRate() to modify the rate instead."); + } + } + + /** + * Acquires a permit from the rate limiter for processing data, with timeout. + * Each request consumes exactly 1 permit regardless of data size. + * This provides simple and fair rate limiting based on request count. 
+ * + * @param bytes The number of bytes to process (for logging purposes only) + * @return true if permit was acquired successfully, false if timeout occurred + * @throws InterruptedException if the current thread is interrupted while waiting + */ + public boolean limit(long bytes) throws InterruptedException { + if (this.rateLimiter == null) { + LOG.warn("RateLimiter has not been initialized. Please call createRateLimiter() first."); + return false; + } + + long startTime = System.currentTimeMillis(); + + synchronized (this) { Review Comment: delete? ########## streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/limiter/SpRateLimiter.java: ########## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.streampipes.extensions.api.limiter; + +import org.apache.streampipes.commons.prometheus.spratelimiter.SpRateLimiterStats; + +import com.google.common.util.concurrent.RateLimiter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.ObjectOutputStream; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A singleton rate limiter implementation for StreamPipes extensions. + * This class provides rate limiting functionality using Google Guava's RateLimiter. + * It supports configurable permits per second and warmup periods. + */ +public enum SpRateLimiter { + + INSTANCE; + + private static final Logger LOG = LoggerFactory.getLogger(SpRateLimiter.class); + + // Configuration constants + private static final double DEFAULT_PERMITS_PER_SECOND = 100.0; + private static final long DEFAULT_WARMUP_PERIOD = 1000L; + private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS; + private static final int SCHEDULER_INITIAL_DELAY_SECONDS = 0; + private static final int SCHEDULER_PERIOD_SECONDS = 15; + private static final long ZERO_RATE_WAIT_TIME_MS = 1000L; + private static final int STATS_RESET_THRESHOLD = 1000; + private static final int STATS_RESET_FACTOR = 999; + private static final int STATS_RESET_DIVISOR = 1000; + private static final int SHUTDOWN_TIMEOUT_SECONDS = 5; + private static final long TIMEOUT_MS = 1000; + private static final int PERMITS_PER_REQUEST = 1; + + private RateLimiter rateLimiter; + + private double rateLimiterAverageWaitTime = 0.0; + + private long totalWaitTime = 0L; + private AtomicInteger waitTimeCount = new AtomicInteger(0); + + private AtomicInteger currentQueueSize = new AtomicInteger(0); + + private SpRateLimiterStats stats; + private static volatile boolean 
schedulerInitialized = false; + private static ScheduledExecutorService scheduler; + + /** + * Creates a rate limiter with default parameters. + * Default: 100 permits per second, 1000ms warmup period. + */ + public void createRateLimiter() { + createRateLimiter(DEFAULT_PERMITS_PER_SECOND, DEFAULT_WARMUP_PERIOD, DEFAULT_TIME_UNIT); + initScheduledTasks(); + LOG.info("RateLimiter created and scheduler initialized"); + } + + /** + * Creates a rate limiter with the specified permits per second. + * Uses default warmup period of 1000ms. + * + * @param permitsPerSecond The number of permits per second + */ + public void createRateLimiter(double permitsPerSecond) { + createRateLimiter(permitsPerSecond, DEFAULT_WARMUP_PERIOD, DEFAULT_TIME_UNIT); + initScheduledTasks(); + LOG.info("RateLimiter created with {} permits/sec and scheduler initialized", permitsPerSecond); + } + + public void initScheduledTasks() { + if (!schedulerInitialized) { + synchronized (SpRateLimiter.class) { + if (!schedulerInitialized) { + scheduler = Executors.newSingleThreadScheduledExecutor(); + scheduler.scheduleAtFixedRate(this::scheduledTask, SCHEDULER_INITIAL_DELAY_SECONDS, SCHEDULER_PERIOD_SECONDS, TimeUnit.SECONDS); + schedulerInitialized = true; + } + } + } + } + + public void scheduledTask() { + this.stats = new SpRateLimiterStats(); + stats.setAverageWaitTime(this.rateLimiterAverageWaitTime); + stats.setQueueSize(this.currentQueueSize.get()); + stats.updateAllMetrics(); + } + + /** + * Creates a rate limiter with the specified parameters. 
+ * + * @param permitsPerSecond The number of permits per second + * @param warmupPeriod The warmup period + * @param unit The time unit for the warmup period + * @throws IllegalArgumentException if parameters are invalid + */ + public void createRateLimiter(double permitsPerSecond, long warmupPeriod, TimeUnit unit) { + if (this.rateLimiter == null) { + validateParameters(permitsPerSecond, warmupPeriod, unit); + this.rateLimiter = RateLimiter.create(permitsPerSecond, warmupPeriod, unit); + LOG.info("RateLimiter created with {} permits per second, warmup period: {} {}", + permitsPerSecond, warmupPeriod, unit); + } else { + LOG.warn("RateLimiter already exists. Use setRate() to modify the rate instead."); + } + } + + /** + * Acquires a permit from the rate limiter for processing data, with timeout. + * Each request consumes exactly 1 permit regardless of data size. + * This provides simple and fair rate limiting based on request count. + * + * @param bytes The number of bytes to process (for logging purposes only) + * @return true if permit was acquired successfully, false if timeout occurred + * @throws InterruptedException if the current thread is interrupted while waiting + */ + public boolean limit(long bytes) throws InterruptedException { + if (this.rateLimiter == null) { + LOG.warn("RateLimiter has not been initialized. Please call createRateLimiter() first."); + return false; + } + + long startTime = System.currentTimeMillis(); + + synchronized (this) { + currentQueueSize.incrementAndGet(); + } + + try { + if (rateLimiter.getRate() <= 0) { + LOG.warn("RateLimiter is set to zero or negative rate. 
No permits will be acquired."); + updateAverageWaitTime(ZERO_RATE_WAIT_TIME_MS); + try { + Thread.sleep(ZERO_RATE_WAIT_TIME_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } + return false; Review Comment: delete ########## streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/memorymanager/SpMemoryManager.java: ########## @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.extensions.api.memorymanager; + +import org.apache.streampipes.commons.prometheus.spmemorymanager.SpMemoryManagerStats; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.ObjectOutputStream; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +/** + * A singleton memory manager implementation for StreamPipes extensions. + * This class provides memory allocation and deallocation functionality with + * blocking behavior when insufficient memory is available. 
+ */ +public enum SpMemoryManager { + + INSTANCE; + + private static final Logger LOG = LoggerFactory.getLogger(SpMemoryManager.class); + + // Configuration constants + private static final long DEFAULT_INITIAL_MEMORY = 10L * 1024 * 1024 * 1024; // 10 GB + private static final long WAIT_TIMEOUT_MS = 1000L; + private static final int SCHEDULER_INITIAL_DELAY_SECONDS = 0; + private static final int SCHEDULER_PERIOD_SECONDS = 15; + private static final int BYTES_TO_MB = 1024 * 1024; + private static final int SHUTDOWN_TIMEOUT_SECONDS = 5; + + // Memory control thresholds + private static final double MEMORY_USAGE_THRESHOLD = 0.9; // 90% threshold for blocking + private static final double MEMORY_WARNING_THRESHOLD = 0.8; // 80% warning threshold + + private long freeMemory; + + private double memoryUsedBytes = 0.0; + private double memoryAllocationRate = 0.0; + + private long lastAllocationTime = System.currentTimeMillis(); + private long totalAllocatedBytes = 0L; + + // Memory control state + private volatile boolean memoryBlocked = false; + private volatile boolean memoryWarningActive = false; + + private SpMemoryManagerStats stats = new SpMemoryManagerStats(); + private static volatile boolean schedulerInitialized = false; + private static ScheduledExecutorService scheduler; + + SpMemoryManager() { + this.freeMemory = DEFAULT_INITIAL_MEMORY; + initScheduledTask(); + } + + public void initScheduledTask() { + if (!schedulerInitialized) { + synchronized (SpMemoryManager.class) { + if (!schedulerInitialized) { + scheduler = Executors.newScheduledThreadPool(1); + scheduler.scheduleAtFixedRate(this::scheduledTask, SCHEDULER_INITIAL_DELAY_SECONDS, SCHEDULER_PERIOD_SECONDS, java.util.concurrent.TimeUnit.SECONDS); + schedulerInitialized = true; + } + } + } + } + + public void scheduledTask() { + this.stats = new SpMemoryManagerStats(); + stats.setAllocationRate(this.getMEMORY_ALLOCATION_RATE()); + stats.setMemoryUsedBytes(this.getMEMORY_USED_BYTES()); + 
stats.updateAllMetrics(); + + // Check memory usage thresholds + checkMemoryThresholds(); + } + + /** + * Checks memory usage against configured thresholds and updates blocking state. + */ + private void checkMemoryThresholds() { + double memoryUsageRatio = (double) getAllocatedMemory() / DEFAULT_INITIAL_MEMORY; + + if (memoryUsageRatio >= MEMORY_USAGE_THRESHOLD) { + if (!memoryBlocked) { + memoryBlocked = true; + LOG.warn("Memory usage reached {}% threshold. Blocking data consumption.", + (int)(MEMORY_USAGE_THRESHOLD * 100)); + } + } else if (memoryUsageRatio <= MEMORY_WARNING_THRESHOLD) { + if (memoryBlocked) { + memoryBlocked = false; + LOG.info("Memory usage dropped below {}% threshold. Resuming data consumption.", + (int)(MEMORY_WARNING_THRESHOLD * 100)); + } + } + + // Update warning state + memoryWarningActive = memoryUsageRatio >= MEMORY_WARNING_THRESHOLD + && memoryUsageRatio < MEMORY_USAGE_THRESHOLD; + } + + /** + * Allocates the specified amount of memory. + * If insufficient memory is available, this method will block until memory becomes available. + * If memory usage is above threshold, allocation will be blocked. + * + * @param bytes The number of bytes to allocate + * @throws IllegalArgumentException if bytes is non-positive + */ + public void allocate(long bytes) { + if (bytes <= 0) { + LOG.warn("Attempted to allocate non-positive memory: {} bytes", bytes); + return; + } + + // Check if memory is blocked due to threshold + if (memoryBlocked) { + LOG.warn("Memory allocation blocked due to high memory usage threshold. 
" + + "Current usage: {}%", getMemoryUsagePercentage()); + return; + } + + // Loop until enough memory is available + while (true) { + long currentFree = freeMemory; + long newFreeMemory = currentFree - bytes; + + if (newFreeMemory >= 0) { + // Sufficient memory available, perform allocation + freeMemory = newFreeMemory; + + long allocatedMemory = DEFAULT_INITIAL_MEMORY - freeMemory; + memoryUsedBytes = (double) allocatedMemory; + + updateAllocationRate(bytes); + + return; + } else { + // Insufficient memory, block and wait + LOG.warn("Not enough free memory to allocate {} bytes. Current free memory: {} bytes. " + + "Blocking allocation.", bytes, currentFree); + + synchronized (this) { + try { + // Block and wait for memory to be freed + wait(WAIT_TIMEOUT_MS); Review Comment: this.wait? ########## streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/memorymanager/SpMemoryManager.java: ########## @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.streampipes.extensions.api.memorymanager; + +import org.apache.streampipes.commons.prometheus.spmemorymanager.SpMemoryManagerStats; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.ObjectOutputStream; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +/** + * A singleton memory manager implementation for StreamPipes extensions. + * This class provides memory allocation and deallocation functionality with + * blocking behavior when insufficient memory is available. + */ +public enum SpMemoryManager { + + INSTANCE; + + private static final Logger LOG = LoggerFactory.getLogger(SpMemoryManager.class); + + // Configuration constants + private static final long DEFAULT_INITIAL_MEMORY = 10L * 1024 * 1024 * 1024; // 10 GB + private static final long WAIT_TIMEOUT_MS = 1000L; + private static final int SCHEDULER_INITIAL_DELAY_SECONDS = 0; + private static final int SCHEDULER_PERIOD_SECONDS = 15; + private static final int BYTES_TO_MB = 1024 * 1024; + private static final int SHUTDOWN_TIMEOUT_SECONDS = 5; + + // Memory control thresholds + private static final double MEMORY_USAGE_THRESHOLD = 0.9; // 90% threshold for blocking + private static final double MEMORY_WARNING_THRESHOLD = 0.8; // 80% warning threshold + + private long freeMemory; + + private double memoryUsedBytes = 0.0; + private double memoryAllocationRate = 0.0; + + private long lastAllocationTime = System.currentTimeMillis(); + private long totalAllocatedBytes = 0L; + + // Memory control state + private volatile boolean memoryBlocked = false; + private volatile boolean memoryWarningActive = false; + + private SpMemoryManagerStats stats = new SpMemoryManagerStats(); + private static volatile boolean schedulerInitialized = false; + private static ScheduledExecutorService scheduler; + + SpMemoryManager() { + this.freeMemory = 
DEFAULT_INITIAL_MEMORY; + initScheduledTask(); + } + + public void initScheduledTask() { + if (!schedulerInitialized) { + synchronized (SpMemoryManager.class) { + if (!schedulerInitialized) { + scheduler = Executors.newScheduledThreadPool(1); + scheduler.scheduleAtFixedRate(this::scheduledTask, SCHEDULER_INITIAL_DELAY_SECONDS, SCHEDULER_PERIOD_SECONDS, java.util.concurrent.TimeUnit.SECONDS); + schedulerInitialized = true; + } + } + } + } + + public void scheduledTask() { + this.stats = new SpMemoryManagerStats(); + stats.setAllocationRate(this.getMEMORY_ALLOCATION_RATE()); + stats.setMemoryUsedBytes(this.getMEMORY_USED_BYTES()); + stats.updateAllMetrics(); + + // Check memory usage thresholds + checkMemoryThresholds(); + } + + /** + * Checks memory usage against configured thresholds and updates blocking state. + */ + private void checkMemoryThresholds() { + double memoryUsageRatio = (double) getAllocatedMemory() / DEFAULT_INITIAL_MEMORY; + + if (memoryUsageRatio >= MEMORY_USAGE_THRESHOLD) { + if (!memoryBlocked) { + memoryBlocked = true; + LOG.warn("Memory usage reached {}% threshold. Blocking data consumption.", + (int)(MEMORY_USAGE_THRESHOLD * 100)); + } + } else if (memoryUsageRatio <= MEMORY_WARNING_THRESHOLD) { + if (memoryBlocked) { + memoryBlocked = false; + LOG.info("Memory usage dropped below {}% threshold. Resuming data consumption.", + (int)(MEMORY_WARNING_THRESHOLD * 100)); + } + } + + // Update warning state + memoryWarningActive = memoryUsageRatio >= MEMORY_WARNING_THRESHOLD + && memoryUsageRatio < MEMORY_USAGE_THRESHOLD; + } + + /** + * Allocates the specified amount of memory. + * If insufficient memory is available, this method will block until memory becomes available. + * If memory usage is above threshold, allocation will be blocked. 
+ * + * @param bytes The number of bytes to allocate + * @throws IllegalArgumentException if bytes is non-positive + */ + public void allocate(long bytes) { + if (bytes <= 0) { + LOG.warn("Attempted to allocate non-positive memory: {} bytes", bytes); + return; + } + + // Check if memory is blocked due to threshold + if (memoryBlocked) { + LOG.warn("Memory allocation blocked due to high memory usage threshold. " + + "Current usage: {}%", getMemoryUsagePercentage()); + return; + } + + // Loop until enough memory is available + while (true) { + long currentFree = freeMemory; + long newFreeMemory = currentFree - bytes; + + if (newFreeMemory >= 0) { + // Sufficient memory available, perform allocation + freeMemory = newFreeMemory; + + long allocatedMemory = DEFAULT_INITIAL_MEMORY - freeMemory; + memoryUsedBytes = (double) allocatedMemory; + + updateAllocationRate(bytes); + + return; + } else { + // Insufficient memory, block and wait + LOG.warn("Not enough free memory to allocate {} bytes. Current free memory: {} bytes. " + + "Blocking allocation.", bytes, currentFree); + + synchronized (this) { + try { + // Block and wait for memory to be freed + wait(WAIT_TIMEOUT_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warn("Memory allocation blocking was interrupted"); + return; + } + } + } + } + } + + /** + * Frees the specified amount of memory. + * This will notify any threads waiting for memory allocation. 
+ * + * @param bytes The number of bytes to free + * @throws IllegalArgumentException if bytes is non-positive + */ + public void free(long bytes) { + if (bytes <= 0) { + LOG.warn("Attempted to free non-positive memory: {} bytes", bytes); + return; + } + + synchronized (this) { + freeMemory += bytes; Review Comment: use AtomicInt ########## streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/limiter/SpRateLimiter.java: ########## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.extensions.api.limiter; + +import org.apache.streampipes.commons.prometheus.spratelimiter.SpRateLimiterStats; + +import com.google.common.util.concurrent.RateLimiter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.ObjectOutputStream; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A singleton rate limiter implementation for StreamPipes extensions. 
+ * This class provides rate limiting functionality using Google Guava's RateLimiter. + * It supports configurable permits per second and warmup periods. + */ +public enum SpRateLimiter { + + INSTANCE; + + private static final Logger LOG = LoggerFactory.getLogger(SpRateLimiter.class); + + // Configuration constants + private static final double DEFAULT_PERMITS_PER_SECOND = 100.0; + private static final long DEFAULT_WARMUP_PERIOD = 1000L; + private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS; + private static final int SCHEDULER_INITIAL_DELAY_SECONDS = 0; + private static final int SCHEDULER_PERIOD_SECONDS = 15; + private static final long ZERO_RATE_WAIT_TIME_MS = 1000L; + private static final int STATS_RESET_THRESHOLD = 1000; + private static final int STATS_RESET_FACTOR = 999; + private static final int STATS_RESET_DIVISOR = 1000; + private static final int SHUTDOWN_TIMEOUT_SECONDS = 5; + private static final long TIMEOUT_MS = 1000; + private static final int PERMITS_PER_REQUEST = 1; + + private RateLimiter rateLimiter; + + private double rateLimiterAverageWaitTime = 0.0; + + private long totalWaitTime = 0L; + private AtomicInteger waitTimeCount = new AtomicInteger(0); + + private AtomicInteger currentQueueSize = new AtomicInteger(0); + + private SpRateLimiterStats stats; + private static volatile boolean schedulerInitialized = false; + private static ScheduledExecutorService scheduler; + + /** + * Creates a rate limiter with default parameters. + * Default: 100 permits per second, 1000ms warmup period. + */ + public void createRateLimiter() { + createRateLimiter(DEFAULT_PERMITS_PER_SECOND, DEFAULT_WARMUP_PERIOD, DEFAULT_TIME_UNIT); + initScheduledTasks(); + LOG.info("RateLimiter created and scheduler initialized"); + } + + /** + * Creates a rate limiter with the specified permits per second. + * Uses default warmup period of 1000ms. 
+ * + * @param permitsPerSecond The number of permits per second + */ + public void createRateLimiter(double permitsPerSecond) { + createRateLimiter(permitsPerSecond, DEFAULT_WARMUP_PERIOD, DEFAULT_TIME_UNIT); + initScheduledTasks(); + LOG.info("RateLimiter created with {} permits/sec and scheduler initialized", permitsPerSecond); + } + + public void initScheduledTasks() { + if (!schedulerInitialized) { + synchronized (SpRateLimiter.class) { + if (!schedulerInitialized) { + scheduler = Executors.newSingleThreadScheduledExecutor(); + scheduler.scheduleAtFixedRate(this::scheduledTask, SCHEDULER_INITIAL_DELAY_SECONDS, SCHEDULER_PERIOD_SECONDS, TimeUnit.SECONDS); + schedulerInitialized = true; + } + } + } + } + + public void scheduledTask() { + this.stats = new SpRateLimiterStats(); + stats.setAverageWaitTime(this.rateLimiterAverageWaitTime); + stats.setQueueSize(this.currentQueueSize.get()); + stats.updateAllMetrics(); + } + + /** + * Creates a rate limiter with the specified parameters. + * + * @param permitsPerSecond The number of permits per second + * @param warmupPeriod The warmup period + * @param unit The time unit for the warmup period + * @throws IllegalArgumentException if parameters are invalid + */ + public void createRateLimiter(double permitsPerSecond, long warmupPeriod, TimeUnit unit) { + if (this.rateLimiter == null) { + validateParameters(permitsPerSecond, warmupPeriod, unit); + this.rateLimiter = RateLimiter.create(permitsPerSecond, warmupPeriod, unit); + LOG.info("RateLimiter created with {} permits per second, warmup period: {} {}", Review Comment: rateLimiter.setRate(totalmem*70%) ########## streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java: ########## @@ -117,6 +122,10 @@ public void process(Map<String, Object> rawEvent, String topicName) { increaseCounter(sourceInfo.getSourceId()); } catch (RuntimeException e) { addError(e); + } catch (InterruptedException e) { + throw new 
SpRuntimeException(e); Review Comment: SetInterrupted -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
