Perfect, thank you so much, Arvid. I'd have expected more people to be using
Beam on top of Flink, but it seems it is not so popular.

On Tue, 2020-05-19 at 12:46 +0200, Arvid Heise wrote:
> Hi Ivan,
>
> I fear that only a few users on this mailing list have deeper Beam
> experience. I only used it briefly 3 years ago. Most users here use
> Flink directly to avoid this kind of double-abstraction issue.
>
> It might be better to switch to the Beam mailing list if you have
> Beam-specific questions, including how the Flink runner actually
> translates the Beam program to Flink.
>
> On Tue, May 19, 2020 at 11:38 AM Ivan San Jose <isanj...@theworkshop.com> wrote:
> > Actually, I'm also wondering how Beam coders relate to the runner's
> > serialization. In Beam you specify a coder for each Java type so
> > that Beam can serialize/deserialize that type, but then does the
> > runner used under the hood serialize/deserialize the result again,
> > i.e. is it doing a double serialization? Does that make sense? Or
> > how does it work?
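A plain-Java sketch of the idea behind this question, under one assumption that should be confirmed against the Beam Flink runner source: the runner adapts each Beam coder into its own serializer instead of serializing the coder's output a second time. Coder, StringCoder and CoderBackedSerializer are hypothetical names, not Beam or Flink classes; the point is only that a runner-side serializer built this way adds no encoding of its own, so each value is encoded once, by the coder.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Plain-Java model only: Coder, StringCoder and CoderBackedSerializer are hypothetical names.
class CoderAdapterSketch {

    // Hypothetical user-facing coder: the only component that knows how to encode a type.
    interface Coder<T> {
        void encode(T value, DataOutputStream out) throws IOException;

        T decode(DataInputStream in) throws IOException;
    }

    // Example coder for strings.
    static final class StringCoder implements Coder<String> {
        @Override
        public void encode(String value, DataOutputStream out) throws IOException {
            out.writeUTF(value);
        }

        @Override
        public String decode(DataInputStream in) throws IOException {
            return in.readUTF();
        }
    }

    // Hypothetical runner-side serializer: it adds no format of its own, it only delegates.
    static final class CoderBackedSerializer<T> {
        private final Coder<T> coder;

        CoderBackedSerializer(Coder<T> coder) {
            this.coder = coder;
        }

        byte[] serialize(T value) {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                coder.encode(value, out);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            return bytes.toByteArray();
        }

        T deserialize(byte[] data) {
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
                return coder.decode(in);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    public static void main(String[] args) {
        CoderBackedSerializer<String> serializer = new CoderBackedSerializer<>(new StringCoder());
        byte[] data = serializer.serialize("wallet-42");
        System.out.println(data.length + " bytes -> " + serializer.deserialize(data));
    }
}
```

Running main prints "11 bytes -> wallet-42": writeUTF writes a 2-byte length prefix followed by the 9 ASCII bytes, and nothing else is added on the runner side.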
> >
> > On Tue, 2020-05-19 at 08:54 +0000, Ivan San Jose wrote:
> > > Yep, sorry to bother you, but I think I'm still not getting this:
> > > how could I tell Beam to tell Flink to use that serializer instead
> > > of the standard Java one? I think Beam abstracts the Flink
> > > checkpointing mechanism away from us, so I'm afraid that if we use
> > > the Flink API directly we might break other things that Beam is
> > > hiding from us...
> > >
> > > On Tue, 2020-05-19 at 10:44 +0200, Arvid Heise wrote:
> > > > Hi Ivan,
> > > >
> > > > The easiest way is to use one of the implementations that already
> > > > exist [1]. I already mentioned Avro and would strongly recommend
> > > > giving it a go. If you make sure to provide a default value for as
> > > > many fields as possible, you can always remove them later, which
> > > > gives you great flexibility. I can give you more hints if you
> > > > decide to go this route.
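Avro gets this flexibility by resolving the schema the data was written with against the schema the new job reads with, filling in declared defaults for fields the old data lacks. The sketch below is not Avro: it is a hand-rolled plain-Java stand-in that shows the same effect for one direction (adding a field). The Wallet type, its fields, the version number written in front of each record and the EUR default are all hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hand-rolled illustration of default values in schema evolution; not Avro's API.
class DefaultValueEvolutionSketch {

    // Current (version 2) shape of a hypothetical state object; "currency" was added in v2.
    static final class Wallet {
        final String id;
        final long balance;
        final String currency;

        Wallet(String id, long balance, String currency) {
            this.id = id;
            this.balance = balance;
            this.currency = currency;
        }
    }

    static final int CURRENT_VERSION = 2;
    static final String DEFAULT_CURRENCY = "EUR"; // assumed default for data written before v2

    // Writes a record in the given schema version, as an older or newer job would.
    static byte[] write(Wallet wallet, int version) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(version); // every record carries the version it was written with
            out.writeUTF(wallet.id);
            out.writeLong(wallet.balance);
            if (version >= 2) {
                out.writeUTF(wallet.currency);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads any known version; fields missing from older data fall back to their default.
    static Wallet read(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int version = in.readInt();
            String id = in.readUTF();
            long balance = in.readLong();
            String currency = version >= 2 ? in.readUTF() : DEFAULT_CURRENCY;
            return new Wallet(id, balance, currency);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] oldRecord = write(new Wallet("w-1", 100, null), 1); // as stored by the old job
        byte[] newRecord = write(new Wallet("w-2", 250, "USD"), CURRENT_VERSION);
        System.out.println(read(oldRecord).currency);
        System.out.println(read(newRecord).currency);
    }
}
```

Running main prints EUR and then USD: the record written by the old job comes back with the default currency instead of failing to deserialize.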
> > > >
> > > > If you want a custom implementation, I'd start by looking at one
> > > > of the simpler implementations, such as MapSerializerSnapshot [2].
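The central job of a custom TypeSerializerSnapshot is to decide, on restore, whether state written by the old serializer can be read by the new one; Flink reports the answer through TypeSerializerSchemaCompatibility (for example compatibleAsIs(), compatibleAfterMigration() or incompatible()). The plain-Java sketch below models only that decision, using a hypothetical integer schema version and hypothetical enum names; it does not implement Flink's interface, and a real snapshot also has to write and read its own configuration.

```java
// Models only the compatibility decision; not Flink's TypeSerializerSnapshot interface.
class CompatibilitySketch {

    // Modelled on the kinds of outcome Flink can report; these names are hypothetical.
    enum Compatibility { COMPATIBLE_AS_IS, COMPATIBLE_AFTER_MIGRATION, INCOMPATIBLE }

    static final int CURRENT_VERSION = 3;           // assumed version of the current serializer
    static final int OLDEST_MIGRATABLE_VERSION = 2; // assumed: v1 state cannot be migrated

    // Decides what to do with state whose snapshot was written at snapshotVersion.
    static Compatibility resolve(int snapshotVersion) {
        if (snapshotVersion == CURRENT_VERSION) {
            return Compatibility.COMPATIBLE_AS_IS; // read the state directly
        }
        if (snapshotVersion >= OLDEST_MIGRATABLE_VERSION && snapshotVersion < CURRENT_VERSION) {
            // read with the old format, then rewrite with the new one
            return Compatibility.COMPATIBLE_AFTER_MIGRATION;
        }
        return Compatibility.INCOMPATIBLE; // too old, or written by a newer job: fail the restore
    }

    public static void main(String[] args) {
        for (int version = 1; version <= 4; version++) {
            System.out.println("snapshot v" + version + " -> " + resolve(version));
        }
    }
}
```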
> > > >
> > > > [1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.html
> > > > (see known implementing classes).
> > > > [2] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java
> > > >
> > > > On Tue, May 19, 2020 at 10:31 AM Ivan San Jose <isanj...@theworkshop.com> wrote:
> > > > > Thanks for your complete answer, Arvid. We will try all the
> > > > > approaches you mentioned, but keep in mind that we are using Beam
> > > > > on top of Flink, so, to be honest, I don't know how we could
> > > > > implement the custom serialization part
> > > > > (https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html#the-typeserializersnapshot-abstraction)
> > > > > there. Could you please give us some hints? Thanks
> > > > >
> > > > > On Mon, 2020-05-18 at 12:41 +0200, Arvid Heise wrote:
> > > > > > Hi Ivan,
> > > > > >
> > > > > > First, let's address the issue with idle partitions. The
> > > > > > solution is to use a watermark assigner that also emits a
> > > > > > watermark after some idle timeout [1].
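The effect of an idle timeout can be modelled in a few lines of plain Java. This is not the linked training code or a Flink API: the combined watermark is the minimum of the per-partition event times, except that a partition that has been silent for longer than a hypothetical idleTimeoutMillis is left out, so a nearly idle topic stops holding the watermark back. The partition names, timestamps and the choice to keep the last watermark when every partition is idle are assumptions of the sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of idle-aware watermarking; not Flink's API.
class IdleAwareWatermarkSketch {
    private final long idleTimeoutMillis;
    // Highest event time seen per partition (Long.MIN_VALUE until the first record).
    private final Map<String, Long> maxEventTime = new HashMap<>();
    // Processing time at which each partition last produced a record.
    private final Map<String, Long> lastActivity = new HashMap<>();
    // Last watermark handed out; watermarks never move backwards.
    private long lastWatermark = Long.MIN_VALUE;

    IdleAwareWatermarkSketch(long idleTimeoutMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    // Registers a partition that has not produced any record yet.
    void registerPartition(String partition, long now) {
        maxEventTime.put(partition, Long.MIN_VALUE);
        lastActivity.put(partition, now);
    }

    // "now" is processing time supplied by the caller, which keeps the sketch deterministic.
    void onRecord(String partition, long eventTime, long now) {
        maxEventTime.merge(partition, eventTime, Long::max);
        lastActivity.put(partition, now);
    }

    long currentWatermark(long now) {
        long candidate = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Map.Entry<String, Long> entry : maxEventTime.entrySet()) {
            long silentFor = now - lastActivity.get(entry.getKey());
            if (silentFor <= idleTimeoutMillis) { // still active, so it may hold the watermark back
                candidate = Math.min(candidate, entry.getValue());
                anyActive = true;
            }
        }
        // With no active partition there is nothing new to derive, so keep the last watermark.
        if (anyActive) {
            lastWatermark = Math.max(lastWatermark, candidate);
        }
        return lastWatermark;
    }

    public static void main(String[] args) {
        IdleAwareWatermarkSketch watermarks = new IdleAwareWatermarkSketch(10_000);
        watermarks.registerPartition("busy-topic-0", 0);
        watermarks.registerPartition("idle-topic-0", 0);

        watermarks.onRecord("busy-topic-0", 5_000, 1_000);
        // The silent partition is still within its timeout, so it holds the watermark back.
        System.out.println(watermarks.currentWatermark(2_000));

        watermarks.onRecord("busy-topic-0", 9_000, 11_000);
        // More than 10 s without records: the silent partition is treated as idle.
        System.out.println(watermarks.currentWatermark(12_000));
    }
}
```

In main, the watermark stays at Long.MIN_VALUE while the silent partition is still within its timeout, and moves to 9000 once that partition has been quiet for more than 10 seconds.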
> > > > > >
> > > > > > Now the second question: why Kafka offsets are committed for
> > > > > > in-flight, checkpointed data. The basic idea is that you do not
> > > > > > lose data while also avoiding duplicated output.
> > > > > > If you committed offsets only after the data had been fully
> > > > > > processed, then after a crash the same data point would be
> > > > > > reprocessed together with the restored in-flight data, so you
> > > > > > would get duplicate messages in your system.
> > > > > > To avoid duplicates in that scheme, data would need to be more or
> > > > > > less completely flushed out of the system before every
> > > > > > checkpoint, which would cause huge downtime.
> > > > > > Instead, we assume that we can always resume from the
> > > > > > checkpoints.
> > > > > > This leads to the last question: what to do when your pipeline
> > > > > > has breaking changes.
> > > > > > The first strategy is to avoid breaking changes as much as
> > > > > > possible. State could, for example, also be stored as Avro to
> > > > > > allow schema evolution. Minor issues like renamed operators stop
> > > > > > happening with a bit more experience.
> > > > > > The second strategy is to use state migration [2].
> > > > > > Alternatively, you can manually convert state with the State
> > > > > > Processor API [3].
> > > > > > The last option is to fully reprocess the data. This can be done
> > > > > > on a non-production cluster, and then a savepoint can be used to
> > > > > > bootstrap the production cluster quickly. This option needs to
> > > > > > be available anyway in case you find a logic error. But of
> > > > > > course, this option has the biggest impact (you may need to
> > > > > > purge the sink beforehand).
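Whichever tool does the reading and writing (for Flink state, the State Processor API mentioned above), a manual conversion comes down to mapping every entry of the old state to the new shape. The plain-Java sketch below shows only that mapping over an in-memory map; OldWallet, NewWallet, the conversion to whole cents and the EUR default are hypothetical, and no Flink API is used.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Only the per-entry mapping of a state conversion; reading and writing real state is out of scope.
class StateConversionSketch {

    // Hypothetical value type stored by the old job version.
    static final class OldWallet {
        final double balance; // amount in euros as a floating-point number

        OldWallet(double balance) {
            this.balance = balance;
        }
    }

    // Hypothetical value type expected by the new job version.
    static final class NewWallet {
        final long balanceCents;
        final String currency;

        NewWallet(long balanceCents, String currency) {
            this.balanceCents = balanceCents;
            this.currency = currency;
        }
    }

    // Per-entry conversion; rounding to whole cents and the "EUR" default are assumptions.
    static NewWallet convert(OldWallet old) {
        return new NewWallet(Math.round(old.balance * 100), "EUR");
    }

    // Converts the whole keyed state, keeping every key.
    static Map<String, NewWallet> migrate(Map<String, OldWallet> oldState) {
        Map<String, NewWallet> newState = new LinkedHashMap<>();
        oldState.forEach((key, value) -> newState.put(key, convert(value)));
        return newState;
    }

    public static void main(String[] args) {
        Map<String, OldWallet> oldState = new LinkedHashMap<>();
        oldState.put("w-1", new OldWallet(12.34));
        oldState.put("w-2", new OldWallet(0.5));
        migrate(oldState).forEach((key, wallet) ->
                System.out.println(key + " -> " + wallet.balanceCents + " " + wallet.currency));
    }
}
```

Running main prints "w-1 -> 1234 EUR" and "w-2 -> 50 EUR"; rounding guards against floating-point products that land just below a whole number of cents.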
> > > > > >
> > > > > > [1] https://github.com/ververica/flink-training-troubleshooting/blob/master/src/main/java/com/ververica/flinktraining/solutions/troubleshoot/TroubledStreamingJobSolution2.java#L129-L187
> > > > > > [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html#the-typeserializersnapshot-abstraction
> > > > > > [3] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
> > > > > > On Fri, May 15, 2020 at 2:58 PM Ivan San Jose <isanj...@theworkshop.com> wrote:
> > > > > > > Hi, we are starting to use Beam with Flink as the runner in our
> > > > > > > applications, and we recently wanted to take advantage of what
> > > > > > > Flink checkpointing provides, but it seems we do not understand
> > > > > > > it clearly.
> > > > > > >
> > > > > > > Simplifying, our application does the following:
> > > > > > >   - Read messages from a couple of Kafka topics
> > > > > > >   - Combine them
> > > > > > >   - Write the combination result to a sink (Exasol DB)
> > > > > > >
> > > > > > > As the application processes messages using event time, and one
> > > > > > > of the topics is almost idle, the first time the application is
> > > > > > > started messages get stuck in the combiner, because the watermark
> > > > > > > doesn't advance until messages arrive on the idle topic (we know
> > > > > > > this, and it is not a problem for us).
> > > > > > >
> > > > > > > The problem is that we've observed that, if a checkpoint is
> > > > > > > triggered while messages are still stuck in the combiner, the
> > > > > > > checkpoint (surprisingly for us) finishes successfully, and the
> > > > > > > offsets are committed to Kafka, even though the messages haven't
> > > > > > > reached the sink yet. Is this expected?
> > > > > > >
> > > > > > > The thing is that if, in the future, we make state-incompatible
> > > > > > > changes to the application source code, the checkpoint taken
> > > > > > > couldn't be restored. So we would like to start the application
> > > > > > > without any checkpoint, but without losing data.
> > > > > > > The problem is that data loss would happen, because the messages
> > > > > > > stuck in the combiner are already committed to Kafka, and
> > > > > > > without a checkpoint the application would start reading from
> > > > > > > the latest committed offset in Kafka, so those messages would
> > > > > > > never be read from the source again.
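The scenario above can be replayed with a toy model in plain Java, with no Kafka or Flink involved and all names hypothetical. It assumes, as observed above, that the source offset is committed to Kafka when the checkpoint completes, and that the checkpoint itself holds both that offset and the records still buffered in the combiner. Restoring the checkpoint recovers the buffered records; restarting from the committed offset with empty state skips them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: no Kafka or Flink; every name here is hypothetical.
class CheckpointVsOffsetSketch {

    // A checkpoint: the next offset to read plus the records still buffered in the combiner.
    static final class Checkpoint {
        final int nextOffset;
        final List<String> buffered;

        Checkpoint(int nextOffset, List<String> buffered) {
            this.nextOffset = nextOffset;
            this.buffered = new ArrayList<>(buffered); // copy, so later changes do not leak in
        }
    }

    // Replays the topic from startOffset on top of the restored combiner buffer and
    // assumes the watermark finally advances, so everything buffered reaches the sink.
    static List<String> runAfterRestart(List<String> topic, int startOffset, List<String> restoredBuffer) {
        List<String> combiner = new ArrayList<>(restoredBuffer);
        for (int offset = startOffset; offset < topic.size(); offset++) {
            combiner.add(topic.get(offset));
        }
        return combiner;
    }

    public static void main(String[] args) {
        List<String> topic = Arrays.asList("m0", "m1", "m2", "m3");

        // Before the restart: m0 and m1 were read but are stuck in the combiner.
        Checkpoint checkpoint = new Checkpoint(2, topic.subList(0, 2));
        int committedOffset = checkpoint.nextOffset; // committed to Kafka when the checkpoint completes

        // Restart A: restore the checkpoint, i.e. the offset together with the buffered records.
        System.out.println(runAfterRestart(topic, checkpoint.nextOffset, checkpoint.buffered));
        // Restart B: no checkpoint, resume from the committed offset with empty state.
        System.out.println(runAfterRestart(topic, committedOffset, new ArrayList<>()));
    }
}
```

Running main prints [m0, m1, m2, m3] for the checkpoint restore and [m2, m3] for the offset-only restart, which is the data loss described above.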
> > > > > > >
> > > > > > > So I guess our question is: what do you do to avoid losing data
> > > > > > > when developing applications, given that sooner or later you are
> > > > > > > going to introduce breaking changes?
> > > > > > >
> > > > > > > For example, we've seen these two errors so far:
> > > > > > >   - After changing an operator name:
> > > > > > >
> > > > > > > 2020-05-13 07:23:52,248 ERROR Fatal error occurred in the cluster entrypoint.
> > > > > > > org.apache.flink.runtime.dispatcher.DispatcherException: Failed to take leadership with session id 00000000-0000-0000-0000-000000000000.
> > > > > > > ...
> > > > > > > Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
> > > > > > >     at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:152)
> > > > > > >     at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83)
> > > > > > >     at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:375)
> > > > > > >     at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
> > > > > > >     ... 7 more
> > > > > > > Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint hdfs://RTDWLTDEV/data/lake/processing/flink-savepoints/holly-reconciliation-fact/savepoint-90ab28-bcc1f65a0986. Cannot map checkpoint/savepoint state for operator f476451c6210bd2783f36fa331b9da5e to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.
> > > > > > >     at org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:205)
> > > > > > > ...
> > > > > > >
> > > > > > >   - After modifying a Java model class involved in a combine:
> > > > > > > org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
> > > > > > >     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
> > > > > > >     at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
> > > > > > >     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
> > > > > > >     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> > > > > > >     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
> > > > > > >     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
> > > > > > >     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
> > > > > > >     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253)
> > > > > > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:881)
> > > > > > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:395)
> > > > > > >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> > > > > > >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> > > > > > >     at java.lang.Thread.run(Thread.java:748)
> > > > > > > Caused by: java.io.InvalidClassException: internal.holly.beatrix.wallet.walletfact.model.WalletMetadata; local class incompatible: stream classdesc serialVersionUID = 8366890161513008789, local class serialVersionUID = 174312384610985998
> > > > > > >     at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
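The second error comes from plain Java serialization: WalletMetadata most likely does not declare a serialVersionUID, so the JVM computes one from the class structure, and editing the class changed it (here from 8366890161513008789 to 174312384610985998). Declaring the field explicitly keeps the value stable across changes that Java serialization treats as compatible, such as adding a field. For state that already exists, the new class would, in principle, have to declare the old computed value. The sketch below uses a hypothetical stand-in for WalletMetadata (its fields are invented) and reads the declared value back with ObjectStreamClass.lookup.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

class SerialVersionUidSketch {

    // Hypothetical stand-in for internal.holly.beatrix.wallet.walletfact.model.WalletMetadata;
    // the fields below are invented for illustration.
    static class WalletMetadata implements Serializable {
        // Pinned to the "stream classdesc" value from the stack trace, i.e. the UID the
        // existing state was written with. Without this declaration the JVM derives the UID
        // from the class structure, so editing the class changes it.
        private static final long serialVersionUID = 8366890161513008789L;

        String walletId;
        String ownerId; // e.g. a newly added field, which Java serialization treats as compatible
    }

    public static void main(String[] args) {
        long uid = ObjectStreamClass.lookup(WalletMetadata.class).getSerialVersionUID();
        System.out.println("serialVersionUID = " + uid);
    }
}
```

Pinning the UID only gets past this particular check. Incompatible edits, such as changing a field's type, still fail on restore, which is where the Avro, state migration or reprocessing options come in.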
> > > > > > >
> > > > > > > Apologies in advance; we are new to Flink, so maybe we are
> > > > > > > missing something obvious here.
> > > > > > >
> > > > > > > Thanks
> > > > > > >
> > > > > > > This email and any attachment to it are confidential. Unless you
> > > > > > > are the intended recipient, you may not use, copy or disclose
> > > > > > > either the message or any information contained in the message.
> > > > > > > If you are not the intended recipient, you should delete this
> > > > > > > email and notify the sender immediately. Any views or opinions
> > > > > > > expressed in this email are those of the sender only, unless
> > > > > > > otherwise stated. All copyright in any of the material in this
> > > > > > > email is reserved. All emails, incoming and outgoing, may be
> > > > > > > recorded and monitored for legitimate business purposes. We
> > > > > > > exclude all liability for any loss or damage arising or
> > > > > > > resulting from the receipt, use or transmission of this email to
> > > > > > > the fullest extent permitted by law.