[
https://issues.apache.org/jira/browse/PHOENIX-7407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17955595#comment-17955595
]
ASF GitHub Bot commented on PHOENIX-7407:
-----------------------------------------
rejeb commented on code in PR #145:
URL:
https://github.com/apache/phoenix-connectors/pull/145#discussion_r2120282719
##########
phoenix5-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala:
##########
@@ -14,66 +14,37 @@
package org.apache.phoenix.spark

import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.io.NullWritable
-import org.apache.phoenix.mapreduce.PhoenixOutputFormat
-import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil
-import org.apache.phoenix.util.SchemaUtil
-import org.apache.spark.sql.DataFrame
-
-import scala.collection.JavaConversions._
+import org.apache.phoenix.spark.PhoenixDataFrameHelper.phoenixConfig
+import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource
+import org.apache.spark.sql.{DataFrame, SaveMode}

@deprecated("Use the DataSource V2 API implementation (see PhoenixDataSource)")
class DataFrameFunctions(data: DataFrame) extends Serializable {

  def saveToPhoenix(parameters: Map[String, String]): Unit = {
-    saveToPhoenix(parameters("table"), zkUrl = parameters.get("zkUrl"), tenantId = parameters.get("TenantId"),
-      skipNormalizingIdentifier = parameters.contains("skipNormalizingIdentifier"))
-  }
-
-  def saveToPhoenix(tableName: String, conf: Configuration = new Configuration,
-                    zkUrl: Option[String] = None, tenantId: Option[String] = None,
-                    skipNormalizingIdentifier: Boolean = false): Unit = {
-
-    // Retrieve the schema field names and normalize to Phoenix, need to do this outside of mapPartitions
-    val fieldArray = getFieldArray(skipNormalizingIdentifier, data)
-
-
-    // Create a configuration object to use for saving
-    @transient val outConfig = ConfigurationUtil.getOutputConfiguration(tableName, fieldArray, zkUrl, tenantId, Some(conf))
-
-    // Retrieve the zookeeper URL
-    val zkUrlFinal = ConfigurationUtil.getZookeeperURL(outConfig)
-
-    // Map the row objects into PhoenixRecordWritable
-    val phxRDD = data.rdd.mapPartitions{ rows =>
-
-      // Create a within-partition config to retrieve the ColumnInfo list
-      @transient val partitionConfig = ConfigurationUtil.getOutputConfiguration(tableName, fieldArray, zkUrlFinal, tenantId)
-      @transient val columns = PhoenixConfigurationUtil.getUpsertColumnMetadataList(partitionConfig).toList
-
-      rows.map { row =>
-        val rec = new PhoenixRecordWritable(columns)
-        row.toSeq.foreach { e => rec.add(e) }
-        (null, rec)
-      }
-    }
-
-    // Save it
-    phxRDD.saveAsNewAPIHadoopFile(
-      Option(
-        conf.get("mapreduce.output.fileoutputformat.outputdir")
-      ).getOrElse(
-        Option(conf.get("mapred.output.dir")).getOrElse("")
-      ),
-      classOf[NullWritable],
-      classOf[PhoenixRecordWritable],
-      classOf[PhoenixOutputFormat[PhoenixRecordWritable]],
-      outConfig
+    saveToPhoenix(
+      tableName = parameters("table"),
+      zkUrl = parameters.get("zkUrl"),
+      tenantId = parameters.get("TenantId"),
+      skipNormalizingIdentifier = parameters.contains("skipNormalizingIdentifier")
    )
  }

-  def getFieldArray(skipNormalizingIdentifier: Boolean = false, data: DataFrame) = {
-    if (skipNormalizingIdentifier) {
-      data.schema.fieldNames.map(x => x)
-    } else {
-      data.schema.fieldNames.map(x => SchemaUtil.normalizeIdentifier(x))
-    }
+  def saveToPhoenix(tableName: String,
+                    conf: Configuration = new Configuration,
+                    zkUrl: Option[String] = None,
+                    tenantId: Option[String] = None,
+                    skipNormalizingIdentifier: Boolean = false): Unit = {
+    data
+      .write
+      .format("phoenix")
+      .mode(SaveMode.Overwrite)
Review Comment:
This was the behavior in the spark2 connector; Append mode is available in spark3.
Would you like to add Append mode support for spark2?
If so, I think it is better to do it in a separate ticket.
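For illustration only, a minimal sketch of what forwarding a caller-supplied mode could look like if Append support were added to this shim. The extra `mode` parameter and the option names below are assumptions, not part of this patch; the sketch relies on the imports already present in the file:

    def saveToPhoenix(tableName: String,
                      conf: Configuration = new Configuration,
                      zkUrl: Option[String] = None,
                      tenantId: Option[String] = None,
                      skipNormalizingIdentifier: Boolean = false,
                      mode: SaveMode = SaveMode.Overwrite): Unit = {  // hypothetical parameter
      data
        .write
        .format("phoenix")
        .mode(mode)  // forward the caller's choice instead of hard-coding Overwrite
        .option("table", tableName)  // assumed option name
        .options(zkUrl.map(url => Map("zkUrl" -> url)).getOrElse(Map.empty))  // pass zkUrl only when provided
        .save()
    }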
> Remove deprecated datasource V1 code from spark2 and spark3 connector
> ---------------------------------------------------------------------
>
> Key: PHOENIX-7407
> URL: https://issues.apache.org/jira/browse/PHOENIX-7407
> Project: Phoenix
> Issue Type: Improvement
> Reporter: rejeb ben rejeb
> Assignee: rejeb ben rejeb
> Priority: Major
>
> The purpose of this jira is to remove deprecated datasource V1 code. It is
> safe to remove these classes since they are used internally by spark and not
> referenced directly in application code.
> But in order not to impact old applications, all V1 interfaces (utility
> methods and the source type "org.apache.phoenix.spark") will be kept, and the
> code will be modified to use the new connector version classes.
> As discussed on the dev mailing list, one acceptable side effect is that the
> spark3 SaveMode will accept both "Append" and "Overwrite" values. However,
> the behavior will be the same.
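> As an illustration of this compatibility guarantee, a sketch of application
> code that keeps working unchanged after the removal (option names follow the
> documented connector usage; the table name and zkUrl value are placeholders):
>
>     // Legacy DataSource V1 source name, kept as an alias for the V2 implementation
>     df.write
>       .format("org.apache.phoenix.spark")
>       .mode(SaveMode.Overwrite)  // on spark3, SaveMode.Append is also accepted and behaves the same
>       .option("table", "OUTPUT_TABLE")
>       .option("zkUrl", "localhost:2181")
>       .save()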
--
This message was sent by Atlassian Jira
(v8.20.10#820010)