This is an automated email from the ASF dual-hosted git repository. glauesppen pushed a commit to branch revert-311-MonitoringHackIT in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit e85aa5710f3dd1cc84763ecb9d8edec559eff310 Author: Glaucia Esppenchutz <[email protected]> AuthorDate: Mon Jun 19 22:13:03 2023 +0100 Revert "Updates for monitoring the runtime metrics of underlying platform (Spark)" --- .../main/java/org/apache/wayang/core/api/Job.java | 22 +- .../org/apache/wayang/core/api/WayangContext.java | 29 -- .../spark/monitoring/interfaces/Application.java | 99 ---- .../spark/monitoring/interfaces/Executor.java | 140 ------ .../wayang/spark/monitoring/interfaces/Job.java | 102 ---- .../monitoring/interfaces/SerializableObject.java | 39 -- .../wayang/spark/monitoring/interfaces/Stage.java | 177 ------- .../wayang/spark/monitoring/interfaces/Task.java | 253 ---------- .../spark/monitoring/interfaces/TaskMetric.java | 175 ------- .../spark/monitoring/metrics/ApplicationEnd.java | 103 ---- .../spark/monitoring/metrics/ApplicationStart.java | 96 ---- .../spark/monitoring/metrics/ExecutorAdded.java | 131 ------ .../spark/monitoring/metrics/ExecutorRemoved.java | 128 ----- .../spark/monitoring/metrics/ExecutorUpdated.java | 138 ------ .../wayang/spark/monitoring/metrics/JobEnd.java | 89 ---- .../wayang/spark/monitoring/metrics/JobStart.java | 88 ---- .../spark/monitoring/metrics/SparkListener.java | 524 --------------------- .../spark/monitoring/metrics/StageCompleted.java | 182 ------- .../monitoring/metrics/StageExecutorMetrics.java | 165 ------- .../spark/monitoring/metrics/StageSubmitted.java | 163 ------- .../wayang/spark/monitoring/metrics/TaskEnd.java | 220 --------- .../monitoring/metrics/TaskGettingResult.java | 218 --------- .../spark/monitoring/metrics/TaskMetric.java | 158 ------- .../wayang/spark/monitoring/metrics/TaskStart.java | 217 --------- .../wayang/spark/platform/SparkPlatform.java | 31 +- .../main/resources/wayang-spark-kafka.properties | 42 -- wayang-platforms/wayang-spark/pom.xml | 47 +- 27 files changed, 44 insertions(+), 3732 deletions(-) diff --git a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/api/Job.java b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/api/Job.java index ce20b2df..97454b3a 100644 --- a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/api/Job.java +++ b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/api/Job.java @@ -109,7 +109,6 @@ public class Job extends OneTimeExecutable { */ private final WayangPlan wayangPlan; - /** * {@link OptimizationContext} for the {@link #wayangPlan}. */ @@ -162,12 +161,9 @@ public class Job extends OneTimeExecutable { private Monitor monitor; - - /** * Name for this instance. */ - private boolean montiorWithHackIT; private final String name; /** @@ -203,8 +199,7 @@ public class Job extends OneTimeExecutable { for (String udfJar : udfJars) { this.addUdfJar(udfJar); } - // set HackIT debugger enable or disable - this.setMontiorWithHackIT(wayangContext.isWithHackITMonitioring()); + // Prepare re-optimization. if (this.configuration.getBooleanProperty("wayang.core.optimizer.reoptimize")) { this.cardinalityBreakpoint = new CardinalityBreakpoint(this.configuration); @@ -617,21 +612,6 @@ public class Job extends OneTimeExecutable { return true; } - /** - * getter method - * @return boolean either enable or disable HACKIT for WAYANG Job - */ - public boolean isMontiorWithHackIT() { - return montiorWithHackIT; - } - - /** - * setter method - * @param montiorWithHackIT boolean - */ - public void setMontiorWithHackIT(boolean montiorWithHackIT) { - this.montiorWithHackIT = montiorWithHackIT; - } /** * Enumerate possible execution plans from the given {@link WayangPlan} and determine the (seemingly) best one. */ diff --git a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/api/WayangContext.java b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/api/WayangContext.java index 4d660b78..f40f5369 100644 --- a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/api/WayangContext.java +++ b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/api/WayangContext.java @@ -45,11 +45,6 @@ public class WayangContext { private final Configuration configuration; - - /** - * For enabling montioring for debugging using HACKIT! - */ - private boolean withHackITMonitioring; public WayangContext() { this(new Configuration()); } @@ -69,14 +64,6 @@ public class WayangContext { return this; } - /** - * enable the monitoring for platform tasks - * @return WayangContext - */ - public WayangContext withMontioringForHackIT(){ - this.setWithHackITMonitioring(true); - return this; - } /** * Registers the given {@link Plugin} with this instance. * @@ -196,20 +183,4 @@ public class WayangContext { } return this.cardinalityRepository; } - - /** - * getter method - * @return boolean that depicts HACKIT enable - */ - public boolean isWithHackITMonitioring() { - return withHackITMonitioring; - } - - /** - * setter method - * @param withHackITMonitioring is a boolean for HACKIT - */ - public void setWithHackITMonitioring(boolean withHackITMonitioring) { - this.withHackITMonitioring = withHackITMonitioring; - } } diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/interfaces/Application.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/interfaces/Application.java deleted file mode 100644 index 2d7277b5..00000000 --- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/interfaces/Application.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.wayang.spark.monitoring.interfaces; -import java.io.Serializable; -import java.util.List; -/** - * This interface represents an application metrics in a Spark cluster. - * The implementing classes must be serializable. - */ -public interface Application extends Serializable { - /** - * Sets the name of the event associated with the application. - * - * @param name the name of the event - */ - void setEventame(String name); - /** - * Returns the name of the event associated with the application. - * - * @return the name of the event - */ - String getEventName(); - /** - * Sets the name of the application. - * - * @param name the name of the application - */ - void setName(String name); - /** - * Returns the name of the application. - * - * @return the name of the application - */ - String getName(); - /** - * Sets the start time of the application. - * - * @param time the start time of the application - */ - void setStartTime(long time); - /** - * Returns the start time of the application. - * - * @return the start time of the application - */ - long getTime(); - /** - * Sets the ID of the application. - * - * @param id the ID of the application - */ - void setAppID(String id); - /** - * Returns the ID of the application. - * - * @return the ID of the application - */ - String getAppID(); - /** - * Sets the user associated with the Spark application. - * - * @param user the user associated with the Spark application - */ - void setSparkUser(String user); - /** - * Returns the user associated with the Spark application. - * - * @return the user associated with the Spark application - */ - String getSparkUser(); - /** - * Sets the list of jobs associated with the application. - * - * @param listOfJobs the list of jobs associated with the application - */ - void setListOfJobs(List<Job> listOfJobs); - /** - * Returns the list of jobs associated with the application. - * - * @return the list of jobs associated with the application - */ - List<Job> getListOfjobs(); -} diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/interfaces/Executor.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/interfaces/Executor.java deleted file mode 100644 index f748afba..00000000 --- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/interfaces/Executor.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.wayang.spark.monitoring.interfaces; - -import java.io.Serializable; -/** - * The Executor interface represents an executor in a Spark cluster. It defines methods for setting and getting various attributes of an executor. - * These attributes include the name of the event, the ID of the stage the executor is running, the ID of the executor itself, - * the attempt ID of the stage, the time at which the executor started, the host where the executor is running, - * the total number of cores available to the executor, the resource information of the executor, - * and the reason for which the executor was removed from the cluster. - */ - -public interface Executor extends Serializable { - /** - * Sets the name of the event associated with this executor. - * @param name The name of the event. - */ - void setEventame(String name); - - /** - * Returns the name of the event associated with this executor. - * @return The name of the event. - */ - String getEventName(); - - /** - * Sets the ID of the stage the executor is running. - * @param id The ID of the stage. - */ - void setStageID(int id); - - /** - * Returns the ID of the stage the executor is running. - * @return The ID of the stage. - */ - int getStageID(); - - /** - * Sets the ID of this executor. - * @param id The ID of the executor. - */ - void setExecutorID(String id); - - /** - * Returns the ID of this executor. - * @return The ID of the executor. - */ - String getExecutorID(); - - /** - * Sets the attempt ID of the stage. - * @param id The attempt ID of the stage. - */ - void stageAttempt(int id); - - /** - * Returns the attempt ID of the stage. - * @return The attempt ID of the stage. - */ - int getStageAttempt(); - - /** - * Sets the time at which this executor started. - * @param Time The start time of the executor. - */ - void executorTime(long Time); - - /** - * Returns the time at which this executor started. - * @return The start time of the executor. - */ - long getExecutorTime(); - - /** - * Sets the host where this executor is running. - * @param host The host where the executor is running. - */ - void setExecutorHost(String host); - - /** - * Returns the host where this executor is running. - * @return The host where the executor is running. - */ - String getExecutorHost(); - - /** - * Sets the total number of cores available to this executor. - * @param cores The total number of cores. - */ - void setTotalCores(int cores); - - /** - * Returns the total number of cores available to this executor. - * @return The total number of cores. - */ - int getTotalCores(); - - /** - * Sets the resource information of this executor. - * @param resourceInfoId The resource information of the executor. - */ - void setResourceInfo(int resourceInfoId); - - /** - * Returns the resource information of this executor. - * @return The resource information of the executor. - */ - int getResourceInfo(); - - /** - * Sets the reason for which this executor was removed from the cluster. - * @param reasonOfRemoval The reason for removal. - */ - void setReasonOfRemoval(String reasonOfRemoval); - - /** - * Returns the reason for which this executor was removed from the cluster. - * @return The reason for removal. - */ - String getReasonOfRemoval(); - - -} diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/interfaces/Job.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/interfaces/Job.java deleted file mode 100644 index 9ccebf1f..00000000 --- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/interfaces/Job.java +++ /dev/null @@ -1,102 +0,0 @@ - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.wayang.spark.monitoring.interfaces; - -import scala.collection.Seq; - -import java.io.Serializable; -import java.util.List; -/** - * The Job interface represents a job to be executed in a distributed system. - * A job comprises one or more stages, and contains metadata about the job - * such as its ID, product arity, and event name. - */ -public interface Job extends Serializable { - - /** - * Sets the name of the event associated with this job. - * - * @param name the name of the event - */ - void setEventame(String name); - - /** - * Returns the name of the event associated with this job. - * - * @return the name of the event - */ - String getEventName(); - - /** - * Sets the unique identifier for this job. - * - * @param jobID the unique identifier for this job - */ - void setJobID(int jobID); - - /** - * Returns the unique identifier for this job. - * - * @return the unique identifier for this job - */ - int getJobID(); - - /** - * Sets the number of output products produced by this job. - * - * @param productArity the number of output products produced by this job - */ - void setProductArity(int productArity); - - /** - * Returns the number of output products produced by this job. - * - * @return the number of output products produced by this job - */ - int getProductArity(); - - /** - * Sets the stage ID associated with this job. - * - * @param stageId the stage ID associated with this job - */ - void setStageID(Seq<Object> stageId); - - /** - * Returns the stage ID associated with this job. - * - * @return the stage ID associated with this job - */ - Seq<Object> getStageID(); - - /** - * Sets the list of stages comprising this job. - * - * @param listOfStages the list of stages comprising this job - */ - void setListOfStages(List<Stage> listOfStages); - - /** - * Returns the list of stages comprising this job. - * - * @return the list of stages comprising this job - */ - List<Stage> getListOfStages(); -} diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/interfaces/SerializableObject.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/interfaces/SerializableObject.java deleted file mode 100644 index 2ef8bcbb..00000000 --- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/interfaces/SerializableObject.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.wayang.spark.monitoring.interfaces; - -import java.io.Serializable; -/** - * The {@code SerializableObject} interface is a marker interface that indicates - * that its implementing classes are serializable. By extending the {@link Serializable} - * interface, classes that implement this interface can be serialized and deserialized - * to and from a stream of bytes. - * <p> - * It is recommended that classes implementing this interface provide a serialVersionUID - * field to ensure compatibility between serialized objects of different versions. - * </p> - * <p> - * This interface does not define any methods, but instead serves as a tag to indicate - * that implementing classes can be serialized. - * </p> - * - * @see Serializable - */ -public interface SerializableObject extends Serializable { -} diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/interfaces/Stage.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/interfaces/Stage.java deleted file mode 100644 index 02708194..00000000 --- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/interfaces/Stage.java +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.wayang.spark.monitoring.interfaces; - -import org.apache.wayang.spark.monitoring.metrics.TaskMetric; - -import java.io.Serializable; -import java.util.List; -/** - * This interface represents a stage in a data processing pipeline. - * A stage is a set of tasks that are executed together as part of a larger computation. - */ -public interface Stage extends Serializable { - - /** - * Sets the name of the event associated with this stage. - * @param name the name of the event - */ - void setEventame(String name); - - /** - * Gets the name of the event associated with this stage. - * @return the name of the event - */ - String getEventName(); - - /** - * Sets the ID of this stage. - * @param ID the ID of the stage - */ - void setID(int ID); - - /** - * Gets the ID of this stage. - * @return the ID of the stage - */ - int getID(); - - /** - * Sets the number of tasks associated with this stage. - * @param tasks the number of tasks - */ - void setNumberOfTasks(int tasks); - - /** - * Gets the number of tasks associated with this stage. - * @return the number of tasks - */ - int getNumberOfTasks(); - - /** - * Sets the name of this stage. - * @param name the name of the stage - */ - void setStageName(String name); - - /** - * Gets the name of this stage. - * @return the name of the stage - */ - String getStageName(); - - /** - * Sets the status of this stage. - * @param status the status of the stage - */ - void setStatus(String status); - - /** - * Gets the status of this stage. - * @return the status of the stage - */ - String getStatus(); - - /** - * Sets the details of this stage. - * @param details the details of the stage - */ - void setDetails(String details); - - /** - * Gets the details of this stage. - * @return the details of the stage - */ - String getDetails(); - - /** - * Sets the submission time of this stage. - * @param time the submission time - */ - void setSubmissionTime(long time); - - /** - * Gets the submission time of this stage. - * @return the submission time - */ - long getSubmissionTime(); - - /** - * Sets the completion time of this stage. - * @param time the completion time - */ - void setCompletionTime(long time); - - /** - * Gets the completion time of this stage. - * @return the completion time - */ - long getCompletionTime(); - - /** - * Gets the task metric associated with this stage. - * @return the task metric - */ - TaskMetric getTaskMetric(); - - /** - * Sets the task metric associated with this stage. - * @param taskMetric the task metric - */ - void setTaskMetric(TaskMetric taskMetric); - - /** - * Sets the ID of the executor associated with this stage. - * @param ID the executor ID - */ - void setExecutorID(String ID); - - /** - * Gets the ID of the executor associated with this stage. - * @return the executor ID - */ - String getExecutorID(); - - /** - * Sets the ID of the stage attempt. - * @param id the stage attempt ID - */ - void setStageAttemptId(int id); - - /** - * Gets the ID of the stage attempt. - * @return the stage attempt ID - */ - int getStageAttemptId(); - /** - * Sets the list of tasks to be performed. - * - * @param tasks a List of Task objects representing the tasks to be performed - */ - void setListOfTasks(List<Task> tasks); - /** - * Retrieves the list of tasks to be performed. - * - * @return a List of Task objects representing the tasks to be performed - */ - List<Task> getListOfTasks(); - - - -} diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/interfaces/Task.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/interfaces/Task.java deleted file mode 100644 index 6aa3adbd..00000000 --- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/interfaces/Task.java +++ /dev/null @@ -1,253 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.wayang.spark.monitoring.interfaces; -/** - * The Task interface represents a task in a distributed computing or data processing system. - * - * <p>This interface extends the SerializableObject interface and defines the methods that should - * be implemented by a task in order to properly function in such a system. - * - * <p>The Task interface provides methods for setting and getting various properties of a task, - * including its ID, status, and execution metrics. - * - * <p>The Task interface also includes an enum for representing the status of a task while it is running. - * - * @author [Adeel Aslam] - */ -public interface Task extends SerializableObject{ - /** - * The TaskStatusForRunning enum represents the possible statuses of a task while it is running. - * - * <p>Each enum value corresponds to a specific status: FAILED, SUCCESS, KILLED, SUCCESSFUL, - * RUNNING, FINISHED, or SPECULATIVE. - * - * <p>The FAILED status indicates that the task has failed and will not be able to complete - * successfully. The SUCCESS status indicates that the task has completed successfully and - * produced a result. The KILLED status indicates that the task was killed before it could - * complete. The SUCCESSFUL status indicates that the task has completed successfully, but - * did not produce a result. The RUNNING status indicates that the task is currently running. - * The FINISHED status indicates that the task has finished running, but its result has not - * yet been obtained. The SPECULATIVE status indicates that the task is running in a speculative - * manner, in addition to the primary task. - * - * @author [Adeel Aslam] - */ - public enum TaskStatusForRunning { - FAILED, SUCCESS, KILLED, SUCCESSFUL,RUNNING,FINISHED, SPECULATIVE; - } - /** - * Sets the name of the event associated with this task. - * - * @param name the name of the event - */ - void setEventame(String name); - /** - * Gets the name of the event associated with this task. - * - * @return the name of the event - */ - String getEventName(); - /** - * Sets the ID of this task. - * - * @param id the task ID - */ - void setID(String id); - /** - * Gets the ID of this task. - * - * @return the task ID - */ - String getID(); - /** - * Sets the IP address of the host machine executing this task. - * - * @param Ip the IP address of the host machine - */ - void setHostIP(String Ip); - /** - * Gets the IP address of the host machine executing this task. - * - * @return the IP address of the host machine - */ - String getHostIP(); - /** - * Sets the ID of this task. - * - * @param taskId the ID of this task - */ - void setTaskID(long taskId); - /** - * Sets the ID of the stage to which this task belongs. - * - * @param id the ID of the stage to which this task belongs - */ - void setStageID(int id); - /** - * Returns the ID of the stage to which this task belongs. - * - * @return the ID of the stage to which this task belongs - */ - int getStageID(); - /** - * Returns the ID of this task. - * - * @return the ID of this task - */ - long getTaskID(); - /** - * Sets the ID of the executor assigned to this task. - * - * @param executorID the ID of the executor assigned to this task - */ - void setStringExecutorID(String executorID); - /** - * Returns the ID of the executor assigned to this task. - * - * @return the ID of the executor assigned to this task - */ - String getExecutorID(); - /** - * Sets the status of this task. - * - * @param status the status of this task - */ - void setTaskStatus(String status); - /** - * Returns the status of this task. - * - * @return the status of this task - */ - String getTaskStatus(); - /** - * Sets the index of this task. - * - * @param index the index of this task - */ - void setIndex(int index); - /** - * Returns the index of this task. - * - * @return the index of this task - */ - int getIndex(); - /** - * Sets the partition of this task. - * - * @param partition the partition of this task - */ - void setPartition(int partition); - /** - * Returns the partition of this task. - * - * @return the partition of this task - */ - int getPartition(); - /** - * Sets the launch time of this task. - * - * @param time the launch time of this task - */ - void setLaunchTime(long time); - /** - * Returns the launch time of this task. - * - * @return the launch time of this task - */ - long getLaunchTime(); - /** - * Sets the finish time of this task. - * - * @param time the finish time of this task - */ - void setFinishTime(long time); - /** - * Returns the finish time of this task. - * - * @return the finish time of this task - */ - long getFinishTime(); - /** - * Sets the getting time of this task. - * - * @param time the getting time of this task - */ - void setGettingTime(long time); - /** - * Returns the getting time of this task. - * - * @return the getting time of this task - */ - long getGettingTime(); - /** - * Sets the duration time of this task. - * - * @param time the duration time of this task - */ - void setDurationTime(long time); - /** - * Returns the duration time of this task. - * - * @return the duration time of this task - */ - long getDurationTime(); - /** - * Sets the status of this task. - * - * @param status the status of this task - */ - void setTaskStatus(boolean status); - /** - * Returns the status of this task. - * - * @return the status of this task - */ - boolean getTaskSatus(); - - /** - - Sets the task status for running. - @param taskStatusForRunning the {@link TaskStatusForRunning} status to be set for the task - */ - void setTaskStatusForRunning(TaskStatusForRunning taskStatusForRunning); - - /** - - Returns the current task status for running. - @return the {@link TaskStatusForRunning} status of the task - */ - TaskStatusForRunning getTaskStatusForRunning(); - - /** - - Returns the {@link TaskMetric} associated with this task. - @return the {@link TaskMetric} of the task - */ - org.apache.wayang.spark.monitoring.metrics.TaskMetric getTaskMetric(); - - /** - - Sets the {@link TaskMetric} associated with this task. - @param taskMetric the {@link TaskMetric} to be set for the task - */ - void setTaskMetric(org.apache.wayang.spark.monitoring.metrics.TaskMetric taskMetric); - // void setTaskRunningStatus(boolean status) - - -} diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/interfaces/TaskMetric.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/interfaces/TaskMetric.java deleted file mode 100644 index 84f6b93e..00000000 --- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/interfaces/TaskMetric.java +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.wayang.spark.monitoring.interfaces; - -import java.io.Serializable; -/** - * The TaskMetric interface defines the methods used to set and retrieve - * performance metrics for a given task. - */ -public interface TaskMetric extends Serializable { - - /** - * Sets the number of bytes read by the task. - * - * @param bytesRead the number of bytes read - */ - void setBytesRead(long bytesRead); - - /** - * Gets the number of bytes read by the task. - * - * @return the number of bytes read - */ - long getByteRead(); - - /** - * Sets the CPU time used for deserializing the task executor. - * - * @param executorDeserializeCpuTime the CPU time used for deserializing the executor - */ - void setExecutorDeserializeCpuTime(long executorDeserializeCpuTime ); - - /** - * Gets the CPU time used for deserializing the task executor. - * - * @return the CPU time used for deserializing the executor - */ - long getExecutorDeserializeCpuTime(); - - /** - * Sets the time taken to deserialize the task executor. - * - * @param executorDeserializeTime the time taken to deserialize the executor - */ - void setExecutorDeserializeTime(long executorDeserializeTime); - - /** - * Gets the time taken to deserialize the task executor. - * - * @return the time taken to deserialize the executor - */ - long getExecutorDeserializeTime(); - - /** - * Sets the number of bytes spilled to disk by the task. - * - * @param DiskByteSpilled the number of bytes spilled to disk - */ - void setDiskBytesSpilled(long DiskByteSpilled); - - /** - * Gets the number of bytes spilled to disk by the task. - * - * @return the number of bytes spilled to disk - */ - long getDiskBytesSpilled(); - - /** - * Sets the total time taken by the task executor to run. - * - * @param time the time taken by the executor to run - */ - void setExecutorRunTime(long time); - - /** - * Gets the total time taken by the task executor to run. - * - * @return the time taken by the executor to run - */ - long getexecutorRunTime(); - - /** - * Sets the amount of time spent by the JVM on garbage collection. - * - * @param time the amount of time spent on garbage collection - */ - void setjvmGCTime(long time); - - /** - * Gets the amount of time spent by the JVM on garbage collection. - * - * @return the amount of time spent on garbage collection - */ - long getJVMGCTime(); - - /** - * Sets the peak execution memory used by the task executor. - * - * @param peakExecutionMemory the peak execution memory used by the executor - */ - void setPeakExecutionMemory(long peakExecutionMemory); - - /** - * Gets the peak execution memory used by the task executor. - * - * @return the peak execution memory used by the executor - */ - long getPeakExecutionMemory(); - - /** - * Sets the size of the result produced by the task. - * - * @param resultSize the size of the result produced - */ - void setResultSize(long resultSize); - - /** - * Gets the size of the result produced by the task. - * - * @return the size of the result produced - */ - long getResultSize(); - /** - * Sets the time taken to serialize the result of the task. - * - * @param resultSerializationTime the time taken to serialize the result - */ - void setResultSerializationTime(long resultSerializationTime); - /** - * Returns the time taken to serialize the result of the task. - * - * @return the time taken to serialize the result - */ - long getResultSerializationTime(); - /** - * Sets the number of records written by the task. - * - * @param recordsWritten the number of records written - */ - void setRecordsWritten(long recordsWritten); - /** - * Returns the number of records written by the task. - * - * @return the number of records written - */ - long getRecordsWrittern(); - /** - * Sets the number of bytes written by the task. - * - * @param bytesWritten the number of bytes written - */ - void setBytesWritten(long bytesWritten); - /** - * Returns the number of bytes written by the task. - * - * @return the number of bytes written - */ - long getBytesWrittern (); -} diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/ApplicationEnd.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/ApplicationEnd.java deleted file mode 100644 index 6f463c76..00000000 --- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/ApplicationEnd.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.wayang.spark.monitoring.metrics; -import org.apache.wayang.spark.monitoring.interfaces.Application; -import org.apache.wayang.spark.monitoring.interfaces.Job; -import org.apache.wayang.spark.monitoring.interfaces.SerializableObject; - -import java.util.List; -/** - * The {@code ApplicationEnd} class represents an application that has ended. It implements - * the {@link Application} interface and the {@link SerializableObject} interface, indicating - * that it is an application that can be serialized and deserialized to and from a stream of bytes. - * <p> - * This class contains fields to store the name, time, ID, Spark user, event name, and list of jobs - * associated with the application. It also provides methods to set and get the values of these fields. - * </p> - * - * @see Application - * @see SerializableObject - */ -public class ApplicationEnd implements Application, SerializableObject { - private String name; - private long time; - private String id; - private String sparkUser; - private String eventName; - private List<Job> listOfJobs; - @Override - public void setEventame(String name) { - this.eventName=name; - } - - @Override - public String getEventName() { - return name; - } - - @Override - public void setName(String name) { - this.name=name; - } - - @Override - public String getName() { - return name; - } - - @Override - public void setStartTime(long time) { - this.time=time; - } - - @Override - public long getTime() { - return time; - } - - @Override - public void setAppID(String id) { - this.id=id; - } - - @Override - public String getAppID() { - return id; - } - - @Override - public void setSparkUser(String user) { - this.sparkUser=user; - } - - @Override - public String getSparkUser() { - return sparkUser; - } - - @Override - public void setListOfJobs(List<Job> listOfJobs) { - this.listOfJobs=listOfJobs; - } - - @Override - public List<Job> getListOfjobs() { - return listOfJobs; - } -} - diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/ApplicationStart.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/ApplicationStart.java deleted file mode 100644 index 6caee437..00000000 --- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/ApplicationStart.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.wayang.spark.monitoring.metrics; -import org.apache.wayang.spark.monitoring.interfaces.Application; -import org.apache.wayang.spark.monitoring.interfaces.Job; -import org.apache.wayang.spark.monitoring.interfaces.SerializableObject; - -import java.util.List; -/** - * The ApplicationStart class implements the Application and SerializableObject interfaces. It represents an application start event in a Spark cluster. - * This class contains information about the name of the application, the time it started, the application ID, the Spark user who started the application, - * the name of the event, and a list of jobs associated with the application. - */ -public class ApplicationStart implements Application, SerializableObject { - private String name; - private long time; - private String id; - private String sparkUser; - private String eventName; - private List<Job> listOfJobs; - @Override - public void setEventame(String name) { - this.eventName=name; - } - - @Override - public String getEventName() { - return eventName; - } - - @Override - public void setName(String name) { - this.name=name; - } - - @Override - public String getName() { - return name; - } - - @Override - public void setStartTime(long time) { - this.time=time; - } - - @Override - public long getTime() { - return time; - } - - @Override - public void setAppID(String id) { - this.id=id; - } - - @Override - public String getAppID() { - return id; - } - - @Override - public void setSparkUser(String user) { - this.sparkUser=user; - } - - @Override - public String getSparkUser() { - return sparkUser; - } - - @Override - public void setListOfJobs(List<Job> listOfJobs) { - this.listOfJobs= listOfJobs; - } - - @Override - public List<Job> getListOfjobs() { - return listOfJobs; - } -} diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/ExecutorAdded.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/ExecutorAdded.java deleted file mode 100644 index 7a644e2f..00000000 --- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/ExecutorAdded.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.wayang.spark.monitoring.metrics; -import org.apache.wayang.spark.monitoring.interfaces.Executor; -import org.apache.wayang.spark.monitoring.interfaces.SerializableObject; - -/** - * The ExecutorAdded class represents an executor added event in a distributed computing system. - * It implements the Executor interface and the SerializableObject interface. - * - * This class contains information about the executor that was added, such as its stage ID, executor ID, - * stage attempt, time, executor host, total cores, and reason of removal. - * - * This class provides getters and setters for all of the above properties, and implements the methods - * defined in the Executor interface. - */ - -public class ExecutorAdded implements Executor, SerializableObject { - private int stageId; - private String executorID; - private int stageAttempt; - private long time; - private String executorHost; - private int totalCores; - private String reasonOfRemoval; - - private String eventName; - - @Override - public void setEventame(String name) { - this.eventName=name; - } - - @Override - public String getEventName() { - return eventName; - } - - - @Override - public void setStageID(int id) { - this.stageId=id; - } - - @Override - public int getStageID() { - return stageId; - } - - @Override - public void setExecutorID(String id) { - this.executorID=id; - } - - @Override - public String getExecutorID() { - return executorID; - } - - @Override - public void stageAttempt(int id) { - this.stageAttempt=id; - } - - @Override - public int getStageAttempt() { - return stageAttempt; - } - - @Override - public void executorTime(long Time) { - this.time=time; - } - - @Override - public long getExecutorTime() { - return time; - } - - @Override - public void setExecutorHost(String host) { - this.executorHost=host; - } - - @Override - public String getExecutorHost() { - return executorHost; - } - - @Override - public void setTotalCores(int cores) { - this.totalCores=cores; - } - - @Override - public int getTotalCores() { - return totalCores; - } - - @Override - public void setResourceInfo(int resourceInfoId) { - - } - public String getReasonOfRemoval() { - return reasonOfRemoval; - } - - public void setReasonOfRemoval(String reasonOfRemoval) { - this.reasonOfRemoval = reasonOfRemoval; - } - @Override - public int getResourceInfo() { - return 0; - } -} diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/ExecutorRemoved.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/ExecutorRemoved.java deleted file mode 100644 index 1f22236f..00000000 --- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/ExecutorRemoved.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.wayang.spark.monitoring.metrics; -import org.apache.wayang.spark.monitoring.interfaces.Executor; -import org.apache.wayang.spark.monitoring.interfaces.SerializableObject; - -/** - * An event class representing the removal of an executor. - * Implements the Executor interface and SerializableObject interface. - */ -public class ExecutorRemoved implements Executor, SerializableObject { - private int stageId; - private String executorID; - private int stageAttempt; - private long time; - private String executorHost; - private int totalCores; - - public String getReasonOfRemoval() { - return reasonOfRemoval; - } - - public void setReasonOfRemoval(String reasonOfRemoval) { - this.reasonOfRemoval = reasonOfRemoval; - } - - private String reasonOfRemoval; - - private String eventName; - - @Override - public void setEventame(String name) { - this.eventName=name; - } - - @Override - public String getEventName() { - return eventName; - } - - - @Override - public void setStageID(int id) { - this.stageId=id; - } - - @Override - public int getStageID() { - return stageId; - } - - @Override - public void setExecutorID(String id) { - this.executorID=id; - } - - @Override - public String getExecutorID() { - return executorID; - } - - @Override - public void stageAttempt(int id) { - this.stageAttempt=id; - } - - @Override - public int getStageAttempt() { - return stageAttempt; - } - - @Override - public void executorTime(long Time) { - this.time=time; - } - - @Override - public long getExecutorTime() { - return time; - } - - @Override - public void setExecutorHost(String host) { - this.executorHost=host; - } - - @Override - public String getExecutorHost() { - return executorHost; - } - - @Override - public void setTotalCores(int cores) { - this.totalCores=cores; - } - - @Override - public int getTotalCores() { - return totalCores; - } - - @Override - public void setResourceInfo(int resourceInfoId) { - - } - - @Override - public int getResourceInfo() { - return 0; - } -} diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/ExecutorUpdated.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/ExecutorUpdated.java deleted file mode 100644 index 6883db0a..00000000 --- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/ExecutorUpdated.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.wayang.spark.monitoring.metrics; -import org.apache.wayang.spark.monitoring.interfaces.Executor; -import org.apache.wayang.spark.monitoring.interfaces.SerializableObject; - -/** - - An implementation of the Executor interface that represents an updated executor. - - This class contains information about an executor that has been updated, including its - - stage ID, executor ID, stage attempt, execution time, host, total cores, reason of removal, - - and event name. - - @author [Adeel Aslam] - - @version 1.0 - - @since [3-24-2023] - */ -public class ExecutorUpdated implements Executor, SerializableObject { - private int stageId; - private String executorID; - private int stageAttempt; - private long time; - private String executorHost; - private int totalCores; - public String getReasonOfRemoval() { - return reasonOfRemoval; - } - - public void setReasonOfRemoval(String reasonOfRemoval) { - this.reasonOfRemoval = reasonOfRemoval; - } - - private String reasonOfRemoval; - - private String eventName; - - @Override - public void setEventame(String name) { - this.eventName=name; - } - - @Override - public String getEventName() { - return eventName; - } - - - @Override - public void setStageID(int id) { - this.stageId=id; - } - - @Override - public int getStageID() { - return stageId; - } - - @Override - public void setExecutorID(String id) { - this.executorID=id; - } - - @Override - public String getExecutorID() { - return executorID; - } - - @Override - public void stageAttempt(int id) { - this.stageAttempt=id; - } - - @Override - public int getStageAttempt() { - return stageAttempt; - } - - @Override - public void executorTime(long Time) { - this.time=time; - } - - @Override - public long getExecutorTime() { - return time; - } - - @Override - public void setExecutorHost(String host) { - this.executorHost=host; - } - - @Override - public String getExecutorHost() { - return executorHost; - } - - @Override - public void setTotalCores(int cores) { - this.totalCores=cores; - } - - @Override - public int getTotalCores() { - return totalCores; - } - - @Override - public void setResourceInfo(int resourceInfoId) { - - } - - @Override - public int getResourceInfo() { - return 0; - } -} diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/JobEnd.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/JobEnd.java deleted file mode 100644 index 43229f35..00000000 --- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/JobEnd.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.wayang.spark.monitoring.metrics; -import org.apache.wayang.spark.monitoring.interfaces.Job; -import org.apache.wayang.spark.monitoring.interfaces.SerializableObject; -import org.apache.wayang.spark.monitoring.interfaces.Stage; -import scala.collection.Seq; - -import java.util.List; -/** - - * JobEnd class represents a job's end in a system. - - * It implements the Job and SerializableObject interfaces. - */ -public class JobEnd implements Job, SerializableObject { - private int id; - private int productArity; - private Seq<Object> seqStageId; - - private String eventName; - private List<Stage> listOfStages; - @Override - public void setEventame(String name) { - this.eventName=name; - } - - @Override - public String getEventName() { - return eventName; - } - - - @Override - public void setJobID(int jobID) { - this.id=jobID; - } - - @Override - public int getJobID() { - return id; - } - - @Override - public void setProductArity(int productArity) { - this.productArity=productArity; - } - - @Override - public int getProductArity() { - return productArity; - } - - @Override - public void setStageID(Seq<Object> stageId) { - this.seqStageId=stageId; - } - - @Override - public Seq<Object> getStageID() { - return seqStageId; - } - - @Override - public void setListOfStages(List<Stage> listOfStages) { - this.listOfStages=listOfStages; - } - - @Override - public List<Stage> getListOfStages() { - return listOfStages; - } -} diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/JobStart.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/JobStart.java deleted file mode 100644 index b83db3bf..00000000 --- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/JobStart.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.wayang.spark.monitoring.metrics; - -import org.apache.wayang.spark.monitoring.interfaces.Job; -import org.apache.wayang.spark.monitoring.interfaces.SerializableObject; -import org.apache.wayang.spark.monitoring.interfaces.Stage; -import scala.collection.Seq; - -import java.util.List; -/** - * JobStart class represents a job's start in a system. - * It implements the Job and SerializableObject interfaces. - */ -public class JobStart implements Job, SerializableObject { - private int id; - private int productArity; - private Seq<Object> seqStageId; - private String eventName; - private List<Stage> listOfStages; - - @Override - public void setEventame(String name) { - this.eventName=name; - } - - @Override - public String getEventName() { - return eventName; - } - - - @Override - public void setJobID(int jobID) { - this.id=jobID; - } - - @Override - public int getJobID() { - return id; - } - - @Override - public void setProductArity(int productArity) { - this.productArity=productArity; - } - - @Override - public int getProductArity() { - return productArity; - } - - @Override - public void setStageID(Seq<Object> stageId) { - this.seqStageId=stageId; - } - - @Override - public Seq<Object> getStageID() { - return seqStageId; - } - - @Override - public void setListOfStages(List<Stage> listOfStages) { - this.listOfStages=listOfStages; - } - - @Override - public List<Stage> getListOfStages() { - return listOfStages; - } -} diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/SparkListener.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/SparkListener.java deleted file mode 100644 index 6cb182c8..00000000 --- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/SparkListener.java +++ /dev/null @@ -1,524 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.wayang.spark.monitoring.metrics; - -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.spark.executor.TaskMetrics; -import org.apache.spark.scheduler.*; -import org.apache.spark.scheduler.cluster.ExecutorInfo; -import org.apache.wayang.spark.monitoring.interfaces.Stage; -import org.apache.wayang.spark.monitoring.interfaces.Task; -import org.apache.wayang.spark.monitoring.interfaces.*; -import scala.collection.Seq; - -import java.io.*; -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; -/** - * A Spark listener implementation that captures events generated during the Spark job execution - * and sends them to a Kafka topic for further processing. - */ -public class SparkListener extends org.apache.spark.scheduler.SparkListener { - // Member variables to store various data objects - private List<Job> listOfJobs; - private List<Stage> listOfStages; - private List<Task> listOfTasks; - private List<SerializableObject> applicationObjects; - private List<SerializableObject> jobObjects; - private List<SerializableObject> stageObjects; - private List<SerializableObject> taskObjects; - // Kafka producer to send data to Kafka topic - private KafkaProducer<String, byte[]> producer; - private final static String KAFKA_PROPERTIES = "/wayang-spark-kafka.properties"; - // Kafka topic name to which the data will be sent - private String kafkaTopic; - // Logger instance to log messages - protected final Logger logger = LogManager.getLogger(this.getClass()); - /** - * Default constructor that initializes the Kafka producer and various data lists. - */ - public SparkListener(){ - Properties props = new Properties(); - try (InputStream inputStream = getClass().getResourceAsStream(KAFKA_PROPERTIES)) { - props.load(inputStream); - } - catch (Exception e){ - logger.error("This is an error message with an exception.", e); - } - producer = new KafkaProducer<>(props); - this.kafkaTopic = props.getProperty("kafka.topic"); - this.listOfJobs= new ArrayList<>(); - this.listOfStages= new ArrayList<>(); - this.listOfTasks= new ArrayList<>(); - this.applicationObjects= new ArrayList<>(); - this.jobObjects= new ArrayList<>(); - this.stageObjects= new ArrayList<>(); - this.taskObjects= new ArrayList<>(); - } - - @Override - public void onExecutorBlacklisted(SparkListenerExecutorBlacklisted executorBlacklisted) { - super.onExecutorBlacklisted(executorBlacklisted); - executorBlacklisted.executorId(); - executorBlacklisted.time(); - } - /** - * Overridden method that captures the event generated when an executor is added in Spark - * and sends it to the Kafka topic for further processing. - * - * @param executorAddedSpark The event that occurred when the executor was added - */ - @Override - public void onExecutorAdded(SparkListenerExecutorAdded executorAddedSpark) { - super.onExecutorAdded(executorAddedSpark); - Executor executorAdded= new ExecutorAdded(); - executorAdded.setEventame("ExecutorAdded"); - executorAdded.setExecutorID(executorAddedSpark.executorId()); - ExecutorInfo executorInfo=executorAddedSpark.executorInfo(); - executorAdded.setExecutorHost(executorInfo.executorHost()); - executorAdded.setTotalCores(executorInfo.totalCores()); - // executorAdded.setResourceInfo(executorInfo.resourceProfileId()); - executorAdded.setExecutorHost(executorInfo.executorHost()); - executorAdded.executorTime(executorAddedSpark.time()); - try { - ByteArrayOutputStream boas = new ByteArrayOutputStream(); - ObjectOutput out = new ObjectOutputStream(boas); - out.writeObject(executorAdded); - producer.send(new ProducerRecord(kafkaTopic, "ExecutorAdded", boas.toByteArray())); - } - catch (Exception e){ - e.printStackTrace(); - } - - } - - /** - - This method is called when an executor is removed from a Spark application, and it sends information about - the removal event to a Kafka topic. - @param executorRemovedSpark the SparkListenerExecutorRemoved event containing information about the removed executor - */ - @Override - public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemovedSpark) { - super.onExecutorRemoved(executorRemovedSpark); - Executor executorRemoved= new ExecutorRemoved(); - executorRemoved.setEventame("ExecutorRemoved"); - executorRemoved.setExecutorHost(executorRemovedSpark.executorId()); - executorRemoved.setReasonOfRemoval(executorRemovedSpark.reason()); - executorRemoved.executorTime(executorRemovedSpark.time()); - try { - ByteArrayOutputStream boas = new ByteArrayOutputStream(); - ObjectOutput out = new ObjectOutputStream(boas); - out.writeObject(executorRemoved); - producer.send(new ProducerRecord(kafkaTopic, "ExecutorRemoved", boas.toByteArray())); - } - catch (Exception e){ - this.logger.error("Exception {} for executor added",e); - } - } - /** - - This method is called when metrics are updated for an executor in a Spark application, and it sends information about - the updated executor to a Kafka topic. - @param executorMetricsUpdateSpark the SparkListenerExecutorMetricsUpdate event containing information about the updated executor's metrics - */ - @Override - public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdateSpark) { - super.onExecutorMetricsUpdate(executorMetricsUpdateSpark); - Executor executorUpdated= new ExecutorUpdated(); - executorUpdated.setExecutorID(executorMetricsUpdateSpark.execId()); - executorUpdated.setEventame("ExecutorUpdated"); - try { - ByteArrayOutputStream boas = new ByteArrayOutputStream(); - ObjectOutput out = new ObjectOutputStream(boas); - out.writeObject(executorUpdated); - producer.send(new ProducerRecord(kafkaTopic, "ExecutorUpdated", boas.toByteArray())); - } - catch (Exception e){ - e.printStackTrace(); - } - } - /** - - This method is called when a task starts in a Spark application, and it creates a new TaskStart object with information - about the task and adds it to a list of task objects for serializable. - @param taskStartSpark the SparkListenerTaskStart event containing information about the started task - */ - @Override - public void onTaskStart(SparkListenerTaskStart taskStartSpark) { - super.onTaskStart(taskStartSpark); - Task taskStart=new TaskStart(); - TaskInfo taskInfo= taskStartSpark.taskInfo(); - taskStart.setID(taskInfo.id()); - taskStart.setEventame("OnTaskStart"); - taskStart.setHostIP(taskInfo.host()); - taskStart.setStringExecutorID(taskInfo.executorId()); - taskStart.setTaskStatus(taskInfo.status()); - taskStart.setTaskID(taskInfo.taskId()); - taskStart.setIndex(taskInfo.index()); - taskStart.setLaunchTime(taskInfo.launchTime()); - taskStart.setFinishTime(taskInfo.finishTime()); - //this.taskStart.setDurationTime(taskInfo.duration()); - taskStart.setGettingTime(taskInfo.gettingResultTime()); - taskStart.setStageID(taskStartSpark.stageId()); - //this.taskStart.setPartition(taskInfo.); - if(taskInfo.failed()){ - taskStart.setTaskStatusForRunning(Task.TaskStatusForRunning.FAILED); - } - else if(taskInfo.finished()){ - taskStart.setTaskStatusForRunning(Task.TaskStatusForRunning.FINISHED); - } - else if(taskInfo.killed()){ - taskStart.setTaskStatusForRunning(Task.TaskStatusForRunning.KILLED); - } - else if(taskInfo.running()){ - taskStart.setTaskStatusForRunning(Task.TaskStatusForRunning.RUNNING); - } - else if(taskInfo.successful()){ - taskStart.setTaskStatusForRunning(Task.TaskStatusForRunning.SUCCESSFUL); - } - else if(taskInfo.speculative()){ - taskStart.setTaskStatusForRunning(Task.TaskStatusForRunning.SPECULATIVE); - } - else { - taskStart.setTaskStatusForRunning(null); - } - - this.taskObjects.add(taskStart); - - } - - /** - * This method is called when a Spark application starts. It extends the behavior of the - * superclass by passing along the given SparkListenerApplicationStart event to its parent - * implementation. - * - * @param applicationStartSpark the SparkListenerApplicationStart event that was triggered - */ - @Override - public void onApplicationStart(SparkListenerApplicationStart applicationStartSpark) { - super.onApplicationStart(applicationStartSpark); - Application applicationStart= new ApplicationStart(); - applicationStart.setAppID(applicationStartSpark.appId().get()); - applicationStart.setEventame("ApplicationStart"); - applicationStart.setName(applicationStartSpark.appName()); - applicationStart.setSparkUser(applicationStartSpark.sparkUser()); - applicationStart.setStartTime(applicationStartSpark.time()); - this.applicationObjects.add((SerializableObject) applicationStart); - // this.jobObjects.add((SerializableObject) this.applicationStart); - // this.stageObjects.add((SerializableObject) this.applicationStart); - // this.taskObjects.add((SerializableObject) this.applicationStart); - // this.objects.add((SerializableObject) this.applicationStart); - } - /** - - This method is called when a new Spark job starts. It creates a new JobStart object to represent the event, - sets its properties using the data provided in the SparkListenerJobStart object, and adds the object to the - list of jobs and the list of job objects. - @param jobStartSpark the SparkListenerJobStart object containing data about the new job - */ - @Override - public void onJobStart(SparkListenerJobStart jobStartSpark) { - super.onJobStart(jobStartSpark); - Job jobStart= new JobStart(); - jobStart.setEventame("JobStart"); - jobStart.setJobID(jobStartSpark.jobId()); - jobStart.setProductArity(jobStartSpark.productArity()); - jobStart.setStageID((Seq<Object>) jobStartSpark.stageIds()); - this.listOfJobs.add(jobStart); - this.jobObjects.add((SerializableObject) jobStart); - } -/** - - This method is called when a job ends in the Spark application. It creates a new instance of the JobEnd class, - sets the necessary attributes and adds it to the list of jobs and job objects. Then it serializes the job objects - and sends them to Kafka. It also resets the job objects and list of stages for the next job. - @param jobEndSpark a SparkListenerJobEnd object representing the end of a job - */ - @Override - public void onJobEnd(SparkListenerJobEnd jobEndSpark) { - super.onJobEnd(jobEndSpark); - Job jobEnd= new JobEnd(); - jobEnd.setJobID(jobEndSpark.jobId()); - jobEnd.setEventame("JobEnd"); - jobEnd.setProductArity(jobEndSpark.productArity()); - jobEnd.setListOfStages(this.listOfStages); - this.listOfJobs.add(jobEnd); - this.jobObjects.add((SerializableObject) jobEnd); - try { - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ObjectOutputStream oos = new ObjectOutputStream(baos); - oos.writeObject(this.jobObjects); - producer.send(new ProducerRecord(kafkaTopic, "JobObjects", baos.toByteArray())); - this.jobObjects= new ArrayList<>(); - this.listOfStages= new ArrayList<>(); - } catch (Exception e) { - e.printStackTrace(); - } - } - /** - - Called when a SparkListenerTaskEnd event is triggered. - @param taskEndSpark The SparkListenerTaskEnd object representing the event that was triggered. - This method overrides the onTaskEnd method from the superclass and performs additional actions: - creates a new TaskEnd object and sets its properties based on the information in the taskEndSpark parameter - adds the TaskEnd object to the listOfTasks and taskObjects arrays - serializes the taskObjects array and sends it to a Kafka producer - The TaskEnd object represents the end of a task, and includes information such as the task's ID, event name, host IP, - executor ID, status, task ID, index, launch time, finish time, duration time, getting time, and task status for running. - The TaskMetric object represents the metrics for the task, and includes information such as the executor CPU time, - executor deserialize CPU time, executor deserialize time, disk bytes spilled, executor run time, JVM GC time, peak execution memory, - result size, and result serialization time. - This method catches and prints any exceptions that may occur during the serialization and sending of the taskObjects array. - */ - - @Override - public void onTaskEnd(SparkListenerTaskEnd taskEndSpark) { - super.onTaskEnd(taskEndSpark); - Task taskEnd=new TaskEnd(); - TaskInfo taskInfo= taskEndSpark.taskInfo(); - taskEnd.setID(taskInfo.id()); - taskEnd.setEventame("OnTaskGettingResult"); - taskEnd.setHostIP(taskInfo.host()); - taskEnd.setStringExecutorID(taskInfo.executorId()); - taskEnd.setTaskStatus(taskInfo.status()); - taskEnd.setTaskID(taskInfo.taskId()); - taskEnd.setIndex(taskInfo.index()); - taskEnd.setLaunchTime(taskInfo.launchTime()); - taskEnd.setFinishTime(taskInfo.finishTime()); - taskEnd.setDurationTime(taskInfo.duration()); - taskEnd.setGettingTime(taskInfo.gettingResultTime()); - // this. taskGettingResult.setStageID(taskGettingResult.stageId()); - //this.taskEnd.setPartition(taskInfo.partitionId()); - if(taskInfo.failed()){ - taskEnd.setTaskStatusForRunning(Task.TaskStatusForRunning.FAILED); - } - else if(taskInfo.finished()){ - taskEnd.setTaskStatusForRunning(Task.TaskStatusForRunning.FINISHED); - } - else if(taskInfo.killed()){ - taskEnd.setTaskStatusForRunning(Task.TaskStatusForRunning.KILLED); - } - else if(taskInfo.running()){ - taskEnd.setTaskStatusForRunning(Task.TaskStatusForRunning.RUNNING); - } - else if(taskInfo.successful()){ - taskEnd.setTaskStatusForRunning(Task.TaskStatusForRunning.SUCCESSFUL); - } - else if(taskInfo.speculative()){ - taskEnd.setTaskStatusForRunning(Task.TaskStatusForRunning.SPECULATIVE); - } - else { - taskEnd.setTaskStatusForRunning(null); - } - - - TaskMetrics taskMetrics= taskEndSpark.taskMetrics(); - TaskMetric taskMetric= new TaskMetric(); - taskMetric.setExecutorCPUTime(taskMetrics.executorCpuTime()); - taskMetric.setExecutorDeserializeCpuTime(taskMetrics.executorDeserializeCpuTime()); - taskMetric.setExecutorDeserializeTime(taskMetrics.executorDeserializeTime()); - taskMetric.setDiskBytesSpilled(taskMetrics.diskBytesSpilled()); - taskMetric.setExecutorRunTime(taskMetrics.executorRunTime()); - taskMetric.setjvmGCTime(taskMetrics.jvmGCTime()); - taskMetric.setPeakExecutionMemory(taskMetrics.peakExecutionMemory()); - taskMetric.setResultSize(taskMetrics.resultSize()); - taskMetric.setResultSerializationTime(taskMetrics.resultSerializationTime()); - taskEnd.setTaskMetric(taskMetric); - this.listOfTasks.add(taskEnd); - this.taskObjects.add(taskEnd); - try { - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ObjectOutputStream oos = new ObjectOutputStream(baos); - oos.writeObject(this.taskObjects); - producer.send(new ProducerRecord(kafkaTopic, "TaskEnd", baos.toByteArray())); - this.taskObjects= new ArrayList<>(); - - } catch (Exception e) { - e.printStackTrace(); - } - - } - /** - - Overrides the onStageCompleted method from SparkListener to customize handling of - stage completion events. Creates a StageCompleted object and sets its properties based on the - StageInfo and TaskMetrics of the completed stage. Adds the StageCompleted object to a list of stages - and adds it to a list of SerializableObjects to be sent to a Kafka producer. - @param stageCompletedSpark the SparkListenerStageCompleted event to be handled - */ - @Override - public void onStageCompleted(SparkListenerStageCompleted stageCompletedSpark) { - super.onStageCompleted(stageCompletedSpark); - Stage stageCompleted= new StageCompleted(); - StageInfo stageInfo=stageCompletedSpark.stageInfo(); - stageCompleted.setDetails(stageInfo.details()); - stageCompleted.setEventame("OnStageSubmitted"); - stageCompleted.setStageName(stageInfo.name()); - stageCompleted.setStatus(stageInfo.getStatusString()); - stageCompleted.setNumberOfTasks(stageInfo.numTasks()); - stageCompleted.setID(stageInfo.stageId()); - stageCompleted.setSubmissionTime((Long) stageInfo.submissionTime().get()); - stageCompleted.setCompletionTime((Long) stageInfo.completionTime().get()); - TaskMetrics taskMetrics= stageCompletedSpark.stageInfo().taskMetrics(); - TaskMetric taskMetric= new TaskMetric(); - taskMetric.setExecutorCPUTime(taskMetrics.executorCpuTime()); - taskMetric.setExecutorDeserializeCpuTime(taskMetrics.executorDeserializeCpuTime()); - taskMetric.setExecutorDeserializeTime(taskMetrics.executorDeserializeTime()); - taskMetric.setDiskBytesSpilled(taskMetrics.diskBytesSpilled()); - taskMetric.setExecutorRunTime(taskMetrics.executorRunTime()); - taskMetric.setjvmGCTime(taskMetrics.jvmGCTime()); - taskMetric.setPeakExecutionMemory(taskMetrics.peakExecutionMemory()); - taskMetric.setResultSize(taskMetrics.resultSize()); - taskMetric.setResultSerializationTime(taskMetrics.resultSerializationTime()); - stageCompleted.setTaskMetric(taskMetric); - this.listOfStages.add(stageCompleted); - stageCompleted.setListOfTasks(this.listOfTasks); - this.stageObjects.add((SerializableObject) stageCompleted); - try { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ObjectOutputStream oos = new ObjectOutputStream(baos); - oos.writeObject(this.stageObjects); - producer.send(new ProducerRecord(kafkaTopic, "Stage", baos.toByteArray())); - this.stageObjects= new ArrayList<>(); - this.listOfTasks= new ArrayList<>(); - } catch (Exception e) { - e.printStackTrace(); - } - } - /** - * This method is called whenever a new stage is submitted to the Spark engine. It adds the details of the - * submitted stage to a list of stages and a list of stage objects. - * - * @param stageSubmittedSpark the SparkListenerStageSubmitted object containing information about the submitted stage - */ - @Override - public void onStageSubmitted(SparkListenerStageSubmitted stageSubmittedSpark) { - super.onStageSubmitted(stageSubmittedSpark); - Stage stageSubmitted= new StageSubmitted(); - StageInfo stageInfo=stageSubmittedSpark.stageInfo(); - stageSubmitted.setDetails(stageInfo.details()); - stageSubmitted.setEventame("OnStageSubmitted"); - stageSubmitted.setStageName(stageInfo.name()); - stageSubmitted.setStatus(stageInfo.getStatusString()); - stageSubmitted.setNumberOfTasks(stageInfo.numTasks()); - stageSubmitted.setID(stageInfo.stageId()); - stageSubmitted.setSubmissionTime((Long) stageInfo.submissionTime().get()); - //this.stageSubmitted.setCompletionTime((Long) stageInfo.completionTime().get()); - TaskMetrics taskMetrics= stageSubmittedSpark.stageInfo().taskMetrics(); - TaskMetric taskMetric= new TaskMetric(); - taskMetric.setExecutorCPUTime(taskMetrics.executorCpuTime()); - taskMetric.setExecutorDeserializeCpuTime(taskMetrics.executorDeserializeCpuTime()); - taskMetric.setExecutorDeserializeTime(taskMetrics.executorDeserializeTime()); - taskMetric.setDiskBytesSpilled(taskMetrics.diskBytesSpilled()); - taskMetric.setExecutorRunTime(taskMetrics.executorRunTime()); - taskMetric.setjvmGCTime(taskMetrics.jvmGCTime()); - taskMetric.setPeakExecutionMemory(taskMetrics.peakExecutionMemory()); - taskMetric.setResultSize(taskMetrics.resultSize()); - taskMetric.setResultSerializationTime(taskMetrics.resultSerializationTime()); - stageSubmitted.setTaskMetric(taskMetric); - this.listOfStages.add(stageSubmitted); - this.stageObjects.add((SerializableObject) stageSubmitted); - - } - - /** - * This method is called when the Spark application ends. It creates a new ApplicationEnd object containing - * the start time of the application and a list of jobs, and adds the object to a list of application objects. - * It then serializes the list of application objects and sends it to a Kafka topic. Finally, it clears the - * lists of application and job objects to prepare for the next application run. - * - * @param applicationEndSpark the SparkListenerApplicationEnd object containing information about the end of the application - */ - @Override - public void onApplicationEnd(SparkListenerApplicationEnd applicationEndSpark) { - super.onApplicationEnd(applicationEndSpark); - Application applicationEnd=new ApplicationEnd(); - applicationEnd.setStartTime(applicationEndSpark.time()); - applicationEnd.setListOfJobs(this.listOfJobs); - this.applicationObjects.add((SerializableObject) applicationEnd); - try { - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ObjectOutputStream oos = new ObjectOutputStream(baos); - oos.writeObject(this.applicationObjects); - producer.send(new ProducerRecord(kafkaTopic, "ApplicationObjects", baos.toByteArray())); - this.applicationObjects= new ArrayList<>(); - this.listOfJobs= new ArrayList<>(); - } catch (Exception e) { - e.printStackTrace(); - } - - - } - /** - - Called when a task's result is being fetched. Adds a new Task object to the listOfTasks and taskObjects. - @param taskGettingResultSpark The SparkListenerTaskGettingResult object containing information about the task result. - */ - @Override - public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResultSpark) { - super.onTaskGettingResult(taskGettingResultSpark); - Task taskGettingResult=new TaskGettingResult(); - TaskInfo taskInfo= taskGettingResultSpark.taskInfo(); - taskGettingResult.setID(taskInfo.id()); - taskGettingResult.setEventame("OnTaskGettingResult"); - taskGettingResult.setHostIP(taskInfo.host()); - taskGettingResult.setStringExecutorID(taskInfo.executorId()); - taskGettingResult.setTaskStatus(taskInfo.status()); - taskGettingResult.setTaskID(taskInfo.taskId()); - taskGettingResult.setIndex(taskInfo.index()); - taskGettingResult.setLaunchTime(taskInfo.launchTime()); - taskGettingResult.setFinishTime(taskInfo.finishTime()); - taskGettingResult.setDurationTime(taskInfo.duration()); - taskGettingResult.setGettingTime(taskInfo.gettingResultTime()); - // this. taskGettingResult.setStageID(taskGettingResult.stageId()); - // this.taskGettingResult.setPartition(taskInfo.partitionId()); - if(taskInfo.failed()){ - taskGettingResult.setTaskStatusForRunning(Task.TaskStatusForRunning.FAILED); - } - else if(taskInfo.finished()){ - taskGettingResult.setTaskStatusForRunning(Task.TaskStatusForRunning.FINISHED); - } - else if(taskInfo.killed()){ - taskGettingResult.setTaskStatusForRunning(Task.TaskStatusForRunning.KILLED); - } - else if(taskInfo.running()){ - taskGettingResult.setTaskStatusForRunning(Task.TaskStatusForRunning.RUNNING); - } - else if(taskInfo.successful()){ - taskGettingResult.setTaskStatusForRunning(Task.TaskStatusForRunning.SUCCESSFUL); - } - else if(taskInfo.speculative()){ - taskGettingResult.setTaskStatusForRunning(Task.TaskStatusForRunning.SPECULATIVE); - } - else { - taskGettingResult.setTaskStatusForRunning(null); - } - this.listOfTasks.add(taskGettingResult); - this.taskObjects.add( taskGettingResult); - - } -} \ No newline at end of file diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/StageCompleted.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/StageCompleted.java deleted file mode 100644 index 295e6db0..00000000 --- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/StageCompleted.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.wayang.spark.monitoring.metrics; - -import org.apache.wayang.spark.monitoring.interfaces.SerializableObject; -import org.apache.wayang.spark.monitoring.interfaces.Stage; -import org.apache.wayang.spark.monitoring.interfaces.Task; - -import java.util.List; -/** - * Represents a completed stage in a distributed computing system. - * - * This class implements the Stage interface and SerializableObject interface. - * - * The completed stage contains the following information: - * - The ID of the stage - * - The number of tasks in the stage - * - The name of the stage - * - The status of the stage - * - The details of the stage - * - The ID of the executor that executed the stage - * - The stage attempt ID - * - The task metric for the stage - * - The list of tasks for the stage - * - The event name for the stage - * - The stage completion time - * - * This class provides methods to get and set the above information. - */ -public class StageCompleted implements Stage, SerializableObject { - private int id; - private int tasks; - private String stageName; - private String status; - private String details; - private String executorID; - private int stateAttempt; - private TaskMetric taskMetric; - private List<Task> listOfTasks; - private String eventName; - private long stageCompletionTime; - - @Override - public void setTaskMetric(TaskMetric taskMetric) { - this.taskMetric=taskMetric; - } - - - @Override - public void setExecutorID(String ID) { - this.executorID=ID; - } - - @Override - public String getExecutorID() { - return executorID; - } - - @Override - public void setStageAttemptId(int id) { - this.id=id; - } - - @Override - public int getStageAttemptId() { - return id; - } - - @Override - public void setListOfTasks(List<Task> tasks) { - this.listOfTasks=tasks; - } - - @Override - public List<Task> getListOfTasks() { - return listOfTasks; - } - - - @Override - public void setEventame(String name) { - this.eventName=name; - } - - @Override - public String getEventName() { - return eventName; - } - - - @Override - public void setID(int ID) { - this.id=ID; - } - - @Override - public int getID() { - return id; - } - - @Override - public void setNumberOfTasks(int tasks) { - this.tasks=tasks; - } - - @Override - public int getNumberOfTasks() { - return tasks; - } - - @Override - public void setStageName(String name) { - this.stageName=name; - } - - @Override - public String getStageName() { - return stageName; - } - - @Override - public void setStatus(String Status) { - this.status=status; - } - - @Override - public String getStatus() { - return status; - } - - @Override - public void setDetails(String details) { - this.details=details; - } - - @Override - public String getDetails() { - return details; - } - - @Override - public void setSubmissionTime(long time) { - - - } - - @Override - public long getSubmissionTime() { - return 0; - } - - @Override - public void setCompletionTime(long time) { - this.stageCompletionTime=time; - } - - @Override - public long getCompletionTime() { - return stageCompletionTime; - } - - @Override - public TaskMetric getTaskMetric() { - return taskMetric; - } -} diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/StageExecutorMetrics.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/StageExecutorMetrics.java deleted file mode 100644 index 25497fb3..00000000 --- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/StageExecutorMetrics.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.wayang.spark.monitoring.metrics; - -import org.apache.wayang.spark.monitoring.interfaces.SerializableObject; -import org.apache.wayang.spark.monitoring.interfaces.Stage; -import org.apache.wayang.spark.monitoring.interfaces.Task; - -import java.util.List; -/** - * The StageExecutorMetrics class implements the Stage and SerializableObject interfaces and represents - * the metrics associated with a stage executed by an executor. - */ - -public class StageExecutorMetrics implements Stage, SerializableObject { - private int id; - private int tasks; - private String stageName; - private String status; - private String details; - private String executorID; - private int stateAttempt; - private List<Task> listOfTasks; - - public TaskMetric getTaskMetric() { - return taskMetric; - } - - @Override - public void setTaskMetric(TaskMetric taskMetric) { - this.taskMetric = taskMetric; - } - - - @Override - public void setExecutorID(String ID) { - this.executorID=ID; - } - - @Override - public String getExecutorID() { - return executorID; - } - - @Override - public void setStageAttemptId(int id) { - this.id=id; - } - - @Override - public int getStageAttemptId() { - return id; - } - - @Override - public void setListOfTasks(List<Task> tasks) { - this.listOfTasks=tasks; - } - - @Override - public List<Task> getListOfTasks() { - return listOfTasks; - } - - private TaskMetric taskMetric; - - private String eventName; - - @Override - public void setEventame(String name) { - this.eventName=name; - } - - @Override - public String getEventName() { - return eventName; - } - - - @Override - public void setID(int ID) { - this.id=ID; - } - - @Override - public int getID() { - return id; - } - - @Override - public void setNumberOfTasks(int tasks) { - this.tasks=tasks; - } - - @Override - public int getNumberOfTasks() { - return tasks; - } - - @Override - public void setStageName(String name) { - this.stageName=name; - } - - @Override - public String getStageName() { - return stageName; - } - - @Override - public void setStatus(String Status) { - this.status=status; - } - - @Override - public String getStatus() { - return status; - } - - @Override - public void setDetails(String details) { - this.details=details; - } - - @Override - public String getDetails() { - return details; - } - - @Override - public void setSubmissionTime(long time) { - - } - - @Override - public long getSubmissionTime() { - return 0; - } - - @Override - public void setCompletionTime(long time) { - - } - - @Override - public long getCompletionTime() { - return 0; - } -} diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/StageSubmitted.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/StageSubmitted.java deleted file mode 100644 index 4729e77c..00000000 --- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/StageSubmitted.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.wayang.spark.monitoring.metrics; - -import org.apache.wayang.spark.monitoring.interfaces.SerializableObject; -import org.apache.wayang.spark.monitoring.interfaces.Stage; -import org.apache.wayang.spark.monitoring.interfaces.Task; - -import java.util.List; -/** - - * The StageSubmitted class implements the Stage and SerializableObject interfaces to represent a submitted stage in a distributed system. - * It contains information about the stage's ID, number of tasks, name, status, details, executor ID, list of tasks, stage attempt ID, stage submission time, - * and task metric. It also allows the setting and getting of each of these properties through various interface methods. - */ -public class StageSubmitted implements Stage, SerializableObject { - private int id; - private int tasks; - private String stageName; - private String status; - private String details; - private String executorID; - private List<Task> listOfTasks; - private int stageAttemptID; - private long stageSubmissionTime; - public TaskMetric getTaskMetric() { - return taskMetric; - } - - public void setTaskMetric(TaskMetric taskMetric) { - this.taskMetric = taskMetric; - } - - @Override - public void setExecutorID(String ID) { - this.executorID=ID; - } - - @Override - public String getExecutorID() { - return executorID; - } - - @Override - public void setStageAttemptId(int id) { - this.stageAttemptID=id; - } - - @Override - public int getStageAttemptId() { - return id; - } - - @Override - public void setListOfTasks(List<Task> tasks) { - this.listOfTasks= tasks; - } - - @Override - public List<Task> getListOfTasks() { - return listOfTasks; - } - - private TaskMetric taskMetric; - private String eventName; - - @Override - public void setEventame(String name) { - this.eventName=name; - } - - @Override - public String getEventName() { - return eventName; - } - - - @Override - public void setID(int ID) { - this.id=ID; - } - - @Override - public int getID() { - return id; - } - - @Override - public void setNumberOfTasks(int tasks) { - this.tasks=tasks; - } - - @Override - public int getNumberOfTasks() { - return tasks; - } - - @Override - public void setStageName(String name) { - this.stageName=name; - } - - @Override - public String getStageName() { - return stageName; - } - - @Override - public void setStatus(String Status) { - this.status=status; - } - - @Override - public String getStatus() { - return status; - } - - @Override - public void setDetails(String details) { - this.details=details; - } - - @Override - public String getDetails() { - return details; - } - - @Override - public void setSubmissionTime(long time) { - this.stageSubmissionTime=time; - } - - @Override - public long getSubmissionTime() { - return this.stageSubmissionTime; - } - - @Override - public void setCompletionTime(long time) { - - } - - @Override - public long getCompletionTime() { - return 0; - } -} diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/TaskEnd.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/TaskEnd.java deleted file mode 100644 index 7dc3e281..00000000 --- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/TaskEnd.java +++ /dev/null @@ -1,220 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.wayang.spark.monitoring.metrics; - -import org.apache.wayang.spark.monitoring.interfaces.SerializableObject; -import org.apache.wayang.spark.monitoring.interfaces.Task; -/** - * Represents the end status of a task execution. - * - * <p>Implementing the {@link Task} interface, this class provides methods to set and get the - * attributes of a task, such as its ID, host IP, launch and finish times, and status. It also - * implements the {@link SerializableObject} interface to allow serialization of the class. - * - * <p>The class also includes a {@link TaskMetric} object to store metrics related to the task, - * and a {@link TaskStatusForRunning} object to provide information about the task's status during - * execution. - * - * @author [Adeel Aslam] - */ -public class TaskEnd implements Task, SerializableObject { - private String id; - private String hostIP; - private long taskId; - private String executorID; - private String taskStatus; - private int Index; - private int partition; - private long launchTime; - private long durationTime; - private long finishTime; - private long gettingResultTime; - private boolean status; - private int stageID; - - private String eventName; - TaskMetric taskMetric; - - TaskStatusForRunning taskStatusForRunning=null; - - @Override - public void setEventame(String name) { - this.eventName=name; - } - - @Override - public String getEventName() { - return eventName; - } - - - @Override - public void setID(String id) { - this.id=id; - } - - @Override - public String getID() { - return id; - } - @Override - public void setHostIP(String Ip) { - this.hostIP=Ip; - } - - @Override - public String getHostIP() { - return hostIP; - } - - @Override - public void setTaskID(long taskId) { - this.taskId=taskId; - } - - @Override - public void setStageID(int id) { - this.stageID=id; - } - - @Override - public int getStageID() { - return stageID; - } - - @Override - public long getTaskID() { - return taskId; - } - - @Override - public void setStringExecutorID(String executorID) { - this.executorID=executorID; - } - - @Override - public String getExecutorID() { - return executorID; - } - - @Override - public void setTaskStatus(String status) { - this.taskStatus=status; - } - - @Override - public String getTaskStatus() { - return taskStatus; - } - - @Override - public void setIndex(int index) { - this.Index= index; - - } - - @Override - public int getIndex() { - return Index; - } - - @Override - public void setPartition(int partition) { - this.partition=partition; - } - - @Override - public int getPartition() { - return partition; - } - - @Override - public void setLaunchTime(long time) { - this.launchTime=time; - } - - @Override - public long getLaunchTime() { - return launchTime; - } - - @Override - public void setFinishTime(long time) { - this.finishTime=time; - } - - @Override - public long getFinishTime() { - return finishTime; - } - - @Override - public void setGettingTime(long time) { - this.gettingResultTime=time; - } - - @Override - public long getGettingTime() { - return gettingResultTime; - } - - @Override - public void setDurationTime(long time) { - this.durationTime=time; - } - - @Override - public long getDurationTime() { - return durationTime; - } - - - @Override - public void setTaskStatus(boolean status) { - this.status=status; - } - - @Override - public boolean getTaskSatus() { - return status; - } - - @Override - public void setTaskStatusForRunning(TaskStatusForRunning taskStatusForRunning) { - this.taskStatusForRunning=taskStatusForRunning; - } - - @Override - public TaskStatusForRunning getTaskStatusForRunning() { - return taskStatusForRunning; - } - - @Override - public TaskMetric getTaskMetric() { - return taskMetric; - } - - @Override - public void setTaskMetric(TaskMetric taskMetric) { - this.taskMetric=taskMetric; - - } - - -} diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/TaskGettingResult.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/TaskGettingResult.java deleted file mode 100644 index 2318e2c0..00000000 --- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/TaskGettingResult.java +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.wayang.spark.monitoring.metrics; - -import org.apache.wayang.spark.monitoring.interfaces.SerializableObject; -import org.apache.wayang.spark.monitoring.interfaces.Task; -/** - * Represents the getting results of a task execution. - * - * <p>Implementing the {@link Task} interface, this class provides methods to set and get the - * attributes of a task, such as its ID, host IP, launch and finish times, and status. It also - * implements the {@link SerializableObject} interface to allow serialization of the class. - * - * <p>The class also includes a {@link TaskMetric} object to store metrics related to the task, - * and a {@link TaskStatusForRunning} object to provide information about the task's status during - * execution. - * - * @author [Adeel Aslam] - */ -public class TaskGettingResult implements Task, SerializableObject { - private String id; - private String hostIP; - private long taskId; - private String executorID; - private String taskStatus; - private int Index; - private int partition; - private long launchTime; - private long durationTime; - private long finishTime; - private long gettingResultTime; - private boolean status; - private int stageID; - - private String eventName; - - TaskStatusForRunning taskStatusForRunning=null; - - @Override - public void setEventame(String name) { - this.eventName=name; - } - - @Override - public String getEventName() { - return eventName; - } - - - @Override - public void setID(String id) { - this.id=id; - } - - @Override - public String getID() { - return id; - } - @Override - public void setHostIP(String Ip) { - this.hostIP=Ip; - } - - @Override - public String getHostIP() { - return hostIP; - } - - @Override - public void setTaskID(long taskId) { - this.taskId=taskId; - } - - @Override - public void setStageID(int id) { - this.stageID=id; - } - - @Override - public int getStageID() { - return stageID; - } - - @Override - public long getTaskID() { - return taskId; - } - - @Override - public void setStringExecutorID(String executorID) { - this.executorID=executorID; - } - - @Override - public String getExecutorID() { - return executorID; - } - - @Override - public void setTaskStatus(String status) { - this.taskStatus=status; - } - - @Override - public String getTaskStatus() { - return taskStatus; - } - - @Override - public void setIndex(int index) { - this.Index= index; - - } - - @Override - public int getIndex() { - return Index; - } - - @Override - public void setPartition(int partition) { - this.partition=partition; - } - - @Override - public int getPartition() { - return partition; - } - - @Override - public void setLaunchTime(long time) { - this.launchTime=time; - } - - @Override - public long getLaunchTime() { - return launchTime; - } - - @Override - public void setFinishTime(long time) { - this.finishTime=time; - } - - @Override - public long getFinishTime() { - return finishTime; - } - - @Override - public void setGettingTime(long time) { - this.gettingResultTime=time; - } - - @Override - public long getGettingTime() { - return gettingResultTime; - } - - @Override - public void setDurationTime(long time) { - this.durationTime=time; - } - - @Override - public long getDurationTime() { - return durationTime; - } - - - @Override - public void setTaskStatus(boolean status) { - this.status=status; - } - - @Override - public boolean getTaskSatus() { - return status; - } - - @Override - public void setTaskStatusForRunning(TaskStatusForRunning taskStatusForRunning) { - this.taskStatusForRunning=taskStatusForRunning; - } - - @Override - public TaskStatusForRunning getTaskStatusForRunning() { - return taskStatusForRunning; - } - - @Override - public TaskMetric getTaskMetric() { - return null; - } - - @Override - public void setTaskMetric(TaskMetric taskMetric) { - - } - - -} diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/TaskMetric.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/TaskMetric.java deleted file mode 100644 index 5ce05314..00000000 --- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/TaskMetric.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.wayang.spark.monitoring.metrics; - -import org.apache.wayang.spark.monitoring.interfaces.SerializableObject; -/** - - This class represents the metrics for a task in the Apache Wayang monitoring system. - - It implements the TaskMetric interface and SerializableObject interface to allow for serialization. - */ -public class TaskMetric implements org.apache.wayang.spark.monitoring.interfaces.TaskMetric, SerializableObject { - public long getExecutorCPUTime() { - return executorCPUTime; - } - - public void setExecutorCPUTime(long executorCPUTime) { - this.executorCPUTime = executorCPUTime; - } - - private long executorCPUTime; - private long bytesRead; - private long executorDeserializeCpuTime; - private long executorDeserializeTime; - private long DiskByteSpilled; - private long time; - private long JVMGCTime; - private long peakExecutionMemory; - private long resultSize; - private long resultSerializationTime; - private long recordsWritten; - private long bytesWritten; - @Override - public void setBytesRead(long bytesRead) { - this.bytesRead=bytesRead; - } - - @Override - public long getByteRead() { - return bytesRead; - } - - @Override - public void setExecutorDeserializeCpuTime(long executorDeserializeCpuTime) { - this.executorDeserializeCpuTime=executorDeserializeCpuTime; - } - - @Override - public long getExecutorDeserializeCpuTime() { - return executorDeserializeCpuTime; - } - - @Override - public void setExecutorDeserializeTime(long executorDeserializeTime) { - this.executorDeserializeTime=executorDeserializeTime; - } - - @Override - public long getExecutorDeserializeTime() { - return executorDeserializeTime; - } - - @Override - public void setDiskBytesSpilled(long DiskByteSpilled) { - this. DiskByteSpilled=DiskByteSpilled; - } - - @Override - public long getDiskBytesSpilled() { - return DiskByteSpilled; - } - - @Override - public void setExecutorRunTime(long time) { - this.time=time; - } - - @Override - public long getexecutorRunTime() { - return time; - } - - @Override - public void setjvmGCTime(long time) { - this.JVMGCTime=time; - } - - @Override - public long getJVMGCTime() { - return JVMGCTime; - } - - @Override - public void setPeakExecutionMemory(long peakExecutionMemory) { - this. peakExecutionMemory=peakExecutionMemory; - } - - @Override - public long getPeakExecutionMemory() { - return peakExecutionMemory; - } - - @Override - public void setResultSize(long resultSize) { - this. resultSize=resultSize; - } - - @Override - public long getResultSize() { - return resultSize; - } - - @Override - public void setResultSerializationTime(long resultSerializationTime) { - this. resultSerializationTime=resultSerializationTime; - } - - @Override - public long getResultSerializationTime() { - return resultSerializationTime; - } - - @Override - public void setRecordsWritten(long recordsWritten) { - this.recordsWritten= recordsWritten; - } - - @Override - public long getRecordsWrittern() { - return recordsWritten; - } - - @Override - public void setBytesWritten(long bytesWritten) { - this.bytesWritten=bytesWritten; - } - - @Override - public long getBytesWrittern() { - return bytesWritten; - } -} diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/TaskStart.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/TaskStart.java deleted file mode 100644 index f4907cc0..00000000 --- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/TaskStart.java +++ /dev/null @@ -1,217 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.wayang.spark.monitoring.metrics; -import org.apache.wayang.spark.monitoring.interfaces.SerializableObject; -import org.apache.wayang.spark.monitoring.interfaces.Task; -/** - * Represents the start status of a task execution. - * - * <p>Implementing the {@link Task} interface, this class provides methods to set and get the - * attributes of a task, such as its ID, host IP, launch and finish times, and status. It also - * implements the {@link SerializableObject} interface to allow serialization of the class. - * - * <p>The class also includes a {@link TaskMetric} object to store metrics related to the task, - * and a {@link TaskStatusForRunning} object to provide information about the task's status during - * execution. - * - * @author [Adeel Aslam] - */ -public class TaskStart implements Task, SerializableObject { - private String id; - private String hostIP; - private long taskId; - private String executorID; - private String taskStatus; - private int Index; - private int partition; - private long launchTime; - private long durationTime; - private long finishTime; - private long gettingResultTime; - private boolean status; - private int stageID; - - private String eventName; - - TaskStatusForRunning taskStatusForRunning=null; - - @Override - public void setEventame(String name) { - this.eventName=name; - } - - @Override - public String getEventName() { - return eventName; - } - - - @Override - public void setID(String id) { - this.id=id; - } - - @Override - public String getID() { - return id; - } - @Override - public void setHostIP(String Ip) { - this.hostIP=Ip; - } - - @Override - public String getHostIP() { - return hostIP; - } - - @Override - public void setTaskID(long taskId) { - this.taskId=taskId; - } - - @Override - public void setStageID(int id) { - this.stageID=id; - } - - @Override - public int getStageID() { - return stageID; - } - - @Override - public long getTaskID() { - return taskId; - } - - @Override - public void setStringExecutorID(String executorID) { - this.executorID=executorID; - } - - @Override - public String getExecutorID() { - return executorID; - } - - @Override - public void setTaskStatus(String status) { - this.taskStatus=status; - } - - @Override - public String getTaskStatus() { - return taskStatus; - } - - @Override - public void setIndex(int index) { - this.Index= index; - - } - - @Override - public int getIndex() { - return Index; - } - - @Override - public void setPartition(int partition) { - this.partition=partition; - } - - @Override - public int getPartition() { - return partition; - } - - @Override - public void setLaunchTime(long time) { - this.launchTime=time; - } - - @Override - public long getLaunchTime() { - return launchTime; - } - - @Override - public void setFinishTime(long time) { - this.finishTime=time; - } - - @Override - public long getFinishTime() { - return finishTime; - } - - @Override - public void setGettingTime(long time) { - this.gettingResultTime=time; - } - - @Override - public long getGettingTime() { - return gettingResultTime; - } - - @Override - public void setDurationTime(long time) { - this.durationTime=time; - } - - @Override - public long getDurationTime() { - return durationTime; - } - - - @Override - public void setTaskStatus(boolean status) { - this.status=status; - } - - @Override - public boolean getTaskSatus() { - return status; - } - - @Override - public void setTaskStatusForRunning(TaskStatusForRunning taskStatusForRunning) { - this.taskStatusForRunning=taskStatusForRunning; - } - - @Override - public TaskStatusForRunning getTaskStatusForRunning() { - return taskStatusForRunning; - } - - @Override - public TaskMetric getTaskMetric() { - return null; - } - - @Override - public void setTaskMetric(TaskMetric taskMetric) { - - } - - -} diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/platform/SparkPlatform.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/platform/SparkPlatform.java index c2141ee0..471275ab 100644 --- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/platform/SparkPlatform.java +++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/platform/SparkPlatform.java @@ -33,7 +33,6 @@ import org.apache.wayang.core.platform.Platform; import org.apache.wayang.core.types.DataSetType; import org.apache.wayang.core.util.Formats; import org.apache.wayang.core.util.ReflectionUtils; -import org.apache.wayang.spark.monitoring.metrics.SparkListener; import org.apache.wayang.spark.execution.SparkContextReference; import org.apache.wayang.spark.execution.SparkExecutor; import org.apache.wayang.spark.operators.SparkCollectionSource; @@ -54,7 +53,6 @@ public class SparkPlatform extends Platform { private static final String CONFIG_NAME = "spark"; private static final String DEFAULT_CONFIG_FILE = "wayang-spark-defaults.properties"; - private static final String MONITORING_METRICS_FILE="wayang-spark-metrics.properties"; public static final String INITIALIZATION_MS_CONFIG_KEY = "wayang.spark.init.ms"; @@ -85,8 +83,12 @@ public class SparkPlatform extends Platform { "spark.io.compression.codec", "spark.driver.memory", "spark.executor.heartbeatInterval", - "spark.network.timeout" - // "spark.extraListeners" + "spark.network.timeout", + }; + + private static final String[] OPTIONAL_HADOOP_PROPERTIES = { + "fs.s3.awsAccessKeyId", + "fs.s3.awsSecretAccessKey" }; /** @@ -114,12 +116,10 @@ public class SparkPlatform extends Platform { * @return a {@link SparkContextReference} wrapping the {@link JavaSparkContext} */ public SparkContextReference getSparkContext(Job job) { - //System.out.println("In Job with "+job.isMontiorWithHackIT()); - //System.exit(0); + // NB: There must be only one JavaSparkContext per JVM. Therefore, it is not local to the executor. final SparkConf sparkConf; final Configuration configuration = job.getConfiguration(); - if (this.sparkContextReference != null && !this.sparkContextReference.isDisposed()) { final JavaSparkContext sparkContext = this.sparkContextReference.get(); this.logger.warn( @@ -127,8 +127,6 @@ public class SparkPlatform extends Platform { "Not all settings might be effective.", sparkContext.getConf().get("spark.master")); sparkConf = sparkContext.getConf(); - - } else { sparkConf = new SparkConf(true); } @@ -145,17 +143,22 @@ public class SparkPlatform extends Platform { if (job.getName() != null) { sparkConf.set("spark.app.name", job.getName()); } - // sparkConf.set("spark.extraListeners","org.apache.wayang.monitoring.spark.SparkListener"); + if (this.sparkContextReference == null || this.sparkContextReference.isDisposed()) { this.sparkContextReference = new SparkContextReference(job.getCrossPlatformExecutor(), new JavaSparkContext(sparkConf)); } final JavaSparkContext sparkContext = this.sparkContextReference.get(); - //SparkContext sc= sparkContext.sc(); - if(job.isMontiorWithHackIT()) { - sparkConf.set("spark.extraListeners","org.apache.wayang.spark.monitoring.spark_monitoring.SparkListener"); - sparkContext.sc().addSparkListener(new SparkListener()); + org.apache.hadoop.conf.Configuration hadoopconf = sparkContext.hadoopConfiguration(); + for (String property: OPTIONAL_HADOOP_PROPERTIES){ + System.out.println(property); + configuration.getOptionalStringProperty(property).ifPresent( + value -> hadoopconf.set(property, value) + ); } + + // Set up the JAR files. + //sparkContext.clearJars(); if (!sparkContext.isLocal()) { // Add Wayang JAR files. this.registerJarIfNotNull(ReflectionUtils.getDeclaringJar(SparkPlatform.class)); // wayang-spark diff --git a/wayang-platforms/wayang-spark/code/main/resources/wayang-spark-kafka.properties b/wayang-platforms/wayang-spark/code/main/resources/wayang-spark-kafka.properties deleted file mode 100644 index 7c66a686..00000000 --- a/wayang-platforms/wayang-spark/code/main/resources/wayang-spark-kafka.properties +++ /dev/null @@ -1,42 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# Kafka producer configuration properties - -# Set the broker address and port number(s) -bootstrap.servers = localhost:9092 - -# Set the number of acknowledgements the producer requires -acks = all - -# Set the number of times to retry sending a message -retries=0 - -# Set the number of bytes to include in each batch of messages -batch.size=16384 - -# Set the amount of time (in milliseconds) the producer will wait before sending a batch of messages -linger.ms=1 - -# Set the amount of memory (in bytes) the producer can use to buffer messages -buffer.memory=33554432 - -# Set the serializer classes for key and value -key.serializer=org.apache.kafka.common.serialization.StringSerializer -value.serializer=org.apache.kafka.common.serialization.StringSerializer - -# Set the Kafka topic to produce messages to -kafka.topic=Topic diff --git a/wayang-platforms/wayang-spark/pom.xml b/wayang-platforms/wayang-spark/pom.xml index 4a3f8077..1d2205ff 100644 --- a/wayang-platforms/wayang-spark/pom.xml +++ b/wayang-platforms/wayang-spark/pom.xml @@ -19,9 +19,9 @@ --> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> - <packaging>pom</packaging> + <packaging>pom</packaging> - <parent> + <parent> <artifactId>wayang-platforms</artifactId> <groupId>org.apache.wayang</groupId> <version>0.6.1-SNAPSHOT</version> @@ -66,33 +66,38 @@ <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> - <version>3.2.3</version> + <version>3.2.4</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> - <version>2.7.7</version> + <version>3.1.2</version> </dependency> <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka-clients</artifactId> - <version>3.4.0</version> + <groupId>com.amazonaws</groupId> + <artifactId>aws-java-sdk-s3</artifactId> + <version>1.12.261</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-aws</artifactId> + <version>3.1.2</version> </dependency> </dependencies> - <modules> - <module>wayang-spark_2.12</module> - </modules> + <modules> + <module>wayang-spark_2.12</module> + </modules> - <profiles> - <profile> - <id>java8</id> - <activation> - <jdk>1.8</jdk> - </activation> - <modules> - <module>wayang-spark_2.11</module> - </modules> - </profile> - </profiles> + <profiles> + <profile> + <id>java8</id> + <activation> + <jdk>1.8</jdk> + </activation> + <modules> + <module>wayang-spark_2.11</module> + </modules> + </profile> + </profiles> </project>
