Testing 'Exactly-Once' using Apache Beam + Spark Runner

2020-05-03 Thread Praveen K Viswanathan
Hi All - I am looking to implement and test 'Exactly-Once' semantics using
Apache Beam and the Spark Runner (using a locally installed Spark). I have come
up with the pipeline, but I am looking for expert advice on how to test the
scenario in a proper way. I appreciate your inputs.

Thanks.

-- 
Thanks,
Praveen K Viswanathan


Making RPCs in Beam

2020-06-18 Thread Praveen K Viswanathan
Hello Everyone,

In my pipeline I have to make a *single RPC call* as well as a *batched RPC
call* to fetch data for enrichment. I could not find any reference on how
to make these calls within a pipeline. I am still covering my ground in
Apache Beam and would appreciate it if anyone who has done this could share
sample code or details on how to do it.
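
For the single RPC case, what I have in mind is something along these lines,
though I am not sure if it is the right approach (a rough sketch; the HTTP
client, endpoint, and element type are just placeholders):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import org.apache.beam.sdk.transforms.DoFn;

// Placeholder enrichment DoFn: one HTTP call per element.
class EnrichFn extends DoFn<String, String> {

  private transient HttpClient client;

  @Setup
  public void setup() {
    // Create the non-serializable client once per DoFn instance.
    client = HttpClient.newHttpClient();
  }

  @ProcessElement
  public void processElement(@Element String id, OutputReceiver<String> out) throws Exception {
    // Placeholder enrichment endpoint.
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("https://enrichment.example.com/lookup/" + id))
        .GET()
        .build();
    HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
    out.output(response.body());
  }
}
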
-- 
Thanks,
Praveen K Viswanathan


Re: Making RPCs in Beam

2020-06-19 Thread Praveen K Viswanathan
Hi Brian - Thanks for responding to my post. Yes, Kenn's blog helped in
getting a high-level idea, but I am more interested in the mechanics of how
to fit an HTTP call into Beam. To be specific, the *enrichEvents* method in
his blog, which makes real calls to HTTP URLs. Unfortunately, that
information was abstracted away in the context of batched RPC.

Thanks,
Praveen Kumar Viswanathan

On Fri, Jun 19, 2020 at 8:20 AM Brian Hulette  wrote:

> Kenn wrote a blog post showing how to do batched RPCs with the state and
> timer APIs: https://beam.apache.org/blog/timely-processing/
>
> Is that helpful?
>
> Brian
>
> On Thu, Jun 18, 2020 at 5:29 PM Praveen K Viswanathan <
> harish.prav...@gmail.com> wrote:
>
>> Hello Everyone,
>>
>> In my pipeline I have to make a *single RPC call* as well as a *Batched
>> RPC call* to fetch data for enrichment. I could not find any reference
>> on how to make these call within your pipeline. I am still covering my
>> grounds in Apache Beam and would appreciate if anyone has done this and
>> could share a sample code or details on how to do this.
>> --
>> Thanks,
>> Praveen K Viswanathan
>>
>

-- 
Thanks,
Praveen K Viswanathan


Re: Making RPCs in Beam

2020-06-19 Thread Praveen K Viswanathan
Thanks for the intro to "GroupIntoBatches", Luke. It looks very interesting
and easy to understand when reading the pipeline. I will give it a try for
my batched RPC use case.
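
Roughly, what I am picturing for the batched path is something like this (a
sketch; the key/value types, batch size, and the batch-call method are
placeholders):

import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

class BatchedEnrichment {
  // events is keyed by, say, customer id; values are the ids to enrich.
  static PCollection<String> enrich(PCollection<KV<String, String>> events) {
    // Group up to 50 values per key into one bundle for a single batched call.
    PCollection<KV<String, Iterable<String>>> batches =
        events.apply(GroupIntoBatches.<String, String>ofSize(50));

    return batches.apply(ParDo.of(
        new DoFn<KV<String, Iterable<String>>, String>() {
          @ProcessElement
          public void processElement(
              @Element KV<String, Iterable<String>> batch, OutputReceiver<String> out) {
            // callBatchRpc is a placeholder for the real batched client call.
            List<String> results = callBatchRpc(batch.getKey(), batch.getValue());
            results.forEach(out::output);
          }
        }));
  }

  // Placeholder for the real batched RPC client call.
  static List<String> callBatchRpc(String key, Iterable<String> ids) {
    throw new UnsupportedOperationException("call the batch endpoint here");
  }
}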

Regards,
Praveen Kumar Viswanathan

On Fri, Jun 19, 2020 at 9:24 AM Luke Cwik  wrote:

> I would suggest looking at the GroupIntoBatches[1] transform if you want
> to support batching of elements instead of writing it yourself. Single RPC
> calls should be invoked from the DoFn like you would from a regular program.
>
> 1:
> https://beam.apache.org/documentation/transforms/java/aggregation/groupintobatches/
>
> On Fri, Jun 19, 2020 at 8:20 AM Brian Hulette  wrote:
>
>> Kenn wrote a blog post showing how to do batched RPCs with the state and
>> timer APIs: https://beam.apache.org/blog/timely-processing/
>>
>> Is that helpful?
>>
>> Brian
>>
>> On Thu, Jun 18, 2020 at 5:29 PM Praveen K Viswanathan <
>> harish.prav...@gmail.com> wrote:
>>
>>> Hello Everyone,
>>>
>>> In my pipeline I have to make a *single RPC call* as well as a *Batched
>>> RPC call* to fetch data for enrichment. I could not find any reference
>>> on how to make these call within your pipeline. I am still covering my
>>> grounds in Apache Beam and would appreciate if anyone has done this and
>>> could share a sample code or details on how to do this.
>>> --
>>> Thanks,
>>> Praveen K Viswanathan
>>>
>>

-- 
Thanks,
Praveen K Viswanathan


Designing an existing pipeline in Beam

2020-06-20 Thread Praveen K Viswanathan
Hello Everyone,

I am in the process of implementing an existing pipeline (written using
Java and Kafka) in Apache Beam. The data from the source stream is
contrived and has to go through several steps of enrichment using REST API
calls and parsing of JSON data. The key transformation in the existing
pipeline is shown below (a super-high-level flow):

*Method A*
Calls *Method B*
  Creates *Map 1, Map 2*
Calls *Method C*
 Read *Map 2*
 Create *Map 3*
*Method C*
 Read *Map 3* and
 update *Map 1*

The maps we use are multi-level maps, and I am thinking of having a
PCollection for each map and passing them as side inputs to a DoFn wherever
I have transformations that need two or more maps. But there are certain
tasks for which I want to make sure I am following the right approach, for
instance updating one of the side-input maps inside a DoFn.
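
As an illustration of what I have in mind for the read side (a rough sketch;
the key/value types are placeholders), one of the maps would be materialized
with View.asMap and read inside a DoFn:

import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

class EnrichWithMap2 {
  // map2 holds the "template" entries; events are the records to enrich.
  static PCollection<String> enrich(
      PCollection<KV<String, String>> events, PCollection<KV<String, String>> map2) {
    PCollectionView<Map<String, String>> map2View = map2.apply(View.<String, String>asMap());

    return events.apply(
        ParDo.of(new DoFn<KV<String, String>, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            // Look up the template for this element's key in the side input.
            Map<String, String> templates = c.sideInput(map2View);
            String template = templates.get(c.element().getKey());
            c.output(c.element().getValue() + "|" + template);
          }
        }).withSideInputs(map2View));
  }
}

One thing I am not sure about is that, since the source is unbounded, the map
PCollection would presumably also need windowing before View.asMap.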

These are my initial thoughts/questions and I would like to get some expert
advice on how we typically design such an interleaved transformation in
Apache Beam. Appreciate your valuable insights on this.

-- 
Thanks,
Praveen K Viswanathan


Re: Designing an existing pipeline in Beam

2020-06-22 Thread Praveen K Viswanathan
Hi Luke,

You can think of Map 2 as a kind of template used to enrich the data in
Map 1. As I mentioned in my previous post, this is a high-level
scenario.

All of this logic is spread across several classes (~4K lines of code
in total). As in any Java application:

1. the code has been modularized into multiple method calls,
2. HashMaps are passed around as arguments to each method, and
3. the attributes of the custom objects are accessed using getters and setters.

This is a common pattern in a normal Java application, but I have not seen
an example of such code in Beam.


On Mon, Jun 22, 2020 at 8:23 AM Luke Cwik  wrote:

> Who reads map 1?
> Can it be stale?
>
> It is unclear what you are trying to do in parallel and why you wouldn't
> stick all this logic into a single DoFn / stateful DoFn.
>
> On Sat, Jun 20, 2020 at 7:14 PM Praveen K Viswanathan <
> harish.prav...@gmail.com> wrote:
>
>> Hello Everyone,
>>
>> I am in the process of implementing an existing pipeline (written using
>> Java and Kafka) in Apache Beam. The data from the source stream is
>> contrived and had to go through several steps of enrichment using REST API
>> calls and parsing of JSON data. The key
>> transformation in the existing pipeline is in shown below (a super high
>> level flow)
>>
>> *Method A*
>> Calls *Method B*
>>   Creates *Map 1, Map 2*
>> Calls *Method C*
>>  Read *Map 2*
>>  Create *Map 3*
>> *Method C*
>>  Read *Map 3* and
>>  update *Map 1*
>>
>> The Map we use are multi-level maps and I am thinking of having
>> PCollections for each Maps and pass them as side inputs in a DoFn wherever
>> I have transformations that need two or more Maps. But there are certain
>> tasks which I want to make sure that I am following right approach, for
>> instance updating one of the side input maps inside a DoFn.
>>
>> These are my initial thoughts/questions and I would like to get some
>> expert advice on how we typically design such an interleaved transformation
>> in Apache Beam. Appreciate your valuable insights on this.
>>
>> --
>> Thanks,
>> Praveen K Viswanathan
>>
>

-- 
Thanks,
Praveen K Viswanathan


Re: Designing an existing pipeline in Beam

2020-06-22 Thread Praveen K Viswanathan
Another way to put this question is: how do we write a Beam pipeline for
an existing pipeline (in Java) that has a dozen custom objects, where you
have to work with multiple HashMaps of those custom objects in order to
transform the data? Currently, I am writing the Beam pipeline using the same
custom objects, getters and setters, and HashMaps, *but inside
a DoFn*. Is this the optimal way, or does Beam offer something else?

On Mon, Jun 22, 2020 at 3:47 PM Praveen K Viswanathan <
harish.prav...@gmail.com> wrote:

> Hi Luke,
>
> We can say Map 2 as a kind of a template using which you want to enrich
> data in Map 1. As I mentioned in my previous post, this is a high level
> scenario.
>
> All these logic are spread across several classes (with ~4K lines of code
> in total). As in any Java application,
>
> 1. The code has been modularized with multiple method calls
> 2. Passing around HashMaps as argument to each method
> 3. Accessing the attributes of the custom object using getters and setters.
>
> This is a common pattern in a normal Java application but I have not seen
> such an example of code in Beam.
>
>
> On Mon, Jun 22, 2020 at 8:23 AM Luke Cwik  wrote:
>
>> Who reads map 1?
>> Can it be stale?
>>
>> It is unclear what you are trying to do in parallel and why you wouldn't
>> stick all this logic into a single DoFn / stateful DoFn.
>>
>> On Sat, Jun 20, 2020 at 7:14 PM Praveen K Viswanathan <
>> harish.prav...@gmail.com> wrote:
>>
>>> Hello Everyone,
>>>
>>> I am in the process of implementing an existing pipeline (written using
>>> Java and Kafka) in Apache Beam. The data from the source stream is
>>> contrived and had to go through several steps of enrichment using REST API
>>> calls and parsing of JSON data. The key
>>> transformation in the existing pipeline is in shown below (a super high
>>> level flow)
>>>
>>> *Method A*
>>> Calls *Method B*
>>>   Creates *Map 1, Map 2*
>>> Calls *Method C*
>>>  Read *Map 2*
>>>  Create *Map 3*
>>> *Method C*
>>>  Read *Map 3* and
>>>  update *Map 1*
>>>
>>> The Map we use are multi-level maps and I am thinking of having
>>> PCollections for each Maps and pass them as side inputs in a DoFn wherever
>>> I have transformations that need two or more Maps. But there are certain
>>> tasks which I want to make sure that I am following right approach, for
>>> instance updating one of the side input maps inside a DoFn.
>>>
>>> These are my initial thoughts/questions and I would like to get some
>>> expert advice on how we typically design such an interleaved transformation
>>> in Apache Beam. Appreciate your valuable insights on this.
>>>
>>> --
>>> Thanks,
>>> Praveen K Viswanathan
>>>
>>
>
> --
> Thanks,
> Praveen K Viswanathan
>


-- 
Thanks,
Praveen K Viswanathan


Re: Designing an existing pipeline in Beam

2020-06-23 Thread Praveen K Viswanathan
Hi Luke - Thanks for the explanation. The limitation of directed-graph
processing and the option of external storage clear up most of the questions
I had with respect to designing this pipeline. I do have one more scenario
to clarify on this thread.

If I have a certain piece of logic that I need to use in more than one DoFn,
how do we do that? In a normal Java application, we can put it in a
separate method and call it wherever we want. Is it possible to replicate
something like that in Beam's DoFns?

On Tue, Jun 23, 2020 at 3:47 PM Luke Cwik  wrote:

> Beam is really about parallelizing the processing. Using a single DoFn
> that does everything is fine as long as the DoFn can process elements in
> parallel (e.g. upstream source produces lots of elements). Composing
> multiple DoFns is great for re-use and testing but it isn't strictly
> necessary. Also, Beam doesn't support back edges in the processing graph so
> all data flows in one direction and you can't have a cycle. This only
> allows for map 1 to producie map 2 which then produces map 3 which is then
> used to update map 1 if all of that logic is within a single DoFn/Transform
> or you create a cycle using an external system such as write to Kafka topic
> X and read from Kafka topic X within the same pipeline or update a database
> downstream from where it is read. There is a lot of ordering complexity and
> stale data issues whenever using an external store to create a cycle though.
>
> On Mon, Jun 22, 2020 at 6:02 PM Praveen K Viswanathan <
> harish.prav...@gmail.com> wrote:
>
>> Another way to put this question is, how do we write a beam pipeline for
>> an existing pipeline (in Java) that has a dozen of custom objects and you
>> have to work with multiple HashMaps of those custom objects in order to
>> transform it. Currently, I am writing a beam pipeline by using the same
>> Custom objects, getters and setters and HashMap *but
>> inside a DoFn*. Is this the optimal way or does Beam offer something
>> else?
>>
>> On Mon, Jun 22, 2020 at 3:47 PM Praveen K Viswanathan <
>> harish.prav...@gmail.com> wrote:
>>
>>> Hi Luke,
>>>
>>> We can say Map 2 as a kind of a template using which you want to enrich
>>> data in Map 1. As I mentioned in my previous post, this is a high level
>>> scenario.
>>>
>>> All these logic are spread across several classes (with ~4K lines of
>>> code in total). As in any Java application,
>>>
>>> 1. The code has been modularized with multiple method calls
>>> 2. Passing around HashMaps as argument to each method
>>> 3. Accessing the attributes of the custom object using getters and
>>> setters.
>>>
>>> This is a common pattern in a normal Java application but I have not
>>> seen such an example of code in Beam.
>>>
>>>
>>> On Mon, Jun 22, 2020 at 8:23 AM Luke Cwik  wrote:
>>>
>>>> Who reads map 1?
>>>> Can it be stale?
>>>>
>>>> It is unclear what you are trying to do in parallel and why you
>>>> wouldn't stick all this logic into a single DoFn / stateful DoFn.
>>>>
>>>> On Sat, Jun 20, 2020 at 7:14 PM Praveen K Viswanathan <
>>>> harish.prav...@gmail.com> wrote:
>>>>
>>>>> Hello Everyone,
>>>>>
>>>>> I am in the process of implementing an existing pipeline (written
>>>>> using Java and Kafka) in Apache Beam. The data from the source stream is
>>>>> contrived and had to go through several steps of enrichment using REST API
>>>>> calls and parsing of JSON data. The key
>>>>> transformation in the existing pipeline is in shown below (a super
>>>>> high level flow)
>>>>>
>>>>> *Method A*
>>>>> Calls *Method B*
>>>>>   Creates *Map 1, Map 2*
>>>>> Calls *Method C*
>>>>>  Read *Map 2*
>>>>>  Create *Map 3*
>>>>> *Method C*
>>>>>  ----Read *Map 3* and
>>>>>  update *Map 1*
>>>>>
>>>>> The Map we use are multi-level maps and I am thinking of having
>>>>> PCollections for each Maps and pass them as side inputs in a DoFn wherever
>>>>> I have transformations that need two or more Maps. But there are certain
>>>>> tasks which I want to make sure that I am following right approach, for
>>>>> instance updating one of the side input maps inside a DoFn.
>>>>>
>>>>> These are my initial thoughts/questions and I would like to get some
>>>>> expert advice on how we typically design such an interleaved 
>>>>> transformation
>>>>> in Apache Beam. Appreciate your valuable insights on this.
>>>>>
>>>>> --
>>>>> Thanks,
>>>>> Praveen K Viswanathan
>>>>>
>>>>
>>>
>>> --
>>> Thanks,
>>> Praveen K Viswanathan
>>>
>>
>>
>> --
>> Thanks,
>> Praveen K Viswanathan
>>
>

-- 
Thanks,
Praveen K Viswanathan


Re: Designing an existing pipeline in Beam

2020-06-24 Thread Praveen K Viswanathan
Thanks Luke, I would like to try the latter approach. Would you be able to
share any pseudo-code or point to an example of how to call a common
method inside a DoFn's, let's say, ProcessElement method?
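
In case it helps to make the question concrete, something along these lines
is what I am picturing (a rough sketch; the class and method names are just
placeholders):

import org.apache.beam.sdk.transforms.DoFn;

// Plain helper class holding the shared enrichment logic.
class EnrichmentUtil {
  static String normalize(String raw) {
    return raw.trim().toLowerCase();
  }
}

class DoFnA extends DoFn<String, String> {
  @ProcessElement
  public void processElement(@Element String in, OutputReceiver<String> out) {
    // The same helper is invoked from DoFn A ...
    out.output("A:" + EnrichmentUtil.normalize(in));
  }
}

class DoFnB extends DoFn<String, String> {
  @ProcessElement
  public void processElement(@Element String in, OutputReceiver<String> out) {
    // ... and from DoFn B.
    out.output("B:" + EnrichmentUtil.normalize(in));
  }
}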

On Tue, Jun 23, 2020 at 6:35 PM Luke Cwik  wrote:

> You can apply the same DoFn / Transform instance multiple times in the
> graph or you can follow regular development practices where the common code
> is factored into a method and two different DoFn's invoke it.
>
> On Tue, Jun 23, 2020 at 4:28 PM Praveen K Viswanathan <
> harish.prav...@gmail.com> wrote:
>
>> Hi Luke - Thanks for the explanation. The limitation due to directed
>> graph processing and the option of external storage clears most of the
>> questions I had with respect to designing this pipeline. I do have one more
>> scenario to clarify on this thread.
>>
>> If I had a certain piece of logic that I had to use in more than one
>> DoFns how do we do that. In a normal Java application, we can put it as a
>> separate method and call it wherever we want. Is it possible to replicate
>> something like that in Beam's DoFn?
>>
>> On Tue, Jun 23, 2020 at 3:47 PM Luke Cwik  wrote:
>>
>>> Beam is really about parallelizing the processing. Using a single DoFn
>>> that does everything is fine as long as the DoFn can process elements in
>>> parallel (e.g. upstream source produces lots of elements). Composing
>>> multiple DoFns is great for re-use and testing but it isn't strictly
>>> necessary. Also, Beam doesn't support back edges in the processing graph so
>>> all data flows in one direction and you can't have a cycle. This only
>>> allows for map 1 to producie map 2 which then produces map 3 which is then
>>> used to update map 1 if all of that logic is within a single DoFn/Transform
>>> or you create a cycle using an external system such as write to Kafka topic
>>> X and read from Kafka topic X within the same pipeline or update a database
>>> downstream from where it is read. There is a lot of ordering complexity and
>>> stale data issues whenever using an external store to create a cycle though.
>>>
>>> On Mon, Jun 22, 2020 at 6:02 PM Praveen K Viswanathan <
>>> harish.prav...@gmail.com> wrote:
>>>
>>>> Another way to put this question is, how do we write a beam pipeline
>>>> for an existing pipeline (in Java) that has a dozen of custom objects and
>>>> you have to work with multiple HashMaps of those custom objects in order to
>>>> transform it. Currently, I am writing a beam pipeline by using the same
>>>> Custom objects, getters and setters and HashMap *but
>>>> inside a DoFn*. Is this the optimal way or does Beam offer something
>>>> else?
>>>>
>>>> On Mon, Jun 22, 2020 at 3:47 PM Praveen K Viswanathan <
>>>> harish.prav...@gmail.com> wrote:
>>>>
>>>>> Hi Luke,
>>>>>
>>>>> We can say Map 2 as a kind of a template using which you want to
>>>>> enrich data in Map 1. As I mentioned in my previous post, this is a high
>>>>> level scenario.
>>>>>
>>>>> All these logic are spread across several classes (with ~4K lines of
>>>>> code in total). As in any Java application,
>>>>>
>>>>> 1. The code has been modularized with multiple method calls
>>>>> 2. Passing around HashMaps as argument to each method
>>>>> 3. Accessing the attributes of the custom object using getters and
>>>>> setters.
>>>>>
>>>>> This is a common pattern in a normal Java application but I have not
>>>>> seen such an example of code in Beam.
>>>>>
>>>>>
>>>>> On Mon, Jun 22, 2020 at 8:23 AM Luke Cwik  wrote:
>>>>>
>>>>>> Who reads map 1?
>>>>>> Can it be stale?
>>>>>>
>>>>>> It is unclear what you are trying to do in parallel and why you
>>>>>> wouldn't stick all this logic into a single DoFn / stateful DoFn.
>>>>>>
>>>>>> On Sat, Jun 20, 2020 at 7:14 PM Praveen K Viswanathan <
>>>>>> harish.prav...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello Everyone,
>>>>>>>
>>>>>>> I am in the process of implementing an existing pipeline (written
>>>>>>> using Java and Kafka) in Apache Beam. The data from the source stream is
>>>>&g

Unable to commit offset using KafkaIO

2020-06-24 Thread Praveen K Viswanathan
.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = PLAIN
security.protocol = SASL_PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 1
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer

*Pipeline:*
p.apply("Read from raw", KafkaIO. read()
    .withBootstrapServers(localhost)
    .withTopic(rawTopic)
    .withConsumerConfigUpdates(ImmutableMap.of(
        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest",
        ConsumerConfig.GROUP_ID_CONFIG, "enrichment-consumer-group",
        "security.protocol", "SASL_PLAINTEXT",
        "sasl.mechanism", "PLAIN",
        "sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"pwd\";"))
    .withConsumerConfigUpdates(ImmutableMap.of(
        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true
    ))
-- 
Thanks,
Praveen K Viswanathan


Caching data inside DoFn

2020-06-26 Thread Praveen K Viswanathan
Hi All - I have a DoFn which generates data (a KV pair) for each element
it processes. It also has to read from that KV for other elements based
on a key, which means the KV has to retain all the data that gets
added to it while processing every element. I was thinking about the
"slow-caching side input pattern", but that is more about caching outside the
DoFn and then using it inside; it doesn't update the cache inside a DoFn.
Please share if anyone has thoughts on how to approach this case.

Element 1 > Add a record to a KV > ... > Element 5 > Use the value from KV
if there is a match on the key

-- 
Thanks,
Praveen K Viswanathan


Re: Caching data inside DoFn

2020-06-26 Thread Praveen K Viswanathan
Thank you Luke. I will work on implementing my use case with Stateful ParDo
itself and come back if I have any questions.

Appreciate your help.
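
For my own notes, the bag-state pattern you describe would look roughly like
this (a sketch; the key is a placeholder customer id and the value is a plain
String):

import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Buffers all values seen for a key and emits the buffered contents on a match.
class BufferPerKeyFn extends DoFn<KV<String, String>, String> {

  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferSpec =
      StateSpecs.bag(StringUtf8Coder.of());

  @ProcessElement
  public void processElement(
      @Element KV<String, String> element,
      @StateId("buffer") BagState<String> buffer,
      OutputReceiver<String> out) {
    // Add the current record to the per-key buffer.
    buffer.add(element.getValue());
    // Later elements with the same key can read everything buffered so far.
    for (String earlier : buffer.read()) {
      out.output(element.getKey() + ":" + earlier);
    }
  }
}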

On Fri, Jun 26, 2020 at 8:14 AM Luke Cwik  wrote:

> Use a stateful DoFn and buffer the elements in a bag state. You'll want to
> use a key that contains enough data to match your join condition you are
> trying to match. For example, if your trying to match on a customerId then
> you would do something like:
> element 1 -> ParDo(extract customer id) -> KV ->
> stateful ParDo(buffer element 1 in bag state)
> ...
> element 5 -> ParDo(extract customer id) -> KV ->
> stateful ParDo(output all element in bag)
>
> If you are matching on cudomerId and eventId then you would use a
> composite key (customerId, eventId).
>
> You can always use a single global key but you will lose all parallelism
> during processing (for small pipelines this likely won't matter).
>
> On Fri, Jun 26, 2020 at 7:29 AM Praveen K Viswanathan <
> harish.prav...@gmail.com> wrote:
>
>> Hi All - I have a DoFn which generates data (KV pair) for each element
>> that it is processing. It also has to read from that KV for other elements
>> based on a key which means, the KV has to retain all the data that's
>> getting added to it while processing every element. I was thinking
>> about the "slow-caching side input pattern" but it is more of caching
>> outside the DoFn and then using it inside. It doesn't update the cache
>> inside a DoFn. Please share if anyone has thoughts on how to approach this
>> case.
>>
>> Element 1 > Add a record to a KV > . Element 5 > Used the value from
>> KV if there is a match in the key
>>
>> --
>> Thanks,
>> Praveen K Viswanathan
>>
>

-- 
Thanks,
Praveen K Viswanathan


DoFn with SideInput

2020-06-28 Thread Praveen K Viswanathan
Hi All - I am facing an issue while using *side-input*.

*What am I doing:*
From my main program, I am calling a custom PTransform with a
PCollectionView as a parameter. Inside the custom PTransform, I am passing the
PCollectionView as a side-input to a DoFn.

*Issue:*
When I run the pipeline, I expect the log statement inside my DoFn's
processElement to be executed, but it is not getting logged. If I remove
the side-input from my DoFn, the log gets printed. I suspect this could be
related to windowing/execution order, or to my side-input somehow being
empty. I would appreciate it if you could clarify what is going wrong here.

*Code Structure:*


*Main Program:*
PCollectionTuple tuple = input.apply(new FirstTx());

 // Get two tuple tags from first transformation
 PCollection1 = tuple.get(tag1).setCoder(...);
 PCollection2 = tuple.get(tag2).setCoder(...);

 // Converting PCollection1 to PCollectionView to use as a side-input
 // Note: I need to introduce a global window here as my source is
unbounded and when we use View.asList() it does a GroupByKey internally,
  which in turn demands a window
 PView = PCollection1.apply(Window.>into(new GlobalWindows()) // Everything into the global window.
        .triggering(Repeatedly.forever(DefaultTrigger.of()))
        .discardingFiredPanes())
    .apply(Values.create())
    .apply(View.asList());

// Pass PCollectionView to SecondTx as a param
PCollection3 = PCollection2.apply(new SecondTx(PView));

*SecondTx:*
Inside my SecondTx, I am getting the PView from constructor (this.PView =
PView) and calling a DoFn

public PCollection expand(PCollection >> input) {
input.apply(ParDo.of(new UpdateFn()).withSideInput("SideInput", PView));
...
}

// DoFn
class UpdateFn extends DoFn>>,
CustomObject> {
@ProcessElement
public void processElement(@Element Map>> input, OutputReceiver out) {
   * Log.of("UpdateFn " + input);*
out.output(new CustomObject());
}
}

-- 
Thanks,
Praveen K Viswanathan


Re: DoFn with SideInput

2020-06-29 Thread Praveen K Viswanathan
Thank you Luke. I changed DefaultTrigger.of() to
AfterProcessingTime.pastFirstElementInPane() and it worked.
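
For anyone who hits the same issue later, the windowing that ended up working
looks roughly like this (a sketch assuming a PCollection<String>; the element
type is simplified here):

import java.util.List;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

class SideInputWindowing {
  static PCollectionView<List<String>> asSideInput(PCollection<String> values) {
    // Fire the side input as soon as elements arrive instead of waiting for
    // the (never-closing) global window to end.
    return values
        .apply(Window.<String>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
            .discardingFiredPanes())
        .apply(View.<String>asList());
  }
}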

On Mon, Jun 29, 2020 at 9:09 AM Luke Cwik  wrote:

> The UpdateFn won't be invoked till the side input is ready which requires
> either the watermark to pass the end of the global window + allowed
> lateness (to show that the side input is empty) or at least one firing to
> populate it with data. See this general section on side inputs[1] and some
> useful patterns[2] (there are some examples for how to get globally
> windowed side inputs to work).
>
> 1: https://beam.apache.org/documentation/programming-guide/#side-inputs
> 2: https://beam.apache.org/documentation/patterns/side-inputs/
>
> On Sun, Jun 28, 2020 at 6:24 PM Praveen K Viswanathan <
> harish.prav...@gmail.com> wrote:
>
>>
>> Hi All - I am facing an issue while using *side-input*.
>>
>> *What am I doing:*
>> From my main program, I am calling a custom PTransform with a
>> PCollectionView as parameter. Inside custom PTransform, I am passing the
>> PCollectionView as a side-input to a DoFn.
>>
>> *Issue:*
>> When I run the pipeline, I am expecting the log statement inside my
>> DoFn's processElement to get executed but it is not getting logged. If I
>> remove the side-input to my DoFn then the log is getting printed. I am
>> suspecting whether it could be related to windowing/execution order or my
>> side-input somehow being empty. Appreciate if you can clarify on what is
>> going wrong here.
>>
>> *Code Structure:*
>>
>>
>> *Main Program:* PCollectionTuple tuple = input.apply(new FirstTx());
>>
>>  // Get two tuple tags from first transformation
>>  PCollection1 = tuple.get(tag1).setCoder(...);
>>  PCollection2 = tuple.get(tag2).setCoder(...);
>>
>>  // Converting PCollection1 to PCollectionView to use as a side-input
>>  // Note: I need to introduce a global window here as my source is
>> unbounded and when we use View.asList() it does GroupByKey internally
>>   which inturn demands a window
>>  PView = PCollection1.apply(Window.>into(new
>> GlobalWindows()) // Everything into global window.
>>
>>  .triggering(Repeatedly.forever(DefaultTrigger.of()))
>>
>>  .discardingFiredPanes()).apply(Values.create()).apply(View.asList());
>>
>> // Pass PCollectionView to SecondTx as a param
>> PCollection3 = PCollection2.apply(new SecondTx(PView));
>>
>> *SecondTx:*
>> Inside my SecondTx, I am getting the PView from constructor (this.PView =
>> PView) and calling a DoFn
>>
>> public PCollection expand(PCollection > >> input) {
>> input.apply(ParDo.of(new UpdateFn()).withSideInput("SideInput", PView));
>> ...
>> }
>>
>> // DoFn
>> class UpdateFn extends DoFn> String>>>, CustomObject> {
>> @ProcessElement
>> public void processElement(@Element Map> Map>> input, OutputReceiver out) {
>>* Log.of("UpdateFn " + input);*
>> out.output(new CustomObject());
>> }
>> }
>>
>> --
>> Thanks,
>> Praveen K Viswanathan
>>
>

-- 
Thanks,
Praveen K Viswanathan


Conditional branching during pipeline execution time

2020-07-07 Thread Praveen K Viswanathan
Hello Everyone,

Apache Beam allows conditional branching during pipeline construction time,
but I have to decide whether to execute DoFn A or DoFn B during run time
(based upon a PCollection flag).

My DoFns A and B are inside a custom transformation class, and I am passing
my flag as a PCollectionView to the transformation class. However, Beam does
not wait for the actual value of the PCollectionView and decides which DoFn
to call during DAG preparation itself (it always goes to the else part).

class CustomTx {
   public CustomTx(flag) {
this.flag = flag;
   }

 public expand {
  if (flag)
 DoFn A
  else
 DoFn B
  }
}

class DoFn A {
}

class DoFn B {
}

If I have a DoFn inside my transformation's expand method and pass the flag
as a side input, it gives the correct value, but then I cannot call a DoFn
inside a DoFn. I appreciate any pointers on the best way to approach this
branching case.

-- 
Thanks,
Praveen K Viswanathan


Re: Conditional branching during pipeline execution time

2020-07-07 Thread Praveen K Viswanathan
Thanks Luke. I changed the pipeline structure as shown below. I could see
that this flow would work for my original issue, but now while building, I
am getting a "*java.lang.IllegalArgumentException: Type of @Element must
match the DoFn type CustomTx/Filter
A/Values/Map/ParMultiDo(Anonymous).output [PCollection]*" error. I could
not find much in the docs or other forums on this error. Appreciate
your help.

CustomTx {

public PCollection expand(KV<> input){

// Tuple Tags
final TupleTag> tagA = new TupleTag>(){};
final TupleTag> tagB = new TupleTag>(){};

PCollectionTuple tuple = input.apply(ParDo.of(new DoFn , KV<>>(){
   @ProcessElement
   public void processElement(ProcessContext c) {
  String str = c.sideInput(s);
  if ("A".equals(str))
 c.output(tagA, c.element);
 else
c.output(tagB, c.element);
  }
}).withSideInputs(s)
 .withOutputTags(tagA, TupleTagList.of(tagB)));

// Output PCollection
PCollection output;

// A Flow
output = tuple.get(tagA).setCoder(kvCoder.of(...))
 .apply("Filter A", Values.create())
 .apply(ParDo.of(new DoFnA()))
 ...

// B Flow
output = tuple.get(tagB).setCoder(KvCoder.of(...))
 .apply("Filter B", Values.create())
 .apply(ParDo.of(new DoFnB()))
 ...

return output;
}
}

class DoFnA(){

}

class DoFnB(){

}

On Tue, Jul 7, 2020 at 7:39 PM Luke Cwik  wrote:

> Have both DoFns A and B in the graph at the same time and instead use
> another decider DoFn that outputs to either the PCollection that goes to
> DoFn A or DoFn B based upon the contents of the side input. Graph would
> look something like:
>
> PCollectionView --------------\
> PCollection -> ParDo(Decider) -outA-> PCollection -> ParDo(DoFnA)
>                                \outB-> PCollection -> ParDo(DoFnB)
>
> See[1] for how to create a DoFn with multiple outputs.
>
> 1:
> https://beam.apache.org/documentation/pipelines/design-your-pipeline/#a-single-transform-that-produces-multiple-outputs
>
> On Tue, Jul 7, 2020 at 7:31 PM Praveen K Viswanathan <
> harish.prav...@gmail.com> wrote:
>
>> Hello Everyone,
>>
>> Apache Beam allows conditional branching during pipeline construction
>> time, but I have to decide whether to execute DoFn A or DoFn B during run
>> time (based upon a PCollection flag).
>>
>> My DoFns A and B are inside a custom transformation class and I am
>> passing my flag as PCollectionView to the transformation class. However,
>> Beam does not wait for the actual value of the PCollectionView and decides
>> which DoFn to call during DAG preparation itself (always goes to else part)
>>
>> class CustomTx {
>>public CustomTx(flag) {
>> this.flag = flag;
>>}
>>
>>  public expand {
>>   if (flag)
>>  DoFn A
>>   else
>>  DoFn B
>>   }
>> }
>>
>> class DoFn A {
>> }
>>
>> class DoFn B {
>> }
>>
>> If I have a DoFn inside my transformation's expand method and pass the
>> flag as side input it gives the correct value but then, I cannot call a
>> DoFn inside a DoFn. Appreciate any pointers on the best way to approach
>> this branching case.
>>
>> --
>> Thanks,
>> Praveen K Viswanathan
>>
>

-- 
Thanks,
Praveen K Viswanathan


Re: Conditional branching during pipeline execution time

2020-07-08 Thread Praveen K Viswanathan
No worries, the error was due to the DoFn element not being defined with a
SchemaCoder.

On Tue, Jul 7, 2020 at 10:46 PM Praveen K Viswanathan <
harish.prav...@gmail.com> wrote:

> Thanks Luke. I changed the pipeline structure as shown below. I could see
> that this flow would work for my original issue but now while building, I
> am getting "j*ava.lang.IllegalArgumentException: Type of @Element must
> match the DoFn type CustomTx/Filter
> A/Values/Map/ParMultiDo(Anonymous).output [PCollection]*" error. I could
> not get much input from docs or in other forums on this error. Appreciate
> your help.
>
> CustomTx {
>
> public PCollection expand(KV<> input){
>
> // Tuple Tags
> final TupleTag> tagA = new TupleTag>(){};
> final TupleTag> tagB = new TupleTag>(){};
>
> PCollectionTuple tuple = input.apply(ParDo.of(new DoFn , KV<>>(){
>@ProcessElement
>public void processElement(ProcessContext c) {
>   String str = c.sideInput(s);
>   if ("A".equals(str))
>  c.output(tagA, c.element);
>  else
> c.output(tagB, c.element);
>   }
> }).withSideInputs(s)
>  .withOutputTags(tagA, TupleTagList.of(tagB)));
>
> // Output PCollection
> PCollection output;
>
> // A Flow
> output = tuple.get(tagA).setCoder(kvCoder.of(...))
>  .apply("Filter A", Values.create())
>  .apply(ParDo.of(new DoFnA()))
>  ...
>
> // B Flow
> output = tuple.get(tagB).setCoder(KvCoder.of(...))
>  .apply("Filter B", Values.create())
>  .apply(ParDo.of(new DoFnB()))
>  ...
>
> return output;
> }
> }
>
> class DoFnA(){
>
> }
>
> class DoFnB(){
>
> }
>
> On Tue, Jul 7, 2020 at 7:39 PM Luke Cwik  wrote:
>
>> Have both DoFns A and B in the graph at the same time and instead use
>> another decider DoFn that outputs to either the PCollection that goes to
>> DoFn A or DoFn B based upon the contents of the side input. Graph would
>> look something like:
>>
>> PCollectionView --------------\
>> PCollection -> ParDo(Decider) -outA-> PCollection -> ParDo(DoFnA)
>>                                \outB-> PCollection -> ParDo(DoFnB)
>>
>> See[1] for how to create a DoFn with multiple outputs.
>>
>> 1:
>> https://beam.apache.org/documentation/pipelines/design-your-pipeline/#a-single-transform-that-produces-multiple-outputs
>>
>> On Tue, Jul 7, 2020 at 7:31 PM Praveen K Viswanathan <
>> harish.prav...@gmail.com> wrote:
>>
>>> Hello Everyone,
>>>
>>> Apache Beam allows conditional branching during pipeline construction
>>> time, but I have to decide whether to execute DoFn A or DoFn B during run
>>> time (based upon a PCollection flag).
>>>
>>> My DoFns A and B are inside a custom transformation class and I am
>>> passing my flag as PCollectionView to the transformation class. However,
>>> Beam does not wait for the actual value of the PCollectionView and decides
>>> which DoFn to call during DAG preparation itself (always goes to else part)
>>>
>>> class CustomTx {
>>>public CustomTx(flag) {
>>> this.flag = flag;
>>>}
>>>
>>>  public expand {
>>>   if (flag)
>>>  DoFn A
>>>   else
>>>  DoFn B
>>>   }
>>> }
>>>
>>> class DoFn A {
>>> }
>>>
>>> class DoFn B {
>>> }
>>>
>>> If I have a DoFn inside my transformation's expand method and pass the
>>> flag as side input it gives the correct value but then, I cannot call a
>>> DoFn inside a DoFn. Appreciate any pointers on the best way to approach
>>> this branching case.
>>>
>>> --
>>> Thanks,
>>> Praveen K Viswanathan
>>>
>>
>
> --
> Thanks,
> Praveen K Viswanathan
>


-- 
Thanks,
Praveen K Viswanathan


Re: Output from Window not getting materialized

2020-09-17 Thread Praveen K Viswanathan
Hello Luke,

Thanks for the reference. The plan is to go with the "MonotonicallyIncreasing"
watermark estimator, but I am not sure how to implement it with our
source, which is "Oracle Streaming Service" (OSS). For other sources like
Kafka I can see the availability of a "TimestampPolicy", through which the
watermark in the IO talks natively to Kafka. I looked for something
similar in the OSS SDK but did not find any. I am planning to check with
the OSS development team on this, and it would help if you could share what
exactly we would need from the source to implement a watermark. If you have
any other code base implementing a monotonically increasing watermark,
that will also be helpful.
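
For reference, my current understanding is that the estimator is attached
inside the SDF itself, roughly like this (a rough sketch; the initial-state
choice is an assumption, and the other SDF methods are omitted):

import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.joda.time.Instant;

// Inside the SDF-based reader DoFn (other SDF methods omitted):
@GetInitialWatermarkEstimatorState
public Instant getInitialWatermarkEstimatorState(@Timestamp Instant currentElementTimestamp) {
  // Start the watermark estimator state at the element's timestamp.
  return currentElementTimestamp;
}

@NewWatermarkEstimator
public WatermarkEstimators.MonotonicallyIncreasing newWatermarkEstimator(
    @WatermarkEstimatorState Instant watermarkEstimatorState) {
  // Observes the timestamps of records output from processElement and
  // advances the watermark based on them.
  return new WatermarkEstimators.MonotonicallyIncreasing(watermarkEstimatorState);
}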

Regards,
Praveen

On Mon, Sep 14, 2020 at 9:10 AM Luke Cwik  wrote:

> Is the watermark advancing[1, 2] for the SDF such that the windows can
> close allowing for the Count transform to produce output?
>
> 1: https://www.youtube.com/watch?v=TWxSLmkWPm4
> 2: https://beam.apache.org/documentation/programming-guide/#windowing
>
> On Thu, Sep 10, 2020 at 12:39 PM Gaurav Nakum 
> wrote:
>
>> Hi everyone!
>>
>> We are developing a new IO connector using the SDF API, and testing it
>> with the following simple counting pipeline:
>>
>>
>>
>> p.apply(MyIO.read()
>>
>> .withStream(inputStream)
>>
>> .withStreamPartitions(Arrays.asList(0))
>>
>> .withConsumerConfig(config)
>>
>> ) // gets a PCollection>
>>
>>
>>
>>
>>
>> .apply(Values.*create*()) // PCollection
>>
>>
>>
>> .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10)))
>>
>> .withAllowedLateness(Duration.standardDays(1))
>>
>> .accumulatingFiredPanes())
>>
>>
>>
>> .apply(Count.perElement())
>>
>>
>>
>>
>>
>> // write PCollection> to stream
>>
>> .apply(MyIO.write()
>>
>>     .withStream(outputStream)
>>
>> .withConsumerConfig(config));
>>
>>
>>
>>
>>
>> Without the window transform, we can read from the stream and write to
>> it, however, I don’t see output after the Window transform. Could you
>> please help pin down the issue?
>>
>> Thank you,
>>
>> Gaurav
>>
>

-- 
Thanks,
Praveen K Viswanathan


Re: Output from Window not getting materialized

2020-09-17 Thread Praveen K Viswanathan
Hi Luke,

I am also looking at the `WatermarkEstimators.manual` option in parallel.
Now we are getting data past our Fixed Window, but the aggregation is not as
expected. The doc says setWatermark will "set timestamp before or at the
timestamps of all future elements produced by the associated DoFn". If I
output with a timestamp as below, could you please clarify how we should
set the watermark for this manual watermark estimator?

receiver.outputWithTimestamp(ossRecord, Instant.now());
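
For context, here is roughly how it is wired up in our SDF (a simplified
sketch; the record type, offset handling, and read loop are placeholders):

import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.joda.time.Instant;

// Inside the SDF-based reader DoFn (other SDF methods omitted):
@NewWatermarkEstimator
public WatermarkEstimators.Manual newWatermarkEstimator(
    @WatermarkEstimatorState Instant watermarkEstimatorState) {
  return new WatermarkEstimators.Manual(watermarkEstimatorState);
}

@ProcessElement
public ProcessContinuation processElement(
    RestrictionTracker<OffsetRange, Long> tracker,
    ManualWatermarkEstimator<Instant> watermarkEstimator,
    OutputReceiver<OssRecord> receiver) {
  for (OssRecord record : pollRecords()) {        // placeholder read from OSS
    if (!tracker.tryClaim(record.getOffset())) {  // placeholder offset accessor
      return ProcessContinuation.stop();
    }
    Instant ts = Instant.now();
    receiver.outputWithTimestamp(record, ts);
    // Keep the watermark at or before the timestamps of everything output
    // later; here later outputs use later Instant.now() values.
    watermarkEstimator.setWatermark(ts);
  }
  return ProcessContinuation.resume();
}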

Thanks,
Praveen

On Mon, Sep 14, 2020 at 9:10 AM Luke Cwik  wrote:

> Is the watermark advancing[1, 2] for the SDF such that the windows can
> close allowing for the Count transform to produce output?
>
> 1: https://www.youtube.com/watch?v=TWxSLmkWPm4
> 2: https://beam.apache.org/documentation/programming-guide/#windowing
>
> On Thu, Sep 10, 2020 at 12:39 PM Gaurav Nakum 
> wrote:
>
>> Hi everyone!
>>
>> We are developing a new IO connector using the SDF API, and testing it
>> with the following simple counting pipeline:
>>
>>
>>
>> p.apply(MyIO.read()
>>
>> .withStream(inputStream)
>>
>> .withStreamPartitions(Arrays.asList(0))
>>
>> .withConsumerConfig(config)
>>
>> ) // gets a PCollection>
>>
>>
>>
>>
>>
>> .apply(Values.*create*()) // PCollection
>>
>>
>>
>> .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10)))
>>
>> .withAllowedLateness(Duration.standardDays(1))
>>
>> .accumulatingFiredPanes())
>>
>>
>>
>> .apply(Count.perElement())
>>
>>
>>
>>
>>
>> // write PCollection> to stream
>>
>> .apply(MyIO.write()
>>
>> .withStream(outputStream)
>>
>> .withConsumerConfig(config));
>>
>>
>>
>>
>>
>> Without the window transform, we can read from the stream and write to
>> it, however, I don’t see output after the Window transform. Could you
>> please help pin down the issue?
>>
>> Thank you,
>>
>> Gaurav
>>
>

-- 
Thanks,
Praveen K Viswanathan


Getting "java.lang.NoSuchFieldError: TRUNCATE_SIZED_RESTRICTION" in Flink cluster

2020-10-02 Thread Praveen K Viswanathan
Hi - We have a Beam pipeline that reads and writes using an SDF-based IO
connector and works fine on a local machine using the Direct Runner or Flink
Runner. However, when we build an image of that pipeline along with Flink
and deploy it in a cluster, we get the exception below.

ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler-
> Unhandled exception.
> org.apache.flink.client.program.ProgramInvocationException: The program
> caused an error:
>
> Classpath:
> [file:/opt/flink/flink-web-upload/Booster-bundled-1.0-SNAPSHOT.jar]
> System.out: (none)
> System.err: (none)
> at
> org.apache.flink.client.program.OptimizerPlanEnvironment.generateException(OptimizerPlanEnvironment.java:149)
> at
> org.apache.flink.client.program.OptimizerPlanEnvironment.getPipeline(OptimizerPlanEnvironment.java:89)
> at
> org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:101)
> at
> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:56)
> at
> org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:128)
> at
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:138)
> at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NoSuchFieldError: TRUNCATE_SIZED_RESTRICTION
> at
> org.apache.beam.runners.core.construction.PTransformTranslation.(PTransformTranslation.java:199)
> at
> org.apache.beam.runners.core.construction.PTransformMatchers$6.matches(PTransformMatchers.java:264)
> at
> org.apache.beam.sdk.Pipeline$2.enterCompositeTransform(Pipeline.java:272)
> at
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:653)
> at
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
> at
> org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
> at
> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
> at
> org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:463)
> at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:262)
> at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:212)
> at
> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:115)
> at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:82)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
> at com.org.cx.signals.Booster.main(Booster.java:278)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> at
> org.apache.flink.client.program.OptimizerPlanEnvironment.getPipeline(OptimizerPlanEnvironment.java:79)
> ... 8 more


In our pom.xml we have created a profile for flink-runner as shown below.


> <profile>
>   <id>flink-runner</id>
>   <dependencies>
>     <dependency>
>       <groupId>org.apache.beam</groupId>
>       <artifactId>beam-runners-flink-1.10</artifactId>
>       <version>2.21.0</version>
>     </dependency>
>   </dependencies>
> </profile>


And the Docker image uses the Flink version below:

FROM flink:1.10.0-scala_2.12


Both our pipeline and the SDF-based IO connector are on Beam version 2.23.0.
We would appreciate it if you could guide us on what is causing this exception.

-- 
Thanks,
Praveen K Viswanathan


Re: Getting "java.lang.NoSuchFieldError: TRUNCATE_SIZED_RESTRICTION" in Flink cluster

2020-10-05 Thread Praveen K Viswanathan
reatePCollectionView/Combine.globally(Concatenate)/Values/Values
> 2020-10-03 00:42:31,105 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |   |
> |   |   |   |enterCompositeTransform-
> View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map
> 2020-10-03 00:42:31,105 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |   |
> |   |   |   |   |visitPrimitiveTransform-
> View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map/ParMultiDo(Anonymous)
> 2020-10-03 00:42:31,115 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |   |
> |   |   |   |leaveCompositeTransform-
> View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map
> 2020-10-03 00:42:31,115 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |   |
> |   |   |leaveCompositeTransform-
> View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values
> 2020-10-03 00:42:31,115 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |   |
> |   |leaveCompositeTransform-
> View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)/Values
> 2020-10-03 00:42:31,115 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |   |
> |leaveCompositeTransform-
> View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)
> 2020-10-03 00:42:31,115 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |   |
> |visitPrimitiveTransform-
> View.AsList/View.CreatePCollectionView/CreateStreamingFlinkView.CreateFlinkPCollectionView
> 2020-10-03 00:42:31,115 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |   |
>  leaveCompositeTransform- View.AsList/View.CreatePCollectionView
> 2020-10-03 00:42:31,115 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |
>  leaveCompositeTransform- View.AsList
> 2020-10-03 00:42:31,116 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |
>  enterCompositeTransform- Read from rawsignal
> 2020-10-03 00:42:31,116 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |   |
>  visitPrimitiveTransform- Read from rawsignal/Impulse
> 2020-10-03 00:42:31,116 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  -
> *beam:transform:impulse:v1*


On Mon, Oct 5, 2020 at 9:51 AM Luke Cwik  wrote:

> In your pom.xml you are stating you want Flink 2.21.0 but you are using
> 2.23 elsewhere. You want these versions to match. Try updating your profile
> to:
>
>   flink-runner
>  
>  
>   
>org.apache.beam
>beam-runners-flink-1.10
>*2.23.0*
>   
>   
>   
>   
>
> On Fri, Oct 2, 2020 at 8:52 PM Tomo Suzuki  wrote:
>
>> I suspect your dependencies have conflict. I develop Linkage Checker
>> enforcer rule to identify incompatible dependencies. Do you want to give it
>> a try?
>>
>> https://github.com/GoogleCloudPlatform/cloud-opensource-java/wiki/Linkage-Checker-Enforcer-Rule
>>
>> Regards,
>> Tomo
>>
>> On Fri, Oct 2, 2020 at 21:34 Praveen K Viswanathan <
>> harish.prav...@gmail.com> wrote:
>>
>>> Hi - We have a beam pipeline reading and writing using an SDF based IO
>>> connector working fine in a local machine using Direct Runner or Flink
>>> Runner. However when we build an image of that pipeline along with Flink
>>> and deploy in a cluster we get below exception.
>>>
>>> ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler-
>>>> Unhandled exception.
>>>> org.apache.flink.client.program.ProgramInvocationException: The program
>>>> caused an error:
>>>>
>>>> Classpath:
>>>> [file:/opt/flink/flink-web-upload/Booster-bundled-1.0-SNAPSHOT.jar]
>>>> System.out: (none)
>>>> System.err: (none)
>>>> at
>>>> org.apache.flink.client.program.OptimizerPlanEnvironment.generateException(OptimizerPlanEnvironment.java:149)
>>>> at
>>>> org.apache.flink.client.program.OptimizerPlanEnvironment.getPipeline(OptimizerPlanEnvironment.java:89)
>>>> at
>>>> org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:101)
>>>> at
>>>> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:56)
>>>> at
>>>> org.apache.flink.runtime.webmonitor.han

Re: Getting "java.lang.NoSuchFieldError: TRUNCATE_SIZED_RESTRICTION" in Flink cluster

2020-10-05 Thread Praveen K Viswanathan
Thanks for sharing the tool Tomo. I will give it a try and let you know.

Regards,
Praveen

On Fri, Oct 2, 2020 at 8:52 PM Tomo Suzuki  wrote:

> I suspect your dependencies have conflict. I develop Linkage Checker
> enforcer rule to identify incompatible dependencies. Do you want to give it
> a try?
>
> https://github.com/GoogleCloudPlatform/cloud-opensource-java/wiki/Linkage-Checker-Enforcer-Rule
>
> Regards,
> Tomo
>
> On Fri, Oct 2, 2020 at 21:34 Praveen K Viswanathan <
> harish.prav...@gmail.com> wrote:
>
>> Hi - We have a beam pipeline reading and writing using an SDF based IO
>> connector working fine in a local machine using Direct Runner or Flink
>> Runner. However when we build an image of that pipeline along with Flink
>> and deploy in a cluster we get below exception.
>>
>> ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler-
>>> Unhandled exception.
>>> org.apache.flink.client.program.ProgramInvocationException: The program
>>> caused an error:
>>>
>>> Classpath:
>>> [file:/opt/flink/flink-web-upload/Booster-bundled-1.0-SNAPSHOT.jar]
>>> System.out: (none)
>>> System.err: (none)
>>> at
>>> org.apache.flink.client.program.OptimizerPlanEnvironment.generateException(OptimizerPlanEnvironment.java:149)
>>> at
>>> org.apache.flink.client.program.OptimizerPlanEnvironment.getPipeline(OptimizerPlanEnvironment.java:89)
>>> at
>>> org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:101)
>>> at
>>> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:56)
>>> at
>>> org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:128)
>>> at
>>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:138)
>>> at
>>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.NoSuchFieldError: TRUNCATE_SIZED_RESTRICTION
>>> at
>>> org.apache.beam.runners.core.construction.PTransformTranslation.(PTransformTranslation.java:199)
>>> at
>>> org.apache.beam.runners.core.construction.PTransformMatchers$6.matches(PTransformMatchers.java:264)
>>> at
>>> org.apache.beam.sdk.Pipeline$2.enterCompositeTransform(Pipeline.java:272)
>>> at
>>> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:653)
>>> at
>>> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
>>> at
>>> org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
>>> at
>>> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
>>> at
>>> org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:463)
>>> at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:262)
>>> at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:212)
>>> at
>>> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:115)
>>> at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:82)
>>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
>>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
>>> at com.org.cx.signals.Booster.main(Booster.java:278)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>>> at
>>> org.apache.flink.client.program.OptimizerPlanEnvironment.getPipeline(OptimizerPlanEnvironment.java:79)
>>>     ... 8 more
>>
>>
>> In our pom.xml we have created a profile for flink-runner as shown below.
>>
>> 
>>>
>>>   flink-runner
>>>  
>>>  
>>>   
>>>org.apache.beam
>>>beam-runners-flink-1.10
>>>2.21.0
>>>   
>>>   
>>>   
>>>   
>>> 
>>
>>
>> And the docker image has below flink version
>>
>> FROM flink:1.10.0-scala_2.12
>>
>>
>> Both our pipeline and SDF based IO connector are on Beam 2.23.0 version.
>> Appreciate if you can guide us on what is causing this exception.
>>
>> --
>> Thanks,
>> Praveen K Viswanathan
>>
> --
> Regards,
> Tomo
>


-- 
Thanks,
Praveen K Viswanathan


Re: Getting "java.lang.NoSuchFieldError: TRUNCATE_SIZED_RESTRICTION" in Flink cluster

2020-10-06 Thread Praveen K Viswanathan
Thanks Luke. I will check on the "portable" mode with Flink. Also hoping
that 2.25.0 will be released soon.

Regards,
Praveen

On Mon, Oct 5, 2020 at 11:46 AM Luke Cwik  wrote:

> Impulse in a released version of Apache Beam is only supported if you run
> your pipeline in "portable" mode with Flink. See
> https://beam.apache.org/documentation/runners/flink/ for some example
> instructions on how to run a "portable" pipeline.
>
> I added support for impulse in non portable pipeline execution to Flink in
> https://github.com/apache/beam/pull/12708 which will be available in
> 2.25.0 release (this release is currently underway).
>
> On Mon, Oct 5, 2020 at 11:28 AM Praveen K Viswanathan <
> harish.prav...@gmail.com> wrote:
>
>> Thanks for sharing the tool Tomo. I will give it a try and let you know.
>>
>> Regards,
>> Praveen
>>
>> On Fri, Oct 2, 2020 at 8:52 PM Tomo Suzuki  wrote:
>>
>>> I suspect your dependencies have conflict. I develop Linkage Checker
>>> enforcer rule to identify incompatible dependencies. Do you want to give it
>>> a try?
>>>
>>> https://github.com/GoogleCloudPlatform/cloud-opensource-java/wiki/Linkage-Checker-Enforcer-Rule
>>>
>>> Regards,
>>> Tomo
>>>
>>> On Fri, Oct 2, 2020 at 21:34 Praveen K Viswanathan <
>>> harish.prav...@gmail.com> wrote:
>>>
>>>> Hi - We have a beam pipeline reading and writing using an SDF based IO
>>>> connector working fine in a local machine using Direct Runner or Flink
>>>> Runner. However when we build an image of that pipeline along with Flink
>>>> and deploy in a cluster we get below exception.
>>>>
>>>> ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler-
>>>>> Unhandled exception.
>>>>> org.apache.flink.client.program.ProgramInvocationException: The
>>>>> program caused an error:
>>>>>
>>>>> Classpath:
>>>>> [file:/opt/flink/flink-web-upload/Booster-bundled-1.0-SNAPSHOT.jar]
>>>>> System.out: (none)
>>>>> System.err: (none)
>>>>> at
>>>>> org.apache.flink.client.program.OptimizerPlanEnvironment.generateException(OptimizerPlanEnvironment.java:149)
>>>>> at
>>>>> org.apache.flink.client.program.OptimizerPlanEnvironment.getPipeline(OptimizerPlanEnvironment.java:89)
>>>>> at
>>>>> org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:101)
>>>>> at
>>>>> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:56)
>>>>> at
>>>>> org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:128)
>>>>> at
>>>>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:138)
>>>>> at
>>>>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
>>>>> at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>> at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: java.lang.NoSuchFieldError: TRUNCATE_SIZED_RESTRICTION
>>>>> at
>>>>> org.apache.beam.runners.core.construction.PTransformTranslation.(PTransformTranslation.java:199)
>>>>> at
>>>>> org.apache.beam.runners.core.construction.PTransformMatchers$6.matches(PTransformMatchers.java:264)
>>>>> at
>>>>> org.apache.beam.sdk.Pipeline$2.enterCompositeTransform(Pipeline.java:272)
>>>>> at
>>>>> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:653)
>>>>> at
>>>>> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
>>>>> at
>>>>> org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
>>>>> at
>>>>> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
>>>>> at
>>>>> org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:463)
>>>>> at org.apache.beam.sdk.Pipeli