Re: SDK Worker availability metrics

2022-08-10 Thread aryan m
Hi Luke! I agree `sdk_worker_parallelism` doesn't change after job submission. However, users can change the configuration from m -> n over a period of time. Having this information as a metric helps in observing the behavior/impact of the job with the config change. [1] https://github.com/apac
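Until such a runner-reported metric exists, one illustrative workaround (not what is being proposed in the thread, just a sketch with made-up names) is for the pipeline itself to publish the configured value as a gauge via the Beam metrics API:

  import org.apache.beam.sdk.metrics.Gauge;
  import org.apache.beam.sdk.metrics.Metrics;
  import org.apache.beam.sdk.transforms.DoFn;

  // Pass-through DoFn that re-publishes a configuration value as a gauge so it
  // shows up next to the job's other metrics. String elements are used here
  // only to keep the sketch concrete.
  class ReportParallelismFn extends DoFn<String, String> {
    private final int configuredParallelism;  // captured at pipeline construction time
    private final Gauge gauge = Metrics.gauge("config", "sdk_worker_parallelism");

    ReportParallelismFn(int configuredParallelism) {
      this.configuredParallelism = configuredParallelism;
    }

    @ProcessElement
    public void processElement(@Element String element, OutputReceiver<String> out) {
      gauge.set(configuredParallelism);  // re-reported as elements flow through
      out.output(element);
    }
  }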

Re: [JAVA] Handling repeated elements when merging two pcollections

2022-08-10 Thread Luke Cwik via user
Sorry, I should have said that you should Flatten and do a GroupByKey, not a CoGroupByKey, making the pipeline like: {PCollectionA, PCollectionB} -> Flatten -> GroupByKey -> ParDo(EmitOnlyFirstElementPerKey). The CoGroupByKey will have one iterable per PCollection containing zero or more elements de
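A minimal sketch of that pipeline shape, assuming both inputs are PCollection<KV<String, String>> (the method name and types are illustrative, not from the thread):

  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.Flatten;
  import org.apache.beam.sdk.transforms.GroupByKey;
  import org.apache.beam.sdk.transforms.ParDo;
  import org.apache.beam.sdk.values.KV;
  import org.apache.beam.sdk.values.PCollection;
  import org.apache.beam.sdk.values.PCollectionList;

  static PCollection<KV<String, String>> flattenAndKeepOnePerKey(
      PCollection<KV<String, String>> collectionA,
      PCollection<KV<String, String>> collectionB) {
    return PCollectionList.of(collectionA).and(collectionB)
        .apply(Flatten.<KV<String, String>>pCollections())
        .apply(GroupByKey.<String, String>create())
        .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, KV<String, String>>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            // Emit exactly one value per key. Note: the order of the grouped
            // iterable is not guaranteed, so this dedups without preferring
            // one input over the other.
            c.output(KV.of(c.element().getKey(),
                           c.element().getValue().iterator().next()));
          }
        }));
  }

If CollectionA's element must win, the CoGroupByKey variant discussed elsewhere in this thread keeps the two inputs distinguishable, which makes that preference easy to express.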

Re: [JAVA] Handling repeated elements when merging two pcollections

2022-08-10 Thread Shivam Singhal
Think this should solve my problem. Thanks Evan and Luke! On Thu, 11 Aug 2022 at 1:49 AM, Luke Cwik via user wrote: > Use CoGroupByKey to join the two PCollections and emit only the first > value of each iterable with the key. > > Duplicates will appear as iterables with more than one value whi

Re: [JAVA] Handling repeated elements when merging two pcollections

2022-08-10 Thread Luke Cwik via user
Use CoGroupByKey to join the two PCollections and emit only the first value of each iterable with the key. Duplicates will appear as iterables with more than one value while keys without duplicates will have iterables containing exactly one value. On Wed, Aug 10, 2022 at 12:25 PM Shivam Singhal
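A sketch of this join, assuming String keys and values and extended to prefer CollectionA's value when a key appears in both inputs (method and variable names are made up):

  import java.util.Iterator;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.ParDo;
  import org.apache.beam.sdk.transforms.join.CoGbkResult;
  import org.apache.beam.sdk.transforms.join.CoGroupByKey;
  import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
  import org.apache.beam.sdk.values.KV;
  import org.apache.beam.sdk.values.PCollection;
  import org.apache.beam.sdk.values.TupleTag;

  static PCollection<KV<String, String>> mergePreferA(
      PCollection<KV<String, String>> collectionA,
      PCollection<KV<String, String>> collectionB) {
    final TupleTag<String> tagA = new TupleTag<>();
    final TupleTag<String> tagB = new TupleTag<>();
    return KeyedPCollectionTuple.of(tagA, collectionA)
        .and(tagB, collectionB)
        .apply(CoGroupByKey.<String>create())
        .apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            CoGbkResult joined = c.element().getValue();
            // Prefer CollectionA's value; fall back to CollectionB's when the
            // key only exists there.
            Iterator<String> fromA = joined.getAll(tagA).iterator();
            String value = fromA.hasNext()
                ? fromA.next()
                : joined.getAll(tagB).iterator().next();
            c.output(KV.of(c.element().getKey(), value));
          }
        }));
  }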

Re: [JAVA] Handling repeated elements when merging two pcollections

2022-08-10 Thread Evan Galpin
Hi Shivam, When you say "merge the PCollections" do you mean Flatten, or somehow join? CoGroupByKey[1] would be a good choice if you need to join based on key. You would then be able to implement application logic to keep 1 of the 2 records if there is a way to decipher an element from CollectionA

Re: SDK Worker availability metrics

2022-08-10 Thread Luke Cwik via user
Flink has a set of workers, each worker has a number of task slots. A pipeline will use the number of slots based upon what it was configured to run with. Are you trying to get the total number of workers, total number of task slots, number of task slots your pipeline is using, or number of worker
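For the "how many slots will my pipeline use" side, a minimal sketch of configuring it from Beam; note this is only the pipeline's parallelism, while the worker count and slots per worker are Flink cluster settings (e.g. taskmanager.numberOfTaskSlots), not pipeline options:

  import org.apache.beam.runners.flink.FlinkPipelineOptions;
  import org.apache.beam.runners.flink.FlinkRunner;
  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.options.PipelineOptionsFactory;

  public class FlinkParallelismExample {
    public static void main(String[] args) {
      FlinkPipelineOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation().as(FlinkPipelineOptions.class);
      options.setRunner(FlinkRunner.class);
      // Number of parallel operator instances; each instance occupies one task slot.
      options.setParallelism(4);
      Pipeline pipeline = Pipeline.create(options);
      // ... build the pipeline here ...
      pipeline.run();
    }
  }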

Re: [JAVA] Batch elements from a PCollection

2022-08-10 Thread Cristian Constantinescu
Hi, If the keys bother you, you can .apply(WithKeys.of("")) before the GroupIntoBatches transform. This effectively removes parallelism as all items are funneled through one executor. Note that I think GroupIntoBatches might be broken on Flink [1]. Alternatively, create your ow
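A small sketch of that approach, assuming the input is a PCollection<KV<String, String>> (the method name and the extra Values step to drop the dummy key afterwards are illustrative):

  import org.apache.beam.sdk.transforms.GroupIntoBatches;
  import org.apache.beam.sdk.transforms.Values;
  import org.apache.beam.sdk.transforms.WithKeys;
  import org.apache.beam.sdk.values.KV;
  import org.apache.beam.sdk.values.PCollection;

  static PCollection<Iterable<KV<String, String>>> batchAll(
      PCollection<KV<String, String>> input, int batchSize) {
    return input
        // Give every element the same dummy key so they all land in one group;
        // this funnels everything through a single worker, removing parallelism.
        .apply(WithKeys.<String, KV<String, String>>of(""))
        .apply(GroupIntoBatches.<String, KV<String, String>>ofSize(batchSize))
        // Drop the dummy key, leaving just the batches.
        .apply(Values.<Iterable<KV<String, String>>>create());
  }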

Re: [JAVA] Batch elements from a PCollection

2022-08-10 Thread Shivam Singhal
Is there no other way than https://stackoverflow.com/a/44956702 ? On Thu, 11 Aug 2022 at 1:00 AM, Shivam Singhal wrote: > I have a PCollection of type KV where each key in those > KVs is unique. > > I would like to split all those KV pairs into batches. This new > PCollection will be of type PCo

[JAVA] Batch elements from a PCollection

2022-08-10 Thread Shivam Singhal
I have a PCollection of type KV where each key in those KVs is unique. I would like to split all those KV pairs into batches. This new PCollection will be of type PCollection<Iterable<KV<...>>> where the iterable’s length can be configured. I know there is a PTransform called GroupIntoBatches but it batches bas

[JAVA] Handling repeated elements when merging two pcollections

2022-08-10 Thread Shivam Singhal
I have two PCollections, CollectionA & CollectionB of type KV. I would like to merge them into one PCollection but CollectionA & CollectionB might have some elements with the same key. In those repeated cases, I would like to keep the element from CollectionA & drop the repeated element from Coll

Re: Dataflow SQL streaming extensions

2022-08-10 Thread Marcin Kuthan
Hi Andrew, Right now I'm much further along with my Beam SQL experimentation. Instead of Dataflow SQL ( https://cloud.google.com/dataflow/docs/guides/sql/dataflow-sql-intro) I use the regular Beam SQL shell, the Calcite dialect and the Beam SQL extension for external table registration. It looks much more complete, I'm

Re: read messages from kafka: 2 different message types in kafka topic

2022-08-10 Thread Alexey Romanenko
Oops, my bad, I misread the initial question - Moritz pointed out that you have only one topic with two different schemas… I don’t think it’s supported by KafkaIO “out-of-the-box.” In this case, you either need to write your own deserialiser which will distinguish the schemas for every inpu
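A rough sketch of the custom-deserialiser route, assuming JSON payloads and a made-up marker field for telling the two schemas apart (EitherMessage and the detection rule are purely illustrative):

  import java.nio.charset.StandardCharsets;
  import org.apache.kafka.common.serialization.Deserializer;

  // Hypothetical wrapper that can hold either of the two message types.
  class EitherMessage {
    final String typeAJson;  // non-null if the payload matched schema A
    final String typeBJson;  // non-null if the payload matched schema B
    EitherMessage(String a, String b) { this.typeAJson = a; this.typeBJson = b; }
  }

  // Custom deserializer that inspects each record and decides which schema it uses.
  // The detection rule (looking for a field name in the JSON) is only an illustration.
  class EitherMessageDeserializer implements Deserializer<EitherMessage> {
    @Override
    public EitherMessage deserialize(String topic, byte[] data) {
      String json = new String(data, StandardCharsets.UTF_8);
      return json.contains("\"schema_a_field\"")
          ? new EitherMessage(json, null)
          : new EitherMessage(null, json);
    }
  }

With KafkaIO this would be wired in via withValueDeserializerAndCoder(EitherMessageDeserializer.class, ...), since Beam also needs a coder for the wrapper type, and a downstream ParDo can then split the two cases into separate outputs.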

Re: read messages from kafka: 2 different message types in kafka topic

2022-08-10 Thread Alexey Romanenko
If you have two topics with different schemas in your pipeline then you need to read them separately with two different KafkaIO instances and configure every instance with a proper deserialiser based on its schema. — Alexey > On 9 Aug 2022, at 22:28, Sigalit Eliazov wrote: > > Thanks for your
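For the two-topic case, a minimal sketch (the broker address, topic names and String deserializers are placeholders; in practice each read would use the deserializer that matches its topic's schema):

  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.io.kafka.KafkaIO;
  import org.apache.beam.sdk.values.KV;
  import org.apache.beam.sdk.values.PCollection;
  import org.apache.kafka.common.serialization.StringDeserializer;

  static void readTwoTopics(Pipeline pipeline) {
    // One KafkaIO instance per topic, each configured independently.
    PCollection<KV<String, String>> topicA = pipeline.apply("ReadTopicA",
        KafkaIO.<String, String>read()
            .withBootstrapServers("broker:9092")
            .withTopic("topic-a")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata());

    PCollection<KV<String, String>> topicB = pipeline.apply("ReadTopicB",
        KafkaIO.<String, String>read()
            .withBootstrapServers("broker:9092")
            .withTopic("topic-b")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata());
    // Downstream, parse each PCollection with the logic that matches its schema.
  }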