class Foo extends TupleN {}

2022-01-12 Thread Jin Yi
probably a dumb question, but will the serde performance in Flink for the class Foo (from the subject) behave like a POJO or a TupleN?

Re: Parallelism of Flink SQL LookupTableSource in 1.14 ..

2022-01-12 Thread Chesnay Schepler
From what I can tell there's currently no way to control the parallelism of the LookupTableSource. It /seems/ to be bound to the parallelism of the input stream (i.e., the CDC stream). I'd suggest creating a Jira ticket for this. On 11/01/2022 06:43, Jonathan Weaver wrote: I'm attempting to d

Re: what is efficient way to write Left join in flink

2022-01-12 Thread Chesnay Schepler
Your best bet is to try out both approaches with some representative data. On 12/01/2022 08:11, Ronak Beejawat (rbeejawa) wrote: Hi Team, Can you please help me with the below query? I wanted to know which approach will be better and more efficient for multiple left joins within one min tumbling wi
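
A minimal sketch of one candidate approach, under assumptions not in the thread (table names left_table/right_table and columns id, payload, ts are placeholders): an interval left join in Flink SQL keeps join state only for the configured time bounds, whereas a regular left join keeps it indefinitely, so comparing the two on representative data is a natural experiment.

    import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

    object IntervalLeftJoinSketch {
      def main(args: Array[String]): Unit = {
        val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
        val tEnv = TableEnvironment.create(settings)
        // Interval left join: the BETWEEN bound limits how long rows stay
        // in state, unlike a regular left join, which retains them forever.
        tEnv.executeSql(
          """SELECT l.id, l.payload, r.payload
            |FROM left_table l
            |LEFT JOIN right_table r
            |  ON l.id = r.id
            | AND r.ts BETWEEN l.ts - INTERVAL '1' MINUTE AND l.ts
            |""".stripMargin)
      }
    }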

Re: class Foo extends TupleN {}

2022-01-12 Thread Chesnay Schepler
It _should_ be the same, but I'd double-check in the logs that Flink uses the right serializer. On 12/01/2022 08:59, Jin Yi wrote: probably a dumb question, but will the serde performance in flink for the class Foo (from the subject) behave like a POJO or a TupleN?
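
A hedged way to perform that double-check (the class body and all names here are illustrative, not code from the thread): print the TypeInformation Flink derives for the class; a TupleTypeInfo means the TupleSerializer path is taken, while a PojoTypeInfo or GenericTypeInfo would mean otherwise.

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.java.tuple.Tuple2

    // A Tuple subclass along the lines of the subject; the concrete type
    // parameters are assumptions.
    class Foo extends Tuple2[String, Integer]

    object CheckFooSerializer {
      def main(args: Array[String]): Unit = {
        // Prints the type information Flink derives for Foo.
        println(TypeInformation.of(classOf[Foo]))
      }
    }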

Re: JVM SEGV crash in 1.14.2 for scala 2.12

2022-01-12 Thread Chesnay Schepler
This appears to be some general incompatibility between macOS Monterey and OpenJDK 8 that we can't do much about. Starting from Flink 1.15, support for Java 8 is deprecated (Java 11 is now the default) and will be removed in the future. On 12/01/2022 03:11, Eugene Chung wrote: Hi all, I dow

Re: Sorting/grouping keys and State management in BATCH mode

2022-01-12 Thread Dawid Wysakowicz
Hey Krzysztof, Re 1. I believe you are asking where the state is kept. It is stored in memory, but bear in mind that state is only ever kept for the current key. Once all records for a key are processed, the corresponding state is discarded, as it won't be needed anymore. Re 2. The sorting algorit
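
To make the first point concrete, a minimal sketch under assumptions not in the thread (class and value names are illustrative): in BATCH runtime mode the input is sorted by key before the function runs, so the ValueState below only ever holds the current key's counter and is discarded once that key's records are exhausted.

    import org.apache.flink.api.common.RuntimeExecutionMode
    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector

    class CountPerKey extends KeyedProcessFunction[String, String, (String, Long)] {
      @transient private var count: ValueState[java.lang.Long] = _

      override def open(parameters: Configuration): Unit = {
        count = getRuntimeContext.getState(
          new ValueStateDescriptor[java.lang.Long]("count", classOf[java.lang.Long]))
      }

      override def processElement(
          value: String,
          ctx: KeyedProcessFunction[String, String, (String, Long)]#Context,
          out: Collector[(String, Long)]): Unit = {
        // Only the current key's count lives in state at any point in time.
        val next = Option(count.value()).map(_.longValue()).getOrElse(0L) + 1
        count.update(next)
        out.collect((ctx.getCurrentKey, next))
      }
    }

    object BatchStateSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setRuntimeMode(RuntimeExecutionMode.BATCH) // sort-by-key execution
        env.fromElements("a", "b", "a", "a", "b")
          .keyBy(s => s)
          .process(new CountPerKey)
          .print()
        env.execute("batch state sketch")
      }
    }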

Re: Async IO code not working

2022-01-12 Thread Chesnay Schepler
It would have been good to clarify which line causes the error; as it is, I can only guess. Please make sure you use the Scala variant of the AsyncDataStream (org.apache.flink.streaming.api.scala.AsyncDataStream). On 11/01/2022 21:32, Siddhesh Kalgaonkar wrote: I am using the below code to get the data f

Re: Flink native k8s integration vs. operator

2022-01-12 Thread Konstantin Knauf
cc dev@ Hi Thomas, Hi everyone, Thank you for starting this discussion and sorry for chiming in late. I agree with Thomas' and David's assessment of Flink's "Native Kubernetes Integration"; in particular, it actually does not integrate well with the Kubernetes ecosystem despite being called "nat

Re: Could not find any factory for identifier 'jdbc'

2022-01-12 Thread Chesnay Schepler
I would try double-checking whether the JDBC connector was truly bundled in your jar, specifically whether org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory is. I can't think of a reason why this shouldn't work for the JDBC connector. On 12/01/2022 06:34, Ronak Beejawat (rbeejawa)

Re: Could not find any factory for identifier 'jdbc'

2022-01-12 Thread Roman Khachatryan
Hi, I think Chesnay's suggestion to double-check the bundle makes sense. Additionally, I'd try flink-connector-jdbc_2.12 instead of flink-connector-jdbc_2.11. Regards, Roman On Wed, Jan 12, 2022 at 12:23 PM Chesnay Schepler wrote: > > I would try double-checking whether the jdbc connector was t
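
One hedged way to do that double-check from inside the packaged jar (the object name is illustrative): Flink discovers table connectors through Java's ServiceLoader, so listing the registered factories shows whether the 'jdbc' identifier actually made it into the bundle.

    import java.util.ServiceLoader
    import org.apache.flink.table.factories.Factory
    import scala.collection.JavaConverters._

    object ListTableFactories {
      def main(args: Array[String]): Unit = {
        // Same discovery mechanism Flink itself uses for table factories.
        val ids = ServiceLoader.load(classOf[Factory]).asScala.map(_.factoryIdentifier())
        // 'jdbc' should appear here if the connector is bundled correctly.
        println(ids.mkString(", "))
      }
    }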

Re: OutOfMemoryError: Java heap space while implementing flink sql api

2022-01-12 Thread Roman Khachatryan
Hi Ronak, You shared a screenshot of the JM. Do you mean that the exception also happens on the JM? (I'd rather assume the TM). Could you explain the join clause: left join ccmversionsumapTable cvsm ON (cdr.version = cvsm.ccmversion) "version" doesn't sound very selective, so maybe you end up with (almost) Carte

Re: OutOfMemoryError: Java heap space while implementing flink sql api

2022-01-12 Thread Martijn Visser
Hi Ronak, I would like to ask you to stop cross-posting to all the Flink mailing lists and then also posting the same question to Stack Overflow. Both the mailing lists and Stack Overflow are designed for asynchronous communication, and you should allow the community some days to address your question.

Re: Is FlinkKafkaProducer state compatible with KafkaSink sink? How to migrate?

2022-01-12 Thread Fabian Paul
Hi Kevin, No, the state is not compatible, but it is also not necessary: if the FlinkKafkaProducer is stopped with a savepoint, all transactions are finalized, and the new KafkaSink uses a different mechanism to track transaction IDs. [1] It should be enough to recover from the savepoint with
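
For illustration, a hedged sketch of the replacement sink as of Flink 1.14 (broker address, topic, and transactional-id prefix are placeholders; the [1] above remains the authoritative reference):

    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.connector.base.DeliveryGuarantee
    import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}

    object KafkaSinkSketch {
      val sink: KafkaSink[String] = KafkaSink.builder[String]()
        .setBootstrapServers("broker:9092") // placeholder
        .setRecordSerializer(
          KafkaRecordSerializationSchema.builder[String]()
            .setTopic("output-topic") // placeholder
            .setValueSerializationSchema(new SimpleStringSchema())
            .build())
        // Method name as in Flink 1.14; renamed setDeliveryGuarantee in 1.15.
        .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        // Required for EXACTLY_ONCE so transactional ids don't clash across jobs.
        .setTransactionalIdPrefix("my-app") // placeholder
        .build()
    }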

Re: Is FlinkKafkaProducer state compatible with KafkaSink sink? How to migrate?

2022-01-12 Thread Kevin Lam
Perfect, thanks for your consistently helpful and prompt responses Fabian! On Wed, Jan 12, 2022 at 10:25 AM Fabian Paul wrote: > Hi Kevin, > > No, the state is not compatible but it is also not necessary because > if the FlinkKafkaProducer is stopped with a savepoint all transactions > are final

How to avoid duplicates and enable Exactly Once - End to End

2022-01-12 Thread Patrick.Eifler
Hi, we are working on a Flink pipeline and running into duplicates in case of checkpoint failures. The pipeline is running on Flink 1.13.2 and uses the source and sink classes from the Flink Kafka connector library. The checkpointing is set to exactly-once, and we do care about correctness of

Re: How to avoid duplicates and enable Exactly Once - End to End

2022-01-12 Thread Roman Khachatryan
Hi, In exactly-once mode, Flink sends processing results to Kafka in a transaction. It only commits this transaction once the checkpoint succeeds; otherwise, the transaction is rolled back. So reading the same records again on recovery should not create duplicates. You're probably seeing duplicat
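
Since the message is cut off here, one commonly related setting, offered as an assumption about where such duplicates tend to be observed: downstream consumers must read with isolation.level=read_committed, or they will also see records from transactions that are later aborted. The property values below are placeholders.

    import java.util.Properties

    object ReadCommittedConsumerProps {
      val props: Properties = {
        val p = new Properties()
        p.setProperty("bootstrap.servers", "broker:9092") // placeholder
        p.setProperty("group.id", "my-consumer")          // placeholder
        // Without read_committed, a consumer also sees records from Kafka
        // transactions that Flink later aborts, which looks like duplicates.
        p.setProperty("isolation.level", "read_committed")
        p
      }
    }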

Re: Async IO code not working

2022-01-12 Thread Siddhesh Kalgaonkar
Hi Chesnay, Thanks for your time. Much appreciated. I am getting an error on the below line: val res: DataStream[String] = AsyncDataStream.unorderedWait(goodRecords, new CassandraAsyncSink(), 1000, TimeUnit.SECONDS, 100) Yes, earlier it was a wrong import, but it is still giving me the below error: type
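
For reference, a hedged sketch of what the Scala variant expects (the class name and usage line come from the thread; the body is illustrative, since the original implementation isn't shown): the function passed to org.apache.flink.streaming.api.scala.AsyncDataStream must implement the Scala async.AsyncFunction trait, not the Java interface, or unorderedWait will not type-check.

    import java.util.concurrent.TimeUnit
    import scala.concurrent.{ExecutionContext, Future}
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}

    // Must extend the Scala AsyncFunction trait to be accepted by the Scala
    // AsyncDataStream; the body is illustrative.
    class CassandraAsyncSink extends AsyncFunction[String, String] {
      implicit lazy val ec: ExecutionContext = ExecutionContext.global

      override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit = {
        Future {
          // ... the asynchronous Cassandra write would go here ...
          resultFuture.complete(Iterable(input))
        }
      }
    }

    // Usage matching the quoted line:
    // val res: DataStream[String] =
    //   AsyncDataStream.unorderedWait(goodRecords, new CassandraAsyncSink(), 1000, TimeUnit.SECONDS, 100)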

Re: Flink native k8s integration vs. operator

2022-01-12 Thread Thomas Weise
Hi everyone, Thanks for the feedback and discussion. A few additional thoughts: [Konstantin] > With respect to common lifecycle management operations: these features are > not available (within Apache Flink) for any of the other resource providers > (YARN, Standalone) either. From this perspectiv