Re: Exactly-once guarantee with Beam, Flink and Kafka

2023-05-18 Thread Cristian Constantinescu
Hi Ifat, I don't work with Beam and Flink anymore, so the below is from memory. When I went through what you are going through, I spent quite some time getting EOS working. While it kinda worked, I ran into several issues, which I'll describe below. The issue with EOS is that it uses Kafka
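For context, the producer-side setup under discussion looks roughly like the minimal sketch below. This is not the original poster's code; the broker address, topic, and shard count are placeholder assumptions, and withEOS needs a runner that actually supports exactly-once sinks.

    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.kafka.common.serialization.StringSerializer;

    // Exactly-once Kafka sink (KafkaIO.Write). The sinkGroupId should stay
    // stable across restarts so in-flight Kafka transactions can be resolved.
    KafkaIO.<String, String>write()
        .withBootstrapServers("broker:9092")       // placeholder
        .withTopic("output-topic")                 // placeholder
        .withKeySerializer(StringSerializer.class)
        .withValueSerializer(StringSerializer.class)
        .withEOS(1, "eos-sink-group");             // numShards, sinkGroupId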

Beam saves filepaths in Flink's state

2022-12-08 Thread Cristian Constantinescu
Hi everyone, I noticed that the Flink state contains KafkaIO's consumer config properties. When restarting the Beam pipeline (Java SDK) from state, the Flink Runner translation layer will deserialize the KafkaUnboundedReader (via UnboundedSourceWrapper) from Flink's state. This happens *before*

Beam, Flink state and Avro Schema Evolution are problematic

2022-11-18 Thread Cristian Constantinescu
Hi everyone, I'm using Beam on Flink with Avro-generated records. If the record schema changes, the Flink state cannot be restored. I'm sending this email out for anyone who may need this info in the future, and also to ask others for possible solutions, as this problem is so easily hit that

Re: [JAVA] Batch elements from a PCollection

2022-08-10 Thread Cristian Constantinescu
Hi, If the keys bother you, you can .apply(WithKeys.of("")) before the GroupIntoBatches transform. This effectively removes parallelism, as all items are funneled through one executor. Note that I think GroupIntoBatches might be broken on Flink [1]. Alternatively, create your
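A minimal sketch of that suggestion, assuming a hypothetical MyEvent element type (the batch size is illustrative):

    import org.apache.beam.sdk.transforms.GroupIntoBatches;
    import org.apache.beam.sdk.transforms.Values;
    import org.apache.beam.sdk.transforms.WithKeys;
    import org.apache.beam.sdk.values.PCollection;

    static PCollection<Iterable<MyEvent>> batch(PCollection<MyEvent> input) {
      return input
          .apply(WithKeys.of(""))               // constant key: everything funnels through one key
          .apply(GroupIntoBatches.ofSize(100))  // emits KV<String, Iterable<MyEvent>> batches
          .apply(Values.create());              // drop the dummy key, keep the batches
    }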

Re: Possible bug in ConfluentSchemaRegistryDeserializerProvider with schema evolution

2022-07-28 Thread Cristian Constantinescu
Attaching these two links which kinda point in the same direction as my previous e-mail: https://ambitious.systems/avro-writers-vs-readers-schema https://ambitious.systems/avro-schema-resolution On Thu, Jul 28, 2022 at 4:31 PM Cristian Constantinescu wrote: > Hi everyone, > > W

Possible bug in ConfluentSchemaRegistryDeserializerProvider with schema evolution

2022-07-28 Thread Cristian Constantinescu
Hi everyone, When using KafkaIO to deserialize to Avro SpecificRecords in combination with ConfluentSchemaRegistryDeserializerProvider, it fails when the schema in the Avro-generated classes (the SpecificRecords) and the schema registry schema (used to serialize the given message) mismatch. My
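For anyone hitting this, the setup in question is roughly the following sketch (broker, topic, and subject name are placeholder assumptions, and MyRecord stands in for the Avro-generated SpecificRecord class):

    import org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.kafka.common.serialization.StringDeserializer;

    // Values are deserialized using the writer schema fetched from the registry.
    KafkaIO.<String, MyRecord>read()
        .withBootstrapServers("broker:9092")        // placeholder
        .withTopic("input-topic")                   // placeholder
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(
            ConfluentSchemaRegistryDeserializerProvider.<MyRecord>of(
                "http://schema-registry:8081", "input-topic-value"));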

GroupIntoBatches not working on Flink?

2022-07-26 Thread Cristian Constantinescu
Hi everyone, Quick question about GroupIntoBatches. When running on Flink, it eventually hits an unsupported-operation exception, "Canceling a timer by ID is not yet supported.", on this line [1]. The source inputs are Avro files for testing (batch) but will use Kafka topics (streaming) when deployed. This

Re: possible data loss with Kafka I/O

2022-06-07 Thread Cristian Constantinescu
Hey Deepak, I have observed this too. See point "a" in "Other quirks I found:" in this thread [1]. [1] https://lists.apache.org/thread/ksd4nfjmzmp97hs2zgn2mfpf8fsy0myw On Tue, Jun 7, 2022 at 2:13 PM Chamikara Jayalath wrote: > > > On Tue, Jun 7, 2022 at 11:06 AM Deepak Nagaraj > wrote: > >>

Re: [QUESTION] Distinct transform precautions

2022-03-30 Thread Cristian Constantinescu
check). So even if it's not in memory, I would need to figure out if it causes issues with Flink's state requirement. On Wed, Mar 30, 2022 at 11:35 AM Reuven Lax wrote: > > > On Wed, Mar 30, 2022 at 6:16 AM Cristian Constantinescu > wrote: > >> Hi everyone, >> >>

[QUESTION] Distinct transform precautions

2022-03-30 Thread Cristian Constantinescu
Hi everyone, About the Distinct [1] transformation: I couldn't find what precautions I need to take when using it in terms of memory consumption and performance. Furthermore, how does it behave if the pipeline crashes or is restarted from state? Is its state restored on rerun (hence removing duplicates
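For reference, basic usage of the transform being asked about, as a sketch (lines and events are pre-existing PCollections; MyEvent and getId are illustrative; note that Distinct is built on grouping, so its state grows with the number of distinct elements per window):

    import org.apache.beam.sdk.transforms.Distinct;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TypeDescriptors;

    // Dedupe whole elements:
    PCollection<String> deduped = lines.apply(Distinct.create());

    // Dedupe by a representative key instead of the full element:
    PCollection<MyEvent> unique = events.apply(
        Distinct.withRepresentativeValueFn((MyEvent e) -> e.getId())
            .withRepresentativeType(TypeDescriptors.strings()));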

Clarification on transform unique names and Flink state

2022-02-10 Thread Cristian Constantinescu
Hi everyone, It's my understanding that Beam names its transforms in a hierarchical fashion. So for example if I have: // main p.apply("foo_name", new Foo()) // Foo.java public class Foo extends PTransform<...>{ @Override public PCollection<...> expand(PCollection<...> input){ return
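A cleaned-up completion of the truncated snippet above, to make the naming question concrete (bar_name and the MapElements body are illustrative; inner applies get names nested under the outer one, e.g. "foo_name/bar_name", which is relevant when mapping Flink state on restore):

    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TypeDescriptors;

    public class Foo extends PTransform<PCollection<String>, PCollection<String>> {
      @Override
      public PCollection<String> expand(PCollection<String> input) {
        // This inner name is appended under the outer "foo_name" in the
        // transform hierarchy.
        return input.apply("bar_name",
            MapElements.into(TypeDescriptors.strings()).via(s -> s));
      }
    }

    // main
    p.apply("foo_name", new Foo());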

Re: Beam State with the Flink Runner when things go wrong

2022-02-04 Thread Cristian Constantinescu
programmatically before the Pipeline.create(options) call is made. Cheers, Cristian [1] https://beam.apache.org/documentation/runners/flink/ On Fri, Feb 4, 2022 at 10:17 AM Cristian Constantinescu wrote: > Hey Jan, > > I agree that silently ignoring the parameter is misleading and, in
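The programmatic setup referred to above looks roughly like this sketch (the master address and savepoint path are placeholder values):

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setFlinkMaster("localhost:8081");                 // placeholder
    options.setSavepointPath("file:///tmp/savepoints/sp-1");  // placeholder
    Pipeline p = Pipeline.create(options);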

Re: Beam State with the Flink Runner when things go wrong

2022-02-04 Thread Cristian Constantinescu
> > Would you file a JIRA? Or possibly create a PR to fix this? > > Best, > > Jan > On 2/3/22 07:12, Cristian Constantinescu wrote: > > Hi everyone, > > I've done some digging within the Beam source code. It looks like when the > flinkMaster argument is not set, the sav

Re: Beam State with the Flink Runner when things go wrong

2022-02-02 Thread Cristian Constantinescu
/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L174 [2] https://github.com/apache/beam/blob/51e0e4ef82feaae251b37a0288ad7a04ee603086/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L183 On Wed, Feb 2, 2022 at 3:01 PM Cristian

Re: Beam State with the Flink Runner when things go wrong

2022-02-02 Thread Cristian Constantinescu
leases/javadoc/2.35.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize-- > > Best Regards, > Pavel Solomin > > Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin > <https://www.linkedin.com/in/pavelsolomin> > > > > > > On Wed, 2 Feb 2022 at 1

Beam State with the Flink Runner when things go wrong

2022-02-02 Thread Cristian Constantinescu
Hi everyone, I'm trying to figure out how pipeline state works with Beam running on Flink Classic. Would appreciate some help with the below. My understanding is that on recovery (whether from a checkpoint or savepoint), Flink recreates the operators (I guess those are DoFns in Beam) with

Potential bug: AutoValue + Memoized fields

2021-10-28 Thread Cristian Constantinescu
Hi everyone, Looks like Beam has a bit of an issue when using AutoValues with Memoized (cached) fields. It's not a big issue, and the workaround is simply not using Memoized fields, at the cost of a little performance. (See comment in code snippet) The code further below produces this
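The shape of the problematic class, as a minimal sketch (Thing and its fields are made up; @Memoized comes from AutoValue's memoized extension and caches the first computed value):

    import com.google.auto.value.AutoValue;
    import com.google.auto.value.extension.memoized.Memoized;

    @AutoValue
    public abstract class Thing {
      public abstract String name();

      @Memoized // cached after the first call; this is the part Beam reportedly trips on
      public String expensiveDerivedValue() {
        return name().toUpperCase();
      }
    }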

Re: Potential Bug: Beam + Flink + AutoValue Builders

2021-10-26 Thread Cristian Constantinescu
ncy classloader munging, and that might be breaking an assumption in this > code. Passing in the correct classloader should hopefully fix this. > > Reuven > > > On Tue, Oct 26, 2021 at 10:59 AM Cristian Constantinescu > wrote: > >> Hi everyone, >>

Potential Bug: Beam + Flink + AutoValue Builders

2021-10-26 Thread Cristian Constantinescu
Hi everyone, Not sure if anyone is using Beam with the Flink Runner and AutoValue builders. For me, it doesn't work. I have some questions and a workaround for anyone in the same boat. Beam 2.31, Flink 1.13, AutoValue 1.8.2 Here's the code: package org.whatever.testing; import
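For readers who haven't used the combination, the pattern under discussion is roughly this hedged sketch (Event and its fields are illustrative, not the code from the post):

    import com.google.auto.value.AutoValue;
    import org.apache.beam.sdk.schemas.AutoValueSchema;
    import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

    @DefaultSchema(AutoValueSchema.class) // lets Beam infer a schema for the class
    @AutoValue
    public abstract class Event {
      public abstract String id();
      public abstract long timestamp();

      public static Builder builder() {
        return new AutoValue_Event.Builder();
      }

      @AutoValue.Builder
      public abstract static class Builder {
        public abstract Builder setId(String id);
        public abstract Builder setTimestamp(long timestamp);
        public abstract Event build();
      }
    }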

Re: Why is Avro Date field using InstantCoder?

2021-10-22 Thread Cristian Constantinescu
to the beam >> repo will take 1-2 months to make it to a non-snapshot build even if you do >> find a long term solution acceptable to all interested parties. >> >> -Daniel >> >> On Mon, Oct 18, 2021 at 1:46 PM Cristian Constantinescu >> wrote: >> >>>

Re: Why is Avro Date field using InstantCoder?

2021-10-18 Thread Cristian Constantinescu
wrote: > Do you know if it's easy to detect which version of Avro is being used? > > On Sun, Oct 17, 2021 at 10:20 PM Cristian Constantinescu > wrote: > >> If I had to change things, I would: >> >> 1. When deriving the SCHEMA add a few new types (JAVA_TIME, JAVA

Re: Why is Avro Date field using InstantCoder?

2021-10-17 Thread Cristian Constantinescu
ion code at runtime with > ByteBuddy, we could potentially just generate different conversions > depending on the Avro version. > > On Fri, Oct 15, 2021 at 11:56 PM Cristian Constantinescu > wrote: > >> Those are fair points. However please consider that there might be n

Re: Why is Avro Date field using InstantCoder?

2021-10-16 Thread Cristian Constantinescu
guess what I'm saying is that there's definitely a non-negligible cost associated with old 3rd party libs in Beam's code (even if efforts are put in to minimize them). On Sat, Oct 16, 2021 at 2:33 AM Reuven Lax wrote: > > > On Fri, Oct 15, 2021 at 11:13 PM Cristian Constantinescu

Re: Why is Avro Date field using InstantCoder?

2021-10-16 Thread Cristian Constantinescu
e > with avro 1.8? If so, this might be tricky to fix, since Beam maintains > backwards compatibility on its public API. > > On Fri, Oct 15, 2021 at 5:38 PM Cristian Constantinescu > wrote: > >> Hi all, >> >> I've created a small demo project to show the iss

Re: Why is Avro Date field using InstantCoder?

2021-10-15 Thread Cristian Constantinescu
a mapping. I *think* the intention > is that we generate logic for converting Date to/from Instant when making > getters for a RowWithGetters backed by Avro. > > Brian > > On Thu, Oct 14, 2021 at 4:43 AM Cristian Constantinescu > wrote: > >> A little bit more co

Re: Why is Avro Date field using InstantCoder?

2021-10-14 Thread Cristian Constantinescu
take into account LogicalTypes. I think that's where the problem is. If anyone who knows that code could have a look and let me know their thoughts, I can try to fix the issue if we agree that there is one. On Thu, Oct 14, 2021 at 7:12 AM Cristian Constantinescu wrote: > Hi all, > > I have the foll

Why is Avro Date field using InstantCoder?

2021-10-14 Thread Cristian Constantinescu
Hi all, I have the following field in one of my avro schemas: { "name": "somedate", "type": {"type": "int", "logicalType": "date"} } This generates a java.time.LocalDate field in the corresponding java class (call it Foo). AvroUtils.toBeamSchema(FooClass.getSchema()) will return that field as
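To make the report concrete in miniature, a hedged sketch of the check described (Foo stands for the Avro-generated class from the post; getClassSchema() is the static schema accessor Avro generates for SpecificRecords):

    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.schemas.utils.AvroUtils;

    Schema beamSchema = AvroUtils.toBeamSchema(Foo.getClassSchema());
    // Per the report, this yields a Joda-Instant-backed DATETIME type rather
    // than something matching the generated java.time.LocalDate field.
    System.out.println(beamSchema.getField("somedate").getType());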

Re: Limit the concurrency of a Beam Step (or all the steps)

2021-09-24 Thread Cristian Constantinescu
Hey Marco, Other more senior people can correct me here. About limiting the concurrency aspect of things: Beam runners split PCollections by key. So as long as all your items have the same key, I think it will create only one executor for that ParDo. So that's what I did recently: 1. create
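A sketch of the single-key funnel described above (items is a pre-existing PCollection; MyItem and SequentialFn are illustrative; in streaming, a Window with a trigger is needed before the GroupByKey):

    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.WithKeys;

    // Every element gets the same key, so the grouped output for that one key
    // is processed by a single worker, effectively capping concurrency at 1.
    items
        .apply(WithKeys.of("single-key"))
        .apply(GroupByKey.create())
        .apply(ParDo.of(new SequentialFn())); // DoFn<KV<String, Iterable<MyItem>>, ...>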

Re: Beam Schemas and refactoring code

2021-09-10 Thread Cristian Constantinescu
I wonder if you have recursive (or > mutually recursive) fields. > > Reuven > > On Fri, Sep 10, 2021 at 12:53 AM Cristian Constantinescu > wrote: > >> Hi everyone, >> >> Every article and talk about Beam recommends using Schemas and Row. >> Howe

Beam Schemas and refactoring code

2021-09-10 Thread Cristian Constantinescu
Hi everyone, Every article and talk about Beam recommends using Schemas and Row. However, using Row throughout my pipelines makes it very difficult to refactor code when schemas change, compared to POJOs/Beans that provide static code analysis in the IDE. Does anyone have any tips or tricks
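One common mitigation, sketched under the assumption of a schema-inferable MyPojo class: keep typed classes at the pipeline edges and convert to and from Row only where Row-based transforms are actually needed, so schema changes surface as compile errors.

    import org.apache.beam.sdk.schemas.transforms.Convert;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.Row;

    PCollection<Row> rows = pojos.apply(Convert.toRows());            // POJO -> Row
    PCollection<MyPojo> typed = rows.apply(Convert.to(MyPojo.class)); // Row -> POJO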

Re: Using Beam to generate unique ids with unbounded sources

2021-09-10 Thread Cristian Constantinescu
e Pipeline. This should turn the source >> into bounded source effectively. >> >> Jan >> >> [1] >> >> https://beam.apache.org/releases/javadoc/2.31.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withTimestampPolicyFactory-org.apache.beam.sdk.io.kafka.Timestamp

Using Beam to generate unique ids with unbounded sources

2021-07-22 Thread Cristian Constantinescu
Hi All, I would like to know if there's a suggested pattern for the below scenario. TL;DR: reading state from Kafka. I have a scenario where I'm listening to a Kafka topic and generate a unique id based on the properties of the incoming item. Then, I output the result to another Kafka topic. The
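One way to express "remember what I've already assigned" in Beam itself is a stateful DoFn keyed by the item's identifying properties, as in this hedged sketch (AssignIdFn and MyItem are illustrative; state is scoped per key and window):

    import java.util.UUID;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.ValueState;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    static class AssignIdFn extends DoFn<KV<String, MyItem>, KV<String, MyItem>> {
      @StateId("id")
      private final StateSpec<ValueState<String>> idSpec = StateSpecs.value();

      @ProcessElement
      public void process(ProcessContext c,
                          @StateId("id") ValueState<String> idState) {
        String id = idState.read();
        if (id == null) {                  // first element for this key: mint an id
          id = UUID.randomUUID().toString();
          idState.write(id);
        }
        c.output(KV.of(id, c.element().getValue()));
      }
    }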