[ https://issues.apache.org/jira/browse/PHOENIX-7407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17955595#comment-17955595 ]

ASF GitHub Bot commented on PHOENIX-7407:
-----------------------------------------

rejeb commented on code in PR #145:
URL: https://github.com/apache/phoenix-connectors/pull/145#discussion_r2120282719


##########
phoenix5-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala:
##########
@@ -14,66 +14,37 @@
 package org.apache.phoenix.spark
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.io.NullWritable
-import org.apache.phoenix.mapreduce.PhoenixOutputFormat
-import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil
-import org.apache.phoenix.util.SchemaUtil
-import org.apache.spark.sql.DataFrame
-
-import scala.collection.JavaConversions._
+import org.apache.phoenix.spark.PhoenixDataFrameHelper.phoenixConfig
+import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource
+import org.apache.spark.sql.{DataFrame, SaveMode}
 
 @deprecated("Use the DataSource V2 API implementation (see PhoenixDataSource)")
 class DataFrameFunctions(data: DataFrame) extends Serializable {
   def saveToPhoenix(parameters: Map[String, String]): Unit = {
-               saveToPhoenix(parameters("table"), zkUrl = parameters.get("zkUrl"), tenantId = parameters.get("TenantId"),
-               skipNormalizingIdentifier=parameters.contains("skipNormalizingIdentifier"))
-   }
-  def saveToPhoenix(tableName: String, conf: Configuration = new Configuration,
-                    zkUrl: Option[String] = None, tenantId: Option[String] = None, skipNormalizingIdentifier: Boolean = false): Unit = {
-
-    // Retrieve the schema field names and normalize to Phoenix, need to do this outside of mapPartitions
-    val fieldArray = getFieldArray(skipNormalizingIdentifier, data)
-
-
-    // Create a configuration object to use for saving
-    @transient val outConfig = ConfigurationUtil.getOutputConfiguration(tableName, fieldArray, zkUrl, tenantId, Some(conf))
-
-    // Retrieve the zookeeper URL
-    val zkUrlFinal = ConfigurationUtil.getZookeeperURL(outConfig)
-
-    // Map the row objects into PhoenixRecordWritable
-    val phxRDD = data.rdd.mapPartitions{ rows =>
-
-       // Create a within-partition config to retrieve the ColumnInfo list
-       @transient val partitionConfig = ConfigurationUtil.getOutputConfiguration(tableName, fieldArray, zkUrlFinal, tenantId)
-       @transient val columns = PhoenixConfigurationUtil.getUpsertColumnMetadataList(partitionConfig).toList
- 
-       rows.map { row =>
-         val rec = new PhoenixRecordWritable(columns)
-         row.toSeq.foreach { e => rec.add(e) }
-         (null, rec)
-       }
-    }
-
-    // Save it
-    phxRDD.saveAsNewAPIHadoopFile(
-      Option(
-        conf.get("mapreduce.output.fileoutputformat.outputdir")
-      ).getOrElse(
-        Option(conf.get("mapred.output.dir")).getOrElse("")
-      ),
-      classOf[NullWritable],
-      classOf[PhoenixRecordWritable],
-      classOf[PhoenixOutputFormat[PhoenixRecordWritable]],
-      outConfig
+    saveToPhoenix(
+      tableName = parameters("table"),
+      zkUrl = parameters.get("zkUrl"),
+      tenantId = parameters.get("TenantId"),
+      skipNormalizingIdentifier = parameters.contains("skipNormalizingIdentifier")
     )
   }
 
-  def getFieldArray(skipNormalizingIdentifier: Boolean = false, data: DataFrame) = {
-    if (skipNormalizingIdentifier) {
-      data.schema.fieldNames.map(x => x)
-    } else {
-      data.schema.fieldNames.map(x => SchemaUtil.normalizeIdentifier(x))
-    }
+  def saveToPhoenix(tableName: String,
+                    conf: Configuration = new Configuration,
+                    zkUrl: Option[String] = None,
+                    tenantId: Option[String] = None,
+                    skipNormalizingIdentifier: Boolean = false): Unit = {
+    data
+      .write
+      .format("phoenix")
+      .mode(SaveMode.Overwrite)

Review Comment:
   This was in the spark2 connector; Append mode is available in the spark3 connector.
   Would you like to add Append mode support for spark2?
   If so, I think it is better to do that in a separate ticket.
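
A minimal, hedged sketch of what the V2-based write looks like from the caller side; the "table" and "zkUrl" option keys come from the existing saveToPhoenix parameters, while the helper name writeViaV2 and the fixed SaveMode.Overwrite are illustrative assumptions (Append support for spark2, as noted above, would be a separate change):

    import org.apache.spark.sql.{DataFrame, SaveMode}

    // Hypothetical caller-side sketch: route the DataFrame through the
    // "phoenix" DataSource V2 provider instead of the removed RDD-based path.
    def writeViaV2(df: DataFrame, table: String, zkUrl: String): Unit = {
      df.write
        .format("phoenix")          // short name of the V2 PhoenixDataSource
        .mode(SaveMode.Overwrite)   // only mode used in the spark2 hunk above
        .option("table", table)     // target Phoenix table
        .option("zkUrl", zkUrl)     // ZooKeeper quorum for the connection
        .save()
    }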





> Remove deprecated datasource V1 code from spark2 and spark3 connector
> ---------------------------------------------------------------------
>
>                 Key: PHOENIX-7407
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-7407
>             Project: Phoenix
>          Issue Type: Improvement
>            Reporter: rejeb ben rejeb
>            Assignee: rejeb ben rejeb
>            Priority: Major
>
> The purpose of this jira is to remove deprecated datasource V1 code. It is 
> safe to remove these classes since they are used internally by spark and not 
> referenced directly in application code.
> But in order to not impact old applications, all V1 interfaces (utility 
> methods and the source type "org.apache.phoenix.spark") will be kept, and the 
> code will be modified to use the new connector version classes.
> As discussed on the dev mailing list, one acceptable side effect is that the 
> spark3 SaveMode will accept both "Append" and "Overwrite" values. However, the 
> behavior will be the same.
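
A hedged illustration of that side effect (not code from the patch; the table name and zkUrl values below are placeholders): with the spark3 connector, both modes go through the same Phoenix upsert path.

    import org.apache.spark.sql.{DataFrame, SaveMode}

    // Hypothetical spark3 user code after the change: SaveMode.Append and
    // SaveMode.Overwrite are both accepted and behave identically (upsert).
    def save(df: DataFrame, mode: SaveMode): Unit =
      df.write
        .format("phoenix")
        .mode(mode)                       // Append or Overwrite, same behavior
        .option("table", "OUTPUT_TABLE")  // placeholder table name
        .option("zkUrl", "zk-host:2181")  // placeholder ZooKeeper quorum
        .save()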



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
