Re: problem with the heartbeat interval feature

2024-05-16 Thread Hongshun Wang
Hi Thomas,

The Debezium docs say: For the connector to detect and process events from a
heartbeat table, you must add the table to the PostgreSQL publication
specified by the publication.name property.
If this publication predates your Debezium deployment, the connector uses
the publication as defined. If the publication is not already configured
to automatically replicate changes FOR ALL TABLES in the database, you must
explicitly add the heartbeat table to the publication[2].

Thus, if you want to use the heartbeat in CDC:

   1. add the heartbeat table to the publication: ALTER PUBLICATION
   <publication_name> ADD TABLE <heartbeat_table>;
   2. set heartbeatInterval
   3. add debezium.heartbeat.action.query [3] (see the sketch below)
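
For illustration only, a rough Java sketch of these steps with placeholder
publication/table names (the builder methods are the ones already used in
your setup quoted below):

import java.util.Properties;

// Step 1: run once in PostgreSQL (placeholder names):
//   ALTER PUBLICATION my_publication ADD TABLE public.debezium_heartbeat;

Properties dbzProps = new Properties();
// Step 3: the query Debezium runs against the heartbeat table on every beat;
// this is the property that item 3 spells as debezium.heartbeat.action.query
dbzProps.setProperty("heartbeat.action.query",
        "UPDATE public.debezium_heartbeat SET last_beat = now();");

// Step 2: on the Postgres CDC source builder, e.g.
//   .heartbeatInterval(Duration.ofSeconds(10))
//   .debeziumProperties(dbzProps)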

However, when I use it in CDC, an exception occurs:

Caused by: java.lang.NullPointerException
at 
io.debezium.heartbeat.HeartbeatFactory.createHeartbeat(HeartbeatFactory.java:55)
at io.debezium.pipeline.EventDispatcher.<init>(EventDispatcher.java:127)
at io.debezium.pipeline.EventDispatcher.<init>(EventDispatcher.java:94)




It seems Flink CDC does not add a HeartbeatConnectionProvider when configuring
the PostgresEventDispatcher:

// org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext#configure
this.postgresDispatcher =
        new PostgresEventDispatcher<>(
                dbzConfig,
                topicSelector,
                schema,
                queue,
                dbzConfig.getTableFilters().dataCollectionFilter(),
                DataChangeEvent::new,
                metadataProvider,
                schemaNameAdjuster);


In Debezium, when PostgresConnectorTask starts, it does this:

// io.debezium.connector.postgresql.PostgresConnectorTask#start
final PostgresEventDispatcher dispatcher = new PostgresEventDispatcher<>(
        connectorConfig,
        topicNamingStrategy,
        schema,
        queue,
        connectorConfig.getTableFilters().dataCollectionFilter(),
        DataChangeEvent::new,
        PostgresChangeRecordEmitter::updateSchema,
        metadataProvider,
        connectorConfig.createHeartbeat(
                topicNamingStrategy,
                schemaNameAdjuster,
                () -> new PostgresConnection(
                        connectorConfig.getJdbcConfig(),
                        PostgresConnection.CONNECTION_GENERAL),
                exception -> {
                    String sqlErrorId = exception.getSQLState();
                    switch (sqlErrorId) {
                        case "57P01":
                            // Postgres error admin_shutdown, see
                            // https://www.postgresql.org/docs/12/errcodes-appendix.html
                            throw new DebeziumException(
                                    "Could not execute heartbeat action query (Error: " + sqlErrorId + ")",
                                    exception);
                        case "57P03":
                            // Postgres error cannot_connect_now, see
                            // https://www.postgresql.org/docs/12/errcodes-appendix.html
                            throw new RetriableException(
                                    "Could not execute heartbeat action query (Error: " + sqlErrorId + ")",
                                    exception);
                        default:
                            break;
                    }
                }),
        schemaNameAdjuster,
        signalProcessor);

Thus, I have created a new Jira[4] to fix it.



 [1]
https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/legacy-flink-cdc-sources/postgres-cdc/

[2]
https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-heartbeat-interval-ms

[3]
https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-heartbeat-action-query

[4] https://issues.apache.org/jira/browse/FLINK-35387


Best

Hongshun

On Thu, May 16, 2024 at 9:03 PM Thomas Peyric 
wrote:

> Hi Flink Community !
>
> I am using :
> * Flink
> * Flink CDC posgtres Connector
> * scala + sbt
>
> versions are :
>   * orgApacheKafkaVersion = "3.2.3"
>   * flinkVersion = "1.19.0"
>   * flinkKafkaVersion = "3.0.2-1.18"
>   * flinkConnectorPostgresCdcVersion = "3.0.1"
>   * debeziumVersion = "1.9.8.Final"
>   * scalaVersion = "2.12.13"
>   * javaVersion = "11"
>
>
> the problem
> ---
>
> I have a problem with the heartbeat interval feature:
> * when I am querying PG with `select * from pg_replication_slots;` for
> checking if information are updated on each replication slots at defined
> interval
> * then confirmed_flush_lsn values are never updated
> PS: i have other 

Re: SSL Kafka PyFlink

2024-05-16 Thread gongzhongqiang
Hi Phil,

The Kafka SSL configuration keys may not be correct. You can refer to the
Kafka documentation[1] for the SSL configuration of clients.


[1] https://kafka.apache.org/documentation/#security_configclients
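
For example, the Java Kafka client used by the Flink connector expects
truststore/keystore style settings rather than the librdkafka keys
(ssl.ca.location, ssl.certificate.location, ...) used by confluent-kafka.
A rough sketch, assuming the PEM files have been converted into JKS or
PKCS12 stores (paths and passwords below are placeholders):

import java.util.Properties;

public class KafkaSslProps {
    public static Properties sslProps() {
        Properties props = new Properties();
        props.setProperty("security.protocol", "SSL");
        // truststore holding the broker CA certificate(s)
        props.setProperty("ssl.truststore.location", "/path/to/truststore.jks");
        props.setProperty("ssl.truststore.password", "changeit");
        // keystore holding the client certificate and private key (mutual TLS)
        props.setProperty("ssl.keystore.location", "/path/to/keystore.jks");
        props.setProperty("ssl.keystore.password", "changeit");
        props.setProperty("ssl.key.password", "changeit");
        return props;
    }
}

In the Flink SQL WITH clause each of these keys is passed through with the
'properties.' prefix, e.g. 'properties.ssl.truststore.location'.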


Best,
Zhongqiang Gong

Phil Stavridis wrote on Fri, May 17, 2024 at 01:44:

> Hi,
>
> I have a PyFlink job that needs to read from a Kafka topic and the
> communication with the Kafka broker requires SSL.
> I have connected to the Kafka cluster with something like this using just
> Python.
>
> from confluent_kafka import Consumer, KafkaException, KafkaError
>
>
>
> def get_config(bootstrap_servers, ca_file, cert_file, key_file):
> config = {
> 'bootstrap.servers': bootstrap_servers,
> 'security.protocol': 'SSL',
> 'ssl.ca.location': ca_file,
> 'ssl.certificate.location': cert_file,
> 'ssl.key.location': key_file,
> 'ssl.endpoint.identification.algorithm': 'none',
> 'enable.ssl.certificate.verification': 'false',
> 'group.id': 'my_group_id'
> }
>
>
> return config
>
>
>
> And have read messages from the Kafka topic.
>
> I am trying to set up something similar with Flink SQL:
>
> t_env.execute_sql(f"""
> CREATE TABLE logs (
> `user` ROW(`user_id` BIGINT),
> `timestamp` ROW(`secs` BIGINT)
> ) WITH (
> 'connector' = '{CONNECTOR_TYPE}',
> 'topic' = '{KAFKA_TOPIC}',
> 'properties.bootstrap.servers' = '{BOOTSTRAP_SERVERS}',
> 'properties.group.id' = '{CONSUMER_GROUP}',
> 'scan.startup.mode' = '{STARTUP_MODE_LATEST}',
> 'format' = '{MESSAGE_FORMAT}',
> 'properties.security.protocol' = 'SSL',
> 'properties.ssl.ca.location' = '{ca_file}',
> 'properties.ssl.certificate.location' = '{cert_file}',
> 'properties.ssl.key.location' = '{key_file}',
> 'properties.ssl.endpoint.identification.algorithm' = ''
> )
> """)
>
>
> But when this runs I am getting this error:
>
> Caused by: org.apache.flink.util.FlinkException: Global failure triggered
> by OperatorCoordinator for 'Source: kafka_action_logs[1] -> Calc[2]'
> (operator cbc357ccb763df2852fee8c4fc7d55f2).
> ...
> Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to list
> subscribed topic partitions due to
> at
> ...
> Caused by: java.lang.RuntimeException: Failed to get metadata for topics
> [logs].
> at
> ...
> ... 3 more
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.SslAuthenticationException:
> SSL handshake failed
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
> at
> org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:44)
> ... 10 more
> Caused by:
> org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.SslAuthenticationException:
> SSL handshake failed
> Caused by: javax.net.ssl.SSLHandshakeException: PKIX path building failed:
> sun.security.provider.certpath.SunCertPathBuilderException: unable to find
> valid certification path to requested target
> ...
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:435)
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:523)
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:373)
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:293)
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:178)
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.poll(Selector.java:481)
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560)
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1413)
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1344)
> at java.lang.Thread.run(Thread.java:750)
> Caused by: sun.security.validator.ValidatorException: PKIX path building
> failed: sun.security.provider.certpath.SunCertPathBuilderException: unable
> to find valid certification path to requested target
> at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:456)
> at
> sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:323)
> at sun.security.validator.Validator.validate(Validator.java:271)
> at
> sun.security.ssl.X509TrustManagerImpl.validate(X509TrustManagerImpl.java:315)
> at
> 

Re: [EXTERNAL]Re: Flink kafka connector for v 1.19.0

2024-05-16 Thread Hang Ruan
Hi, Niklas.

The Kafka connector version 3.2.0[1] is for Flink 1.19 and it already has a
vote thread[2]. But there are not enough votes yet.

Best,
Hang

[1] https://issues.apache.org/jira/browse/FLINK-35138
[2] https://lists.apache.org/thread/7shs2wzb0jkfdyst3mh6d9pn3z1bo93c

Niklas Wilcke wrote on Thu, May 16, 2024 at 22:04:

> Hi Ahmed,
>
> are you aware of a blocker? I'm also a bit confused that after Flink 1.19
> being available for a month now the connectors still aren't. It would be
> great to get some insights or maybe a reference to an issue. From looking
> at the Github repos and the Jira I wasn't able to spot something obvious
> telling me that this matter is really in the focus. Thank you!
>
> Regards,
> Niklas
>
>
> On 10. May 2024, at 20:10, Ahmed Hamdy  wrote:
>
> Hi Aniket
>
> The community is currently working on releasing a new version for all the
> connectors that is compatible with 1.19. Please follow the announcements in
> Flink website[1] to get notified when it is available.
>
> 1-https://flink.apache.org/posts/
> Best Regards
> Ahmed Hamdy
>
>
> On Fri, 10 May 2024 at 18:14, Aniket Sule 
> wrote:
>
>> Hello,
>>
>> On the Flink downloads page, the latest stable version is Flink 1.19.0.
>> However, the Flink Kafka connector is v 3.1.0, that is compatible with
>> 1.18.x.
>>
>> Is there a timeline when the Kafka connector for v 1.19 will be released?
>> Is it possible to use the v3.1.0 connector with Flink v 1.19?
>>
>>
>>
>> Thanks and regards,
>>
>> Aniket Sule
>> Caution: External email. Do not click or open attachments unless you know
>> and trust the sender.
>>
>
>


Re: monitoring message latency for flink sql app

2024-05-16 Thread Hang Ruan
Hi, mete.

As Feng Jin said, I think you could make use of the metric
`currentEmitEventTimeLag`.
Besides that, if you develop your job with the DataStream API, you could
add a new operator to handle it by yourself.
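
A rough sketch of such an operator (just an illustration, assuming event-time
timestamps are assigned upstream; it exposes the gap between the current
processing time and the record's event time as a custom gauge):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class EventTimeLagFunction<T> extends ProcessFunction<T, T> {

    private volatile long lastLagMs;

    @Override
    public void open(Configuration parameters) {
        // register a gauge that reports the most recently observed lag
        getRuntimeContext()
                .getMetricGroup()
                .gauge("eventTimeLagMs", (Gauge<Long>) () -> lastLagMs);
    }

    @Override
    public void processElement(T value, Context ctx, Collector<T> out) {
        Long eventTime = ctx.timestamp(); // null if no timestamp was assigned
        if (eventTime != null) {
            lastLagMs = System.currentTimeMillis() - eventTime;
        }
        out.collect(value); // pass the record through unchanged
    }
}

You would then apply it with something like
stream.process(new EventTimeLagFunction<>()) and read the gauge through your
metrics reporter, alerting when the lag exceeds your threshold.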

Best,
Hang

Feng Jin wrote on Fri, May 17, 2024 at 02:44:

> Hi Mete
>
> You can refer to the metrics provided by the Kafka source connector.
>
>
> https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kafka//#monitoring
>
> Best,
> Feng
>
> On Thu, May 16, 2024 at 7:55 PM mete  wrote:
>
>> Hello,
>>
>> For an sql application using kafka as source (and kafka as sink) what
>> would be the recommended way to monitor for processing delay? For example,
>> i want to be able to alert if the app has a certain delay compared to some
>> event time field in the message.
>>
>> Best,
>> Mete
>>
>>
>>


Re: Flink 1.18.1, state restore after restart

2024-05-16 Thread Yanfei Lei
This looks like the same problem as FLINK-34063 / FLINK-33863; you can try upgrading to 1.18.2.
[1] https://issues.apache.org/jira/browse/FLINK-33863
[2] https://issues.apache.org/jira/browse/FLINK-34063

陈叶超 wrote on Thu, May 16, 2024 at 16:38:
>
> After upgrading to Flink 1.18.1, restoring state when the job restarts fails with the following error:
> 2024-04-09 13:03:48
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:258)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:256)
> at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:753)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:728)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693)
> at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
> at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
> at java.lang.Thread.run(Thread.java:750)
> Caused by: org.apache.flink.util.FlinkException: Could not restore operator 
> state backend for 
> RowDataStoreWriteOperator_8d96fc510e75de3baf03ef7367db7d42_(2/2) from any of 
> the 1 provided restore options.
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:289)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:176)
> ... 11 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed 
> when trying to restore operator state backend
> at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:88)
> at 
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createOperatorStateBackend(EmbeddedRocksDBStateBackend.java:533)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:380)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:280)
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> ... 13 more
> Caused by: java.io.IOException: invalid stream header
> at 
> org.xerial.snappy.SnappyFramedInputStream.(SnappyFramedInputStream.java:235)
> at 
> org.xerial.snappy.SnappyFramedInputStream.(SnappyFramedInputStream.java:145)
> at 
> org.xerial.snappy.SnappyFramedInputStream.(SnappyFramedInputStream.java:129)
> at 
> org.apache.flink.runtime.state.SnappyStreamCompressionDecorator.decorateWithCompression(SnappyStreamCompressionDecorator.java:53)
> at 
> org.apache.flink.runtime.state.StreamCompressionDecorator.decorateWithCompression(StreamCompressionDecorator.java:60)
> at 
> org.apache.flink.runtime.state.CompressibleFSDataInputStream.(CompressibleFSDataInputStream.java:39)
> at 
> org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:185)
> at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:85)
> ... 18 more
>


-- 
Best,
Yanfei


What is the best way to aggregate data over a long window

2024-05-16 Thread Sachin Mittal
Hi,
My pipeline step is something like this:

SingleOutputStreamOperator reducedData =
data
.keyBy(new KeySelector())
.window(
TumblingEventTimeWindows.of(Time.seconds(secs)))
.reduce(new DataReducer())
.name("reduce");


This works fine for secs = 300.
However, once I increase the time window to, say, 1 hour (3600 seconds), the
state size increases as it now has a lot more records to reduce.

Hence I need to allocate much more memory to the task manager.

However, there is no upper limit to this allocated memory. If the volume of
data increases, say, 10-fold, I would have no option but to increase the
memory again.

Is there a better way to perform long window aggregation so that overall this
step has a small memory footprint?

Thanks
Sachin


Re: monitoring message latency for flink sql app

2024-05-16 Thread Feng Jin
Hi Mete

You can refer to the metrics provided by the Kafka source connector.

https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kafka//#monitoring

Best,
Feng

On Thu, May 16, 2024 at 7:55 PM mete  wrote:

> Hello,
>
> For an sql application using kafka as source (and kafka as sink) what
> would be the recommended way to monitor for processing delay? For example,
> i want to be able to alert if the app has a certain delay compared to some
> event time field in the message.
>
> Best,
> Mete
>
>
>


RE: monitoring message latency for flink sql app

2024-05-16 Thread Anton Sidorov
Hello mete.

I found this SO article
https://stackoverflow.com/questions/54293808/measuring-event-time-latency-with-flink-cep

If I'm not mistaken, you can use the Flink metrics system for operators and
get the processing time of an event in an operator.

On 2024/05/16 11:54:44 mete wrote:
> Hello,
>
> For an sql application using kafka as source (and kafka as sink) what would
> be the recommended way to monitor for processing delay? For example, i want
> to be able to alert if the app has a certain delay compared to some event
> time field in the message.
>
> Best,
> Mete


problem with the heartbeat interval feature

2024-05-16 Thread Thomas Peyric
Hi Flink Community !

I am using :
* Flink
* Flink CDC posgtres Connector
* scala + sbt

versions are :
  * orgApacheKafkaVersion = "3.2.3"
  * flinkVersion = "1.19.0"
  * flinkKafkaVersion = "3.0.2-1.18"
  * flinkConnectorPostgresCdcVersion = "3.0.1"
  * debeziumVersion = "1.9.8.Final"
  * scalaVersion = "2.12.13"
  * javaVersion = "11"


the problem
---

I have a problem with the heartbeat interval feature:
* when I am querying PG with `select * from pg_replication_slots;` to check
whether the information on each replication slot is updated at the defined
interval
* then the confirmed_flush_lsn values are never updated
PS: I have other replication slots managed directly with Debezium (without
Flink) and their confirmed_flush_lsn values are updated correctly (same PG
DB) according to their own interval

```
 slot_name   |  plugin  | slot_type |  datoid   | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn  | confirmed_flush_lsn
-------------+----------+-----------+-----------+----------+-----------+--------+------------+------+--------------+--------------+---------------------
 slot_table1 | pgoutput | logical   | 811518778 | the_db   | f         | t      |      10870 |      |   1630392036 | 712/697C0DB8 | 712/697C0DF0
 slot_table2 | pgoutput | logical   | 811518778 | the_db   | f         | t      |      10894 |      |   1630392033 | 712/697AD0A8 | 712/697AD0E0
 slot_table3 | pgoutput | logical   | 811518778 | the_db   | f         | t      |      10978 |      |   1630392034 | 712/697AD0A8 | 712/697AD0A8
```



My setup


I have configured 3 distinct DataStreamSource on 3 pg database tables using
this common method :

```
private def initEntityDataSource(conf: Config, env:
StreamExecutionEnvironment, entityName: String, columnList: String) = {

val dbzProps: Properties = new Properties()
dbzProps.setProperty("column.include.list", columnList)
// "public.tableX.column1,public.tableX.column2"

val postgresIncrementalSource:
PostgresSourceBuilder.PostgresIncrementalSource[String] =
PostgresSourceBuilder.PostgresIncrementalSource.builder()
  .hostname(conf.getString("pg.hostname"))
  .port(conf.getInt("pg.port"))
  .database(conf.getString("pg.database"))
  .username(conf.getString("pg.username"))
  .password(conf.getString("pg.password"))
  .slotName(conf.getString(s"flink.${entityName}.slot_name"))
  // slot_tableX
  .decodingPluginName("pgoutput")
  .includeSchemaChanges(true)
  .deserializer(new JsonDebeziumDeserializationSchema())
  .closeIdleReaders(true)
  .heartbeatInterval(Duration.ofMillis(10000))          // <-- 10 seconds
  .connectTimeout(Duration.ofSeconds(10))               // 10 seconds
  .startupOptions(StartupOptions.initial())
  .schemaList("public")
  .tableList("public." + conf.getString(s"flink.${entityName}.table_name"))   // public.tableX
  .debeziumProperties(dbzProps)                         // <-- dbzProps
  .build()

env.fromSource(postgresIncrementalSource,
WatermarkStrategy.noWatermarks[String](), s"pg-projector-${entityName}")
  .setParallelism(1)

  }
```

After that I have converted each DataStreamSource into a Table,
and I join those 3 Tables and convert the result into a DataStream[Row].

On this new DataStream I do a keyBy and process a custom
KeyedProcessFunction.

All of this works fine and does its job.

But the heartbeat does not seem to refresh the values in the
pg_replication_slots.confirmed_flush_lsn column.


PS: I also tried this:

1) instead of using the .heartbeatInterval() method to set the interval
value ... I use Debezium properties like this

```
dbzProps.setProperty("heartbeat.interval.ms", "1")// and also
"PT10S"
```

this seems to have no effect

2) it seems that Debezium needs to create a Kafka topic for managing the
heartbeat. In theory, if the topic does not exist it will be automatically
created.
But my Kafka server does not authorize this auto-creation ... so I created
this topic manually with this name:
`__flink-heartbeat.postgres_cdc_source`

I also add this dbzProps entry to set the right topic prefix

```
dbzProps.setProperty("topic.heartbeat.prefix", "__flink-heartbeat")
```

this seems to have no effect either




So ... Do you have any ideas ?

Thanks,

Thomas


Re: Would Java 11 cause Getting OutOfMemoryError: Direct buffer memory?

2024-05-16 Thread John Smith
Hi. No I have not changed the protocol.

On Thu, May 16, 2024, 3:20 AM Biao Geng  wrote:

> Hi John,
>
> Just want to check, have you ever changed the Kafka protocol in your job
> after using the new cluster? The error message shows that it is caused by
> the Kafka client and there is a similar error in a related issue.
>
> Best,
> Biao Geng
>
>
John Smith wrote on Thu, May 16, 2024 at 09:01:
>
>> I deployed a new cluster, same version as my old cluster(1.14.4 ), only
>> difference using Java 11 and it seems after a week of usage the below
>> exception happens.
>>
>> The task manager is...
>>
>> 32GB total
>>
>> And i have the ONLY following memory settings
>>
>> taskmanager.memory.flink.size: 16384m
>> taskmanager.memory.jvm-metaspace.size: 3072m
>>
>>
>>
>>
>> Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct
>> out-of-memory error has occurred. This can mean two things: either job(s)
>> require(s) a larger size of JVM direct memory or there is a direct memory
>> leak. The direct memory can be allocated by user code or some of its
>> dependencies. In this case 'taskmanager.memory.task.off-heap.size'
>> configuration option should be increased. Flink framework and its
>> dependencies also consume the direct memory, mostly for network
>> communication. The most of network memory is managed by Flink and should
>> not result in out-of-memory error. In certain special cases, in particular
>> for jobs with high parallelism, the framework may require more direct
>> memory which is not managed by Flink. In this case
>> 'taskmanager.memory.framework.off-heap.size' configuration option should be
>> increased. If the error persists then there is probably a direct memory
>> leak in user code or some of its dependencies which has to be investigated
>> and fixed. The task executor has to be shutdown...
>> at java.base/java.nio.Bits.reserveMemory(Bits.java:175)
>> at java.base/java.nio.DirectByteBuffer.(DirectByteBuffer.java:118)
>> at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
>> at java.base/sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242)
>> at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:242)
>> at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223)
>> at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:356)
>> at
>> org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103)
>> at
>> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
>> at
>> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
>> at
>> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
>> at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
>> at
>> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
>> at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
>> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
>> at
>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
>> at
>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1300)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
>> at
>> org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:97)
>> at
>> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
>> at
>> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
>> at
>> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
>> at
>> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>> at
>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> at
>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> ... 1 more
>>
>


Re: [EXTERNAL]Re: Flink kafka connector for v 1.19.0

2024-05-16 Thread Niklas Wilcke
Hi Ahmed,

are you aware of a blocker? I'm also a bit confused that after Flink 1.19 being 
available for a month now the connectors still aren't. It would be great to get 
some insights or maybe a reference to an issue. From looking at the Github 
repos and the Jira I wasn't able to spot something obvious telling me that this 
matter is really in the focus. Thank you!

Regards,
Niklas


> On 10. May 2024, at 20:10, Ahmed Hamdy  wrote:
> 
> Hi Aniket
> 
> The community is currently working on releasing a new version for all the 
> connectors that is compatible with 1.19. Please follow the announcements in 
> Flink website[1] to get notified when it is available.
> 
> 1-https://flink.apache.org/posts/
> Best Regards
> Ahmed Hamdy
> 
> 
> On Fri, 10 May 2024 at 18:14, Aniket Sule wrote:
>> Hello,
>> 
>> On the Flink downloads page, the latest stable version is Flink 1.19.0. 
>> However, the Flink Kafka connector is v 3.1.0, that is compatible with 
>> 1.18.x.
>> 
>> Is there a timeline when the Kafka connector for v 1.19 will be released? Is 
>> it possible to use the v3.1.0 connector with Flink v 1.19?
>> 
>>  
>> 
>> Thanks and regards,
>> 
>> Aniket Sule
>> 
>> Caution: External email. Do not click or open attachments unless you know 
>> and trust the sender. 





monitoring message latency for flink sql app

2024-05-16 Thread mete
Hello,

For an sql application using kafka as source (and kafka as sink) what would
be the recommended way to monitor for processing delay? For example, i want
to be able to alert if the app has a certain delay compared to some event
time field in the message.

Best,
Mete


SSL Kafka PyFlink

2024-05-16 Thread Phil Stavridis
Hi,

I have a PyFlink job that needs to read from a Kafka topic and the 
communication with the Kafka broker requires SSL.
I have connected to the Kafka cluster with something like this using just 
Python.

from confluent_kafka import Consumer, KafkaException, KafkaError



def get_config(bootstrap_servers, ca_file, cert_file, key_file):
config = {
'bootstrap.servers': bootstrap_servers,
'security.protocol': 'SSL',
'ssl.ca.location': ca_file,
'ssl.certificate.location': cert_file,
'ssl.key.location': key_file,
'ssl.endpoint.identification.algorithm': 'none',
'enable.ssl.certificate.verification': 'false',
'group.id': 'my_group_id'
}


return config



And have read messages from the Kafka topic.

I am trying to set up something similar with Flink SQL:

t_env.execute_sql(f"""
CREATE TABLE logs (
`user` ROW(`user_id` BIGINT),
`timestamp` ROW(`secs` BIGINT)
) WITH (
'connector' = '{CONNECTOR_TYPE}',
'topic' = '{KAFKA_TOPIC}',
'properties.bootstrap.servers' = '{BOOTSTRAP_SERVERS}',
'properties.group.id' = '{CONSUMER_GROUP}',
'scan.startup.mode' = '{STARTUP_MODE_LATEST}',
'format' = '{MESSAGE_FORMAT}',
'properties.security.protocol' = 'SSL',
'properties.ssl.ca.location' = '{ca_file}',
'properties.ssl.certificate.location' = '{cert_file}',
'properties.ssl.key.location' = '{key_file}',
'properties.ssl.endpoint.identification.algorithm' = ''
)
""")


But when this runs I am getting this error:

Caused by: org.apache.flink.util.FlinkException: Global failure triggered by 
OperatorCoordinator for 'Source: kafka_action_logs[1] -> Calc[2]' (operator 
cbc357ccb763df2852fee8c4fc7d55f2).
...
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to list 
subscribed topic partitions due to
at 
...
Caused by: java.lang.RuntimeException: Failed to get metadata for topics [logs].
at 
...
... 3 more
Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.SslAuthenticationException:
 SSL handshake failed
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at 
org.apache.flink.kafka.shaded.org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
at 
org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:44)
... 10 more
Caused by: 
org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.SslAuthenticationException:
 SSL handshake failed
Caused by: javax.net.ssl.SSLHandshakeException: PKIX path building failed: 
sun.security.provider.certpath.SunCertPathBuilderException: unable to find 
valid certification path to requested target
...
at 
org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:435)
at 
org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:523)
at 
org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:373)
at 
org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:293)
at 
org.apache.flink.kafka.shaded.org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:178)
at 
org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)
at 
org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.poll(Selector.java:481)
at 
org.apache.flink.kafka.shaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560)
at 
org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1413)
at 
org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1344)
at java.lang.Thread.run(Thread.java:750)
Caused by: sun.security.validator.ValidatorException: PKIX path building 
failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to 
find valid certification path to requested target
at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:456)
at sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:323)
at sun.security.validator.Validator.validate(Validator.java:271)
at sun.security.ssl.X509TrustManagerImpl.validate(X509TrustManagerImpl.java:315)
at 
sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:278)
at 
sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:141)
at 
sun.security.ssl.CertificateMessage$T12CertificateConsumer.checkServerCerts(CertificateMessage.java:632)
... 19 more
Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable 
to find valid certification path to requested target
at 
sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:148)
at 

Get access to unmatching events in Apache Flink Cep

2024-05-16 Thread Anton Sidorov
Hello!

I have a Flink Job with CEP pattern.

Pattern example:

// Strict Contiguity
// a b+ c d e
Pattern.begin("a", AfterMatchSkipStrategy.skipPastLastEvent()).where(...)
.next("b").where(...).oneOrMore()
.next("c").where(...)
.next("d").where(...)
.next("e").where(...);

I have an input stream with events in the wrong order:

a b d c e

On the output I don't get any match. But I want to have access to the events
that did not match.

Can I access the intermediate NFA state of the CEP pattern, or is there some
other way to view unmatched events?

There is an example project with the CEP pattern on GitHub, and my question
is also on SO.


Thanks in advance


Flink 1.18.1, state restore after restart

2024-05-16 Thread 陈叶超
After upgrading to Flink 1.18.1, restoring state when the job restarts fails with the following error:
2024-04-09 13:03:48
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:258)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:256)
at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:753)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:728)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator 
state backend for 
RowDataStoreWriteOperator_8d96fc510e75de3baf03ef7367db7d42_(2/2) from any of 
the 1 provided restore options.
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:289)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:176)
... 11 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when 
trying to restore operator state backend
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:88)
at 
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createOperatorStateBackend(EmbeddedRocksDBStateBackend.java:533)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:380)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:280)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
... 13 more
Caused by: java.io.IOException: invalid stream header
at 
org.xerial.snappy.SnappyFramedInputStream.(SnappyFramedInputStream.java:235)
at 
org.xerial.snappy.SnappyFramedInputStream.(SnappyFramedInputStream.java:145)
at 
org.xerial.snappy.SnappyFramedInputStream.(SnappyFramedInputStream.java:129)
at 
org.apache.flink.runtime.state.SnappyStreamCompressionDecorator.decorateWithCompression(SnappyStreamCompressionDecorator.java:53)
at 
org.apache.flink.runtime.state.StreamCompressionDecorator.decorateWithCompression(StreamCompressionDecorator.java:60)
at 
org.apache.flink.runtime.state.CompressibleFSDataInputStream.(CompressibleFSDataInputStream.java:39)
at 
org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:185)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:85)
... 18 more




Re: Would Java 11 cause Getting OutOfMemoryError: Direct buffer memory?

2024-05-16 Thread Biao Geng
Hi John,

Just want to check, have you ever changed the Kafka protocol in your job
after using the new cluster? The error message shows that it is caused by
the Kafka client and there is a similar error in a related issue.

Best,
Biao Geng


John Smith wrote on Thu, May 16, 2024 at 09:01:

> I deployed a new cluster, same version as my old cluster(1.14.4 ), only
> difference using Java 11 and it seems after a week of usage the below
> exception happens.
>
> The task manager is...
>
> 32GB total
>
> And i have the ONLY following memory settings
>
> taskmanager.memory.flink.size: 16384m
> taskmanager.memory.jvm-metaspace.size: 3072m
>
>
>
>
> Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct
> out-of-memory error has occurred. This can mean two things: either job(s)
> require(s) a larger size of JVM direct memory or there is a direct memory
> leak. The direct memory can be allocated by user code or some of its
> dependencies. In this case 'taskmanager.memory.task.off-heap.size'
> configuration option should be increased. Flink framework and its
> dependencies also consume the direct memory, mostly for network
> communication. The most of network memory is managed by Flink and should
> not result in out-of-memory error. In certain special cases, in particular
> for jobs with high parallelism, the framework may require more direct
> memory which is not managed by Flink. In this case
> 'taskmanager.memory.framework.off-heap.size' configuration option should be
> increased. If the error persists then there is probably a direct memory
> leak in user code or some of its dependencies which has to be investigated
> and fixed. The task executor has to be shutdown...
> at java.base/java.nio.Bits.reserveMemory(Bits.java:175)
> at java.base/java.nio.DirectByteBuffer.(DirectByteBuffer.java:118)
> at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
> at java.base/sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242)
> at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:242)
> at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223)
> at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:356)
> at
> org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103)
> at
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
> at
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
> at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
> at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
> at
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1300)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
> at
> org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:97)
> at
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
> at
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> ... 1 more
>