Hello all, any suggestions? Where am I going wrong, or is there a better way of achieving this so that I can do replay as well?
Thanks,
Mohil

On Wed, May 27, 2020 at 11:40 AM Mohil Khare <mo...@prosimo.io> wrote:
> Hi everyone,
>
> I need a suggestion regarding usage of the side input pattern and sliding
> window, especially while replaying old Kafka logs/offsets.
>
> FYI: I am running Beam 2.19 on Google Dataflow.
>
> I have a use case where I read a continuous stream of data from Kafka and
> need to calculate one score (apart from other calculations) per key, based
> on the number of such requests received per key in the last one hour.
>
> Roughly, my code looks like the following:
>
> PCollection<POJO> input = p
>     .apply("Read__Logs_From_Kafka", KafkaIO.<String, byte[]>read()
>         .withBootstrapServers(String.join(",", bootstrapServerToConnectTo))
>         .withTopic("app_access_stats")
>         .withKeyDeserializer(StringDeserializer.class)
>         .withValueDeserializer(ByteArrayDeserializer.class)
>         .withConsumerConfigUpdates(kafkaConsumerProperties)
>         .withConsumerFactoryFn(consumerFactoryObj)
>         .commitOffsetsInFinalize())
>     .apply("Applying_Fixed_Window_Logs",
>         Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
>             .triggering(Repeatedly.forever(
>                 AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1))))
>             .withAllowedLateness(Duration.standardDays(380))
>             .discardingFiredPanes())
>     .apply("Convert_KafkaRecord_To_PCollection<POJO>", ParDo.of(new ParseKafkaLogs()));
>
> /*** Class that handles the incoming PCollection<POJO> and calculates the score ***/
> /*** Assume input = the incoming PCollection<POJO> created above ***/
>
> PCollectionView<Map<KEY, Long>> slidingWindowHourlyUserRequestsPerKeyView =
>     input.apply("Calculate_Total_UserRequests_Past_1Hr", new WindowedNumUserRequestsPerKey())
>          .apply(View.asMap());
>
> /** Calculate a running sum of the number of requests per key in a sliding
>     window. The sliding window is 1 hr long and starts every 1 sec so that
>     we get an accurate result for the past 1 hr. **/
>
> private static class WindowedNumUserRequestsPerKey
>     extends PTransform<PCollection<POJO>, PCollection<KV<KEY, Long>>> {
>
>   @Override
>   public PCollection<KV<KEY, Long>> expand(PCollection<POJO> input) {
>     return input
>         .apply("Applying_Sliding_Window_1Hr_Every1sec",
>             Window.<POJO>into(SlidingWindows.of(Duration.standardHours(1)).every(Duration.standardSeconds(1)))
>                 .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
>                 .withAllowedLateness(Duration.standardDays(360))
>                 .discardingFiredPanes())
>         .apply("Grouping_per_Key", ParDo.of(new GroupByAggregationKey()))
>         .apply("Total_Requests_Per_Key", Combine.perKey(new CalculateTotalUserRequestsPerKey()));
>   }
>
>   private static class GroupByAggregationKey extends DoFn<POJO, KV<KEY, POJO>> {
>     @ProcessElement
>     public void processElement(@Element POJO input, OutputReceiver<KV<KEY, POJO>> out) {
>       /** code that emits the required KV **/
>     }
>   }
>
>   private static class CalculateTotalUserRequestsPerKey
>       extends Combine.CombineFn<POJO, CalculateTotalUserRequestsPerKey.TotalRequestsAccumulator, Long> {
>
>     private static class TotalRequestsAccumulator implements Serializable {
>       private long num_requests_running_sum = 0;
>
>       TotalRequestsAccumulator(long num_requests_running_sum) {
>         this.num_requests_running_sum = num_requests_running_sum;
>       }
>
>       @Override
>       public boolean equals(Object o) {
>         if (this == o) return true;
>         if (!(o instanceof TotalRequestsAccumulator)) return false;
>         TotalRequestsAccumulator that = (TotalRequestsAccumulator) o;
>         return num_requests_running_sum == that.num_requests_running_sum;
>       }
>
>       @Override
>       public int hashCode() {
>         return Objects.hash(num_requests_running_sum);
>       }
>     }
>
>     @Override
>     public TotalRequestsAccumulator createAccumulator() {
>       return new TotalRequestsAccumulator(0);
>     }
>
>     @Override
>     public TotalRequestsAccumulator addInput(TotalRequestsAccumulator mutableAccumulator, POJO input) {
>       mutableAccumulator.num_requests_running_sum++;
>       return mutableAccumulator;
>     }
>
>     @Override
>     public TotalRequestsAccumulator mergeAccumulators(Iterable<TotalRequestsAccumulator> accumulators) {
>       TotalRequestsAccumulator merged = createAccumulator();
>       for (TotalRequestsAccumulator accumulator : accumulators) {
>         merged.num_requests_running_sum += accumulator.num_requests_running_sum;
>       }
>       return merged;
>     }
>
>     @Override
>     public Long extractOutput(TotalRequestsAccumulator accumulator) {
>       return accumulator.num_requests_running_sum;
>     }
>   }
> }
>
> Now I calculate the score in the incoming POJO by using
> slidingWindowHourlyUserRequestsPerKeyView as a side input:
>
> input.apply("Add_Score", ParDo.of(new AddScore())
>     .withSideInputs(slidingWindowHourlyUserRequestsPerKeyView));
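>
> The AddScore DoFn just looks up the last-hour count for the element's key in
> the map side input, roughly like this (a simplified sketch: the actual score
> computation is omitted, getKey() stands in for whatever key accessor the POJO
> has, and the view is shown handed to the DoFn explicitly):
>
> private static class AddScore extends DoFn<POJO, POJO> {
>   private final PCollectionView<Map<KEY, Long>> hourlyRequestsPerKeyView;
>
>   AddScore(PCollectionView<Map<KEY, Long>> hourlyRequestsPerKeyView) {
>     this.hourlyRequestsPerKeyView = hourlyRequestsPerKeyView;
>   }
>
>   @ProcessElement
>   public void processElement(ProcessContext c) {
>     // Per-key request counts for the last hour, produced by WindowedNumUserRequestsPerKey.
>     Map<KEY, Long> hourlyRequestsPerKey = c.sideInput(hourlyRequestsPerKeyView);
>     POJO element = c.element();
>     long numRequestsLastHour = hourlyRequestsPerKey.getOrDefault(element.getKey(), 0L);
>     // ... compute the score from numRequestsLastHour and emit the POJO with the score set ...
>     c.output(element);
>   }
> }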
> Above seems to be working fine, though I would appreciate a suggestion if
> there is a better way of achieving this.
>
> Also, I start getting problems when we have to stop the pipeline for a
> couple of hours for maintenance or some other issue while data is
> continuously being pumped into Kafka.
>
> Problem:
> When the pipeline resumes after a couple of hours, the above sliding window
> suddenly gets bombarded with log messages, and instead of honoring each
> log's timestamp it treats them all as received within the current sliding
> window, thereby giving the wrong score. For example, if the pipeline was
> stopped between 9 am and 11 am, and there were 20 msgs between 9-10 am and
> 30 msgs between 10-11 am, then on resuming at 11 am it will consider all 50
> msgs as received in the last one hour and use that as the side input while
> processing all log messages between 9 am and 11 am.
>
> To overcome this, I tried a few things, but every approach failed:
>
> 1. In the transformation that reads messages from Kafka and creates the
> PCollection<POJO>, I tried outputWithTimestamp() with the event timestamp,
> but it didn't work; I believe you can't output data with an older timestamp
> into a live window while reading a real-time data stream from Kafka.
>
> 2. Since the stage that adds the score was not honoring the event's
> timestamp (as evident from printing window.startTime in the DoFn), I added
> a custom timestamp policy while reading logs from Kafka, i.e. something
> like this:
>
>   KafkaIO.read.withTimestampPolicyFactory((tp, previousWatermark) ->
>       new CustomFieldTimePolicy(previousWatermark))
>
> where CustomFieldTimePolicy sets the record timestamp based on the received
> record's timestamp. On doing this, window.startTime printed a somewhat
> accurate time close to the event's timestamp; however, the
> "WindowedNumUserRequestsPerKey" transformation didn't emit any output. It
> just stalled. My print statements were showing up in the aforementioned
> GroupByAggregationKey(), but then no output was emitted, as if the pipeline
> was stuck at that stage. I couldn't find any log in GCP's Stackdriver
> indicating the reason for the stalled pipeline.
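>
> For reference, CustomFieldTimePolicy is roughly along these lines (a
> simplified sketch: how the event time is actually parsed out of our payload
> is hidden behind a placeholder helper):
>
> import java.util.Optional;
> import org.apache.beam.sdk.io.kafka.KafkaRecord;
> import org.apache.beam.sdk.io.kafka.TimestampPolicy;
> import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
> import org.joda.time.Instant;
>
> public class CustomFieldTimePolicy extends TimestampPolicy<String, byte[]> {
>
>   private Instant currentWatermark;
>
>   public CustomFieldTimePolicy(Optional<Instant> previousWatermark) {
>     // Resume from the previous watermark when the reader restarts.
>     this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
>   }
>
>   @Override
>   public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<String, byte[]> record) {
>     // Use the timestamp carried by the record as the element's event time
>     // and advance the watermark along with it.
>     Instant eventTime = extractEventTime(record);
>     currentWatermark = eventTime;
>     return eventTime;
>   }
>
>   @Override
>   public Instant getWatermark(PartitionContext ctx) {
>     return currentWatermark;
>   }
>
>   // Placeholder for however the event time is parsed from the record; shown
>   // here using the Kafka record's own timestamp just to keep the sketch
>   // compilable.
>   private static Instant extractEventTime(KafkaRecord<String, byte[]> record) {
>     return new Instant(record.getTimestamp());
>   }
> }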
> Any help or suggestion for solving this would be appreciated. It will also
> be very useful for our replay jobs, where for some reason our data sink
> (such as Elasticsearch) gets corrupted and we want to re-read all the old
> Kafka offsets and recreate the data in a new ES cluster.
>
> Thanks and Regards,
> Mohil