[ https://issues.apache.org/jira/browse/FLINK-30809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17706868#comment-17706868 ]
Martijn Visser commented on FLINK-30809:
----------------------------------------

How have you determined that the problem is the Elasticsearch sink, and not the MySQL CDC source? Is the updated record in MySQL actually visible in your test?

flink-connector-elasticsearch7 updates data pipeline does not work
-------------------------------------------------------------------

Key: FLINK-30809
URL: https://issues.apache.org/jira/browse/FLINK-30809
Project: Flink
Issue Type: Bug
Components: Connectors / ElasticSearch
Affects Versions: elasticsearch-3.0.0
Environment: Flink Version: 1.15.3
Flink-CDC Version: 2.3.0
MySQL Version: 5.7
Elasticsearch Version: 7.17.7
During the test, these JAR packages were added under flink/lib:
flink-sql-connector-elasticsearch7-1.15.3.jar
flink-sql-connector-mysql-cdc-2.3.0.jar
mysql-connector-java-8.0.27.jar
Reporter: iduanyingjie
Priority: Major

Create Elasticsearch in Docker:

{code:yaml}
version: '2.1'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.17.7
    environment:
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - discovery.type=single-node
    ports:
      - "9200:9200"
      - "9300:9300"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
  kibana:
    image: docker.elastic.co/kibana/kibana:7.17.7
    ports:
      - "5601:5601"
{code}

Create the table records in MySQL:

{code:sql}
CREATE TABLE records (
  id bigint unsigned NOT NULL AUTO_INCREMENT,
  user_id bigint unsigned NOT NULL,
  create_time datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
{code}

Insert some data:

{code:sql}
INSERT INTO test.records (id, user_id, create_time) VALUES(default, 123, '2023-01-20 12:25:11');
INSERT INTO test.records (id, user_id, create_time) VALUES(default, 456, '2023-01-20 12:25:30');
INSERT INTO test.records (id, user_id, create_time) VALUES(default, 789, '2023-01-20 12:25:37');
{code}

Create the ingest pipeline in Elasticsearch:

{code:java}
PUT /_ingest/pipeline/set_ingest_timestamp_fields
{
  "processors": [
    {
      "set": {
        "field": "ingest_timestamp",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}
{code}

Create the index in Elasticsearch:

{code:java}
PUT enriched_records
{
  "settings": {
    "default_pipeline": "set_ingest_timestamp_fields",
    "number_of_shards": "1",
    "number_of_replicas": "0"
  }
}
{code}

Execute the Flink SQL:

{code:sql}
CREATE TABLE records (
  id INT,
  user_id INT,
  create_time TIMESTAMP(3),
  proc_time AS PROCTIME(),
  operation_time TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'database-name' = 'test',
  'table-name' = 'records',
  'server-time-zone' = 'UTC'
);

CREATE TABLE enriched_records (
  id INT,
  user_id INT,
  create_time TIMESTAMP(3),
  proc_time TIMESTAMP_LTZ(3),
  operation_time TIMESTAMP_LTZ(3),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://localhost:9200',
  'index' = 'enriched_records'
);

INSERT INTO enriched_records
SELECT
  o.id,
  o.user_id,
  o.create_time,
  o.proc_time,
  o.operation_time
FROM records AS o;
{code}
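One way to check whether the update in MySQL is visible to Flink at all (the question raised in the comment above) is to inspect the changelog emitted by the mysql-cdc source on its own in the SQL client. A minimal sketch, not part of the original test:

{code:sql}
-- Sketch only (not from the original report): show the raw changelog
-- produced by the mysql-cdc source, independently of the Elasticsearch sink.
SET 'sql-client.execution.result-mode' = 'changelog';

SELECT * FROM records;
-- An UPDATE in MySQL should appear here as a -U/+U pair for the affected row.
{code}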
We query the data in Elasticsearch with GET /enriched_records/_search and find that each record has an ingest_timestamp field whose value is the recent (ingest) time.

{code:json}
{
  "_index": "enriched_records",
  "_type": "_doc",
  "_id": "3",
  "_score": 1,
  "_source": {
    "operation_time": "1970-01-01 00:00:00Z",
    "create_time": "2023-01-20 12:25:37",
    "user_id": 789,
    "ingest_timestamp": "2023-01-28T05:21:40.539754251Z",
    "id": 3,
    "proc_time": "2023-01-28 05:21:40.233Z"
  }
}
{code}

When we modify a record in MySQL, the value of the ingest_timestamp field does not change; it seems that the default pipeline set for this index is not applied in this case.

{code:json}
{
  "_index": "enriched_records",
  "_type": "_doc",
  "_id": "3",
  "_score": 1,
  "_source": {
    "operation_time": "2023-01-28 05:25:05Z",
    "create_time": "2023-01-20 12:25:37",
    "user_id": 987,
    "ingest_timestamp": "2023-01-28T05:21:40.539754251Z",
    "id": 3,
    "proc_time": "2023-01-28 05:25:05.529Z"
  }
}
{code}

If we directly modify a field in Elasticsearch, the value of the ingest_timestamp field does change.
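The exact request behind "directly modify a field in Elasticsearch" is not shown in the report; presumably it re-indexes the document through the index API, roughly like the sketch below (field values taken from the document above). An index request goes through the index's default pipeline, so ingest_timestamp is set again:

{code:java}
PUT /enriched_records/_doc/3
{
  "id": 3,
  "user_id": 987,
  "create_time": "2023-01-20 12:25:37",
  "proc_time": "2023-01-28 05:25:05.529Z",
  "operation_time": "2023-01-28 05:25:05Z"
}
{code}

A subsequent GET /enriched_records/_doc/3 should then show a fresh ingest_timestamp, matching the behaviour described above.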