OK, I'll give it a try. :)))
// TextIO reads lines from the input file set in options
p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
 // Instantiate CountWords and pass it the lines just read, for processing.
 // It counts the occurrences of each word and returns each count as a long.
 .apply(new CountWords())
 // Format the long result created by the CountWords object as text
 .apply(MapElements.via(new FormatAsTextFn()))
 // TextIO writes the result as text into the output file set in options
 .apply("WriteCounts", TextIO.Write.to(options.getOutput()));
Did I pass? :))
Thanks

From: Raghu Angadi <[email protected]>
To: [email protected]; amir bahmanyari <[email protected]>
Sent: Tuesday, August 2, 2016 3:59 PM
Subject: Re: Any data gets cached?
Hi Amir,
It is not clear what you want to do. I am not even sure we both mean the same
thing when we say 'aggregation' :). Can you confirm that you understand how the
Beam WordCount.java example works? Especially lines 208-211.
On Tue, Aug 2, 2016 at 11:25 AM, amir bahmanyari <[email protected]> wrote:
Hi Raghu,
Any opinion on this, please? I appreciate your time...
Cheers
From: amir bahmanyari <[email protected]>
To: "[email protected]" <[email protected]>
Sent: Sunday, July 31, 2016 3:05 PM
Subject: Re: Any data gets cached?
Hi Raghu,
Thanks so much for your response. Following is how I am reading unbounded
records from Kafka through KafkaIO() and processing them in its corresponding
inner class. How do I include "aggregation" in this call?
Have a great weekend.
Pipeline p = Pipeline.create(options);
............................etc...........................
try {
    PCollection<KV<String, String>> kafkarecords =
        p.apply(KafkaIO.read()
                .withBootstrapServers("kafhost:9092")
                .withTopics(kaftopic)
                .withValueCoder(StringUtf8Coder.of())
                .withoutMetadata())
         .apply(ParDo.named("startBundle").of(
                new DoFn<KV<byte[], String>, KV<String, String>>() {
                    ...................etc......................
                    @Override
                    public void processElement(ProcessContext ctx) throws Exception {
                        ............................etc......................................
From: Raghu Angadi <[email protected]>
To: [email protected]; amir bahmanyari <[email protected]>
Sent: Saturday, July 30, 2016 2:15 PM
Subject: Re: Any data gets cached?
On the surface it looks like you are asking about basic aggregations. These are
of course provided by Beam too. Almost all Beam examples make use of them. See
'Count.<String>perElement()' in the WordCount.java example. If that does not
help, either post your Beam code or roughly equivalent SQL here.
On Fri, Jul 29, 2016 at 4:26 PM, amir bahmanyari <[email protected]> wrote:
Hi Raghu,
Is it right to assume that if results are not aggregated, we may see
inconsistency in what the final result looks like? What would be the best
aggregation approach to guarantee consistency, even if there is a performance
cost?
Thanks
From: Raghu Angadi <[email protected]>
To: [email protected]; amir bahmanyari <[email protected]>
Sent: Friday, July 29, 2016 2:32 PM
Subject: Re: Any data gets cached?
I suggest you try aggregating using Beam primitives (GroupByKey, Count, etc.)
and see if that still produces inconsistent results.
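
To make the suggestion above concrete, here is a minimal plain-Java analogy of what a per-element count computes. It is only a sketch and not Beam code: the class name PerElementCountSketch and the method countPerElement are hypothetical, and java.util.stream stands in for the Beam transforms. The point it illustrates is that a grouped count depends only on which elements arrive, not on the order in which they arrive.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Plain-Java analogy only; this is not Beam code.
class PerElementCountSketch {

    // Groups equal elements together and counts the size of each group,
    // which is roughly what a per-element count aggregation computes.
    static Map<String, Long> countPerElement(List<String> elements) {
        return elements.stream()
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> records = new ArrayList<>(Arrays.asList("a", "b", "a", "c", "b", "a"));
        Map<String, Long> first = countPerElement(records);

        // Shuffle to mimic the same records arriving in a different order on another run.
        Collections.shuffle(records);
        Map<String, Long> second = countPerElement(records);

        // The two maps are equal: the counts do not depend on arrival order.
        System.out.println(first.equals(second));
    }
}
```

If the hand-written logic in processElement gives a different answer on each run while an aggregation of this kind over the same input does not, that would point at the hand-written state handling rather than at the input.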
On Fri, Jul 29, 2016 at 12:09 PM, amir bahmanyari <[email protected]> wrote:
Sorry colleagues,
I am chasing a moving target in my Beam app with FlinkRunner in a Flink cluster
of 2 nodes. Every run produces a different result, while we know what the
result MUST be: it's an expected fixed number. I checked and see Kafka is NOT
sending any extra records.
My first suspect was Redis thread-UNsafe hashmap objects. I replaced them with
Java ConcurrentHashMaps. Wow! It worked for the very first time ever: PERFECT.
I then re-ran the exact same thing, expecting the exact same previous result.
But it's different. Ran it again. Another different, wrong result. A different
result each time. No code change, nothing different.
I am wondering if some previous data/record gets cached by
Beam/FlinkRunner/KafkaIO invocation etc. somewhere.
Sorry for the long email. Am losing my mind catching this moving target :))
Appreciate your kind feedback on this.
Cheers + have a great weekend.
Amir-
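
On the ConcurrentHashMap swap described above, one general Java point may be worth checking, offered as a possibility and not as a diagnosis of this pipeline: a ConcurrentHashMap makes each individual call thread-safe, but a separate get followed by a put is still a race, and updates can be lost. The sketch below uses hypothetical names (AtomicCounterSketch, countWithMerge, the key "k") and shows the atomic alternative, merge(), which performs the read-modify-write as one step.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AtomicCounterSketch {

    // Increments one key `perThread` times from each of `threads` threads
    // and returns the final count for that key.
    static long countWithMerge(int threads, int perThread) {
        ConcurrentHashMap<String, Long> counts = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < perThread; i++) {
                    // Racy alternative (can lose updates even on a ConcurrentHashMap):
                    //   counts.put("k", counts.getOrDefault("k", 0L) + 1);
                    // merge() does the read-modify-write atomically instead.
                    counts.merge("k", 1L, Long::sum);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return counts.get("k");
    }

    public static void main(String[] args) {
        System.out.println(countWithMerge(8, 100_000)); // 800000
    }
}
```

Separately, a map held in one JVM's memory is not shared with other JVMs, so on a 2-node cluster each worker process would see only its own copy of such a map.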