This is an automated email from the ASF dual-hosted git repository. marat pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-karavan.git
commit 1991a4fc385999b06b3d71e65596f68ee00588cd Author: Marat Gubaidullin <[email protected]> AuthorDate: Fri Feb 27 18:54:02 2026 -0500 Backend for 4.18.0 --- .../camel/karavan/api/GitCommitResource.java | 65 +++++ .../org/apache/camel/karavan/cache/CacheEvent.java | 8 + .../org/apache/camel/karavan/cache/CacheUtils.java | 25 ++ .../camel/karavan/cache/ProjectFolderCommited.java | 59 ++++ .../apache/camel/karavan/cache/SystemCommit.java | 100 +++++++ .../apache/camel/karavan/loader/CacheLoader.java | 61 ++++ .../org/apache/camel/karavan/loader/GitLoader.java | 118 ++++++++ .../apache/camel/karavan/loader/StartupLoader.java | 99 +++++++ .../apache/camel/karavan/model/CommitResult.java | 9 + .../camel/karavan/model/PathCommitDetails.java | 3 + .../karavan/persistence/AccessCacheEntity.java | 28 ++ .../karavan/persistence/PersistenceService.java | 108 +++++++ .../karavan/persistence/ProjectCacheEntity.java | 29 ++ .../karavan/persistence/SessionCacheEntity.java | 33 +++ .../karavan/scheduler/ActivityCleanupService.java | 35 +++ .../karavan/scheduler/SessionCleanupService.java | 23 ++ .../camel/karavan/service/GitHistoryService.java | 310 +++++++++++++++++++++ .../camel/karavan/service/LogStreamingService.java | 165 +++++++++++ 18 files changed, 1278 insertions(+) diff --git a/karavan-app/src/main/java/org/apache/camel/karavan/api/GitCommitResource.java b/karavan-app/src/main/java/org/apache/camel/karavan/api/GitCommitResource.java new file mode 100644 index 00000000..fe172e4c --- /dev/null +++ b/karavan-app/src/main/java/org/apache/camel/karavan/api/GitCommitResource.java @@ -0,0 +1,65 @@ +package org.apache.camel.karavan.api; + +import io.quarkus.security.Authenticated; +import jakarta.inject.Inject; +import jakarta.ws.rs.*; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; +import org.apache.camel.karavan.cache.KaravanCache; +import org.apache.camel.karavan.cache.ProjectFolderCommit; +import org.apache.camel.karavan.cache.SystemCommit; +import org.apache.camel.karavan.service.GitHistoryService; +import org.jboss.logging.Logger; + +import java.util.List; + +@Path("/ui/git") +public class GitCommitResource { + + private static final Logger LOGGER = Logger.getLogger(GitCommitResource.class.getName()); + + @Inject + GitHistoryService gitHistoryService; + + @Inject + KaravanCache karavanCache; + + @GET + @Authenticated + @Produces(MediaType.APPLICATION_JSON) + @Path("/commits/{projectId}") + public List<ProjectFolderCommit> getProjectCommits(@PathParam("projectId") String projectId) { + try { + return karavanCache.getProjectLastCommits(projectId); + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); + return List.of(); + } + } + + @POST + @Authenticated + @Produces(MediaType.APPLICATION_JSON) + @Path("/commits/{projectId}") + public Response loadProjectCommits(@PathParam("projectId") String projectId) { + try { + gitHistoryService.importProjectCommits(projectId); + return Response.accepted().build(); + } catch (Exception e) { + return Response.serverError().entity(e.getMessage()).build(); + } + } + + @GET + @Authenticated + @Produces(MediaType.APPLICATION_JSON) + @Path("/system") + public List<SystemCommit> getSystemCommits() { + try { + return karavanCache.getSystemLastCommits(); + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); + return List.of(); + } + } +} \ No newline at end of file diff --git a/karavan-app/src/main/java/org/apache/camel/karavan/cache/CacheEvent.java b/karavan-app/src/main/java/org/apache/camel/karavan/cache/CacheEvent.java new file mode 100644 index 00000000..c83710b5 --- /dev/null +++ b/karavan-app/src/main/java/org/apache/camel/karavan/cache/CacheEvent.java @@ -0,0 +1,8 @@ +package org.apache.camel.karavan.cache; + +public record CacheEvent(String key, Operation operation, Object data) { + public enum Operation { + SAVE, + DELETE + } +} \ No newline at end of file diff --git a/karavan-app/src/main/java/org/apache/camel/karavan/cache/CacheUtils.java b/karavan-app/src/main/java/org/apache/camel/karavan/cache/CacheUtils.java new file mode 100644 index 00000000..8ab2c999 --- /dev/null +++ b/karavan-app/src/main/java/org/apache/camel/karavan/cache/CacheUtils.java @@ -0,0 +1,25 @@ +package org.apache.camel.karavan.cache; + +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +public class CacheUtils { + + public static <T> List<T> query(Map<?, T> cache, Predicate<T> predicate, Function<T, T> copier) { + return cache.values().stream() + .filter(predicate) + .map(copier) + .collect(Collectors.toList()); + } + + public static <T> T queryFirst(Map<?, T> cache, Predicate<T> predicate, Function<T, T> copier) { + return cache.values().stream() + .filter(predicate) + .map(copier) + .findFirst() + .orElse(null); + } +} \ No newline at end of file diff --git a/karavan-app/src/main/java/org/apache/camel/karavan/cache/ProjectFolderCommited.java b/karavan-app/src/main/java/org/apache/camel/karavan/cache/ProjectFolderCommited.java new file mode 100644 index 00000000..0490d3f9 --- /dev/null +++ b/karavan-app/src/main/java/org/apache/camel/karavan/cache/ProjectFolderCommited.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.karavan.cache; + +public class ProjectFolderCommited { + + String projectId; + String lastCommit; + Long lastCommitTimestamp = 0L; + + public ProjectFolderCommited(String projectId, String lastCommit, Long lastCommitTimestamp) { + this.projectId = projectId; + this.lastCommit = lastCommit; + this.lastCommitTimestamp = lastCommitTimestamp; + } + + public String getProjectId() { + return projectId; + } + + public void setProjectId(String projectId) { + this.projectId = projectId; + } + + public String getLastCommit() { + return lastCommit; + } + + public void setLastCommit(String lastCommit) { + this.lastCommit = lastCommit; + } + + public Long getLastCommitTimestamp() { + return lastCommitTimestamp; + } + + public void setLastCommitTimestamp(Long lastCommitTimestamp) { + this.lastCommitTimestamp = lastCommitTimestamp; + } + + public ProjectFolderCommited copy() { + return new ProjectFolderCommited(projectId, lastCommit, lastCommitTimestamp); + } +} \ No newline at end of file diff --git a/karavan-app/src/main/java/org/apache/camel/karavan/cache/SystemCommit.java b/karavan-app/src/main/java/org/apache/camel/karavan/cache/SystemCommit.java new file mode 100644 index 00000000..04c9bdf6 --- /dev/null +++ b/karavan-app/src/main/java/org/apache/camel/karavan/cache/SystemCommit.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.karavan.cache; + +import java.util.List; + +public class SystemCommit { + private String id; + private String authorName; + private String authorEmail; + private Long commitTime; + private String message; + private List<String> projectIds; + + public SystemCommit() { + } + + public SystemCommit(String id, String authorName, String authorEmail, Long commitTime, String message, List<String> projectIds) { + this.id = id; + this.authorName = authorName; + this.authorEmail = authorEmail; + this.commitTime = commitTime; + this.message = message; + this.projectIds = projectIds; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getAuthorName() { + return authorName; + } + + public void setAuthorName(String authorName) { + this.authorName = authorName; + } + + public String getAuthorEmail() { + return authorEmail; + } + + public void setAuthorEmail(String authorEmail) { + this.authorEmail = authorEmail; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + public Long getCommitTime() { + return commitTime; + } + + public void setCommitTime(Long commitTime) { + this.commitTime = commitTime; + } + + public List<String> getProjectIds() { + return projectIds; + } + + public void setProjectIds(List<String> projectIds) { + this.projectIds = projectIds; + } + + @Override + public String toString() { + return "SystemCommit{" + + "id='" + id + '\'' + + ", authorName='" + authorName + '\'' + + ", authorEmail='" + authorEmail + '\'' + + ", message='" + message + '\'' + + ", commitTime='" + commitTime + '\'' + + '}'; + } +} diff --git a/karavan-app/src/main/java/org/apache/camel/karavan/loader/CacheLoader.java b/karavan-app/src/main/java/org/apache/camel/karavan/loader/CacheLoader.java new file mode 100644 index 00000000..da896d4b --- /dev/null +++ b/karavan-app/src/main/java/org/apache/camel/karavan/loader/CacheLoader.java @@ -0,0 +1,61 @@ +package org.apache.camel.karavan.loader; + +import io.vertx.core.json.JsonObject; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import org.apache.camel.karavan.cache.*; +import org.apache.camel.karavan.persistence.AccessCacheEntity; +import org.apache.camel.karavan.persistence.PersistenceService; +import org.apache.camel.karavan.persistence.ProjectCacheEntity; +import org.jboss.logging.Logger; + +@ApplicationScoped +public class CacheLoader { + + private static final Logger LOGGER = Logger.getLogger(CacheLoader.class); + + @Inject + KaravanCache karavanCache; + @Inject + PersistenceService persistenceService; + + public void load() { + LOGGER.info("Starting Karavan Cache Hydration..."); + + var allEntities = persistenceService.getAll(ProjectCacheEntity.class); + + allEntities.stream() + .filter(entity -> ProjectFolder.class.getSimpleName().equals(entity.type)) + .forEach(entity -> { + JsonObject json = new JsonObject(entity.data); + var project = json.mapTo(ProjectFolder.class); + karavanCache.saveProject(project, false); + }); + + allEntities.stream() + .filter(entity -> ProjectFile.class.getSimpleName().equals(entity.type)) + .forEach(entity -> { + JsonObject json = new JsonObject(entity.data); + var file = json.mapTo(ProjectFile.class); + karavanCache.saveProjectFile(file, null, false); + }); + + persistenceService.getAll(AccessCacheEntity.class).forEach(entity -> { + JsonObject json = new JsonObject(entity.data); + if (AccessUser.class.getSimpleName().equals(entity.type)) { + karavanCache.saveUser(json.mapTo(AccessUser.class), false); + } else if (AccessRole.class.getSimpleName().equals(entity.type)) { + karavanCache.saveRole(json.mapTo(AccessRole.class), false); + } else if (AccessPassword.class.getSimpleName().equals(entity.type)) { + karavanCache.savePassword(json.mapTo(AccessPassword.class), false); + } + }); + + persistenceService.getActiveSessions().forEach(entity -> { + JsonObject json = new JsonObject(entity.data); + karavanCache.saveAccessSession(json.mapTo(AccessSession.class), false); + }); + + LOGGER.info("Karavan Cache Hydration complete."); + } +} \ No newline at end of file diff --git a/karavan-app/src/main/java/org/apache/camel/karavan/loader/GitLoader.java b/karavan-app/src/main/java/org/apache/camel/karavan/loader/GitLoader.java new file mode 100644 index 00000000..0f334cfb --- /dev/null +++ b/karavan-app/src/main/java/org/apache/camel/karavan/loader/GitLoader.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.karavan.loader; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import org.apache.camel.karavan.KaravanConstants; +import org.apache.camel.karavan.cache.KaravanCache; +import org.apache.camel.karavan.cache.ProjectFile; +import org.apache.camel.karavan.cache.ProjectFolder; +import org.apache.camel.karavan.service.CodeService; +import org.apache.camel.karavan.service.GitHistoryService; +import org.apache.camel.karavan.service.GitService; +import org.apache.camel.karavan.service.ProjectService; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.jboss.logging.Logger; + +import java.time.Instant; +import java.util.Objects; +import java.util.regex.Pattern; + +import static org.apache.camel.karavan.KaravanConstants.DEV; + +@ApplicationScoped +public class GitLoader { + + private static final Logger LOGGER = Logger.getLogger(GitLoader.class.getName()); + + @ConfigProperty(name = "karavan.environment", defaultValue = KaravanConstants.DEV) + String environment; + + @Inject + ProjectService projectService; + + @Inject + KaravanCache karavanCache; + + @Inject + GitService gitService; + + @Inject + GitHistoryService gitHistoryService; + + @Inject + CodeService codeService; + + public void load() throws Exception { + boolean git = gitService.checkGit(); + LOGGER.info("Starting Project service: git is " + (git ? "ready" : "not ready")); + if (gitService.checkGit()) { + projectService.importProjects(false); + if (Objects.equals(environment, DEV)) { + addKameletsProject(); + addBuildInProject(ProjectFolder.Type.templates.name()); + addBuildInProject(ProjectFolder.Type.configuration.name()); + addBuildInProject(ProjectFolder.Type.documentation.name()); + addBuildInProject(ProjectFolder.Type.contracts.name()); + } + gitHistoryService.importCommits(); + } else { + LOGGER.info("Projects are not ready"); + throw new Exception("Projects are not ready"); + } + } + + void addKameletsProject() { + try { + ProjectFolder kamelets = karavanCache.getProject(ProjectFolder.Type.kamelets.name()); + if (kamelets == null) { + LOGGER.info("Add custom kamelets project"); + kamelets = new ProjectFolder(ProjectFolder.Type.kamelets.name(), "Custom Kamelets", Instant.now().getEpochSecond() * 1000L, ProjectFolder.Type.kamelets); + karavanCache.saveProject(kamelets, false); + } + } catch (Exception e) { + LOGGER.error("Error during custom kamelets project creation", e); + } + } + + public void addBuildInProject(String projectId) { + try { + ProjectFolder projectFolder = karavanCache.getProject(projectId); + if (projectFolder == null) { + var title = Pattern.compile("^.").matcher(projectId).replaceFirst(m -> m.group().toUpperCase()); + projectFolder = new ProjectFolder(projectId, title, Instant.now().getEpochSecond() * 1000L, ProjectFolder.Type.valueOf(projectId)); + karavanCache.saveProject(projectFolder, false); + + codeService.getBuildInProjectFiles(projectId).forEach((name, value) -> { + ProjectFile file = new ProjectFile(name, value, projectId, Instant.now().getEpochSecond() * 1000L); + karavanCache.saveProjectFile(file, null, false); + }); + } else { + codeService.getBuildInProjectFiles(projectId).forEach((name, value) -> { + ProjectFile f = karavanCache.getProjectFile(projectId, name); + if (f == null) { + ProjectFile file = new ProjectFile(name, value, projectId, Instant.now().getEpochSecond() * 1000L); + karavanCache.saveProjectFile(file, null, false); + } + }); + } + } catch (Exception e) { + LOGGER.error("Error during creation of project " + projectId, e); + } + } +} diff --git a/karavan-app/src/main/java/org/apache/camel/karavan/loader/StartupLoader.java b/karavan-app/src/main/java/org/apache/camel/karavan/loader/StartupLoader.java new file mode 100644 index 00000000..8f823dfc --- /dev/null +++ b/karavan-app/src/main/java/org/apache/camel/karavan/loader/StartupLoader.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.karavan.loader; + +import io.quarkus.runtime.Quarkus; +import io.quarkus.runtime.StartupEvent; +import io.vertx.mutiny.core.eventbus.EventBus; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.event.Observes; +import jakarta.enterprise.inject.Default; +import jakarta.inject.Inject; +import org.apache.camel.karavan.KaravanConstants; +import org.apache.camel.karavan.docker.DockerService; +import org.apache.camel.karavan.service.AuthService; +import org.apache.camel.karavan.service.ConfigService; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.eclipse.microprofile.health.HealthCheck; +import org.eclipse.microprofile.health.HealthCheckResponse; +import org.eclipse.microprofile.health.Readiness; +import org.jboss.logging.Logger; + +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.camel.karavan.KaravanEvents.NOTIFICATION_PROJECTS_STARTED; + +@Default +@Readiness +@ApplicationScoped +public class StartupLoader implements HealthCheck { + + private static final Logger LOGGER = Logger.getLogger(StartupLoader.class.getName()); + + @ConfigProperty(name = "karavan.environment", defaultValue = KaravanConstants.DEV) + String environment; + + @Inject + DockerService dockerService; + + @Inject + EventBus eventBus; + + @Inject + AuthService authService; + + @Inject + CacheLoader cacheLoader; + + @Inject + GitLoader gitLoader; + + private final AtomicBoolean ready = new AtomicBoolean(false); + + @Override + public HealthCheckResponse call() { + if (ready.get()) { + return HealthCheckResponse.named("Projects").up().build(); + } else { + return HealthCheckResponse.named("Projects").down().build(); + } + } + + void onStart(@Observes StartupEvent ev) throws Exception { + LOGGER.info("Starting " + ConfigService.getAppName() + " in " + environment + " env in " + (ConfigService.inKubernetes() ? "Kubernetes" : "Docker")); + if (!ConfigService.inKubernetes() && !dockerService.checkDocker()){ + Quarkus.asyncExit(); + } else { + createCaches(); + } + } + + void createCaches() { + try { + LOGGER.info("Loading projects ..."); + cacheLoader.load(); + gitLoader.load(); + LOGGER.info("Projects loaded"); + eventBus.publish(NOTIFICATION_PROJECTS_STARTED, null); + LOGGER.info("Creating defaults..."); + authService.loadDefaults(); + ready.set(true); + } catch (Exception e) { + LOGGER.error(e.getMessage()); + } + } +} diff --git a/karavan-app/src/main/java/org/apache/camel/karavan/model/CommitResult.java b/karavan-app/src/main/java/org/apache/camel/karavan/model/CommitResult.java new file mode 100644 index 00000000..6f4e8a09 --- /dev/null +++ b/karavan-app/src/main/java/org/apache/camel/karavan/model/CommitResult.java @@ -0,0 +1,9 @@ +package org.apache.camel.karavan.model; + +import org.apache.camel.karavan.cache.ProjectFolder; +import org.eclipse.jgit.transport.RemoteRefUpdate; + +import java.util.List; + +public record CommitResult (ProjectFolder projectFolder, List<RemoteRefUpdate.Status> statuses, List<String> messages, String commitId, Long commitTime) {} + diff --git a/karavan-app/src/main/java/org/apache/camel/karavan/model/PathCommitDetails.java b/karavan-app/src/main/java/org/apache/camel/karavan/model/PathCommitDetails.java new file mode 100644 index 00000000..e28b33c2 --- /dev/null +++ b/karavan-app/src/main/java/org/apache/camel/karavan/model/PathCommitDetails.java @@ -0,0 +1,3 @@ +package org.apache.camel.karavan.model; + +public record PathCommitDetails(String projectId, String fileName, String commitId, Long commitTime, String content, boolean isFolder) { } diff --git a/karavan-app/src/main/java/org/apache/camel/karavan/persistence/AccessCacheEntity.java b/karavan-app/src/main/java/org/apache/camel/karavan/persistence/AccessCacheEntity.java new file mode 100644 index 00000000..fb024732 --- /dev/null +++ b/karavan-app/src/main/java/org/apache/camel/karavan/persistence/AccessCacheEntity.java @@ -0,0 +1,28 @@ +package org.apache.camel.karavan.persistence; + +import jakarta.persistence.*; +import org.hibernate.annotations.UpdateTimestamp; + +import java.time.Instant; + +@Entity +@Table(name = AccessCacheEntity.TABLE_NAME, indexes = { + @Index(name = "idx_access_type", columnList = "type"), + @Index(name = "idx_access_last_update", columnList = "last_update") +}) +public class AccessCacheEntity { + + public static final String TABLE_NAME = "access_state"; + + @Id + public String key; // The GroupedKey + + public String type; // "UserFile", "RoleFolder", etc. + + @Column(columnDefinition = "jsonb") + public String data; // The actual object serialized to JSON + + @UpdateTimestamp + @Column(name = "last_update") + public Instant lastUpdate; +} \ No newline at end of file diff --git a/karavan-app/src/main/java/org/apache/camel/karavan/persistence/PersistenceService.java b/karavan-app/src/main/java/org/apache/camel/karavan/persistence/PersistenceService.java new file mode 100644 index 00000000..4525d96d --- /dev/null +++ b/karavan-app/src/main/java/org/apache/camel/karavan/persistence/PersistenceService.java @@ -0,0 +1,108 @@ +package org.apache.camel.karavan.persistence; + +import io.quarkus.vertx.ConsumeEvent; +import io.vertx.core.json.JsonObject; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import jakarta.persistence.EntityManager; +import jakarta.persistence.Query; +import jakarta.transaction.Transactional; +import org.apache.camel.karavan.cache.AccessSession; +import org.apache.camel.karavan.cache.CacheEvent; +import org.jboss.logging.Logger; + +import java.time.Instant; +import java.util.List; +import java.util.Objects; + +import static org.apache.camel.karavan.KaravanEvents.*; + +@ApplicationScoped +public class PersistenceService { + + private static final Logger LOGGER = Logger.getLogger(PersistenceService.class.getName()); + + @Inject + EntityManager entityManager; + + private static final String SAVE_SQL = + "INSERT INTO %s (key, type, data, last_update) " + + "VALUES (:key, :type, CAST(:data AS jsonb), :now) " + + "ON CONFLICT (key) DO UPDATE SET " + + "type = EXCLUDED.type, " + + "data = EXCLUDED.data, " + + "last_update = EXCLUDED.last_update"; + + private static final String SAVE_SESSION_SQL = + "INSERT INTO session_state (key, type, data, expiry, last_update) " + // Added expiry + "VALUES (:key, :type, CAST(:data AS jsonb), :expiry, :now) " + + "ON CONFLICT (key) DO UPDATE SET " + + "type = EXCLUDED.type, data = EXCLUDED.data, " + + "expiry = EXCLUDED.expiry, last_update = EXCLUDED.last_update"; + + private static final String DELETE_SQL = "DELETE FROM %s WHERE key = :key"; + + @ConsumeEvent(value = PERSIST_PROJECT, blocking = true, ordered = true) + @Transactional + public void handleProjectUpdate(CacheEvent event) { + execute(ProjectCacheEntity.TABLE_NAME, event); + } + + @ConsumeEvent(value = PERSIST_ACCESS, blocking = true, ordered = true) + @Transactional + public void handleAccessUpdate(CacheEvent event) { + execute(AccessCacheEntity.TABLE_NAME, event); + } + + @ConsumeEvent(value = PERSIST_SESSION, blocking = true, ordered = true) + @Transactional + public void handleSessionUpdate(CacheEvent event) { + execute(SessionCacheEntity.TABLE_NAME, event); + } + + private void execute(String tableName, CacheEvent event) { + try { + Query query; + if (event.operation().equals(CacheEvent.Operation.SAVE)) { + String json = Objects.requireNonNull(JsonObject.mapFrom(event.data())).encode(); + String entityName = event.data().getClass().getSimpleName(); + + String sql = tableName.equals(SessionCacheEntity.TABLE_NAME) ? SAVE_SESSION_SQL : String.format(SAVE_SQL, tableName); + + query = entityManager.createNativeQuery(String.format(sql, tableName)) + .setParameter("key", event.key()) + .setParameter("type", entityName) + .setParameter("data", json) + .setParameter("now", Instant.now()); + + if (event.data() instanceof AccessSession s) { + query.setParameter("expiry", s.getExpiredAt()); + } + } else { + query = entityManager.createNativeQuery(String.format(DELETE_SQL, tableName)) + .setParameter("key", event.key()); + } + query.executeUpdate(); + } catch (Exception e) { + LOGGER.errorf(e, "Failed to persist event %s in %s with key %s", event.operation(), tableName, event.key()); + } + } + + /** + * Fetches all records from a specific cache table as Entity objects. + */ + public <T> List<T> getAll(Class<T> entityClass) { + String jpql = "SELECT e FROM " + entityClass.getSimpleName() + " e"; + return entityManager.createQuery(jpql, entityClass).getResultList(); + } + + /** + * Fetches only active sessions. + */ + public List<SessionCacheEntity> getActiveSessions() { + String jpql = "SELECT e FROM SessionCacheEntity e WHERE e.expiry > :now"; + return entityManager.createQuery(jpql, SessionCacheEntity.class) + .setParameter("now", Instant.now()) + .getResultList(); + } +} \ No newline at end of file diff --git a/karavan-app/src/main/java/org/apache/camel/karavan/persistence/ProjectCacheEntity.java b/karavan-app/src/main/java/org/apache/camel/karavan/persistence/ProjectCacheEntity.java new file mode 100644 index 00000000..30b1d394 --- /dev/null +++ b/karavan-app/src/main/java/org/apache/camel/karavan/persistence/ProjectCacheEntity.java @@ -0,0 +1,29 @@ +package org.apache.camel.karavan.persistence; + +import jakarta.persistence.*; +import org.hibernate.annotations.UpdateTimestamp; + +import java.time.Instant; + +import static org.apache.camel.karavan.persistence.ProjectCacheEntity.TABLE_NAME; + +@Entity +@Table(name = TABLE_NAME, indexes = { + @Index(name = "idx_project_type", columnList = "type"), + @Index(name = "idx_project_last_update", columnList = "last_update") +}) +public class ProjectCacheEntity { + + public static final String TABLE_NAME = "project_state"; + + @Id + public String key; + public String type; + + @Column(columnDefinition = "jsonb") + public String data; + + @UpdateTimestamp + @Column(name = "last_update") + public Instant lastUpdate; +} \ No newline at end of file diff --git a/karavan-app/src/main/java/org/apache/camel/karavan/persistence/SessionCacheEntity.java b/karavan-app/src/main/java/org/apache/camel/karavan/persistence/SessionCacheEntity.java new file mode 100644 index 00000000..adfd3ea8 --- /dev/null +++ b/karavan-app/src/main/java/org/apache/camel/karavan/persistence/SessionCacheEntity.java @@ -0,0 +1,33 @@ +package org.apache.camel.karavan.persistence; + +import jakarta.persistence.*; +import org.hibernate.annotations.UpdateTimestamp; + +import java.time.Instant; + +import static org.apache.camel.karavan.persistence.SessionCacheEntity.TABLE_NAME; + +@Entity +@Table(name = TABLE_NAME, indexes = { + @Index(name = "idx_session_type", columnList = "type"), + @Index(name = "idx_session_expiry", columnList = "expiry") // Add index for cleanup +}) +public class SessionCacheEntity { + + public static final String TABLE_NAME = "session_state"; + + @Id + public String key; + + public String type; + + @Column(columnDefinition = "jsonb") + public String data; + + @Column(name = "expiry") + public Instant expiry; // The exact time this session expires + + @UpdateTimestamp + @Column(name = "last_update") + public Instant lastUpdate; +} \ No newline at end of file diff --git a/karavan-app/src/main/java/org/apache/camel/karavan/scheduler/ActivityCleanupService.java b/karavan-app/src/main/java/org/apache/camel/karavan/scheduler/ActivityCleanupService.java new file mode 100644 index 00000000..4ff1315b --- /dev/null +++ b/karavan-app/src/main/java/org/apache/camel/karavan/scheduler/ActivityCleanupService.java @@ -0,0 +1,35 @@ +package org.apache.camel.karavan.scheduler; + +import io.quarkus.scheduler.Scheduled; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import org.apache.camel.karavan.cache.KaravanCache; + +import java.time.Instant; + +@ApplicationScoped +public class ActivityCleanupService { + + private static final long THRESHOLD = 120000; + + @Inject + KaravanCache karavanCache; + + @Scheduled(every = "2m") // Run every 2 minutes + void deleteUsersActivities() { + + karavanCache.getCopyUsersWorking().entrySet().stream() + .filter(e -> { + var lastTimestamp = Instant.ofEpochMilli(e.getValue().getTimeStamp()).plusMillis(THRESHOLD); + return Instant.now().isAfter(lastTimestamp); + }) + .forEach(e -> karavanCache.deleteUserWorking(e.getKey())); + + karavanCache.getCopyUsersHeartBeat().entrySet().stream() + .filter(e -> { + var lastTimestamp = Instant.ofEpochMilli(e.getValue().getTimeStamp()).plusMillis(THRESHOLD); + return Instant.now().isAfter(lastTimestamp); + }) + .forEach(e -> karavanCache.deleteUserHeartBeat(e.getKey())); + } +} \ No newline at end of file diff --git a/karavan-app/src/main/java/org/apache/camel/karavan/scheduler/SessionCleanupService.java b/karavan-app/src/main/java/org/apache/camel/karavan/scheduler/SessionCleanupService.java new file mode 100644 index 00000000..04013d33 --- /dev/null +++ b/karavan-app/src/main/java/org/apache/camel/karavan/scheduler/SessionCleanupService.java @@ -0,0 +1,23 @@ +package org.apache.camel.karavan.scheduler; + +import io.quarkus.scheduler.Scheduled; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import org.apache.camel.karavan.cache.KaravanCache; + +import java.time.Instant; + +@ApplicationScoped +public class SessionCleanupService { + + @Inject + KaravanCache karavanCache; + + @Scheduled(every = "10m") // Run every 10 minutes + void deleteExpiredSessions() { + + karavanCache.getAccessSessions().stream() + .filter(s -> Instant.now().isAfter(s.expiredAt)) + .forEach(session -> karavanCache.deleteAccessSession(session.sessionId)); + } +} \ No newline at end of file diff --git a/karavan-app/src/main/java/org/apache/camel/karavan/service/GitHistoryService.java b/karavan-app/src/main/java/org/apache/camel/karavan/service/GitHistoryService.java new file mode 100644 index 00000000..66efd66a --- /dev/null +++ b/karavan-app/src/main/java/org/apache/camel/karavan/service/GitHistoryService.java @@ -0,0 +1,310 @@ +package org.apache.camel.karavan.service; + +import io.vertx.core.Vertx; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; +import org.apache.camel.karavan.cache.KaravanCache; +import org.apache.camel.karavan.cache.ProjectFileCommitDiff; +import org.apache.camel.karavan.cache.ProjectFolderCommit; +import org.apache.camel.karavan.cache.SystemCommit; +import org.eclipse.jgit.api.Git; +import org.eclipse.jgit.diff.DiffEntry; +import org.eclipse.jgit.diff.DiffFormatter; +import org.eclipse.jgit.diff.RawTextComparator; +import org.eclipse.jgit.errors.MissingObjectException; +import org.eclipse.jgit.lib.Constants; +import org.eclipse.jgit.lib.ObjectId; +import org.eclipse.jgit.lib.ObjectLoader; +import org.eclipse.jgit.lib.Repository; +import org.eclipse.jgit.revwalk.RevCommit; +import org.eclipse.jgit.revwalk.RevWalk; +import org.eclipse.jgit.treewalk.AbstractTreeIterator; +import org.eclipse.jgit.treewalk.CanonicalTreeParser; +import org.eclipse.jgit.treewalk.EmptyTreeIterator; +import org.eclipse.jgit.util.io.DisabledOutputStream; +import org.eclipse.microprofile.context.ManagedExecutor; +import org.jboss.logging.Logger; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.*; +import java.util.stream.StreamSupport; + +@Singleton +public class GitHistoryService { + + private static final Logger LOGGER = Logger.getLogger(GitHistoryService.class.getName()); + + @Inject + ManagedExecutor managedExecutor; + + @Inject + Vertx vertx; + + @Inject + GitService gitService; + + @Inject + KaravanCache karavanCache; + + public void importProjectCommits(String projectId) { + LOGGER.info("Import commits for " + projectId); + managedExecutor.runAsync(() -> { + var commits = getProjectCommits(projectId, 10); + karavanCache.saveProjectLastCommits(projectId, commits); + }); + } + + public void importCommits() { + LOGGER.info("Import commits for system"); + managedExecutor.runAsync(() -> { + var commits = getCommits( 100); + karavanCache.saveSystemCommits(commits); + }); + } + + public List<SystemCommit> getCommits(int maxCount) { + List<SystemCommit> result = new ArrayList<>(); + try { + Git pollGit = gitService.getGit(true, vertx.fileSystem().createTempDirectoryBlocking("commits")); + if (pollGit == null) return result; + + // Grab the repository reference to pass to our helper + Repository repository = pollGit.getRepository(); + + Iterable<RevCommit> commits = pollGit.log() + .setMaxCount(maxCount) + .all() + .call(); + + StreamSupport.stream(commits.spliterator(), false) + .sorted(Comparator.comparingInt(RevCommit::getCommitTime).reversed()) + .forEach(commit -> { + try { + // Get the folders modified in this specific commit + List<String> modifiedFolders = getChangedFoldersInCommit(repository, commit); + + SystemCommit systemCommit = new SystemCommit( + commit.getId().getName(), + commit.getAuthorIdent().getName(), + commit.getAuthorIdent().getEmailAddress(), + commit.getCommitTime() * 1000L, + commit.getShortMessage(), + modifiedFolders // <-- Injected here + ); + result.add(systemCommit); + } catch (Exception e) { + LOGGER.error("Error building diffs for commit " + commit.getId().getName(), e); + } + }); + + } catch (Exception e) { + LOGGER.error("Error", e); + } + + return result; + } + + private List<String> getChangedFoldersInCommit(Repository repository, RevCommit commit) throws IOException { + Set<String> changedFolders = new HashSet<>(); + + // DisabledOutputStream ensures we don't print the actual file diffs to the console + try (DiffFormatter diffFormatter = new DiffFormatter(DisabledOutputStream.INSTANCE); + RevWalk revWalk = new RevWalk(repository)) { + + diffFormatter.setRepository(repository); + diffFormatter.setDetectRenames(true); + + AbstractTreeIterator parentTreeParser; + + if (commit.getParentCount() > 0) { + // For standard/merge commits, compare against the first parent + RevCommit parent = revWalk.parseCommit(commit.getParent(0).getId()); + parentTreeParser = new CanonicalTreeParser(null, repository.newObjectReader(), parent.getTree().getId()); + } else { + // For the initial root commit, compare against an empty tree + parentTreeParser = new EmptyTreeIterator(); + } + + AbstractTreeIterator commitTreeParser = new CanonicalTreeParser(null, repository.newObjectReader(), commit.getTree().getId()); + + // Calculate the differences + List<DiffEntry> diffs = diffFormatter.scan(parentTreeParser, commitTreeParser); + + for (DiffEntry diff : diffs) { + // If the file was deleted, getNewPath() returns /dev/null, so we must use getOldPath() + String path = (diff.getChangeType() == DiffEntry.ChangeType.DELETE) + ? diff.getOldPath() + : diff.getNewPath(); + + // Extract the folder path from the file path + int lastSlash = path.lastIndexOf('/'); + + // If lastSlash is -1, the file is in the root directory. + // You can change "/" to "" depending on your preference. + String folder = (lastSlash == -1) ? "/" : path.substring(0, lastSlash); + + changedFolders.add(folder); // Set automatically deduplicates + } + } + + return new ArrayList<>(changedFolders); + } + + public List<ProjectFolderCommit> getProjectCommits(String projectId, int maxCount) { + List<ProjectFolderCommit> result = new ArrayList<>(); + try { + Git pollGit = gitService.getGit(true, vertx.fileSystem().createTempDirectoryBlocking("commits")); + if (pollGit == null) return result; + + Repository repo = pollGit.getRepository(); + + Iterable<RevCommit> commits = pollGit.log() + .setMaxCount(maxCount) + .all() + .addPath(projectId) + .call(); + StreamSupport.stream(commits.spliterator(), false) + .sorted(Comparator.comparingInt(RevCommit::getCommitTime).reversed()) + .forEach(commit -> { + try { + List<ProjectFileCommitDiff> diffs = buildDiffsWithBeforeAfter(repo, commit, projectId); + + ProjectFolderCommit projectCommit = new ProjectFolderCommit( + commit.getId().getName(), + projectId, + commit.getAuthorIdent().getName(), + commit.getAuthorIdent().getEmailAddress(), + commit.getCommitTime() * 1000L, + commit.getShortMessage(), + diffs + ); + + result.add(projectCommit); + } catch (Exception e) { + LOGGER.error("Error building diffs for commit " + commit.getId().getName(), e); + } + }); + + } catch (Exception e) { + LOGGER.error("Error", e); + } + + return result; + } + + private List<ProjectFileCommitDiff> buildDiffsWithBeforeAfter(Repository repo, RevCommit commit, String projectId) throws Exception { + List<ProjectFileCommitDiff> out = new ArrayList<>(); + + try (RevWalk revWalk = new RevWalk(repo)) { + revWalk.parseHeaders(commit); + + RevCommit parent = commit.getParentCount() > 0 ? revWalk.parseCommit(commit.getParent(0).getId()) : null; + + AbstractTreeIterator oldTreeIter = (parent == null) + ? new EmptyTreeIterator() + : treeIterator(repo, parent); + + AbstractTreeIterator newTreeIter = treeIterator(repo, commit); + + // 1) Collect DiffEntry list + List<DiffEntry> entries; + try (DiffFormatter df = new DiffFormatter(DisabledOutputStream.INSTANCE)) { + df.setRepository(repo); + df.setDiffComparator(RawTextComparator.DEFAULT); + df.setDetectRenames(true); + + entries = df.scan(oldTreeIter, newTreeIter); + } + + // 2) For each entry, produce: diff text + before/after + for (DiffEntry entry : entries) { + // Optional: keep only diffs under the project path + if (!isUnderProjectPath(entry, projectId)) continue; + + String patchText = formatUnifiedDiff(repo, entry); + + String before = readBlobAsText(repo, entry.getOldId().toObjectId()); + String after = readBlobAsText(repo, entry.getNewId().toObjectId()); + + // For added/deleted files, one side will be /dev/null and ObjectId may be zero + if (isZeroId(entry.getOldId().toObjectId())) before = null; + if (isZeroId(entry.getNewId().toObjectId())) after = null; + + ProjectFileCommitDiff d = new ProjectFileCommitDiff(); + d.setChangeType(entry.getChangeType().name()); + d.setOldPath(entry.getOldPath()); + d.setNewPath(entry.getNewPath()); + d.setDiff(patchText); + d.setBefore(before); + d.setAfter(after); + + out.add(d); + } + } + + return out; + } + + private AbstractTreeIterator treeIterator(Repository repo, RevCommit commit) throws Exception { + try (RevWalk revWalk = new RevWalk(repo)) { + RevCommit parsed = revWalk.parseCommit(commit.getId()); + ObjectId treeId = parsed.getTree().getId(); + + CanonicalTreeParser parser = new CanonicalTreeParser(); + try (var reader = repo.newObjectReader()) { + parser.reset(reader, treeId); + } + return parser; + } + } + + private String formatUnifiedDiff(Repository repo, DiffEntry entry) throws Exception { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + try (DiffFormatter df = new DiffFormatter(baos)) { + df.setRepository(repo); + df.setDiffComparator(RawTextComparator.DEFAULT); + df.setDetectRenames(true); + df.format(entry); + } + + return baos.toString(StandardCharsets.UTF_8); + } + + private String readBlobAsText(Repository repo, ObjectId id) throws Exception { + if (id == null || isZeroId(id)) return null; + + try { + ObjectLoader loader = repo.open(id, Constants.OBJ_BLOB); + + // If you expect huge files, consider a size cap + byte[] bytes = loader.getBytes(); + + // If you need binary detection, add it here (e.g., scan for 0x00) + return new String(bytes, StandardCharsets.UTF_8); + } catch (MissingObjectException e) { + // Blob not available (should be rare unless repo state is odd) + return null; + } + } + + private boolean isZeroId(ObjectId id) { + return id == null || ObjectId.zeroId().equals(id); + } + + private boolean isUnderProjectPath(DiffEntry entry, String projectId) { + // projectId is used as a path prefix in addPath(projectId) + // but diff scan can still contain unrelated entries in some setups; this is a safe filter. + String p = projectId.endsWith("/") ? projectId : projectId + "/"; + String oldPath = entry.getOldPath() == null ? "" : entry.getOldPath(); + String newPath = entry.getNewPath() == null ? "" : entry.getNewPath(); + + // DiffEntry uses DiffEntry.DEV_NULL for added/deleted sides + if (DiffEntry.DEV_NULL.equals(oldPath)) oldPath = ""; + if (DiffEntry.DEV_NULL.equals(newPath)) newPath = ""; + + return oldPath.startsWith(p) || newPath.startsWith(p) || oldPath.equals(projectId) || newPath.equals(projectId); + } +} diff --git a/karavan-app/src/main/java/org/apache/camel/karavan/service/LogStreamingService.java b/karavan-app/src/main/java/org/apache/camel/karavan/service/LogStreamingService.java new file mode 100644 index 00000000..21b71b44 --- /dev/null +++ b/karavan-app/src/main/java/org/apache/camel/karavan/service/LogStreamingService.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.karavan.service; + +import com.github.dockerjava.api.DockerClient; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.dsl.LogWatch; +import io.smallrye.mutiny.tuples.Tuple2; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import jakarta.ws.rs.sse.Sse; +import jakarta.ws.rs.sse.SseEventSink; +import org.apache.camel.karavan.docker.DockerLogCallback; +import org.apache.camel.karavan.docker.DockerService; +import org.apache.camel.karavan.kubernetes.KubernetesService; +import org.jboss.logging.Logger; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; + +@ApplicationScoped +public class LogStreamingService { + + private static final Logger LOGGER = Logger.getLogger(LogStreamingService.class.getName()); + public static final String SERVICE_NAME = "LOG_WATCH"; + + private final ConcurrentHashMap<String, Tuple2<LogWatch, KubernetesClient>> logWatches = new ConcurrentHashMap<>(); + + @Inject + KubernetesService kubernetesService; + + @Inject + DockerService dockerService; + + public void streamLog(String type, String name, String username, SseEventSink eventSink, Sse sse) { + String oldName = Thread.currentThread().getName(); + Thread.currentThread().setName("log-watcher-" + name + "-" + username); + try { + if (ConfigService.inKubernetes()) { + streamKubernetesLogs(name, username, eventSink, sse); + } else { + streamDockerLogs(name, eventSink, sse); + } + } finally { + Thread.currentThread().setName(oldName); + } + } + + private void streamDockerLogs(String name, SseEventSink eventSink, Sse sse) { + LOGGER.info("LogWatch for " + name + " starting (Docker)"); + final DockerLogCallback[] callbackHolder = new DockerLogCallback[1]; + + callbackHolder[0] = new DockerLogCallback(line -> { + try { + if (eventSink.isClosed()) throw new IOException("Sink closed"); + // FIX: Strip trailing newline to match Kubernetes behavior + String cleanLine = line.endsWith("\n") + ? line.substring(0, line.length() - 1) + : line; + + eventSink.send(sse.newEvent(cleanLine)) + .toCompletableFuture() + .join(); + } catch (CompletionException | IOException e) { + LOGGER.info("Browser disconnected from " + name + ". Stopping Docker stream."); + try { + if (callbackHolder[0] != null) callbackHolder[0].close(); + } catch (IOException ex) { /* ignore */ } + } catch (Exception e) { + LOGGER.error("Error sending log event: " + e.getMessage()); + } + }); + + try { + DockerClient client = dockerService.getDockerClient(); + client.logContainerCmd(name) + .withStdOut(true).withStdErr(true).withTimestamps(false) + .withFollowStream(true).withTail(100) + .exec(callbackHolder[0]); + + callbackHolder[0].awaitCompletion(); + } catch (InterruptedException e) { + LOGGER.info("LogWatch interrupted for " + name); + } catch (Exception e) { + LOGGER.error("Docker Log Error for " + name + ": " + e.getMessage()); + } finally { + cleanupDockerResources(callbackHolder[0], eventSink, name); + } + } + + private void streamKubernetesLogs(String name, String username, SseEventSink eventSink, Sse sse) { + Tuple2<LogWatch, KubernetesClient> request = null; + try { + request = kubernetesService.getContainerLogWatch(name); + manageK8sSession(username, name, request); // Renamed for clarity + + LogWatch logWatch = request.getItem1(); + try (BufferedReader reader = new BufferedReader(new InputStreamReader(logWatch.getOutput()))) { + String line; + while (!eventSink.isClosed() && (line = reader.readLine()) != null) { + try { + eventSink.send(sse.newEvent(line)) + .toCompletableFuture() + .join(); + } catch (Exception e) { + LOGGER.debug("Client disconnected during K8s log stream"); + break; // Stop reading immediately + } + } + } + } catch (Exception e) { + LOGGER.error("Kubernetes Log Error for " + name + ": " + e.getMessage()); + } finally { + cleanupK8sResources(request, eventSink, name); + } + } + + private void cleanupDockerResources(DockerLogCallback callback, SseEventSink sink, String name) { + try { + if (callback != null) callback.close(); + } catch (Exception e) { LOGGER.warn("Error closing callback", e); } + if (!sink.isClosed()) sink.close(); + LOGGER.info("LogWatch for " + name + " ended"); + } + + private void cleanupK8sResources(Tuple2<LogWatch, KubernetesClient> request, SseEventSink sink, String name) { + if (request != null) { + try { + if (request.getItem1() != null) request.getItem1().close(); + if (request.getItem2() != null) request.getItem2().close(); + } catch (Exception e) { LOGGER.warn("Error closing K8s resources", e); } + } + if (!sink.isClosed()) sink.close(); + LOGGER.info("LogWatch for " + name + " ended"); + } + + private void manageK8sSession(String username, String containerName, Tuple2<LogWatch, KubernetesClient> logWatch) { + String key = SERVICE_NAME + ":" + username + ":" + containerName; + if (logWatches.containsKey(key)) { + var old = logWatches.remove(key); + try { + if (old.getItem1() != null) old.getItem1().close(); + if (old.getItem2() != null) old.getItem2().close(); + } catch (Exception e) { /* ignore */ } + } + logWatches.put(key, logWatch); + } +} \ No newline at end of file
