Re: problem with the heartbeat interval feature

2024-05-18 Thread Hongshun Wang
Hi Thomas,

I have reviewed the code and just noticed that heartbeat.action.query is
not mandatory. Debezium will generate heartbeat events at regular
intervals. Flink CDC will then receive these heartbeat events and advance
the offset[1]. Finally, the source reader will commit the offset during
checkpointing in the streaming phase[2].

Therefore, you may want to verify whether checkpointing is enabled and
whether the job has entered the streaming phase (i.e., it is only reading
the WAL).

[1]
https://github.com/apache/flink-cdc/blob/d386c7c1c2db3bb8590be6c20198286cf0567c97/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java#L119

[2]
https://github.com/apache/flink-cdc/blob/d386c7c1c2db3bb8590be6c20198286cf0567c97/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceReaderWithCommit.java#L93
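
Since the offset is only committed when a checkpoint completes, checkpointing must be enabled for this to work. A minimal sketch (the interval value here is an arbitrary example, not a recommendation):

```
# flink-conf.yaml
execution.checkpointing.interval: 10s
```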

On Sat, May 18, 2024 at 12:34 AM Thomas Peyric wrote:

> thanks Hongshun for your response !
>
> On Fri, May 17, 2024 at 07:51, Hongshun Wang wrote:
>
>> Hi Thomas,
>>
>> The Debezium docs say: For the connector to detect and process events from
>> a heartbeat table, you must add the table to the PostgreSQL publication
>> specified by the publication.name property.
>> If this publication predates your Debezium deployment, the connector uses
>> the publication as defined. If the publication is not already configured
>> to automatically replicate changes FOR ALL TABLES in the database, you
>> must explicitly add the heartbeat table to the publication[2].
>>
>> Thus, if you want to use heartbeat in CDC:
>>
>>    1. add a heartbeat table to the publication: ALTER PUBLICATION
>>    ** ADD TABLE **;
>>    2. set heartbeatInterval
>>    3. add debezium.heartbeat.action.query [3]
>>
>> However, when I use it in CDC, an exception occurs:
>>
>> Caused by: java.lang.NullPointerException
>> at io.debezium.heartbeat.HeartbeatFactory.createHeartbeat(HeartbeatFactory.java:55)
>> at io.debezium.pipeline.EventDispatcher.<init>(EventDispatcher.java:127)
>> at io.debezium.pipeline.EventDispatcher.<init>(EventDispatcher.java:94)
>>
>>
>> It seems CDC doesn't add a HeartbeatConnectionProvider when configuring
>> the PostgresEventDispatcher:
>>
>> // org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext#configure
>> this.postgresDispatcher =
>>         new PostgresEventDispatcher<>(
>>                 dbzConfig,
>>                 topicSelector,
>>                 schema,
>>                 queue,
>>                 dbzConfig.getTableFilters().dataCollectionFilter(),
>>                 DataChangeEvent::new,
>>                 metadataProvider,
>>                 schemaNameAdjuster);
>>
>>
>> In Debezium, when the PostgresConnectorTask starts, it does this:
>>
>> // io.debezium.connector.postgresql.PostgresConnectorTask#start
>> final PostgresEventDispatcher dispatcher = new PostgresEventDispatcher<>(
>>         connectorConfig,
>>         topicNamingStrategy,
>>         schema,
>>         queue,
>>         connectorConfig.getTableFilters().dataCollectionFilter(),
>>         DataChangeEvent::new,
>>         PostgresChangeRecordEmitter::updateSchema,
>>         metadataProvider,
>>         connectorConfig.createHeartbeat(
>>                 topicNamingStrategy,
>>                 schemaNameAdjuster,
>>                 () -> new PostgresConnection(connectorConfig.getJdbcConfig(),
>>                         PostgresConnection.CONNECTION_GENERAL),
>>                 exception -> {
>>                     String sqlErrorId = exception.getSQLState();
>>                     switch (sqlErrorId) {
>>                         case "57P01":
>>                             // Postgres error admin_shutdown,
>>                             // see https://www.postgresql.org/docs/12/errcodes-appendix.html
>>                             throw new DebeziumException("Could not execute heartbeat action query (Error: " + sqlErrorId + ")", exception);
>>                         case "57P03":
>>                             // Postgres error cannot_connect_now,
>>                             // see https://www.postgresql.org/docs/12/errcodes-appendix.html
>>                             throw new RetriableException("Could not execute heartbeat action query (Error: " + sqlErrorId + ")", exception);
>>
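
For reference, the three heartbeat settings listed in the quoted steps can be sketched as plain Debezium properties. The table name and SQL text below are hypothetical placeholders; as quoted above, the heartbeat table must also be part of the publication:

```java
import java.util.Properties;

public class HeartbeatPropsSketch {

    // Sketch of the heartbeat settings discussed above. "hb_table" is a
    // hypothetical placeholder; it must also be added to the publication, e.g.:
    //   ALTER PUBLICATION my_publication ADD TABLE hb_table;
    public static Properties heartbeatProps() {
        Properties p = new Properties();
        // step 2: emit a heartbeat event at a regular interval (example value)
        p.setProperty("heartbeat.interval.ms", "30000");
        // step 3: optional query executed on each heartbeat (placeholder SQL)
        p.setProperty("heartbeat.action.query",
                "INSERT INTO hb_table (ts) VALUES (now())");
        return p;
    }

    public static void main(String[] args) {
        System.out.println(heartbeatProps().getProperty("heartbeat.interval.ms"));
    }
}
```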

Email submission

2024-05-18 Thread Michas Szacillo (BLOOMBERG/ 919 3RD A)
Sending my email to join the Apache user mailing list.

Email: mszaci...@bloomberg.net

Advice Needed: Setting Up Prometheus and Grafana Monitoring for Apache Flink on Kubernetes

2024-05-18 Thread Oliver Schmied
Dear Apache Flink Community,

I am currently trying to monitor an Apache Flink cluster deployed on Kubernetes using Prometheus and Grafana. Despite following the official guide (https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/operations/metrics-logging/) on how to set up Prometheus, I have not been able to get Flink-specific metrics to appear in Prometheus. I am reaching out to seek your assistance, as I've tried many things but nothing has worked.


# My setup:

* Kubernetes

* Flink 1.18 deployed as a FlinkDeployment

with this manifest:

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  namespace: default
  name: flink-cluster
spec:
  image: flink:1.18
  flinkVersion: v1_18
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    # Added
    kubernetes.operator.metrics.reporter.prommetrics.reporters: prom
    kubernetes.operator.metrics.reporter.prommetrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    kubernetes.operator.metrics.reporter.prom.port: 9249-9250
  serviceAccount: flink
  jobManager:
    resource:
      memory: "1048m"
      cpu: 1
  taskManager:
    resource:
      memory: "1048m"
      cpu: 1
```
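
As a point of comparison (not necessarily the fix): the `kubernetes.operator.metrics.*` keys configure the operator's own metrics, while for the deployed cluster itself the Flink docs use the plain `metrics.reporter.*` prefix inside `flinkConfiguration`, e.g.:

```
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9249-9250
```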

* Prometheus operator installed via

helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
helm install prometheus prometheus-community/kube-prometheus-stack


* deployed a pod-monitor.yaml

```
apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
  name: flink-kubernetes-operator
  labels:
    release: prometheus
spec:
  selector:
    matchLabels:
      app: flink-cluster
  podMetricsEndpoints:
  - port: metrics
```


# The problem


* I can access Prometheus fine, and judging by the logs, the pod monitor seems to collect Flink-specific metrics, but I can't see these metrics in Prometheus.

* Did I even set up Prometheus correctly in my Flink deployment manifest?


* I also added the following line to my values.yaml file, but apart from that I changed nothing:

```

metrics:
  port: 
```


# My questions


* Can anyone see the mistake in my deployment?

* Or does anyone have a better idea on how to monitor my flink deployment?


I would be very grateful for your answers. Thank you very much.


Best regards,

Oliver