Hi,
   From your stack trace, the PreparedStatement referenced by your custom
com.custom.jdbc.table.JdbcRowDataLookupFunction comes from the wrong package.

For a concrete implementation you can refer to: https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
My understanding is that if Phoenix supports the standard SQL protocol, you could also just use the provided JdbcRowDataLookupFunction directly?
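
For reference, the eval() retry loop in the linked Flink implementation follows roughly this pattern: on SQLException it logs "JDBC executeBatch error, retry times = N", re-establishes the connection and PreparedStatement, backs off, and retries; only after the maximum retry count does it throw the "Execution of JDBC statement failed." RuntimeException you see in the log. A minimal, self-contained sketch of that pattern (the Query interface and the stale flag are hypothetical stand-ins for the real JDBC statement and connection):

```java
import java.sql.SQLException;

// Sketch of the retry-and-reconnect pattern used by Flink's
// JdbcRowDataLookupFunction. The Query interface below is a stand-in for
// statement.executeQuery(); "reestablish" stands in for recreating the
// connection and PreparedStatement after a failure.
public class LookupRetrySketch {

    interface Query {
        String run() throws SQLException; // stand-in for the real JDBC call
    }

    static String lookupWithRetry(Query query, Runnable reestablish, int maxRetries)
            throws SQLException, InterruptedException {
        for (int retry = 1; retry <= maxRetries; retry++) {
            try {
                return query.run();
            } catch (SQLException e) {
                System.err.println("JDBC executeBatch error, retry times = " + retry);
                if (retry >= maxRetries) {
                    throw new RuntimeException("Execution of JDBC statement failed.", e);
                }
                reestablish.run();          // recreate connection + PreparedStatement
                Thread.sleep(100L * retry); // short backoff for the sketch;
                                            // Flink sleeps 1000 * retry ms
            }
        }
        throw new IllegalStateException("unreachable");
    }

    public static void main(String[] args) throws Exception {
        // Simulate a statement the server has expired: the first attempt fails
        // with the kind of error Avatica surfaces, the re-established one works.
        final boolean[] stale = {true};
        String row = lookupWithRetry(
                () -> {
                    if (stale[0]) throw new SQLException("NoSuchStatementException");
                    return "dim_row";
                },
                () -> stale[0] = false,
                3);
        System.out.println(row);
    }
}
```

If the custom function retries the same stale PreparedStatement without re-establishing it, the Phoenix Query Server (Avatica) will keep answering NoSuchStatementException until the retries are exhausted, which matches the failure in your log.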


Best,
Hailong
On 2020-12-01 16:40:48, "hoose" <xiaoby...@qq.com> wrote:
>Flink SQL consumes from Kafka and uses a custom Phoenix connector to look up a dimension table.
>The job runs for a while (sometimes about a week) and then fails. The log shows:
>2020-11-24 00:52:38,534 ERROR 
>com.custom.jdbc.table.JdbcRowDataLookupFunction [] - JDBC executeBatch 
>error, retry times = 2
>java.sql.SQLException: null
>       at org.apache.calcite.avatica.Helper.createException(Helper.java:56) 
> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>       at org.apache.calcite.avatica.Helper.createException(Helper.java:41) 
> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.calcite.avatica.AvaticaConnection.executeQueryInternal(AvaticaConnection.java:557)
>  ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.calcite.avatica.AvaticaPreparedStatement.executeQuery(AvaticaPreparedStatement.java:137)
>  ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>       at 
> com.custom.jdbc.table.JdbcRowDataLookupFunction.eval(JdbcRowDataLookupFunction.java:145)
>  [sql-client-1.0-SNAPSHOT.jar:?]
>       at LookupFunction$2.flatMap(Unknown Source) 
> [flink-table-blink_2.11-1.11.1.jar:?]
>       at 
> org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:82)
>  [flink-table-blink_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:36)
>  [flink-table-blink_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>  [flink-dist_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>  [flink-dist_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>  [flink-dist_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>  [flink-dist_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>  [flink-dist_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>  [flink-dist_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>  [flink-dist_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>  [flink-dist_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
>  [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
>  [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>  [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>  [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>  [flink-dist_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>  [flink-dist_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
>  [flink-dist_2.11-1.11.1.jar:1.11.1]
>Caused by: org.apache.calcite.avatica.NoSuchStatementException
>       at 
> org.apache.calcite.avatica.remote.RemoteMeta$15.call(RemoteMeta.java:349) 
> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.calcite.avatica.remote.RemoteMeta$15.call(RemoteMeta.java:343) 
> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.calcite.avatica.AvaticaConnection.invokeWithRetries(AvaticaConnection.java:793)
>  ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.calcite.avatica.remote.RemoteMeta.execute(RemoteMeta.java:342) 
> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.calcite.avatica.AvaticaConnection.executeQueryInternal(AvaticaConnection.java:548)
>  ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>       ... 20 more
>2020-11-24 00:52:40,539 ERROR 
>org.apache.flink.connector.jdbc.table.JdbcRowDataLookupFunction [] - JDBC 
>executeBatch error, retry times = 3
>java.sql.SQLException: null
>       at org.apache.calcite.avatica.Helper.createException(Helper.java:56) 
> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>       at org.apache.calcite.avatica.Helper.createException(Helper.java:41) 
> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.calcite.avatica.AvaticaConnection.executeQueryInternal(AvaticaConnection.java:557)
>  ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.calcite.avatica.AvaticaPreparedStatement.executeQuery(AvaticaPreparedStatement.java:137)
>  ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>       at 
> com.custom.phoenix.jdbc.table.JdbcRowDataLookupFunction.eval(JdbcRowDataLookupFunction.java:145)
>  [sql-client-1.0-SNAPSHOT.jar:?]
>       at LookupFunction$2.flatMap(Unknown Source) 
> [flink-table-blink_2.11-1.11.1.jar:?]
>       at 
> org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:82)
>  [flink-table-blink_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:36)
>  [flink-table-blink_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>  [flink-dist_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>  [flink-dist_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>  [flink-dist_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>  [flink-dist_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>  [flink-dist_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>  [flink-dist_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>  [flink-dist_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>  [flink-dist_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
>  [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
>  [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>  [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>  [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>  [flink-dist_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>  [flink-dist_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
>  [flink-dist_2.11-1.11.1.jar:1.11.1]
>Caused by: org.apache.calcite.avatica.NoSuchStatementException
>       at 
> org.apache.calcite.avatica.remote.RemoteMeta$15.call(RemoteMeta.java:349) 
> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.calcite.avatica.remote.RemoteMeta$15.call(RemoteMeta.java:343) 
> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.calcite.avatica.AvaticaConnection.invokeWithRetries(AvaticaConnection.java:793)
>  ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.calcite.avatica.remote.RemoteMeta.execute(RemoteMeta.java:342) 
> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.calcite.avatica.AvaticaConnection.executeQueryInternal(AvaticaConnection.java:548)
>  ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>       ... 20 more
>2020-11-24 00:52:40,635 WARN  org.apache.flink.runtime.taskmanager.Task
>       switched from RUNNING to FAILED.
>java.lang.RuntimeException: Execution of JDBC statement failed.
>
>
>
>Could anyone help me figure out where the problem is?
>Thanks
