Actually, I think the program you shared in the first mail of this thread
looks fine for the purpose you describe.

Timestamp and watermark assignment works as follows:
- For each record, a long timestamp is extracted (UNIX/epoch timestamp).
- A watermark is a timestamp which says that no more records with a
timestamp lower than the watermark will be processed. Records which violate
that condition are by default dropped.
- So the watermark should follow the max emitted record timestamp minus a
safety margin.
- If you use a periodic watermark assigner, the watermark function is
periodically called.
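
The rules above can be sketched in plain Java. This is only a model of the logic, not Flink's API: the class name, the `safetyMarginMillis` parameter and the `isLate` helper are assumptions made for illustration. In Flink 1.2 the same logic would normally live in an implementation of `AssignerWithPeriodicWatermarks`.

```java
// Plain-Java model of "watermark = max seen timestamp minus a safety margin".
// Hypothetical class; it does not depend on Flink.
final class BoundedLatenessWatermark {
    private final long safetyMarginMillis;            // assumed tuning parameter
    private long maxSeenTimestamp = Long.MIN_VALUE;   // no record seen yet

    BoundedLatenessWatermark(long safetyMarginMillis) {
        this.safetyMarginMillis = safetyMarginMillis;
    }

    /** Called once per record with its extracted epoch timestamp in milliseconds. */
    void onRecord(long timestampMillis) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, timestampMillis);
    }

    /** Called periodically; the watermark trails the max timestamp by the margin. */
    long currentWatermark() {
        if (maxSeenTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // nothing seen yet, avoid overflow on subtraction
        }
        return maxSeenTimestamp - safetyMarginMillis;
    }

    /** A record violates the watermark if its timestamp is lower than the watermark. */
    boolean isLate(long timestampMillis) {
        return timestampMillis < currentWatermark();
    }
}
```

With a margin of 5 seconds, records that arrive up to 5 seconds behind the newest record seen so far are still accepted; anything older counts as late.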

Best, Fabian


2017-03-16 22:54 GMT+01:00 Paul Smith <psm...@aconex.com>:

> I have managed to discover that my presumption that a log4j log file is in
> _*guaranteed*_ sequential time order is incorrect (race conditions).
> So some logs are out of sequence, and I was getting _*some*_ Monotonic
> Timestamp violations.  I did not discover this because somehow my local
> flink was not outputting logs properly. (a restart of the service seemed to
> fix that..)
>
>
>
> **HOWEVER**.. Once I switched to an “allow lateness” style timestamp
> assigner, I still get the same problem.
>
>
> Fundamentally I think I’ve come to the realisation that my concept of
> Event Time is not the same as Flink’s.  So let me start again to explain what
> I’d like to do.
>
>
> I’d like to group-by log records and sum up the metric values according to
> their timestamp – effectively a SQL group by where the timestamp of the
> event is aggregated into 15 minute chunks.  I’d like to be able to
> (eventually) run this same job for Batch & Live streaming.
>
>
>
> I had thought that by assigning the log row's timestamp to BE the Event Time,
> the Time Window aggregation would work. But now I wonder if what I
> should be doing is assigning a new field, called ‘time bracket’ perhaps,
> and adding that as one of the Keys for the stream (I was keying by
> “identifierType:identifier”, so change that to
> “identifierType:identifier:timeBracket”).
> This way the Fold function can still sum up the metrics by this groupBy
> key, and then rely on some periodic _*actual time*_ watermarking
> (ingestion or Processing).
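
A minimal sketch of such a 'time bracket' key in plain Java, assuming epoch-millisecond timestamps and 15-minute brackets aligned to the epoch. The class and method names are hypothetical and not taken from the job in this thread.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper: floors a timestamp to its 15-minute bracket and
// builds the composite key "identifierType:identifier:timeBracket".
final class TimeBracketKey {
    static final long BRACKET_MILLIS = TimeUnit.MINUTES.toMillis(15);

    /** Start of the 15-minute bracket that contains the given timestamp. */
    static long bracketStart(long timestampMillis) {
        // floorMod keeps the result correct for negative timestamps as well
        return timestampMillis - Math.floorMod(timestampMillis, BRACKET_MILLIS);
    }

    /** Composite grouping key; the bracket is encoded as its start in epoch millis. */
    static String key(String identifierType, String identifier, long timestampMillis) {
        return identifierType + ":" + identifier + ":" + bracketStart(timestampMillis);
    }
}
```

All records of one identifier whose timestamps fall into the same 15-minute bracket get the same key, regardless of the order in which they arrive.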
>
>
>
> I think I should disconnect the Flink time system from this aggregation
> level. I had thought that because I wanted to be able to apply this job to
> both a Live Streaming and a Batch processing (historical logs) that I
> needed to use the Event Time model..
>
>
>
> That could be my fatal flaw.  Does that make sense?
>
>
>
> Paul
>
>
>
> *From: *Fabian Hueske <fhue...@gmail.com>
> *Reply-To: *"user@flink.apache.org" <user@flink.apache.org>
> *Date: *Thursday, 16 March 2017 at 10:22 pm
>
> *To: *"user@flink.apache.org" <user@flink.apache.org>
> *Subject: *Re: Batch stream Sink delay ?
>
>
>
> What kind of timestamp and watermark extractor are you using?
>
> Can you share your implementation?
>
> You can have a look at the example programs (for example [1]). These can
> be started and debugged inside an IDE by executing the main method.
> If you run it in an external process, you should be able to connect to the
> process with the standard options.
>
> Best, Fabian
>
>
> [1] https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
>
>
>
> 2017-03-16 12:10 GMT+01:00 Paul Smith <psm...@aconex.com>:
>
> Thanks again for your reply.
>
>
>
> I've tried with both Parallel=1 through to 3. Same behavior.
>
>
>
> The log file has monotonically increasing time stamps generated through an
> application using log4j. Each log line has a distinctly incrementing time
> stamp. It is an 8GB file I'm using as a test case and has about 8 million
> rows.
>
>
>
> With a parallelism of either 1 or 3 the same output is shown: the data gets
> to the sink at the end and all looks correct. I set the folded result
> record's time stamp to the end of each window and I see nice chunks of 15
> minute blocks in the result.
>
>
>
> I'm not sure why the watermarks are not being sent as the data progresses.
>
>
>
> I might try pushing the data (same data) through Kafka and see if I get a
> different result. I might also take a sample from the file (rather than
> the whole file) to see if I get differing results.
>
>
>
> Is there a wiki page anywhere that shows how to debug a job through an
> IDE?  Can I easily remote attach to a running process via standard java
> options?
>
>
>
> Regards
>
>
>
> Paul
>
>
> On 16 Mar 2017, at 21:15, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi Paul,
>
> since each operator uses the minimum watermark of all its inputs, you must
> ensure that each parallel task is producing data.
>
> If a source does not produce data, it will not increase the timestamps of
> its watermarks.
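
The minimum rule described here can be modelled in a few lines of plain Java. This is a sketch, not Flink's internal implementation: the class name is hypothetical, and it assumes at least one input and that every input starts at `Long.MIN_VALUE` until it has sent a watermark.

```java
import java.util.Arrays;

// Hypothetical model: an operator's watermark is the minimum of the latest
// watermarks received from each of its inputs.
final class MinWatermarkTracker {
    private final long[] inputWatermarks;

    MinWatermarkTracker(int numInputs) {
        inputWatermarks = new long[numInputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE); // no input has reported yet
    }

    /** Records a watermark from one input; watermarks never move backwards. */
    void onWatermark(int input, long watermark) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
    }

    /** The operator's own event time: the slowest input decides. */
    long currentWatermark() {
        return Arrays.stream(inputWatermarks).min().getAsLong();
    }
}
```

A single input that never emits a watermark keeps the result at `Long.MIN_VALUE`, no matter how far the other inputs have advanced.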
>
> Another challenge that you might run into is that you need to make sure
> that the file (or file chunks if it is split for parallel reading) is read
> in increasing timestamp order.
>
> Otherwise, watermarks will be emitted too early.
>
> If I understood your use case correctly, you are just experimenting
> with file input to get a feeling for the API.
>
> I would try to set the parallelism of the file source to 1 to ensure that
> the data is read in the same order and that all tasks are producing data.
>
> Hope this helps,
>
> Fabian
>
>
>
> 2017-03-15 23:54 GMT+01:00 Paul Smith <psm...@aconex.com>:
>
> Thanks Fabian, I’m pretty sure you are correct here.  I can see in the
> Metric view that the currentLowWaterMark is set to MIN_VALUE by the looks
> of it, so Watermarks are not being emitted at all until the end.  This
> stays all the way through the job.
>
>
>
> I’m not sure why this is the case.  I’ve verified that my
> TimestampExtractor class is being called, and returning the value I’d
> expect (The timestamp from the log line), and looks legitimate.  My
> WindowFunction which is doing the aggregation is not being called until
> right at the end of the job, and yet the windows come out ok in the
> destination.
>
>
>
> I am not sure how to debug this one.  Do you or anyone have any other
> suggestions on how to debug why the Windowing is not being triggered?  I
> can’t glean any useful further ideas from the metrics I can see..
>
>
>
> Appreciate the help
>
>
>
> Cheers,
>
>
> Paul
>
>
>
> *From: *Fabian Hueske <fhue...@gmail.com>
> *Reply-To: *"user@flink.apache.org" <user@flink.apache.org>
> *Date: *Tuesday, 14 March 2017 at 7:59 pm
> *To: *"user@flink.apache.org" <user@flink.apache.org>
> *Subject: *Re: Batch stream Sink delay ?
>
>
>
> Hi Paul,
>
> This might be an issue with the watermarks. A window operation can only
> compute and emit its results when the watermark time is later than the end
> time of the window.
>
> Each operator keeps track of the maximum timestamp of all its input tasks
> and computes its own time as the minimum of all those maximum timestamps.
>
> If any of the watermarks received by the window operator is not later
> than the end time of the window, the window will not be computed.
>
> When a file input is completely processed, Flink sends a Long.MAX_VALUE
> timestamp which might trigger the execution at the end of the job.
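
The firing condition can be illustrated with a small plain-Java model. It assumes a window covers `[start, end)` in epoch milliseconds and counts as complete once the watermark reaches the last timestamp inside it (`end - 1`); that boundary convention and the class name are assumptions of this sketch.

```java
// Hypothetical model of when an event-time window may emit its result.
final class WindowFiringModel {
    /**
     * A window [start, end) can fire once the watermark has reached the
     * largest timestamp that still belongs to the window, i.e. end - 1.
     */
    static boolean canFire(long windowEndMillis, long watermark) {
        return watermark >= windowEndMillis - 1;
    }
}
```

A watermark stuck at `Long.MIN_VALUE` never fires any window, while a final `Long.MAX_VALUE` watermark fires every pending window at once, which matches results appearing only at the end of the job.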
>
> I would try to debug the watermarks of your job. The web dashboard
> provides a few metrics for that.
>
> Best, Fabian
>
>
>
> 2017-03-14 2:47 GMT+01:00 Paul Smith <psm...@aconex.com>:
>
> Using Flink 1.2.
>
>
>
> Hi all, I have a question about Batch processing and Sinks.  I have a
> Flink job that parses a User Request log file that contains performance
> data per request. It accumulates metric data values into 15 minute time
> windows.  Each log line is mapped to multiple records, so that each of the
> metric values can be 'billed' against a few different categories (User,
> Organisation, Project, Action).  The goal of the Flink job is to distil the
> volume of request data into more manageable summaries.  We can then see
> breakdowns of, say, User CPU utilised by distinct categories (e.g. Users)
> and allow us to look for trends/outliers.
>
>
>
> It is working very well as a batch, but with one unexpected behaviour.
>
>
>
> What I can't understand is that the Sink does not appear to get _any_
> records until the rest of the chain has completed over the entire file.
> I've played around with parallelism (1->3), and also verified with logging
> that the Sink isn't seeing any data until the entire previous chain is
> complete.  Is this expected behaviour? I was thinking that as each Time
> Window passed a 'block' of the results would be emitted to the sink.  Since
> we use Event Time characteristics, the batch job ought to emit these chunks
> as each 15 minute segment passes?
>
>
>
> You can see the base job here, with an example of a single log line with
> the metrics here:
>
>
>
> https://gist.github.com/tallpsmith/a2e5212547fb3c7220b0e49846d2f152
>
>
>
> Each Category I've called an 'identifierType' (User, Organisation..), with
> the 'identifier' the value of that (a UserId for example).  I key the
> stream by this pair of records and then Fold the records by the Time window
> summing each metric type's value up.  I have yet to work out the proper use
> case for Fold versus Reduce, I may have got that wrong, but I can't see how
> that changes the flow here.
>
>
>
> The output is a beautiful rolled up summary by 15 minutes by each
> identifierType & identifier.  I have yet to attempt a live Streaming
> version of this, but had thought the Batch version would also be concurrent
> and start emitting 15 minute windows as soon as the stream
> transitions into the next window.   Given the entire job takes about 5
> minutes on my laptop for 8 million raw source lines whose data is spread
> out over an entire day, I would have thought there's some part of that 5
> minutes that would be emitting chunks of summary data to the sink?  But
> nothing turns up until the entire job is done.
>
>
>
> Maybe the data is just too small.. Maybe there's buffering going on
> somewhere in the chain?
>
>
>
> Any pointers would be appreciated in understanding the flow here.
>
>
>
> Cheers,
>
>
>
> Paul Smith