melin commented on issue #524: URL: https://github.com/apache/spark-kubernetes-operator/issues/524#issuecomment-4427209326
目前产品中的实现,希望对有需要的人提供帮助 <img width="1191" height="187" alt="Image" src="https://github.com/user-attachments/assets/c72d71d2-ed89-4511-b50b-f4e304c6d861" />

```
import static io.github.melin.jobserver.spark.submission.k8s.Constants.JOBSERVER_NAME;
import static io.github.melin.jobserver.spark.submission.k8s.Constants.JOBSERVER_NAME_LABEL;
import static io.github.melin.jobserver.spark.submission.k8s.Constants.SPARK_APP_ID_LABEL;
import static io.github.melin.jobserver.spark.submission.k8s.Constants.SPARK_POD_DRIVER_ROLE;
import static io.github.melin.jobserver.spark.submission.k8s.Constants.SPARK_POD_EXECUTOR_ROLE;
import static io.github.melin.jobserver.spark.submission.k8s.Constants.SPARK_ROLE_LABEL;
import static io.github.melin.jobserver.spark.support.leader.LeaderTypeEnum.COLLECT_FLINK_POD_LOG;

import com.gitee.melin.bee.util.ThreadUtils;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
import io.github.melin.jobserver.spark.common.entity.CloudRegionEntity;
import io.github.melin.jobserver.spark.common.entity.JobServerEntity;
import io.github.melin.jobserver.spark.common.model.ClusterId;
import io.github.melin.jobserver.spark.common.model.EngineId;
import io.github.melin.jobserver.spark.common.util.DateUtils;
import io.github.melin.jobserver.spark.common.util.FsUtils;
import io.github.melin.jobserver.spark.service.CloudRegionService;
import io.github.melin.jobserver.spark.service.JobServerService;
import io.github.melin.jobserver.spark.support.cluster.ClusterEngineManager;
import io.github.melin.jobserver.spark.support.cluster.ClusterManager;
import io.github.melin.jobserver.spark.support.leader.RedisLeaderElection;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Watches Spark driver/executor pods on every known Kubernetes cluster and, when one of them
 * reaches a terminal phase, uploads its container log to the region's log storage.
 *
 * <p>Only the instance that currently holds the {@code COLLECT_FLINK_POD_LOG} leadership runs
 * informers; a periodic task starts them on the leader and stops them everywhere else.
 */
@Service
public class SparkPodLogController implements InitializingBean {

    private static final Logger LOG = LoggerFactory.getLogger(SparkPodLogController.class);

    /** Pod phases after which no further log output is expected. */
    private static final String PHASE_SUCCEEDED = "Succeeded";

    private static final String PHASE_FAILED = "Failed";

    /** Resync period handed to the pod informer, in milliseconds. */
    private static final long RESYNC_PERIOD_MILLIS = 30 * 1000L;

    @Autowired
    private ClusterManager clusterManager;

    @Autowired
    private ClusterEngineManager clusterEngineManager;

    @Autowired
    private CloudRegionService cloudRegionService;

    @Autowired
    private JobServerService jobServerService;

    @Autowired
    private RedisLeaderElection redisLeaderElection;

    /**
     * Informer factory started for each cluster. Doubles as the "already registered" marker and
     * keeps the exact factory instance so that it can be stopped later.
     * NOTE(review): {@code KubernetesClient#informers()} appears to hand out a new factory per
     * call in at least some client versions, in which case stopping a freshly obtained factory
     * would not stop the informers started here - confirm against the client version in use.
     */
    private final Map<ClusterId, SharedInformerFactory> informerFactories = new ConcurrentHashMap<>();

    private final ScheduledExecutorService executorService =
            ThreadUtils.newDaemonSingleThreadScheduledExecutor("check-informer-register");

    /**
     * Joins the leader election and schedules the periodic informer reconciliation
     * (first run after 10 seconds, then every 60 seconds).
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        redisLeaderElection.buildLeader(COLLECT_FLINK_POD_LOG);
        executorService.scheduleAtFixedRate(this::reconcileInformers, 10, 60, TimeUnit.SECONDS);
    }

    /**
     * Starts missing informers when this instance is the leader and stops running ones when it
     * is not. Never throws: an exception escaping a {@code scheduleAtFixedRate} task would
     * suppress all subsequent executions.
     */
    private void reconcileInformers() {
        try {
            clusterManager.getK8sClientCache().forEach((clusterId, kubernetesClient) -> {
                try {
                    if (redisLeaderElection.checkLeader(COLLECT_FLINK_POD_LOG)) {
                        if (!informerFactories.containsKey(clusterId)) {
                            registInformer(kubernetesClient, clusterId);
                        }
                    } else {
                        stopInformer(clusterId);
                    }
                } catch (Exception e) {
                    // Pass the exception itself as the last argument so the stack trace is logged.
                    LOG.error("❌ 注册 informer 失败: {}, error: {}", clusterId, e.getMessage(), e);
                }
            });
        } catch (Exception e) {
            LOG.error("Failed to reconcile Spark pod informers: {}", e.getMessage(), e);
        }
    }

    /**
     * Registers and starts a pod informer for the given cluster if this instance is the leader;
     * otherwise stops any informer previously started for that cluster. Calling it again for a
     * cluster that is already being watched is a no-op.
     *
     * @param kubernetesClient client connected to the cluster
     * @param clusterId        identifier of the cluster to watch
     */
    public synchronized void registInformer(KubernetesClient kubernetesClient, ClusterId clusterId) {
        if (!redisLeaderElection.checkLeader(COLLECT_FLINK_POD_LOG)) {
            stopInformer(clusterId);
            return;
        }
        if (informerFactories.containsKey(clusterId)) {
            return;
        }

        // Resolve the storage prefix first so a lookup failure does not leave a half-built informer.
        CloudRegionEntity region = cloudRegionService.queryRegion(clusterId.getRegionCode());
        String schema = region.getStorageType().getSchema();
        String prefix = schema + region.getLogBucket();

        // The informer receives every pod; the handler filters by Spark role and job server labels.
        SharedInformerFactory factory = kubernetesClient.informers();
        SharedIndexInformer<Pod> informer = factory.sharedIndexInformerFor(Pod.class, RESYNC_PERIOD_MILLIS);
        informer.addEventHandler(new SparkResourceEventHandler(clusterId, kubernetesClient, prefix));
        factory.startAllRegisteredInformers();

        // Remember the factory only after a successful start so a failed attempt is retried.
        informerFactories.put(clusterId, factory);
        LOG.info("🚀 {} 控制器已启动,正在监控 Spark Pods...", clusterId);
    }

    /** Stops and forgets the informer factory started for the cluster, if there is one. */
    private void stopInformer(ClusterId clusterId) {
        SharedInformerFactory factory = informerFactories.remove(clusterId);
        if (factory != null) {
            factory.stopAllRegisteredInformers();
            LOG.info("Stopped Spark pod informers for cluster: {}", clusterId);
        }
    }

    /** Closes the stream, logging instead of propagating a failure to close. */
    private static void closeQuietly(InputStream stream, String podName) {
        if (stream == null) {
            return;
        }
        try {
            stream.close();
        } catch (IOException e) {
            LOG.warn("Pod: {}, failed to close log stream: {}", podName, e.getMessage(), e);
        }
    }

    /** Reacts to pod events of one cluster and archives the logs of finished Spark pods. */
    private class SparkResourceEventHandler implements ResourceEventHandler<Pod> {

        private final ClusterId clusterId;

        private final KubernetesClient kubernetesClient;

        /** Storage schema plus log bucket; root under which log files are written. */
        private final String prefix;

        public SparkResourceEventHandler(ClusterId clusterId, KubernetesClient kubernetesClient, String prefix) {
            this.clusterId = clusterId;
            this.kubernetesClient = kubernetesClient;
            this.prefix = prefix;
        }

        @Override
        public void onAdd(Pod pod) {
            /* Not handled for now. */
        }

        /**
         * Triggers log collection when a managed Spark pod moves into a terminal phase
         * (Succeeded/Failed) from a different phase.
         */
        @Override
        public void onUpdate(Pod oldPod, Pod newPod) {
            String role = managedSparkRole(newPod);
            if (role == null) {
                return;
            }

            String phase = phaseOf(newPod);
            boolean terminal = PHASE_SUCCEEDED.equals(phase) || PHASE_FAILED.equals(phase);
            // 'phase' is non-null once 'terminal' is true, so the equals call below is safe.
            if (terminal && !phase.equals(phaseOf(oldPod))) {
                handleFinishedPod(clusterId, kubernetesClient, newPod, role);
            }
        }

        /** Logs the deletion of a managed Spark pod. */
        @Override
        public void onDelete(Pod pod, boolean deletedFinalStateUnknown) {
            if (managedSparkRole(pod) == null) {
                return;
            }
            LOG.info("Deleted Pod: {}, status: {}", pod.getMetadata().getName(), phaseOf(pod));
        }

        /**
         * Returns the Spark role label of the pod when it is a driver or executor that carries
         * this job server's name label, or {@code null} for any other pod (including pods
         * without metadata or labels).
         */
        private String managedSparkRole(Pod pod) {
            if (pod == null || pod.getMetadata() == null) {
                return null;
            }
            Map<String, String> labels = pod.getMetadata().getLabels();
            if (labels == null) {
                return null;
            }
            String role = labels.get(SPARK_ROLE_LABEL);
            boolean sparkPod = SPARK_POD_DRIVER_ROLE.equals(role) || SPARK_POD_EXECUTOR_ROLE.equals(role);
            if (!sparkPod || !JOBSERVER_NAME.equals(labels.get(JOBSERVER_NAME_LABEL))) {
                return null;
            }
            return role;
        }

        /** Returns the pod phase, or {@code null} when the pod has no status. */
        private String phaseOf(Pod pod) {
            return pod.getStatus() == null ? null : pod.getStatus().getPhase();
        }

        /**
         * Streams the finished pod's log to
         * {@code <prefix>/spark_logs/<created date>/<application id>/...}; drivers go to
         * {@code driver.log}, executors to {@code executor/executor-<id>.log}. Failures are
         * logged and swallowed so that one pod cannot break event delivery for the others.
         */
        private void handleFinishedPod(ClusterId clusterId, KubernetesClient kubernetesClient, Pod pod, String role) {
            String podName = pod.getMetadata().getName();
            String namespace = pod.getMetadata().getNamespace();
            String engineCode = pod.getMetadata().getLabels().get("engineCode");
            EngineId engineId = EngineId.of(clusterId, engineCode);
            String applicationId = pod.getMetadata().getLabels().get(SPARK_APP_ID_LABEL);
            String podPhase = pod.getStatus().getPhase();

            JobServerEntity jobServer = jobServerService.queryJobServerByAppId(applicationId);
            if (jobServer == null) {
                LOG.warn("engine: {}, pod name: {}, jobserver {} not exists", engineId, podName, applicationId);
                return;
            }

            boolean driver = SPARK_POD_DRIVER_ROLE.equals(role);
            if (!driver && !SPARK_POD_EXECUTOR_ROLE.equals(role)) {
                return;
            }

            try {
                String createdDate = DateUtils.formatDate(jobServer.getGmtCreated());
                String basePath = prefix + "/spark_logs/" + createdDate + "/" + applicationId;
                // The executor id is taken from the part of the pod name after "exec-".
                String logPath = driver
                        ? basePath + "/driver.log"
                        : basePath + "/executor/executor-" + StringUtils.substringAfter(podName, "exec-") + ".log";

                clusterEngineManager.runSecured(engineId, (fileSystem) -> {
                    InputStream logInputStream = kubernetesClient
                            .pods()
                            .inNamespace(namespace)
                            .withName(podName)
                            .getLogInputStream();
                    try {
                        FsUtils.uploadFile(fileSystem, logInputStream, logPath);
                    } finally {
                        // Release the HTTP-backed stream even if the upload fails.
                        closeQuietly(logInputStream, podName);
                    }
                    LOG.info("Pod: {}, status: {}, log path: {}", podName, podPhase, logPath);
                    return null;
                });
            } catch (Exception e) {
                LOG.error("❌ Pod: {} 处理日志失败: {}", podName, e.getMessage(), e);
            }
        }
    }
}
```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
