Re: problem with the heartbeat interval feature

2024-05-16 Thread Hongshun Wang
Hi Thomas,

In the Debezium docs it says: For the connector to detect and process events from a
heartbeat table, you must add the table to the PostgreSQL publication
specified by the publication.name
<https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-publication-name>
property.
If this publication predates your Debezium deployment, the connector uses
the publication as defined. If the publication is not already configured
to automatically replicate changes FOR ALL TABLES in the database, you must
explicitly add the heartbeat table to the publication[2].

Thus, if you want to use the heartbeat feature in CDC (a minimal sketch follows the list):

   1. add the heartbeat table to the publication: ALTER PUBLICATION
   <publication_name> ADD TABLE <heartbeat_table>;
   2. set heartbeatInterval
   3. add debezium.heartbeat.action.query
   <https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-heartbeat-action-query>
   [3]
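
Put together, a minimal sketch of those three steps might look as follows. The
heartbeat table name, publication name, and action query here are hypothetical
placeholders. Via debeziumProperties() the property is passed under its raw
Debezium name (heartbeat.action.query), just like column.include.list in
Thomas's setup below. (Though, as described next, setting it currently triggers
an exception in CDC.)

```
// Step 1 (SQL, run once on the database; names are hypothetical):
//   CREATE TABLE public.flink_heartbeat (id INT PRIMARY KEY, ts TIMESTAMPTZ);
//   ALTER PUBLICATION dbz_publication ADD TABLE public.flink_heartbeat;

val dbzProps = new java.util.Properties()
// Step 3: a statement the connector runs on each heartbeat so the slot can advance
dbzProps.setProperty(
  "heartbeat.action.query",
  "INSERT INTO public.flink_heartbeat (id, ts) VALUES (1, now()) " +
    "ON CONFLICT (id) DO UPDATE SET ts = now()")

val source: PostgresSourceBuilder.PostgresIncrementalSource[String] =
  PostgresSourceBuilder.PostgresIncrementalSource.builder()
    .heartbeatInterval(java.time.Duration.ofSeconds(10)) // Step 2
    .debeziumProperties(dbzProps)
    // ... hostname / port / database / slotName / deserializer etc. as usual ...
    .build()
```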

However, when I use it in CDC, an exception occurs:

Caused by: java.lang.NullPointerException
    at io.debezium.heartbeat.HeartbeatFactory.createHeartbeat(HeartbeatFactory.java:55)
    at io.debezium.pipeline.EventDispatcher.<init>(EventDispatcher.java:127)
    at io.debezium.pipeline.EventDispatcher.<init>(EventDispatcher.java:94)




It seems CDC doesn't add a HeartbeatConnectionProvider when configuring the
PostgresEventDispatcher:

// org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext#configure
this.postgresDispatcher =
        new PostgresEventDispatcher<>(
                dbzConfig,
                topicSelector,
                schema,
                queue,
                dbzConfig.getTableFilters().dataCollectionFilter(),
                DataChangeEvent::new,
                metadataProvider,
                schemaNameAdjuster);


In Debezium, when PostgresConnectorTask starts, it does this:

// io.debezium.connector.postgresql.PostgresConnectorTask#start
final PostgresEventDispatcher<TableId> dispatcher =
        new PostgresEventDispatcher<>(
                connectorConfig,
                topicNamingStrategy,
                schema,
                queue,
                connectorConfig.getTableFilters().dataCollectionFilter(),
                DataChangeEvent::new,
                PostgresChangeRecordEmitter::updateSchema,
                metadataProvider,
                connectorConfig.createHeartbeat(
                        topicNamingStrategy,
                        schemaNameAdjuster,
                        () -> new PostgresConnection(
                                connectorConfig.getJdbcConfig(),
                                PostgresConnection.CONNECTION_GENERAL),
                        exception -> {
                            String sqlErrorId = exception.getSQLState();
                            switch (sqlErrorId) {
                                case "57P01":
                                    // Postgres error admin_shutdown, see
                                    // https://www.postgresql.org/docs/12/errcodes-appendix.html
                                    throw new DebeziumException(
                                            "Could not execute heartbeat action query (Error: " + sqlErrorId + ")",
                                            exception);
                                case "57P03":
                                    // Postgres error cannot_connect_now, see
                                    // https://www.postgresql.org/docs/12/errcodes-appendix.html
                                    throw new RetriableException(
                                            "Could not execute heartbeat action query (Error: " + sqlErrorId + ")",
                                            exception);
                                default:
                                    break;
                            }
                        }),
                schemaNameAdjuster,
                signalProcessor);

Thus, I have created a new Jira[4] to fix it.



[1]
https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/legacy-flink-cdc-sources/postgres-cdc/

[2]
https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-heartbeat-interval-ms

[3]
https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-heartbeat-action-query

[4] https://issues.apache.org/jira/browse/FLINK-35387


Best

Hongshun

On Thu, May 16, 2024 at 9:03 PM Thomas Peyric wrote:

problem with the heartbeat interval feature

2024-05-16 Thread Thomas Peyric
Hi Flink Community !

I am using:
* Flink
* Flink CDC Postgres Connector
* Scala + sbt

versions are:
  * orgApacheKafkaVersion = "3.2.3"
  * flinkVersion = "1.19.0"
  * flinkKafkaVersion = "3.0.2-1.18"
  * flinkConnectorPostgresCdcVersion = "3.0.1"
  * debeziumVersion = "1.9.8.Final"
  * scalaVersion = "2.12.13"
  * javaVersion = "11"


the problem
---

I have a problem with the heartbeat interval feature:
* when I query PG with `select * from pg_replication_slots;` to check whether
the information for each replication slot is updated at the defined interval
* the confirmed_flush_lsn values are never updated

PS: I have other replication slots managed directly with Debezium (without
Flink) and their confirmed_flush_lsn values are updated correctly (same PG
DB), each at its own interval

```
  slot_name  |  plugin  | slot_type |  datoid   | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn  | confirmed_flush_lsn
-------------+----------+-----------+-----------+----------+-----------+--------+------------+------+--------------+--------------+---------------------
 slot_table1 | pgoutput | logical   | 811518778 | the_db   | f         | t      |      10870 |      |   1630392036 | 712/697C0DB8 | 712/697C0DF0
 slot_table2 | pgoutput | logical   | 811518778 | the_db   | f         | t      |      10894 |      |   1630392033 | 712/697AD0A8 | 712/697AD0E0
 slot_table3 | pgoutput | logical   | 811518778 | the_db   | f         | t      |      10978 |      |   1630392034 | 712/697AD0A8 | 712/697AD0A8
```



My setup
--------

I have configured 3 distinct DataStreamSources on 3 PG database tables using
this common method:

```
private def initEntityDataSource(conf: Config, env: StreamExecutionEnvironment, entityName: String, columnList: String) = {

    val dbzProps: Properties = new Properties()
    // e.g. "public.tableX.column1,public.tableX.column2"
    dbzProps.setProperty("column.include.list", columnList)

    val postgresIncrementalSource: PostgresSourceBuilder.PostgresIncrementalSource[String] =
      PostgresSourceBuilder.PostgresIncrementalSource.builder()
        .hostname(conf.getString("pg.hostname"))
        .port(conf.getInt("pg.port"))
        .database(conf.getString("pg.database"))
        .username(conf.getString("pg.username"))
        .password(conf.getString("pg.password"))
        .slotName(conf.getString(s"flink.${entityName}.slot_name")) // slot_tableX
        .decodingPluginName("pgoutput")
        .includeSchemaChanges(true)
        .deserializer(new JsonDebeziumDeserializationSchema())
        .closeIdleReaders(true)
        .heartbeatInterval(Duration.ofMillis(10000)) // <-- 10 seconds
        .connectTimeout(Duration.ofSeconds(10))      // 10 seconds
        .startupOptions(StartupOptions.initial())
        .schemaList("public")
        .tableList("public." + conf.getString(s"flink.${entityName}.table_name")) // public.tableX
        .debeziumProperties(dbzProps) // <-- dbzProps
        .build()

    env.fromSource(postgresIncrementalSource, WatermarkStrategy.noWatermarks[String](), s"pg-projector-${entityName}")
      .setParallelism(1)
  }
```

After that I convert each DataStreamSource into a Table, join those 3 Tables,
and convert the result into a DataStream[Row].

On this new DataStream I do a keyBy and process with a custom
KeyedProcessFunction (a sketch of this part of the pipeline follows).
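
For context, a minimal sketch of that pipeline; the stream names, columns,
join keys, and the process function below are hypothetical placeholders, not
my actual job:

```
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row

// Assuming s1, s2, s3 are the three sources, already deserialized into Rows
val tEnv = StreamTableEnvironment.create(env)
tEnv.createTemporaryView("t1", s1)
tEnv.createTemporaryView("t2", s2)
tEnv.createTemporaryView("t3", s3)

// Join the three tables (hypothetical key columns)
val joined = tEnv.sqlQuery(
  "SELECT t1.id, t2.col_a, t3.col_b FROM t1 " +
    "JOIN t2 ON t2.t1_id = t1.id " +
    "JOIN t3 ON t3.t1_id = t1.id")

// Convert the join result back into a changelog DataStream[Row]
val rows: DataStream[Row] = tEnv.toChangelogStream(joined)

// keyBy on the join key, then run the custom (hypothetical) KeyedProcessFunction
rows
  .keyBy(row => row.getField("id").toString)
  .process(new MyKeyedProcessFunction())
```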

All of this works fine and does its job.

But the heartbeat does not seem to refresh the values in the
pg_replication_slots.confirmed_flush_lsn column.


PS: I also tried the following:

1) instead of using the .heartbeatInterval() method to set the interval, I
used Debezium properties like this:

```
dbzProps.setProperty("heartbeat.interval.ms", "10000") // and also tried "PT10S"
```

this seems to have no effect

2) it seems that Debezium needs a Kafka topic for managing the heartbeat. In
theory, if the topic does not exist it will be automatically created, but my
Kafka server does not authorize this auto-creation ... so I created this topic
manually (e.g. as sketched below) with this name:
`__flink-heartbeat.postgres_cdc_source`
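
For reference, a minimal sketch of creating that topic programmatically with
the Kafka AdminClient; the broker address, partition count, and replication
factor are hypothetical and need adjusting for a real cluster:

```
import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, NewTopic}

val adminProps = new Properties()
adminProps.put("bootstrap.servers", "kafka:9092") // hypothetical broker address

val admin = AdminClient.create(adminProps)
try {
  // 1 partition, replication factor 1 -- adjust for your cluster
  val topic = new NewTopic("__flink-heartbeat.postgres_cdc_source", 1, 1.toShort)
  admin.createTopics(Collections.singleton(topic)).all().get() // block until created
} finally {
  admin.close()
}
```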

I also add this Debezium property to set the right topic prefix:

```
dbzProps.setProperty("topic.heartbeat.prefix", "__flink-heartbeat")
```

this seems to have no effect either




So ... Do you have any ideas?

Thanks,

Thomas
