[ 
https://issues.apache.org/jira/browse/FLINK-24528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17456984#comment-17456984
 ] 

zoucao commented on FLINK-24528:
--------------------------------

+1, we have run into the same problem.

> Flink HBase Async Lookup throws NPE if rowkey is null
> -----------------------------------------------------
>
>                 Key: FLINK-24528
>                 URL: https://issues.apache.org/jira/browse/FLINK-24528
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / HBase
>    Affects Versions: 1.13.0
>            Reporter: zhisheng
>            Priority: Major
>
> When an HBase table is created with Flink SQL DDL and 'lookup.async' = 'true' is set,
> a lookup with a null rowkey may throw an NPE:
> {code:java}
> 2021-10-12 21:11:07,100 INFO  org.apache.flink.connector.hbase2.source.HBaseRowDataAsyncLookupFunction [] - start close ...
> 2021-10-12 21:11:07,100 INFO  org.apache.flink.connector.hbase2.source.HBaseRowDataAsyncLookupFunction [] - start close ...
> 2021-10-12 21:11:07,103 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - LookupJoin(table=[default_catalog.default_database.dim_user_guid_relation], joinType=[LeftOuterJoin], async=[true], lookup=[rowkey=userGuid], select=[userGuid, last_time, rowkey, cf]) -> Calc(select=[userGuid AS user_guid, cf.user_new_id AS user_new_id, last_time AS usr_pwtx_ectx_driver_last_seek_order_time, _UTF-16LE'prfl.usr' AS metric]) -> Sink: Sink(table=[default_catalog.default_database.print_table], fields=[user_guid, user_new_id, usr_pwtx_ectx_driver_last_seek_order_time, metric]) (1/1)#0 (06bf3d7b0c341101e070796e20f7e571) switched from RUNNING to FAILED.
> java.lang.NullPointerException: null
>     at org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.RawAsyncTableImpl.get(RawAsyncTableImpl.java:249) ~[flink-sql-connector-hbase-2.2_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.AsyncTableImpl.get(AsyncTableImpl.java:96) ~[flink-sql-connector-hbase-2.2_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.connector.hbase2.source.HBaseRowDataAsyncLookupFunction.fetchResult(HBaseRowDataAsyncLookupFunction.java:187) ~[flink-sql-connector-hbase-2.2_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.connector.hbase2.source.HBaseRowDataAsyncLookupFunction.eval(HBaseRowDataAsyncLookupFunction.java:174) ~[flink-sql-connector-hbase-2.2_2.11-1.12.0.jar:1.12.0]
>     at LookupFunction$24.asyncInvoke(Unknown Source) ~[?:?]
>     at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner.asyncInvoke(AsyncLookupJoinRunner.java:139) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner.asyncInvoke(AsyncLookupJoinRunner.java:53) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.processElement(AsyncWaitOperator.java:195) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:193) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:179) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:152) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.11-1.12.0.jar:1.12.0]
>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_201]
>     Suppressed: java.lang.Exception: java.lang.NoClassDefFoundError: org/apache/flink/hbase/shaded/org/apache/commons/io/IOUtils
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:723) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:643) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:552) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.11-1.12.0.jar:1.12.0]
>         at java.lang.Thread.run(Thread.java:748) [?:1.8.0_201]
>     Caused by: java.lang.NoClassDefFoundError: org/apache/flink/hbase/shaded/org/apache/commons/io/IOUtils
>         at org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.AsyncConnectionImpl.close(AsyncConnectionImpl.java:193) ~[flink-sql-connector-hbase-2.2_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.connector.hbase2.source.HBaseRowDataAsyncLookupFunction.close(HBaseRowDataAsyncLookupFunction.java:266) ~[flink-sql-connector-hbase-2.2_2.11-1.12.0.jar:1.12.0]
>         at LookupFunction$24.close(Unknown Source) ~[?:?]
>         at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner.close(AsyncLookupJoinRunner.java:154) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:740) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:720) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         ... 5 more
>     Caused by: java.lang.ClassNotFoundException: org.apache.flink.hbase.shaded.org.apache.commons.io.IOUtils
>         at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_201]
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[?:1.8.0_201]
>         at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) ~[?:1.8.0_201]
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[?:1.8.0_201]
>         at org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.AsyncConnectionImpl.close(AsyncConnectionImpl.java:193) ~[flink-sql-connector-hbase-2.2_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.connector.hbase2.source.HBaseRowDataAsyncLookupFunction.close(HBaseRowDataAsyncLookupFunction.java:266) ~[flink-sql-connector-hbase-2.2_2.11-1.12.0.jar:1.12.0]
>         at LookupFunction$24.close(Unknown Source) ~[?:?]
>         at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner.close(AsyncLookupJoinRunner.java:154) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:740) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:720) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         ... 5 more
> {code}
> The HBaseRowDataAsyncLookupFunction code in Flink 1.13 is:
> [https://github.com/apache/flink/blob/release-1.13/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java#L172]
>  
> For comparison, the synchronous HBaseRowDataLookupFunction code is:
> [https://github.com/apache/flink/blob/release-1.13/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java#L108]
>  
> serde.createGet returns null when the encoded rowkey is zero-length:
> {code:java}
> // Call site in HBaseRowDataAsyncLookupFunction: get may be null here
> Get get = serde.createGet(rowKey);
>
> // HBaseSerde.createGet
> public Get createGet(Object rowKey) {
>    checkArgument(keyEncoder != null, "row key is not set.");
>    rowWithRowKey.setField(0, rowKey);
>    byte[] rowkey = keyEncoder.encode(rowWithRowKey, 0);
>    if (rowkey.length == 0) {
>       // drop dirty records, rowkey shouldn't be zero length
>       return null;
>    }
>    // request only the configured column families and qualifiers
>    Get get = new Get(rowkey);
>    for (int f = 0; f < families.length; f++) {
>       byte[] family = families[f];
>       for (byte[] qualifier : qualifiers[f]) {
>          get.addColumn(family, qualifier);
>       }
>    }
>    return get;
> }
> {code}
> We should add a null check before the Get is passed to the async table:
> {code:java}
> if (get != null) {
>    ...
> }{code}
>  
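A minimal, stdlib-only sketch of the proposed guard follows. It is not Flink's code or its actual fix: SimpleGet, AsyncTable, and this createGet/fetchResult pair are hypothetical stand-ins for HBase's Get, the async HBase table, and HBaseRowDataAsyncLookupFunction, and it assumes a null rowkey encodes to zero-length bytes. Rather than only wrapping the call in if (get != null), it also completes the result future with an empty collection. Presumably an async lookup that never completes its future would otherwise leave the record waiting until the async operator's timeout fires.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

public class NullRowkeyLookupSketch {

    /** Hypothetical stand-in for HBase's Get; holds only the encoded row key. */
    static final class SimpleGet {
        final byte[] row;
        SimpleGet(byte[] row) { this.row = row; }
    }

    /** Hypothetical async table; in the report, passing a null Get to get() throws an NPE. */
    interface AsyncTable {
        CompletableFuture<String> get(SimpleGet get);
    }

    /** Models createGet, assuming a null key encodes to zero-length bytes. */
    static SimpleGet createGet(String rowKey) {
        byte[] rowkey = rowKey == null ? new byte[0] : rowKey.getBytes(StandardCharsets.UTF_8);
        if (rowkey.length == 0) {
            return null; // dirty record: rowkey shouldn't be zero length
        }
        return new SimpleGet(rowkey);
    }

    /** Guarded lookup: a null Get completes the future with an empty result. */
    static void fetchResult(AsyncTable table,
                            CompletableFuture<Collection<String>> resultFuture,
                            String rowKey) {
        SimpleGet get = createGet(rowKey);
        if (get == null) {
            // Never call table.get(null); complete the future so the caller is not left waiting.
            resultFuture.complete(Collections.emptyList());
            return;
        }
        table.get(get).whenComplete((value, error) -> {
            if (error != null) {
                resultFuture.completeExceptionally(error);
            } else if (value == null) {
                resultFuture.complete(Collections.emptyList());
            } else {
                resultFuture.complete(Collections.singletonList(value));
            }
        });
    }

    public static void main(String[] args) {
        // Hypothetical table that echoes the row key and rejects null like the reported NPE.
        AsyncTable table = get -> {
            if (get == null) {
                throw new NullPointerException();
            }
            return CompletableFuture.completedFuture(new String(get.row, StandardCharsets.UTF_8));
        };

        CompletableFuture<Collection<String>> hit = new CompletableFuture<>();
        fetchResult(table, hit, "user-1");
        System.out.println(hit.join()); // [user-1]

        CompletableFuture<Collection<String>> miss = new CompletableFuture<>();
        fetchResult(table, miss, null);
        System.out.println(miss.join()); // []
    }
}
```

With a LEFT OUTER JOIN like the one in the report, an empty lookup result should let the probe row through with null dimension columns instead of failing the task.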



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
