Cast Exception

2023-12-05 Thread Tauseef Janvekar
Dear Team,

I am getting a cast exception in Flink.
Caused by: org.apache.flink.client.program.ProgramInvocationException: The
main method caused an error: class java.lang.String cannot be cast to class
java.lang.Double (java.lang.String and java.lang.Double are in module
java.base of loader 'bootstrap') at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)

The code that I wrote is

Properties alerts = new Properties();

try (InputStream stream = OtelTransformerJob.class
        .getClassLoader().getResourceAsStream("rule-based-config.txt")) {
    alerts.load(stream);
}

Map<String, Double> rules = alerts.entrySet().stream()
        .collect(Collectors.toMap(
                e -> (String) e.getKey(),
                e -> (Double) e.getValue()));


Not sure what the problem is here.

Thanks,
Tauseef


Re: Cast Exception

2023-12-05 Thread Tauseef Janvekar
Dear Team,

After changing the code to the below, the error got resolved:

Map<String, Double> rules = alerts.entrySet().stream()
        .collect(Collectors.toMap(
                e -> (String) e.getKey(),
                e -> Double.parseDouble((String) e.getValue())));

Thanks,
Tauseef

On Tue, 5 Dec 2023 at 14:00, Tauseef Janvekar wrote:

> Dear Team,
>
> I am getting a cast exception in Flink.
> Caused by: org.apache.flink.client.program.ProgramInvocationException: The
> main method caused an error: class java.lang.String cannot be cast to class
> java.lang.Double (java.lang.String and java.lang.Double are in module
> java.base of loader 'bootstrap') at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
>
> The code that I wrote is
>
> Properties alerts = new Properties();
>
> try (InputStream stream = OtelTransformerJob.class
> .getClassLoader().getResourceAsStream("rule-based-config.txt")) {
>
> alerts.load(stream);
>
> }
>
> Map<String, Double> rules = alerts.entrySet().stream()
>
> .collect(Collectors.toMap(e -> (String) e.getKey(), e -> (Double) e
> .getValue()));
>
>
> Not sure what the problem is here.
>
> Thanks,
> Tauseef
>


Re: Is the kafka-connector doc missing a dependency on flink-connector-base

2023-12-05 Thread Hang Ruan
Hi, Jean-Marc Paulin.

The flink-connector-base module is not packaged in the externalized
connectors [1].
It is already included in flink-dist, so you should use the provided
scope in Maven for it.

Best,
Hang

[1] https://issues.apache.org/jira/browse/FLINK-30400

Jean-Marc Paulin wrote on Tue, 5 Dec 2023 at 01:12:

> Hi,
>
> Trying to update the kafka connector to my project and I am missing a
> class. Is the doc missing a dependency on flink-connector-base ?
>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-connector-base</artifactId>
>   <scope>compile</scope>
> </dependency>
>
> I added it and it works. I think that's required, but I would have expected
> this in the dependencies on the Kafka | Apache Flink connector documentation
> page.
>
> Thanks
>
> JM
> Unless otherwise stated above:
>
> IBM United Kingdom Limited
> Registered in England and Wales with number 741598
> Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
>


Re: Cast Exception

2023-12-05 Thread Junrui Lee
Hello Tauseef,

The issue you're encountering is that the Properties class in Java stores
both keys and values as Strings. Casting the value directly to Double
throws a ClassCastException because values loaded from the properties file
are Strings and cannot be cast to Double.

To resolve this, you need to parse the String value into a Double manually.
Here's how you can modify your code to do this:
Map<String, Double> rules = alerts.entrySet().stream()
        .collect(Collectors.toMap(
                e -> (String) e.getKey(),
                e -> Double.parseDouble((String) e.getValue())
        ));
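
As a side note, a cast-free variant is also possible, because Properties
guarantees String keys and values. A minimal sketch (assuming java.util.Map
and java.util.stream.Collectors are imported, and that every value in the
file parses as a double):

Map<String, Double> rules = alerts.stringPropertyNames().stream()
        .collect(Collectors.toMap(
                name -> name,
                // Properties values are always Strings, so no cast is needed;
                // a malformed value will throw NumberFormatException here.
                name -> Double.parseDouble(alerts.getProperty(name))));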

Best,
Junrui

Tauseef Janvekar wrote on Tue, 5 Dec 2023 at 16:30:

> Dear Team,
>
> I am getting a cast exception in Flink.
> Caused by: org.apache.flink.client.program.ProgramInvocationException: The
> main method caused an error: class java.lang.String cannot be cast to class
> java.lang.Double (java.lang.String and java.lang.Double are in module
> java.base of loader 'bootstrap') at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
>
> The code that I wrote is
>
> Properties alerts = new Properties();
>
> try (InputStream stream = OtelTransformerJob.class
> .getClassLoader().getResourceAsStream("rule-based-config.txt")) {
>
> alerts.load(stream);
>
> }
>
> Map<String, Double> rules = alerts.entrySet().stream()
>
> .collect(Collectors.toMap(e -> (String) e.getKey(), e -> (Double) e
> .getValue()));
>
>
> Not sure what the problem is here.
>
> Thanks,
> Tauseef
>


Concerns and Anomalies in Flink Window Functions with TumblingProcessingTimeWindows

2023-12-05 Thread arjun s
Hi team,
I'm a newcomer to Flink's window functions, specifically using
TumblingProcessingTimeWindows with a configured window duration of 20
minutes. However, I've noticed an anomaly where the window output arrives
after only 16 to 18 minutes. This has left me uncertain about whether I
overlooked a configuration or how to address this issue.

Additionally, my data source is a file continuously read from a directory.
I encountered a situation where, upon starting my Flink job with a file
already present in the directory, the window output is generated before the
specified window time has elapsed. I'm seeking guidance on how to resolve
this discrepancy.
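
For reference, a minimal sketch of the setup described (the source and key
selector are hypothetical stand-ins). One detail worth checking:
TumblingProcessingTimeWindows aligns windows to the epoch (e.g. 12:00-12:20,
12:20-12:40), not to job start, so a job started mid-window fires its first
window after less than the full 20 minutes, which would explain output after
only 16 to 18 minutes:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Hypothetical stand-in for the directory-monitoring file source.
DataStream<Tuple2<String, Long>> events =
        env.fromElements(Tuple2.of("a", 1L), Tuple2.of("b", 1L));

events.keyBy(t -> t.f0)
        // Windows are epoch-aligned: a job started at 12:04 sees its first
        // 20-minute window close at 12:20, i.e. after only 16 minutes.
        .window(TumblingProcessingTimeWindows.of(Time.minutes(20)))
        .sum(1)
        .print();

env.execute("tumbling-window-sketch");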

Thanks,
Arjun


Advice on checkpoint interval best practices

2023-12-05 Thread Oscar Perez via user
Hei,

We are tuning some of the Flink jobs we have in production and we would
like to know the best numbers/considerations for the checkpoint interval.
We have set a default of 30 seconds for the checkpoint interval, and the
checkpoint operation takes around 2 seconds.
We have also enabled incremental checkpointing. I understand there is a
tradeoff between recovery-from-failure time and the performance degradation
of an aggressive checkpoint policy, but I would like to know what you think
is a good compromise.

I read this article as reference:
https://shopify.engineering/optimizing-apache-flink-applications-tips

But what I would like is some formula or recipe in order to find out the
best value for checkpoint interval.

Regards,
Oscar


Re: Could not start the JobMaster. org.apache.flink.api.common.ExecutionConfig not found

2023-12-05 Thread Matwey V. Kornilov




Sorry for the noise. Reverting to version 1.17.2 fixed the issue.


On 04.12.2023 15:08, Matwey V. Kornilov wrote:

Hello,

I am trying to run a very simple job locally via the maven exec plugin:


public class DataStreamJob {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> dataStream = env.fromElements(1, 2, 3);

        dataStream.print();

        env.execute("Flink Java API Skeleton");
    }
}


To be able to run the jar package via the maven exec plugin, I created a
maven profile with flink-java, flink-core, flink-clients, flink-runtime,
and flink-streaming-java as dependencies. I see that the temporary cluster
deployment starts until the following error happens:



org.apache.flink.runtime.client.JobInitializationException: Could not 
start the JobMaster.
 at 
org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
 at 
java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
 at 
java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
 at 
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
 at 
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773)
 at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
 at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)

 at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.util.concurrent.CompletionException: 
java.lang.RuntimeException: java.lang.ClassNotFoundException: 
org.apache.flink.api.common.ExecutionConfig
 at 
java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
 at 
java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
 at 
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1770)

 ... 3 more
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: 
org.apache.flink.api.common.ExecutionConfig
 at 
org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
 at 
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114)
 at 
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)

 ... 3 more
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.api.common.ExecutionConfig
 at 
java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
 at 
java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)

 at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:526)
 at java.base/java.lang.Class.forName0(Native Method)
 at java.base/java.lang.Class.forName(Class.java:534)
 at java.base/java.lang.Class.forName(Class.java:513)
 at 
org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
 at 
java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2061)
 at 
java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1927)
 at 
java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2252)
 at 
java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1762)
 at 
java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:540)
 at 
java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:498)
 at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:539)
 at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:527)
 at 
org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:67)
 at 
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:101)
 at 
org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:122)
 at 
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:379)
 at 
org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:356)
 at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:128)
 at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:100)
 at 
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)

 ... 4 more


How could I figure

Re: Is the kafka-connector doc missing a dependency on flink-connector-base

2023-12-05 Thread Péter Váry
Hi JM,

The dependency is set here:
https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/pom.xml#L50-L55


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>


We expect that the non-provided dependencies will be embedded in the jar,
as they are required for the connector.

The 3.0 Kafka connector can be deployed to Flink 1.17.0/1.17.1/1.18.0,
but flink-connector-base may differ between those versions.
Other connectors might have a different support matrix, so embedding any
particular flink-connector-base dependency would be bad practice.

I hope this helps,
Peter

Jean-Marc Paulin wrote on Mon, 4 Dec 2023 at 22:09:

> Hi,
>
> Trying to update the kafka connector to my project and I am missing a
> class. Is the doc missing a dependency on flink-connector-base ?
>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-connector-base</artifactId>
>   <scope>compile</scope>
> </dependency>
>
> I added it and it works. I think that's required, but I would have expected
> this in the dependencies on the Kafka | Apache Flink connector documentation
> page.
>
> Thanks
>
> JM
> Unless otherwise stated above:
>
> IBM United Kingdom Limited
> Registered in England and Wales with number 741598
> Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
>


S3 bucket as a source

2023-12-05 Thread Matwey V. Kornilov

Hello,

I have an S3 bucket and I would like to process the objects' metainfo
(such as keys (filenames), metainfo, tags, etc.).
I don't care about the objects' content, since it is irrelevant for my
task. What I want is to construct a data stream where each element is the
metainfo attached to some object from the bucket.

Is it possible to tune and reuse the FileSystem connector for my purposes?
The connector is designed to read the content of files, while I would like
to read the content of a directory, or the metainfo for every file.




Does Flink on K8s support log4j2 kafka appender? [Flink log] [Flink Kubernetes Operator]

2023-12-05 Thread 秋成 王
Hi,

I am recently working on syncing my Flink log to Kafka via the log4j2 Kafka
appender. I have a log4j2.properties file which works fine locally, e.g. when I
run my Flink fat jar from the terminal via the following command:
  PS D:\repo> java -cp .\reconciliation-1.0-SNAPSHOT.jar 
The log can be synced to Kafka successfully when run locally.

The contents of the log4j2.properties file are pasted below:
rootLogger.level = INFO
rootLogger.appenderRef.kafka.ref = KafkaLog
appender.kafka.type = Kafka
appender.kafka.name = KafkaLog

appender.kafka.topic = topicName
appender.kafka.properties[0].type=Property
appender.kafka.properties[0].name=bootstrap.servers
appender.kafka.properties[0].value=
appender.kafka.properties[1].type=Property
appender.kafka.properties[1].name=sasl.mechanism
appender.kafka.properties[1].value=PLAIN
appender.kafka.properties[2].type=Property
appender.kafka.properties[2].name=sasl.jaas.config
appender.kafka.properties[2].value=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="${env:log_event_hub_connection_string}";
appender.kafka.properties[3].type=Property
appender.kafka.properties[3].name=security.protocol
appender.kafka.properties[3].value=SASL_SSL

appender.kafka.layout.type = JsonTemplateLayout
appender.kafka.layout.eventTemplateUri = classpath:kusto-applogv2-layout.json
appender.kafka.layout.eventTemplateAdditionalField[0].type = EventTemplateAdditionalField
appender.kafka.layout.eventTemplateAdditionalField[0].key = Application
appender.kafka.layout.eventTemplateAdditionalField[0].value = reconciliation
appender.kafka.layout.eventTemplateAdditionalField[0].format = String
appender.kafka.layout.eventTemplateAdditionalField[1].type = EventTemplateAdditionalField
appender.kafka.layout.eventTemplateAdditionalField[1].key = Language
appender.kafka.layout.eventTemplateAdditionalField[1].value = Java
appender.kafka.layout.eventTemplateAdditionalField[1].format = String


I am now deploying Flink via the Flink Kubernetes operator. However, after I
copied the contents of the log4j2.properties file into log4j-console.properties
under the logConfiguration section of the FlinkDeployment YAML, the Kafka
appender failed to initialize with the following error messages:

  2023-12-05 10:12:36,903 main ERROR Unable to locate plugin type for JsonTemplateLayout
  2023-12-05 10:12:36,991 main ERROR Unable to locate plugin for EventTemplateAdditionalField
  2023-12-05 10:12:36,991 main ERROR Unable to locate plugin for EventTemplateAdditionalField
  2023-12-05 10:12:37,047 main ERROR Unable to locate plugin for JsonTemplateLayout


My question is: does the Flink Kubernetes operator support Kafka appender
configuration in log4j-console.properties? If so, can anyone provide me with
an example?


PS: a similar error message once showed up when running locally. I fixed the
issue with the solution posted here, by adding
com.github.edwgiz.mavenShadePlugin.log4j2CacheTransformer.PluginsCacheFileTransformer
to the pom file:

java - Console contains an invalid element or attribute "JsonTemplateLayout"
even after adding dependency - Stack Overflow


Thanks,

Chosen


Re: S3 bucket as a source

2023-12-05 Thread Feng Jin
Hi Matwey,

I think you can implement a custom input format to meet your needs, and use
the FileSource::forBulkFileFormat interface to create a FileSource.

In the custom input format, you can choose to read only the metadata of the
file without reading its content.


https://github.com/apache/flink/blob/1dac395967e5870833d67c6bf1103ba874fce601/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSource.java#L171

public static <T> FileSourceBuilder<T> forBulkFileFormat(
        final BulkFormat<T, FileSourceSplit> bulkFormat, final Path... paths) {
    checkNotNull(bulkFormat, "reader");
    checkNotNull(paths, "paths");
    checkArgument(paths.length > 0, "paths must not be empty");

    return new FileSourceBuilder<>(paths, bulkFormat);
}
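
For illustration, a usage sketch of the approach above. MetadataOnlyFormat
and FileMetadata are hypothetical names for a custom BulkFormat
implementation and its record type; the format would emit one record per
file, built from the split's path() and length(), without ever opening the
file's content:

// MetadataOnlyFormat (hypothetical) implements
// BulkFormat<FileMetadata, FileSourceSplit> and never reads the file bytes.
FileSource<FileMetadata> source = FileSource
        .forBulkFileFormat(new MetadataOnlyFormat(), new Path("s3://my-bucket/"))
        .monitorContinuously(Duration.ofMinutes(1)) // optional: keep watching the bucket
        .build();

DataStream<FileMetadata> metadata =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "s3-object-metadata");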


Best,
Feng


On Tue, Dec 5, 2023 at 8:43 PM Matwey V. Kornilov wrote:

> Hello,
>
> I have an S3 bucket and I would like to process the objects' metainfo
> (such as keys (filenames), metainfo, tags, etc.).
> I don't care about the objects' content, since it is irrelevant for my
> task. What I want is to construct a data stream where each element is the
> metainfo attached to some object from the bucket.
>
> Is it possible to tune and reuse the FileSystem connector for my purposes?
> The connector is designed to read the content of files, while I would like
> to read the content of a directory, or the metainfo for every file.
>
>


Flink 1.18.0 jobs in the console keeps changing their orders

2023-12-05 Thread Ethan T Yang
Hello,

I recently upgraded Flink (from 1.13.1 -> 1.18.0). I noticed that the jobs in
the Flink console keep changing their order on every refresh. I am wondering
if there is a setting to keep them in chronological order like in 1.13.1.
Thanks.
Ivan

Flink Kubernetes HA

2023-12-05 Thread Ethan T Yang
Hi Flink users,
After upgrading Flink (from 1.13.1 -> 1.18.0), I noticed an issue when HA is
enabled (see the exception below). I am using a k8s deployment, and I cleaned
the previous ConfigMaps, leader files, etc. I know Pekko is a recent addition;
can someone share a doc on how to use or set it? When I disable HA, the
deployment is successful. I also noticed a new ConfigMap called
"-cluster-config-map"; can someone provide a reference on what it is for? I
don't see it in the 1.13.1 version.

Thanks a lot
Ivan


org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException: Could 
not send message 
[LocalRpcInvocation(RestfulGateway.requestMultipleJobDetails(Time))] from 
sender [unknown] to recipient [pe
kko.tcp://flink@flink-secondary-jobmanager:6123/user/rpc/dispatcher_1], because 
the recipient is unreachable. This can either mean that the recipient has been 
terminated or that the remote RpcService i
s currently not reachable.
at com.sun.proxy.$Proxy55.requestMultipleJobDetails(Unknown Source) 
~[?:?]
at 
org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler.handleRequest(JobsOverviewHandler.java:65)
 ~[flink-dist-1.18.0.jar:1.18.0]
at 
org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:83)
 ~[flink-dist-1.18.0.jar:1.18.0]
at 
org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:196)
 ~[flink-dist-1.18.0.jar:1.18.0]
at 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)
 ~[flink-dist-1.18.0.jar:1.18.0]
at java.util.Optional.ifPresent(Unknown Source) [?:?]
at 
org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45) 
[flink-dist-1.18.0.jar:1.18.0]
at 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)
 [flink-dist-1.18.0.jar:1.18.0]
at 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)
 [flink-dist-1.18.0.jar:1.18.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
 [flink-dist-1.18.0.jar:1.18.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
 [flink-dist-1.18.0.jar:1.18.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 [flink-dist-1.18.0.jar:1.18.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 [flink-dist-1.18.0.jar:1.18.0]
at 
org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115)
 [flink-dist-1.18.0.jar:1.18.0]
at 
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94)
 [flink-dist-1.18.0.jar:1.18.0]
at 
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55)
 [flink-dist-1.18.0.jar:1.18.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
 [flink-dist-1.18.0.jar:1.18.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
 [flink-dist-1.18.0.jar:1.18.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 [flink-dist-1.18.0.jar:1.18.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 [flink-dist-1.18.0.jar:1.18.0]
at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
 [flink-dist-1.18.0.jar:1.18.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
 [flink-dist-1.18.0.jar:1.18.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 [flink-dist-1.18.0.jar:1.18.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 [flink-dist-1.18.0.jar:1.18.0]
at 
org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:208)
 [flink-dist-1.18.0.jar:1.18.0]
at 
org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:69)
 [flink-dist-1.18.0.jar:1.18.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChann

Re: [DISCUSS] Change the default restart-strategy to exponential-delay

2023-12-05 Thread Mason Chen
Hi Rui,

Sorry for the late reply. I was suggesting that perhaps we could do some
testing with Kubernetes wrt configuring values for the exponential restart
strategy. We've noticed that the default strategy in 1.17 caused a lot of
requests to the K8s API server for unstable deployments.

However, people in different Kubernetes setups will have different limits
so it would be challenging to provide a general benchmark. Another thing I
found helpful in the past is to refer to Kubernetes--for example, the
default strategy is exponential for pod restarts and we could draw
inspiration from what they have set as a general purpose default config.

Best,
Mason

On Sun, Nov 19, 2023 at 9:43 PM Rui Fan <1996fan...@gmail.com> wrote:

> Hi David and Mason,
>
> Thanks for your feedback!
>
> To David:
>
> > Given that the new default feels more complex than the current behavior,
> if we decide to do this I think it will be important to include the
> rationale you've shared in the documentation.
>
> Sounds make sense to me, I will add the related doc if we
> update the default strategy.
>
> To Mason:
>
> > I suppose we could do some benchmarking on what works well for the
> resource providers that Flink relies on e.g. Kubernetes. Based on
> conferences and blogs,
> > it seems most people are relying on Kubernetes to deploy Flink and the
> restart strategy has a large dependency on how well Kubernetes can scale to
> requests to redeploy the job.
>
> Sorry, I didn't understand what type of benchmarking
> we should do, could you elaborate on it? Thanks a lot.
>
> Best,
> Rui
>
> On Sat, Nov 18, 2023 at 3:32 AM Mason Chen  wrote:
>
>> Hi Rui,
>>
>> I suppose we could do some benchmarking on what works well for the
>> resource providers that Flink relies on e.g. Kubernetes. Based on
>> conferences and blogs, it seems most people are relying on Kubernetes to
>> deploy Flink and the restart strategy has a large dependency on how well
>> Kubernetes can scale to requests to redeploy the job.
>>
>> Best,
>> Mason
>>
>> On Fri, Nov 17, 2023 at 10:07 AM David Anderson 
>> wrote:
>>
>>> Rui,
>>>
>>> I don't have any direct experience with this topic, but given the
>>> motivation you shared, the proposal makes sense to me. Given that the new
>>> default feels more complex than the current behavior, if we decide to do
>>> this I think it will be important to include the rationale you've shared in
>>> the documentation.
>>>
>>> David
>>>
>>> On Wed, Nov 15, 2023 at 10:17 PM Rui Fan <1996fan...@gmail.com> wrote:
>>>
 Hi dear flink users and devs:

 FLIP-364[1] intends to make some improvements to restart-strategy
 and discuss updating some of the default values of exponential-delay,
 and whether exponential-delay can be used as the default
 restart-strategy.
 After discussing at dev mail list[2], we hope to collect more feedback
 from Flink users.

 # Why does the default restart-strategy need to be updated?

 If checkpointing is enabled, the default value is fixed-delay with
 Integer.MAX_VALUE restart attempts and '1 s' delay[3]. It means
 the job will restart infinitely with high frequency when a job
 continues to fail.

 When the Kafka cluster fails, a large number of flink jobs will be
 restarted frequently. After the kafka cluster is recovered, a large
 number of high-frequency restarts of flink jobs may cause the
 kafka cluster to avalanche again.

 Considering the exponential-delay as the default strategy with
 a couple of reasons:

 - The exponential-delay can reduce the restart frequency when
   a job continues to fail.
 - It can restart a job quickly when a job fails occasionally.
 - The restart-strategy.exponential-delay.jitter-factor can avoid
   restarting multiple jobs at the same time. It’s useful to prevent
   avalanches.

 # What are the current default values[4] of exponential-delay?

 restart-strategy.exponential-delay.initial-backoff : 1s
 restart-strategy.exponential-delay.backoff-multiplier : 2.0
 restart-strategy.exponential-delay.jitter-factor : 0.1
 restart-strategy.exponential-delay.max-backoff : 5 min
 restart-strategy.exponential-delay.reset-backoff-threshold : 1h

 backoff-multiplier=2 means that the delay time of each restart
 will be doubled. The delay times are:
 1s, 2s, 4s, 8s, 16s, 32s, 64s, 128s, 256s, 300s, 300s, etc.

 The delay time increases rapidly, which will affect the recovery
 time for Flink jobs.

 # Option improvements

 We think the backoff-multiplier between 1 and 2 is more sensible,
 such as:

 restart-strategy.exponential-delay.backoff-multiplier : 1.2
 restart-strategy.exponential-delay.max-backoff : 1 min

 After updating, the delay times are:

 1s, 1.2s, 1.44s, 1.728s, 2.073s, 2.488s, 2.985s, 3.583s, 4.299s,
 5.159s, 6.191s, 7.430s, 8.916s, 10.699s, 12.839s,
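
For readers who want to try these values today, the same settings can be
applied per job through the existing RestartStrategies API. A minimal sketch
using the values proposed above:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
        Time.seconds(1),  // initial-backoff
        Time.minutes(1),  // max-backoff (proposed; the current default is 5 min)
        1.2,              // backoff-multiplier (proposed; the current default is 2.0)
        Time.hours(1),    // reset-backoff-threshold
        0.1));            // jitter-factor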

Re: Flink 1.18.0 jobs in the console keeps changing their orders

2023-12-05 Thread Yuxin Tan
Hi, Ethan,

Do you mean the jobs in the Flink web UI? If I remember correctly, the order
does change with each refresh; however, there is an option to sort them using
the provided sort button. If you believe that a consistent order is a
significant necessity, maybe you can start a discussion on the mailing list
to improve it.

Best,
Yuxin


Ethan T Yang wrote on Wed, 6 Dec 2023 at 06:47:

> Hello,
>
> I recently upgraded Flink (from 1.13.1 -> 1.18.0). I noticed that the jobs
> in the Flink console keep changing their order on every refresh. I am
> wondering if there is a setting to keep them in chronological order like
> in 1.13.1.
>
> Thanks.
> Ivan


Re: Flink 1.18.0 jobs in the console keeps changing their orders

2023-12-05 Thread Ethan T Yang
Ah, I got it. The behavior is slightly different from the older version, but
the new one has a sort button on the job names. So I click it and the order
stays the same within the session, as opposed to moving on every refresh when
not sorted. I guess it is just a learning curve. Thanks, Yuxin, for the tip.

> On Dec 5, 2023, at 7:06 PM, Yuxin Tan  wrote:
> 
> Hi, Ethan,
> 
> Do you mean the jobs in the Flink web UI? If I remember correctly, the order
> does change with each refresh; however, there is an option to sort them using
> the provided sort button. If you believe that a consistent order is a
> significant necessity, maybe you can start a discussion on the mailing list
> to improve it.
> 
> Best,
> Yuxin
> 
> 
> Ethan T Yang <ivanygy...@gmail.com> wrote on Wed, 6 Dec 2023 at 06:47:
>> Hello,
>> 
>> I recently upgraded Flink (from 1.13.1 -> 1.18.0). I noticed that the jobs
>> in the Flink console keep changing their order on every refresh. I am
>> wondering if there is a setting to keep them in chronological order like
>> in 1.13.1.
>> 
>> Thanks.
>> Ivan



Re: Advice on checkpoint interval best practices

2023-12-05 Thread Hangxiang Yu
Hi, Oscar.
Just sharing my thoughts:
Benefits of a more aggressive checkpoint interval:
1. less recovery time, as you mentioned (which also depends on how much data
Flink has to roll back and reprocess)
2. less end-to-end latency for checkpoint-bound sinks in exactly-once mode
Costs of a more aggressive checkpoint interval:
1. more resources, e.g. CPU, network
2. performance degradation, as you mentioned (it will become more obvious if
there is a resource bottleneck)

So if your job doesn't have high requirements for the above benefits, you
could choose a bigger checkpoint interval, e.g. 3 minutes.
If it does, you could keep it within 1 minute and decrease it until it
matches your requirements.
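
To make this concrete, a minimal sketch of where these knobs live. The
interval values are illustrative, not recommendations, and the incremental
flag assumes the flink-statebackend-rocksdb dependency is on the classpath:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Illustrative: a 3-minute interval for jobs without tight recovery or
// end-to-end latency requirements.
env.enableCheckpointing(3 * 60 * 1000L);

CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Guarantee breathing room between checkpoints even if one runs long.
checkpointConfig.setMinPauseBetweenCheckpoints(30 * 1000L);

// Incremental checkpoints with the RocksDB state backend, as mentioned above.
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));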

On Tue, Dec 5, 2023 at 6:56 PM Oscar Perez via user wrote:

> Hei,
>
> We are tuning some of the Flink jobs we have in production and we would
> like to know the best numbers/considerations for the checkpoint interval.
> We have set a default of 30 seconds for the checkpoint interval, and the
> checkpoint operation takes around 2 seconds.
> We have also enabled incremental checkpointing. I understand there is a
> tradeoff between recovery-from-failure time and the performance degradation
> of an aggressive checkpoint policy, but I would like to know what you think
> is a good compromise.
>
> I read this article as reference:
> https://shopify.engineering/optimizing-apache-flink-applications-tips
>
> But what I would like is some formula or recipe in order to find out the
> best value for checkpoint interval.
>
> Regards,
> Oscar
>


-- 
Best,
Hangxiang.