Sorry, I know this is not the Kafka forum. Sorry about that. But I thought I 
would share this as the root cause of the inconsistency I have described below.
Ok. I see its Kafka that doesn't send records to KafkaIO() in the same exact 
order as its being sent to it.I proved it with a stand alone consumer several 
times and it shows.As per Kafka docs suggestions, I recreated a new topic with 
number of partitions=1 which Kafka docs say that guarantees exact order in a 
single partition.It still doesn't send them in the right order even with the 
number of replications being just 1 i.e. no parallelism at all.
My records MUST come in order as they have been sent. So, at the moment seems 
like KafkaIO() is not an option due to the above Kafka sentiments.I know 
TextIO(), the poor man's way, works. Proven/tested fact. 
Any other suggestions is appreciated.Thanks+regards,Amir-

      From: Thomas Groh <[email protected]>
 To: [email protected]; amir bahmanyari <[email protected]> 
 Sent: Tuesday, August 9, 2016 1:22 PM
 Subject: Re: Is Beam pipeline runtime behavior inconsistent?
   
ConcurrentHashMaps can be interacted with in a way that does not preserve the 
intended semantics. If you are using exclusively atomic mutation operations 
(putIfAbsent(K, V), remove(K, V), replace(K, V, V)), you can ensure that the 
mutation semantics are obtained; however, using a ConcurrentMap purely like a 
map can cause Time-of-check-time-of-use errors. Otherwise, ConcurrentMaps 
provide happens-before and visibility guarantees only.
For the second question, this is mainly about interacting with mutable 
per-element state - if you interact with, for example, mutable instance fields 
that have a base and a current state, the base state must be reset per-element. 
It doesn't sound like this is your problem.
On Tue, Aug 9, 2016 at 11:31 AM, amir bahmanyari <[email protected]> wrote:

Hi Thomas,I spent time to digest all of this. I think I understand it to a good 
extent. The only hang up I still have is controlling the execution trajectory 
with persisting state which you say its not guaranteed in Beam.Have some 
further questions Q below & appreciate your valuable time to respond to them. I 
reiterated your statements in " " for quick reference above them.
"We do not encourage sharing objects between DoFn instances, and any shared 
state must be accessed in a thread-safe manner, and modifications to shared 
state should be idempotent, as otherwise retries and speculative execution may 
cause that state to be inconsistent." Q: I persisted state in (single instance) 
Redis. I got varying result at each run. I then replaced Redis with java 
(static) ConcurrentHashMaps which are automatically thread safe. Interesting 
enough, the very first run after this change produced precise result & I 
thought I GOT IT! Re-run, and I got varying results again till this moment I am 
typing this email. How would you suggest to "any shared state must be accessed 
in a thread-safe manner" different than using Concurrent HashMaps?

"A DoFn will be reused for multiple elements across a single bundle, and may be 
reused across multiple bundles - if you require the DoFn to be "fresh" per 
element, it should perform any required setup at the start of the 
ProcessElement method."
Q: What do you suggest to "it should perform any required setup at the start of 
the ProcessElement method."?I can think of persisting the DoFn Obj's HashCode 
at the Object class level (every-time ProcessElement is invoked)  & compare it 
later on for uniqueness with Object's equals(Obj). It gets a little hairy when 
"parallelism" manifests in execution I know.I appreciate your suggestions.

Thanks+have a great day.Amir

      From: Thomas Groh <[email protected]>
 To: [email protected] ; amir bahmanyari <[email protected]> 
 Sent: Monday, August 8, 2016 1:44 PM
 Subject: Re: Is Beam pipeline runtime behavior inconsistent?
   
There's no way to guarantee that exactly one record is processed at a time. 
This is part of the design of ParDo to work efficiently across multiple 
processes and machines[1], where multiple instances of a DoFn must exist in 
order for progress to be made in a timely fashion. This includes processing the 
same element across multiple machines at the same time, with only one of the 
results being available in the output PCollection, as well as retries of failed 
elements.
A runner is required to interact with a DoFn instance in a single-threaded 
manner - however, it is permitted to have multiple different DoFn instances 
active within a single process and across processes at any given time (for the 
same reasons as above). There's no support in the Beam model to restrict this 
type of execution. We do not encourage sharing objects between DoFn instances, 
and any shared state must be accessed in a thread-safe manner, and 
modifications to shared state should be idempotent, as otherwise retries and 
speculative execution may cause that state to be inconsistent. A DoFn will be 
reused for multiple elements across a single bundle, and may be reused across 
multiple bundles - if you require the DoFn to be "fresh" per element, it should 
perform any required setup at the start of the ProcessElement method.
The best that can be done if it is absolutely required to restrict processing 
to a single element at a time would be to group all of the elements to a single 
key. Note that this will not solve the problem in all cases, as a runner is 
permitted to execute the group of elements multiple times so long as it only 
takes one completed bundle as the result, and additionally this removes the 
ability of the runner to balance work and introduces a performance bottleneck. 
To do so, you would key the inputs to a single static key and apply a 
GroupByKey, running the processing method on the output Iterable produced by 
the GroupByKey (directly; expanding the input iterable in a separate 
PCollection allows a runner to rebalance the elements, which will reintroduce 
parallelism)`.
[1] https://github.com/apache/ incubator-beam/blob/master/ 
sdks/java/core/src/main/java/ org/apache/beam/sdk/ transforms/ParDo.java#L360
On Mon, Aug 8, 2016 at 12:46 PM, amir bahmanyari <[email protected]> wrote:

Hi Thomas,Thanks so much for your response. Here are answers to your 
questions.You have a specific collection of records stored in Kafka. You run 
your pipeline, and observe duplicate elements. Is that accurate?
==>> I send records to Kafka from my laptop. I use KafkaIO() to receive the 
records. I have confirmed that I dont get duplicates from Kafka. However,for 
some reason, certain parts of my code execute beyond the actual number of 
expected number of records, and subsequently produce extra resulting data. I 
tried playing with the Triggering. Stretching the window interval, 
DiscardingFiredPanes etc. all kinds of modes.Same.  How can I guarantee that 
one record at a time executes in one unique instance of the inner class 
object?I have all the shared objects synchronized and am using Java concurrent 
hashmaps. How can I guarantee synchronized operations amongst "parallel 
pipelines"? Analogous to multiple threads accessing a shared object and trying 
to modify it...
Here is my current KafkaIO() call: PCollection<String> kafkarecords = 
p.apply(KafkaIO.read(). withBootstrapServers(" kafkahost:9092").withTopics( 
topics). withValueCoder( StringUtf8Coder.of()). withoutMetadata()).apply( 
Values.<String>create()). apply(Window.<String>into( FixedWindows.of(Duration. 
standardMinutes(1)))           .triggering(AfterWatermark. pastEndOfWindow()). 
withAllowedLateness(Duration. ZERO)     .discardingFiredPanes());               
kafkarecords.apply(ParDo. named("ProcessLRKafkaData"). of(new DoFn<String, 
String>() {.//I expect one record at a time to one object 
here------------------------------ ------------------------------ 
------------------------------ ------------------------------ 
-----------------------
Have you confirmed that you're getting duplicate records via other library 
transforms (such as applying Count.globally() to k afkarecords)?==>>No 
duplicates from Kafka.------------------------------ 
------------------------------ ------------------------------ 
------------------------------ -----------------------Additionally, I'm not 
sure what you mean by "executes till a record lands on method"==>>Sorry for my 
confusing statement. Like I mentioned above, I expect each record coming from 
Kafka gets assigned to one instance of the inner class and therefore one 
instance of the pipeline executed it in parallel with others executing their 
own unique records.
------------------------------ ------------------------------ 
------------------------------ ------------------------------ 
-----------------------
Additionally additionally, is this reproducible if you execute with the 
DirectRunner? ==>>I have not tried DirectRunner. Should I? 
Thanks so much Thomas.

      From: Thomas Groh <[email protected]>
 To: [email protected] ; amir bahmanyari <[email protected]> 
 Sent: Monday, August 8, 2016 11:43 AM
 Subject: Re: Is Beam pipeline runtime behavior inconsistent?
  
Just to make sure I understand the problem:

You have a specific collection of records stored in Kafka. You run your 
pipeline, and observe duplicate elements. Is that accurate?
Have you confirmed that you're getting duplicate records via other library 
transforms (such as applying Count.globally() to kafkarecords)?
Additionally, I'm not sure what you mean by "executes till a record lands on 
method"
Additionally additionally, is this reproducible if you execute with the 
DirectRunner?


On Sun, Aug 7, 2016 at 11:44 PM, amir bahmanyari <[email protected]> wrote:

Hi Colleagues,I refrained from posting this email before completing thorough 
testing.I think I did.My core code works perfect & produces the expect result 
every single time without wrapping it with Beam KafkaIO to receive the 
data.Without KafkaIO, it receives the records from a flat data file. I repeated 
it and it always produced the right result.With including a Beam KarkaIO and 
embedding exact same code in a anonymous class running Beam pipelines, I get a 
different result every time I rerun it.Below is the snippet from where KafkaIO 
executes till a record lands on method.Kafka sends precise number of records. 
No duplicates. all good.While executing in Beam, when the records are finished 
& I expect a correct result, it always produces something different. Different 
in different runs.I appreciate shedding light on this issue.  And thanks for 
your valuable time as always.Amir-
public static synchronized void main(String[] args) throws Exception {

// Create Beam Options for the Flink Runner. FlinkPipelineOptions options = 
PipelineOptionsFactory.as( FlinkPipelineOptions.class); // Set the Streaming 
engine as FlinkRunner options.setRunner( FlinkPipelineRunner.class); // This is 
a Streaming process (as opposed to Batch=false) options.setStreaming(true); 
//Create the DAG pipeline for parallel processing of independent LR records 
Pipeline p = Pipeline.create(options); //Kafka broker topic is identified as 
"lroad"  List<String> topics = Arrays.asList("lroad");
PCollection<String> kafkarecords = p.apply(KafkaIO.read(). 
withBootstrapServers(" kafkahost:9092").withTopics( topics). withValueCoder( 
StringUtf8Coder.of()). withoutMetadata()).apply( Values.<String>create()). 
apply(Window.<String>into( FixedWindows.of(Duration. standardMinutes(1)))       
    .triggering(AfterWatermark. pastEndOfWindow()). 
withAllowedLateness(Duration. ZERO)     .accumulatingFiredPanes());             
  kafkarecords.apply(ParDo. named("ProcessLRKafkaData"). of(new DoFn<String, 
String>() {                        
                        public void processElement(ProcessContext ctx) throws 
Exception {

                                        My core logic code here.
}));..p.run(); // Start Beam Pipeline(s) in FlinkC Cluster
} // of main}// of class



   



   



   

Reply via email to