Hi Andy,
Thanks for sharing the code snippet.
I am not sure if something is missing from the snippet, because some
function signatures don't match, e.g.:
@Override
public StructType bufferSchema() {
    return new UserDefineType(schema, unboundedEncoder);
}
Maybe you defined a class UserDefineType that extends StructType.
Anyway, I noticed that in this line:
data.add(unboundedEncoder.toRow(input));
If you read the doc comment of "toRow", you will find it says:
Note that multiple calls to toRow are allowed to return the same actual
[[InternalRow]] object. Thus, the caller should copy the result before
making another call if required.
I think that is why you end up with a list of identical entries.
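Here is a minimal, self-contained sketch of that behavior (the schema and
values are made up purely for illustration; note that toRow alone does not
need resolveAndBind, only fromRow does):

import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class ToRowReuseDemo {
    public static void main(String[] args) {
        StructType schema = new StructType().add("id", DataTypes.IntegerType);
        ExpressionEncoder<Row> encoder = RowEncoder.apply(schema);

        // toRow is allowed to hand back the same underlying InternalRow on
        // every call, so "first" is silently overwritten by the second call.
        InternalRow first = encoder.toRow(RowFactory.create(1));
        InternalRow second = encoder.toRow(RowFactory.create(2));

        System.out.println(first.getInt(0));  // prints 2 here, not 1
        System.out.println(first == second);  // typically true: same reused buffer
    }
}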
So you may need to change it to:
data.add(unboundedEncoder.toRow(input).copy());
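In the context of your update() that would look something like the sketch
below (keeping the rest of your buffer handling as you posted it, so
UserDefineType here is your own class):

@Override
public void update(MutableAggregationBuffer buffer, Row input) {
    UserDefineType data = buffer.getAs(0);

    // copy() materializes the reused row into an independent InternalRow,
    // so every element added to the buffer keeps its own values.
    data.add(unboundedEncoder.toRow(input).copy());

    buffer.update(0, data);
}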
Andy Dang wrote
> Hi Liang-Chi,
>
> The snippet of code is below. If I bind the encoder early (the schema
> doesn't change throughout the execution), the final result is a list of
> the same entries.
>
> @RequiredArgsConstructor
> public class UDAF extends UserDefinedAggregateFunction {
>
>     // Do not resolve and bind this expression encoder eagerly
>     private final ExpressionEncoder<Row> unboundedEncoder;
>     private final StructType schema;
>
>     @Override
>     public StructType inputSchema() {
>         return schema;
>     }
>
>     @Override
>     public StructType bufferSchema() {
>         return new UserDefineType(schema, unboundedEncoder);
>     }
>
>     @Override
>     public DataType dataType() {
>         return DataTypes.createArrayType(schema);
>     }
>
>     @Override
>     public void initialize(MutableAggregationBuffer buffer) {
>         buffer.update(0, new InternalRow[0]);
>     }
>
>     @Override
>     public void update(MutableAggregationBuffer buffer, Row input) {
>         UserDefineType data = buffer.getAs(0);
>
>         data.add(unboundedEncoder.toRow(input));
>
>         buffer.update(0, data);
>     }
>
>     @Override
>     public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
>         // merge
>         buffer1.update(0, data1);
>     }
>
>     @Override
>     public Object evaluate(Row buffer) {
>         UserDefineType data = buffer.getAs(0);
>
>         // need to return Row here instead of Internal Row
>         return data.rows();
>     }
>
>     static ExpressionEncoder<Row> resolveAndBind(ExpressionEncoder<Row> encoder) {
>         val attributes = JavaConversions.asJavaCollection(encoder.schema().toAttributes())
>             .stream().map(Attribute::toAttribute).collect(Collectors.toList());
>         return encoder.resolveAndBind(ScalaUtils.scalaSeq(attributes), SimpleAnalyzer$.MODULE$);
>     }
> }
>
> // Wrap around a list of InternalRow
> class TopKDataType extends UserDefinedType<TopKDataType> {
>     private final ExpressionEncoder<Row> unboundedEncoder;
>     private final List<InternalRow> data;
>
>     public Row[] rows() {
>         val encoder = resolveAndBind(this.unboundedEncoder);
>
>         return data.stream().map(encoder::fromRow).toArray(Row[]::new);
>     }
> }
>
> -------
> Regards,
> Andy
>
> On Fri, Jan 6, 2017 at 3:48 AM, Liang-Chi Hsieh <viirya@> wrote:
>
>>
>> Can you show how you use the encoder in your UDAF?
>>
>>
>> Andy Dang wrote
>> > One more question about the behavior of ExpressionEncoder<Row>.
>> >
>> > I have a UDAF that has ExpressionEncoder<Row> as a member variable.
>> >
>> > However, if I call resolveAndBind() eagerly on this encoder, it appears to
>> > break the UDAF. Basically, somehow the deserialized rows are all the same
>> > during the merge step. Is this the expected behavior of Encoders?
>> >
>> > -------
>> > Regards,
>> > Andy
>> >
>> > On Thu, Jan 5, 2017 at 10:55 AM, Andy Dang <namd88@> wrote:
>> >
>> >> Perfect. The API in Java is a bit clumsy, though.
>> >>
>> >> What I ended up doing in Java (the val is from lombok, if anyone's
>> >> wondering):
>> >>
>> >> val attributes = JavaConversions.asJavaCollection(schema.toAttributes())
>> >>     .stream().map(Attribute::toAttribute).collect(Collectors.toList());
>> >> val encoder = RowEncoder.apply(schema)
>> >>     .resolveAndBind(ScalaUtils.scalaSeq(attributes), SimpleAnalyzer$.MODULE$);
>> >>
>> >>
>> >> -------
>> >> Regards,
>> >> Andy
>> >>
>> >> On Thu, Jan 5, 2017 at 2:53 AM, Liang-Chi Hsieh <viirya@> wrote:
>> >>
>> >>>
>> >>> You need to resolve and bind the encoder.
>> >>>
>> >>> ExpressionEncoder<Row> encoder = RowEncoder.apply(struct).resolveAndBind();
>> >>>
>> >>>
>> >>> Andy Dang wrote
>> >>> > Hi all,
>> >>> > (cc-ing dev since I've hit some developer API corner)
>> >>> >
>> >>> > What's the best way to convert an InternalRow to a Row if I've got an
>> >>> > InternalRow and the corresponding schema?
>> >>> >
>> >>> > Code snippet:
>> >>> >
>> >>> > @Test
>> >>> > public void foo() throws Exception {
>> >>> >     Row row = RowFactory.create(1);
>> >>> >     StructType struct = new StructType().add("id", DataTypes.IntegerType);
>> >>> >     ExpressionEncoder<Row> encoder = RowEncoder.apply(struct);
>> >>> >     InternalRow internalRow = encoder.toRow(row);
>> >>> >     System.out.println("Internal row size: " + internalRow.numFields());
>> >>> >     Row roundTrip = encoder.fromRow(internalRow);
>> >>> >     System.out.println("Round trip: " + roundTrip.size());
>> >>> > }
>> >>> >
>> >>> > The code fails at the line encoder.fromRow() with the exception:
>> >>> >> Caused by: java.lang.UnsupportedOperationException: Cannot evaluate
>> >>> >> expression: getcolumnbyordinal(0, IntegerType)
>> >>> >
>> >>> > -------
>> >>> > Regards,
>> >>> > Andy
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> -----
>> >>> Liang-Chi Hsieh | @viirya
>> >>> Spark Technology Center
>> >>> http://www.spark.tc/
>> -----
>> Liang-Chi Hsieh | @viirya
>> Spark Technology Center
>> http://www.spark.tc/
-----
Liang-Chi Hsieh | @viirya
Spark Technology Center
http://www.spark.tc/