Re: Error restoring Flink checkpoint

2020-06-23 Thread Maximilian Michels
> Yes, I agree that serializing coders into the checkpoint creates problems. 
> I'm wondering whether it is possible to serialize the coder URN + args 
> instead.

I wish that were possible, but Beam does not have a dedicated interface to
snapshot serializers. Restoring serializers from just the coder URN and
arguments does not work in general, because the arguments do not capture all
of the dependencies needed to recreate the serializer. It may work for a
subset of coders, e.g. portable coders, but not in the general case.

> Anyway, I think the right fix should be to move Beam's CoderTypeSerializer
> away from Flink's TypeSerializerConfigSnapshot and use an implementation of
> TypeSerializerSnapshot which doesn't use Java serialization to serialize
> Flink's TypeSerializer (which wraps the Beam coder) inside a checkpoint.

Not possible, because Flink currently needs the old serializer to be
present when restoring from a checkpoint. It would be nice to just use the
newest serializer, but there is code in Flink which assumes that the old
serializer can be loaded before the new one is available. Since Beam does
not have a way to snapshot coders, that leaves us no other choice than to
serialize the entire coder.

> Hi again, just replying here in case this could be useful for someone
> as using Flink checkpoints on Beam is not reliable at all right
> now...

That is not true. You will always have problems when you change your
serializers, even with plain Flink. It is just that Flink has gone to great
lengths to hide some of the complexity.

Generally speaking, whenever you change your coder (and that includes
changing the schema that the coder uses because your runtime classes
change), you currently need to create a new coder but leave the existing
one in place. We need to have both the old and the new serializer for
the state migration to work correctly.

A simpler solution would be to always use the newest serializer, which
wouldn't allow for a proper migration. That is currently not possible with
Flink anyway, because the old serializer will be requested during restore.
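
For what it's worth, a snapshot-based wrapper would look roughly like the
untested sketch below. The class name and the coder-only CoderTypeSerializer
constructor are assumptions on my side, and the coder itself is still
Java-serialized (Beam has no coder-snapshot API); the only difference is
that the Flink TypeSerializer wrapper is no longer written into the
checkpoint via Java serialization:

import java.io.IOException;
import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

// Hypothetical snapshot class -- Beam's CoderTypeSerializer does not ship this today.
public class CoderSnapshot<T> implements TypeSerializerSnapshot<T> {

  private Coder<T> coder;

  public CoderSnapshot() {}          // Flink restores snapshots reflectively

  public CoderSnapshot(Coder<T> coder) {
    this.coder = coder;
  }

  @Override
  public int getCurrentVersion() {
    return 1;
  }

  @Override
  public void writeSnapshot(DataOutputView out) throws IOException {
    // The coder itself is still Java-serialized; only the Flink wrapper is left out.
    byte[] bytes = SerializableUtils.serializeToByteArray(coder);
    out.writeInt(bytes.length);
    out.write(bytes);
  }

  @Override
  @SuppressWarnings("unchecked")
  public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader)
      throws IOException {
    byte[] bytes = new byte[in.readInt()];
    in.readFully(bytes);
    coder = (Coder<T>) SerializableUtils.deserializeFromByteArray(bytes, "Beam coder");
  }

  @Override
  public TypeSerializer<T> restoreSerializer() {
    return new CoderTypeSerializer<>(coder);   // assumes a coder-only constructor
  }

  @Override
  public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
      TypeSerializer<T> newSerializer) {
    // A real implementation would compare the old and new coders here.
    return TypeSerializerSchemaCompatibility.compatibleAsIs();
  }
}

Restoring such a snapshot still requires the coder class, and everything it
references, to be Java-deserializable, so evolving the POJO remains the hard
part.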

I hope that sheds some light on the topic. I'll probably post more
details in a follow-up.

Cheers,
Max

On 23.06.20 08:24, Ivan San Jose wrote:
> I don't really know, my knowledge about Beam source code is not so
> deep, but I managed to modify AvroCoder in order to store a string
> containing class name (and class parameters in case it was a
> parametrized class) instead of references to Class. But, as I said,
> AvroCoder is using AVRO's ReflectData in order to infer schemas from
> POJOs and it doesn't work well with generics (maybe if you explicitly
> provide the schema for the POJOs it will work fine, but that was not an
> option for us).
> 
> Anyway, I think the right fix should be to move Beam's CoderTypeSerializer
> away from Flink's TypeSerializerConfigSnapshot and use an implementation of
> TypeSerializerSnapshot which doesn't use Java serialization to serialize
> Flink's TypeSerializer (which wraps the Beam coder) inside a checkpoint.
> 
> On Mon, 2020-06-22 at 23:02 -0700, Reuven Lax wrote:
>> Yes, I agree that serializing coders into the checkpoint creates
>> problems. I'm wondering whether it is possible to serialize the coder
>> URN + args instead.
>>
>> On Mon, Jun 22, 2020 at 11:00 PM Ivan San Jose <
>> isanj...@theworkshop.com> wrote:
>>> Hi again, just replying here in case this could be useful for
>>> someone
>>> as using Flink checkpoints on Beam is not reliable at all right
>>> now...
>>> Even though I removed class references to the serialized object in
>>> AvroCoder, I finally couldn't make AvroCoder work, as it infers the
>>> schema using the ReflectData class (through the Java reflection API)
>>> and that class has a lot of problems when dealing with classes that
>>> contain generics. So we ended up using the Beam Kryo extension
>>> available in the beam-sdks-java-extensions-kryo maven package and it
>>> works like a charm. If you use Kryo's CompatibleFieldSerializer, you
>>> can add fields to your POJOs and recover from saved checkpoints
>>> containing old POJO versions.
>>>
>>> On Mon, 2020-06-08 at 14:37 +, Ivan San Jose wrote:
>>>> Hi Reuven, as far as I've understood, Apache Beam coders are wrapped
>>>> into Flink's TypeSerializers, so they are being serialized as part of
>>>> the checkpoint according to
>>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java#L95
>>>>
>>>> The problem was that this is done using Java serialization, and, as
>>>> Beam Coders (at least ProtoCoder and AvroCoder) have references to the
>>>> POJO class to be encoded/decoded, if you evolve that POJO (even
>>>> following AVRO/Protobuf backward compatibility rules) then Flink will
>>>> raise a java.io.InvalidClassException when trying to restore the
>>>> checkpoint because it is using Java serialization

Re: Flink/Portable Runner error on AWS EMR

2020-06-23 Thread Maximilian Michels
Hey Jesse,

Could you share the context of the error? Where does it occur? In the
client code or on the cluster?

Cheers,
Max

On 22.06.20 18:01, Jesse Lord wrote:
> I am trying to run the wordcount quickstart example on a flink cluster
> on AWS EMR. Beam version 2.22, Flink 1.10.
> 
>  
> 
> I get the following error:
> 
>  
> 
> ERROR:root:java.util.ServiceConfigurationError:
> com.fasterxml.jackson.databind.Module: Provider
> com.fasterxml.jackson.module.jaxb.JaxbAnnotationModule not a subtype
> 
>  
> 
> This happens with both the portable runner (using python SDK) and the
> classic flink runner using the quickstart maven project.
> 
>  
> 
> I think this error relates to this issue:
> https://issues.apache.org/jira/browse/BEAM-9239. Based on the comments
> from this issue I tried adjusting parameters for whether flink
> prioritizes loading child (user) jars or parent (flink) jars in the
> classpath but it did not resolve the issue.
> 
>  
> 
> Looking for any suggestions that might help as a workaround and
> wondering if I should open a new jira issue or only add my comment to
> the existing issue (which I have already done).
> 
>  
> 
> Thanks,
> 
> Jesse
> 


Re: Flink/Portable Runner error on AWS EMR

2020-06-23 Thread Jesse Lord
Hi Max,

The error message shows up in the Flink web UI, so I think it must be
reaching the cluster.

For the portable runner the error is listed in the Docker container output
as well, but I assume that is just relaying the error message from the
Flink cluster.

Thanks,
Jesse

On 6/23/20, 8:11 AM, "Maximilian Michels"  wrote:

Hey Jesse,

Could you share the context of the error? Where does it occur? In the
client code or on the cluster?

Cheers,
Max

On 22.06.20 18:01, Jesse Lord wrote:
> I am trying to run the wordcount quickstart example on a flink cluster
> on AWS EMR. Beam version 2.22, Flink 1.10.
> 
>  
> 
> I get the following error:
> 
>  
> 
> ERROR:root:java.util.ServiceConfigurationError:
> com.fasterxml.jackson.databind.Module: Provider
> com.fasterxml.jackson.module.jaxb.JaxbAnnotationModule not a subtype
> 
>  
> 
> This happens with both the portable runner (using python SDK) and the
> classic flink runner using the quickstart maven project.
> 
>  
> 
> I think this error relates to this issue:
> 
> https://issues.apache.org/jira/browse/BEAM-9239. Based on the comments
> from this issue I tried adjusting parameters for whether flink
> prioritizes loading child (user) jars or parent (flink) jars in the
> classpath but it did not resolve the issue.
> 
>  
> 
> Looking for any suggestions that might help as a workaround and
> wondering if I should open a new jira issue or only add my comment to
> the existing issue (which I have already done).
> 
>  
> 
> Thanks,
> 
> Jesse
> 



Re: KafkaIO Exactly once vs At least Once

2020-06-23 Thread Alexey Romanenko

> On 23 Jun 2020, at 07:49, Eleanore Jin  wrote:
> 
> the KafkaIO EOS-sink you referred is: KafkaExactlyOnceSink? If so, the way I 
> understand to use is KafkaIO.write().withEOS(numPartitionsForSinkTopic, 
> "mySlinkGroupId"), reading from your response, do I need additionally 
> configure KafkaProducer property enable.idempotence=true, or I only need to 
> configure this property?

No, you don’t need to do that. A new KafkaProducer will be created with this
option already set inside KafkaExactlyOnceSink [1].
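
For reference, a minimal write sketch (the broker, topic, shard count and
the input collection "records" are placeholders, not from this thread):

import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

// "records" is a hypothetical PCollection<KV<Long, String>> produced upstream.
records.apply(
    "WriteToKafkaEOS",
    KafkaIO.<Long, String>write()
        .withBootstrapServers("broker-1:9092")       // placeholder broker
        .withTopic("sink-topic")                     // placeholder topic
        .withKeySerializer(LongSerializer.class)
        .withValueSerializer(StringSerializer.class)
        // numShards is typically the number of partitions of the sink topic;
        // the sink group id identifies this EOS sink instance.
        .withEOS(4, "my-sink-group"));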

> So can you please correct me if the above settings is not the optimal and if 
> there is anyway to reduce the latency by introducing checkpointing for EOS?

Your settings look fine to me. You could probably play with the checkpoint
interval (why is it 10 secs?) to reduce latency.
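
With the Flink runner the interval is just a pipeline option in
milliseconds; a minimal sketch (the option names come from
FlinkPipelineOptions, everything else is a placeholder):

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Inside main(String[] args):
FlinkPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);
// The interval is given in milliseconds, so sub-second values are possible,
// although very short intervals add checkpointing overhead of their own.
options.setCheckpointingInterval(10_000L);   // the 10 s discussed above
Pipeline p = Pipeline.create(options);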


[1] 
https://github.com/apache/beam/blob/05f676ea255587ccf52ad56d84191838bb981ebb/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L711


> 
> 
> On Mon, Jun 22, 2020 at 10:17 AM Alexey Romanenko  > wrote:
> I think you don’t need to enable EOS in this case since KafkaIO has a 
> dedicated EOS-sink implementation for Beam part (btw, it’s not supported by 
> all runners) and it relies on setting “enable.idempotence=true” for 
> KafkaProducer.
> I’m not sure that you can achieve “at least once” semantics with current 
> KafkaIO implementation.
> 
>> On 16 Jun 2020, at 17:16, Eleanore Jin > > wrote:
>> 
>> Hi All, 
>> 
>> I previously asked a few questions regarding enable EOS (exactly once 
>> semantics) please see below.
>> 
>> Our Beam pipeline uses KafkaIO to read from source topic, and then use 
>> KafkaIO to publish to sink topic.
>> 
>> According to Max's answer to my previous questions, enable EOS with KafkaIO 
>> will introduce latency, 
>> as only after checkpoints of all message within the checkpoint interval, 
>> then the KakfaIO.ExactlyOnceWriter
>> processElement method will be called. So the latency depends on the 
>> checkpoint interval.
>> 
>> I just wonder if I relax to At Least Once, do I still need to enable EOS on 
>> KafkaIO? Or it is not required?
>> If not, can you please provide some instruction how should it be done?
>> 
>> Thanks a lot!
>> Eleanore
>> 
>> > Thanks for the response! the reason to setup the state backend is to
>> > experiment Kafka EOS with Beam running on Flink.  Reading through the
>> > code and this PR , can
>> > you please help me clarify my understanding?
>> >
>> > 1. Beam uses KafkaExactlyOnceSink to publish messages to achieve
>> > EOS, ExactlyOnceWriter processElement method is annotated
>> > with @RequiresStableInput, so all the messages will be cached
>> > by KeyedBufferingElementsHandler, only after checkpoint succeeds, those
>> > messages will be processed by ExactlyOnceWriter?
>> 
>> That's correct.
>> 
>> >
>> > 2. Upon checkpoint, will those messages cached by
>> > KeyedBufferingEleementsHandler also checkpointed?
>> 
>> Yes, the buffered elements will be checkpointed.
>> 
>> > 3. It seems the way Beam provides Kafka EOS will introduce delays in the
>> > stream processing, the delay is based on the checkpoint interval? How to
>> > reduce the latency while still have EOS guarantee?
>> 
>> Indeed, the checkpoint interval and the checkpoint duration limits the
>> latency. Given the current design and the guarantees, there is no other
>> way to influence the latency.
>> 
>> > 4. commitOffsetsInFinalize is also enabled, does this mean, upon
>> > checkpoint successfully, the checkpointed offset will be committed back
>> > to kafka, but if this operation does not finish successfully, and then
>> > the job gets cancelled/stopped, and re-submit the job again (with the
>> > same consumer group for source topics, but different jobID), then it is
>> > possible duplicated processing still exists? because the consumed offset
>> > is not committed back to kafka?
>> 
>> This option is for the Kafka consumer. AFAIK this is just a convenience
>> method to commit the latest checkpointed offset to Kafka. This offset is
>> not used when restoring from a checkpoint. However, if you don't restore
>> from a checkpoint, you can resume from that offset which might be
>> convenient or not, depending on your use case.
>> 
>> 
> 



Re: Continuous Read pipeline

2020-06-23 Thread Chamikara Jayalath
On Fri, Jun 12, 2020 at 12:52 AM TAREK ALSALEH 
wrote:

> Hi,
>
> I am using the Python SDK with Dataflow as my runner. I am looking at
> implementing a streaming pipeline that will continuously monitor a GCS
> bucket for incoming files and depending on the regex of the file, launch a
> set of transforms and write the final output back to parquet for each file
> I read.
>
> Excuse me as I am new to beam and specially the streaming bit as I have
> done the above in batch mode but we are limited by dataflow allowing only
> 100 jobs per 24 hours.
>
> I was looking into a couple of options:
>
>1. Have a beam pipeline running in streaming mode listening to a
>pubsub topic. Once a file lands in GCS a message is published. I am
>planning to use the WriteToFiles transform but it seems that there is
>a limitation where :"it currently does not have support for multiple
>trigger firings on the same window."
>   1. So what windowing strategy and trigger should I use?
>   2. Which transform should I use since there are two ReadFromPubSub
>   transforms one in the io.gcp Subpackages and another one in the
>   external.gcp?
>
>
>2. Using the TextIO.Read and the watchForNewFiles from the Java SDK within
>a python pipeline, as I understand there is some support for cross-language
>transforms?
>
>
Sounds like the watchForNewFiles transform is exactly what you are looking
for, but we don't have that in the Python SDK yet. Also, the above
transforms haven't been tested with the cross-language transforms framework
yet, and we don't have cross-language Python wrappers for them yet.

Probably the best solution for the Python SDK today is to use some sort of
GCS-to-Cloud-Pub/Sub mapping to publish events about new files and to read
those files using a Beam pipeline that reads from Cloud Pub/Sub. For example
the following (I haven't tested this):
https://cloud.google.com/storage/docs/pubsub-notifications

For Dataflow, use the Pub/Sub connector in the io/gcp submodule.
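
For reference, the same idea in the Java SDK looks roughly like the untested
sketch below (the pipeline "p", the subscription name, and the notification
attribute names are assumptions on my side):

import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;

// "p" is a hypothetical streaming Pipeline.
PCollection<String> lines =
    p.apply("ReadGcsNotifications",
            PubsubIO.readMessagesWithAttributes()
                .fromSubscription("projects/my-project/subscriptions/gcs-events"))
     .apply("NotificationToGcsPath",
            MapElements.via(
                new SimpleFunction<PubsubMessage, String>() {
                  @Override
                  public String apply(PubsubMessage m) {
                    // GCS notifications carry the bucket and object name as attributes.
                    return "gs://" + m.getAttribute("bucketId") + "/" + m.getAttribute("objectId");
                  }
                }))
     .apply(FileIO.matchAll())
     .apply(FileIO.readMatches())
     .apply("ReadNewFiles", TextIO.readFiles());

The per-file regex routing you mentioned would go in between the path step
and matchAll.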

Thanks,
Cham


>
>
> Regards,
> Tarek
>


Re: KafkaIO Exactly once vs At least Once

2020-06-23 Thread Eleanore Jin
Hi Alexey,

Thanks a lot for the information! I will give it a try.

Regarding the checkpoint interval, I think the Flink community suggested
something between 3 and 5 minutes; I am not sure yet whether the checkpoint
interval can be set in milliseconds. Currently our Beam pipeline is
stateless; there is no other operator state or user-defined state.

Thanks a lot!
Eleanore

On Tue, Jun 23, 2020 at 9:58 AM Alexey Romanenko 
wrote:

>
> On 23 Jun 2020, at 07:49, Eleanore Jin  wrote:
>
> the KafkaIO EOS-sink you referred is: KafkaExactlyOnceSink? If so, the
> way I understand to use is KafkaIO.write().withEOS(numPartitionsForSinkTopic,
> "mySlinkGroupId"), reading from your response, do I need additionally
> configure KafkaProducer property enable.idempotence=true, or I only need to
> configure this property?
>
>
> No, you don’t need to do that. New KafkaProducer will be created with this
> option set in KafkaExactlyOnceSink [1].
>
> So can you please correct me if the above settings is not the optimal and
> if there is anyway to reduce the latency by introducing checkpointing for
> EOS?
>
>
> Your settings look fine for me. You probably could play with checkpoint
> intervals (why it’s 10 secs?) to reduce a latency.
>
>
> [1]
> https://github.com/apache/beam/blob/05f676ea255587ccf52ad56d84191838bb981ebb/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L711
>
>
>
>
> On Mon, Jun 22, 2020 at 10:17 AM Alexey Romanenko <
> aromanenko@gmail.com> wrote:
>
>> I think you don’t need to enable EOS in this case since KafkaIO has a
>> dedicated EOS-sink implementation for Beam part (btw, it’s not supported by
>> all runners) and it relies on setting “enable.idempotence=true” for
>> KafkaProducer.
>> I’m not sure that you can achieve “at least once” semantics with current
>> KafkaIO implementation.
>>
>> On 16 Jun 2020, at 17:16, Eleanore Jin  wrote:
>>
>> Hi All,
>>
>> I previously asked a few questions regarding enable EOS (exactly once
>> semantics) please see below.
>>
>> Our Beam pipeline uses KafkaIO to read from source topic, and then use
>> KafkaIO to publish to sink topic.
>>
>> According to Max's answer to my previous questions, enable EOS with
>> KafkaIO will introduce latency,
>> as only after checkpoints of all message within the checkpoint interval,
>> then the KakfaIO.ExactlyOnceWriter
>> processElement method will be called. So the latency depends on the
>> checkpoint interval.
>>
>> I just wonder if I relax to At Least Once, do I still need to enable EOS
>> on KafkaIO? Or it is not required?
>> If not, can you please provide some instruction how should it be done?
>>
>> Thanks a lot!
>> Eleanore
>>
>> > Thanks for the response! the reason to setup the state backend is to
>> > experiment Kafka EOS with Beam running on Flink.  Reading through the
>> > code and this PR , can
>> > you please help me clarify my understanding?
>> >
>> > 1. Beam uses KafkaExactlyOnceSink to publish messages to achieve
>> > EOS, ExactlyOnceWriter processElement method is annotated
>> > with @RequiresStableInput, so all the messages will be cached
>> > by KeyedBufferingElementsHandler, only after checkpoint succeeds, those
>> > messages will be processed by ExactlyOnceWriter?
>>
>> That's correct.
>>
>> >
>> > 2. Upon checkpoint, will those messages cached by
>> > KeyedBufferingEleementsHandler also checkpointed?
>>
>> Yes, the buffered elements will be checkpointed.
>>
>> > 3. It seems the way Beam provides Kafka EOS will introduce delays in the
>> > stream processing, the delay is based on the checkpoint interval? How to
>> > reduce the latency while still have EOS guarantee?
>>
>> Indeed, the checkpoint interval and the checkpoint duration limits the
>> latency. Given the current design and the guarantees, there is no other
>> way to influence the latency.
>>
>> > 4. commitOffsetsInFinalize is also enabled, does this mean, upon
>> > checkpoint successfully, the checkpointed offset will be committed back
>> > to kafka, but if this operation does not finish successfully, and then
>> > the job gets cancelled/stopped, and re-submit the job again (with the
>> > same consumer group for source topics, but different jobID), then it is
>> > possible duplicated processing still exists? because the consumed offset
>> > is not committed back to kafka?
>>
>> This option is for the Kafka consumer. AFAIK this is just a convenience
>> method to commit the latest checkpointed offset to Kafka. This offset is
>> not used when restoring from a checkpoint. However, if you don't restore
>> from a checkpoint, you can resume from that offset which might be
>> convenient or not, depending on your use case.
>>
>>
>>
>


Re: Designing an existing pipeline in Beam

2020-06-23 Thread Luke Cwik
Beam is really about parallelizing the processing. Using a single DoFn that
does everything is fine as long as the DoFn can process elements in parallel
(e.g. the upstream source produces lots of elements). Composing multiple
DoFns is great for re-use and testing, but it isn't strictly necessary. Also,
Beam doesn't support back edges in the processing graph, so all data flows in
one direction and you can't have a cycle. Map 1 producing map 2, which
produces map 3, which is then used to update map 1, only works if all of that
logic lives within a single DoFn/Transform, or if you create a cycle through
an external system, such as writing to Kafka topic X and reading from Kafka
topic X within the same pipeline, or updating a database downstream of where
it is read. There is a lot of ordering complexity and stale-data risk
whenever you use an external store to create a cycle, though.
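
If the maps are only read within a given step, the usual pattern is a side
input; here is a minimal sketch with entirely hypothetical names ("events"
to be enriched, "enrichment" standing in for Map 2):

import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

// Inside a static method such as main(), so the anonymous DoFn captures nothing extra.
// "enrichment" is a hypothetical PCollection<KV<String, String>>,
// "events" a hypothetical PCollection<String>.
PCollectionView<Map<String, String>> map2View =
    enrichment.apply("Map2AsView", View.asMap());

PCollection<String> enriched =
    events.apply(
        "Enrich",
        ParDo.of(
                new DoFn<String, String>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    // Look up the side-input map for every element.
                    Map<String, String> map2 = c.sideInput(map2View);
                    String extra = map2.getOrDefault(c.element(), "");
                    c.output(c.element() + "|" + extra);
                  }
                })
            .withSideInputs(map2View));

Side inputs are read-only, though; a map that must be updated downstream has
to go to an external store (with the caveats above) or the update logic has
to stay inside one stateful DoFn.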

On Mon, Jun 22, 2020 at 6:02 PM Praveen K Viswanathan <
harish.prav...@gmail.com> wrote:

> Another way to put this question is, how do we write a beam pipeline for
> an existing pipeline (in Java) that has a dozen of custom objects and you
> have to work with multiple HashMaps of those custom objects in order to
> transform it. Currently, I am writing a beam pipeline by using the same
> Custom objects, getters and setters and HashMap *but
> inside a DoFn*. Is this the optimal way or does Beam offer something else?
>
> On Mon, Jun 22, 2020 at 3:47 PM Praveen K Viswanathan <
> harish.prav...@gmail.com> wrote:
>
>> Hi Luke,
>>
>> We can say Map 2 as a kind of a template using which you want to enrich
>> data in Map 1. As I mentioned in my previous post, this is a high level
>> scenario.
>>
>> All these logic are spread across several classes (with ~4K lines of code
>> in total). As in any Java application,
>>
>> 1. The code has been modularized with multiple method calls
>> 2. Passing around HashMaps as argument to each method
>> 3. Accessing the attributes of the custom object using getters and
>> setters.
>>
>> This is a common pattern in a normal Java application but I have not seen
>> such an example of code in Beam.
>>
>>
>> On Mon, Jun 22, 2020 at 8:23 AM Luke Cwik  wrote:
>>
>>> Who reads map 1?
>>> Can it be stale?
>>>
>>> It is unclear what you are trying to do in parallel and why you wouldn't
>>> stick all this logic into a single DoFn / stateful DoFn.
>>>
>>> On Sat, Jun 20, 2020 at 7:14 PM Praveen K Viswanathan <
>>> harish.prav...@gmail.com> wrote:
>>>
 Hello Everyone,

 I am in the process of implementing an existing pipeline (written using
 Java and Kafka) in Apache Beam. The data from the source stream is
 contrived and had to go through several steps of enrichment using REST API
 calls and parsing of JSON data. The key
 transformation in the existing pipeline is in shown below (a super high
 level flow)

 *Method A*
 Calls *Method B*
   Creates *Map 1, Map 2*
 Calls *Method C*
  Read *Map 2*
  Create *Map 3*
 *Method C*
  Read *Map 3* and
  update *Map 1*

 The Map we use are multi-level maps and I am thinking of having
 PCollections for each Maps and pass them as side inputs in a DoFn wherever
 I have transformations that need two or more Maps. But there are certain
 tasks which I want to make sure that I am following right approach, for
 instance updating one of the side input maps inside a DoFn.

 These are my initial thoughts/questions and I would like to get some
 expert advice on how we typically design such an interleaved transformation
 in Apache Beam. Appreciate your valuable insights on this.

 --
 Thanks,
 Praveen K Viswanathan

>>>
>>
>> --
>> Thanks,
>> Praveen K Viswanathan
>>
>
>
> --
> Thanks,
> Praveen K Viswanathan
>


Re: Designing an existing pipeline in Beam

2020-06-23 Thread Praveen K Viswanathan
Hi Luke - Thanks for the explanation. The limitation due to directed-graph
processing and the option of external storage clear most of the questions I
had with respect to designing this pipeline. I do have one more scenario to
clarify on this thread.

If I have a certain piece of logic that I need to use in more than one DoFn,
how do we do that? In a normal Java application, we can put it in a separate
method and call it wherever we want. Is it possible to replicate something
like that in Beam's DoFns?

On Tue, Jun 23, 2020 at 3:47 PM Luke Cwik  wrote:

> Beam is really about parallelizing the processing. Using a single DoFn
> that does everything is fine as long as the DoFn can process elements in
> parallel (e.g. upstream source produces lots of elements). Composing
> multiple DoFns is great for re-use and testing but it isn't strictly
> necessary. Also, Beam doesn't support back edges in the processing graph so
> all data flows in one direction and you can't have a cycle. This only
> allows for map 1 to produce map 2 which then produces map 3 which is then
> used to update map 1 if all of that logic is within a single DoFn/Transform
> or you create a cycle using an external system such as write to Kafka topic
> X and read from Kafka topic X within the same pipeline or update a database
> downstream from where it is read. There is a lot of ordering complexity and
> stale data issues whenever using an external store to create a cycle though.
>
> On Mon, Jun 22, 2020 at 6:02 PM Praveen K Viswanathan <
> harish.prav...@gmail.com> wrote:
>
>> Another way to put this question is, how do we write a beam pipeline for
>> an existing pipeline (in Java) that has a dozen of custom objects and you
>> have to work with multiple HashMaps of those custom objects in order to
>> transform it. Currently, I am writing a beam pipeline by using the same
>> Custom objects, getters and setters and HashMap *but
>> inside a DoFn*. Is this the optimal way or does Beam offer something
>> else?
>>
>> On Mon, Jun 22, 2020 at 3:47 PM Praveen K Viswanathan <
>> harish.prav...@gmail.com> wrote:
>>
>>> Hi Luke,
>>>
>>> We can say Map 2 as a kind of a template using which you want to enrich
>>> data in Map 1. As I mentioned in my previous post, this is a high level
>>> scenario.
>>>
>>> All these logic are spread across several classes (with ~4K lines of
>>> code in total). As in any Java application,
>>>
>>> 1. The code has been modularized with multiple method calls
>>> 2. Passing around HashMaps as argument to each method
>>> 3. Accessing the attributes of the custom object using getters and
>>> setters.
>>>
>>> This is a common pattern in a normal Java application but I have not
>>> seen such an example of code in Beam.
>>>
>>>
>>> On Mon, Jun 22, 2020 at 8:23 AM Luke Cwik  wrote:
>>>
 Who reads map 1?
 Can it be stale?

 It is unclear what you are trying to do in parallel and why you
 wouldn't stick all this logic into a single DoFn / stateful DoFn.

 On Sat, Jun 20, 2020 at 7:14 PM Praveen K Viswanathan <
 harish.prav...@gmail.com> wrote:

> Hello Everyone,
>
> I am in the process of implementing an existing pipeline (written
> using Java and Kafka) in Apache Beam. The data from the source stream is
> contrived and had to go through several steps of enrichment using REST API
> calls and parsing of JSON data. The key
> transformation in the existing pipeline is in shown below (a super
> high level flow)
>
> *Method A*
> Calls *Method B*
>   Creates *Map 1, Map 2*
> Calls *Method C*
>  Read *Map 2*
>  Create *Map 3*
> *Method C*
>  Read *Map 3* and
>  update *Map 1*
>
> The Map we use are multi-level maps and I am thinking of having
> PCollections for each Maps and pass them as side inputs in a DoFn wherever
> I have transformations that need two or more Maps. But there are certain
> tasks which I want to make sure that I am following right approach, for
> instance updating one of the side input maps inside a DoFn.
>
> These are my initial thoughts/questions and I would like to get some
> expert advice on how we typically design such an interleaved 
> transformation
> in Apache Beam. Appreciate your valuable insights on this.
>
> --
> Thanks,
> Praveen K Viswanathan
>

>>>
>>> --
>>> Thanks,
>>> Praveen K Viswanathan
>>>
>>
>>
>> --
>> Thanks,
>> Praveen K Viswanathan
>>
>

-- 
Thanks,
Praveen K Viswanathan


Webinar: Distributed Processing for Machine Learning Production Pipelines with Apache Beam

2020-06-23 Thread Aizhamal Nurmamat kyzy
Hi Beamsters!

We have scheduled the last webinar on Beam Learning Month! Robert Crowe and
Reza Rokni from Google will share how Beam is used to process large scale
data for ML pipelines. If the subject is interesting to you, please
register via this link:
https://learn.xnextcon.com/event/eventdetails/W20061010

It starts at *9:00 am PDT/12:00 pm EST/6:00 pm CEST* this Thursday, June 25.
It's a little early for west coast residents, so don't forget to grab
your coffee!

See you there!
Aizhamal


Re: Designing an existing pipeline in Beam

2020-06-23 Thread Luke Cwik
You can apply the same DoFn / Transform instance multiple times in the
graph, or you can follow regular development practices where the common code
is factored into a method and two different DoFns invoke it.
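
A minimal sketch of the second approach, with entirely hypothetical names:

import org.apache.beam.sdk.transforms.DoFn;

// Plain Java helper shared by several DoFns; nothing Beam-specific about it.
class Normalizer {
  static String normalize(String raw) {
    return raw == null ? "" : raw.trim().toLowerCase();
  }
}

class ParseFn extends DoFn<String, String> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(Normalizer.normalize(c.element()));
  }
}

class EnrichFn extends DoFn<String, String> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    // Same helper reused from a different DoFn.
    c.output("enriched:" + Normalizer.normalize(c.element()));
  }
}

The only thing to keep in mind is that anything a DoFn holds as instance
state must be serializable; static helpers like this avoid that concern.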

On Tue, Jun 23, 2020 at 4:28 PM Praveen K Viswanathan <
harish.prav...@gmail.com> wrote:

> Hi Luke - Thanks for the explanation. The limitation due to directed graph
> processing and the option of external storage clears most of the questions
> I had with respect to designing this pipeline. I do have one more scenario
> to clarify on this thread.
>
> If I had a certain piece of logic that I had to use in more than one DoFns
> how do we do that. In a normal Java application, we can put it as a
> separate method and call it wherever we want. Is it possible to replicate
> something like that in Beam's DoFn?
>
> On Tue, Jun 23, 2020 at 3:47 PM Luke Cwik  wrote:
>
>> Beam is really about parallelizing the processing. Using a single DoFn
>> that does everything is fine as long as the DoFn can process elements in
>> parallel (e.g. upstream source produces lots of elements). Composing
>> multiple DoFns is great for re-use and testing but it isn't strictly
>> necessary. Also, Beam doesn't support back edges in the processing graph so
>> all data flows in one direction and you can't have a cycle. This only
>> allows for map 1 to produce map 2 which then produces map 3 which is then
>> used to update map 1 if all of that logic is within a single DoFn/Transform
>> or you create a cycle using an external system such as write to Kafka topic
>> X and read from Kafka topic X within the same pipeline or update a database
>> downstream from where it is read. There is a lot of ordering complexity and
>> stale data issues whenever using an external store to create a cycle though.
>>
>> On Mon, Jun 22, 2020 at 6:02 PM Praveen K Viswanathan <
>> harish.prav...@gmail.com> wrote:
>>
>>> Another way to put this question is, how do we write a beam pipeline for
>>> an existing pipeline (in Java) that has a dozen of custom objects and you
>>> have to work with multiple HashMaps of those custom objects in order to
>>> transform it. Currently, I am writing a beam pipeline by using the same
>>> Custom objects, getters and setters and HashMap *but
>>> inside a DoFn*. Is this the optimal way or does Beam offer something
>>> else?
>>>
>>> On Mon, Jun 22, 2020 at 3:47 PM Praveen K Viswanathan <
>>> harish.prav...@gmail.com> wrote:
>>>
 Hi Luke,

 We can say Map 2 as a kind of a template using which you want to enrich
 data in Map 1. As I mentioned in my previous post, this is a high level
 scenario.

 All these logic are spread across several classes (with ~4K lines of
 code in total). As in any Java application,

 1. The code has been modularized with multiple method calls
 2. Passing around HashMaps as argument to each method
 3. Accessing the attributes of the custom object using getters and
 setters.

 This is a common pattern in a normal Java application but I have not
 seen such an example of code in Beam.


 On Mon, Jun 22, 2020 at 8:23 AM Luke Cwik  wrote:

> Who reads map 1?
> Can it be stale?
>
> It is unclear what you are trying to do in parallel and why you
> wouldn't stick all this logic into a single DoFn / stateful DoFn.
>
> On Sat, Jun 20, 2020 at 7:14 PM Praveen K Viswanathan <
> harish.prav...@gmail.com> wrote:
>
>> Hello Everyone,
>>
>> I am in the process of implementing an existing pipeline (written
>> using Java and Kafka) in Apache Beam. The data from the source stream is
>> contrived and had to go through several steps of enrichment using REST 
>> API
>> calls and parsing of JSON data. The key
>> transformation in the existing pipeline is in shown below (a super
>> high level flow)
>>
>> *Method A*
>> Calls *Method B*
>>   Creates *Map 1, Map 2*
>> Calls *Method C*
>>  Read *Map 2*
>>  Create *Map 3*
>> *Method C*
>>  Read *Map 3* and
>>  update *Map 1*
>>
>> The Map we use are multi-level maps and I am thinking of having
>> PCollections for each Maps and pass them as side inputs in a DoFn 
>> wherever
>> I have transformations that need two or more Maps. But there are certain
>> tasks which I want to make sure that I am following right approach, for
>> instance updating one of the side input maps inside a DoFn.
>>
>> These are my initial thoughts/questions and I would like to get some
>> expert advice on how we typically design such an interleaved 
>> transformation
>> in Apache Beam. Appreciate your valuable insights on this.
>>
>> --
>> Thanks,
>> Praveen K Viswanathan
>>
>

 --
 Thanks,
 Praveen K Viswanathan

>>>
>>>
>>> --
>>> Thanks,
>>> Praveen K Viswanathan
>>>
>>
>
> --