Yep, sorry if I'm bothering you, but I think I'm still not getting this:
how could I tell Beam to tell Flink to use that serializer instead of
the standard Java one? I think Beam is abstracting us from Flink's
checkpointing mechanism, so I'm afraid that if we use the Flink API
directly we might break other things that Beam is hiding from us...

On Tue, 2020-05-19 at 10:44 +0200, Arvid Heise wrote:
> Hi Ivan,
>
> The easiest way is to use some implementation that's already there
> [1]. I already mentioned Avro and would strongly recommend giving it
> a go. If you make sure to provide a default value for as many fields
> as possible, you can always remove them later giving you great
> flexibility. I can give you more hints if you decide to go this
> route.
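(A minimal illustration of the default-value advice above; the record and field names are hypothetical, not taken from this thread.) With a default on every field, fields can later be added or removed and old checkpointed records can still be read under Avro's schema resolution rules:

```json
{
  "type": "record",
  "name": "WalletState",
  "namespace": "com.example.state",
  "fields": [
    {"name": "walletId", "type": "string", "default": ""},
    {"name": "balance",  "type": "long",   "default": 0},
    {"name": "currency", "type": ["null", "string"], "default": null}
  ]
}
```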
>
> If you want to have a custom implementation, I'd start by looking at
> one of the simpler implementations, like MapSerializerSnapshot [2].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.html
> (see known implementing classes).
> [2]
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java
>
> On Tue, May 19, 2020 at 10:31 AM Ivan San Jose <
> isanj...@theworkshop.com> wrote:
> > Thanks for your complete answer Arvid, we will try all the
> > things you mentioned, but take into account that we are using Beam
> > on top of Flink, so, to be honest, I don't know how we could
> > implement the custom serialization thing (
> > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html#the-typeserializersnapshot-abstraction
> > ) there. Could you please give us some hints? Thanks
> >
> > On Mon, 2020-05-18 at 12:41 +0200, Arvid Heise wrote:
> > > Hi Ivan,
> > >
> > > First, let's address the issue with idle partitions. The solution
> > > is to use a watermark assigner that also emits a watermark with
> > > some idle timeout [1].
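To make the mechanism in [1] concrete, here is a small plain-Java sketch of the idle-timeout logic (this is not the Flink API; the class name and parameters are illustrative): track the highest event timestamp seen, and once no event has arrived for longer than the idle timeout, derive the watermark from processing time so an idle partition stops holding the rest of the pipeline back.

```java
import java.util.function.LongSupplier;

// Illustrative sketch (not the Flink API) of a periodic watermark
// generator that keeps advancing when its source goes idle.
class IdleAwareWatermark {
    private final long maxOutOfOrdernessMs;
    private final long idleTimeoutMs;
    private final LongSupplier wallClockMs; // injected for testability
    private long maxTimestampMs = Long.MIN_VALUE;
    private long lastEventWallClockMs;

    IdleAwareWatermark(long maxOutOfOrdernessMs, long idleTimeoutMs,
                       LongSupplier wallClockMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.idleTimeoutMs = idleTimeoutMs;
        this.wallClockMs = wallClockMs;
        this.lastEventWallClockMs = wallClockMs.getAsLong();
    }

    void onEvent(long eventTimestampMs) {
        maxTimestampMs = Math.max(maxTimestampMs, eventTimestampMs);
        lastEventWallClockMs = wallClockMs.getAsLong();
    }

    // Called periodically; returns the watermark to emit.
    long currentWatermark() {
        long eventBased = (maxTimestampMs == Long.MIN_VALUE)
                ? Long.MIN_VALUE
                : maxTimestampMs - maxOutOfOrdernessMs;
        long idleForMs = wallClockMs.getAsLong() - lastEventWallClockMs;
        if (idleForMs > idleTimeoutMs) {
            // Idle: advance from processing time, never regressing.
            long idleBased = wallClockMs.getAsLong() - idleTimeoutMs - maxOutOfOrdernessMs;
            return Math.max(eventBased, idleBased);
        }
        return eventBased;
    }
}

public class Main {
    public static void main(String[] args) {
        long[] now = {10_000};
        IdleAwareWatermark wm = new IdleAwareWatermark(100, 1_000, () -> now[0]);
        wm.onEvent(9_500);
        System.out.println(wm.currentWatermark()); // 9400: event time minus out-of-orderness
        now[0] = 20_000; // nothing arrived for 10s -> source considered idle
        System.out.println(wm.currentWatermark()); // 18900: derived from processing time
    }
}
```

In real code the same shape lives inside a Flink periodic watermark assigner; the linked solution [1] is the authoritative version.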
> > >
> > > Now the second question, on why Kafka offsets are committed for
> > > in-flight, checkpointed data. The basic idea is that you are not
> > > losing data while avoiding duplicated output.
> > > If you committed offsets only after data had been fully processed,
> > > then upon a crash the same data point would be reprocessed jointly
> > > with the restored in-flight data, so you would get duplicate
> > > messages in your system.
> > > To avoid duplicates, data would need to be more or less completely
> > > flushed out of the system before a checkpoint is performed. That
> > > would produce a huge downtime.
> > > Instead, we assume that we can always resume from the checkpoints.
> > >
> > > Which leads to the last question, on what to do when your pipeline
> > > has breaking changes.
> > > The first strategy is to avoid breaking changes as much as
> > > possible. State could, for example, also be stored as Avro to allow
> > > schema evolution. Minor things like renamed operators will not
> > > happen with a bit more expertise.
> > > The second strategy is to use state migration [2]. Alternatively,
> > > you can manually convert state with the State Processor API [3].
> > > The last option is to do a full reprocessing of the data. This can
> > > be done on a non-production cluster, and then a savepoint can be
> > > used to bootstrap the production cluster quickly. This option needs
> > > to be available anyway for the case that you find any logic error.
> > > But of course, this option has the highest implications (you may
> > > need to purge the sink beforehand).
> > >
> > > [1]
> > > https://github.com/ververica/flink-training-troubleshooting/blob/master/src/main/java/com/ververica/flinktraining/solutions/troubleshoot/TroubledStreamingJobSolution2.java#L129-L187
> > > [2]
> > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html#the-typeserializersnapshot-abstraction
> > > [3]
> > > https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
> > >
> > > On Fri, May 15, 2020 at 2:58 PM Ivan San Jose <
> > > isanj...@theworkshop.com> wrote:
> > > > Hi, we are starting to use Beam with Flink as the runner in our
> > > > applications, and recently we would like to get the advantages
> > > > that Flink checkpointing provides, but it seems we are not
> > > > understanding it clearly.
> > > >
> > > > Simplifying, our application does the following:
> > > >   - Read messages from a couple of Kafka topics
> > > >   - Combine them
> > > >   - Write combination result to a sink (Exasol DB)
> > > >
> > > > As the application is processing messages using event time, and
> > > > one of the topics is almost idle, the first time the application
> > > > is started messages are stuck in the combiner because the
> > > > watermark doesn't advance until we have messages arriving on the
> > > > idle topic (we know this and it is not a problem for us, though).
> > > >
> > > > The problem is that we've observed that, if a checkpoint is
> > > > triggered while messages are still stuck in the combiner,
> > > > surprisingly for us, the checkpoint finishes successfully (and
> > > > offsets are committed to Kafka) even though messages haven't
> > > > progressed to the sink yet. Is this expected?
> > > >
> > > > The thing is that if, in the future, we make state-incompatible
> > > > changes in the application source code, the checkpoint taken
> > > > couldn't be restored. So we would like to start the application
> > > > without using any checkpoint, but without losing data.
> > > > The problem here would be that data loss would happen, because
> > > > messages stuck in the combiner are already committed to Kafka,
> > > > and the application would start reading from the latest committed
> > > > offset in Kafka if we don't use any checkpoint, so those messages
> > > > are not going to be read from the source again.
> > > >
> > > > So, I guess our question is how you manage not to lose data when
> > > > developing applications, because sooner or later you are going to
> > > > add breaking changes...
> > > >
> > > > For example, we've seen these two errors so far:
> > > >   - After changing an operator name:
> > > >
> > > > 2020-05-13 07:23:52,248 ERROR Fatal error occurred in the cluster entrypoint.
> > > > org.apache.flink.runtime.dispatcher.DispatcherException: Failed to take leadership with session id 00000000-0000-0000-0000-000000000000.
> > > > ...
> > > > Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
> > > >     at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:152)
> > > >     at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83)
> > > >     at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:375)
> > > >     at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
> > > >     ... 7 more
> > > > Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint hdfs://RTDWLTDEV/data/lake/processing/flink-savepoints/holly-reconciliation-fact/savepoint-90ab28-bcc1f65a0986. Cannot map checkpoint/savepoint state for operator f476451c6210bd2783f36fa331b9da5e to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.
> > > >     at org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:205)
> > > > ...
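As that message itself suggests, when an operator has been renamed, its now-orphaned state can be explicitly skipped on restore with `--allowNonRestoredState`. A sketch of the CLI invocation (the savepoint path is the one from the error above; the jar name is a placeholder):

```
flink run \
    -s hdfs://RTDWLTDEV/data/lake/processing/flink-savepoints/holly-reconciliation-fact/savepoint-90ab28-bcc1f65a0986 \
    --allowNonRestoredState \
    your-job.jar
```

Note the skipped state is simply dropped, so this only helps when losing that operator's state is acceptable. Giving operators stable, explicit names up front (in Beam, by naming each transform) avoids the problem in the first place.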
> > > >
> > > >   - After modifying a Java model class involved in a combine:
> > > > org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
> > > >     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
> > > >     at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
> > > >     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
> > > >     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> > > >     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
> > > >     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
> > > >     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
> > > >     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253)
> > > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:881)
> > > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:395)
> > > >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> > > >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> > > >     at java.lang.Thread.run(Thread.java:748)
> > > > Caused by: java.io.InvalidClassException: internal.holly.beatrix.wallet.walletfact.model.WalletMetadata; local class incompatible: stream classdesc serialVersionUID = 8366890161513008789, local class serialVersionUID = 174312384610985998
> > > >     at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
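That last exception is plain Java serialization at work: editing the class changed its implicitly computed serialVersionUID, so the old bytes are rejected. Pinning the UID explicitly (standard Java practice, sketched here on a hypothetical model class) survives compatible edits such as adding a field, though it cannot fix truly incompatible changes, which is why the Avro route discussed above is more robust:

```java
import java.io.*;

// Hypothetical model class: pinning serialVersionUID prevents
// java.io.InvalidClassException when compatible edits (e.g. adding a
// field) would otherwise change the UID the JVM computes.
class WalletMetadata implements Serializable {
    private static final long serialVersionUID = 1L;
    String owner;
    WalletMetadata(String owner) { this.owner = owner; }
}

public class Main {
    public static void main(String[] args) throws Exception {
        // Round-trip through Java serialization, which is roughly what
        // the heap state backend does for types without a better serializer.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(new WalletMetadata("ivan"));
        oos.close();
        ObjectInputStream in =
            new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
        WalletMetadata restored = (WalletMetadata) in.readObject();
        System.out.println(restored.owner); // prints "ivan"
    }
}
```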
> > > >
> > > >
> > > > Apologies in advance as we are new to Flink, so maybe we are
> > > > missing something obvious here.
> > > >
> > > > Thanks
> > > >
> > > >
> > > > This email and any attachment to it are confidential. Unless you
> > > > are the intended recipient, you may not use, copy or disclose
> > > > either the message or any information contained in the message.
> > > > If you are not the intended recipient, you should delete this
> > > > email and notify the sender immediately. Any views or opinions
> > > > expressed in this email are those of the sender only, unless
> > > > otherwise stated. All copyright in any of the material in this
> > > > email is reserved. All emails, incoming and outgoing, may be
> > > > recorded and monitored for legitimate business purposes. We
> > > > exclude all liability for any loss or damage arising or resulting
> > > > from the receipt, use or transmission of this email to the
> > > > fullest extent permitted by law.
> > >
> > >
> >
> >
>
>

