This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c477ee6bd3c7 [SPARK-54275][SQL] Clean up unused code from `pipeline`
module
c477ee6bd3c7 is described below
commit c477ee6bd3c74e96f2ef85dcf0ab273099b99979
Author: yangjie01 <[email protected]>
AuthorDate: Mon Nov 10 09:58:51 2025 -0800
[SPARK-54275][SQL] Clean up unused code from `pipeline` module
### What changes were proposed in this pull request?
This PR aims to clean up the unused code from the `pipeline` module before
the release of Apache Spark 4.1.0.
### Why are the changes needed?
Code clean up.
### Does this PR introduce _any_ user-facing change?
No, this is a new module added in the 4.1 cycle.
### How was this patch tested?
Pass GitHub Actions
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #52972 from LuciferYang/pipeline-clean.
Authored-by: yangjie01 <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../pipelines/graph/DataflowGraphTransformer.scala | 8 ----
.../spark/sql/pipelines/graph/GraphErrors.scala | 19 --------
.../spark/sql/pipelines/logging/EventHelpers.scala | 52 ----------------------
.../spark/sql/pipelines/utils/PipelineTest.scala | 5 ---
4 files changed, 84 deletions(-)
diff --git
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DataflowGraphTransformer.scala
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DataflowGraphTransformer.scala
index 4121591c46b2..c80978a5957d 100644
---
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DataflowGraphTransformer.scala
+++
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DataflowGraphTransformer.scala
@@ -109,14 +109,6 @@ class DataflowGraphTransformer(graph: DataflowGraph)
extends AutoCloseable {
this
}
- private def defaultOnFailedDependentTables(
- failedTableDependencies: Map[TableIdentifier, Seq[Table]]): Unit = {
- require(
- failedTableDependencies.isEmpty,
- "Dependency failure happened and some tables were not resolved"
- )
- }
-
/**
* Example graph: [Flow1, Flow 2] -> ST -> Flow3 -> MV
* Order of processing: Flow1, Flow2, ST, Flow3, MV.
diff --git
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphErrors.scala
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphErrors.scala
index 53db669e687d..c835665a0f38 100644
---
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphErrors.scala
+++
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphErrors.scala
@@ -38,25 +38,6 @@ object GraphErrors {
)
}
- /**
- * Throws when the catalog or schema name in the "USE CATALOG | SCHEMA"
command is invalid
- *
- * @param command string "USE CATALOG" or "USE SCHEMA"
- * @param name the invalid catalog or schema name
- * @param reason the reason why the name is invalid
- */
- def invalidNameInUseCommandError(
- command: String,
- name: String,
- reason: String
- ): SparkException = {
- new SparkException(
- errorClass = "INVALID_NAME_IN_USE_COMMAND",
- messageParameters = Map("command" -> command, "name" -> name, "reason"
-> reason),
- cause = null
- )
- }
-
/**
* Throws when a table path is unresolved, i.e. the table identifier
* does not exist in the catalog.
diff --git
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/logging/EventHelpers.scala
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/logging/EventHelpers.scala
deleted file mode 100644
index 37f8dc67c385..000000000000
---
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/logging/EventHelpers.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.pipelines.logging
-
-import java.sql.Timestamp
-import java.time.{Instant, ZoneId}
-import java.time.format.DateTimeFormatter
-
-/** Contains helpers and implicits for working with [[PipelineEvent]]s. */
-object EventHelpers {
-
- /** A format string that defines how timestamps are serialized in a
[[PipelineEvent]]. */
- private val timestampFormat: String = "yyyy-MM-dd'T'HH:mm:ss.SSSXX"
- // Currently only the UTC timezone is supported. Eventually we want to allow
the user to specify
- // the timezone as a pipeline level setting using the SESSION_LOCAL_TIMEZONE
key, and it should
- // not be possible to change this setting during a pipeline run.
- private val zoneId: ZoneId = ZoneId.of("UTC")
-
- private val formatter: DateTimeFormatter = DateTimeFormatter
- .ofPattern(timestampFormat)
- .withZone(zoneId)
-
- /** Converts a timestamp to a string in ISO 8601 format. */
- def formatTimestamp(ts: Timestamp): String = {
- val instant = Instant.ofEpochMilli(ts.getTime)
- formatter.format(instant)
- }
-
- /** Converts an ISO 8601 formatted timestamp to a {@link
java.sql.Timestamp}. */
- def parseTimestamp(timeString: String): Timestamp = {
- if (timeString.isEmpty) {
- new Timestamp(0L)
- } else {
- val instant = Instant.from(formatter.parse(timeString))
- new Timestamp(instant.toEpochMilli)
- }
- }
-}
diff --git
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/PipelineTest.scala
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/PipelineTest.scala
index f9d6aba9e22d..fd97570bc528 100644
---
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/PipelineTest.scala
+++
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/PipelineTest.scala
@@ -231,11 +231,6 @@ abstract class PipelineTest
ignoreFieldCase: Boolean = false
)
- /** Holds a parsed version along with the original json of a test. */
- private case class TestSequence(json: Seq[String], rows: Seq[Row]) {
- require(json.size == rows.size)
- }
-
/**
* Helper method to verify unresolved column error message. We expect three
elements to be present
* in the message: error class, unresolved column name, list of suggested
columns.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]