Re: SDK Worker availability metrics

2022-08-10 Thread aryan m
Hi Luke!
   I agree that `sdk_worker_parallelism` doesn't change after job submission.
However, users can change the configuration from m to n across submissions
over a period of time.
   Having this value as a metric helps in observing the behavior/impact of the
job as that configuration changes.

[1]
https://github.com/apache/beam/blob/master/website/www/site/content/en/documentation/runtime/sdk-harness-config.md#sdk-harness-configuration
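For illustration, a rough sketch of the kind of thing I have in mind. This is
only an assumption on my part (re-publishing the configured value from a DoFn
via the Metrics API), and the PortablePipelineOptions accessor may differ by
Beam version:

import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;

// Pass-through DoFn that republishes the configured parallelism as a gauge.
public class ReportSdkWorkerParallelismFn<T> extends DoFn<T, T> {
  private static final Gauge SDK_WORKER_PARALLELISM =
      Metrics.gauge("pipeline", "sdk_worker_parallelism");

  @ProcessElement
  public void processElement(ProcessContext c) {
    long parallelism =
        c.getPipelineOptions().as(PortablePipelineOptions.class).getSdkWorkerParallelism();
    SDK_WORKER_PARALLELISM.set(parallelism);
    c.output(c.element());
  }
}

Applying this DoFn to any busy PCollection would keep the gauge fresh while the
job runs.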


On Wed, Aug 10, 2022 at 1:05 PM Luke Cwik  wrote:

> Flink has a set of workers, each worker has a number of task slots. A
> pipeline will use the number of slots based upon what it was configured to
> run with.
>
> Are you trying to get the total number of workers, total number of task
> slots, number of task slots your pipeline is using or number of workers
> your pipeline is executing on?
>
> I was under the impression that the first two were properties of the Flink
> cluster and don't change while the third property is configured at job
> submission time and also doesn't change.
>
> I may not be understanding what you're trying to measure and why at
> pipeline runtime for Flink since many of these values don't change through
> the lifetime of the cluster and/or job.
>
> On Mon, Aug 8, 2022 at 4:59 PM aryan m  wrote:
>
>> Hi Luke!
>> Thanks!! We use the Flink Runner and run SDK workers as processes
>> [1] within a k8s pod. Can you please share the broad steps for how one can
>> do this in the runner?
>>
>>
>> [1]
>> https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessEnvironmentFactory.java
>>
>>
>> On Mon, Aug 8, 2022 at 8:51 AM Luke Cwik via user 
>> wrote:
>>
>>> That code only executes within a runner and is only used by certain
>>> runners and wouldn't work in general from user code that is monitoring the
>>> job or user code executing within one of the workers.
>>>
>>> You would need to author code that is likely runner specific to look up
>>> the number of workers associated with a job as I don't believe there is a
>>> general way to do this for an arbitrary Apache Beam runner.
>>>
>>>  Which runner would you most likely want to use?
>>>
>>> On Sun, Aug 7, 2022 at 1:02 PM aryan m  wrote:
>>>
 Hi Users!
 Is there a recommended approach to publish metrics on the number of
 SDK workers available/running as a gauge?


 [1]
 https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java#L267
 [2]
 https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java#L148


 -- Aryan

>>>


Re: [JAVA] Handling repeated elements when merging two pcollections

2022-08-10 Thread Luke Cwik via user
Sorry, I should have said that you should Flatten and do a GroupByKey, not
a CoGroupByKey making the pipeline like:
PCollectionA -> Flatten -> GroupByKey -> ParDo(EmitOnlyFirstElementPerKey)
PCollectionB -/

The CoGroupByKey will have one iterable per PCollection containing zero or
more elements depending on how many elements each PCollection had for that
key. So yes you could solve it with CoGroupByKey but Flatten+GroupByKey is
much simpler.
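
For example, a rough sketch of that shape, assuming byte[] keys and values as
in the original question (variable names are illustrative):

PCollection<KV<byte[], byte[]>> merged =
    PCollectionList.of(collectionA).and(collectionB)
        .apply(Flatten.pCollections())
        .apply(GroupByKey.<byte[], byte[]>create())
        .apply(ParDo.of(
            new DoFn<KV<byte[], Iterable<byte[]>>, KV<byte[], byte[]>>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                // Emit only the first value per key; any further values are duplicates.
                c.output(KV.of(
                    c.element().getKey(),
                    c.element().getValue().iterator().next()));
              }
            }));

Note that the grouped iterable has no guaranteed order, so if CollectionA's
element must specifically win over CollectionB's, tag each element with its
source before flattening (as Evan suggested).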

On Wed, Aug 10, 2022 at 1:31 PM Shivam Singhal 
wrote:

> Think this should solve my problem.
>
> Thanks Evan and Luke!
>
> On Thu, 11 Aug 2022 at 1:49 AM, Luke Cwik via user 
> wrote:
>
>> Use CoGroupByKey to join the two PCollections and emit only the first
>> value of each iterable with the key.
>>
>> Duplicates will appear as iterables with more than one value while keys
>> without duplicates will have iterables containing exactly one value.
>>
>> On Wed, Aug 10, 2022 at 12:25 PM Shivam Singhal <
>> shivamsinghal5...@gmail.com> wrote:
>>
>>> I have two PCollections, CollectionA & CollectionB of type KV<Byte[], Byte[]>.
>>>
>>>
>>> I would like to merge them into one PCollection but CollectionA &
>>> CollectionB might have some elements with the same key. In those repeated
>>> cases, I would like to keep the element from CollectionA & drop the
>>> repeated element from CollectionB.
>>>
>>> Does anyone know a simple method to do this?
>>>
>>> Thanks,
>>> Shivam Singhal
>>>
>>


Re: [JAVA] Handling repeated elements when merging two pcollections

2022-08-10 Thread Shivam Singhal
Think this should solve my problem.

Thanks Evan and Luke!

On Thu, 11 Aug 2022 at 1:49 AM, Luke Cwik via user 
wrote:

> Use CoGroupByKey to join the two PCollections and emit only the first
> value of each iterable with the key.
>
> Duplicates will appear as iterables with more than one value while keys
> without duplicates will have iterables containing exactly one value.
>
> On Wed, Aug 10, 2022 at 12:25 PM Shivam Singhal <
> shivamsinghal5...@gmail.com> wrote:
>
>> I have two PCollections, CollectionA & CollectionB of type KV<Byte[], Byte[]>.
>>
>>
>> I would like to merge them into one PCollection but CollectionA &
>> CollectionB might have some elements with the same key. In those repeated
>> cases, I would like to keep the element from CollectionA & drop the
>> repeated element from CollectionB.
>>
>> Does anyone know a simple method to do this?
>>
>> Thanks,
>> Shivam Singhal
>>
>


Re: [JAVA] Handling repeated elements when merging two pcollections

2022-08-10 Thread Luke Cwik via user
Use CoGroupByKey to join the two PCollections and emit only the first value
of each iterable with the key.

Duplicates will appear as iterables with more than one value while keys
without duplicates will have iterables containing exactly one value.
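
A rough sketch of that, again assuming byte[] keys and values (tag and
variable names are illustrative):

final TupleTag<byte[]> tagA = new TupleTag<>();
final TupleTag<byte[]> tagB = new TupleTag<>();

PCollection<KV<byte[], byte[]>> merged =
    KeyedPCollectionTuple.of(tagA, collectionA)
        .and(tagB, collectionB)
        .apply(CoGroupByKey.create())
        .apply(ParDo.of(new DoFn<KV<byte[], CoGbkResult>, KV<byte[], byte[]>>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            CoGbkResult joined = c.element().getValue();
            // Prefer CollectionA's value; fall back to CollectionB's when A has none.
            Iterator<byte[]> fromA = joined.getAll(tagA).iterator();
            byte[] value = fromA.hasNext()
                ? fromA.next()
                : joined.getAll(tagB).iterator().next();
            c.output(KV.of(c.element().getKey(), value));
          }
        }));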

On Wed, Aug 10, 2022 at 12:25 PM Shivam Singhal 
wrote:

> I have two PCollections, CollectionA & CollectionB of type KV<Byte[], Byte[]>.
>
>
> I would like to merge them into one PCollection but CollectionA &
> CollectionB might have some elements with the same key. In those repeated
> cases, I would like to keep the element from CollectionA & drop the
> repeated element from CollectionB.
>
> Does anyone know a simple method to do this?
>
> Thanks,
> Shivam Singhal
>


Re: [JAVA] Handling repeated elements when merging two pcollections

2022-08-10 Thread Evan Galpin
Hi Shivam,

When you say "merge the PCollections" do you mean Flatten, or somehow join?
CoGroupByKey[1] would be a good choice if you need to join based on key.
You would then be able to implement application logic to keep 1 of the 2
records if there is a way to decipher an element from CollectionA vs.
CollectionB by only examining the elements.

If there isn't a natural way of determining which element to keep by only
examining the elements themselves, you could further nest the data in a KV.
For example, if CollectionA holds data like KV<k1, valueA> and CollectionB is
KV<k1, valueB>, you could transform these into something like
KV<k1, KV<"A", valueA>> and KV<k1, KV<"B", valueB>>. Then when you
CoGroupByKey, these elements would be grouped based on both having k1, and the
source/origin PCollection could be deciphered based on the key of the inner KV.
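
A rough sketch of that tagging step, assuming byte[] keys and values (the
"A"/"B" markers and all names are illustrative):

PCollection<KV<byte[], KV<String, byte[]>>> taggedA =
    collectionA.apply("TagA",
        MapElements.via(
            new SimpleFunction<KV<byte[], byte[]>, KV<byte[], KV<String, byte[]>>>() {
              @Override
              public KV<byte[], KV<String, byte[]>> apply(KV<byte[], byte[]> e) {
                // Wrap the value in an inner KV that records the source collection.
                return KV.of(e.getKey(), KV.of("A", e.getValue()));
              }
            }));

// Do the same for CollectionB with "B", join the two (Flatten + GroupByKey or
// CoGroupByKey), and per key keep the inner value whose marker is "A" whenever
// both sources are present.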

Thanks,
Evan

[1]
https://beam.apache.org/documentation/transforms/java/aggregation/cogroupbykey/

On Wed, Aug 10, 2022 at 3:25 PM Shivam Singhal 
wrote:

> I have two PCollections, CollectionA & CollectionB of type KV<Byte[], Byte[]>.
>
>
> I would like to merge them into one PCollection but CollectionA &
> CollectionB might have some elements with the same key. In those repeated
> cases, I would like to keep the element from CollectionA & drop the
> repeated element from CollectionB.
>
> Does anyone know a simple method to do this?
>
> Thanks,
> Shivam Singhal
>


Re: SDK Worker availability metrics

2022-08-10 Thread Luke Cwik via user
Flink has a set of workers, each worker has a number of task slots. A
pipeline will use the number of slots based upon what it was configured to
run with.

Are you trying to get the total number of workers, total number of task
slots, number of task slots your pipeline is using or number of workers
your pipeline is executing on?

I was under the impression that the first two were properties of the Flink
cluster and don't change while the third property is configured at job
submission time and also doesn't change.

I may not be understanding what you're trying to measure and why at
pipeline runtime for Flink since many of these values don't change through
the lifetime of the cluster and/or job.

On Mon, Aug 8, 2022 at 4:59 PM aryan m  wrote:

> Hi Luke!
> Thanks!! We use the Flink Runner and run SDK workers as processes [1]
> within a k8s pod. Can you please share the broad steps for how one can do
> this in the runner?
>
>
> [1]
> https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessEnvironmentFactory.java
>
>
> On Mon, Aug 8, 2022 at 8:51 AM Luke Cwik via user 
> wrote:
>
>> That code only executes within a runner and is only used by certain
>> runners and wouldn't work in general from user code that is monitoring the
>> job or user code executing within one of the workers.
>>
>> You would need to author code that is likely runner specific to look up
>> the number of workers associated with a job as I don't believe there is a
>> general way to do this for an arbitrary Apache Beam runner.
>>
>>  Which runner would you most likely want to use?
>>
>> On Sun, Aug 7, 2022 at 1:02 PM aryan m  wrote:
>>
>>> Hi Users!
>>> Is there a recommended approach to publish metrics on the number of
>>> SDK workers available/running as a gauge?
>>>
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java#L267
>>> [2]
>>> https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java#L148
>>>
>>>
>>> -- Aryan
>>>
>>


Re: [JAVA] Batch elements from a PCollection

2022-08-10 Thread Cristian Constantinescu
Hi,

If the keys bother you, you can .apply(WithKeys.of("")) before the
GroupIntoBatches transform. This effectively removes parallelism as all
items are funneled through one executor.

Note that I think GroupIntoBatches might be broken on Flink [1].

Alternatively, create your own stateful ParDo (needs a KV input) that
counts and outputs.
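
A rough sketch of the single-key variant, assuming KV<byte[], byte[]> elements
and a batch size of 100 (adjust both to your use case):

PCollection<Iterable<KV<byte[], byte[]>>> batches =
    input
        // Give every element the same dummy key so GroupIntoBatches sees one group.
        .apply(WithKeys.of(""))
        .apply(GroupIntoBatches.<String, KV<byte[], byte[]>>ofSize(100))
        // Drop the dummy key, keeping just the batches.
        .apply(Values.create());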

Hope it helps,
Cristian

[1] https://lists.apache.org/thread/xo714ntwxgpm199kqrwt9lzn40z882t1

On Wed, Aug 10, 2022 at 3:45 PM Shivam Singhal 
wrote:

> Is there no other way than
> https://stackoverflow.com/a/44956702 ?
>
> On Thu, 11 Aug 2022 at 1:00 AM, Shivam Singhal <
> shivamsinghal5...@gmail.com> wrote:
>
>> I have a PCollection of type KV where each key in those
>> KVs is unique.
>>
>> I would like to split all those KV pairs into batches. This new
>> PCollection will be of type PCollection>>>
>> where the iterable’s length can be configured.
>>
>>
>> I know there is a PTransform called GroupIntoBatches, but it batches based
>> on the keys, which is not my use case.
>>
>> It would be great if someone could help with this.
>>
>> Thanks,
>> Shivam Singhal
>>
>


Re: [JAVA] Batch elements from a PCollection

2022-08-10 Thread Shivam Singhal
Is there no other way than
https://stackoverflow.com/a/44956702 ?

On Thu, 11 Aug 2022 at 1:00 AM, Shivam Singhal 
wrote:

> I have a PCollection of type KV where each key in those
> KVs is unique.
>
> I would like to split all those KV pairs into batches. This new
> PCollection will be of type PCollection>>>
> where the iterable’s length can be configured.
>
>
> I know there is a PTransform called GroupIntoBatches, but it batches based
> on the keys, which is not my use case.
>
> It would be great if someone could help with this.
>
> Thanks,
> Shivam Singhal
>


[JAVA] Batch elements from a PCollection

2022-08-10 Thread Shivam Singhal
I have a PCollection of type KV where each key in those KVs
is unique.

I would like to split all those KV pairs into batches. This new PCollection
will be of type PCollection>>> where the
iterable’s length can be configured.


I know there is a PTransform called GroupIntoBatches, but it batches based
on the keys, which is not my use case.

It would be great if someone could help with this.

Thanks,
Shivam Singhal


[JAVA] Handling repeated elements when merging two pcollections

2022-08-10 Thread Shivam Singhal
I have two PCollections, CollectionA & CollectionB of type KV.


I would like to merge them into one PCollection but CollectionA &
CollectionB might have some elements with the same key. In those repeated
cases, I would like to keep the element from CollectionA & drop the
repeated element from CollectionB.

Does anyone know a simple method to do this?

Thanks,
Shivam Singhal


Re: Dataflow SQL streaming extensions

2022-08-10 Thread Marcin Kuthan
Hi Andrew

Right now I'm much further with Beam SQL experimentation. Instead of
Dataflow SQL (
https://cloud.google.com/dataflow/docs/guides/sql/dataflow-sql-intro) I use
the regular Beam SQL shell, the Calcite dialect, and the Beam SQL extension
for external table registration.

It looks much more complete: I'm able to register Pub/Sub topics as tables,
specify a timestamp attribute for event-time tracking, and save the processing
results into BigQuery. Everything works as expected and the correct results
are calculated in the event-time domain.

Because everything is a little magic and undocumented, I'm going to publish
a blog post about it.
If you have a spare 10 minutes, I would appreciate any review of and comments
on the early version :)
https://github.com/mkuthan/mkuthan.github.io/pull/11/files?short_path=60a54d1#diff-60a54d1fc16bb898873e0583526c01b7a535764358fe24105c0b2678ed91c3c5

Best regards,
Marcin




On Tue, 9 Aug 2022 at 21:31, Andrew Pilloud via user 
wrote:

> Hi Marcin,
>
> I'm having a little trouble understanding this. I think this is a
> summary of your problem statement: You have a pipeline that windows
> data on event time. Your event generator has an artificial 30 second
> delay. The pipeline appears to be experiencing a 10-20 second delay
> instead of the expected 30 second delay so you think it may be using
> processing time instead of event time. You want some help
> investigating the issue.
>
> Is it possible that your clocks are not synchronised as well as you
> think they are? The 30-second delay is somewhat small; does the issue
> persist if you increase it to an hour?
>
> This list isn't going to be much help in debugging Dataflow SQL
> issues, you should contact GCP support for that, but we can help with
> Beam SQL (which it is based on). Most Beam SQL pipelines only support
> using an older syntax where the windows are in a GROUP BY clause. I
> believe the GROUP BY format is supported by Dataflow SQL; can you try
> that? Documentation is here:
>
> https://beam.apache.org/documentation/dsls/sql/extensions/windowing-and-triggering/
>
> Andrew
>
>
> On Fri, Aug 5, 2022 at 8:15 AM Marcin Kuthan 
> wrote:
> >
> > Hi
> >
> >
> >
> > I'm experimenting with the Dataflow SQL streaming extension and I observed
> > that the event_timestamp field in the payload is ignored.
> >
> >
> >
> > I would like to calculate the average value of the values reported by
> > the sensor every 5 seconds.
> >
> >
> >
> > SELECT CURRENT_TIMESTAMP() AS created_at, * FROM
> >   (SELECT
> >      s1.window_start AS window_start,
> >      s1.window_end AS window_end,
> >      MIN(event_timestamp) AS event_timestamp_min,
> >      MAX(event_timestamp) AS event_timestamp_max,
> >      AVG(s1.sensor_value) AS sensor_value_avg
> >    FROM TUMBLE(
> >      (SELECT * FROM pubsub.topic.`sc-9366-nga-dev`.`marcin-atm22-signal-1`),
> >      DESCRIPTOR(event_timestamp),
> >      "INTERVAL 5 SECOND"
> >    ) AS s1
> >    GROUP BY window_start, window_end)
> >
> >
> >
> > For testing purposes, sensor data is artificially generated, and
> > event_timestamp is always 30 seconds behind the current time.
> >
> >
> >
> > current timestamp: 2022-08-05T15:00:24+00:00
> >
> > {'event_timestamp': '2022-08-05T14:59:54+00:00', 'sensor_value': 0.4083962116009032}
> >
> >
> >
> > But I get the following result at 15:00:28 (the latest row stored in BQ):
> >
> > [{
> >   "created_at": "2022-08-05T15:00:20.170Z",
> >   "window_start": "2022-08-05T15:00:05Z",
> >   "window_end": "2022-08-05T15:00:10Z",
> >   "event_timestamp_min": "2022-08-05T15:00:05.019Z",
> >   "event_timestamp_max": "2022-08-05T15:00:09.035Z",
> >   "sensor_value_avg": "0.1612730883"
> > }]
> >
> >
> >
> > Why is there a record created at 15:00:20 with a window
> 15:00:05-15:00:10 if the input event_time is always delayed by 30 seconds?
> > At 15:00:20 the latest emitted sensor event_timestamp is ~ 14:59:50.
> >
> >
> >
> > Moreover, the watermark lag reported by Dataflow is always 10-20 seconds,
> > even if the event_timestamp reported by the sensor is far behind the
> > wall clock.
> >
> >
> > Any ideas?
> >
> >
> > Regards,
> >
> > Marcin
> >
> >
>


Re: read messages from kafka: 2 different message types in kafka topic

2022-08-10 Thread Alexey Romanenko
Oops, my bad, I misread the initial question - Moritz pointed out that you have
only one topic with two different schemas…

I don’t think it’s supported by KafkaIO “out-of-the-box”. In this case, you need
either to write your own deserialiser that distinguishes the schemas for every
input message, split this topic into two so that each topic contains messages
with only one schema, or use an Avro union as was suggested above.
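
For the custom deserialiser route, a very rough sketch. It assumes plain Avro
binary payloads (no registry header) and that both writer schemas are known up
front; the config keys are hypothetical, and wiring it into KafkaIO would still
need a value coder (e.g. via withValueDeserializerAndCoder):

import java.io.IOException;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class TwoSchemaAvroDeserializer implements Deserializer<GenericRecord> {
  private Schema schemaA;
  private Schema schemaB;

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    // Hypothetical config keys: the two writer schemas passed as JSON strings
    // through withConsumerConfigUpdates(...).
    schemaA = new Schema.Parser().parse((String) configs.get("my.schema.a"));
    schemaB = new Schema.Parser().parse((String) configs.get("my.schema.b"));
  }

  @Override
  public GenericRecord deserialize(String topic, byte[] data) {
    // Try the first schema and fall back to the second one.
    try {
      return read(schemaA, data);
    } catch (Exception first) {
      try {
        return read(schemaB, data);
      } catch (Exception second) {
        throw new SerializationException("Message matches neither schema", second);
      }
    }
  }

  private GenericRecord read(Schema schema, byte[] data) throws IOException {
    GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
    return reader.read(null, DecoderFactory.get().binaryDecoder(data, null));
  }
}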

—
Alexey

> On 10 Aug 2022, at 15:03, Alexey Romanenko  wrote:
> 
> If you have two topics with different schemas in your pipeline then you need 
> to read them separately with two different KafkaIO instances and configure 
> every instance with a proper deserialiser based on its schema.
> 
> —
> Alexey
> 
>> On 9 Aug 2022, at 22:28, Sigalit Eliazov wrote:
>> 
>> Thanks for your response
>> we have different messages with separate schemas.
>> 
>> I'll review the suggested solution.
>> BR
>> Sigalit
>> 
>> On Tue, Aug 9, 2022 at 3:40 PM Moritz Mack wrote:
>> Hi Sigalit,
>> 
>>  
>> 
>> Could you explain a bit more in detail what you mean by 2 different types of 
>> messages?
>> 
>> Do they share the same schema, e.g. using a union / one of type? Or are you 
>> in fact talking about different messages with separate schemas (e.g. 
>> discriminated using a message header)?
>> 
>>  
>> 
>> The recommended usage (at least with Confluent) is to use one schema per 
>> topic. Using the Confluent registry it’s fairly simple then:
>> 
>>  
>> 
>>   .withValueDeserializer(
>>       ConfluentSchemaRegistryDeserializerProvider.of(
>>           registryUrl, subject, null /* latest */, config)))
>> 
>>  
>> 
>> Most likely you have to implement a similar DeserializerProvider for
>> Apicurio. You could also try using apicurio.registry.as-confluent, but that
>> requires configuring your producers accordingly.
>> 
>> In any case, I suggest you study ConfluentSchemaRegistryDeserializerProvider.
>> That should show you a path forward.
>> 
>>  
>> 
>> Best,
>> 
>> Moritz
>> 
>>  
>> 
>> On 09.08.22, 13:08, "Sigalit Eliazov" wrote:
>> 
>>  
>> 
>> 
>> Hi all
>> 
>> we have a single kafka topic which is used to receive 2 different types of 
>> messages.
>> 
>> These 2 messages are Avro.
>> 
>> So when reading messages from kafka I used the GenericRecord
>> 
>>  
>> 
>> KafkaIO.read()
>>     .withBootstrapServers(bootstrapServers)
>>     .withTopic(topic)
>>     .withConsumerConfigUpdates(ImmutableMap.of(
>>         SerdeConfig.REGISTRY_URL, PipelineUtil.getSchemaURL(),
>>         ConsumerConfig.GROUP_ID_CONFIG, consumerGroup,
>>         SerdeConfig.CHECK_PERIOD_MS, TimeUnit.DAYS.toMillis(1)))
>>     .withKeyDeserializer(StringDeserializer.class)
>> I am not sure how to define the withValueDeserializer and the coder.
>> I tried to read the message as GenericRecord but it fails with
>> "Could not extract the Kafka Deserializer type from class
>> io.apicurio.registry.serde.avro.AvroKafkaDeserialize"
>> I am using Apicurio as the schema registry.
>>  
>> Thanks
>> Sigalit
>> 
>> 
> 



Re: read messages from kafka: 2 different message types in kafka topic

2022-08-10 Thread Alexey Romanenko
If you have two topics with different schemas in your pipeline, then you need to
read them separately with two different KafkaIO instances and configure every 
instance with a proper deserialiser based on its schema.
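
For example, a rough sketch with a Confluent-compatible registry (topic names,
subjects, and the registry URL are placeholders; for Apicurio you would need a
similar DeserializerProvider, as Moritz mentioned below):

PCollection<KV<String, GenericRecord>> topicA =
    pipeline.apply("ReadTopicA",
        KafkaIO.<String, GenericRecord>read()
            .withBootstrapServers(bootstrapServers)
            .withTopic("topic-a")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(
                ConfluentSchemaRegistryDeserializerProvider.of(registryUrl, "topic-a-value"))
            .withoutMetadata());

PCollection<KV<String, GenericRecord>> topicB =
    pipeline.apply("ReadTopicB",
        KafkaIO.<String, GenericRecord>read()
            .withBootstrapServers(bootstrapServers)
            .withTopic("topic-b")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(
                ConfluentSchemaRegistryDeserializerProvider.of(registryUrl, "topic-b-value"))
            .withoutMetadata());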

—
Alexey

> On 9 Aug 2022, at 22:28, Sigalit Eliazov  wrote:
> 
> Thanks for your response
> we have different messages with separate schemas.
> 
> I'll review the suggested solution.
> BR
> Sigalit
> 
> On Tue, Aug 9, 2022 at 3:40 PM Moritz Mack wrote:
> Hi Sigalit,
> 
>  
> 
> Could you explain a bit more in detail what you mean by 2 different types of 
> messages?
> 
> Do they share the same schema, e.g. using a union / one of type? Or are you 
> in fact talking about different messages with separate schemas (e.g. 
> discriminated using a message header)?
> 
>  
> 
> The recommended usage (at least with Confluent) is to use one schema per 
> topic. Using the Confluent registry it’s fairly simple then:
> 
>  
> 
>   .withValueDeserializer(
>       ConfluentSchemaRegistryDeserializerProvider.of(
>           registryUrl, subject, null /* latest */, config)))
> 
>  
> 
> Most likely you have to implement a similar DeserializerProvider for
> Apicurio. You could also try using apicurio.registry.as-confluent, but that
> requires configuring your producers accordingly.
> 
> In any case, I suggest you study ConfluentSchemaRegistryDeserializerProvider.
> That should show you a path forward.
> 
>  
> 
> Best,
> 
> Moritz
> 
>  
> 
> On 09.08.22, 13:08, "Sigalit Eliazov" wrote:
> 
>  
> 
> 
> Hi all
> 
> we have a single kafka topic which is used to receive 2 different types of 
> messages.
> 
> These 2 messages are Avro.
> 
> So when reading messages from kafka I used the GenericRecord
> 
>  
> 
> KafkaIO.read()
>     .withBootstrapServers(bootstrapServers)
>     .withTopic(topic)
>     .withConsumerConfigUpdates(ImmutableMap.of(
>         SerdeConfig.REGISTRY_URL, PipelineUtil.getSchemaURL(),
>         ConsumerConfig.GROUP_ID_CONFIG, consumerGroup,
>         SerdeConfig.CHECK_PERIOD_MS, TimeUnit.DAYS.toMillis(1)))
>     .withKeyDeserializer(StringDeserializer.class)
> I am not sure how to define the withValueDeserializer and the coder.
> I tried to read the message as GenericRecord but it fails with
> "Could not extract the Kafka Deserializer type from class
> io.apicurio.registry.serde.avro.AvroKafkaDeserialize"
> I am using Apicurio as the schema registry.
>  
> Thanks
> Sigalit
>