Hi Ivan,

I fear that only a few users on this mailing list have deeper Beam
experience. I only used it briefly three years ago; most users here use
Flink directly, precisely to avoid these kinds of double-abstraction
issues.

It might be better to move to the Beam mailing list for Beam-specific
questions, including how the Beam Flink runner actually translates a
Beam program into a Flink job.

On Tue, May 19, 2020 at 11:38 AM Ivan San Jose <isanj...@theworkshop.com>
wrote:

> Actually, I'm also wondering how Beam coders relate to the runner's
> serialization. On Beam you specify a coder for each Java type so that
> Beam can serialize/deserialize that type, but is the runner used under
> the hood then serializing/deserializing the result again, i.e. doing a
> double serialization? Does that make sense? Or how does it work?
>
> On Tue, 2020-05-19 at 08:54 +0000, Ivan San Jose wrote:
> > Yep, sorry if I'm bothering you, but I think I'm still not getting
> > this: how could I tell Beam to tell Flink to use that serializer
> > instead of the standard Java one? Beam abstracts us from Flink's
> > checkpointing mechanism, so I'm afraid that if we use the Flink API
> > directly we might break other things that Beam is hiding from us...
> >
> > On Tue, 2020-05-19 at 10:44 +0200, Arvid Heise wrote:
> > > Hi Ivan,
> > >
> > > The easiest way is to use an implementation that's already there
> > > [1]. I already mentioned Avro and would strongly recommend giving
> > > it a go. If you make sure to provide a default value for as many
> > > fields as possible, you can always remove them later, which gives
> > > you great flexibility. I can give you more hints if you decide to
> > > go this route.
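To make the Avro suggestion concrete, a schema along these lines gives every non-key field a default (record and field names here are purely illustrative, not from this thread):

```json
{
  "type": "record",
  "name": "CombinedEvent",
  "namespace": "com.example.state",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "amount", "type": "long", "default": 0},
    {"name": "currency", "type": ["null", "string"], "default": null}
  ]
}
```

Avro's schema-resolution rules fill in the default when a field is missing from the writer's schema, which is what makes adding and later removing fields safe for persisted state.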
> > >
> > > If you want a custom implementation, I'd start by looking at one
> > > of the simpler implementations, like MapSerializerSnapshot [2].
> > >
> > > [1]
> > >
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.html
> > > (see known implementing classes).
> > > [2]
> > >
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java
> > >
> > > On Tue, May 19, 2020 at 10:31 AM Ivan San Jose <
> > > isanj...@theworkshop.com> wrote:
> > > > Thanks for your complete answer, Arvid. We will try all the
> > > > things you mentioned, but take into account that we are using
> > > > Beam on top of Flink, so, to be honest, I don't know how we could
> > > > implement the custom serialization thing (
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html#the-typeserializersnapshot-abstraction
> > > > ) there. Could you please give us some hints? Thanks
> > > >
> > > > On Mon, 2020-05-18 at 12:41 +0200, Arvid Heise wrote:
> > > > > Hi Ivan,
> > > > >
> > > > > First, let's address the issue with idle partitions. The
> > > > > solution is to use a watermark assigner that also emits a
> > > > > watermark after some idle timeout [1].
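The mechanics behind [1] can be sketched without any Flink types: track the highest event time seen, and once no events have arrived within the idle timeout, derive the watermark from the wall clock instead, so an empty partition stops holding downstream operators back. All names below are illustrative; the real implementation is in the linked TroubledStreamingJobSolution2.

```java
// Minimal, Flink-free sketch of an idleness-aware watermark generator.
class IdleAwareWatermark {
    private final long idleTimeoutMs;
    private long maxEventTime = Long.MIN_VALUE; // highest event timestamp seen
    private long lastEventWallClock;            // wall-clock time of last event

    IdleAwareWatermark(long idleTimeoutMs, long nowMs) {
        this.idleTimeoutMs = idleTimeoutMs;
        this.lastEventWallClock = nowMs;
    }

    void onEvent(long eventTimeMs, long nowMs) {
        maxEventTime = Math.max(maxEventTime, eventTimeMs);
        lastEventWallClock = nowMs;
    }

    /** Called periodically; returns the watermark to emit. */
    long currentWatermark(long nowMs) {
        if (nowMs - lastEventWallClock > idleTimeoutMs) {
            // Source is idle: let the watermark advance with wall-clock time.
            return nowMs - idleTimeoutMs;
        }
        // Source is active: watermark follows event time as usual.
        return maxEventTime;
    }
}
```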
> > > > >
> > > > > Now the second question: why Kafka offsets are committed for
> > > > > in-flight, checkpointed data. The basic idea is that you are
> > > > > not losing data while avoiding replicated output.
> > > > > If you committed offsets only after data had been fully
> > > > > processed, then upon a crash the same data points would be
> > > > > reprocessed together with the restored in-flight data, so you
> > > > > would get duplicate messages in your system.
> > > > > To avoid duplicates, data would need to be more or less
> > > > > completely flushed out of the system before a checkpoint is
> > > > > performed, which would produce huge downtime.
> > > > > Instead, we assume that we can always resume from the
> > > > > checkpoints.
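This contract can be illustrated with a toy model (nothing here is real Flink or Beam API): the offset is committed only as part of a checkpoint, together with a snapshot of the in-flight state, and both are restored as one unit after a crash.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model: offsets and in-flight state are checkpointed atomically,
// so records buffered in a combiner at checkpoint time are recovered
// from state rather than re-read from Kafka.
class CheckpointToy {
    List<String> inFlight = new ArrayList<>(); // e.g. records stuck in a combiner
    int offset = 0;                            // next record to read from the source

    // The checkpoint = (committed offset, copy of the in-flight state).
    private int ckptOffset = 0;
    private List<String> ckptInFlight = new ArrayList<>();

    void read(String record) {
        inFlight.add(record);
        offset++;
    }

    void checkpoint() {
        ckptOffset = offset;
        ckptInFlight = new ArrayList<>(inFlight);
    }

    void crashAndRestore() {
        offset = ckptOffset;
        inFlight = new ArrayList<>(ckptInFlight);
    }
}
```

After a restore, the buffered records come back from the state snapshot and only records read after the checkpoint are re-read from the source, so nothing is lost and nothing is duplicated.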
> > > > > Which leads to the last question: what to do when your
> > > > > pipeline has breaking changes.
> > > > > The first strategy is to avoid breaking changes as much as
> > > > > possible. State could, for example, be stored as Avro to allow
> > > > > schema evolution, and minor things like renamed operators will
> > > > > not happen with a bit more expertise.
> > > > > The second strategy is to use state migration [2].
> > > > > Alternatively, you can manually convert state with the State
> > > > > Processor API [3].
> > > > > The last option is a full reprocessing of the data. This can be
> > > > > done on a non-production cluster, and then a savepoint can be
> > > > > used to bootstrap the production cluster quickly. This option
> > > > > needs to be available anyway in case you find a logic error,
> > > > > but of course it has the highest implications (you may need to
> > > > > purge the sink beforehand).
> > > > >
> > > > > [1]
> > > > >
> > > >
> https://github.com/ververica/flink-training-troubleshooting/blob/master/src/main/java/com/ververica/flinktraining/solutions/troubleshoot/TroubledStreamingJobSolution2.java#L129-L187
> > > > > [2]
> > > > >
> > > >
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html#the-typeserializersnapshot-abstraction
> > > > > [3]
> > > > >
> > > >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
> > > > > On Fri, May 15, 2020 at 2:58 PM Ivan San Jose <
> > > > > isanj...@theworkshop.com> wrote:
> > > > > > Hi, we are starting to use Beam with Flink as the runner in
> > > > > > our applications, and recently we wanted to take advantage of
> > > > > > Flink checkpointing, but it seems we are not understanding it
> > > > > > clearly.
> > > > > >
> > > > > > Simplifying, our application does the following:
> > > > > >   - Read messages from a couple of Kafka topics
> > > > > >   - Combine them
> > > > > >   - Write the combination result to a sink (Exasol DB)
> > > > > >
> > > > > > As the application processes messages using event time, and
> > > > > > one of the topics is almost idle, the first time the
> > > > > > application is started messages get stuck in the combiner,
> > > > > > because the watermark doesn't advance until messages arrive
> > > > > > on the idle topic (we know this, and it is not a problem for
> > > > > > us, though).
> > > > > >
> > > > > > The problem is what we've observed: if a checkpoint is
> > > > > > triggered while messages are still stuck in the combiner,
> > > > > > then, surprisingly for us, the checkpoint finishes
> > > > > > successfully (and offsets are committed to Kafka) even though
> > > > > > the messages haven't reached the sink yet. Is this expected?
> > > > > >
> > > > > > The thing is that if, in the future, we make
> > > > > > state-incompatible changes to the application source code,
> > > > > > the checkpoint taken couldn't be restored. So we would like
> > > > > > to start the application without using any checkpoint, but
> > > > > > without losing data.
> > > > > > The problem here is that data loss would happen: the messages
> > > > > > stuck in the combiner are already committed to Kafka, and
> > > > > > without a checkpoint the application would start reading from
> > > > > > the latest committed offset, so those messages would never be
> > > > > > read from the source again.
> > > > > >
> > > > > > So I guess our question is: what do you do in order not to
> > > > > > lose data when developing applications? Because sooner or
> > > > > > later you are going to add breaking changes...
> > > > > >
> > > > > > For example, we've seen those two errors so far:
> > > > > >   - After changing an operator name:
> > > > > >
> > > > > > 2020-05-13 07:23:52,248 ERROR Fatal error occurred in the cluster entrypoint.
> > > > > > org.apache.flink.runtime.dispatcher.DispatcherException: Failed to take leadership with session id 00000000-0000-0000-0000-000000000000.
> > > > > > ...
> > > > > > Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
> > > > > >     at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:152)
> > > > > >     at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83)
> > > > > >     at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:375)
> > > > > >     at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
> > > > > >     ... 7 more
> > > > > > Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint hdfs://RTDWLTDEV/data/lake/processing/flink-savepoints/holly-reconciliation-fact/savepoint-90ab28-bcc1f65a0986. Cannot map checkpoint/savepoint state for operator f476451c6210bd2783f36fa331b9da5e to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.
> > > > > >     at org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:205)
> > > > > > ...
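As the error message itself suggests, restoring after removing or renaming an operator requires explicitly allowing non-restored state. The invocation would look roughly like this (the job jar name is a placeholder), with the caveat that the state of the missing operator is then silently dropped:

```
flink run \
  -s hdfs://RTDWLTDEV/data/lake/processing/flink-savepoints/holly-reconciliation-fact/savepoint-90ab28-bcc1f65a0986 \
  --allowNonRestoredState \
  your-job.jar
```

In plain Flink, assigning stable uid()s to operators avoids this class of mismatch, since auto-generated operator IDs depend on the topology; with Beam on top, keeping transform names stable plays the same role.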
> > > > > >
> > > > > >   - After modifying a Java model class involved in a combine:
> > > > > > org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
> > > > > >     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
> > > > > >     at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
> > > > > >     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
> > > > > >     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> > > > > >     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
> > > > > >     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
> > > > > >     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
> > > > > >     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253)
> > > > > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:881)
> > > > > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:395)
> > > > > >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> > > > > >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> > > > > >     at java.lang.Thread.run(Thread.java:748)
> > > > > > Caused by: java.io.InvalidClassException: internal.holly.beatrix.wallet.walletfact.model.WalletMetadata; local class incompatible: stream classdesc serialVersionUID = 8366890161513008789, local class serialVersionUID = 174312384610985998
> > > > > >     at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
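The InvalidClassException above is plain java.io.Serializable behavior, independent of Flink: when a class declares no serialVersionUID, the JVM derives one from the class shape, so any edit changes the UID and old serialized state no longer deserializes. Pinning the UID (the class below is illustrative, not the actual model) keeps compatible changes such as adding a field restorable, though it does not make arbitrary type changes safe:

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

class WalletLikeModel implements Serializable {
    // Without this line, the JVM computes a UID from the fields and
    // methods, and editing the class produces the "local class
    // incompatible" error seen in the stack trace.
    private static final long serialVersionUID = 1L;

    String id;
    long amount; // fields added later are tolerated when the UID is pinned
}

class SerialVersionDemo {
    public static void main(String[] args) {
        // lookup() reports the UID that Java serialization will use.
        long uid = ObjectStreamClass.lookup(WalletLikeModel.class)
                .getSerialVersionUID();
        System.out.println(uid); // prints the pinned value, 1
    }
}
```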
> > > > > >
> > > > > > Apologies in advance, as we are new to Flink, so maybe we are
> > > > > > missing something obvious here.
> > > > > >
> > > > > > Thanks
> > > > > >
> > > > > >
> > > > > > This email and any attachment to it are confidential. Unless
> > > > you
> > > > > > are the intended recipient, you may not use, copy or disclose
> > > > > > either the message or any information contained in the
> > > > > > message.
> > > > If
> > > > > > you are not the intended recipient, you should delete this
> > > > email
> > > > > > and notify the sender immediately. Any views or opinions
> > > > expressed
> > > > > > in this email are those of the sender only, unless otherwise
> > > > > > stated. All copyright in any of the material in this email is
> > > > > > reserved. All emails, incoming and outgoing, may be recorded
> > > > and
> > > > > > monitored for legitimate business purposes. We exclude all
> > > > > > liability for any loss or damage arising or resulting from
> > > > > > the
> > > > > > receipt, use or transmission of this email to the fullest
> > > > extent
> > > > > > permitted by law.
> > > >
> >
>
>
>


-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng
