Hi Liang-Chi,
The code snippet is below. If I bind the encoder eagerly (the schema
doesn't change during execution), the final result is a list whose
entries are all identical.
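import java.util.List;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.val;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer$;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.UserDefinedType;
import scala.collection.JavaConversions;
// ScalaUtils.scalaSeq (java.util.List -> Scala Seq) is a project helper, not shown

@RequiredArgsConstructor
public class UDAF extends UserDefinedAggregateFunction {
    // Do not resolve and bind this expression encoder eagerly
    private final ExpressionEncoder<Row> unboundedEncoder;
    private final StructType schema;

    @Override
    public StructType inputSchema() {
        return schema;
    }

    @Override
    public StructType bufferSchema() {
        // The buffer is a single column holding the custom UDT
        return new StructType().add("data", new TopKDataType(schema, unboundedEncoder));
    }

    @Override
    public DataType dataType() {
        return DataTypes.createArrayType(schema);
    }

    @Override
    public boolean deterministic() {
        // Required by UserDefinedAggregateFunction
        return true;
    }

    @Override
    public void initialize(MutableAggregationBuffer buffer) {
        buffer.update(0, new InternalRow[0]);
    }

    @Override
    public void update(MutableAggregationBuffer buffer, Row input) {
        TopKDataType data = buffer.getAs(0);
        // Note: ExpressionEncoder.toRow may return the same InternalRow instance on every call
        data.add(unboundedEncoder.toRow(input));
        buffer.update(0, data);
    }

    @Override
    public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
        TopKDataType data1 = buffer1.getAs(0);
        TopKDataType data2 = buffer2.getAs(0);
        // merge data2 into data1 (omitted)
        buffer1.update(0, data1);
    }

    @Override
    public Object evaluate(Row buffer) {
        TopKDataType data = buffer.getAs(0);
        // Must return external Rows here, not InternalRows
        return data.rows();
    }

    static ExpressionEncoder<Row> resolveAndBind(ExpressionEncoder<Row> encoder) {
        val attributes = JavaConversions.asJavaCollection(encoder.schema().toAttributes())
                .stream()
                .map(Attribute::toAttribute)
                .collect(Collectors.toList());
        return encoder.resolveAndBind(ScalaUtils.scalaSeq(attributes), SimpleAnalyzer$.MODULE$);
    }
}

// Wraps a list of InternalRow
class TopKDataType extends UserDefinedType<TopKDataType> {
    private final ExpressionEncoder<Row> unboundedEncoder;
    private final List<InternalRow> data;

    public Row[] rows() {
        // Resolve and bind lazily, only when converting back to external Rows
        val encoder = UDAF.resolveAndBind(this.unboundedEncoder);
        return data.stream().map(encoder::fromRow).toArray(Row[]::new);
    }

    // Constructor, add(InternalRow), sqlType(), serialize(), deserialize() and userClass() omitted
}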
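A possible contributor to the "all entries identical" symptom, offered as a hypothesis rather than a diagnosis: the scaladoc of ExpressionEncoder.toRow says that multiple calls may return the same InternalRow object, so the caller should copy the result before the next call if it keeps a reference. The pure-Java sketch below shows that pitfall in isolation. ReusingEncoder is a hypothetical stand-in, not Spark code, and int[] plays the role of InternalRow.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReuseDemo {
    // Hypothetical stand-in (NOT Spark code) for an encoder whose toRow reuses
    // one output buffer, as ExpressionEncoder.toRow is documented to be allowed to do.
    static final class ReusingEncoder {
        private final int[] buffer = new int[1];

        int[] toRow(int value) {
            buffer[0] = value; // overwrite the shared buffer
            return buffer;     // same array instance on every call
        }
    }

    // Encode each input and keep the result, optionally copying it first.
    static List<int[]> collect(int[] inputs, boolean copy) {
        ReusingEncoder encoder = new ReusingEncoder();
        List<int[]> rows = new ArrayList<>();
        for (int v : inputs) {
            int[] row = encoder.toRow(v);
            // clone() plays the role of InternalRow.copy()
            rows.add(copy ? row.clone() : row);
        }
        return rows;
    }

    static String render(List<int[]> rows) {
        StringBuilder sb = new StringBuilder();
        for (int[] r : rows) {
            sb.append(Arrays.toString(r));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        int[] inputs = {1, 2, 3};
        // prints "without copy: [3][3][3]": every stored entry is the same object
        System.out.println("without copy: " + render(collect(inputs, false)));
        // prints "with copy: [1][2][3]"
        System.out.println("with copy: " + render(collect(inputs, true)));
    }
}
```

In the UDAF above, the analogous change would be data.add(unboundedEncoder.toRow(input).copy()); whether that alone accounts for the difference between eager and lazy binding is not established.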
-------
Regards,
Andy
On Fri, Jan 6, 2017 at 3:48 AM, Liang-Chi Hsieh wrote:
>
> Can you show how you use the encoder in your UDAF?
>
>
> Andy Dang wrote
> > One more question about the behavior of ExpressionEncoder<Row>.
> >
> > I have a UDAF that has an ExpressionEncoder<Row> as a member variable.
> >
> > However, if I call resolveAndBind() eagerly on this encoder, it appears to
> > break the UDAF. Basically, the deserialized rows all come out the same
> > during the merge step. Is this the expected behavior of Encoders?
> >
> > -------
> > Regards,
> > Andy
> >
> > On Thu, Jan 5, 2017 at 10:55 AM, Andy Dang <namd88@> wrote:
> >
> >> Perfect. The API in Java is a bit clumsy, though.
> >>
> >> What I ended up doing in Java (the val is from Lombok, if anyone's
> >> wondering):
> >> val attributes = JavaConversions.asJavaCollection(schema.toAttributes())
> >>         .stream().map(Attribute::toAttribute).collect(Collectors.toList());
> >> val encoder = RowEncoder.apply(schema)
> >>         .resolveAndBind(ScalaUtils.scalaSeq(attributes), SimpleAnalyzer$.MODULE$);
> >>
> >>
> >> -------
> >> Regards,
> >> Andy
> >>
> >> On Thu, Jan 5, 2017 at 2:53 AM, Liang-Chi Hsieh <viirya@> wrote:
> >>
> >>>
> >>> You need to resolve and bind the encoder.
> >>>
> >>> ExpressionEncoder<Row> encoder = RowEncoder.apply(struct).resolveAndBind();
> >>>
> >>>
> >>> Andy Dang wrote
> >>> > Hi all,
> >>> > (cc-ing dev since I've hit a developer API corner case)
> >>> >
> >>> > What's the best way to convert an InternalRow to a Row, given the
> >>> > InternalRow and its corresponding schema?
> >>> >
> >>> > Code snippet:
> >>> > @Test
> >>> > public void foo() throws Exception {
> >>> >     Row row = RowFactory.create(1);
> >>> >     StructType struct = new StructType().add("id", DataTypes.IntegerType);
> >>> >     ExpressionEncoder<Row> encoder = RowEncoder.apply(struct);
> >>> >     InternalRow internalRow = encoder.toRow(row);
> >>> >     System.out.println("Internal row size: " + internalRow.numFields());
> >>> >     Row roundTrip = encoder.fromRow(internalRow);
> >>> >     System.out.println("Round trip: " + roundTrip.size());
> >>> > }
> >>> >
> >>> > The code fails at the line encoder.fromRow() with the exception:
> >>> >> Caused by: java.lang.UnsupportedOperationException: Cannot evaluate
> >>> >> expression: getcolumnbyordinal(0, IntegerType)
> >>> >
> >>> > -------
> >>> > Regards,
> >>> > Andy
> >>>
> >>>
> >>>
> >>>
> >>>
> >>> -----
> >>> Liang-Chi Hsieh | @viirya
> >>> Spark Technology Center
> >>> http://www.spark.tc/
> >>> --
> >>> View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Converting-an-InternalRow-to-a-Row-tp20460p20465.html
> >>> Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
> >>>
> >>> ---------------------------------------------------------------------
> >>> To unsubscribe e-mail: [email protected]
> >>>
> >>>
> >>
>
>
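On the original question at the bottom of this thread (why fromRow throws "Cannot evaluate expression: getcolumnbyordinal(0, IntegerType)" until resolveAndBind is called): as I understand it, the RowEncoder deserializer refers to input columns through GetColumnByOrdinal placeholders, which cannot be evaluated until they are resolved against the input attributes and bound to ordinal-based references. The toy model below illustrates only that idea. ColumnByOrdinal, BoundColumn and resolveAndBind here are hypothetical, heavily simplified classes, not Spark's implementation.

```java
import java.util.Arrays;
import java.util.List;

public class BindDemo {
    // Hypothetical, simplified expression model; these are NOT Spark classes.
    interface Expr {
        Object eval(Object[] row);
    }

    // Placeholder that only records a column position, loosely analogous to
    // Spark's GetColumnByOrdinal: evaluating it before binding is an error.
    static final class ColumnByOrdinal implements Expr {
        final int ordinal;

        ColumnByOrdinal(int ordinal) { this.ordinal = ordinal; }

        public Object eval(Object[] row) {
            throw new UnsupportedOperationException(
                    "Cannot evaluate expression: columnbyordinal(" + ordinal + ")");
        }
    }

    // Evaluable reference into the input row, loosely analogous to BoundReference.
    static final class BoundColumn implements Expr {
        final int ordinal;

        BoundColumn(int ordinal) { this.ordinal = ordinal; }

        public Object eval(Object[] row) { return row[ordinal]; }
    }

    // Toy "resolve and bind": check the placeholder against the input schema,
    // then replace it with a reference that reads the matching slot.
    static Expr resolveAndBind(Expr e, List<String> inputSchema) {
        if (e instanceof ColumnByOrdinal) {
            int ordinal = ((ColumnByOrdinal) e).ordinal;
            if (ordinal < 0 || ordinal >= inputSchema.size()) {
                throw new IllegalArgumentException("No input column at ordinal " + ordinal);
            }
            return new BoundColumn(ordinal);
        }
        return e;
    }

    public static void main(String[] args) {
        Object[] internalRow = {1};
        Expr deserializer = new ColumnByOrdinal(0);
        try {
            deserializer.eval(internalRow);
        } catch (UnsupportedOperationException ex) {
            System.out.println("unbound: " + ex.getMessage());
        }
        Expr bound = resolveAndBind(deserializer, Arrays.asList("id"));
        System.out.println("bound: " + bound.eval(internalRow));
    }
}
```

The design mirrors the two-step fix in the thread: the unbound expression fails in the same way as the original test, and only the bound version can read the value back out of the row.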