Re: Try Beam Katas Today

2020-05-12 Thread Pablo Estrada
Sharing Damon's email with the user@ list as well. Thanks Damon!

On Tue, May 12, 2020 at 9:02 PM Damon Douglas 
wrote:

> Hello Everyone,
>
> If you don't already know, there are helpful instructional tools for
> learning the Apache Beam SDKs called Beam Katas hosted on
> https://stepik.org.  Similar to traditional Kata, they are meant to be
> repeated as practice.  Before practicing the katas myself, I found myself copy/pasting
> code (Please accept my confession 😎 ).  Now I find myself actually
> composing pipelines.  Just like kata forms, you find them becoming part of
> you.  If you are interested, below are listed the current available katas:
>
> 1.  Java - https://stepik.org/course/54530
>
> 2.  Python -  https://stepik.org/course/54532
>
> 3.  Go (in development) - https://stepik.org/course/70387
>
> If you are absolutely brand new to Beam and it scares you like it scared
> me, come talk to me.
>
> Best,
>
> Damon
>


Webinar: Best practices towards a production-ready Beam pipeline

2020-05-12 Thread Aizhamal Nurmamat kyzy
Hi all,

A friendly reminder that the webinar on 'Best practices towards a
production ready pipeline' will start tomorrow at 10:00 PST. You can join
by signing up here: https://learn.xnextcon.com/event/eventdetails/W20051310

If you cannot get into the meeting room on Zoom, you can go to this YouTube
channel for the same livestream, but we encourage attendees to join Zoom to be
able to ask speakers questions.

The webinar will be recorded and posted on Beam's YT channel later on Wed,
and all the resources used during the presentation will be shared on this
repo: https://github.com/aijamalnk/beam-learning-month/blob/master/README.md

Thanks,
Aizhamal


Re: Behavior of KafkaIO

2020-05-12 Thread Eleanore Jin
Hi Alex,

Thanks a lot for the suggestion. It seems that in my previous experiment I did
not pre-ingest a large enough number of messages. It looks like each partition
gets a slice of time to be consumed by the same consumer, and maybe during
partition 1's time slice its lag already drained to zero, hence the
observation.

I tried to ingest more data, and I see all of the partitions are making
progress. I will update if I have more findings.

Thanks a lot!
Eleanore

On Tue, May 12, 2020 at 10:44 AM Alexey Romanenko 
wrote:

> Hi Eleanore,
>
> Interesting topic, thank you for more information. I don’t see that this
> is unexpected behavior for KafkaIO since, as Heejong said before, it
> relies on the implementation of the KafkaConsumer used in your case.
>
> According to the KafkaConsumer Javadoc [1], in most cases it should read
> fairly from different partitions when one consumer handles several of
> them:
>
> “Consumption Flow Control
>
> If a consumer is assigned multiple partitions to fetch data from, it will
> try to consume from all of them at the same time, effectively giving these
> partitions the same priority for consumption. However in some cases
> consumers may want to first focus on fetching from some subset of the
> assigned partitions at full speed, and only start fetching other partitions
> when these partitions have few or no data to consume.”
>
> Perhaps you may want to try changing the
> fetch.max.bytes or max.partition.fetch.bytes options and see if that
> helps.
>
>
> [1]
> http://kafka.apache.org/25/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
>
>
>
>
> On 12 May 2020, at 07:52, Eleanore Jin  wrote:
>
> Hi Chamikara and Lee,
>
> Thanks for the information. I did more experiments on my local laptop
> (Flink Runner local mode, Job Manager and Task Manager run in the same JVM).
> Setup: input topic with 4 partitions.
> 1. with parallelism 1: KafkaIO drains 1 partition completely to 0 lag, then
> moves on to another partition
> 2. with parallelism 2: KafkaIO reads 2 partitions together, then moves on to
> the rest of the partitions
> 3. with parallelism 4: KafkaIO reads all 4 partitions together.
>
> In production we run multiple Flink Task Managers, and from the consumer lag
> reported we also see some partitions go to 0 while other
> partitions remain at high lag.
>
> Thanks!
> Eleanore
>
> On Mon, May 11, 2020 at 8:19 PM Heejong Lee  wrote:
>
>> If we assume that there's only one reader, all partitions are assigned to
>> a single KafkaConsumer. I think the order of reading each partition depends
>> on KafkaConsumer implementation i.e. how KafkaConsumer.poll() returns
>> messages.
>>
>> Reference:
>> assigning partitions:
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L83
>> polling records:
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L538
>> creating a record batch:
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L614
>>
>> On Mon, May 11, 2020 at 7:54 PM Chamikara Jayalath 
>> wrote:
>>
>>> The number of partitions assigned to a given split depends on the
>>> desiredNumSplits value provided by the runner.
>>>
>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L54
>>>
>>> (This is assuming that you are using Beam Kafka source not a native
>>> Flink override).
>>>
>>> Do you see the same behavior when you increase the number of workers of
>>> your Flink cluster ?
>>>
>>> On Mon, May 11, 2020 at 5:28 PM Eleanore Jin 
>>> wrote:
>>>
 Hi community,

 In my pipeline, I am using KafkaIO to read and write. The source topic
 has 4 partitions and pipeline parallelism is 1.

 I noticed from the consumer lag metrics that it will consume from 1 partition
 until all the messages from that partition are processed, then it will
 consume from another partition.

 Is this the expected behavior?

 Runner is Flink.

 Thanks a lot!
 Eleanore

>>>
>


Re: GoogleCloudOptions.worker_machine_type = 'n1-highcpu-96'

2020-05-12 Thread OrielResearch Eila Arich-Landkof
Thank you! I added WorkerOptions (setting the machine type on GoogleCloudOptions
fired an error).

Code below for anyone else who might need it:

Apache beam SDK:
!pip show apache-beam

Name: apache-beam
Version: 2.20.0
Summary: Apache Beam SDK for Python
Home-page: https://beam.apache.org
Author: Apache Software Foundation
Author-email: d...@beam.apache.org
License: Apache License, Version 2.0
Location: /usr/local/envs/py3env/lib/python3.5/site-packages
Requires: grpcio, python-dateutil, typing, dill, pyarrow, pydot,
future, hdfs, httplib2, pytz, typing-extensions, fastavro,
avro-python3, crcmod, mock, oauth2client, protobuf, pymongo, numpy
Required-by:


*Python 3 code:*


import random

from apache_beam.options.pipeline_options import (GoogleCloudOptions,
                                                   PipelineOptions,
                                                   SetupOptions,
                                                   StandardOptions,
                                                   WorkerOptions)

options = PipelineOptions()
standard_cloud_options = options.view_as(StandardOptions)
standard_cloud_options.runner = RUNNER  # 'DataflowRunner'
# machine_type lives on WorkerOptions, not on GoogleCloudOptions
worker_cloud_options = options.view_as(WorkerOptions)
worker_cloud_options.machine_type = 'n1-highcpu-96'
setup_cloud_options = options.view_as(SetupOptions)
setup_cloud_options.setup_file = "./setup.py"
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'project_name'
job_rand = ''.join(random.choice('0123456789abcdef') for j in range(4))
google_cloud_options.job_name = 'n1-highcpu-96-' + job_rand
google_cloud_options.staging_location = '%s/staging' % BUCKET_URL
google_cloud_options.temp_location = '%s/tmp' % BUCKET_URL
google_cloud_options.region = 'us-central1'
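
To verify that the machine type actually took effect, one option (a rough,
untested sketch assuming the Cloud SDK is installed; <JOB_ID> is a placeholder
for the ID Dataflow prints when the job is submitted) is to describe the job
and look at its worker pool:

# hypothetical check, not part of the pipeline code above
gcloud dataflow jobs describe <JOB_ID> --region=us-central1 --full | grep machineType

The job details page in the Dataflow UI should show the same value under the
worker pool configuration.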

HTH,
Eila



On Tue, May 12, 2020 at 11:43 AM Brian Hulette  wrote:

> Hi Eila,
>
> It looks like you're attempting to set the option on the
> GoogleCloudOptions class directly; I think you want to set it on an
> instance of PipelineOptions that you've viewed as GoogleCloudOptions, like
> this example from
> https://cloud.google.com/dataflow/docs/guides/specifying-exec-params#configuring-pipelineoptions-for-execution-on-the-cloud-dataflow-service
>
> # Create and set your PipelineOptions.
> options = PipelineOptions(flags=argv)
>
> # For Cloud execution, specify DataflowRunner and set the Cloud Platform
> # project, job name, staging file location, temp file location, and region.
> options.view_as(StandardOptions).runner = 'DataflowRunner'
> google_cloud_options = options.view_as(GoogleCloudOptions)
> google_cloud_options.project = 'my-project-id'
> ...
> # Create the Pipeline with the specified options.
> p = Pipeline(options=options)
>
> Alternatively you should be able to just specify --worker_machine_type at
> the command line if you're parsing the PipelineOptions from sys.argv. Does
> that help?
>
> Brian
>
> On Tue, May 12, 2020 at 8:30 AM OrielResearch Eila Arich-Landkof <
> e...@orielresearch.org> wrote:
>
>> Hello,
>>
>> I am trying to check whether the resource settings are actually being
>> applied.
>> What would be the right way to do it?
>> *the code is:*
>> GoogleCloudOptions.worker_machine_type = 'n1-highcpu-96'
>>
>> and *the dataflow view is* the following (nothing reflects
>> the highcpu machine).
>> Please advise.
>>
>> Thanks,
>> Eila
>> Resource metrics:
>> Current vCPUs: 1
>> Total vCPU time: 0.07 vCPU hr
>> Current memory: 3.75 GB
>> Total memory time: 0.264 GB hr
>> Current PD: 250 GB
>> Total PD time: 17.632 GB hr
>> Current SSD PD: 0 B
>> Total SSD PD time: 0 GB hr
>>
>>
>> --
>> Eila
>> 
>> Meetup 
>>
>

-- 
Eila

Meetup 


Re: Behavior of KafkaIO

2020-05-12 Thread Alexey Romanenko
Hi Eleanore,

Interesting topic, thank you for more information. I don’t see that this is
unexpected behavior for KafkaIO since, as Heejong said before, it relies on
the implementation of the KafkaConsumer used in your case.

According to the KafkaConsumer Javadoc [1], in most cases it should read fairly
from different partitions when one consumer handles several of them:

“Consumption Flow Control

If a consumer is assigned multiple partitions to fetch data from, it will try 
to consume from all of them at the same time, effectively giving these 
partitions the same priority for consumption. However in some cases consumers 
may want to first focus on fetching from some subset of the assigned partitions 
at full speed, and only start fetching other partitions when these partitions 
have few or no data to consume.”

Perhaps you may want to try changing the fetch.max.bytes or
max.partition.fetch.bytes options and see if that helps.


[1] 
http://kafka.apache.org/25/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
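
For example, here is a minimal sketch of passing such options through KafkaIO's
consumer configuration (broker, topic, and the byte values below are
placeholders and untested; they would need tuning for your message sizes, and
imports for KafkaIO, HashMap, and the Kafka StringDeserializer are omitted):

    // cap how much data a single partition can return per poll so that one
    // partition is less likely to dominate each fetch
    Map<String, Object> consumerConfig = new HashMap<>();
    consumerConfig.put("max.partition.fetch.bytes", 64 * 1024);  // default is 1 MB
    consumerConfig.put("fetch.max.bytes", 8 * 1024 * 1024);

    pipeline.apply(KafkaIO.<String, String>read()
        .withBootstrapServers("broker:9092")
        .withTopic("your-topic")
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withConsumerConfigUpdates(consumerConfig));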
 

> On 12 May 2020, at 07:52, Eleanore Jin  wrote:
> 
> Hi Chamikara and Lee, 
> 
> Thanks for the information. I did more experiments on my local laptop (Flink
> Runner local mode, Job Manager and Task Manager run in the same JVM).
> Setup: input topic with 4 partitions.
> 1. with parallelism 1: KafkaIO drains 1 partition completely to 0 lag, then
> moves on to another partition
> 2. with parallelism 2: KafkaIO reads 2 partitions together, then moves on to
> the rest of the partitions
> 3. with parallelism 4: KafkaIO reads all 4 partitions together.
>
> In production we run multiple Flink Task Managers, and from the consumer lag
> reported we also see some partitions go to 0 while other partitions
> remain at high lag.
> 
> Thanks!
> Eleanore
> 
> On Mon, May 11, 2020 at 8:19 PM Heejong Lee  > wrote:
> If we assume that there's only one reader, all partitions are assigned to a 
> single KafkaConsumer. I think the order of reading each partition depends on 
> KafkaConsumer implementation i.e. how KafkaConsumer.poll() returns messages.
> 
> Reference:
> assigning partitions: 
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L83
>  
> 
> polling records: 
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L538
>  
> 
> creating a record batch: 
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L614
>  
> 
> On Mon, May 11, 2020 at 7:54 PM Chamikara Jayalath  > wrote:
> The number of partitions assigned to a given split depends on the 
> desiredNumSplits value provided by the runner.
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L54
>  
> 
> 
> (This is assuming that you are using Beam Kafka source not a native Flink 
> override).
> 
> Do you see the same behavior when you increase the number of workers of your 
> Flink cluster ?
> 
> On Mon, May 11, 2020 at 5:28 PM Eleanore Jin  > wrote:
> Hi community, 
> 
> In my pipeline, I am using KafkaIO to read and write. The source topic has 4 
> partitions and pipeline parallelism is 1. 
> 
> I noticed from the consumer lag metrics that it will consume from 1 partition until
> all the messages from that partition are processed, then it will consume from
> another partition.
> 
> Is this the expected behavior? 
> 
> Runner is Flink. 
> 
> Thanks a lot! 
> Eleanore 



Re: Running NexMark Tests

2020-05-12 Thread Maximilian Michels
A heads-up if anybody else sees this, we have removed the flag:
https://jira.apache.org/jira/browse/BEAM-9900

Further contributions are very welcome :)

-Max

On 11.05.20 17:05, Sruthi Sree Kumar wrote:
> I have opened a PR with the documentation change.
> https://github.com/apache/beam/pull/11662
> 
> Regards,
> Sruthi
> 
> On 2020/04/21 20:22:17, Ismaël Mejía  wrote: 
>> You need to instruct the Flink runner to shut down the source,
>> otherwise it will stay waiting.
>> You can do this by adding the extra
>> argument `--shutdownSourcesOnFinalWatermark=true`.
>> And if that works and you want to open a PR to update our
>> documentation that would be greatly appreciated.
>>
>> Regards,
>> Ismaël
>>
>>
>> On Tue, Apr 21, 2020 at 10:04 PM Sruthi Sree Kumar
>>  wrote:
>>>
>>> Hello,
>>>
>>> I am trying to run nexmark queries using flink runner streaming. Followed 
>>> the documentation and used the command
>>> ./gradlew :sdks:java:testing:nexmark:run \
>>>     -Pnexmark.runner=":runners:flink:1.10" \
>>>     -Pnexmark.args="
>>>         --runner=FlinkRunner
>>>         --suite=SMOKE
>>>         --streamTimeout=60
>>>         --streaming=true
>>>         --manageResources=false
>>>         --monitorJobs=true
>>>         --flinkMaster=[local]"
>>>
>>>
>>> But after the events are read from the source, there is no further progress 
>>> and the job is always stuck at 99%. Is there any configuration that I am 
>>> missing?
>>>
>>> Regards,
>>> Sruthi
>>


Re: GoogleCloudOptions.worker_machine_type = 'n1-highcpu-96'

2020-05-12 Thread Brian Hulette
Hi Eila,

It looks like you're attempting to set the option on the GoogleCloudOptions
class directly; I think you want to set it on an instance of
PipelineOptions that you've viewed as GoogleCloudOptions, like this example
from
https://cloud.google.com/dataflow/docs/guides/specifying-exec-params#configuring-pipelineoptions-for-execution-on-the-cloud-dataflow-service

# Create and set your PipelineOptions.
options = PipelineOptions(flags=argv)

# For Cloud execution, specify DataflowRunner and set the Cloud Platform
# project, job name, staging file location, temp file location, and region.
options.view_as(StandardOptions).runner = 'DataflowRunner'
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'my-project-id'
...
# Create the Pipeline with the specified options.
p = Pipeline(options=options)

Alternatively you should be able to just specify --worker_machine_type at
the command line if you're parsing the PipelineOptions from sys.argv. Does
that help?
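
For example (a sketch with placeholder names, assuming a script called
my_pipeline.py that passes sys.argv to PipelineOptions):

python my_pipeline.py \
  --runner=DataflowRunner \
  --project=my-project-id \
  --region=us-central1 \
  --temp_location=gs://my-bucket/tmp \
  --worker_machine_type=n1-highcpu-96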

Brian

On Tue, May 12, 2020 at 8:30 AM OrielResearch Eila Arich-Landkof <
e...@orielresearch.org> wrote:

> Hello,
>
> I am trying to check whether the resource settings are actually being
> applied.
> What would be the right way to do it?
> *the code is:*
> GoogleCloudOptions.worker_machine_type = 'n1-highcpu-96'
>
> and *the dataflow view is* the following (nothing reflects
> the highcpu machine).
> Please advise.
>
> Thanks,
> Eila
> Resource metrics:
> Current vCPUs: 1
> Total vCPU time: 0.07 vCPU hr
> Current memory: 3.75 GB
> Total memory time: 0.264 GB hr
> Current PD: 250 GB
> Total PD time: 17.632 GB hr
> Current SSD PD: 0 B
> Total SSD PD time: 0 GB hr
>
>
> --
> Eila
> 
> Meetup 
>


GoogleCloudOptions.worker_machine_type = 'n1-highcpu-96'

2020-05-12 Thread OrielResearch Eila Arich-Landkof
Hello,

I am trying to check whether the resource settings are actually being
applied.
What would be the right way to do it?
*the code is:*
GoogleCloudOptions.worker_machine_type = 'n1-highcpu-96'

and *the dataflow view is* the following (nothing reflects the highcpu
machine).
Please advise.

Thanks,
Eila
Resource metrics:
Current vCPUs: 1
Total vCPU time: 0.07 vCPU hr
Current memory: 3.75 GB
Total memory time: 0.264 GB hr
Current PD: 250 GB
Total PD time: 17.632 GB hr
Current SSD PD: 0 B
Total SSD PD time: 0 GB hr


-- 
Eila

Meetup 


Unbounded stream to FileIO.write

2020-05-12 Thread Nathan Fisher
Hi Folks,

Cross-posting from the Slack channel from the other day.

I started looking at Beam again over the weekend. I have an unbounded
stream with a CassandraIO input and am trying to write files using FileIO
and ParquetIO.

I'm using the following:

Beam: 2.20.0
Flink Runner/Cluster: 1.9(.3)

java -Xmx12g -jar target/fmetrics-1.0-SNAPSHOT.jar --streaming=true
--sdkWorkerParallelism=0 --runner=FlinkRunner

When submitting to a Flink cluster I include --flinkMaster=localhost:8081
in the command.

If I replace the FileIO with a simple log writer it prints out the records
and makes progress. Using the FileIO with ParquetIO it stalls on the
stage write/WriteFiles/WriteShardedBundlesToTempFiles/GroupIntoShards
->
write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles/ParMultiDo(WriteShardsIntoTempFiles)
-> write/WriteFiles/GatherTempFileResults/Add void
key/AddKeys/Map/ParMultiDo(Anonymous).

That brings me to ask the following questions:

   1. What's the best way to test and monitor a beam pipeline?
   2. What adjustments are required to get this pipeline writing files?
   3. Is there some kind of way to evaluate the DAG and identify scenarios
   where this stall is likely?

PipelineOptions pipelineOptions = PipelineOptionsFactory
    .fromArgs(args)
    .withValidation()
    .create();
Pipeline p = Pipeline.create(pipelineOptions);

CoderRegistry registry = p.getCoderRegistry();
registry.registerCoderForClass(GenericRecord.class, AvroCoder.of(SCHEMA));

PCollection<Metric> metrics = p.apply("cassandra",
    CassandraIO.<Metric>read()
        .withHosts(hosts)
        .withPort(9042)
        .withLocalDc("datacenter1")
        .withKeyspace(KEY_SPACE)
        .withTable(TABLE)
        .withMinNumberOfSplits(100)
        .withEntity(Metric.class)
        .withCoder(SerializableCoder.of(Metric.class)));

metrics
    .apply("window",
        Window.<Metric>into(FixedWindows.of(Duration.standardSeconds(30)))
            .withAllowedLateness(Duration.standardSeconds(5))
            .accumulatingFiredPanes())
    .apply("metricToGeneric", ParDo.of(new MetricToGeneric(LOG)))
    .apply("write", FileIO.<GenericRecord>write()
        .via(ParquetIO.sink(SCHEMA))
        .withNumShards(200)
        .to("./metrics/")
        .withPrefix("metrics")
        .withSuffix(".parquet"));

p.run().waitUntilFinish();
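
One way to see whether any files are actually being finalized (a rough,
untested sketch; genericRecords, writeResult, and the transform names are
illustrative and not part of the code above, and LOG is assumed to be the same
slf4j logger) is to capture the result of the write and log the emitted file
names:

// genericRecords: the PCollection<GenericRecord> produced by metricToGeneric
WriteFilesResult<Void> writeResult = genericRecords
    .apply("write", FileIO.<GenericRecord>write()
        .via(ParquetIO.sink(SCHEMA))
        .withNumShards(200)
        .to("./metrics/")
        .withPrefix("metrics")
        .withSuffix(".parquet"));

writeResult.getPerDestinationOutputFilenames()
    .apply(Values.create())
    .apply("logFileNames", ParDo.of(new DoFn<String, Void>() {
        @ProcessElement
        public void processElement(@Element String fileName) {
            // each element here is a file that has been moved to its final name
            LOG.info("finalized output file: {}", fileName);
        }
    }));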

I also loaded this into a Flink cluster and it appears to stall on the
temporary file sharding as outlined above and eventually fails after
processing about 600-700k records.

Rereading the windowing section in the documentation, I changed it to
discardingFiredPanes(), as that seems the more appropriate behaviour for what I
want, but that doesn't appear to have changed the results any.

Regards,
-- 
Nathan Fisher
 w: http://junctionbox.ca/


Re: TextIO. Writing late files

2020-05-12 Thread Jose Manuel
Hi,

I would like to clarify that while TextIO is writing, all the data is in the
files (shards). The loss happens when the file names emitted by
getPerDestinationOutputFilenames are processed by a window.

I have created a pipeline to reproduce the scenario in which some filenames
are lost after getPerDestinationOutputFilenames. Please note that I tried
to simplify the code as much as possible, but the scenario is not easy to
reproduce.

Please check this project https://github.com/kiuby88/windowing-textio
Check readme to build and run (
https://github.com/kiuby88/windowing-textio#build-and-run)
Project contains only a class with the pipeline PipelineWithTextIo,
a log4j2.xml file in the resources and the pom.

The pipeline in PipelineWithTextIo generates unbounded data using a
sequence. It adds a little delay (10s) per data entry, uses a distinct
(just to apply the window), and then writes data using TextIO.
The window for the distinct is fixed (5 seconds) and it does not use
lateness.
Generated files can be found in
windowing-textio/pipe_with_lateness_0s/files. To write files the
FileNamePolicy uses window + timing + shards (see
https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L135
)
Files are emitted using getPerDestinationOutputFilenames()
(see the code here,
https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L71-L78
)

Then, the file names in the PCollection are extracted and logged. Please note
that the file names do not have pane information at that point.

To apply a window, a distinct is used again. Here several files are
discarded as late and they are not processed by this second distinct.
Please see
https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L80-L83

Debug is enabled for WindowTracing, so you can find in the terminal several
messages like the following:
DEBUG org.apache.beam.sdk.util.WindowTracing - LateDataFilter: Dropping
element at 2020-05-12T14:05:14.999Z for
key:path/pipe_with_lateness_0s/files/[2020-05-12T14:05:10.000Z..2020-05-12T14:05:15.000Z)-ON_TIME-0-of-1.txt;
window:[2020-05-12T14:05:10.000Z..2020-05-12T14:05:15.000Z) since too far
behind inputWatermark:2020-05-12T14:05:19.799Z;
outputWatermark:2020-05-12T14:05:19.799Z`

What happens here? I think that messages are generated per second and a
window of 5 seconds groups them. Then a delay is added and finally the data are
written to a file.
The pipeline reads more data, increasing the watermark.
Then, file names are emitted without pane information (see "Emitted File"
in the logs). The window in the second distinct compares the file names'
timestamps with the pipeline watermark and then discards the file names as late.


Bonus
-
You can add a lateness to the pipeline. See
https://github.com/kiuby88/windowing-textio/blob/master/README.md#run-with-lateness

If a minute of lateness is added to the window, the file names are processed as
late. As a result, the traces of LateDataFilter disappear.
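
For reference, here is a minimal sketch of that change (names are illustrative,
not the exact code in the repository): assuming fileNames is the
PCollection<String> obtained from getPerDestinationOutputFilenames(), the
second window becomes:

fileNames
    .apply("windowFileNames",
        Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
            // without this, file names arriving behind the watermark are
            // dropped by LateDataFilter, as in the trace above
            .withAllowedLateness(Duration.standardMinutes(1)))
    .apply(Distinct.create());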

Moreover, in order to better illustrate that file names are emitted as late
for the second distinct, I added a second TextIO to write the file names into
other files.
The same FileNamePolicy as before was used (window + timing + shards). Then
you can find files that contain the original filenames in
windowing-textio/pipe_with_lateness_60s/files-after-distinct. This is the
interesting part, because you will find several files with LATE in their
names.

Please, let me know if you need more information or if the example is not
enough to check the expected scenarios.

Kby.

On Sun, May 10, 2020 at 17:04, Reuven Lax () wrote:

> Pane info is supposed to be preserved across transforms. If the Fink
> runner does not, than I believe that is a bug.
>
> On Sat, May 9, 2020 at 11:22 PM Jozef Vilcek 
> wrote:
>
>> I am using FileIO and I do observe the drop of pane info on the
>> Flink runner too. It was mentioned in this thread:
>> https://www.mail-archive.com/dev@beam.apache.org/msg20186.html
>>
>> It is a result of different reshuffle expansion for optimisation reasons.
>> However, I did not observe a data loss in my case. Windowing and watermark
>> info should be preserved. Pane info is not, which raises the question of how
>> reliable pane info should be in terms of SDK and runner.
>>
>> If you do observe a data loss, it would be great to share a test case
>> which replicates the problem.
>>
>> On Sun, May 10, 2020 at 8:03 AM Reuven Lax  wrote:
>>
>>> Ah, I think I see the problem.
>>>
>>> It appears that for some reason, the Flink runner loses windowing
>>> information when a Reshuffle is applied. I'm not entirely sure why, because
>>> windowing information should be maintained across a Reshuffle.
>>>
>>> Reuven
>>>
>>> On Sat, May 9, 2020 at 9:50 AM Jose Manuel 
>>> wrote:
>>>

 Hi,

 I have added some logs to the pipeline as following (you can find the
 log function in the Appendix):

>

Re: resources management at worker machine or how to debug hanging execution on worker machine

2020-05-12 Thread OrielResearch Eila Arich-Landkof
Hi all,

The first error that eventually disconnects the worker from the
service is: *root disk error*.
I have changed the machine type to highmem and high-cpu. The error
still appears.
What is required to make sure that the running container will have enough
Persistent Disk space?

insertId: "s=daca7bc3557c480d9c63083568d20125;i=9e4;b=8519c276c8d648ac8375065aea297519;m=994ae1d;t=5a56cd040a171;x=d65246e7d6d46c8d"
jsonPayload: {
  line: "fsHandler.go:121"
  message: "failed to collect filesystem stats - rootDiskErr: du command failed on /var/lib/docker/overlay2/817abf70e0768b2eb7c59fd457c484ebf930f66672e7e503d69b498bfb14c1ab/diff with output stdout: 3373444 /var/lib/docker/overlay2/817abf70e0768b2eb7c59fd457c484ebf930f66672e7e503d69b498bfb14c1ab/diff"
  thread: "650"
}
labels: {
  compute.googleapis.com/resource_id: "1439140863255482249"
  compute.googleapis.com/resource_name: "step-3-2-n1-highcpu-96-59-05112217-nslq-harness-sl2d"
  compute.googleapis.com/resource_type: "instance"
  dataflow.googleapis.com/job_id: "2020-05-11_22_17_07-15355301937546864211"
  dataflow.googleapis.com/job_name: "step-3-2-n1-highcpu-96-5912"
  dataflow.googleapis.com/region: "us-central1"
}
logName: "projects/***/logs/dataflow.googleapis.com%2Fkubelet"
receiveTimestamp: "2020-05-12T05:34:13.646022258Z"
resource: {
  labels: {
    job_id: "2020-05-11_22_17_07-15355301937546864211"
    job_name: "step-3-2-n1-highcpu-96-5912"
    project_id: "***"
    region: "us-central1"
    step_id: ""
  }
  type: "dataflow_step"
}
severity: "ERROR"
timestamp: "2020-05-12T05:34:12.500823Z"
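
If the root-disk errors mean the worker is running out of disk, one knob that
might help (a sketch, untested here; 200 GB is an arbitrary placeholder to be
sized for the fastq input plus the kallisto output) is the worker disk size,
which is a WorkerOptions field in the Python SDK:

from apache_beam.options.pipeline_options import PipelineOptions, WorkerOptions

options = PipelineOptions()
worker_options = options.view_as(WorkerOptions)
worker_options.machine_type = 'n1-highcpu-96'
worker_options.disk_size_gb = 200  # placeholder value

The same setting is available as the --disk_size_gb flag on the command line.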

On Mon, May 11, 2020 at 12:52 PM OrielResearch Eila Arich-Landkof <
e...@orielresearch.org> wrote:

> Hi all,
>
> I am trying to run the Kallisto package command on the apache beam worker.
> Below is a table that describes my steps on the apache beam pipeline code
> and local compute Debian machine (new machine). I used both of them
> for debug and comparison.
> On a local machine, the execution completes with no issues. On apache
> beam, I am having issues with no error. Very challenging to debug.
>
> The only issue that I am familiar with the Kallisto package is when there
> is not enough disk for the input and the output. I have added the resources
> commands on the local and remote machine. Please let me know if there is
> another way to manage the resources.
>
> Thank you,
> Eila
>
>
> task: resources
>   Local: n1-standard-8 (8 vCPUs, 30 GB memory), 60 GB persistent disk
>   Apache worker: GoogleCloudOptions.disk_size_gb = 60
>                  GoogleCloudOptions.worker_machine_type = 'n1-standard-4'
>
> task: anaconda
>   Local: a created base environment with the Kallisto package
>   Apache worker: created base environment with the kallisto package
>
> task: command
>   Both run the same code; only the conda.sh path differs:
>   Local:         script = "/home/eila_orielresearch_org/etc/profile.d/conda.sh"
>   Apache worker: script = "/opt/userowned/etc/profile.d/conda.sh"
>
>   from subprocess import Popen, PIPE, STDOUT
>   import logging
>
>   cmd1 = ". {}; env".format(script)
>   cmd2 = "echo finished kallisto"
>   cmd3 = "echo before init"
>   cmd4 = "conda init --all"
>   cmd5 = "conda activate"
>   cmd6 = "kallisto quant -t 2 -i release-99_transcripts.idx --single -l 200 -s 20 -o srr SRR2144345.fastq"
>   cmd7 = "conda deactivate"
>
>   final = Popen("{}; {}; {}; {}; {}; {}; {}".format(cmd1, cmd2, cmd3, cmd4, cmd5, cmd6, cmd7),
>                 shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
>   stdout, nothing = final.communicate()
>   stdout
>
> task: output
>   Local:
>     eila_orielresearch_org@instance-1:~/srr$ ls -lt
>     total 8548
>     -rw-r--r-- 1 eila_orielresearch_org eila_orielresearch_org 2174869 May 11 16:19 abundance.h5
>     -rw-r--r-- 1 eila_orielresearch_org eila_orielresearch_org 6570911 May 11 16:19 abundance.tsv
>     -rw-r--r-- 1 eila_orielresearch_org eila_orielresearch_org     371 May 11 16:19 run_info.json
>   Apache worker:
>     No output. Hanging on the highlighted ("yellow") command, no error, restarting DoFn execution.
>
> output
>
> eila_orielresearch_org@instance-1:~/srr$ ls -lt
>
> total 8548
>
> -rw-r--r-- 1 eila_orielresearch_org eila_orielresearch_org 2174869 May 11
> 16:19 abundance.h5
>
> -rw-r--r-- 1 eila_orielresearch_org eila_orielresearch_org 6570911 May 11
> 16:19 abundance.tsv
>
> -rw-r--r-- 1 eila_orielresearch_org eila_orielresearch_org 371 May 11
> 16:19 run_info.json
>
> No output.
>
> hanging on the yellow command. no error. restarting DoFn execution
>
>
> --
> Eila
> 
> Meetup 
>


-- 
Eila

Meetup 


Re: PubSub latency for Beam pipeline on Flink runner

2020-05-12 Thread Reza Ardeshir Rokni
Hi Vincent,

Did you mean <=3000 or did you want that to be <=3?

Cheers
Reza

On Fri, 8 May 2020 at 04:23, Vincent Domingues <
vincent.doming...@dailymotion.com> wrote:

> Hi all,
>
> We are trying to work with Beam on Flink runner to consume PubSub messages.
>
> We are facing a latency issue even with very low PubSub throughput.
>
> For example if you try the following simple beam pipeline consuming a
> PubSub subscription :
>
>
> ---
>
> package org.apache.beam.examples;
>
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
> import org.apache.beam.sdk.metrics.Counter;
> import org.apache.beam.sdk.metrics.Distribution;
> import org.apache.beam.sdk.metrics.Metrics;
> import org.apache.beam.sdk.options.Description;
> import org.apache.beam.sdk.options.PipelineOptions;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.joda.time.Instant;
>
> import java.util.logging.Logger;
>
> public class PubSubBeam {
>
> private final static Logger LOGGER =
> Logger.getLogger(PubSubBeam.class.getName());
>
> public interface MyOptions extends PipelineOptions {
> @Description("Topic to use in PubSub")
> String getInputSubscription();
> void setInputSubscription(String value);
> }
>
> static class LogMessages extends DoFn<String, String> {
> private final Distribution distribution =
> Metrics.distribution(this.getClass().getName(), "latency-distribution");
> private final Counter counter_30 =
> Metrics.counter(this.getClass().getName(), "0s_30s");
> private final Counter counter_60 =
> Metrics.counter(this.getClass().getName(), "30s_60s");
> private final Counter counter_90 =
> Metrics.counter(this.getClass().getName(), "60s_90s");
> private final Counter counter_120 =
> Metrics.counter(this.getClass().getName(), "90s_120s");
> private final Counter counter_240 =
> Metrics.counter(this.getClass().getName(), "120s_240s");
> private final Counter counter_inf =
> Metrics.counter(this.getClass().getName(), "240s_infs");
> private final Counter total =
> Metrics.counter(this.getClass().getName(), "total");
>
> @ProcessElement
> public void processElement(ProcessContext c) {
> Long latency = Instant.now().getMillis() -
> c.timestamp().getMillis();
> if (latency <= 3000){
> counter_30.inc();
> }
> else if (latency <= 6){
> counter_60.inc();
> }
> else if (latency <= 9){
> counter_90.inc();
> }
> else if (latency <= 12){
> counter_120.inc();
> }
> else if (latency <= 24){
> counter_240.inc();
> }
> else if (latency > 24){
> counter_inf.inc();
> }
> total.inc();
> distribution.update(latency);
> }
> }
>
> public static void main(String[] args) {
> MyOptions options =
> PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
> Pipeline p = Pipeline.create(options);
>
> p.apply("Read PubSub Messages",
> PubsubIO.readStrings().fromSubscription(options.getInputSubscription()))
>  .apply(ParDo.of(new LogMessages()));
>
> p.run();
> }
> }
>
>
> ---
>
> If you recover pipeline metrics you'll get this latency distribution:
>
> 0s_30s: 31.81%
> 30s_60s: 64.89%
> 60s_90s: 2.73%
> 90s_120s: 0.44%
> 120s_240s: 0.13%
> 240s_infs: 0.01%
> total: 100.00%
>
>
> With almost no operations in our pipeline, 64% of our messages need between
> 30 and 60 seconds to get acknowledged.
>
> Has someone already faced this situation, or is it a known issue?
> I am interested on any clue to deal with this latency issue.
>
> Thanks for your help
> Stay safe
>
> Regards,
> Vincent
>