Generic Type PTransform

2018-12-02 Thread Eran Twili
Hi,

We are considering using Beam in our software.
We wish to create a service that will operate Beam on behalf of a user, and 
obviously the user code doesn't have visibility of the Beam API.
For that we need to generify some of the Beam API.
So the user supplies functions, and we embed them in a generic PTransform and 
run them in a Beam pipeline.
We're having some difficulty understanding how we can provide the user with the 
option to perform a GroupByKey operation.
The problem is that GroupByKey takes KV, and our PCollections hold only user 
datatypes, which should not be Beam datatypes.
So we thought about having this PTransform:
public class PlatformGroupByKey<K, V> extends
        PTransform<PCollection<CustomType<SimpleImmutableEntry<K, V>>>,
                   PCollection<CustomType<SimpleImmutableEntry<K, Iterable<V>>>>> {
    @Override
    public PCollection<CustomType<SimpleImmutableEntry<K, Iterable<V>>>>
            expand(PCollection<CustomType<SimpleImmutableEntry<K, V>>> input) {

        return input
            .apply("MapToKV",
                MapElements.via(
                    new SimpleFunction<CustomType<SimpleImmutableEntry<K, V>>, KV<K, V>>() {
                        @Override
                        public KV<K, V> apply(CustomType<SimpleImmutableEntry<K, V>> kv) {
                            return KV.of(kv.field.getKey(), kv.field.getValue()); }}))
            .apply("GroupByKey",
                GroupByKey.create())
            .apply("MapToSimpleImmutableEntry",
                MapElements.via(
                    new SimpleFunction<KV<K, Iterable<V>>,
                            CustomType<SimpleImmutableEntry<K, Iterable<V>>>>() {
                        @Override
                        public CustomType<SimpleImmutableEntry<K, Iterable<V>>> apply(
                                KV<K, Iterable<V>> kv) {
                            return new CustomType<>(new SimpleImmutableEntry<>(
                                kv.getKey(), kv.getValue())); }}));
    }
}
In it we take a PCollection of our key-value type (Java's 
SimpleImmutableEntry),
convert it to KV,
perform the GroupByKey,
and convert it back to SimpleImmutableEntry.

But we get this error in runtime:

java.lang.IllegalStateException: Unable to return a default Coder for 
GroupByKey/MapToKV/Map/ParMultiDo(Anonymous).output [PCollection]. Correct one 
of the following root causes:
  No Coder has been manually specified;  you may do so using .setCoder().
  Inferring a Coder from the CoderRegistry failed: Cannot provide coder for 
parameterized type org.apache.beam.sdk.values.KV<K, V>: Unable to provide a 
Coder for K.
  Building a Coder using a registered CoderProvider failed.
  See suppressed exceptions for detailed failures.
  Using the default output Coder from the producing PTransform failed: 
PTransform.getOutputCoder called.
at 
org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkState(Preconditions.java:444)
at 
org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:278)
at 
org.apache.beam.sdk.values.PCollection.finishSpecifying(PCollection.java:115)
at 
org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifyingInput(TransformHierarchy.java:190)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:536)
at 
org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
at 
org.apache.beam.sdk.values.PCollection.apply(PCollection.java:370)
at 
org.apache.beam.examples.platform.PlatformGroupByKey.expand(PlatformGroupByKey.java:27)

We don't understand why the generic type K matters at runtime.
At runtime, shouldn't it be known from the concrete PCollection input that is 
passed to the expand method?
What are we doing wrong? Is there a way to achieve what we want using Beam?
Appreciate any help.

Regards,
Eran
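
A note on the error above: because of Java type erasure, K and V are not 
available when the pipeline graph is constructed, so Beam has to resolve coders 
at construction time, where K is still just a type variable and nothing can be 
inferred. One way around this, following the error message's own suggestion to 
call .setCoder(), is for the service layer to pass concrete coders into the 
transform. Below is a minimal sketch, assuming Coder<K> and Coder<V> can be 
supplied by the caller and that CustomType is the same user wrapper as above; 
the explicit coder wiring is an illustrative addition, not part of the original 
post.

import java.util.AbstractMap.SimpleImmutableEntry;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// CustomType is assumed to be the user's wrapper class from the post above,
// exposing its SimpleImmutableEntry via the public field "field".
public class PlatformGroupByKeyWithCoders<K, V> extends
        PTransform<PCollection<CustomType<SimpleImmutableEntry<K, V>>>,
                   PCollection<CustomType<SimpleImmutableEntry<K, Iterable<V>>>>> {

    // Supplied by the caller, since K and V are erased and cannot be inferred.
    private final Coder<K> keyCoder;
    private final Coder<V> valueCoder;

    public PlatformGroupByKeyWithCoders(Coder<K> keyCoder, Coder<V> valueCoder) {
        this.keyCoder = keyCoder;
        this.valueCoder = valueCoder;
    }

    @Override
    public PCollection<CustomType<SimpleImmutableEntry<K, Iterable<V>>>>
            expand(PCollection<CustomType<SimpleImmutableEntry<K, V>>> input) {

        PCollection<KV<K, V>> kvs = input
            .apply("MapToKV",
                MapElements.via(
                    new SimpleFunction<CustomType<SimpleImmutableEntry<K, V>>, KV<K, V>>() {
                        @Override
                        public KV<K, V> apply(CustomType<SimpleImmutableEntry<K, V>> in) {
                            return KV.of(in.field.getKey(), in.field.getValue());
                        }
                    }));

        // Explicit coder for the intermediate KV collection; GroupByKey then derives
        // KvCoder.of(keyCoder, IterableCoder.of(valueCoder)) for its own output.
        kvs.setCoder(KvCoder.of(keyCoder, valueCoder));

        return kvs
            .apply("GroupByKey", GroupByKey.<K, V>create())
            .apply("MapToSimpleImmutableEntry",
                MapElements.via(
                    new SimpleFunction<KV<K, Iterable<V>>,
                            CustomType<SimpleImmutableEntry<K, Iterable<V>>>>() {
                        @Override
                        public CustomType<SimpleImmutableEntry<K, Iterable<V>>> apply(
                                KV<K, Iterable<V>> kv) {
                            return new CustomType<>(
                                new SimpleImmutableEntry<>(kv.getKey(), kv.getValue()));
                        }
                    }));
    }
}

The final PCollection<CustomType<...>> still needs a coder of its own, for 
example one registered for CustomType in the pipeline's CoderRegistry, as the 
reply further down suggests for KettleRow.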




Graceful shutdown of long-running Beam pipeline on Flink

2018-12-02 Thread Wayne Collins
Hi all,
We have a number of Beam pipelines processing unbounded streams sourced
from Kafka on the Flink runner and are very happy with both the platform
and performance!

The problem is with shutting down the pipelines: for version upgrades,
system maintenance, load management, etc., it would be nice to be able to
gracefully shut these down under software control, but we haven't been able
to find a way to do so. We're in good shape on checkpointing and then
cleanly recovering, but shutdowns are all destructive to Flink or the
Flink TaskManager.

Methods tried:

1) Calling cancel on FlinkRunnerResult returned from pipeline.run()
This would be our preferred method but p.run() doesn't return until
termination and even if it did, the runner code simply throws:
"throw new UnsupportedOperationException("FlinkRunnerResult does not
support cancel.");"
so this doesn't appear to be a near-term option.

2) Inject a "termination" message into the pipeline via Kafka
This does get through, but calling exit() from a stage in the pipeline
also terminates the Flink TaskManager.

3) Inject a "sleep" message, then manually restart the cluster
This is our current method: we pause the data at the source, flood all
branches of the pipeline with a "we're going down" msg so the stages can
do a bit of housekeeping, then hard-stop the entire environment and
re-launch with the new version.

Is there a "Best Practice" method for gracefully terminating an
unbounded pipeline from within the pipeline or from the mainline that
launches it?

Thanks!
Wayne

-- 
Wayne Collins
dades.ca Inc.
mailto:wayn...@dades.ca
cell:416-898-5137
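
For reference, here is a minimal sketch of what mainline-driven shutdown looks
like through Beam's runner-agnostic PipelineResult API. As noted under method 1
above, p.run() blocks until termination on this setup and FlinkRunnerResult
throws UnsupportedOperationException from cancel(), so this shows the intended
shape rather than a recipe that currently works on the Flink runner; the names
and timeout are placeholders.

import java.io.IOException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.joda.time.Duration;

public class MainlineShutdownSketch {
    public static void main(String[] args) throws IOException {
        Pipeline pipeline = Pipeline.create();
        // ... build the unbounded pipeline here ...

        PipelineResult result = pipeline.run();

        // Wait for some bounded time (or for an external shutdown signal),
        // then ask the runner to cancel the job. This requires a runner whose
        // PipelineResult implements cancel(); per the message above, the Flink
        // runner at this version does not.
        result.waitUntilFinish(Duration.standardHours(1));
        if (result.getState() == PipelineResult.State.RUNNING) {
            result.cancel();
        }
    }
}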



Re: Graceful shutdown of long-running Beam pipeline on Flink

2018-12-02 Thread Juan Carlos Garcia
Hi Wayne,

We have the same setup and we do daily updates to our pipeline.

The way we do it is by using the flink tool via a Jenkins job.

Basically our deployment job does the following:

1. Detect whether the pipeline is running (it matches by job name).

2. If found, do a flink cancel with a savepoint (we use HDFS for
checkpoints/savepoints) under a given directory.

3. Run the flink run command for the new job and specify the savepoint
from step 2.

I don't think there is any support to achieve the same from within the
pipeline. You need to do this externally as explained above.

Best regards,
JC



Re: Generic Type PTransform

2018-12-02 Thread Matt Casters
There are probably smarter people than me on this list, but since I recently
went through a similar thought exercise...

For the generic use in Kettle I have a PCollection<KettleRow> going through
the pipeline.
KettleRow is just an Object[] wrapper for which I can implement a Coder.

The "group by" that I implemented does the following: split
PCollection<KettleRow> into PCollection<KV<KettleRow, KettleRow>>.
Then it applies the standard GroupByKey.create(), giving us
PCollection<KV<KettleRow, Iterable<KettleRow>>>.
This means that we can simply aggregate all the elements in an
Iterable<KettleRow> to aggregate a group.

Well, at least that works for me. The code is open so look at it over here:
https://github.com/mattcasters/kettle-beam-core/blob/master/src/main/java/org/kettle/beam/core/transform/GroupByTransform.java

Like you I had trouble with the Coder for my KettleRows so I hacked up this
to make it work:
https://github.com/mattcasters/kettle-beam-core/blob/master/src/main/java/org/kettle/beam/core/coder/KettleRowCoder.java

It's set on the pipeline:
pipeline.getCoderRegistry().registerCoderForClass( KettleRow.class, new
KettleRowCoder());

Good luck!
Matt
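
For readers following along, here is a minimal sketch of what such a coder can
look like. This is an illustrative stand-in rather than the actual
KettleRowCoder linked above: it assumes KettleRow exposes its Object[] through a
getRow() accessor and a matching constructor, and it simply round-trips the
values through Java serialization. That keeps the example short, but it is
neither fast nor deterministic; a coder used for GroupByKey keys must be
deterministic, so a field-by-field encoding (as in the linked class) is
preferable in practice.

import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;

// Simplified, illustrative coder for an Object[]-backed row type.
// Assumes every value in the row is java.io.Serializable.
public class SimpleRowCoder extends CustomCoder<KettleRow> {

    @Override
    public void encode(KettleRow row, OutputStream out) throws IOException {
        ObjectOutputStream oos = new ObjectOutputStream(out);
        oos.writeObject(row.getRow()); // assumed accessor for the wrapped Object[]
        oos.flush();                   // flush, but don't close the stream Beam owns
    }

    @Override
    public KettleRow decode(InputStream in) throws IOException {
        try {
            ObjectInputStream ois = new ObjectInputStream(in);
            return new KettleRow((Object[]) ois.readObject()); // assumed constructor
        } catch (ClassNotFoundException e) {
            throw new CoderException("Unable to decode row", e);
        }
    }
}

Once registered with registerCoderForClass as shown above, Beam will use it
wherever it needs a coder for KettleRow.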


Re: Inferring Csv Schemas

2018-12-02 Thread Reza Rokni
Hi,

No problem and warm welcome to your Beam journey :-)

Yes, in this case when an issue was found, the BigQuery API was used to make
the changes.

Cheers
Reza
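
For readers who want to see the rough shape of this in code, below is a
minimal, illustrative sketch rather than the actual pipeline from the blog.
Failed streaming inserts are pulled off BigQueryIO's WriteResult, so only
rejected rows go through schema checking, and a hypothetical helper then
patches the table through the BigQuery client library (i.e. the BigQuery API
rather than BigQueryIO) by adding a missing nullable column. Table, dataset and
column names are placeholders.

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.values.PCollection;

public class FailedInsertSchemaRepairSketch {

    // Rows BigQuery rejects (e.g. because of an unknown column) come back on
    // getFailedInserts(); only these need the costly schema detection step.
    static PCollection<TableRow> writeAndGetFailedInserts(
            PCollection<TableRow> rows, String tableSpec) {
        WriteResult result = rows.apply("WriteToBQ",
            BigQueryIO.writeTableRows()
                .to(tableSpec)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));
        return result.getFailedInserts();
    }

    // Hypothetical helper: add a missing nullable column to the known-good schema
    // using the BigQuery client API, outside of BigQueryIO.
    static void addNullableColumn(String project, String dataset, String table,
                                  String columnName, LegacySQLTypeName type) {
        BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
        Table existing = bigquery.getTable(TableId.of(project, dataset, table));
        StandardTableDefinition definition = existing.getDefinition();

        List<Field> fields = new ArrayList<>(definition.getSchema().getFields());
        fields.add(Field.newBuilder(columnName, type).setMode(Field.Mode.NULLABLE).build());

        existing.toBuilder()
            .setDefinition(StandardTableDefinition.of(Schema.of(fields)))
            .build()
            .update(); // additive change; BigQuery allows adding nullable columns
    }
}

The failed rows can then be re-inserted once the schema update has propagated.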

On Sat, 1 Dec 2018 at 03:38, Joe Cullen  wrote:

> That's great Reza, thanks! I'm still getting to grips with Beam and
> Dataflow so apologies for all the questions. I have a few more if that's ok:
>
> 1. When the article says "the schema would be mutated", does this mean the
> BigQuery schema?
> 2. Also, when the known good BigQuery schema is retrieved, and if it's the
> BigQuery schema being updated in the question above, is this done with the
> BigQuery API rather than BigQueryIO? In other words, what is the process
> behind the step "validate and mutate BQ schema" in the image?
>
> Thanks,
> Joe
>
> On 30 Nov 2018 16:48, "Reza Rokni"  wrote:
>
> Hi Joe,
>
> That part of the blog should have been written a bit cleaner.. I blame the
> writer ;-) So while that solution worked it was inefficient, this is
> discussed in the next paragraph.. But essentially checking the validity of
> the schema every time is not efficient, especially as they are normally ok.
> So the next paragraph was..
>
>
>
> However, this design could not make use of the inbuilt efficiencies that
> BigQueryIO provided, and also burdened us with technical debt. Chris then
> tried various other tactics to beat the boss. In his words ... "The first
> attempt at fixing this inefficiency was to remove the costly JSON schema
> detection ‘DoFn’ which every metric goes through, and move it to a ‘failed
> inserts’ section of the pipeline, which is only run when there are errors
> on inserting into BigQuery,”
>
> Cheers
> Reza
>
> On Fri, 30 Nov 2018 at 09:01, Joe Cullen 
> wrote:
>
>> Thanks Reza, that's really helpful!
>>
>> I have a few questions:
>>
>> "He used a GroupByKey function on the JSON type and then a manual check
>> on the JSON schema against the known good BigQuery schema. If there was a
>> difference, the schema would mutate and the updates would be pushed
>> through."
>>
>> If the difference was a new column had been added to the JSON elements,
>> does there need to be any mutation? The JSON schema derived from the JSON
>> elements would already have this new column, and if BigQuery allows for
>> additive schema changes then this new JSON schema should be fine, right?
>>
>> But then I'm not sure how you would enter the 'failed inserts' section of
>> the pipeline (as the insert should have been successful).
>>
>> Have I misunderstood what is being mutated?
>>
>> Thanks,
>> Joe
>>
>> On Fri, 30 Nov 2018, 11:07 Reza Ardeshir Rokni >
>>> Hi Joe,
>>>
>>> You may find some of the info in this blog of interest; it's based on
>>> streaming pipelines, but the ideas are useful.
>>>
>>>
>>> https://cloud.google.com/blog/products/gcp/how-to-handle-mutating-json-schemas-in-a-streaming-pipeline-with-square-enix
>>>
>>> Cheers
>>>
>>> Reza
>>>
>>> On Thu, 29 Nov 2018 at 06:53, Joe Cullen 
>>> wrote:
>>>
 Hi all,

 I have a pipeline reading CSV files, performing some transforms, and
 writing to BigQuery. At the moment I'm reading the BigQuery schema from a
 separate JSON file. If the CSV files had a new column added (and I wanted
 to include this column in the resultant BigQuery table), I'd have to change
 the JSON schema or the pipeline itself. Is there any way to autodetect the
 schema using BigQueryIO? How do people normally deal with potential changes
 to input CSVs?

 Thanks,
 Joe

>>>
>
