Re: Looping in Dataflow(Creating multiple jobs for a while loop)

2019-09-16 Thread Juan Carlos Garcia
If you want a single pipeline, as Robert mentioned you need a streaming
pipeline, which requires an unbounded source (like Kafka or GCP Pub/Sub).
In your example you are creating your source from a fixed list, which by
definition is a bounded source.
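
For illustration, a minimal sketch of that shape with the Java SDK (the question's code is Python, but the idea is the same): a single streaming job whose source is unbounded, so one run() keeps processing indefinitely. The subscription name and the two MapElements steps are placeholders standing in for the question's get_api_data / send_to_api calls.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class SingleStreamingJob {
  public static void main(String[] args) {
    StreamingOptions options =
        PipelineOptionsFactory.fromArgs(args).as(StreamingOptions.class);
    options.setStreaming(true);

    Pipeline p = Pipeline.create(options);
    p.apply("ReadIds", PubsubIO.readStrings()
            .fromSubscription("projects/my-project/subscriptions/ids")) // placeholder
        .apply("GetData", MapElements.into(TypeDescriptors.strings())
            .via((String id) -> id))        // placeholder for the get_api_data call
        .apply("SendToOutput", MapElements.into(TypeDescriptors.strings())
            .via((String data) -> data));   // placeholder for the send_to_api call

    // A single run(); the unbounded source keeps this one job alive.
    p.run();
  }
}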

JC

Robert Bradshaw  schrieb am Mo., 16. Sep. 2019, 21:38:

> An external scheduler would also create a new job every time. The only
> way I see to continuously process results in a single job is to have a
> streaming job.
>
> On Mon, Sep 16, 2019 at 12:22 PM Anjana Pydi
>  wrote:
> >
> > Hi Juan,
> >
> > Thanks for the reply! I want to know if there is any way in Dataflow to
> achieve this before trying an external scheduler.
> >
> > Regards,
> > Anjana
> > 
> > From: Juan Carlos Garcia [jcgarc...@gmail.com]
> > Sent: Monday, September 16, 2019 11:23 AM
> > To: user@beam.apache.org
> > Cc: Richard Amrith Lourdu
> > Subject: Re: Looping in Dataflow(Creating multiple jobs for a while loop)
> >
> > Hi Anjana,
> >
> > You need to separate your line of thought between the pipeline
> definition and what happens when you call *run* on the pipeline; given that,
> you need to externalize the scheduling using something like a crontab,
> Jenkins, or another mechanism.
> >
> > Best regards,
> > JC
> >
> > On Mon, Sep 16, 2019 at 7:57 PM Anjana Pydi 
> wrote:
> >>
> >> Hi,
> >>
> >> I am trying to run a task in an infinite while loop with changing
> input parameters as below, but it creates a new job every time. (I
> expect it to reuse the pipeline that is created before the while loop and
> run as a single job.)
> >>
> >> p = beam.Pipeline(options=PipelineOptions())
> >>
> >> while True:
> >> to_date = time.time()
> >>
> >> (p
> >> | 'create surveys' >> beam.Create(id_list)
> >>| 'get data' >> beam.FlatMap(lambda id: get_api_data(id,
> from_date, to_date))
> >>| 'send to output' >> beam.FlatMap(lambda input:
> (send_to_api(input)))
> >>)
> >> from_date = to_date + timedelta(microseconds=1)
> >> time.sleep(30)
> >>
> >> p.run().wait_until_finish()
> >>
> >> It works properly(in only one job) when there is no while loop as below:
> >>
> >> p = beam.Pipeline(options=PipelineOptions())
> >>
> >> (p
> >> | 'create surveys' >> beam.Create(id_list)
> >>| 'get data' >> beam.FlatMap(lambda id: get_api_data(id,
> from_date, to_date))
> >>| 'send to output' >> beam.FlatMap(lambda input:
> (send_to_api(input)))
> >>)
> >>
> >> p.run().wait_until_finish()
> >>
> >> Could someone please suggest how to make the task run in the same job
> instead of creating multiple jobs.
> >>
> >> Please let me know if any additional information is needed.
> >>
> >> Thanks,
> >> Anjana
> >>
> >>
> >>
> >
> >
> >
> > --
> >
> > JC
> >
> >
>


Re: Design question regarding streaming and sorting

2019-08-24 Thread Juan Carlos Garcia
Sorry, I hit send too soon. After updating the priority I would apply a
group-by-key and then use an external sort (by this priority key) to
guarantee that the next DoFn receives a sorted Iterable.
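
A minimal Java sketch of that group-then-sort step, assuming the rows are keyed by some grouping key and carry an integer priority as the secondary key (the names here are illustrative, not code from this thread):

import org.apache.beam.sdk.extensions.sorter.BufferedExternalSorter;
import org.apache.beam.sdk.extensions.sorter.SortValues;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

class GroupAndSortByPriority {
  /** Groups rows by a primary key and sorts each group's values by priority. */
  static PCollection<KV<String, Iterable<KV<Integer, String>>>> groupAndSort(
      PCollection<KV<String, KV<Integer, String>>> keyedRows) {
    // Note: SortValues orders by the encoded bytes of the secondary key, so for
    // numeric keys set a big-endian coder (e.g. BigEndianIntegerCoder) on the
    // input so that the byte order matches the numeric order.
    return keyedRows
        .apply("GroupByRowKey", GroupByKey.<String, KV<Integer, String>>create())
        .apply(
            "SortByPriority",
            SortValues.<String, Integer, String>create(BufferedExternalSorter.options()));
  }
}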


JC

Juan Carlos Garcia  schrieb am Sa., 24. Aug. 2019,
11:08:

> Hi,
>
> The main puzzle here is how to deliver the priority change of a row during
> a given window. My best shot would be to have a side input
> (PCollectionView) containing the change of priority, then in the slow
> worker Beam transform extract this side input and update the corresponding
> row with the new priority.
>
> Again, it's just an idea.
>
> JC
>
> Chad Dombrova  schrieb am Sa., 24. Aug. 2019, 03:32:
>
>> Hi all,
>> Our team is brainstorming how to solve a particular type of problem with
>> Beam, and it's a bit beyond our experience level, so I thought I'd turn to
>> the experts for some advice.
>>
>> Here are the pieces of our puzzle:
>>
>>- a data source with the following properties:
>>   - rows represent work to do
>>   - each row has an integer priority
>>   - rows can be added or deleted
>>   - priorities of a row can be changed
>>   - <10k rows
>>- a slow worker Beam transform (Map/ParDo) that consumes a row at a
>>time
>>
>>
>> We want a streaming pipeline that delivers rows from our data store to
>> the worker transform,  resorting the source based on priority each time a
>> new row is delivered.  The goal is that last second changes in priority can
>> affect the order of the slowly yielding read.  Throughput is not a major
>> concern since the worker is the bottleneck.
>>
>> I have a few questions:
>>
>>- is the sort of problem that BeamSQL can solve? I'm not sure how
>>sorting and resorting are handled there in a streaming context...
>>- I'm unclear on how back-pressure in Flink affects streaming reads.
>>It's my hope that data/messages are left in the data source until
>>back-pressure subsides, rather than read eagerly into memory.  Can someone
>>clarify this for me?
>>- is there a combination of windowing and triggering that can solve
>>this continual resorting plus slow yielding problem?  It's not 
>> unreasonable
>>to keep all of our rows in memory on Flink, as long as we're snapshotting
>>state.
>>
>>
>> Any advice on how an expert Beamer would solve this is greatly
>> appreciated!
>>
>> Thanks,
>> chad
>>
>>


Re: Design question regarding streaming and sorting

2019-08-24 Thread Juan Carlos Garcia
Hi,

The main puzzle here is how to deliver the priority change of a row during
a given window. My best shot would be to have a side input
(PCollectionView) containing the change of priority, then in the slow
worker Beam transform extract this side input and update the corresponding
row with the new priority.

Again, it's just an idea.
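
To make the idea concrete, a minimal Java sketch (keying rows by an id string and using integer priorities are illustrative assumptions, not code from this thread):

import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

class PriorityOverrides {
  static PCollection<KV<String, Integer>> applyOverrides(
      PCollection<KV<String, Integer>> rows,
      PCollection<KV<String, Integer>> priorityUpdates) {
    // Latest known priority per row id, exposed to the DoFn as a map-valued side input.
    final PCollectionView<Map<String, Integer>> priorityView =
        priorityUpdates.apply("PrioritiesAsMap", View.<String, Integer>asMap());

    return rows.apply(
        "ApplyPriorityOverrides",
        ParDo.of(
                new DoFn<KV<String, Integer>, KV<String, Integer>>() {
                  @ProcessElement
                  public void process(ProcessContext c) {
                    // Use the overridden priority if one exists, else keep the row's own.
                    Map<String, Integer> overrides = c.sideInput(priorityView);
                    KV<String, Integer> row = c.element();
                    Integer priority = overrides.getOrDefault(row.getKey(), row.getValue());
                    c.output(KV.of(row.getKey(), priority));
                  }
                })
            .withSideInputs(priorityView));
  }
}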

JC

Chad Dombrova  schrieb am Sa., 24. Aug. 2019, 03:32:

> Hi all,
> Our team is brainstorming how to solve a particular type of problem with
> Beam, and it's a bit beyond our experience level, so I thought I'd turn to
> the experts for some advice.
>
> Here are the pieces of our puzzle:
>
>- a data source with the following properties:
>   - rows represent work to do
>   - each row has an integer priority
>   - rows can be added or deleted
>   - priorities of a row can be changed
>   - <10k rows
>- a slow worker Beam transform (Map/ParDo) that consumes a row at a
>time
>
>
> We want a streaming pipeline that delivers rows from our data store to the
> worker transform,  resorting the source based on priority each time a new
> row is delivered.  The goal is that last second changes in priority can
> affect the order of the slowly yielding read.  Throughput is not a major
> concern since the worker is the bottleneck.
>
> I have a few questions:
>
>- is the sort of problem that BeamSQL can solve? I'm not sure how
>sorting and resorting are handled there in a streaming context...
>- I'm unclear on how back-pressure in Flink affects streaming reads.
>It's my hope that data/messages are left in the data source until
>back-pressure subsides, rather than read eagerly into memory.  Can someone
>clarify this for me?
>- is there a combination of windowing and triggering that can solve
>this continual resorting plus slow yielding problem?  It's not unreasonable
>to keep all of our rows in memory on Flink, as long as we're snapshotting
>state.
>
>
> Any advice on how an expert Beamer would solve this is greatly appreciated!
>
> Thanks,
> chad
>
>


Re: Why is my RabbitMq message never acknowledged ?

2019-06-14 Thread Juan Carlos Garcia
In my opinion, for such crucial behavior I would expect the pipeline to
fail with a clear message stating the reason, the same way it does when you
implement a new Coder and forget to override the verifyDeterministic method.

Just my 2 cents.
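
For reference, on the Flink runner a checkpoint is only ever completed (and finalizeCheckpoint called, which is where RabbitMqIO acks) when a checkpointing interval is configured. A minimal sketch, with the RabbitMQ read itself left out:

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class FlinkCheckpointingEnabled {
  public static void main(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    // Without an interval Flink never completes a checkpoint, so
    // CheckpointMark.finalizeCheckpoint() (where RabbitMqIO acks) never runs.
    options.setCheckpointingInterval(60_000L); // checkpoint every 60 seconds

    Pipeline pipeline = Pipeline.create(options);
    // ... apply RabbitMqIO.read() / write() and the rest of the pipeline here ...
    pipeline.run();
  }
}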

Maximilian Michels  schrieb am Fr., 14. Juni 2019, 16:48:

> This has come up before: https://issues.apache.org/jira/browse/BEAM-4520
>
> The issue is that checkpoints won't be acknowledged if checkpointing is
> disabled in Flink. We throw a WARN when unbounded sources are used without
> checkpointing. Not all unbounded sources actually need to finalize
> checkpoint marks.
>
> Seeing that this is still an issue, we might want to at least periodically
> acknowledge checkpoint marks when checkpointing is disabled. The
> alternative would be to throw an exception, perhaps with the option to
> override this in case the user knows what he/she does.
>
> Thanks,
> Max
>
> On 14.06.19 10:52, Ismaël Mejía wrote:
> > Is there a JIRA for this ? if this solves an issue to multiple users
> > maybe is worth of integrating the patch.
> > Would you be up to do this Augustin?
> >
> > On Fri, Jun 14, 2019 at 10:35 AM Augustin Lafanechere
> >  wrote:
> > >
> > > Hello Nicolas,
> > > I also encountered the same problem.
> > > RabbitMqIO indeed acknowledges messages on finalizeCheckpoint calls,
> but it was not clear to me when this method is called, because no
> messages were acked at pipeline runtime.
> > > I finally decided to implement a patch of RabbitMqIO to auto-ack
> received messages; this is fine for my current use case but is not
> the safest way of consuming messages.
> > >
> > > If someone has a cleaner solution I’ll be happy to hear it.
> > >
> > > Augustin
> > >
> > >
> > >
> > >
> > >> Le 13 juin 2019 à 15:47, Nicolas Delsaux  a
> écrit :
> > >>
> > >> I'm having big troubles reading data from RabbitMQ.
> > >>
> > >> To understand my troubles, i've simplified my previous code to the
> extreme :
> > >>
> > >>
> > >> Pipeline pipeline = Pipeline.create(options);
> > >>
> > >> PCollection<RabbitMqMessage> wat = (PCollection<RabbitMqMessage>)
> pipeline.apply("read_from_rabbit",
> > >> RabbitMqIO.read()
> > >> .withUri(options.getRabbitMQUri())
> > >> .withQueue(options.getRabbitMQQueue())
> > >> )
> > >> .apply("why not", RabbitMqIO.write()
> > >> .withQueue("written_in_rabbit")
> > >> .withQueueDeclare(true)
> > >> .withUri(options.getRabbitMQUri())
> > >> )
> > >>
> > >>
> > >> So if I put a simple message in my input queue, it should be "moved"
> (quotes are here since new message is not the original one, but has same
> content) into my "written_in_rabbit" message.
> > >>
> > >> Unfortunately, for reasons I don't understand, the original message
> stays in input queue.
> > >>
> > >> It seems to be due to the fact that
> RabbitMQCheckpointMark#finalizeCheckpoint() method is never called. So
> where is the finalizeCheckpoint method called ?
> > >>
> > >> And how can I understand why this method is never called in my case ?
> > >>
> > >> Thanks
> > >>
> > >>
> > >
>
>


Re: Windows were processed out of order

2019-06-03 Thread Juan Carlos Garcia
Hi Robert,

*The elements of a PCollection are unordered.* >> Yes, this is something
known and understood given the nature of a PCollection.

So that means that when we are replaying past data (we rewind our
Kafka consumer groups), within 1h of processing time there might be multiple
1h windows for a given GBK, hence these windows are fired in an arbitrary
order?

Thanks for the insight!
JC

On Mon, Jun 3, 2019 at 1:50 PM Robert Bradshaw  wrote:

> The elements of a PCollection are unordered. This includes the results
> of a GBK--there is no promise that the output be processed in any (in
> particular, windows ordered by timestamp) order. DoFns, especially one
> with side effects, should be written with this in mind.
>
> (There is actually ongoing discussion on the dev list about how to
> make it easier to write order-sensitive operations.)
>
> On Mon, Jun 3, 2019 at 1:17 PM Juan Carlos Garcia 
> wrote:
> >
> > Hi Folks,
> >
> > My team and I have a situation that we cannot explain and
> > would like to hear your thoughts. We have a pipeline which
> > enriches the incoming messages and writes them to BigQuery; the pipeline
> looks like this:
> >
> > Apache Beam 2.12.0 / GCP Dataflow
> >
> > -
> > - ReadFromKafka (with withCreateTime and 10min MaxDelay)
> > - ApplySomeInitialEnrichment (just add some stuff to the incoming json
> messages)
> > - Apply a Fixed 1 hour window (with default triggering)
> > - Apply Group By Key (userId)
> > - Apply External Sorter
> (SortValues.create(BufferedExternalSorter.options(
> > - Apply ComplexEnrichmentDoFn (to the sorted Iterable<>)
> >   - Read initial state from Hbase (BigTable)
> >   - loop thru all messages, enriching them with the previous state
> (incremental enrichment) and session calculation
> >   - write the final state to Hbase (BigTable)
> >   - output each of the enriched element to the next DoFn
> > - Apply a Transformation to prepare the data to BigQuery
> > - Apply BigQueryIO
> > --
> >
> >
> > Just to give some more context we have a meta_info column in our
> BigQuery table which values are set at the very beginning of the
> ComplexEnrichmentDoFn, meaning all the records within the same Iterable<>
> will hold the same information. The meta_info column contains the
> serialized PaneInfo, WindowInfo and our SystemTimestamp =
> currentTimeMilliseconds.
> >
> > We have 3 windows:
> >   A-windowInfo":"[2019-05-20T01:00:00.000Z..2019-05-20T02:00:00.000Z),
> systemTimestamp: 1559396670577
> >   B-windowInfo":"[2019-05-20T02:00:00.000Z..2019-05-20T03:00:00.000Z),
> systemTimestamp: 1559396670670
> >   C-windowInfo":"[2019-05-20T03:00:00.000Z..2019-05-20T04:00:00.000Z),
> systemTimestamp: 1559396670533
> >
> >
> > window A contains: 18 records
> > window B contains: 46 records
> > window C contains: 3  records
> >
> > If you pay attention to the A, B, C windowInfo from above, the
> `systemTimestamp` field reflect an incorrect order of processing, and the
> enrichment was executed as C -> A ->  B, corrupting all the messages for
> this given user.
> >
> > For all 3 windows the serialized PaneInfo was set by the runner to
> ON_TIME:
> > A=B=C= "PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0,
> onTimeIndex=0}"
> >
> > Any idea why would the windows be triggered out of order?
> >
> > --
> >
> > JC
> >
>


-- 

JC


Windows were processed out of order

2019-06-03 Thread Juan Carlos Garcia
Hi Folks,

My team and I have a situation that we cannot explain and
would like to hear your thoughts. We have a pipeline which
enriches the incoming messages and writes them to BigQuery; the pipeline looks
like this:

Apache Beam 2.12.0 / GCP Dataflow

-
- ReadFromKafka (with withCreateTime and 10min MaxDelay)
- ApplySomeInitialEnrichment (just add some stuff to the incoming json
messages)
- Apply a Fixed 1 hour window (with default triggering)
- Apply Group By Key (userId)
- Apply External Sorter
(SortValues.create(BufferedExternalSorter.options(
- Apply ComplexEnrichmentDoFn (to the sorted Iterable<>)
  - Read initial state from Hbase (BigTable)
  - loop thru all messages, enriching them with the previous state
(incremental enrichment) and session calculation
  - write the final state to Hbase (BigTable)
  - output each of the enriched element to the next DoFn
- Apply a Transformation to prepare the data to BigQuery
- Apply BigQueryIO
--


Just to give some more context we have a meta_info column in our BigQuery
table which values are set at the very beginning of the
ComplexEnrichmentDoFn, meaning all the records within the same Iterable<>
will hold the same information. The meta_info column contains the
serialized PaneInfo, WindowInfo and our SystemTimestamp =
currentTimeMilliseconds.
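
For context, the window and pane metadata that ends up in meta_info can be read inside a DoFn like this (a minimal sketch; the element type and the actual ComplexEnrichmentDoFn logic are placeholders, not the code from this pipeline):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.KV;

class CaptureMetaInfoFn extends DoFn<KV<String, Iterable<KV<Long, String>>>, String> {
  @ProcessElement
  public void process(ProcessContext c, BoundedWindow window) {
    // The runner hands us the window and pane this grouped element belongs to.
    PaneInfo pane = c.pane();
    long systemTimestamp = System.currentTimeMillis();
    String metaInfo =
        String.format("window=%s pane=%s systemTimestamp=%d", window, pane, systemTimestamp);
    // ... enrich the sorted Iterable in c.element() and emit it together with metaInfo ...
    c.output(metaInfo);
  }
}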

We have 3 windows:
  A-windowInfo":"[2019-05-20T01:00:00.000Z..2019-05-20T02:00:00.000Z),
systemTimestamp: *1559396670577*
  B-windowInfo":"[2019-05-20T02:00:00.000Z..2019-05-20T03:00:00.000Z),
systemTimestamp: *1559396670670*
  C-windowInfo":"[2019-05-20T03:00:00.000Z..2019-05-20T04:00:00.000Z),
systemTimestamp: *1559396670533*


window A contains: 18 records
window B contains: 46 records
window C contains: 3  records

If you pay attention to the A, B, C windowInfo above, the
`*systemTimestamp*` field reflects an incorrect order of processing: the
enrichment was executed as C -> A -> B, corrupting all the messages for
this given user.

For all 3 windows the serialized PaneInfo was set by the runner to ON_TIME:
A=B=C= "PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0,
onTimeIndex=0}"

Any idea why would the windows be triggered out of order?

-- 

JC


Re: Will Beam add any overhead or lack certain API/functions available in Spark/Flink?

2019-05-06 Thread Juan Carlos Garcia
Hi,

I don't want to hijack the thread regarding the why, but to keep it short:
we experienced a lot of problems with Spark (streaming pipeline) +
checkpoints, to the point that it was a gamble to restart a pipeline
without Spark going nuts while restoring from a checkpoint, resulting in
data loss (Spark bugs). There were pipelines under heavy development
which required redeploying them multiple times a day.

We found in Flink the stability / features we needed while we planned a
migration to a managed environment (luckily Dataflow, which at that time
was not yet approved), and in our case, as you mentioned, we were lucky to be
able to switch across runners without major problems.

Thanks

kant kodali  schrieb am Mo., 6. Mai 2019, 21:34:

> 1) It would be good to know the reasons why you guys moved from one
> execution to another?
> 2) You are lucky to have your problem fit into all three execution engines
> and supported by the Beam at the same time. This is certainly not the case
> for me since some runners that Beam supports are still a Work in progress
> while the execution engine had the support since 2 years at very least.
>
>
>
> On Mon, May 6, 2019 at 12:24 PM kant kodali  wrote:
>
>>
>>
>> On Mon, May 6, 2019 at 12:09 PM Juan Carlos Garcia 
>> wrote:
>>
>>> As everyone has pointed out there will be a small overhead added by the
>>> abstraction but in my own experience its totally worth it.
>>>
>>> Almost two years ago we decided to jump into the beam wagon, by first
>>> deploying into an on-premises hadoop cluster with the Spark engine (just
>>> because spark was already available and we didn't want to introduce a new
>>> stack in our hadoop cluster), then we moved to a Flink cluster (due to
>>> others reason) and few months later we moved 90% of our streaming
>>> processing to Dataflow (in order to migrate the on-premises cluster to the
>>> cloud), all that wouldn't have been possible without the beam abstraction.
>>>
>>> In conclusion beam abstraction rocks, it's not perfect, but it's really
>>> good.
>>>
>>> Just my 2 cents.
>>>
>>> Matt Casters  schrieb am Mo., 6. Mai 2019, 15:33:
>>>
>>>> I've dealt with responses like this for a number of decades.  With
>>>> Kettle Beam I could say: "here, in 20 minutes of visual programming you
>>>> have your pipeline up and running".  It's easy to set up, maintain, debug,
>>>> unit test, version control... the whole thing. And then someone would say:
>>>> Naaah, if I don't code it myself I don't trust it.  Usually it's worded
>>>> differently but that's what it comes down to.
>>>> Some people think in terms of impossibilities instead of possibilities
>>>> and will always find some reason why they fall in that 0.1% of the cases.
>>>>
>>>> > Lets say Beam came up with the abstractions long before other runners
>>>> but to map things to runners it is going to take time (that's where things
>>>> are today). so its always a moving target.
>>>>
>>>> Any scaleable data processing problem you might have that can't be
>>>> solved by Spark, Flink or DataFlow is pretty obscure don't you think?
>>>>
>>>> Great discussion :-)
>>>>
>>>> Cheers,
>>>> Matt
>>>> ---
>>>> Matt Casters attcast...@gmail.com>
>>>> Senior Solution Architect, Kettle Project Founder
>>>>
>>>>
>>>>
>>>> Op zo 5 mei 2019 om 00:18 schreef kant kodali :
>>>>
>>>>> I believe this comes down to more of abstractions vs execution engines
>>>>> and I am sure people can take on both sides. I think both are important
>>>>> however It is worth noting that the execution framework themselves have a
>>>>> lot of abstractions but sure more generic ones can be built on top. Are
>>>>> abstractions always good?! I will just point to this book
>>>>> <https://www.amazon.com/Philosophy-Software-Design-John-Ousterhout/dp/1732102201/ref=sr_1_1?keywords=john+ousterhout+book=1557008185=gateway=8-1>
>>>>>
>>>>> I tend to lean more on the execution engines side because I can build
>>>>> something on top. I am also not sure if Beam is the first one to come up
>>>>> with these ideas since Frameworks like Cascading had existed long before.
>>>>>
>>>>> Lets say Beam came up with the abstractions long before other runners
>>>>> but to map things to runne

Re: kafka client interoperability

2019-05-02 Thread Juan Carlos Garcia
Downgrade only the KafkaIO module to the version that works for you (also
excluding any transitive dependencies it pulls in); that works for us.

JC.

Lukasz Cwik  schrieb am Do., 2. Mai 2019, 20:05:

> +dev 
>
> On Thu, May 2, 2019 at 10:34 AM Moorhead,Richard <
> richard.moorhe...@cerner.com> wrote:
>
>> In Beam 2.9.0, this check was made:
>>
>>
>> https://github.com/apache/beam/blob/2ba00576e3a708bb961a3c64a2241d9ab32ab5b3/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java#L132
>>
>> However this logic was removed in 2.10+ in the newer ProducerRecordCoder
>> class:
>>
>>
>> https://github.com/apache/beam/blob/release-2.10.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ProducerRecordCoder.java#L137
>>
>>
>> We are attempting to use Beam 2.10 with kafka 0.10.2.1; this is
>> advertised as supported here:
>>
>> https://beam.apache.org/releases/javadoc/2.10.0/org/apache/beam/sdk/io/kafka/KafkaIO.html
>>
>> However we are experiencing issues with the `headers` method call
>> mentioned above. Is there a way around this?
>>
>>
>>
>>
>>
>


Re: Transform a PCollection> into PCollection (Java)

2019-05-01 Thread Juan Carlos Garcia
It looks correct to me. You can try adding a log4j.properties to your test
resources in order to have some output in your console while running it from
within the IDE.

Are you sure you can access Kafka from your machine?

JC

On Wed, May 1, 2019 at 5:27 PM Andres Angel 
wrote:

> Let me update my code guys, here is the new version:
> https://pastebin.com/UdT4D7VW
>
>
>
> On Wed, May 1, 2019 at 11:10 AM Juan Carlos Garcia 
> wrote:
>
>> Hi Andres,
>>
>> You are missing the call to pipeline method *run()*
>>
>> JC
>>
>> On Wed, May 1, 2019 at 4:35 PM Andres Angel <
>> ingenieroandresan...@gmail.com> wrote:
>>
>>> Hello everyone,
>>>
>>> Guys I'm trying to consume a kafka topic within my job pipeline, the
> main idea is to first read the payload from the kafka topic using KafkaIO; this
>>> read will return a PCollection> then I want to
>>> turn this into a PCollection> where later I might be able
>>> to read the V and turn it into a PCollection.
>>>
>>> I have designed a first sample code : https://pastebin.com/UdT4D7VW and
> >>> at line 18 I added a print hoping to visualize the value payload in order
> >>> to create my final PCollection later; however, when I run the
> >>> pipeline it doesn't print anything, so I don't know if it is really even
> >>> consuming my topic.
>>>
>>> Question:
>>> * How should I debug this to know where I'm? I'm using IntelliJ , Java 8
>>> and maven 3
>>> * Am I right on my assumptions to read
>>> the PCollection> into PCollection ?
>>> * There is any other better way?
>>>
>>> thanks so much
>>> AU
>>>
>>
>>
>> --
>>
>> JC
>>
>>

-- 

JC


Re: Transform a PCollection> into PCollection (Java)

2019-05-01 Thread Juan Carlos Garcia
Hi Andres,

You are missing the call to pipeline method *run()*
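
For reference, a minimal Java sketch of the overall shape being discussed (broker, topic, and deserializers are placeholders; the pastebin code itself is not reproduced here):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaToValues {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply(
            "ReadFromKafka",
            KafkaIO.<Long, String>read()
                .withBootstrapServers("localhost:9092") // placeholder broker
                .withTopic("my-topic")                  // placeholder topic
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata()) // KafkaRecord<K, V> -> KV<K, V>
        .apply(
            "ValuesOnly",
            MapElements.into(TypeDescriptors.strings())
                .via((KV<Long, String> kv) -> kv.getValue()));

    // Without this line the pipeline is only defined, never executed.
    p.run().waitUntilFinish();
  }
}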

JC

On Wed, May 1, 2019 at 4:35 PM Andres Angel 
wrote:

> Hello everyone,
>
> Guys I'm trying to consume a kafka topic within my job pipeline, the main
> idea is to first read the payload from the kafka topic using KafkaIO; this read
> will return a PCollection> then I want to turn
> this into a PCollection> where later I might be able to
> read the V and turn it into a PCollection.
>
> I have designed a first sample code : https://pastebin.com/UdT4D7VW and
> at line 18 I added a print hoping to visualize the value payload in order
> to create my final PCollection later; however, when I run the
> pipeline it doesn't print anything, so I don't know if it is really even
> consuming my topic.
>
> Question:
> * How should I debug this to know where I'm? I'm using IntelliJ , Java 8
> and maven 3
> * Am I right on my assumptions to read
> the PCollection> into PCollection ?
> * There is any other better way?
>
> thanks so much
> AU
>


-- 

JC


Re: GroupByKey and SortValues Transformation

2019-04-21 Thread Juan Carlos Garcia
Hi Kenn, thanks for replying, it really clears things up.

I checked my code and indeed I found some issues.

JC

On Fri, Apr 19, 2019 at 8:54 PM Kenneth Knowles  wrote:

> One important clarification: After GroupByKey, the type of the elements
> flowing through SortValues and ParDo(BusinessEnrichment) will be KV<K, Iterable<KV<SecondaryKey, Value>>>. So these are not bundles, but single elements, and an
> element is atomic. The SortValues transform will operate on each element
> and make sure the Iterable is sorted. An Iterable is fundamentally
> sequential and to Beam it is indivisible.
>
> If you have default triggering, you will see one output per key & window.
> There are no ordering restrictions between these, as event time is
> independent from processing time.
>
> If you have configured triggers for your GroupByKey then you will see
> multiple output elements per key & window. For a given key & window the
> outputs contain a "paneIndex" in their PaneInfo metadata that tells you the
> order of they were output. Again, for different key & window there are no
> ordering restrictions.
>
> Kenn
>
>
> On Thu, Apr 18, 2019 at 11:05 PM Juan Carlos Garcia 
> wrote:
>
>> Hi Folks,
>>
>> I have question regarding *GroupBy* and *SortValue* (via SecondaryKey),
>>
>> The pipeline looks like:
>>
>> ...source+initialTransformations
> >> .apply(Window.into(FixedWindows.of(Duration.standardMinutes(10))))
> >> .apply("GroupByKey", GroupByKey.create())  *// using my primaryKey
> >> (userId)*
> >> .apply("SortValues", SortValues.create(BufferedExternalSorter.options())) *//
> >> My Secondary Key is the timestamp of the incoming event*
> >> .apply("Enrichment", ParDo.of(new *BusinessEnrichment*())) *// it
> >> receives a KV<K, Iterable<KV<SecondaryKey, Value>>>*
>> ..SinkToBigQuery:
>>
>> 1. Is it guarantee that my *BusinessEnrichment *will hold all the data
>> grouped, sorted in on single *machine* and that bundles of the same keys
>> will not be parallelize during auto scaling (i am running on top of
>> Dataflow) ?
>>
>> I expect a parallel computation on different keys (users), but the
>> bundles within the grouped key (userId) to be treated sequentially inside
>> *BusinessEnrichment*, is that correct?
>>
>> I am asking due to an observation of mixed results which could be due to
>> a bug on my code or parallel bundles computations within the same key, my
>> expectation is to have a sequential processing after *grouping + sorting*,
>> for example having a userId with 1000 sorted events i expect the processing
>> as the following sequence:
>>
>>  userId(1) with bundleA (0..250) -> userId(1) with bundleB (251..500)
>> -> userId(1) with bundleC (501..750) -> userId(1) with bundleD (751..1000)
>>
>> Please advise if my assumption is wrong.
>>
>> Thanks and regards
>>
>> --
>>
>> JC
>>
>>

-- 

JC


GroupByKey and SortValues Transformation

2019-04-19 Thread Juan Carlos Garcia
Hi Folks,

I have question regarding *GroupBy* and *SortValue* (via SecondaryKey),

The pipeline looks like:

...source+initialTransformations
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(10))))
.apply("GroupByKey", GroupByKey.create())  *// using my primaryKey (userId)*
.apply("SortValues", SortValues.create(BufferedExternalSorter.options())) *//
My Secondary Key is the timestamp of the incoming event*
.apply("Enrichment", ParDo.of(new *BusinessEnrichment*())) *// it receives a
KV<K, Iterable<KV<SecondaryKey, Value>>>*
..SinkToBigQuery:

1. Is it guaranteed that my *BusinessEnrichment* will receive all the data
grouped and sorted on a single *machine*, and that bundles of the same key
will not be parallelized during auto scaling (I am running on top of
Dataflow)?

I expect parallel computation across different keys (users), but the bundles
within a grouped key (userId) to be processed sequentially inside
*BusinessEnrichment*; is that correct?

I am asking because I observed mixed results, which could be due to a
bug in my code or to parallel bundle computation within the same key. My
expectation is to have sequential processing after *grouping + sorting*;
for example, for a userId with 1000 sorted events I expect the processing
to follow this sequence:

 userId(1) with bundleA (0..250) -> userId(1) with bundleB (251..500)
-> userId(1) with bundleC (501..750) -> userId(1) with bundleD (751..1000)

Please advise if my assumption is wrong.

Thanks and regards

-- 

JC


Re: Scaling Beam pipeline on Data Flow - Join bounded and non-bounded source

2019-03-20 Thread Juan Carlos Garcia
I would recommend going to the Compute Engine service and checking the VMs
where the pipeline is running; from there you might get more insight into
whether you have a bottleneck in your pipeline (CPU, IO, network) that is
preventing it from processing faster.



Maulik Gandhi  schrieb am Mi., 20. März 2019, 20:15:

> How big is your bounded-source
> - 16.01 GiB total data from AVRO files.  But it can be b/w 10-100s of GBs
>
> How much pressure (messages per seconds) your unbounded source is
> receiving?
> -  Initially no pressure, to prime the Beam state, but later there will be
> data flowing through PubSub.
>
> I also add a parameter on mvn command, as below and could get 7 (105/15)
> worker, as per guide:
> https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#autoscaling
>
> mvn compile exec:java -pl 
> -Dexec.mainClass= \
> -Dexec.args="--runner=DataflowRunner --project= \
>  --stagingLocation=gs:// \
>  --maxNumWorkers=105 \
>  --autoscalingAlgorithm=THROUGHPUT_BASED \
>  --templateLocation=gs://"
>
> Even though I got 7 worker nodes, when processing GCS data (bounded
> source) and adding it to Beam state, I think the work was just being
> performed on 1 node, as it took more than 16+ hours.
>
> Can someone point me to documentation, on how to figure out how much data
> is being processed by each worker node (like reading GCS part AVRO files,
> input counts, etc), rather than just high-level count of input and output
> element from ParDo.
>
> Thanks.
> - Maulik
>
> On Wed, Mar 20, 2019 at 3:17 AM Juan Carlos Garcia 
> wrote:
>
>> Your autoscaling algorithm is THROUGHPUT_BASED; it kicks in only
>> when it feels the pipeline is not able to keep up with the incoming
>> source. How big is your bounded source and how much pressure (messages per
>> second) is your unbounded source receiving?
>>
>> Maulik Gandhi  schrieb am Di., 19. März 2019, 21:06:
>>
>>> Hi Juan,
>>>
>>> Thanks for replying.  I believe I am using correct configurations.
>>>
>>> I have posted more details with code snippet and Data Flow job template
>>> configuration on Stack Overflow post:
>>> https://stackoverflow.com/q/55242684/11226631
>>>
>>> Thanks.
>>> - Maulik
>>>
>>> On Tue, Mar 19, 2019 at 2:53 PM Juan Carlos Garcia 
>>> wrote:
>>>
>>>> Hi Maulik,
>>>>
>>>> Have you submitted your job with the correct configuration to enable
>>>> autoscaling?
>>>>
>>>> --autoscalingAlgorithm=
>>>> --maxWorkers=
>>>>
>>>> I am on my phone right now and can't tell if the flags name are 100%
>>>> correct.
>>>>
>>>>
>>>> Maulik Gandhi  schrieb am Di., 19. März 2019, 18:13:
>>>>
>>>>>
>>>>> Maulik Gandhi 
>>>>> 10:19 AM (1 hour ago)
>>>>> to user
>>>>> Hi Beam Community,
>>>>>
>>>>> I am working on Beam processing pipeline, which reads data from the
>>>>> non-bounded and bounded source and want to leverage Beam state management
>>>>> in my pipeline.  For putting data in Beam state, I have to transfer the
>>>>> data in key-value (eg: KV.  As I am reading data from the
>>>>> non-bounded and bounded source, I am forced to perform Window + 
>>>>> Triggering,
>>>>> before grouping data by key.  I have chosen to use GlobalWindows().
>>>>>
>>>>> I am able to kick-off the Data Flow job, which would run my Beam
>>>>> pipeline.  I have noticed Data Flow would use only 1 Worker node to 
>>>>> perform
>>>>> the work, and would not scale the job to use more worker nodes, thus not
>>>>> leveraging the benefit of distributed processing.
>>>>>
>>>>> I have posted the question on Stack Overflow:
>>>>> https://stackoverflow.com/questions/55242684/join-bounded-and-non-bounded-source-data-flow-job-not-scaling
>>>>>  but
>>>>> reaching out on the mailing list, to get some help, or learn what I
>>>>> am missing.
>>>>>
>>>>> Any help would be appreciated.
>>>>>
>>>>> Thanks.
>>>>> - Maulik
>>>>>
>>>>


Re: Scaling Beam pipeline on Data Flow - Join bounded and non-bounded source

2019-03-20 Thread Juan Carlos Garcia
Your autoscaling algorithm is THROUGHPUT_BASED; it kicks in only when
it feels the pipeline is not able to keep up with the incoming source.
How big is your bounded source and how much pressure (messages per second)
is your unbounded source receiving?

Maulik Gandhi  schrieb am Di., 19. März 2019, 21:06:

> Hi Juan,
>
> Thanks for replying.  I believe I am using correct configurations.
>
> I have posted more details with code snippet and Data Flow job template
> configuration on Stack Overflow post:
> https://stackoverflow.com/q/55242684/11226631
>
> Thanks.
> - Maulik
>
> On Tue, Mar 19, 2019 at 2:53 PM Juan Carlos Garcia 
> wrote:
>
>> Hi Maulik,
>>
>> Have you submitted your job with the correct configuration to enable
>> autoscaling?
>>
>> --autoscalingAlgorithm=
>> --maxWorkers=
>>
>> I am on my phone right now and can't tell if the flags name are 100%
>> correct.
>>
>>
>> Maulik Gandhi  schrieb am Di., 19. März 2019, 18:13:
>>
>>>
>>> Maulik Gandhi 
>>> 10:19 AM (1 hour ago)
>>> to user
>>> Hi Beam Community,
>>>
>>> I am working on Beam processing pipeline, which reads data from the
>>> non-bounded and bounded source and want to leverage Beam state management
>>> in my pipeline.  For putting data in Beam state, I have to transfer the
>>> data in key-value (eg: KV.  As I am reading data from the
>>> non-bounded and bounded source, I am forced to perform Window + Triggering,
>>> before grouping data by key.  I have chosen to use GlobalWindows().
>>>
>>> I am able to kick-off the Data Flow job, which would run my Beam
>>> pipeline.  I have noticed Data Flow would use only 1 Worker node to perform
>>> the work, and would not scale the job to use more worker nodes, thus not
>>> leveraging the benefit of distributed processing.
>>>
>>> I have posted the question on Stack Overflow:
>>> https://stackoverflow.com/questions/55242684/join-bounded-and-non-bounded-source-data-flow-job-not-scaling
>>>  but
>>> reaching out on the mailing list, to get some help, or learn what I
>>> am missing.
>>>
>>> Any help would be appreciated.
>>>
>>> Thanks.
>>> - Maulik
>>>
>>


Re: Scaling Beam pipeline on Data Flow - Join bounded and non-bounded source

2019-03-19 Thread Juan Carlos Garcia
Hi Maulik,

Have you submitted your job with the correct configuration to enable
autoscaling?

--autoscalingAlgorithm=
--maxWorkers=

I am on my phone right now and can't tell if the flag names are 100%
correct.
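
For reference, a minimal Java sketch of setting these options programmatically with the Dataflow runner (the values are illustrative; project, staging location, etc. are still expected from the command-line args):

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class AutoscalingOptionsSketch {
  public static void main(String[] args) {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
    options.setRunner(DataflowRunner.class);
    // Equivalent to --autoscalingAlgorithm=THROUGHPUT_BASED --maxNumWorkers=105
    options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED);
    options.setMaxNumWorkers(105);

    Pipeline pipeline = Pipeline.create(options);
    // ... build the pipeline ...
    pipeline.run();
  }
}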


Maulik Gandhi  schrieb am Di., 19. März 2019, 18:13:

>
> Maulik Gandhi 
> 10:19 AM (1 hour ago)
> to user
> Hi Beam Community,
>
> I am working on Beam processing pipeline, which reads data from the
> non-bounded and bounded source and want to leverage Beam state management
> in my pipeline.  For putting data in Beam state, I have to transfer the
> data in key-value (eg: KV.  As I am reading data from the
> non-bounded and bounded source, I am forced to perform Window + Triggering,
> before grouping data by key.  I have chosen to use GlobalWindows().
>
> I am able to kick-off the Data Flow job, which would run my Beam
> pipeline.  I have noticed Data Flow would use only 1 Worker node to perform
> the work, and would not scale the job to use more worker nodes, thus not
> leveraging the benefit of distributed processing.
>
> I have posted the question on Stack Overflow:
> https://stackoverflow.com/questions/55242684/join-bounded-and-non-bounded-source-data-flow-job-not-scaling
>  but
> reaching out on the mailing list, to get some help, or learn what I
> am missing.
>
> Any help would be appreciated.
>
> Thanks.
> - Maulik
>


Re: FileIO with GCP - KMS support

2019-02-20 Thread Juan Carlos Garcia
Hi Udi,

Thanks for that, looking forward to the 2.11 release.

JC

Udi Meiri  schrieb am Mi., 20. Feb. 2019, 22:26:

> Hi Juan!
>
> I've recently replaced GCS copy with rewrite:
> https://github.com/apache/beam/pull/7682
> It should be available in the next Beam release (2.11).
>
> On Wed, Feb 20, 2019 at 7:43 AM Juan Carlos Garcia 
> wrote:
>
>> Sorry, i hit send before verifying the right name of the method:
>>
>> The correct method name is: *enqueueCopy*
>>
>> On Wed, Feb 20, 2019 at 4:39 PM Juan Carlos Garcia 
>> wrote:
>>
>>> For anyone interested on the same while waiting for KMS support, just
>>> place the class on your own project
>>> *org.apache.beam.sdk.util.GcsUtil *
>>>
>>> Look / modify the *enqueCopy *method and replace the 
>>> *storageClient.objects().copy()
>>> *with a *storageClient.objects().rewrite() , add the corresponding
>>> callback *and it should works as expected.
>>>
>>> Cheers!
>>>
>>>
>>> On Wed, Feb 20, 2019 at 11:11 AM Juan Carlos Garcia 
>>> wrote:
>>>
>>>> Hi Folks,
>>>>
>>>> Is there any discussion going on regarding the support to writes to GCP
>>>> bucket protected with KMS ?
>>>>
>>>> Thanks and regards,
>>>> --
>>>>
>>>> JC
>>>>
>>>>
>>>
>>> --
>>>
>>> JC
>>>
>>>
>>
>> --
>>
>> JC
>>
>>


Re: FileIO with GCP - KMS support

2019-02-20 Thread Juan Carlos Garcia
Sorry, I hit send before verifying the right name of the method:

The correct method name is: *enqueueCopy*

On Wed, Feb 20, 2019 at 4:39 PM Juan Carlos Garcia 
wrote:

> For anyone interested on the same while waiting for KMS support, just
> place the class on your own project
> *org.apache.beam.sdk.util.GcsUtil *
>
> Look / modify the *enqueCopy *method and replace the 
> *storageClient.objects().copy()
> *with a *storageClient.objects().rewrite() , add the corresponding
> callback *and it should works as expected.
>
> Cheers!
>
>
> On Wed, Feb 20, 2019 at 11:11 AM Juan Carlos Garcia 
> wrote:
>
>> Hi Folks,
>>
>> Is there any discussion going on regarding the support to writes to GCP
>> bucket protected with KMS ?
>>
>> Thanks and regards,
>> --
>>
>> JC
>>
>>
>
> --
>
> JC
>
>

-- 

JC


Re: FileIO with GCP - KMS support

2019-02-20 Thread Juan Carlos Garcia
For anyone interested in the same while waiting for KMS support, just place
the class on your own project:
*org.apache.beam.sdk.util.GcsUtil*

Look at / modify the *enqueCopy* method and replace the
*storageClient.objects().copy()* with a *storageClient.objects().rewrite()*,
add the corresponding callback, and it should work as expected.

Cheers!


On Wed, Feb 20, 2019 at 11:11 AM Juan Carlos Garcia 
wrote:

> Hi Folks,
>
> Is there any discussion going on regarding the support to writes to GCP
> bucket protected with KMS ?
>
> Thanks and regards,
> --
>
> JC
>
>

-- 

JC


FileIO with GCP - KMS support

2019-02-20 Thread Juan Carlos Garcia
Hi Folks,

Is there any discussion going on regarding the support to writes to GCP
bucket protected with KMS ?

Thanks and regards,
-- 

JC


Re: Dealing with "large" checkpoint state of a Beam pipeline in Flink

2019-02-12 Thread Juan Carlos Garcia
I forgot to mention that we use HDFS as storage for checkpoints /
savepoints.

Juan Carlos Garcia  schrieb am Di., 12. Feb. 2019,
18:03:

> Hi Tobias,
>
> I think this can happen when there is a lot of backpressure on the
> pipeline.
>
> Don't know if it's normal but i have a pipeline reading from KafkaIO and
> pushing to bigquery instreaming mode and i have seen checkpoint of almost
> 1gb and whenever i am doing a savepoint for updating the pipeline it can
> goes up to 8 GB of data on a savepoint.
>
> I am on Flink 1.5.x, on premises also using Rockdb and incremental.
>
> So far my only solutionto avoid errors while checkpointing or savepointing
> is to make sure the checkpoint Timeout is high enough like 20m or 30min.
>
>
> Kaymak, Tobias  schrieb am Di., 12. Feb. 2019,
> 17:33:
>
>> Hi,
>>
>> my Beam 2.10-SNAPSHOT pipeline has a KafkaIO as input and a BigQueryIO
>> configured with FILE_LOADS as output. What bothers me is that even if I
>> configure in my Flink 1.6 configuration
>>
>> state.backend: rocksdb
>> state.backend.incremental: true
>>
>> I see states that are as big as 230 MiB and checkpoint timeouts, or
>> checkpoints that take longer than 10 minutes to complete (I just saw one
>> that took longer than 30 minutes).
>>
>> Am I missing something? Is there some room for improvement? Should I use
>> a different storage backend for the checkpoints? (Currently they are stored
>> on GCS).
>>
>> Best,
>> Tobi
>>
>


Re: Dealing with "large" checkpoint state of a Beam pipeline in Flink

2019-02-12 Thread Juan Carlos Garcia
Hi Tobias,

I think this can happen when there is a lot of backpressure on the
pipeline.

I don't know if it's normal, but I have a pipeline reading from KafkaIO and
pushing to BigQuery in streaming mode, and I have seen checkpoints of almost
1 GB; whenever I take a savepoint to update the pipeline it can
go up to 8 GB of data on a savepoint.

I am on Flink 1.5.x, on premises, also using RocksDB and incremental checkpoints.

So far my only solution to avoid errors while checkpointing or savepointing
is to make sure the checkpoint timeout is high enough, like 20 or 30 minutes.
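
A minimal sketch of how that can be set from the pipeline options on the Flink runner; checkpointingInterval and checkpointTimeoutMillis are the option names as I recall them in FlinkPipelineOptions, so double-check them against your Beam version:

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.joda.time.Duration;

public class FlinkCheckpointTimeoutSketch {
  public static void main(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    // Checkpoint every 5 minutes, but allow up to 30 minutes for a large
    // (RocksDB) checkpoint to complete before Flink aborts it.
    options.setCheckpointingInterval(Duration.standardMinutes(5).getMillis());
    options.setCheckpointTimeoutMillis(Duration.standardMinutes(30).getMillis());
    // ... Pipeline.create(options), build and run ...
  }
}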


Kaymak, Tobias  schrieb am Di., 12. Feb. 2019,
17:33:

> Hi,
>
> my Beam 2.10-SNAPSHOT pipeline has a KafkaIO as input and a BigQueryIO
> configured with FILE_LOADS as output. What bothers me is that even if I
> configure in my Flink 1.6 configuration
>
> state.backend: rocksdb
> state.backend.incremental: true
>
> I see states that are as big as 230 MiB and checkpoint timeouts, or
> checkpoints that take longer than 10 minutes to complete (I just saw one
> that took longer than 30 minutes).
>
> Am I missing something? Is there some room for improvement? Should I use a
> different storage backend for the checkpoints? (Currently they are stored
> on GCS).
>
> Best,
> Tobi
>


Re: Some questions about ensuring correctness with windowing and triggering

2019-02-12 Thread Juan Carlos Garcia
In my experience ordering is not guaranteed; you may need to apply a
transformation that sorts the elements and then emits them in order.

Or uses the Sorter extension for this:

https://github.com/apache/beam/tree/master/sdks/java/extensions/sorter

Steve Niemitz  schrieb am Di., 12. Feb. 2019, 16:31:

> Hi everyone, I have some questions I want to ask about how windowing,
> triggering, and panes work together, and how to ensure correctness
> throughout a pipeline.
>
> Lets assume I have a very simple streaming pipeline that looks like:
> Source -> CombineByKey (Sum) -> Sink
>
> Given fixed windows of 1 hour, early firings every minute, and
> accumulating panes, this is pretty straight forward.  However, this can get
> more complicated if we add steps after the CombineByKey, for instance
> (using the same windowing strategy):
>
> Say I want to buffer the results of the CombineByKey into batches of N
> elements.  I can do this with the built-in GroupIntoBatches [1] transform,
> now my pipeline looks like:
>
> Source -> CombineByKey (Sum) -> GroupIntoBatches -> Sink
>
> *This leads to my main question:*
> Is ordering preserved somehow here?  ie: is it possible that the result
> from early firing F+1 now comes BEFORE the firing F (because it was
> re-ordered in the GroupIntoBatches).  This would mean that the sink then
> gets F+1 before F, which means my resulting store has incorrect data
> (possibly forever if F+1 was the final firing).
>
> If ordering is not preserved, it seems as if I'd need to introduce my own
> ordering back in after GroupIntoBatches.  GIB is an example here, but I
> imagine this could happen with any GBK type operation.
>
> Am I thinking about this the correct way?  Thanks!
>
> [1]
> https://beam.apache.org/releases/javadoc/2.10.0/org/apache/beam/sdk/transforms/GroupIntoBatches.html
>


Re: How to call Oracle stored proc in JdbcIO

2019-02-05 Thread Juan Carlos Garcia
I believe this is not a missing feature, as the question is more about
what you expect from this procedure:

like reading back a ref cursor into a PCollection, or just doing an insert
/ update via the stored procedure.

Going forward, in the JDBC realm you just need to create a prepared
statement with something like this:

{call procedure_name(?, ?, ?)}

But then the question is what you expect from it.

BTW, JdbcIO is just a very simple ParDo, so you can create your own when
dealing with anything special from Oracle.
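
A minimal sketch of such a hand-rolled DoFn; the procedure name, its parameters, and the JDBC URL are hypothetical placeholders:

import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

class CallStoredProcFn extends DoFn<KV<String, String>, Void> {
  private final String jdbcUrl;
  private transient Connection connection;

  CallStoredProcFn(String jdbcUrl) {
    this.jdbcUrl = jdbcUrl;
  }

  @Setup
  public void setup() throws Exception {
    connection = DriverManager.getConnection(jdbcUrl);
  }

  @ProcessElement
  public void process(ProcessContext c) throws Exception {
    // Hypothetical procedure taking two IN parameters; adapt to the real signature.
    try (CallableStatement stmt = connection.prepareCall("{call my_procedure(?, ?)}")) {
      stmt.setString(1, c.element().getKey());
      stmt.setString(2, c.element().getValue());
      stmt.execute();
    }
  }

  @Teardown
  public void teardown() throws Exception {
    if (connection != null) {
      connection.close();
    }
  }
}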

Best regards

JC



Am Di., 5. Feb. 2019, 23:03 hat Rui Wang  geschrieben:

> Assuming this is a missing feature. Created
> https://jira.apache.org/jira/browse/BEAM-6525 to track it.
>
> -Rui
>
> On Fri, Jan 25, 2019 at 10:35 AM Rui Wang  wrote:
>
>> Hi Community,
>>
>> There is a stackoverflow question [1] asking how to call Oracle stored
>> proc in Beam via JdbcIO. I know very less on JdbcIO and Oracle, so just
>> help ask here to say if anyone know: does JdbcIO support call stored proc?
>>
>> If there is no such support, I will create a JIRA for it and reply to the
>> question.
>>
>>  -Rui
>>
>> [1]:
>> https://stackoverflow.com/questions/54364783/how-to-call-an-oracle-stored-proc-in-apache-beam
>>
>


Re: Spark: No TransformEvaluator registered

2019-01-29 Thread Juan Carlos Garcia
Hi Matt,

Are the META-INF/services files merged correctly into the fat jar?



On Tue, Jan 29, 2019 at 2:33 PM Matt Casters  wrote:

> Hi Beam!
>
> After I have my pipeline created and the execution started on the Spark
> master I get this strange nugget:
>
> java.lang.IllegalStateException: No TransformEvaluator registered for
> BOUNDED transform Read(CompressedSource)
>
> I'm reading from a HDFS file (hdfs:///input/customers-noheader-1M.txt)
> It's the exact same file and pipeline I've been testing on local files in
> the direct runner and on Google Storage in DataFlow without a problem.
> What could I have missed in the Spark case?
>
> Thanks again for any suggestions!
>
> Matt
> ---
> Matt Casters attcast...@gmail.com>
> Senior Solution Architect, Kettle Project Founder
>
>

-- 

JC


Re: Spark progress feedback

2019-01-28 Thread Juan Carlos Garcia
Matt, is the machine from which you are launching the pipeline different
from the one where it should run?

If that's the case, make sure the machine used for launching has all the
HDFS environment variables set, as the pipeline is configured on the
launching machine before it hits the worker machines.
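
One way to avoid depending on the launching machine's environment is to hand the Hadoop configuration to Beam explicitly through HadoopFileSystemOptions; a minimal sketch (the namenode address is a placeholder):

import java.util.Collections;
import org.apache.beam.sdk.io.hdfs.HadoopFileSystemOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.hadoop.conf.Configuration;

public class HdfsOptionsSketch {
  public static void main(String[] args) {
    HadoopFileSystemOptions options =
        PipelineOptionsFactory.fromArgs(args).as(HadoopFileSystemOptions.class);
    // Point Beam's HadoopFileSystem at the cluster explicitly instead of relying
    // on HADOOP_CONF_DIR being visible on the launching machine.
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://namenode:8020"); // placeholder namenode address
    options.setHdfsConfiguration(Collections.singletonList(conf));
    // ... Pipeline.create(options), build and run ...
  }
}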

Good luck
JC


Am Mo., 28. Jan. 2019, 13:34 hat Matt Casters 
geschrieben:

> Dear Beam friends,
>
> In preparation for my presentation of the Kettle Beam work in London next
> week I've been trying to get Spark Beam to run which worked in the end.
> The problem that resurfaced is however ... once again... back with a
> vengeance :
>
> java.lang.IllegalArgumentException: No filesystem found for scheme hdfs
>
>
> I configured HADOOP_HOME, HADOOP_CONF_DIR, ran
> FileSystems.FileSystems.setDefaultPipelineOptions(pipelineOptions), tried
> every trick in the book (very few of those are to be found) but it's a
> fairly brutal trial-and-error process.
>
> Given the fact that I'm not the only person hitting these issues I think
> it would be a good idea to allow for some sort of feedback of the
> FileSystems loading process, which filesystems it tries to load, which fail
> and so on.
> Also, the maven library situation is a bit fuzzy in the sense that there
> are libraries like beam-sdks-java-io-hdfs on a point release (0.6.0) as
> well as beam-sdks-java-io-hadoop-file-system on the latest version.
>
> I've been expanding my trial-and-error pattern to the endpoint and are
> ready to give up on Beam-on-Spark.  I could try to get a Spark test
> environment configured for s3:// but I don't think it's all that
> representative of real-world scenarios.
>
> Thanks anyway in advance for any suggestions,
>
> Matt
> ---
> Matt Casters attcast...@gmail.com>
> Senior Solution Architect, Kettle Project Founder
>
>
>


Re: kafkaIO Consumer Rebalance with Spark Runner

2019-01-28 Thread Juan Carlos Garcia
Hi Rick,

You can limit how many records Spark processes per micro-batch by passing the
following option to your Beam pipeline:

*MaxRecordsPerBatch*

see
https://beam.apache.org/releases/javadoc/2.9.0/org/apache/beam/runners/spark/SparkPipelineOptions.html#getMaxRecordsPerBatch--
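
A minimal Java sketch of setting it programmatically (the value is illustrative):

import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class SparkRateLimitSketch {
  public static void main(String[] args) {
    SparkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(SparkPipelineOptions.class);
    options.setRunner(SparkRunner.class);
    // Cap how many records each micro-batch may pull from the source,
    // equivalent to passing --maxRecordsPerBatch=100000 on the command line.
    options.setMaxRecordsPerBatch(100_000L);

    Pipeline pipeline = Pipeline.create(options);
    // ... KafkaIO.read() and the rest of the pipeline ...
    pipeline.run();
  }
}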

Hope it helps.

JC




On Mon, Jan 28, 2019 at 10:57 AM  wrote:

> Dear Raghu,
>
>
>
> I add the line: “PCollection reshuffled =
> windowKV.apply(Reshuffle.viaRandomKey());” in my program.
>
>
>
> I tried to control the streaming data size: 100,000/1sec to decrease the
> processing time.
>
>
>
> The following settings are used for my project.
>
>
>
> 1.  One topic / 2 partitions
>
> 2.  Two workers / two executors
>
>
>
> 3.  The spark-default setting is:
>
> spark.executor.instances=2
>
> spark.executor.cores=4
>
> spark.executor.memory=2048m
>
> spark.default.parallelism=200
>
>
>
> spark.streaming.blockInterval=50ms
>
> spark.streaming.kafka.maxRatePerPartition=50,000
>
> spark.streaming.backpressure.enabled=true
>
> spark.streaming.concurrentJobs = 1
>
> spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC
>
> spark.executor.extraJavaOptions=-Xss100M
>
>
>
> spark.shuffle.consolidateFiles=true
>
> spark.streaming.unpersist=true
>
> spark.streaming.stopGracefullyOnShutdown=true
>
>
>
> I hope that the data size is controlled at 100,000.
>
>
>
> Here,
>
>
>
> The data size is always over 100,000. The setting of
> “spark.streaming.kafka.maxRatePerPartition” confused me.
>
>
>
> That does not seem to work for me.
>
>
>
> Rick
>
>
>
> *From:* Raghu Angadi [mailto:ang...@gmail.com]
> *Sent:* Saturday, January 26, 2019 3:06 AM
> *To:* user@beam.apache.org
> *Subject:* Re: kafkaIO Consumer Rebalance with Spark Runner
>
>
>
> You have 32 partitions. Reading can not be distributed to more than 32
> parallel tasks.
>
> If you have a log of processing for each record after reading, you can
> reshuffle the messages before processing them, that way the processing
> could be distributed to more tasks. Search for previous threads about
> reshuffle in Beam lists.
>
>
>
> On Thu, Jan 24, 2019 at 7:23 PM  wrote:
>
> Dear all,
>
>
>
> I am using the kafkaIO sdk in my project (Beam with Spark runner).
>
>
>
> The problem about task skew is shown as the following figure.
>
>
>
> My running environment is:
>
> OS: Ubuntn 14.04.4 LTS
>
> The version of related tools is:
>
> java version: "1.8.0_151"
>
> Beam version: 2.9.0 (Spark runner with Standalone mode)
>
> Spark version: 2.3.1 Standalone mode
>
> Execution condition:
>
> Master/Driver node: ubuntu7
>
> Worker nodes: ubuntu8 (4 Executors); ubuntu9 (4 Executors)
>
> The number of executors is 8
>
>
>
> Kafka Broker: 2.10-0.10.1.1
>
> Broker node at ubuntu7
>
> Kafka Client:
>
> The topic: kafkasink32
>
> kafkasink32 Partitions: 32
>
>
>
> The programming of my project for kafkaIO SDK is as:
>
>
> ==
>
> Map<String, Object> map = ImmutableMap.builder()
>
>.put("group.id", (Object)"test-consumer-group")
>
>.build();
>
> List<TopicPartition> topicPartitions = new ArrayList<>();
>
>for (int i = 0; i < 32; i++) {
>
>  topicPartitions.add(new TopicPartition("kafkasink32", i));
>
> }
>
> PCollection<KV<Long, String>> readKafkaData = p.apply(KafkaIO.<Long, String>read()
>
>  .withBootstrapServers("ubuntu7:9092")
>
>.updateConsumerProperties(map)
>
>.withKeyDeserializer(LongDeserializer.*class*)
>
>.withValueDeserializer(StringDeserializer.*class*)
>
>.withTopicPartitions(topicPartitions)
>
>.withoutMetadata()
>
>);
>
>
> ==
>
> Here I have two directions to solve this problem:
>
>
>
> 1.  Using the following sdk from spark streaming
>
>
> https://jaceklaskowski.gitbooks.io/spark-streaming/spark-streaming-kafka-LocationStrategy.html
>
> LocationStrategies.PreferConsistent: Use in most cases as it consistently
> distributes partitions across all executors.
>
>
>
> If we would like to use this feature, we have no idea how to set this in
> the KafkaIO SDK.
>
>
>
> 2.  Setting the related configurations of kafka to perform the
> consumer rebalance
>
> set consumer group? Set group.id?
>
>
>
> If we need to do No2., could someone give me some ideas to set
> configurations?
>
>
>
> If anyone provides any direction to help us to overcome this problem, we
> would appreciate it.
>
>
>
> Thanks.
>
>
>
> Sincerely yours,
>
>
>
> Rick
>
>
>
>
>
> --
> 本信件可能包含工研院機密資訊,非指定之收件者,請勿使用或揭露本信件內容,並請銷毀此信件。 This email may contain
> confidential information. Please do not use or disclose it in any way and
> delete it if you are not the intended recipient.
>
>
>
> --
> 本信件可能包含工研院機密資訊,非指定之收件者,請勿使用或揭露本信件內容,並請銷毀此信件。 This email may contain
> confidential information. Please do not use or disclose it in any way and
> delete it if you are not the intended recipient.
>


-- 

JC


Re: Spark

2019-01-18 Thread Juan Carlos Garcia
Hi Matt,

With Flink you will be able to launch your pipeline just by invoking the main
method of your main class; however, it will run as a standalone process and
you will not have the advantage of distributed computation.

Am Fr., 18. Jan. 2019, 09:37 hat Matt Casters 
geschrieben:

> Thanks for the reply JC, I really appreciate it.
>
> I really can't force our users to use antiquated stuff like scripts, let
> alone command line things, but I'll simply use SparkLauncher and your
> comment about the main class doing Pipeline.run() on the Master is
> something I can work with... somewhat.
> The execution results, metrics and all that are handled the Master I
> guess.  Over time I'll figure out a way to report the metrics and results
> from the master back to the client.  I've done similar things with
> Map/Reduce in the past.
>
> Looking around I see that the same conditions apply for Flink.  Is this
> because Spark and Flink lack the APIs to talk to a client about the state
> of workloads unlike DataFlow and the Direct Runner?
>
> Thanks!
>
> Matt
> ---
> Matt Casters attcast...@gmail.com>
> Senior Solution Architect, Kettle Project Founder
>
>
>
>
> Op do 17 jan. 2019 om 15:30 schreef Juan Carlos Garcia <
> jcgarc...@gmail.com>:
>
>> Hi Matt, during the time we were using Spark with Beam, the solution was
>> always to pack the jar and use the spark-submit command pointing to your
>> main class which will do `pipeline.run`.
>>
>> The spark-submit command has a flag to decide how to run it
>> (--deploy-mode), whether to launch the job on the driver machine or on one
>> of the machines in the cluster.
>>
>>
>> JC
>>
>>
>> On Thu, Jan 17, 2019 at 10:00 AM Matt Casters 
>> wrote:
>>
>>> Dear Beam friends,
>>>
>>> Now that I've got cool data integration (Kettle-beam) scenarios running
>>> on DataFlow with sample data sets in Google (Files, Pub/Sub, BigQuery,
>>> Streaming, Windowing, ...) I thought it was time to also give Apache Spark
>>> some attention.
>>>
>>> The thing I have some trouble with is figuring out what the relationship
>>> is between the runner (SparkRunner), Pipeline.run() and spark-submit (or
>>> SparkLauncher).
>>>
>>> The samples I'm seeing mostly involve packaging up a jar file and then
>>> doing a spark-submit.  That obviously makes it unclear if Pipeline.run()
>>> should be used at all and how Metrics should be obtained from a Spark job
>>> during execution or after completion.
>>>
>>> I really like the way the GCP DataFlow implementation automatically
>>> deploys jar file binaries and from what I can
>>> determine org.apache.spark.launcher.SparkLauncher offers this functionality
>>> so perhaps I'm either doing something wrong or I'm reading the docs wrong
>>> or the wrong docs.
>>> The thing is, if you try running your pipelines against a Spark master,
>>> feedback is really minimal, putting you in a trial & error situation pretty
>>> quickly.
>>>
>>> So thanks again in advance for any help!
>>>
>>> Cheers,
>>>
>>> Matt
>>> ---
>>> Matt Casters attcast...@gmail.com>
>>> Senior Solution Architect, Kettle Project Founder
>>>
>>>
>>
>> --
>>
>> JC
>>
>>


Re: KafkaIO not committing offsets when using withMaxReadTime

2019-01-09 Thread Juan Carlos Garcia
Just for you to have a look where this happen:

https://github.com/apache/beam/blob/dffe2c1a2bd95f78869b266d3e1ea3f8ad8c323d/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L584

Cheers

On Wed, Jan 9, 2019 at 5:09 PM Juan Carlos Garcia 
wrote:

> I also experience the same. As per the documentation, **withMaxReadTime**
> and **withMaxNumRecords** are mainly intended for demo purposes, so I guess
> it is beyond the scope of the current KafkaIO to behave as a bounded source
> with offset management, or something is missing in the current
> implementation (watermarking).
>
>
>
> On Wed, Jan 9, 2019 at 2:28 PM André Missaglia <
> andre.missag...@arquivei.com.br> wrote:
>
>> Hello everyone,
>>
>> I need to do some batch processing that uses messages in a Kafka topic.
>> So I tried the "withMaxReadTime" KafkaIO setting:
>>
>> ---
>> val properties = new Properties()
>> properties.setProperty("bootstrap.servers", "...")
>> properties.setProperty("group.id", "mygroup")
>> properties.setProperty("sasl.jaas.config", "...")
>> properties.setProperty("security.protocol", "SASL_PLAINTEXT")
>> properties.setProperty("sasl.mechanism", "SCRAM-SHA-256")
>> properties.setProperty("enable.auto.commit", "false")
>>
>> sc.customInput("Read From Kafka",
>>   KafkaIO
>> .read[String, String]()
>> .withTopic("mytopic")
>> .withKeyDeserializer(classOf[StringDeserializer])
>> .withValueDeserializer(classOf[StringDeserializer])
>> .updateConsumerProperties(properties)
>> .withMaxReadTime(Duration.standardSeconds(20))
>> .withMaxNumRecords(100)
>> .commitOffsetsInFinalize()
>> .withoutMetadata()
>> )
>> .count.debug() // prints something between 1 and 2
>> ---
>> I can see that it was able to read the messages and process them. But in
>> the end, no offset was committed:
>>
>> TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG      CONSUMER-ID  HOST  CLIENT-ID
>> mytopic  0          0               3094751         3094751  -            -     -
>>
>> But it is a strange behavior: sometimes it commits the offset, sometimes
>> not. I'm not sure if it is a bug, or I'm using the wrong configs.
>>
>> Has anyone used Bounded KafkaIO before? is there anything I can do?
>>
>> Best Regards,
>>
>> --
>> *André Badawi Missaglia*
>> Data Engineer
>> (16) 3509-5515 | www.arquivei.com.br
>>
>
>
> --
>
> JC
>
>

-- 

JC


Re: KafkaIO not committing offsets when using withMaxReadTime

2019-01-09 Thread Juan Carlos Garcia
I also experience the same. As per the documentation, **withMaxReadTime**
and **withMaxNumRecords** are mainly intended for demo purposes, so I guess
it is beyond the scope of the current KafkaIO to behave as a bounded source
with offset management, or something is missing in the current
implementation (watermarking).



On Wed, Jan 9, 2019 at 2:28 PM André Missaglia <
andre.missag...@arquivei.com.br> wrote:

> Hello everyone,
>
> I need to do some batch processing that uses messages in a Kafka topic. So
> I tried the "withMaxReadTime" KafkaIO setting:
>
> ---
> val properties = new Properties()
> properties.setProperty("bootstrap.servers", "...")
> properties.setProperty("group.id", "mygroup")
> properties.setProperty("sasl.jaas.config", "...")
> properties.setProperty("security.protocol", "SASL_PLAINTEXT")
> properties.setProperty("sasl.mechanism", "SCRAM-SHA-256")
> properties.setProperty("enable.auto.commit", "false")
>
> sc.customInput("Read From Kafka",
>   KafkaIO
> .read[String, String]()
> .withTopic("mytopic")
> .withKeyDeserializer(classOf[StringDeserializer])
> .withValueDeserializer(classOf[StringDeserializer])
> .updateConsumerProperties(properties)
> .withMaxReadTime(Duration.standardSeconds(20))
> .withMaxNumRecords(100)
> .commitOffsetsInFinalize()
> .withoutMetadata()
> )
> .count.debug() // prints something between 1 and 2
> ---
> I can see that it was able to read the messages and process them. But in
> the end, no offset was committed:
>
> TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG      CONSUMER-ID  HOST  CLIENT-ID
> mytopic  0          0               3094751         3094751  -            -     -
>
> But it is a strange behavior: sometimes it commits the offset, sometimes
> not. I'm not sure if it is a bug, or I'm using the wrong configs.
>
> Has anyone used Bounded KafkaIO before? is there anything I can do?
>
> Best Regards,
>
> --
> *André Badawi Missaglia*
> Data Engineer
> (16) 3509-5515 | www.arquivei.com.br
>


-- 

JC


Re: KafkaIO and added partitions

2019-01-02 Thread Juan Carlos Garcia
+1

Am Mi., 2. Jan. 2019, 14:34 hat Abdul Qadeer 
geschrieben:

> +1
>
> On Tue, 1 Jan 2019 at 12:45,  wrote:
>
>> +1 from my side too :-)
>> And ideally I would want to have some hooks to let me know the extra
>> partitions have been picked up (or a way to query it).
>>
>> Although if that can't be provided I can work around it myself by sending
>> some specific message to the partition that somewhere results in a visible
>> state change in the pipeline.
>>
>> Also, as a quick (semi related) heads up: I will very likely soon
>> contribute a change to the LogAppendTimePolicy so that the idle partition
>> behavior (automatic watermark generation) can be disabled.
>>
>> (of course all related to my streamy-db project)
>>
>> Kind regards,
>> Jan
>>
>>
>> On Tue, 1 Jan 2019 at 08:19, Ramesh Nethi  wrote:
>>
>>> +1 for this capability.  This would enable pipelines to continue to run
>>> when such changes need to be made.
>>>
>>> regards
>>> Ramesh
>>>
>>> On Fri, 23 Nov 2018 at 00:40 Raghu Angadi  wrote:
>>>
 On Thu, Nov 22, 2018 at 10:10 AM Raghu Angadi 
 wrote:

> - New partitions will be ignored during runtime.
> - Update will not succeed either. Error message on the workers should
> explain the mismatch.
>

 This is the current state. Supporting changes to the number of partitions
 is quite doable if there is enough user interest (even in the current
 UnboundedSource API framework).

>
> On Thu, Nov 22, 2018 at 2:15 AM Jozef Vilcek 
> wrote:
>
>> Hello,
>> just wanted to check how does Beam KafkaIO behaves when partitions
>> are added to the topic.
>> Will they be picked up or ignored during the runtime?
>> Will they be picked up on restart with state restore?
>>
>> Thanks,
>> Jozef
>>
>


Re: Graceful shutdown of long-running Beam pipeline on Flink

2018-12-02 Thread Juan Carlos Garcia
Hi Wayne,

We have the same setup and we do daily updates to our pipeline.

The way we do it is using the flink CLI tool via Jenkins.

Basically our deployment job does the following:

1. Detect if the pipeline is running (it matches it by job name).

2. If found, do a flink cancel with a savepoint (we use HDFS for
checkpoints / savepoints) under a given directory.

3. It uses the flink run command for the new job and specifies the savepoint
from step 2.

I don't think there is any support to achieve the same from within the
pipeline. You need to do this externally as explained above.
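
As a rough illustration, steps 2 and 3 boil down to something like this
(job id, savepoint paths, main class and jar name below are just
placeholders, not our actual setup):

# cancel the running job and write a savepoint to HDFS
flink cancel -s hdfs:///savepoints/my-pipeline <jobId>

# start the new version of the pipeline from that savepoint
flink run -d -s hdfs:///savepoints/my-pipeline/savepoint-xyz \
  -c com.example.MyBeamPipeline my-pipeline-bundled.jar --runner=FlinkRunner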

Best regards,
JC


Am Mo., 3. Dez. 2018, 00:46 hat Wayne Collins 
geschrieben:

> Hi all,
> We have a number of Beam pipelines processing unbounded streams sourced
> from Kafka on the Flink runner and are very happy with both the platform
> and performance!
>
> The problem is with shutting down the pipelines...for version upgrades,
> system maintenance, load management, etc. it would be nice to be able to
> gracefully shut these down under software control but haven't been able to
> find a way to do so. We're in good shape on checkpointing and then cleanly
> recovering but shutdowns are all destructive to Flink or the Flink
> TaskManager.
>
> Methods tried:
>
> 1) Calling cancel on FlinkRunnerResult returned from pipeline.run()
> This would be our preferred method but p.run() doesn't return until
> termination and even if it did, the runner code simply throws:
> "throw new UnsupportedOperationException("FlinkRunnerResult does not
> support cancel.");"
> so this doesn't appear to be a near-term option.
>
> 2) Inject a "termination" message into the pipeline via Kafka
> This does get through, but calling exit() from a stage in the pipeline
> also terminates the Flink TaskManager.
>
> 3) Inject a "sleep" message, then manually restart the cluster
> This is our current method: we pause the data at the source, flood all
> branches of the pipeline with a "we're going down" msg so the stages can do
> a bit of housekeeping, then hard-stop the entire environment and re-launch
> with the new version.
>
> Is there a "Best Practice" method for gracefully terminating an unbounded
> pipeline from within the pipeline or from the mainline that launches it?
>
> Thanks!
> Wayne
>
> --
> Wayne Collins | dades.ca Inc. | mailto:wayn...@dades.ca
> cell: 416-898-5137
>
>


Re:

2018-11-29 Thread Juan Carlos Garcia
If you are using Gradle for packaging, make sure your final jar (fat jar)
contains all the service files merged.

Using the Gradle shadowJar plugin, include the "mergeServiceFiles()"
instruction, like:

apply plugin: 'com.github.johnrengelman.shadow'

shadowJar {
    mergeServiceFiles()
    zip64 true
    classifier = 'bundled'
}

If you are using Maven then use the Shade plugin.
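
For reference, a minimal Maven sketch would look roughly like this (plugin
version and the rest of the POM are up to your project); the
ServicesResourceTransformer is the Shade counterpart of mergeServiceFiles():

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <transformers>
          <!-- merges the META-INF/services files of all dependencies -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>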

On Thu, Nov 29, 2018 at 4:50 PM Robert Bradshaw  wrote:

> BeamJava uses com.google.auto.service.AutoService which, at the end of
> the day, is shorthand for Java's standard ServiceLoader mechanisms
> (e.g. see [1]). I'm not an expert on the details of how this works,
> but you'll probably have to make sure these filesystem dependencies
> are in your custom classloader's jar.
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java
> On Thu, Nov 29, 2018 at 3:57 PM Matt Casters 
> wrote:
> >
> > Hello Beam,
> >
> > I've been taking great steps forward in having Kettle generate Beam
> pipelines and they actually execute just fine in unit testing in IntelliJ.
> > The problem starts when I collect all the libraries needed for Beam and
> the Runners and throw them into the Kettle project as a plugin.
> >
> > Caused by: java.lang.IllegalArgumentException: No filesystem found for
> scheme gs
> > at org.apache.beam.sdk.io
> .FileSystems.getFileSystemInternal(FileSystems.java:456)
> > at org.apache.beam.sdk.io
> .FileSystems.matchNewResource(FileSystems.java:526)
> > at org.apache.beam.sdk.io
> .FileBasedSink.convertToFileResourceIfPossible(FileBasedSink.java:213)
> > at org.apache.beam.sdk.io.TextIO$TypedWrite.to(TextIO.java:700)
> > at org.apache.beam.sdk.io.TextIO$Write.to(TextIO.java:1028)
> > at
> org.kettle.beam.core.transform.BeamOutputTransform.expand(BeamOutputTransform.java:87)
> > ... 32 more
> >
> > This also happens for local file execution ("scheme file" in that case).
> >
> > So the questions are: how is Beam bootstrapped? How does Beam determine
> which libraries to use and what is the recommended way for packaging things
> up properly?
> > The Beam plugin is running in a separate URLClassloader so I think
> something is going awry there.
> >
> > Thanks a lot for any answers or tips you might have!
> >
> > Matt
> > ---
> > Matt Casters 
> > Senior Solution Architect, Kettle Project Founder
> >
> >
>


-- 

JC


Re: Multiple concurrent transforms with SparkRunner

2018-11-13 Thread Juan Carlos Garcia
I suggest playing around with some Spark configuration options, like the
dynamic allocation parameters:

https://spark.apache.org/docs/latest/configuration.html#dynamic-allocation
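
For example (illustrative values only, to be tuned for your cluster; dynamic
allocation also needs the external shuffle service enabled):

--conf spark.dynamicAllocation.enabled=true
--conf spark.shuffle.service.enabled=true
--conf spark.dynamicAllocation.minExecutors=2
--conf spark.dynamicAllocation.maxExecutors=20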



Am Mi., 14. Nov. 2018, 02:28 hat Jiayue Zhang (Bravo) 
geschrieben:

> Hi,
>
> I'm writing Java Beam code to run with both Dataflow and Spark. The input
> files are in TFRecord format and come from multiple directories. Java
> TFRecordIO doesn't have readAll from a list of files, so what I'm doing is:
>
> for (String dir: listOfDirs) {
> p.apply(TFRecordIO.read().from(dir))
>  .apply(ParDo.of(new BatchElements()))
>  .apply(ParDo.of(new Process()))
>  .apply(Combine.globally(new CombineResult()))
>  .apply(TextIO.write().to(dir))
> }
>
> These directories are fairly independent and I only need the result of each
> directory. When running on Dataflow, processing of these directories happens
> concurrently. But when running with Spark, I saw the Spark jobs and stages
> are sequential. It needs to finish all steps in one directory before moving to
> the next one. What's the way to make multiple transforms run concurrently with
> SparkRunner?
>


Re: Disable a DoFn for a specific runner

2018-11-11 Thread Juan Carlos Garcia
You can use the PipelineOptions, which can be accessed from within a DoFn by
invoking getPipelineOptions() on the DoFn.ProcessContext object (or the same
from the StartBundleContext), and from there you can access the runner
option you are passing on the command line.
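
A minimal sketch of that idea (class and names are just illustrative, not
from an actual project):

{code}
import org.apache.beam.sdk.transforms.DoFn;

public class RunnerAwareFn extends DoFn<String, String> {

    @ProcessElement
    public void processElement(ProcessContext c) {
        // The runner class configured via --runner is available on the options.
        String runner = c.getPipelineOptions().getRunner().getSimpleName();
        if ("DataflowRunner".equals(runner)) {
            // interact with the production Google services only on Dataflow
        } else {
            // skip those side effects during local development
        }
        c.output(c.element());
    }
}
{code}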

JC

On Sun, Nov 11, 2018 at 12:23 PM Davide Anastasia <
davide.anasta...@cloud-iq.com> wrote:

> Hi,
> I am mostly working with Dataflow using Beam, but I mostly develop using
> the local runner. However, I am interacting with other Google services in
> production which I don't want to necessarily interact with during the
> development stage.
>
> I am wondering if there is a way to write something like:
>
> if (runner == "Dataflow") {
>   // do stuff
> } else {
>   // do nothing
> }
>
> inside my PTransform / DoFn or elsewhere in the code.
>
> Is there any way I can fill the "runner" variable in a consistent way?
>
> Thanks,
> D.
>


-- 

JC


Re: No filesystem found for scheme hdfs - with the FlinkRunner

2018-10-26 Thread Juan Carlos Garcia
Just for everyone to know, we figured it out; it was an environment problem.

In our case we have our cluster in a network that is not accessible
directly, so to deploy we need to use Jenkins with some slaves that have
access to that network.

During deployment, in the main method of the class we execute
FileSystems.setDefaultPipelineOptions(_options); which triggers
the HadoopFileSystemOptionsRegistrar via the ServiceLoader mechanism,
and this accesses the environment variable HADOOP_CONF_DIR
in order to correctly register the filesystem.

So, it's very important that the machine you are using for deployment has
that environment variable set as well (not only the workers where the
pipeline will run).

In our case the variable was set in the .bashrc of the user used for
deployment, but here is the catch.

We were using "sudo -u DEPLOYMENT_USER -s /var/lib/flink/bin/flink run -d
.", but the -s flag does not load the user's .bashrc
(.bash_profile), hence we had failures at runtime. The fix was just
replacing the -s flag with -i to make sure the environment variable is
present when the command runs.
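
In other words, roughly (user and paths are of course specific to our setup):

sudo -u DEPLOYMENT_USER -i /var/lib/flink/bin/flink run -d ...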

Thanks


On Fri, Oct 26, 2018 at 1:52 PM Juan Carlos Garcia 
wrote:

> Hi Tim,
>
> I am using FileIO directly with the AvroIO.sink(...), however having
> experienced BEAM-2277 with the SparkRunner few months ago, i got the
> feeling this is something different (maybe some dependency
> mismatch/missing).
>
> Thanks
>
> On Fri, Oct 26, 2018 at 1:33 PM Tim Robertson 
> wrote:
>
>> Hi Juan
>>
>> This sounds reminiscent of
>> https://issues.apache.org/jira/browse/BEAM-2277 which we believed fixed
>> in 2.7.0.
>> What IO are you using to write your files and can you paste a snippet of
>> your code please?
>>
>> On BEAM-2277 I posted a workaround for AvroIO (it might help you find a
>> workaround too):
>>
>>  transform.apply("Write",
>> AvroIO.writeGenericRecords(schema)
>> .to(FileSystems.matchNewResource(options.getTarget(),true))
>> // BEAM-2277 workaround
>> .withTempDirectory(FileSystems.matchNewResource("hdfs://ha-nn/tmp/beam-avro",
>>  true)));
>>
>>
>> Thanks
>> Tim
>>
>>
>> On Fri, Oct 26, 2018 at 11:47 AM Juan Carlos Garcia 
>> wrote:
>>
>>> Hi Folks,
>>>
>>> I have a strange situation while running beam 2.7.0 with the
>>> FlinkRunner, my setup consist of a HA Flink cluster (1.5.4) writing to HDFS
>>> its checkpoint. Flink is able to correctly writes its checkpoint /
>>> savepoint to HDFS without any problems.
>>>
>>> However, my pipeline has to write to HDFS as well, but fails with
>>> "Caused by: java.lang.IllegalArgumentException: No filesystem found for
>>> scheme hdfs"
>>> (stacktrace at the bottom)
>>>
>>> In the host where the pipeline is running:
>>> 1. The environment variable HADOOP_CONF_DIR is set.
>>> 2. During my pipeline construction i am explicitly calling
>>> FileSystems.setDefaultPipelineOptions(_options); to trigger the
>>> ServiceLoader to find all options registrar from the classpath
>>> 3. If i explore the property SCHEME_TO_FILESYSTEM of the class
>>> FileSystems in my main method using reflection i am able to see that at
>>> launch time it contains:
>>>{file=org.apache.beam.sdk.io.LocalFileSystem@1941a8ff,
>>> hdfs=org.apache.beam.sdk.io.hdfs.HadoopFileSystem@22d7b4f8}
>>>
>>> Any idea what i am doing wrong with the HDFS integration?
>>>
>>> {snippet}
>>>
>>> FileSystems.setDefaultPipelineOptions(_context.getPipelineOptions());
>>> Field f =
>>> FileSystems.class.getDeclaredField("SCHEME_TO_FILESYSTEM");
>>> f.setAccessible(true);
>>> AtomicReference>
>>> value = (AtomicReference>) f.get(null);
>>>
>>> System.out.println("===");
>>> System.out.println(value);
>>> {snippet}
>>>
>>> {stacktrace}
>>> Caused by: java.lang.IllegalArgumentException: No filesystem found for
>>> scheme hdfs
>>> at
>>> org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
>>> at
>>> org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
>>> at
>>> org.apache.beam.sdk.io.FileIO$Write$ViaFileBasedSink.lambda$new$22b9c623$1(FileIO.java:1293)
>>> at
>>> org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
>>> at
>>> org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
>>> at
>>> org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
>>> at
>>> org.apache.beam.sdk.io.FileBasedSink$Writer.open(FileBasedSink.java:920)
>>> at
>>> org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:715)
>>>
>>> {stacktrace}
>>>
>>> --
>>>
>>> JC
>>>
>>>
>>>
>>> --
>>>
>>> JC
>>>
>>>
>
> --
>
> JC
>
>

-- 

JC


Re: No filesystem found for scheme hdfs - with the FlinkRunner

2018-10-26 Thread Juan Carlos Garcia
Hi Tim,

I am using FileIO directly with the AvroIO.sink(...); however, having
experienced BEAM-2277 with the SparkRunner a few months ago, I got the
feeling this is something different (maybe some dependency
mismatch/missing).

Thanks

On Fri, Oct 26, 2018 at 1:33 PM Tim Robertson 
wrote:

> Hi Juan
>
> This sounds reminiscent of https://issues.apache.org/jira/browse/BEAM-2277
> which we believed fixed in 2.7.0.
> What IO are you using to write your files and can you paste a snippet of
> your code please?
>
> On BEAM-2277 I posted a workaround for AvroIO (it might help you find a
> workaround too):
>
>  transform.apply("Write",
> AvroIO.writeGenericRecords(schema)
> .to(FileSystems.matchNewResource(options.getTarget(),true))
> // BEAM-2277 workaround
> .withTempDirectory(FileSystems.matchNewResource("hdfs://ha-nn/tmp/beam-avro", 
> true)));
>
>
> Thanks
> Tim
>
>
> On Fri, Oct 26, 2018 at 11:47 AM Juan Carlos Garcia 
> wrote:
>
>> Hi Folks,
>>
>> I have a strange situation while running beam 2.7.0 with the FlinkRunner,
>> my setup consist of a HA Flink cluster (1.5.4) writing to HDFS its
>> checkpoint. Flink is able to correctly writes its checkpoint / savepoint to
>> HDFS without any problems.
>>
>> However, my pipeline has to write to HDFS as well, but fails with "Caused
>> by: java.lang.IllegalArgumentException: No filesystem found for scheme hdfs"
>> (stacktrace at the bottom)
>>
>> In the host where the pipeline is running:
>> 1. The environment variable HADOOP_CONF_DIR is set.
>> 2. During my pipeline construction i am explicitly calling
>> FileSystems.setDefaultPipelineOptions(_options); to trigger the
>> ServiceLoader to find all options registrar from the classpath
>> 3. If i explore the property SCHEME_TO_FILESYSTEM of the class
>> FileSystems in my main method using reflection i am able to see that at
>> launch time it contains:
>>{file=org.apache.beam.sdk.io.LocalFileSystem@1941a8ff,
>> hdfs=org.apache.beam.sdk.io.hdfs.HadoopFileSystem@22d7b4f8}
>>
>> Any idea what i am doing wrong with the HDFS integration?
>>
>> {snippet}
>>
>> FileSystems.setDefaultPipelineOptions(_context.getPipelineOptions());
>> Field f =
>> FileSystems.class.getDeclaredField("SCHEME_TO_FILESYSTEM");
>> f.setAccessible(true);
>> AtomicReference>
>> value = (AtomicReference>) f.get(null);
>>
>> System.out.println("===");
>> System.out.println(value);
>> {snippet}
>>
>> {stacktrace}
>> Caused by: java.lang.IllegalArgumentException: No filesystem found for
>> scheme hdfs
>> at
>> org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
>> at
>> org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
>> at
>> org.apache.beam.sdk.io.FileIO$Write$ViaFileBasedSink.lambda$new$22b9c623$1(FileIO.java:1293)
>> at
>> org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
>> at
>> org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
>> at
>> org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
>> at
>> org.apache.beam.sdk.io.FileBasedSink$Writer.open(FileBasedSink.java:920)
>> at
>> org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:715)
>>
>> {stacktrace}
>>
>> --
>>
>> JC
>>
>>
>>
>> --
>>
>> JC
>>
>>

-- 

JC


No filesystem found for scheme hdfs - with the FlinkRunner

2018-10-26 Thread Juan Carlos Garcia
Hi Folks,

I have a strange situation while running Beam 2.7.0 with the FlinkRunner:
my setup consists of an HA Flink cluster (1.5.4) writing its checkpoints to
HDFS. Flink is able to correctly write its checkpoints / savepoints to
HDFS without any problems.

However, my pipeline has to write to HDFS as well, but fails with "Caused
by: java.lang.IllegalArgumentException: No filesystem found for scheme hdfs"
(stacktrace at the bottom)

In the host where the pipeline is running:
1. The environment variable HADOOP_CONF_DIR is set.
2. During my pipeline construction I am explicitly calling
FileSystems.setDefaultPipelineOptions(_options); to trigger the
ServiceLoader to find all option registrars from the classpath.
3. If I explore the property SCHEME_TO_FILESYSTEM of the class FileSystems
in my main method using reflection, I am able to see that at launch time it
contains:
   {file=org.apache.beam.sdk.io.LocalFileSystem@1941a8ff,
hdfs=org.apache.beam.sdk.io.hdfs.HadoopFileSystem@22d7b4f8}

Any idea what I am doing wrong with the HDFS integration?

{snippet}

FileSystems.setDefaultPipelineOptions(_context.getPipelineOptions());
Field f = FileSystems.class.getDeclaredField("SCHEME_TO_FILESYSTEM");
f.setAccessible(true);
AtomicReference<Map<String, FileSystem>> value =
    (AtomicReference<Map<String, FileSystem>>) f.get(null);

System.out.println("===");
System.out.println(value);
{snippet}

{stacktrace}
Caused by: java.lang.IllegalArgumentException: No filesystem found for
scheme hdfs
at
org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
at
org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
at
org.apache.beam.sdk.io.FileIO$Write$ViaFileBasedSink.lambda$new$22b9c623$1(FileIO.java:1293)
at
org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
at
org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
at
org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
at
org.apache.beam.sdk.io.FileBasedSink$Writer.open(FileBasedSink.java:920)
at
org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:715)

{stacktrace}

-- 

JC



-- 

JC


Re: KafkaIO - Deadletter output

2018-10-24 Thread Juan Carlos Garcia
As Raghu said,

Just apply a regular ParDo and return a PCollectionTuple; after that you can
extract your success records (TupleTag) and your dead-letter
records (TupleTag) and do whatever you want with them.
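
A rough sketch of that pattern (type and method names here are made up for
illustration; it assumes the Kafka values are read as raw byte[] and that
kafkaValues stands for that PCollection):

{code}
final TupleTag<MyEvent> parsedTag = new TupleTag<MyEvent>() {};
final TupleTag<byte[]> deadLetterTag = new TupleTag<byte[]>() {};

PCollectionTuple result = kafkaValues.apply("Parse",
    ParDo.of(new DoFn<byte[], MyEvent>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            try {
                // parseEvent(...) is a placeholder for your own deserialization
                c.output(parseEvent(c.element()));
            } catch (Exception e) {
                // anything that fails to parse goes to the dead-letter output
                c.output(deadLetterTag, c.element());
            }
        }
    }).withOutputTags(parsedTag, TupleTagList.of(deadLetterTag)));

PCollection<MyEvent> parsed = result.get(parsedTag);
PCollection<byte[]> deadLetters = result.get(deadLetterTag);
{code}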


Raghu Angadi  schrieb am Mi., 24. Okt. 2018, 05:18:

> User can read serialized bytes from KafkaIO and deserialize explicitly in
> a ParDo, which gives complete control on how to handle record errors. This
> is what I would do if I needed to in my pipeline.
>
> If there is a transform in Beam that does this, it could be convenient for
> users in many such scenarios. This is simpler than each source supporting
> it explicitly.
>
> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath 
> wrote:
>
>> Given that KafkaIO uses UnboundeSource framework, this is probably not
>> something that can easily be supported. We might be able to support similar
>> features when we have Kafka on top of Splittable DoFn though.
>>
> So feel free to create a feature request JIRA for this.
>>
>> Thanks,
>> Cham
>>
>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles  wrote:
>>
>>> This is a great question. I've added the dev list to be sure it gets
>>> noticed by whoever may know best.
>>>
>>> Kenn
>>>
>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias 
>>> wrote:
>>>

 Hi,

 Is there a way to get a Deadletter Output from a pipeline that uses a
 KafkaIO
 connector for it's input? As `TimestampPolicyFactory.withTimestampFn()`
 takes
 only a SerializableFunction and not a ParDo, how would I be able to
 produce a
 Deadletter output from it?

 I have the following pipeline defined that reads from a KafkaIO input:

 pipeline.apply(
   KafkaIO.read()
 .withBootstrapServers(bootstrap)
 .withTopics(topics)
 .withKeyDeserializer(StringDeserializer.class)
 .withValueDeserializer(ConfigurableDeserializer.class)
 .updateConsumerProperties(
 ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
 inputMessagesConfig))
 .updateConsumerProperties(ImmutableMap.of("auto.offset.reset",
 "earliest"))
 .updateConsumerProperties(ImmutableMap.of("group.id",
 "beam-consumers"))
 .updateConsumerProperties(ImmutableMap.of("enable.auto.commit",
 "true"))
 .withTimestampPolicyFactory(
 TimestampPolicyFactory.withTimestampFn(
 new MessageTimestampExtractor(inputMessagesConfig)))
 .withReadCommitted()
 .commitOffsetsInFinalize())


 and I like to get deadletter outputs when my timestamp extraction fails.

 Best,
 Tobi




Error with FlinkRunner: No translator known for org.apache.beam.sdk.io.Read$Unbounded

2018-10-16 Thread Juan Carlos Garcia
Hi Folks,

I am switching some pipelines from the SparkRunner to the FlinkRunner on Beam
2.7.

I started with a very simple pipeline which just reads from multiple Kafka
sources, flattens those and then applies a regular DoFn.

On my first try of the pipeline (from the command line using a fat jar), like:
```
  java -cp my-fat.jar FQCN --configuration=foo --runner=FlinkRunner
```

it complains with:
```
Exception in thread "main" java.lang.IllegalStateException: No translator
known for org.apache.beam.sdk.io.Read$Unbounded
at
org.apache.beam.runners.core.construction.PTransformTranslation.urnForTransform(PTransformTranslation.java:164)
at
org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:100)
...
...
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
```

In my build.gradle I just replaced my spark-runner dependency with this:
```
runtime "org.apache.beam:beam-runners-flink_2.11:2.7.0"
```

Am I missing something?
-- 

JC


Re: Spark storageLevel not taking effect

2018-10-12 Thread Juan Carlos Garcia
Hi Mike,

From the documentation on
https://beam.apache.org/documentation/runners/spark/#pipeline-options-for-the-spark-runner

storageLevel: "The StorageLevel to use when caching RDDs in batch pipelines.
The Spark Runner automatically caches RDDs that are evaluated repeatedly.
This is a batch-only property as streaming pipelines in Beam are stateful,
which requires Spark DStream's StorageLevel to be MEMORY_ONLY." (Default:
MEMORY_ONLY)

So I think you are out of luck here.


On Thu, Oct 11, 2018 at 10:05 PM Mike Kaplinskiy 
wrote:

> Hey folks,
>
> Admittedly I may be a bit on the bleeding edge here, but I'm attempting to
> run a Beam pipeline on Spark which is running on top of Kubernetes.
> Specifically Beam 2.6.0 with Spark 2.4.0-rc2 running in client mode with a
> Kubernetes (1.11) driver. It's actually pretty cool - from a Kubernetes
> perspective, I start a pod which starts a ton of workers to do the parallel
> stuff and then cleans up after itself.
>
> One thing I can't seem to get working is setting the storage level for
> Spark RDDs via Beam. Specifically passing --storageLevel=MEMORY_AND_DISK
> seems to not work - the rdd still shows up as "Memory Deserialized 1x
> Replicated" in the Spark UI. I would expect it to be something closer to
> "Disk Memory Deserialized 1x Replicated." It *seems* to be serialized only
> in the sense that less memory is used (I assume it gets encoded).
>
> I even tried hardcoding storageLevel in BoundedDataset.java (based on the
> line number in the DAG viz). Unfortunately it still shows up as memory-only.
>
> Am I missing something that would let me spill data to disk?
>
> For reference, here's my exact command line:
> /opt/spark/bin/spark-submit
> --master 'k8s://https://kubernetes:443'
> --deploy-mode client
> --name $(MY_POD_NAME)
> --conf spark.executor.instances=20
> --conf spark.driver.host=$(MY_POD_IP)
> --conf spark.driver.port=7077
> --conf spark.kubernetes.container.image=$(MY_IMAGE)
> --conf spark.kubernetes.driver.pod.name=$(MY_POD_NAME)
> --conf spark.kubernetes.executor.podNamePrefix=$(MY_POD_NAME)
> --conf spark.executor.memory=5500m
> --conf spark.executor.memoryOverhead=1300m
> --conf spark.memory.fraction=0.45
> --conf spark.executor.cores=3
> --conf spark.kubernetes.executor.limit.cores=3
> --conf spark.default.parallelism=60
> --conf spark.kubernetes.allocation.batch.size=20
> --conf spark.kubernetes.driver.label.app=beam-datomic-smoketest
> --conf spark.kubernetes.node.selector.node.ladderlife.com/group=etl
> --conf
> spark.kubernetes.executor.annotation.iam.amazonaws.com/role=etl-role
> --conf spark.kubernetes.executor.secrets.google-cloud=/google-cloud-secrets
> --conf spark.kubernetes.executor.secretKeyRef.SENTRY_DSN=sentry-secrets:dsn
> --conf spark.executorEnv.STATSD_HOST=169.254.168.253
> --class ladder.my_beam_job
> local:///srv/beam_job.jar
> --runner=SparkRunner
> --storageLevel=MEMORY_AND_DISK
>
> Thanks,
> Mike.
>
> Ladder . The smart, modern way to insure your life.
>


-- 

JC


Re: Apache Beam UI job creator

2018-10-08 Thread Juan Carlos Garcia
I think (maybe I am wrong) that there is already a project within the Google
products that aims for this and is still in beta/alpha ... (I can't
recall the name).

And personally I would definitely like to see something like this.

Karan Kumar  schrieb am Mo., 8. Okt. 2018, 11:24:

> Hello
>
> We want to expose a GUI for our engineers/business analysts to create real
> time pipelines using drag and drop constructs. Projects such as
> https://github.com/TouK/nussknacker for flink and
> https://github.com/hortonworks/streamline for storm match our
> requirements.
>
> We wanted to understand if a UI job creator is on the road map for the
> beam community or
> if there are any projects which have taken a stab at solving this problem.
>
> --
> Thanks
> Karan
>


Re: Beam SparkRunner and Spark KryoSerializer problem

2018-10-04 Thread Juan Carlos Garcia
Hi Jean,

Thank you!

On Thu, Oct 4, 2018 at 7:54 AM Jean-Baptiste Onofré  wrote:

> Hi Juan
>
> I'm on it.
>
> Regards
> JB
> Le 4 oct. 2018, à 07:19, Juan Carlos Garcia  a écrit:
>>
>> Bump,
>>
>> can someone from the core-dev provide a feedback about:
>>https://issues.apache.org/jira/browse/BEAM-4597
>>
>> Thanks
>>
>> On Mon, Jul 30, 2018 at 3:15 PM Juan Carlos Garcia 
>> wrote:
>>
>>> Hi Jean,
>>>
>>> Thanks for taking a look.
>>>
>>>
>>> On Mon, Jul 30, 2018 at 2:49 PM, Jean-Baptiste Onofré 
>>> wrote:
>>>
>>>> Hi Juan,
>>>>
>>>> it seems that has been introduce by the metrics layer in the core runner
>>>> API.
>>>>
>>>> Let me check.
>>>>
>>>> Regards
>>>> JB
>>>>
>>>> On 30/07/2018 14:47, Juan Carlos Garcia wrote:
>>>> > Bump!
>>>> >
>>>> > Does any of the core-dev roam around here?
>>>> >
>>>> > Can someone provide a feedback about BEAM-4597
>>>> > <https://issues.apache.org/jira/browse/BEAM-4597>
>>>> >
>>>> > Thanks and regards,
>>>> >
>>>> > On Thu, Jul 19, 2018 at 3:41 PM, Juan Carlos Garcia <
>>>> jcgarc...@gmail.com
>>>> > <mailto:jcgarc...@gmail.com>> wrote:
>>>> >
>>>> > Folks,
>>>> >
>>>> > Its someone using the SparkRunner out there with the Spark
>>>> > KryoSerializer ?
>>>> >
>>>> > We are being force to use the not so efficient 'JavaSerializer'
>>>> with
>>>> > Spark because we face the following exception:
>>>> >
>>>> > 
>>>> > Exception in thread "main" java.lang.RuntimeException:
>>>> > org.apache.spark.SparkException: Job aborted due to stage failure:
>>>> > Exception while getting task result:
>>>> > com.esotericsoftware.kryo.KryoException: Unable to find class:
>>>> >
>>>>  
>>>> org.apache.beam.runners.core.metrics.MetricsContainerImpl$$Lambda$31/1875283985
>>>> > Serialization trace:
>>>> > factory (org.apache.beam.runners.core.metrics.MetricsMap)
>>>> > counters
>>>> (org.apache.beam.runners.core.metrics.MetricsContainerImpl)
>>>> > metricsContainers
>>>> > (org.apache.beam.runners.core.metrics.MetricsContainerStepMap)
>>>> > metricsContainers
>>>> > (org.apache.beam.runners.spark.io.SparkUnboundedSource$Metadata)
>>>> > at
>>>> >
>>>>  
>>>> org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:55)
>>>> > at
>>>> >
>>>>  
>>>> org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:72)
>>>> > at
>>>> >
>>>>  
>>>> org.apache.beam.runners.spark.SparkPipelineResult.access$000(SparkPipelineResult.java:41)
>>>> > at
>>>> >
>>>>  
>>>> org.apache.beam.runners.spark.SparkPipelineResult$StreamingMode.stop(SparkPipelineResult.java:163)
>>>> > at
>>>> >
>>>>  
>>>> org.apache.beam.runners.spark.SparkPipelineResult.offerNewState(SparkPipelineResult.java:198)
>>>> > at
>>>> >
>>>>  
>>>> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:101)
>>>> > at
>>>> >
>>>>  
>>>> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:87)
>>>> > at
>>>> >
>>>>  
>>>> org.apache.beam.examples.BugWithKryoOnSpark.main(BugWithKryoOnSpark.java:75)
>>>> > 
>>>> >
>>>> > I created a jira ticket and attached a project example on it,
>>>> > https://issues.apache.org/jira/browse/BEAM-4597
>>>> > <https://issues.apache.org/jira/browse/BEAM-4597>
>>>> >
>>>> > Any feedback is appreciated.
>>>> >
>>>> > --
>>>> >
>>>> > JC
>>>> >
>>>> >
>>>> >
>>>> >
>>>> > --
>>>> >
>>>> > JC
>>>> >
>>>>
>>>> --
>>>> Jean-Baptiste Onofré
>>>> jbono...@apache.org
>>>> http://blog.nanthrax.net
>>>> Talend - http://www.talend.com
>>>>
>>>
>>>
>>>
>>> --
>>>
>>> JC
>>>
>>>
>>
>> --
>>
>> JC
>>
>>

-- 

JC


Re: Beam SparkRunner and Spark KryoSerializer problem

2018-10-03 Thread Juan Carlos Garcia
Bump,

can someone from the core devs provide feedback about:
   https://issues.apache.org/jira/browse/BEAM-4597

Thanks

On Mon, Jul 30, 2018 at 3:15 PM Juan Carlos Garcia 
wrote:

> Hi Jean,
>
> Thanks for taking a look.
>
>
> On Mon, Jul 30, 2018 at 2:49 PM, Jean-Baptiste Onofré 
> wrote:
>
>> Hi Juan,
>>
>> it seems that has been introduce by the metrics layer in the core runner
>> API.
>>
>> Let me check.
>>
>> Regards
>> JB
>>
>> On 30/07/2018 14:47, Juan Carlos Garcia wrote:
>> > Bump!
>> >
>> > Does any of the core-dev roam around here?
>> >
>> > Can someone provide a feedback about BEAM-4597
>> > <https://issues.apache.org/jira/browse/BEAM-4597>
>> >
>> > Thanks and regards,
>> >
>> > On Thu, Jul 19, 2018 at 3:41 PM, Juan Carlos Garcia <
>> jcgarc...@gmail.com
>> > <mailto:jcgarc...@gmail.com>> wrote:
>> >
>> > Folks,
>> >
>> > Its someone using the SparkRunner out there with the Spark
>> > KryoSerializer ?
>> >
>> > We are being force to use the not so efficient 'JavaSerializer' with
>> > Spark because we face the following exception:
>> >
>> > 
>> > Exception in thread "main" java.lang.RuntimeException:
>> > org.apache.spark.SparkException: Job aborted due to stage failure:
>> > Exception while getting task result:
>> > com.esotericsoftware.kryo.KryoException: Unable to find class:
>> >
>>  
>> org.apache.beam.runners.core.metrics.MetricsContainerImpl$$Lambda$31/1875283985
>> > Serialization trace:
>> > factory (org.apache.beam.runners.core.metrics.MetricsMap)
>> > counters (org.apache.beam.runners.core.metrics.MetricsContainerImpl)
>> > metricsContainers
>> > (org.apache.beam.runners.core.metrics.MetricsContainerStepMap)
>> > metricsContainers
>> > (org.apache.beam.runners.spark.io.SparkUnboundedSource$Metadata)
>> > at
>> >
>>  
>> org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:55)
>> > at
>> >
>>  
>> org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:72)
>> > at
>> >
>>  
>> org.apache.beam.runners.spark.SparkPipelineResult.access$000(SparkPipelineResult.java:41)
>> > at
>> >
>>  
>> org.apache.beam.runners.spark.SparkPipelineResult$StreamingMode.stop(SparkPipelineResult.java:163)
>> > at
>> >
>>  
>> org.apache.beam.runners.spark.SparkPipelineResult.offerNewState(SparkPipelineResult.java:198)
>> > at
>> >
>>  
>> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:101)
>> > at
>> >
>>  
>> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:87)
>> > at
>> >
>>  org.apache.beam.examples.BugWithKryoOnSpark.main(BugWithKryoOnSpark.java:75)
>> > 
>> >
>> > I created a jira ticket and attached a project example on it,
>> > https://issues.apache.org/jira/browse/BEAM-4597
>> > <https://issues.apache.org/jira/browse/BEAM-4597>
>> >
>> > Any feedback is appreciated.
>> >
>> > --
>> >
>> > JC
>> >
>> >
>> >
>> >
>> > --
>> >
>> > JC
>> >
>>
>> --
>> Jean-Baptiste Onofré
>> jbono...@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>>
>
>
>
> --
>
> JC
>
>

-- 

JC


Re: Ingesting from KafkaIO into HDFS, problems with checkpoint and data lost on top of yarn + spark

2018-10-02 Thread Juan Carlos Garcia
Thanks for your inputs on this matter. By data loss I meant data that is in
Kafka but was not written to HDFS due to restarting the pipeline
(SparkRunner), or because it failed due to connectivity and was killed by
Yarn, and when we restarted the pipeline those records were skipped.

I am curious about what you mentioned (30 min of a 1 hr window would be
lost); just a noob question: why?

> read from any source and write on any supported sink.

I have no doubt about it.

With a multi-stage pipeline (where we sort, manipulate and group the data) my
purpose is to reliably sink data to HDFS, regardless of any interruption of
the pipeline, like other ingestion libraries do in "batch" (Camus / Gobblin
from LinkedIn).

During this period we are using HDFS as the sink with Spark, with a window to
avoid hitting HDFS too hard. Do you recommend using Flink instead (at
least for this requirement)?

Thanks in advance

Raghu Angadi  schrieb am Di., 2. Okt. 2018, 08:25:

> This is mostly a question about SparkRunner and to certain extent FileIO.
> You might want to elaborate a bit more what you mean by data loss. In most
> cases, restarting a pipeline from scratch loses checkpointed state from
> previous job (e.g. first 30 minutes of a 1 hour window would be lost),
> unless you have a way to restart from a 'snapshot' of the pipeline (i.e.
> starting from 'savepoint' in Flink or 'updating' a pipeline in Dataflow).
>
> Regd 'commitOnFinalize()' in KafkaIO, it runs 'soon after' the
> corresponding messages are processed/checkpointed. In the case of Spark and
> Dataflow, it would be after the messages pass through the first stage of
> the pipeline.
>
> > Please advice if this usecase (data ingestion to hdfs) is something beam
> could achieve without lossing data from KafkaIO.
> Yes, reading from any supported source and writing to any supported sink
> is supported. Otherwise, it would be a bug.
>
> On Mon, Oct 1, 2018 at 10:25 PM Juan Carlos Garcia 
> wrote:
>
>> Hi folks we are running a pipeline which as the subject says the we are
>> having issues with data lost.
>>
>> Using KafkaIO (2.0.4 due to the version of our brokers) with
>> commitOnFinalize, we would like to understand how this finalize work
>> together with a FileIO.
>>
>> I studied the KafkaIO and saw that the records are committed to kafka
>> inside the consumerPollLoop method only when a checkpoint is produced, but
>> when is this checkpoint produced?, how does it cope with windowed data and
>> a FileIO to produces files?
>>
>> When running with spark our batchInterval is 30secs, and the pipeline
>> have a fixed-window of 1hr for FileIO to write to HDFS and we are
>> constantly restarting the pipeline (1 or 3 times a day, or yarn reach it
>> maximum restart attempt and then it kill it completely due to networks
>> interruption ), however we have detected we have missing data on HDFS.
>>
>> Initially we were running without specifying a checkpoint directory
>> (SparkRunner) , and we found that on each deployment a random directory was
>> generated under /tmp, recently we started to uses a fixed directory for
>> checkpoint (via - - checkpointDir on the spark runner), but still we have
>> doubts that this will completely solve our data lost problems when
>> restarting the pipeline multiple times a day (or is it our assumption
>> incorrect? ).
>>
>> Please advice if this usecase (data ingestion to hdfs) is something beam
>> could achieve without lossing data from KafkaIO.
>>
>> Thanks
>> JC
>>
>>
>>
>>


Ingesting from KafkaIO into HDFS, problems with checkpoint and data lost on top of yarn + spark

2018-10-01 Thread Juan Carlos Garcia
Hi folks, we are running a pipeline which, as the subject says, is having
issues with data loss.

Using KafkaIO (2.0.4 due to the version of our brokers) with
commitOnFinalize, we would like to understand how this finalize works
together with a FileIO.

I studied the KafkaIO code and saw that the records are committed to Kafka
inside the consumerPollLoop method only when a checkpoint is produced, but
when is this checkpoint produced? How does it cope with windowed data and
a FileIO that produces files?

When running with Spark our batchInterval is 30 secs, and the pipeline has
a fixed window of 1 hr for FileIO to write to HDFS, and we are constantly
restarting the pipeline (1 or 3 times a day, or Yarn reaches its maximum
restart attempts and then kills it completely due to network interruptions);
however, we have detected that we have missing data on HDFS.

Initially we were running without specifying a checkpoint directory
(SparkRunner), and we found that on each deployment a random directory was
generated under /tmp. Recently we started to use a fixed directory for
checkpoints (via --checkpointDir on the Spark runner), but we still have
doubts that this will completely solve our data loss problems when
restarting the pipeline multiple times a day (or is our assumption
incorrect?).

Please advise if this use case (data ingestion to HDFS) is something Beam
can achieve without losing data from KafkaIO.

Thanks
JC


Re: Modular IO presentation at Apachecon

2018-09-26 Thread Juan Carlos Garcia
I'm really looking forward to a way to monitor the results (like which batch
of elements was written per destination, if possible) of an IO module
in a consistent way.

Nice presentation.

Thomas Weise  schrieb am Do., 27. Sep. 2018, 06:35:

> Thanks for sharing. I'm looking forward to see the recording of the talk
> (hopefully!).
>
> This will be very helpful for Beam users. IO still is typically the
> unexpectedly hard and time consuming part of authoring pipelines.
>
>
> On Wed, Sep 26, 2018 at 2:48 PM Alan Myrvold  wrote:
>
>> Thanks for the slides.
>> Really enjoyed the talk in person, especially the concept that IO is a
>> transformation, and a source or sink are not special and the splittable
>> DoFn explanation.
>>
>> On Wed, Sep 26, 2018 at 2:17 PM Ismaël Mejía  wrote:
>>
>>> Hello, today Eugene and I did a talk about modular APIs for IO
>>> at ApacheCon. This talk introduces some common patterns that we have
>>> found while creating IO connectors and also presents recent ideas like
>>> dynamic destinations, sequential writes among others using FileIO as a
>>> use case.
>>>
>>> In case you guys want to take a look, here is a copy of the slides, we
>>> will probably add this to the IO authoring documentation too.
>>>
>>> https://s.apache.org/beam-modular-io-talk
>>>
>>


Re: AvroIO - failure using direct runner with java.nio.file.FileAlreadyExistsException when moving from temp to destination

2018-09-26 Thread Juan Carlos Garcia
Hi Tim, thanks for the explanation; it makes more sense now why it
was failing. :)

I opened a Jira ticket https://issues.apache.org/jira/browse/BEAM-5511 for
this matter.

Thanks,
JC


On Wed, Sep 26, 2018 at 1:40 PM Tim Robertson 
wrote:

> Hi Juan
>
> Well done for diagnosing your issue and thank you for taking the time to
> report it here.
>
> I'm not the author of this section but I've taken a quick look at the code
> and in line comments and have some observations which I think might help
> explain it.
>
> I notice it writes into temporary files and uses a HashMap<DestinationT,
> Writer> for maintaining a pool of writers for each destination. I presume
> that you are receiving a new instance of the DestinationT object on each
> call and therefore the HashMap will be treating these as separate entries -
> a new writer is created for each entry in the hashMap. The method
> responsible for providing the DestinationT is the following from the
> FileBasedSink which does document the expectation:
> that you are receiving a new instance of the DestinationT object on each
> call and therefore the HashMap will be treating these as separate entries -
> a new writer is created for each entry in the hashMap..  The method
> responsible for providing the DestinationT is the following from the
> FileBasedSink which does document the expectation:
>
> /**
>  * Returns an object that represents at a high level the destination being 
> written to. May not
>  * return null. A destination must have deterministic hash and equality 
> methods defined.
>  */
> public abstract DestinationT getDestination(UserT element);
>
>
> Beyond that I notice that it also relies on using a hashCode from the
> serialised object (i.e. after running through the coder) which you note
> too. The inline doc explains the reasoning for that which is because
> hashCode is not guaranteed to be stable across machines. When elements are
> processed on different machines we need deterministic behaviour to direct
> to the correct target shard. To do that the code opts to use a murmur3_32
> algorithm which is safe across machines (Today I learnt!) and it operates
> on the encoded bytes for the object which are to be deterministic.
>
> I agree that we should improve the documentation and state that hashCode
> and equals needs to be implemented when user defined objects are used for
> the dynamic destination. Would you mind opening a Jira for that please?
>
> I hope this helps a little, and thanks again
> Tim
>
>
>
>
>
>
>
>
>
>
>
> On Wed, Sep 26, 2018 at 11:24 AM Juan Carlos Garcia 
> wrote:
>
>> Hi Guys, after days of bumping my head against the monitor i found why it
>> was not working.
>>
>> One key element when using *DynamicAvroDestinations *that is not
>> described in the documentation is that, if you are using a regular POJO as
>> *DestinationT* like i am (and not Long/String/Integer as the example) :
>>
>> {code}
>>DynamicAvroDestinations> GenericRecord>
>> {code}
>>
>> Its very important to pay attention to equals / hashCode
>> implementations,  which should aligned with your
>> sharding/grouping/partition structure. Not doing so will give you the
>> result i described earlier (1 file (or shard) with 1 record only, or
>> sometime just an exception).
>>
>> While i still don't understand why it depends on equals / hashCode, as i
>> checked the class on:
>>   *org.apache.beam.sdk.io.WriteFiles.ApplyShardingKeyFn:688*
>>
>> The hashing depends on the Coder itself (method:  int
>> hashDestination(DestinationT destination, Coder
>> destinationCoder)).
>>
>> Maybe a core member could explain the reason of it, or its an unexpected
>> behavior and there is a bug somewhere else.
>>
>> In my case below you can find my POJO Destination along with the
>> corresponding Codec implementation, which works correctly as long as the
>> equals / hashCode are implemented:
>>
>> {code}
>> static class GenericRecordDynamicDestination {
>> private String logicalType;
>> private final int year;
>> private final int month;
>> private final int day;
>>
>> public GenericRecordDynamicDestination(final String _logicalType,
>> final int _year, final int _month, final int _day) {
>> logicalType = _logicalType;
>> year = _year;
>> month = _month;
>> day = _day;
>> }
>>
>> public String getLogicalType() {
>> return logicalType;
>> }
>>
>> public void setLogicalType(final String _logicalType) {
>> logicalType = _logicalType;
>> }
>>
>> public int getYear() {
>> return year;
>> }
>>
>> public int getMonth() {
>> return month;
>> }
>>
>> public int getDay() {
&

Re: AvroIO - failure using direct runner with java.nio.file.FileAlreadyExistsException when moving from temp to destination

2018-09-26 Thread Juan Carlos Garcia
Hi Guys, after days of bumping my head against the monitor I found why it
was not working.

One key element when using DynamicAvroDestinations that is not described
in the documentation is that, if you are using a regular POJO as
DestinationT like I am (and not Long/String/Integer as in the example):

{code}
   DynamicAvroDestinations
{code}

It's very important to pay attention to the equals / hashCode implementations,
which should be aligned with your sharding/grouping/partition structure. Not
doing so will give you the result I described earlier (1 file (or shard)
with only 1 record, or sometimes just an exception).

While I still don't understand why it depends on equals / hashCode, as I
checked the class at:
  org.apache.beam.sdk.io.WriteFiles.ApplyShardingKeyFn:688

The hashing depends on the Coder itself (method: int
hashDestination(DestinationT destination, Coder<DestinationT>
destinationCoder)).

Maybe a core member could explain the reason for it, or it's an unexpected
behavior and there is a bug somewhere else.

In my case, below you can find my POJO destination along with the
corresponding Coder implementation, which works correctly as long as
equals / hashCode are implemented:

{code}
static class GenericRecordDynamicDestination {
private String logicalType;
private final int year;
private final int month;
private final int day;

public GenericRecordDynamicDestination(final String _logicalType,
final int _year, final int _month, final int _day) {
logicalType = _logicalType;
year = _year;
month = _month;
day = _day;
}

public String getLogicalType() {
return logicalType;
}

public void setLogicalType(final String _logicalType) {
logicalType = _logicalType;
}

public int getYear() {
return year;
}

public int getMonth() {
return month;
}

public int getDay() {
return day;
}

@Override
public boolean equals(final Object _o) {
if (this == _o) return true;
if (_o == null || getClass() != _o.getClass()) return false;

final GenericRecordDynamicDestination that =
(GenericRecordDynamicDestination) _o;

if (year != that.year) return false;
if (month != that.month) return false;
if (day != that.day) return false;
return logicalType.equals(that.logicalType);
}

@Override
public int hashCode() {
int result = logicalType.hashCode();
result = 31 * result + year;
result = 31 * result + month;
result = 31 * result + day;
return result;
}
}

static class GenericRecordDestinationCoder extends
CustomCoder<GenericRecordDynamicDestination> {
@Override
public void encode(final GenericRecordDynamicDestination value,
final OutputStream outStream) throws IOException {
final ObjectOutputStream out = new
ObjectOutputStream(outStream);
out.writeUTF(value.getLogicalType());
out.writeInt(value.getYear());
out.writeInt(value.getMonth());
out.writeInt(value.getDay());
out.flush();
}

@Override
public GenericRecordDynamicDestination decode(final InputStream
inStream) throws IOException {
final ObjectInputStream in = new ObjectInputStream(inStream);
String logicalType = in.readUTF();
int year = in.readInt();
int month = in.readInt();
int day = in.readInt();
return new GenericRecordDynamicDestination(logicalType, year,
month, day);
}

@Override
public void verifyDeterministic() throws NonDeterministicException {
//
}
}
{code}


On Thu, Sep 20, 2018 at 12:54 PM Juan Carlos Garcia 
wrote:

> I am writing a pipeline that will read from kafka and convert the data
> into Avro files with a fixed windows of 10min.
>
> I am using a *DynamicAvroDestinations *in order to build a dynamic path
> and select the corresponding schema based on the incoming data.
>
> 1.)
> While testing on my machine (With DirectRunner) i am using a File
> (BoundedSource) containing hundreds of this messages and feeding my
> pipeline with this, however i have found that sometimes, the pipeline fail
> with :
> {code}Caused by: java.nio.file.FileAlreadyExistsException:
> /tmp/test-tracking/2018/09/tracking-day-11-w--922337203621-0_4.avro
> at sun.nio.fs.UnixException.translateToIOException(UnixException.java:88)
> at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
> at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
> at sun.nio.fs.UnixCopyFile.copyFile(UnixCopyFile.java:243)
> at sun.nio.fs.UnixCopyFile.copy(UnixCopyFile.java:581)
> at sun.nio.fs.Un

AvroIO - failure using direct runner with java.nio.file.FileAlreadyExistsException when moving from temp to destination

2018-09-20 Thread Juan Carlos Garcia
I am writing a pipeline that will read from Kafka and convert the data into
Avro files with a fixed window of 10 min.

I am using a DynamicAvroDestinations in order to build a dynamic path and
select the corresponding schema based on the incoming data.

1.)
While testing on my machine (with the DirectRunner) I am using a file
(BoundedSource) containing hundreds of these messages and feeding my
pipeline with it; however, I have found that sometimes the pipeline fails
with:
{code}Caused by: java.nio.file.FileAlreadyExistsException:
/tmp/test-tracking/2018/09/tracking-day-11-w--922337203621-0_4.avro
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:88)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
at sun.nio.fs.UnixCopyFile.copyFile(UnixCopyFile.java:243)
at sun.nio.fs.UnixCopyFile.copy(UnixCopyFile.java:581)
at sun.nio.fs.UnixFileSystemProvider.copy(UnixFileSystemProvider.java:253)
at java.nio.file.Files.copy(Files.java:1274)
at org.apache.beam.sdk.io.LocalFileSystem.copy(LocalFileSystem.java:143)
at org.apache.beam.sdk.io.FileSystems.copy(FileSystems.java:301)
at
org.apache.beam.sdk.io.FileBasedSink$WriteOperation.moveToOutputFiles(FileBasedSink.java:756)
at
org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn.process(WriteFiles.java:801)
{code}

2.) When trying to group all related messages into a single Avro file
(MMDD-HHMM), even when the pipeline doesn't fail using the DirectRunner,
the generated Avro files only contain 1 record per file.

If I generate a pseudo-random name in *getFilenamePolicy*, everything
works, but I end up with hundreds of files, each of them containing 1
record.

My pipeline steps are:
-- ReadFromSource
-- ApplyWindows
   (FixedWindows.of(Duration.standardSeconds(config.getWindowDuration())))
-- Create a KV from the KafkaRecord (the key is the date as MMDDHHMM)
-- GroupByKey (this returns a KV whose value is an Iterable of records)
-- Emit each record of the Iterable as a KV
-- AvroIO ( AvroIO.writeCustomTypeToGenericRecords() )

Please also find below the code for getFilenamePolicy:
{code}
@Override
public FileBasedSink.FilenamePolicy getFilenamePolicy(
        final GenericRecordDestination destination) {
    return new FileBasedSink.FilenamePolicy() {
        @Override
        public ResourceId windowedFilename(final int shardNumber, final int numShards,
                final BoundedWindow window, final PaneInfo paneInfo,
                final FileBasedSink.OutputFileHints outputFileHints) {

            StringBuilder path = new StringBuilder(filesLocation);
            if (!filesLocation.endsWith("/")) {
                path.append("/");
            }

            path.append(DateTimeUtil.format(destination.getTimestamp(), "yyyy/MM"))
                .append("/")
                .append(filesPrefix)
                .append("-day-").append(DateTimeUtil.format(destination.getTimestamp(), "dd"))
                .append("-w-").append(window.maxTimestamp().getMillis())
                .append("-").append(shardNumber)
                .append("_").append(numShards)
                .append(AVRO_SUFFIX);

            return FileSystems.matchNewResource(path.toString(), false);
        }

        @Nullable
        @Override
        public ResourceId unwindowedFilename(final int shardNumber, final int numShards,
                final FileBasedSink.OutputFileHints outputFileHints) {
            throw new PipelineException("unwindowedFilename is not supported");
        }
    };
}
{code}
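
One idea worth trying (a sketch under the assumption that the FileAlreadyExistsException comes from two firings or retries of the same window producing identical names): include the pane index from the PaneInfo parameter already passed to windowedFilename, so names stay unique without resorting to pseudo-random ones. The name-building part could look roughly like:

{code}
// Sketch only: make the windowed filename unique per pane as well as per shard.
// BoundedWindow and PaneInfo are the same parameters windowedFilename receives above;
// "prefix" stands for the path built up to the file name.
static String uniqueWindowedName(String prefix, BoundedWindow window,
                                 PaneInfo paneInfo, int shardNumber, int numShards) {
    return prefix
            + "-w-" + window.maxTimestamp().getMillis()
            + "-pane-" + paneInfo.getIndex()   // differs for each firing of the window
            + "-" + shardNumber
            + "_" + numShards
            + ".avro";
}
{code}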

Thanks
-- 

JC


KafkaIO needs access to the brokers even before the pipeline reach the worker

2018-09-19 Thread Juan Carlos Garcia
Hi folks, we have a pipeline for Dataflow, and our Google Cloud environment
has a private network (where the pipeline should run; this network
interconnects via an IPsec tunnel to an AWS environment where the Kafka
brokers are running).

We have found that in order to be able to submit the pipeline we have to do
it from a machine that has access to the Kafka brokers.

Is there a way to avoid that?

Why can't KafkaIO defer the communication with the brokers until the
pipeline is on the worker node?

Thanks and regards,
JC


Re: Problem with KafkaIO

2018-09-19 Thread Juan Carlos Garcia
Sorry, I hit the send button too fast... The error occurs in the worker.

Juan Carlos Garcia  schrieb am Mi., 19. Sep. 2018,
20:22:

> Sorry for hijacking the thread, we are running Spark on top of Yarn, yarn
> retries multiple times until it reachs it max attempt and then gives up.
>
> Raghu Angadi  schrieb am Mi., 19. Sep. 2018, 18:58:
>
>> On Wed, Sep 19, 2018 at 7:26 AM Juan Carlos Garcia 
>> wrote:
>>
>>> Don't know if its related, but we have seen our pipeline dying (using
>>> SparkRunner) when there is problem with Kafka  (network interruptions),
>>> errors like:  org.apache.kafka.common.errors.TimeoutException: Timeout
>>> expired while fetching topic metadata
>>>
>>> Maybe this will fix it as well, thanks Raghu for the hint about
>>> *withConsumerFactoryFn.*
>>>
>>
>> Wouldn't that be retried by the SparkRunner if it happens on the worker?
>> or does it happen while launching the pipeline on the client?
>>
>>
>>
>>>
>>>
>>>
>>> On Wed, Sep 19, 2018 at 3:29 PM Eduardo Soldera <
>>> eduardo.sold...@arquivei.com.br> wrote:
>>>
>>>> Hi Raghu, thank you.
>>>>
>>>> I'm not sure though what to pass as an argument:
>>>>
>>>> KafkaIO.read[String,String]()
>>>>   .withBootstrapServers(server)
>>>>   .withTopic(topic)
>>>>   .withKeyDeserializer(classOf[StringDeserializer])
>>>>   .withValueDeserializer(classOf[StringDeserializer])
>>>>   .withConsumerFactoryFn(new 
>>>> KafkaExecutor.ConsumerFactoryFn())
>>>>   .updateConsumerProperties(properties)
>>>>   .commitOffsetsInFinalize()
>>>>   .withoutMetadata()
>>>>
>>>>
>>>> Regards
>>>>
>>>>
>>>> Em ter, 18 de set de 2018 às 21:15, Raghu Angadi 
>>>> escreveu:
>>>>
>>>>> Hi Eduardo,
>>>>>
>>>>> There another work around you can try without having to wait for 2.7.0
>>>>> release: Use a wrapper to catch exception from KafkaConsumer#poll() and
>>>>> pass the wrapper to withConsumerFactoryFn() for KafkIO reader [1].
>>>>>
>>>>> Using something like (such a wrapper is used in KafkasIO tests [2]):
>>>>> private static class ConsumerFactoryFn
>>>>>     implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {
>>>>>   @Override
>>>>>   public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
>>>>>     return new KafkaConsumer<byte[], byte[]>(config) {
>>>>>       @Override
>>>>>       public ConsumerRecords<byte[], byte[]> poll(long timeout) {
>>>>>         // work around for BEAM-5375
>>>>>         while (true) {
>>>>>           try {
>>>>>             return super.poll(timeout);
>>>>>           } catch (Exception e) {
>>>>>             // LOG & sleep for sec
>>>>>           }
>>>>>         }
>>>>>       }
>>>>>     };
>>>>>   }
>>>>> }
>>>>>
>>>>> [1]:
>>>>> https://github.com/apache/beam/blob/release-2.4.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L417
>>>>> [2]:
>>>>> https://github.com/apache/beam/blob/release-2.4.0/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java#L261
>>>>>
>>>>> On Tue, Sep 18, 2018 at 5:49 AM Eduardo Soldera <
>>>>> eduardo.sold...@arquivei.com.br> wrote:
>>>>>
>>>>>> Hi Raghu, we're not sure how long the network was down. According to
>>>>>> the logs no longer than one minute. A 30 second shutdown would work for 
>>>>>> the
>>>>>> tests.
>>>>>>
>>>>>> Regards
>>>>>>
>>>>>> Em sex, 14 de set de 2018 às 21:41, Raghu Angadi 
>>>>>> escreveu:
>>>>>>
>>>>>>> Thanks. I could repro myself as well. How long was the network down?
>>>>>>>
>>>>>>> Trying to get the fix into 2.7 RC2.
>>>>>>>
>>>>>>> On Fri, Sep 14, 2018 at 12:25 PM Eduardo Soldera <
>>>>>>> eduardo.sold...@arquivei.com.br> wrote:
>>>>>>>
>>>>>>>> Just to

Re: Problem with KafkaIO

2018-09-19 Thread Juan Carlos Garcia
Sorry for hijacking the thread; we are running Spark on top of YARN. YARN
retries multiple times until it reaches its maximum number of attempts and then gives up.

Raghu Angadi  schrieb am Mi., 19. Sep. 2018, 18:58:

> On Wed, Sep 19, 2018 at 7:26 AM Juan Carlos Garcia 
> wrote:
>
>> Don't know if its related, but we have seen our pipeline dying (using
>> SparkRunner) when there is problem with Kafka  (network interruptions),
>> errors like:  org.apache.kafka.common.errors.TimeoutException: Timeout
>> expired while fetching topic metadata
>>
>> Maybe this will fix it as well, thanks Raghu for the hint about
>> *withConsumerFactoryFn.*
>>
>
> Wouldn't that be retried by the SparkRunner if it happens on the worker?
> or does it happen while launching the pipeline on the client?
>
>
>
>>
>>
>>
>> On Wed, Sep 19, 2018 at 3:29 PM Eduardo Soldera <
>> eduardo.sold...@arquivei.com.br> wrote:
>>
>>> Hi Raghu, thank you.
>>>
>>> I'm not sure though what to pass as an argument:
>>>
>>> KafkaIO.read[String,String]()
>>>   .withBootstrapServers(server)
>>>   .withTopic(topic)
>>>   .withKeyDeserializer(classOf[StringDeserializer])
>>>   .withValueDeserializer(classOf[StringDeserializer])
>>>   .withConsumerFactoryFn(new 
>>> KafkaExecutor.ConsumerFactoryFn())
>>>   .updateConsumerProperties(properties)
>>>   .commitOffsetsInFinalize()
>>>   .withoutMetadata()
>>>
>>>
>>> Regards
>>>
>>>
>>> Em ter, 18 de set de 2018 às 21:15, Raghu Angadi 
>>> escreveu:
>>>
>>>> Hi Eduardo,
>>>>
>>>> There another work around you can try without having to wait for 2.7.0
>>>> release: Use a wrapper to catch exception from KafkaConsumer#poll() and
>>>> pass the wrapper to withConsumerFactoryFn() for KafkIO reader [1].
>>>>
>>>> Using something like (such a wrapper is used in KafkasIO tests [2]):
>>>> private static class ConsumerFactoryFn
>>>>     implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {
>>>>   @Override
>>>>   public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
>>>>     return new KafkaConsumer<byte[], byte[]>(config) {
>>>>       @Override
>>>>       public ConsumerRecords<byte[], byte[]> poll(long timeout) {
>>>>         // work around for BEAM-5375
>>>>         while (true) {
>>>>           try {
>>>>             return super.poll(timeout);
>>>>           } catch (Exception e) {
>>>>             // LOG & sleep for sec
>>>>           }
>>>>         }
>>>>       }
>>>>     };
>>>>   }
>>>> }
>>>>
>>>> [1]:
>>>> https://github.com/apache/beam/blob/release-2.4.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L417
>>>> [2]:
>>>> https://github.com/apache/beam/blob/release-2.4.0/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java#L261
>>>>
>>>> On Tue, Sep 18, 2018 at 5:49 AM Eduardo Soldera <
>>>> eduardo.sold...@arquivei.com.br> wrote:
>>>>
>>>>> Hi Raghu, we're not sure how long the network was down. According to
>>>>> the logs no longer than one minute. A 30 second shutdown would work for 
>>>>> the
>>>>> tests.
>>>>>
>>>>> Regards
>>>>>
>>>>> Em sex, 14 de set de 2018 às 21:41, Raghu Angadi 
>>>>> escreveu:
>>>>>
>>>>>> Thanks. I could repro myself as well. How long was the network down?
>>>>>>
>>>>>> Trying to get the fix into 2.7 RC2.
>>>>>>
>>>>>> On Fri, Sep 14, 2018 at 12:25 PM Eduardo Soldera <
>>>>>> eduardo.sold...@arquivei.com.br> wrote:
>>>>>>
>>>>>>> Just to make myself clear, I'm not sure how to use the patch but if
>>>>>>> you could send us some guidance would be great.
>>>>>>>
>>>>>>> Em sex, 14 de set de 2018 às 16:24, Eduardo Soldera <
>>>>>>> eduardo.sold...@arquivei.com.br> escreveu:
>>>>>>>
>>>>>>>> Hi Raghu, yes, it is feasible, would you do that for us? I'm not
>>>>>>>> sure how we'd use the patch. We're using SBT and Spotify's Scio with 
>

Re: Problem with KafkaIO

2018-09-19 Thread Juan Carlos Garcia
Don't know if it's related, but we have seen our pipeline dying (using the
SparkRunner) when there is a problem with Kafka (network interruptions), with
errors like: org.apache.kafka.common.errors.TimeoutException: Timeout
expired while fetching topic metadata

Maybe this will fix it as well; thanks Raghu for the hint about
*withConsumerFactoryFn*.
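
In case it helps others searching the archive, a rough sketch of how such a factory can be plugged in on the Java side (RetryingConsumerFactoryFn is a made-up name for the poll-retrying wrapper quoted further down; 'pipeline', 'bootstrapServers' and 'topic' are assumed to exist):

{code}
// Sketch only: KafkaIO.Read#withConsumerFactoryFn takes a
// SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>, so the
// factory always deals in byte[] -- the deserializers below are applied by KafkaIO itself.
PCollection<KV<String, String>> records = pipeline.apply(
    KafkaIO.<String, String>read()
        .withBootstrapServers(bootstrapServers)
        .withTopic(topic)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withConsumerFactoryFn(new RetryingConsumerFactoryFn())  // wrapper as quoted below
        .commitOffsetsInFinalize()
        .withoutMetadata());
{code}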




On Wed, Sep 19, 2018 at 3:29 PM Eduardo Soldera <
eduardo.sold...@arquivei.com.br> wrote:

> Hi Raghu, thank you.
>
> I'm not sure though what to pass as an argument:
>
> KafkaIO.read[String,String]()
>   .withBootstrapServers(server)
>   .withTopic(topic)
>   .withKeyDeserializer(classOf[StringDeserializer])
>   .withValueDeserializer(classOf[StringDeserializer])
>   .withConsumerFactoryFn(new 
> KafkaExecutor.ConsumerFactoryFn())
>   .updateConsumerProperties(properties)
>   .commitOffsetsInFinalize()
>   .withoutMetadata()
>
>
> Regards
>
>
> Em ter, 18 de set de 2018 às 21:15, Raghu Angadi 
> escreveu:
>
>> Hi Eduardo,
>>
>> There another work around you can try without having to wait for 2.7.0
>> release: Use a wrapper to catch exception from KafkaConsumer#poll() and
>> pass the wrapper to withConsumerFactoryFn() for KafkIO reader [1].
>>
>> Using something like (such a wrapper is used in KafkasIO tests [2]):
>> private static class ConsumerFactoryFn
>>     implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {
>>   @Override
>>   public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
>>     return new KafkaConsumer<byte[], byte[]>(config) {
>>       @Override
>>       public ConsumerRecords<byte[], byte[]> poll(long timeout) {
>>         // work around for BEAM-5375
>>         while (true) {
>>           try {
>>             return super.poll(timeout);
>>           } catch (Exception e) {
>>             // LOG & sleep for sec
>>           }
>>         }
>>       }
>>     };
>>   }
>> }
>>
>> [1]:
>> https://github.com/apache/beam/blob/release-2.4.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L417
>> [2]:
>> https://github.com/apache/beam/blob/release-2.4.0/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java#L261
>>
>> On Tue, Sep 18, 2018 at 5:49 AM Eduardo Soldera <
>> eduardo.sold...@arquivei.com.br> wrote:
>>
>>> Hi Raghu, we're not sure how long the network was down. According to the
>>> logs no longer than one minute. A 30 second shutdown would work for the
>>> tests.
>>>
>>> Regards
>>>
>>> Em sex, 14 de set de 2018 às 21:41, Raghu Angadi 
>>> escreveu:
>>>
 Thanks. I could repro myself as well. How long was the network down?

 Trying to get the fix into 2.7 RC2.

 On Fri, Sep 14, 2018 at 12:25 PM Eduardo Soldera <
 eduardo.sold...@arquivei.com.br> wrote:

> Just to make myself clear, I'm not sure how to use the patch but if
> you could send us some guidance would be great.
>
> Em sex, 14 de set de 2018 às 16:24, Eduardo Soldera <
> eduardo.sold...@arquivei.com.br> escreveu:
>
>> Hi Raghu, yes, it is feasible, would you do that for us? I'm not sure
>> how we'd use the patch. We're using SBT and Spotify's Scio with Scala.
>>
>> Thanks
>>
>> Em sex, 14 de set de 2018 às 16:07, Raghu Angadi 
>> escreveu:
>>
>>> Is is feasible for you to verify the fix in your dev job? I can make
>>> a patch against Beam 2.4 branch if you like.
>>>
>>> Raghu.
>>>
>>> On Fri, Sep 14, 2018 at 11:14 AM Eduardo Soldera <
>>> eduardo.sold...@arquivei.com.br> wrote:
>>>
 Hi Raghu, thank you very much for the pull request.
 We'll wait for the 2.7 Beam release.

 Regards!

 Em qui, 13 de set de 2018 às 18:19, Raghu Angadi <
 rang...@google.com> escreveu:

> Fix: https://github.com/apache/beam/pull/6391
>
> On Wed, Sep 12, 2018 at 3:30 PM Raghu Angadi 
> wrote:
>
>> Filed BEAM-5375 .
>> I will fix it later this week.
>>
>> On Wed, Sep 12, 2018 at 12:16 PM Raghu Angadi 
>> wrote:
>>
>>>
>>>
>>> On Wed, Sep 12, 2018 at 12:11 PM Raghu Angadi <
>>> rang...@google.com> wrote:
>>>
 Thanks for the job id, I looked at the worker logs (following
 usual support oncall access protocol that provides temporary 
 access to
 things like logs in GCP):

 Root issue looks like consumerPollLoop() mentioned earlier
 needs to handle unchecked exception. In your case it is clear that 
 poll
 thread exited with a runtime exception. The reader does not check 
 for it
 and continues to wait for poll thread to enqueue messages. A fix 
 should
 result in an IOException for read from the source. The runners 
 will 

Re: Regression of (BEAM-2277) - IllegalArgumentException when using Hadoop file system for WordCount example.

2018-08-20 Thread Juan Carlos Garcia
Thanks for taking care.

On Mon, Aug 20, 2018 at 5:02 PM Tim Robertson 
wrote:

> Hi Juan,
>
> You are correct that BEAM-2277 seems to be recurring. I have today
> stumbled upon that myself in my own pipeline (not word count).
> I have just posted a workaround at the bottom of the issue, and will
> reopen the issue.
>
> Thank you for reminding us on this,
> Tim
>
>
> On Mon, Aug 20, 2018 at 4:44 PM Juan Carlos Garcia 
> wrote:
>
>> BUMP!!!
>>
>> There are people reporting the problem for BEAM 2.6.0
>> <https://issues.apache.org/jira/browse/BEAM-2277?focusedCommentId=16585979=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16585979>,
>> any CORE dev out there?
>>
>> :)
>>
>>
>>
>>
>>
>> On Mon, Jul 30, 2018 at 3:25 PM Juan Carlos Garcia 
>> wrote:
>>
>>> Hi Folks,
>>>
>>> I experienced the issued described in (BEAM-2277
>>> <https://issues.apache.org/jira/browse/BEAM-2277>), which shows it was
>>> fixed by v2.0.0
>>>
>>> However using version 2.4.0 and 2.6.0 (another user reported it) shows
>>> the same error.
>>>
>>> So either it was not 100% fixed, or the bug appeared again.
>>>
>>> Thanks and Regards
>>> --
>>>
>>> JC
>>>
>>>
>>
>> --
>>
>> JC
>>
>>

-- 

JC


Re: Regression of (BEAM-2277) - IllegalArgumentException when using Hadoop file system for WordCount example.

2018-08-20 Thread Juan Carlos Garcia
BUMP!!!

There are people reporting the problem for BEAM 2.6.0
<https://issues.apache.org/jira/browse/BEAM-2277?focusedCommentId=16585979=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16585979>,
any CORE dev out there?

:)





On Mon, Jul 30, 2018 at 3:25 PM Juan Carlos Garcia 
wrote:

> Hi Folks,
>
> I experienced the issued described in (BEAM-2277
> <https://issues.apache.org/jira/browse/BEAM-2277>), which shows it was
> fixed by v2.0.0
>
> However using version 2.4.0 and 2.6.0 (another user reported it) shows the
> same error.
>
> So either it was not 100% fixed, or the bug appeared again.
>
> Thanks and Regards
> --
>
> JC
>
>

-- 

JC


Re: Python-Beam input from Port

2018-08-15 Thread Juan Carlos Garcia
The way I see it, a SocketIO.read() implementation should just emit
Strings (or byte[]) back. So create a PTransform that opens the port and
then, in the expand() method, loops reading the String (or byte[]) values
and emits them into the pipeline (returning a PCollection).

Hope it gives you an idea.
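
A rough Java sketch of that idea (names, host and port are placeholders; note this only covers a bounded "read until the peer closes the socket" case -- a true streaming source would need an UnboundedSource or a splittable DoFn):

{code}
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;

// Sketch only: open a socket and emit every line read from it as an element.
class SocketRead extends PTransform<PBegin, PCollection<String>> {
    private final String host;
    private final int port;

    SocketRead(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public PCollection<String> expand(PBegin input) {
        return input
                .apply(Create.of("seed"))             // single dummy element to start from
                .apply(ParDo.of(new DoFn<String, String>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) throws IOException {
                        try (Socket socket = new Socket(host, port);
                             BufferedReader reader = new BufferedReader(new InputStreamReader(
                                     socket.getInputStream(), StandardCharsets.UTF_8))) {
                            String line;
                            while ((line = reader.readLine()) != null) {
                                c.output(line);       // emit each line into the pipeline
                            }
                        }
                    }
                }));
    }
}
{code}

Usage would then be something like p.apply(new SocketRead("localhost", 9999)).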

On Wed, Aug 15, 2018 at 12:55 AM, Akshay Balwally 
wrote:

> Anyone know how Beam can take input from a port (or some other testable
> timed input?)
> https://stackoverflow.com/questions/51850595/apache-beam-input-from-ports
>
> --
> Akshay Balwally
> Software Engineer
> 9372716469 |
>
> 
>



-- 

JC


Regression of (BEAM-2277) - IllegalArgumentException when using Hadoop file system for WordCount example.

2018-07-30 Thread Juan Carlos Garcia
Hi Folks,

I experienced the issue described in BEAM-2277
<https://issues.apache.org/jira/browse/BEAM-2277>, which shows it was
fixed in v2.0.0.

However, using versions 2.4.0 and 2.6.0 (another user reported it) shows the
same error.

So either it was not 100% fixed, or the bug has appeared again.

Thanks and Regards
-- 

JC


Re: Beam SparkRunner and Spark KryoSerializer problem

2018-07-30 Thread Juan Carlos Garcia
Hi Jean,

Thanks for taking a look.


On Mon, Jul 30, 2018 at 2:49 PM, Jean-Baptiste Onofré 
wrote:

> Hi Juan,
>
> it seems that has been introduce by the metrics layer in the core runner
> API.
>
> Let me check.
>
> Regards
> JB
>
> On 30/07/2018 14:47, Juan Carlos Garcia wrote:
> > Bump!
> >
> > Does any of the core-dev roam around here?
> >
> > Can someone provide a feedback about BEAM-4597
> > <https://issues.apache.org/jira/browse/BEAM-4597>
> >
> > Thanks and regards,
> >
> > On Thu, Jul 19, 2018 at 3:41 PM, Juan Carlos Garcia  > <mailto:jcgarc...@gmail.com>> wrote:
> >
> > Folks,
> >
> > Its someone using the SparkRunner out there with the Spark
> > KryoSerializer ?
> >
> > We are being force to use the not so efficient 'JavaSerializer' with
> > Spark because we face the following exception:
> >
> > 
> > Exception in thread "main" java.lang.RuntimeException:
> > org.apache.spark.SparkException: Job aborted due to stage failure:
> > Exception while getting task result:
> > com.esotericsoftware.kryo.KryoException: Unable to find class:
> > org.apache.beam.runners.core.metrics.MetricsContainerImpl$$
> Lambda$31/1875283985
> > Serialization trace:
> > factory (org.apache.beam.runners.core.metrics.MetricsMap)
> > counters (org.apache.beam.runners.core.metrics.MetricsContainerImpl)
> > metricsContainers
> > (org.apache.beam.runners.core.metrics.MetricsContainerStepMap)
> > metricsContainers
> > (org.apache.beam.runners.spark.io.SparkUnboundedSource$Metadata)
> > at
> > org.apache.beam.runners.spark.SparkPipelineResult.
> runtimeExceptionFrom(SparkPipelineResult.java:55)
> > at
> > org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(
> SparkPipelineResult.java:72)
> > at
> > org.apache.beam.runners.spark.SparkPipelineResult.access$
> 000(SparkPipelineResult.java:41)
> > at
> > org.apache.beam.runners.spark.SparkPipelineResult$
> StreamingMode.stop(SparkPipelineResult.java:163)
> > at
> > org.apache.beam.runners.spark.SparkPipelineResult.offerNewState(
> SparkPipelineResult.java:198)
> > at
> > org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(
> SparkPipelineResult.java:101)
> > at
> > org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(
> SparkPipelineResult.java:87)
> > at
> > org.apache.beam.examples.BugWithKryoOnSpark.main(
> BugWithKryoOnSpark.java:75)
> > 
> >
> > I created a jira ticket and attached a project example on it,
> > https://issues.apache.org/jira/browse/BEAM-4597
> > <https://issues.apache.org/jira/browse/BEAM-4597>
> >
> > Any feedback is appreciated.
> >
> > --
> >
> > JC
> >
> >
> >
> >
> > --
> >
> > JC
> >
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>



-- 

JC


Re: Beam SparkRunner and Spark KryoSerializer problem

2018-07-30 Thread Juan Carlos Garcia
Bump!

Does any of the core-dev roam around here?

Can someone provide a feedback about BEAM-4597
<https://issues.apache.org/jira/browse/BEAM-4597>

Thanks and regards,

On Thu, Jul 19, 2018 at 3:41 PM, Juan Carlos Garcia 
wrote:

> Folks,
>
> Its someone using the SparkRunner out there with the Spark KryoSerializer ?
>
> We are being force to use the not so efficient 'JavaSerializer' with Spark
> because we face the following exception:
>
> 
> Exception in thread "main" java.lang.RuntimeException: 
> org.apache.spark.SparkException:
> Job aborted due to stage failure: Exception while getting task result:
> com.esotericsoftware.kryo.KryoException: Unable to find class:
> org.apache.beam.runners.core.metrics.MetricsContainerImpl$$
> Lambda$31/1875283985
> Serialization trace:
> factory (org.apache.beam.runners.core.metrics.MetricsMap)
> counters (org.apache.beam.runners.core.metrics.MetricsContainerImpl)
> metricsContainers (org.apache.beam.runners.core.metrics.
> MetricsContainerStepMap)
> metricsContainers (org.apache.beam.runners.spark.io.SparkUnboundedSource$
> Metadata)
> at org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(
> SparkPipelineResult.java:55)
> at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(
> SparkPipelineResult.java:72)
> at org.apache.beam.runners.spark.SparkPipelineResult.access$
> 000(SparkPipelineResult.java:41)
> at org.apache.beam.runners.spark.SparkPipelineResult$StreamingMode.stop(
> SparkPipelineResult.java:163)
> at org.apache.beam.runners.spark.SparkPipelineResult.offerNewState(
> SparkPipelineResult.java:198)
> at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(
> SparkPipelineResult.java:101)
> at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(
> SparkPipelineResult.java:87)
> at org.apache.beam.examples.BugWithKryoOnSpark.main(
> BugWithKryoOnSpark.java:75)
> 
>
> I created a jira ticket and attached a project example on it,
> https://issues.apache.org/jira/browse/BEAM-4597
>
> Any feedback is appreciated.
>
> --
>
> JC
>
>


-- 

JC


Re: Generating a window identifier while using a Trigger AfterProcessingTime.pastFirstElementInPane()

2018-07-20 Thread Juan Carlos Garcia
I looked into the PaneInfo, but unfortunately it doesn't contain information
about the session window that is useful to me (at least for tagging all
the events belonging to the same session with a sessionID). :(

I have no idea about Beam internals or how complex it would be to implement,
but it would be cool if such a hook or piece of information (a windowIdentifier) existed.

Thanks!
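
One hedged workaround (just a sketch, and only reliable once the session window can no longer merge; with speculative firings the window may still grow, so early panes could see a different maxTimestamp): derive the identifier deterministically from the key and the window's maxTimestamp instead of UUID.randomUUID(), so every pane of the same key + session window gets the same tag. Needs java.util.UUID and java.nio.charset.StandardCharsets:

{code}
// Sketch only: a stable "session id" per key + session window, instead of a random one.
@ProcessElement
public void processElement(ProcessContext c, BoundedWindow window) {
    String sessionId = UUID.nameUUIDFromBytes(
            (c.element().getKey() + "|" + window.maxTimestamp().getMillis())
                    .getBytes(StandardCharsets.UTF_8))
            .toString();
    // ...tag the grouped events with sessionId here before emitting them...
    c.output(c.element());
}
{code}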

On Thu, Jul 19, 2018 at 10:18 PM, Lukasz Cwik  wrote:

> Note that @StartBundle is not co-related with a new pane, but an arbitrary
> runner chosen amount of elements containing any number of windows and
> elements restricted by the triggering semantics.
>
> You can introspect the PaneInfo to see the firing index, index 0
> represents the first firing. I don't believe there is a way to know what is
> the last firing without using a trigger that will produce a known number of
> firings (e.g. watermark trigger with no speculative or late firings).
>
> On Thu, Jul 19, 2018 at 6:12 AM Juan Carlos Garcia 
> wrote:
>
>> Hi Folks,
>>
>> I would like to ask if its possible to be notified when a Windows is
>> created or closed while processing a batch of data. (Sorry for the long
>> post)
>>
>> My scenario:
>> I am using a Session window with a GapDuration of 2 minutes (for
>> testing), during this processing we are assigning a Session identifier to
>> the incoming messages so we can identify them later in ElasticSearch /
>> Other tools, the process works as expected as long as we don't introduce
>> any trigger (during the @ProcessElement we have the the Iterables elements
>> from this windows and from there we can just generate our session
>> identifier like) i.e:
>>
>> 
>> PCollection>> windowedResult = input
>> .apply("Session", Window.into(Sessions.withGapDuration(Duration.
>> standardMinutes(2
>> .apply("Create KV of Users", ParDo.of(new CreateMyKV()))
>> .apply(GroupByKey.create())
>> .apply(ParDo.of(new DoFn>, KV> Iterable>>() {
>> @ProcessElement
>> public void processElement(ProcessContext c, BoundedWindow _window) {
>> System.out.println("-- window:" + _window);
>> System.out.println("session:" + UUID.randomUUID().toString());
>> System.out.println(c.element().getValue());
>> System.out.println("--");
>> c.output(c.element());
>> }
>> }));
>> 
>>
>> After i added the trigger "AfterProcessingTime.pastFirstElementInPane()",
>> each fired pane doesn't contain any indications of the windows they belong
>> to, and there is no way (at least i couldn't find) to actually hook into it
>> and generate a Session identifier for the elements that belongs to the same
>> windows.
>>
>> The behavior for @StartBundle is that it fires for each pane and the
>> behavior for @Setup is not consistent as it fires more times than windows
>> we have or sometime it fires less time.
>>
>> Any advised on this matter is welcome and by the way, in production we
>> are using the SparkRunner (which only support ProcessingTime triggers based
>> on the capability-matrix), please find below a JUnit class i am using to
>> validate this behavior.
>>
>> 
>> public class SessionWindowTest {
>> private long TIME = System.currentTimeMillis();
>>
>> @Test
>> public void testSessionWindowWorkAsExpected() {
>> final List testMesages = new LinkedList<>();
>> TIME = System.currentTimeMillis();
>>
>> //
>> // 3 Windows of data
>> //
>>
>> IntStream.range(0, 10).forEach(i -> {
>> testMesages.add("{\"user_id\":123456789,
>> \"event_type\":\"watch_tutorial\", \"time\":" + (TIME) + "}");
>> TIME += TimeUnit.SECONDS.toMillis(1);
>> });
>> TIME += TimeUnit.MINUTES.toMillis( 5);
>> IntStream.range(0, 10).forEach(i -> {
>> testMesages.add("{\"user_id\":123456789,
>> \"event_type\":\"watch_tutorial\", \"time\":" + (TIME) + "}");
>> TIME += TimeUnit.SECONDS.toMillis(2);
>> });
>> TIME += TimeUnit.MINUTES.toMillis(6);
>> IntStream.range(0, 10).forEach(i -> {
>> testMesages.add("{\"user_id\":123456789,
>> \"event_type\":\"watch_tutorial\", \"time\":" + (TIME) + "}");
>> TIME += TimeUnit.SECO

Beam SparkRunner and Spark KryoSerializer problem

2018-07-19 Thread Juan Carlos Garcia
Folks,

Is anyone out there using the SparkRunner with the Spark KryoSerializer?

We are being forced to use the not-so-efficient 'JavaSerializer' with Spark
because we face the following exception:


Exception in thread "main" java.lang.RuntimeException:
org.apache.spark.SparkException: Job aborted due to stage failure:
Exception while getting task result:
com.esotericsoftware.kryo.KryoException: Unable to find class:
org.apache.beam.runners.core.metrics.MetricsContainerImpl$$Lambda$31/1875283985
Serialization trace:
factory (org.apache.beam.runners.core.metrics.MetricsMap)
counters (org.apache.beam.runners.core.metrics.MetricsContainerImpl)
metricsContainers
(org.apache.beam.runners.core.metrics.MetricsContainerStepMap)
metricsContainers
(org.apache.beam.runners.spark.io.SparkUnboundedSource$Metadata)
at
org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:55)
at
org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:72)
at
org.apache.beam.runners.spark.SparkPipelineResult.access$000(SparkPipelineResult.java:41)
at
org.apache.beam.runners.spark.SparkPipelineResult$StreamingMode.stop(SparkPipelineResult.java:163)
at
org.apache.beam.runners.spark.SparkPipelineResult.offerNewState(SparkPipelineResult.java:198)
at
org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:101)
at
org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:87)
at
org.apache.beam.examples.BugWithKryoOnSpark.main(BugWithKryoOnSpark.java:75)


I created a jira ticket and attached a project example on it,
https://issues.apache.org/jira/browse/BEAM-4597

Any feedback is appreciated.
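
In case it is useful to anyone hitting the same thing: pinning the serializer explicitly is one way to make the JavaSerializer fallback deliberate rather than accidental (a sketch, assuming the application builds its own SparkConf/JavaSparkContext; with spark-submit the equivalent is --conf spark.serializer=org.apache.spark.serializer.JavaSerializer):

{code}
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// Sketch only: force Spark's JavaSerializer until the Kryo issue above is resolved.
public class JavaSerializerContext {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setMaster("local[2]")   // placeholder master for a local test
                .setAppName("beam-on-spark")
                .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        // ...hand this context to the Spark runner (e.g. via its provided-context
        // options, if that wiring is used), or pass the same property via spark-submit.
        jsc.stop();
    }
}
{code}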

-- 

JC


Generating a window identifier while using a Trigger AfterProcessingTime.pastFirstElementInPane()

2018-07-19 Thread Juan Carlos Garcia
Hi Folks,

I would like to ask if it's possible to be notified when a window is
created or closed while processing a batch of data. (Sorry for the long
post.)

My scenario:
I am using a Session window with a gap duration of 2 minutes (for testing).
During this processing we are assigning a session identifier to the
incoming messages so we can identify them later in Elasticsearch / other
tools. The process works as expected as long as we don't introduce any
trigger (during @ProcessElement we have the Iterable of elements from
the window, and from there we can just generate our session identifier),
i.e.:


PCollection>> windowedResult = input
.apply("Session",
Window.into(Sessions.withGapDuration(Duration.standardMinutes(2))))
.apply("Create KV of Users", ParDo.of(new CreateMyKV()))
.apply(GroupByKey.create())
.apply(ParDo.of(new DoFn>, KV>>() {
@ProcessElement
public void processElement(ProcessContext c, BoundedWindow _window) {
System.out.println("-- window:" + _window);
System.out.println("session:" + UUID.randomUUID().toString());
System.out.println(c.element().getValue());
System.out.println("--");
c.output(c.element());
}
}));


After I added the trigger "AfterProcessingTime.pastFirstElementInPane()",
each fired pane doesn't carry any indication of the window it belongs
to, and there is no way (at least I couldn't find one) to hook into it
and generate a session identifier for the elements that belong to the same
window.

The behavior of @StartBundle is that it fires for each pane, and the
behavior of @Setup is not consistent, as it fires more times than the
windows we have, or sometimes fewer.

Any advice on this matter is welcome. By the way, in production we are
using the SparkRunner (which only supports processing-time triggers according
to the capability matrix); please find below a JUnit class I am using to
validate this behavior.


public class SessionWindowTest {
private long TIME = System.currentTimeMillis();

@Test
public void testSessionWindowWorkAsExpected() {
final List<String> testMesages = new LinkedList<>();
TIME = System.currentTimeMillis();

//
// 3 Windows of data
//

IntStream.range(0, 10).forEach(i -> {
testMesages.add("{\"user_id\":123456789,
\"event_type\":\"watch_tutorial\", \"time\":" + (TIME) + "}");
TIME += TimeUnit.SECONDS.toMillis(1);
});
TIME += TimeUnit.MINUTES.toMillis( 5);
IntStream.range(0, 10).forEach(i -> {
testMesages.add("{\"user_id\":123456789,
\"event_type\":\"watch_tutorial\", \"time\":" + (TIME) + "}");
TIME += TimeUnit.SECONDS.toMillis(2);
});
TIME += TimeUnit.MINUTES.toMillis(6);
IntStream.range(0, 10).forEach(i -> {
testMesages.add("{\"user_id\":123456789,
\"event_type\":\"watch_tutorial\", \"time\":" + (TIME) + "}");
TIME += TimeUnit.SECONDS.toMillis(4);
});

Pipeline pipe = Pipeline.create();
PCollection<String> input = pipe.apply("Create", Create.of(testMesages));

PCollection>> windowedResult =
input.apply("setting the time",
ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
Any deserialize =
JsonIterator.deserialize(c.element());
c.outputWithTimestamp(c.element(), new
Instant(deserialize.get("time").toLong()));
}
}))
.apply("Session",

Window.into(Sessions.withGapDuration(Duration.standardMinutes(2)))
//
.withAllowedLateness(Duration.standardSeconds(1))
//.discardingFiredPanes()
//
.triggering(AfterProcessingTime.pastFirstElementInPane())
)
.apply("Create KV of Users", ParDo.of(new CreateUserKV()))
.apply(GroupByKey.create())
.apply(ParDo.of(new DoFn>,
KV>>() {
private int counter = 0;

@StartBundle
public void startBundle() {
System.out.println("--BUNDLE--");
}

@Setup
public void setupFn() {
System.out.println("--SETUP--");
}
@ProcessElement
public void processElement(ProcessContext c,
BoundedWindow _window) {
System.out.println("-- window:" + _window);
System.out.println("session:" +
UUID.randomUUID().toString());
//System.out.println(c.element().getValue());
System.out.println("--");
c.output(c.element());
}
}));

PipelineResult run = pipe.run();
Assert.assertTrue("Pipeline is done", run.getState() ==