KafkaIO Read Latency

2020-06-09 Thread Talat Uyarer
Hi,
I added some metrics on a step right after KafkaIO. When I compare the read-time
difference between the producer and KafkaIO, it is 800 ms at P99. However, that
step's open-to-close bundle difference is 18 seconds at P99, even though the step
itself does not do anything specific. Do you have any idea why the bundle latency
is so high? What should I check or tune on KafkaIO?

Additional information: I read from one topic. That topic has 15 partitions,
and the producer writes in a round-robin fashion.

Thanks


Re: Pipeline Processing Time

2020-06-09 Thread Talat Uyarer
Thank you Luke and Reuven for helping me. Now I can see my pipeline
processing time for each record.

On Wed, Jun 3, 2020 at 9:25 AM Reuven Lax  wrote:

> Note: you need to tag the timestamp parameter of your @ProcessElement method
> with the @Timestamp annotation.
>
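> For illustration, a minimal sketch of such a method (MyRecord is a placeholder
> for your element type; Instant is org.joda.time.Instant):
>
>   @ProcessElement
>   public void processElement(
>       @Element MyRecord element,
>       @Timestamp Instant timestamp,           // note the @Timestamp annotation
>       OutputReceiver<MyRecord> out) {
>     // now() - element timestamp gives the element's age in the pipeline so far.
>     long ageMs = Instant.now().getMillis() - timestamp.getMillis();
>     out.output(element);
>   }
>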
> On Mon, Jun 1, 2020 at 3:31 PM Luke Cwik  wrote:
>
>> You can configure KafkaIO to use some data from the record as the
>> element's timestamp. See the KafkaIO javadoc around the TimestampPolicy[1];
>> the default is the current processing time.
>> You can access the timestamp of the element by adding
>> "org.joda.time.Instant timestamp" as a parameter to your @ProcessElement;
>> see this javadoc for additional details[2]. You could then compute now() -
>> timestamp to calculate the processing time.
>>
>> 1:
>> https://beam.apache.org/releases/javadoc/2.21.0/org/apache/beam/sdk/io/kafka/TimestampPolicy.html
>> 
>> 2:
>> https://beam.apache.org/releases/javadoc/2.21.0/org/apache/beam/sdk/transforms/DoFn.ProcessElement.html
>> 
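>>
>> As a rough sketch, configuring KafkaIO to stamp elements with the record's
>> create time rather than the default processing time could look like the
>> following (topic, servers, and the max-delay value are placeholders; Duration
>> is org.joda.time.Duration):
>>
>>   KafkaIO.<String, String>read()
>>       .withBootstrapServers("broker:9092")
>>       .withTopic("my-topic")
>>       .withKeyDeserializer(StringDeserializer.class)
>>       .withValueDeserializer(StringDeserializer.class)
>>       // Use the Kafka record's create time as the element timestamp
>>       // instead of the current processing time.
>>       .withCreateTime(Duration.standardSeconds(30))
>>       .withoutMetadata();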
>>
>> On Mon, Jun 1, 2020 at 2:00 PM Talat Uyarer 
>> wrote:
>>
>>> Sorry for the late response. Where does Beam set that timestamp
>>> field on the element? Is it set when KafkaIO reads the element?
>>>
>>> I also have a windowing function in my pipeline. Does the timestamp
>>> field change for any kind of operation? My pipeline has the
>>> following steps: KafkaIO -> Format Conversion ParDo -> SQL Filter ->
>>> Windowing Step -> Custom Sink. If the timestamp is set in KafkaIO, can I
>>> see the processing time by computing now() - timestamp in the Custom Sink?
>>>
>>>
>> Thanks
>>>
>>> On Thu, May 28, 2020 at 2:07 PM Luke Cwik  wrote:
>>>
 Dataflow provides msec counters for each transform that executes. You
 should be able to get them from Stackdriver and see them in the Dataflow
 UI.

 You need to keep track of the timestamp of the element as it flows
 through the system as part of data that goes alongside the element. You can
 use the element's timestamp[1] if that makes sense (it might not if you
 intend to use a timestamp that is from the kafka record itself and the
 record's timestamp isn't the same as the ingestion timestamp). Unless you
 are writing your own sink, the sink won't track the processing time at all
 so you'll need to add a ParDo that goes right before it that writes the
 timing information to wherever you want (a counter, your own metrics
 database, logs, ...).
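
 As a minimal sketch of such a ParDo (the metric namespace/name and the element
 type are illustrative; Metrics and Distribution come from
 org.apache.beam.sdk.metrics, Instant from org.joda.time):

   class RecordLatencyFn extends DoFn<String, String> {
     private final Distribution latencyMs =
         Metrics.distribution("pipeline", "end_to_end_latency_ms");

     @ProcessElement
     public void processElement(
         @Element String element,
         @Timestamp Instant timestamp,
         OutputReceiver<String> out) {
       // Record how long this element has been in flight, then pass it through.
       latencyMs.update(Instant.now().getMillis() - timestamp.getMillis());
       out.output(element);
     }
   }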

 1:
 https://github.com/apache/beam/blob/018e889829e300ab9f321da7e0010ff0011a73b1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L257
 


 On Thu, May 28, 2020 at 1:12 PM Talat Uyarer <
 tuya...@paloaltonetworks.com> wrote:

> Yes, I am trying to track how long it takes for a single element to be
> ingested into the pipeline until it is output somewhere.
>
> My pipeline is unbounded. I am using KafkaIO. I did not think about
> CPU time. If there is a way to track it too, it would be useful for
> improving my metrics.
>
> On Thu, May 28, 2020 at 12:52 PM Luke Cwik  wrote:
>
>> What do you mean by processing time?
>>
>> Are you trying to track how long it takes for a single element to be
>> ingested into the pipeline until it is output somewhere?
>> Do you have a bounded pipeline and want to know how long all the
>> processing takes?
>> Do you care about how much CPU time is being consumed in aggregate
>> for all the processing that your pipeline is doing?
>>
>>
>> On Thu, May 28, 2020 at 11:01 AM Talat Uyarer <
>> tuya...@paloaltonetworks.com> wrote:
>>
>>> I am using the Dataflow runner. The pipeline reads from KafkaIO and sends
>>> HTTP requests. I could not find any metadata field on the element to set
>>> the first read time.
>>>
>>> On Thu, May 28, 2020 at 10:44 AM Kyle Weaver 
>>> wrote:
>>>
 Which runner are you using?

Re: BEAM-2217 NotImplementedError - DataflowRunner parsing Protos from PubSub (Python SDK)

2020-06-09 Thread Lien Michiels
Hi Brian,

Thanks so much for your quick response!

I've tried with both Apache Beam 2.20.0 and 2.21.0, both result in the
exact same error. Here is the full stacktrace:

(metadata-persistor) ➜  metadata-persistor git:(feature/DEV-1249) ✗
metadata_persistor --project X --environments X --window_size 1
--input_subscription
projects/XXX/subscriptions/mds-internal-item-metadata-subscription
--runner DataflowRunner --temp_location gs:///item-metadata/temp
--staging_location gs://XXX/item-metadata/staging
/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py:1431:
BeamDeprecationWarning: options is deprecated since First stable release.
References to .options will not be supported
  experiments = p.options.view_as(DebugOptions).experiments or []
WARNING:root:Make sure that locally built Python SDK docker image has
Python 3.7 interpreter.
Traceback (most recent call last):
  File
"/Users/lienmichiels/.virtualenvs/metadata-persistor/bin/metadata_persistor",
line 8, in 
sys.exit(run())
  File
"/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/metadata_persistor/item_metadata_mds_persistor.py",
line 246, in run
batch_size=500,
  File
"/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py",
line 524, in __exit__
self.run().wait_until_finish()
  File
"/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py",
line 497, in run
self._options).run(False)
  File
"/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py",
line 510, in run
return self.runner.run_pipeline(self, self._options)
  File
"/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py",
line 484, in run_pipeline
allow_proto_holders=True)
  File
"/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py",
line 858, in from_runner_api
p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
  File
"/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
line 103, in get_by_id
self._id_to_proto[id], self._pipeline_context)
  File
"/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py",
line 1238, in from_runner_api
part = context.transforms.get_by_id(transform_id)
  File
"/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
line 103, in get_by_id
self._id_to_proto[id], self._pipeline_context)
  File
"/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py",
line 1244, in from_runner_api
id in proto.outputs.items()
  File
"/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py",
line 1244, in 
id in proto.outputs.items()
  File
"/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
line 103, in get_by_id
self._id_to_proto[id], self._pipeline_context)
  File
"/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pvalue.py",
line 214, in from_runner_api
element_type=context.element_type_from_coder_id(proto.coder_id),
  File
"/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
line 227, in element_type_from_coder_id
self.coders[coder_id].to_type_hint())
  File
"/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/coders/coders.py",
line 221, in to_type_hint
raise NotImplementedError('BEAM-2717')
NotImplementedError: BEAM-2717

While debugging and commenting out the different steps, I noticed that the
location in my code that supposedly throws the error changes. Here it
complains about the WriteToBigQuery step (batch_size=500), but if I comment
out that step it just moves on to the one above. It appears to be
consistently thrown on the last step of the run (I don't know if that's
helpful, just thought I'd mention it).

After adding beam.typehints.disable_type_annotations() it still throws the
same error.

Another thing I forgot to mention in my first email is that I registered a
ProtoCoder as suggested at the bottom of this page (
https://beam.apache.org/documentation/sdks/python-type-safety/) as:

beam.coders.registry.register_coder(ActionWrapper, ProtoCoder)

Thanks again, really appreciate your help!
Lien

On Mon, Jun 8, 2020 at 5:26 PM Brian Hulette  wrote:

> Hi Lien,
>
> > First time writing the email list, so please tell me if I'm doing this
> all wrong.
> Not at all! This is exactly the kind of question this list is for
>
> I 

Re: Beam/Python/Flink: Unable to deserialize UnboundedSource for PubSub source

2020-06-09 Thread Pradip Thachile
Quick update: this test code works just fine on Dataflow as well as the 
DirectRunner. Looks like the FlinkRunner is problematic for some reason here.

On 2020/06/08 20:11:13, Pradip Thachile  wrote: 
> Hey folks, 
> 
> I posted this on the Flink user mailing list but didn't get any traction
> there (potentially since this is Beam-related?). I've got a Beam/Python
> pipeline that works on the DirectRunner, and I'm now trying to run it on a
> local dev Flink cluster. Running it yields an error right out of the gate
> about not being able to deserialize the UnboundedSource (my PubSub source).
> I'm not sure how to debug this and would love some feedback on how to solve
> the issue. I'm also including a simple example that reproduces the error.
> 
> Beam SDK: 2.19
> Flink: 1.9.3
> Python: 3.7
> Beam args: ['--runner=FlinkRunner', '--flink_version=1.9', 
> '--flink_submit_uber_jar', '--streaming']
> (Stacktrace below)
> 
> #!/usr/bin/env python3
> import apache_beam as beam
> 
> class DummyPipeline(beam.PTransform):
>     def expand(self, p):
>         (
>             p
>             | "Read from PS" >> beam.io.gcp.pubsub.ReadFromPubSub(
>                 topic="")
>             | beam.Map(print)
>         )
> 
>         return p
> 
> def main():
>     beam_options = [
>         # "--runner=DirectRunner",
>         "--runner=FlinkRunner",
>         "--flink_version=1.9",
>         "--flink_submit_uber_jar",
>         "--streaming",
>         '--save_main_session',
>     ]
>     popts = beam.options.pipeline_options.PipelineOptions(flags=beam_options)
>     p = beam.Pipeline(options=popts)
> 
>     (
>         p
>         | "Do It" >> DummyPipeline()
>     )
>     job = p.run()
>     job.wait_until_finish()
> 
> if __name__ == "__main__":
>     main()
> 
> -Pradip
> 
> [main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver 
> - ArtifactStagingService started on localhost:55371
> [main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver 
> - Java ExpansionService started on localhost:55372
> [main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver 
> - JobService started on localhost:55364
> [grpc-default-executor-0] INFO org.apache.beam.runners.flink.FlinkJobInvoker 
> - Invoking job 
> BeamApp-crossbow-0607202942-6ab5b807_f5df-b95e-4b60-ae87-d069f19d029f 
> with pipeline runner 
> org.apache.beam.runners.flink.FlinkPipelineRunner@292a28a1
> [grpc-default-executor-0] INFO 
> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Starting 
> job invocation 
> BeamApp-crossbow-0607202942-6ab5b807_f5df-b95e-4b60-ae87-d069f19d029f
> [flink-runner-job-invoker] INFO 
> org.apache.beam.runners.flink.FlinkPipelineRunner - Translating pipeline to 
> Flink program.
> [flink-runner-job-invoker] INFO 
> org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a 
> Streaming Environment.
> [flink-runner-job-invoker] ERROR 
> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error 
> during job invocation 
> BeamApp-crossbow-0607202942-6ab5b807_f5df-b95e-4b60-ae87-d069f19d029f.
> java.lang.IllegalArgumentException: unable to deserialize UnboundedSource
> at 
> org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
> at 
> org.apache.beam.runners.core.construction.ReadTranslation.unboundedSourceFromProto(ReadTranslation.java:126)
> at 
> org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedSource(FlinkStreamingPortablePipelineTranslator.java:507)
> at 
> org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedRead(FlinkStreamingPortablePipelineTranslator.java:472)
> at 
> org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:250)
> at 
> org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:120)
> at 
> org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:113)
> at 
> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:84)
> at 
> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:84)
> at 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
> at 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
> at 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
>