Hi Flink folks,

I'm working on a Flink job that uses the SQL API. I have a custom POJO
data type with a generic field, and I would like to call a user-defined
function on this field. I've included a similar function below with the
business logic stubbed out; it has the return type I'm looking for.

I have no issues using custom functions of this type in a select statement
when the `getResultType` method is left out of the user-defined function
class, but I can't get the type information to resolve correctly in
contexts like order by and group by statements. It still doesn't work when
`getResultType` explicitly declares the specific type for a given object:
the job compiler within Flink seems to assume that the return type of the
`eval` method is just Object (type erasure...), and it fails to generate
the object code because it detects invalid casts to the desired output
type. Without the `getResultType` method, it fails to detect the type at
all. That seems to be fine for a plain select, but if I try any other
operation (like group by) I get the following error:
"org.apache.flink.api.common.InvalidProgramException: This type
(GenericType<java.lang.Object>) cannot be used as key."
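
To illustrate the erasure I mean, here is a minimal sketch that doesn't
involve Flink at all. The names ErasureDemo and GenericFn are made up for
this example. Reflection on a generic `eval` method reports Object as the
return type, and I'm assuming this is roughly what Flink sees when it
inspects my function:

```java
import java.lang.reflect.Method;

public class ErasureDemo {

  // Hypothetical stand-in for a generic function class; not a Flink type.
  static class GenericFn<T> {
    public T eval(T value) {
      return value;
    }
  }

  // Looks up eval reflectively. Because T is unbounded, the parameter and
  // return type are both erased to Object in the compiled class.
  static Class<?> erasedReturnType() {
    try {
      Method m = GenericFn.class.getMethod("eval", Object.class);
      return m.getReturnType();
    } catch (NoSuchMethodException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    // Prints the erased return type of eval.
    System.out.println(erasedReturnType().getName());
  }
}
```

So even if I construct the function for String, nothing in the `eval`
signature itself records that.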

Does anyone know if there's a way to get Flink to pay attention to the type
information from `getResultType` when compiling the `eval` method so that
the types work out? Or is there another way to work around the type erasure
on the `eval` method without defining explicit user-defined function
classes for each type?

Thanks for your help!

Morrisa



Code snippet:


package flink_generics_testing;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.functions.ScalarFunction;

/**
 * Reads custom values from a table and performs a function on those values.
 * T should be able to be a String, long, float, boolean, or Date
 *
 * @param <T> The expected type of the table column values.
 */
public class CustomScalarFunction<T> extends ScalarFunction {

  private static final long serialVersionUID = -5537657771138360838L;

  private final Class<T> desiredType;

  /**
   * Construct an instance.
   *
   * @param desiredType The type of the value that we're performing the
   *     function on.
   */
  public CustomScalarFunction(Class<T> desiredType) {
    this.desiredType = desiredType;
  }

  public T eval(T value) {
    return value;
  }

  @Override
  public TypeInformation<?> getResultType(Class<?>[] signature) {
    return TypeInformation.of(desiredType);
  }

  @Override
  public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {
    return new TypeInformation<?>[]{
        TypeInformation.of(desiredType)
    };
  }
}


-- 
Morrisa Brenner
Software Engineer
225 Franklin St, Boston, MA 02110
klaviyo.com <https://www.klaviyo.com>