Yes :) I figured it out and it compiled. Now I get a ClassNotFoundException at runtime:
org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn
Researching the right runners package... Thanks a million, Thomas... I hope I
will see reasonable results, at least enough to convince my own team why we get
different results on different runs for a GOOD REASON. Cheers
From: Thomas Groh <[email protected]>
To: [email protected]; amir bahmanyari <[email protected]>
Sent: Monday, August 8, 2016 5:26 PM
Subject: Re: Is Beam pipeline runtime behavior inconsistent?
Missed a ParDo; that last line should be .apply(ParDo.of(new DoFn...))
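In other words, the tail of the chain would read roughly as follows (just a sketch; the DoFn body is a placeholder for whatever your processing logic is):

kafkarecords.apply(WithKeys.<Integer, String>of(1))
    .apply(GroupByKey.<Integer, String>create())
    .apply(Values.<Iterable<String>>create())
    .apply(ParDo.of(new DoFn<Iterable<String>, String>() {
        public void processElement(ProcessContext ctx) throws Exception {
            // process the grouped records here
        }
    }));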
On Mon, Aug 8, 2016 at 4:56 PM, amir bahmanyari <[email protected]> wrote:
Hi Thomas, I used the following and got a compilation error: "The method
apply(PTransform<? super PCollection<Iterable<String>>, OutputT>) in the type
PCollection<Iterable<String>> is not applicable for the arguments (new
DoFn<Iterable<String>,String>(){})"
PCollection<String> kafkarecords = p
    .apply(KafkaIO.read()
        .withBootstrapServers("kafkahost:9092")
        .withTopics(topics)
        .withValueCoder(StringUtf8Coder.of())
        .withoutMetadata())
    .apply(Values.<String>create())
    .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
        .triggering(AfterWatermark.pastEndOfWindow())
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());

kafkarecords.apply(WithKeys.<Integer, String>of(1))
    .apply(GroupByKey.<Integer, String>create())
    .apply(Values.<Iterable<String>>create())
    .apply(new DoFn<Iterable<String>, String>() { ... });
I compared with some GroupByKey examples... nothing matched it. Should I
simplify the KafkaIO() call to avoid this compilation error? Thanks for your
help. Amir
From: Thomas Groh <[email protected]>
To: [email protected] ; amir bahmanyari <[email protected]>
Sent: Monday, August 8, 2016 3:50 PM
Subject: Re: Is Beam pipeline runtime behavior inconsistent?
You would get performance no better than single-threaded execution if you group
everything into a single key, which is why this approach is strongly discouraged.
You can still get continuous output, depending on the triggering, but you lose
all of the scaling benefits of running a pipeline as opposed to a simple Java
program, and you may incur some additional overhead.
To enforce this sort of threading you would do something along the lines of:
kafkarecords.apply(WithKeys.<Integer, String>of(1))
    .apply(GroupByKey.<Integer, String>create())
    .apply(Values.<Iterable<String>>create())
    .apply(new DoFn<Iterable<String>, String>() { ... });
Where the DoFn unrolls its input Iterable and applies the processing to each element.
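As a rough sketch of what that unrolling DoFn could look like (this mirrors the old-style DoFn used elsewhere in this thread; the per-record logic is a placeholder):

new DoFn<Iterable<String>, String>() {
    public void processElement(ProcessContext ctx) throws Exception {
        // ctx.element() is the entire Iterable produced by the GroupByKey,
        // so this single call sees all of the grouped records.
        for (String record : ctx.element()) {
            // ... per-record processing goes here ...
            ctx.output(record);
        }
    }
}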
On Mon, Aug 8, 2016 at 2:37 PM, amir bahmanyari <[email protected]> wrote:
Thanks so much Thomas. Fantastic answer & great learning about what's really
going on under the hood. I have a question on your suggestion: "To do so, you
would key the inputs to a single static key and apply a GroupByKey, running the
processing method on the output Iterable produced by the GroupByKey"... Wouldn't
doing that defeat the "real-time streaming" objective? To me, the above leads
to a simulation of a simple single-threaded Java process that happens to execute
on a massively parallel infrastructure in a "fancy" way :-) Is there an example
that demonstrates how to actually implement your suggestion above without any
hidden loopholes, please? I can at least try it and see how far it gets for R&D
purposes and share the results with the community. Cheers + have a wonderful day.
From: Thomas Groh <[email protected]>
To: [email protected] ; amir bahmanyari <[email protected]>
Sent: Monday, August 8, 2016 1:44 PM
Subject: Re: Is Beam pipeline runtime behavior inconsistent?
There's no way to guarantee that exactly one record is processed at a time.
This is part of the design of ParDo to work efficiently across multiple
processes and machines[1], where multiple instances of a DoFn must exist in
order for progress to be made in a timely fashion. This includes processing the
same element across multiple machines at the same time, with only one of the
results being available in the output PCollection, as well as retries of failed
elements.
A runner is required to interact with a DoFn instance in a single-threaded
manner - however, it is permitted to have multiple different DoFn instances
active within a single process and across processes at any given time (for the
same reasons as above). There's no support in the Beam model to restrict this
type of execution. We do not encourage sharing objects between DoFn instances;
any shared state must be accessed in a thread-safe manner, and modifications to
shared state should be idempotent, as otherwise retries and speculative
execution may cause that state to become inconsistent. A DoFn will be
reused for multiple elements across a single bundle, and may be reused across
multiple bundles - if you require the DoFn to be "fresh" per element, it should
perform any required setup at the start of the ProcessElement method.
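For instance, a DoFn that needs per-element "freshness" could rebuild its working state at the top of processElement, along these lines (a sketch only; the map and its use are hypothetical placeholders):

new DoFn<String, String>() {
    // Any field here is reused across the elements this instance processes,
    // so rebuild it per element if each record must start from a clean state.
    private transient Map<String, Long> perRecordState;

    public void processElement(ProcessContext ctx) throws Exception {
        // Per-element setup: reinitialize before touching the record.
        perRecordState = new HashMap<String, Long>();
        String record = ctx.element();
        // ... processing that uses the freshly built state ...
        ctx.output(record);
    }
}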
The best that can be done if it is absolutely required to restrict processing
to a single element at a time would be to group all of the elements to a single
key. Note that this will not solve the problem in all cases, as a runner is
permitted to execute the group of elements multiple times so long as it only
takes one completed bundle as the result, and additionally this removes the
ability of the runner to balance work and introduces a performance bottleneck.
To do so, you would key the inputs to a single static key and apply a
GroupByKey, running the processing method on the output Iterable produced by
the GroupByKey (directly; expanding the input iterable in a separate
PCollection allows a runner to rebalance the elements, which will reintroduce
parallelism).
[1] https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java#L360
On Mon, Aug 8, 2016 at 12:46 PM, amir bahmanyari <[email protected]> wrote:
Hi Thomas, thanks so much for your response. Here are answers to your questions.

You have a specific collection of records stored in Kafka. You run your
pipeline, and observe duplicate elements. Is that accurate?
==>> I send records to Kafka from my laptop. I use KafkaIO() to receive the
records. I have confirmed that I don't get duplicates from Kafka. However, for
some reason, certain parts of my code execute more times than the expected
number of records, and subsequently produce extra resulting data. I tried
playing with the triggering: stretching the window interval,
discardingFiredPanes(), etc., all kinds of modes. Same result. How can I
guarantee that one record at a time executes in one unique instance of the
inner class object? I have all the shared objects synchronized and am using
Java concurrent hash maps. How can I guarantee synchronized operations amongst
"parallel pipelines"? Analogous to multiple threads accessing a shared object
and trying to modify it...
Here is my current KafkaIO() call:

PCollection<String> kafkarecords = p
    .apply(KafkaIO.read()
        .withBootstrapServers("kafkahost:9092")
        .withTopics(topics)
        .withValueCoder(StringUtf8Coder.of())
        .withoutMetadata())
    .apply(Values.<String>create())
    .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
        .triggering(AfterWatermark.pastEndOfWindow())
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());

kafkarecords.apply(ParDo.named("ProcessLRKafkaData").of(new DoFn<String, String>() {
    ... // I expect one record at a time to one object here

------------------------------
Have you confirmed that you're getting duplicate records via other library
transforms (such as applying Count.globally() to kafkarecords)?
==>> No duplicates from Kafka.

------------------------------

Additionally, I'm not sure what you mean by "executes till a record lands on
method"
==>> Sorry for my confusing statement. Like I mentioned above, I expect each
record coming from Kafka to get assigned to one instance of the inner class,
and therefore one instance of the pipeline executes it in parallel with others
executing their own unique records.

------------------------------
Additionally additionally, is this reproducible if you execute with the
DirectRunner?
==>> I have not tried the DirectRunner. Should I?
Thanks so much Thomas.
From: Thomas Groh <[email protected]>
To: [email protected] ; amir bahmanyari <[email protected]>
Sent: Monday, August 8, 2016 11:43 AM
Subject: Re: Is Beam pipeline runtime behavior inconsistent?
Just to make sure I understand the problem:
You have a specific collection of records stored in Kafka. You run your
pipeline, and observe duplicate elements. Is that accurate?
Have you confirmed that you're getting duplicate records via other library
transforms (such as applying Count.globally() to kafkarecords)?
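(Something along these lines, as a rough sketch; since kafkarecords is windowed, Combine's withoutDefaults() would likely be needed, and the printing DoFn is just a placeholder:)

kafkarecords
    .apply(Count.<String>globally().withoutDefaults())
    .apply(ParDo.of(new DoFn<Long, Long>() {
        public void processElement(ProcessContext ctx) throws Exception {
            // Print the number of elements counted for each window/pane.
            System.out.println("kafkarecords count: " + ctx.element());
            ctx.output(ctx.element());
        }
    }));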
Additionally, I'm not sure what you mean by "executes till a record lands on
method"
Additionally additionally, is this reproducible if you execute with the
DirectRunner?
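(For reference, switching runners is just a change to the pipeline options; a sketch, assuming the DirectRunner class and the direct-runner artifact available in your Beam version:)

// Requires the direct runner on the classpath (e.g. the beam-runners-direct-java artifact).
PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(DirectRunner.class);
Pipeline p = Pipeline.create(options);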
On Sun, Aug 7, 2016 at 11:44 PM, amir bahmanyari <[email protected]> wrote:
Hi Colleagues, I refrained from posting this email before completing thorough
testing. I think I did. My core code works perfectly & produces the expected
result every single time when it is not wrapped with Beam KafkaIO to receive
the data. Without KafkaIO, it receives the records from a flat data file; I
repeated it and it always produced the right result. When I include Beam
KafkaIO and embed the exact same code in an anonymous class running Beam
pipelines, I get a different result every time I rerun it. Below is the snippet
from where KafkaIO executes until a record lands on the processing method.
Kafka sends the precise number of records. No duplicates, all good. While
executing in Beam, when the records are finished & I expect a correct result,
it always produces something different, and different in different runs. I
would appreciate any light shed on this issue. And thanks for your valuable
time as always. Amir
public static synchronized void main(String[] args) throws Exception {
    // Create Beam Options for the Flink Runner.
    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
    // Set the Streaming engine as FlinkRunner
    options.setRunner(FlinkPipelineRunner.class);
    // This is a Streaming process (as opposed to Batch=false)
    options.setStreaming(true);
    // Create the DAG pipeline for parallel processing of independent LR records
    Pipeline p = Pipeline.create(options);
    // Kafka broker topic is identified as "lroad"
    List<String> topics = Arrays.asList("lroad");

    PCollection<String> kafkarecords = p
        .apply(KafkaIO.read()
            .withBootstrapServers("kafkahost:9092")
            .withTopics(topics)
            .withValueCoder(StringUtf8Coder.of())
            .withoutMetadata())
        .apply(Values.<String>create())
        .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
            .triggering(AfterWatermark.pastEndOfWindow())
            .withAllowedLateness(Duration.ZERO)
            .accumulatingFiredPanes());

    kafkarecords.apply(ParDo.named("ProcessLRKafkaData").of(new DoFn<String, String>() {
        public void processElement(ProcessContext ctx) throws Exception {
            // My core logic code here.
        }
    }));

    p.run(); // Start Beam Pipeline(s) in Flink Cluster
} // of main
// of class