Re: Trying to create a generic aggregate UDF

2021-01-20 Thread Timo Walther

Hi Dylan,

I'm assuming you are using Flink 1.12 and the Blink planner?

Beginning with 1.12 you can use the "new" aggregate functions with better 
type inference, so TypeInformation is not used in this stack.


I tried to come up with an example that should explain the rough design. 
I will include this example in the Flink code base. I hope this helps:




import java.time.LocalDate;
import java.util.Optional;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.InputTypeStrategies;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.types.Row;

public static class LastIfNotNull<T>
        extends AggregateFunction<Row, LastIfNotNull.Accumulator<T>> {

    public static class Accumulator<T> {
        public T value;
        public LocalDate date;
    }

    // Remember the latest non-null input together with its date.
    public void accumulate(Accumulator<T> acc, T input, LocalDate date) {
        if (input != null) {
            acc.value = input;
            acc.date = date;
        }
    }

    @Override
    public Row getValue(Accumulator<T> acc) {
        return Row.of(acc.value, acc.date);
    }

    @Override
    public Accumulator<T> createAccumulator() {
        return new Accumulator<>();
    }

    @Override
    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
        return TypeInference.newBuilder()
                // first argument: anything; second argument: an explicit DATE
                .inputTypeStrategy(
                        InputTypeStrategies.sequence(
                                InputTypeStrategies.ANY,
                                InputTypeStrategies.explicit(DataTypes.DATE())))
                // accumulator is a structured type whose "value" field takes
                // the data type of the first argument
                .accumulatorTypeStrategy(
                        callContext -> {
                            DataType accDataType =
                                    DataTypes.STRUCTURED(
                                            Accumulator.class,
                                            DataTypes.FIELD(
                                                    "value",
                                                    callContext.getArgumentDataTypes().get(0)),
                                            DataTypes.FIELD("date", DataTypes.DATE()));
                            return Optional.of(accDataType);
                        })
                // output row mirrors the first argument's type plus the date
                .outputTypeStrategy(
                        callContext -> {
                            DataType argDataType = callContext.getArgumentDataTypes().get(0);
                            DataType outputDataType =
                                    DataTypes.ROW(
                                            DataTypes.FIELD("value", argDataType),
                                            DataTypes.FIELD("date", DataTypes.DATE()));
                            return Optional.of(outputDataType);
                        })
                .build();
    }
}
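
For illustration, registering and calling the function could then look roughly 
like this (just a sketch; the table and column names "orders", "amount" and 
"update_date" are invented):

// Hypothetical usage sketch; table and column names are invented.
env.createTemporarySystemFunction("LastIfNotNull", classOf[LastIfNotNull[_]])
env.executeSql(
  """SELECT id, LastIfNotNull(amount, update_date) AS latest
    |FROM orders
    |GROUP BY id""".stripMargin)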

Regards,
Timo



On 20.01.21 01:04, Dylan Forciea wrote:
I am attempting to create an aggregate UDF that takes a generic 
parameter T, but for the life of me, I can’t seem to get it to work.


The UDF I’m trying to implement takes two input arguments, a value that 
is generic, and a date. It will choose the non-null value with the 
latest associated date. I had originally done this with separate Top 1 
queries connected with a left join, but the memory usage seems far 
higher than doing this with a custom aggregate function.
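
Roughly, each such Top 1 branch looked something like the following sketch 
(with invented names "src", "id", "val" and "dt"), one branch per value column, 
joined back together with left joins:

// Hypothetical sketch of one Top-1 branch; names are invented.
env.executeSql(
  """SELECT id, val, dt
    |FROM (
    |  SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY dt DESC) AS rn
    |  FROM src
    |  WHERE val IS NOT NULL
    |) WHERE rn = 1""".stripMargin)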


As a first attempt, I tried to use custom type inference to validate that 
the first argument type is the output type, with a single function, and used 
DataTypes.STRUCTURED to try to define the shape of my accumulator. However, 
that resulted in an exception like this whenever I tried to use a non-string 
value as the first argument:


[error] Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String
[error]   at io$oseberg$flink$udf$LatestNonNullAccumulator$Converter.toInternal(Unknown Source)
[error]   at org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:92)
[error]   at org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:47)
[error]   at org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:59)
[error]   at GroupAggsHandler$777.getAccumulators(Unknown Source)
[error]   at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:175)
[error]   at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:45)
[error]   at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
[error]   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:193)
[error]   at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:179)
[error]   at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:152)
[error]   at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
[error]   at org.apache.flink.streaming.runtime.tasks.StreamTask.processInp

Re: Trying to create a generic aggregate UDF

2021-01-20 Thread Dylan Forciea
Timo,

I appreciate it! I am using Flink 1.12.0 right now with the Blink planner. What 
you proposed is roughly what I had come up with the first time around, which 
resulted in the stack trace with the ClassCastException I had originally 
included. I saw that you used a Row instead of just the value in your example, 
but changing it that way didn't seem to help, which makes sense since the 
problem seems to be in the code generated for the accumulator Converter and 
not the output.

Here is the exact code that caused that error (while calling LatestNonNullLong):

The registration of the class below:

env.createTemporarySystemFunction("LatestNonNullLong", classOf[LatestNonNull[Long]])
env.createTemporarySystemFunction("LatestNonNullString", classOf[LatestNonNull[String]])


The class itself:

import java.time.LocalDate
import java.util.Optional
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.catalog.DataTypeFactory
import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.table.types.inference.{InputTypeStrategies, TypeInference}

case class LatestNonNullAccumulator[T](
    var value: T = null.asInstanceOf[T],
    var date: LocalDate = null)

class LatestNonNull[T] extends AggregateFunction[T, LatestNonNullAccumulator[T]] {

  override def createAccumulator(): LatestNonNullAccumulator[T] = {
    LatestNonNullAccumulator[T]()
  }

  override def getValue(acc: LatestNonNullAccumulator[T]): T = {
    acc.value
  }

  // Keep the non-null value with the latest associated date.
  def accumulate(acc: LatestNonNullAccumulator[T], value: T, date: LocalDate): Unit = {
    if (value != null) {
      Option(acc.date).fold {
        acc.value = value
        acc.date = date
      } { accDate =>
        if (date != null && date.isAfter(accDate)) {
          acc.value = value
          acc.date = date
        }
      }
    }
  }

  def merge(
      acc: LatestNonNullAccumulator[T],
      it: java.lang.Iterable[LatestNonNullAccumulator[T]]): Unit = {
    val iter = it.iterator()
    while (iter.hasNext) {
      val a = iter.next()
      if (a.value != null) {
        Option(acc.date).fold {
          acc.value = a.value
          acc.date = a.date
        } { accDate =>
          Option(a.date).map { curDate =>
            if (curDate.isAfter(accDate)) {
              acc.value = a.value
              acc.date = a.date
            }
          }
        }
      }
    }
  }

  def resetAccumulator(acc: LatestNonNullAccumulator[T]): Unit = {
    acc.value = null.asInstanceOf[T]
    acc.date = null
  }

  override def getTypeInference(typeFactory: DataTypeFactory): TypeInference = {
    TypeInference
      .newBuilder()
      .inputTypeStrategy(
        InputTypeStrategies.sequence(
          InputTypeStrategies.ANY,
          InputTypeStrategies.explicit(DataTypes.DATE())))
      .accumulatorTypeStrategy { callContext =>
        val accDataType = DataTypes.STRUCTURED(
          classOf[LatestNonNullAccumulator[T]],
          DataTypes.FIELD("value", callContext.getArgumentDataTypes.get(0)),
          DataTypes.FIELD("date", DataTypes.DATE()))
        Optional.of(accDataType)
      }
      .outputTypeStrategy { callContext =>
        val outputDataType = callContext.getArgumentDataTypes.get(0)
        Optional.of(outputDataType)
      }
      .build()
  }
}
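
For completeness, the failing call had roughly this shape (a sketch; "events", 
"amount", "status" and "updated_on" are invented names):

// Hypothetical sketch of the query shape that triggered the exception above.
env.executeSql(
  """SELECT id,
    |  LatestNonNullLong(amount, updated_on) AS latest_amount,
    |  LatestNonNullString(status, updated_on) AS latest_status
    |FROM events
    |GROUP BY id""".stripMargin)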

Regards,
Dylan Forciea

Re: Trying to create a generic aggregate UDF

2021-01-20 Thread Dylan Forciea
As a side note, I also just tried unifying this into a single function 
registration, using _ as the type parameter in the classOf calls there and 
within the TypeInference definition for the accumulator, and I still ended up 
with the exact same stack trace.

Dylan


Re: Trying to create a generic aggregate UDF

2021-01-20 Thread Dylan Forciea
Oh, I think I might have a clue as to what is going on. I notice that it works 
properly when I only ever call it on Long. I think it is using the same 
generated Converter code for whatever type was called first.
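
Conceptually, the effect is something like this (an illustration only, not 
Flink's actual generated code):

// Illustration only, not real Flink codegen: a converter specialized for the
// first-seen type (String here) being handed a value of a later type (Long).
val toInternal: AnyRef => AnyRef = v => v.asInstanceOf[String]
toInternal(java.lang.Long.valueOf(1L)) // ClassCastException: Long cannot be cast to String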

Since in Scala I can't declare an object as static within the class itself, I 
wonder if it won't generate appropriate Converter code per subtype. I tried 
creating a subclass specific to the type within my class and returning that as 
the accumulator, but that didn't help. And I can't refer to that class in the 
TypeInference since it isn't static, and I get an error from Flink because of 
that. I'm going to see if writing this UDF in Java with an embedded public 
static class like yours solves my problems. I'll report back to let you know 
what I find. If that works, I'm not quite sure how to make it work in Scala.

Regards,
Dylan Forciea


Re: Trying to create a generic aggregate UDF

2021-01-20 Thread Dylan Forciea
Timo,

I converted what I had to Java, and ended up with the exact same issue as 
before: it works if I only ever use it on one type, but not if I use it on 
multiple types. Maybe this is a bug?

Dylan


Re: Trying to create a generic aggregate UDF

2021-01-21 Thread Timo Walther

Hi Dylan,

thanks for the investigation. I can now also reproduce it in my code. Yes, 
this is a bug. I opened

https://issues.apache.org/jira/browse/FLINK-21070

and will try to fix this asap.

Thanks,
Timo


Re: Trying to create a generic aggregate UDF

2021-01-21 Thread Timo Walther

I opened a PR. Feel free to try it out.

https://github.com/apache/flink/pull/14720

Btw:

>> env.createTemporarySystemFunction("LatestNonNullLong",
>> classOf[LatestNonNull[Long]])
>>
>> env.createTemporarySystemFunction("LatestNonNullString",
>> classOf[LatestNonNull[String]])

make no difference: the generics are type-erased in the bytecode, and only the 
class name matters.
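
For instance:

// The type parameter is erased at runtime, so both expressions yield the very
// same Class object and Flink sees only one function class.
classOf[LatestNonNull[Long]] eq classOf[LatestNonNull[String]] // true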


Thanks,
Timo


Re: Trying to create a generic aggregate UDF

2021-01-21 Thread Dylan Forciea
Timo,

Will do! I have been patching in a change locally that I have a PR [1] out for, 
so if this ends up in the next 1.12 patch release, I may add this in with it 
once it has been approved and merged.

On a side note, that PR has been out since the end of October (looks like I 
need to do a rebase to accommodate the code reformatting change that occurred 
since). Is there a process for getting somebody to review it? Not sure if, 
with the New Year and the 1.12 release and follow-up, it just got lost in the 
commotion.

Regards,
Dylan Forciea

[1] https://github.com/apache/flink/pull/13787


Re: Trying to create a generic aggregate UDF

2021-01-21 Thread Timo Walther

Hi Dylan,

I can help with a review of your PR tomorrow. In general, I would recommend 
just pinging the people who have worked on the component before (see git 
blame) a couple of times to get a review. We are all busy and need a bit of 
pushing from time to time ;-)


Thanks,
Timo


Re: Trying to create a generic aggregate UDF

2021-01-21 Thread Dylan Forciea
I wanted to report that I tried out your PR, and it does solve my issue. I am 
able to create a generic LatestNonNull and it appears to do what is expected.

Thanks,
Dylan Forciea
