Re: flink sql client: CDC to Elasticsearch problem (version 1.11.0): the job is submitted successfully but does not show up on the web dashboard. Is this a bug, or is my configuration wrong?

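2020-08-11 Thread Leonard Xu
Hello

1. To use the CDC feature, please use 1.11.1. That release fixes a CDC bug [1].
2. Also, your exception stack shows that the corresponding connector jar was not found. Please confirm that you are using the
flink-sql-connector-elasticsearch6_2.11-1.11.0 jar.

Best,
Leonard
[1] https://issues.apache.org/jira/browse/FLINK-18461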
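
For point 2, one quick way to confirm which connector jars are present is to look in the directory the SQL client loads them from. The following is only a minimal sketch, not an official Flink tool: the helper name check_connector_jar is hypothetical, it assumes the jars were placed in the distribution's lib directory under FLINK_HOME, and the patterns in the usage comments assume the Elasticsearch 7 and Kafka SQL connectors because the DDL in this thread declares 'elasticsearch-7' and 'kafka'. Use an elasticsearch6 pattern instead if that is the connector you actually run.

```shell
#!/bin/sh
# Hypothetical helper (not part of Flink): report whether a directory
# contains at least one jar whose file name matches a glob pattern.
check_connector_jar() {
  lib_dir="$1"   # directory to search, e.g. "$FLINK_HOME/lib" (assumed layout)
  pattern="$2"   # glob such as 'flink-sql-connector-elasticsearch7_*.jar'
  for f in "$lib_dir"/$pattern; do
    # An unmatched glob stays literal, so check that the file really exists.
    if [ -e "$f" ]; then
      echo "found: $(basename "$f")"
      return 0
    fi
  done
  echo "missing: $pattern"
  return 1
}

# Example usage (paths and patterns are assumptions):
#   check_connector_jar "$FLINK_HOME/lib" 'flink-sql-connector-elasticsearch7_*.jar'
#   check_connector_jar "$FLINK_HOME/lib" 'flink-sql-connector-kafka_*.jar'
```

If a pattern is reported as missing, the connector named in the table's WITH clause probably cannot be loaded, which would be consistent with the kind of error described above.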


> On Aug 12, 2020, at 13:31, jindy_liu <286729...@qq.com> wrote:

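flink sql client: CDC to Elasticsearch problem (version 1.11.0): the job is submitted successfully but does not show up on the web dashboard. Is this a bug, or is my configuration wrong?

2020-08-11 Thread jindy_liu
The tables are created as follows: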

CREATE TABLE test (
  `id` INT,
  `name` VARCHAR(255),
  `time` TIMESTAMP(3),
  `status` INT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'test',
  'properties.group.id' = 'c_mysql_binlog_es',
  'properties.bootstrap.servers' = 'localhost:9092',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'canal-json',
  'canal-json.ignore-parse-errors' = 'true'
);


-- Sink table that writes to Elasticsearch
CREATE TABLE test_mirror_es (
  `id` INT,
  `name` VARCHAR(255),
  `time` TIMESTAMP(3),
  `status` INT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://localhost:9200',
  'index' = 'test_mirror'
);

INSERT into test_mirror_es SELECT * from test where test.id >=0;

Log: Caused by: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.test_mirror_es'.

The full log is as follows:


2020-08-12 13:07:20,815 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.address, localhost
2020-08-12 13:07:20,820 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.port, 6123
2020-08-12 13:07:20,820 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.memory.process.size, 1600m
2020-08-12 13:07:20,820 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.process.size, 1728m
2020-08-12 13:07:20,820 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.numberOfTaskSlots, 10
2020-08-12 13:07:20,820 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: parallelism.default, 5
2020-08-12 13:07:20,821 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.savepoints.dir, hdfs://localhost:9000/flink-1.11.0/flink-savepoints
2020-08-12 13:07:20,821 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.execution.failover-strategy, region
2020-08-12 13:07:21,198 INFO  org.apache.flink.table.client.config.entries.ExecutionEntry [] - Property 'execution.restart-strategy.type' not specified. Using default value: fallback
2020-08-12 13:07:22,099 INFO  org.apache.flink.table.client.gateway.local.ExecutionContext [] - Executor config: {taskmanager.memory.process.size=1728m, jobmanager.execution.failover-strategy=region, jobmanager.rpc.address=localhost, execution.target=remote, jobmanager.memory.process.size=1600m, state.savepoints.dir=hdfs://localhost:9000/flink-1.11.0/flink-savepoints, jobmanager.rpc.port=6123, execution.savepoint.ignore-unclaimed-state=false, execution.attached=true, execution.shutdown-on-attached-exit=false, pipeline.jars=[file:/data1/home/xxx/flink-demo/flink-1.11.0/opt/flink-sql-client_2.11-1.11.0.jar], parallelism.default=5, taskmanager.numberOfTaskSlots=10, pipeline.classpaths=[]}
2020-08-12 13:07:22,286 INFO  org.apache.flink.table.client.cli.CliClient [] - Command history file path: /root/.flink-sql-history
2020-08-12 13:07:46,637 INFO  org.apache.flink.table.client.gateway.local.ProgramDeployer [] - Submitting job org.apache.flink.streaming.api.graph.StreamGraph@41a16eb3 for query default: INSERT into test_mirror_es SELECT * from test where id >0`
2020-08-12 13:07:46,709 INFO  org.apache.flink.configuration.Configuration [] - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
2020-08-12 13:10:17,512 INFO  org.apache.flink.table.client.gateway.local.ProgramDeployer [] - Submitting job org.apache.flink.streaming.api.graph.StreamGraph@3ff8a3ad for query default: INSERT into test_mirror_es SELECT * from test where id >0`
2020-08-12 13:10:17,516 INFO  org.apache.flink.configuration.Configuration [] - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
2020-08-12 13:10:38,360 WARN  org.apache.flink.table.client.cli.CliClient [] - Could not execute SQL statement.
org.apache.flink.table.client.gateway.SqlExecutionException: Invalidate SQL statement.
    at org.apache.flink.table.client.cli.SqlCommandParser.parseBySqlParser(SqlCommandParser.java:99) ~[flink-sql-client_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.table.client.cli.SqlCommandParser.parse(SqlCommandParser.java:90) ~[flink-sql-client_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.table.client.cli.CliClient.parseCommand(CliClient.java:257) [flink-sql-client_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211) [flink-sql-client_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:142) [flink-sql-client_2.11-1.11.0.jar:1.11.0]
at