This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new b87d5f7750a [SPARK-40551][SQL] DataSource V2: Add APIs for delta-based row-level operations b87d5f7750a is described below commit b87d5f7750a533acb45b2b75474cdde5dc7d92a0 Author: Anton Okolnychyi <aokolnyc...@apple.com> AuthorDate: Thu Oct 13 11:50:20 2022 -0700 [SPARK-40551][SQL] DataSource V2: Add APIs for delta-based row-level operations ### What changes were proposed in this pull request? This PR adds DS v2 APIs for handling row-level operations for data sources that support deltas of rows. ### Why are the changes needed? These changes are part of the approved SPIP in SPARK-35801. ### Does this PR introduce _any_ user-facing change? Yes, this PR adds new DS v2 APIs per [design doc](https://docs.google.com/document/d/12Ywmc47j3l2WF4anG5vL4qlrhT2OKigb7_EbIKhxg60). ### How was this patch tested? Tests will be part of the implementation PR. Closes #38004 from aokolnychyi/spark-40551. Lead-authored-by: Anton Okolnychyi <aokolnyc...@apple.com> Co-authored-by: aokolnychyi <aokolnyc...@apple.com> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- .../sql/connector/write/DeltaBatchWrite.java} | 19 ++++--- .../spark/sql/connector/write/DeltaWrite.java} | 21 +++++--- .../sql/connector/write/DeltaWriteBuilder.java} | 21 +++++--- .../spark/sql/connector/write/DeltaWriter.java | 63 ++++++++++++++++++++++ .../sql/connector/write/DeltaWriterFactory.java} | 22 +++++--- .../sql/connector/write/LogicalWriteInfo.java | 18 +++++++ .../spark/sql/connector/write/SupportsDelta.java} | 26 ++++++--- .../sql/connector/write/LogicalWriteInfoImpl.scala | 7 ++- 8 files changed, 161 insertions(+), 36 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DeltaBatchWrite.java similarity index 69% copy from sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala copy to sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DeltaBatchWrite.java index b1492e42981..86c48b85dcd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DeltaBatchWrite.java @@ -15,12 +15,17 @@ * limitations under the License. */ -package org.apache.spark.sql.connector.write +package org.apache.spark.sql.connector.write; -import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.util.CaseInsensitiveStringMap +import org.apache.spark.annotation.Experimental; -private[sql] case class LogicalWriteInfoImpl( - queryId: String, - schema: StructType, - options: CaseInsensitiveStringMap) extends LogicalWriteInfo +/** + * An interface that defines how to write a delta of rows during batch processing. + * + * @since 3.4.0 + */ +@Experimental +public interface DeltaBatchWrite extends BatchWrite { + @Override + DeltaWriterFactory createBatchWriterFactory(PhysicalWriteInfo info); +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DeltaWrite.java similarity index 65% copy from sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala copy to sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DeltaWrite.java index b1492e42981..eb230598ef4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DeltaWrite.java @@ -15,12 +15,19 @@ * limitations under the License. */ -package org.apache.spark.sql.connector.write +package org.apache.spark.sql.connector.write; -import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.util.CaseInsensitiveStringMap +import org.apache.spark.annotation.Experimental; -private[sql] case class LogicalWriteInfoImpl( - queryId: String, - schema: StructType, - options: CaseInsensitiveStringMap) extends LogicalWriteInfo +/** + * A logical representation of a data source write that handles a delta of rows. + * + * @since 3.4.0 + */ +@Experimental +public interface DeltaWrite extends Write { + @Override + default DeltaBatchWrite toBatch() { + throw new UnsupportedOperationException(description() + ": Delta batch write is not supported"); + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DeltaWriteBuilder.java similarity index 67% copy from sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala copy to sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DeltaWriteBuilder.java index b1492e42981..dde3214170f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DeltaWriteBuilder.java @@ -15,12 +15,19 @@ * limitations under the License. */ -package org.apache.spark.sql.connector.write +package org.apache.spark.sql.connector.write; -import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.util.CaseInsensitiveStringMap +import org.apache.spark.annotation.Experimental; -private[sql] case class LogicalWriteInfoImpl( - queryId: String, - schema: StructType, - options: CaseInsensitiveStringMap) extends LogicalWriteInfo +/** + * An interface for building a {@link DeltaWrite}. + * + * @since 3.4.0 + */ +@Experimental +public interface DeltaWriteBuilder extends WriteBuilder { + @Override + default DeltaWrite build() { + throw new UnsupportedOperationException(getClass().getName() + " does not implement build"); + } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DeltaWriter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DeltaWriter.java new file mode 100644 index 00000000000..0cc6cb48801 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DeltaWriter.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.write; + +import java.io.IOException; + +import org.apache.spark.annotation.Experimental; + +/** + * A data writer returned by {@link DeltaWriterFactory#createWriter(int, long)} and is + * responsible for writing a delta of rows. + * + * @since 3.4.0 + */ +@Experimental +public interface DeltaWriter<T> extends DataWriter<T> { + /** + * Deletes a row. + * + * @param metadata values for metadata columns that were projected but are not part of the row ID + * @param id a row ID to delete + * @throws IOException if failure happens during disk/network IO like writing files + */ + void delete(T metadata, T id) throws IOException; + + /** + * Updates a row. + * + * @param metadata values for metadata columns that were projected but are not part of the row ID + * @param id a row ID to update + * @param row a row with updated values + * @throws IOException if failure happens during disk/network IO like writing files + */ + void update(T metadata, T id, T row) throws IOException; + + /** + * Inserts a new row. + * + * @param row a row to insert + * @throws IOException if failure happens during disk/network IO like writing files + */ + void insert(T row) throws IOException; + + @Override + default void write(T row) throws IOException { + insert(row); + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DeltaWriterFactory.java similarity index 59% copy from sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala copy to sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DeltaWriterFactory.java index b1492e42981..0f9c1f91833 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DeltaWriterFactory.java @@ -15,12 +15,20 @@ * limitations under the License. */ -package org.apache.spark.sql.connector.write +package org.apache.spark.sql.connector.write; -import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.util.CaseInsensitiveStringMap +import org.apache.spark.annotation.Experimental; +import org.apache.spark.sql.catalyst.InternalRow; -private[sql] case class LogicalWriteInfoImpl( - queryId: String, - schema: StructType, - options: CaseInsensitiveStringMap) extends LogicalWriteInfo +/** + * A factory for creating {@link DeltaWriter}s returned by + * {@link DeltaBatchWrite#createBatchWriterFactory(PhysicalWriteInfo)}, which is responsible for + * creating and initializing writers at the executor side. + * + * @since 3.4.0 + */ +@Experimental +public interface DeltaWriterFactory extends DataWriterFactory { + @Override + DeltaWriter<InternalRow> createWriter(int partitionId, long taskId); +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/LogicalWriteInfo.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/LogicalWriteInfo.java index e472a130187..bdf1bb3b9c0 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/LogicalWriteInfo.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/LogicalWriteInfo.java @@ -17,6 +17,8 @@ package org.apache.spark.sql.connector.write; +import java.util.Optional; + import org.apache.spark.annotation.Evolving; import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.util.CaseInsensitiveStringMap; @@ -45,4 +47,20 @@ public interface LogicalWriteInfo { * the schema of the input data from Spark to data source. */ StructType schema(); + + /** + * the schema of the ID columns from Spark to data source. + */ + default Optional<StructType> rowIdSchema() { + throw new UnsupportedOperationException( + getClass().getName() + " does not implement rowIdSchema"); + } + + /** + * the schema of the input metadata from Spark to data source. + */ + default Optional<StructType> metadataSchema() { + throw new UnsupportedOperationException( + getClass().getName() + " does not implement metadataSchema"); + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/SupportsDelta.java similarity index 57% copy from sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala copy to sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/SupportsDelta.java index b1492e42981..6315b65f610 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/SupportsDelta.java @@ -15,12 +15,24 @@ * limitations under the License. */ -package org.apache.spark.sql.connector.write +package org.apache.spark.sql.connector.write; -import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.util.CaseInsensitiveStringMap +import org.apache.spark.annotation.Experimental; +import org.apache.spark.sql.connector.expressions.NamedReference; -private[sql] case class LogicalWriteInfoImpl( - queryId: String, - schema: StructType, - options: CaseInsensitiveStringMap) extends LogicalWriteInfo +/** + * A mix-in interface for {@link RowLevelOperation}. Data sources can implement this interface + * to indicate they support handling deltas of rows. + * + * @since 3.4.0 + */ +@Experimental +public interface SupportsDelta extends RowLevelOperation { + @Override + DeltaWriteBuilder newWriteBuilder(LogicalWriteInfo info); + + /** + * Returns the row ID column references that should be used for row equality. + */ + NamedReference[] rowId(); +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala index b1492e42981..8c0828d8a27 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala @@ -17,10 +17,15 @@ package org.apache.spark.sql.connector.write +import java.util.Optional + import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap private[sql] case class LogicalWriteInfoImpl( queryId: String, schema: StructType, - options: CaseInsensitiveStringMap) extends LogicalWriteInfo + options: CaseInsensitiveStringMap, + override val rowIdSchema: Optional[StructType] = Optional.empty[StructType], + override val metadataSchema: Optional[StructType] = Optional.empty[StructType]) + extends LogicalWriteInfo --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org