This is an automated email from the ASF dual-hosted git repository. marat pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-karavan.git
commit 7f195bd8875b46562b393053b5f8d591f8a60259 Author: Marat Gubaidullin <marat.gubaidul...@gmail.com> AuthorDate: Wed Jul 12 14:58:51 2023 -0400 DataGrid in bashi works #817 --- karavan-cloud/karavan-bashi/.java-version | 1 + .../camel/karavan/bashi/ConductorService.java | 58 ++++++++++---- .../apache/camel/karavan/bashi/KaravanBashi.java | 2 +- .../karavan/bashi/docker/DockerEventListener.java | 39 +++++---- .../camel/karavan/bashi/docker/DockerService.java | 92 ++++++++++++++++++---- .../src/main/resources/application.properties | 2 +- 6 files changed, 146 insertions(+), 48 deletions(-) diff --git a/karavan-cloud/karavan-bashi/.java-version b/karavan-cloud/karavan-bashi/.java-version new file mode 100644 index 00000000..b4de3947 --- /dev/null +++ b/karavan-cloud/karavan-bashi/.java-version @@ -0,0 +1 @@ +11 diff --git a/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/ConductorService.java b/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/ConductorService.java index 71e2872b..7f4f0147 100644 --- a/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/ConductorService.java +++ b/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/ConductorService.java @@ -1,13 +1,13 @@ package org.apache.camel.karavan.bashi; +import com.github.dockerjava.api.model.Container; import com.github.dockerjava.api.model.HealthCheck; +import com.github.dockerjava.api.model.Statistics; import io.quarkus.vertx.ConsumeEvent; import io.vertx.core.json.JsonObject; import org.apache.camel.karavan.bashi.docker.DockerService; import org.apache.camel.karavan.datagrid.DatagridService; -import org.apache.camel.karavan.datagrid.model.CommandName; -import org.apache.camel.karavan.datagrid.model.DevModeCommand; -import org.apache.camel.karavan.datagrid.model.Project; +import org.apache.camel.karavan.datagrid.model.*; import org.eclipse.microprofile.config.inject.ConfigProperty; import org.jboss.logging.Logger; @@ -46,6 +46,9 @@ public class ConductorService { @ConfigProperty(name = "infinispan.password") String infinispanPassword; + @ConfigProperty(name = "karavan.environment") + String environment; + @Inject DockerService dockerService; @@ -56,6 +59,7 @@ public class ConductorService { public static final String ADDRESS_INFINISPAN_START = "ADDRESS_INFINISPAN_START"; public static final String ADDRESS_INFINISPAN_HEALTH = "ADDRESS_DATAGRID_HEALTH"; + public static final String ADDRESS_CONTAINER_STATS = "ADDRESS_CONTAINER_STATS"; @ConsumeEvent(value = ADDRESS_INFINISPAN_START, blocking = true, ordered = true) void startInfinispan(String data) throws InterruptedException { @@ -73,11 +77,13 @@ public class ConductorService { } @ConsumeEvent(value = ADDRESS_INFINISPAN_HEALTH, blocking = true, ordered = true) - void startDatagridService(String infinispanHealth){ - datagridService.start(); + void startServices(String infinispanHealth){ + if (infinispanHealth.equals("healthy")) { + datagridService.start(); + } } -// @ConsumeEvent(value = ADDRESS_INFINISPAN_HEALTH, blocking = true, ordered = true) + @ConsumeEvent(value = ADDRESS_INFINISPAN_HEALTH, blocking = true, ordered = true) void startKaravan(String infinispanHealth) throws InterruptedException { if (infinispanHealth.equals("healthy")) { LOGGER.info("Karavan is starting..."); @@ -98,20 +104,42 @@ public class ConductorService { @ConsumeEvent(value = DatagridService.ADDRESS_DEVMODE_COMMAND, blocking = true, ordered = true) void receiveCommand(JsonObject message) throws InterruptedException { - System.out.println("receiveCommand " + message); + LOGGER.info("DevMode Command: " + message); DevModeCommand command = message.mapTo(DevModeCommand.class); - String runnerName = command.getProjectId() + "-" + DEVMODE_SUFFIX; + String containerName = command.getProjectId() + "-" + DEVMODE_SUFFIX; + Project p = datagridService.getProject(command.getProjectId()); if (Objects.equals(command.getCommandName(), CommandName.RUN)) { - Project p = datagridService.getProject(command.getProjectId()); - LOGGER.infof("Runner starting for %s", p.getProjectId()); - dockerService.createContainer(runnerName, runnerImage, + LOGGER.infof("DevMode starting for %s", p.getProjectId()); + Container container = dockerService.createContainer(containerName, runnerImage, List.of(), "", false, new HealthCheck(), Map.of("type", "runner") ); - dockerService.startContainer(runnerName); - LOGGER.infof("Runner started for %s", p.getProjectId()); + dockerService.startContainer(containerName); + LOGGER.infof("DevMode started for %s", p.getProjectId()); + + // update DevModeStatus + DevModeStatus dms = datagridService.getDevModeStatus(p.getProjectId()); + dms.setContainerName(containerName); + dms.setContainerId(container.getId()); + datagridService.saveDevModeStatus(dms); } else if (Objects.equals(command.getCommandName(), CommandName.DELETE)){ - dockerService.stopContainer(runnerName); - dockerService.deleteContainer(runnerName); + dockerService.stopContainer(containerName); + dockerService.deleteContainer(containerName); + datagridService.deleteDevModeStatus(p.getName()); + } + } + + @ConsumeEvent(value = ADDRESS_CONTAINER_STATS, blocking = true, ordered = true) + public void saveStats(JsonObject data) { + String projectId = data.getString("projectId"); + String memory = data.getString("memory"); + String cpu = data.getString("cpu"); + if (datagridService.isReady()) { + PodStatus podStatus = datagridService.getDevModePodStatuses(projectId, environment); + if (podStatus != null) { + podStatus.setCpuInfo(cpu); + podStatus.setMemoryInfo(memory); + datagridService.savePodStatus(podStatus); + } } } } \ No newline at end of file diff --git a/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/KaravanBashi.java b/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/KaravanBashi.java index 7b3ed5d7..f0a8a630 100644 --- a/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/KaravanBashi.java +++ b/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/KaravanBashi.java @@ -23,8 +23,8 @@ public class KaravanBashi { void onStart(@Observes StartupEvent ev) throws InterruptedException { LOGGER.info("Karavan Bashi is starting..."); - dockerService.checkContainersStatus(); dockerService.createNetwork(); + dockerService.checkDataGridHealth(); dockerService.startListeners(); eventBus.publish(ConductorService.ADDRESS_INFINISPAN_START, ""); } diff --git a/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/docker/DockerEventListener.java b/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/docker/DockerEventListener.java index 64acd143..be068895 100644 --- a/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/docker/DockerEventListener.java +++ b/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/docker/DockerEventListener.java @@ -8,6 +8,7 @@ import io.vertx.core.eventbus.EventBus; import org.apache.camel.karavan.bashi.ConductorService; import org.apache.camel.karavan.bashi.Constants; import org.apache.camel.karavan.datagrid.DatagridService; +import org.apache.camel.karavan.datagrid.model.PodStatus; import org.eclipse.microprofile.config.inject.ConfigProperty; import org.jboss.logging.Logger; @@ -15,6 +16,7 @@ import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; import java.io.Closeable; import java.io.IOException; +import java.time.Instant; import java.util.Arrays; import java.util.Objects; @@ -43,22 +45,31 @@ public class DockerEventListener implements ResultCallback<Event> { @Override public void onNext(Event event) { // LOGGER.info(event.getType() + " : " + event.getStatus()); - if (Objects.equals(event.getType(), EventType.CONTAINER)) { - Container container = dockerService.getContainer(event.getId()); - String status = event.getStatus(); - if (container.getNames()[0].equals("/infinispan") && status.startsWith("health_status:")) { - String health = status.replace("health_status: ", ""); - LOGGER.infof("Container %s health status: %s", container.getNames()[0], health); - eventBus.publish(ConductorService.ADDRESS_INFINISPAN_HEALTH, health); - } else if (container.getNames()[0].endsWith(Constants.DEVMODE_SUFFIX)) { - if (Arrays.asList("stop", "die", "kill", "pause", "destroy").contains(event.getStatus())) { - String name = container.getNames()[0].replace("/", ""); - String projectId = name.replace("-" + Constants.DEVMODE_SUFFIX, ""); - datagridService.deletePodStatus(projectId, environment, name); - } else if (Arrays.asList("start", "unpause").contains(event.getStatus())) { - + try { + if (Objects.equals(event.getType(), EventType.CONTAINER)) { + Container container = dockerService.getContainer(event.getId()); + String status = event.getStatus(); + if (container.getNames()[0].equals("/infinispan") && status.startsWith("health_status:")) { + String health = status.replace("health_status: ", ""); + LOGGER.infof("Container %s health status: %s", container.getNames()[0], health); + eventBus.publish(ConductorService.ADDRESS_INFINISPAN_HEALTH, health); + } else if (container.getNames()[0].endsWith(Constants.DEVMODE_SUFFIX)) { + if (Arrays.asList("stop", "die", "kill", "pause", "destroy").contains(event.getStatus())) { + String name = container.getNames()[0].replace("/", ""); + String projectId = name.replace("-" + Constants.DEVMODE_SUFFIX, ""); + LOGGER.info("Deleted PodStatus for " + projectId); + datagridService.deletePodStatus(projectId, environment, name); + } else if (Arrays.asList("start", "unpause").contains(event.getStatus())) { + String name = container.getNames()[0].replace("/", ""); + String projectId = name.replace("-" + Constants.DEVMODE_SUFFIX, ""); + PodStatus ps = new PodStatus(name, true, null, projectId, environment, true, Instant.ofEpochSecond(container.getCreated()).toString()); + LOGGER.info("Saved PodStatus for " + projectId); + datagridService.savePodStatus(ps); + } } } + } catch (Exception exception) { + LOGGER.error(exception.getMessage()); } } diff --git a/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/docker/DockerService.java b/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/docker/DockerService.java index 44fa6824..80558b5f 100644 --- a/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/docker/DockerService.java +++ b/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/docker/DockerService.java @@ -12,25 +12,34 @@ import com.github.dockerjava.core.DockerClientImpl; import com.github.dockerjava.core.InvocationBuilder; import com.github.dockerjava.httpclient5.ApacheDockerHttpClient; import com.github.dockerjava.transport.DockerHttpClient; +import io.quarkus.scheduler.Scheduled; +import io.smallrye.mutiny.tuples.Tuple2; import io.vertx.core.eventbus.EventBus; +import io.vertx.core.json.JsonObject; +import org.apache.camel.karavan.bashi.Constants; import org.jboss.logging.Logger; import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; import java.io.IOException; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.text.DecimalFormat; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; +import static org.apache.camel.karavan.bashi.ConductorService.ADDRESS_CONTAINER_STATS; import static org.apache.camel.karavan.bashi.ConductorService.ADDRESS_INFINISPAN_HEALTH; +import static org.apache.camel.karavan.bashi.Constants.DATAGRID_CONTAINER_NAME; import static org.apache.camel.karavan.bashi.Constants.NETWORK_NAME; @ApplicationScoped public class DockerService { private static final Logger LOGGER = Logger.getLogger(DockerService.class.getName()); + private static final DecimalFormat formatCpu = new DecimalFormat("0.00"); + private static final DecimalFormat formatMiB = new DecimalFormat("0.0"); + private static final DecimalFormat formatGiB = new DecimalFormat("0.00"); + private static final Map<String, Tuple2<Long, Long>> previousStats = new ConcurrentHashMap<>(); @Inject DockerEventListener dockerEventListener; @@ -38,6 +47,57 @@ public class DockerService { @Inject EventBus eventBus; + @Scheduled(every = "{karavan.container-stats-interval}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP) + void collectContainersStats() { + System.out.println("collectContainersStats "); + getDockerClient().listContainersCmd().exec().forEach(container -> { + Statistics stats = getContainerStats(container.getId()); + + String name = container.getNames()[0].replace("/", ""); + String projectId = name.replace("-" + Constants.DEVMODE_SUFFIX, ""); + String memoryUsage = formatMemory(stats.getMemoryStats().getUsage()); + String memoryLimit = formatMemory(stats.getMemoryStats().getLimit()); + JsonObject data = JsonObject.of( + "projectId", projectId, + "memory", memoryUsage + " / " + memoryLimit, + "cpu", formatCpu(name, stats) + ); + eventBus.publish(ADDRESS_CONTAINER_STATS, data); + }); + } + + private String formatMemory(Long memory) { + if (memory < (1073741824)) { + return formatMiB.format(memory.doubleValue() / 1048576) + "MiB"; + } else { + return formatGiB.format(memory.doubleValue() / 1073741824) + "GiB"; + } + } + + private String formatCpu(String containerName, Statistics stats) { + double cpuUsage = 0; + long previousCpu = previousStats.containsKey(containerName) ? previousStats.get(containerName).getItem1() : -1; + long previousSystem = previousStats.containsKey(containerName) ? previousStats.get(containerName).getItem2() : -1; + + CpuStatsConfig cpuStats = stats.getCpuStats(); + if (cpuStats != null) { + CpuUsageConfig cpuUsageConfig = cpuStats.getCpuUsage(); + long systemUsage = cpuStats.getSystemCpuUsage(); + long totalUsage = cpuUsageConfig.getTotalUsage(); + + if (previousCpu != -1 && previousSystem != -1) { + float cpuDelta = totalUsage - previousCpu; + float systemDelta = systemUsage - previousSystem; + + if (cpuDelta > 0 && systemDelta > 0) { + cpuUsage = cpuDelta / systemDelta * cpuStats.getOnlineCpus() * 100; + } + } + previousStats.put(containerName, Tuple2.of(totalUsage, systemUsage)); + } + return formatCpu.format(cpuUsage) + "%"; + } + public void startListeners() { getDockerClient().eventsCmd().exec(dockerEventListener); } @@ -53,12 +113,12 @@ public class DockerService { } } - public void checkContainersStatus() { - getDockerClient().listContainersCmd().withShowAll(true).exec().stream() + public void checkDataGridHealth() { + getDockerClient().listContainersCmd().exec().stream() .filter(c -> c.getState().equals("running")) .forEach(c -> { HealthState hs = getDockerClient().inspectContainerCmd(c.getId()).exec().getState().getHealth(); - if (c.getNames()[0].equals("/infinispan")) { + if (c.getNames()[0].equals("/" + DATAGRID_CONTAINER_NAME)) { eventBus.publish(ADDRESS_INFINISPAN_HEALTH, hs.getStatus()); } }); @@ -69,14 +129,9 @@ public class DockerService { return containers.get(0); } - public List<Container> getRunnerContainer() { - return getDockerClient().listContainersCmd() - .withShowAll(true).withLabelFilter(Map.of("type", "runner")).exec(); - } - - public Statistics getContainerStats(String id) { + public Statistics getContainerStats(String containerId) { InvocationBuilder.AsyncResultCallback<Statistics> callback = new InvocationBuilder.AsyncResultCallback<>(); - getDockerClient().statsCmd(id).withContainerId(id).exec(callback); + getDockerClient().statsCmd(containerId).withContainerId(containerId).withNoStream(true).exec(callback); Statistics stats = null; try { stats = callback.awaitResult(); @@ -87,7 +142,7 @@ public class DockerService { return stats; } - public void createContainer(String name, String image, List<String> env, String ports, + public Container createContainer(String name, String image, List<String> env, String ports, boolean exposedPort, HealthCheck healthCheck, Map<String, String> labels) throws InterruptedException { List<Container> containers = getDockerClient().listContainersCmd().withShowAll(true).withNameFilter(List.of(name)).exec(); if (containers.size() == 0) { @@ -95,7 +150,7 @@ public class DockerService { List<ExposedPort> exposedPorts = getPortsFromString(ports).values().stream().map(i -> ExposedPort.tcp(i)).collect(Collectors.toList()); - CreateContainerResponse container = getDockerClient().createContainerCmd(image) + CreateContainerResponse response = getDockerClient().createContainerCmd(image) .withName(name) .withLabels(labels) .withEnv(env) @@ -104,9 +159,12 @@ public class DockerService { .withHostConfig(getHostConfig(ports, exposedPort)) .withHealthcheck(healthCheck) .exec(); - LOGGER.info("Container created: " + container.getId()); + LOGGER.info("Container created: " + response.getId()); + return getDockerClient().listContainersCmd().withShowAll(true) + .withIdFilter(Collections.singleton(response.getId())).exec().get(0); } else { LOGGER.info("Container already exists: " + containers.get(0).getId()); + return containers.get(0); } } diff --git a/karavan-cloud/karavan-bashi/src/main/resources/application.properties b/karavan-cloud/karavan-bashi/src/main/resources/application.properties index 1ed3add7..534ad21d 100644 --- a/karavan-cloud/karavan-bashi/src/main/resources/application.properties +++ b/karavan-cloud/karavan-bashi/src/main/resources/application.properties @@ -11,7 +11,7 @@ karavan.port=8080:8080 karavan.environment=dev karavan.default-runtime=quarkus karavan.runtimes=quarkus,spring-boot -karavan.devmode-status-interval=2s +karavan.container-stats-interval=5s # Git repository Configuration karavan.git-repository=${GIT_REPOSITORY}