Oh, I think I might have a clue as to what is going on. I notice that it will 
work properly when I only call it on Long. I think that it is using the same 
generated code for the Converter for whatever was called first.

Since in Scala I can't declare an object as static within the class itself, I 
wonder if it won't generate appropriate Converter code per subtype. I tried 
creating a subclass that is specific to the type within my class and returning 
that as the accumulator, but that didn't help. And, I can't refer to that class 
in the TypeInference since it isn't static and I get an error from Flink 
because of that. I'm going to see if I just write this UDF in Java with an 
embedded public static class like you have if it will solve my problems. I'll 
report back to let you know what I find. If that works, I'm not quite sure how 
to make it work in Scala.

Regards,
Dylan Forciea

On 1/20/21, 9:34 AM, "Dylan Forciea" <dy...@oseberg.io> wrote:

    As a side note, I also just tried to unify into a single function 
registration and used _ as the type parameter in the classOf calls there and 
within the TypeInference definition for the accumulator and still ended up with 
the exact same stack trace.

    Dylan

    On 1/20/21, 9:22 AM, "Dylan Forciea" <dy...@oseberg.io> wrote:

        Timo,

        I appreciate it! I am using Flink 1.12.0 right now with the Blink 
planner. What you proposed is roughly what I had come up with the first time 
around that resulted in the stack trace with the ClassCastException I had 
originally included. I saw that you had used a Row instead of just the value in 
our example, but changing it that way didn't seem to help, which makes sense 
since the problem seems to be in the code generated for the accumulator 
Converter and not the output. 

        Here is the exact code that caused that error (while calling 
LatestNonNullLong):

        The registration of the below:
            env.createTemporarySystemFunction("LatestNonNullLong", 
classOf[LatestNonNull[Long]])
            env.createTemporarySystemFunction("LatestNonNullString", 
classOf[LatestNonNull[String]])


        The class itself:

        import java.time.LocalDate
        import java.util.Optional
        import org.apache.flink.table.api.DataTypes
        import org.apache.flink.table.catalog.DataTypeFactory
        import org.apache.flink.table.functions.AggregateFunction
        import org.apache.flink.table.types.inference.{InputTypeStrategies, 
TypeInference}

        case class LatestNonNullAccumulator[T](
            var value: T = null.asInstanceOf[T],
            var date: LocalDate = null)

        class LatestNonNull[T] extends AggregateFunction[T, 
LatestNonNullAccumulator[T]] {

          override def createAccumulator(): LatestNonNullAccumulator[T] = {
            LatestNonNullAccumulator[T]()
          }

          override def getValue(acc: LatestNonNullAccumulator[T]): T = {
            acc.value
          }

          def accumulate(acc: LatestNonNullAccumulator[T], value: T, date: 
LocalDate): Unit = {
            if (value != null) {
              Option(acc.date).fold {
                acc.value = value
                acc.date = date
              } { accDate =>
                if (date != null && date.isAfter(accDate)) {
                  acc.value = value
                  acc.date = date
                }
              }
            }
          }

          def merge(
              acc: LatestNonNullAccumulator[T],
              it: java.lang.Iterable[LatestNonNullAccumulator[T]]): Unit = {
            val iter = it.iterator()
            while (iter.hasNext) {
              val a = iter.next()
              if (a.value != null) {
                Option(acc.date).fold {
                  acc.value = a.value
                  acc.date = a.date
                } { accDate =>
                  Option(a.date).map { curDate =>
                    if (curDate.isAfter(accDate)) {
                      acc.value = a.value
                      acc.date = a.date
                    }
                  }
                }
              }
            }
          }

          def resetAccumulator(acc: LatestNonNullAccumulator[T]): Unit = {
            acc.value = null.asInstanceOf[T]
            acc.date = null
          }

          override def getTypeInference(typeFactory: DataTypeFactory): 
TypeInference = {
            TypeInference
              .newBuilder()
              .inputTypeStrategy(InputTypeStrategies
                .sequence(InputTypeStrategies.ANY, 
InputTypeStrategies.explicit(DataTypes.DATE())))
              .accumulatorTypeStrategy { callContext =>
                val accDataType = DataTypes.STRUCTURED(
                  classOf[LatestNonNullAccumulator[T]],
                  DataTypes.FIELD("value", 
callContext.getArgumentDataTypes.get(0)),
                  DataTypes.FIELD("date", DataTypes.DATE()))

                Optional.of(accDataType)
              }
              .outputTypeStrategy { callContext =>
                val outputDataType = callContext.getArgumentDataTypes().get(0);
                Optional.of(outputDataType);
              }
              .build()
          }
        }

        Regards,
        Dylan Forciea

        On 1/20/21, 2:37 AM, "Timo Walther" <twal...@apache.org> wrote:

            Hi Dylan,

            I'm assuming your are using Flink 1.12 and the Blink planner?

            Beginning from 1.12 you can use the "new" aggregate functions with 
a 
            better type inference. So TypeInformation will not be used in this 
stack.

            I tried to come up with an example that should explain the rough 
design. 
            I will include this example into the Flink code base. I hope this 
helps:



            import org.apache.flink.table.types.inference.InputTypeStrategies;

            public static class LastIfNotNull<T>
                     extends AggregateFunction<Row, 
LastIfNotNull.Accumulator<T>> {

                 public static class Accumulator<T> {
                     public T value;
                     public LocalDate date;
                 }

                 public void accumulate(Accumulator<T> acc, T input, LocalDate 
date) {
                     if (input != null) {
                         acc.value = input;
                         acc.date = date;
                     }
                 }

                 @Override
                 public Row getValue(Accumulator<T> acc) {
                     return Row.of(acc.value, acc.date);
                 }

                 @Override
                 public Accumulator<T> createAccumulator() {
                     return new Accumulator<>();
                 }

                 @Override
                 public TypeInference getTypeInference(DataTypeFactory 
typeFactory) {
                     return TypeInference.newBuilder()
                             .inputTypeStrategy(
                                     InputTypeStrategies.sequence(
                                             InputTypeStrategies.ANY,

            InputTypeStrategies.explicit(DataTypes.DATE())))
                             .accumulatorTypeStrategy(
                                     callContext -> {
                                         DataType accDataType =
                                                 DataTypes.STRUCTURED(
                                                         Accumulator.class,
                                                         DataTypes.FIELD(
                                                                 "value",

            callContext.getArgumentDataTypes().get(0)),
                                                         
DataTypes.FIELD("date", 
            DataTypes.DATE()));
                                         return Optional.of(accDataType);
                                     })
                             .outputTypeStrategy(
                                     callContext -> {
                                         DataType argDataType = 
            callContext.getArgumentDataTypes().get(0);
                                         DataType outputDataType =
                                                 DataTypes.ROW(
                                                         
DataTypes.FIELD("value", 
            argDataType),
                                                         
DataTypes.FIELD("date", 
            DataTypes.DATE()));
                                         return Optional.of(outputDataType);
                                     })
                             .build();
                 }
            }

            Regards,
            Timo



            On 20.01.21 01:04, Dylan Forciea wrote:
            > I am attempting to create an aggregate UDF that takes a generic 
            > parameter T, but for the life of me, I can’t seem to get it to 
work.
            > 
            > The UDF I’m trying to implement takes two input arguments, a 
value that 
            > is generic, and a date. It will choose the non-null value with 
the 
            > latest associated date. I had originally done this with separate 
Top 1 
            > queries connected with a left join, but the memory usage seems 
far 
            > higher than doing this with a custom aggregate function.
            > 
            > As a first attempt, I tried to use custom type inference to have 
it 
            > validate that the first argument type is the output type and have 
a 
            > single function, and also used DataTypes.STRUCTURE to try to 
define the 
            > shape of my accumulator. However, that resulted in an exception 
like 
            > this whenever I tried to use a non-string value as the first 
argument:
            > 
            > [error] Caused by: java.lang.ClassCastException: java.lang.Long 
cannot 
            > be cast to java.lang.String
            > 
            > [error]   at 
            > 
io$oseberg$flink$udf$LatestNonNullAccumulator$Converter.toInternal(Unknown 
            > Source)
            > 
            > [error]   at 
            > 
org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:92)
            > 
            > [error]   at 
            > 
org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:47)
            > 
            > [error]   at 
            > 
org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:59)
            > 
            > [error]   at GroupAggsHandler$777.getAccumulators(Unknown Source)
            > 
            > [error]   at 
            > 
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:175)
            > 
            > [error]   at 
            > 
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:45)
            > 
            > [error]   at 
            > 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
            > 
            > [error]   at 
            > 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:193)
            > 
            > [error]   at 
            > 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:179)
            > 
            > [error]   at 
            > 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:152)
            > 
            > [error]   at 
            > 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
            > 
            > [error]   at 
            > 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
            > 
            > [error]   at 
            > 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
            > 
            > [error]   at 
            > 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
            > 
            > [error]   at 
            > 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
            > 
            > [error]   at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
            > 
            > [error]   at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
            > 
            > [error]   at java.lang.Thread.run(Thread.java:748)
            > 
            > Figuring that I can’t do something of that sort, I tried to 
follow the 
            > general approach in the Sum accumulator[1] in the Flink source 
code 
            > where separate classes are derived from a base class, and each 
            > advertises its accumulator shape, but ended up with the exact 
same stack 
            > trace as above when I tried to create and use a function 
specifically 
            > for a non-string type like Long.
            > 
            > Is there something I’m missing as far as how this is supposed to 
be 
            > done? Everything I try either results in a stack track like the 
above, 
            > or type erasure issues when trying to get type information for 
the 
            > accumulator. If I just copy the generic code multiple times and 
just 
            > directly use Long or String rather than using subclassing, then 
it works 
            > just fine. I appreciate any help I can get on this!
            > 
            > Regards,
            > 
            > Dylan Forciea
            > 
            > [1] 
            > 
https://github.com/apache/flink/blob/release-1.12.0/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala
            > 




Reply via email to