Re: PyFlink 1.12 error connecting to MySQL: Missing required options

2020-12-21 Post by Wei Zhong
Hi,

As the error message indicates, the WITH clause requires a "url" option. Try changing connector.url to url and see whether the error goes away.
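
For illustration, here is a minimal sketch of how the DDL might look after that rename. It assumes the option names documented for the Flink 1.12 JDBC connector ('url', 'table-name', 'driver', 'username', 'password'); the other connector.-prefixed keys probably need the same treatment, since they belong to the legacy 'connector.type' style. The helper legacy_option_keys is hypothetical (not part of PyFlink) and only scans the DDL text; the host, port, database, and credentials are the placeholders from the question.

```python
import re

# Sketch only: option keys assumed from the Flink 1.12 JDBC connector docs.
# 'ip', 'port', 'db_base' and 'xxx' are the placeholders used in the question.
source_ddl2 = """
CREATE TABLE ts_pf_sec_yldrate (
    id DECIMAL,
    pf_id VARCHAR,
    symbol_id VARCHAR,
    biz_date VARCHAR,
    ccy_type VARCHAR,
    cur_id_d VARCHAR,
    yldrate DECIMAL,
    is_valid DECIMAL,
    time_mark TIMESTAMP
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://ip:port/db_base',
    'table-name' = 'ts_pf_sec_yldrate',
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'xxx',
    'password' = 'xxx'
)
"""


def legacy_option_keys(ddl):
    """Hypothetical helper: list option keys that still carry the
    legacy 'connector.' prefix in a CREATE TABLE ... WITH (...) string."""
    # Capture every quoted key that is followed by '=' and a quoted value.
    keys = re.findall(r"'([^']+)'\s*=\s*'", ddl)
    return [key for key in keys if key.startswith("connector.")]


# The rewritten DDL should contain no legacy-style keys.
print(legacy_option_keys(source_ddl2))

# With a PyFlink TableEnvironment named t_env (assumed to exist in the script),
# the table would then be registered with:
# t_env.execute_sql(source_ddl2)
```

Running the same helper over the DDL from the question would flag connector.url, connector.table, connector.driver, connector.username and connector.password, which is a quick way to spot a mix of the two option styles before submitting the job.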

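> On 2020-12-21 at 13:44, 肖越 <18242988...@163.com> wrote:
> 
> I defined two source-table DDLs in my script, but the second one always fails with a missing-option error. I'm new to PyFlink; could someone help?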

PyFlink 1.12 error connecting to MySQL: Missing required options

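2020-12-20 Post by 肖越
I defined two source-table DDLs in my script, but the second one always fails with a missing-option error. I'm new to PyFlink; could someone help?
# DDL definition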
source_ddl2 = """CREATE TABLE ts_pf_sec_yldrate (id DECIMAL, pf_id VARCHAR, \
symbol_id VARCHAR, biz_date VARCHAR, \
ccy_type VARCHAR, cur_id_d VARCHAR, yldrate DECIMAL, \
is_valid DECIMAL, time_mark TIMESTAMP) WITH (
'connector' = 'jdbc',
'connector.url' = 'jdbc:mysql://ip:port/db_base',
'connector.table' = 'ts_pf_sec_yldrate',
'table-name' = 'ts_pf_sec_yldrate',
'connector.driver' = 'com.mysql.jdbc.Driver',
'connector.username' = 'xxx',
'connector.password' = 'xxx')
"""
Error message:
Traceback (most recent call last):
  File "C:/projects/dataService-calculate-code-python/src/test/test_mysql_connector.py", line 67, in <module>
    print(join.to_pandas().head(6))
  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table.py", line 807, in to_pandas
    .collectAsPandasDataFrame(self._j_table, max_arrow_batch_size)
  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame.
: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.ts_pf_sec_yldrate'.

Table options are:

'connector'='jdbc'
'connector.driver'='com.mysql.jdbc.Driver'
'connector.password'='xxx'
'connector.table'='ts_pf_sec_yldrate'
'connector.url'='jdbc:mysql://ip:port/db_base'
'connector.username'='xxx'
'table-name'='ts_pf_sec_yldrate'
    at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:265)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:100)
    at org.apache.calcite.rel.core.RelFactories$TableScanFactoryImpl.createScan(RelFactories.java:495)
    at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1099)
    at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1123)
    at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:339)
    at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:149)
    at org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:69)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:146)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128)
    at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:92)
    at org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:69)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:145)
    at java.util.Collections$SingletonList.forEach(Collections.java:4824)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:145)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128)
    at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:47)
    at org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:75)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:145)
    at java.util.Arrays$ArrayList.forEach(Arrays.java:3880)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:145)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128)
    at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:62)
    at org.apache.flink.table.operations.JoinQueryOperation.accept(JoinQueryOperation.java:128)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:145)
    at java.util.Collections$SingletonList.forEach(Collections.java:4824)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:145)
    at