[jira] [Comment Edited] (SPARK-27249) Developers API for Transformers beyond UnaryTransformer

2020-06-10 Thread Everett Rush (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-27249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17132743#comment-17132743
 ] 

Everett Rush edited comment on SPARK-27249 at 6/10/20, 9:52 PM:


I checked out the code and the design. Thanks for the great effort, but this 
doesn't quite meet the need. The {{Transformer}} contract can still be met with a 
{{transform}} function that returns a {{DataFrame}}.

I would like something more like this:

{code:java}
@DeveloperApi
abstract class MultiColumnTransformer[T <: MultiColumnTransformer[T]]
  extends Transformer with HasOutputCol with Logging {

  def setOutputCol(value: String): T = set(outputCol, value).asInstanceOf[T]

  /** Returns the data type of the output column. */
  protected def outputDataType: DataType

  /** The per-partition transformation, applied to whole rows. */
  protected def transformFunc: Iterator[Row] => Iterator[Row]

  override def transformSchema(schema: StructType): StructType = {
    if (schema.fieldNames.contains($(outputCol))) {
      throw new IllegalArgumentException(s"Output column ${$(outputCol)} already exists.")
    }
    val outputFields = schema.fields :+
      StructField($(outputCol), outputDataType, nullable = false)
    StructType(outputFields)
  }

  def transform(dataset: DataFrame, targetSchema: StructType): DataFrame = {
    val targetEncoder = RowEncoder(targetSchema)
    dataset.mapPartitions(transformFunc)(targetEncoder)
  }

  override def transform(dataset: Dataset[_]): DataFrame = {
    val dataframe = dataset.toDF()
    val targetSchema = transformSchema(dataframe.schema, logging = true)
    transform(dataframe, targetSchema)
  }

  override def copy(extra: ParamMap): T = defaultCopy(extra)
}
{code}
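
For illustration, a hypothetical subclass (the class name is invented, and it assumes the first two input columns are strings) that concatenates two columns while keeping any expensive setup at partition granularity:

{code:java}
// Hypothetical example, not part of the proposal itself: appends a string
// column named $(outputCol) built from the first two columns of each row.
class ConcatColumnsTransformer(override val uid: String)
  extends MultiColumnTransformer[ConcatColumnsTransformer] {

  def this() = this(Identifiable.randomUID("concatCols"))

  protected def outputDataType: DataType = StringType

  protected def transformFunc: Iterator[Row] => Iterator[Row] = { rows =>
    // Anything expensive (e.g. a database connection) would be created here,
    // once per partition, before iterating over the rows.
    rows.map(row => Row.fromSeq(row.toSeq :+ (row.getString(0) + " " + row.getString(1))))
  }
}
{code}

Then {{new ConcatColumnsTransformer().setOutputCol("combined").transform(df)}} would return the input with the new column appended, matching the schema produced by {{transformSchema}}.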




> Developers API for Transformers beyond UnaryTransformer
> ---
>
> Key: SPARK-27249
> URL: https://issues.apache.org/jira/browse/SPARK-27249
> Project: Spark
>  Issue Type: New Feature
>  Components: ML
>Affects Versions: 3.1.0
>Reporter: Everett Rush
>Priority: Minor
>  Labels: starter
> Attachments: Screen Shot 2020-01-17 at 4.20.57 PM.png
>
>   Original Estimate: 96h
>  Remaining Estimate: 96h
>
> It would be nice to have a developers' API for dataset transformations that 
> need more than one column from a row (i.e. UnaryTransformer inputs one column 
> and outputs one column), or that contain objects too expensive to initialize 
> repeatedly in a UDF, such as a database connection. 
>  
> Design:
> Abstract class PartitionTransformer extends Transformer and defines the 
> partition transformation function as Iterator[Row] => Iterator[Row].
> NB: This parallels the UnaryTransformer createTransformFunc method.
>  
> When developers subclass this transformer, they can provide their own schema 
> for the output Row, in which case the PartitionTransformer creates a row 
> encoder and executes the transformation. Alternatively, the developer can set 
> the output DataType and output column name; the PartitionTransformer class will 
> then create a new schema and a row encoder, and execute the transformation.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-27249) Developers API for Transformers beyond UnaryTransformer

2020-04-17 Thread Nick Afshartous (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-27249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086029#comment-17086029
 ] 

Nick Afshartous edited comment on SPARK-27249 at 4/17/20, 7:54 PM:
---

[~enrush] Hi Everett,

Using the {{Iterator}} approach would seem to deviate from the existing 
{{Transformer}} contract.  More specifically, the {{transform}} function should 
output a new {{DataFrame}} object.  What I would propose is adding a new 
function {{Transformer.compose}} to allow the composition of {{Transformers}}.  

{code}
  def compose(other: Transformer): Transformer = {
    val self = this  // inside the anonymous class below, `this` would refer to
                     // the new Transformer and recurse forever
    new Transformer {
      override def transform(dataset: Dataset[_]): DataFrame = {
        other.transform(self.transform(dataset))
      }
    ...
{code}

Then one could {{compose}} {{Transformers}}, which would effectively enable 
multi-column transformations.  

{code}
 val dataFrame = ...
 val transformers = List(transformer1, transformer2, transformer3)
 val multiColumnTransformer = transformers.reduce((x, y) => x.compose(y))

 multiColumnTransformer.transform(dataFrame)
{code}
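
For completeness, a fuller sketch of {{compose}}: the anonymous {{Transformer}} also needs {{uid}}, {{transformSchema}}, and {{copy}}. What follows is one plausible way to fill them in, not a final design:

{code}
def compose(other: Transformer): Transformer = {
  val self = this
  new Transformer {
    override val uid: String = Identifiable.randomUID("composed")

    override def transform(dataset: Dataset[_]): DataFrame =
      other.transform(self.transform(dataset))

    // Schemas compose the same way the data does.
    override def transformSchema(schema: StructType): StructType =
      other.transformSchema(self.transformSchema(schema))

    override def copy(extra: ParamMap): Transformer =
      self.copy(extra).compose(other.copy(extra))
  }
}
{code}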

I'd be happy to submit a PR if this meets your requirements.








[jira] [Comment Edited] (SPARK-27249) Developers API for Transformers beyond UnaryTransformer

2020-01-17 Thread Nick Afshartous (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-27249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17018329#comment-17018329
 ] 

Nick Afshartous edited comment on SPARK-27249 at 1/17/20 9:29 PM:
--

[~enrush] Hi Everett, 
The {{Dataset}} API has an experimental function {{mapPartitions}} for 
transforming {{Datasets}}.  Does this satisfy your requirements?

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset
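
For example (a rough sketch; {{Database.connect}} and {{fetchLabel}} are hypothetical stand-ins for whatever expensive resource is involved):

{code}
// Enrich each row using a single connection per partition instead of one per row.
// (Case classes assumed defined at top level so Spark can derive their encoders.)
case class Click(userId: String, url: String)
case class Enriched(userId: String, url: String, label: String)

import spark.implicits._  // derives Encoder[Enriched]

val enriched = clicks.mapPartitions { rows =>   // clicks: Dataset[Click]
  val conn = Database.connect()                 // opened once per partition
  rows.map(c => Enriched(c.userId, c.url, conn.fetchLabel(c.userId)))
  // NB: closing the connection reliably needs a wrapping iterator or a
  // connection pool; omitted here for brevity.
}
{code}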










[jira] [Comment Edited] (SPARK-27249) Developers API for Transformers beyond UnaryTransformer

2020-01-09 Thread Nick Afshartous (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-27249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17010756#comment-17010756
 ] 

Nick Afshartous edited comment on SPARK-27249 at 1/9/20 7:49 PM:
-

I could try and look into this.  Could someone validate that this feature is 
still needed?

[~enrush] It would also be helpful if you could provide a code example 
illustrating how the {{PartitionTransformer}} would be used.
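
To make the request concrete, here is the kind of sketch I have in mind. Everything below is hypothetical; it just illustrates the design in the description, where the developer sets an output DataType and column name and supplies an Iterator[Row] => Iterator[Row]:

{code}
// Hypothetical: score each row with a model that is expensive to load,
// loading it once per partition rather than once per row.
class ModelScorer(override val uid: String) extends PartitionTransformer {

  protected def outputDataType: DataType = DoubleType

  protected def transformFunc: Iterator[Row] => Iterator[Row] = { rows =>
    val model = HeavyModel.load()  // stand-in for an expensive initialization
    rows.map(row => Row.fromSeq(row.toSeq :+ model.score(row)))
  }
}

new ModelScorer("scorer").setOutputCol("score").transform(df)
{code}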


was (Author: nafshartous):
I could try and look into this.  Could someone validate that this feature is 
still needed ?  



