Re: Dump snapshot of big table in real time using StreamingFileSink

2019-01-24 Thread knur
Bump? 





Change Flink checkpoint configuration at runtime

2019-01-22 Thread knur
I'm running a streaming job that uses the following config:

checkpointInterval = 5 mins
minPauseBetweenCheckpoints = 2 mins
checkpointTimeout = 1 minute
maxConcurrentCheckpoints = 1
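For reference, the settings above are applied roughly like this (a sketch, not the exact job code; the checkpoint path is a placeholder):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

// Sketch of the checkpoint settings listed above (Flink 1.7-era API).
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(5 * 60 * 1000L)                           // checkpointInterval = 5 min
env.checkpointConfig.minPauseBetweenCheckpoints = 2 * 60 * 1000L  // minPauseBetweenCheckpoints = 2 min
env.checkpointConfig.checkpointTimeout = 60 * 1000L               // checkpointTimeout = 1 min
env.checkpointConfig.maxConcurrentCheckpoints = 1
// Incremental RocksDB checkpoints; the URI is a placeholder.
env.setStateBackend(RocksDBStateBackend("s3://bucket/checkpoints", true))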

This is using incremental, async checkpoints with the RocksDB backend. So
far around 2K checkpoints have been triggered, but I just noticed that after
the first ~1K the checkpoints have been failing with:

Checkpoint 1560 of job 9054d277265950c07ab90cf7ba0641d0 expired before
completing.

Now I'm in a very interesting position: I want to trigger a `savepoint` or a
`cancel -s`, but both of those commands will fail because they are coupled
to the checkpoint mechanism, i.e. both commands fail precisely because the
checkpoints are timing out.

Hence my question... is there a way to change the configuration of the
checkpoints at runtime? It seems like there is no such thing, but I also
don't see a good reason why it couldn't be implemented (we already allow
modifying the parallelism of a job, which looks like a harder problem to
solve).

Assuming there is no way to do this... how should I try to save my job? I do
have the `RETAIN_ON_CANCELLATION` policy enabled.
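The retention policy itself is enabled along these lines (again just a sketch, reusing the env from the snippet above):

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup

// Retain externalized checkpoints when the job is cancelled, so the last
// successful checkpoint survives a `cancel` and can be used to restore.
env.checkpointConfig.enableExternalizedCheckpoints(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)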

Should I be able to resume the job from the last checkpoint using the
--savepoint flag?





Re: Dump snapshot of big table in real time using StreamingFileSink

2019-01-17 Thread knur
Hello Jamie.

Thanks for taking a look at this. So, yes, I want to write only the latest
record for each key every X minutes. In other words, I want a snapshot of the
whole database every X minutes.

>  The issue is that the window never gets PURGED so the data just
> continues to accumulate in the window.  This will grow without bound.

The window not being purged does not necessarily mean that the data will
accumulate indefinitely. How so? Well, Flink has two mechanisms to remove
data from a window: triggering a PURGE/FIRE_AND_PURGE or using an evictor.

The reduce function aggregates the window incrementally, which acts like an
implicit evictor: events that are no longer needed are discarded, and only
the element produced by the reduce is kept in state. Here is an example:

env.socketTextStream("localhost", 9999)  // placeholder port
  .keyBy { it.first().toString() }
  .window(GlobalWindows.create())
  .trigger(ContinuousProcessingTimeTrigger.of(WindowTime.seconds(seconds)))
  .reduce { left, right ->
    println("left: $left, right: $right")
    if (left.length > right.length) {
      left
    } else {
      right
    }
  }
  .printToErr()

For your claim to hold true, every time the trigger fires one would expect
to see ALL the elements for a key being printed over and over again in the
reduce function. However, if you run a job similar to this one in your
language of choice, you will notice that the print statement is effectively
called only once per event per key.

In fact, not purging is intentional: I want to hold every record of the
database (the latest one per primary key) in state so that I can write a
snapshot of the whole database.

So, for instance, let's say my table has two columns, id and time, and I
have the following events:

1,January
2,February
1,March

I want to write to S3 two records: "1,March", and "2,February".

Now, let's say two more events come into the stream:

3,April
1,June

Then I want to write to S3 three records: "1,June", "2,February" and
"3,April".

In other words, I can't just purge the windows, because I would lose the
record with id 2.
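To make that concrete, here is a rough, self-contained sketch of the latest-record-per-key idea over the toy events above (the Row class, the 30-second trigger, and the WindowTime alias for Flink's Time class are illustration only, not the real job):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.time.Time as WindowTime
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger

// Keep only the latest record per id; every trigger firing can then emit a
// full snapshot ("1,June", "2,February", "3,April") without ever purging state.
data class Row(val id: Int, val month: String)

fun main() {
  val env = StreamExecutionEnvironment.getExecutionEnvironment()
  env.fromElements(
      Row(1, "January"), Row(2, "February"), Row(1, "March"),
      Row(3, "April"), Row(1, "June"))
    .keyBy { it.id }
    .window(GlobalWindows.create())
    .trigger(ContinuousProcessingTimeTrigger.of(WindowTime.seconds(30)))
    // Keep the most recently arrived record; the real job compares timestamps.
    .reduce { _, newest -> newest }
    .print()
  // With a bounded source the job may finish before the processing-time
  // trigger fires; the real Kafka source runs indefinitely.
  env.execute("latest-record-per-key sketch")
}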





Dump snapshot of big table in real time using StreamingFileSink

2019-01-17 Thread knur
Hello there.

So we have some Postgres tables that are mutable, and we want to create a
snapshot of them in S3 every X minutes. We plan to use Debezium to send a
CDC log of every row change into a Kafka topic, and then have Flink keep the
latest state of each row and periodically write that data to S3.

Our current job looks like this and works reasonably well in most cases:

   // checkpoint interval is set to run every 10 minutes

kafkaSource
  .keyBy { it.id }
  .window(GlobalWindows.create())
  .trigger(ContinuousProcessingTimeTrigger.of(WindowTime.minutes(5)))
  .reduce { left, right ->
    if (left.timestamp() > right.timestamp()) {
      left
    } else {
      right
    }
  }
  .addSink(StreamingFileSink
    .forBulkFormat(Path(outputDir),
                   ParquetAvroWriters.forGenericRecord(avroSchema))
    .withBucketAssigner(DateTimeBucketAssigner("'date='yyyy-MM-dd/'hour='HH/'minute='mm"))
    .build())

We use `GlobalWindows.create()` because we want to hold in Flink's state ALL
the changes sent into Kafka (the reduce function, according to the docs, will
make sure to evict all events except the last one).

This works, but we know there could be some edge cases. For instance, if the
trigger fires around the same time as a checkpoint, we could get into a
position where StreamingFileSink rolls part files that contain only an
incomplete subset of the events emitted by that firing.

So, a few questions:

1. Is there a way to mark the events with the timestamp of the trigger that
fired them?
2. Is the approach we took fine? (keep in mind that we will deal with giant
tables, so a batch job that queries them every N seconds is not an option).
3. Do you foresee any other edge cases?

Thanks for taking a look at this.


