:))) OK, I'll give it a try.
// TextIO reads lines from the input file set in options.
p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
 // CountWords takes the lines read above, splits them into words,
 // and counts the occurrences of each word as a Long.
 .apply(new CountWords())
 // Format each (word, count) pair produced by CountWords as text.
 .apply(MapElements.via(new FormatAsTextFn()))
 // TextIO writes the formatted results to the output file set in options.
 .apply("WriteCounts", TextIO.Write.to(options.getOutput()));
Did I pass? :)) Thanks

      From: Raghu Angadi <[email protected]>
 To: [email protected]; amir bahmanyari <[email protected]> 
 Sent: Tuesday, August 2, 2016 3:59 PM
 Subject: Re: Any data gets cached?
   
Hi Amir,
It is not clear what you want to do. I am not even sure we both mean the same 
thing when we say 'aggregation' :). Can you confirm that you understand how the 
Beam WordCount.java example works, especially lines 208-211?

On Tue, Aug 2, 2016 at 11:25 AM, amir bahmanyari <[email protected]> wrote:

Hi Raghu,
Any opinion on this, please? I appreciate your time... Cheers

      From: amir bahmanyari <[email protected]>
 To: "[email protected]" <[email protected]> 
 Sent: Sunday, July 31, 2016 3:05 PM
 Subject: Re: Any data gets cached?
   
Hi Raghu,
Thanks so much for your response. Below is how I am reading unbounded records 
from Kafka through KafkaIO() and processing them in the corresponding inner 
DoFn. How do I include "aggregation" in this call? Have a great weekend.
Pipeline p = Pipeline.create(options);
// ............................etc...........................
try {
    PCollection<KV<String, String>> kafkarecords = p
        .apply(KafkaIO.read()
            .withBootstrapServers("kafhost:9092")
            .withTopics(kaftopic)
            .withValueCoder(StringUtf8Coder.of())
            .withoutMetadata())
        .apply(ParDo.named("startBundle").of(
            new DoFn<KV<byte[], String>, KV<String, String>>() {
                // ...................etc......................
                @Override
                public void processElement(ProcessContext ctx) throws Exception {
                    // ............................etc......................
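To make the question concrete: I imagine the aggregation step would come right 
after this ParDo, something like the sketch below? (The 1-minute fixed window, 
the Count.perKey() step, and the countsPerKey name are only my guesses, not code 
I have working.)

// Hypothetical continuation of the pipeline above: window the unbounded
// Kafka stream, then count how many records share each key per window.
// (Needs org.apache.beam.sdk.transforms.Count,
//  org.apache.beam.sdk.transforms.windowing.{Window, FixedWindows},
//  and org.joda.time.Duration imports.)
PCollection<KV<String, Long>> countsPerKey = kafkarecords
    .apply(Window.<KV<String, String>>into(
        FixedWindows.of(Duration.standardMinutes(1))))  // placeholder window size
    .apply(Count.<String, String>perKey());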
From: Raghu Angadi <[email protected]>
 To: [email protected]; amir bahmanyari <[email protected]> 
 Sent: Saturday, July 30, 2016 2:15 PM
 Subject: Re: Any data gets cached?
  
On the surface it looks like you are asking about basic aggregations. These are 
of course provided by Beam too, and almost all Beam examples make use of them. See 
'Count.<String>perElement()' in the WordCount.java example. If not, either post 
your Beam code or the roughly equivalent SQL here.
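Roughly, the per-word aggregation inside the example's CountWords transform boils 
down to something like this (here 'words' stands for the PCollection<String> of 
individual words the example extracts first):

// Per-element counting: for each distinct word, emit a KV of the word
// and the number of times it occurred in the input.
PCollection<KV<String, Long>> wordCounts =
    words.apply(Count.<String>perElement());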
On Fri, Jul 29, 2016 at 4:26 PM, amir bahmanyari <[email protected]> wrote:

Hi Raghu,
Is it a correct assumption that if the results are not aggregated we may see 
inconsistency in what the final result looks like? What would be the best 
aggregation approach to guarantee consistency, even if there is a performance 
cost? Thanks

      From: Raghu Angadi <[email protected]>
 To: [email protected]; amir bahmanyari <[email protected]> 
 Sent: Friday, July 29, 2016 2:32 PM
 Subject: Re: Any data gets cached?
  
I suggest you try aggregating using Beam primitives (GroupByKey, Count, etc.) and 
see if that produces consistent results.
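For example, a bare-bones aggregation with the GroupByKey primitive could look 
something like this (the 'keyedRecords' collection and the String types are 
placeholders for whatever your pipeline actually produces):

// Group all values that share a key; downstream processing then sees each
// key exactly once per window, instead of relying on shared mutable state.
// (For an unbounded source, a Window.into(...) is required upstream of this.)
PCollection<KV<String, Iterable<String>>> grouped =
    keyedRecords.apply(GroupByKey.<String, String>create());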
On Fri, Jul 29, 2016 at 12:09 PM, amir bahmanyari <[email protected]> wrote:

Sorry colleagues,
I am chasing a moving target in my Beam app with the FlinkRunner on a Flink 
cluster of 2 nodes. Every run produces a different result, while we know what the 
result MUST be: it is an expected fixed number. I checked and Kafka is NOT 
sending any extra records. My first suspect was the thread-UNsafe Redis hashmap 
objects. I replaced them with Java ConcurrentHashMaps. Wow! It worked PERFECTLY 
the very first time. I then re-ran the exact same thing expecting the exact same 
result, but it is different. Run it again: another different, wrong result. A 
different result each time. No code change, nothing different.
I am wondering if some previous data/record gets cached somewhere by 
Beam/FlinkRunner/KafkaIO, etc. Sorry for the long email. I am losing my mind 
catching this moving target :)) Appreciate your kind feedback on this.
Cheers + have a great weekend.
Amir-