Re: KafkaIO write in case on topic name present in PCollection

2020-06-02 Thread Alexey Romanenko
Hi Mohil,

In the Java SDK you can use “KafkaIO.writeRecords()” for that. You will need to
provide a PCollection<ProducerRecord<K, V>> as the input collection, where you
set the desired output topic for every record in the ProducerRecord metadata.
It could look something like this:

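import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.clients.producer.ProducerRecord;

// String keys and values are assumed here; adjust the types to your data
PCollection<KV<String, String>> teams = ...;

PCollection<ProducerRecord<String, String>> records =
    teams.apply(ParDo.of(new KV2ProducerRecord()));

private static class KV2ProducerRecord
    extends DoFn<KV<String, String>, ProducerRecord<String, String>> {
  // create ProducerRecord and set your topic there
  ...
}

records.apply(KafkaIO.<String, String>writeRecords()
    .withBootstrapServers("broker_1:9092,broker_2:9092")
    .withTopic("results") // default sink topic
    .withKeySerializer(...)
    .withValueSerializer(...));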
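A minimal sketch of the topic-selection logic that could sit inside KV2ProducerRecord, kept in plain Java so it runs without Beam or Kafka on the classpath. The class name TeamTopicRouter, the "<team>_topicname" naming convention (taken from the example topics teamA_topicname and teamB_topicname in the question) and the fallback to the "results" default topic are all hypothetical choices for illustration, not part of KafkaIO.

```java
/**
 * Hypothetical helper: picks a Kafka topic for a record from its KV key (the team name).
 * The naming convention and the default topic are illustrative assumptions.
 */
final class TeamTopicRouter {

    // Assumed convention, based on the example topics teamA_topicname / teamB_topicname.
    private static final String TOPIC_SUFFIX = "_topicname";

    // Assumed fallback, mirroring the default sink topic passed to .withTopic("results").
    static final String DEFAULT_TOPIC = "results";

    private TeamTopicRouter() {}

    /** Returns "<team>_topicname", or DEFAULT_TOPIC when the key is null or blank. */
    static String topicFor(String teamKey) {
        if (teamKey == null || teamKey.trim().isEmpty()) {
            return DEFAULT_TOPIC;
        }
        return teamKey.trim() + TOPIC_SUFFIX;
    }

    public static void main(String[] args) {
        System.out.println(topicFor("teamA")); // teamA_topicname
        System.out.println(topicFor("teamB")); // teamB_topicname
        System.out.println(topicFor(""));      // results
    }
}
```

Inside the DoFn's @ProcessElement method, each element could then be emitted as `new ProducerRecord<>(TeamTopicRouter.topicFor(kv.getKey()), kv.getKey(), kv.getValue())`, using Kafka's `ProducerRecord(String topic, K key, V value)` constructor, so that KafkaIO.writeRecords() sends it to that per-team topic.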



> On 2 Jun 2020, at 03:27, Mohil Khare  wrote:
> 
> Hello everyone,
> 
> Does anyone know if it is possible to provide a topic name embedded in a 
> PCollection object to KafkaIO while writing?
> 
> We have a use case where we have a team-specific Kafka topic, e.g. 
> teamA_topicname, teamB_topicname.
> 
> From Beam, we create a PCollection of KVs and we need to send this 
> data to Kafka over the aforementioned team-specific topics.
> Is it possible to provide topic names dynamically to 
> KafkaIO.write().withTopic() from the key present in the KV PCollection?
> 
> Thanks and regards
> Mohil
> 
> 
> 
> 



Re: Need suggestion/help for use case (usage of the side input pattern and sliding window)

2020-06-02 Thread Luke Cwik
Using side inputs is fine and is a common pattern. You should take a look
at "slowly changing side inputs"[1] as there is some example code there.

1:
https://beam.apache.org/documentation/patterns/side-inputs/#slowly-updating-side-input-using-windowing
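
When weighing the cost of the hourly window sliding every second in the pipeline quoted below, a plain-Java sketch (no Beam dependency) of sliding-window assignment may help. It models which windows of size S, starting every period P, contain a timestamp t, assuming an offset of 0; the class and method names are hypothetical, and this is a model of SlidingWindows' behavior rather than Beam's source. Every element falls into S / P windows, so SlidingWindows.of(Duration.standardHours(1)).every(Duration.standardSeconds(1)) copies each element into 3600 windows before the grouping and Combine.perKey steps.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of sliding-window assignment: windows of sizeMs starting every periodMs (offset 0). */
final class SlidingWindowMath {

    private SlidingWindowMath() {}

    /** Returns the start (ms) of every window [start, start + sizeMs) that contains timestampMs. */
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long periodMs) {
        List<Long> starts = new ArrayList<>();
        // Latest period-aligned window start at or before the timestamp.
        long lastStart = timestampMs - Math.floorMod(timestampMs, periodMs);
        // Step back one period at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= periodMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long hourMs = 60L * 60 * 1000;
        long t = 1_591_000_000_123L; // arbitrary event time in ms
        // 1-hour windows every 1 second, as in the quoted pipeline.
        System.out.println(windowStartsFor(t, hourMs, 1_000L).size());  // 3600
        // 1-hour windows every 1 minute, for comparison.
        System.out.println(windowStartsFor(t, hourMs, 60_000L).size()); // 60
    }
}
```

A longer period (for example every minute) reduces this fan-out at the cost of a coarser running score; whether that trade-off fits the use case is a judgment call.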

On Mon, Jun 1, 2020 at 8:27 PM Mohil Khare  wrote:

> Thanks Luke for your reply.
> I see. I am trying to recall why I added allowedLateness as 360 days.
> Anyway, I will try without that.
>
> But do you think the approach I am using to keep getting a running score
> in a sliding window and then using it as a side input to decorate the main
> log is correct? Or can I achieve the same thing in a much better and more
> optimized way?
>
> Thanks again
> Mohil
>
> On Mon, Jun 1, 2020 at 3:38 PM Luke Cwik  wrote:
>
>> Your allowed lateness is 360 days and since the trigger you have doesn't
>> emit speculative results, you'll have to wait until the watermark advances
>> to the end of the window's timestamp + 360 days before something is output
>> from the grouping aggregation/available at the side input.
>>
>>
>> On Sat, May 30, 2020 at 12:17 PM Mohil Khare  wrote:
>>
>>> Hello all,
>>>
>>> Any suggestions? Where am I going wrong, or is there a better way of
>>> achieving this so that I can do replays as well?
>>>
>>> Thanks
>>> Mohil
>>>
>>> On Wed, May 27, 2020 at 11:40 AM Mohil Khare  wrote:
>>>
 Hi everyone,
 I need a suggestion regarding usage of the side input pattern and
 sliding window, especially while replaying old Kafka logs/offsets.

 FYI: I am running Beam 2.19 on Google Dataflow.

 I have a use case where I read a continuous stream of data from Kafka
 and need to calculate one score (apart from other calculations) per key,
 based on the number of requests received for that key in the last hour.

 Roughly, my code looks like the following:

 PCollection<POJO> input = p
     .apply("Read__Logs_From_Kafka", KafkaIO.<String, byte[]>read()
         .withBootstrapServers(String.join(",", bootstrapServerToConnectTo))
         .withTopic("app_access_stats")
         .withKeyDeserializer(StringDeserializer.class)
         .withValueDeserializer(ByteArrayDeserializer.class)
         .withConsumerConfigUpdates(kafkaConsumerProperties)
         .withConsumerFactoryFn(consumerFactoryObj)
         .commitOffsetsInFinalize())
     .apply("Applying_Fixed_Window_Logs",
         Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
             .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()
                 .withEarlyFirings(AfterPane.elementCountAtLeast(1))))
             .withAllowedLateness(Duration.standardDays(380))
             .discardingFiredPanes())
     .apply("Convert_KafkaRecord_To_PCollection", ParDo.of(new ParseKafkaLogs()));


 /*** Class that handles the incoming PCollection and calculates the score ***/

 /** Assume input = incoming PCollection as created above */

 PCollectionView<Map<String, Long>> slidingWindowHourlyUserRequestsPerKeyView =
     input.apply("Calculate_Total_UserRequests_Past_1Hr", new WindowedNumUserRequestsPerKey())
         .apply(View.asMap());

 /** Calculate the running sum of the number of requests in a sliding window.

  Start a sliding window of duration 1 hr every 1 sec so that we get an
  accurate result for the past 1 hr.

 **/


 private static class WindowedNumUserRequestsPerKey
     extends PTransform<PCollection<POJO>, PCollection<KV<String, Long>>> {

   @Override
   public PCollection<KV<String, Long>> expand(PCollection<POJO> input) {

     return input
         .apply("Applying_Sliding_Window_1Hr_Every1sec",
             Window.<POJO>into(SlidingWindows.of(Duration.standardHours(1)).every(Duration.standardSeconds(1)))
                 .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
                 .withAllowedLateness(Duration.standardDays(360))
                 .discardingFiredPanes())
         .apply("Grouping_per_Key", ParDo.of(new GroupByAggregationKey()))
         .apply("Total_Requests_Per_Key", Combine.perKey(new CalculateTotalUserRequestsPerKey()));
   }

   private static class GroupByAggregationKey extends DoFn<POJO, KV<String, POJO>> {
     @ProcessElement
     public void processElement(@Element POJO input, OutputReceiver<KV<String, POJO>> out) {
       /** code that emits required KV */

     }
   }

   private static class CalculateTotalUserRequestsPerKey
       extends Combine.CombineFn<POJO, CalculateTotalUserRequestsPerKey.TotalRequestsAccumulator, Long> {
     private static class TotalRequestsAccumulator implements Serializable {
       private long num_requests_running_sum = 0;

       TotalRequestsAccumulator(long num_requests_running_sum) {
         this.num_requests_running_sum = num_requests_running_sum;
       }

       @Override
       public boolean
Beam First Steps Workshop - 9 June

2020-06-02 Thread Austin Bennett
Hi Beam Users,

Wanted to share the workshop that I'll give at Berlin Buzzwords next week:

https://berlinbuzzwords.de/session/first-steps-apache-beam-writing-portable-pipelines-using-java-python-go

Do consider joining if you are able and interested (if you're here and
already using Beam, the workshop would likely be too basic). Should you
not be able to find a way to get a pass, do feel free to write and I'll see
what can be done (no promises; I'm unsure what, if anything, is possible).
If there is eventually unmet demand, we can see about offering a free/public
event (like the Beam Learning month(s) events), and may have something
similar at https://beamsummit.org/.

Cheers,
Austin


New Addition to the Katas Family: Kotlin

2020-06-02 Thread Rion Williams
Hi all,

You may remember an e-mail across the mailing lists a few weeks back that 
informed everyone of the wonderful Beam Katas available on Stepik 
(https://stepik.org) for learning more about writing Beam applications and 
working with its various APIs, all from the comfort of your favorite IDE.

Today, I'm letting everyone know about a new addition to the Beam family for 
Kotlin. Unlike the other Katas, which focus on languages that directly 
correspond to the existing SDKs, this one targets a popular alternative to 
Java (and interoperates with it). I've been using Kotlin ever since being 
introduced to Beam and it's provided a fantastic developer experience, so I 
thought it'd be worth sharing with others. I'd encourage you to give it a try 
if you would love a change of pace from the "big three" of Beam, or if you are 
already familiar with Beam and want to give another language a whirl.

You can find the Kotlin and the other excellent Katas below (or by just 
searching for "Beam Katas" within IntelliJ or through the EduTools plugin):

1. Kotlin - https://stepik.org/course/72488

2. Java - https://stepik.org/course/54530

3. Python - https://stepik.org/course/54532

4. Go (in development) - https://stepik.org/course/70387

I'd like to extend a very special thanks to Henry Suryawirawan for his creation 
of the original series of Katas and his support during the review process, as 
well as Pablo Estrada for helping this thing get merged in.

Thanks everyone and feel free to reach out if you run into any issues with the 
course!

Rion Williams


Re: KafkaIO write in case on topic name present in PCollection

2020-06-02 Thread Mohil Khare
Hey Alexey,

Thanks a lot for your quick response. This worked for me :). Awesome.

Regards
Mohil


On Tue, Jun 2, 2020 at 6:31 AM Alexey Romanenko 
wrote:



Re: New Addition to the Katas Family: Kotlin

2020-06-02 Thread Henry Suryawirawan
Finally the Kotlin Katas course is now live 🙌
Thanks for your contribution and submission Rion!



On Wed, Jun 3, 2020 at 5:20 AM Rion Williams  wrote:
