viirya commented on code in PR #44119:
URL: https://github.com/apache/spark/pull/44119#discussion_r1424921268


##########
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala:
##########
@@ -167,6 +173,229 @@ final class DataFrameWriterV2[T] private[sql](table: 
String, ds: Dataset[T])
     runCommand(overwrite)
   }
 
+  /**
+   * Specifies the merge condition.
+   *
+   * Sets the condition, provided as a `String`, to be used for merging data. 
This condition
+   * is converted internally to a `Column` and used to determine how rows from 
the source
+   * DataFrame are matched with rows in the target table.
+   *
+   * @param condition a `String` representing the merge condition.
+   * @return the current `DataFrameWriterV2` instance with the specified merge 
condition set.
+   */
+  def on(condition: String): DataFrameWriterV2[T] = {
+    on(Column(condition))
+  }
+
+  /**
+   * Specifies the merge condition.
+   *
+   * Sets the condition to be used for merging data. This condition is used to 
determine
+   * how rows from the source DataFrame are matched with rows in the target 
table.
+   *
+   * @param condition a `Column` representing the merge condition.
+   * @return the current `DataFrameWriterV2` instance with the specified merge 
condition set.
+   */
+  def on(condition: Column): DataFrameWriterV2[T] = {
+    this.on = Some(condition)
+    this
+  }
+
+  /**
+   * Initialize a `WhenMatched` object without any condition.
+   *
+   * This `WhenMatched` can be followed by one of the following merge actions:
+   *   - `updateAll`: Update all the target table fields with source dataset 
fields.
+   *   - `update(Map)`: Update all the target table records while changing only
+   *     a subset of fields based on the provided assignment.
+   *   - `delete`: Delete all the target table records.
+   *
+   * @return a new `WhenMatched` object.
+   */
+  def whenMatched(): WhenMatched[T] = {
+    new WhenMatched[T](this, None)
+  }
+
+  /**
+   * Initialize a `WhenMatched` object with a condition.
+   *
+   * This `WhenMatched` action will be executed if and only if the specified 
`condition`
+   * is satisfied.
+   *
+   * This `WhenMatched` can be followed by one of the following merge actions:
+   *   - `updateAll`: Update all the target table fields with source dataset 
fields.
+   *   - `update(Map)`: Update all the target table records while changing only
+   *     a subset of fields based on the provided assignment.
+   *   - `delete`: Delete all the target table records.
+   *
+   * @param condition a `Column` representing the condition to be evaluated 
for the action.
+   * @return a new `WhenMatched` object configured with the specified 
condition.
+   */
+  def whenMatched(condition: Column): WhenMatched[T] = {
+    new WhenMatched[T](this, Some(condition.expr))
+  }
+
+  /**
+   * Initialize a `WhenMatched` object with a specified condition.
+   *
+   * This `WhenMatched` action will be executed if and only if the given 
`condition`
+   * is satisfied. The condition is represented as a `String` and internally 
converted
+   * to a `Column`.
+   *
+   * The `WhenMatched` instance can perform one of the following merge actions:
+   *   - `updateAll`: Update all the target table fields with source dataset 
fields.
+   *   - `update(Map)`: Update all the target table records while changing only
+   *     a subset of fields based on the provided assignment.
+   *   - `delete`: Delete all the target table records.
+   *
+   * @param condition a `String` representing the condition to be evaluated 
for the action.
+   * @return a new `WhenMatched` object configured with the specified 
condition.
+   */
+  def whenMatched(condition: String): WhenMatched[T] = {
+    whenMatched(Column(condition))
+  }
+
+  /**
+   * Initialize a `WhenNotMatched` object without any condition.
+   *
+   * This `WhenNotMatched` can be followed by one of the following merge 
actions:
+   *   - `updateAll`: Update all the target table fields with source dataset 
fields.
+   *   - `update(Map)`: Update all the target table records while changing only
+   *     a subset of fields based on the provided assignment.
+   *   - `insertAll`: Insert all the target table with source dataset records.
+   *   - `insert(Map)`: Insert all the target table records while changing only
+   *     a subset of fields based on the provided assignment.
+   *   - `delete`: Delete all the target table records.
+   *
+   * @return a new `WhenNotMatched` object.
+   */
+  def whenNotMatched(): WhenNotMatched[T] = {
+    new WhenNotMatched[T](this, None)
+  }
+
+  /**
+   * Initialize a `WhenNotMatched` object with a condition.
+   *
+   * This `WhenNotMatched` action will be executed if and only if the 
specified `condition`
+   * is satisfied.
+   *
+   * This `WhenNotMatched` can be followed by one of the following merge 
actions:
+   *   - `updateAll`: Update all the target table fields with source dataset 
fields.
+   *   - `update(Map)`: Update all the target table records while changing only
+   *     a subset of fields based on the provided assignment.
+   *   - `insertAll`: Insert all the target table with source dataset records.
+   *   - `insert(Map)`: Insert all the target table records while changing only
+   *     a subset of fields based on the provided assignment.
+   *   - `delete`: Delete all the target table records.
+   *
+   * @param condition a `Column` representing the condition to be evaluated 
for the action.
+   * @return a new `WhenNotMatched` object configured with the specified 
condition.
+   */
+  def whenNotMatched(condition: Column): WhenNotMatched[T] = {
+    new WhenNotMatched[T](this, Some(condition.expr))
+  }
+
+  /**
+   * Initialize a `WhenNotMatched` object with a condition.
+   *
+   * This `WhenNotMatched` action will be executed if and only if the 
specified `condition`
+   * is satisfied. The condition is represented as a `String` and internally 
converted
+   * to a `Column`.
+   *
+   * This `WhenNotMatched` can be followed by one of the following merge 
actions:
+   *   - `updateAll`: Update all the target table fields with source dataset 
fields.
+   *   - `update(Map)`: Update all the target table records while changing only
+   *     a subset of fields based on the provided assignment.
+   *   - `insertAll`: Insert all the target table with source dataset records.
+   *   - `insert(Map)`: Insert all the target table records while changing only
+   *     a subset of fields based on the provided assignment.
+   *   - `delete`: Delete all the target table records.
+   *
+   * @param condition a `String` representing the condition to be evaluated 
for the action.
+   * @return a new `WhenNotMatched` object configured with the specified 
condition.
+   */
+  def whenNotMatched(condition: String): WhenNotMatched[T] = {
+    whenNotMatched(Column(condition))
+  }
+
+  /**
+   * Initialize a `WhenNotMatchedBySource` object without any condition.
+   *
+   * This `WhenNotMatchedBySource` can be followed by one of the following 
merge actions:
+   *   - `updateAll`: Update all the target table fields with source dataset 
fields.
+   *   - `update(Map)`: Update all the target table records while changing only
+   *     a subset of fields based on the provided assignment.
+   *   - `insertAll`: Insert all the target table with source dataset records.
+   *   - `insert(Map)`: Insert all the target table records while changing only
+   *     a subset of fields based on the provided assignment.
+   *   - `delete`: Delete all the target table records.
+   *
+   * @return a new `WhenNotMatchedBySource` object.
+   */
+  def whenNotMatchedBySource(): WhenNotMatchedBySource[T] = {
+    new WhenNotMatchedBySource[T](this, None)
+  }
+
+  /**
+   * Initialize a `WhenNotMatchedBySource` object with a condition.
+   *
+   * This `WhenNotMatchedBySource` action will be executed if and only if the 
specified `condition`
+   * is satisfied.
+   *
+   * This `WhenNotMatchedBySource` can be followed by one of the following 
merge actions:
+   *   - `updateAll`: Update all the target table fields with source dataset 
fields.
+   *   - `update(Map)`: Update all the target table records while changing only
+   *     a subset of fields based on the provided assignment.
+   *   - `insertAll`: Insert all the target table with source dataset records.
+   *   - `insert(Map)`: Insert all the target table records while changing only
+   *     a subset of fields based on the provided assignment.
+   *   - `delete`: Delete all the target table records.
+   *
+   * @param condition a `Column` representing the condition to be evaluated 
for the action.
+   * @return a new `WhenNotMatchedBySource` object configured with the 
specified condition.
+   */
+  def whenNotMatchedBySource(condition: Column): WhenNotMatchedBySource[T] = {
+    new WhenNotMatchedBySource[T](this, Some(condition.expr))
+  }
+
+  /**
+   * Initialize a `WhenNotMatchedBySource` object with a condition.
+   *
+   * This `WhenNotMatchedBySource` action will be executed if and only if the 
specified `condition`
+   * is satisfied. The condition is represented as a `String` and internally 
converted
+   * to a `Column`.
+   *
+   * This `WhenNotMatchedBySource` can be followed by one of the following 
merge actions:
+   *   - `updateAll`: Update all the target table fields with source dataset 
fields.
+   *   - `update(Map)`: Update all the target table records while changing only
+   *     a subset of fields based on the provided assignment.
+   *   - `insertAll`: Insert all the target table with source dataset records.
+   *   - `insert(Map)`: Insert all the target table records while changing only
+   *     a subset of fields based on the provided assignment.
+   *   - `delete`: Delete all the target table records.
+   *
+   * @param condition a `String` representing the condition to be evaluated 
for the action.
+   * @return a new `WhenNotMatchedBySource` object configured with the 
specified condition.
+   */
+  def whenNotMatchedBySource(condition: String): WhenNotMatchedBySource[T] = {
+    whenNotMatchedBySource(Column(condition))
+  }
+
+  /**
+   * Executes the merge operation.
+   */
+  def merge(): Unit = {
+    val merge = MergeIntoTable(

Review Comment:
   Do we need to check if `on` is specified (i.e., `Some`)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to