Re: Inferring Csv Schemas

2018-11-30 Thread Joe Cullen
That's great Reza, thanks! I'm still getting to grips with Beam and
Dataflow so apologies for all the questions. I have a few more if that's ok:

1. When the article says "the schema would be mutated", does this mean the
BigQuery schema?
2. Also, when the known good BigQuery schema is retrieved (and assuming it is
the BigQuery schema being updated in question 1 above), is that done with the
BigQuery API rather than BigQueryIO? In other words, what is the process
behind the "validate and mutate BQ schema" step in the image? (I've sketched
my guess below.)
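
For what it's worth, here is roughly what I'm imagining for that step, using
the BigQuery client library rather than BigQueryIO. This is just a sketch to
check my understanding; the dataset, table and field names are made up, and it
ignores partitioning and other table settings:

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
import java.util.ArrayList;
import java.util.List;

public class SchemaMutator {
  // Sketch: append a new column to an existing table's schema via the BigQuery
  // client API. Dataset, table and field names are placeholders.
  public static void addStringColumn(String dataset, String tableName, String newColumn) {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
    Table table = bigquery.getTable(TableId.of(dataset, tableName));

    List<Field> fields = new ArrayList<>(table.getDefinition().getSchema().getFields());
    fields.add(Field.of(newColumn, LegacySQLTypeName.STRING));

    // Note: this replaces the whole table definition, so in a real pipeline the
    // partitioning/clustering settings would need to be carried over as well.
    table.toBuilder()
        .setDefinition(StandardTableDefinition.of(Schema.of(fields.toArray(new Field[0]))))
        .build()
        .update();
  }
}

Is that what the "validate and mutate BQ schema" box is doing, more or less?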

Thanks,
Joe

On 30 Nov 2018 16:48, "Reza Rokni"  wrote:

Hi Joe,

That part of the blog should have been written a bit more cleanly... I blame
the writer ;-) So while that solution worked, it was inefficient, as discussed
in the next paragraph. Essentially, checking the validity of the schema every
time is not efficient, especially as the schemas are normally fine. So the next
paragraph was:



*However, this design could not make use of the inbuilt efficiencies that
BigQueryIO provided, and also burdened us with technical debt. Chris then
tried various other tactics to beat the boss. In his words ... “The first
attempt at fixing this inefficiency was to remove the costly JSON schema
detection ‘DoFn’ which every metric goes through, and move it to a ‘failed
inserts’ section of the pipeline, which is only run when there are errors
on inserting into BigQuery.”*

Cheers
Reza

On Fri, 30 Nov 2018 at 09:01, Joe Cullen  wrote:

> Thanks Reza, that's really helpful!
>
> I have a few questions:
>
> "He used a GroupByKey function on the JSON type and then a manual check on
> the JSON schema against the known good BigQuery schema. If there was a
> difference, the schema would mutate and the updates would be pushed
> through."
>
> If the difference was that a new column had been added to the JSON elements,
> does there need to be any mutation? The JSON schema derived from the JSON
> elements would already have this new column, and if BigQuery allows for
> additive schema changes then this new JSON schema should be fine, right?
>
> But then I'm not sure how you would enter the 'failed inserts' section of
> the pipeline (as the insert should have been successful).
>
> Have I misunderstood what is being mutated?
>
> Thanks,
> Joe
>
> On Fri, 30 Nov 2018, 11:07 Reza Ardeshir Rokni wrote:
>> Hi Joe,
>>
>> You may find some of the info in this blog of interest; it's based on
>> streaming pipelines, but the ideas are useful.
>>
>>
>> https://cloud.google.com/blog/products/gcp/how-to-handle-mutating-json-schemas-in-a-streaming-pipeline-with-square-enix
>>
>> Cheers
>>
>> Reza
>>
>> On Thu, 29 Nov 2018 at 06:53, Joe Cullen 
>> wrote:
>>
>>> Hi all,
>>>
>>> I have a pipeline reading CSV files, performing some transforms, and
>>> writing to BigQuery. At the moment I'm reading the BigQuery schema from a
>>> separate JSON file. If the CSV files had a new column added (and I wanted
>>> to include this column in the resultant BigQuery table), I'd have to change
>>> the JSON schema or the pipeline itself. Is there any way to autodetect the
>>> schema using BigQueryIO? How do people normally deal with potential changes
>>> to input CSVs?
>>>
>>> Thanks,
>>> Joe
>>>
>>



Re: Beam Metrics questions

2018-11-30 Thread Phil Franklin
Etienne, I’ve just discovered that the code I used for my tests overrides the 
command-line arguments, and while I thought I was testing with the SparkRunner 
and FlinkRunner, in fact every test used DirectRunner, which explains why I was 
seeing the committed values.  So there’s no need for a ticket concerning 
committed values from the FlinkRunner.  Sorry for the confusion.

-Phil

Re: Inferring Csv Schemas

2018-11-30 Thread Reza Rokni
Hi Joe,

That part of the blog should have been written a bit more cleanly... I blame
the writer ;-) So while that solution worked, it was inefficient, as discussed
in the next paragraph. Essentially, checking the validity of the schema every
time is not efficient, especially as the schemas are normally fine. So the next
paragraph was:



*However, this design could not make use of the inbuilt efficiencies that
BigQueryIO provided, and also burdened us with technical debt. Chris then
tried various other tactics to beat the boss. In his words ... “The first
attempt at fixing this inefficiency was to remove the costly JSON schema
detection ‘DoFn’ which every metric goes through, and move it to a ‘failed
inserts’ section of the pipeline, which is only run when there are errors
on inserting into BigQuery.”*
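
In Beam Java, that routing looks roughly like the sketch below. This is
illustrative only, not the actual code from the blog; the table spec is a
placeholder, and getFailedInserts() applies to streaming inserts:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.values.PCollection;

public class FailedInsertRouting {
  // Sketch: write rows with BigQueryIO; only the rows that fail to insert flow
  // into the (expensive) schema-detection/mutation branch of the pipeline.
  public static PCollection<TableRow> writeAndGetFailures(PCollection<TableRow> rows) {
    WriteResult result =
        rows.apply("WriteToBQ",
            BigQueryIO.writeTableRows()
                .to("my-project:my_dataset.my_table")  // placeholder table spec
                .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));

    // The "validate and mutate BQ schema" step hangs off this collection,
    // e.g. a DoFn that patches the table schema and then re-inserts the rows.
    return result.getFailedInserts();
  }
}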

Cheers
Reza

On Fri, 30 Nov 2018 at 09:01, Joe Cullen  wrote:

> Thanks Reza, that's really helpful!
>
> I have a few questions:
>
> "He used a GroupByKey function on the JSON type and then a manual check on
> the JSON schema against the known good BigQuery schema. If there was a
> difference, the schema would mutate and the updates would be pushed
> through."
>
> If the difference was that a new column had been added to the JSON elements,
> does there need to be any mutation? The JSON schema derived from the JSON
> elements would already have this new column, and if BigQuery allows for
> additive schema changes then this new JSON schema should be fine, right?
>
> But then I'm not sure how you would enter the 'failed inserts' section of
> the pipeline (as the insert should have been successful).
>
> Have I misunderstood what is being mutated?
>
> Thanks,
> Joe
>
> On Fri, 30 Nov 2018, 11:07 Reza Ardeshir Rokni wrote:
>> Hi Joe,
>>
>> You may find some of the info in this blog of interest; it's based on
>> streaming pipelines, but the ideas are useful.
>>
>>
>> https://cloud.google.com/blog/products/gcp/how-to-handle-mutating-json-schemas-in-a-streaming-pipeline-with-square-enix
>>
>> Cheers
>>
>> Reza
>>
>> On Thu, 29 Nov 2018 at 06:53, Joe Cullen 
>> wrote:
>>
>>> Hi all,
>>>
>>> I have a pipeline reading CSV files, performing some transforms, and
>>> writing to BigQuery. At the moment I'm reading the BigQuery schema from a
>>> separate JSON file. If the CSV files had a new column added (and I wanted
>>> to include this column in the resultant BigQuery table), I'd have to change
>>> the JSON schema or the pipeline itself. Is there any way to autodetect the
>>> schema using BigQueryIO? How do people normally deal with potential changes
>>> to input CSVs?
>>>
>>> Thanks,
>>> Joe
>>>
>>



Re: Beam Metrics questions

2018-11-30 Thread Phil Franklin
Hi again, Etienne!  You didn’t mention whether Spark is reporting committed
values now, but you also didn’t mention opening a ticket concerning the Spark
output.  Am I right in inferring that Spark does in fact report committed
values?

Thanks!
-Phil

On 2018/11/30 13:57:43, Etienne Chauchot  wrote: 
> Hi Phil,
> Thanks for using MetricsPusher and Beam in general!
> - MetricsHttpSink works as follows: it filters committed metrics out of the
> JSON output when committed metrics are not supported. I checked, and the Flink
> runner still does not support committed metrics, so there should be no
> committed metrics values in the output JSON. There might be a bug; I'll open a
> ticket, thanks for pointing it out! You tested on Flink and Spark, right? And
> both output committed metrics values?
> - There is no default mechanism to fall back from committed metrics values to
> attempted ones.
> - Apache Flink does not make Flink Accumulators available in detached mode, so
> indeed, metrics are not available in that mode.
> CCing dev list.
> Etienne
> On Monday, 26 November 2018 at 15:57 -0600, Phil Franklin wrote:
> > All of the discussion I’ve seen says that Flink and Spark only provided
> > attempted metric values, but when I use MetricsHttpSink and look at the
> > JSON it has both attempted and committed values (albeit, both the same for
> > my simple testing).  Has the metrics processing been updated recently, and
> > I’m just missing the change updates?  Or are the committed values being
> > defaulted to the attempted values?
> >
> > Also, I’ve seen it mentioned that Flink doesn’t report metrics when in
> > detached mode.  Is this still the case?
> >
> > Thanks for your help!

Re: Beam Metrics questions

2018-11-30 Thread Phil Franklin
Hi, Etienne!  Thanks for the response.  Yes, I ran the test with both the Flink
and Spark runners, and both showed committed and attempted values.

I didn’t actually use MetricsPusher for these tests.  I have questions about 
MetricsPusher, but I’ll put those in another post.

-Phil

On 2018/11/30 13:57:43, Etienne Chauchot  wrote: 
> Hi Phil,
> Thanks for using MetricsPusher and Beam in general!
> - MetricsHttpSink works as follows: it filters committed metrics out of the
> JSON output when committed metrics are not supported. I checked, and the Flink
> runner still does not support committed metrics, so there should be no
> committed metrics values in the output JSON. There might be a bug; I'll open a
> ticket, thanks for pointing it out! You tested on Flink and Spark, right? And
> both output committed metrics values?
> - There is no default mechanism to fall back from committed metrics values to
> attempted ones.
> - Apache Flink does not make Flink Accumulators available in detached mode, so
> indeed, metrics are not available in that mode.
> CCing dev list.
> Etienne
> On Monday, 26 November 2018 at 15:57 -0600, Phil Franklin wrote:
> > All of the discussion I’ve seen says that Flink and Spark only provided
> > attempted metric values, but when I use MetricsHttpSink and look at the
> > JSON it has both attempted and committed values (albeit, both the same for
> > my simple testing).  Has the metrics processing been updated recently, and
> > I’m just missing the change updates?  Or are the committed values being
> > defaulted to the attempted values?
> >
> > Also, I’ve seen it mentioned that Flink doesn’t report metrics when in
> > detached mode.  Is this still the case?
> >
> > Thanks for your help!

Re: Inferring Csv Schemas

2018-11-30 Thread Joe Cullen
Thanks Reza, that's really helpful!

I have a few questions:

"He used a GroupByKey function on the JSON type and then a manual check on
the JSON schema against the known good BigQuery schema. If there was a
difference, the schema would mutate and the updates would be pushed
through."

If the difference was that a new column had been added to the JSON elements,
does there need to be any mutation? The JSON schema derived from the JSON
elements would already have this new column, and if BigQuery allows for
additive schema changes then this new JSON schema should be fine, right?

But then I'm not sure how you would enter the 'failed inserts' section of
the pipeline (as the insert should have been successful).

Have I misunderstood what is being mutated?

Thanks,
Joe

On Fri, 30 Nov 2018, 11:07 Reza Ardeshir Rokni wrote:
> Hi Joe,
>
> You may find some of the info in this blog of interest; it's based on
> streaming pipelines, but the ideas are useful.
>
>
> https://cloud.google.com/blog/products/gcp/how-to-handle-mutating-json-schemas-in-a-streaming-pipeline-with-square-enix
>
> Cheers
>
> Reza
>
> On Thu, 29 Nov 2018 at 06:53, Joe Cullen 
> wrote:
>
>> Hi all,
>>
>> I have a pipeline reading CSV files, performing some transforms, and
>> writing to BigQuery. At the moment I'm reading the BigQuery schema from a
>> separate JSON file. If the CSV files had a new column added (and I wanted
>> to include this column in the resultant BigQuery table), I'd have to change
>> the JSON schema or the pipeline itself. Is there any way to autodetect the
>> schema using BigQueryIO? How do people normally deal with potential changes
>> to input CSVs?
>>
>> Thanks,
>> Joe
>>
>


Re: Beam Metrics questions

2018-11-30 Thread Etienne Chauchot
Hi Phil,
Thanks for using MetricsPusher and Beam in general!
- MetricsHttpSink works as follows: it filters committed metrics out of the
JSON output when committed metrics are not supported. I checked, and the Flink
runner still does not support committed metrics, so there should be no
committed metrics values in the output JSON. There might be a bug; I'll open a
ticket, thanks for pointing it out! You tested on Flink and Spark, right? And
both output committed metrics values?
- There is no default mechanism to fall back from committed metrics values to
attempted ones.
- Apache Flink does not make Flink Accumulators available in detached mode, so
indeed, metrics are not available in that mode.
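
For reference, this is roughly how the two values surface when you query
metrics on the PipelineResult. Illustrative snippet only: the metric
namespace/name are placeholders, and the exact accessor names vary slightly
between Beam versions:

import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;

public class MetricsCheck {
  // Sketch: query a counter and print attempted vs committed values. On runners
  // without committed-metrics support, getCommitted() is expected to throw
  // rather than silently fall back to the attempted value.
  public static void printCounter(PipelineResult result) {
    MetricQueryResults metrics = result.metrics().queryMetrics(
        MetricsFilter.builder()
            .addNameFilter(MetricNameFilter.named("my_namespace", "my_counter"))
            .build());
    for (MetricResult<Long> counter : metrics.getCounters()) {
      System.out.println("attempted = " + counter.getAttempted());
      try {
        System.out.println("committed = " + counter.getCommitted());
      } catch (UnsupportedOperationException e) {
        System.out.println("committed metrics not supported by this runner");
      }
    }
  }
}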
CCing dev list.
Etienne
On Monday, 26 November 2018 at 15:57 -0600, Phil Franklin wrote:
> All of the discussion I’ve seen says that Flink and Spark only provided 
> attempted metric values, but when I use
> MetricsHttpSink and look at the JSON it has both attempted and committed 
> values (albeit, both the same for my simple
> testing).  Has the metrics processing been updated recently, and I’m just 
> missing the change updates?  Or are the
> committed values being defaulted to the attempted values? 
> 
> Also, I’ve seen it mentioned that Flink doesn’t report metrics when in 
> detached mode.  Is this still the case?
> 
> 
> Thanks for your help!


Re: Why beam pipeline ends up creating gigantic DAG for a simple word count program !!

2018-11-30 Thread Robert Bradshaw
I'd also like to put the perspective out there that composite
transforms are like subroutines; their inner complexity should not
concern the end user and is probably the wrong thing to optimize for
(assuming there are no other costs, e.g. performance, and of course
we shouldn't have unnecessary complexity). This allows operations
like Write (as an example) to be built out of the same kind of
simpler pipeline operations, rather than having special-cased sink
operators that still need all the complexity (write out temporary
shards, move successful ones to consistent names, remove temporaries,
etc.) but where the structure is less flexibly hard-coded into the
system.
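
For concreteness, a composite is just a PTransform whose expand() applies
other transforms. A minimal sketch (the transform and step names here are
arbitrary, not anything from Beam itself):

import java.util.Arrays;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

// A composite transform shows up as one named node in UIs that understand
// Beam's hierarchy (e.g. Dataflow), even though it expands into several
// primitive steps underneath.
public class CountWords
    extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
  @Override
  public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
    return lines
        .apply("ExtractWords",
            FlatMapElements.into(TypeDescriptors.strings())
                .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+"))))
        .apply("CountPerWord", Count.perElement());
  }
}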

Better representations of this hierarchical structure (vs. being
forced to look at the entire everything-unrolled-and-inlined view) are
what I think we should solve long-term. For word count it's annoying
(and surprising) that so much is going on; this becomes especially
important for larger pipelines with hundreds or thousands of stages
that may have high-level structure but become impenetrable in the
flattened, physical view.

At the very least, it'd be useful if these UIs had hooks where we
could map physical views onto the logical views as understood by Beam.
How best to represent these mappings is as yet an unsolved problem.

- Robert


On Fri, Nov 30, 2018 at 12:40 PM Maximilian Michels  wrote:
>
> Hi Akshay,
>
> I think you're bringing up a very important point. Simplicity with
> minimal complexity is something that we strive for. In the case of the
> Write transform, the complexity was mainly added due to historical
> reasons which Kenneth mentioned.
>
> It is to note that some Runners don't even benefit from it because they
> don't support incremental recovery. I believe we will do work in the
> future to simplify the Write transform.
>
> If you look at other pipelines which don't use that transform you will
> find that they are much simpler.
>
> What can really help is to not expand the composite transforms, but
> transforms need to be expanded during translation and collapsing those
> transforms again after translation to Spark/Flink can be tricky.
>
> Generally speaking, we have realized this is an issue and have plans to
> fix it, e.g. https://issues.apache.org/jira/browse/BEAM-5859.
>
> Thanks,
> Max
>
> On 28.11.18 16:52, Kenneth Knowles wrote:
> > In fact, that logic in FileIO is "required" to have consistent output
> > even just for batch computation, because any step along the way may fail
> > and retry.
> >
> > I put "required" in quotes because there's a legacy concern at play here
> > - FileIO is written using the assumption that shuffles are
> > checkpointing. What is actually required is that the input to the last
> > stage will be the same, even if the whole pipeline goes down and has to
> > be brought back up. So the extra shuffle in those pipelines represents a
> > necessary checkpoint prior to running the last stage. In the
> > saveAsTextFile primitive (caveat: my understanding is vague and stale)
> > this would be done in a checkpoint finalization callback, and you have
> > to wait for that to complete before consuming the output if you want to
> > ensure correctness.
> >
> > Another relevant piece of information is that Beam has support for two
> > things that would make it easier to decipher the UI:
> >
> > 1. Nodes can have meaningful names. So that would make it obvious which
> > part is doing what.
> > 2. Transforms can be built as composites of other transforms, and this
> > is encouraged. In some UIs, notably Cloud Dataflow, the composites are
> > shown as a single box, so it is easier to understand.
> >
> > I would not expect every engine to adopt all of Beam's features like
> > these, but there might be a clever way to make the information available.
> >
> > Kenn
> >
> > On Wed, Nov 28, 2018 at 12:27 AM Tim Robertson wrote:
> >
> > Hi Akshay
> >
> > My understanding is that this all comes from the final FileIO
> > write() stage.
> >
> > When writing, the FileIO puts data into temporary files similar to
> > the output formats for Hadoop MapReduce. Once ready to commit, it
> > does something along the lines of a directory scan to determine
> > which files need to be moved into the final output location. It is
> > that directory scan stage that causes the complex DAG and it runs
> > very quickly. While it looks complicated, I gather it is necessary
> > to support the needs of batch/streaming and in particular the
> > behaviour under failure scenarios.
> >
> > I agree with you that from a developer perspective it is very
> > difficult to understand. If you were to replace the FileIO write()
> > with e.g. a push into a database (JdbcIO), or ElasticsearchIO or
> > SolrIO etc you will see a much more familiar and simpler to
> > understand DAG - it might be worth trying that to see. Over time I
> > expe

Re:

2018-11-30 Thread Matt Casters
I just wanted to thank you again.  I split up my project into a Beam core
part and my plugin.  This got rid of a number of circular dependency
issues and library conflicts.
I also gave the Dataflow PipelineOptions the list of files to stage.
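Roughly along these lines; the project, bucket and jar paths below are
placeholders rather than my actual setup:

import java.util.Arrays;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class StageFilesExample {
  public static void main(String[] args) {
    // Sketch: hand Dataflow an explicit list of jars to stage so the plugin's
    // libraries end up on the workers, instead of relying on classpath scanning.
    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    options.setRunner(DataflowRunner.class);
    options.setProject("my-gcp-project");
    options.setTempLocation("gs://my-bucket/tmp");
    options.setFilesToStage(Arrays.asList(
        "/opt/kettle/plugins/kettle-beam/kettle-beam-core.jar",
        "/opt/kettle/plugins/kettle-beam/kettle-beam.jar"));

    Pipeline pipeline = Pipeline.create(options);
    // ... build the pipeline as usual and run it with pipeline.run().
  }
}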

That has made things work, and much more quickly than I anticipated, I must admit.
I'm in awe of how clean and intuitive the Beam API is (once you get the
hang of it).
Thanks for everything!

https://github.com/mattcasters/kettle-beam-core
https://github.com/mattcasters/kettle-beam

Cheers,

Matt
---
Matt Casters <mattcast...@gmail.com>
Senior Solution Architect, Kettle Project Founder


Op do 29 nov. 2018 om 19:03 schreef Matt Casters :

> Thanks a lot for the replies. The problem is not that the jar files aren't
> in the classloader, it's that something somewhere insists on using the
> parent classloader.
> I guess it makes sense, since I noticed that when running from IntelliJ IDEA,
> Beam copied all required runtime binaries into GCP Storage, so it must have an
> idea of what to pick up.
> I'm guessing it tries to pick up everything in the classpath.
>
> Throwing all the generated Maven jar files into the main classpath of
> Kettle is a bit messy in this case, so I'm going to look for an alternative,
> like a separate application alongside Kettle to communicate with.
>
> I'll report back once I get a bit further along.
>
> Cheers,
> Matt
>
> Op do 29 nov. 2018 om 17:10 schreef Juan Carlos Garcia <
> jcgarc...@gmail.com>:
>
>> If you are using Gradle for packaging, make sure your final jar (fat jar)
>> contains all the service files merged.
>>
>> With the Gradle shadowJar plugin, include the *mergeServiceFiles()*
>> instruction, like:
>>
>> apply plugin: 'com.github.johnrengelman.shadow'
>>
>> shadowJar {
>>     mergeServiceFiles()
>>     zip64 true
>>     classifier = 'bundled'
>> }
>>
>> If you are using Maven then use the Shade plugin.
>>
>> On Thu, Nov 29, 2018 at 4:50 PM Robert Bradshaw 
>> wrote:
>>
>>> BeamJava uses com.google.auto.service.AutoService which, at the end of
>>> the day, is shorthand for Java's standard ServiceLoader mechanisms
>>> (e.g. see [1]). I'm not an expert on the details of how this works,
>>> but you'll probably have to make sure these filesystem dependencies
>>> are in your custom classloader's jar.
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java
>>> On Thu, Nov 29, 2018 at 3:57 PM Matt Casters 
>>> wrote:
>>> >
>>> > Hello Beam,
>>> >
>>> > I've been taking great steps forward in having Kettle generate Beam
>>> pipelines and they actually execute just find in unit testing in IntelliJ.
>>> > The problem starts when I collect all the libraries needed for Beam
>>> and the Runners and throw them into the Kettle project as a plugin.
>>> >
>>> > Caused by: java.lang.IllegalArgumentException: No filesystem found for scheme gs
>>> > at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
>>> > at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
>>> > at org.apache.beam.sdk.io.FileBasedSink.convertToFileResourceIfPossible(FileBasedSink.java:213)
>>> > at org.apache.beam.sdk.io.TextIO$TypedWrite.to(TextIO.java:700)
>>> > at org.apache.beam.sdk.io.TextIO$Write.to(TextIO.java:1028)
>>> > at org.kettle.beam.core.transform.BeamOutputTransform.expand(BeamOutputTransform.java:87)
>>> > ... 32 more
>>> >
>>> > This also happens for local file execution ("scheme file" in that case).
>>> >
>>> > So the questions are: how is Beam bootstrapped? How does Beam determine
>>> > which libraries to use, and what is the recommended way of packaging
>>> > things up properly?
>>> > The Beam plugin is running in a separate URLClassLoader, so I think
>>> > something is going awry there.
>>> >
>>> > Thanks a lot for any answers or tips you might have!
>>> >
>>> > Matt
>>> > ---
>>> > Matt Casters 
>>> > Senior Solution Architect, Kettle Project Founder
>>> >
>>> >
>>>
>>
>>
>> --
>>
>> JC
>>
>>


Re: Why beam pipeline ends up creating gigantic DAG for a simple word count program !!

2018-11-30 Thread Maximilian Michels

Hi Akshay,

I think you're bringing up a very important point. Simplicity with 
minimal complexity is something that we strive for. In the case of the 
Write transform, the complexity was mainly added due to historical 
reasons which Kenneth mentioned.


It is to note that some Runners don't even benefit from it because they 
don't support incremental recovery. I believe we will do work in the 
future to simplify the Write transform.


If you look at other pipelines which don't use that transform you will 
find that they are much simpler.


What can really help is to not expand the composite transforms, but 
transforms need to be expanded during translation and collapsing those 
transforms again after translation to Spark/Flink can be tricky.


Generally speaking, we have realized this is an issue and have plans to 
fix it, e.g. https://issues.apache.org/jira/browse/BEAM-5859.


Thanks,
Max

On 28.11.18 16:52, Kenneth Knowles wrote:
In fact, that logic in FileIO is "required" to have consistent output 
even just for batch computation, because any step along the way may fail 
and retry.


I put "required" in quotes because there's a legacy concern at play here 
- FileIO is written using the assumption that shuffles are 
checkpointing. What is actually required is that the input to the last 
stage will be the same, even if the whole pipeline goes down and has to 
be brought back up. So the extra shuffle in those pipelines represents a 
necessary checkpoint prior to running the last stage. In the 
saveAsTextFile primitive (caveat: my understanding is vague and stale) 
this would be done in a checkpoint finalization callback, and you have 
to wait for that to complete before consuming the output if you want to 
ensure correctness.


Another relevant piece of information is that Beam has support for two 
things that would make it easier to decipher the UI:


1. Nodes can have meaningful names. So that would make it obvious which 
part is doing what.
2. Transforms can be built as composites of other transforms, and this 
is encouraged. In some UIs, notably Cloud Dataflow, the composites are
shown as a single box, so it is easier to understand.


I would not expect every engine to adopt all of Beam's features like 
these, but there might be a clever way to make the information available.


Kenn

On Wed, Nov 28, 2018 at 12:27 AM Tim Robertson wrote:


Hi Akshay

My understanding is that this all comes from the final FileIO
write() stage.

When writing, the FileIO puts data into temporary files similar to
the output formats for Hadoop MapReduce. Once ready to commit, it
does something along the lines of a directory scan to determine
which files need to be moved into the final output location. It is
that directory scan stage that causes the complex DAG and it runs
very quickly. While it looks complicated, I gather it is necessary
to support the needs of batch/streaming and in particular the
behaviour under failure scenarios.

I agree with you that from a developer perspective it is very
difficult to understand. If you were to replace the FileIO write()
with e.g. a push into a database (JdbcIO), or ElasticsearchIO or
SolrIO etc you will see a much more familiar and simpler to
understand DAG - it might be worth trying that to see. Over time I
expect you will simply ignore that final job when looking at the DAG
as you know it is just the output committer stage.

I don't know if you are using HDFS but if so, please be aware of
BEAM-5036 [1] which is fixed in 2.9.0-SNAPSHOT and will be released
with 2.9.0 in the coming days. It relates to what I outline above,
where the files were actually copied into place rather than simply
moved. On my jobs, I saw a very large increase in performance
because of this and brought Beam much closer to native spark in
terms of runtime performance.

I hope this helps,
Tim


[1] https://issues.apache.org/jira/browse/BEAM-5036

On Wed, Nov 28, 2018 at 7:34 AM Akshay Mendole wrote:

Hi,
We are in the process of evaluating different execution engines (mainly
Apache Spark and Apache Flink) for our production batch and streaming
pipelines. We thought of using Apache Beam as a unified programming-model
framework to write the pipelines. When we executed a simple wordcount
pipeline using both the Flink runner and the Spark runner, we saw that the
DAG for the pipeline in both Flink and Spark, when executed using the Beam
code, had a lot of operators/nodes which could not be explained. When we
wrote the same wordcount program using the APIs provided by the underlying
execution engine, the DAGs were way simpler and could be easily explained.
Below is an example of a wordcount program executed in Spark.

This is the DAG w

Re: Inferring Csv Schemas

2018-11-30 Thread Reza Ardeshir Rokni
Hi Joe,

You may find some of the info in this blog of interest; it's based on
streaming pipelines, but the ideas are useful.

https://cloud.google.com/blog/products/gcp/how-to-handle-mutating-json-schemas-in-a-streaming-pipeline-with-square-enix

Cheers

Reza

On Thu, 29 Nov 2018 at 06:53, Joe Cullen  wrote:

> Hi all,
>
> I have a pipeline reading CSV files, performing some transforms, and
> writing to BigQuery. At the moment I'm reading the BigQuery schema from a
> separate JSON file. If the CSV files had a new column added (and I wanted
> to include this column in the resultant BigQuery table), I'd have to change
> the JSON schema or the pipeline itself. Is there any way to autodetect the
> schema using BigQueryIO? How do people normally deal with potential changes
> to input CSVs?
>
> Thanks,
> Joe
>