StreamTaskException: Could not serialize object for key serializedUDF

2024-07-31 Thread dz902
Hi,

I'm using Flink 1.17.1 streaming API, on YARN.

My app first got stuck at process function serialization. I know Avro Schema
is not serializable, so I removed all references to it from my process
functions. Now it passes the first round, but gets stuck again at the following
error:

org.apache.flink.client.program.ProgramInvocationException: The main
method caused an error: Error in serialization.
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:851)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)
at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095)
at 
org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at java.base/javax.security.auth.Subject.doAs(Subject.java:423)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at 
org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
Caused by: org.apache.flink.util.FlinkRuntimeException: Error in serialization.
at 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:326)
at 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:160)
at 
org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:1029)
at 
org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:56)
at 
org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43)
at 
org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:61)
at 
org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:72)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2197)
at 
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:189)
at 
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:118)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2058)
at org.example.FlinkCDCMulti.startFlinkJob(FlinkCDCMulti.java:164)
at org.example.FlinkCDCMulti.main(FlinkCDCMulti.java:104)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
... 12 more
Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could
not serialize object for key serializedUDF.
at 
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2005)
at 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:322)
... 29 more
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException:
Could not serialize object for key serializedUDF.
at 
org.apache.flink.streaming.api.graph.StreamConfig.lambda$serializeAllConfigs$1(StreamConfig.java:198)
at java.base/java.util.HashMap.forEach(HashMap.java:1337)
at 
org.apache.flink.streaming.api.graph.StreamConfig.serializeAllConfigs(StreamConfig.java:192)
at 
org.apache.flink.streaming.api.graph.StreamConfig.lambda$triggerSerializationAndReturnFuture$0(StreamConfig.java:169)
at 
java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714)
at 
java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at 
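A common way around non-serializable members such as an Avro Schema is to keep them out of the serialized closure entirely and rebuild them in open(). A minimal sketch, assuming a RichMapFunction and a schema JSON string (class and field names here are illustrative, not taken from the original job):

```java
import org.apache.avro.Schema;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class MyProcessFunction extends RichMapFunction<String, String> {

    // Only the schema string travels with the job graph; it is plain Java-serializable.
    private final String schemaJson;

    // Rebuilt on the task manager, never serialized.
    private transient Schema schema;

    public MyProcessFunction(String schemaJson) {
        this.schemaJson = schemaJson;
    }

    @Override
    public void open(Configuration parameters) {
        // Re-create the non-serializable object when the task starts.
        this.schema = new Schema.Parser().parse(schemaJson);
    }

    @Override
    public String map(String value) {
        // use this.schema here
        return value;
    }
}
```

The "Could not serialize object for key serializedUDF" failure is raised while the job graph is generated, when an operator's user function (or something it references) cannot be Java-serialized, so the same pattern applies to any remaining non-serializable field.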

Re: Elasticsearch 8.x Connector in Maven

2024-07-31 Thread Rion Williams
Hi Ahmed,

Thanks for the response, I’ll reach out to the devs list and go from there.

Thanks again,
Rion

On Jul 31, 2024, at 9:18 AM, Ahmed Hamdy  wrote:

Hi Rion
It seems that ES 8 was supported ahead of 3.1 release[1], which seems to not be released yet hence not published to maven.
Given the importance of ES 8 and the fact that elastic search still depends on Flink 1.18 while we are releasing 1.20, I would suggest nudging the dev list[2] for a release manager to pick up the 3.1 release.

1- https://issues.apache.org/jira/browse/FLINK-26088
2- https://lists.apache.org/list.html?d...@flink.apache.org

Best Regards
Ahmed Hamdy

On Wed, 31 Jul 2024 at 15:06, Rion Williams  wrote:

Hi again all,

Just following up on this as I’ve scoured around trying to find any documentation for using the ES 8.x connector, however everything only appears to reference 6/7.

The ES 8.x seems to have been released for quite a bit of time, so I’m curious how others are using it. I’d really like to avoid doing something like forking the bits I need in my local repository if possible and building it on my own. 

Thanks in advance,

Rion

> On Jul 30, 2024, at 1:00 PM, Rion Williams  wrote:
> 
> Hi all,
> 
> I see that the Elasticsearch Connector for 8.x is supported per the repo (and completed JIRAs). Is there a way to reference this via Maven? Or is it required to build the connector from the source directly?
> 
> We recently upgraded an Elasticsearch cluster to 8.x and some of the writes are now failing from the 7.x RHLC, so trying to upgrade to get to parity (and stop the jobs from crashing).
> 
> Thanks in advance,
> 
> Rion



Re: Elasticsearch 8.x Connector in Maven

2024-07-31 Thread Ahmed Hamdy
Hi Rion
It seems that ES 8 support was added ahead of the 3.1 release[1], which has
not been released yet and hence is not published to Maven.
Given the importance of ES 8, and the fact that the Elasticsearch connector
still depends on Flink 1.18 while we are releasing 1.20, I would suggest
nudging the dev list[2] for a release manager to pick up the 3.1 release.


1-https://issues.apache.org/jira/browse/FLINK-26088
2-https://lists.apache.org/list.html?d...@flink.apache.org

Best Regards
Ahmed Hamdy


On Wed, 31 Jul 2024 at 15:06, Rion Williams  wrote:

> Hi again all,
>
> Just following up on this as I’ve scoured around trying to find any
> documentation for using the ES 8.x connector, however everything only
> appears to reference 6/7.
>
> The ES 8.x seems to have been released for quite a bit of time, so I’m
> curious how others are using it. I’d really like to avoid doing something
> like forking the bits I need in my local repository if possible and
> building it on my own.
>
> Thanks in advance,
>
> Rion
>
> > On Jul 30, 2024, at 1:00 PM, Rion Williams 
> wrote:
> >
> > Hi all,
> >
> > I see that the Elasticsearch Connector for 8.x is supported per the repo
> (and completed JIRAs). Is there a way to reference this via Maven? Or is it
> required to build the connector from the source directly?
> >
> > We recently upgraded an Elasticsearch cluster to 8.x and some of the
> writes are now failing from the 7.x RHLC, so trying to upgrade to get to
> parity (and stop the jobs from crashing).
> >
> > Thanks in advance,
> >
> > Rion
>


Re: Using state processor for a custom windowed aggregate function

2024-07-31 Thread Alexis Sarda-Espinosa
Hi again,

I realized it's easy to create a reproducible example, see this specific
commit:

https://github.com/asardaes/test-flink-state-processor/commit/95e65f88fd1e38bcba63ebca68e3128789c0d2f2

When I run that application, I see the following output:

Savepoint created
KEY=GenericServiceCompositeKey(serviceId=X, countryCode=BAR)
Why is this null?

So a key is missing, and the key that was written has a null state.

Regards,
Alexis.

Am Mi., 31. Juli 2024 um 15:45 Uhr schrieb Alexis Sarda-Espinosa <
sarda.espin...@gmail.com>:

> Hi Matthias,
>
> This indeed compiles, I am able to actually generate a savepoint, it's
> just that all the window states in that savepoint appear to be null. The
> second argument of withOperator(...) is specified via
> OperatorTransformation...aggregate(), so the final transformation is built
> by WindowedStateTransformation#aggregate().
>
> I don't have any special logic with timers or even multiple events per
> key, in fact, my "stateToMigrate" already contains a single state instance
> for each key of interest, so my AggregateFunctionForMigration simply
> returns "value" in its add() method, no other logic there.
>
> Regards,
> Alexis.
>
> Am Mi., 31. Juli 2024 um 09:11 Uhr schrieb Schwalbe Matthias <
> matthias.schwa...@viseca.ch>:
>
>> Hi Alexis,
>>
>>
>>
>> Just a couple of points to double-check:
>>
>>- Does your code compile? (the second argument of withOperator(..)
>>should derive StateBootstrapTransformation instead of
>>SingleOutputStreamOperator)
>>- From the documentation of savepoint API you’ll find examples for
>>each type of state
>>- Your preparation needs to generate events that within your
>>StateBootstrapTransformation impementation get set into state primitives
>>much the same as you would do with a normal streaming operator
>>- Please note that a savepoint api job always runs in batch-mode,
>>hence
>>   - Keyed events are processed in key order first and the in time
>>   order
>>   - Triggers will only be fired after processing all events of a
>>   respective key are processed
>>   - Semantics are therefore slightly different as for streaming
>>   timers
>>
>>
>>
>> Hope that helps 
>>
>>
>>
>> Thias
>>
>>
>>
>>
>>
>>
>>
>> *From:* Alexis Sarda-Espinosa 
>> *Sent:* Monday, July 29, 2024 9:18 PM
>> *To:* user 
>> *Subject:* Using state processor for a custom windowed aggregate function
>>
>>
>>
>> Hello,
>>
>>
>>
>> I am trying to create state for an aggregate function that is used with a
>> GlobalWindow. This basically looks like:
>>
>>
>>
>> savepointWriter.withOperator(
>> OperatorIdentifier.forUid(UID),
>> OperatorTransformation.bootstrapWith(stateToMigrate)
>> .keyBy(...)
>> .window(GlobalWindows.create())
>> .aggregate(new AggregateFunctionForMigration())
>> )
>>
>> The job runs correctly and writes a savepoint, but if I then read the
>> savepoint I just created and load the state for that UID, the "elements"
>> iterable in the WindowReaderFunction's readWindow() method has a non-zero
>> size, but every element is null.
>>
>>
>>
>> I've tried specifying a custom trigger between window() and aggregate(),
>> always returning FIRE or FIRE_AND_PURGE, but it didn't make any difference.
>>
>>
>>
>> Am I missing something?
>>
>>
>>
>> Regards,
>>
>> Alexis.
>> Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und
>> beinhaltet unter Umständen vertrauliche Mitteilungen. Da die
>> Vertraulichkeit von e-Mail-Nachrichten nicht gewährleistet werden kann,
>> übernehmen wir keine Haftung für die Gewährung der Vertraulichkeit und
>> Unversehrtheit dieser Mitteilung. Bei irrtümlicher Zustellung bitten wir
>> Sie um Benachrichtigung per e-Mail und um Löschung dieser Nachricht sowie
>> eventueller Anhänge. Jegliche unberechtigte Verwendung oder Verbreitung
>> dieser Informationen ist streng verboten.
>>
>> This message is intended only for the named recipient and may contain
>> confidential or privileged information. As the confidentiality of email
>> communication cannot be guaranteed, we do not accept any responsibility for
>> the confidentiality and the intactness of this message. If you have
>> received it in error, please advise the sender by return e-mail and delete
>> this message and any attachments. Any unauthorised use or dissemination of
>> this information is strictly prohibited.
>>
>


Re: Using state processor for a custom windowed aggregate function

2024-07-31 Thread Alexis Sarda-Espinosa
Hi Matthias,

This indeed compiles, I am able to actually generate a savepoint, it's just
that all the window states in that savepoint appear to be null. The second
argument of withOperator(...) is specified via
OperatorTransformation...aggregate(), so the final transformation is built
by WindowedStateTransformation#aggregate().

I don't have any special logic with timers or even multiple events per key,
in fact, my "stateToMigrate" already contains a single state instance for
each key of interest, so my AggregateFunctionForMigration simply returns
"value" in its add() method, no other logic there.

Regards,
Alexis.
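For reference, a minimal sketch of what such a pass-through migration function could look like, assuming the shape described above (the generic type and merge behaviour are illustrative, not Alexis's actual code):

```java
import org.apache.flink.api.common.functions.AggregateFunction;

// Pass-through aggregate used only to bootstrap window state:
// add() simply keeps the latest value handed in.
public class AggregateFunctionForMigration<T> implements AggregateFunction<T, T, T> {

    @Override
    public T createAccumulator() {
        return null;
    }

    @Override
    public T add(T value, T accumulator) {
        return value;
    }

    @Override
    public T getResult(T accumulator) {
        return accumulator;
    }

    @Override
    public T merge(T a, T b) {
        return a != null ? a : b;
    }
}
```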

Am Mi., 31. Juli 2024 um 09:11 Uhr schrieb Schwalbe Matthias <
matthias.schwa...@viseca.ch>:

> Hi Alexis,
>
>
>
> Just a couple of points to double-check:
>
>- Does your code compile? (the second argument of withOperator(..)
>should derive StateBootstrapTransformation instead of
>SingleOutputStreamOperator)
>- From the documentation of savepoint API you’ll find examples for
>each type of state
>- Your preparation needs to generate events that within your
>StateBootstrapTransformation impementation get set into state primitives
>much the same as you would do with a normal streaming operator
>- Please note that a savepoint api job always runs in batch-mode, hence
>   - Keyed events are processed in key order first and the in time
>   order
>   - Triggers will only be fired after processing all events of a
>   respective key are processed
>   - Semantics are therefore slightly different as for streaming timers
>
>
>
> Hope that helps 
>
>
>
> Thias
>
>
>
>
>
>
>
> *From:* Alexis Sarda-Espinosa 
> *Sent:* Monday, July 29, 2024 9:18 PM
> *To:* user 
> *Subject:* Using state processor for a custom windowed aggregate function
>
>
>
> Hello,
>
>
>
> I am trying to create state for an aggregate function that is used with a
> GlobalWindow. This basically looks like:
>
>
>
> savepointWriter.withOperator(
> OperatorIdentifier.forUid(UID),
> OperatorTransformation.bootstrapWith(stateToMigrate)
> .keyBy(...)
> .window(GlobalWindows.create())
> .aggregate(new AggregateFunctionForMigration())
> )
>
> The job runs correctly and writes a savepoint, but if I then read the
> savepoint I just created and load the state for that UID, the "elements"
> iterable in the WindowReaderFunction's readWindow() method has a non-zero
> size, but every element is null.
>
>
>
> I've tried specifying a custom trigger between window() and aggregate(),
> always returning FIRE or FIRE_AND_PURGE, but it didn't make any difference.
>
>
>
> Am I missing something?
>
>
>
> Regards,
>
> Alexis.
> Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und
> beinhaltet unter Umständen vertrauliche Mitteilungen. Da die
> Vertraulichkeit von e-Mail-Nachrichten nicht gewährleistet werden kann,
> übernehmen wir keine Haftung für die Gewährung der Vertraulichkeit und
> Unversehrtheit dieser Mitteilung. Bei irrtümlicher Zustellung bitten wir
> Sie um Benachrichtigung per e-Mail und um Löschung dieser Nachricht sowie
> eventueller Anhänge. Jegliche unberechtigte Verwendung oder Verbreitung
> dieser Informationen ist streng verboten.
>
> This message is intended only for the named recipient and may contain
> confidential or privileged information. As the confidentiality of email
> communication cannot be guaranteed, we do not accept any responsibility for
> the confidentiality and the intactness of this message. If you have
> received it in error, please advise the sender by return e-mail and delete
> this message and any attachments. Any unauthorised use or dissemination of
> this information is strictly prohibited.
>


Flink on YARN mode: how to get the REST port from inside a jar job

2024-07-31 Thread melin li
In Flink on YARN mode the REST port is random; I need to obtain the REST port. Is there a good way to do this?


Re: Elasticsearch 8.x Connector in Maven

2024-07-31 Thread Rion Williams
Hi again all,

Just following up on this as I’ve scoured around trying to find any 
documentation for using the ES 8.x connector, however everything only appears 
to reference 6/7.

The ES 8.x seems to have been released for quite a bit of time, so I’m curious 
how others are using it. I’d really like to avoid doing something like forking 
the bits I need in my local repository if possible and building it on my own. 

Thanks in advance,

Rion

> On Jul 30, 2024, at 1:00 PM, Rion Williams  wrote:
> 
> Hi all,
> 
> I see that the Elasticsearch Connector for 8.x is supported per the repo (and 
> completed JIRAs). Is there a way to reference this via Maven? Or is it 
> required to build the connector from the source directly?
> 
> We recently upgraded an Elasticsearch cluster to 8.x and some of the writes 
> are now failing from the 7.x RHLC, so trying to upgrade to get to parity (and 
> stop the jobs from crashing).
> 
> Thanks in advance,
> 
> Rion


RE: Using state processor for a custom windowed aggregate function

2024-07-31 Thread Schwalbe Matthias
Hi Alexis,

Just a couple of points to double-check:

  *   Does your code compile? (the second argument of withOperator(..) should 
derive StateBootstrapTransformation instead of SingleOutputStreamOperator)
  *   From the documentation of the savepoint API you’ll find examples for each 
type of state
  *   Your preparation needs to generate events that, within your 
StateBootstrapTransformation implementation, get set into state primitives much 
the same as you would do with a normal streaming operator
  *   Please note that a savepoint API job always runs in batch mode, hence
 *   Keyed events are processed in key order first and then in time order
 *   Triggers will only be fired after all events of a 
respective key have been processed
 *   Semantics are therefore slightly different from streaming timers

Hope that helps 

Thias




From: Alexis Sarda-Espinosa 
Sent: Monday, July 29, 2024 9:18 PM
To: user 
Subject: Using state processor for a custom windowed aggregate function

Hello,

I am trying to create state for an aggregate function that is used with a 
GlobalWindow. This basically looks like:


savepointWriter.withOperator(
OperatorIdentifier.forUid(UID),
OperatorTransformation.bootstrapWith(stateToMigrate)
.keyBy(...)
.window(GlobalWindows.create())
.aggregate(new AggregateFunctionForMigration())
)
The job runs correctly and writes a savepoint, but if I then read the savepoint 
I just created and load the state for that UID, the "elements" iterable in the 
WindowReaderFunction's readWindow() method has a non-zero size, but every 
element is null.

I've tried specifying a custom trigger between window() and aggregate(), always 
returning FIRE or FIRE_AND_PURGE, but it didn't make any difference.

Am I missing something?

Regards,
Alexis.
Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und beinhaltet 
unter Umständen vertrauliche Mitteilungen. Da die Vertraulichkeit von 
e-Mail-Nachrichten nicht gewährleistet werden kann, übernehmen wir keine 
Haftung für die Gewährung der Vertraulichkeit und Unversehrtheit dieser 
Mitteilung. Bei irrtümlicher Zustellung bitten wir Sie um Benachrichtigung per 
e-Mail und um Löschung dieser Nachricht sowie eventueller Anhänge. Jegliche 
unberechtigte Verwendung oder Verbreitung dieser Informationen ist streng 
verboten.

This message is intended only for the named recipient and may contain 
confidential or privileged information. As the confidentiality of email 
communication cannot be guaranteed, we do not accept any responsibility for the 
confidentiality and the intactness of this message. If you have received it in 
error, please advise the sender by return e-mail and delete this message and 
any attachments. Any unauthorised use or dissemination of this information is 
strictly prohibited.


[Request Help] Flink StreamRecord granularity latency metrics

2024-07-31 Thread Yubin Li
Hi everyone,

We are focusing on improving observability for Flink, we have a vision
to make the latency of every business stream record observable, is
there any idea to implement the feature? looking forward to your
suggestions!
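As a point of reference, Flink's built-in latency markers already expose operator-level latency histograms, which is coarser than the per-record latency envisioned here; a sketch of the standard configuration keys, with illustrative values:

metrics.latency.interval: 30000
metrics.latency.granularity: operator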


Re:Changing GC settings of Taskmanager

2024-07-30 Thread Xuyang
Hi, Banu.

Could you check whether the "Configuration" icon under the "Task Managers" and
"Job Manager" buttons on the left side of the Flink UI shows that the currently
effective Flink conf includes these JVM changes? I suspect that you are using a
session cluster mode, where changes to the yaml file only affect the job, not the cluster.




--

Best!
Xuyang




At 2024-07-30 16:23:24, "banu priya"  wrote:

Hi All,


I am trying to change the GC settings of the TaskManager. I am using Flink 1.18 and 
Java 8. Java 8 uses the parallel GC by default, and I need to change it to use G1GC.
In flink-conf.yaml, I set the property (refer to the attached image, line no. 357), 
but it is not taking effect. What am I doing wrong? 




Thanks 
Banu
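For reference, the usual way to switch the TaskManager JVM to G1GC is via the JVM options key in flink-conf.yaml; a minimal sketch (the key is the standard Flink option, the flag is just an example):

env.java.opts.taskmanager: -XX:+UseG1GC

In session mode this only takes effect when the cluster itself is restarted with the updated file, which matches Xuyang's remark above about changes only affecting the job rather than the cluster.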



Elasticsearch 8.x Connector in Maven

2024-07-30 Thread Rion Williams
Hi all,

I see that the Elasticsearch Connector for 8.x is supported per the repo (and 
completed JIRAs). Is there a way to reference this via Maven? Or is it required 
to build the connector from the source directly?

We recently upgraded an Elasticsearch cluster to 8.x and some of the writes are 
now failing from the 7.x RHLC, so trying to upgrade to get to parity (and stop 
the jobs from crashing).

Thanks in advance,

Rion 

Re: checkpoint upload thread

2024-07-30 Thread Yanfei Lei
Hi Enric,

If I understand correctly, one subtask would use one
`asyncOperationsThreadPool`[1,2], it is possible to use the same
connection for an operator chain.

[1] 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L443
[2] 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java#L716

Enric Ott <243816...@qq.com> 于2024年7月30日周二 11:11写道:
>
> Hi,Community:
>   Does Flink upload states and inflight buffers within the same opratorchain 
> using the same connection (instead of per connection per operator)?



-- 
Best,
Yanfei


KafkaSink self-defined RoundRobinPartitioner not able to discover new partitions

2024-07-30 Thread Lei Wang
I wrote a FlinkRoundRobinPartitioner extends FlinkKafkaPartitioner and use
it as following:

KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
        .setBootstrapServers(sinkServers)
        .setKafkaProducerConfig(props)
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .setRecordSerializer(KafkaRecordSerializationSchema.<String>builder()
                .setTopic(sinkTopic)
                .setValueSerializationSchema(new SimpleStringSchema())
                .setPartitioner(new FlinkRoundRobinPartitioner<>())
                .build())
        .build();

But when the partition number is changed(becomes larger), no data is
written to the new partitions.
I looked at the source code, it seems because the KafkaSinkContext can not
retrieve the partitions dynamically.
Is there any way to fix this?

Thanks,
Lei
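For readers following along, a minimal sketch of what such a round-robin partitioner might look like (class and field names are illustrative, not Lei's actual code). Note that it can only cycle over the partitions[] array the sink passes in, so it does not by itself solve the stale-metadata problem described above:

```java
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

public class FlinkRoundRobinPartitioner<T> extends FlinkKafkaPartitioner<T> {

    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Cycle over whatever partition list the sink hands in; if the sink's
        // cached topic metadata is stale, newly added partitions never show up here.
        int next = counter.getAndIncrement();
        return partitions[Math.abs(next % partitions.length)];
    }
}
```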


Re: custom metric reporter

2024-07-30 Thread Dominik.Buenzli
Hi Sigalit

Did you add the factory's class name to the META-INF/services folder of the reporter jar?


The content of my file is:

org.apache.flink.metrics.custom.NestedGaugePrometheusReporterFactory

Kind regards

Dominik
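For completeness, the service descriptor Dominik refers to is a plain text file on the reporter jar's classpath; a sketch assuming a standard Maven layout, with the factory class name taken from Sigalit's configuration above:

src/main/resources/META-INF/services/org.apache.flink.metrics.reporter.MetricReporterFactory

containing a single line:

xxx.flink.metrics.otlp.OpenTelemetryProtocolReporterFactory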

From: Sigalit Eliazov 
Date: Monday, 29 July 2024 at 22:14
To: user 
Subject: custom metric reporter
Be aware: This is an external email.

hi
we have upgraded from flink 1.16 to 1.18 and our custom metric stopped working.
i saw in the release note of 1.17 that there was a change so i have defined the 
following

metrics.reporters: otlp
metrics.reporter.otlp.factory.class: 
xxx.flink.metrics.otlp.OpenTelemetryProtocolReporterFactory
metrics.reporter.otlp.interval: 30s
metrics.reporter.otlp.filter: 
metrics.reporter.otlp.prefix: pipeline_

i have defined OpenTelemetryProtocolReporterFactory that implements 
MetricReporterFactory

i verified the jar with my reporter and factory exists under flink/plugins

and still i get the following warning during deployment which indicates that my 
reporter is not loaded

2024-07-29 19:52:37,836 INFO  org.apache.flink.core.plugin.DefaultPluginManager 
   [] - Plugin loader with ID found, reusing it: external-resource-gpu
2024-07-29 19:52:37,836 INFO  org.apache.flink.core.plugin.DefaultPluginManager 
   [] - Plugin loader with ID found, reusing it: metrics-datadog
2024-07-29 19:52:37,836 INFO  org.apache.flink.core.plugin.DefaultPluginManager 
   [] - Plugin loader with ID found, reusing it: metrics-graphite
2024-07-29 19:52:37,836 INFO  org.apache.flink.core.plugin.DefaultPluginManager 
   [] - Plugin loader with ID found, reusing it: metrics-influx
2024-07-29 19:52:37,836 INFO  org.apache.flink.core.plugin.DefaultPluginManager 
   [] - Plugin loader with ID found, reusing it: metrics-jmx
2024-07-29 19:52:37,836 INFO  org.apache.flink.core.plugin.DefaultPluginManager 
   [] - Plugin loader with ID found, reusing it: metrics-prometheus
2024-07-29 19:52:37,836 INFO  org.apache.flink.core.plugin.DefaultPluginManager 
   [] - Plugin loader with ID found, reusing it: metrics-slf4j
2024-07-29 19:52:37,836 INFO  org.apache.flink.core.plugin.DefaultPluginManager 
   [] - Plugin loader with ID found, reusing it: metrics-statsd
2024-07-29 19:52:37,836 INFO  org.apache.flink.core.plugin.DefaultPluginManager 
   [] - Plugin loader with ID found, reusing it: metrics-otlp

2024-07-29 14:27:31,727 WARN  org.apache.flink.runtime.metrics.ReporterSetup
   [] - The reporter factory 
(xxx.flink.infra.metric.OpenTelemetryProtocolReporterFactory) could not be 
found for reporter otlp.

Available factories: 
[org.apache.flink.metrics.datadog.DatadogHttpReporterFactory, 
org.apache.flink.metrics.slf4j.Slf4jReporterFactory, 
org.apache.flink.metrics.graphite.GraphiteReporterFactory,

org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory, 
org.apache.flink.metrics.statsd.StatsDReporterFactory, 
org.apache.flink.metrics.prometheus.PrometheusReporterFactory,

org.apache.flink.metrics.jmx.JMXReporterFactory, 
org.apache.flink.metrics.influxdb.InfluxdbReporterFactory].

2024-07-29 14:27:31,732 INFO  
org.apache.flink.runtime.metrics.MetricRegistryImpl  [] - No metrics 
reporter configured, no metrics will be exposed/reported.

I am not sure why the new reporter is not loaded. i did not see any exception

Is there any additional configuration required?



Thanks

Sigalit








checkpoint upload thread

2024-07-29 Thread Enric Ott
Hi, Community:
 Does Flink upload states and in-flight buffers within the same 
operator chain using the same connection (instead of one connection per operator)?

Re: [Request Help] flinkcdc start with error java.lang.NoClassDefFoundError: org/apache/flink/cdc/common/sink/MetadataApplier

2024-07-29 Thread 424767284
Hi Xiqian,

Thanks, Xiqian. I checked the Flink Home /lib directory and found I had put
flink-cdc-pipeline-connector-mysql-3.1.jar and flink-cdc-pipeline-connector-doris-3.1.jar
in the /lib directory. I removed these two and it works now. I think it was probably
a jar conflict. I had used Flink CDC 3.0 before and it worked fine.


Regards,
Qijun





发自我的iPhone


-- Original --
From: Xiqian YU

> follow the guide of
> https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/get-started/quickstart/mysql-to-doris/
> and start with flink-cdc.sh and get an error
> error java.lang.NoClassDefFoundError:
> org/apache/flink/cdc/common/sink/MetadataApplier
> is there anything wrong

custom metric reporter

2024-07-29 Thread Sigalit Eliazov
hi
we have upgraded from flink 1.16 to 1.18 and our custom metric stopped
working.
i saw in the release note of 1.17 that there was a change so i have defined
the following

metrics.reporters: otlp
metrics.reporter.otlp.factory.class:
xxx.flink.metrics.otlp.OpenTelemetryProtocolReporterFactory
metrics.reporter.otlp.interval: 30s
metrics.reporter.otlp.filter: 
metrics.reporter.otlp.prefix: pipeline_

i have defined OpenTelemetryProtocolReporterFactory that implements
MetricReporterFactory

i verified the jar with my reporter and factory exists under flink/plugins

and still i get the following warning during deployment which
indicates that my reporter is not loaded

2024-07-29 19:52:37,836 INFO
org.apache.flink.core.plugin.DefaultPluginManager[] -
Plugin loader with ID found, reusing it: external-resource-gpu
2024-07-29 19:52:37,836 INFO
org.apache.flink.core.plugin.DefaultPluginManager[] -
Plugin loader with ID found, reusing it: metrics-datadog
2024-07-29 19:52:37,836 INFO
org.apache.flink.core.plugin.DefaultPluginManager[] -
Plugin loader with ID found, reusing it: metrics-graphite
2024-07-29 19:52:37,836 INFO
org.apache.flink.core.plugin.DefaultPluginManager[] -
Plugin loader with ID found, reusing it: metrics-influx
2024-07-29 19:52:37,836 INFO
org.apache.flink.core.plugin.DefaultPluginManager[] -
Plugin loader with ID found, reusing it: metrics-jmx
2024-07-29 19:52:37,836 INFO
org.apache.flink.core.plugin.DefaultPluginManager[] -
Plugin loader with ID found, reusing it: metrics-prometheus
2024-07-29 19:52:37,836 INFO
org.apache.flink.core.plugin.DefaultPluginManager[] -
Plugin loader with ID found, reusing it: metrics-slf4j
2024-07-29 19:52:37,836 INFO
org.apache.flink.core.plugin.DefaultPluginManager[] -
Plugin loader with ID found, reusing it: metrics-statsd
2024-07-29 19:52:37,836 INFO
org.apache.flink.core.plugin.DefaultPluginManager[] -
Plugin loader with ID found, reusing it: *metrics-otlp*

2024-07-29 14:27:31,727 WARN
org.apache.flink.runtime.metrics.ReporterSetup   [] - The
reporter factory
(xxx.flink.infra.metric.OpenTelemetryProtocolReporterFactory) could
not be found for reporter otlp.

Available factories:
[org.apache.flink.metrics.datadog.DatadogHttpReporterFactory,
org.apache.flink.metrics.slf4j.Slf4jReporterFactory,
org.apache.flink.metrics.graphite.GraphiteReporterFactory,

org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory,
org.apache.flink.metrics.statsd.StatsDReporterFactory,
org.apache.flink.metrics.prometheus.PrometheusReporterFactory,

org.apache.flink.metrics.jmx.JMXReporterFactory,
org.apache.flink.metrics.influxdb.InfluxdbReporterFactory].

2024-07-29 14:27:31,732 INFO
org.apache.flink.runtime.metrics.MetricRegistryImpl  [] - No
metrics reporter configured, no metrics will be exposed/reported.

I am not sure why the new reporter is not loaded. i did not see any exception

Is there any additional configuration required?


Thanks

Sigalit


Using state processor for a custom windowed aggregate function

2024-07-29 Thread Alexis Sarda-Espinosa
Hello,

I am trying to create state for an aggregate function that is used with a
GlobalWindow. This basically looks like:

savepointWriter.withOperator(
OperatorIdentifier.forUid(UID),
OperatorTransformation.bootstrapWith(stateToMigrate)
.keyBy(...)
.window(GlobalWindows.create())
.aggregate(new AggregateFunctionForMigration())
)

The job runs correctly and writes a savepoint, but if I then read the
savepoint I just created and load the state for that UID, the "elements"
iterable in the WindowReaderFunction's readWindow() method has a non-zero
size, but every element is null.

I've tried specifying a custom trigger between window() and aggregate(),
always returning FIRE or FIRE_AND_PURGE, but it didn't make any difference.

Am I missing something?

Regards,
Alexis.


[ANNOUNCE] Apache Celeborn 0.5.1 available

2024-07-29 Thread Ethan Feng
Hello all,

Apache Celeborn community is glad to announce the
new release of Apache Celeborn 0.5.1.

Celeborn is dedicated to improving the efficiency and elasticity of
different map-reduce engines and provides an elastic, highly efficient
service for intermediate data including shuffle data, spilled data,
result data, etc.


Download Link: https://celeborn.apache.org/download/

GitHub Release Tag:

- https://github.com/apache/celeborn/releases/tag/v0.5.1

Release Notes:

- https://celeborn.apache.org/community/release_notes/release_note_0.5.1

Home Page: https://celeborn.apache.org/

Celeborn Resources:

- Issue Management: https://issues.apache.org/jira/projects/CELEBORN
- Mailing List: d...@celeborn.apache.org

Regards,
Ethan Feng
On behalf of the Apache Celeborn community


[ANNOUNCE] Apache Celeborn 0.4.2 available

2024-07-29 Thread Fu Chen
Hello all,

Apache Celeborn community is glad to announce the
new release of Apache Celeborn 0.4.2.

Celeborn is dedicated to improving the efficiency and elasticity of
different map-reduce engines and provides an elastic, highly efficient
service for intermediate data including shuffle data, spilled data,
result data, etc.


Download Link: https://celeborn.apache.org/download/

GitHub Release Tag:

- https://github.com/apache/celeborn/releases/tag/v0.4.2

Release Notes:

- https://celeborn.apache.org/community/release_notes/release_note_0.4.2

Home Page: https://celeborn.apache.org/

Celeborn Resources:

- Issue Management: https://issues.apache.org/jira/projects/CELEBORN
- Mailing List: d...@celeborn.apache.org

Regards,
Fu Chen
On behalf of the Apache Celeborn community


Re: Expose rocksdb options for flush thread.

2024-07-29 Thread Gabor Somogyi
This has not moved for a while, so I have assigned it to you.

G


On Mon, Jul 15, 2024 at 9:06 AM Zhongyou Lee 
wrote:

> Hello everyone:
>
> Up to now, the only way to adjust the RocksDB flush threads is for the user to
> implement ConfigurableRocksDBOptionsFactory#setMaxBackgroundFlushes. I found
> FLINK-22059 to solve this problem. The PR has never been executed; I want
> to finish it. Can anyone assign it to me? My JIRA id is
> zhongyou.lee.
>
>
>-
>
>benefity from this pr
>
>
>1.
>
>Improve rocksdb write performance . Such as: multi slot per
>TaskManagers , savepoint recovery.
>
>
>-
>
>how to finish those pr
>
>
>1.
>
>Add state.backend.rocksdb.flush.thread.num in
>RocksDBConfigurableOptions and flink configure document description
>2.
>
>    Use currentOptions.getEnv().setBackgroundThreads() instead of the
>    setMaxBackgroundFlushes method to change the threads (supported in RocksDB 5.17
>    and above)
>3.
>
>tell user above rocksdb 5.17 and below version rocksdb
>setMaxBackgroundFlushes api has problem as below:
>https://github.com/facebook/rocksdb/issues/4847.
>
>
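Until FLINK-22059 lands, the workaround named in the original request looks roughly like the sketch below. The class name and the custom config key are illustrative assumptions, not an existing Flink option:

```java
import java.util.Collection;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.contrib.streaming.state.ConfigurableRocksDBOptionsFactory;
import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class FlushThreadsOptionsFactory implements ConfigurableRocksDBOptionsFactory {

    // Hypothetical key; FLINK-22059 proposes adding a built-in option like this.
    private static final ConfigOption<Integer> FLUSH_THREADS =
            ConfigOptions.key("state.backend.rocksdb.flush.thread.num").intType().defaultValue(1);

    private int flushThreads = 1;

    @Override
    public RocksDBOptionsFactory configure(ReadableConfig configuration) {
        this.flushThreads = configuration.get(FLUSH_THREADS);
        return this;
    }

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // Deprecated in newer RocksDB releases; the thread above suggests
        // currentOptions.getEnv().setBackgroundThreads(...) for RocksDB >= 5.17.
        return currentOptions.setMaxBackgroundFlushes(flushThreads);
    }

    @Override
    public ColumnFamilyOptions createColumnFamilyOptions(
            ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions;
    }
}
```

It would then be wired in by setting state.backend.rocksdb.options-factory to the factory's fully-qualified class name.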


Re: SavepointReader: Record size is too large for CollectSinkFunction

2024-07-29 Thread Gabor Somogyi
I've double checked and I think that CollectSinkOperatorFactory is
initialized in DataStream.collectAsync without MAX_BATCH_SIZE
and SOCKET_TIMEOUT values coming from the Flink config.
Could you plz share the whole stacktrace to double check my assumption?

G


On Tue, Jul 23, 2024 at 12:46 PM Salva Alcántara 
wrote:

> Hi all,
>
> Just to share my findings so far. Regarding tweaking the setting, it has
> been impossible for me to do so. So, the only way to work around this has
> been to duplicate some Flink code directly to allow me to do the tweak.
> More precisely, this is how my code looks like now (kudos to my dear
> colleague Den!):
>
> ```
>   private static  List executeAndCollect(DataStream dataStream,
> StreamExecutionEnvironment env,
>String maxBatchSize, int
> limit) throws Exception {
>
> TypeSerializer serializer =
> dataStream.getType().createSerializer(env.getConfig());
> String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
>
> CollectSinkOperatorFactory factory =
> new CollectSinkOperatorFactory<>(serializer, accumulatorName,
> MemorySize.parse(maxBatchSize), SOCKET_TIMEOUT.defaultValue());
> CollectSinkOperator operator =
> (CollectSinkOperator) factory.getOperator();
> CollectResultIterator iterator =
> new CollectResultIterator<>(
> operator.getOperatorIdFuture(),
> serializer,
> accumulatorName,
> env.getCheckpointConfig());
> CollectStreamSink sink = new CollectStreamSink<>(dataStream,
> factory);
> sink.name("Data stream collect sink");
> env.addOperator(sink.getTransformation());
>
> final JobClient jobClient = env.executeAsync("DataStream Collect");
> iterator.setJobClient(jobClient);
>
> var clientAndIterator = new ClientAndIterator<>(jobClient, iterator);
> List results = new ArrayList<>(limit);
> while (limit > 0 && clientAndIterator.iterator.hasNext()) {
>   results.add(clientAndIterator.iterator.next());
>   limit--;
> }
> return results;
>   }
> ```
>
> Essentially, I'm just adding a parameter to the CollectSinkOperatorFactory
> constructor here:
> -
> https://github.com/apache/flink/blob/release-1.15.4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java#L1378
>
> This works but it's obviously inconvenient for the user. If this
> limitation is confirmed, Den and I will be glad to send a MR to fix that.
>
> Makes sense?
>
> Regards,
>
> Salva
>
> On Mon, Jul 22, 2024 at 10:04 AM Salva Alcántara 
> wrote:
>
>> The same happens with this slight variation:
>>
>> ```
>> Configuration config = new Configuration();
>> config.setString("collect-sink.batch-size.max", "100mb");
>> StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> env.configure(config);
>> SavepointReader savepoint = SavepointReader.read(env, savepointPath, new
>> HashMapStateBackend());
>> ```
>>
>> Salva
>>
>> On Mon, Jul 22, 2024 at 10:00 AM Salva Alcántara 
>> wrote:
>>
>>> Hi Zhanghao,
>>>
>>> Thanks for your suggestion. Unfortunately, this does not work, I still
>>> get the same error message:
>>>
>>> ```
>>> Record size is too large for CollectSinkFunction. Record size is 9623137
>>> bytes, but max bytes per batch is only 2097152 bytes.
>>> Please consider increasing max bytes per batch value by setting
>>> collect-sink.batch-size.max
>>> ```
>>>
>>> The code looks like this now:
>>>
>>> ```
>>> Configuration config = new Configuration();
>>> config.setString("collect-sink.batch-size.max", "10mb");
>>> StreamExecutionEnvironment env =
>>> StreamExecutionEnvironment.getExecutionEnvironment(config);
>>> SavepointReader savepoint = SavepointReader.read(env, savepointPath, new
>>> HashMapStateBackend());
>>>
>>> var matcher = savepoint.readKeyedState("Raw Triggers", new
>>> MatcherReader());
>>> var matcherState = matcher.executeAndCollect(1000);
>>> ```
>>>
>>> I have tried other ways but none has worked (the setting is always
>>> ignored in the end).
>>>
>>> Regards,
>>>
>>> Salva
>>>
>>>
>>>
>>> On Sun, Jul 21, 2024 at 9:10 AM Zhanghao Chen 
>>> wrote:
>>>
 Hi, you could increase it as follows:

 Configuration config = new Configuration();
 config.setString(collect-sink.batch-size.max, "10mb");
 StreamExecutionEnvironment env =
 StreamExecutionEnvironment.getExecutionEnvironment(config);
 --
 *From:* Salva Alcántara 
 *Sent:* Saturday, July 20, 2024 15:05
 *To:* user 
 *Subject:* SavepointReader: Record size is too large for
 CollectSinkFunction

 Hi all!

 I'm trying to debug a job via inspecting its savepoints but I'm getting
 this error message:

 ```
 Caused by: java.lang.RuntimeException: Record size is too large for
 CollectSinkFunction. Record size is 9627127 bytes, but max bytes per batch
 is only 2097152 bytes. Please 

Re: [Request Help] flinkcdc start with error java.lang.NoClassDefFoundError: org/apache/flink/cdc/common/sink/MetadataApplier

2024-07-28 Thread Xiqian YU
Hi Qijun,

This error message usually implies that required dependencies were not met. 
Could you please confirm if you’ve placed all .jar files like this:


  *   Flink Home
 *   /lib
*   mysql-connector-java-8.0.27.jar
*   … other pre-bundled jars
  *   Flink CDC Home
 *   /lib
*   flink-cdc-dist-3.1.jar (pre-bundled)
*   flink-cdc-pipeline-connector-mysql-3.1.jar
*   flink-cdc-pipeline-connector-doris-3.1.jar

before executing ./flink-cdc.sh at Flink CDC Home.

Regards,
Xiqian

De : 424767284 
Date : samedi, 27 juillet 2024 à 09:25
À : user 
Objet : [Request Help] flinkcdc start with error 
java.lang.NoClassDefFoundError: org/apache/flink/cdc/common/sink/MetadataApplier
hi:
follow the guide of 
https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/get-started/quickstart/mysql-to-doris/
and start with flink-cdc.sh  and get an error
error java.lang.NoClassDefFoundError: 
org/apache/flink/cdc/common/sink/MetadataApplier
is there anything wrong


Unsubscribe

2024-07-28 Thread 戴鹏
Unsubscribe

[Request Help] flinkcdc start with error java.lang.NoClassDefFoundError: org/apache/flink/cdc/common/sink/MetadataApplier

2024-07-26 Thread 424767284
hi:
I followed the guide at
https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/get-started/quickstart/mysql-to-doris/
and started it with flink-cdc.sh, but got an error:
java.lang.NoClassDefFoundError:
org/apache/flink/cdc/common/sink/MetadataApplier
Is there anything wrong?

Re: Tuning rocksdb configuration

2024-07-26 Thread Zakelly Lan
Hi Banupriya,

Sometimes an sst file will not be compacted and will be referenced for a long
time. That depends on how RocksDB picks the files for compaction. It may
happen when some range of keys is never touched after some point in time,
since RocksDB only takes care of the files or key ranges that get large.
Typically you don't need to worry about this, unless the checkpoint
size keeps growing for a long time.


Best,
Zakelly

On Fri, Jul 26, 2024 at 2:49 PM banu priya  wrote:

> Hi Zakelly,
>
> Thanks a lot for your reply.
>
> I have one more query,
>
> In side checkpoints chk-X directory there is a _metadata file, that
> contains list of other .sst files. In my chk-17000 directory it still
> refers to very old 00014.sst(latest is 225.sst). Why is it so??..
> compaction has happened and that's why 1 to 00013 are not present. Why
> it didn't compact the very old file yet.  1.Do I need to change any other
> rocksdb property? Or 2.does it means my source events are still coming to
> same key and keeps that state??
>
> Window fires for every 2s, so I don't need it the data for long time.
>
> Thanks
> Banupriya
>
> On Fri, 26 Jul, 2024, 11:46 am Zakelly Lan,  wrote:
>
>> Hi Banu,
>>
>> I'm trying to answer your question in brief:
>>
>> 1. Yes, when the memtable reaches the value you configured, a flush will
>> be triggered. And no, sst files have different format with memtables, the
>> size is smaller than 64mb IIUC.
>>
>> 2. Typically you don't need to change this value. If it is set to 2, when
>> 1 write buffer is being flushed to storage, new writes can continue to the
>> other write buffer. Increase this when the flush is too slow.
>>
>> 3. IIUC, bloom filter helps during point query, and window processing
>> requires point queries. So enabling this would help.
>>
>> 4. I'd suggest not setting this to 0. This only affects whether the
>> checkpoint data is stored inline in the metadata file. Maybe the checkpoint
>> size is a little bit different, but it has nothing to do with the
>> throughput.
>>
>>
>> Best,
>> Zakelly
>>
>> On Thu, Jul 25, 2024 at 3:25 PM banu priya  wrote:
>>
>>> Hi All,
>>>
>>> I have a flink job with RMQ Source, filters, tumbling window(uses
>>> processing time fires every 2s), aggregator, RMQ Sink. Enabled incremental
>>> rocksdb checkpoints for every 10s with minimum pause between checkpoints as
>>> 5s. My checkpoints size is keep on increasing , so I am planning to tune
>>> some rocksdb configuration.
>>>
>>>
>>>
>>> Following are my queries. Can someone help me choose a correct values.?
>>>
>>>
>>>
>>> 1.state.backend.rocksdb.writebuffer.size = 64 mb:
>>>
>>> Does it mean once write buffer (memtable) reaches 64 mb it will be
>>> flushed to disk as .sst file. Will .sst file also have size as 64mb?
>>>
>>>
>>>
>>> 2.state.backend.rocksdb.writebuffer.count = 2.
>>>
>>> My job is running with parallelism of 15 and 3 taskmanager(so 5 slots
>>> per taskmanager).  For single rocks DB folder, how can I choose the correct
>>> buffer count.?
>>>
>>> 3. do I need to enable bloom filter?
>>>
>>>  4. state.storage.fs.memory-threshold is 0 in my job. Does it have any
>>> effect in Taskmanager through put or check points size??
>>>
>>> Thanks
>>>
>>> Banu
>>>
>>


Re: Tuning rocksdb configuration

2024-07-26 Thread Zakelly Lan
Hi Banu,

I'm trying to answer your question in brief:

1. Yes, when the memtable reaches the value you configured, a flush will be
triggered. And no, sst files have different format with memtables, the size
is smaller than 64mb IIUC.

2. Typically you don't need to change this value. If it is set to 2, when 1
write buffer is being flushed to storage, new writes can continue to the
other write buffer. Increase this when the flush is too slow.

3. IIUC, bloom filter helps during point query, and window processing
requires point queries. So enabling this would help.

4. I'd suggest not setting this to 0. This only affects whether the
checkpoint data is stored inline in the metadata file. Maybe the checkpoint
size is a little bit different, but it has nothing to do with the
throughput.


Best,
Zakelly
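For readers who want to experiment, the options discussed in this thread map to the following flink-conf.yaml keys (the values shown are the ones mentioned above and the documented default for the memory threshold, not recommendations):

state.backend.rocksdb.writebuffer.size: 64mb
state.backend.rocksdb.writebuffer.count: 2
state.backend.rocksdb.use-bloom-filter: true
state.storage.fs.memory-threshold: 20kb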

On Thu, Jul 25, 2024 at 3:25 PM banu priya  wrote:

> Hi All,
>
> I have a flink job with RMQ Source, filters, tumbling window(uses
> processing time fires every 2s), aggregator, RMQ Sink. Enabled incremental
> rocksdb checkpoints for every 10s with minimum pause between checkpoints as
> 5s. My checkpoints size is keep on increasing , so I am planning to tune
> some rocksdb configuration.
>
>
>
> Following are my queries. Can someone help me choose a correct values.?
>
>
>
> 1.state.backend.rocksdb.writebuffer.size = 64 mb:
>
> Does it mean once write buffer (memtable) reaches 64 mb it will be flushed
> to disk as .sst file. Will .sst file also have size as 64mb?
>
>
>
> 2.state.backend.rocksdb.writebuffer.count = 2.
>
> My job is running with parallelism of 15 and 3 taskmanager(so 5 slots per
> taskmanager).  For single rocks DB folder, how can I choose the correct
> buffer count.?
>
> 3. do I need to enable bloom filter?
>
>  4. state.storage.fs.memory-threshold is 0 in my job. Does it have any
> effect in Taskmanager through put or check points size??
>
> Thanks
>
> Banu
>


Access to S3 - checkpoints

2024-07-25 Thread Sigalit Eliazov
Hi,

We are using Ceph buckets to store the checkpoints and savepoints, and the
access is done via the S3 protocol. Since we don't have any integration
with Hadoop, we added a dependency on flink-s3-fs-presto.

Our Flink configuration looks like this:


state.checkpoint-storage: filesystem
state.checkpoints.dir: s3://my-bucket/flink_checkpoints/checkpoints
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: s3://my-bucket/flink_ha_storage
s3.endpoint: "https://rook-ceph-rgw-ocs-storagecluster-cephobjectstore.openshift-storage.svc.cluster.local:443"
s3.path-style-access: "true"
s3.access-key: "my-access-key-id"
s3.secret-key: "my-secret-access-key"

However, we encounter the following error when trying to write a checkpoint:

java.util.concurrent.CompletionException:
com.amazonaws.SdkClientException: Unable to execute HTTP request: PKIX
path building failed:
sun.security.provider.certpath.SunCertPathBuilderException: unable to
find valid certification path to requested target

When trying to connect without SSL and using the following endpoint:

s3.endpoint: "
http://rook-ceph-rgw-ocs-storagecluster-cephobjectstore.openshift-storage.svc.cluster.local:80
"

It works without issues.

I have seen different solutions that involve creating a new Flink image on
top of the community one, but I would prefer to avoid this. If you have
encountered this issue, I would appreciate any suggestions on the best
practice to solve this.

Thanks

Sigalit
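One approach that avoids rebuilding the image is to mount the Ceph CA certificate (for example from a Kubernetes secret) into the pods and point the JVM at a truststore that contains it via standard JVM properties; a sketch of the flink-conf.yaml side, with path and password as placeholders:

env.java.opts: -Djavax.net.ssl.trustStore=/certs/truststore.jks -Djavax.net.ssl.trustStorePassword=changeit

Whether this is picked up depends on the S3 filesystem plugin honouring the default JVM trust settings, so treat it as a starting point rather than a guaranteed fix.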


Tuning rocksdb configuration

2024-07-25 Thread banu priya
Hi All,

I have a flink job with RMQ Source, filters, tumbling window(uses
processing time fires every 2s), aggregator, RMQ Sink. Enabled incremental
rocksdb checkpoints for every 10s with minimum pause between checkpoints as
5s. My checkpoints size is keep on increasing , so I am planning to tune
some rocksdb configuration.



Following are my queries. Can someone help me choose a correct values.?



1.state.backend.rocksdb.writebuffer.size = 64 mb:

Does it mean once write buffer (memtable) reaches 64 mb it will be flushed
to disk as .sst file. Will .sst file also have size as 64mb?



2.state.backend.rocksdb.writebuffer.count = 2.

My job is running with parallelism of 15 and 3 taskmanager(so 5 slots per
taskmanager).  For single rocks DB folder, how can I choose the correct
buffer count.?

3. do I need to enable bloom filter?

 4. state.storage.fs.memory-threshold is 0 in my job. Does it have any
effect in Taskmanager through put or check points size??

Thanks

Banu


Setting web.submit.enable to false doesn't allow flinksessionjobs to work when running in Kubernetes

2024-07-24 Thread Ralph Blaise
Setting web.submit.enable to false in a FlinkDeployment deployed to Kubernetes 
prevents its FlinkSessionJobs from working. It instead results in the error 
below:

 
{"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"java.util.concurrent.ExecutionException:
org.apache.flink.runtime.rest.util.RestClientException: [POST request not
allowed]"

I have to re-enable web.submit.enable in order for FlinkSessionJob to work 
again on Kubernetes. My goal is to make the dashboard available to users but 
not allow them to submit jobs through the UI.

Any help in this matter would be appreciated.

Best regards,

Ralph.

Running Flink 1.19.1. Same thing was happening with 1.19.0



Re: Troubleshooting checkpoint expiration

2024-07-23 Thread Alexis Sarda-Espinosa
Hi again,

I found a Hadoop class that can log latency information [1], but since I
don't see any exceptions in the logs when a checkpoint expires due to
timeout, I'm still wondering if I can change other log levels to get more
insights, maybe somewhere in Flink's file system abstractions?

[1]
https://hadoop.apache.org/docs/r3.2.4/hadoop-azure/abfs.html#Perf_Options

Regards,
Alexis.
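One thing worth trying, assuming the default log4j.properties that ships with Flink, is raising the level for the Hadoop ABFS package; the logger name below is the hadoop-azure package, the rest is standard log4j2 properties syntax:

logger.abfs.name = org.apache.hadoop.fs.azurebfs
logger.abfs.level = DEBUG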

Am Fr., 19. Juli 2024 um 09:17 Uhr schrieb Alexis Sarda-Espinosa <
sarda.espin...@gmail.com>:

> Hello,
>
> We have a Flink job that uses ABFSS for checkpoints and related state.
> Lately we see a lot of exceptions due to expiration of checkpoints, and I'm
> guessing that's an issue in the infrastructure or on Azure's side, but I
> was wondering if there are Flink/Hadoop Java packages that log potentially
> useful information if we DEBUG/TRACE them?
>
> Regards,
> Alexis.
>
>


Re: SavepointReader: Record size is too large for CollectSinkFunction

2024-07-23 Thread Salva Alcántara
Hi all,

Just to share my findings so far. Regarding tweaking the setting, it has
been impossible for me to do so. So, the only way to work around this has
been to duplicate some Flink code directly to allow me to do the tweak.
More precisely, this is how my code looks like now (kudos to my dear
colleague Den!):

```
  private static  List executeAndCollect(DataStream dataStream,
StreamExecutionEnvironment env,
   String maxBatchSize, int
limit) throws Exception {

TypeSerializer serializer =
dataStream.getType().createSerializer(env.getConfig());
String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();

CollectSinkOperatorFactory factory =
new CollectSinkOperatorFactory<>(serializer, accumulatorName,
MemorySize.parse(maxBatchSize), SOCKET_TIMEOUT.defaultValue());
CollectSinkOperator operator =
(CollectSinkOperator) factory.getOperator();
CollectResultIterator iterator =
new CollectResultIterator<>(
operator.getOperatorIdFuture(),
serializer,
accumulatorName,
env.getCheckpointConfig());
CollectStreamSink sink = new CollectStreamSink<>(dataStream,
factory);
sink.name("Data stream collect sink");
env.addOperator(sink.getTransformation());

final JobClient jobClient = env.executeAsync("DataStream Collect");
iterator.setJobClient(jobClient);

var clientAndIterator = new ClientAndIterator<>(jobClient, iterator);
List results = new ArrayList<>(limit);
while (limit > 0 && clientAndIterator.iterator.hasNext()) {
  results.add(clientAndIterator.iterator.next());
  limit--;
}
return results;
  }
```

Essentially, I'm just adding a parameter to the CollectSinkOperatorFactory
constructor here:
-
https://github.com/apache/flink/blob/release-1.15.4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java#L1378

This works but it's obviously inconvenient for the user. If this limitation
is confirmed, Den and I will be glad to send a MR to fix that.

Makes sense?

Regards,

Salva

On Mon, Jul 22, 2024 at 10:04 AM Salva Alcántara 
wrote:

> The same happens with this slight variation:
>
> ```
> Configuration config = new Configuration();
> config.setString("collect-sink.batch-size.max", "100mb");
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.configure(config);
> SavepointReader savepoint = SavepointReader.read(env, savepointPath, new
> HashMapStateBackend());
> ```
>
> Salva
>
> On Mon, Jul 22, 2024 at 10:00 AM Salva Alcántara 
> wrote:
>
>> Hi Zhanghao,
>>
>> Thanks for your suggestion. Unfortunately, this does not work, I still
>> get the same error message:
>>
>> ```
>> Record size is too large for CollectSinkFunction. Record size is 9623137
>> bytes, but max bytes per batch is only 2097152 bytes.
>> Please consider increasing max bytes per batch value by setting
>> collect-sink.batch-size.max
>> ```
>>
>> The code looks like this now:
>>
>> ```
>> Configuration config = new Configuration();
>> config.setString("collect-sink.batch-size.max", "10mb");
>> StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment(config);
>> SavepointReader savepoint = SavepointReader.read(env, savepointPath, new
>> HashMapStateBackend());
>>
>> var matcher = savepoint.readKeyedState("Raw Triggers", new
>> MatcherReader());
>> var matcherState = matcher.executeAndCollect(1000);
>> ```
>>
>> I have tried other ways but none has worked (the setting is always
>> ignored in the end).
>>
>> Regards,
>>
>> Salva
>>
>>
>>
>> On Sun, Jul 21, 2024 at 9:10 AM Zhanghao Chen 
>> wrote:
>>
>>> Hi, you could increase it as follows:
>>>
>>> Configuration config = new Configuration();
>>> config.setString(collect-sink.batch-size.max, "10mb");
>>> StreamExecutionEnvironment env =
>>> StreamExecutionEnvironment.getExecutionEnvironment(config);
>>> --
>>> *From:* Salva Alcántara 
>>> *Sent:* Saturday, July 20, 2024 15:05
>>> *To:* user 
>>> *Subject:* SavepointReader: Record size is too large for
>>> CollectSinkFunction
>>>
>>> Hi all!
>>>
>>> I'm trying to debug a job via inspecting its savepoints but I'm getting
>>> this error message:
>>>
>>> ```
>>> Caused by: java.lang.RuntimeException: Record size is too large for
>>> CollectSinkFunction. Record size is 9627127 bytes, but max bytes per batch
>>> is only 2097152 bytes. Please consider increasing max bytes per batch value
>>> by setting collect-sink.batch-size.max
>>> ```
>>>
>>> My code looks like this:
>>>
>>> ```
>>>   private static void run(String savepointPath) throws Exception {
>>> StreamExecutionEnvironment env =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> org.apache.flink.state.api.SavepointReader savepoint =
>>> org.apache.flink.state.api.SavepointReader.read(env,
>>> savepointPath, new HashMapStateBackend());
>>>
>>> var operator 

Re: flight buffer local storage

2024-07-22 Thread Enric Ott
Thanks, Zhanghao.
I think it's the async upload mechanism that helps mitigate the latency of materializing 
the in-flight buffers, and the execution vertex restart procedure just 
reads the in-flight buffers and the local TaskStateSnapshots to get its 
job done.




--Original--
From: "Zhanghao Chen"



Re: Flink state

2024-07-22 Thread Saurabh Singh
Hi Banu,

Rocksdb is intelligently built to clear any un-useful state from its
purview. So you should be good and any required cleanup will be
automatically done by RocksDb itself.
From the current documentation, it looks quite hard to relate Flink
Internal DS activity to RocksDB DS activity. In my opinion, it is
indifferent to the application.
This is a good doc that gives insights -
https://flink.apache.org/2018/01/30/managing-large-state-in-apache-flink-an-intro-to-incremental-checkpointing/#how-it-works

Regards
Saurabh


On Mon, Jul 22, 2024 at 10:38 AM banu priya  wrote:

> Dear Community,
>
> Gentle reminder about my below email.
>
> Thanks
> Banu
>
> On Sat, 20 Jul, 2024, 4:37 pm banu priya,  wrote:
>
>> Hi All,
>>
>> I have a flink job with RMQ Source, filters, tumbling window(uses
>> processing time fires every 2s), aggregator, RMQ Sink.
>>
>> I am trying to understand about states and checkpoints(enabled
>> incremental rocksdb checkpoints).
>>
>> In local rocks db directory, I have .sst files, log, lock, options files.
>>
>> I read that states are cleared once tumbling window is fired. Does it
>> mean my local rocksdb directory's .sst files will be deleted once windows
>> fires??. I understand that compaction happens once .sst files reaches (64MB
>> it is configurable). Is there any other way .sst files will be/can be
>> deleted when tumbling window fires??
>>
>> Happy Learning. Happy Weekend.
>>
>> Thanks
>> Banu
>>
>>


Re: SavepointReader: Record size is too large for CollectSinkFunction

2024-07-22 Thread Salva Alcántara
The same happens with this slight variation:

```
Configuration config = new Configuration();
config.setString("collect-sink.batch-size.max", "100mb");
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.configure(config);
SavepointReader savepoint = SavepointReader.read(env, savepointPath, new
HashMapStateBackend());
```

Salva
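A possible workaround, if the collect-sink.batch-size.max setting keeps being ignored, is to skip
the collect sink altogether and dump the state to files. This is only a sketch: it reuses the
MatcherReader / "Raw Triggers" names and the savepoint/env variables from this thread, assumes the
flink-connector-files dependency is on the classpath, and uses an arbitrary local output path.

```
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;

// Read the keyed state as above, but write it out with a FileSink instead of collecting it
// to the client, so the CollectSinkFunction batch limit never applies.
var matcher = savepoint.readKeyedState("Raw Triggers", new MatcherReader());
matcher
    .map(Object::toString).returns(Types.STRING)  // assumes the state type has a meaningful toString()
    .sinkTo(FileSink
        .forRowFormat(new Path("file:///tmp/matcher-state"), new SimpleStringEncoder<String>())
        .build());
env.execute("dump-matcher-state");
```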

On Mon, Jul 22, 2024 at 10:00 AM Salva Alcántara 
wrote:

> Hi Zhanghao,
>
> Thanks for your suggestion. Unfortunately, this does not work, I still get
> the same error message:
>
> ```
> Record size is too large for CollectSinkFunction. Record size is 9623137
> bytes, but max bytes per batch is only 2097152 bytes.
> Please consider increasing max bytes per batch value by setting
> collect-sink.batch-size.max
> ```
>
> The code looks like this now:
>
> ```
> Configuration config = new Configuration();
> config.setString("collect-sink.batch-size.max", "10mb");
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment(config);
> SavepointReader savepoint = SavepointReader.read(env, savepointPath, new
> HashMapStateBackend());
>
> var matcher = savepoint.readKeyedState("Raw Triggers", new
> MatcherReader());
> var matcherState = matcher.executeAndCollect(1000);
> ```
>
> I have tried other ways but none has worked (the setting is always ignored
> in the end).
>
> Regards,
>
> Salva
>
>
>
> On Sun, Jul 21, 2024 at 9:10 AM Zhanghao Chen 
> wrote:
>
>> Hi, you could increase it as follows:
>>
>> Configuration config = new Configuration();
>> config.setString("collect-sink.batch-size.max", "10mb");
>> StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment(config);
>> --
>> *From:* Salva Alcántara 
>> *Sent:* Saturday, July 20, 2024 15:05
>> *To:* user 
>> *Subject:* SavepointReader: Record size is too large for
>> CollectSinkFunction
>>
>> Hi all!
>>
>> I'm trying to debug a job via inspecting its savepoints but I'm getting
>> this error message:
>>
>> ```
>> Caused by: java.lang.RuntimeException: Record size is too large for
>> CollectSinkFunction. Record size is 9627127 bytes, but max bytes per batch
>> is only 2097152 bytes. Please consider increasing max bytes per batch value
>> by setting collect-sink.batch-size.max
>> ```
>>
>> My code looks like this:
>>
>> ```
>>   private static void run(String savepointPath) throws Exception {
>> StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> org.apache.flink.state.api.SavepointReader savepoint =
>> org.apache.flink.state.api.SavepointReader.read(env,
>> savepointPath, new HashMapStateBackend());
>>
>> var operator = savepoint.readKeyedState("uuid", new
>> MyKeyedOperatorReader());
>> var operatorState = operator.executeAndCollect(1000);
>>   }
>> ```
>>
>> I haven't found the way to increase the `collect-sink.batch-size.max` as
>> suggested in the error msg.
>>
>> Any help on this will be appreciated!
>>
>> Regards,
>>
>> Salva
>>
>


Re: SavepointReader: Record size is too large for CollectSinkFunction

2024-07-22 Thread Salva Alcántara
Hi Zhanghao,

Thanks for your suggestion. Unfortunately, this does not work, I still get
the same error message:

```
Record size is too large for CollectSinkFunction. Record size is 9623137
bytes, but max bytes per batch is only 2097152 bytes.
Please consider increasing max bytes per batch value by setting
collect-sink.batch-size.max
```

The code looks like this now:

```
Configuration config = new Configuration();
config.setString("collect-sink.batch-size.max", "10mb");
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(config);
SavepointReader savepoint = SavepointReader.read(env, savepointPath, new
HashMapStateBackend());

var matcher = savepoint.readKeyedState("Raw Triggers", new MatcherReader());
var matcherState = matcher.executeAndCollect(1000);
```

I have tried other ways but none has worked (the setting is always ignored
in the end).

Regards,

Salva



On Sun, Jul 21, 2024 at 9:10 AM Zhanghao Chen 
wrote:

> Hi, you could increase it as follows:
>
> Configuration config = new Configuration();
> config.setString("collect-sink.batch-size.max", "10mb");
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment(config);
> --
> *From:* Salva Alcántara 
> *Sent:* Saturday, July 20, 2024 15:05
> *To:* user 
> *Subject:* SavepointReader: Record size is too large for
> CollectSinkFunction
>
> Hi all!
>
> I'm trying to debug a job via inspecting its savepoints but I'm getting
> this error message:
>
> ```
> Caused by: java.lang.RuntimeException: Record size is too large for
> CollectSinkFunction. Record size is 9627127 bytes, but max bytes per batch
> is only 2097152 bytes. Please consider increasing max bytes per batch value
> by setting collect-sink.batch-size.max
> ```
>
> My code looks like this:
>
> ```
>   private static void run(String savepointPath) throws Exception {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> org.apache.flink.state.api.SavepointReader savepoint =
> org.apache.flink.state.api.SavepointReader.read(env,
> savepointPath, new HashMapStateBackend());
>
> var operator = savepoint.readKeyedState("uuid", new
> MyKeyedOperatorReader());
> var operatorState = operator.executeAndCollect(1000);
>   }
> ```
>
> I haven't found the way to increase the `collect-sink.batch-size.max` as
> suggested in the error msg.
>
> Any help on this will be appreciated!
>
> Regards,
>
> Salva
>


Re: Flink Slot request bulk is not fulfillable!

2024-07-22 Thread Saurabh Singh
Hi Li,

The error suggests that the job was not able to acquire the required TaskManager
task slots within the configured time limit of 5 minutes.
Jobs run on the TaskManagers (worker nodes). Helpful link -
https://nightlies.apache.org/flink/flink-docs-master/docs/concepts/flink-architecture/#anatomy-of-a-flink-cluster
Task slot information is available in the Flink web UI.
If the available slots show as 0 and the job is not running, it means the job is not
able to acquire the required resources (CPU/memory).
For example, 1 task slot = 1 compute resource, which could be 1 CPU and 4 GB of RAM;
this is the configuration you define, and it determines how many task slots each node type provides.
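For illustration, the two settings most often involved here are the number of slots per
TaskManager and the slot request timeout. They are normally set in the cluster configuration
(flink-conf.yaml) rather than in job code; the Java form below, with example values, is just a
sketch of the same options.

```
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;

Configuration conf = new Configuration();
// taskmanager.numberOfTaskSlots: more slots per TaskManager so the bulk request can be satisfied.
conf.set(TaskManagerOptions.NUM_TASK_SLOTS, 4);
// slot.request.timeout (ms): allow more than the default 5 minutes to acquire slots.
conf.set(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 600_000L);
```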

Thanks
Saurabh

On Mon, Jul 22, 2024 at 11:01 AM Li Shao  wrote:

> Hi All,
> We are using flink batch mode to process s3 files. However, recently we
> are seeing the errors like:
>
>
> Caused by: java.util.concurrent.CompletionException:
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Slot request bulk is not fulfillable! Could not allocate the required slot
> within slot request timeout
>
> at
> java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown
> Source)
>
> at
> java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown
> Source)
>
> at
> java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown
> Source)
>
> ... 36 more
>
> Caused by:
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Slot request bulk is not fulfillable! Could not allocate the required slot
> within slot request timeout
>
> at
> org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:86)
>
> ... 29 more
>
> Caused by: java.util.concurrent.TimeoutException: Timeout has occurred:
> 30 ms
>
> ... 30 more
>
>
> I can see the job manager & task manager still got memory to run jobs.
> Anyone can help me on this?
>
> Job manager: JVM: 5.02 GB / 6.00 GB
> jobmanager.heap.size 6144m
> jobmanager.memory.heap.size 6442450944b
> jobmanager.memory.jvm-metaspace.size 4294967296b
> jobmanager.memory.jvm-overhead.max 1073741824b
> jobmanager.memory.jvm-overhead.min 1073741824b
> jobmanager.memory.off-heap.size 134217728b
>


Flink Slot request bulk is not fulfillable!

2024-07-21 Thread Li Shao
Hi All,
We are using flink batch mode to process s3 files. However, recently we are
seeing the errors like:


Caused by: java.util.concurrent.CompletionException:
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Slot request bulk is not fulfillable! Could not allocate the required slot
within slot request timeout

at
java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown
Source)

at
java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown
Source)

at
java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown
Source)

... 36 more

Caused by:
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Slot request bulk is not fulfillable! Could not allocate the required slot
within slot request timeout

at
org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:86)

... 29 more

Caused by: java.util.concurrent.TimeoutException: Timeout has occurred:
30 ms

... 30 more


I can see the job manager & task manager still got memory to run jobs.
Anyone can help me on this?

Job manager: JVM: 5.02 GB / 6.00 GB
jobmanager.heap.size 6144m
jobmanager.memory.heap.size 6442450944b
jobmanager.memory.jvm-metaspace.size 4294967296b
jobmanager.memory.jvm-overhead.max 1073741824b
jobmanager.memory.jvm-overhead.min 1073741824b
jobmanager.memory.off-heap.size 134217728b


Re: Flink state

2024-07-21 Thread banu priya
Dear Community,

Gentle reminder about my below email.

Thanks
Banu

On Sat, 20 Jul, 2024, 4:37 pm banu priya,  wrote:

> Hi All,
>
> I have a flink job with RMQ Source, filters, tumbling window(uses
> processing time fires every 2s), aggregator, RMQ Sink.
>
> I am trying to understand about states and checkpoints(enabled incremental
> rocksdb checkpoints).
>
> In local rocks db directory, I have .sst files, log, lock, options files.
>
> I read that states are cleared once tumbling window is fired. Does it mean
> my local rocksdb directory's .sst files will be deleted once windows
> fires??. I understand that compaction happens once .sst files reaches (64MB
> it is configurable). Is there any other way .sst files will be/can be
> deleted when tumbling window fires??
>
> Happy Learning. Happy Weekend.
>
> Thanks
> Banu
>
>


Re: flight buffer local storage

2024-07-21 Thread Zhanghao Chen
By default, Flink uses aligned checkpoints, where we wait for all in-flight data ahead of the
barriers to be fully processed before taking the checkpoint. There is no need to store in-flight
buffers in this case, at the cost of additional barrier alignment, which may take a long time in
the presence of backpressure. Unaligned checkpoints were introduced to solve this problem: the
in-flight buffers are stored in the checkpoint, without the need for alignment.
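A minimal sketch of enabling the unaligned mode described above, using the standard
CheckpointConfig API; the interval and timeout values are only examples:

```
import java.time.Duration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);
// Persist in-flight buffers with the checkpoint instead of waiting for barrier alignment.
env.getCheckpointConfig().enableUnalignedCheckpoints();
// Optional: stay aligned and only switch to unaligned barriers once alignment takes too long.
env.getCheckpointConfig().setAlignedCheckpointTimeout(Duration.ofSeconds(30));
```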

Best,
Zhanghao Chen

From: Enric Ott <243816...@qq.com>
Sent: Wednesday, July 17, 2024 16:35
To: user@flink.apache.org 
Subject: flight buffer local storage

Hello,Community:
  Why doesn't flink store in flight buffers to local disks when it checkpoints?
  Thanks.


Re: SavepointReader: Record size is too large for CollectSinkFunction

2024-07-21 Thread Zhanghao Chen
Hi, you could increase it as follows:

Configuration config = new Configuration();
config.setString("collect-sink.batch-size.max", "10mb");
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(config);

From: Salva Alcántara 
Sent: Saturday, July 20, 2024 15:05
To: user 
Subject: SavepointReader: Record size is too large for CollectSinkFunction

Hi all!

I'm trying to debug a job via inspecting its savepoints but I'm getting this 
error message:

```
Caused by: java.lang.RuntimeException: Record size is too large for 
CollectSinkFunction. Record size is 9627127 bytes, but max bytes per batch is 
only 2097152 bytes. Please consider increasing max bytes per batch value by 
setting collect-sink.batch-size.max
```

My code looks like this:

```
  private static void run(String savepointPath) throws Exception {
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
org.apache.flink.state.api.SavepointReader savepoint =
org.apache.flink.state.api.SavepointReader.read(env, savepointPath, new 
HashMapStateBackend());

var operator = savepoint.readKeyedState("uuid", new 
MyKeyedOperatorReader());
var operatorState = operator.executeAndCollect(1000);
  }
```

I haven't found the way to increase the `collect-sink.batch-size.max` as 
suggested in the error msg.

Any help on this will be appreciated!

Regards,

Salva


Flink state

2024-07-20 Thread banu priya
Hi All,

I have a flink job with RMQ Source, filters, tumbling window(uses
processing time fires every 2s), aggregator, RMQ Sink.

I am trying to understand about states and checkpoints(enabled incremental
rocksdb checkpoints).

In local rocks db directory, I have .sst files, log, lock, options files.

I read that states are cleared once tumbling window is fired. Does it mean
my local rocksdb directory's .sst files will be deleted once windows
fires??.

Happy Learning. Happy Weekend.

Thanks
Banu


SavepointReader: Record size is too large for CollectSinkFunction

2024-07-20 Thread Salva Alcántara
Hi all!

I'm trying to debug a job via inspecting its savepoints but I'm getting
this error message:

```
Caused by: java.lang.RuntimeException: Record size is too large for
CollectSinkFunction. Record size is 9627127 bytes, but max bytes per batch
is only 2097152 bytes. Please consider increasing max bytes per batch value
by setting collect-sink.batch-size.max
```

My code looks like this:

```
  private static void run(String savepointPath) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
org.apache.flink.state.api.SavepointReader savepoint =
org.apache.flink.state.api.SavepointReader.read(env, savepointPath,
new HashMapStateBackend());

var operator = savepoint.readKeyedState("uuid", new
MyKeyedOperatorReader());
var operatorState = operator.executeAndCollect(1000);
  }
```

I haven't found the way to increase the `collect-sink.batch-size.max` as
suggested in the error msg.

Any help on this will be appreciated!

Regards,

Salva


Re: How to extend FLIP-384 to monitor end-to-end latency of business data

2024-07-19 Thread YH Zhu
Unsubscribe

Yubin Li wrote on Thu, Jul 18, 2024 at 14:23:

> Hi, all
>
> FLIP-384 [1] currently provides trace observability for checkpointing and job recovery, but in real business scenarios we often need to monitor the latency of each business record as it flows through every node of the data pipeline. Are there any good ideas on how to approach this?
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-384%3A+Introduce+TraceReporter+and+use+it+to+create+checkpointing+and+recovery+traces
>
> Best,
> Yubin
>


Troubleshooting checkpoint expiration

2024-07-19 Thread Alexis Sarda-Espinosa
Hello,

We have a Flink job that uses ABFSS for checkpoints and related state.
Lately we see a lot of exceptions due to expiration of checkpoints, and I'm
guessing that's an issue in the infrastructure or on Azure's side, but I
was wondering if there are Flink/Hadoop Java packages that log potentially
useful information if we DEBUG/TRACE them?

Regards,
Alexis.


Re: Event de duplication in flink with rabbitmq connector

2024-07-18 Thread Ahmed Hamdy
Yes. The current implementation doesn't leverage transactions on publish the way the source does
for acking and nacking deliveries. You can raise a ticket with the community to support
exactly-once RMQ sinks, or implement the logic yourself.

my checkpoints size is increasing.  can this lead to state build up. As
> checkpoints might need to keep the state so windows can't purge them??


No. In theory, once a window is materialized, that window instance's state is purged. On
checkpointing, the states of the currently active windows are recorded: checkpoints record a
snapshot of the pipeline rather than the whole progress of an operator. However, some other
operators do have to accumulate state and flush it on checkpoints (similar to the RMQSource in
this case).
Debugging why your state keeps growing might need a deep dive into your data flow and job
performance to find bottlenecks, and some experimentation with different RocksDB compaction
configurations.



Best Regards
Ahmed Hamdy


On Thu, 18 Jul 2024 at 13:52, banu priya  wrote:

> Hi Ahmed,
>
> Thanks for the clarification. I see from flink documentation that Kafka
> sinks are transactional and de duplication happens for it..but it is not
> applicable for RMQ sink.
>
> But i have to use RMQ Sink only due to project requirements .
>
> I am facing one more issue i.e. my check points size is increasing. What I
> understand is after tumbling window state is cleared. I had tumbling window
> (that uses processing time and triggers every 2s) and check point interval
> of 10s, can this lead to state build up. As checkpoints might need to keep
> the state so windows can't purge them??
>
>
> Thanks
> Banu
>
>
>
>
> On Thu, 18 Jul, 2024, 5:55 pm Ahmed Hamdy,  wrote:
>
>> Hi Banu,
>> yes, regarding the RMQSource, it only acknowledges during checkpoint
>> completion, all the messages after the checkpoint till the next checkpoint
>> completion are grouped to be acknowledged together whether that is during
>> the minimum pause or during the start of the next checkpoint. Failure
>> during this periods will have these unacked messages reprocessed again.
>> Best Regards
>> Ahmed Hamdy
>>
>>
>> On Thu, 18 Jul 2024 at 13:20, banu priya  wrote:
>>
>>> Hi Ahmed,
>>>
>>> Thanks a lot for your reply.
>>>
>>> I am planning keep both window time and check point interval same ie 10s.
>>>
>>> Minimum pause between check point is 5s. What happens to the events that
>>> are received during this time??
>>>
>>> Will it be acknowledged at the end of next checkpoint?
>>>
>>> Thanks
>>> Banu
>>>
>>>
>>> On Thu, 18 Jul, 2024, 5:34 pm Ahmed Hamdy,  wrote:
>>>
 Hi Banu,
 This behavior of source is expected, the guarantee of the RMQSource is
 exactly once which is achieved by acknowledging envelopes on checkpoints
 hence the source would never re-read a message after checkpoint even if it
 was still inside the pipeline and not yet passed to sink, eager
 acknowledgment causes risk of data loss on failure and restoring from a
 previous checkpoint hence breaking all delivery guarantees.
 In concept there is no guarantee that a Flink pipeline achieves end to
 end exactly once without an exactly once sink as well (which is not the
 case for RMQSink).
 In your case, reprocessing is bound by the checkpoint interval which is
 5 minutes, you can make it tighter if it suits your case better.

 Best Regards
 Ahmed Hamdy


 On Thu, 18 Jul 2024 at 11:37, banu priya  wrote:

> Hi All,
>
> Gentle reminder about bow query.
>
> Thanks
> Banu
>
> On Tue, 9 Jul, 2024, 1:42 pm banu priya,  wrote:
>
>> Hi All,
>>
>> I have a Flink job with a RMQ source, tumbling windows (fires for
>> each 2s), an aggregator, then a RMQ sink. Incremental RocksDB 
>> checkpointing
>> is enabled with an interval of 5 minutes.
>>
>> I was trying to understand Flink failure recovery. My checkpoint X is
>> started, I have sent one event to my source. As windows are triggered 
>> every
>> 2s, my sink is updated with the aggregated result. But when I checked the
>> RabbitMQ console, my source queue still had unacked messages. (It is
>> expected and it is as per design
>> https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/rabbitmq/#rabbitmq-source
>> ).
>>
>> Now I restarted my task manager, as restart happens within the same
>> checkpoint interval and checkpoint X has not yet completed. The message 
>> is
>> not acknowledged and is sent again. Duplicate processing of events 
>> happens.
>>
>> How to avoid these duplicates?
>>
>>
>> Thanks
>>
>> Banu
>>
>


Re: Flink job throws ClassNotFoundException at runtime

2024-07-18 Thread Yanquan Lv
Hi,
Assuming xxx.shade. is the prefix you use for shading: do grep -rn
'org.apache.hudi.com.xx.xx.xxx.A' and grep -rn
'xxx.shade.org.apache.hudi.com.xx.xx.xxx.A' return the same results?

℡小新的蜡笔不见嘞、 <1515827...@qq.com.invalid> wrote on Thu, Jul 18, 2024 at 20:14:

> Hello, thanks for your reply.
> My understanding is that everything has been shaded; I checked with your grep -rn command and found no problem. Also,
> 'org.apache.hudi.com.xx.xx.xxx.A' really does exist in my job jar.
>
>
>
>
> --Original Message--
> From: "user-zh" <
> decq12y...@gmail.com;
> Sent: Thursday, July 18, 2024, 7:55 PM
> To: "user-zh"
> Subject: Re: Flink job throws ClassNotFoundException at runtime
>
>
>
> Hi, this class has been shaded, but other classes that call it may live in different jars that
> were not all shaded. You can grep -rn 'org.apache.hudi.com.xx.xx.xxx.A' to check whether every jar that references this class has been shaded.
>
> ℡小新的蜡笔不见嘞、 <1515827...@qq.com.invalid wrote on Thu, Jul 18, 2024 at 18:31:
>
>  Our Flink job occasionally throws a ClassNotFoundException while running.
> What usually causes this, and how can it be solved? Details:
>  * The class does exist in the job jar
>  * The class is shaded, because the Flink cluster bundles this dependency, so the related classes had to be shaded
>  * The problem appears only occasionally; it can cause the job to restart, and after the restart the job may recover or may keep failing with the same exception
>  * The cluster is deployed in session standalone mode
>  * Both child-first and parent-first classloading were tried, and the problem persists
> 
> 
>  The exception stack is as follows (JM node):
>  Caused by: java.lang.ClassNotFoundException:
>  org.apache.hudi.com.xx.xx.xxx.A
>  at
> java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>  at
> java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>  at
> 
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:97)
>  at
> 
> org.apache.flink.util.ParentFirstClassLoader.loadClassWithoutExceptionHandling(ParentFirstClassLoader.java:65)
>  at
> 
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:81)
>  at
> java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>  ... 59 more
> 
> 
> 
>  Thanks, everyone


Re: Event de duplication in flink with rabbitmq connector

2024-07-18 Thread Ahmed Hamdy
Hi Banu,
This behavior of the source is expected. The guarantee of the RMQSource is exactly-once, which is
achieved by acknowledging envelopes on checkpoints; hence the source will never re-read a message
after a checkpoint, even if it was still inside the pipeline and had not yet been passed to the
sink. Eager acknowledgment would risk data loss on failure and on restoring from a previous
checkpoint, hence breaking all delivery guarantees.
In principle, there is no guarantee that a Flink pipeline achieves end-to-end exactly-once
without an exactly-once sink as well (which is not the case for the RMQSink).
In your case, reprocessing is bounded by the checkpoint interval, which is 5
minutes; you can make it tighter if that suits your case better.
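For the source side, a sketch of the exactly-once RMQSource setup described in the connector
docs: it requires checkpointing to be enabled, a non-parallel source, and correlation IDs set by
the publisher. Host, credentials and the queue name below are placeholders, and env is the job's
StreamExecutionEnvironment.

```
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;

RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
        .setHost("rabbitmq-host")          // placeholder
        .setPort(5672)
        .setVirtualHost("/")
        .setUserName("user")               // placeholder
        .setPassword("password")           // placeholder
        .build();

DataStream<String> events = env
        .addSource(new RMQSource<>(
                connectionConfig,
                "source-queue",            // placeholder queue name
                true,                      // usesCorrelationId: re-delivered messages are deduplicated on restore
                new SimpleStringSchema()))
        .setParallelism(1);                // exactly-once for the RMQ source requires a non-parallel source
```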

Best Regards
Ahmed Hamdy


On Thu, 18 Jul 2024 at 11:37, banu priya  wrote:

> Hi All,
>
> Gentle reminder about bow query.
>
> Thanks
> Banu
>
> On Tue, 9 Jul, 2024, 1:42 pm banu priya,  wrote:
>
>> Hi All,
>>
>> I have a Flink job with a RMQ source, tumbling windows (fires for each
>> 2s), an aggregator, then a RMQ sink. Incremental RocksDB checkpointing is
>> enabled with an interval of 5 minutes.
>>
>> I was trying to understand Flink failure recovery. My checkpoint X is
>> started, I have sent one event to my source. As windows are triggered every
>> 2s, my sink is updated with the aggregated result. But when I checked the
>> RabbitMQ console, my source queue still had unacked messages. (It is
>> expected and it is as per design
>> https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/rabbitmq/#rabbitmq-source
>> ).
>>
>> Now I restarted my task manager, as restart happens within the same
>> checkpoint interval and checkpoint X has not yet completed. The message is
>> not acknowledged and is sent again. Duplicate processing of events happens.
>>
>> How to avoid these duplicates?
>>
>>
>> Thanks
>>
>> Banu
>>
>


Re: Flink job throws ClassNotFoundException at runtime

2024-07-18 Thread Yanquan Lv
Hi, this class has been shaded, but other classes that call it may live in different jars that
were not all shaded. You can grep -rn 'org.apache.hudi.com.xx.xx.xxx.A' to check whether every jar that references this class has been shaded.

℡小新的蜡笔不见嘞、 <1515827...@qq.com.invalid> wrote on Thu, Jul 18, 2024 at 18:31:

> Our Flink job occasionally throws a ClassNotFoundException while running. What usually causes this, and how can it be solved? Details:
> * The class does exist in the job jar
> * The class is shaded, because the Flink cluster bundles this dependency, so the related classes had to be shaded
> * The problem appears only occasionally; it can cause the job to restart, and after the restart the job may recover or may keep failing with the same exception
> * The cluster is deployed in session standalone mode
> * Both child-first and parent-first classloading were tried, and the problem persists
>
>
> The exception stack is as follows (JM node):
> Caused by: java.lang.ClassNotFoundException:
> org.apache.hudi.com.xx.xx.xxx.A
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:97)
> at
> org.apache.flink.util.ParentFirstClassLoader.loadClassWithoutExceptionHandling(ParentFirstClassLoader.java:65)
> at
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:81)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> ... 59 more
>
>
>
> Thanks, everyone


Read avro files with wildcard

2024-07-18 Thread irakli.keshel...@sony.com
Hi all,

I have a Flink application where I need to read in AVRO files from s3 which are 
partitioned by date and hour. I need to read in multiple dates, meaning I need 
to read files from multiple folders. Does anyone know how I can do this? My 
application is written in Scala using Flink 1.17.1.

Best,
Irakli
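There is no reply in this thread, but one possible approach (shown in Java rather than Scala) is
to pass several partition directories to a single AvroInputFormat and let it recurse into the
hour subfolders. MyEventRecord stands for an Avro-generated record class, the bucket and paths
are placeholders, and reading from s3:// assumes an S3 filesystem plugin is installed.

```
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroInputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// MyEventRecord is a placeholder for your Avro-generated specific record class.
AvroInputFormat<MyEventRecord> format = new AvroInputFormat<>(
        new Path("s3://my-bucket/events/date=2024-07-17/"), MyEventRecord.class);
// Hand all needed date partitions to the same format ...
format.setFilePaths(
        new Path("s3://my-bucket/events/date=2024-07-17/"),
        new Path("s3://my-bucket/events/date=2024-07-18/"));
// ... and recurse into the hour=* subdirectories.
format.setNestedFileEnumeration(true);

DataStream<MyEventRecord> records = env.createInput(format);
```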



Re: Event de duplication in flink with rabbitmq connector

2024-07-18 Thread banu priya
Hi All,

Gentle reminder about bow query.

Thanks
Banu

On Tue, 9 Jul, 2024, 1:42 pm banu priya,  wrote:

> Hi All,
>
> I have a Flink job with a RMQ source, tumbling windows (fires for each
> 2s), an aggregator, then a RMQ sink. Incremental RocksDB checkpointing is
> enabled with an interval of 5 minutes.
>
> I was trying to understand Flink failure recovery. My checkpoint X is
> started, I have sent one event to my source. As windows are triggered every
> 2s, my sink is updated with the aggregated result. But when I checked the
> RabbitMQ console, my source queue still had unacked messages. (It is
> expected and it is as per design
> https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/rabbitmq/#rabbitmq-source
> ).
>
> Now I restarted my task manager, as restart happens within the same
> checkpoint interval and checkpoint X has not yet completed. The message is
> not acknowledged and is sent again. Duplicate processing of events happens.
>
> How to avoid these duplicates?
>
>
> Thanks
>
> Banu
>


How to extend FLIP-384 to monitor end-to-end latency of business data

2024-07-18 Thread Yubin Li
Hi, all
  
FLIP-384 [1] currently provides trace observability for checkpointing and job recovery, but in real business scenarios we often need to monitor the latency of each business record as it flows through every node of the data pipeline. Are there any good ideas on how to approach this?

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-384%3A+Introduce+TraceReporter+and+use+it+to+create+checkpointing+and+recovery+traces

Best,
Yubin


Re: Re: Re: Problem using the Hive catalog

2024-07-17 Thread Xuyang
Hi,

I tried it: flink-connector-kafka-3.2.0-1.19.jar needs to be replaced with flink-sql-connector-kafka-3.2.0-1.19.jar.

The download link is in the "SQL Client" column of the documentation [1]; that jar does contain OffsetResetStrategy.

Could you try again with that jar?




[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/kafka/#dependencies




--

Best!
Xuyang





On 2024-07-17 14:22:30, "冯奇" wrote:
>Flink 1.19, Hive 3.1.2
>Created the table with the new-style options:
>CREATE TABLE mykafka (name String, age Int) WITH (
> 'connector' = 'kafka',
> 'topic' = 'test',
> 'properties.bootstrap.servers' = 'localhost:9092',
> 'properties.group.id' = 'testGroup',
> 'scan.startup.mode' = 'earliest-offset',
> 'format' = 'csv'
>);
>For Kafka, flink-connector-kafka-3.2.0-1.19.jar and flink-connector-base-1.19.0.jar were placed in lib
>Flink SQL> select * from mykafka; [ERROR] Could not execute SQL statement. 
>Reason: java.lang.ClassNotFoundException: 
>org.apache.kafka.clients.consumer.OffsetResetStrategy
>--
>From: Feng Jin 
>Sent: Tuesday, July 16, 2024 19:30
>To: "user-zh"
>Subject: Re: Re: Problem using the Hive catalog
>The example above seems to use the old kafka connector options.
>Refer to the documentation and use the new-style options:
>https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/hive/hive_catalog/#step-4-create-a-kafka-table-with-flink-sql-ddl
> 
> >
>The Kafka connector [1] also needs to be placed in the lib directory.
>[1]
>https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/kafka/
> 
> >
>Best,
>Feng
>On Tue, Jul 16, 2024 at 2:11 PM Xuyang  wrote:
>> Under the lib directory you need to place flink-sql-connector-hive-3.1.3; that jar is the one used by SQL jobs
>>
>>
>>
>>
>> --
>>
>> Best!
>> Xuyang
>>
>>
>>
>>
>>
>> On 2024-07-16 13:40:23, "冯奇" wrote:
>> >I checked the docs and those jars are all there. There is also a separately downloaded dependency, flink-sql-connector-hive-3.1.3; I'm not sure whether to use that one or the ones below.
>> >// Flink's Hive connector flink-connector-hive_2.12-1.19.1.jar // Hive
>> dependencies hive-exec-3.1.0.jar libfb303-0.9.3.jar // libfb303 is not
>> packed into hive-exec in some versions, need to add it separately // add
>> antlr-runtime if you need to use hive dialect antlr-runtime-3.5.2.jar
>> >Jars under lib:
>> >antlr-runtime-3.5.2.jar flink-table-api-java-1.19.0.jar
>> flink-cdc-dist-3.0.0.jar flink-table-api-java-uber-1.19.0.jar
>> flink-cdc-pipeline-connector-doris-3.1.0.jar flink-table-common-1.19.0.jar
>> flink-cdc-pipeline-connector-mysql-3.1.0.jar
>> flink-table-planner-loader-1.19.0.jar flink-cep-1.19.0.jar
>> flink-table-runtime-1.19.0.jar flink-connector-files-1.19.0.jar
>> hive-exec-3.1.2.jar flink-connector-hive_2.12-1.19.0.jar libfb303-0.9.3.jar
>> flink-connector-jdbc-3.1.2-1.18.jar log4j-1.2-api-2.17.1.jar
>> flink-connector-kafka-3.1.0-1.18.jar log4j-api-2.17.1.jar
>> flink-csv-1.19.0.jar log4j-core-2.17.1.jar flink-dist-1.19.0.jar
>> log4j-slf4j-impl-2.17.1.jar flink-json-1.19.0.jar
>> mysql-connector-java-8.0.28.jar flink-scala_2.12-1.19.0.jar
>> paimon-flink-1.19-0.9-20240628.002224-23.jar
>> >--
>> >From: Xuyang 
>> >Sent: Tuesday, July 16, 2024 11:43
>> >To: "user-zh"
>> >Subject: Re: Problem using the Hive catalog
>> >Hi, can you check whether the Hive SQL connector dependency [1] has been placed in the lib directory or added via ADD JAR?
>> >[1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/hive/overview/
>>  
>> >  >
>> <
>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/hive/overview/
>>  
>> >  >
>> >
>> >--
>> > Best!
>> > Xuyang
>> >At 2024-07-15 17:09:45, "冯奇"  wrote:
>> >>Flink SQL> USE CATALOG myhive;
>> >>Flink SQL> CREATE TABLE mykafka (name String, age Int) WITH (
>> >> 'connector.type' = 'kafka',
>> >> 'connector.version' = 'universal',
>> >> 'connector.topic' = 'hive_sink',
>> >> 'connector.properties.bootstrap.servers' = '10.0.15.242:9092',
>> >> 'format.type' = 'csv',
>> >> 'update-mode' = 'append'
>> >>);
>> >>提示下面错误:
>> >>[ERROR] Could not execute SQL statement. Reason:
>> >>org.apache.flink.table.factories.NoMatchingTableFactoryException: Could
>> not find a suitable table factory for
>> 'org.apache.flink.table.factories.TableSourceFactory' in
>> >>the classpath.
>> >>Reason: Required context properties mismatch.
>> >>The matching candidates:
>> >>org.apache.flink.table.sources.CsvAppendTableSourceFactory
>> >>Mismatched properties:
>> >>'connector.type' expects 'filesystem', but is 'kafka'
>> >>The following properties are requested:
>> >>connector.properties.bootstrap.servers=10.0.15.242:9092
>> >>connector.topic=hive_sink
>> >>connector.type=kafka
>> >>connector.version=universal
>> >>format.type=csv
>> >>schema.0.data-type=VARCHAR(2147483647)
>> >>schema.0.name=name
>> >>schema.1.data-type=INT
>> >>schema.1.name=age
>> >>update-mode=append
>> >>The following factories have been 

Re: Buffer Priority

2024-07-17 Thread Enric Ott
Oh,it's designed for unaligned checkpoint.
Thanks,Zhanghao.




--Original--
From: "Zhanghao Chen"

https://issues.apache.org/jira/browse/FLINK-19026 for more details where
it is firstly introduced.

Best,
Zhanghao Chen

From: Enric Ott <243816...@qq.com
Sent: Wednesday, July 10, 2024 18:02
To: user 

Unsubscribe

2024-07-17 Thread Phil Stavridis
Unsubscribe


Re:Event-Driven Window Computation

2024-07-17 Thread Xuyang
Hi, 

As far as I know, the community currently has no plans to support custom triggers in Flink SQL,
because it is difficult to describe triggers using SQL.

You can create a JIRA ticket [1] for it and restart the discussion on the dev mailing list.




[1] https://issues.apache.org/jira/projects/FLINK




--

Best!
Xuyang




At 2024-07-17 17:22:01, "liu ze"  wrote:

Hi,

Currently, Flink's windows are based on time (or a fixed number of elements). I 
want to trigger window computation based on specific events (marked within the 
data). In the DataStream API, this can be achieved using GlobalWindow and 
custom triggers, but how can it be done in Flink SQL? Additionally, it is 
necessary to ensure that the upstream and downstream windows process the same 
batch of data.

This is a common requirement. For example, we need to calculate some metrics 
for a user based on their user ID. This data does not have a time attribute 
field, so we can only determine that the data is complete and trigger window 
computation through some specially marked data. The computed results are then 
passed to the downstream via Kafka. The downstream might perform more 
coarse-grained calculations for this user. It is essential to ensure that the 
data is complete and exactly matches the data in the upstream window.

Can Flink SQL achieve this functionality?

Event-Driven Window Computation

2024-07-17 Thread liu ze
Hi,

Currently, Flink's windows are based on time (or a fixed number of
elements). I want to trigger window computation based on specific events
(marked within the data). In the DataStream API, this can be achieved using
GlobalWindow and custom triggers, but how can it be done in Flink SQL?
Additionally, it is necessary to ensure that the upstream and downstream
windows process the same batch of data.

This is a common requirement. For example, we need to calculate some
metrics for a user based on their user ID. This data does not have a time
attribute field, so we can only determine that the data is complete and
trigger window computation through some specially marked data. The computed
results are then passed to the downstream via Kafka. The downstream might
perform more coarse-grained calculations for this user. It is essential to
ensure that the data is complete and exactly matches the data in the
upstream window.

Can Flink SQL achieve this functionality?
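For the DataStream side mentioned above, a sketch of the GlobalWindows-plus-custom-trigger
approach: the window only fires when a specially marked event arrives. Event, getUserId(),
isEndMarker(), UserMetrics, UserMetricsAggregator and the events stream are assumed domain types
used purely for illustration.

```
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

// Fires (and purges) the window when the marker event that closes a batch arrives.
class MarkerTrigger extends Trigger<Event, GlobalWindow> {
    @Override
    public TriggerResult onElement(Event e, long ts, GlobalWindow w, TriggerContext ctx) {
        return e.isEndMarker() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
    }
    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow w, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }
    @Override
    public TriggerResult onEventTime(long time, GlobalWindow w, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }
    @Override
    public void clear(GlobalWindow w, TriggerContext ctx) { }
}

// Usage: aggregate per user, emitting only when the batch-complete marker is seen.
DataStream<UserMetrics> metrics = events
        .keyBy(Event::getUserId)
        .window(GlobalWindows.create())
        .trigger(new MarkerTrigger())
        .aggregate(new UserMetricsAggregator());
```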


RE: Trying to read a file from S3 with flink on kubernetes

2024-07-17 Thread gwenael . lebarzic
Hello everyone.

In fact, the problem was coming from FileSystem.get() :
###
val fs = FileSystem.get(hadoopConfig)
###

When you want to interact with S3, you need to add a first parameter, before 
the hadoop config, to specify the filesystem.
Something like this :
###
val s3uri = URI.create("s3a://mybucket")
val fs = FileSystem.get(s3uri, hadoopConfig)
###

Best regards.

Gwenael Le Barzic


De : LE BARZIC Gwenael DTSI/SI
Envoyé : jeudi 11 juillet 2024 16:24
À : user@flink.apache.org
Objet : Trying to read a file from S3 with flink on kubernetes

Hey guys.

I'm trying to read a file from an internal S3 with flink on Kubernetes, but get 
a strange blocking error.

Here is the code :

MyFlinkJob.scala :
###
package com.example.flink

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.io.Source

object MyFlinkJob {
def main(args: Array[String]): Unit = {
try {
val env = StreamExecutionEnvironment.getExecutionEnvironment

val hadoopConfig = new Configuration()
hadoopConfig.set("fs.s3a.access.key", "###")
hadoopConfig.set("fs.s3a.secret.key", "###")
hadoopConfig.set("fs.s3a.endpoint", "internal endpoint")

val fs = FileSystem.get(hadoopConfig)
val s3Path = new Path("s3a://mybucket/myfolder/myfile.txt")
val inputStream = fs.open(s3Path)
val referenceData = 
Source.fromInputStream(inputStream).getLines().toSet
inputStream.close()

println("Reference Data:")
referenceData.foreach(println)

env.execute("Flink S3 Simple Example")
} catch {
case e: Exception =>
e.printStackTrace()
println(s"Error: ${e.getMessage}")
}
}
}

###

And my build.sbt file :
###
import Dependencies._

name := "MyFlinkJob"

version := "0.1"

scalaVersion := "2.12.19"

ThisBuild / scalaVersion := "2.12.19"
ThisBuild / version  := "0.1.0-SNAPSHOT"
ThisBuild / organization := "com.example"
ThisBuild / organizationName := "example"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % "1.18.1",
  "org.apache.flink" %% "flink-streaming-scala" % "1.18.1",
  "org.apache.flink" % "flink-s3-fs-hadoop" % "1.18.1"
)

assembly / assemblyOption ~= {
  _.withIncludeScala(false)
}

assembly / mainClass := Some(s"com.example.flink.MyFlinkJob")

assembly / assemblyJarName := s"myflinkjob_2.12-0.1.jar"

assembly / assemblyMergeStrategy := {
  case path if path.contains("services") => MergeStrategy.concat
  case PathList("META-INF", _*) => MergeStrategy.discard
  case _ => MergeStrategy.first
}

###

I'm using the following docker image :
###
FROM flink:1.18-scala_2.12

USER root

RUN mkdir -p /opt/flink/plugins/s3-fs-hadoop && \
cp -p /opt/flink/opt/flink-s3-fs-hadoop-1.18.1.jar 
/opt/flink/plugins/s3-fs-hadoop/

RUN mkdir -p /opt/flink/log/ /opt/flink/conf /opt/flink/userlib

WORKDIR /opt/flink/userlib
COPY target/scala-2.12/myflinkjob_2.12-0.1.jar myflinkjob.jar

RUN chown -R flink:flink /opt/flink && \
chmod -R 755 /opt/flink

RUN chown -R flink:flink /opt/flink/userlib && \
chmod -R 755 /opt/flink/userlib
###

And the following Kubernetes deployment :
###
---
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-s3
spec:
  image: flink-s3:0.1
  flinkVersion: v1_18
  flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
classloader.resolve-order: parent-first
  serviceAccount: flink
  jobManager:
resource:
  memory: 2048m
  cpu: 0.5
  taskManager:
replicas: 2
resource:
  memory: 2048m
  cpu: 0.5
  job:
jarURI: "local:///opt/flink/userlib/myflinkjob.jar"
parallelism: 2
#upgradeMode: stateless  # stateless or savepoint or last-state
entryClass: "com.example.flink.MyFlinkJob"
args: []
  podTemplate:
apiVersion: v1
kind: Pod
metadata:
  name: flink-s3
spec:
  containers:
- name: flink-main-container
  securityContext:
runAsUser:  # UID of a non-root user
runAsNonRoot: true
  env: []
  volumeMounts: []
  volumes: []

###

I launch the flink job like this :
###
kubectl apply -f kubernetes/FlinkDeployment.yml
###

I am using Flink operator on Kubernetes.

And I get this error in the logs :
###
java.lang.IllegalArgumentException: Wrong FS: 
s3a://mybucket/myfolder/myfile.txt, expected: file:///
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:807)
at 
org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:105)
at 
org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:774)
at 

flight buffer local storage

2024-07-17 Thread Enric Ott
Hello, Community: why doesn't Flink store in-flight buffers to local disks when it checkpoints?
Thanks.

Re: Encountering scala.matchError in Flink 1.18.1 Query

2024-07-17 Thread Norihiro FUKE
Hi Xuyang,

Thank you for the information regarding the bug fix.

I will proceed with the method of joining input_table and udtf first. Thank
you for the suggestion.

Best regards, Norihiro Fuke.
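A sketch of the workaround quoted below, expanding the sentences with the UDTF first and then
feeding the result into the window TVF. The view name is arbitrary, and it assumes the
processing-time attribute proctime is carried through the intermediate view.

```
-- Apply the LATERAL UDTF join first ...
CREATE TEMPORARY VIEW exploded_words AS
SELECT proctime, word
FROM input_table,
     LATERAL TABLE(SPLIT_STRING(sentence)) AS T(word);

-- ... then use that view as the input of the window TVF.
SELECT
    window_start,
    word,
    COUNT(*) AS `count`
FROM
    TABLE(
      TUMBLE(TABLE exploded_words, DESCRIPTOR(proctime), INTERVAL '10' SECOND))
GROUP BY
    window_start,
    window_end,
    word;
```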

On Mon, Jul 15, 2024 at 10:43, Xuyang wrote:

> Hi, this is a bug fixed in
> https://github.com/apache/flink/pull/25075/files#diff-4ee2dd065d2b45fb64cacd5977bec6126396cc3b56e72addfe434701ac301efeL405
> .
>
>
> You can try to join input_table and udtf first, and then use it as the
> input of window tvf to bypass this bug.
>
>
> --
> Best!
> Xuyang
>
>
> At 2024-07-09 10:35:04, "Norihiro FUKE"  wrote:
>
> Hi, community
>
> I encountered a scala.matchError when trying to obtain the table plan for
> the following query in Flink 1.18.1.
>
> The input data is read from Kafka, and the query is intended to perform a
> typical WordCount operation. The query is as follows. SPLIT_STRING is a
> Table Function UDF that splits sentences into words by spaces.
>
> ```SELECT
> window_start,
> word,
> COUNT(*) AS `count`
> FROM
> TABLE(
>   TUMBLE(TABLE input_table, DESCRIPTOR(proctime), INTERVAL '10' SECOND)),
>   LATERAL TABLE(SPLIT_STRING(sentence)) AS T(word)
> GROUP BY
> window_start,
> window_end,
> word```
>
> The error message received is:
>
> ```
> [ERR] scala.MatchError: rel#177237:FlinkLogicalCorrelate.LOGICAL.any.None: 
> 0.[NONE].[NONE](left=RelSubset#177235,right=RelSubset#177236,correlation=$cor0,joinType=inner,requiredColumns={1})
>  (of class 
> org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCorrelate)
> ```
>
> I believe that the issue lies in the existNeighbourWindowTableFunc method
> in flink-table-planner/WindowUtil.scala, where there is an unconsidered
> node (FlinkLogicalCorrelate) when traversing the AST. (This method was
> added in FLINK-32578.) I suspect this comes from the LATERAL entry. While
> this query was FlinkLogicalCorrelate, I think there might be other
> unconsidered nodes as well.
>
> I have two questions regarding this:
>
>1. Is it an expected behavior for scala.matchError to occur in this
>case? In other words, I suspect this might be an unreported bug.
>2. In the code comments of the PR mentioned in the FLINK-32578 ticket,
>I found the terms "standard form" and "relax form." I searched for "relax
>form" in the Flink documentation but could not find any reference. As a
>workaround for this issue, using the WITH clause could be considered, but I
>am uncertain if this is a universal solution.
>
> Thank you for your assistance.
>
>


Re: Re: Problem using the Hive catalog

2024-07-17 Thread 冯奇
Flink 1.19, Hive 3.1.2
Created the table with the new-style options:
CREATE TABLE mykafka (name String, age Int) WITH (
 'connector' = 'kafka',
 'topic' = 'test',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'scan.startup.mode' = 'earliest-offset',
 'format' = 'csv'
);
For Kafka, flink-connector-kafka-3.2.0-1.19.jar and flink-connector-base-1.19.0.jar were placed in lib
Flink SQL> select * from mykafka; [ERROR] Could not execute SQL statement. 
Reason: java.lang.ClassNotFoundException: 
org.apache.kafka.clients.consumer.OffsetResetStrategy
--
From: Feng Jin 
Sent: Tuesday, July 16, 2024 19:30
To: "user-zh"
Subject: Re: Re: Problem using the Hive catalog
The example above seems to use the old kafka connector options.
Refer to the documentation and use the new-style options:
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/hive/hive_catalog/#step-4-create-a-kafka-table-with-flink-sql-ddl
 

The Kafka connector [1] also needs to be placed in the lib directory.
[1]
https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/kafka/
 

Best,
Feng
On Tue, Jul 16, 2024 at 2:11 PM Xuyang  wrote:
> Under the lib directory you need to place flink-sql-connector-hive-3.1.3; that jar is the one used by SQL jobs
>
>
>
>
> --
>
> Best!
> Xuyang
>
>
>
>
>
> On 2024-07-16 13:40:23, "冯奇" wrote:
> >I checked the docs and those jars are all there. There is also a separately downloaded dependency, flink-sql-connector-hive-3.1.3; I'm not sure whether to use that one or the ones below.
> >// Flink's Hive connector flink-connector-hive_2.12-1.19.1.jar // Hive
> dependencies hive-exec-3.1.0.jar libfb303-0.9.3.jar // libfb303 is not
> packed into hive-exec in some versions, need to add it separately // add
> antlr-runtime if you need to use hive dialect antlr-runtime-3.5.2.jar
> >Jars under lib:
> >antlr-runtime-3.5.2.jar flink-table-api-java-1.19.0.jar
> flink-cdc-dist-3.0.0.jar flink-table-api-java-uber-1.19.0.jar
> flink-cdc-pipeline-connector-doris-3.1.0.jar flink-table-common-1.19.0.jar
> flink-cdc-pipeline-connector-mysql-3.1.0.jar
> flink-table-planner-loader-1.19.0.jar flink-cep-1.19.0.jar
> flink-table-runtime-1.19.0.jar flink-connector-files-1.19.0.jar
> hive-exec-3.1.2.jar flink-connector-hive_2.12-1.19.0.jar libfb303-0.9.3.jar
> flink-connector-jdbc-3.1.2-1.18.jar log4j-1.2-api-2.17.1.jar
> flink-connector-kafka-3.1.0-1.18.jar log4j-api-2.17.1.jar
> flink-csv-1.19.0.jar log4j-core-2.17.1.jar flink-dist-1.19.0.jar
> log4j-slf4j-impl-2.17.1.jar flink-json-1.19.0.jar
> mysql-connector-java-8.0.28.jar flink-scala_2.12-1.19.0.jar
> paimon-flink-1.19-0.9-20240628.002224-23.jar
> >--
> >From: Xuyang 
> >Sent: Tuesday, July 16, 2024 11:43
> >To: "user-zh"
> >Subject: Re: Problem using the Hive catalog
> >Hi, can you check whether the Hive SQL connector dependency [1] has been placed in the lib directory or added via ADD JAR?
> >[1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/hive/overview/
>  
>   >
> <
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/hive/overview/
>  
>   >
> >
> >--
> > Best!
> > Xuyang
> >At 2024-07-15 17:09:45, "冯奇"  wrote:
> >>Flink SQL> USE CATALOG myhive;
> >>Flink SQL> CREATE TABLE mykafka (name String, age Int) WITH (
> >> 'connector.type' = 'kafka',
> >> 'connector.version' = 'universal',
> >> 'connector.topic' = 'hive_sink',
> >> 'connector.properties.bootstrap.servers' = '10.0.15.242:9092',
> >> 'format.type' = 'csv',
> >> 'update-mode' = 'append'
> >>);
> >>提示下面错误:
> >>[ERROR] Could not execute SQL statement. Reason:
> >>org.apache.flink.table.factories.NoMatchingTableFactoryException: Could
> not find a suitable table factory for
> 'org.apache.flink.table.factories.TableSourceFactory' in
> >>the classpath.
> >>Reason: Required context properties mismatch.
> >>The matching candidates:
> >>org.apache.flink.table.sources.CsvAppendTableSourceFactory
> >>Mismatched properties:
> >>'connector.type' expects 'filesystem', but is 'kafka'
> >>The following properties are requested:
> >>connector.properties.bootstrap.servers=10.0.15.242:9092
> >>connector.topic=hive_sink
> >>connector.type=kafka
> >>connector.version=universal
> >>format.type=csv
> >>schema.0.data-type=VARCHAR(2147483647)
> >>schema.0.name=name
> >>schema.1.data-type=INT
> >>schema.1.name=age
> >>update-mode=append
> >>The following factories have been considered:
> >>org.apache.flink.table.sources.CsvBatchTableSourceFactory
> >>org.apache.flink.table.sources.CsvAppendTableSourceFactory
>


Re: Fetching the next split fails when using InputFormatSourceFunction to read FTP files with Flink in real time

2024-07-16 Thread YH Zhu
Unsubscribe

Px New <15701181132mr@gmail.com> wrote on Tue, Jul 16, 2024 at 22:52:

> I implemented a version with the old API, i.e. InputFormatSourceFunction / InputFormat,
> and found that the first batch of files (files that already existed when the job started) is processed normally, but after I upload new files nothing more is picked up. Any ideas on how to solve this?
>
> Or is there another way to read an FTP directory in real time? Ideally it would
> 1. Read FTP files in real time
> 2. Continuously monitor the directory and recursively monitor subdirectories and files
> 3. Support parallel reading and splitting of large files
> 4. Handle various file types (json, txt, zip, etc.) and read the data inside each of them
> 5. Support resuming from where it left off and saving state
>
>


Fetching the next split fails when using InputFormatSourceFunction to read FTP files with Flink in real time

2024-07-16 Thread Px New
I implemented a version with the old API, i.e. InputFormatSourceFunction / InputFormat, and found that the first batch of files (files that already existed when the job started) is processed normally, but after I upload new files nothing more is picked up. Any ideas on how to solve this?

Or is there another way to read an FTP directory in real time? Ideally it would
1. Read FTP files in real time
2. Continuously monitor the directory and recursively monitor subdirectories and files
3. Support parallel reading and splitting of large files
4. Handle various file types (json, txt, zip, etc.) and read the data inside each of them
5. Support resuming from where it left off and saving state
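There is no concrete answer in this thread, but for the continuous-monitoring part (points 1
and 2), here is a sketch with the newer FileSource. The ftp:// URI is a placeholder and assumes a
Hadoop-compatible FTP filesystem is available on the classpath; the sketch does not cover zip
handling or parallel splitting of individual files.

```
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

FileSource<String> source = FileSource
        // Placeholder URI; the directory and its subdirectories are enumerated recursively.
        .forRecordStreamFormat(new TextLineInputFormat(), new Path("ftp://host/incoming/"))
        // Keep scanning for newly added files instead of reading the directory only once.
        .monitorContinuously(Duration.ofSeconds(10))
        .build();

DataStream<String> lines =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "ftp-files");
```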


Re: Re: Problem using the Hive catalog

2024-07-16 Thread Feng Jin
The example above seems to use the old kafka connector options.

Refer to the documentation and use the new-style options:
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/hive/hive_catalog/#step-4-create-a-kafka-table-with-flink-sql-ddl
The Kafka connector [1] also needs to be placed in the lib directory.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/kafka/

Best,
Feng

On Tue, Jul 16, 2024 at 2:11 PM Xuyang  wrote:

> Under the lib directory you need to place flink-sql-connector-hive-3.1.3; that jar is the one used by SQL jobs
>
>
>
>
> --
>
> Best!
> Xuyang
>
>
>
>
>
> On 2024-07-16 13:40:23, "冯奇" wrote:
> >I checked the docs and those jars are all there. There is also a separately downloaded dependency, flink-sql-connector-hive-3.1.3; I'm not sure whether to use that one or the ones below.
> >// Flink's Hive connector  flink-connector-hive_2.12-1.19.1.jar  // Hive
> dependencies  hive-exec-3.1.0.jar  libfb303-0.9.3.jar // libfb303 is not
> packed into hive-exec in some versions, need to add it separately  // add
> antlr-runtime if you need to use hive dialect  antlr-runtime-3.5.2.jar
> >Jars under lib:
> >antlr-runtime-3.5.2.jar flink-table-api-java-1.19.0.jar
> flink-cdc-dist-3.0.0.jar flink-table-api-java-uber-1.19.0.jar
> flink-cdc-pipeline-connector-doris-3.1.0.jar flink-table-common-1.19.0.jar
> flink-cdc-pipeline-connector-mysql-3.1.0.jar
> flink-table-planner-loader-1.19.0.jar flink-cep-1.19.0.jar
> flink-table-runtime-1.19.0.jar flink-connector-files-1.19.0.jar
> hive-exec-3.1.2.jar flink-connector-hive_2.12-1.19.0.jar libfb303-0.9.3.jar
> flink-connector-jdbc-3.1.2-1.18.jar log4j-1.2-api-2.17.1.jar
> flink-connector-kafka-3.1.0-1.18.jar log4j-api-2.17.1.jar
> flink-csv-1.19.0.jar log4j-core-2.17.1.jar flink-dist-1.19.0.jar
> log4j-slf4j-impl-2.17.1.jar flink-json-1.19.0.jar
> mysql-connector-java-8.0.28.jar flink-scala_2.12-1.19.0.jar
> paimon-flink-1.19-0.9-20240628.002224-23.jar
> >--
> >From: Xuyang 
> >Sent: Tuesday, July 16, 2024 11:43
> >To: "user-zh"
> >Subject: Re: Problem using the Hive catalog
> >Hi, can you check whether the Hive SQL connector dependency [1] has been placed in the lib directory or added via ADD JAR?
> >[1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/hive/overview/
> <
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/hive/overview/
> >
> >--
> > Best!
> > Xuyang
> >At 2024-07-15 17:09:45, "冯奇"  wrote:
> >>Flink SQL> USE CATALOG myhive;
> >>Flink SQL> CREATE TABLE mykafka (name String, age Int) WITH (
> >> 'connector.type' = 'kafka',
> >> 'connector.version' = 'universal',
> >> 'connector.topic' = 'hive_sink',
> >> 'connector.properties.bootstrap.servers' = '10.0.15.242:9092',
> >> 'format.type' = 'csv',
> >> 'update-mode' = 'append'
> >>);
> >>It reports the following error:
> >>[ERROR] Could not execute SQL statement. Reason:
> >>org.apache.flink.table.factories.NoMatchingTableFactoryException: Could
> not find a suitable table factory for
> 'org.apache.flink.table.factories.TableSourceFactory' in
> >>the classpath.
> >>Reason: Required context properties mismatch.
> >>The matching candidates:
> >>org.apache.flink.table.sources.CsvAppendTableSourceFactory
> >>Mismatched properties:
> >>'connector.type' expects 'filesystem', but is 'kafka'
> >>The following properties are requested:
> >>connector.properties.bootstrap.servers=10.0.15.242:9092
> >>connector.topic=hive_sink
> >>connector.type=kafka
> >>connector.version=universal
> >>format.type=csv
> >>schema.0.data-type=VARCHAR(2147483647)
> >>schema.0.name=name
> >>schema.1.data-type=INT
> >>schema.1.name=age
> >>update-mode=append
> >>The following factories have been considered:
> >>org.apache.flink.table.sources.CsvBatchTableSourceFactory
> >>org.apache.flink.table.sources.CsvAppendTableSourceFactory
>


Re: Re: Problem using the Hive catalog

2024-07-16 Thread Xuyang
Under the lib directory you need to place flink-sql-connector-hive-3.1.3; that jar is the one used by SQL jobs




--

Best!
Xuyang





On 2024-07-16 13:40:23, "冯奇" wrote:
>I checked the docs and those jars are all there. There is also a separately downloaded dependency, flink-sql-connector-hive-3.1.3; I'm not sure whether to use that one or the ones below.
>// Flink's Hive connector  flink-connector-hive_2.12-1.19.1.jar  // Hive 
>dependencies  hive-exec-3.1.0.jar  libfb303-0.9.3.jar // libfb303 is not 
>packed into hive-exec in some versions, need to add it separately  // add 
>antlr-runtime if you need to use hive dialect  antlr-runtime-3.5.2.jar
>Jars under lib:
>antlr-runtime-3.5.2.jar flink-table-api-java-1.19.0.jar 
>flink-cdc-dist-3.0.0.jar flink-table-api-java-uber-1.19.0.jar 
>flink-cdc-pipeline-connector-doris-3.1.0.jar flink-table-common-1.19.0.jar 
>flink-cdc-pipeline-connector-mysql-3.1.0.jar 
>flink-table-planner-loader-1.19.0.jar flink-cep-1.19.0.jar 
>flink-table-runtime-1.19.0.jar flink-connector-files-1.19.0.jar 
>hive-exec-3.1.2.jar flink-connector-hive_2.12-1.19.0.jar libfb303-0.9.3.jar 
>flink-connector-jdbc-3.1.2-1.18.jar log4j-1.2-api-2.17.1.jar 
>flink-connector-kafka-3.1.0-1.18.jar log4j-api-2.17.1.jar flink-csv-1.19.0.jar 
>log4j-core-2.17.1.jar flink-dist-1.19.0.jar log4j-slf4j-impl-2.17.1.jar 
>flink-json-1.19.0.jar mysql-connector-java-8.0.28.jar 
>flink-scala_2.12-1.19.0.jar paimon-flink-1.19-0.9-20240628.002224-23.jar
>--
>From: Xuyang 
>Sent: Tuesday, July 16, 2024 11:43
>To: "user-zh"
>Subject: Re: Problem using the Hive catalog
>Hi, can you check whether the Hive SQL connector dependency [1] has been placed in the lib directory or added via ADD JAR?
>[1] 
>https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/hive/overview/
> 
> >
>--
> Best!
> Xuyang
>At 2024-07-15 17:09:45, "冯奇"  wrote:
>>Flink SQL> USE CATALOG myhive;
>>Flink SQL> CREATE TABLE mykafka (name String, age Int) WITH (
>> 'connector.type' = 'kafka',
>> 'connector.version' = 'universal',
>> 'connector.topic' = 'hive_sink',
>> 'connector.properties.bootstrap.servers' = '10.0.15.242:9092',
>> 'format.type' = 'csv',
>> 'update-mode' = 'append'
>>);
>>It reports the following error:
>>[ERROR] Could not execute SQL statement. Reason:
>>org.apache.flink.table.factories.NoMatchingTableFactoryException: Could not 
>>find a suitable table factory for 
>>'org.apache.flink.table.factories.TableSourceFactory' in
>>the classpath.
>>Reason: Required context properties mismatch.
>>The matching candidates:
>>org.apache.flink.table.sources.CsvAppendTableSourceFactory
>>Mismatched properties:
>>'connector.type' expects 'filesystem', but is 'kafka'
>>The following properties are requested:
>>connector.properties.bootstrap.servers=10.0.15.242:9092
>>connector.topic=hive_sink
>>connector.type=kafka
>>connector.version=universal
>>format.type=csv
>>schema.0.data-type=VARCHAR(2147483647)
>>schema.0.name=name
>>schema.1.data-type=INT
>>schema.1.name=age
>>update-mode=append
>>The following factories have been considered:
>>org.apache.flink.table.sources.CsvBatchTableSourceFactory
>>org.apache.flink.table.sources.CsvAppendTableSourceFactory


Re: Problem using the Hive catalog

2024-07-15 Thread 冯奇
I checked the docs and those jars are all there. There is also a separately downloaded dependency, flink-sql-connector-hive-3.1.3; I'm not sure whether to use that one or the ones below.
// Flink's Hive connector  flink-connector-hive_2.12-1.19.1.jar  // Hive 
dependencies  hive-exec-3.1.0.jar  libfb303-0.9.3.jar // libfb303 is not packed 
into hive-exec in some versions, need to add it separately  // add 
antlr-runtime if you need to use hive dialect  antlr-runtime-3.5.2.jar
Jars under lib:
antlr-runtime-3.5.2.jar flink-table-api-java-1.19.0.jar 
flink-cdc-dist-3.0.0.jar flink-table-api-java-uber-1.19.0.jar 
flink-cdc-pipeline-connector-doris-3.1.0.jar flink-table-common-1.19.0.jar 
flink-cdc-pipeline-connector-mysql-3.1.0.jar 
flink-table-planner-loader-1.19.0.jar flink-cep-1.19.0.jar 
flink-table-runtime-1.19.0.jar flink-connector-files-1.19.0.jar 
hive-exec-3.1.2.jar flink-connector-hive_2.12-1.19.0.jar libfb303-0.9.3.jar 
flink-connector-jdbc-3.1.2-1.18.jar log4j-1.2-api-2.17.1.jar 
flink-connector-kafka-3.1.0-1.18.jar log4j-api-2.17.1.jar flink-csv-1.19.0.jar 
log4j-core-2.17.1.jar flink-dist-1.19.0.jar log4j-slf4j-impl-2.17.1.jar 
flink-json-1.19.0.jar mysql-connector-java-8.0.28.jar 
flink-scala_2.12-1.19.0.jar paimon-flink-1.19-0.9-20240628.002224-23.jar
--
From: Xuyang 
Sent: Tuesday, July 16, 2024 11:43
To: "user-zh"
Subject: Re: Problem using the Hive catalog
Hi, can you check whether the Hive SQL connector dependency [1] has been placed in the lib directory or added via ADD JAR?
[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/hive/overview/
 

--
 Best!
 Xuyang
At 2024-07-15 17:09:45, "冯奇"  wrote:
>Flink SQL> USE CATALOG myhive;
>Flink SQL> CREATE TABLE mykafka (name String, age Int) WITH (
> 'connector.type' = 'kafka',
> 'connector.version' = 'universal',
> 'connector.topic' = 'hive_sink',
> 'connector.properties.bootstrap.servers' = '10.0.15.242:9092',
> 'format.type' = 'csv',
> 'update-mode' = 'append'
>);
>It reports the following error:
>[ERROR] Could not execute SQL statement. Reason:
>org.apache.flink.table.factories.NoMatchingTableFactoryException: Could not 
>find a suitable table factory for 
>'org.apache.flink.table.factories.TableSourceFactory' in
>the classpath.
>Reason: Required context properties mismatch.
>The matching candidates:
>org.apache.flink.table.sources.CsvAppendTableSourceFactory
>Mismatched properties:
>'connector.type' expects 'filesystem', but is 'kafka'
>The following properties are requested:
>connector.properties.bootstrap.servers=10.0.15.242:9092
>connector.topic=hive_sink
>connector.type=kafka
>connector.version=universal
>format.type=csv
>schema.0.data-type=VARCHAR(2147483647)
>schema.0.name=name
>schema.1.data-type=INT
>schema.1.name=age
>update-mode=append
>The following factories have been considered:
>org.apache.flink.table.sources.CsvBatchTableSourceFactory
>org.apache.flink.table.sources.CsvAppendTableSourceFactory


Re: Problem using the Hive catalog

2024-07-15 Thread Xuyang
Hi, can you check whether the Hive SQL connector dependency [1] has been placed in the lib directory or added via ADD JAR?




[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/hive/overview/




--

Best!
Xuyang





At 2024-07-15 17:09:45, "冯奇"  wrote:
>Flink SQL> USE CATALOG myhive;
>Flink SQL> CREATE TABLE mykafka (name String, age Int) WITH (
> 'connector.type' = 'kafka',
> 'connector.version' = 'universal',
> 'connector.topic' = 'hive_sink',
> 'connector.properties.bootstrap.servers' = '10.0.15.242:9092',
> 'format.type' = 'csv',
> 'update-mode' = 'append'
>);
>It reports the following error:
>[ERROR] Could not execute SQL statement. Reason:
>org.apache.flink.table.factories.NoMatchingTableFactoryException: Could not 
>find a suitable table factory for 
>'org.apache.flink.table.factories.TableSourceFactory' in
>the classpath.
>Reason: Required context properties mismatch.
>The matching candidates:
>org.apache.flink.table.sources.CsvAppendTableSourceFactory
>Mismatched properties:
>'connector.type' expects 'filesystem', but is 'kafka'
>The following properties are requested:
>connector.properties.bootstrap.servers=10.0.15.242:9092
>connector.topic=hive_sink
>connector.type=kafka
>connector.version=universal
>format.type=csv
>schema.0.data-type=VARCHAR(2147483647)
>schema.0.name=name
>schema.1.data-type=INT
>schema.1.name=age
>update-mode=append
>The following factories have been considered:
>org.apache.flink.table.sources.CsvBatchTableSourceFactory
>org.apache.flink.table.sources.CsvAppendTableSourceFactory


RE: Taskslots usage

2024-07-15 Thread Alexandre KY
Hello,


Thank you for your answers, I now understand Flink's behavior.


Thank you and best regards,

Ky Alexandre


From: Aleksandr Pilipenko 
Sent: Friday, July 12, 2024 19:42:06
To: Alexandre KY
Cc: user
Subject: Re: Taskslots usage

Hello Alexandre,

Flink does not use a TaskSlot per task by default; rather, a task slot will 
hold a slice of the entire pipeline (up to one subtask of each operator, 
depending on the operator parallelism) [1].
So if your job parallelism is 1 - only a single task slot will be occupied.

If you want to modify this behavior and distribute operators between slots - 
you can take a look at slot sharing groups [2].

1 - 
https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/concepts/flink-architecture/#task-slots-and-resources
2 - 
https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/overview/#set-slot-sharing-group

Kind regards,
Aleksandr

On Fri, 12 Jul 2024 at 17:34, Alexandre KY 
mailto:alexandre...@magellium.fr>> wrote:

Hello,


I am trying to run a pipeline made of 3 tasks (have a look at flink_job.png):

  *   Task 1: Source, FlatMap, Map, keyBy
  *   Task 2: Window, Map, keyBy
  *   Task 3: FlatMap, Map, Sink

>From what I have read, in streaming mode, all the tasks run simultaneously. 
>Therefore, each task takes one TaskSlot like 
>here.
> However, as you can see in the picture flink_tm (I run the job on a cluster 
>made of 1 jobmanager and 1 taskmanager), the taskmanager has 3 slots, but only 
>1 of them is being used even though the 3 tasks are running. The first task is 
>still creating more data (supposed to produce 25 outputs) to send to the 2nd 
>one and even when the 3rd task receives data, the number of taskslots used 
>remains 1.


I don't understand why Flink doesn't use all the taskslots which leads it to 
behave similarly to batch mode: it tends to produce all the outputs of Task 1, 
then produces only the outputs of Task 2 and crashes because my computer is out 
of memory since it keeps accumulating the outputs of Task 2 in memory before 
sending them to Task 3 despite setting 
`env.set_runtime_mode(RuntimeExecutionMode.STREAMING)`

I said "tends" because while Task 2 is processing the 25 products, Task 3 
received 2 of them and produced 2 outputs, but after that it stopped (the 
number of records received remained 2) and Flink only runs Task 2 (I see it in 
the logs) until the memory explodes.


To sum it up, I have no idea why Flink doesn't use all the Taskslots available 
despite having more than 1 Task. Shouldn't the backpressure stop Task 2 
since its output buffer is getting full thanks to the backpressure mechanism? 
Or maybe I should reduce the number of buffers to make the backpressure 
mechanism kick in?


Thanks in advance and best regards,

Ky Alexandre


Issue using the Hive catalog

2024-07-15 Thread 冯奇
Flink SQL> USE CATALOG myhive;
Flink SQL> CREATE TABLE mykafka (name String, age Int) WITH (
 'connector.type' = 'kafka',
 'connector.version' = 'universal',
 'connector.topic' = 'hive_sink',
 'connector.properties.bootstrap.servers' = '10.0.15.242:9092',
 'format.type' = 'csv',
 'update-mode' = 'append'
);
The following error is reported:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.factories.NoMatchingTableFactoryException: Could not 
find a suitable table factory for 
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.
Reason: Required context properties mismatch.
The matching candidates:
org.apache.flink.table.sources.CsvAppendTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'
The following properties are requested:
connector.properties.bootstrap.servers=10.0.15.242:9092
connector.topic=hive_sink
connector.type=kafka
connector.version=universal
format.type=csv
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=name
schema.1.data-type=INT
schema.1.name=age
update-mode=append
The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory


Expose rocksdb options for flush thread.

2024-07-14 Thread Zhongyou Lee
Hello everyone:

   Up to now, the only way to adjust the number of RocksDB flush threads is for the 
user to implement ConfigurableRocksDBOptionsFactory#setMaxBackgroundFlushes. I found 
FLINK-22059, which addresses this problem, but the PR has never been completed; I 
would like to finish it. Can anyone assign it to me? My JIRA id is: zhongyou.lee.

Benefits of this PR

Improved RocksDB write performance, for example with multiple slots per TaskManager 
or during savepoint recovery.

How to finish the PR

Add state.backend.rocksdb.flush.thread.num to RocksDBConfigurableOptions and describe 
it in the Flink configuration documentation.

Use currentOptions.getEnv().setBackgroundThreads() instead of the 
setMaxBackgroundFlushes method to change the thread count (supported in RocksDB 5.17 
and above).

Tell users that in RocksDB 5.17 and below, the setMaxBackgroundFlushes API has the 
problem described at: https://github.com/facebook/rocksdb/issues/4847.
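
For illustration, a minimal sketch of the kind of factory the proposal describes, assuming the (not yet existing) option key state.backend.rocksdb.flush.thread.num; this is a sketch, not the final design:

```
import java.util.Collection;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.contrib.streaming.state.ConfigurableRocksDBOptionsFactory;
import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.Priority;

public class FlushThreadsOptionsFactory implements ConfigurableRocksDBOptionsFactory {

    // Key proposed in this mail; treated here as a plain custom option.
    private static final ConfigOption<Integer> FLUSH_THREADS =
            ConfigOptions.key("state.backend.rocksdb.flush.thread.num").intType().defaultValue(1);

    private int flushThreads = 1;

    @Override
    public RocksDBOptionsFactory configure(ReadableConfig configuration) {
        this.flushThreads = configuration.get(FLUSH_THREADS);
        return this;
    }

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // RocksDB >= 5.17: size the HIGH-priority (flush) thread pool on the Env
        // instead of calling the problematic setMaxBackgroundFlushes.
        currentOptions.getEnv().setBackgroundThreads(flushThreads, Priority.HIGH);
        return currentOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnFamilyOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions;
    }
}
```

Such a factory could already be wired in today via the existing option state.backend.rocksdb.options-factory; the flush.thread.num key above is only what this proposal suggests adding as a first-class option.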

Re: Email from kingdomad

2024-07-14 Thread 张胜军

R





Sent from 139 Mail



The following is the content of the forwarded email
From: kingdomad 
To: user-zh 
Date: 2024-07-15 09:36:43
Subject: Email from kingdomad

(none)


Re:Encountering scala.matchError in Flink 1.18.1 Query

2024-07-14 Thread Xuyang
Hi, this is a bug fixed in 
https://github.com/apache/flink/pull/25075/files#diff-4ee2dd065d2b45fb64cacd5977bec6126396cc3b56e72addfe434701ac301efeL405.




You can try joining input_table with the UDTF first, and then use the result as the 
input of the window TVF to bypass this bug.
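
For illustration, a self-contained sketch of that workaround, with a datagen table and a toy SPLIT_STRING function standing in for the original Kafka source; whether the proctime attribute survives the intermediate view should still be double-checked on the real job:

```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.TableFunction;

public class WindowTvfWorkaround {

    // Toy split function standing in for the user's SPLIT_STRING UDTF.
    public static class SplitString extends TableFunction<String> {
        public void eval(String sentence) {
            for (String word : sentence.split(" ")) {
                collect(word);
            }
        }
    }

    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        tEnv.createTemporarySystemFunction("SPLIT_STRING", SplitString.class);

        tEnv.executeSql(
                "CREATE TABLE input_table (sentence STRING, proctime AS PROCTIME()) "
                        + "WITH ('connector' = 'datagen', 'rows-per-second' = '1')");

        // 1) Apply the LATERAL table function first and materialize it as a view ...
        tEnv.executeSql(
                "CREATE TEMPORARY VIEW exploded AS "
                        + "SELECT proctime, word "
                        + "FROM input_table, LATERAL TABLE(SPLIT_STRING(sentence)) AS T(word)");

        // 2) ... then feed the flat view into the window TVF, so the planner no longer
        //    sees a correlate node directly under the window.
        tEnv.executeSql(
                "SELECT window_start, word, COUNT(*) AS `count` "
                        + "FROM TABLE(TUMBLE(TABLE exploded, DESCRIPTOR(proctime), INTERVAL '10' SECOND)) "
                        + "GROUP BY window_start, window_end, word")
                .print();
    }
}
```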




--

Best!
Xuyang




At 2024-07-09 10:35:04, "Norihiro FUKE"  wrote:

Hi, community

I encountered a scala.matchError when trying to obtain the table plan for the 
following query in Flink 1.18.1.

The input data is read from Kafka, and the query is intended to perform a 
typical WordCount operation. The query is as follows. SPLIT_STRING is a Table 
Function UDF that splits sentences into words by spaces.

```
SELECT
window_start,
word,
COUNT(*) AS `count`
FROM
TABLE(
  TUMBLE(TABLE input_table, DESCRIPTOR(proctime), INTERVAL '10' SECOND)),
  LATERAL TABLE(SPLIT_STRING(sentence)) AS T(word)
GROUP BY
window_start,
window_end,
word```
The error message received is:

```
[ERR] scala.MatchError: rel#177237:FlinkLogicalCorrelate.LOGICAL.any.None: 
0.[NONE].[NONE](left=RelSubset#177235,right=RelSubset#177236,correlation=$cor0,joinType=inner,requiredColumns={1})
 (of class 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCorrelate)
```
I believe that the issue lies in the existNeighbourWindowTableFunc method in 
flink-table-planner/WindowUtil.scala, where there is an unconsidered node 
(FlinkLogicalCorrelate) when traversing the AST. (This method was added in 
FLINK-32578.) I suspect this comes from the LATERAL clause. While in this query the 
unhandled node was FlinkLogicalCorrelate, I think there might be other unconsidered nodes as well.

I have two questions regarding this:

1. Is it expected behavior for scala.MatchError to occur in this case? In other 
words, I suspect this might be an unreported bug.
2. In the code comments of the PR mentioned in the FLINK-32578 ticket, I found the 
terms "standard form" and "relax form." I searched for "relax form" in the 
Flink documentation but could not find any reference. As a workaround for this 
issue, using the WITH clause could be considered, but I am uncertain whether this is 
a universal solution.
Thank you for your assistance.


Re: Email from kingdomad

2024-07-14 Thread kingdomad















--

kingdomad







At 2024-07-15 09:36:43, "kingdomad"  wrote:
>


Email from kingdomad

2024-07-14 Thread kingdomad



Re: Flink Standalone-ZK-HA mode, CLI job submission

2024-07-13 Thread love_h1...@126.com
Our guess is that both JMs were writing their own address to ZK's rest_service_lock node at the same time, causing some of the Flink client's jobs to be submitted to one JM and others to the other JM.

The situation above can be reproduced by manually modifying the ZK node.
We cannot fully reproduce that cluster state just by restarting ZK; the root cause of the situation above is unclear. Has a similar bug been reported?



 Original message 
| From | Zhanghao Chen |
| Date | 2024-07-13 12:41 |
| To | user-zh@flink.apache.org |
| Cc | |
| Subject | Re: Flink Standalone-ZK-HA mode, CLI job submission |
From the logs, a leader switch occurred while the ZK cluster was being rolled; both JMs became Leader one after the other, but they were never Leader at the same time.

Best,
Zhanghao Chen

From: love_h1...@126.com 
Sent: Friday, July 12, 2024 17:17
To: user-zh@flink.apache.org 
Subject: Flink Standalone-ZK-HA mode, CLI job submission

Version: Flink 1.11.6, Standalone HA mode, ZooKeeper 3.5.8
Steps:
1. Only cancelled all running jobs; did not stop the Flink cluster
2. Rolling-restarted the ZooKeeper cluster
3. Submitted multiple jobs with the flink run command
Symptoms:
1. Some job submissions failed with the error: The rpc invocation size 721919700 exceeds the maximum akka 
framesize.
2. The logs of two JobManager nodes in the Flink cluster contained messages about receiving and executing jobs
Questions:
1. Can jobs submitted with the flink run command end up on two JobManager nodes of the Flink cluster?
2. Can restarting the ZooKeeper cluster lead to two JobManagers holding the Leader role in the Flink cluster, and is this a bug in a special scenario?





Re: Buffer Priority

2024-07-12 Thread Zhanghao Chen
Hi Enric,

It basically means the prioritized buffers can bypass all non-prioritized 
buffers at the input gate and get processed first. You may refer to 
https://issues.apache.org/jira/browse/FLINK-19026, where it was first
introduced, for more details.
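
As a purely illustrative toy model (not Flink's actual code), the effect is like enqueuing a prioritized event at the head of the channel's queue, so it overtakes the buffers that arrived earlier:

```
import java.util.ArrayDeque;
import java.util.Deque;

public class PrioritySketch {
    static final Deque<String> channelBuffers = new ArrayDeque<>();

    static void enqueue(String buffer, boolean prioritized) {
        if (prioritized) {
            channelBuffers.addFirst(buffer); // overtakes ("skips") everything already queued
        } else {
            channelBuffers.addLast(buffer);
        }
    }

    public static void main(String[] args) {
        enqueue("data-1", false);
        enqueue("data-2", false);
        enqueue("checkpoint-barrier", true); // e.g. an unaligned checkpoint barrier
        // Prints: checkpoint-barrier, data-1, data-2
        channelBuffers.forEach(System.out::println);
    }
}
```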

Best,
Zhanghao Chen

From: Enric Ott <243816...@qq.com>
Sent: Wednesday, July 10, 2024 18:02
To: user 
Subject: Buffer Priority

Hello, Community:
  I am puzzled by what Priority means in a Flink Buffer. It is explained with an 
example (as follows) in Buffer.java, but I still don't get what exactly "it 
skipped buffers" means. Could anyone give me an intuitive explanation?
  Thanks.



/** Same as EVENT_BUFFER, but the event has been prioritized (e.g. it skipped 
buffers). */
PRIORITIZED_EVENT_BUFFER(false, true, false, true, false, false),


Re: Flink Standalone-ZK-HA mode, CLI job submission

2024-07-12 Thread Zhanghao Chen
From the logs, a leader switch occurred while the ZK cluster was being rolled; both JMs became Leader one after the other, but they were never Leader at the same time.

Best,
Zhanghao Chen

From: love_h1...@126.com 
Sent: Friday, July 12, 2024 17:17
To: user-zh@flink.apache.org 
Subject: Flink Standalone-ZK-HA mode, CLI job submission

Version: Flink 1.11.6, Standalone HA mode, ZooKeeper 3.5.8
Steps:
 1. Only cancelled all running jobs; did not stop the Flink cluster
 2. Rolling-restarted the ZooKeeper cluster
 3. Submitted multiple jobs with the flink run command
Symptoms:
1. Some job submissions failed with the error: The rpc invocation size 721919700 exceeds the maximum akka 
framesize.
2. The logs of two JobManager nodes in the Flink cluster contained messages about receiving and executing jobs
Questions:
1. Can jobs submitted with the flink run command end up on two JobManager nodes of the Flink cluster?
2. Can restarting the ZooKeeper cluster lead to two JobManagers holding the Leader role in the Flink cluster, and is this a bug in a special scenario?





Re: Taskslots usage

2024-07-12 Thread Aleksandr Pilipenko
Hello Alexandre,

Flink does not use a TaskSlot per task by default; rather, a task slot
will hold a slice of the entire pipeline (up to one subtask of each
operator, depending on the operator parallelism) [1].
So if your job parallelism is 1 - only a single task slot will be occupied.

If you want to modify this behavior and distribute operators between slots
- you can take a look at slot sharing groups [2].

1 -
https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/concepts/flink-architecture/#task-slots-and-resources
2 -
https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/overview/#set-slot-sharing-group
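
For illustration, a minimal sketch of [2], with an invented toy pipeline and group names:

```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlotSharingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c")
                // stays in the "default" slot sharing group
                .map(String::toUpperCase)
                // from here on, operators go into separate groups and therefore
                // cannot share a slot with the upstream operators
                .map(s -> s + "!").slotSharingGroup("stage-2")
                .print().slotSharingGroup("stage-3");

        env.execute("slot sharing sketch");
    }
}
```

Operators in different slot sharing groups cannot be placed in the same slot, so this spreads the pipeline over more slots; whether that actually helps depends on the job.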

Kind regards,
Aleksandr

On Fri, 12 Jul 2024 at 17:34, Alexandre KY 
wrote:

> Hello,
>
>
> I am trying to run a pipeline made of 3 tasks (have a look at
> flink_job.png):
>
>- Task 1: Source, FlatMap, Map, keyBy
>- Task 2: Window, Map, keyBy
>- Task 3: FlatMap, Map, Sink
>
> From what I have read, in streaming mode, all the tasks run
> simultaneously. Therefore, each task takes one TaskSlot like here
> .
> However, as you can see in the picture flink_tm (I run the job on a cluster
> made of 1 jobmanager and 1 taskmanager), the taskmanager has 3 slots, but
> only 1 of them is being used even though the 3 tasks are running. The
> first task is still creating more data (supposed to produce 25 outputs) to
> send to the 2nd one and even when the 3rd task receives data, the number of
> taskslots used remains 1.
>
>
> I don't understand why Flink doesn't use all the taskslots which leads it
> to behave similarly to batch mode: it tends to produce all the outputs of
> Task 1, then produces only the outputs of Task 2 and crashes because my
> computer is out of memory since it keeps accumulating the outputs of Task 2
> in memory before sending them to Task 3 despite setting `
> env.set_runtime_mode(RuntimeExecutionMode.STREAMING)`
>
> I said "tends" because while Task 2 is processing the 25 products, Task 3
> received 2 of them and produced 2 outputs, but after that it stopped (the
> number of records received remained 2) and Flink only runs Task 2 (I see it
> in the logs) until the memory explodes.
>
>
> To sum it up, I have no idea why Flink doesn't use all the Taskslots
> available despite having more than 1 Task and shouldn't the backpressure
> stop Task 2 since its output buffer is getting full thanks to the
> backpressure mechanism ? Or maybe I should reduce the number of buffers to
> make the backpressure mechanism kick in ?
>
>
> Thanks in advance and best regards,
>
> Ky Alexandre
>


Re: Taskslots usage

2024-07-12 Thread Saurabh Singh
Hi Ky Alexandre,

I would recommend reading this section, which explains slot sharing between
tasks. Link



Quote - By default, Flink allows subtasks to share slots even if they are
> subtasks of different tasks, so long as they are from the same job. The
> result is that one slot may hold an entire pipeline of the job.


For your use case, I would suggest either of the two options:

   1. Set different parallelism for different tasks (see the sketch below). This
   shall ensure you will utilize all 3 task slots.
   2. Use Fine-Grained Resource Management techniques on the job. Link


Hope this helps.
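
For illustration, a minimal sketch of option 1 with an invented pipeline; the numbers are arbitrary, and the slots needed roughly follow the highest operator parallelism:

```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PerOperatorParallelismSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3, 4, 5)
                .map(i -> i * 2).setParallelism(1).name("light-map")
                .map(i -> i + 1).setParallelism(3).name("heavy-map") // drives the slot count to 3
                .print().setParallelism(1);

        env.execute("per-operator parallelism sketch");
    }
}
```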

Regards
Saurabh




On Fri, Jul 12, 2024 at 8:47 PM Alexandre KY 
wrote:

> Hello,
>
>
> I am trying to run a pipeline made of 3 tasks (have a look at
> flink_job.png):
>
>- Task 1: Source, FlatMap, Map, keyBy
>- Task 2: Window, Map, keyBy
>- Task 3: FlatMap, Map, Sink
>
> From what I have read, in streaming mode, all the tasks run
> simultaneously. Therefore, each task takes one TaskSlot like here
> .
> However, as you can see in the picture flink_tm (I run the job on a cluster
> made of 1 jobmanager and 1 taskmanager), the taskmanager has 3 slots, but
> only 1 of them is being used even though the 3 tasks are running. The
> first task is still creating more data (supposed to produce 25 outputs) to
> send to the 2nd one and even when the 3rd task receives data, the number of
> taskslots used remains 1.
>
>
> I don't understand why Flink doesn't use all the taskslots which leads it
> to behave similarly to batch mode: it tends to produce all the outputs of
> Task 1, then produces only the outputs of Task 2 and crashes because my
> computer is out of memory since it keeps accumulating the outputs of Task 2
> in memory before sending them to Task 3 despite setting `
> env.set_runtime_mode(RuntimeExecutionMode.STREAMING)`
>
> I said "tends" because while Task 2 is processing the 25 products, Task 3
> received 2 of them and produced 2 outputs, but after that it stopped (the
> number of records received remained 2) and Flink only runs Task 2 (I see it
> in the logs) until the memory explodes.
>
>
> To sum it up, I have no idea why Flink doesn't use all the Taskslots
> available despite having more than 1 Task and shouldn't the backpressure
> stop Task 2 since its output buffer is getting full thanks to the
> backpressure mechanism ? Or maybe I should reduce the number of buffers to
> make the backpressure mechanism kick in ?
>
>
> Thanks in advance and best regards,
>
> Ky Alexandre
>


Re: Flink reactive deployment on with kubernetes operator

2024-07-11 Thread Enric Ott
Thanks, nacisimsek. I will try your suggestion.




------ Original message ------
From: "nacisimsek"
http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.



apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-reactive-example
spec:
  image: flink:1.17
  flinkVersion: v1_17
  flinkConfiguration:
    scheduler-mode: REACTIVE
    taskmanager.numberOfTaskSlots: "2"
    state.savepoints.dir: file:///flink-data/savepoints
    state.checkpoints.dir: file:///flink-data/checkpoints
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: file:///flink-data/ha
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          volumeMounts:
            - mountPath: /flink-data
              name: flink-volume
      volumes:
        - name: flink-volume
          hostPath:
            # directory location on host
            path: /run/desktop/mnt/host/c/Users/24381/Documents/
            # this field is optional
            type: DirectoryOrCreate
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: savepoint
    state: running
    savepointTriggerNonce: 0
  mode: standalone





ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint   
 [] - Fatal error occurred in the cluster entrypoint.
java.util.concurrent.CompletionException: java.lang.IllegalStateException: The 
base directory of the JobResultStore isn't accessible. No dirty JobResults can 
be restored.
at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown 
Source) ~[?:?]
at java.util.concurrent.CompletableFuture.completeThrowable(Unknown 
Source) [?:?]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown 
Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
[?:?]
at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.lang.IllegalStateException: The base directory of the 
JobResultStore isn't accessible. No dirty JobResults can be restored.
at 
org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) 
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:199)
 ~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118)
 ~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100)
 ~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:194)
 ~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198)
 ~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188)
 ~[flink-dist-1.17.1.jar:1.17.1]
... 4 more

Re: Flink reactive deployment on with kubernetes operator

2024-07-11 Thread Enric Ott
Thanks, Gyula. I agree with you on the Autoscaler, and I will try the latest Flink 
Operator version.




------ Original message ------
From: "Gyula Fóra"



flink-runtime:1.14.6 StackOverflowError

2024-07-11 Thread ??????
A Flink job using flink-runtime 1.14.6 reports the following error:

2024-07-10 16:48:09.700 WARN [XNIO-1 
task-8-SendThread(102.195.8.107:2181)] 
org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [1164] 
Session 0x0 for server 102.195.8.107/102.195.8.107:2181, unexpected error, 
closing socket connection and attempting reconnect
java.lang.StackOverflowError: null
at 
org.apache.flink.runtime.security.DynamicConfiguration.getAppConfigurationEntry(DynamicConfiguration.java:79)
 ~[flink-runtime-1.14.6.jar:1.14.6]
at 
org.apache.flink.runtime.security.DynamicConfiguration.getAppConfigurationEntry(DynamicConfiguration.java:79)
 ~[flink-runtime-1.14.6.jar:1.14.6]
at 
org.apache.flink.runtime.security.DynamicConfiguration.getAppConfigurationEntry(DynamicConfiguration.java:79)
 ~[flink-runtime-1.14.6.jar:1.14.6]
at 
org.apache.flink.runtime.security.DynamicConfiguration.getAppConfigurationEntry(DynamicConfiguration.java:79)
 ~[flink-runtime-1.14.6.jar:1.14.6]

fastjson setting: -Dfastjson.parser.safeMode=true 


Re: Can we use custom serialization/deserialization for kafka sources and sinks through the table api?

2024-07-11 Thread Kevin Lam via user
Hi Gabriel,

You could consider overriding the value.serializer and value.deserializer
(and similarly for the key) in the consumer and producer configuration that
Flink sets, using the `properties.*` option in the Kafka Connector.

Your serializer and deserializer will have access to the headers, can perform
your integrity checks, and can otherwise pass the byte[] through so the
formats' logic continues to handle SerDe.

Flink uses the ByteArray(De|S)erializers by default in its source and sink.
It's currently not possible to override the source serializer, but it's a
work in progress via https://issues.apache.org/jira/browse/FLINK-35808.
Hoping to have it merged soon.

Alternatively, you can wait for first-class header support in Flink table
formats. There's some ongoing discussion and work via FLIP-454 and this
mailing list discussion.
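
For illustration, a minimal sketch of such a value deserializer; the header name and the check itself are invented:

```
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;

public class IntegrityCheckingDeserializer implements Deserializer<byte[]> {

    @Override
    public byte[] deserialize(String topic, byte[] data) {
        // Overload without headers: nothing to check, pass the bytes through.
        return data;
    }

    @Override
    public byte[] deserialize(String topic, Headers headers, byte[] data) {
        Header checksum = headers.lastHeader("x-checksum"); // assumed header name
        if (checksum == null) {
            throw new IllegalStateException("Missing integrity header on topic " + topic);
        }
        // ... verify checksum.value() against data here ...
        return data; // return the untouched bytes so the declared 'format' still decodes them
    }
}
```

In the table definition it could then be referenced with something like 'properties.value.deserializer' = 'com.example.IntegrityCheckingDeserializer' (class name hypothetical); as noted above, whether the source side honors such an override today is what FLINK-35808 is about.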

On Thu, Jul 11, 2024 at 2:13 PM Gabriel Giussi 
wrote:

> Reading from a kafka topic with custom serialization/deserialization can
> be done using a KafkaSource configured with an implementation
> of KafkaRecordDeserializationSchema, which has access even to kafka headers
> which are used in my case for checking message integrity.
> How can we do the same but using the table API where you can just
> configure the value.format with a string to a predefined set of formats?
>
> Thanks.
>


Can we use custom serialization/deserialization for kafka sources and sinks through the table api?

2024-07-11 Thread Gabriel Giussi
Reading from a kafka topic with custom serialization/deserialization can be
done using a KafkaSource configured with an implementation
of KafkaRecordDeserializationSchema, which has access even to kafka headers
which are used in my case for checking message integrity.
How can we do the same but using the table API where you can just configure
the value.format with a string to a predefined set of formats?

Thanks.


Trying to read a file from S3 with flink on kubernetes

2024-07-11 Thread gwenael . lebarzic
Hey guys.

I'm trying to read a file from an internal S3 with Flink on Kubernetes, but I get 
a strange blocking error.

Here is the code :

MyFlinkJob.scala :
###
package com.example.flink

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.io.Source

object MyFlinkJob {
def main(args: Array[String]): Unit = {
try {
val env = StreamExecutionEnvironment.getExecutionEnvironment

val hadoopConfig = new Configuration()
hadoopConfig.set("fs.s3a.access.key", "###")
hadoopConfig.set("fs.s3a.secret.key", "###")
hadoopConfig.set("fs.s3a.endpoint", "internal endpoint")

val fs = FileSystem.get(hadoopConfig)
val s3Path = new Path("s3a://mybucket/myfolder/myfile.txt")
val inputStream = fs.open(s3Path)
val referenceData = 
Source.fromInputStream(inputStream).getLines().toSet
inputStream.close()

println("Reference Data:")
referenceData.foreach(println)

env.execute("Flink S3 Simple Example")
} catch {
case e: Exception =>
e.printStackTrace()
println(s"Error: ${e.getMessage}")
}
}
}

###

And my build.sbt file :
###
import Dependencies._

name := "MyFlinkJob"

version := "0.1"

scalaVersion := "2.12.19"

ThisBuild / scalaVersion := "2.12.19"
ThisBuild / version  := "0.1.0-SNAPSHOT"
ThisBuild / organization := "com.example"
ThisBuild / organizationName := "example"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % "1.18.1",
  "org.apache.flink" %% "flink-streaming-scala" % "1.18.1",
  "org.apache.flink" % "flink-s3-fs-hadoop" % "1.18.1"
)

assembly / assemblyOption ~= {
  _.withIncludeScala(false)
}

assembly / mainClass := Some(s"com.example.flink.MyFlinkJob")

assembly / assemblyJarName := s"myflinkjob_2.12-0.1.jar"

assembly / assemblyMergeStrategy := {
  case path if path.contains("services") => MergeStrategy.concat
  case PathList("META-INF", _*) => MergeStrategy.discard
  case _ => MergeStrategy.first
}

###

I'm using the following docker image :
###
FROM flink:1.18-scala_2.12

USER root

RUN mkdir -p /opt/flink/plugins/s3-fs-hadoop && \
cp -p /opt/flink/opt/flink-s3-fs-hadoop-1.18.1.jar 
/opt/flink/plugins/s3-fs-hadoop/

RUN mkdir -p /opt/flink/log/ /opt/flink/conf /opt/flink/userlib

WORKDIR /opt/flink/userlib
COPY target/scala-2.12/myflinkjob_2.12-0.1.jar myflinkjob.jar

RUN chown -R flink:flink /opt/flink && \
chmod -R 755 /opt/flink

RUN chown -R flink:flink /opt/flink/userlib && \
chmod -R 755 /opt/flink/userlib
###

And the following Kubernetes deployment :
###
---
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-s3
spec:
  image: flink-s3:0.1
  flinkVersion: v1_18
  flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
classloader.resolve-order: parent-first
  serviceAccount: flink
  jobManager:
resource:
  memory: 2048m
  cpu: 0.5
  taskManager:
replicas: 2
resource:
  memory: 2048m
  cpu: 0.5
  job:
jarURI: "local:///opt/flink/userlib/myflinkjob.jar"
parallelism: 2
#upgradeMode: stateless  # stateless or savepoint or last-state
entryClass: "com.example.flink.MyFlinkJob"
args: []
  podTemplate:
apiVersion: v1
kind: Pod
metadata:
  name: flink-s3
spec:
  containers:
- name: flink-main-container
  securityContext:
runAsUser:  # UID of a non-root user
runAsNonRoot: true
  env: []
  volumeMounts: []
  volumes: []

###

I launch the flink job like this :
###
kubectl apply -f kubernetes/FlinkDeployment.yml
###

I am using Flink operator on Kubernetes.

And I get this error in the logs :
###
java.lang.IllegalArgumentException: Wrong FS: 
s3a://mybucket/myfolder/myfile.txt, expected: file:///
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:807)
at 
org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:105)
at 
org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:774)
at 
org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1100)
at 
org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:769)
at 
org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:462)
at 
org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.(ChecksumFileSystem.java:160)
at 
org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:372)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:976)
at com.orange.flink.MyFlinkJob$.main(MyFlinkJob.scala:28)
at 
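
A minimal sketch (Java for brevity, assumptions only): the "Wrong FS ... expected: file:///" message comes from the default local FileSystem being used for an s3a:// path; resolving the FileSystem from the path's own scheme avoids that. Bucket, endpoint and keys are placeholders.

```
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class S3ReadSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.s3a.endpoint", "https://internal-endpoint");
        // credentials omitted; same fs.s3a.* keys as in the mail above

        Path path = new Path("s3a://mybucket/myfolder/myfile.txt");
        // Path.getFileSystem(conf) resolves the s3a:// scheme; FileSystem.get(conf)
        // returns the default (local) filesystem and then fails with "Wrong FS".
        FileSystem fs = path.getFileSystem(conf);

        try (FSDataInputStream in = fs.open(path);
             BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            reader.lines().forEach(System.out::println);
        }
    }
}
```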

Re: Flink reactive deployment on with kubernetes operator

2024-07-11 Thread nacisimsek
Hi Enric,

You can try using a persistent volume claim on your Kubernetes cluster for the 
JobResultStore, instead of using a local path from your underlying host, and 
see if it works.

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: flink-data-pvc
spec:
  resources:
requests:
  storage: 10Gi
  volumeMode: Filesystem
  accessModes:
- ReadWriteOnce


And edit your yaml 
(spec.podTemplate.spec.volumes.persistentVolumeClaim.claimName) to use this PVC:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-reactive-example
spec:
  image: flink:1.17
  flinkVersion: v1_17
  flinkConfiguration:
scheduler-mode: REACTIVE
taskmanager.numberOfTaskSlots: "2"
state.savepoints.dir: file:///flink-data/savepoints
state.checkpoints.dir: file:///flink-data/checkpoints
high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: file:///flink-data/ha
  serviceAccount: flink
  jobManager:
resource:
  memory: "2048m"
  cpu: 1
  taskManager:
resource:
  memory: "2048m"
  cpu: 1
  podTemplate:
spec:
  containers:
- name: flink-main-container
  volumeMounts:
  - mountPath: /flink-data
name: flink-volume
  volumes:
  - name: flink-volume
persistentVolumeClaim:
  claimName: flink-data-pvc
  job:
jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
parallelism: 2
upgradeMode: savepoint
state: running
savepointTriggerNonce: 0
  mode: standalone


Naci

> On 11. Jul 2024, at 05:40, Enric Ott <243816...@qq.com> wrote:
> 
> Hi, Community:
>   I have encountered a problem when deploying the reactive Flink scheduler on 
> kubernetes with flink kubernetes operator 1.6.0; the manifest and exception 
> stack info are listed as follows.
> Any clues would be appreciated.
> 
> 
> #  Licensed to the Apache Software Foundation (ASF) under one
> #  or more contributor license agreements.  See the NOTICE file
> #  distributed with this work for additional information
> #  regarding copyright ownership.  The ASF licenses this file
> #  to you under the Apache License, Version 2.0 (the
> #  "License"); you may not use this file except in compliance
> #  with the License.  You may obtain a copy of the License at
> #
> #  http://www.apache.org/licenses/LICENSE-2.0
> #
> #  Unless required by applicable law or agreed to in writing, software
> #  distributed under the License is distributed on an "AS IS" BASIS,
> #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> #  See the License for the specific language governing permissions and
> # limitations under the License.
> 
> 
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   name: basic-reactive-example
> spec:
>   image: flink:1.17
>   flinkVersion: v1_17
>   flinkConfiguration:
> scheduler-mode: REACTIVE
> taskmanager.numberOfTaskSlots: "2"
> state.savepoints.dir: file:///flink-data/savepoints
> state.checkpoints.dir: file:///flink-data/checkpoints
> high-availability: 
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> high-availability.storageDir: file:///flink-data/ha
>   serviceAccount: flink
>   jobManager:
> resource:
>   memory: "2048m"
>   cpu: 1
>   taskManager:
> resource:
>   memory: "2048m"
>   cpu: 1
>   podTemplate:
> spec:
>   containers:
> - name: flink-main-container
>   volumeMounts:
>   - mountPath: /flink-data
> name: flink-volume
>   volumes:
>   - name: flink-volume
> hostPath:
>   # directory location on host
>   path: /run/desktop/mnt/host/c/Users/24381/Documents/
>   # this field is optional
>   type: DirectoryOrCreate
>   job:
> jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
> parallelism: 2
> upgradeMode: savepoint
> state: running
> savepointTriggerNonce: 0
>   mode: standalone
> 
> 
> ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Fatal 
> error occurred in the cluster entrypoint.
> java.util.concurrent.CompletionException: java.lang.IllegalStateException: 
> The base directory of the JobResultStore isn't accessible. No dirty 
> JobResults can be restored.
>   at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown 
> Source) ~[?:?]
>   at java.util.concurrent.CompletableFuture.completeThrowable(Unknown 
> Source) [?:?]
>   at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown 
> Source) [?:?]
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
> [?:?]
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 

Kubernetes HA checkpoint not retained on termination

2024-07-11 Thread Clemens Valiente
hi, I have a problem where Flink deletes checkpoint information in a
kubernetes HA setup even if
execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION
is set.
config documentation:
"RETAIN_ON_CANCELLATION": Checkpoint state is kept when the owning job is
cancelled or fails.
but in the kubernetes implementation:
https://github.com/apache/flink/blob/master/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointIDCounter.java#L82-L84
LOG.info("Shutting down.");
if (jobStatus.isGloballyTerminalState()) {
LOG.info("Removing counter from ConfigMap {}", configMapName);
on termination, it removes the counter from the ConfigMap, which seems to be
important for recovering checkpoints.
Is this correct behaviour? (Flink 1.17.2 in our case.) The jobmanager pod
is shut down via pod eviction, with a terminationGracePeriodSeconds of 30s.


2024-07-11 08:18:45,866 INFO
 org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool []
- Releasing slot [3c92e4ee1303da214e868fcef5d4ad91].
2024-07-11 08:19:08,335 INFO
 org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - RECEIVED
SIGNAL 15: SIGTERM. Shutting down as requested.
2024-07-11 08:19:08,336 INFO
 org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Shutting
StandaloneApplicationClusterEntryPoint down with application status
UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
2024-07-11 08:19:08,922 INFO
 org.apache.flink.kubernetes.highavailability.KubernetesCheckpointIDCounter
[] - Shutting down.
2024-07-11 08:19:08,922 INFO
 org.apache.flink.kubernetes.highavailability.KubernetesCheckpointIDCounter
[] - Removing counter from ConfigMap
flink-fuzzy-wifibissid-counter-pax-cluster-da512a1587e13e9ef844938ede5adab8-config-map




Re: Flink in HA mode: client job submission exception after restarting the ZK cluster

2024-07-11 Thread wjw_bigd...@163.com
Unsubscribe



 Original message 
| From | love_h1...@126.com |
| Date | 2024-07-11 16:10 |
| To | user-zh@flink.apache.org |
| Cc | |
| Subject | Flink in HA mode: client job submission exception after restarting the ZK cluster |
Symptoms:
Flink 1.11.6, Standalone HA mode; the ZooKeeper cluster was rolling-restarted; multiple jobs were submitted with the flink run command on one node of the Flink cluster;
some job submissions failed with the following exception:
[Flink-DispatcherRestEndpoint-thread-2] - [WARN ] - 
[org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(line:290)]
 - Could not create remote rpc invocation message. Failing rpc invocation 
because...
java.io.IOException: The rpc invocation size 12532388 exceeds the maximum akka 
framesize.


Log information:
The JobManager log on node A of the cluster shows that it was granted leadership
17:19:45,433 - [flink-akka.actor.default-dispatcher-22] - [INFO ] - 
[org.apache.flink.runtime.resourcemanager.ResourceManager.tryAcceptLeadership(line:1118)]
 - ResourceManager 
akka.tcp://flink@10.10.160.57:46746/user/rpc/resourcemanager_0 was granted 
leadership with fencing token ad84d46e902e0cf6da92179447af4e00
17:19:45,434 - [main-EventThread] - [INFO ] - 
[org.apache.flink.runtime.webmonitor.WebMonitorEndpoint.grantLeadership(line:931)]
 - http://XXX:XXX was granted leadership with 
leaderSessionID=f60df688-372d-416b-a965-989a59b37feb
17:19:45,437 - [flink-akka.actor.default-dispatcher-22] - [INFO ] - 
[org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.start(line:287)]
 - Starting the SlotManager.
17:19:45,480 - [main-EventThread] - [INFO ] - 
[org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.startInternal(line:97)]
 - Start SessionDispatcherLeaderProcess.XXX
17:19:45,489 - [cluster-io-thread-1] - [INFO ] - 
[org.apache.flink.runtime.rpc.akka.AkkaRpcService.startServer(line:232)] - 
Starting RPC endpoint for 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher at 
akka://flink/user/rpc/dispatcher_1 .
17:19:45,495 - [flink-akka.actor.default-dispatcher-23] - [INFO ] - 
[org.apache.flink.runtime.resourcemanager.ResourceManager.registerTaskExecutorInternal(line:891)]
 - Registering TaskManager with ResourceID XX 
(akka.tcp://flink@X:XX/user/rpc/taskmanager_0) at ResourceManager

Two nodes (A and B) in the Flink cluster received job submission requests; the logs of both nodes contain the following
[flink-akka.actor.default-dispatcher-33] - [INFO ] - 
[org.apache.flink.runtime.jobmaster.JobMaster.connectToResourceManager(line:1107)]
 - Connecting to ResourceManager 
akka.tcp://flink@X.X.X.X:46746/user/rpc/resourcemanager_0(ad84d46e902e0cf6da92179447af4e00)
The logs of 4 JobManager nodes in the cluster show Start SessionDispatcherLeaderProcess, but almost all are followed by Stopping 
SessionDispatcherLeaderProcess; nodes A and B, however, have no Stopping 
SessionDispatcherLeaderProcess message
[main-EventThread] - [INFO ] - 
[org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.startInternal(line:97)]
 - Start SessionDispatcherLeaderProcess.
[Curator-ConnectionStateManager-0] - [INFO ] - 
[org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.closeInternal(line:134)]
 - Stopping SessionDispatcherLeaderProcess.






Flink in HA mode: client job submission exception after restarting the ZK cluster

2024-07-11 Thread love_h1...@126.com
Symptoms:
Flink 1.11.6, Standalone HA mode; the ZooKeeper cluster was rolling-restarted; multiple jobs were submitted with the flink run command on one node of the Flink cluster;
some job submissions failed with the following exception: 
[Flink-DispatcherRestEndpoint-thread-2] - [WARN ] - 
[org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(line:290)]
 - Could not create remote rpc invocation message. Failing rpc invocation 
because...
java.io.IOException: The rpc invocation size 12532388 exceeds the maximum akka 
framesize.


Log information:
The JobManager log on node A of the cluster shows that it was granted leadership
17:19:45,433 - [flink-akka.actor.default-dispatcher-22] - [INFO ] - 
[org.apache.flink.runtime.resourcemanager.ResourceManager.tryAcceptLeadership(line:1118)]
 - ResourceManager 
akka.tcp://flink@10.10.160.57:46746/user/rpc/resourcemanager_0 was granted 
leadership with fencing token ad84d46e902e0cf6da92179447af4e00
17:19:45,434 - [main-EventThread] - [INFO ] - 
[org.apache.flink.runtime.webmonitor.WebMonitorEndpoint.grantLeadership(line:931)]
 - http://XXX:XXX was granted leadership with 
leaderSessionID=f60df688-372d-416b-a965-989a59b37feb
17:19:45,437 - [flink-akka.actor.default-dispatcher-22] - [INFO ] - 
[org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.start(line:287)]
 - Starting the SlotManager.
17:19:45,480 - [main-EventThread] - [INFO ] - 
[org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.startInternal(line:97)]
 - Start SessionDispatcherLeaderProcess.XXX
17:19:45,489 - [cluster-io-thread-1] - [INFO ] - 
[org.apache.flink.runtime.rpc.akka.AkkaRpcService.startServer(line:232)] - 
Starting RPC endpoint for 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher at 
akka://flink/user/rpc/dispatcher_1 .
17:19:45,495 - [flink-akka.actor.default-dispatcher-23] - [INFO ] - 
[org.apache.flink.runtime.resourcemanager.ResourceManager.registerTaskExecutorInternal(line:891)]
 - Registering TaskManager with ResourceID XX 
(akka.tcp://flink@X:XX/user/rpc/taskmanager_0) at ResourceManager

Two nodes (A and B) in the Flink cluster received job submission requests; the logs of both nodes contain the following
[flink-akka.actor.default-dispatcher-33] - [INFO ] - 
[org.apache.flink.runtime.jobmaster.JobMaster.connectToResourceManager(line:1107)]
 - Connecting to ResourceManager 
akka.tcp://flink@X.X.X.X:46746/user/rpc/resourcemanager_0(ad84d46e902e0cf6da92179447af4e00)
The logs of 4 JobManager nodes in the cluster show Start SessionDispatcherLeaderProcess, but almost all are followed by Stopping 
SessionDispatcherLeaderProcess; nodes A and B, however, have no Stopping 
SessionDispatcherLeaderProcess message
 [main-EventThread] - [INFO ] - 
[org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.startInternal(line:97)]
 - Start SessionDispatcherLeaderProcess.
 [Curator-ConnectionStateManager-0] - [INFO ] - 
[org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.closeInternal(line:134)]
 - Stopping SessionDispatcherLeaderProcess.






Re: Flink reactive deployment on with kubernetes operator

2024-07-11 Thread Gyula Fóra
Hi Enric!
The community cannot support old versions of the Flink operator; please
upgrade to the latest version (1.9.0).

Also, we do not recommend using the Reactive mode (with standalone). You
should instead try Native Mode + Autoscaler which works much better in most
cases.

Cheers,
Gyula


Flink reactive deployment on with kubernetes operator

2024-07-10 Thread Enric Ott
Hi, Community:
 I have encountered a problem when deploying the reactive Flink scheduler 
on kubernetes with flink kubernetes operator 1.6.0; the manifest and exception 
stack info are listed as follows.
Any clues would be appreciated.



# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.



apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-reactive-example
spec:
  image: flink:1.17
  flinkVersion: v1_17
  flinkConfiguration:
    scheduler-mode: REACTIVE
    taskmanager.numberOfTaskSlots: "2"
    state.savepoints.dir: file:///flink-data/savepoints
    state.checkpoints.dir: file:///flink-data/checkpoints
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: file:///flink-data/ha
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          volumeMounts:
            - mountPath: /flink-data
              name: flink-volume
      volumes:
        - name: flink-volume
          hostPath:
            # directory location on host
            path: /run/desktop/mnt/host/c/Users/24381/Documents/
            # this field is optional
            type: DirectoryOrCreate
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: savepoint
    state: running
    savepointTriggerNonce: 0
  mode: standalone





ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint   
 [] - Fatal error occurred in the cluster entrypoint.
java.util.concurrent.CompletionException: java.lang.IllegalStateException: The 
base directory of the JobResultStore isn't accessible. No dirty JobResults can 
be restored.
at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown 
Source) ~[?:?]
at java.util.concurrent.CompletableFuture.completeThrowable(Unknown 
Source) [?:?]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown 
Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
[?:?]
at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.lang.IllegalStateException: The base directory of the 
JobResultStore isn't accessible. No dirty JobResults can be restored.
at 
org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) 
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:199)
 ~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118)
 ~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100)
 ~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:194)
 ~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198)
 ~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188)
 ~[flink-dist-1.17.1.jar:1.17.1]
... 4 more

RE: Flink Serialisation

2024-07-10 Thread Alexandre KY
After taking a closer look at the logs, I found out it was a 
`java.lang.OutOfMemoryError: Java heap space` error, which confirms what I 
thought: the serialized object is too big. Here is how to increase the 
JVM heap: 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/memory/mem_trouble/
 (I use Flink 1.18.1, by the way, as seen in the link).
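
A minimal sketch of the relevant memory options, assuming a locally launched environment; on a real cluster these keys would normally go into the Flink configuration file, and for a plain local JVM the heap is ultimately bounded by -Xmx. Sizes are illustrative only.

```
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HeapSizeSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // taskmanager.memory.task.heap.size
        conf.set(TaskManagerOptions.TASK_HEAP_MEMORY, MemorySize.parse("2g"));
        // taskmanager.memory.process.size
        conf.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("4g"));

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build the actual pipeline here ...
        env.fromElements("ok").print();
        env.execute("heap size sketch");
    }
}
```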


But I still don't know whether it is good practice to pass around big objects 
(mine is around a hundred MB), or whether it is better to write the output to a 
database/filesystem and send a message with the output's address to the next 
operator, which will then pull the data.


De : Alexandre KY 
Envoyé : mercredi 10 juillet 2024 12:32:03
À : user
Objet : Flink Serialisation


Hello,

I was wondering if Flink has a size limit for serializing data. I have an object 
that stores a big 2D array, and when I try to hand it over to the next operator, I 
get the following error:

```
2024-07-10 10:14:51,983 ERROR 
org.apache.flink.runtime.util.ClusterUncaughtExceptionHandler [] - WARNING: 
Thread 'grpc-default-executor-1' produced an uncaught exception. If you want to 
fail on uncaught exceptions, then configure cluster.uncaught-exception-handling 
accordingly
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3745) ~[?:?]
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:120) ~[?:?]
at 
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:95) 
~[?:?]
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:156) 
~[?:?]
at 
org.apache.beam.sdk.util.StreamUtils.getBytesWithoutClosing(StreamUtils.java:64)
 
~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
at 
org.apache.beam.sdk.coders.ByteArrayCoder.decode(ByteArrayCoder.java:101) 
~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
at org.apache.beam.sdk.coders.ByteArrayCoder.decode(ByteArrayCoder.java:41) 
~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
at 
org.apache.beam.sdk.coders.LengthPrefixCoder.decode(LengthPrefixCoder.java:64) 
~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159) 
~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
at 
org.apache.beam.sdk.util.WindowedValue$ParamWindowedValueCoder.decode(WindowedValue.java:819)
 
~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
at 
org.apache.beam.sdk.util.WindowedValue$ParamWindowedValueCoder.decode(WindowedValue.java:813)
 
~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
at 
org.apache.beam.sdk.util.WindowedValue$ParamWindowedValueCoder.decode(WindowedValue.java:737)
 
~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
at 
org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:43)
 
~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
at 
org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:25)
 
~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
at 
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:68)
 
~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
at 
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:31)
 
~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
at 
org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:144)
 
~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
at 
org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:130)
 
~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
at 
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:262)
 
~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
at 
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
 
~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
at 
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
 
~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
at 

Flink Serialisation

2024-07-10 Thread Alexandre KY
Hello,

I was wondering if Flink has a size limit for serializing data. I have an object 
that stores a big 2D array, and when I try to hand it over to the next operator, I 
get the following error:

```
2024-07-10 10:14:51,983 ERROR 
org.apache.flink.runtime.util.ClusterUncaughtExceptionHandler [] - WARNING: 
Thread 'grpc-default-executor-1' produced an uncaught exception. If you want to 
fail on uncaught exceptions, then configure cluster.uncaught-exception-handling 
accordingly
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3745) ~[?:?]
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:120) ~[?:?]
at 
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:95) 
~[?:?]
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:156) 
~[?:?]
at 
org.apache.beam.sdk.util.StreamUtils.getBytesWithoutClosing(StreamUtils.java:64)
 
~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
at 
org.apache.beam.sdk.coders.ByteArrayCoder.decode(ByteArrayCoder.java:101) 
~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
at org.apache.beam.sdk.coders.ByteArrayCoder.decode(ByteArrayCoder.java:41) 
~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
at 
org.apache.beam.sdk.coders.LengthPrefixCoder.decode(LengthPrefixCoder.java:64) 
~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159) 
~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
at 
org.apache.beam.sdk.util.WindowedValue$ParamWindowedValueCoder.decode(WindowedValue.java:819)
 
~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
at 
org.apache.beam.sdk.util.WindowedValue$ParamWindowedValueCoder.decode(WindowedValue.java:813)
 
~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
at 
org.apache.beam.sdk.util.WindowedValue$ParamWindowedValueCoder.decode(WindowedValue.java:737)
 
~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
at 
org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:43)
 
~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
at 
org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:25)
 
~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
at 
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:68)
 
~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
at 
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:31)
 
~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
at 
org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:144)
 
~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
at 
org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:130)
 
~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
at 
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:262)
 
~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
at 
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
 
~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
at 
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
 
~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
at 
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:332)
 
~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
at 
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:315)
 
~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
at 
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:834)
 
~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
at 

Buffer Priority

2024-07-10 Thread Enric Ott
Hello, Community:
 I am puzzled by what Priority means in a Flink Buffer. It is explained with an 
example (as follows) in Buffer.java, but I still don't get what exactly "it 
skipped buffers" means. Could anyone give me an intuitive explanation?
 Thanks.




/** Same as EVENT_BUFFER, but the event has been prioritized (e.g. it skipped 
buffers). */
PRIORITIZED_EVENT_BUFFER(false, true, false, true, false, false),

Re:Re:Flink LAG-Function doesn't work as expected

2024-07-10 Thread Xuyang
Sorry, I meant "could not".




--

Best!
Xuyang




At 2024-07-10 15:21:48, "Xuyang" wrote:

Hi, which Flink version do you use? I could reproduce this bug in master. My 
test SQL is below:




```
CREATE TABLE UNITS_DATA(
  proctime AS PROCTIME()
, `IDENT` INT
, `STEPS_ID` INT
, `ORDERS_ID` INT
) WITH (
  'connector' = 'datagen',
  'fields.STEPS_ID.min' = '64911',
  'fields.STEPS_ID.max' = '64912',
  'rows-per-second' = '1'
);

WITH current_and_previous as (
SELECT proctime
     , ud.STEPS_ID as STEPS_ID
     , ud.IDENT as UNITS_DATA_ID
     , ud.ORDERS_ID as ORDERS_ID
     , LAG(ud.ORDERS_ID, 1) OVER (PARTITION BY STEPS_ID ORDER BY proctime) PREV_ORDERS_ID
FROM UNITS_DATA ud
WHERE STEPS_ID=64911
)
select *
from current_and_previous;
```

The result is as expected:

```
+----+-------------------------+----------+---------------+-------------+----------------+
| op |                proctime | STEPS_ID | UNITS_DATA_ID |   ORDERS_ID | PREV_ORDERS_ID |
+----+-------------------------+----------+---------------+-------------+----------------+
| +I | 2024-07-10 15:19:17.661 |    64911 |    -772065898 |   342255977 |                |
| +I | 2024-07-10 15:19:20.651 |    64911 |    1165938984 |   162006411 |      342255977 |
| +I | 2024-07-10 15:19:22.614 |    64911 |   -1000903042 | -2059780314 |      162006411 |
```










--

Best!
Xuyang




On 2024-07-09 14:22:08, "Brandl, Johann" wrote:

Hi everyone,

I'm new to Flink and tried some queries with Flink SQL.

Currently I have a problem with the LAG function. I want to emit a new record
when the ORDERS_ID changes. To do this, I use the LAG function to detect
whether this has changed.

However, I noticed that every now and then I cannot access the ORDERS_ID of the
previous message. It seems to have to do with the proctime I use in the ORDER
BY with LAG. As soon as the proctime changes in the range of seconds, I cannot
access the last value and it gives me NULL instead. Do any of you know what
could be causing this?

Here is the query I use:

 

CREATE TABLE UNITS_DATA(
  proctime AS PROCTIME()
, `IDENT` DOUBLE
, `STEPS_ID` DOUBLE
, `ORDERS_ID` DOUBLE
) WITH (
  'connector' = 'kafka',
  'topic' = 'UNITS_DATA',
  'properties.bootstrap.servers' = 'http://myserver:9094',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'avro-confluent',
  'avro-confluent.url' = 'http://myserver:8080/apis/ccompat/v6/'
);

WITH current_and_previous as (
SELECT proctime
     , ud.STEPS_ID as STEPS_ID
     , ud.IDENT as UNITS_DATA_ID
     , ud.ORDERS_ID as ORDERS_ID
     , LAG(ud.ORDERS_ID, 1) OVER (PARTITION BY STEPS_ID ORDER BY proctime) PREV_ORDERS_ID
FROM UNITS_DATA ud
WHERE STEPS_ID=64911
)
select *
from current_and_previous;

 

Thanks in advance and best regards

 




Re:Flink LAG-Function doesn't work as expected

2024-07-10 Thread Xuyang
Hi, which Flink version do you use? I could reproduce this bug in master. My
test SQL is below:




```
CREATE TABLE UNITS_DATA(
  proctime AS PROCTIME()
, `IDENT` INT
, `STEPS_ID` INT
, `ORDERS_ID` INT
) WITH (
  'connector' = 'datagen',
  'fields.STEPS_ID.min' = '64911',
  'fields.STEPS_ID.max' = '64912',
  'rows-per-second' = '1'
);

WITH current_and_previous as (
SELECT proctime
     , ud.STEPS_ID as STEPS_ID
     , ud.IDENT as UNITS_DATA_ID
     , ud.ORDERS_ID as ORDERS_ID
     , LAG(ud.ORDERS_ID, 1) OVER (PARTITION BY STEPS_ID ORDER BY proctime) PREV_ORDERS_ID
FROM UNITS_DATA ud
WHERE STEPS_ID=64911
)
select *
from current_and_previous;
```

The result is as expected:

```
+----+-------------------------+----------+---------------+-------------+----------------+
| op |                proctime | STEPS_ID | UNITS_DATA_ID |   ORDERS_ID | PREV_ORDERS_ID |
+----+-------------------------+----------+---------------+-------------+----------------+
| +I | 2024-07-10 15:19:17.661 |    64911 |    -772065898 |   342255977 |                |
| +I | 2024-07-10 15:19:20.651 |    64911 |    1165938984 |   162006411 |      342255977 |
| +I | 2024-07-10 15:19:22.614 |    64911 |   -1000903042 | -2059780314 |      162006411 |
```










--

Best!
Xuyang




On 2024-07-09 14:22:08, "Brandl, Johann" wrote:

Hi everyone,

I'm new to Flink and tried some queries with Flink SQL.

Currently I have a problem with the LAG function. I want to emit a new record
when the ORDERS_ID changes. To do this, I use the LAG function to detect
whether this has changed.

However, I noticed that every now and then I cannot access the ORDERS_ID of the
previous message. It seems to have to do with the proctime I use in the ORDER
BY with LAG. As soon as the proctime changes in the range of seconds, I cannot
access the last value and it gives me NULL instead. Do any of you know what
could be causing this?

Here is the query I use:

 

CREATE TABLE UNITS_DATA(
  proctime AS PROCTIME()
, `IDENT` DOUBLE
, `STEPS_ID` DOUBLE
, `ORDERS_ID` DOUBLE
) WITH (
  'connector' = 'kafka',
  'topic' = 'UNITS_DATA',
  'properties.bootstrap.servers' = 'http://myserver:9094',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'avro-confluent',
  'avro-confluent.url' = 'http://myserver:8080/apis/ccompat/v6/'
);

WITH current_and_previous as (
SELECT proctime
     , ud.STEPS_ID as STEPS_ID
     , ud.IDENT as UNITS_DATA_ID
     , ud.ORDERS_ID as ORDERS_ID
     , LAG(ud.ORDERS_ID, 1) OVER (PARTITION BY STEPS_ID ORDER BY proctime) PREV_ORDERS_ID
FROM UNITS_DATA ud
WHERE STEPS_ID=64911
)
select *
from current_and_previous;

 

Thanks in advance and best regards

 




Event de-duplication in Flink with RabbitMQ connector

2024-07-09 Thread banu priya
Hi All,

I have a Flink job with an RMQ source, tumbling windows (firing every 2s),
an aggregator, and then an RMQ sink. Incremental RocksDB checkpointing is
enabled with an interval of 5 minutes.

I was trying to understand Flink failure recovery. Checkpoint X has started and
I have sent one event to my source. As the windows fire every 2s, my sink is
updated with the aggregated result. But when I checked the RabbitMQ console, my
source queue still had unacked messages. (This is expected and as per design:
https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/rabbitmq/#rabbitmq-source
).

Now I restart my task manager. Because the restart happens within the same
checkpoint interval and checkpoint X has not yet completed, the message is not
acknowledged and is redelivered, so the event is processed twice.

How can I avoid these duplicates?


Thanks

Banu
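
[Editor's note: the RabbitMQ connector documentation linked above ties exactly-once
consumption from the source to three things: enabled checkpointing, unique correlation
IDs on the messages, and a non-parallel source. A minimal sketch of such a setup is
shown below; the host, credentials, and queue name are placeholders, and this is an
illustration of the documented pattern rather than a verified fix for the scenario
described in this thread.]

```
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;

public class RmqExactlyOnceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Messages are acknowledged only when the checkpoint that contains them
        // completes, so checkpointing must be enabled (here: every 5 minutes).
        env.enableCheckpointing(5 * 60 * 1000, CheckpointingMode.EXACTLY_ONCE);

        RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
                .setHost("rabbitmq-host")   // placeholder host
                .setPort(5672)
                .setUserName("user")        // placeholder credentials
                .setPassword("password")
                .setVirtualHost("/")
                .build();

        DataStream<String> stream = env
                .addSource(new RMQSource<>(
                        connectionConfig,
                        "source-queue",          // placeholder queue name
                        true,                    // deduplicate redeliveries via correlation IDs
                        new SimpleStringSchema()))
                // exactly-once from the RMQ source requires non-parallel consumption
                .setParallelism(1);

        stream.print();
        env.execute("rmq-exactly-once-sketch");
    }
}
```

[This only helps if the producer actually sets a unique correlation ID per message;
without correlation IDs the source has no way to tell a redelivered message apart
from a new one.]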


Flink LAG-Function doesn't work as expected

2024-07-09 Thread Brandl, Johann
Hi everyone,
I'm new to Flink and tried some queries with Flink SQL.

Currently I have a problem with the LAG function. I want to emit a new record
when the ORDERS_ID changes. To do this, I use the LAG function to detect
whether this has changed.
However, I noticed that every now and then I cannot access the ORDERS_ID of the
previous message. It seems to have to do with the proctime I use in the ORDER
BY with LAG. As soon as the proctime changes in the range of seconds, I cannot
access the last value and it gives me NULL instead. Do any of you know what
could be causing this?
Here is the query I use:

CREATE TABLE UNITS_DATA(
  proctime AS PROCTIME()
, `IDENT` DOUBLE
, `STEPS_ID` DOUBLE
, `ORDERS_ID` DOUBLE
) WITH (
  'connector' = 'kafka',
  'topic' = 'UNITS_DATA',
  'properties.bootstrap.servers' = 'http://myserver:9094',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'avro-confluent',
  'avro-confluent.url' = 'http://myserver:8080/apis/ccompat/v6/'
);

WITH current_and_previous as (
SELECT proctime
     , ud.STEPS_ID as STEPS_ID
     , ud.IDENT as UNITS_DATA_ID
     , ud.ORDERS_ID as ORDERS_ID
     , LAG(ud.ORDERS_ID, 1) OVER (PARTITION BY STEPS_ID ORDER BY proctime) PREV_ORDERS_ID
FROM UNITS_DATA ud
WHERE STEPS_ID=64911
)
select *
from current_and_previous;

Thanks in advance and best regards





Using BlobServer in FlinkDeployment

2024-07-09 Thread Saransh Jain
Hi all, I am deploying a FlinkDeployment CR in an Operator-watched namespace. I
have passed these configs in the flinkConfiguration:

blob.server.address: "jobmanager"
blob.server.port: "6128"
blob.storage.directory: "/tmp/jars/"

There are a couple of jars that I don't want to bake into the image, but they
should be available in /opt/flink/lib/ in all pods. I am downloading them with
an initContainer in the JobManager pod to the /tmp/jars location and then
copying them into /opt/flink/lib/.
The expectation is that these jars are shipped to /tmp/jars/ on the TaskManager
pods (from where we would move them to /opt/flink/lib/), but the jars never
arrive on the TM pods, and there are no relevant error logs even at TRACE
level. What am I missing?


Thanks & Regards
Saransh Jain


Encountering scala.matchError in Flink 1.18.1 Query

2024-07-08 Thread Norihiro FUKE
Hi, community

I encountered a scala.MatchError when trying to obtain the table plan for
the following query in Flink 1.18.1.

The input data is read from Kafka, and the query is intended to perform a
typical WordCount operation. The query is as follows. SPLIT_STRING is a
Table Function UDF that splits sentences into words by spaces.

```
SELECT
    window_start,
    word,
    COUNT(*) AS `count`
FROM
    TABLE(
      TUMBLE(TABLE input_table, DESCRIPTOR(proctime), INTERVAL '10' SECOND)),
    LATERAL TABLE(SPLIT_STRING(sentence)) AS T(word)
GROUP BY
    window_start,
    window_end,
    word
```
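
[Editor's note: the SPLIT_STRING UDF itself is not shown in this thread. For context,
a minimal sketch of what such a table function might look like follows; the class
name, type hint, and whitespace-based splitting are assumptions, not the poster's
actual code.]

```
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// Hypothetical stand-in for the SPLIT_STRING table function described above:
// it takes a sentence and emits one row per whitespace-separated word.
@FunctionHint(output = @DataTypeHint("ROW<word STRING>"))
public class SplitString extends TableFunction<Row> {
    public void eval(String sentence) {
        if (sentence == null) {
            return;
        }
        for (String word : sentence.split("\\s+")) {
            collect(Row.of(word));
        }
    }
}
```

[Such a function would typically be registered with something like
tEnv.createTemporarySystemFunction("SPLIT_STRING", SplitString.class) before being
referenced in the query above.]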

The error message received is:

```
[ERR] scala.MatchError:
rel#177237:FlinkLogicalCorrelate.LOGICAL.any.None:
0.[NONE].[NONE](left=RelSubset#177235,right=RelSubset#177236,correlation=$cor0,joinType=inner,requiredColumns={1})
(of class 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCorrelate)
```

I believe the issue lies in the existNeighbourWindowTableFunc method in
flink-table-planner/WindowUtil.scala, where a node type (FlinkLogicalCorrelate)
is not handled when traversing the AST. (This method was added in FLINK-32578.)
I suspect this comes from the LATERAL clause. While in this query the unhandled
node was FlinkLogicalCorrelate, I think there might be other unhandled node
types as well.

I have two questions regarding this:

   1. Is it expected behavior for a scala.MatchError to occur in this case? In
   other words, I suspect this might be an unreported bug.
   2. In the code comments of the PR mentioned in the FLINK-32578 ticket, I
   found the terms "standard form" and "relax form." I searched for "relax
   form" in the Flink documentation but could not find any reference. As a
   workaround for this issue, using a WITH clause could be considered, but I
   am uncertain whether this is a universal solution.

Thank you for your assistance.


  1   2   3   4   5   6   7   8   9   10   >