Hi Flink Community!
I am using:
* Flink
* the Flink CDC Postgres connector
* Scala + sbt
Versions:
* orgApacheKafkaVersion = "3.2.3"
* flinkVersion = "1.19.0"
* flinkKafkaVersion = "3.0.2-1.18"
* flinkConnectorPostgresCdcVersion = "3.0.1"
* debeziumVersion = "1.9.8.Final"
* scalaVersion = "2.12.13"
* javaVersion = "11"
The problem
-----------
I have a problem with the heartbeat interval feature:
* when I query Postgres with `select * from pg_replication_slots;` to check
whether each replication slot is refreshed at the configured interval,
* the confirmed_flush_lsn values are never updated.
PS: I have other replication slots managed directly with Debezium (without
Flink) on the same Postgres database, and their confirmed_flush_lsn values are
updated correctly at their own configured intervals.
```
 slot_name   |  plugin  | slot_type |  datoid   | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn  | confirmed_flush_lsn
-------------+----------+-----------+-----------+----------+-----------+--------+------------+------+--------------+--------------+---------------------
 slot_table1 | pgoutput | logical   | 811518778 | the_db   | f         | t      |      10870 |      |   1630392036 | 712/697C0DB8 | 712/697C0DF0
 slot_table2 | pgoutput | logical   | 811518778 | the_db   | f         | t      |      10894 |      |   1630392033 | 712/697AD0A8 | 712/697AD0E0
 slot_table3 | pgoutput | logical   | 811518778 | the_db   | f         | t      |      10978 |      |   1630392034 | 712/697AD0A8 | 712/697AD0A8
```
My setup
--------
I have configured 3 distinct DataStreamSources on 3 Postgres tables using
this common method:
```
private def initEntityDataSource(conf: Config, env: StreamExecutionEnvironment,
                                 entityName: String, columnList: String) = {
  val dbzProps: Properties = new Properties()
  // e.g. "public.tableX.column1,public.tableX.column2"
  dbzProps.setProperty("column.include.list", columnList)

  val postgresIncrementalSource: PostgresSourceBuilder.PostgresIncrementalSource[String] =
    PostgresSourceBuilder.PostgresIncrementalSource.builder()
      .hostname(conf.getString("pg.hostname"))
      .port(conf.getInt("pg.port"))
      .database(conf.getString("pg.database"))
      .username(conf.getString("pg.username"))
      .password(conf.getString("pg.password"))
      .slotName(conf.getString(s"flink.${entityName}.slot_name")) // slot_tableX
      .decodingPluginName("pgoutput")
      .includeSchemaChanges(true)
      .deserializer(new JsonDebeziumDeserializationSchema())
      .closeIdleReaders(true)
      .heartbeatInterval(Duration.ofMillis(10000)) // <-- 10 seconds
      .connectTimeout(Duration.ofSeconds(10))      // 10 seconds
      .startupOptions(StartupOptions.initial())
      .schemaList("public")
      .tableList("public." + conf.getString(s"flink.${entityName}.table_name")) // public.tableX
      .debeziumProperties(dbzProps) // <-- dbzProps
      .build()

  env.fromSource(postgresIncrementalSource,
      WatermarkStrategy.noWatermarks[String](), s"pg-projector-${entityName}")
    .setParallelism(1)
}
```
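For example, one of the three sources is built like this (the "orders" entity
name and column list below are just placeholders for this email, resolved
through the flink.orders.slot_name and flink.orders.table_name config keys):
```
// hypothetical example call; "orders" and the columns are placeholders
val ordersSource =
  initEntityDataSource(conf, env, "orders", "public.orders.id,public.orders.status")
```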
After that, I convert each DataStreamSource into a Table, join those 3 Tables,
and convert the result into a DataStream[Row].
On this new DataStream I do a keyBy and process the records with a custom
KeyedProcessFunction.
All of this works fine and does its job, but the heartbeat does not seem to
refresh the values in the pg_replication_slots.confirmed_flush_lsn column.
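Roughly, the rest of the job looks like this (a simplified sketch, not my real
code: the parseEntityN mapping steps, the view/column names, the join keys and
MyKeyedProcessFunction are placeholders, and it assumes the Java DataStream API
used from Scala):
```
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
import org.apache.flink.types.Row

val tEnv = StreamTableEnvironment.create(env)

// s1, s2, s3 are the three sources built by initEntityDataSource;
// parseEntityN is my own JSON-to-typed-stream mapping (omitted here)
tEnv.createTemporaryView("t1", parseEntity1(s1))
tEnv.createTemporaryView("t2", parseEntity2(s2))
tEnv.createTemporaryView("t3", parseEntity3(s3))

val joined = tEnv.sqlQuery(
  """SELECT t1.id, t1.col_a, t2.col_b, t3.col_c
    |FROM t1
    |JOIN t2 ON t1.id = t2.id
    |JOIN t3 ON t1.id = t3.id""".stripMargin)

// back to a DataStream[Row], then keyBy + the custom KeyedProcessFunction
val rows: DataStream[Row] = tEnv.toDataStream(joined)
rows
  .keyBy((r: Row) => r.getFieldAs[String]("id"))
  .process(new MyKeyedProcessFunction())
```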
PS: I also tried the following:
1) Instead of using the .heartbeatInterval() method to set the interval, I set
it through the Debezium properties:
```
dbzProps.setProperty("heartbeat.interval.ms", "10000") // also tried "PT10S"
```
This seems to have no effect.
2) It seems that Debezium needs a Kafka topic for the heartbeat messages. In
theory, if the topic does not exist it is created automatically, but my Kafka
cluster does not allow auto-creation, so I created the topic manually with
this name:
`__flink-heartbeat.postgres_cdc_source`
I also added this Debezium property to set the matching topic prefix:
```
dbzProps.setProperty("topic.heartbeat.prefix", "__flink-heartbeat")
```
This seems to have no effect either.
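For reference, after attempts 1 and 2 the Debezium properties passed to
.debeziumProperties() look like this (column list shortened to a placeholder):
```
import java.util.Properties

val dbzProps = new Properties()
dbzProps.setProperty("column.include.list", "public.tableX.column1,public.tableX.column2")
// attempt 1: heartbeat interval via Debezium properties instead of .heartbeatInterval(...)
dbzProps.setProperty("heartbeat.interval.ms", "10000")
// attempt 2: prefix matching the manually created __flink-heartbeat.postgres_cdc_source topic
dbzProps.setProperty("topic.heartbeat.prefix", "__flink-heartbeat")
```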
So... do you have any ideas?
Thanks,
Thomas