Re: Any recommendation for key for GroupIntoBatches
Hi,

I might be late to the discussion, but here is another option, as I think it was not mentioned (or I missed it). Take a look at [BatchElements](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements), as I think it is precisely what you want to achieve. Compared to the other answers:

- it is elastic, so it can fit any downstream use case
- no custom code: it is a native Beam transform
- no shuffling of the data is required, since elements are batched on the worker that already holds them (but pay attention to your runner's maximum message size limit); shuffling would be required if you created artificial random-looking keys

Note that the above is Python, but I would bet there is a Java counterpart (or at least one that is easy to implement).

Best,
Wiśniowski Piotr

On 15.04.2024 19:14, Reuven Lax via user wrote:
> There are various strategies. Here is an example of how Beam does it
> (taken from Reshuffle.viaRandomKey().withNumBuckets(N)). [...]
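Conceptually, BatchElements buffers elements on each worker and emits them as lists once a batch is full (plus a final partial batch). The following is a minimal pure-Python sketch of that buffering behavior, not the Beam implementation itself (which also adaptively tunes the batch size):

```python
def batch_elements(iterable, max_batch_size=100):
    """Yield lists of up to max_batch_size elements, flushing the remainder at the end."""
    batch = []
    for element in iterable:
        batch.append(element)
        if len(batch) == max_batch_size:
            yield batch
            batch = []
    if batch:
        # Analogous to flushing a leftover buffer in finish_bundle.
        yield batch
```

For 250 elements and a batch size of 100, this produces batches of 100, 100, and 50 — which is why no artificial key or shuffle is needed: each worker batches whatever it already has.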
Re: Any recommendation for key for GroupIntoBatches
On Fri, Apr 12, 2024 at 1:39 PM Ruben Vargas wrote:
> On Fri, Apr 12, 2024 at 2:17 PM Jaehyeon Kim wrote:
> >
> > Here is an example from a book that I'm reading now and it may be applicable.
> >
> > JAVA - (id.hashCode() & Integer.MAX_VALUE) % 100
> > PYTHON - ord(id[0]) % 100

Or abs(hash(id)) % 100, in case the first character of your id is not well distributed.

> Maybe this is what I'm looking for. I'll give it a try. Thanks!
>
> > On Sat, 13 Apr 2024 at 06:12, George Dekermenjian wrote:
> >>
> >> How about just keeping track of a buffer and flushing the buffer after 100 messages, and flushing whatever remains in finish_bundle as well?
>
> If this is in memory, it could lead to potential loss of data. That is why state is used, or at least that is my understanding. But maybe there is a way to do this with state?

Bundles are the unit of commitment in Beam [1], so finish_bundle won't drop any data. A possible downside is that, especially in streaming, bundles may be small, which would cap the amount of batching you get.

[1] https://beam.apache.org/documentation/runtime/model/#bundling-and-persistence
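The key functions suggested above can be sketched in plain Python. Note one caveat worth labeling explicitly: Python salts `str` hashing per process (`PYTHONHASHSEED`), so `hash(id)`-based keys are not stable across workers or restarts, which may matter in a distributed shuffle. The function names here are illustrative:

```python
def key_java_style(message_id, num_buckets=100):
    # Python analogue of the Java suggestion: (id.hashCode() & Integer.MAX_VALUE) % 100
    return (hash(message_id) & 0x7FFFFFFF) % num_buckets

def key_first_char(message_id, num_buckets=100):
    # ord(id[0]) % 100 -- cheap, but only as balanced as the first character of the id
    return ord(message_id[0]) % num_buckets

def key_full_hash(message_id, num_buckets=100):
    # abs(hash(id)) % 100 -- spreads better when first characters are skewed,
    # but is process-salted for strings (see caveat above)
    return abs(hash(message_id)) % num_buckets
```

Any of these maps each message to one of 100 keys, so GroupIntoBatches can form up to 100 parallel batches instead of one batch per unique ID.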
Re: Any recommendation for key for GroupIntoBatches
There are various strategies. Here is an example of how Beam does it (taken from Reshuffle.viaRandomKey().withNumBuckets(N)). Note that this does some extra hashing to work around issues with the Spark runner. If you don't care about that, you could implement something simpler (e.g. initialize shard to a random number in StartBundle, and increment it mod numBuckets in each ProcessElement call).

    public static class AssignShardFn<T> extends DoFn<T, KV<Integer, T>> {
      private int shard;
      private @Nullable Integer numBuckets;

      public AssignShardFn(@Nullable Integer numBuckets) {
        this.numBuckets = numBuckets;
      }

      @Setup
      public void setup() {
        shard = ThreadLocalRandom.current().nextInt();
      }

      @ProcessElement
      public void processElement(@Element T element, OutputReceiver<KV<Integer, T>> r) {
        ++shard;
        // Smear the shard into something more random-looking, to avoid issues
        // with runners that don't properly hash the key being shuffled, but rely
        // on it being random-looking. E.g. Spark takes the Java hashCode() of keys,
        // which for Integer is a no-op and it is an issue:
        // http://hydronitrogen.com/poor-hash-partitioning-of-timestamps-integers-and-longs-in-spark.html
        // This hashing strategy is copied from
        // org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Hashing.smear().
        int hashOfShard = 0x1b873593 * Integer.rotateLeft(shard * 0xcc9e2d51, 15);
        if (numBuckets != null) {
          UnsignedInteger unsignedNumBuckets = UnsignedInteger.fromIntBits(numBuckets);
          hashOfShard = UnsignedInteger.fromIntBits(hashOfShard).mod(unsignedNumBuckets).intValue();
        }
        r.output(KV.of(hashOfShard, element));
      }
    }

On Mon, Apr 15, 2024 at 10:01 AM Damon Douglas wrote:
> Would you be able to compute a shasum on the group of IDs to use as the key?
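The smear-and-bucket step of the Java code above can be translated to Python for illustration. This is a sketch, not Beam API code; it assumes the Java semantics of 32-bit wraparound multiplication, `Integer.rotateLeft`, and `UnsignedInteger.mod`, which Python's arbitrary-precision integers have to emulate with masking:

```python
MASK32 = 0xFFFFFFFF

def rotl32(x, n):
    # Equivalent of Java's Integer.rotateLeft on a 32-bit value.
    x &= MASK32
    return ((x << n) | (x >> (32 - n))) & MASK32

def smear(shard):
    # Guava's Hashing.smear: 0x1b873593 * rotateLeft(shard * 0xcc9e2d51, 15),
    # with every multiplication truncated to 32 bits as in Java int arithmetic.
    return (0x1b873593 * rotl32((shard * 0xcc9e2d51) & MASK32, 15)) & MASK32

def assign_bucket(shard, num_buckets):
    # Interpret the 32-bit result as unsigned before taking the modulus,
    # mirroring UnsignedInteger.fromIntBits(...).mod(...) in the Java code.
    return smear(shard) % num_buckets
```

Because `smear` scrambles consecutive shard values, even a runner that partitions by the raw key value (as Spark does with Integer hashCode) still sees random-looking keys.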
Re: Any recommendation for key for GroupIntoBatches
Good day, Ruben, Would you be able to compute a shasum on the group of IDs to use as the key? Best, Damon On 2024/04/12 19:22:45 Ruben Vargas wrote: > Hello guys > > Maybe this question was already answered, but I cannot find it and > want some more input on this topic. > > I have some messages that don't have any particular key candidate, > except the ID, but I don't want to use it because the idea is to > group multiple IDs in the same batch. > > This is my use case: > > I have an endpoint where I'm gonna send the message ID, this endpoint > is gonna return me certain information which I will use to enrich my > message. In order to avoid fetching the endpoint per message I want to > batch it in 100 and send the 100 IDs in one request ( the endpoint > supports it) . I was thinking on using GroupIntoBatches. > > - If I choose the ID as the key, my understanding is that it won't > work in the way I want (because it will form batches of the same ID). > - Use a constant will be a problem for parallelism, is that correct? > > Then my question is, what should I use as a key? Maybe something > regarding the timestamp? so I can have groups of messages that arrive > at a certain second? > > Any suggestions would be appreciated > > Thanks. >
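One way to read Damon's shasum suggestion is to derive a stable bucket key from a digest of the ID. Below is a hypothetical sketch (the name `key_for` and the bucket count are illustrative, not from the thread); unlike Python's salted `hash()`, a SHA-256 digest is deterministic across workers and restarts:

```python
import hashlib

def key_for(message_id, num_buckets=100):
    # Hash the ID with SHA-256 and fold the first 4 bytes into a bucket index.
    digest = hashlib.sha256(message_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_buckets
```

Note this still keys by ID content, so it spreads messages across up to `num_buckets` batches rather than batching a fixed group of IDs together.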
Re: Any recommendation for key for GroupIntoBatches
Yeah, unfortunately the data on the endpoint could change at any point in time, and I need to make sure I have the latest version. :/ That limits my options here. But I also have other sources that can benefit from this caching. :)

Thank you very much!

On Mon, Apr 15, 2024 at 9:37 AM XQ Hu wrote:
>
> I am not sure you still need to do batching, since the Web API IO can handle caching.
>
> If you really need it, I think GroupIntoBatches is a good way to go.
Re: Any recommendation for key for GroupIntoBatches
I am not sure you still need to do batching, since the Web API IO can handle caching.

If you really need it, I think GroupIntoBatches is a good way to go.

On Mon, Apr 15, 2024 at 11:30 AM Ruben Vargas wrote:
> Is there a way to do batching in that transformation? I'm assuming for
> now there isn't, or maybe it can be used in conjunction with GroupIntoBatches?
Re: Any recommendation for key for GroupIntoBatches
Is there a way to do batching in that transformation? I'm assuming for now there isn't, or maybe it can be used in conjunction with GroupIntoBatches?

On Mon, Apr 15, 2024 at 9:29 AM Ruben Vargas wrote:
>
> Interesting.
>
> I think the cache feature could be interesting for some use cases I have.
>
> On Mon, Apr 15, 2024 at 9:18 AM XQ Hu wrote:
> >
> > For the new Web API IO, the page lists these features:
> >
> > - developers provide minimal code that invokes the Web API endpoint
> > - delegate to the transform to handle request retries and exponential backoff
> > - optional caching of request and response associations
> > - optional metrics
Re: Any recommendation for key for GroupIntoBatches
Interesting.

I think the cache feature could be interesting for some use cases I have.

On Mon, Apr 15, 2024 at 9:18 AM XQ Hu wrote:
>
> For the new Web API IO, the page lists these features:
>
> - developers provide minimal code that invokes the Web API endpoint
> - delegate to the transform to handle request retries and exponential backoff
> - optional caching of request and response associations
> - optional metrics
Re: Any recommendation for key for GroupIntoBatches
For the new Web API IO, the page lists these features:

- developers provide minimal code that invokes the Web API endpoint
- delegate to the transform to handle request retries and exponential backoff
- optional caching of request and response associations
- optional metrics

On Mon, Apr 15, 2024 at 10:38 AM Ruben Vargas wrote:
> That one looks interesting.
>
> What is not clear to me is: what are the advantages of using it? Is it
> only the error/retry handling? Anything in terms of performance?
>
> My PCollection is unbounded, but I was thinking of sending my messages
> in batches to the external API in order to gain some performance
> (I don't expect to send one HTTP request per message).
>
> Thank you very much for all your responses!
Re: Any recommendation for key for GroupIntoBatches
That one looks interesting. What is not clear to me is: what are the advantages of using it? Is it only the error/retry handling, or is there anything in terms of performance?

My PCollection is unbounded, but I was thinking of sending my messages in batches to the external API in order to gain some performance (I don't expect to send one HTTP request per message).

Thank you very much for all your responses!

On Sun, Apr 14, 2024 at 8:28 AM XQ Hu via user wrote:
>
> To enrich your data, have you checked
> https://cloud.google.com/dataflow/docs/guides/enrichment?
>
> This transform is built on top of
> https://beam.apache.org/documentation/io/built-in/webapis/
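To quantify the performance point Ruben raises: batching trades one HTTP call per message for roughly one call per 100 messages. Below is a stdlib-only sketch of the accumulate-and-flush logic that BatchElements (or GroupIntoBatches) performs for you; `call_endpoint` and the batch size are hypothetical stand-ins for the real enrichment API, not Beam code.

```python
# Stdlib-only illustration of the batching idea: accumulate message IDs
# and flush every `batch_size`, so one request carries 100 IDs instead
# of one. `call_endpoint` is a hypothetical stand-in for the real API.

def call_endpoint(ids):
    # Pretend the endpoint returns enrichment info keyed by ID.
    return {i: f"info-{i}" for i in ids}

def enrich_in_batches(message_ids, batch_size=100):
    enriched = {}
    buffer = []
    requests_made = 0
    for mid in message_ids:
        buffer.append(mid)
        if len(buffer) == batch_size:
            enriched.update(call_endpoint(buffer))
            requests_made += 1
            buffer = []
    if buffer:  # flush the remainder, as finish_bundle would
        enriched.update(call_endpoint(buffer))
        requests_made += 1
    return enriched, requests_made

results, n_requests = enrich_in_batches(range(250), batch_size=100)
```

Here 250 IDs produce 3 requests instead of 250. In a real unbounded pipeline you would also want a time-based flush, so a slow stream does not hold a partial batch indefinitely.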
Re: Any recommendation for key for GroupIntoBatches
To enrich your data, have you checked
https://cloud.google.com/dataflow/docs/guides/enrichment?

This transform is built on top of
https://beam.apache.org/documentation/io/built-in/webapis/

On Fri, Apr 12, 2024 at 4:38 PM Ruben Vargas wrote:
>
> On Fri, Apr 12, 2024 at 2:17 PM Jaehyeon Kim wrote:
> >
> > Here is an example from a book that I'm reading now and it may be applicable.
> >
> > JAVA - (id.hashCode() & Integer.MAX_VALUE) % 100
> > PYTHON - ord(id[0]) % 100
>
> Maybe this is what I'm looking for. I'll give it a try. Thanks!
Re: Any recommendation for key for GroupIntoBatches
On Fri, Apr 12, 2024 at 2:17 PM Jaehyeon Kim wrote:
>
> Here is an example from a book that I'm reading now and it may be applicable.
>
> JAVA - (id.hashCode() & Integer.MAX_VALUE) % 100
> PYTHON - ord(id[0]) % 100

Maybe this is what I'm looking for. I'll give it a try. Thanks!

> On Sat, 13 Apr 2024 at 06:12, George Dekermenjian wrote:
>>
>> How about just keeping track of a buffer and flush the buffer after 100
>> messages and if there is a buffer on finish_bundle as well?

If this is in memory, it could lead to potential loss of data. That is why state is used, or at least that is my understanding. But maybe there is a way to do this with state?
Re: Any recommendation for key for GroupIntoBatches
Here is an example from a book that I'm reading now and it may be applicable.

JAVA - (id.hashCode() & Integer.MAX_VALUE) % 100
PYTHON - ord(id[0]) % 100

On Sat, 13 Apr 2024 at 06:12, George Dekermenjian wrote:
> How about just keeping track of a buffer and flush the buffer after 100
> messages and if there is a buffer on finish_bundle as well?
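Both one-liners map an arbitrary ID onto one of 100 bounded keys, which is exactly what GroupIntoBatches needs. A quick plain-Python check of the idea, assuming string IDs; `zlib.crc32` stands in here for Java's `hashCode()`, since Python's built-in `hash()` of strings is randomized per process:

```python
import zlib

def key_java_style(id_str, num_keys=100):
    # Analogue of (id.hashCode() & Integer.MAX_VALUE) % 100; crc32 is a
    # deterministic stand-in for hashCode(). The & 0x7FFFFFFF mask keeps
    # the value non-negative before taking the modulus.
    return (zlib.crc32(id_str.encode()) & 0x7FFFFFFF) % num_keys

def key_first_char(id_str, num_keys=100):
    # ord(id[0]) % 100 -- cheap, but only as balanced as the
    # distribution of first characters in your IDs.
    return ord(id_str[0]) % num_keys

ids = [f"msg-{n}" for n in ids_range] if False else [f"msg-{n}" for n in range(1000)]
keys = {key_java_style(i) for i in ids}
```

Either way the result is a small, bounded key space, so batches can mix different IDs while still allowing up to 100-way parallelism downstream.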
Re: Any recommendation for key for GroupIntoBatches
How about just keeping track of a buffer and flushing the buffer after 100
messages, with a flush of any remaining buffer in finish_bundle as well?

On Fri, Apr 12, 2024 at 21:23 Ruben Vargas wrote:

> Hello guys
>
> Maybe this question was already answered, but I cannot find it and
> want some more input on this topic.
>
> I have some messages that don't have any particular key candidate,
> except the ID, but I don't want to use it because the idea is to
> group multiple IDs in the same batch.
>
> This is my use case:
>
> I have an endpoint where I'm gonna send the message ID, and this endpoint
> is gonna return certain information which I will use to enrich my
> message. In order to avoid hitting the endpoint per message, I want to
> batch messages in groups of 100 and send the 100 IDs in one request (the
> endpoint supports it). I was thinking of using GroupIntoBatches.
>
> - If I choose the ID as the key, my understanding is that it won't
> work the way I want (because it will form batches of the same ID).
> - Using a constant will be a problem for parallelism, is that correct?
>
> Then my question is, what should I use as a key? Maybe something
> regarding the timestamp, so I can have groups of messages that arrive
> within a certain second?
>
> Any suggestions would be appreciated.
>
> Thanks.
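For reference, the smear step in Reuven's `AssignShardFn` earlier in the thread can be rendered in plain Python. This is only a sketch of the hashing arithmetic, not Beam code; the explicit `& 0xFFFFFFFF` masking emulates Java's 32-bit integer overflow, which Python ints do not have.

```python
# Plain-Python rendering of the smear from AssignShardFn:
#   hash = 0x1b873593 * rotateLeft(shard * 0xcc9e2d51, 15)
# taken as an unsigned 32-bit value, then reduced mod numBuckets.

MASK32 = 0xFFFFFFFF

def rotl32(x, n):
    # 32-bit left rotation, matching Java's Integer.rotateLeft.
    x &= MASK32
    return ((x << n) | (x >> (32 - n))) & MASK32

def smear_shard(shard, num_buckets):
    # Masking after each multiply keeps the arithmetic in 32 bits,
    # as Java's int overflow would.
    h = (0x1B873593 * rotl32((shard * 0xCC9E2D51) & MASK32, 15)) & MASK32
    # Python's % on a non-negative value matches Java's unsigned mod here.
    return h % num_buckets

buckets = [smear_shard(s, 10) for s in range(1000)]
```

The point of the smear is that consecutive shard counters come out looking random, so runners that hash keys naively (such as Spark taking `hashCode()` of an Integer, which is the identity) still get an even spread across buckets.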