[ https://issues.apache.org/jira/browse/SPARK-27249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17132743#comment-17132743 ]
Everett Rush edited comment on SPARK-27249 at 6/10/20, 9:52 PM:
----------------------------------------------------------------

I checked out the code and the design. Thanks for the great effort, but this doesn't quite meet the need. The transformer contract can still be met with a transform function that returns a DataFrame.

I would like something more like this.

{code:java}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.param.shared.HasOutputCol
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{DataType, StructField, StructType}

@DeveloperApi
abstract class MultiColumnTransformer[T <: MultiColumnTransformer[T]]
  extends Transformer with HasOutputCol with Logging {

  def setOutputCol(value: String): T = set(outputCol, value).asInstanceOf[T]

  /** Returns the data type of the output column. */
  protected def outputDataType: DataType

  /** Partition-level function applied to the rows of each partition. */
  protected def transformFunc: Iterator[Row] => Iterator[Row]

  override def transformSchema(schema: StructType): StructType = {
    // Refuse to overwrite an existing column.
    if (schema.fieldNames.contains($(outputCol))) {
      throw new IllegalArgumentException(s"Output column ${$(outputCol)} already exists.")
    }
    val outputFields = schema.fields :+
      StructField($(outputCol), outputDataType, nullable = false)
    StructType(outputFields)
  }

  def transform(dataset: DataFrame, targetSchema: StructType): DataFrame = {
    // The encoder tells Spark how to serialize rows of the output schema.
    val targetEncoder = RowEncoder(targetSchema)
    dataset.mapPartitions(transformFunc)(targetEncoder)
  }

  override def transform(dataset: Dataset[_]): DataFrame = {
    val dataframe = dataset.toDF()
    val targetSchema = transformSchema(dataframe.schema, logging = true)
    transform(dataframe, targetSchema)
  }

  override def copy(extra: ParamMap): T = defaultCopy(extra)
}
{code}

was (Author: enrush):
I checked out the code and the design. Thanks for the great effort, but this doesn't quite meet the need. The transformer contract can still be met with a transform function that returns a DataFrame.

I would like something more like this.

{{@DeveloperApi}}
{{abstract class MultiColumnTransformer[T<: MultiColumnTransformer[T]]}}
{{ extends Transformer with HasOutputCol with Logging {}}
{{ def setOutputCol(value: String): T = set(outputCol, value).asInstanceOf[T]}}
{{ /** Returns the data type of the output column. */}}
{{ protected def outputDataType: DataType}}
{{ protected def *transformFunc*: Iterator[Row] => Iterator[Row]}}
{{ override def *transformSchema*(schema: StructType): StructType = {}}
{{ }}}
{{ def *transform*(dataset: DataFrame, targetSchema: StructType): DataFrame = {}}
{{ val targetEncoder = RowEncoder(targetSchema)}}
{{ dataset.mapPartitions(transformFunc)(targetEncoder)}}
{{ }}}
{{ override def *transform*(dataset: Dataset[_]): DataFrame = {}}
{{ val dataframe = dataset.toDF()}}
{{ val targetSchema = transformSchema(dataframe.schema, logging = true)}}
{{ transform(dataframe, targetSchema)}}
{{ }}}
{{ override def copy(extra: ParamMap): T = defaultCopy(extra)}}
{{}}}

> Developers API for Transformers beyond UnaryTransformer
> -------------------------------------------------------
>
>                 Key: SPARK-27249
>                 URL: https://issues.apache.org/jira/browse/SPARK-27249
>             Project: Spark
>          Issue Type: New Feature
>          Components: ML
>    Affects Versions: 3.1.0
>            Reporter: Everett Rush
>            Priority: Minor
>              Labels: starter
>         Attachments: Screen Shot 2020-01-17 at 4.20.57 PM.png
>
>   Original Estimate: 96h
>  Remaining Estimate: 96h
>
> It would be nice to have a developers' API for dataset transformations that
> need more than one column from a row (UnaryTransformer, by contrast, takes
> one input column and produces one output column) or that contain objects
> too expensive to initialize repeatedly in a UDF, such as a database
> connection.
>
> Design:
> Abstract class PartitionTransformer extends Transformer and defines the
> partition transformation function as Iterator[Row] => Iterator[Row].
> NB: This parallels the UnaryTransformer createTransformFunc method.
>
> When developers subclass this transformer, they can provide their own schema
> for the output Row, in which case the PartitionTransformer creates a row
> encoder and executes the transformation. Alternatively, the developer can
> set the output data type and output column name. Then the
> PartitionTransformer class will create a new schema, a row encoder, and
> execute the transformation.
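
For illustration, here is a minimal sketch of the partition-level pattern the design relies on: a function of type Iterator[Row] => Iterator[Row] that builds an expensive object once per partition and appends one output column. It is not code from the ticket. ExpensiveScorer, PartitionTransformSketch, and the column names "a", "b", and "score" are hypothetical, and the sketch assumes the Spark 3.0-era RowEncoder(schema) factory used in the comment; other Spark versions may need a different way to obtain a row encoder.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

// Hypothetical stand-in for an object that is expensive to create,
// such as a database connection.
class ExpensiveScorer {
  def score(a: Double, b: Double): Double = a * b
}

object PartitionTransformSketch {

  // Partition-level function: the scorer is created once per partition,
  // then reused for every row in that partition.
  val transformFunc: Iterator[Row] => Iterator[Row] = { rows =>
    val scorer = new ExpensiveScorer()
    rows.map { row =>
      // Reads two input columns by position and appends one output value.
      val value = scorer.score(row.getDouble(0), row.getDouble(1))
      Row.fromSeq(row.toSeq :+ value)
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("partition-transform-sketch")
      .getOrCreate()
    import spark.implicits._

    val df = Seq((1.0, 2.0), (3.0, 4.0)).toDF("a", "b")

    // Output schema = input schema plus the new column.
    val targetSchema = StructType(
      df.schema.fields :+ StructField("score", DoubleType, nullable = false))

    // Assumption: RowEncoder(schema) as in the Spark 3.0-era API.
    val result = df.mapPartitions(transformFunc)(RowEncoder(targetSchema))
    result.show()

    spark.stop()
  }
}
```

A subclass of the proposed abstract class would presumably supply only transformFunc and the output data type, leaving schema construction and encoder creation to the base class.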