Ways to detect a scaling event within a flink operator at runtime

2024-05-23 Thread Chetas Joshi
Hello,

On a k8s cluster, I have the flink-k8s-operator running 1.8 with autoscaler
= enabled (in-place) and a flinkDeployment (application mode) running
1.18.1.

The flinkDeployment, i.e. the Flink streaming application, has a mock data
producer as the source. The source generates data points every X milliseconds
to be processed (aggregated) by the downstream operators. The aggregated
results are written to Iceberg.

The pipeline starts with default-parallelism = 1, i.e. all the job vertices
start with par = 1, and X = 0, so data points are generated continuously.
Due to the lag associated with the aggregation and sink, the
source experiences backpressure and hence the autoscaler triggers a
scale-up. I want to slow down the speed of data production by source after
the first scale-up event. What are the ways I can detect the scale-up event
so that the source can dynamically adjust (increase) X at run-time? I am
wondering if there is a way to detect if the parallelism of any of the
job-vertex in the flink execution graph has gone above 1 within the source
operator at runtime.

This is a test pipeline (flink app) and the goal is to test the scale-up
and scale-down events thus I need to increase X in order to have a
scale-down event get triggered afterwards.

Thank you
Chetas
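
A possible approach, offered only as a sketch and not confirmed anywhere in this thread: poll the JobManager REST API for the job details and compare the per-vertex parallelism against the starting value of 1. The snippet assumes the `/jobs/<jobid>` response carries a `vertices` list whose entries have a `parallelism` field; the REST address, the job id, and all helper names are hypothetical.

```python
import json
from urllib.request import urlopen


def max_vertex_parallelism(job_details: dict) -> int:
    """Return the highest parallelism among the job's vertices (0 if there are none)."""
    vertices = job_details.get("vertices", [])
    return max((v.get("parallelism", 0) for v in vertices), default=0)


def has_scaled_up(job_details: dict, baseline: int = 1) -> bool:
    """True once any vertex runs with a parallelism above the baseline."""
    return max_vertex_parallelism(job_details) > baseline


def fetch_job_details(rest_url: str, job_id: str) -> dict:
    """Fetch the job details from a (hypothetical) JobManager REST address."""
    with urlopen(f"{rest_url}/jobs/{job_id}") as resp:
        return json.load(resp)


# Hand-written sample shaped like the assumed response; no cluster is contacted here.
sample = {
    "vertices": [
        {"name": "source", "parallelism": 1},
        {"name": "aggregate", "parallelism": 4},
    ]
}
print(has_scaled_up(sample))
```

The source could call `has_scaled_up(fetch_job_details(...))` on a timer and increase X once it returns True. Whether the source can reach the REST endpoint from inside the task is an assumption that would need checking in the actual deployment.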


Re: "Self-service ingestion pipelines with evolving schema via Flink and Iceberg" presentation recording from Flink Forward Seattle 2023

2024-05-23 Thread Péter Váry
If I understand correctly, Paimon is sending `CdcRecord`-s [1] on the wire
which contain not only the data, but the schema as well.
With Iceberg we currently only send the row data, and expect to receive the
schema on job start - this is more performant than sending the schema all
the time, but has the obvious issue that it is not able to handle the
schema changes. Another part of the dynamic schema synchronization is the
update of the Iceberg table schema - the schema should be updated for all
of the writers and the committer / but only a single schema change commit
is needed (allowed) to the Iceberg table.

This is a very interesting, but non-trivial change.

[1]
https://github.com/apache/paimon/blob/master/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java

On Thu, May 23, 2024 at 21:59, Andrew Otto wrote:

> Ah I see, so just auto-restarting to pick up new stuff.
>
> I'd love to understand how Paimon does this.  They have a database sync
> action which will sync entire databases, handle schema evolution, and I'm
> pretty sure (I think I saw this in my local test) also pick up new tables.
>
>
> https://github.com/apache/paimon/blob/master/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java#L45
>
> I'm sure that Paimon table format is great, but at Wikimedia Foundation we
> are on the Iceberg train.  Imagine if there was a flink-cdc full database
> sync to Flink IcebergSink!
>
>
>
>
> On Thu, May 23, 2024 at 3:47 PM Péter Váry 
> wrote:
>
>> I will ask Marton about the slides.
>>
>> The solution was something like this in a nutshell:
>> - Make sure that on job start the latest Iceberg schema is read from the
>> Iceberg table
>> - Throw a SuppressRestartsException when data arrives with the wrong
>> schema
>> - Use Flink Kubernetes Operator to restart your failed jobs by setting
>> kubernetes.operator.job.restart.failed
>>
>> Thanks, Peter
>>
>> On Thu, May 23, 2024, 20:29 Andrew Otto  wrote:
>>
>>> Wow, I would LOVE to see this talk.  If there is no recording, perhaps
>>> there are slides somewhere?
>>>
>>> On Thu, May 23, 2024 at 11:00 AM Carlos Sanabria Miranda <
>>> sanabria.miranda.car...@gmail.com> wrote:
>>>
>>>> Hi everyone!
>>>>
>>>> I have found on the Flink Forward website the following presentation:
>>>> "Self-service ingestion pipelines with evolving schema via Flink and
>>>> Iceberg" by Márton Balassi from the 2023 conference in Seattle, but I
>>>> cannot find the recording anywhere. I have found the recordings of the
>>>> other presentations on the Ververica Academy website, but not this one.
>>>>
>>>> Does anyone know where I can find it? Or at least the slides?
>>>>
>>>> We are using Flink with the Iceberg sink connector to write streaming
>>>> events to Iceberg tables, and we are researching how to handle schema
>>>> evolution properly. I saw that presentation and I thought it could be of
>>>> great help to us.
>>>>
>>>> Thanks in advance!
>>>>
>>>> Carlos
>>>>
>>>


Pulsar connector resets existing subscription

2024-05-23 Thread Igor Basov
Hi everyone,

I have a problem with how Flink deals with the existing subscription in a
Pulsar topic.

   - Subscription has some accumulated backlog
   - Flink job is deployed from a clear state (no checkpoints)
   - Flink job uses the same subscription name as the existing one; the
   start cursor is the default one (earliest)

Based on the docs, the priority for setting up the cursor position should
be: checkpoint > existing subscription position > StartCursor. So, since
there are no checkpoints, I expect the job to get the existing position from
Pulsar and start reading from there.
But that’s not what I see. As soon as the job is connected to the topic, I
see the number of messages in the subscription backlog jumping to a new
high, and JM logs show messages:

Seeking subscription to the message -1:-1:-1
Successfully reset subscription to the message -1:-1:-1

Apparently, Flink ignored the existing subscription position and reset its
cursor position to the earliest.
I looked at what seems to be the related code, but I’m not sure if it takes
into account the existence of subscriptions.

Flink: 1.18.1
Pulsar connector: org.apache.flink:flink-connector-pulsar:4.1.0-1.18

Thanks in advance!

Best regards,
Igor
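
The priority order expected above can be written down as a small sketch. This models only the expectation described in the message (checkpoint first, then the existing subscription position, then the StartCursor); it is not the connector's actual code, and the function and argument names are hypothetical.

```python
from typing import Optional


def resolve_start_position(
    checkpoint_pos: Optional[str],
    subscription_pos: Optional[str],
    start_cursor: str,
) -> str:
    """Pick the read position using the expected priority order."""
    if checkpoint_pos is not None:
        # A restored checkpoint always wins.
        return checkpoint_pos
    if subscription_pos is not None:
        # No checkpoint: continue from the existing subscription.
        return subscription_pos
    # Neither exists: fall back to the configured StartCursor.
    return start_cursor


# The scenario from the message: no checkpoint, an existing subscription,
# and the default (earliest) start cursor. The position string is made up.
expected = resolve_start_position(None, "ledger-12:entry-34", "earliest")
print(expected)
```

Under this expectation the job would resume from the existing subscription position, whereas the logs in the message show the cursor being reset to the earliest message instead.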


Re: "Self-service ingestion pipelines with evolving schema via Flink and Iceberg" presentation recording from Flink Forward Seattle 2023

2024-05-23 Thread Andrew Otto
Ah I see, so just auto-restarting to pick up new stuff.

I'd love to understand how Paimon does this.  They have a database sync
action which will sync entire databases, handle schema evolution, and I'm
pretty sure (I think I saw this in my local test) also pick up new tables.

https://github.com/apache/paimon/blob/master/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java#L45

I'm sure that Paimon table format is great, but at Wikimedia Foundation we
are on the Iceberg train.  Imagine if there was a flink-cdc full database
sync to Flink IcebergSink!




On Thu, May 23, 2024 at 3:47 PM Péter Váry 
wrote:

> I will ask Marton about the slides.
>
> The solution was something like this in a nutshell:
> - Make sure that on job start the latest Iceberg schema is read from the
> Iceberg table
> - Throw a SuppressRestartsException when data arrives with the wrong schema
> - Use Flink Kubernetes Operator to restart your failed jobs by setting
> kubernetes.operator.job.restart.failed
>
> Thanks, Peter
>
> On Thu, May 23, 2024, 20:29 Andrew Otto  wrote:
>
>> Wow, I would LOVE to see this talk.  If there is no recording, perhaps
>> there are slides somewhere?
>>
>> On Thu, May 23, 2024 at 11:00 AM Carlos Sanabria Miranda <
>> sanabria.miranda.car...@gmail.com> wrote:
>>
>>> Hi everyone!
>>>
>>> I have found on the Flink Forward website the following presentation:
>>> "Self-service ingestion pipelines with evolving schema via Flink and
>>> Iceberg" by Márton Balassi from the 2023 conference in Seattle, but I
>>> cannot find the recording anywhere. I have found the recordings of the
>>> other presentations on the Ververica Academy website, but not this one.
>>>
>>> Does anyone know where I can find it? Or at least the slides?
>>>
>>> We are using Flink with the Iceberg sink connector to write streaming
>>> events to Iceberg tables, and we are researching how to handle schema
>>> evolution properly. I saw that presentation and I thought it could be of
>>> great help to us.
>>>
>>> Thanks in advance!
>>>
>>> Carlos
>>>
>>


Re: "Self-service ingestion pipelines with evolving schema via Flink and Iceberg" presentation recording from Flink Forward Seattle 2023

2024-05-23 Thread Péter Váry
I will ask Marton about the slides.

The solution was something like this in a nutshell:
- Make sure that on job start the latest Iceberg schema is read from the
Iceberg table
- Throw a SuppressRestartsException when data arrives with the wrong schema
- Use Flink Kubernetes Operator to restart your failed jobs by setting
kubernetes.operator.job.restart.failed

Thanks, Peter
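
The three steps above can be sketched roughly as follows. This is a plain-Python illustration of the idea, not code from the talk: `SchemaMismatchError` is a stand-in for a failure that must not be retried in place (the message mentions SuppressRestartsException), and `SchemaGuard` and the loader callback are hypothetical names.

```python
class SchemaMismatchError(RuntimeError):
    """Stand-in for a failure that should fail the job instead of being retried."""


class SchemaGuard:
    def __init__(self, load_table_schema):
        # Step 1: read the latest table schema once, at job start.
        self.columns = frozenset(load_table_schema())

    def check(self, record: dict) -> dict:
        # Step 2: fail hard when a record does not match the schema read at start.
        if frozenset(record) != self.columns:
            raise SchemaMismatchError(
                f"record columns {sorted(record)} != table columns {sorted(self.columns)}"
            )
        return record


# Step 3 happens outside the job: an external supervisor restarts the failed job,
# and the new run reads the updated schema in SchemaGuard.__init__.
guard = SchemaGuard(lambda: ["id", "name"])
print(guard.check({"id": 1, "name": "a"}))
```

With `kubernetes.operator.job.restart.failed` set, the operator would play the supervisor role from step 3, so the schema is refreshed only at restart and never on the hot path.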

On Thu, May 23, 2024, 20:29 Andrew Otto  wrote:

> Wow, I would LOVE to see this talk.  If there is no recording, perhaps
> there are slides somewhere?
>
> On Thu, May 23, 2024 at 11:00 AM Carlos Sanabria Miranda <
> sanabria.miranda.car...@gmail.com> wrote:
>
>> Hi everyone!
>>
>> I have found on the Flink Forward website the following presentation:
>> "Self-service ingestion pipelines with evolving schema via Flink and
>> Iceberg" by Márton Balassi from the 2023 conference in Seattle, but I
>> cannot find the recording anywhere. I have found the recordings of the
>> other presentations on the Ververica Academy website, but not this one.
>>
>> Does anyone know where I can find it? Or at least the slides?
>>
>> We are using Flink with the Iceberg sink connector to write streaming
>> events to Iceberg tables, and we are researching how to handle schema
>> evolution properly. I saw that presentation and I thought it could be of
>> great help to us.
>>
>> Thanks in advance!
>>
>> Carlos
>>
>


Re: issues with Flink kinesis connector

2024-05-23 Thread Nick Hecht
Thank you for your help!

On Thu, May 23, 2024 at 1:40 PM Aleksandr Pilipenko 
wrote:

> Hi Nick,
>
> You need to use a different method to add the sink to your job: sinkTo
> (sink_to in PyFlink). KinesisStreamsSink implements the newer Sink
> interface, while addSink expects the old SinkFunction. You can see this in
> the method signatures [1] and in the usage examples in the documentation [2].
>
> [1]
> https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/data_stream.py#L811-L837
> [2]
> https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kinesis/#kinesis-streams-sink
>
> Best,
> Aleksandr
>
>
> On Thu, 23 May 2024 at 17:19, Nick Hecht 
> wrote:
>
>> Hello,
>>
>> I am currently having issues trying to use the python flink 1.18
>> Datastream api with the Amazon Kinesis Data Streams Connector.
>>
>> From the documentation
>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/kinesis/
>>  I have downloaded the "flink-connector-kinesis" jar
>> https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kinesis/4.2.0-1.18/flink-sql-connector-kinesis-4.2.0-1.18.jar
>>
>> and i have added it in my code:
>>
>> env = StreamExecutionEnvironment.get_execution_environment()
>> env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
>> env.enable_checkpointing(5000)
>>
>> env.add_jars(
>> "file:///workspaces/flink/flink-sql-connector-kinesis-4.2.0-1.18.jar"
>> ,
>> )
>>
>> and it has worked perfectly so far when setting up my kinesis source,  i
>> recently added a kinesis sink to complete my pipeline (was testing with
>> print before)
>>
>> # ds = ds.print()
>> sink = KinesisStreamsSink.builder() \
>> .set_kinesis_client_properties(config) \
>> .set_serialization_schema(SimpleStringSchema()) \
>> .set_partition_key_generator(PartitionKeyGenerator.fixed()) \
>> .set_stream_name(stream_name) \
>> .build()
>>
>> ds = ds.add_sink(sink)
>>
>> s_env.execute('pipeline')
>>
>> now when i run my python flink application it throws an error at my
>> add_sink call with the following exception:
>>
>> > python locations_flink_app.py
>>
>> 2024-05-23 14:53:10,219 - INFO -
>> apache_beam.typehints.native_type_compatibility - 315 - Using Any for
>> unsupported type: typing.Sequence[~T]
>> 2024-05-23 14:53:10,287 - INFO - apache_beam.io.gcp.bigquery - 436 - No
>> module named google.cloud.bigquery_storage_v1. As a result, the
>> ReadFromBigQuery transform *CANNOT* be used with `method=DIRECT_READ`.
>> Traceback (most recent call last):
>>   File "locations_flink_app.py", line 90, in 
>> setup_flink_app(s_env, props)
>>   File "locations_flink_app.py", line 71, in setup_flink_app
>> ds = ds.add_sink(create_sink(
>>   File
>> "/usr/local/lib/python3.8/site-packages/pyflink/datastream/data_stream.py",
>> line 819, in add_sink
>> return
>> DataStreamSink(self._j_data_stream.addSink(sink_func.get_java_function()))
>>   File "/usr/local/lib/python3.8/site-packages/py4j/java_gateway.py",
>> line 1322, in __call__
>> return_value = get_return_value(
>>   File
>> "/usr/local/lib/python3.8/site-packages/pyflink/util/exceptions.py", line
>> 146, in deco
>> return f(*a, **kw)
>>   File "/usr/local/lib/python3.8/site-packages/py4j/protocol.py", line
>> 330, in get_return_value
>> raise Py4JError(
>> py4j.protocol.Py4JError: An error occurred while calling o245.addSink.
>> Trace:
>> org.apache.flink.api.python.shaded.py4j.Py4JException: Method
>> addSink([class org.apache.flink.connector.kinesis.sink.KinesisStreamsSink])
>> does not exist
>> at
>> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:321)
>> at
>> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:329)
>> at
>> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:274)
>> at
>> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>> at
>> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>> at
>> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>> at java.base/java.lang.Thread.run(Thread.java:829)
>>
>> when i open the jar i downloaded
>> (flink-sql-connector-kinesis-4.2.0-1.18.jar)   i can see it actually has
>> the classes i need
>> Downloads\flink-sql-connector-kinesis-4.2.0-1.18.zip\org\apache\flink\connector\kinesis\sink
>> has  KinesisStreamsSink.class[class
>> org.apache.flink.connector.kinesis.sink.KinesisStreamsSink]
>>
>> If I remove the sink the source still works perfectly fine
>> (FlinkKinesisConsumer),  but I don't understand what I'm missing. The jar
>> I'm using should have everything.
>>
>> anyone else have similar issues?  or know what I might need to do?
>>
>>
>> Thank you,
>>
>> Nick Hecht
>>
>


Re: issues with Flink kinesis connector

2024-05-23 Thread Aleksandr Pilipenko
Hi Nick,

You need to use a different method to add the sink to your job: sinkTo
(sink_to in PyFlink). KinesisStreamsSink implements the newer Sink
interface, while addSink expects the old SinkFunction. You can see this in
the method signatures [1] and in the usage examples in the documentation [2].

[1]
https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/data_stream.py#L811-L837
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kinesis/#kinesis-streams-sink

Best,
Aleksandr


On Thu, 23 May 2024 at 17:19, Nick Hecht  wrote:

> Hello,
>
> I am currently having issues trying to use the python flink 1.18
> Datastream api with the Amazon Kinesis Data Streams Connector.
>
> From the documentation
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/kinesis/
>  I have downloaded the "flink-connector-kinesis" jar
> https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kinesis/4.2.0-1.18/flink-sql-connector-kinesis-4.2.0-1.18.jar
>
> and i have added it in my code:
>
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
> env.enable_checkpointing(5000)
>
> env.add_jars(
> "file:///workspaces/flink/flink-sql-connector-kinesis-4.2.0-1.18.jar",
> )
>
> and it has worked perfectly so far when setting up my kinesis source,  i
> recently added a kinesis sink to complete my pipeline (was testing with
> print before)
>
> # ds = ds.print()
> sink = KinesisStreamsSink.builder() \
> .set_kinesis_client_properties(config) \
> .set_serialization_schema(SimpleStringSchema()) \
> .set_partition_key_generator(PartitionKeyGenerator.fixed()) \
> .set_stream_name(stream_name) \
> .build()
>
> ds = ds.add_sink(sink)
>
> s_env.execute('pipeline')
>
> now when i run my python flink application it throws an error at my
> add_sink call with the following exception:
>
> > python locations_flink_app.py
>
> 2024-05-23 14:53:10,219 - INFO -
> apache_beam.typehints.native_type_compatibility - 315 - Using Any for
> unsupported type: typing.Sequence[~T]
> 2024-05-23 14:53:10,287 - INFO - apache_beam.io.gcp.bigquery - 436 - No
> module named google.cloud.bigquery_storage_v1. As a result, the
> ReadFromBigQuery transform *CANNOT* be used with `method=DIRECT_READ`.
> Traceback (most recent call last):
>   File "locations_flink_app.py", line 90, in 
> setup_flink_app(s_env, props)
>   File "locations_flink_app.py", line 71, in setup_flink_app
> ds = ds.add_sink(create_sink(
>   File
> "/usr/local/lib/python3.8/site-packages/pyflink/datastream/data_stream.py",
> line 819, in add_sink
> return
> DataStreamSink(self._j_data_stream.addSink(sink_func.get_java_function()))
>   File "/usr/local/lib/python3.8/site-packages/py4j/java_gateway.py", line
> 1322, in __call__
> return_value = get_return_value(
>   File
> "/usr/local/lib/python3.8/site-packages/pyflink/util/exceptions.py", line
> 146, in deco
> return f(*a, **kw)
>   File "/usr/local/lib/python3.8/site-packages/py4j/protocol.py", line
> 330, in get_return_value
> raise Py4JError(
> py4j.protocol.Py4JError: An error occurred while calling o245.addSink.
> Trace:
> org.apache.flink.api.python.shaded.py4j.Py4JException: Method
> addSink([class org.apache.flink.connector.kinesis.sink.KinesisStreamsSink])
> does not exist
> at
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:321)
> at
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:329)
> at
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:274)
> at
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> at
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.base/java.lang.Thread.run(Thread.java:829)
>
> when i open the jar i downloaded
> (flink-sql-connector-kinesis-4.2.0-1.18.jar)   i can see it actually has
> the classes i need
> Downloads\flink-sql-connector-kinesis-4.2.0-1.18.zip\org\apache\flink\connector\kinesis\sink
> has  KinesisStreamsSink.class[class
> org.apache.flink.connector.kinesis.sink.KinesisStreamsSink]
>
> If I remove the sink the source still works perfectly fine
> (FlinkKinesisConsumer),  but I don't understand what I'm missing. The jar
> I'm using should have everything.
>
> anyone else have similar issues?  or know what I might need to do?
>
>
> Thank you,
>
> Nick Hecht
>


Re: Would Java 11 cause Getting OutOfMemoryError: Direct buffer memory?

2024-05-23 Thread Zhanghao Chen
Hi John,

Based on the memory config screenshot provided before, each of your TMs should 
have MaxDirectMemory = 1 GB (network memory) + 128 MB (framework off-heap) = 1152 MB. 
Neither taskmanager.memory.flink.size nor the total including MaxDirectMemory 
will exceed the pod's physical memory; you can check the detailed TM memory 
model [1] to verify this yourself.

Maybe you can further analyze the direct memory usage using tools like JVM 
Native Memory Tracking (NMT).

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/memory/mem_setup_tm/#detailed-memory-model

Best,
Zhanghao Chen
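
The arithmetic in this reply can be reproduced with a short sketch. The formula is the one quoted in the thread (Network Memory + Task Off-Heap + Framework Off-heap); the task off-heap value of 0 is an assumption implied by the sum in the reply, and the function name is hypothetical.

```python
def max_direct_memory_mb(network_mb: int, task_off_heap_mb: int, framework_off_heap_mb: int) -> int:
    """MaxDirectMemory of a TM = network memory + task off-heap + framework off-heap."""
    return network_mb + task_off_heap_mb + framework_off_heap_mb


# Values from the reply: 1 GB network memory and 128 MB framework off-heap.
# Task off-heap is assumed to be 0 MB, since the reply's sum leaves no room for it.
tm_max_direct = max_direct_memory_mb(
    network_mb=1024,
    task_off_heap_mb=0,
    framework_off_heap_mb=128,
)
print(tm_max_direct)  # 1152
```

Raising any of the three components raises the direct memory limit by the same amount, which is why the limit does not depend on the JVM version's own default.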

From: John Smith 
Sent: Thursday, May 23, 2024 22:40
To: Zhanghao Chen 
Cc: Biao Geng ; user 
Subject: Re: Would Java 11 cause Getting OutOfMemoryError: Direct buffer memory?

Based on these two settings...
taskmanager.memory.flink.size: 16384m
taskmanager.memory.jvm-metaspace.size: 3072m

Reading the docs here I'm not sure how to calculate the formula. My suspicion 
is that I may have allocated too much of taskmanager.memory.flink.size and the 
total including MaxDirectMemory is more than what the physical OS has, is that 
possible?
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#taskmanager-memory-task-heap-size

Are you able to tell me what these numbers come out to for this formula: 
MaxDirectMemory of TM = Network Memory + Task Off-Heap + Framework Off-heap?

On Thu, May 23, 2024 at 9:01 AM John Smith wrote:
Ok, but I still don't get why it's doing it... It's the same version of 
Flink... The only difference is Java 11, and also I allocated more JVM heap and 
the actual physical machine has more RAM. Maybe I should reduce the JVM heap by 
a gigabyte or two?

On Wed, May 22, 2024, 12:37 PM Zhanghao Chen wrote:
Hi John,

A side note here: Flink will set the MaxDirectMemory of TM = Network Memory + 
Task Off-Heap + Framework Off-heap, and overwrites JVM's default setting, 
regardless of the version of JVM.

Best,
Zhanghao Chen

From: John Smith
Sent: Wednesday, May 22, 2024 22:56
To: Biao Geng
Cc: user
Subject: Re: Would Java 11 cause Getting OutOfMemoryError: Direct buffer memory?

Hi, apologies I hit reply instead of reply all. So not sure who saw this or 
didn't. We have not switched to SSL and also our assumption here would be that 
if we did switch to SSL the jobs would not work or produce invalid results. The 
jobs work absolutely fine for a week or so and then they fail.

Here is the consumer config from the task logs, which says PLAINTEXT and port 
9092 is used. Also I attached a screen of the task manager memory usage. As 
well I read up on MaxDirectMemory setting of Java 8 vs Java 11. Java 8 by 
default calculates the direct memory size to 87% of the max heap size. While 
Java 11 set it to 100% of the max heap size.

[Screen Shot 2024-05-22 at 9.50.38 AM.png]

 allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [xx-kafka-0001:9092, xx-0002:9092, 
xx-kafka-0003:9092]
check.crcs = true
client.dns.lookup = default
client.id = xx
client.rack =
connections.max.idle.ms = 54
default.api.timeout.ms = 6
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = xx
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 30
max.poll.records = 500
metadata.max.age.ms = 30
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 3
partition.assignment.strategy = [class 
org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 6
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 6
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.l

Re: Task Manager memory usage

2024-05-23 Thread Zhanghao Chen
Hi Sigalit,

State stored in memory will most probably stay alive for several rounds of GC 
and end up in the old generation of the heap, where it won't get reclaimed 
until a Full GC.

As for the TM pod memory usage, most probably it will stop increasing at some 
point. You could try setting a larger taskmanager.memory.jvm-overhead size 
and monitor it over a longer period. If it keeps growing, then there might be 
a native memory leak somewhere, but that may not be related to the state.

Best,
Zhanghao Chen

From: Sigalit Eliazov 
Sent: Thursday, May 23, 2024 18:20
To: user 
Subject: Task Manager memory usage


Hi,

I am trying to understand the following behavior in our Flink application 
cluster. Any assistance would be appreciated.

We are running a Flink application cluster with 5 task managers, each with the 
following configuration:

  *   jobManagerMemory: 12g
  *   taskManagerMemory: 20g
  *   taskManagerMemoryHeapSize: 12g
  *   taskManagerMemoryNetworkMax: 4g
  *   taskManagerMemoryNetworkMin: 1g
  *   taskManagerMemoryManagedSize: 50m
  *   taskManagerMemoryOffHeapSize: 2g
  *   taskManagerMemoryNetworkFraction: 0.2
  *   taskManagerNetworkMemorySegmentSize: 4mb
  *   taskManagerMemoryFloatingBuffersPerGate: 64
  *   taskmanager.memory.jvm-overhead.min: 256mb
  *   taskmanager.memory.jvm-overhead.max: 2g
  *   taskmanager.memory.jvm-overhead.fraction: 0.1

Our pipeline includes stateful transformations, and we are verifying that we 
clear the state once it is no longer needed.

Through the Flink UI, we observe that the heap size increases and decreases 
during the job lifecycle.

However, there is a noticeable delay between clearing the state and the 
reduction in heap size usage, which I assume is related to the garbage 
collector frequency.

What is puzzling is the task manager pod memory usage. It appears that the 
memory usage increases intermittently and is not released. We verified the 
different state metrics and confirmed they are changing according to the logic.

Additionally, if we had a state that was never released, I would expect to see 
the heap size increasing constantly as well.

Any insights or ideas?

Thanks,

Sigalit
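
To see how much JVM overhead the settings listed above would yield, here is a rough sketch. It assumes that Flink derives the overhead as a fraction of the total process memory, clamped between the configured min and max, and that `taskManagerMemory: 20g` maps to the total process size; both points are assumptions to verify against the memory model docs, and the function name is hypothetical.

```python
def jvm_overhead_mb(total_process_mb: int, fraction: float, min_mb: int, max_mb: int) -> int:
    """Fraction of the total process memory, clamped to the [min, max] range."""
    return round(min(max(total_process_mb * fraction, min_mb), max_mb))


# Values from the configuration in this message:
#   jvm-overhead.fraction = 0.1, jvm-overhead.min = 256mb, jvm-overhead.max = 2g
# Assumption: taskManagerMemory (20g) is the total process memory.
overhead = jvm_overhead_mb(
    total_process_mb=20 * 1024,
    fraction=0.1,
    min_mb=256,
    max_mb=2 * 1024,
)
print(overhead)  # 2048
```

Under these assumptions the overhead already sits at the configured 2g cap, so raising only the fraction would change nothing; the max would have to be raised as well.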


Re: "Self-service ingestion pipelines with evolving schema via Flink and Iceberg" presentation recording from Flink Forward Seattle 2023

2024-05-23 Thread Andrew Otto
Wow, I would LOVE to see this talk.  If there is no recording, perhaps
there are slides somewhere?

On Thu, May 23, 2024 at 11:00 AM Carlos Sanabria Miranda <
sanabria.miranda.car...@gmail.com> wrote:

> Hi everyone!
>
> I have found on the Flink Forward website the following presentation:
> "Self-service ingestion pipelines with evolving schema via Flink and
> Iceberg" by Márton Balassi from the 2023 conference in Seattle, but I
> cannot find the recording anywhere. I have found the recordings of the
> other presentations on the Ververica Academy website, but not this one.
>
> Does anyone know where I can find it? Or at least the slides?
>
> We are using Flink with the Iceberg sink connector to write streaming
> events to Iceberg tables, and we are researching how to handle schema
> evolution properly. I saw that presentation and I thought it could be of
> great help to us.
>
> Thanks in advance!
>
> Carlos
>


issues with Flink kinesis connector

2024-05-23 Thread Nick Hecht
Hello,

I am currently having issues trying to use the python flink 1.18 Datastream
api with the Amazon Kinesis Data Streams Connector.

From the documentation
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/kinesis/
 I have downloaded the "flink-connector-kinesis" jar
https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kinesis/4.2.0-1.18/flink-sql-connector-kinesis-4.2.0-1.18.jar

and i have added it in my code:

env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
env.enable_checkpointing(5000)

env.add_jars(
"file:///workspaces/flink/flink-sql-connector-kinesis-4.2.0-1.18.jar",
)

and it has worked perfectly so far when setting up my kinesis source,  i
recently added a kinesis sink to complete my pipeline (was testing with
print before)

# ds = ds.print()
sink = KinesisStreamsSink.builder() \
    .set_kinesis_client_properties(config) \
    .set_serialization_schema(SimpleStringSchema()) \
    .set_partition_key_generator(PartitionKeyGenerator.fixed()) \
    .set_stream_name(stream_name) \
    .build()

ds = ds.add_sink(sink)

s_env.execute('pipeline')

now when i run my python flink application it throws an error at my
add_sink call with the following exception:

> python locations_flink_app.py

2024-05-23 14:53:10,219 - INFO -
apache_beam.typehints.native_type_compatibility - 315 - Using Any for
unsupported type: typing.Sequence[~T]
2024-05-23 14:53:10,287 - INFO - apache_beam.io.gcp.bigquery - 436 - No
module named google.cloud.bigquery_storage_v1. As a result, the
ReadFromBigQuery transform *CANNOT* be used with `method=DIRECT_READ`.
Traceback (most recent call last):
  File "locations_flink_app.py", line 90, in <module>
setup_flink_app(s_env, props)
  File "locations_flink_app.py", line 71, in setup_flink_app
ds = ds.add_sink(create_sink(
  File
"/usr/local/lib/python3.8/site-packages/pyflink/datastream/data_stream.py",
line 819, in add_sink
return
DataStreamSink(self._j_data_stream.addSink(sink_func.get_java_function()))
  File "/usr/local/lib/python3.8/site-packages/py4j/java_gateway.py", line
1322, in __call__
return_value = get_return_value(
  File "/usr/local/lib/python3.8/site-packages/pyflink/util/exceptions.py",
line 146, in deco
return f(*a, **kw)
  File "/usr/local/lib/python3.8/site-packages/py4j/protocol.py", line 330,
in get_return_value
raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o245.addSink.
Trace:
org.apache.flink.api.python.shaded.py4j.Py4JException: Method
addSink([class org.apache.flink.connector.kinesis.sink.KinesisStreamsSink])
does not exist
at
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:321)
at
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:329)
at
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:274)
at
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)

when i open the jar i downloaded
(flink-sql-connector-kinesis-4.2.0-1.18.jar)   i can see it actually has
the classes i need
Downloads\flink-sql-connector-kinesis-4.2.0-1.18.zip\org\apache\flink\connector\kinesis\sink
has  KinesisStreamsSink.class[class
org.apache.flink.connector.kinesis.sink.KinesisStreamsSink]

If I remove the sink the source still works perfectly fine
(FlinkKinesisConsumer),  but I don't understand what I'm missing. The jar
I'm using should have everything.

anyone else have similar issues?  or know what I might need to do?


Thank you,

Nick Hecht


"Self-service ingestion pipelines with evolving schema via Flink and Iceberg" presentation recording from Flink Forward Seattle 2023

2024-05-23 Thread Carlos Sanabria Miranda
Hi everyone!

I have found on the Flink Forward website the following presentation:
"Self-service ingestion pipelines with evolving schema via Flink and
Iceberg" by Márton Balassi from the 2023 conference in Seattle, but I
cannot find the recording anywhere. I have found the recordings of the
other presentations on the Ververica Academy website, but not this one.

Does anyone know where I can find it? Or at least the slides?

We are using Flink with the Iceberg sink connector to write streaming
events to Iceberg tables, and we are researching how to handle schema
evolution properly. I saw that presentation and I thought it could be of
great help to us.

Thanks in advance!

Carlos


Re: Would Java 11 cause Getting OutOfMemoryError: Direct buffer memory?

2024-05-23 Thread John Smith
Based on these two settings...
taskmanager.memory.flink.size: 16384m
taskmanager.memory.jvm-metaspace.size: 3072m

Reading the docs here, I'm not sure how to evaluate the formula. My
suspicion is that I may have allocated too much
to taskmanager.memory.flink.size, and that the total including MaxDirectMemory
is more than what the physical OS has. Is that possible?
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#taskmanager-memory-task-heap-size

Are you able to tell me what these numbers come out to for this formula:
MaxDirectMemory of TM = Network Memory + Task Off-Heap + Framework Off-heap?
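
As a rough illustration, the formula above can be evaluated with a few lines of Python. This is only a sketch: the network fraction, the network min/max, and the two off-heap sizes below are assumed values (believed to match the documented defaults for this Flink version, but not stated in this thread), and the function names are made up for illustration. Only the 16384m figure comes from the settings above.

```python
def network_memory_mib(total_flink_mib, fraction, min_mib, max_mib):
    """Network memory: a fraction of total Flink memory, clamped to [min, max]."""
    return min(max(total_flink_mib * fraction, min_mib), max_mib)


def max_direct_memory_mib(network_mib, task_off_heap_mib, framework_off_heap_mib):
    """MaxDirectMemory of TM = Network Memory + Task Off-Heap + Framework Off-heap."""
    return network_mib + task_off_heap_mib + framework_off_heap_mib


# From the message: taskmanager.memory.flink.size: 16384m
TOTAL_FLINK_MIB = 16384

# ASSUMED values (not stated in the message); replace with the real settings.
NETWORK_FRACTION = 0.1
NETWORK_MIN_MIB = 64
NETWORK_MAX_MIB = 1024
TASK_OFF_HEAP_MIB = 0
FRAMEWORK_OFF_HEAP_MIB = 128

network = network_memory_mib(
    TOTAL_FLINK_MIB, NETWORK_FRACTION, NETWORK_MIN_MIB, NETWORK_MAX_MIB
)
direct = max_direct_memory_mib(network, TASK_OFF_HEAP_MIB, FRAMEWORK_OFF_HEAP_MIB)
print(f"network = {network} MiB, MaxDirectMemory = {direct} MiB")
```

With other network or off-heap settings the result changes accordingly. The metaspace size is not part of this formula; it is budgeted on top of taskmanager.memory.flink.size, together with the JVM overhead.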

On Thu, May 23, 2024 at 9:01 AM John Smith  wrote:

> Ok, but I still don't get why it's doing it... It's the same version of
> Flink... The only difference is Java 11, and I also allocated more JVM heap
> and the physical machine has more RAM. Maybe I should reduce the JVM heap
> by a gigabyte or two?
>
> On Wed, May 22, 2024, 12:37 PM Zhanghao Chen 
> wrote:
>
>> Hi John,
>>
>> A side note here: Flink will set the MaxDirectMemory of TM = Network
>> Memory + Task Off-Heap + Framework Off-heap, and overwrites JVM's default
>> setting, regardless of the version of JVM.
>>
>> Best,
>> Zhanghao Chen
>> --
>> *From:* John Smith 
>> *Sent:* Wednesday, May 22, 2024 22:56
>> *To:* Biao Geng 
>> *Cc:* user 
>> *Subject:* Re: Would Java 11 cause Getting OutOfMemoryError: Direct
>> buffer memory?
>>
>> Hi, apologies I hit reply instead of reply all. So not sure who saw this
>> or didn't. We have not switched to SSL and also our assumption here
>> would be that if we did switch to SSL the jobs would not work or produce
>> invalid results. The jobs work absolutely fine for a week or so and then
>> they fail.
>>
>> Here is the consumer config from the task logs, which says PLAINTEXT and
>> port 9092 is used. I also attached a screenshot of the task manager memory
>> usage. In addition, I read up on the MaxDirectMemory setting of Java 8 vs
>> Java 11. Java 8 by default sets the direct memory size to 87% of the max
>> heap size, while Java 11 sets it to 100% of the max heap size.
>>
>> [image: Screen Shot 2024-05-22 at 9.50.38 AM.png]
>>
>>  allow.auto.create.topics = true
>> auto.commit.interval.ms = 5000
>> auto.offset.reset = latest
>> bootstrap.servers = [xx-kafka-0001:9092, xx-0002:9092,
>> xx-kafka-0003:9092]
>> check.crcs = true
>> client.dns.lookup = default
>> client.id = xx
>> client.rack =
>> connections.max.idle.ms = 54
>> default.api.timeout.ms = 6
>> enable.auto.commit = false
>> exclude.internal.topics = true
>> fetch.max.bytes = 52428800
>> fetch.max.wait.ms = 500
>> fetch.min.bytes = 1
>> group.id = xx
>> group.instance.id = null
>> heartbeat.interval.ms = 3000
>> interceptor.classes = []
>> internal.leave.group.on.close = true
>> isolation.level = read_uncommitted
>> key.deserializer = class
>> org.apache.kafka.common.serialization.ByteArrayDeserializer
>> max.partition.fetch.bytes = 1048576
>> max.poll.interval.ms = 30
>> max.poll.records = 500
>> metadata.max.age.ms = 30
>> metric.reporters = []
>> metrics.num.samples = 2
>> metrics.recording.level = INFO
>> metrics.sample.window.ms = 3
>> partition.assignment.strategy = [class
>> org.apache.kafka.clients.consumer.RangeAssignor]
>> receive.buffer.bytes = 65536
>> reconnect.backoff.max.ms = 1000
>> reconnect.backoff.ms = 50
>> request.timeout.ms = 6
>> retry.backoff.ms = 100
>> sasl.client.callback.handler.class = null
>> sasl.jaas.config = null
>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>> sasl.kerberos.min.time.before.relogin = 6
>> sasl.kerberos.service.name = null
>> sasl.kerberos.ticket.renew.jitter = 0.05
>> sasl.kerberos.ticket.renew.window.factor = 0.8
>> sasl.login.callback.handler.class = null
>> sasl.login.class = null
>> sasl.login.refresh.buffer.seconds = 300
>> sasl.login.refresh.min.period.seconds = 60
>> sasl.login.refresh.window.factor = 0.8
>> sasl.login.refresh.window.jitter = 0.05
>> sasl.mechanism = GSSAPI
>> security.protocol = PLAINTEXT
>> security.providers = null
>> send.buffer.bytes = 131072
>> session.timeout.ms = 1
>> ssl.cipher.suites = null
>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>> ssl.endpoint.identification.algorithm = https
>> ssl.key.password = null
>> ssl.keymanager.algorithm = SunX509
>> ssl.keystore.location = null
>> ssl.keystore.password = null
>> ssl.keystore.type = JKS
>> ssl.protocol = TLS
>> ssl.provider = null
>> ssl.secure.random.implementation = null
>> ssl.trustmanager.algorithm = PKIX
>> ssl.truststore.location = null
>> ssl.truststore.password = null
>> ssl.truststore.type = JKS
>> value.deserializer = class
>> org.apache.kafka.common.serialization.ByteArrayDeserializer
>>
>> On Thu, May 16, 2024 at 3:20 AM Biao Geng  wrote:
>>
>> Hi John,
>>
>> Just want to check, have you ever changed the kafka protocol in your job
>> after using the new cluster? The error message shows that it is caused by
>> the kafka 

kirankumarkathe...@gmail.com-unsubscribe

2024-05-23 Thread Kiran Kumar Kathe
Kindly unsubscribe this Gmail account: kirankumarkathe...@gmail.com


Re: Task Manager memory usage

2024-05-23 Thread Sigalit Eliazov
Hi,
Thanks for your reply.
We are storing the data in memory; since it is short-term, we thought that
adding RocksDB would add overhead.


On Thu, May 23, 2024 at 4:38 PM Sachin Mittal  wrote:

> Hi
> Where are you storing the state?
> Try RocksDB.
>
> Thanks
> Sachin
>
>
> On Thu, 23 May 2024 at 6:19 PM, Sigalit Eliazov 
> wrote:
>
>> Hi,
>>
>> I am trying to understand the following behavior in our Flink application
>> cluster. Any assistance would be appreciated.
>>
>> We are running a Flink application cluster with 5 task managers, each
>> with the following configuration:
>>
>>- jobManagerMemory: 12g
>>- taskManagerMemory: 20g
>>- taskManagerMemoryHeapSize: 12g
>>- taskManagerMemoryNetworkMax: 4g
>>- taskManagerMemoryNetworkMin: 1g
>>- taskManagerMemoryManagedSize: 50m
>>- taskManagerMemoryOffHeapSize: 2g
>>- taskManagerMemoryNetworkFraction: 0.2
>>- taskManagerNetworkMemorySegmentSize: 4mb
>>- taskManagerMemoryFloatingBuffersPerGate: 64
>>- taskmanager.memory.jvm-overhead.min: 256mb
>>- taskmanager.memory.jvm-overhead.max: 2g
>>- taskmanager.memory.jvm-overhead.fraction: 0.1
>>
>> Our pipeline includes stateful transformations, and we are verifying that
>> we clear the state once it is no longer needed.
>>
>> Through the Flink UI, we observe that the heap size increases and
>> decreases during the job lifecycle.
>>
>> However, there is a noticeable delay between clearing the state and the
>> reduction in heap size usage, which I assume is related to the garbage
>> collector frequency.
>>
>> What is puzzling is the task manager pod memory usage. It appears that
>> the memory usage increases intermittently and is not released. We verified
>> the different state metrics and confirmed they are changing according to
>> the logic.
>>
>> Additionally, if we had a state that was never released, I would expect
>> to see the heap size increasing constantly as well.
>>
>> Any insights or ideas?
>>
>> Thanks,
>>
>> Sigalit
>>
>


Re: Task Manager memory usage

2024-05-23 Thread Sachin Mittal
Hi
Where are you storing the state?
Try RocksDB.

Thanks
Sachin


On Thu, 23 May 2024 at 6:19 PM, Sigalit Eliazov  wrote:

> Hi,
>
> I am trying to understand the following behavior in our Flink application
> cluster. Any assistance would be appreciated.
>
> We are running a Flink application cluster with 5 task managers, each with
> the following configuration:
>
>- jobManagerMemory: 12g
>- taskManagerMemory: 20g
>- taskManagerMemoryHeapSize: 12g
>- taskManagerMemoryNetworkMax: 4g
>- taskManagerMemoryNetworkMin: 1g
>- taskManagerMemoryManagedSize: 50m
>- taskManagerMemoryOffHeapSize: 2g
>- taskManagerMemoryNetworkFraction: 0.2
>- taskManagerNetworkMemorySegmentSize: 4mb
>- taskManagerMemoryFloatingBuffersPerGate: 64
>- taskmanager.memory.jvm-overhead.min: 256mb
>- taskmanager.memory.jvm-overhead.max: 2g
>- taskmanager.memory.jvm-overhead.fraction: 0.1
>
> Our pipeline includes stateful transformations, and we are verifying that
> we clear the state once it is no longer needed.
>
> Through the Flink UI, we observe that the heap size increases and
> decreases during the job lifecycle.
>
> However, there is a noticeable delay between clearing the state and the
> reduction in heap size usage, which I assume is related to the garbage
> collector frequency.
>
> What is puzzling is the task manager pod memory usage. It appears that the
> memory usage increases intermittently and is not released. We verified the
> different state metrics and confirmed they are changing according to the
> logic.
>
> Additionally, if we had a state that was never released, I would expect to
> see the heap size increasing constantly as well.
>
> Any insights or ideas?
>
> Thanks,
>
> Sigalit
>


Help with monitoring metrics of StateFun runtime with prometheus

2024-05-23 Thread Oliver Schmied
Dear Apache Flink community,

I am setting up an Apache Flink StateFun runtime on Kubernetes, following the flink-playground example: https://github.com/apache/flink-statefun-playground/tree/main/deployments/k8s.

This is the manifest I used for creating the StateFun environment:

```
---

apiVersion: v1
kind: ConfigMap
metadata:
  namespace: statefun
  name: flink-config
  labels:
    app: statefun
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: statefun-master
    taskmanager.numberOfTaskSlots: 1
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
    state.backend: rocksdb
    state.backend.rocksdb.timer-service.factory: ROCKSDB
    state.backend.incremental: true
    parallelism.default: 1
    s3.access-key: minioadmin
    s3.secret-key: minioadmin
    state.checkpoints.dir: s3://checkpoints/subscriptions
    s3.endpoint: http://minio.statefun.svc.cluster.local:9000
    s3.path-style-access: true
    jobmanager.memory.process.size: 1g
    taskmanager.memory.process.size: 1g

  log4j-console.properties: |+
    monitorInterval=30
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name = org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF

---
apiVersion: v1
kind: Service
metadata:
  name: statefun-master-rest
  namespace: statefun
spec:
  type: NodePort
  ports:
    - name: rest
      port: 8081
      targetPort: 8081
  selector:
    app: statefun
    component: master
---
apiVersion: v1
kind: Service
metadata:
  name: statefun-master
  namespace: statefun
spec:
  type: ClusterIP
  ports:
    - name: rpc
      port: 6123
    - name: blob
      port: 6124
    - name: ui
      port: 8081
  selector:
    app: statefun
    component: master
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: statefun-master
  namespace: statefun
spec:
  replicas: 1
  selector:
    matchLabels:
      app: statefun
      component: master
  template:
    metadata:
      labels:
        app: statefun
        component: master
    spec:
      containers:
        - name: master
          image: apache/flink-statefun:3.3.0
          imagePullPolicy: IfNotPresent
          env:
            - name: ROLE
              value: master
            - name: MASTER_HOST
              value: statefun-master
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob
            - containerPort: 8081
              name: ui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            - name: module-config-volume
              mountPath: /opt/statefun/modules/example
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: module-config-volume
          configMap:
            name: module-config
            items:
              - key: module.yaml
                path: module.yaml
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: statefun-worker
  namespace: statefun
spec:
  replicas: 1
  selector:
    matchLabels:
      app: statefun
      component: worker
  template:
    metadata:
      labels:
        app: statefun
        component: worker
    spec:
      containers:
        - name: worker
          image: apache/flink-statefun:3.3.0
          imagePullPolicy: IfNotPresent
          env:
            - name: ROLE
              value: worker
            - name: MASTER_HOST
              value: statefun-master
          resources:
            requests:
              memory: "1Gi"
          ports:
            - containerPort: 6122
              name: rpc
            - containerPort: 6124
              name: blob
            - containerPort: 8081
              name: ui
          livenessProbe:
            tcpSocket:
              port: 6122
            initialDelaySeconds: 30
```

Task Manager memory usage

2024-05-23 Thread Sigalit Eliazov
Hi,

I am trying to understand the following behavior in our Flink application
cluster. Any assistance would be appreciated.

We are running a Flink application cluster with 5 task managers, each with
the following configuration:

   - jobManagerMemory: 12g
   - taskManagerMemory: 20g
   - taskManagerMemoryHeapSize: 12g
   - taskManagerMemoryNetworkMax: 4g
   - taskManagerMemoryNetworkMin: 1g
   - taskManagerMemoryManagedSize: 50m
   - taskManagerMemoryOffHeapSize: 2g
   - taskManagerMemoryNetworkFraction: 0.2
   - taskManagerNetworkMemorySegmentSize: 4mb
   - taskManagerMemoryFloatingBuffersPerGate: 64
   - taskmanager.memory.jvm-overhead.min: 256mb
   - taskmanager.memory.jvm-overhead.max: 2g
   - taskmanager.memory.jvm-overhead.fraction: 0.1

Our pipeline includes stateful transformations, and we are verifying that
we clear the state once it is no longer needed.

Through the Flink UI, we observe that the heap size increases and decreases
during the job lifecycle.

However, there is a noticeable delay between clearing the state and the
reduction in heap size usage, which I assume is related to the garbage
collector frequency.

What is puzzling is the task manager pod memory usage. It appears that the
memory usage increases intermittently and is not released. We verified the
different state metrics and confirmed they are changing according to the
logic.

Additionally, if we had a state that was never released, I would expect to
see the heap size increasing constantly as well.

Any insights or ideas?

Thanks,

Sigalit
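
For a rough sense of how the settings listed above add up, here is a back-of-the-envelope sketch in Python. It assumes (this is not confirmed in the message) that the listed keys map onto the corresponding Flink taskmanager.memory.* options and that taskManagerMemory is the total budget of the task manager process. The names `to_mib` and `configured` are made up for illustration.

```python
import re

# Multipliers to MiB for the unit suffixes used in the list above
_UNITS_MIB = {"m": 1, "mb": 1, "g": 1024, "gb": 1024}


def to_mib(size: str) -> int:
    """Convert strings such as '12g', '50m' or '256mb' to MiB."""
    match = re.fullmatch(r"(\d+)\s*([a-z]+)", size.strip().lower())
    if match is None or match.group(2) not in _UNITS_MIB:
        raise ValueError(f"unsupported size: {size!r}")
    return int(match.group(1)) * _UNITS_MIB[match.group(2)]


# Values copied from the list above; the mapping to components is ASSUMED.
configured = {
    "heap": "12g",
    "off_heap": "2g",
    "network_max": "4g",
    "managed": "50m",
    "jvm_overhead_max": "2g",
}

total_mib = to_mib("20g")  # taskManagerMemory
ceilings_mib = {name: to_mib(value) for name, value in configured.items()}
non_heap_mib = sum(v for name, v in ceilings_mib.items() if name != "heap")

print(f"heap ceiling:      {ceilings_mib['heap']} MiB")
print(f"non-heap ceilings: {non_heap_mib} MiB")
print(f"sum of ceilings:   {sum(ceilings_mib.values())} MiB of {total_mib} MiB")
```

Under those assumptions, the heap is only 12 GiB of the 20 GiB budget, so roughly 8 GiB of off-heap, network, and JVM overhead memory can contribute to the pod's usage without ever appearing in the heap graph.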


Re: Flink kinesis connector 4.3.0 release estimated date

2024-05-23 Thread Vararu, Vadim
That’s great news.

Thanks.

From: Leonard Xu 
Date: Thursday, 23 May 2024 at 04:42
To: Vararu, Vadim 
Cc: user , Danny Cranmer 
Subject: Re: Flink kinesis connector 4.3.0 release estimated date
Hey, Vararu

The Kinesis connector 4.3.0 release is in the voting phase, and we hope to finalize
the release work this week if everything goes well.


Best,
Leonard



On May 22, 2024, at 11:51 PM, Vararu, Vadim <vadim.var...@adswizz.com> wrote:

Hi guys,

Any idea when the 4.3.0 kinesis connector is estimated to be released?

Cheers,
Vadim.