This is an automated email from the ASF dual-hosted git repository.
rzo1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new e722c2fda Implement size-based file rotation for FileBasedEventLogger
#8415
e722c2fda is described below
commit e722c2fdadbe8153746f7f1d9fccc8d1d66187e2
Author: ANKIT KUMAR <[email protected]>
AuthorDate: Mon Mar 9 19:07:19 2026 +0530
Implement size-based file rotation for FileBasedEventLogger #8415
---
storm-client/src/jvm/org/apache/storm/Config.java | 14 ++
.../apache/storm/metric/FileBasedEventLogger.java | 116 +++++++++++++----
.../storm/metric/FileBasedEventLoggerTest.java | 144 +++++++++++++++++++++
3 files changed, 250 insertions(+), 24 deletions(-)
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java
b/storm-client/src/jvm/org/apache/storm/Config.java
index a098e6cb6..e332b726b 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -477,6 +477,20 @@ public class Config extends HashMap<String, Object> {
@IsInteger
@IsPositiveNumber(includeZero = true)
public static final String TOPOLOGY_EVENTLOGGER_EXECUTORS =
"topology.eventlogger.executors";
+ /**
+ * The maximum size in MB for the event logger file before it rotates.
+ * If not specified, a default of 100 MB is used.
+ */
+ @IsInteger
+ @IsPositiveNumber
+ public static final String TOPOLOGY_EVENTLOGGER_ROTATION_SIZE_MB =
"topology.eventlogger.rotation.size.mb";
+ /**
+ * The maximum number of retained files for the event logger.
+ * If not specified, a default of 5 is used.
+ */
+ @IsInteger
+ @IsPositiveNumber
+ public static final String TOPOLOGY_EVENTLOGGER_MAX_RETAINED_FILES =
"topology.eventlogger.max.retained.files";
/**
* The maximum amount of time given to the topology to fully process a
message emitted by a spout. If the message is not acked within
* this time frame, Storm will fail the message on the spout. Some spouts
implementations will then replay the message at a later time.
diff --git
a/storm-client/src/jvm/org/apache/storm/metric/FileBasedEventLogger.java
b/storm-client/src/jvm/org/apache/storm/metric/FileBasedEventLogger.java
index 64ee85cd8..f626b8396 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/FileBasedEventLogger.java
+++ b/storm-client/src/jvm/org/apache/storm/metric/FileBasedEventLogger.java
@@ -19,21 +19,23 @@
package org.apache.storm.metric;
import java.io.BufferedWriter;
-import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import org.apache.storm.Config;
import
org.apache.storm.shade.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.ObjectReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,56 +43,74 @@ public class FileBasedEventLogger implements IEventLogger {
private static final Logger LOG =
LoggerFactory.getLogger(FileBasedEventLogger.class);
private static final int FLUSH_INTERVAL_MILLIS = 1000;
+ private static final long BYTES_PER_MB = 1024L * 1024L;
+ private static final int DEFAULT_ROTATION_SIZE_MB = 100;
+ private static final int DEFAULT_MAX_RETAINED_FILES = 5;
private Path eventLogPath;
private BufferedWriter eventLogWriter;
private ScheduledExecutorService flushScheduler;
private volatile boolean dirty = false;
+ private final Object writeLock = new Object();
+
+ // File rotation configs
+ private long maxFileSize;
+ private int maxRetainedFiles;
+ private long currentFileSize = 0;
+ /**
+ * Opens the event log at the given path for appending (creating it if absent) and
+ * seeds {@code currentFileSize} from the size already on disk, so that size-based
+ * rotation accounts for content left by an earlier writer.
+ *
+ * @param logFilePath file to append events to; also stored as {@code eventLogPath}
+ * @throws RuntimeException wrapping the IOException if the size lookup or open fails
+ */
private void initLogWriter(Path logFilePath) {
try {
LOG.info("logFilePath {}", logFilePath);
eventLogPath = logFilePath;
+
+ // Measured before the file is opened: a missing file counts as empty.
+ currentFileSize = Files.exists(eventLogPath) ?
Files.size(eventLogPath) : 0L;
+
eventLogWriter = Files.newBufferedWriter(eventLogPath,
StandardCharsets.UTF_8, StandardOpenOption.CREATE,
- StandardOpenOption.WRITE,
StandardOpenOption.APPEND);
+ StandardOpenOption.WRITE, StandardOpenOption.APPEND);
} catch (IOException e) {
LOG.error("Error setting up FileBasedEventLogger.", e);
throw new RuntimeException(e);
}
}
-
+ /**
+ * Starts a single daemon thread that, every {@code FLUSH_INTERVAL_MILLIS} milliseconds,
+ * flushes the writer if there are unflushed writes ({@code dirty}).
+ */
private void setUpFlushTask() {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
- .setNameFormat("event-logger-flush-%d")
- .setDaemon(true)
- .build();
+ .setNameFormat("event-logger-flush-%d")
+ .setDaemon(true)
+ .build();
flushScheduler =
Executors.newSingleThreadScheduledExecutor(threadFactory);
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
- if (dirty) {
- eventLogWriter.flush();
- dirty = false;
+ // Same lock as log() and close(), so a flush never interleaves with a
+ // write, a rotation, or the writer being closed and nulled.
+ synchronized (writeLock) {
+ if (dirty && eventLogWriter != null) {
+ eventLogWriter.flush();
+ dirty = false;
+ }
}
- } catch (IOException ex) {
+ } catch (Exception ex) {
+ // Log and carry on: an exception escaping a scheduleAtFixedRate task
+ // suppresses every subsequent execution of that task.
LOG.error("Error flushing " + eventLogPath, ex);
- throw new RuntimeException(ex);
}
}
};
- flushScheduler.scheduleAtFixedRate(runnable, FLUSH_INTERVAL_MILLIS,
FLUSH_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
+ flushScheduler.scheduleAtFixedRate(runnable, FLUSH_INTERVAL_MILLIS,
FLUSH_INTERVAL_MILLIS,
+ TimeUnit.MILLISECONDS);
}
-
@Override
public void prepare(Map<String, Object> conf, Map<String, Object>
arguments, TopologyContext context) {
String stormId = context.getStormId();
int port = context.getThisWorkerPort();
+ int rotationSizeMb =
ObjectReader.getInt(conf.get(Config.TOPOLOGY_EVENTLOGGER_ROTATION_SIZE_MB),
+ DEFAULT_ROTATION_SIZE_MB);
+ this.maxFileSize = rotationSizeMb * BYTES_PER_MB;
+ this.maxRetainedFiles =
ObjectReader.getInt(conf.get(Config.TOPOLOGY_EVENTLOGGER_MAX_RETAINED_FILES),
+ DEFAULT_MAX_RETAINED_FILES);
+
/*
* Include the topology name & worker port in the file name so that
* multiple event loggers can log independently.
@@ -98,9 +118,11 @@ public class FileBasedEventLogger implements IEventLogger {
String workersArtifactRoot = ConfigUtils.workerArtifactsRoot(conf,
stormId, port);
Path path = Paths.get(workersArtifactRoot, "events.log");
- File dir = path.toFile().getParentFile();
- if (!dir.exists()) {
- dir.mkdirs();
+ try {
+ Files.createDirectories(path.getParent());
+ } catch (IOException e) {
+ LOG.error("Failed to create directories for event logger", e);
+ throw new RuntimeException(e);
}
initLogWriter(path);
setUpFlushTask();
@@ -109,30 +131,76 @@ public class FileBasedEventLogger implements IEventLogger
{
+ /**
+ * Appends one event as a line to the event log, rotating the file first when the
+ * write would push a non-empty file past {@code maxFileSize}.
+ *
+ * <p>Events logged before {@code prepare} or after {@code close} are dropped.
+ *
+ * @param event the event to record
+ * @throws RuntimeException wrapping the IOException if rotation or the write fails
+ */
@Override
public void log(EventInfo event) {
try {
- //TODO: file rotation
- eventLogWriter.write(buildLogMessage(event));
- eventLogWriter.newLine();
- dirty = true;
+ String logMessage = buildLogMessage(event);
+ // Character count; an approximation of the UTF-8 bytes this write appends.
+ int writeLength = logMessage.length() + System.lineSeparator().length();
+
+ synchronized (writeLock) {
+ // Check for a missing writer BEFORE considering rotation: rotateFiles()
+ // dereferences the writer, so a log() after close() used to throw NPE.
+ if (eventLogWriter == null) {
+ return;
+ }
+
+ // Only rotate a file that already holds data. An event larger than the
+ // limit would otherwise rotate an empty file on every write, burning a
+ // retention slot and evicting real history.
+ if (currentFileSize > 0 && currentFileSize + writeLength > maxFileSize) {
+ rotateFiles();
+ }
+
+ eventLogWriter.write(logMessage);
+ eventLogWriter.newLine();
+ currentFileSize += writeLength;
+ dirty = true;
+ }
} catch (IOException ex) {
LOG.error("Error logging event {}", event, ex);
throw new RuntimeException(ex);
}
}
+ /**
+ * Rotates the event log: events.log becomes events.log.1, events.log.N becomes
+ * events.log.(N+1), and files numbered {@code maxRetainedFiles} or higher are deleted.
+ * A fresh events.log is then opened. Callers must hold {@code writeLock}.
+ *
+ * @throws IOException if closing the writer or deleting/moving a file fails; the
+ *     writer is reopened regardless so later writes are not stuck on a closed stream
+ */
+ private void rotateFiles() throws IOException {
+ // Never prepared or already closed: there is nothing to rotate, and reopening
+ // the file here would leak a writer past close().
+ if (eventLogWriter == null) {
+ return;
+ }
+
+ try {
+ eventLogWriter.close();
+
+ // Delete the oldest slot plus anything beyond it (e.g. if the config was
+ // lowered), stopping at the first gap in the numbering.
+ int index = maxRetainedFiles;
+ while (Files.deleteIfExists(rotatedPath(index))) {
+ index++;
+ }
+
+ // Shift existing rotated files, highest index first so each target is free.
+ for (index = maxRetainedFiles - 1; index >= 1; index--) {
+ Path src = rotatedPath(index);
+ if (Files.exists(src)) {
+ Files.move(src, rotatedPath(index + 1), StandardCopyOption.REPLACE_EXISTING);
+ }
+ }
+
+ // Rename current events.log
+ if (Files.exists(eventLogPath)) {
+ Files.move(eventLogPath, rotatedPath(1), StandardCopyOption.REPLACE_EXISTING);
+ }
+ } finally {
+ // Reopen even if a step above failed. initLogWriter re-reads the on-disk
+ // size, so currentFileSize is 0 after a successful rotation and the real
+ // size of the untouched file after a failed one.
+ initLogWriter(eventLogPath);
+ }
+ }
+
+ /** Returns the path of the rotated log with the given numeric suffix, e.g. events.log.2. */
+ private Path rotatedPath(int index) {
+ return Paths.get(eventLogPath.toString() + "." + index);
+ }
+
+
+ /**
+ * Builds the text written for one event. Declared {@code protected}, so a subclass
+ * can override the format; this implementation returns {@code event.toString()}.
+ */
protected String buildLogMessage(EventInfo event) {
return event.toString();
}
+ /**
+ * Calls {@code closeFlushScheduler()} first (its body is not shown in this hunk;
+ * presumably it stops the periodic flush task), then closes the writer under
+ * {@code writeLock}. The writer is set to null afterwards so the null checks in the
+ * flush task and in log() treat it as closed. A failure to close is logged, not thrown.
+ */
@Override
public void close() {
- try {
- eventLogWriter.close();
+ closeFlushScheduler();
+ try {
+ synchronized (writeLock) {
+ if (eventLogWriter != null) {
+ eventLogWriter.close();
+ eventLogWriter = null;
+ }
+ }
} catch (IOException ex) {
LOG.error("Error closing event log.", ex);
}
-
- closeFlushScheduler();
}
private void closeFlushScheduler() {
diff --git
a/storm-client/test/jvm/org/apache/storm/metric/FileBasedEventLoggerTest.java
b/storm-client/test/jvm/org/apache/storm/metric/FileBasedEventLoggerTest.java
new file mode 100644
index 000000000..1db39b4ac
--- /dev/null
+++
b/storm-client/test/jvm/org/apache/storm/metric/FileBasedEventLoggerTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.metric;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.storm.Config;
+import org.apache.storm.task.TopologyContext;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests size-based rotation of the events.log file written by {@link FileBasedEventLogger}.
+ */
+public class FileBasedEventLoggerTest {
+
+ private Path tempDir;
+ private FileBasedEventLogger eventLogger;
+
+ @BeforeEach
+ public void setUp() throws IOException {
+ tempDir = Files.createTempDirectory("storm-eventlogger-test");
+ eventLogger = new FileBasedEventLogger();
+ }
+
+ @AfterEach
+ public void tearDown() throws IOException {
+ eventLogger.close();
+ if (tempDir != null) {
+ // Files.walk yields a directory before its entries, and a non-empty directory
+ // cannot be deleted, so delete in reverse order (children first). The stream
+ // holds open directory handles and must be closed. The walk includes tempDir.
+ try (java.util.stream.Stream<Path> paths = Files.walk(tempDir)) {
+ paths.sorted(java.util.Comparator.reverseOrder())
+ .map(Path::toFile)
+ .forEach(File::delete);
+ }
+ }
+ }
+
+ private TopologyContext mockTopologyContext() {
+ TopologyContext context = mock(TopologyContext.class);
+ when(context.getStormId()).thenReturn("test-topology-1");
+ when(context.getThisWorkerPort()).thenReturn(6700);
+ return context;
+ }
+
+ @Test
+ public void testFileRotation() throws IOException {
+ Map<String, Object> conf = new HashMap<>();
+ conf.put(Config.STORM_WORKERS_ARTIFACTS_DIR, tempDir.toAbsolutePath().toString());
+ // The rotation size is configured in whole MB, so 1 MB is the smallest usable
+ // threshold; the test crosses it by writing large (~100 KB) events.
+ conf.put(Config.TOPOLOGY_EVENTLOGGER_ROTATION_SIZE_MB, 1);
+ conf.put(Config.TOPOLOGY_EVENTLOGGER_MAX_RETAINED_FILES, 2);
+
+ eventLogger.prepare(conf, new HashMap<>(), mockTopologyContext());
+
+ // 1 MB = 1048576 bytes. Each event carries a 100,000-character payload, so ten
+ // events stay just under 1 MB and the eleventh triggers a rotation.
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < 100_000; i++) {
+ sb.append("A"); // 1 byte
+ }
+ String largeValue = sb.toString();
+
+ List<Object> values = new ArrayList<>();
+ values.add(largeValue);
+
+ // toString() adds a few bytes of overhead, so each event is ~100 KB.
+ IEventLogger.EventInfo eventInfo = new IEventLogger.EventInfo(
+ System.currentTimeMillis(), "test-component", 1, "msgId", values);
+
+ // Write 10 times -> ~1 MB
+ for (int i = 0; i < 10; i++) {
+ eventLogger.log(eventInfo);
+ }
+
+ // Rotation runs synchronously inside log(), so file existence can be checked
+ // immediately; there is no need to wait for the background flush.
+ Path expectedLogDir = tempDir.resolve("test-topology-1").resolve("6700");
+ Path logFile = expectedLogDir.resolve("events.log");
+ Path logFile1 = expectedLogDir.resolve("events.log.1");
+ Path logFile2 = expectedLogDir.resolve("events.log.2");
+
+ // The first 10 writes should be in one file, almost 1 MB.
+ assertTrue(Files.exists(logFile));
+
+ // Write 2 more times to push it over 1MB
+ eventLogger.log(eventInfo);
+ eventLogger.log(eventInfo);
+
+ // Now we expect events.log.1 to exist and events.log to be new
+ assertTrue(Files.exists(logFile1), "Rotated file events.log.1 should exist");
+
+ // Write 12 more times to push over 1MB again
+ for (int i = 0; i < 12; i++) {
+ eventLogger.log(eventInfo);
+ }
+
+ // Now events.log.2 and events.log.1 and events.log should exist
+ assertTrue(Files.exists(logFile2), "Rotated file events.log.2 should exist");
+
+ // Write 12 MORE times to push over 1MB again
+ for (int i = 0; i < 12; i++) {
+ eventLogger.log(eventInfo);
+ }
+
+ // max config was 2, so events.log.3 should NOT exist, and events.log.2
+ // should exist.
+ Path logFile3 = expectedLogDir.resolve("events.log.3");
+ assertTrue(!Files.exists(logFile3), "Rotated file events.log.3 should not exist");
+ assertTrue(Files.exists(logFile2), "Rotated file events.log.2 should exist");
+ }
+}