Hi Ifat,
I don't work with Beam and Flink anymore so the below is from memory.
When I went through what you are going through, I spent quite some time
getting EOS working. While it kinda worked, I ran into several issues which
I'll describe below.
The issue with EOS is that it uses Kafka
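For context, the EOS write path under discussion is enabled on KafkaIO's sink roughly like this (a hedged sketch from memory; server, topic, and sink group names are illustrative):

```java
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.StringSerializer;

public class EosSinkSketch {
  // Returns a KafkaIO sink configured for exactly-once semantics.
  static KafkaIO.Write<String, String> eosWrite() {
    return KafkaIO.<String, String>write()
        .withBootstrapServers("localhost:9092")   // illustrative
        .withTopic("output-topic")                // illustrative
        .withKeySerializer(StringSerializer.class)
        .withValueSerializer(StringSerializer.class)
        // EOS relies on Kafka transactions; it needs a runner that
        // supports it (e.g. Flink) and Kafka 0.11+ brokers.
        .withEOS(1, "eos-sink-group");            // illustrative group id
  }
}
```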
Hi everyone,
I noticed that the Flink state contains KafkaIO's consumer config properties.
When restarting the Beam pipeline (Java SDK) from state, the Flink
Runner translation layer will deserialize the KafkaUnboundedReader (via
UnboundedSourceWrapper) from Flink's state. This happens *before*
Hi everyone,
I'm using Beam on Flink with Avro generated records. If the record
schema changes, the Flink state cannot be restored. I just want to
send this email out for anyone who may need this info in the future
and also ask others for possible solutions as this problem is so
easily hit, that
Hi,
If the keys bother you, you can .apply(WithKeys.of("")) before the
GroupIntoBatches transform. This effectively removes parallelism as all
items are funneled through one executor.
Note that I think GroupIntoBatches might be broken on
Flink [1].
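The WithKeys suggestion above can be sketched like this (batch size and element types are illustrative, not from the original thread):

```java
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class SingleKeyBatching {
  static PCollection<KV<String, Iterable<String>>> batch(
      PCollection<String> input) {
    return input
        // One constant key => all elements land on one worker,
        // effectively removing parallelism for the next transform.
        .apply(WithKeys.of(""))
        .apply(GroupIntoBatches.<String, String>ofSize(100));
  }
}
```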
Alternatively, create your
Attaching these two links which kinda point in the same direction as my
previous e-mail:
https://ambitious.systems/avro-writers-vs-readers-schema
https://ambitious.systems/avro-schema-resolution
On Thu, Jul 28, 2022 at 4:31 PM Cristian Constantinescu
wrote:
> Hi everyone,
>
> W
Hi everyone,
When using KafkaIO to deserialize to avro SpecificRecords in combination
with ConfluentSchemaRegistryDeserializerProvider, it fails when the schema
in the avro generated classes (the SpecificRecords) and the schema registry
schema (used to serialize the given message) mismatch.
My
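A minimal sketch of the setup that hits this (registry URL, topic, and subject names are assumptions, not from the thread):

```java
import org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.StringDeserializer;

public class RegistryReadSketch {
  static KafkaIO.Read<String, Object> read() {
    return KafkaIO.<String, Object>read()
        .withBootstrapServers("localhost:9092")      // illustrative
        .withTopic("some-topic")                     // illustrative
        .withKeyDeserializer(StringDeserializer.class)
        // The registry schema acts as the writer schema; a mismatch
        // with the generated SpecificRecord's (reader) schema is what
        // triggers the failure described above.
        .withValueDeserializer(
            ConfluentSchemaRegistryDeserializerProvider.of(
                "http://localhost:8081", "some-topic-value"));
  }
}
```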
Hi everyone,
Quick question about GroupIntoBatches.
When running on Flink, it eventually hits an unsupported-operation
exception, "Canceling a timer by ID is not yet supported.", on this line [1].
The source inputs are AVRO files for testing (batch) but will use kafka
topics (streaming) when deployed.
This
Hey Deepak,
I have observed this too. See point "a" in "Other quirks I found:" in this
thread [1].
[1] https://lists.apache.org/thread/ksd4nfjmzmp97hs2zgn2mfpf8fsy0myw
On Tue, Jun 7, 2022 at 2:13 PM Chamikara Jayalath
wrote:
>
>
> On Tue, Jun 7, 2022 at 11:06 AM Deepak Nagaraj
> wrote:
>
>>
check). So even if it's not in memory, I would need to
figure out if it causes issues with Flink's state requirement.
On Wed, Mar 30, 2022 at 11:35 AM Reuven Lax wrote:
>
>
> On Wed, Mar 30, 2022 at 6:16 AM Cristian Constantinescu
> wrote:
>
>> Hi everyone,
>>
>>
Hi everyone,
About the Distinct [1] transformation. I couldn't find what precautions I
need to take when using it in terms of memory consumption and performance.
Furthermore, how does it behave if the pipeline crashes or is restarted
from state? Is its state restored on rerun (hence removes duplicates
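For reference, the transform in question is applied like this; whether its deduplication state survives a restore is exactly the open question above (sketch only, element type is illustrative):

```java
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.values.PCollection;

public class DistinctSketch {
  static PCollection<String> dedupe(PCollection<String> input) {
    // Distinct keeps per-window state of elements seen so far, so
    // memory presumably grows with the number of distinct elements
    // per window.
    return input.apply(Distinct.create());
  }
}
```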
Hi everyone,
It's my understanding that Beam names its transforms in a
hierarchical fashion. So for example if I have:
// main
p.apply("foo_name", new Foo())

// Foo.java
public class Foo extends PTransform<...> {
  @Override
  public PCollection<...> expand(PCollection<...> input) {
    return
programmatically before
Pipeline.create(options) call is made.
Cheers,
Cristian
[1] https://beam.apache.org/documentation/runners/flink/
On Fri, Feb 4, 2022 at 10:17 AM Cristian Constantinescu
wrote:
> Hey Jan,
>
> I agree that silently ignoring the parameter is misleading and, in
>
> Would you file a JIRA? Or possibly create a PR to fix this?
>
> Best,
>
> Jan
> On 2/3/22 07:12, Cristian Constantinescu wrote:
>
> Hi everyone,
>
> I've done some digging within the Beam source code. It looks like when the
> flinkMaster argument is not set, the sav
/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L174
[2]
https://github.com/apache/beam/blob/51e0e4ef82feaae251b37a0288ad7a04ee603086/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L183
On Wed, Feb 2, 2022 at 3:01 PM Cristian
leases/javadoc/2.35.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize--
>
> Best Regards,
> Pavel Solomin
>
> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
> <https://www.linkedin.com/in/pavelsolomin>
>
>
>
>
>
> On Wed, 2 Feb 2022 at 1
Hi everyone,
I'm trying to figure out how pipeline state works with Beam running on
Flink Classic. Would appreciate some help with the below.
My understanding is that on recovery (whether from a checkpoint or
savepoint), Flink recreates the operators (I guess those are DoFns in Beam)
with
Hi everyone,
Looks like Beam has a little bit of an issue when using AutoValues with
Memoized (cached) fields. It's not a big issue, and the workaround is
simply not using Memoized fields, at the cost of a little performance. (See
comment in code snippet)
The code further below produces this
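A hypothetical reconstruction of the kind of class this thread is about: an AutoValue with a @Memoized (cached) derived property. The class and field names are made up for illustration.

```java
import com.google.auto.value.AutoValue;
import com.google.auto.value.extension.memoized.Memoized;

@AutoValue
public abstract class Event {
  public abstract String payload();

  // The generated AutoValue subclass caches this value in an extra
  // non-abstract field, which (per this thread) is what trips up
  // Beam's schema inference.
  @Memoized
  public String normalizedPayload() {
    return payload().trim().toLowerCase();
  }

  public static Event of(String payload) {
    return new AutoValue_Event(payload);
  }
}
```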
ncy classloader munging, and that might be breaking an assumption in this
> code. Passing in the correct classloader should hopefully fix this.
>
> Reuven
>
>
> On Tue, Oct 26, 2021 at 10:59 AM Cristian Constantinescu
> wrote:
>
>> Hi everyone,
>>
>
Hi everyone,
Not sure if anyone is using Beam with the Flink Runner and AutoValue
builders. For me, it doesn't work. I have some questions and a workaround
for anyone in the same boat.
Beam 2.31, Flink 1.13, AutoValue 1.8.2
Here's the code:
package org.whatever.testing;
import
to the beam
>> repo will take 1-2 months to make it to a non-snapshot build even if you do
>> find a long term solution acceptable to all interested parties.
>>
>> -Daniel
>>
>> On Mon, Oct 18, 2021 at 1:46 PM Cristian Constantinescu
>> wrote:
>>
>>>
wrote:
> Do you know if it's easy to detect which version of Avro is being used?
>
> On Sun, Oct 17, 2021 at 10:20 PM Cristian Constantinescu
> wrote:
>
>> If I had to change things, I would:
>>
>> 1. When deriving the SCHEMA add a few new types (JAVA_TIME, JAVA
ion code at runtime with
> ByteBuddy, we could potentially just generate different conversions
> depending on the Avro version.
>
> On Fri, Oct 15, 2021 at 11:56 PM Cristian Constantinescu
> wrote:
>
>> Those are fair points. However please consider that there might be n
guess what I'm saying is that there's definitely a non-negligible cost
associated with old 3rd party libs in Beam's code (even if efforts are put
in to minimize them).
On Sat, Oct 16, 2021 at 2:33 AM Reuven Lax wrote:
>
>
> On Fri, Oct 15, 2021 at 11:13 PM Cristian Constantinescu
e
> with avro 1.8? If so, this might be tricky to fix, since Beam maintains
> backwards compatibility on its public API.
>
> On Fri, Oct 15, 2021 at 5:38 PM Cristian Constantinescu
> wrote:
>
>> Hi all,
>>
>> I've created a small demo project to show the iss
a mapping. I *think* the intention
> is that we generate logic for converting Date to/from Instant when making
> getters for a RowWithGetters backed by Avro.
>
> Brian
>
> On Thu, Oct 14, 2021 at 4:43 AM Cristian Constantinescu
> wrote:
>
>> A little bit more co
take into account LogicalTypes.
I think that's where the problem is. If anyone who knows that code could
have a look and let me know their thoughts, I can try to fix the issue if
we agree that there is one.
On Thu, Oct 14, 2021 at 7:12 AM Cristian Constantinescu
wrote:
> Hi all,
>
> I have the foll
Hi all,
I have the following field in one of my avro schemas:
{
  "name": "somedate",
  "type": {"type": "int", "logicalType": "date"}
}
This generates a java.time.LocalDate field in the corresponding java class
(call it Foo).
AvroUtils.toBeamSchema(FooClass.getSchema()) will return that field as
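A sketch of the call in question, assuming Foo is the Avro-generated class for the schema above (the field inspection is illustrative):

```java
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.utils.AvroUtils;

public class DateFieldProbe {
  public static void main(String[] args) {
    // Foo is the class avro generates from the schema above; its
    // "somedate" field is a java.time.LocalDate.
    Schema beamSchema = AvroUtils.toBeamSchema(Foo.getSchema());
    // Inspect how the "date" logical type was mapped on the Beam side.
    System.out.println(beamSchema.getField("somedate").getType());
  }
}
```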
Hey Marco,
Other more senior people can correct me here. About limiting the
concurrency aspect of things: Beam runners split PCollections by key. So as
long as all your items have the same key, I think it will create only one
executor for that ParDo. So that's what I did recently:
1. create
I wonder if you have recursive (or
> mutually recursive) fields.
>
> Reuven
>
> On Fri, Sep 10, 2021 at 12:53 AM Cristian Constantinescu
> wrote:
>
>> Hi everyone,
>>
>> Every article and talk about Beam recommends using Schemas and Row.
>> Howe
Hi everyone,
Every article and talk about Beam recommends using Schemas and Row.
However, using Row throughout my pipelines makes it very difficult to
refactor code when schemas change, compared to POJOs/Beans, which provide
static code analysis in the IDE.
Does anyone have any tips or tricks
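One common middle ground (a sketch, not from this thread): annotate a plain POJO so Beam infers a schema from it, keeping fields refactorable in the IDE while schema-aware transforms still work. Class and field names are illustrative.

```java
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

// Beam infers the schema from the public fields, so renaming a field
// propagates through the IDE, unlike a string-based Row.getValue("...").
@DefaultSchema(JavaFieldSchema.class)
public class Order {
  public String id;
  public double amount;
}
```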
e Pipeline. This should turn the source
>> into bounded source effectively.
>>
>> Jan
>>
>> [1]
>>
>> https://beam.apache.org/releases/javadoc/2.31.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withTimestampPolicyFactory-org.apache.beam.sdk.io.kafka.Timestamp
Hi All,
I would like to know if there's a suggested pattern for the below scenario.
TL;DR: reading state from Kafka.
I have a scenario where I'm listening to a kafka topic and generate a
unique id based on the properties of the incoming item. Then, I output the
result to another kafka topic. The