[ https://issues.apache.org/jira/browse/FLINK-24950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

xiaojunchen updated FLINK-24950:
--------------------------------
    Description: 
Dear all friends:

I'm trying to execute a Hive DDL statement with the Stream Table API on flink-1.13.2. The code looks like this:

```java
String hiveDDL = ResourceUtil.readClassPathSource("hive-ddl.sql");
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

String name = "hive";
String defaultDatabase = "stream";
String hiveConfDir = "conf";

HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog("hive", hive);
tableEnv.useCatalog("hive");
tableEnv.useDatabase("stream");

tableEnv.executeSql("DROP TABLE IF EXISTS dimension_table");
// switch to the Hive dialect
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql(hiveDDL);
```
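
(For reference, `ResourceUtil.readClassPathSource` is just my small helper that reads the SQL file from the classpath into a String; a minimal, JDK 8-compatible sketch of what it does, not necessarily the exact implementation:)

```java
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public final class ResourceUtil {

    /** Reads a classpath resource into a UTF-8 String (sketch of the helper). */
    public static String readClassPathSource(String path) {
        InputStream in = ResourceUtil.class.getClassLoader().getResourceAsStream(path);
        if (in == null) {
            throw new IllegalArgumentException("Resource not found on classpath: " + path);
        }
        // The "\\A" delimiter makes the Scanner return the whole stream as one token.
        try (Scanner scanner = new Scanner(in, StandardCharsets.UTF_8.name()).useDelimiter("\\A")) {
            return scanner.hasNext() ? scanner.next() : "";
        }
    }

    private ResourceUtil() {}
}
```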

The Hive server runs on cdh5.14.2, and the DDL looks like this:

```sql
CREATE TABLE dimension_table (
  product_id STRING,
  product_name STRING,
  unit_price DECIMAL(10, 4),
  pv_count BIGINT,
  like_count BIGINT,
  comment_count BIGINT,
  update_time TIMESTAMP(3),
  update_user STRING
)
PARTITIONED BY (
  pt_year STRING,
  pt_month STRING,
  pt_day STRING
)
TBLPROPERTIES (
  -- using default partition-name order to load the latest partition every 12h
  -- (the most recommended and convenient way)
  'streaming-source.enable' = 'true',
  'streaming-source.partition.include' = 'latest',
  'streaming-source.monitor-interval' = '12 h',
  'streaming-source.partition-order' = 'partition-name', -- option with default value, can be ignored

  -- using partition file create-time order to load the latest partition every 12h
  'streaming-source.enable' = 'true',
  'streaming-source.partition.include' = 'latest',
  'streaming-source.partition-order' = 'create-time',
  'streaming-source.monitor-interval' = '12 h',

  -- using partition-time order to load the latest partition every 12h
  'streaming-source.enable' = 'true',
  'streaming-source.partition.include' = 'latest',
  'streaming-source.monitor-interval' = '12 h',
  'streaming-source.partition-order' = 'partition-time',
  'partition.time-extractor.kind' = 'default',
  'partition.time-extractor.timestamp-pattern' = '$pt_year-$pt_month-$pt_day 00:00:00'
)
```
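
(For context, the options above follow the streaming-read example from the Flink Hive connector docs; the goal is to use the table as a slowly-changing dimension. The eventual usage would be a temporal join roughly like the sketch below, with a hypothetical `orders` table that has a proctime attribute; this is not part of the failing code:)

```java
// Sketch of the intended usage (hypothetical "orders" table with a proctime
// attribute): join each order against the latest Hive partition.
tableEnv.executeSql(
        "SELECT o.product_id, d.product_name, d.unit_price "
                + "FROM orders AS o "
                + "JOIN dimension_table FOR SYSTEM_TIME AS OF o.proctime AS d "
                + "ON o.product_id = d.product_id");
```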

When I run it, it throws a NullPointerException:

```
2021-11-18 15:33:00,387 INFO [org.apache.flink.table.catalog.hive.HiveCatalog] - Setting hive conf dir as conf
2021-11-18 15:33:00,481 WARN [org.apache.hadoop.util.NativeCodeLoader] - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2021-11-18 15:33:01,345 INFO [org.apache.flink.table.catalog.hive.HiveCatalog] - Created HiveCatalog 'hive'
2021-11-18 15:33:01,371 INFO [hive.metastore] - Trying to connect to metastore with URI thrift://cdh-dev-node-119:9083
2021-11-18 15:33:01,441 INFO [hive.metastore] - Opened a connection to metastore, current connections: 1
2021-11-18 15:33:01,521 INFO [hive.metastore] - Connected to metastore.
2021-11-18 15:33:01,856 INFO [org.apache.flink.table.catalog.hive.HiveCatalog] - Connected to Hive metastore
2021-11-18 15:33:01,899 INFO [org.apache.flink.table.catalog.CatalogManager] - Set the current default catalog as [hive] and the current default database as [stream].
2021-11-18 15:33:03,290 INFO [org.apache.hadoop.hive.ql.session.SessionState] - Created local directory: /var/folders/4m/n1wgh7rd2yqfv301kq00l4q40000gn/T/681dd0aa-ba35-4a0e-b069-3ad48f030774_resources
2021-11-18 15:33:03,298 INFO [org.apache.hadoop.hive.ql.session.SessionState] - Created HDFS directory: /tmp/hive/chenxiaojun/681dd0aa-ba35-4a0e-b069-3ad48f030774
2021-11-18 15:33:03,305 INFO [org.apache.hadoop.hive.ql.session.SessionState] - Created local directory: /var/folders/4m/n1wgh7rd2yqfv301kq00l4q40000gn/T/chenxiaojun/681dd0aa-ba35-4a0e-b069-3ad48f030774
2021-11-18 15:33:03,311 INFO [org.apache.hadoop.hive.ql.session.SessionState] - Created HDFS directory: /tmp/hive/chenxiaojun/681dd0aa-ba35-4a0e-b069-3ad48f030774/_tmp_space.db
2021-11-18 15:33:03,314 INFO [org.apache.hadoop.hive.ql.session.SessionState] - No Tez session required at this point. hive.execution.engine=mr.
Exception in thread "main" java.lang.NullPointerException
    at org.apache.flink.table.catalog.hive.client.HiveShimV100.registerTemporaryFunction(HiveShimV100.java:422)
    at org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:217)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
    at com.hacker.flinksql.hive.HiveSqlTest.main(HiveSqlTest.java:48)
```

I found the failing code in flink-1.13.2, at org.apache.flink.table.catalog.hive.client.HiveShimV100.java, line 422. The NPE is thrown on the `invoke` call, and since passing `null` as the first argument is correct for a static method, the reflective `registerTemporaryFunction` Method handle itself must be null. The code:

```java
@Override
public void registerTemporaryFunction(String funcName, Class funcClass) {
    try {
        registerTemporaryFunction.invoke(null, funcName, funcClass);
    } catch (IllegalAccessException | InvocationTargetException e) {
        throw new FlinkHiveException("Failed to register temp function", e);
    }
}
```
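
My guess at how that handle can end up null: the reflective lookup against Hive's `FunctionRegistry` fails for the Hive version on the classpath (here 1.1.0-cdh5.14.2), and the failure only surfaces at the later `invoke`. A schematic reconstruction of that failure mode, my assumption and not the actual Flink source:

```java
import java.lang.reflect.Method;

// Schematic reconstruction (assumption, not the actual Flink code): if the
// reflective lookup fails for the Hive version on the classpath, the handle
// silently stays null and the later invoke(...) throws NullPointerException.
class ShimSketch {
    private static Method registerTemporaryFunction;

    static {
        try {
            Class<?> registry =
                    Class.forName("org.apache.hadoop.hive.ql.exec.FunctionRegistry");
            registerTemporaryFunction = registry.getDeclaredMethod(
                    "registerTemporaryFunction", String.class, Class.class);
        } catch (ClassNotFoundException | NoSuchMethodException e) {
            // Swallowed: with a Hive build that lacks this exact signature
            // (e.g. 1.1.0-cdh5.14.2), registerTemporaryFunction stays null.
        }
    }

    static void register(String funcName, Class<?> funcClass) throws Exception {
        // NullPointerException here when the handle was never resolved.
        registerTemporaryFunction.invoke(null, funcName, funcClass);
    }
}
```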

My Maven dependencies:

```xml
<properties>
    <hadoop.version>2.6.0-cdh5.14.2</hadoop.version>
    <hive.version>1.1.0-cdh5.14.2</hive.version>
</properties>

<!-- flink sql core -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.5</version>
    <scope>provided</scope>
</dependency>

<!-- hive catalog -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>${hive.version}</version>
    <scope>provided</scope>
</dependency>

<!-- catalog hadoop dependency -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.0-cdh5.15.2</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.6.0-cdh5.15.2</version>
    <scope>provided</scope>
</dependency>
```
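
One more data point: `HiveCatalog` also has a constructor that takes an explicit Hive version string, which bypasses auto-detection of the shim. I have not verified whether this changes anything, but pinning it would at least narrow down whether shim selection is the problem. A sketch, where "1.1.0" is my guess at the version string matching 1.1.0-cdh5.14.2:

```java
// Sketch (untested): pin the Hive version instead of relying on auto-detection;
// "1.1.0" is an assumption chosen to match hive 1.1.0-cdh5.14.2.
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, "1.1.0");
tableEnv.registerCatalog("hive", hive);
tableEnv.useCatalog("hive");
```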



> Use Hive Dialect execute Hive DDL, But throw a NullPointerException 
> --------------------------------------------------------------------
>
>                 Key: FLINK-24950
>                 URL: https://issues.apache.org/jira/browse/FLINK-24950
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Hive
>    Affects Versions: 1.13.2
>         Environment: flink-1.13.2
> cdh5.14.2
> jdk8
>            Reporter: xiaojunchen
>            Priority: Major
>              Labels: flink-connector-hive, flinksql


