Re: Session mode on Kubernetes and # of TMs

2021-05-10 Thread Yangze Guo
Hi, Youngwoo. In a K8S session cluster, the number of TMs depends on how many slots your job needs and the number of slots per TaskManager (config key: taskmanager.numberOfTaskSlots). In this case, # of TMs = ceil(total slots needed / taskmanager.numberOfTaskSlots). How many your job's topology and parallel
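The sizing rule above can be checked with a small plain-Java sketch. It assumes the job's total slot demand is already known; with the default single slot-sharing group this is typically the highest operator parallelism in the job, which is an assumption here rather than something stated in the reply. The class and method names are hypothetical.

```java
// Plain-Java sketch of the TaskManager sizing rule; not Flink code.
final class TaskManagerCount {
    private TaskManagerCount() {}

    /**
     * TaskManagers needed = ceil(totalSlotsNeeded / taskmanager.numberOfTaskSlots).
     * totalSlotsNeeded is assumed to be known (e.g. the max operator parallelism
     * when all operators share one slot-sharing group).
     */
    static int taskManagersNeeded(int totalSlotsNeeded, int slotsPerTaskManager) {
        if (totalSlotsNeeded < 0 || slotsPerTaskManager <= 0) {
            throw new IllegalArgumentException("invalid slot counts");
        }
        // Integer ceiling division, avoiding floating point.
        return (totalSlotsNeeded + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }
}
```

For example, a job needing 10 slots with taskmanager.numberOfTaskSlots set to 4 gives taskManagersNeeded(10, 4) = 3.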

Re: Task Local Recovery with mountable disks in the cloud

2021-05-10 Thread Yang Wang
Just a side input: not only a persistent volume but also ephemeral storage could help keep the local state for the TaskManager pod. Ephemeral storage is bound to the lifecycle of the TaskManager pod, and it can be shared between different restarts of the TaskManager container. Best, Yan

Session mode on Kubernetes and # of TMs

2021-05-10 Thread 김영우
Hi, I have deployed a cluster in session mode on Kubernetes and I can see one deployment, services and one JM. I'm trying to run a SQL query through the SQL client, for instance, 'INSERT INTO ... SELECT ...;'. When I run the query in the CLI, the Flink session is spinning up a TM for the query and then

Re: Query on passing memory parameters

2021-05-10 Thread Chesnay Schepler
Yes, the space is expected. The logging settings do not have a space because they are set as environment variables, whereas the memory options are essentially just set as program arguments. On 5/10/2021 2:15 PM, V N, Suchithra (Nokia - IN/Bangalore) wrote: Hello, We are using Apache Flink 1.

sideOutputLateData not propagating late reports once window expires

2021-05-10 Thread Slotterback, Chris
Hey Flink Users, I am having some issues with getting sideOutputLateData to properly function with late event time reports. I have the following code that, per my understanding, should be allowing reports that fall after the window has triggered and beyond allowed lateness to pass through to th
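For reference, the condition that decides whether a record is routed to the late-data side output can be modeled in plain Java. This is a simplified sketch, assuming a tumbling event-time window with zero offset and non-negative timestamps; it follows my understanding of the cleanup-time check in Flink's WindowOperator, and the class name is hypothetical. Note that the check compares against the current watermark, so a record is only treated as late once the watermark has actually advanced past the window's last timestamp plus the allowed lateness.

```java
// Plain-Java model of the lateness check for an event-time tumbling window.
// Assumes zero window offset and non-negative timestamps; not Flink code.
final class LatenessCheck {
    private LatenessCheck() {}

    /** Start of the tumbling window [start, start + size) containing the timestamp. */
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }

    /**
     * A record is late (goes to the side output instead of a window) once the
     * watermark has reached the window's last timestamp (end - 1) plus the
     * allowed lateness.
     */
    static boolean isLate(long timestamp, long windowSize, long allowedLateness, long currentWatermark) {
        long maxTimestamp = windowStart(timestamp, windowSize) + windowSize - 1;
        long cleanupTime = maxTimestamp + allowedLateness;
        return cleanupTime <= currentWatermark;
    }
}
```

With 10 ms windows and no allowed lateness, a record stamped 5 is still on time while the watermark is 8 and becomes late once the watermark reaches 9.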

Re: Task Local Recovery with mountable disks in the cloud

2021-05-10 Thread Sonam Mandal
Hi Till, Sure, that sounds good. I'll open a FLIP for this when we start working on it. Thanks for the insights! Regards, Sonam From: Till Rohrmann Sent: Monday, May 10, 2021 2:26 AM To: Sonam Mandal Cc: dev ; user@flink.apache.org Subject: Re: Task Local Reco

Re: Splitting stream

2021-05-10 Thread Arvid Heise
Hi Nikola, side outputs definitely are at least as efficient as using two filters but they are also harder to implement and maintain. Do you actually have a use case where every bit of performance counts? If so, please also check enableObjectReuse [1] and look into serialization [2]. Also if y

Re: Splitting stream

2021-05-10 Thread Taher Koitawala
I think what you're looking for is a side output. Change the logic to a process function: what is true goes to the collector, and what is false can go to a side output. This gives you two streams. On Mon, May 10, 2021, 8:14 PM Nikola Hrusov wrote: > Hi Arvid, > > In my case it's the latter, thus I have also th
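To make the side-output suggestion concrete, here is a plain-Java model of the routing logic, not Flink code: a single pass in which every record goes to exactly one of two outputs. The names SideOutputSplit and split are hypothetical; in an actual job the condition would live in a ProcessFunction's processElement, with the false branch emitted via ctx.output(tag, value) to an OutputTag.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of process function + side output: one pass, each record
// goes to exactly one of two outputs. "main" stands for the Collector and
// "side" for the OutputTag; this sketch does not use the Flink API.
final class SideOutputSplit {
    private SideOutputSplit() {}

    static final class Split<T> {
        final List<T> main = new ArrayList<>();
        final List<T> side = new ArrayList<>();
    }

    static <T> Split<T> split(List<T> input, Predicate<? super T> condition) {
        Split<T> result = new Split<>();
        for (T record : input) {
            if (condition.test(record)) {
                result.main.add(record); // "true" branch: collector
            } else {
                result.side.add(record); // "false" branch: side output
            }
        }
        return result;
    }
}
```

Compared with two filters, each record is evaluated once, which is where the "at least as efficient" remark comes from.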

Re: Splitting stream

2021-05-10 Thread Nikola Hrusov
Hi Arvid, In my case it's the latter, thus I have also thought about using the filter (map is not useful in my case). What I am not sure about is which one is better to use. In what case would you split a stream with a side output and in what case with a filter? Would there be any performance gain/pain based

Could not resolve ResourceManager address in native kubernetes

2021-05-10 Thread Valentin Wallyn
Hi, I'm trying to use Flink on native Kubernetes (https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/) but I get an error even with the example from the documentation. The job gets submitted but stays in "created" status until it tim

can someone explain how Flink works with Java generics

2021-05-10 Thread Zhenhao Li
Hi there, I mostly work with Scala and don't know too much about the internal machinery of Java. There is something very confusing to me. Since Java generics don't support variance annotations like Scala's, why do you have things like ``` public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> ```
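Java has no declaration-site variance, but it gets a similar effect at the use site with wildcards. As far as I know, Flink's KeyedStream.window takes a WindowAssigner<? super T, W>, so an assigner declared for Object can be applied to a stream of any element type. The sketch below shows the same mechanism with a hypothetical Assigner interface; it is an illustration, not Flink's API.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration of use-site (wildcard) variance. Assigner is a
// hypothetical stand-in for an assigner of elements of type T, not Flink's class.
final class VarianceDemo {
    private VarianceDemo() {}

    interface Assigner<T> {
        String assign(T element);
    }

    /** Accepts an assigner for T or any supertype of T: contravariance at the use site. */
    static <T> List<String> applyAll(List<T> elements, Assigner<? super T> assigner) {
        List<String> out = new ArrayList<>();
        for (T element : elements) {
            out.add(assigner.assign(element));
        }
        return out;
    }

    /** Declared for Object, so it works for a List<String>, List<Integer>, ... */
    static final Assigner<Object> ANY = element -> "window-for-" + element;
}
```

Calling applyAll(Arrays.asList("a", "b"), VarianceDemo.ANY) compiles because Assigner<Object> matches Assigner<? super String>.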

Re: Splitting stream

2021-05-10 Thread Arvid Heise
Hi Nikola, if you just want to apply a different user function to the records depending on the property "exist" the simplest way is to use source -> map(if exist do this else that) -> sink If it turns out that you want to apply a different subgraph, you can do source -> filter(if exist) -> do t
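The two shapes Arvid describes can be compared with a plain-Java model built on java.util.stream rather than Flink's DataStream API. Event, doThis and doThat are hypothetical placeholders for the record type and the per-branch logic, and Stream.concat stands in for union.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Plain-Java model (java.util.stream, not Flink) of the two pipeline shapes:
//   source -> map(if exist do this else that) -> sink
//   source -> filter(exist) -> doThis / filter(!exist) -> doThat, then merged
final class SplitShapes {
    private SplitShapes() {}

    static final class Event {
        final String id;
        final boolean exist;

        Event(String id, boolean exist) {
            this.id = id;
            this.exist = exist;
        }
    }

    // Hypothetical per-branch logic.
    static String doThis(Event e) { return "this:" + e.id; }
    static String doThat(Event e) { return "that:" + e.id; }

    /** One map with the branch inside: a single operator, single pass. */
    static List<String> withMap(List<Event> source) {
        return source.stream()
                .map(e -> e.exist ? doThis(e) : doThat(e))
                .collect(Collectors.toList());
    }

    /** Two filters feeding different logic, then merged back into one stream. */
    static List<String> withFilters(List<Event> source) {
        Stream<String> existing = source.stream().filter(e -> e.exist).map(SplitShapes::doThis);
        Stream<String> missing = source.stream().filter(e -> !e.exist).map(SplitShapes::doThat);
        return Stream.concat(existing, missing).collect(Collectors.toList());
    }
}
```

Both variants produce the same records; only the interleaving differs, much as a union in Flink gives no ordering guarantee between its inputs. The filter variant becomes worthwhile when each branch needs its own subgraph rather than just a different function.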

Re: Questions about implementing a flink source

2021-05-10 Thread Arvid Heise
Hi Evan, A few replies / questions inline. Somewhat relatedly, I'm also wondering > where this connector should live. I saw that there's already a pubsub > connector in > https://github.com/apache/flink/tree/bf40fa5b8cd5b3a8876f37ab4aa7bf06e8b9f666/flink-connectors/flink-connector-gcp-pubsub/src/m

Splitting stream

2021-05-10 Thread Nikola Hrusov
Hi, I am trying to find some information on what is the best way to split a stream of the same data. For the given scenario: I have an object which has a property "exist" I want to split the stream based on this property, do something, and afterwards join it again into a single stream. Initial

Re: Flink: Clarification required

2021-05-10 Thread Dawid Wysakowicz
Hi Jessy, "I have a stateless Flink application where the source and sink are two different Kafka topics. Is there any benefit in adding checkpointing for this application? Will it help in some way for the rewind and replays while restarting from the failure?" If you do want to mak

Query on passing memory parameters

2021-05-10 Thread V N, Suchithra (Nokia - IN/Bangalore)
Hello, We are using Apache Flink 1.12.1 and use standalone-job.sh to start Flink in per-job cluster mode. In the logs we could see some memory configuration parameters being added as dynamic parameters. When we checked the Java process's system properties, we could see the output below:

How to process recent events from Kafka and older ones from another storage?

2021-05-10 Thread Svend
Hi everyone, What is the typical architectural approach with Flink SQL for processing recent events from Kafka and older events from some separate cheaper storage? I currently have the following situation in mind: * events are appearing in Kafka and retained there for, say, 1 month * events are

Re: Possible way to avoid unnecessary serialization calls.

2021-05-10 Thread Dawid Wysakowicz
Hi Alex, If you are sure that the operations in between do not change the partitioning of the data and keep the key constant for the whole pipeline you could use the reinterpretAsKeyedStream[1]. I guess this answers your questions 1 & 2. As for the third question, first of all you should look int

Re: What does enableObjectReuse exactly do?

2021-05-10 Thread Dawid Wysakowicz
Hi, In the streaming API, the biggest difference is that if you do not enable object reuse, records will be duplicated/copied when forwarding from an operator to the downstream one. If you are sure you work with immutable objects, I'd highly recommend enabling object reuse. Best, Dawid On 08/0
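The risk behind that advice can be shown with a plain-Java model (not Flink internals): a source that recycles one mutable object and a downstream step that keeps references to what it receives. When every forwarded record is copied, as with object reuse disabled, the buffer sees 1, 2, 3; when the same instance is passed along, as with reuse enabled, later mutations overwrite earlier records and it sees 3, 3, 3. The class and field names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of why object reuse requires immutable (or never-mutated)
// records. copyOnForward=true mimics reuse disabled (copy per hand-off);
// false mimics reuse enabled (same reference handed downstream).
final class ObjectReuseDemo {
    private ObjectReuseDemo() {}

    static final class Holder {
        int value;

        Holder copy() {
            Holder h = new Holder();
            h.value = value;
            return h;
        }
    }

    static List<Integer> run(boolean copyOnForward) {
        Holder reused = new Holder();               // upstream mutates and re-emits this instance
        List<Holder> buffered = new ArrayList<>();  // downstream keeps references it received
        for (int i = 1; i <= 3; i++) {
            reused.value = i;
            buffered.add(copyOnForward ? reused.copy() : reused);
        }
        List<Integer> seen = new ArrayList<>();
        for (Holder h : buffered) {
            seen.add(h.value);
        }
        return seen;
    }
}
```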

Re: Read kafka offsets from checkpoint - state processor

2021-05-10 Thread Dawid Wysakowicz
Hi, You would need to look into the internals of FlinkKafkaConsumerBase. In the current master the state for offsets is initialized in here: https://github.com/apache/flink/blob/fbf84acf63102db455c89cb8e497cda423a1c4d5/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming

Re: [Avro] TypeSerializer Example

2021-05-10 Thread Sandeep khanzode
Hello, Can someone please assist with this query? Thanks! Thanks, Sandeep > On 06-May-2021, at 10:30 AM, Sandeep khanzode wrote: > > Hi, > > Is there a working example somewhere that I can refer to for writing Avro > entities in Flink state as well as Avro serialization in > KafkaConsumer/Prod

Re: Unsubscribe

2021-05-10 Thread Dawid Wysakowicz
Hi all, Before reaching out to the INFRA team, may I ask all of you to make sure that you follow the two-step process? After sending the initial mail to the user-unsubscr...@flink.apache.org you should receive a request for a confirmation. If you did conf

Re: callback by using process function

2021-05-10 Thread Dawid Wysakowicz
Hi, I am sorry, but I think I don't fully get your question. Could you try to rephrase it? Maybe an example could help. Generally speaking the KeyedProcessFunction is scoped to a single key. Whenever you access a state (MapState, ValueState, ... ) it keeps the current value of that state for the
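To illustrate the scoping Dawid describes, here is a plain-Java model in which every state access is implicitly qualified by the key of the current record, similar to a ValueState inside a KeyedProcessFunction. KeyedCounter and its methods are hypothetical; in Flink the runtime sets the current key and manages the backing state itself.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of keyed state scoping: value()/update() only ever touch the
// entry for the key of the record currently being processed. Not Flink code.
final class KeyedCounter {
    private final Map<String, Long> countPerKey = new HashMap<>(); // backing "state"
    private String currentKey;

    /** Flink sets the current key before invoking processElement; modeled explicitly here. */
    long processElement(String key) {
        currentKey = key;
        long updated = value() + 1;
        update(updated);
        return updated;
    }

    // Analogous to ValueState#value() and ValueState#update(...) for the current key.
    private long value() {
        return countPerKey.getOrDefault(currentKey, 0L);
    }

    private void update(long v) {
        countPerKey.put(currentKey, v);
    }
}
```

Processing "a", "b", "a" yields counts 1, 1, 2: the second "a" sees only its own key's state.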

Re: some questions about data skew

2021-05-10 Thread Dawid Wysakowicz
Hi, What you could do to improve processing of skewed data is to introduce an artificial pre-aggregation. You could add an artificial, uniformly distributed secondary key, calculate your aggregates on (original key, secondary uniform key), and then do the final aggregation in an additional ste
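The two-step idea can be sketched in plain Java with counting as the aggregate. The round-robin salt and the '#'-joined composite key are assumptions for the illustration; in Flink the first phase would be a keyed aggregation on (key, salt), which spreads a hot key across parallel instances, and the second a keyed aggregation on the original key.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of two-phase (salted) aggregation for skewed keys.
// The salt is assigned round-robin over `buckets`; any roughly uniform scheme works.
final class SaltedAggregation {
    private SaltedAggregation() {}

    static Map<String, Long> countByKey(List<String> keys, int buckets) {
        if (buckets <= 0) {
            throw new IllegalArgumentException("buckets must be positive");
        }
        // Phase 1: partial counts per (original key, salt).
        Map<String, Long> partial = new HashMap<>();
        int salt = 0;
        for (String key : keys) {
            partial.merge(key + "#" + salt, 1L, Long::sum);
            salt = (salt + 1) % buckets;
        }
        // Phase 2: strip the salt and combine partial counts per original key.
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> e : partial.entrySet()) {
            String composite = e.getKey();
            String key = composite.substring(0, composite.lastIndexOf('#'));
            result.merge(key, e.getValue(), Long::sum);
        }
        return result;
    }
}
```

The final result equals a direct count per key; only the intermediate work for the hot key is split across several (key, salt) groups.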

Re: Job failing after enabling Checkpointing

2021-05-10 Thread sudhansu jena
Hi Chesnay, Thank you so much for the help. The fix is working now. Thanks, Sudhansu On Mon, May 10, 2021 at 2:48 PM Chesnay Schepler wrote: > From what I can tell this method does exist in 1.12.2. > > > https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/ap

Re: Task Local Recovery with mountable disks in the cloud

2021-05-10 Thread Till Rohrmann
Hi Sonam, I think it would be great to create a FLIP for this feature. FLIPs don't have to be super large and in this case, I could see it work to express the general idea to make local recovery work across TaskManager failures and then outline the different ideas we had so far. If we then decide

Re: Job failing after enabling Checkpointing

2021-05-10 Thread Chesnay Schepler
From what I can tell this method does exist in 1.12.2. https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/api/common/ExecutionConfig.html#addDefaultKryoSerializer-java.lang.Class-java.lang.Class- https://github.com/apache/flink/blob/release-1.12.2/flink-core/

Flink: Clarification required

2021-05-10 Thread Jessy Ping
Hi all, Currently, we are exploring the various features of Flink and need some clarification on the below-mentioned questions. - I have a stateless Flink application where the source and sink are two different Kafka topics. Is there any benefit in adding checkpointing for this applica

Re: Job failing after enabling Checkpointing

2021-05-10 Thread sudhansu jena
Hi, Thanks for the prompt response. I have already visited that page, but in the current Flink version, i.e., 1.12.2, the method addDefaultKryoSerializer is not available in the config object. Thanks, Sudhansu On Mon, May 10, 2021 at 2:24 PM Chesnay Schepler wrote: > Please have a look at > https:

Re: Job failing after enabling Checkpointing

2021-05-10 Thread Chesnay Schepler
Please have a look at https://stackoverflow.com/questions/32453030/using-an-collectionsunmodifiablecollection-with-apache-flink On 5/10/2021 10:48 AM, sudhansu jena wrote: Hi Team, We have recently enabled checkpointing in our Flink job using S3 as the state backend, but while submitting the

Re: Guaranteeing event order in a KeyedProcessFunction

2021-05-10 Thread Miguel Araújo
I've realized this is not such a big issue because it's also upper bounded by the number of watermarks received, and it won't be one per event. Miguel Araújo wrote on Monday, 10/05/2021 at 09:39: > Thanks Dawid, having a look at CepOperator was useful. I implemented > something with on

Job failing after enabling Checkpointing

2021-05-10 Thread sudhansu jena
Hi Team, We have recently enabled checkpointing in our Flink job using S3 as the state backend, but while submitting the job, it fails with the error below. Can you please let us know what is going wrong here? Below is the code snippet for enabling checkpointing. env.setStateBackend(new

Re: Guaranteeing event order in a KeyedProcessFunction

2021-05-10 Thread Miguel Araújo
Thanks Dawid, having a look at CepOperator was useful. I implemented something with one difference I feel might be important: I noticed that in the CepOperator the timer is being registered for currentWatermark+1, instead of using the event's timestamp. Is there a reason for this? I think this imp

Re: Guaranteeing event order in a KeyedProcessFunction

2021-05-10 Thread Dawid Wysakowicz
Hey Miguel, I think you could take a look at the CepOperator which does pretty much what you are describing. As for more direct answers for your questions. If you use KeyedProcessFunction it is always scoped to a single Key. There is no way to process events from other keys. If you want to have m
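The buffering approach can be sketched in plain Java: keep events sorted by timestamp and, each time the watermark advances, release everything at or before it in ascending order. EventTimeSorter is hypothetical; in a KeyedProcessFunction the buffer would live in keyed state (for example a MapState keyed by timestamp) and the release would happen in onTimer after registering an event-time timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of emitting events in timestamp order: buffer by timestamp
// and, when the watermark advances, release everything <= watermark, oldest
// first. Loosely follows the idea behind CepOperator; not Flink code.
final class EventTimeSorter<E> {
    private final TreeMap<Long, List<E>> buffer = new TreeMap<>();

    void add(long timestamp, E event) {
        buffer.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(event);
    }

    /** Returns buffered events with timestamp <= watermark in ascending order and drops them. */
    List<E> onWatermark(long watermark) {
        List<E> ready = new ArrayList<>();
        Map<Long, List<E>> due = buffer.headMap(watermark, true);
        for (List<E> events : due.values()) {
            ready.addAll(events);
        }
        due.clear(); // clearing the view removes these entries from the backing TreeMap
        return ready;
    }
}
```

Events arriving out of order (30, 10, 20, 40) come out as 10, 20 at watermark 25 and 30, 40 at watermark 40.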

Re: The problem of getting data by Rest API

2021-05-10 Thread Chesnay Schepler
The short answer is that it is not (really); it's just some display artifact due to how the actual data from the REST API is propagated to the UI. On 5/10/2021 3:10 AM, penguin. wrote: Thanks for your reply, so why is the data of the Web UI updated every 3s? At 2021-05-07 15:19:50, "Chesnay S