Re: Checkpointing Dataflow Pipeline

2021-04-08 Thread Michael Luckey
 checkpoint, so the stream would resume off of the last
>>> committed checkpoint location.
>>>
>>> It doesn't seem Beam has an external checkpoint that persists beyond a
>>> single stream execution, so in Beam with Kinesis I believe you'll have to
>>> manage your own offsets deliberately with an external source if you want to
>>> achieve 'exactly once' semantics in the event of shutting down a stream and
>>>  resuming it at a later point.
>>>
>>> In Kafka you don't need this: as long as we ensure our offsets are
>>> committed in the finalization of a bundle, the offsets for a particular group
>>> id are stored on the server.
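
For reference, a minimal sketch of that Kafka setup with the Java SDK's KafkaIO,
assuming a Pipeline p; the broker address, topic and group id are placeholders,
and exact method names may vary between Beam versions:

    import java.util.Collections;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.kafka.common.serialization.LongDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    PCollection<KV<Long, String>> records =
        p.apply(
            KafkaIO.<Long, String>read()
                .withBootstrapServers("broker-1:9092")           // placeholder
                .withTopic("events")                             // placeholder
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withConsumerConfigUpdates(
                    Collections.singletonMap("group.id", "my-consumer-group"))
                // commit consumed offsets back to Kafka when a bundle is
                // finalized, so a later run with the same group id resumes there
                .commitOffsetsInFinalize()
                .withoutMetadata());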
>>>
>>>
>>> On Tue, Apr 6, 2021 at 3:13 PM Kenneth Knowles  wrote:
>>>
>>>> This sounds similar to the "Kafka Commit" in
>>>> https://github.com/apache/beam/pull/12572 by +Boyuan Zhang
>>>>  and also to how PubsubIO ACKs messages in the
>>>> finalizer. I don't know much about KinesisIO or how Kinesis works. I was
>>>> just asking to clarify, in case other folks know more, like +Alexey
>>>> Romanenko  and +Ismaël Mejía
>>>>  have modified KinesisIO. If the feature does not
>>>> exist today, perhaps we can identify the best practices around this 
>>>> pattern.
>>>>
>>>> Kenn
>>>>
>>>> On Tue, Apr 6, 2021 at 1:59 PM Michael Luckey 
>>>> wrote:
>>>>
>>>>> Hi Kenn,
>>>>>
>>>>> yes, resuming reading at the proper timestamp is exactly the issue we
>>>>> are currently struggling with. E.g. with the Kinesis Client Library we could
>>>>> store the last read position in some DynamoDB table. This mechanism is not
>>>>> used with Beam; as we understand it, the runner is responsible for tracking
>>>>> that checkpoint mark.
>>>>>
>>>>> Now, obviously on restarting the pipeline, e.g. on a non-compatible
>>>>> upgrade where a pipeline update is just not feasible, there must be
>>>>> some mechanism in place for how Dataflow knows where to continue. Is
>>>>> that simply the pipeline name? Or is there more involved? So how does
>>>>> checkpointing actually work here?
>>>>>
>>>>> Based on 'name', wouldn't that imply that something like (example
>>>>> taken from
>>>>> https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates
>>>>> )
>>>>>
>>>>>   export REGION="us-central1"
>>>>>
>>>>>   gcloud dataflow flex-template run "streaming-beam-sql-`date +%Y%m%d-%H%M%S`" \
>>>>>     --template-file-gcs-location "$TEMPLATE_PATH" \
>>>>>     --parameters inputSubscription="$SUBSCRIPTION" \
>>>>>     --parameters outputTable="$PROJECT:$DATASET.$TABLE" \
>>>>>     --region "$REGION"
>>>>>
>>>>> will not resume from the last read position on rerun, because the name
>>>>> obviously changes here?
>>>>>
>>>>> best,
>>>>>
>>>>> michel
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Apr 6, 2021 at 10:38 PM Kenneth Knowles 
>>>>> wrote:
>>>>>
>>>>>> I would assume the main issue is resuming reading from the Kinesis
>>>>>> stream from the last read? In the case for Pubsub (just as another 
>>>>>> example
>>>>>> of the idea) this is part of the internal state of a pre-created
>>>>>> subscription.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Tue, Apr 6, 2021 at 1:26 PM Michael Luckey 
>>>>>> wrote:
>>>>>>
>>>>>>> Hi list,
>>>>>>>
>>>>>>> with our current project we are implementing our streaming pipeline
>>>>>>> based on Google Dataflow.
>>>>>>>
>>>>>>> Essentially we receive input via Kinesis, do some filtering,
>>>>>>> enrichment and sessionizing, and output to PubSub and/or Google Cloud Storage.
>>>>>>>
>>>>>>> After a short investigation it is not clear to us how checkpointing
>>>>>>> will work running on Dataflow in connection with KinesisIO. Is there any
>>>>>>> documentation/discussion to get a better understanding of how that will be
>>>>>>> working? Especially if we are forced to restart our pipelines, how could we
>>>>>>> ensure not to lose any events?
>>>>>>>
>>>>>>> As far as I understand currently, it should work 'auto-magically',
>>>>>>> but it is not yet clear to us how it will actually behave. Before we try
>>>>>>> to start testing our expectations or even try to implement some
>>>>>>> watermark-tracking by ourselves, we hoped to get some insights from other
>>>>>>> users here.
>>>>>>>
>>>>>>> Any help appreciated.
>>>>>>>
>>>>>>> Best,
>>>>>>>
>>>>>>> michel
>>>>>>>
>>>>>>
>>>
>>> ~Vincent
>>>
>>


Re: Checkpointing Dataflow Pipeline

2021-04-06 Thread Michael Luckey
Hi Kenn,

yes, resuming reading at the proper timestamp is exactly the issue we are
currently struggling with. E.g. with the Kinesis Client Library we could store the
last read position in some DynamoDB table. This mechanism is not used with Beam;
as we understand it, the runner is responsible for tracking that checkpoint mark.
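
To make the 'manage it yourself' option concrete, a rough sketch of what we are
considering, assuming a Pipeline p. This is hedged: the stream name, credentials
and the loadLastCheckpointTimestamp() helper reading our own DynamoDB entry are
made up, not an existing API, and KinesisIO method names may differ by version:

    import com.amazonaws.regions.Regions;
    import org.apache.beam.sdk.io.kinesis.KinesisIO;
    import org.apache.beam.sdk.io.kinesis.KinesisRecord;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Instant;

    // Hypothetical helper: fetch the last event time we persisted ourselves
    // (e.g. into a DynamoDB table) before shutting the old pipeline down.
    Instant resumeFrom = loadLastCheckpointTimestamp();

    PCollection<KinesisRecord> records =
        p.apply(
            KinesisIO.read()
                .withStreamName("our-stream")                                // placeholder
                .withAWSClientsProvider("ACCESS_KEY", "SECRET_KEY", Regions.EU_WEST_1)
                // start at the externally stored timestamp instead of relying
                // on a runner-held checkpoint mark (at-least-once resumption)
                .withInitialTimestampInStream(resumeFrom));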

Now, obviously on restarting the pipeline, e.g. on a non-compatible upgrade
where a pipeline update is just not feasible, there must be some
mechanism in place for how Dataflow knows where to continue. Is that
simply the pipeline name? Or is there more involved? So how does
checkpointing actually work here?

Based on 'name', wouldn't that imply that something like (example taken
from
https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates
)

  export REGION="us-central1"

  gcloud dataflow flex-template run "streaming-beam-sql-`date +%Y%m%d-%H%M%S`" \
    --template-file-gcs-location "$TEMPLATE_PATH" \
    --parameters inputSubscription="$SUBSCRIPTION" \
    --parameters outputTable="$PROJECT:$DATASET.$TABLE" \
    --region "$REGION"

will not resume from the last read position on rerun, because the name obviously
changes here?

best,

michel



On Tue, Apr 6, 2021 at 10:38 PM Kenneth Knowles  wrote:

> I would assume the main issue is resuming reading from the Kinesis stream
> from the last read? In the case for Pubsub (just as another example of the
> idea) this is part of the internal state of a pre-created subscription.
>
> Kenn
>
> On Tue, Apr 6, 2021 at 1:26 PM Michael Luckey  wrote:
>
>> Hi list,
>>
>> with our current project we are implementing our streaming pipeline based
>> on Google Dataflow.
>>
>> Essentially we receive input via Kinesis, do some filtering,
>> enrichment and sessionizing, and output to PubSub and/or Google Cloud Storage.
>>
>> After a short investigation it is not clear to us how checkpointing will
>> work running on Dataflow in connection with KinesisIO. Is there any
>> documentation/discussion to get a better understanding of how that will be
>> working? Especially if we are forced to restart our pipelines, how could we
>> ensure not to lose any events?
>>
>> As far as I understand currently, it should work 'auto-magically', but it
>> is not yet clear to us how it will actually behave. Before we try to start
>> testing our expectations or even try to implement some watermark-tracking
>> by ourselves, we hoped to get some insights from other users here.
>>
>> Any help appreciated.
>>
>> Best,
>>
>> michel
>>
>


Checkpointing Dataflow Pipeline

2021-04-06 Thread Michael Luckey
Hi list,

with our current project we are implementing our streaming pipeline based
on Google Dataflow.

Essentially we receive input via Kinesis, do some filtering, enrichment
and sessionizing, and output to PubSub and/or Google Cloud Storage.

After a short investigation it is not clear to us how checkpointing will
work running on Dataflow in connection with KinesisIO. Is there any
documentation/discussion to get a better understanding of how that will be
working? Especially if we are forced to restart our pipelines, how could we
ensure not to lose any events?

As far as I understand currently, it should work 'auto-magically', but it is
not yet clear to us how it will actually behave. Before we try to start
testing our expectations or even try to implement some watermark-tracking
by ourselves, we hoped to get some insights from other users here.

Any help appreciated.

Best,

michel


Re: Problem with gzip

2019-05-15 Thread Michael Luckey
@Robert

Does your suggestion imply that the points made by Eugene on BEAM-2803 do
not apply (anymore) and the combined reshuffle could just be omitted?

On Wed, May 15, 2019 at 1:00 PM Robert Bradshaw  wrote:

> Unfortunately the "write" portion of the reshuffle cannot be
> parallelized more than the source that it's reading from. In my
> experience, generally the read is the bottleneck in this case, but
> it's possible (e.g. if the input compresses extremely well) that it is
> the write that is slow (which you seem to indicate based on your
> observation of the UI, right?).
>
> It could be that materializing to temporary files is cheaper than
> materializing randomly to shuffle (especially on pre-portable Python).
> In that case you could force a fusion break with a side input instead.
> E.g.
>
> class FusionBreak(beam.PTransform):
>     def expand(self, pcoll):
>         # Create an empty PCollection that depends on pcoll.
>         empty = pcoll | beam.FlatMap(lambda x: ())
>         # Use this empty PCollection as a side input, which will force a fusion break.
>         return pcoll | beam.Map(lambda x, unused: x, beam.pvalue.AsIterable(empty))
>
> which could be used in place of Reshard like
>
> p | beam.ReadFromGzipedFiles(...) | FusionBreak() | DoWork() ...
>
> You'll probably want to be sure to pass the use_fastavro experiment as
> well.
>
> On Wed, May 15, 2019 at 6:53 AM Niels Basjes  wrote:
> >
> > Hi
> >
> > This project is a completely different solution to this problem,
> but in the Hadoop MapReduce context.
> >
> > https://github.com/nielsbasjes/splittablegzip
> >
> >
> > I have used this a lot in the past.
> > Perhaps porting this project to beam is an option?
> >
> > Niels Basjes
> >
> >
> >
> > On Tue, May 14, 2019, 20:45 Lukasz Cwik  wrote:
> >>
> >> Sorry I couldn't be more helpful.
> >>
> >> From: Allie Chen 
> >> Date: Tue, May 14, 2019 at 10:09 AM
> >> To: 
> >> Cc: user
> >>
> >>> Thank Lukasz. Unfortunately, decompressing the files is not an option
> for us.
> >>>
> >>>
> >>> I am trying to speed up the Reshuffle step, since it waits for all data.
> Here are two ways I have tried:
> >>>
> >>> 1.  add timestamps to the PCollection's elements after reading (since
> it is a bounded source), then apply windowing before Reshuffle, but it still
> waits for all data.
> >>>
> >>>
> >>> 2.  run the pipeline with --streaming flag, but it leads to an error:
> Workflow failed. Causes: Expected custom source to have non-zero number of
> splits. Also, I found in
> https://beam.apache.org/documentation/sdks/python-streaming/#dataflowrunner-specific-features
> :
> >>>
> >>> DataflowRunner does not currently support the following Cloud Dataflow
> specific features with Python streaming execution.
> >>>
> >>> Streaming autoscaling
> >>>
> >>> I doubt whether this approach can solve my issue.
> >>>
> >>>
> >>> Thanks so much!
> >>>
> >>> Allie
> >>>
> >>>
> >>> From: Lukasz Cwik 
> >>> Date: Tue, May 14, 2019 at 11:16 AM
> >>> To: dev
> >>> Cc: user
> >>>
>  Do you need to perform any joins across the files (e.g.
> Combine.perKey/GroupByKey/...)?
>  If not, you could structure your pipeline
>  ReadFromFileA --> Reshuffle(optional) --> CopyOfPipelineA
>  ReadFromFileB --> Reshuffle(optional) --> CopyOfPipelineB
>  ReadFromFileC --> Reshuffle(optional) --> CopyOfPipelineC
>  and then run it as a batch pipeline.
> 
>  You can set --streaming=true on the pipeline and then it will run in
> a streaming mode, but streaming prioritizes low latency and correctness on
> Google Cloud Dataflow, so it will cost more to run your pipeline than in
> batch mode. It may make more sense to store the data uncompressed as it may
> be less expensive than paying the additional compute cost for streaming.
> 
>  From: Allie Chen 
>  Date: Tue, May 14, 2019 at 7:38 AM
>  To: 
>  Cc: user
> 
> > Is it possible to use windowing or somehow pretend it is streaming
> so Reshuffle or GroupByKey won't wait until all data has been read?
> >
> > Thanks!
> > Allie
> >
> > From: Lukasz Cwik 
> > Date: Fri, May 10, 2019 at 5:36 PM
> > To: dev
> > Cc: user
> >
> >> There is no such flag to turn off fusion.
> >>
> >> Writing 100s of GiBs of uncompressed data to reshuffle will take
> time when it is limited to a small number of workers.
> >>
> >> If you can split up your input into a lot of smaller files that are
> compressed then you shouldn't need to use the reshuffle but still could if
> you found it helped.
> >>
> >> On Fri, May 10, 2019 at 2:24 PM Allie Chen 
> wrote:
> >>>
> >>> Re Lukasz: Thanks! I am not able to control the compression format
> but I will see whether splitting the gzip files will work. Is there a
> simple flag in Dataflow that could turn off the fusion?
> >>>
> >>> Re Reuven: No, I checked the run time on Dataflow UI, the
> GroupByKey and FlatMap in Reshuffle are very slow 

Re: Problem with gzip

2019-05-10 Thread Michael Luckey
Maybe the solution implemented in JdbcIO [1], [2] could be helpful in this
case; a rough sketch of the pattern follows below the links.

[1] https://issues.apache.org/jira/browse/BEAM-2803
[2]
https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1088-L1118
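
For anyone skimming: the pattern behind [2] is, roughly, to pair every element
with a random key, shuffle, and drop the keys again, which breaks fusion after a
non-parallel read. A hedged Java sketch of that idea (not the actual JdbcIO
code; 'lines' stands in for the output of the non-splittable read):

    import java.util.concurrent.ThreadLocalRandom;
    import org.apache.beam.sdk.transforms.Reshuffle;
    import org.apache.beam.sdk.transforms.Values;
    import org.apache.beam.sdk.transforms.WithKeys;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TypeDescriptors;

    PCollection<String> redistributed =
        lines
            // pair each element with a small random key so the shuffle
            // spreads the data across workers
            .apply("AssignRandomKey",
                WithKeys.of((String x) -> ThreadLocalRandom.current().nextInt(128))
                    .withKeyType(TypeDescriptors.integers()))
            // the shuffle is the fusion break
            .apply(Reshuffle.of())
            // drop the synthetic keys again
            .apply(Values.create());

Newer SDKs also ship essentially this as Reshuffle.viaRandomKey().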

On Fri, May 10, 2019 at 11:36 PM Lukasz Cwik  wrote:

> There is no such flag to turn off fusion.
>
> Writing 100s of GiBs of uncompressed data to reshuffle will take time when
> it is limited to a small number of workers.
>
> If you can split up your input into a lot of smaller files that are
> compressed then you shouldn't need to use the reshuffle but still could if
> you found it helped.
>
> On Fri, May 10, 2019 at 2:24 PM Allie Chen  wrote:
>
>> Re Lukasz: Thanks! I am not able to control the compression format but I
>> will see whether splitting the gzip files will work. Is there a simple flag
>> in Dataflow that could turn off the fusion?
>>
>> Re Reuven: No, I checked the run time on Dataflow UI, the GroupByKey and
>> FlatMap in Reshuffle are very slow when the data is large. Reshuffle itself
>> is not parallel either.
>>
>> Thanks all,
>>
>> Allie
>>
>> *From: *Reuven Lax 
>> *Date: *Fri, May 10, 2019 at 5:02 PM
>> *To: *dev
>> *Cc: *user
>>
>> It's unlikely that Reshuffle itself takes hours. It's more likely that
>>> simply reading and decompressing all that data was very slow when there was
>>> no parallelism.
>>>
>>> *From: *Allie Chen 
>>> *Date: *Fri, May 10, 2019 at 1:17 PM
>>> *To: * 
>>> *Cc: * 
>>>
>>> Yes, I do see the data after reshuffle are processed in parallel. But
 the Reshuffle transform itself takes hours or even days to run, according to
 one test (24 gzip files, 17 million lines in total) I did.

 The file format for our users is mostly gzip, since
 uncompressed files would be too costly to store (it could be hundreds of
 GB).

 Thanks,

 Allie


 *From: *Lukasz Cwik 
 *Date: *Fri, May 10, 2019 at 4:07 PM
 *To: *dev, 

 +user@beam.apache.org 
>
> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till
> all the data has been read before the next transforms can run. After the
> reshuffle, the data should have been processed in parallel across the
> workers. Did you see this?
>
> Are you able to change the input of your pipeline to use an
> uncompressed file or many compressed files?
>
> On Fri, May 10, 2019 at 1:03 PM Allie Chen 
> wrote:
>
>> Hi,
>>
>>
>> I am trying to load a gzip file to BigQuery using Dataflow. Since the
>> compressed file is not splittable, one worker is allocated to read the
>> file. The same worker will do all the other transforms since Dataflow fused
>> all transforms together. There is a large amount of data in the file, and
>> I expect to see more workers spinning up after the reading transforms. I
>> tried to use the Reshuffle Transform
>> 
>> to prevent the fusion, but it is not scalable since it won’t proceed until
>> all data arrived at this point.
>>
>> Are there any other ways to allow more workers to work on all the
>> other transforms after reading?
>>
>> Thanks,
>>
>> Allie
>>
>>


Re: Testing of Metrics in context of DoFnTester

2017-05-10 Thread Michael Luckey
Hi Pablo,

thx for the pointers. I'll have a look into it and let you know.

Regards,

michel

On Wed, May 10, 2017 at 3:18 AM, Pablo Estrada <pabl...@google.com> wrote:

> Hi Michael,
> I'm sorry. I see I did not read your first email properly.
>
> There are a couple places in the core SDK or runner code and tests that
> used to use aggregators, and now use metrics. There are two reasonable
> options for this:
> 1. In [1], the test sets up the metrics global environment by setting the
> current container (e.g.  MetricsEnvironment.setCurrentContainer(new
> MetricsContainer("anystep"));), and the LateDataFilter uses metrics
> normally[2], by creating a counter that relies on the environment set up in
> the test.
>
> 2. If you'd rather not rely on setting up a global environment, you can
> use CounterCell, and pass it in to your test. In [3] it's not a test, but a
> CounterCell is still created to keep internal statistics, and later its
> value is checked [4]. As a note, you may note in [3] that CounterCells are
> a bit quirky to create, as we did not intend for external users to be able
> to create them.
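
A hedged sketch of option 1 applied to the DoFnTester case from further down the
thread; the class names (MetricsContainerImpl lives in runners-core) and the
counter namespace/name are assumptions and may differ between Beam versions:

    import static org.hamcrest.Matchers.is;
    import static org.junit.Assert.assertThat;

    import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
    import org.apache.beam.sdk.metrics.MetricName;
    import org.apache.beam.sdk.metrics.MetricsEnvironment;
    import org.apache.beam.sdk.transforms.DoFnTester;

    // Install a container for the current thread so Metrics calls inside the
    // DoFn have somewhere to record to.
    MetricsContainerImpl container = new MetricsContainerImpl("anystep");
    MetricsEnvironment.setCurrentContainer(container);

    DoFnTester<Record, Record> doFnTester = DoFnTester.of(fixture);
    doFnTester.processElement(input);

    // Assumes the DoFn increments Metrics.counter("myNamespace", "someCounter").
    assertThat(
        container.getCounter(MetricName.named("myNamespace", "someCounter"))
            .getCumulative(),
        is(1L));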
>
> Let me know if these suggestions are helpful.
> Best
> -P.
>
> [1] - https://github.com/apache/beam/blob/master/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java#L61
> [2] - https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java#L96
> [3] - https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java#L210
> [4] - https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java#L326
>
> On Tue, May 9, 2017 at 4:33 PM Michael Luckey <adude3...@gmail.com> wrote:
>
>> Hi Pablo,
>>
>> thanks for your help! We certainly could change our testing code and
>> involve execution of a pipeline during tests.
>>
>> But currently we are leveraging DoFnTester, i.e. scoping our tests to the
>> DoFn only, which means there is neither a pipeline nor a pipeline result
>> involved which I could call upon.
>>
>> It might be a bad idea trying to test counters on this basis, but as it
>> was supported previously I thought we might have overlooked an API for
>> accessing these metrics somehow when testing with DoFnTester. Not sure whether
>> it makes sense for the DoFnTester to somehow provide metrics support to enable
>> this kind of test. I certainly do not like the idea too much of starting to do
>> some mocking of the metrics API within my test implementation.
>>
>> Regards,
>>
>> michel
>>
>>
>> On Wed, May 10, 2017 at 1:10 AM, Pablo Estrada <pabl...@google.com>
>> wrote:
>>
>>> Hi Michael,
>>> For the Metrics API, the way to programmatically query the value of a
>>> metric is by using the MetricResults.queryMetrics method. You get the
>>> MetricResults object from the PipelineResult object, and query it like so:
>>>
>>> PipelineResult res = p.run();
>>> MetricQueryResults metricResult = res.metrics().queryMetrics();
>>>
>>> The queryMetrics method takes in a MetricsFilter instance.
>>>
>>> Not all runners support this operation. For the dataflow runner, PR
>>> 2896[1] should add it.
>>>
>>> Let me know if you need more help with this.
>>> Best
>>> -P.
>>>
>>> [1] - https://github.com/apache/beam/pull/2896
>>>
>>> On Tue, May 9, 2017 at 3:48 PM Michael Luckey <adude3...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> currently we are evaluating a migration from 0.6.0 to current. We
>>>> encountered the following problem, which we are currently not sure how to best
>>>> solve.
>>>>
>>>> Say we have a DoFn, which is using Aggregators, e.g.
>>>>
>>>> ctr = createAggregator("someCounter", Sum.ofLongs());
>>>>
>>>>  We were testing them with DoFn-Tester like
>>>>
>>>> final DoFnTester<Record, Record> doFnTester = DoFnTester.of(fixture);
>>>> doFnTester.processElement(input);
>>>>
>>>> assertThat(doFnTester.getAggregatorValue(fixture.ctr).longValue(), is(1L));
>>>>
>>>> As aggregators are removed now from the codebase, we are considering
>>>> using Metrics instead. But we did not find an equivalent to the
>>>> getAggregatorValue method on DoFnTester.
>>>>
>>>> Any suggestion how we could keep our counters tested within a unit test
>>>> based on DoFnTester? Or do we have to find a completely different solution?
>>>> Are we doing something completely wrong trying to test correct workings of
>>>> our counters with this approach?
>>>>
>>>> Regards,
>>>>
>>>> michel
>>>>
>>>>
>>>>
>>


Re: Testing of Metrics in context of DoFnTester

2017-05-09 Thread Michael Luckey
Hi Pablo,

thanks for your help! We certainly could change our testing code and
involve execution of a pipeline during tests.

But currently we are leveraging DoFnTester, i.e. scoping our tests to the
DoFn only, which means there is neither a pipeline nor a pipeline result
involved which I could call upon.

It might be a bad idea trying to test counters on this basis, but as it was
supported previously I thought we might have overlooked an API for
accessing these metrics somehow when testing with DoFnTester. Not sure whether it
makes sense for the DoFnTester to somehow provide metrics support to enable
this kind of test. I certainly do not like the idea too much of starting to do
some mocking of the metrics API within my test implementation.

Regards,

michel


On Wed, May 10, 2017 at 1:10 AM, Pablo Estrada <pabl...@google.com> wrote:

> Hi Michael,
> For the Metrics API, the way to programmatically query the value of a
> metric is by using the MetricResults.queryMetrics method. You get the
> MetricResults object from the PipelineResult object, and query it like so:
>
> PipelineResult res = p.run();
> MetricQueryResults metricResult = res.metrics().queryMetrics();
>
> The queryMetrics method takes in a MetricsFilter instance.
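
A hedged sketch of such a query, assuming a counter created somewhere in the
pipeline as Metrics.counter("myNamespace", "someCounter"); namespace and name
are illustrative only:

    import org.apache.beam.sdk.metrics.MetricNameFilter;
    import org.apache.beam.sdk.metrics.MetricQueryResults;
    import org.apache.beam.sdk.metrics.MetricResult;
    import org.apache.beam.sdk.metrics.MetricsFilter;

    MetricQueryResults metrics =
        res.metrics().queryMetrics(
            MetricsFilter.builder()
                .addNameFilter(MetricNameFilter.named("myNamespace", "someCounter"))
                .build());

    for (MetricResult<Long> counter : metrics.getCounters()) {
      // getAttempted() is always available; getCommitted() only on runners
      // that support committed metric values.
      System.out.println(counter.getName() + ": " + counter.getAttempted());
    }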
>
> Not all runners support this operation. For the dataflow runner, PR
> 2896[1] should add it.
>
> Let me know if you need more help with this.
> Best
> -P.
>
> [1] - https://github.com/apache/beam/pull/2896
>
> On Tue, May 9, 2017 at 3:48 PM Michael Luckey <adude3...@gmail.com> wrote:
>
>> Hi,
>>
>> currently we are evaluating a migration from 0.6.0 to current. We
>> encountered the following problem, which we are currently not sure how to best
>> solve.
>>
>> Say we have a DoFn, which is using Aggregators, e.g.
>>
>> ctr = createAggregator("someCounter", Sum.ofLongs());
>>
>>  We were testing them with DoFn-Tester like
>>
>> final DoFnTester<Record, Record> doFnTester = DoFnTester.of(fixture);
>> doFnTester.processElement(input);
>>
>> assertThat(doFnTester.getAggregatorValue(fixture.ctr).longValue(), is(1L));
>>
>> As aggregators are removed now from the codebase, we are considering
>> using Metrics instead. But we did not find an equivalent to the
>> getAggregatorValue method on DoFnTester.
>>
>> Any suggestion how we could keep our counters tested within a unit test
>> based on DoFnTester? Or do we have to find a completely different solution?
>> Are we doing something completely wrong trying to test correct workings of
>> our counters with this approach?
>>
>> Regards,
>>
>> michel
>>
>>
>>


Testing of Metrics in context of DoFnTester

2017-05-09 Thread Michael Luckey
Hi,

currently we are evaluating a migration from 0.6.0 to current. We
encountered the following problem, which we are currently not sure how to best
solve.

Say we have a DoFn, which is using Aggregators, e.g.

ctr = createAggregator("someCounter", Sum.ofLongs());

 We were testing them with DoFn-Tester like

final DoFnTester<Record, Record> doFnTester = DoFnTester.of(fixture);
doFnTester.processElement(input);

assertThat(doFnTester.getAggregatorValue(fixture.ctr).longValue(), is(1L));

As aggregators are removed now from the codebase, we are considering using
Metrics instead. But we did not find an equivalent to the
getAggregatorValue method on DoFnTester.
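
For context, the Metrics-based replacement inside the DoFn itself would look
roughly like the sketch below (the namespace and name are our own choice); the
open question is how to read that value back in a DoFnTester-based test:

    import org.apache.beam.sdk.metrics.Counter;
    import org.apache.beam.sdk.metrics.Metrics;
    import org.apache.beam.sdk.transforms.DoFn;

    class MyFn extends DoFn<Record, Record> {
      // replaces ctr = createAggregator("someCounter", Sum.ofLongs())
      private final Counter ctr = Metrics.counter("myNamespace", "someCounter");

      @ProcessElement
      public void processElement(ProcessContext c) {
        ctr.inc();
        c.output(c.element());
      }
    }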

Any suggestions on how we could keep our counters tested within a unit test
based on DoFnTester? Or do we have to find a completely different solution?
Are we doing something completely wrong in trying to test the correct working of
our counters with this approach?

Regards,

michel


Slack invite

2017-05-05 Thread Michael Luckey
Would you please invite me to the slack group also?

Cheers,

michel


Re: Recommendations on reading/writing avro to hdfs

2017-03-12 Thread Michael Luckey
Hi Davor,

thanks for your reply. And do not worry, we knew in advance that our
solution would be temporary only.

So for the time being, we will go with the HDFSFileSource/Sink approach and look
forward to the final solution as soon as HDFSFileSystem is ready.

Thanks again for your help!

On Tue, Mar 7, 2017 at 4:39 AM, Davor Bonaci <da...@apache.org> wrote:

> Hi Michael,
> Sorry about the inconvenience here; AvroWrapperCoder is indeed removed
> recently from Hadoop/HDFS IO.
>
> I think the best approach would be to use HDFSFileSource; this is the only
> approach I can recommend today.
>
> Going forward, we are working on being able to read Avro files via AvroIO,
> regardless which file system the files may be stored on. So, you'd do
> something like AvroIO.Read.from("hdfs://..."), just as you can today do
> AvroIO.Read.from("gs://...").
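
A hedged sketch of what that would look like, assuming a Pipeline p; in more
recent Beam the entry points are AvroIO.readGenericRecords / writeGenericRecords,
the paths and schemaJson are placeholders, and the hdfs:// scheme assumes the
upcoming HDFSFileSystem registration:

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.io.AvroIO;
    import org.apache.beam.sdk.values.PCollection;

    Schema schema = new Schema.Parser().parse(schemaJson);  // schemaJson assumed

    PCollection<GenericRecord> records =
        p.apply(AvroIO.readGenericRecords(schema).from("hdfs://namenode/events/*.avro"));

    records.apply(AvroIO.writeGenericRecords(schema).to("hdfs://namenode/output/part"));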
>
> Hope this helps!
>
> Davor
>
> On Tue, Feb 28, 2017 at 4:24 PM, Michael Luckey <adude3...@gmail.com>
> wrote:
>
>> Hi all,
>>
>> we are currently using beam over spark, reading and writing avro files to
>> hdfs.
>>
>> Until now we use HDFSFileSource for reading and HadoopIO for writing,
>> essentially reading and writing PCollection<AvroKey>
>>
>> With the changes introduced by https://issues.apache.org/jira/browse/BEAM-1497
>> this no longer seems to be directly supported by
>> beam, as the required AvroWrapperCoder is deleted.
>>
>> So as we have to change our code anyway, we are wondering what would be
>> the recommended approach to read/write avro files from/to hdfs with beam on
>> spark.
>>
>> - use the new implementation of HDFSFileSource/HDFSFileSink
>> - use the spark-provided HadoopIO (and probably reimplement AvroWrapperCoder
>> by ourselves?)
>>
>> What are the trade-offs here, possibly also considering already planned
>> changes to IO? Do we have advantages using the spark HadoopIO as our
>> underlying engine is currently spark, or will this eventually be deprecated
>> and exist only for ‘historical’ reasons?
>>
>> Any thoughts and advice here?
>>
>> Regards,
>>
>> michel
>>
>
>


Recommendations on reading/writing avro to hdfs

2017-02-28 Thread Michael Luckey
Hi all,

we are currently using beam over spark, reading and writing avro files to
hdfs.

Until now we use HDFSFileSource for reading and HadoopIO for writing,
essentially reading and writing PCollection<AvroKey>

With the changes introduced by
https://issues.apache.org/jira/browse/BEAM-1497 this no longer seems to be
directly supported by beam, as the required AvroWrapperCoder is
deleted.

So as we have to change our code anyway, we are wondering what would be
the recommended approach to read/write avro files from/to hdfs with beam on
spark.

- use the new implementation of HDFSFileSource/HDFSFileSink
- use the spark-provided HadoopIO (and probably reimplement AvroWrapperCoder by
ourselves?)

What are the trade-offs here, possibly also considering already planned
changes to IO? Do we have advantages using the spark HadoopIO as our
underlying engine is currently spark, or will this eventually be deprecated
and exist only for ‘historical’ reasons?

Any thoughts and advice here?

Regards,

michel