This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c477ee6bd3c7 [SPARK-54275][SQL] Clean up unused code from `pipeline` 
module
c477ee6bd3c7 is described below

commit c477ee6bd3c74e96f2ef85dcf0ab273099b99979
Author: yangjie01 <[email protected]>
AuthorDate: Mon Nov 10 09:58:51 2025 -0800

    [SPARK-54275][SQL] Clean up unused code from `pipeline` module
    
    ### What changes were proposed in this pull request?
    This PR aims to clean up the unused code from the `pipeline` module before 
the release of Apache Spark 4.1.0.
    
    ### Why are the changes needed?
    Code clean up.
    
    ### Does this PR introduce _any_ user-facing change?
    No, this is a new module added in the 4.1 cycle.
    
    ### How was this patch tested?
    Pass GitHub Actions
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #52972 from LuciferYang/pipeline-clean.
    
    Authored-by: yangjie01 <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../pipelines/graph/DataflowGraphTransformer.scala |  8 ----
 .../spark/sql/pipelines/graph/GraphErrors.scala    | 19 --------
 .../spark/sql/pipelines/logging/EventHelpers.scala | 52 ----------------------
 .../spark/sql/pipelines/utils/PipelineTest.scala   |  5 ---
 4 files changed, 84 deletions(-)

diff --git 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DataflowGraphTransformer.scala
 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DataflowGraphTransformer.scala
index 4121591c46b2..c80978a5957d 100644
--- 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DataflowGraphTransformer.scala
+++ 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DataflowGraphTransformer.scala
@@ -109,14 +109,6 @@ class DataflowGraphTransformer(graph: DataflowGraph) 
extends AutoCloseable {
     this
   }
 
-  private def defaultOnFailedDependentTables(
-      failedTableDependencies: Map[TableIdentifier, Seq[Table]]): Unit = {
-    require(
-      failedTableDependencies.isEmpty,
-      "Dependency failure happened and some tables were not resolved"
-    )
-  }
-
   /**
    * Example graph: [Flow1, Flow 2] -> ST -> Flow3 -> MV
    * Order of processing: Flow1, Flow2, ST, Flow3, MV.
diff --git 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphErrors.scala
 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphErrors.scala
index 53db669e687d..c835665a0f38 100644
--- 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphErrors.scala
+++ 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphErrors.scala
@@ -38,25 +38,6 @@ object GraphErrors {
     )
   }
 
-  /**
-   * Throws when the catalog or schema name in the "USE CATALOG | SCHEMA" 
command is invalid
-   *
-   * @param command string "USE CATALOG" or "USE SCHEMA"
-   * @param name the invalid catalog or schema name
-   * @param reason the reason why the name is invalid
-   */
-  def invalidNameInUseCommandError(
-      command: String,
-      name: String,
-      reason: String
-  ): SparkException = {
-    new SparkException(
-      errorClass = "INVALID_NAME_IN_USE_COMMAND",
-      messageParameters = Map("command" -> command, "name" -> name, "reason" 
-> reason),
-      cause = null
-    )
-  }
-
   /**
    * Throws when a table path is unresolved, i.e. the table identifier
    * does not exist in the catalog.
diff --git 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/logging/EventHelpers.scala
 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/logging/EventHelpers.scala
deleted file mode 100644
index 37f8dc67c385..000000000000
--- 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/logging/EventHelpers.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.pipelines.logging
-
-import java.sql.Timestamp
-import java.time.{Instant, ZoneId}
-import java.time.format.DateTimeFormatter
-
-/** Contains helpers and implicits for working with [[PipelineEvent]]s. */
-object EventHelpers {
-
-  /** A format string that defines how timestamps are serialized in a 
[[PipelineEvent]]. */
-  private val timestampFormat: String = "yyyy-MM-dd'T'HH:mm:ss.SSSXX"
-  // Currently only the UTC timezone is supported. Eventually we want to allow 
the user to specify
-  // the timezone as a pipeline level setting using the SESSION_LOCAL_TIMEZONE 
key, and it should
-  // not be possible to change this setting during a pipeline run.
-  private val zoneId: ZoneId = ZoneId.of("UTC")
-
-  private val formatter: DateTimeFormatter = DateTimeFormatter
-    .ofPattern(timestampFormat)
-    .withZone(zoneId)
-
-  /** Converts a timestamp to a string in ISO 8601 format. */
-  def formatTimestamp(ts: Timestamp): String = {
-    val instant = Instant.ofEpochMilli(ts.getTime)
-    formatter.format(instant)
-  }
-
-  /** Converts an ISO 8601 formatted timestamp to a {@link 
java.sql.Timestamp}.  */
-  def parseTimestamp(timeString: String): Timestamp = {
-    if (timeString.isEmpty) {
-      new Timestamp(0L)
-    } else {
-      val instant = Instant.from(formatter.parse(timeString))
-      new Timestamp(instant.toEpochMilli)
-    }
-  }
-}
diff --git 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/PipelineTest.scala
 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/PipelineTest.scala
index f9d6aba9e22d..fd97570bc528 100644
--- 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/PipelineTest.scala
+++ 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/PipelineTest.scala
@@ -231,11 +231,6 @@ abstract class PipelineTest
       ignoreFieldCase: Boolean = false
   )
 
-  /** Holds a parsed version along with the original json of a test. */
-  private case class TestSequence(json: Seq[String], rows: Seq[Row]) {
-    require(json.size == rows.size)
-  }
-
   /**
    * Helper method to verify unresolved column error message. We expect three 
elements to be present
    * in the message: error class, unresolved column name, list of suggested 
columns.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to