Hey Rui,

For the geospatial UDFs, I've added these jars to my Flink deployment:

# Flink-Hive

RUN wget -q -O /opt/flink/lib/flink-sql-connector-hive-3.1.2_2.12-1.12.2.jar \
      https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.2_2.12/1.12.2/flink-sql-connector-hive-3.1.2_2.12-1.12.2.jar \
  && wget -q -O /opt/flink/lib/hive-exec-3.1.2.jar \
      https://repo1.maven.org/maven2/org/apache/hive/hive-exec/3.1.2/hive-exec-3.1.2.jar \
  && wget -q -O /opt/flink/lib/libfb303-0.9.3.jar \
      http://databus.dbpedia.org:8081/repository/internal/org/apache/thrift/libfb303/0.9.3/libfb303-0.9.3.jar


# Hive geospatial UDFs, https://github.com/Esri/spatial-framework-for-hadoop

RUN wget -q -O /opt/flink/lib/spatial-sdk-hive.jar \
      https://github.com/Esri/spatial-framework-for-hadoop/releases/download/v2.2.0/spatial-sdk-hive-2.2.0.jar \
  && wget -q -O /opt/flink/lib/spatial-sdk-json.jar \
      https://github.com/Esri/spatial-framework-for-hadoop/releases/download/v2.2.0/spatial-sdk-json-2.2.0.jar \
  && wget -q -O /opt/flink/lib/esri-geometry-api.jar \
      https://repo1.maven.org/maven2/com/esri/geometry/esri-geometry-api/2.2.4/esri-geometry-api-2.2.4.jar
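A small sanity check can also be appended to the image build so it fails fast if a class did not end up in a jar. This is just a sketch: it assumes the jar paths above and that `unzip` is available in the base image.

```dockerfile
# Sanity check (assumes the jar paths above and that `unzip` exists in the
# base image): fail the build if the Esri UDF entry point is not packaged.
RUN unzip -l /opt/flink/lib/spatial-sdk-hive.jar \
      | grep -q 'com/esri/hadoop/hive/ST_GeomFromText' \
      || { echo "ST_GeomFromText missing from spatial-sdk-hive.jar"; exit 1; }
```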



As I mentioned above, I did not register the functions explicitly, because the
CREATE FUNCTION statement did not work for me. If I run a statement such as
"CREATE FUNCTION ST_GeomFromText AS 'com.esri.hadoop.hive.ST_GeomFromText'",
I get:


org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Function flink_gaia.ST_GeomFromText already exists in Catalog flink-hive.
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
	at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
	at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)
	at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.flink.table.api.ValidationException: Function flink_gaia.ST_GeomFromText already exists in Catalog flink-hive.
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.createCatalogFunction(TableEnvironmentImpl.java:1459)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1009)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)
	at com.skt.chiron.FlinkApp.main(FlinkApp.java:58)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
	... 11 more
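The "already exists" failure suggests an earlier run of the job already persisted the function into the metastore, so re-running the same DDL collides with it. As a workaround I could guard the registration like the sketch below (Flink SQL supports IF NOT EXISTS, and a TEMPORARY function skips the metastore entirely), though that still would not explain why SHOW FUNCTIONS does not list it:

```sql
-- Sketch: guard the registration so a re-submitted job does not fail on the
-- function persisted by a previous run.
CREATE FUNCTION IF NOT EXISTS ST_GeomFromText
  AS 'com.esri.hadoop.hive.ST_GeomFromText';

-- Alternatively, keep the function out of the metastore altogether:
CREATE TEMPORARY FUNCTION ST_GeomFromText
  AS 'com.esri.hadoop.hive.ST_GeomFromText';
```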



Thanks,

Youngwoo



On Wed, Apr 28, 2021 at 3:05 PM Rui Li <lirui.fu...@gmail.com> wrote:

> Hi Youngwoo,
>
> Could you please share the function jar and DDL you used to create the
> function? I can try reproducing this issue locally.
>
> On Wed, Apr 28, 2021 at 1:33 PM Youngwoo Kim (김영우) <yw...@apache.org>
> wrote:
>
>> Thanks Shengkai and Rui for looking into this.
>>
>> A snippet from my app. looks like following:
>>
>>     HiveCatalog hive = new HiveCatalog("flink-hive", "default",
>> "/tmp/hive");
>>
>>     tableEnv.registerCatalog("flink-hive", hive);
>>
>>
>>     tableEnv.useCatalog("flink-hive");
>>
>>     tableEnv.loadModule("flink-hive", new HiveModule("3.1.2"));
>>
>>
>>     tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
>>
>>
>>     tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS flink_gaia");
>>
>>     tableEnv.executeSql("USE flink_gaia");
>>
>>     tableEnv.executeSql("SHOW CURRENT CATALOG").print();
>>
>>     tableEnv.executeSql("SHOW CURRENT DATABASE").print();
>>
>>     tableEnv.executeSql("SHOW TABLES").print();
>>
>>     tableEnv.executeSql("SHOW FUNCTIONS").print();
>>
>>
>>
>>     // Test Hive UDF
>>
>>     tableEnv.executeSql("SELECT ST_AsText(ST_Point(1, 2))");
>>
>>
>> And I got the following output and exception:
>>
>>
>> +----------------------+
>>
>> | current catalog name |
>>
>> +----------------------+
>>
>> |           flink-hive |
>>
>> +----------------------+
>>
>> 1 row in set
>>
>> +-----------------------+
>>
>> | current database name |
>>
>> +-----------------------+
>>
>> |            flink_gaia |
>>
>> +-----------------------+
>>
>> 1 row in set
>>
>> +----------------------+
>>
>> |           table name |
>>
>> +----------------------+
>>
>> |             geofence |
>>
>> |                 lcap |
>>
>> | lcap_temporal_fenced |
>>
>> +----------------------+
>>
>>
>> +--------------------------------+
>>
>> |                  function name |
>>
>> +--------------------------------+
>>
>> |                       regr_sxy |
>>
>> ......
>>
>>
>> 380 rows in set
>>
>>
>> (snip)
>>
>>
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error: SQL validation failed. From line 1, column 18 to
>> line 1, column 31: No match found for function signature
>> ST_Point(<NUMERIC>, <NUMERIC>)
>>
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
>>
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
>>
>> at
>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
>>
>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>>
>> at java.security.AccessController.doPrivileged(Native Method)
>>
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>>
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)
>>
>> at
>> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
>>
>>
>>
>>
>> Thanks,
>>
>> Youngwoo
>>
>>
>>
>> On Wed, Apr 28, 2021 at 1:44 PM Rui Li <lirui.fu...@gmail.com> wrote:
>>
>>> Hi Youngwoo,
>>>
>>> The catalog function is associated with a catalog and DB. Assuming you
>>> have created the function ST_Point in your metastore, could you verify
>>> whether the current catalog is your HiveCatalog and the current database is
>>> the database in which ST_Point is registered?
>>>
>>> On Wed, Apr 28, 2021 at 12:24 PM Shengkai Fang <fskm...@gmail.com>
>>> wrote:
>>>
>>>> Hi.
>>>>
>>>> The order of the module may influence the load of the function.
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-22383
>>>>
>>>> Youngwoo Kim (김영우) <yw...@apache.org> 于2021年4月28日周三 上午10:50写道:
>>>>
>>>>> Hi,
>>>>>
>>>>> I've configured Hive metastore to use HiveCatalog in streaming
>>>>> application. So far, most of the features are working fine in hive
>>>>> integration.
>>>>>
>>>>> However, I have a problem in using Hive UDFs. Already done
>>>>> prerequisites to use Hive geospatial UDFs[1]
>>>>>
>>>>> To sanity check, I did run a query like below:
>>>>>
>>>>> tableEnv.executeSql("SELECT ST_AsText(ST_Point(1, 2))");
>>>>>
>>>>>
>>>>> Got an exception like this:
>>>>>
>>>>>
>>>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>>>> method caused an error: SQL validation failed. From line 1, column 18 to
>>>>> line 1, column 63: No match found for function signature
>>>>> ST_Point(<NUMERIC>, <NUMERIC>)
>>>>>
>>>>> at
>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
>>>>>
>>>>> at
>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
>>>>>
>>>>> at
>>>>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>>>>>
>>>>> at
>>>>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
>>>>>
>>>>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
>>>>>
>>>>> at
>>>>> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
>>>>>
>>>>> at
>>>>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>>>>>
>>>>> at java.security.AccessController.doPrivileged(Native Method)
>>>>>
>>>>> at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>>
>>>>> at
>>>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)
>>>>>
>>>>> at
>>>>> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>>>>
>>>>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
>>>>>
>>>>> Caused by: org.apache.flink.table.api.ValidationException: SQL
>>>>> validation failed. From line 1, column 18 to line 1, column 63: No match
>>>>> found for function signature ST_Point(<NUMERIC>, <NUMERIC>)
>>>>>
>>>>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>>>>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
>>>>>
>>>>> at
>>>>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111)
>>>>>
>>>>> at
>>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:189)
>>>>>
>>>>> at
>>>>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:77)
>>>>>
>>>>> at
>>>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660)
>>>>>
>>>>> at com.skt.chiron.FlinkApp.main(FlinkApp.java:67)
>>>>>
>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>
>>>>> at
>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>
>>>>> at
>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>
>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>
>>>>> at
>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
>>>>>
>>>>> ... 11 more
>>>>>
>>>>> Caused by: org.apache.calcite.runtime.CalciteContextException: From
>>>>> line 1, column 18 to line 1, column 63: No match found for function
>>>>> signature ST_Point(<NUMERIC>, <NUMERIC>)
>>>>>
>>>>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>>>>> Method)
>>>>>
>>>>> at
>>>>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>>>>
>>>>> at
>>>>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>>>
>>>>> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>>>>
>>>>> at
>>>>> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
>>>>>
>>>>> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)
>>>>>
>>>>> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)
>>>>>
>>>>> at
>>>>> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5043)
>>>>>
>>>>> at
>>>>> org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1838)
>>>>>
>>>>> at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:321)
>>>>>
>>>>> at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:226)
>>>>>
>>>>> at
>>>>> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5882)
>>>>>
>>>>> at
>>>>> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5869)
>>>>>
>>>>> at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
>>>>>
>>>>> at
>>>>> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1756)
>>>>>
>>>>> at
>>>>> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1741)
>>>>>
>>>>> at
>>>>> org.apache.calcite.sql.SqlOperator.constructArgTypeList(SqlOperator.java:606)
>>>>>
>>>>> at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:244)
>>>>>
>>>>> at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:226)
>>>>>
>>>>> at
>>>>> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5882)
>>>>>
>>>>> at
>>>>> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5869)
>>>>>
>>>>> at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
>>>>>
>>>>> at
>>>>> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1756)
>>>>>
>>>>> at
>>>>> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1741)
>>>>>
>>>>> at
>>>>> org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:440)
>>>>>
>>>>> at
>>>>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4205)
>>>>>
>>>>> at
>>>>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3474)
>>>>>
>>>>> at
>>>>> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>>>>>
>>>>> at
>>>>> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>>>>>
>>>>> at
>>>>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1067)
>>>>>
>>>>> at
>>>>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1041)
>>>>>
>>>>> at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
>>>>>
>>>>> at
>>>>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1016)
>>>>>
>>>>> at
>>>>> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:724)
>>>>>
>>>>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>>>>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:147)
>>>>>
>>>>> ... 21 more
>>>>>
>>>>> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No
>>>>> match found for function signature ST_Point(<NUMERIC>, <NUMERIC>)
>>>>>
>>>>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>>>>> Method)
>>>>>
>>>>> at
>>>>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>>>>
>>>>> at
>>>>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>>>
>>>>> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>>>>
>>>>> at
>>>>> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
>>>>>
>>>>> at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:560)
>>>>>
>>>>> ... 51 more
>>>>>
>>>>>
>>>>> (snip)
>>>>>
>>>>>
>>>>> And also, there are no such functions from 'SHOW FUNCTIONS':
>>>>>
>>>>> tableEnv.executeSql("SHOW FUNCTIONS").print();
>>>>>
>>>>>
>>>>> ......
>>>>>
>>>>> (snip)
>>>>>
>>>>>
>>>>>
>>>>> Registering the functions explicitly does not work for me:
>>>>>
>>>>>
>>>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>>>> method caused an error: Function flink_gaia.ST_GeomFromText already exists
>>>>> in Catalog flink-hive.
>>>>>
>>>>> at
>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
>>>>>
>>>>> at
>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
>>>>>
>>>>> at
>>>>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>>>>>
>>>>> at
>>>>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
>>>>>
>>>>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
>>>>>
>>>>> at
>>>>> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
>>>>>
>>>>> at
>>>>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>>>>>
>>>>> at java.security.AccessController.doPrivileged(Native Method)
>>>>>
>>>>> at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>>
>>>>> at
>>>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)
>>>>>
>>>>> at
>>>>> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>>>>
>>>>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
>>>>>
>>>>> Caused by: org.apache.flink.table.api.ValidationException: Function
>>>>> flink_gaia.ST_GeomFromText already exists in Catalog flink-hive.
>>>>>
>>>>> at
>>>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.createCatalogFunction(TableEnvironmentImpl.java:1459)
>>>>>
>>>>> at
>>>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1009)
>>>>>
>>>>> at
>>>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)
>>>>>
>>>>> at com.skt.chiron.FlinkApp.main(FlinkApp.java:58)
>>>>>
>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>
>>>>> at
>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>
>>>>> at
>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>
>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>
>>>>> at
>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
>>>>>
>>>>> ... 11 more
>>>>>
>>>>> (snip)
>>>>>
>>>>>
>>>>>
>>>>> I hope to find out why the functions are missing. Flink(Ver. 1.12.2)
>>>>> job cluster is running on Kubernetes cluster via flink operator and the
>>>>> standalone metastore is running for only the Flink cluster without Hive
>>>>> deployments.
>>>>>
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Youngwoo
>>>>>
>>>>> 1. https://github.com/Esri/spatial-framework-for-hadoop
>>>>>
>>>>
>>>
>>> --
>>> Best regards!
>>> Rui Li
>>>
>>
>
> --
> Best regards!
> Rui Li
>
