Hi, we are starting to use Beam with Flink as the runner for our
applications, and we would now like to take advantage of the
checkpointing that Flink provides, but it seems we do not understand it
clearly.

In simplified form, our application does the following:
  - Read messages from a couple of Kafka topics
  - Combine them
  - Write the combined result to a sink (Exasol DB)

Because the application processes messages using event time and one of
the topics is almost idle, the first time the application is started,
messages get stuck in the combiner: the watermark doesn't advance until
messages arrive on the idle topic (we know this, and it is not a
problem for us).
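To check our understanding of why nothing leaves the combiner, here is a
toy model in plain Java (not Beam or Flink code). It assumes the
combiner's watermark is the minimum of its inputs' watermarks and, as a
simplification, treats each input's watermark as the highest event time
it has delivered; the topic names and timestamps are made up.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Toy model (not Beam or Flink code): one idle input holds back the combined watermark. */
public class IdleTopicWatermark {
    // Per-input watermark; Long.MIN_VALUE means "no data seen yet".
    final Map<String, Long> inputWatermarks = new HashMap<>();
    // End timestamps of windows the combiner is still holding.
    final List<Long> pendingWindowEnds = new ArrayList<>();

    IdleTopicWatermark(String... inputs) {
        for (String input : inputs) {
            inputWatermarks.put(input, Long.MIN_VALUE);
        }
    }

    // Simplification: an input's watermark is the highest event time it has delivered.
    void observe(String input, long eventTime) {
        inputWatermarks.merge(input, eventTime, Long::max);
    }

    // The combiner can only be as far along as its slowest input.
    long combinedWatermark() {
        return Collections.min(inputWatermarks.values());
    }

    // Emits (and removes) every pending window whose end the combined watermark has reached.
    List<Long> fireReadyWindows() {
        long watermark = combinedWatermark();
        List<Long> fired = new ArrayList<>();
        for (Iterator<Long> it = pendingWindowEnds.iterator(); it.hasNext(); ) {
            long end = it.next();
            if (end <= watermark) {
                fired.add(end);
                it.remove();
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        IdleTopicWatermark combiner = new IdleTopicWatermark("busyTopic", "idleTopic");
        combiner.pendingWindowEnds.add(100L);
        combiner.observe("busyTopic", 500L);
        System.out.println(combiner.fireReadyWindows()); // [] because idleTopic is still at Long.MIN_VALUE
        combiner.observe("idleTopic", 200L);
        System.out.println(combiner.fireReadyWindows()); // [100]
    }
}
```

While the idle topic has delivered nothing, the minimum stays at
Long.MIN_VALUE, so the window never fires however far the busy topic
advances.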

The problem is that we've observed that, if a checkpoint is triggered
while messages are still stuck in the combiner, the checkpoint (to our
surprise) finishes successfully and the offsets are committed to Kafka,
even though the messages haven't reached the sink yet. Is this expected?

The thing is that if, in the future, we make state-incompatible changes
to the application source code, the checkpoint we took couldn't be
restored. So we would like to be able to start the application without
any checkpoint, but without losing data.
The problem is that data would then be lost: the messages stuck in the
combiner are already committed to Kafka, so without a checkpoint the
application starts reading from the latest committed offset in Kafka,
and those messages are never read from the source again.
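This is how we picture the sequence, as a toy model in plain Java (not
Beam, Flink or Kafka code). It assumes the Kafka offset is committed
once a checkpoint completes (for example if the pipeline reads with
KafkaIO's commitOffsetsInFinalize()), and that the checkpoint stores the
combiner's buffered messages together with the source position.
KafkaTopic, Job and Snapshot are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Beam, Flink or Kafka code) of checkpointing with messages stuck in a combiner. */
public class CheckpointModel {

    // Stand-in for a Kafka topic partition plus the offset committed for our consumer group.
    static final class KafkaTopic {
        final List<String> records;
        int committedOffset = 0;

        KafkaTopic(List<String> records) {
            this.records = records;
        }
    }

    // What a checkpoint holds: the source position and the combiner's buffered messages.
    static final class Snapshot {
        final int nextOffset;
        final List<String> buffered;

        Snapshot(int nextOffset, List<String> buffered) {
            this.nextOffset = nextOffset;
            this.buffered = List.copyOf(buffered);
        }
    }

    static final class Job {
        final KafkaTopic topic;
        int nextOffset;
        final List<String> combinerBuffer;
        final List<String> sink = new ArrayList<>();

        // Restores from a checkpoint if one is given, otherwise starts from the committed Kafka offset.
        Job(KafkaTopic topic, Snapshot restoreFrom) {
            this.topic = topic;
            if (restoreFrom != null) {
                this.nextOffset = restoreFrom.nextOffset;
                this.combinerBuffer = new ArrayList<>(restoreFrom.buffered);
            } else {
                this.nextOffset = topic.committedOffset;
                this.combinerBuffer = new ArrayList<>();
            }
        }

        // Reads up to n messages; they stay in the combiner because the watermark is stuck.
        void readAndBuffer(int n) {
            for (int i = 0; i < n && nextOffset < topic.records.size(); i++) {
                combinerBuffer.add(topic.records.get(nextOffset++));
            }
        }

        // The watermark finally advances: everything buffered reaches the sink.
        void flush() {
            sink.addAll(combinerBuffer);
            combinerBuffer.clear();
        }

        // Snapshots source position and combiner state, then commits the offset.
        Snapshot checkpoint() {
            Snapshot snapshot = new Snapshot(nextOffset, combinerBuffer);
            topic.committedOffset = nextOffset;
            return snapshot;
        }
    }

    public static void main(String[] args) {
        KafkaTopic topic = new KafkaTopic(List.of("m1", "m2", "m3"));
        Job first = new Job(topic, null);
        first.readAndBuffer(3);                    // m1..m3 stuck in the combiner
        Snapshot checkpoint = first.checkpoint();  // succeeds; committed offset is now 3

        Job fromCheckpoint = new Job(topic, checkpoint);
        fromCheckpoint.flush();
        System.out.println(fromCheckpoint.sink);   // [m1, m2, m3]

        Job withoutCheckpoint = new Job(topic, null); // starts at committed offset 3
        withoutCheckpoint.readAndBuffer(3);
        withoutCheckpoint.flush();
        System.out.println(withoutCheckpoint.sink); // []
    }
}
```

If this model is right, a checkpoint completing before the messages
reach the sink is not a loss by itself, because the messages live in
the checkpoint; they are only lost when the job is started without it.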

So, I guess our question is: what do you do to avoid losing data when
developing applications, since sooner or later you are going to
introduce breaking changes?

For example, these are the two errors we've seen so far:
  - After changing an operator name:

2020-05-13 07:23:52,248 ERROR Fatal error occurred in the cluster entrypoint.
org.apache.flink.runtime.dispatcher.DispatcherException: Failed to take leadership with session id 00000000-0000-0000-0000-000000000000.
...
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
    at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:152)
    at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83)
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:375)
    at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
    ... 7 more
Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint hdfs://RTDWLTDEV/data/lake/processing/flink-savepoints/holly-reconciliation-fact/savepoint-90ab28-bcc1f65a0986. Cannot map checkpoint/savepoint state for operator f476451c6210bd2783f36fa331b9da5e to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.
    at org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:205)
...

  - After modifying a Java model class involved in a combine:

org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:881)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:395)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.InvalidClassException: internal.holly.beatrix.wallet.walletfact.model.WalletMetadata; local class incompatible: stream classdesc serialVersionUID = 8366890161513008789, local class serialVersionUID = 174312384610985998
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
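For the first error (after renaming an operator), the message says
savepoint state is mapped onto the new program by operator ID (such as
f476451c6210bd2783f36fa331b9da5e in the log above). The toy model below,
in plain Java, shows why a changed ID leaves state unmapped and what
--allowNonRestoredState does with it. Deriving the ID from an MD5 hash
of the operator name is purely an illustrative assumption, not how Flink
or the Beam Flink runner compute it, and the operator names are
hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model (not Flink code): savepoint state is keyed by operator ID. */
public class OperatorIdRestore {

    // Illustrative assumption: the ID is an MD5 hash of the operator name (Flink does not do this).
    static String operatorId(String operatorName) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(operatorName.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // every Java platform must provide MD5
        }
    }

    // Maps savepoint state (operator ID -> state) onto the operators of the new program.
    static Map<String, String> restore(Map<String, String> savepoint,
                                       List<String> newOperatorNames,
                                       boolean allowNonRestoredState) {
        Set<String> newIds = new HashSet<>();
        for (String name : newOperatorNames) {
            newIds.add(operatorId(name));
        }
        Map<String, String> restored = new HashMap<>();
        for (Map.Entry<String, String> entry : savepoint.entrySet()) {
            if (newIds.contains(entry.getKey())) {
                restored.put(entry.getKey(), entry.getValue());
            } else if (!allowNonRestoredState) {
                throw new IllegalStateException("Cannot map state for operator " + entry.getKey()
                        + " to the new program, because the operator is not available in the new program.");
            }
            // Otherwise the unmatched state is skipped, i.e. dropped.
        }
        return restored;
    }

    public static void main(String[] args) {
        Map<String, String> savepoint =
                Map.of(operatorId("CombineWallets"), "messages buffered in the combiner");
        System.out.println(restore(savepoint, List.of("CombineWallets"), false).size()); // 1
        try {
            restore(savepoint, List.of("CombineWalletFacts"), false); // operator renamed
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        System.out.println(restore(savepoint, List.of("CombineWalletFacts"), true).size()); // 0
    }
}
```

With allowNonRestoredState the restore succeeds, but the unmatched
state, which in our case would hold the messages buffered in the
combiner, is simply dropped.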
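For the second error (after modifying WalletMetadata), the two
differing serialVersionUID values suggest the class relies on the
JVM-computed ID, which changes with most structural edits to the class
(an assumption on our side). The plain-Java sketch below reproduces the
same kind of InvalidClassException by rewriting the serialVersionUID
stored in a serialized stream; the WalletMetadata stand-in and its
walletId field are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InvalidClassException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

public class SerialVersionDemo {

    // Hypothetical stand-in for a model class such as WalletMetadata.
    static class WalletMetadata implements Serializable {
        // Fixed explicitly, so compatible edits (e.g. adding a field) keep the same ID.
        private static final long serialVersionUID = 1L;
        String walletId = "w-1";
    }

    static byte[] serialize(Object value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Rewrites the serialVersionUID recorded in the stream, as if another class version had written it.
    // Layout: magic (2) + version (2) + TC_OBJECT (1) + TC_CLASSDESC (1) + name length (2) + name + UID (8).
    static byte[] withStreamUid(byte[] serialized, long uid) {
        int nameLength = ((serialized[6] & 0xFF) << 8) | (serialized[7] & 0xFF);
        byte[] copy = serialized.clone();
        ByteBuffer.wrap(copy, 8 + nameLength, 8).putLong(uid); // big-endian, like ObjectOutputStream
        return copy;
    }

    // Returns the restored walletId, or the InvalidClassException message if the IDs do not match.
    static String tryRestore(byte[] serialized) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(serialized))) {
            return ((WalletMetadata) in.readObject()).walletId;
        } catch (InvalidClassException e) {
            return e.getMessage();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] serialized = serialize(new WalletMetadata());
        System.out.println(tryRestore(serialized)); // w-1
        // Simulate state written with a different serialVersionUID (the stream value from our log).
        System.out.println(tryRestore(withStreamUid(serialized, 8366890161513008789L)));
    }
}
```

Declaring a fixed serialVersionUID, as the stand-in does, lets Java
serialization accept compatible edits such as added fields; it does not
make arbitrary changes, such as a changed field type, readable.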


Apologies in advance; we are new to Flink, so maybe we are missing
something obvious here.

Thanks


This email and any attachment to it are confidential. Unless you are the 
intended recipient, you may not use, copy or disclose either the message or any 
information contained in the message. If you are not the intended recipient, 
you should delete this email and notify the sender immediately. Any views or 
opinions expressed in this email are those of the sender only, unless otherwise 
stated. All copyright in any of the material in this email is reserved. All 
emails, incoming and outgoing, may be recorded and monitored for legitimate 
business purposes. We exclude all liability for any loss or damage arising or 
resulting from the receipt, use or transmission of this email to the fullest 
extent permitted by law.