Hello all,

Any suggestions? Where am I going wrong, or is there a better way of
achieving this so that I can do replay as well?

Thanks
Mohil

On Wed, May 27, 2020 at 11:40 AM Mohil Khare <mo...@prosimo.io> wrote:

> Hi everyone,
> I need a suggestion regarding the usage of the side input pattern and sliding
> windows, especially while replaying old Kafka logs/offsets.
>
> FYI: I am running Beam 2.19 on Google Dataflow.
>
> I have a use case where I read a continuous stream of data from Kafka and
> need to calculate one score per key (apart from other calculations), based
> on the number of such requests received for that key in the last one hour.
>
> Roughly, my code looks like the following:
>
> PCollection<POJO> input = p
>     .apply("Read__Logs_From_Kafka", KafkaIO.<String, byte[]>read()
>         .withBootstrapServers(String.join(",", bootstrapServerToConnectTo))
>         .withTopic("app_access_stats")
>         .withKeyDeserializer(StringDeserializer.class)
>         .withValueDeserializer(ByteArrayDeserializer.class)
>         .withConsumerConfigUpdates(kafkaConsumerProperties)
>         .withConsumerFactoryFn(consumerFactoryObj)
>         .commitOffsetsInFinalize())
>     .apply("Applying_Fixed_Window_Logs",
>         Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
>             .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()
>                 .withEarlyFirings(AfterPane.elementCountAtLeast(1))))
>             .withAllowedLateness(Duration.standardDays(380))
>             .discardingFiredPanes())
>     .apply("Convert_KafkaRecord_To_PCollection<POJO>",
>         ParDo.of(new ParseKafkaLogs()));
>
>
> /*** Class that handles the incoming PCollection<POJO> and calculates the score ***/
>
> /** Assume input = the incoming PCollection<POJO> created above. **/
>
> PCollectionView<Map<KEY, Long>> slidingWindowHourlyUserRequestsPerKeyView =
>     input.apply("Calculate_Total_UserRequests_Past_1Hr", new WindowedNumUserRequestsPerKey())
>          .apply(View.asMap());
>
> /** Calculate a running sum of the number of requests per key in a sliding window.
>     The sliding window is 1 hr long and starts every 1 sec, so that we get an
>     accurate result for the past 1 hr. **/
>
>
> private static class WindowedNumUserRequestsPerKey
>         extends PTransform<PCollection<POJO>, PCollection<KV<KEY, Long>>> {
>
>     @Override
>     public PCollection<KV<KEY, Long>> expand(PCollection<POJO> input) {
>         return input
>             .apply("Applying_Sliding_Window_1Hr_Every1sec",
>                 Window.<POJO>into(SlidingWindows.of(Duration.standardHours(1)).every(Duration.standardSeconds(1)))
>                     .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
>                     .withAllowedLateness(Duration.standardDays(360))
>                     .discardingFiredPanes())
>             .apply("Grouping_per_Key", ParDo.of(new GroupByAggregationKey()))
>             .apply("Total_Requests_Per_Key", Combine.perKey(new CalculateTotalUserRequestsPerKey()));
>     }
>
>     private static class GroupByAggregationKey extends DoFn<POJO, KV<KEY, POJO>> {
>         @ProcessElement
>         public void processElement(@Element POJO input, OutputReceiver<KV<KEY, POJO>> out) {
>             /** code that emits the required KV ***/
>         }
>     }
>
>     private static class CalculateTotalUserRequestsPerKey
>             extends Combine.CombineFn<POJO, CalculateTotalUserRequestsPerKey.TotalRequestsAccumulator, Long> {
>
>         private static class TotalRequestsAccumulator implements Serializable {
>             private long num_requests_running_sum = 0;
>
>             TotalRequestsAccumulator(long num_requests_running_sum) {
>                 this.num_requests_running_sum = num_requests_running_sum;
>             }
>
>             @Override
>             public boolean equals(Object o) {
>                 if (this == o) return true;
>                 if (!(o instanceof TotalRequestsAccumulator)) return false;
>                 TotalRequestsAccumulator that = (TotalRequestsAccumulator) o;
>                 return num_requests_running_sum == that.num_requests_running_sum;
>             }
>
>             @Override
>             public int hashCode() {
>                 return Objects.hash(num_requests_running_sum);
>             }
>         }
>
>         @Override
>         public TotalRequestsAccumulator createAccumulator() {
>             return new TotalRequestsAccumulator(0);
>         }
>
>         @Override
>         public TotalRequestsAccumulator addInput(TotalRequestsAccumulator mutableAccumulator, POJO input) {
>             mutableAccumulator.num_requests_running_sum++;
>             return mutableAccumulator;
>         }
>
>         @Override
>         public TotalRequestsAccumulator mergeAccumulators(Iterable<TotalRequestsAccumulator> accumulators) {
>             TotalRequestsAccumulator merged = createAccumulator();
>             for (TotalRequestsAccumulator accumulator : accumulators) {
>                 merged.num_requests_running_sum += accumulator.num_requests_running_sum;
>             }
>             return merged;
>         }
>
>         @Override
>         public Long extractOutput(TotalRequestsAccumulator accumulator) {
>             return accumulator.num_requests_running_sum;
>         }
>     }
> }
>
> Now I calculate the score in the incoming POJO by using
> slidingWindowHourlyUserRequestsPerKeyView as a side input.
>
> input.apply("Add_Score", ParDo.of(new AddScore())
>     .withSideInputs(slidingWindowHourlyUserRequestsPerKeyView));
>
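> For reference, AddScore roughly looks like the following (a minimal sketch; I am
> assuming here that the view is handed to the DoFn, e.g. via its constructor, and
> getKey(), withScore() and computeScore() are placeholders for the actual POJO
> accessors and scoring logic):
>
> private static class AddScore extends DoFn<POJO, POJO> {
>     private final PCollectionView<Map<KEY, Long>> requestsView;
>
>     AddScore(PCollectionView<Map<KEY, Long>> requestsView) {
>         this.requestsView = requestsView;
>     }
>
>     @ProcessElement
>     public void processElement(ProcessContext c) {
>         // Read the per-key request counts for the past hour from the side input.
>         Map<KEY, Long> hourlyRequestsPerKey = c.sideInput(requestsView);
>         POJO element = c.element();
>         long numRequests = hourlyRequestsPerKey.getOrDefault(element.getKey(), 0L);
>         // Placeholder: emit a copy of the element carrying the computed score.
>         c.output(element.withScore(computeScore(numRequests)));
>     }
> }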
>
> The above seems to be working fine, though I would appreciate a suggestion
> if there is a better way of achieving this.
>
> Also, I start getting problems when we have to stop the pipeline for a couple
> of hours for maintenance or some other issue while data is continuously
> being pumped into Kafka.
>
> Problem:
> When the pipeline resumes after a couple of hours, the above sliding window
> suddenly gets bombarded with log messages, and instead of honoring each
> log's event timestamp, it treats all the log messages as received within the
> current sliding window, thereby giving the wrong score. For example, if the
> pipeline was stopped between 9 am and 11 am, and there were 20 messages
> between 9-10 am and 30 messages between 10-11 am, then on resuming at 11 am
> it will consider all 50 messages as received in the last one hour for the
> side input while processing all log messages between 9 am and 11 am.
>
> To overcome this, I tried a few things, but every approach failed:
>
> *1. In the transformation that reads messages from Kafka and creates the
> PCollection<POJO>, I tried outputWithTimestamp(event timestamp), but it
> didn't work, as I believe you can't output data with an older timestamp in a
> live window while reading a real-time data stream from Kafka.*
>
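> For clarity, that attempt looked roughly like this (a minimal sketch; deserialize()
> and getEventTimestampMillis() are placeholders for my actual parsing code):
>
> private static class ParseKafkaLogs extends DoFn<KafkaRecord<String, byte[]>, POJO> {
>     @ProcessElement
>     public void processElement(@Element KafkaRecord<String, byte[]> record, OutputReceiver<POJO> out) {
>         POJO pojo = deserialize(record.getKV().getValue());              // placeholder deserializer
>         Instant eventTime = new Instant(pojo.getEventTimestampMillis()); // placeholder accessor
>         // Fails when eventTime is older than the element's own timestamp minus
>         // getAllowedTimestampSkew() (which defaults to zero).
>         out.outputWithTimestamp(pojo, eventTime);
>     }
> }
>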
> *2. Since the stage that adds the score was not honoring the event's
> timestamp (as evident from printing window.startTime in the DoFn), I added a
> custom timestamp policy while reading logs from Kafka, i.e. something like this:*
> *    KafkaIO.read.withTimestampPolicyFactory((tp, previousWatermark) ->
> new CustomFieldTimePolicy(previousWatermark))*
>
> *where CustomFieldTimePolicy sets the record timestamp based on the received
> record's timestamp.*
> *  On doing this, window.startTime printed a fairly accurate time that was
> close to the event's timestamp; however, the "WindowedNumUserRequestsPerKey"
> transformation didn't emit any output. It just stalled. My print statements
> were showing up in the aforementioned GroupByAggregationKey(), but then no
> output was emitted, as if the pipeline was stuck at that stage. I couldn't
> find any log in GCP's Stackdriver indicating the reason for the stalled
> pipeline.*
>
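> Roughly, CustomFieldTimePolicy looks something like this (a minimal sketch,
> assuming the event time is taken from the Kafka record's own timestamp; reading it
> from a field of the parsed payload would look similar):
>
> import java.util.Optional;
> import org.apache.beam.sdk.io.kafka.KafkaRecord;
> import org.apache.beam.sdk.io.kafka.TimestampPolicy;
> import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
> import org.joda.time.Instant;
>
> public class CustomFieldTimePolicy extends TimestampPolicy<String, byte[]> {
>
>     private Instant currentWatermark;
>
>     public CustomFieldTimePolicy(Optional<Instant> previousWatermark) {
>         // Resume from the previous watermark if one exists, otherwise start from the minimum.
>         this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
>     }
>
>     @Override
>     public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<String, byte[]> record) {
>         // Use the record's timestamp as the element's event time and advance the watermark with it.
>         currentWatermark = new Instant(record.getTimestamp());
>         return currentWatermark;
>     }
>
>     @Override
>     public Instant getWatermark(PartitionContext ctx) {
>         return currentWatermark;
>     }
> }
>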
> *Any help/suggestion for solving this case would be appreciated. It will be
> very useful for our replay jobs, where for some reason our data sink (such as
> Elasticsearch) gets corrupted and we want to read all the old Kafka offsets
> again and recreate the data in a new ES cluster.*
>
>
> Thanks and Regards
> Mohil
>
>
>
>
