Re: Apache Beam Newsletter - August 2018

2018-08-22 Thread Etienne Chauchot
Hi Rose, 
I know the newsletter has already been sent, but may I add some of my ongoing 
subjects:
What's been done:
- CI improvement: for each new commit on master, the Nexmark suite is run in both batch and streaming mode
on Spark, Flink, and Dataflow (thanks to Andrew), and dashboard graphs are produced to track functional and
performance regressions.
For talks, I guess only talks that have already taken place are included, not the
ones scheduled for ApacheCon in September, right?
Etienne




Le vendredi 10 août 2018 à 12:37 -0700, Rose Nguyen a écrit :
> August 2018 | Newsletter
> What’s been done
> 
> Apache Beam 2.6.0 Release
> The Apache Beam team is pleased to announce the release of version 2.6.0!
> This is the second release under the new build system, and the process has kept improving.
> You can download the release here and read the release notes for more details.
> 
> Beam Summit London 2018 (by: Matthias Baetens, Gris Cuevas, Viktor Kotai)
> Approval from the Apache Software Foundation is underway. We are currently finding a venue and sponsors.
> We’ll send the call for participation soon to curate the agenda. If you’re interested in participating
> in the organization of the event, reach out to the organizers.
> Dates are TBD, but we are considering the first or last days of October.
> Support for Bounded SDF in all runners (by: Eugene Kirpichov)
> Beam recently introduced a new type of DoFn called SplittableDoFn (SDF) to enable richer modularity in its IO
> connectors. Support for SDF in bounded (batch) connectors was added for all runners.
> 
> Apache Kudu IO (by: Tim Robertson)
> A new IO connector for the Apache Kudu data store was added recently. See BEAM-2661 for more details.
> 
> IO improvements (by: Ismaël Mejía)
> HBaseIO added a new transform based on SDF called readAll. See BEAM-4020 for more details.
> 
> 
> What we’re working on...
> 
> Interactive Runner for Beam (by: Harsh Vardhan, Sindy Li, Chamikara Jayalath, Anand Iyer, Robert Bradshaw)
> Notebook-based interactive processing of Beam pipelines. This is now ready to try out in Jupyter Notebook
> for Beam Python pipelines over DirectRunner! See the design doc for more details and watch a demo here.
> Thoughts, comments and discussions welcome :)
> Python 3 Support (by, in alphabetical order: Ahmet Altay, Robert Bradshaw, Charles Chen, Matthias Feys,
> Holden Karau, Sergei Lebedev, Robbe Sneyders, Valentyn Tymofieiev)
> Major progress has been made on making the Beam Python codebase Python 3-compatible through futurization.
> Read the proposal for more details.
> 
> New IO connectors (by: John Rudolf Lewis, Jacob Marble)
> Amazon Simple Queue Service (SQS) is in review. Amazon Redshift is in progress.
> 
> Portable Runners (by: Ankur Goenka, Eugene Kirpichov, Ben Sidhom, Axel Magnuson, Thomas Weise,
> Ryan Williams, Robert Bradshaw, Daniel Oliveira, Holden Karau)
> Good progress on the Portable Flink Runner; many of the ValidatesRunner tests are passing now. The Portable
> Flink Runner can now execute batch WordCount in Java, Python and Go. Many enhancements and bug fixes in the
> Portable Reference Runner. See https://issues.apache.org/jira/browse/BEAM-2889 for more details on progress.
> 
> Dependencies (by: Yifan Zou, Chamikara Jayalath)
> We added a dependencies guide for Beam and tooling to automatically create JIRAs for significantly outdated
> dependencies. We are working on upgrading existing dependencies. See the Beam dependencies guide for more
> details.
> 
> 
> 
> New Members
> 
> New Contributors
> Rose Nguyen, Seattle, WA, USA: Beam docs contributor, working to improve docs usability.
> Connell O'Callaghan, Seattle, WA, USA: interested in growing the community, helping with community
> triages and managing issues.
> 
> 
> 
> Talks & Meetups
> 
> Stream Processing Meetup @ LinkedIn, 7/19/18
> Xinyu Liu gave a talk on building a Samza Runner for Beam: “Beam meet up, Samza!” See it here.
> Large Scale Landuse Classification of Satellite Images, Berlin Buzzwords @ Berlin, 6/11/18
> Suneel Marthi and Jose Luis Contreras gave a talk on using streaming pipelines built on Apache Flink for
> model training and inference. They leveraged Convolutional Neural Networks (CNNs) built with Apache MXNet
> to train deep learning models for land use classification. Read about it and watch it here.
> Big Data in Production Meetup@Cambridge, MA 6/28/18
> Robert Bradshaw and Eila Arich-Landkof gave a talk about Apache Beam and 
> machine learning. Event details here and
> watch their talks here.
> 
> Resources
> 
> Awesome Beam (by: Pablo Estrada)
> Inspired by Awesome Flink and Awesome Hadoop, I’ve created the Awesome Beam repo to aggregate
> interesting Beam things.
> 
> 
> Until Next Time!
> This edition was curated by our community of contributors, committers and PMC members. It contains work
> done in June and July of 2018 and ongoing efforts. We hope to provide visibility into what's going on
> in the community, s

Re: Beam Summit London 2018

2018-08-22 Thread Matthias Baetens
Hi Pascal, Javier,

Thanks for your interest in submitting a talk!
@Pascal: I am happy to check for you if what you have in mind is already
being covered in another talk submitted, and happy to help you find a good
topic to talk about :)

@Javier: it will depend on the type and subject of the session. If you are
planning to do an advanced workshop, it makes sense to allocate more time
(>1h). If it is a business case also covering architecture and technical
details, I think longer sessions should be possible as well (~1h including
Q&A). If your talk turns out to have too much content, we can always look at
ways to split it into two stand-alone sessions if that makes sense.

Best,
Matthias

On Tue, 21 Aug 2018 at 19:45 javier ramirez 
wrote:

> Hi,
>
> What'd be the duration of the talks? So I can scope the contents of my
> proposal.
>
> Looking forward to the summit!
>
> J
>
> On Tue, 21 Aug 2018, 14:47 Pascal Gula,  wrote:
>
>> Hi Matthias,
>> we (Peat / Plantix) might be interested by submitting a talk and I would
>> like to know if we can get access to the list of already submitted "Title"
>> to avoid submitting on similar topic!
>> Cheers,
>> Pascal
>>
>> On Tue, Aug 21, 2018 at 1:59 PM, Matthias Baetens <
>> baetensmatth...@gmail.com> wrote:
>>
>>> Hi everyone,
>>>
>>> We are happy to invite you to the first Beam Summit in London.
>>>
>>> The summit will be held in London at Level39
>>>  on *October 1 and 2.*
>>> You can register to attend for free on the Eventbrite page
>>> 
>>> .
>>>
>>> If you are interested in talking, please check our CfP form
>>>  and submit a talk!
>>>
>>> If you or your company is interested in helping out or sponsoring the
>>> summit (to keep it free), you can check out the sponsor booklet
>>> 
>>> .
>>>
>>> We will soon launch a blogpost with more details and announce the agenda
>>> closer to date.
>>>
>>> Thanks to everyone who helped make this happen, looking forward to
>>> welcoming you all in London!
>>>
>>> The Events & Meetups Group
>>>
>>> --
>>>
>>>
>>
>>
>>
>> --
>>
>> Pascal Gula
>> Senior Data Engineer / Scientist
>> +49 (0)176 34232684
>> www.plantix.net
>> PEAT GmbH
>> Kastanienallee 4
>> 10435 Berlin // Germany
>> Download the App!
>>
>> --


INFO:root:Executing Error when executing a pipeline on dataflow

2018-08-22 Thread OrielResearch Eila Arich-Landkof
Hello all,

I am running a pipeline that used to execute on Dataflow with no
issues. I am using the Datalab environment. See the error below. To my
understanding, it happens before the pipeline code is executed.
Any idea what went wrong?

Thanks,
Eila


Executing the pipeline:

*p.run().wait_until_finish()*

The following error is being fired:

INFO:root:Executing command: ['/usr/local/envs/py2env/bin/python',
'setup.py', 'sdist', '--dist-dir', '/tmp/tmp_B0gnK']
INFO:root:Starting GCS upload to
gs://archs4/staging/label-archs4-annotation-15.1534948236.075799/workflow.tar.gz...
INFO:oauth2client.client:Attempting refresh to obtain initial access_token
INFO:root:Completed GCS upload to
gs://archs4/staging/label-archs4-annotation-15.1534948236.075799/workflow.tar.gz
INFO:root:Staging the SDK tarball from PyPI to
gs://archs4/staging/label-archs4-annotation-15.1534948236.075799/dataflow_python_sdk.tar
INFO:root:Executing command: ['/usr/local/envs/py2env/bin/python',
'-m', 'pip', 'install', '--download', '/tmp/tmp_B0gnK',
'google-cloud-dataflow==2.0.0', '--no-binary', ':all:', '--no-deps']

CalledProcessErrorTraceback (most recent call last)
 in ()
> 1 p.run().wait_until_finish()

/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/pipeline.pyc in run(self, test_runner_api)
    174   finally:
    175 shutil.rmtree(tmpdir)
--> 176 return self.runner.run(self)
    177
    178   def __enter__(self):

/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.pyc in run(self, pipeline)
    250 # Create the job
    251 result = DataflowPipelineResult(
--> 252 self.dataflow_client.create_job(self.job), self)
    253
    254 self._metrics = DataflowMetrics(self.dataflow_client, result, self.job)

/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/utils/retry.pyc in wrapper(*args, **kwargs)
    166   while True:
    167 try:
--> 168   return fun(*args, **kwargs)
    169 except Exception as exn:  # pylint: disable=broad-except
    170   if not retry_filter(exn):

/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.pyc in create_job(self, job)
    423   def create_job(self, job):
    424 """Creates job description. May stage and/or submit for remote execution."""
--> 425 self.create_job_description(job)
    426
    427 # Stage and submit the job when necessary

/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.pyc in create_job_description(self, job)
    446 """Creates a job described by the workflow proto."""
    447 resources = dependency.stage_job_resources(
--> 448 job.options, file_copy=self._gcs_file_copy)
    449 job.proto.environment = Environment(
    450 packages=resources, options=job.options,

/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/dataflow/internal/dependency.pyc in stage_job_resources(options, file_copy, build_setup_args, temp_dir, populate_requirements_cache)
    377   else:
    378 sdk_remote_location = setup_options.sdk_location
--> 379   _stage_beam_sdk_tarball(sdk_remote_location, staged_path, temp_dir)
    380   resources.append(names.DATAFLOW_SDK_TARBALL_FILE)
    381 else:

/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/dataflow/internal/dependency.pyc in _stage_beam_sdk_tarball(sdk_remote_location, staged_path, temp_dir)
    462   elif sdk_remote_location == 'pypi':
    463 logging.info('Staging the SDK tarball from PyPI to %s', staged_path)
--> 464 _dependency_file_copy(_download_pypi_sdk_package(temp_dir), staged_path)
    465   else:
    466 raise RuntimeError(

/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/dataflow/internal/dependency.pyc in _download_pypi_sdk_package(temp_dir)
    525   '--no-binary', ':all:', '--no-deps']
    526   logging.info('Executing command: %s', cmd_args)
--> 527   processes.check_call(cmd_args)
    528   zip_expected = os.path.join(
    529   temp_dir, '%s-%s.zip' % (package_name, version))

/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/utils/processes.pyc in check_call(*args, **kwargs)
     42   if force_shell:
     43 kwargs['shell'] = True
---> 44   return subprocess.check_call(*args, **kwargs)
     45
     46

/usr/local/envs/py2env/lib/python2.7/subprocess.pyc in check_call(*popenargs, **kwargs)
    188 if cmd is None:
    189 cmd = popenargs[0]
--> 190 raise CalledProcessError(retcode, cmd)
    191 return 0
    192

CalledProcessError: Command '['/usr/local/envs/py2env/bin/python',
'-m', 'pip', 'install', '--download', '/tmp/tmp_B0gnK',
'google-cloud-dataflow==2.0.0', '--no-binary', ':all:', '--no-deps']'
returned non-zero exit status 2



-- 
Eila
www.orielresearch.org
https://www.meetup.com/Deep-Learning-In-Production/


Re: Controlling Kafka Checkpoint Persistence

2018-08-22 Thread Micah Whitacre
> Could you describe your durability requirements a bit more?

The requirement is that we need "at most once" processing of the data.  So
I'm perfectly happy retrying the processing.  I'm more concerned about data
loss/skipping data in the event of processing failures, pipeline operations
(starting/restarting), failures in the runner underlying infrastructure,
and the fun use cases when they might happen at the same time (e.g.
underlying infrastructure problems that cause processing failures and need
us to restart the pipeline :)).

Are there any good resources talking about the differences at the
boundaries or the assumed guarantees?

On Tue, Aug 21, 2018 at 5:05 PM, Raghu Angadi  wrote:

>
> On Tue, Aug 21, 2018 at 2:49 PM Micah Whitacre 
> wrote:
>
>> > Is there a reason you can't trust the runner to be durable storage for
>> inprocess work?
>>
>> That's a fair question.  Are there any good resources documenting the
>> durability/stability of the different runners?  I assume there are some
>> stability requirements regarding its handling of "bundles" but it would be
>> nice to have that info available.  One of the reasons we are targeting the
>> Direct runner is to let us work with the project and let us temporarily
>> delay picking a runner.  Durability seems like another important aspect to
>> evaluate.
>>
>
> Could you describe your durability requirements a bit more?
> All the major runners provide comparable durability guarantees for processing
> within a running pipeline (these are required for Beam model).  The
> differences arise at the boundaries: what happens when you stop the
> pipeline, can the pipeline be updated with new code with the old state,
> etc.
>
> An often confusing area is about side effects (like committing Kafka
> offsets in your case).. the users always have to assume that processing
> might be retried (even if it rarely occurs).
>
>
>>
>> On Tue, Aug 21, 2018 at 4:24 PM, Raghu Angadi  wrote:
>>
>>> On Tue, Aug 21, 2018 at 2:04 PM Lukasz Cwik  wrote:
>>>
 Is there a reason you can't trust the runner to be durable storage for
 inprocess work?

 I can understand that the DirectRunner only stores things in memory but
 other runners have stronger durability guarantees.

>>>
>>> I think the requirement is about producing a side effect (committing
>>> offsets to Kafka) after some processing completes in the pipeline. Wait()
>>> transform helps with that. The user still has to commit the offsets
>>> explicitly and can't get similar functionality in KafkaIO.
>>>
>>>
 On Tue, Aug 21, 2018 at 9:58 AM Raghu Angadi 
 wrote:

> I think by 'KafkaUnboundedSource checkpointing' you mean enabling
> 'commitOffsetsInFinalize()' on KafkaIO source.
> It is better option than enable.auto.commit, but does not exactly do
> what you want in this moment. It is invoked after the first stage ('Simple
> Transformation' in your case). This is certainly true for Dataflow and I
> think is also the case for DirectRunner.
>
> I don't see way to leverage built-in checkpoint for consistency
> externally. You would have to manually commit offsets.
>
> On Tue, Aug 21, 2018 at 8:55 AM Micah Whitacre 
> wrote:
>
>> I'm starting with a very simple pipeline that will read from Kafka ->
>> Simple Transformation -> GroupByKey -> Persist the data.  We are also
>> applying some simple windowing/triggering that will persist the data 
>> after
>> every 100 elements or every 60 seconds to balance slow trickles of data 
>> as
>> well as not storing too much in memory.  For now I'm just running with 
>> the
>> DirectRunner since this is just a small processing problem.
>>
>> With the potential for failure during the persisting of the data, we
>> want to ensure that the Kafka offsets are not updated until we have
>> successfully persisted the data.  Looking at KafkaIO it seems like our 
>> two
>> options for persisting offsets are:
>> * Kafka's enable.auto.commit
>> * KafkaUnboundedSource checkpointing.
>>
>> The first option would commit prematurely before we could guarantee
>> the data was persisted.  I can't unfortunately find many details about 
>> the
>> checkpointing so I was wondering if there was a way to configure it or 
>> tune
>> it more appropriately.
>>
>> Specifically I'm hoping to understand the flow so I can rely on the
>> built in KafkaIO functionality without having to write our own offset
>> management.  Or is it more common to write your own?
>>
>> Thanks,
>> Micah
>>
>
>>


Re: INFO:root:Executing Error when executing a pipeline on dataflow

2018-08-22 Thread OrielResearch Eila Arich-Landkof
I tried a simple pipeline which runs perfectly on the local runner but hits the
same issue on Dataflow; see below. Is there anything in the environment
that needs to be updated that I am not aware of?

Many thanks for any reference.
Eila

import apache_beam as beam
# Options classes live here in SDK 2.0.x; in newer SDKs they are in
# apache_beam.options.pipeline_options.
from apache_beam.utils.pipeline_options import (
    PipelineOptions, GoogleCloudOptions, StandardOptions)

options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'PROJECT-ID'
google_cloud_options.job_name = 'try-debug'
google_cloud_options.staging_location = '%s/staging' % BUCKET_URL
#'gs://archs4/staging'
google_cloud_options.temp_location = '%s/tmp' % BUCKET_URL #'gs://archs4/temp'
options.view_as(StandardOptions).runner = 'DataflowRunner'

p1 = beam.Pipeline(options=options)

(p1 | 'read' >>
beam.io.ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
| 'write' >> beam.io.WriteToText('gs://bucket/test.txt', num_shards=1)
 )

p1.run().wait_until_finish()

will fire the following error:

CalledProcessErrorTraceback (most recent call last)
 in ()
  5  )
  6
> 7 p1.run().wait_until_finish()

/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/pipeline.pyc
in run(self, test_runner_api)
174   finally:
175 shutil.rmtree(tmpdir)
--> 176 return self.runner.run(self)
177
178   def __enter__(self):

/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.pyc
in run(self, pipeline)
250 # Create the job
251 result = DataflowPipelineResult(
--> 252 self.dataflow_client.create_job(self.job), self)
253
254 self._metrics = DataflowMetrics(self.dataflow_client,
result, self.job)

/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/utils/retry.pyc
in wrapper(*args, **kwargs)
166   while True:
167 try:
--> 168   return fun(*args, **kwargs)
169 except Exception as exn:  # pylint: disable=broad-except
170   if not retry_filter(exn):

/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.pyc
in create_job(self, job)
423   def create_job(self, job):
424 """Creates job description. May stage and/or submit for
remote execution."""
--> 425 self.create_job_description(job)
426
427 # Stage and submit the job when necessary

/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.pyc
in create_job_description(self, job)
446 """Creates a job described by the workflow proto."""
447 resources = dependency.stage_job_resources(
--> 448 job.options, file_copy=self._gcs_file_copy)
449 job.proto.environment = Environment(
450 packages=resources, options=job.options,

/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/dataflow/internal/dependency.pyc
in stage_job_resources(options, file_copy, build_setup_args, temp_dir,
populate_requirements_cache)
377   else:
378 sdk_remote_location = setup_options.sdk_location
--> 379   _stage_beam_sdk_tarball(sdk_remote_location,
staged_path, temp_dir)
380   resources.append(names.DATAFLOW_SDK_TARBALL_FILE)
381 else:

/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/dataflow/internal/dependency.pyc
in _stage_beam_sdk_tarball(sdk_remote_location, staged_path, temp_dir)
462   elif sdk_remote_location == 'pypi':
463 logging.info('Staging the SDK tarball from PyPI to %s', staged_path)
--> 464 _dependency_file_copy(_download_pypi_sdk_package(temp_dir),
staged_path)
465   else:
466 raise RuntimeError(

/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/dataflow/internal/dependency.pyc
in _download_pypi_sdk_package(temp_dir)
525   '--no-binary', ':all:', '--no-deps']
526   logging.info('Executing command: %s', cmd_args)
--> 527   processes.check_call(cmd_args)
528   zip_expected = os.path.join(
529   temp_dir, '%s-%s.zip' % (package_name, version))

/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/utils/processes.pyc
in check_call(*args, **kwargs)
 42   if force_shell:
 43 kwargs['shell'] = True
---> 44   return subprocess.check_call(*args, **kwargs)
 45
 46

/usr/local/envs/py2env/lib/python2.7/subprocess.pyc in
check_call(*popenargs, **kwargs)
188 if cmd is None:
189 cmd = popenargs[0]
--> 190 raise CalledProcessError(retcode, cmd)
191 return 0
192

CalledProcessError: Command '['/usr/local/envs/py2env/bin/python',
'-m', 'pip', 'install', '--download', '/tmp/tmpyyiizo',
'google-cloud-dataflow==2.0.0', '--no-binary', ':all:', '--no-deps']'
returned non-zero exit status 2



On Wed, Aug 22, 2018 at 10:39 AM OrielResearch Eila Arich-Landkof <
e...@orielresearch.org> wrote:

> Hello all,
>
> I am running a pipeline that used to be executed on dataflow with no
> issues. I am using the datalab environment. S

Re: INFO:root:Executing Error when executing a pipeline on dataflow

2018-08-22 Thread OrielResearch Eila Arich-Landkof
The issue was with the pip version: --download was deprecated and later removed
from pip. I don't know where this needs to be mentioned / fixed.
Running
pip install pip==9.0.3

solved the issue.
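[Editor's note] For context: the failing staging step shells out to `pip install --download`, which was removed in pip 10. A small hypothetical check of whether the environment's pip would still accept that flag:

```python
import subprocess
import sys

# Query the pip that this interpreter would invoke (the same way the
# SDK's dependency.py does, via `python -m pip`), and parse its major
# version. `--download` exists only in pip < 10, hence the pin to 9.0.3.
out = subprocess.check_output(
    [sys.executable, '-m', 'pip', '--version']).decode()
pip_major = int(out.split()[1].split('.')[0])
print('pip major version:', pip_major)
print('supports --download:', pip_major < 10)
```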

Thanks,
eila

On Wed, Aug 22, 2018 at 11:20 AM OrielResearch Eila Arich-Landkof <
e...@orielresearch.org> wrote:

> I tried a simple pipeline which is runner perfectly on local runner and
> the same issue on dataflow. see below. Is there anything at the environment
> that need to be updated that I am not aware of?
>
> Many thanks for any reference.
> Eila
>
> import  apache_beam  as  beam
> options = PipelineOptions()
> google_cloud_options = options.view_as(GoogleCloudOptions)
> google_cloud_options.project = 'PROJECT-ID'
> google_cloud_options.job_name = 'try-debug'
> google_cloud_options.staging_location = '%s/staging' % BUCKET_URL 
> #'gs://archs4/staging'
> google_cloud_options.temp_location = '%s/tmp' % BUCKET_URL #'gs://archs4/temp'
> options.view_as(StandardOptions).runner = 'DataflowRunner'
>
> p1 = beam.Pipeline(options=options)
>
> (p1 | 'read' >> 
> beam.io.ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
> | 'write' >> beam.io.WriteToText('gs://bucket/test.txt', num_shards=1)
>  )
>
> p1.run().wait_until_finish()
>
> will fire the following error:
>
> CalledProcessErrorTraceback (most recent call last)
>  in ()
>   5  )
>   6
> > 7 p1.run().wait_until_finish()
>
> /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/pipeline.pyc 
> in run(self, test_runner_api)
> 174   finally:
> 175 shutil.rmtree(tmpdir)
> --> 176 return self.runner.run(self)
> 177
> 178   def __enter__(self):
>
> /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.pyc
>  in run(self, pipeline)
> 250 # Create the job
> 251 result = DataflowPipelineResult(
> --> 252 self.dataflow_client.create_job(self.job), self)
> 253
> 254 self._metrics = DataflowMetrics(self.dataflow_client, result, 
> self.job)
>
> /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/utils/retry.pyc
>  in wrapper(*args, **kwargs)
> 166   while True:
> 167 try:
> --> 168   return fun(*args, **kwargs)
> 169 except Exception as exn:  # pylint: disable=broad-except
> 170   if not retry_filter(exn):
>
> /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.pyc
>  in create_job(self, job)
> 423   def create_job(self, job):
> 424 """Creates job description. May stage and/or submit for remote 
> execution."""
> --> 425 self.create_job_description(job)
> 426
> 427 # Stage and submit the job when necessary
>
> /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.pyc
>  in create_job_description(self, job)
> 446 """Creates a job described by the workflow proto."""
> 447 resources = dependency.stage_job_resources(
> --> 448 job.options, file_copy=self._gcs_file_copy)
> 449 job.proto.environment = Environment(
> 450 packages=resources, options=job.options,
>
> /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/dataflow/internal/dependency.pyc
>  in stage_job_resources(options, file_copy, build_setup_args, temp_dir, 
> populate_requirements_cache)
> 377   else:
> 378 sdk_remote_location = setup_options.sdk_location
> --> 379   _stage_beam_sdk_tarball(sdk_remote_location, staged_path, 
> temp_dir)
> 380   resources.append(names.DATAFLOW_SDK_TARBALL_FILE)
> 381 else:
>
> /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/dataflow/internal/dependency.pyc
>  in _stage_beam_sdk_tarball(sdk_remote_location, staged_path, temp_dir)
> 462   elif sdk_remote_location == 'pypi':
> 463 logging.info('Staging the SDK tarball from PyPI to %s', 
> staged_path)
> --> 464 _dependency_file_copy(_download_pypi_sdk_package(temp_dir), 
> staged_path)
> 465   else:
> 466 raise RuntimeError(
>
> /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/dataflow/internal/dependency.pyc
>  in _download_pypi_sdk_package(temp_dir)
> 525   '--no-binary', ':all:', '--no-deps']
> 526   logging.info('Executing command: %s', cmd_args)
> --> 527   processes.check_call(cmd_args)
> 528   zip_expected = os.path.join(
> 529   temp_dir, '%s-%s.zip' % (package_name, version))
>
> /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/utils/processes.pyc
>  in check_call(*args, **kwargs)
>  42   if force_shell:
>  43 kwargs['shell'] = True
> ---> 44   return subprocess.check_call(*args, **kwargs)
>  45
>  46
>
> /usr/local/envs/py2env/lib/python2.7/subprocess.pyc in check_call(*popenargs, 
> **kwargs)
> 188 if cmd is None:
> 189 cmd = popenargs[0]
> --> 190 raise CalledPr

EOS with KafkaIO Writes on Flink

2018-08-22 Thread Abdul Qadeer
Hi!

I came across this comment for KafkaIO.Write.withEOS in Beam 2.6.0:

"Flink runner is
* one of the runners whose checkpoint semantics are not compatible with current
* implementation (hope to provide a solution in near future)"

I would like to know if the Javadoc is up to date and whether this support is
still absent. If so, is there a way to get EOS for end-to-end
applications written with Beam and the Flink runner using Kafka sources/sinks?
Is a contribution possible for this support?


Re: preserve order of records while writing in a file

2018-08-22 Thread Chamikara Jayalath
+1 to what Ankur said. Beam is still a good fit for your use case,
since it looks like you can parallelize processing across individual files even
though you cannot parallelize processing within a single file. What you can do
with Beam is limited, though, due to your exact ordering requirement. Beam
does not guarantee ordering of elements across steps, so you'll have to
perform reading data from files, processing, and writing in a single fused
step (you cannot have GBKs between these steps, for example). To read file
names you can use FileIO.readMatches(), which should be followed by a step
that performs the rest of the work.

Thanks,
Cham

On Tue, Aug 21, 2018 at 6:48 PM Ankur Goenka  wrote:

> In case of multiple files, you can use Dataflow to parallelize processing
> to individual files. However, as mentioned earlier, records within in a
> single file is not worth parallelizing in this case.
>
> Your pipeline can start with a fixed set of file names followed by GroupBy
> (to shuffle the file names) and then you should process complete file in
> your ParDo based on the file name that you get as element.
> You should still write the output directly (using Beam File System) as
> write ordering is not guaranteed.
>
> On Tue, Aug 21, 2018 at 6:07 PM asharma...@gmail.com 
> wrote:
>
>>
>>
>> On 2018/08/21 16:20:13, Lukasz Cwik  wrote:
>> > I would agree with Eugene. A simple application that does this is
>> probably
>> > what your looking for.
>> >
>> > There are ways to make this work with parallel processing systems but
>> its
>> > quite a hassle and only worthwhile if your computation is very expensive
>> > and want the additional computational power of multiple CPU cores. For
>> > example, in a parallel processing you could read the records from the
>> file
>> > and remember the file offset / line number of each record. You could
>> then
>> > group them under a single key and use the sorting extension to sort
>> using
>> > the file offset / line number and then write out all the sorted records
>> out
>> > to a single file. Note that this will likely be a lot slower then a
>> simple
>> > program.
>> >
>> > On Tue, Aug 21, 2018 at 8:02 AM Eugene Kirpichov 
>> > wrote:
>> >
>> > > It sounds like you want to sequentially read a file, sequentially
>> process
>> > > the records and sequentially write them. The best way to do this is
>> likely
>> > > without using Beam, just write some Java or Python code using
>> standard file
>> > > APIs (use Beam's FileSystem APIs if you need to access data on a
>> non-local
>> > > filesystem).
>> > >
>> > > On Tue, Aug 21, 2018 at 7:11 AM asharma...@gmail.com <
>> asharma...@gmail.com>
>> > > wrote:
>> > >
>> > >> Hi
>> > >>
>> > >> I have to process a big file and call several Pardo's to do some
>> > >> transformations.  Records in file dont have any unique key.
>> > >>
>> > >> Lets say file 'testfile' has 1 million records.
>> > >>
>> > >> After processing , I want to generate only one output file same as my
>> > >> input 'testfile' and also i have a requirement to write those 1
>> million
>> > >> records in same order (after applying some Pardo's)
>> > >>
>> > >> What is best way to do it
>> > >>
>> > >> Thanks
>> > >> Aniruddh
>> > >>
>> > >>
>> > >>
>> > >>
>> >
>>
>> Thanks Eugene and Lukasz for the replies. One further query, please. The use
>> case is to process 50,000 files together multiple times a day, do some
>> processing on the records (which includes DLP calls), then generate the same
>> set of 50,000 output files, each output file containing the same number of
>> records in the same order as its input file (a replica of the input file's
>> records, preserving order). Is this an ideal use case for Dataflow? If yes,
>> is there any example to look up for the same scenario, and if not, what is
>> the recommended way? Thanks a lot
>>
>


Re: Controlling Kafka Checkpoint Persistence

2018-08-22 Thread Micah Whitacre
Sorry I mistyped, we need "at least once" processing. 

On Wed, Aug 22, 2018 at 9:39 AM, Micah Whitacre 
wrote:

> > Could you describe your durability requirements a bit more?
>
> The requirement is that we need "at most once" processing of the data.  So
> I'm perfectly happy retrying the processing.  I'm more concerned about data
> loss/skipping data in the event of processing failures, pipeline operations
> (starting/restarting), failures in the runner underlying infrastructure,
> and the fun use cases when they might happen at the same time (e.g.
> underlying infrastructure problems that cause processing failures and need
> us to restart the pipeline :)).
>
> Are there any good resources talking about the differences at the
> boundaries or the assumed guarantees?
>
> On Tue, Aug 21, 2018 at 5:05 PM, Raghu Angadi  wrote:
>
>>
>> On Tue, Aug 21, 2018 at 2:49 PM Micah Whitacre 
>> wrote:
>>
>>> > Is there a reason you can't trust the runner to be durable storage
>>> for inprocess work?
>>>
>>> That's a fair question.  Are there any good resources documenting the
>>> durability/stability of the different runners?  I assume there are some
>>> stability requirements regarding its handling of "bundles" but it would be
>>> nice to have that info available.  One of the reasons we are targeting the
>>> Direct runner is to let us work with the project and let us temporarily
>>> delay picking a runner.  Durability seems like another important aspect to
>>> evaluate.
>>>
>>
>> Could you describe your durability requirements a bit more?
>> All the major runners provide comparable durability guarantees for processing
>> within a running pipeline (these are required for Beam model).  The
>> differences arise at the boundaries: what happens when you stop the
>> pipeline, can the pipeline be updated with new code with the old state,
>> etc.
>>
>> An often confusing area is about side effects (like committing Kafka
>> offsets in your case).. the users always have to assume that processing
>> might be retried (even if it rarely occurs).
>>
>>
>>>
>>> On Tue, Aug 21, 2018 at 4:24 PM, Raghu Angadi 
>>> wrote:
>>>
 On Tue, Aug 21, 2018 at 2:04 PM Lukasz Cwik  wrote:

> Is there a reason you can't trust the runner to be durable storage for
> inprocess work?
>
> I can understand that the DirectRunner only stores things in memory
> but other runners have stronger durability guarantees.
>

 I think the requirement is about producing a side effect (committing
 offsets to Kafka) after some processing completes in the pipeline. Wait()
 transform helps with that. The user still has to commit the offsets
 explicitly and can't get similar functionality in KafkaIO.


> On Tue, Aug 21, 2018 at 9:58 AM Raghu Angadi 
> wrote:
>
>> I think by 'KafkaUnboundedSource checkpointing' you mean enabling
>> 'commitOffsetsInFinalize()' on KafkaIO source.
>> It is better option than enable.auto.commit, but does not exactly do
>> what you want in this moment. It is invoked after the first stage 
>> ('Simple
>> Transformation' in your case). This is certainly true for Dataflow and I
>> think is also the case for DirectRunner.
>>
>> I don't see way to leverage built-in checkpoint for consistency
>> externally. You would have to manually commit offsets.
>>
>> On Tue, Aug 21, 2018 at 8:55 AM Micah Whitacre 
>> wrote:
>>
>>> I'm starting with a very simple pipeline that will read from Kafka
>>> -> Simple Transformation -> GroupByKey -> Persist the data.  We are also
>>> applying some simple windowing/triggering that will persist the data 
>>> after
>>> every 100 elements or every 60 seconds to balance slow trickles of data 
>>> as
>>> well as not storing too much in memory.  For now I'm just running with 
>>> the
>>> DirectRunner since this is just a small processing problem.
>>>
>>> With the potential for failure during the persisting of the data, we
>>> want to ensure that the Kafka offsets are not updated until we have
>>> successfully persisted the data.  Looking at KafkaIO it seems like our 
>>> two
>>> options for persisting offsets are:
>>> * Kafka's enable.auto.commit
>>> * KafkaUnboundedSource checkpointing.
>>>
>>> The first option would commit prematurely before we could guarantee
>>> the data was persisted.  I can't unfortunately find many details about 
>>> the
>>> checkpointing so I was wondering if there was a way to configure it or 
>>> tune
>>> it more appropriately.
>>>
>>> Specifically I'm hoping to understand the flow so I can rely on the
>>> built in KafkaIO functionality without having to write our own offset
>>> management.  Or is it more common to write your own?
>>>
>>> Thanks,
>>> Micah
>>>
>>
>>>
>