xiaojunchen created FLINK-24950:
-----------------------------------

             Summary: Executing Hive DDL with the Hive dialect throws a NullPointerException
                 Key: FLINK-24950
                 URL: https://issues.apache.org/jira/browse/FLINK-24950
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Hive
    Affects Versions: 1.13.2
         Environment: flink-1.13.2
cdh5.14.2
jdk8
            Reporter: xiaojunchen


Dear all friends,

I am trying to execute a Hive DDL statement with the Stream Table API on Flink 1.13.2. The code looks like this:

```java
String hiveDDL = ResourceUtil.readClassPathSource("hive-ddl.sql");

EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

String name = "hive";
String defaultDatabase = "stream";
String hiveConfDir = "conf";

HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog("hive", hive);
tableEnv.useCatalog("hive");
tableEnv.useDatabase("stream");

tableEnv.executeSql("DROP TABLE IF EXISTS dimension_table");

// Switch to the Hive dialect
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql(hiveDDL);
```

The Hive server runs on CDH 5.14.2, and the DDL statement looks like this:

```sql
CREATE TABLE dimension_table (
  product_id STRING,
  product_name STRING,
  unit_price DECIMAL(10, 4),
  pv_count BIGINT,
  like_count BIGINT,
  comment_count BIGINT,
  update_time TIMESTAMP(3),
  update_user STRING
) PARTITIONED BY (
  pt_year STRING,
  pt_month STRING,
  pt_day STRING
) TBLPROPERTIES (
  -- using default partition-name order to load the latest partition every 12h (the most recommended and convenient way)
  'streaming-source.enable' = 'true',
  'streaming-source.partition.include' = 'latest',
  'streaming-source.monitor-interval' = '12 h',
  'streaming-source.partition-order' = 'partition-name', -- option with default value, can be ignored.

  -- using partition file create-time order to load the latest partition every 12h
  'streaming-source.enable' = 'true',
  'streaming-source.partition.include' = 'latest',
  'streaming-source.partition-order' = 'create-time',
  'streaming-source.monitor-interval' = '12 h',

  -- using partition-time order to load the latest partition every 12h
  'streaming-source.enable' = 'true',
  'streaming-source.partition.include' = 'latest',
  'streaming-source.monitor-interval' = '12 h',
  'streaming-source.partition-order' = 'partition-time',
  'partition.time-extractor.kind' = 'default',
  'partition.time-extractor.timestamp-pattern' = '$pt_year-$pt_month-$pt_day 00:00:00'
)
```
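A side note on the catalog setup above: I let HiveCatalog auto-detect the Hive version from the classpath. In case the detection picks the wrong shim, there is also a HiveCatalog constructor overload that takes the Hive version as an explicit last argument. A minimal sketch (the "1.1.0" value is my assumption, derived from the hive-exec 1.1.0-cdh5.14.2 dependency listed at the end; I have not verified whether pinning the version changes the behavior):

```java
import org.apache.flink.table.catalog.hive.HiveCatalog;

// Same catalog setup as above, but with the Hive version pinned explicitly
// instead of auto-detected from the classpath. "1.1.0" is assumed from the
// hive-exec 1.1.0-cdh5.14.2 dependency in the pom below.
HiveCatalog hive = new HiveCatalog("hive", "stream", "conf", "1.1.0");
```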
Then I run it, but it throws a NullPointerException:

```
2021-11-18 15:33:00,387 INFO  [org.apache.flink.table.catalog.hive.HiveCatalog] - Setting hive conf dir as conf
2021-11-18 15:33:00,481 WARN  [org.apache.hadoop.util.NativeCodeLoader] - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2021-11-18 15:33:01,345 INFO  [org.apache.flink.table.catalog.hive.HiveCatalog] - Created HiveCatalog 'hive'
2021-11-18 15:33:01,371 INFO  [hive.metastore] - Trying to connect to metastore with URI thrift://cdh-dev-node-119:9083
2021-11-18 15:33:01,441 INFO  [hive.metastore] - Opened a connection to metastore, current connections: 1
2021-11-18 15:33:01,521 INFO  [hive.metastore] - Connected to metastore.
2021-11-18 15:33:01,856 INFO  [org.apache.flink.table.catalog.hive.HiveCatalog] - Connected to Hive metastore
2021-11-18 15:33:01,899 INFO  [org.apache.flink.table.catalog.CatalogManager] - Set the current default catalog as [hive] and the current default database as [stream].
2021-11-18 15:33:03,290 INFO  [org.apache.hadoop.hive.ql.session.SessionState] - Created local directory: /var/folders/4m/n1wgh7rd2yqfv301kq00l4q40000gn/T/681dd0aa-ba35-4a0e-b069-3ad48f030774_resources
2021-11-18 15:33:03,298 INFO  [org.apache.hadoop.hive.ql.session.SessionState] - Created HDFS directory: /tmp/hive/chenxiaojun/681dd0aa-ba35-4a0e-b069-3ad48f030774
2021-11-18 15:33:03,305 INFO  [org.apache.hadoop.hive.ql.session.SessionState] - Created local directory: /var/folders/4m/n1wgh7rd2yqfv301kq00l4q40000gn/T/chenxiaojun/681dd0aa-ba35-4a0e-b069-3ad48f030774
2021-11-18 15:33:03,311 INFO  [org.apache.hadoop.hive.ql.session.SessionState] - Created HDFS directory: /tmp/hive/chenxiaojun/681dd0aa-ba35-4a0e-b069-3ad48f030774/_tmp_space.db
2021-11-18 15:33:03,314 INFO  [org.apache.hadoop.hive.ql.session.SessionState] - No Tez session required at this point. hive.execution.engine=mr.
Exception in thread "main" java.lang.NullPointerException
    at org.apache.flink.table.catalog.hive.client.HiveShimV100.registerTemporaryFunction(HiveShimV100.java:422)
    at org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:217)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
    at com.hacker.flinksql.hive.HiveSqlTest.main(HiveSqlTest.java:48)
```

I located the failing code in flink-1.13.2, org.apache.flink.table.catalog.hive.client.HiveShimV100.java, line 422. The reflective `Method` invoked there appears to be null. The code:

```java
@Override
public void registerTemporaryFunction(String funcName, Class funcClass) {
    try {
        // NPE is thrown on the next line; registerTemporaryFunction here is
        // a java.lang.reflect.Method field, not this wrapper method itself
        registerTemporaryFunction.invoke(null, funcName, funcClass);
    } catch (IllegalAccessException | InvocationTargetException e) {
        throw new FlinkHiveException("Failed to register temp function", e);
    }
}
```

My Maven dependencies:

```xml
<properties>
    <hadoop.version>2.6.0-cdh5.14.2</hadoop.version>
    <hive.version>1.1.0-cdh5.14.2</hive.version>
</properties>

<!-- flink sql core -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.5</version>
    <scope>provided</scope>
</dependency>

<!-- hive catalog -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>${hive.version}</version>
    <scope>provided</scope>
</dependency>

<!-- catalog hadoop dependencies -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.0-cdh5.15.2</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.6.0-cdh5.15.2</version>
    <scope>provided</scope>
</dependency>
```
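For reference, my reading of the NPE (an assumption on my side, I have not stepped through the shim): passing null as the receiver of Method.invoke is legal for a static method, and null arguments would not throw on that line either, so the `registerTemporaryFunction` Method reference itself must be null, presumably because the shim's reflective lookup of the corresponding Hive method found nothing in hive-exec 1.1.0-cdh5.14.2. A minimal, self-contained sketch of that failure mode (all names are hypothetical stand-ins, not the shim's actual lookup):

```java
import java.lang.reflect.Method;

public class NullMethodInvokeDemo {

    // Stand-in for the shim's cached reflective handle; stays null when the
    // lookup below fails to find a matching method.
    private static Method registerTemporaryFunction;

    public static void main(String[] args) throws Exception {
        try {
            // Hypothetical lookup of a method that does not exist on this
            // class; in the real shim this would be a version-dependent
            // Hive method resolved via reflection.
            registerTemporaryFunction = NullMethodInvokeDemo.class
                    .getMethod("noSuchMethod", String.class, Class.class);
        } catch (NoSuchMethodException e) {
            // If this failure is swallowed somewhere, the field stays null.
        }

        // Throws java.lang.NullPointerException on the invoke call itself,
        // because the Method reference is null: the same shape as the
        // reported trace at HiveShimV100.java:422.
        registerTemporaryFunction.invoke(null, "myFunc", Object.class);
    }
}
```

--
This message was sent by Atlassian Jira
(v8.20.1#820001)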