Ah, I missed that bit of documentation, my bad :). That totally explains
the behavior!

Thanks a lot!

-------
Regards,
Andy

On Sat, Jan 7, 2017 at 10:11 AM, Liang-Chi Hsieh <vii...@gmail.com> wrote:

>
> Hi Andy,
>
> Thanks for sharing the code snippet.
>
> I am not sure if you missed something in the snippet, because some function
> signatures do not match, e.g.,
>
>     @Override
>     public StructType bufferSchema() {
>         return new UserDefineType(schema, unboundedEncoder);
>     }
>
>
> Maybe you defined a class UserDefineType which extends StructType.
>
> Anyway, I noticed that in this line:
>
>         data.add(unboundedEncoder.toRow(input));
>
> If you read the doc comment of "toRow", you will find it says:
>
> Note that multiple calls to toRow are allowed to return the same actual
> [[InternalRow]] object.  Thus, the caller should copy the result before
> making another call if required.
>
> I think that is why you get a list of the same entries.
>
> So you may need to change it to:
>
>         data.add(unboundedEncoder.toRow(input).copy());
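To make the aliasing concrete, here is a minimal plain-Java sketch with no Spark dependency. `MutableRow`, `ReusingEncoder`, and `BufferReuseDemo` are hypothetical stand-ins, not Spark classes: the encoder models the worst case that the `toRow` comment permits (every call returns the same object), and `MutableRow.copy()` plays the role of `InternalRow.copy()`. Appending the returned object directly stores three references to one row that ends up holding the last value; appending a copy keeps each entry distinct.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a row: a mutable array of field values.
final class MutableRow {
    final Object[] values;

    MutableRow(int numFields) {
        this.values = new Object[numFields];
    }

    // Returns an independent copy (analogous in spirit to InternalRow.copy()).
    MutableRow copy() {
        MutableRow out = new MutableRow(values.length);
        System.arraycopy(values, 0, out.values, 0, values.length);
        return out;
    }

    @Override
    public String toString() {
        return Arrays.toString(values);
    }
}

// Hypothetical encoder modelling the case the toRow doc allows:
// every call overwrites and returns the same output object.
final class ReusingEncoder {
    private final MutableRow reused;

    ReusingEncoder(int numFields) {
        this.reused = new MutableRow(numFields);
    }

    MutableRow toRow(Object... fields) {
        System.arraycopy(fields, 0, reused.values, 0, fields.length);
        return reused; // same instance on every call
    }
}

class BufferReuseDemo {
    // Encodes 1, 2, 3 and collects the results, optionally copying each one.
    static List<String> collect(boolean copy) {
        ReusingEncoder encoder = new ReusingEncoder(1);
        List<MutableRow> data = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            MutableRow row = encoder.toRow(i);
            data.add(copy ? row.copy() : row);
        }
        List<String> rendered = new ArrayList<>();
        for (MutableRow r : data) {
            rendered.add(r.toString());
        }
        return rendered;
    }

    public static void main(String[] args) {
        System.out.println("without copy: " + collect(false)); // [[3], [3], [3]]
        System.out.println("with copy:    " + collect(true));  // [[1], [2], [3]]
    }
}
```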
>
>
>
> Andy Dang wrote
> > Hi Liang-Chi,
> >
> > The snippet of code is below. If I bind the encoder early (the schema
> > doesn't change throughout the execution), the final result is a list of
> > the same entries.
> >
> > @RequiredArgsConstructor
> > public class UDAF extends UserDefinedAggregateFunction {
> >
> >     // Do not resolve and bind this expression encoder eagerly
> >     private final ExpressionEncoder<Row> unboundedEncoder;
> >     private final StructType schema;
> >
> >     @Override
> >     public StructType inputSchema() {
> >         return schema;
> >     }
> >
> >     @Override
> >     public StructType bufferSchema() {
> >         return new UserDefineType(schema, unboundedEncoder);
> >     }
> >
> >     @Override
> >     public DataType dataType() {
> >         return DataTypes.createArrayType(schema);
> >     }
> >
> >     @Override
> >     public void initialize(MutableAggregationBuffer buffer) {
> >         buffer.update(0, new InternalRow[0]);
> >     }
> >
> >     @Override
> >     public void update(MutableAggregationBuffer buffer, Row input) {
> >         UserDefineType data = buffer.getAs(0);
> >
> >         data.add(unboundedEncoder.toRow(input));
> >
> >         buffer.update(0, data);
> >     }
> >
> >     @Override
> >     public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
> >         // merge
> >         buffer1.update(0, data1);
> >     }
> >
> >     @Override
> >     public Object evaluate(Row buffer) {
> >         UserDefineType data = buffer.getAs(0);
> >
> >         // need to return Row here instead of InternalRow
> >         return data.rows();
> >     }
> >
> >     static ExpressionEncoder<Row> resolveAndBind(ExpressionEncoder<Row> encoder) {
> >         val attributes = JavaConversions.asJavaCollection(encoder.schema().toAttributes())
> >                 .stream().map(Attribute::toAttribute).collect(Collectors.toList());
> >         return encoder.resolveAndBind(ScalaUtils.scalaSeq(attributes),
> >                 SimpleAnalyzer$.MODULE$);
> >     }
> > }
> >
> > // Wraps a list of InternalRow
> > class TopKDataType extends UserDefinedType<TopKDataType> {
> >     private final ExpressionEncoder<Row> unboundedEncoder;
> >     private final List<InternalRow> data;
> >
> >     public Row[] rows() {
> >         val encoder = resolveAndBind(this.unboundedEncoder);
> >
> >         return data.stream().map(encoder::fromRow).toArray(Row[]::new);
> >     }
> > }
> >
> > -------
> > Regards,
> > Andy
> >
> > On Fri, Jan 6, 2017 at 3:48 AM, Liang-Chi Hsieh <viirya@...> wrote:
> >
> >>
> >> Can you show how you use the encoder in your UDAF?
> >>
> >>
> >> Andy Dang wrote
> >> > One more question about the behavior of ExpressionEncoder<Row>.
> >> >
> >> > I have a UDAF that has an ExpressionEncoder<Row> as a member variable.
> >> >
> >> > However, if I call resolveAndBind() eagerly on this encoder, it appears
> >> > to break the UDAF. Basically, the deserialized rows are somehow all the
> >> > same during the merge step. Is this the expected behavior of Encoders?
> >> >
> >> > -------
> >> > Regards,
> >> > Andy
> >> >
> >> > On Thu, Jan 5, 2017 at 10:55 AM, Andy Dang <namd88@...> wrote:
> >> >
> >> >> Perfect. The API in Java is a bit clumsy, though.
> >> >>
> >> >> What I ended up doing in Java (the val is from Lombok, if anyone's
> >> >> wondering):
> >> >>         val attributes = JavaConversions.asJavaCollection(schema.toAttributes())
> >> >>                 .stream().map(Attribute::toAttribute)
> >> >>                 .collect(Collectors.toList());
> >> >>         val encoder = RowEncoder.apply(schema).resolveAndBind(
> >> >>                 ScalaUtils.scalaSeq(attributes), SimpleAnalyzer$.MODULE$);
> >> >>
> >> >>
> >> >> -------
> >> >> Regards,
> >> >> Andy
> >> >>
> >> >> On Thu, Jan 5, 2017 at 2:53 AM, Liang-Chi Hsieh <viirya@...> wrote:
> >> >>
> >> >>>
> >> >>> You need to resolve and bind the encoder.
> >> >>>
> >> >>> ExpressionEncoder<Row> encoder = RowEncoder.apply(struct).resolveAndBind();
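A toy model of why `fromRow` fails until the encoder is resolved and bound, in plain Java with no Spark dependency. `Expr`, `ColumnByOrdinal`, `BoundRef`, `Binder`, and `BindDemo` are hypothetical classes, not Spark's API. The sketch assumes, in simplified form, that the unbound deserializer contains a placeholder column reference (the `getcolumnbyordinal(0, IntegerType)` named in the exception from the original question) that refuses to evaluate, and that resolving and binding replaces it with a reference to a fixed position in the input row.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical expression interface: evaluates against a row of values.
interface Expr {
    Object eval(List<Object> row);
}

// Unresolved placeholder: knows an ordinal and a type name but cannot be
// evaluated, mirroring the error message reported in this thread.
final class ColumnByOrdinal implements Expr {
    final int ordinal;
    final String dataType;

    ColumnByOrdinal(int ordinal, String dataType) {
        this.ordinal = ordinal;
        this.dataType = dataType;
    }

    @Override
    public Object eval(List<Object> row) {
        throw new UnsupportedOperationException(
            "Cannot evaluate expression: getcolumnbyordinal(" + ordinal + ", " + dataType + ")");
    }
}

// Bound form: reads the value at a fixed position of the input row.
final class BoundRef implements Expr {
    final int ordinal;

    BoundRef(int ordinal) {
        this.ordinal = ordinal;
    }

    @Override
    public Object eval(List<Object> row) {
        return row.get(ordinal);
    }
}

final class Binder {
    // Swaps a placeholder for a bound reference (a simplified stand-in for
    // the resolve-then-bind step); other expressions pass through unchanged.
    static Expr bind(Expr e) {
        if (e instanceof ColumnByOrdinal) {
            return new BoundRef(((ColumnByOrdinal) e).ordinal);
        }
        return e;
    }
}

class BindDemo {
    public static void main(String[] args) {
        List<Object> internalRow = Arrays.<Object>asList(1);
        Expr unresolved = new ColumnByOrdinal(0, "IntegerType");
        try {
            unresolved.eval(internalRow);
        } catch (UnsupportedOperationException ex) {
            System.out.println(ex.getMessage());
        }
        System.out.println(Binder.bind(unresolved).eval(internalRow)); // 1
    }
}
```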
> >> >>>
> >> >>>
> >> >>> Andy Dang wrote
> >> >>> > Hi all,
> >> >>> > (cc-ing dev since I've hit some developer API corner)
> >> >>> >
> >> >>> > What's the best way to convert an InternalRow to a Row if I've got an
> >> >>> > InternalRow and the corresponding schema?
> >> >>> >
> >> >>> > Code snippet:
> >> >>> >     @Test
> >> >>> >     public void foo() throws Exception {
> >> >>> >         Row row = RowFactory.create(1);
> >> >>> >         StructType struct = new StructType().add("id", DataTypes.IntegerType);
> >> >>> >         ExpressionEncoder<Row> encoder = RowEncoder.apply(struct);
> >> >>> >         InternalRow internalRow = encoder.toRow(row);
> >> >>> >         System.out.println("Internal row size: " + internalRow.numFields());
> >> >>> >         Row roundTrip = encoder.fromRow(internalRow);
> >> >>> >         System.out.println("Round trip: " + roundTrip.size());
> >> >>> >     }
> >> >>> >
> >> >>> > The code fails at the line encoder.fromRow() with the exception:
> >> >>> >> Caused by: java.lang.UnsupportedOperationException: Cannot evaluate
> >> >>> >> expression: getcolumnbyordinal(0, IntegerType)
> >> >>> >
> >> >>> > -------
> >> >>> > Regards,
> >> >>> > Andy
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >> >>> -----
> >> >>> Liang-Chi Hsieh | @viirya
> >> >>> Spark Technology Center
> >> >>> http://www.spark.tc/
> >> >>> --
> >> >>> View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Converting-an-InternalRow-to-a-Row-tp20460p20465.html
> >> >>> Sent from the Apache Spark Developers List mailing list archive at
> >> >>> Nabble.com.
> >> >>>
> >> >>> ---------------------------------------------------------------------
> >> >>> To unsubscribe e-mail: dev-unsubscribe@.apache
> >> >>>
> >> >>>
> >> >>
> >>
> >>
> >>
> >>
> >>
> >> -----
> >> Liang-Chi Hsieh | @viirya
> >> Spark Technology Center
> >> http://www.spark.tc/
>
>
>
>
>
> -----
> Liang-Chi Hsieh | @viirya
> Spark Technology Center
> http://www.spark.tc/
