Hi Amir,

It is not clear what you want to do. I am not even sure we both mean the
same thing when we say 'aggregation' :).
Can you confirm that you understand how the Beam WordCount.java
<https://github.com/apache/incubator-beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/WordCount.java#L208>
example works, especially lines 208-211?


On Tue, Aug 2, 2016 at 11:25 AM, amir bahmanyari <[email protected]>
wrote:

> Hi Raghu,
> Any opinion on this, please? I appreciate your time.
> Cheers
>
>
> ------------------------------
> *From:* amir bahmanyari <[email protected]>
> *To:* "[email protected]" <[email protected]>
> *Sent:* Sunday, July 31, 2016 3:05 PM
>
> *Subject:* Re: Any data gets cached?
>
> Hi Raghu,
> Thanks so much for your response. The following is how I am reading unbounded
> records from Kafka through KafkaIO() and processing them in the corresponding
> inner class.
> How do I include "aggregation" in this call?
> Have a great weekend.
>
> Pipeline p = Pipeline.create(options);
> // ... etc ...
> try {
>   PCollection<KV<String, String>> kafkarecords = p
>       .apply(KafkaIO.read()
>           .withBootstrapServers("kafhost:9092")
>           .withTopics(kaftopic)
>           .withValueCoder(StringUtf8Coder.of())
>           .withoutMetadata())
>       .apply(ParDo.named("startBundle").of(
>           new DoFn<KV<byte[], String>, KV<String, String>>() {
>             // ... etc ...
>
>             @Override
>             public void processElement(ProcessContext ctx) throws Exception {
>               // ... etc ...
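A note on adding aggregation to a read like this: KafkaIO produces an unbounded PCollection, and as far as I know Beam rejects GroupByKey (which Count and Combine.perKey are built on) over an unbounded PCollection in the default global window unless a trigger is set. So the usual pattern is to window the records first, e.g. with Window.into(FixedWindows.of(...)), and then apply Count.perElement() or GroupByKey. The sketch below is plain Java, not Beam, and only illustrates what a one-minute fixed-window count per key computes; the Event class, its fields, and the sample timestamps are hypothetical stand-ins for the Kafka records.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Plain-Java illustration (not Beam code) of a fixed-window count per key:
 * each record goes to the window [start, start + size) containing its event
 * timestamp, and counts are kept per (window start, key).
 */
public class FixedWindowCount {

  /** Hypothetical stand-in for a Kafka record: event time (ms) plus a key. */
  static final class Event {
    final long timestampMillis;
    final String key;

    Event(long timestampMillis, String key) {
      this.timestampMillis = timestampMillis;
      this.key = key;
    }
  }

  /** Start of the fixed window of length sizeMillis that contains ts. */
  static long windowStart(long ts, long sizeMillis) {
    // floorMod keeps the result correct for negative timestamps too.
    return ts - Math.floorMod(ts, sizeMillis);
  }

  /** Returns window start -> (key -> count); TreeMaps keep the output sorted. */
  static Map<Long, Map<String, Long>> countPerWindow(List<Event> events, long sizeMillis) {
    Map<Long, Map<String, Long>> result = new TreeMap<>();
    for (Event e : events) {
      long start = windowStart(e.timestampMillis, sizeMillis);
      result.computeIfAbsent(start, s -> new TreeMap<>()).merge(e.key, 1L, Long::sum);
    }
    return result;
  }

  public static void main(String[] args) {
    List<Event> events = List.of(
        new Event(1_000, "a"), new Event(59_000, "a"),
        new Event(61_000, "a"), new Event(62_000, "b"));
    // One-minute windows: [0, 60000) and [60000, 120000).
    System.out.println(countPerWindow(events, 60_000)); // {0={a=2}, 60000={a=1, b=1}}
  }
}
```

Each window's counts are final once that window is complete, which is what gives a streaming job a stable, reproducible answer per window instead of a running total that depends on when you look.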
> ------------------------------
> *From:* Raghu Angadi <[email protected]>
> *To:* [email protected]; amir bahmanyari <[email protected]>
>
> *Sent:* Saturday, July 30, 2016 2:15 PM
> *Subject:* Re: Any data gets cached?
>
> On the surface it looks like you are asking about basic aggregations.
> These are, of course, provided by Beam too. Almost all Beam examples make use
> of them. See 'Count.<String>perElement()
> <https://github.com/apache/incubator-beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/WordCount.java#L152>'
> in the WordCount.java example. If not, either post your Beam code or roughly
> equivalent SQL here.
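For reference, Count.perElement() turns a PCollection<T> into a PCollection<KV<T, Long>> holding each distinct element and the number of times it occurs. The sketch below is plain Java rather than Beam; it only shows that result for a small in-memory list (the sample words are made up), whereas in a pipeline the runner does the counting across workers.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Plain-Java stand-in for what Count.perElement() computes on a finite input. */
public class CountPerElement {

  static Map<String, Long> countPerElement(List<String> words) {
    // groupingBy + counting yields: distinct element -> number of occurrences.
    // TreeMap is only used so the printed output is sorted and predictable.
    return words.stream()
        .collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
  }

  public static void main(String[] args) {
    System.out.println(countPerElement(List.of("to", "be", "or", "not", "to", "be")));
    // prints {be=2, not=1, or=1, to=2}
  }
}
```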
>
> On Fri, Jul 29, 2016 at 4:26 PM, amir bahmanyari <[email protected]>
> wrote:
>
> Hi Raghu,
> Is it right to assume that if results are not aggregated, the final
> result may be inconsistent?
> What would be the best aggregation approach to guarantee consistency, even
> at a performance cost?
> Thanks
>
> ------------------------------
> *From:* Raghu Angadi <[email protected]>
> *To:* [email protected]; amir bahmanyari <[email protected]>
>
> *Sent:* Friday, July 29, 2016 2:32 PM
> *Subject:* Re: Any data gets cached?
>
> I suggest you try aggregating using Beam primitives (GroupByKey, Count,
> etc.) and see whether that produces consistent results.
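GroupByKey takes a PCollection<KV<K, V>> and emits one KV<K, Iterable<V>> per key (per window); the runner shuffles the data so that all values for a key end up together no matter which worker read them, which is why totals computed this way don't depend on how records were split across nodes. The sketch below is plain Java, not Beam, and only mimics those semantics on an in-memory list; the keys "x"/"y" and the summing step are illustrative choices.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java illustration of GroupByKey semantics: KV pairs -> key -> all values. */
public class GroupByKeySketch {

  static Map<String, List<Integer>> groupByKey(List<Map.Entry<String, Integer>> pairs) {
    Map<String, List<Integer>> grouped = new TreeMap<>();
    for (Map.Entry<String, Integer> kv : pairs) {
      // Beam promises no order for values within a group; this list just keeps input order.
      grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
    }
    return grouped;
  }

  /** Reducing each group afterwards is the kind of result a per-key combine gives. */
  static Map<String, Integer> sumPerKey(Map<String, List<Integer>> grouped) {
    Map<String, Integer> sums = new TreeMap<>();
    grouped.forEach((k, vs) -> sums.put(k, vs.stream().mapToInt(Integer::intValue).sum()));
    return sums;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Integer>> pairs = List.of(
        Map.entry("x", 1), Map.entry("y", 5), Map.entry("x", 3));
    Map<String, List<Integer>> grouped = groupByKey(pairs);
    System.out.println(grouped);            // {x=[1, 3], y=[5]}
    System.out.println(sumPerKey(grouped)); // {x=4, y=5}
  }
}
```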
>
> On Fri, Jul 29, 2016 at 12:09 PM, amir bahmanyari <[email protected]>
> wrote:
>
> Sorry, colleagues,
> I am chasing a moving target in my Beam app with FlinkRunner on a Flink
> cluster of 2 nodes.
> Every run produces a different result, while we know what the result MUST
> be: it's a fixed, expected number.
> I checked, and Kafka is NOT sending any extra records.
> My first suspect was the Redis thread-UNsafe hashmap objects.
> I replaced them with Java ConcurrentHashMaps. Wow! It worked PERFECTLY for
> the very first time ever.
> I then re-ran the exact same thing, expecting the exact same result.
> But it's different. I ran it again: another different, wrong result. A
> different result each time.
> No code change, nothing different.
>
> I am wondering if some previous data/records get cached somewhere by the
> Beam/FlinkRunner/KafkaIO invocation, etc.
> Sorry for the long email. I am losing my mind chasing this moving target :))
> I'd appreciate your kind feedback on this.
> Cheers, and have a great weekend.
> Amir-
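On the ConcurrentHashMap change: it does prevent lost updates between threads inside one JVM, provided the updates go through atomic methods such as merge() or compute() rather than a get() followed by a put(). It cannot help across machines, though. Assuming the maps live in the worker's memory (for example as fields of the DoFn), each of the two Flink nodes has its own copy and only sees the records routed to it, so totals kept that way can plausibly vary with how records get distributed on each run. Beam's GroupByKey/Count avoid that by bringing all values for a key together. The sketch below is plain Java and only demonstrates the single-JVM part; the method name, thread counts, and "even"/"odd" keys are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Counting from several threads into one ConcurrentHashMap within a single JVM. */
public class ConcurrentCounts {

  static Map<String, Long> countConcurrently(int threads, int perThread) {
    ConcurrentHashMap<String, Long> counts = new ConcurrentHashMap<>();
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (int t = 0; t < threads; t++) {
      pool.submit(() -> {
        for (int i = 0; i < perThread; i++) {
          // merge() updates the key atomically; get-then-put would race.
          counts.merge(i % 2 == 0 ? "even" : "odd", 1L, Long::sum);
        }
      });
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
        throw new IllegalStateException("workers did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return counts;
  }

  public static void main(String[] args) {
    Map<String, Long> counts = countConcurrently(4, 10_000);
    System.out.println(counts.get("even") + " " + counts.get("odd")); // 20000 20000
  }
}
```

The same code run in two separate JVMs would produce two independent maps, each holding only its own share of the counts.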