Re: Any recommendation for key for GroupIntoBatches

2024-04-15 Thread Reuven Lax via user
There are various strategies. Here is an example of how Beam does it (taken
from Reshuffle.viaRandomKey().withNumBuckets(N)).

Note that this does some extra hashing to work around issues with the Spark
runner. If you don't care about that, you could implement something simpler
(e.g. initialize shard to a random number in StartBundle, and increment it
mod numBuckets in each processElement call).

public static class AssignShardFn<T> extends DoFn<T, KV<Integer, T>> {
  private int shard;
  private @Nullable Integer numBuckets;

  public AssignShardFn(@Nullable Integer numBuckets) {
    this.numBuckets = numBuckets;
  }

  @Setup
  public void setup() {
    shard = ThreadLocalRandom.current().nextInt();
  }

  @ProcessElement
  public void processElement(@Element T element,
      OutputReceiver<KV<Integer, T>> r) {
    ++shard;
    // Smear the shard into something more random-looking, to avoid issues
    // with runners that don't properly hash the key being shuffled, but rely
    // on it being random-looking. E.g. Spark takes the Java hashCode() of keys,
    // which for Integer is a no-op and it is an issue:
    // http://hydronitrogen.com/poor-hash-partitioning-of-timestamps-integers-and-longs-in-spark.html
    // This hashing strategy is copied from
    // org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Hashing.smear().
    int hashOfShard = 0x1b873593 * Integer.rotateLeft(shard * 0xcc9e2d51, 15);
    if (numBuckets != null) {
      UnsignedInteger unsignedNumBuckets =
          UnsignedInteger.fromIntBits(numBuckets);
      hashOfShard =
          UnsignedInteger.fromIntBits(hashOfShard).mod(unsignedNumBuckets).intValue();
    }
    r.output(KV.of(hashOfShard, element));
  }
}
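
As a rough illustration of the "something simpler" option mentioned above, here is a plain-Java sketch with no Beam dependency. The class and method names (ShardKeySketch, startBundle, nextKey, smearedKey) are made up for this example; in a real DoFn the first two would live in the @StartBundle and @ProcessElement methods. It also shows the smear-then-mod step using only JDK calls, assuming numBuckets is positive.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Plain-Java sketch of the two key-assignment strategies (hypothetical names, no Beam). */
final class ShardKeySketch {
  private final int numBuckets;
  private int shard;

  ShardKeySketch(int numBuckets) {
    if (numBuckets <= 0) {
      throw new IllegalArgumentException("numBuckets must be positive");
    }
    this.numBuckets = numBuckets;
  }

  /** Would be called from @StartBundle: start each bundle at a random bucket. */
  void startBundle() {
    shard = ThreadLocalRandom.current().nextInt(numBuckets);
  }

  /** Would be called once per element: round-robin over [0, numBuckets). */
  int nextKey() {
    shard = (shard + 1) % numBuckets;
    return shard;
  }

  /** The smear-then-mod step from AssignShardFn, written with JDK calls only. */
  static int smearedKey(int shard, int numBuckets) {
    int hashOfShard = 0x1b873593 * Integer.rotateLeft(shard * 0xcc9e2d51, 15);
    // Treat the hash as unsigned before taking the remainder, so the key is never negative.
    return Integer.remainderUnsigned(hashOfShard, numBuckets);
  }
}
```

With keys assigned this way, each of the numBuckets keys receives a roughly equal share of the elements, and a following GroupIntoBatches.ofSize(100) would then form batches of mixed IDs per key. Picking numBuckets is a trade-off: more buckets means more parallelism but also more partially filled batches.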



On Mon, Apr 15, 2024 at 10:01 AM Damon Douglas 
wrote:

> Good day, Ruben,
>
> Would you be able to compute a shasum on the group of IDs to use as the
> key?
>
> Best,
>
> Damon
>
> On 2024/04/12 19:22:45 Ruben Vargas wrote:
> > Hello guys
> >
> > Maybe this question was already answered, but I cannot find it and
> > want some more input on this topic.
> >
> > I have some messages that don't have any particular key candidate,
> > except the ID, but I don't want to use it because the idea is to
> > group multiple IDs in the same batch.
> >
> > This is my use case:
> >
> > I have an endpoint where I'm gonna send the message ID; this endpoint
> > is gonna return certain information which I will use to enrich my
> > message. In order to avoid calling the endpoint per message I want to
> > batch them in groups of 100 and send the 100 IDs in one request (the
> > endpoint supports it). I was thinking of using GroupIntoBatches.
> >
> > - If I choose the ID as the key, my understanding is that it won't
> > work the way I want (because it will form batches of the same ID).
> > - Using a constant will be a problem for parallelism, is that correct?
> >
> > Then my question is, what should I use as a key? Maybe something
> > based on the timestamp, so I can have groups of messages that arrive
> > within a certain second?
> >
> > Any suggestions would be appreciated
> >
> > Thanks.
> >
>


Re: How to handle Inheritance with AutoValueSchema

2024-04-09 Thread Reuven Lax via user
I don't see any unit tests for inherited AutoValue accessors, so I suspect
it simply does not work today with AutoValueSchema. This is something
that's probably fixable (though such a fix does risk breaking some users).

On Mon, Apr 8, 2024 at 11:21 PM Ruben Vargas 
wrote:

> Hello Guys
>
> I have a PCollection with a "Session" object, inside that object I
> have a list of events
>
> For each event, I have different types, EventA, EventB, EventC and so
> on.. all of them extend from Event, which will contain common fields.
>
> According to the AutoValue documentation, inheritance from another
> AutoValue class is not supported, but extending a supertype to inherit its fields is.
> (
> https://github.com/google/auto/blob/main/value/userguide/howto.md#-have-autovalue-also-implement-abstract-methods-from-my-supertypes
> )
>
> But when I run my pipeline, it fails with an NPE.
>
> Caused by: java.lang.NullPointerException
>   at org.apache.beam.sdk.schemas.utils.JavaBeanUtils.createGetter(JavaBeanUtils.java:153) ~[beam-sdks-java-core-2.55.0.jar:?]
>   at org.apache.beam.sdk.schemas.utils.JavaBeanUtils.lambda$getGetters$1(JavaBeanUtils.java:143) ~[beam-sdks-java-core-2.55.0.jar:?]
>   at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) ~[?:?]
>   at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655) ~[?:?]
>   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) ~[?:?]
>   at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) ~[?:?]
>   at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913) ~[?:?]
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?]
>   at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) ~[?:?]
>   at org.apache.beam.sdk.schemas.utils.JavaBeanUtils.lambda$getGetters$2(JavaBeanUtils.java:144) ~[beam-sdks-java-core-2.55.0.jar:?]
>   at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705) ~[?:?]
>   at org.apache.beam.sdk.schemas.utils.JavaBeanUtils.getGetters(JavaBeanUtils.java:138) ~[beam-sdks-java-core-2.55.0.jar:?]
>   at org.apache.beam.sdk.schemas.AutoValueSchema.fieldValueGetters(AutoValueSchema.java:93) ~[beam-sdks-java-core-2.55.0.jar:?]
>   at org.apache.beam.sdk.schemas.GetterBasedSchemaProvider$RowValueGettersFactory.create(GetterBasedSchemaProvider.java:145) ~[beam-sdks-java-core-2.55.0.jar:?]
>   at org.apache.beam.sdk.schemas.GetterBasedSchemaProvider$RowValueGettersFactory.create(GetterBasedSchemaProvider.java:130) ~[beam-sdks-java-core-2.55.0.jar:?]
>   at org.apache.beam.sdk.schemas.CachingFactory.create(CachingFactory.java:56) ~[beam-sdks-java-core-2.55.0.jar:?]
>
>
> Is this not supported, or is it a bug? Should I file an issue on GitHub then?
>
> Thanks
>


Re: KV with AutoValueSchema

2024-04-04 Thread Reuven Lax via user
There are ways you can manually force the coder here. However, I would first
try to split the KV creation into two operations. Have ProcessEvents
just create a PCollection<SharedCoreEvent>, and then add a following operation
to create the KV. Something like this:

input.apply(ParDo.of(new ProcessEvents()))
    .apply(WithKeys.of((SerializableFunction<SharedCoreEvent, Long>)
        ExtractKeyFunction).withKeyType(TypeDescriptors.longs()));

I suspect that this will allow the mechanism to better infer the final
Coder. If that doesn't work, you could always brute-force it like this:

PCollection<SharedCoreEvent> coreEvents =
    input.apply(ParDo.of(new ProcessEvents()));
coreEvents.apply(WithKeys.of((SerializableFunction<SharedCoreEvent, Long>)
        ExtractKeyFunction).withKeyType(TypeDescriptors.longs()))
    .setCoder(KvCoder.of(VarLongCoder.of(), coreEvents.getCoder()))
    .apply(Reshuffle.of())
    ... etc.


On Thu, Apr 4, 2024 at 8:19 PM Ruben Vargas  wrote:

> ProcessEvents receives a Session object as input and creates a KV<Long, SharedCoreEvent> as output
>
> On Thu, Apr 4, 2024 at 8:52 PM, Reuven Lax via user <
> user@beam.apache.org> wrote:
>
>> There are some sharp edges unfortunately around auto-inference of KV
>> coders and schemas. Is there a previous PCollection of type
>> SharedCoreEvent, or is the SharedCoreEvent created in ProcessEvents?
>>
>> On Thu, Apr 4, 2024 at 2:12 PM Ruben Vargas 
>> wrote:
>>
>>> Hello guys
>>>
>>> I have a question, is it possible to use KV along with AutoValueSchema
>>> objects? I'm having troubles when I try to use it together.
>>>
>>> I have an object like the following
>>>
>>> @AutoValue
>>> @DefaultSchema(AutoValueSchema.class)
>>> public abstract class SharedCoreEvent {
>>>
>>> @JsonProperty("subscriptionId")
>>> public abstract String getSubscription();
>>>
>>> 
>>> }
>>>
>>> Then I have a pipeline like the following:
>>>
>>>  input.apply(ParDo.of(new ProcessEvents()))
>>> .apply(Reshuffle.of()).apply(Values.create()).apply(output);
>>>
>>> My input is a single object and my ProcessEvents will produce tons of
>>> events, in a fan-out fashion. that is why I used Reshuffle here
>>>
>>> But when I run this pipeline it throws the following error:
>>>
>>> lang.IllegalStateException: Unable to return a default Coder for
>>> MCEvents/ParDo(ProcessEvents)/ParMultiDo(ProcessEvents).output
>>> [PCollection@2131266396]. Correct one of the following root causes:
>>>   No Coder has been manually specified;  you may do so using .setCoder().
>>>
>>>   Inferring a Coder from the CoderRegistry failed: Cannot provide
>>> coder for parameterized type
>>> org.apache.beam.sdk.values.KV:
>>> Unable to provide a Coder for events.SharedCoreEvent
>>>   Building a Coder using a registered CoderProvider failed.
>>>
>>>
>>> Something similar happens with my source when I use KafkaIO and the
>>> source produces a KV  PCollection.
>>>
>>> Is there any reason for this? Maybe I'm misusing the schemas?
>>>
>>> Really appreciate your help
>>>
>>> Thanks
>>> Ruben
>>>
>>


Re: KV with AutoValueSchema

2024-04-04 Thread Reuven Lax via user
There are some sharp edges unfortunately around auto-inference of KV coders
and schemas. Is there a previous PCollection of type SharedCoreEvent, or is
the SharedCoreEvent created in ProcessEvents?

On Thu, Apr 4, 2024 at 2:12 PM Ruben Vargas  wrote:

> Hello guys
>
> I have a question, is it possible to use KV along with AutoValueSchema
> objects? I'm having troubles when I try to use it together.
>
> I have an object like the following
>
> @AutoValue
> @DefaultSchema(AutoValueSchema.class)
> public abstract class SharedCoreEvent {
>
> @JsonProperty("subscriptionId")
> public abstract String getSubscription();
>
> 
> }
>
> Then I have a pipeline like the following:
>
>  input.apply(ParDo.of(new ProcessEvents()))
> .apply(Reshuffle.of()).apply(Values.create()).apply(output);
>
> My input is a single object and my ProcessEvents will produce tons of
> events, in a fan-out fashion. that is why I used Reshuffle here
>
> But when I run this pipeline it throws the following error:
>
> lang.IllegalStateException: Unable to return a default Coder for
> MCEvents/ParDo(ProcessEvents)/ParMultiDo(ProcessEvents).output
> [PCollection@2131266396]. Correct one of the following root causes:
>   No Coder has been manually specified;  you may do so using .setCoder().
>
>   Inferring a Coder from the CoderRegistry failed: Cannot provide
> coder for parameterized type
> org.apache.beam.sdk.values.KV:
> Unable to provide a Coder for events.SharedCoreEvent
>   Building a Coder using a registered CoderProvider failed.
>
>
> Something similar happens with my source when I use KafkaIO and the
> source produces a KV  PCollection.
>
> Is there any reason for this? Maybe I'm misusing the schemas?
>
> Really appreciate your help
>
> Thanks
> Ruben
>


Re: Some events are discarded from a FixedWindow

2024-02-21 Thread Reuven Lax via user
On Wed, Feb 21, 2024 at 12:39 PM Ifat Afek (Nokia) via user <
user@beam.apache.org> wrote:

> Hi,
>
>
>
> We have a Beam-SQL pipeline on top of Flink that, once every 5 minutes, gets a
> bunch of events from Kafka and should execute an SQL command on a 1-hour
> window. Some of the events arrive late.
>
> I’m using KafkaIO.withTimestampPolicyFactory() to set one of the object’s
> fields as the timestamp.
>
> For the aggregation, it’s important that the window triggers *exactly
> once* with all the events, with allowed lateness of 3 minutes. I defined
> the window as:
>
>
>
> final PCollection<Row> windowSelectFields = selectFields
>     .apply("Windowing", Window
>         .<Row>into(FixedWindows.of(Duration.standardHours(1)))
>         .triggering(Repeatedly.forever(
>             AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(3))))
>         .withAllowedLateness(Duration.standardMinutes(3))
>         .accumulatingFiredPanes()
>     );
>
>
>
> When tested on a smaller window with a small number of events, I see that
> the first 3 out of 10 events are being discarded. From the log, it looks
> like the trigger is executed *1 second ahead of time*. I suspect that as
> a result, its shouldFire() method returns false, since 3 minutes have not
> passed yet.
>

Processing-time triggers are based on the local clock on a worker, and
clocks can skew between workers (they can even skew between different
processes on the same worker).


>
>
> 2024-02-21 16:27:08,079 DEBUG
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator
> [] - Setting timer: 1:1708533008079 at 1708533008079 with output time
> 1708533008079.  (that is *4:30:08.079 PM*)
>
>
>
> And later on:
>
>
>
> 2024-02-21 *16:30:07,944* DEBUG
> org.apache.beam.sdk.util.WindowTracing   [] -
> ReduceFnRunner: Received timer key:Row:
>
> call_direction:-1729318488
>
> ; window:[2024-02-21T16:24:00.000Z..2024-02-21T16:26:00.000Z);
> data:TimerData{timerId=1:1708533008079, timerFamilyId=,
> namespace=Window([2024-02-21T16:24:00.000Z..2024-02-21T16:26:00.000Z)),
> timestamp=2024-02-*21T16:30:08.079Z*,
> outputTimestamp=2024-02-21T16:30:08.079Z, domain=PROCESSING_TIME,
> deleted=false} with inputWatermark:2024-02-21T16:18:04.000Z;
> outputWatermark:2024-02-21T16:18:04.000Z
>
>
>
> Is my understanding correct?
>
> Did I define the window and timestamps correctly?
>
> Any help would be appreciated.
>
>
>
> Thanks,
>
> Ifat
>
>
>
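
To put a number on the skew discussed above, the timer value in the quoted log can be converted back to a wall-clock time and compared with the time of the "Received timer" log line. This is only a sketch: it assumes the log timestamps are in UTC (which matches the "4:30:08.079 PM" note in the question), and the class and method names are made up for the example.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper for comparing a timer's scheduled time with its delivery time. */
final class TimerSkewSketch {
  /** Milliseconds between delivery and the scheduled firing time (positive means early). */
  static long millisEarly(long timerEpochMillis, Instant deliveredAt) {
    Instant scheduled = Instant.ofEpochMilli(timerEpochMillis);
    return Duration.between(deliveredAt, scheduled).toMillis();
  }
}
```

Calling millisEarly(1708533008079L, Instant.parse("2024-02-21T16:30:07.944Z")) with the values from the log gives 135, i.e. under these assumptions the timer was delivered roughly 135 ms before its scheduled time rather than a full second, which is the kind of gap that clock differences between processes can produce.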


Re: usage of dynamic schema in BEAM SQL

2024-01-28 Thread Reuven Lax via user
If you want to define the SQL query via configuration, does that mean you
know the query when you launch the pipeline (as configuration is set at
launch time)? If so, you can also dynamically set the schema at pipeline
launch time.

On Sun, Jan 28, 2024 at 11:21 AM Sigalit Eliazov 
wrote:

> I'm trying to define a generic pipeline that reads messages from Kafka where
> the topic is configurable.
> We read a generic record, and the schema ID is part of the consumed message,
> so this part can be generalized.
>
> Then we would like to run different queries on the stream.
> I would like to be able to define the SQL query via configuration.
> In addition, in our use case the Kafka message schema and the row schema
> are pretty much the same, so I wonder if I could reuse it.
>
> Thanks
> Sigalit
>
> On Sun, Jan 28, 2024 at 20:23, Reuven Lax via user <
> user@beam.apache.org> wrote:
>
>> Can you explain the use case a bit more? In order to write a SQL
>> statement (at least one that doesn't use wildcard selection) you also need
>> to know the schema ahead of time. What are you trying to accomplish with
>> these dynamic schemas?
>>
>> Reuven
>>
>> On Sun, Jan 28, 2024 at 2:30 AM Sigalit Eliazov 
>> wrote:
>>
>>> Hello, In the upcoming process, we extract Avro messages from Kafka 
>>> utilizing the Confluent Schema Registry.
>>>
>>> Our intention is to implement SQL queries on the streaming data.
>>>
>>>
>>> As far as I understand, since I am using the Flink runner, when creating  
>>> the features PCollection, I must specify the
>>>
>>> row schema or a coder.
>>>
>>>
>>> I am interested in utilizing the schema obtained from the recently read 
>>> message (refer to ConvertRow).
>>>
>>> Is it possible to accomplish this when executing on the Flink runner?
>>>
>>> I noticed that the Flink runner anticipates the row schema to be 
>>> predetermined during pipeline deployment.
>>>
>>>
>>> Are there any potential solutions or workarounds for this situation?
>>>
>>>
>>> public class BeamSqlTest {
>>>
>>>
>>> public static void main(String[] args) {
>>>
>>> Pipeline pipeline;
>>> PCollection> 
>>> readAvroMessageFromKafka = pipeline.apply("readAvroMessageFromKafka", 
>>> KafkaTransform.readAvroMessageFromKafkaWithSchemaRegistry(pipelineUtil.getBootstrapServers(),
>>>  options.getSourceKafkaTopic(), PIPELINE_NAME));
>>> PCollection> avroMessages = 
>>> readAvroMessageFromKafka.apply("convertFromKafkaRecord", ParDo.of(new 
>>> ConvertFromKafkaRecord<>()));
>>>
>>> PCollection<Row> features =
>>>     avroMessages.apply(ParDo.of(new ConvertToRow())).setRowSchema(XXX);
>>> final PCollection<Row> select_fields =
>>>     features.apply("Select Fields", Select.fieldNames("X", "Y", "Z"));
>>>
>>> final PCollection<Row> windowRes = select_fields.apply("Windowing",
>>>     Window.into(FixedWindows.of(Duration.standardMinutes(1))));
>>> PCollection<Row> outputStream =
>>>     windowRes.apply(SqlTransform.query("select X, Y, Z from PCOLLECTION"));
>>> pipeline.run().waitUntilFinish();
>>> }
>>>
>>> @AllArgsConstructor
>>> public static class ConvertToRow extends DoFn>> GenericRecord>, Row> {
>>> @ProcessElement
>>> @SuppressWarnings({"ConstantConditions", "unused"})
>>> public void processElement(ProcessContext c) {
>>> GenericRecord record = c.element().getValue();
>>> final org.apache.avro.Schema avroSchema = record.getSchema();
>>> Schema schema = AvroUtils.toBeamSchema(avroSchema);
>>>
>>> Object x = record.get("X");
>>> Object y = record.get("Y");
>>> Object z = record.get("Z");
>>> Row row = Row.withSchema(schema).addValues(x, y, z).build();
>>> c.output(row);
>>> }
>>> }
>>> }
>>>
>>>
>>> Thanks
>>>
>>> Sigalit
>>>
>>>


Re: usage of dynamic schema in BEAM SQL

2024-01-28 Thread Reuven Lax via user
Can you explain the use case a bit more? In order to write a SQL statement
(at least one that doesn't use wildcard selection) you also need to know
the schema ahead of time. What are you trying to accomplish with these
dynamic schemas?

Reuven

On Sun, Jan 28, 2024 at 2:30 AM Sigalit Eliazov  wrote:

> Hello, In the upcoming process, we extract Avro messages from Kafka utilizing 
> the Confluent Schema Registry.
>
> Our intention is to implement SQL queries on the streaming data.
>
>
> As far as I understand, since I am using the Flink runner, when creating  the 
> features PCollection, I must specify the
>
> row schema or a coder.
>
>
> I am interested in utilizing the schema obtained from the recently read 
> message (refer to ConvertRow).
>
> Is it possible to accomplish this when executing on the Flink runner?
>
> I noticed that the Flink runner anticipates the row schema to be 
> predetermined during pipeline deployment.
>
>
> Are there any potential solutions or workarounds for this situation?
>
>
> public class BeamSqlTest {
>
>
> public static void main(String[] args) {
>
> Pipeline pipeline;
> PCollection> 
> readAvroMessageFromKafka = pipeline.apply("readAvroMessageFromKafka", 
> KafkaTransform.readAvroMessageFromKafkaWithSchemaRegistry(pipelineUtil.getBootstrapServers(),
>  options.getSourceKafkaTopic(), PIPELINE_NAME));
> PCollection> avroMessages = 
> readAvroMessageFromKafka.apply("convertFromKafkaRecord", ParDo.of(new 
> ConvertFromKafkaRecord<>()));
>
> PCollection<Row> features =
>     avroMessages.apply(ParDo.of(new ConvertToRow())).setRowSchema(XXX);
> final PCollection<Row> select_fields =
>     features.apply("Select Fields", Select.fieldNames("X", "Y", "Z"));
>
> final PCollection<Row> windowRes = select_fields.apply("Windowing",
>     Window.into(FixedWindows.of(Duration.standardMinutes(1))));
> PCollection<Row> outputStream =
>     windowRes.apply(SqlTransform.query("select X, Y, Z from PCOLLECTION"));
> pipeline.run().waitUntilFinish();
> }
>
> @AllArgsConstructor
> public static class ConvertToRow extends DoFn, 
> Row> {
> @ProcessElement
> @SuppressWarnings({"ConstantConditions", "unused"})
> public void processElement(ProcessContext c) {
> GenericRecord record = c.element().getValue();
> final org.apache.avro.Schema avroSchema = record.getSchema();
> Schema schema = AvroUtils.toBeamSchema(avroSchema);
>
> Object x = record.get("X");
> Object y = record.get("Y");
> Object z = record.get("Z");
> Row row = Row.withSchema(schema).addValues(x, y, z).build();
> c.output(row);
> }
> }
> }
>
>
> Thanks
>
> Sigalit
>
>


Re: Using Dataflow with Pubsub input connector in batch mode

2024-01-21 Thread Reuven Lax via user
Cloud Storage subscriptions are a reasonable way to back up data to storage,
and you can then run a batch pipeline over the GCS files. Keep in mind that
these files might contain duplicates (the storage subscriptions do not
guarantee exactly-once writes). If this is a problem, you should add a
deduplication stage to the batch job that processes these files.
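
A minimal sketch of what such a deduplication step does, in plain Java with no Beam dependency. It assumes each exported record carries a unique message ID that can be extracted by a function; the class and method names are hypothetical. In an actual Beam batch job the same idea would more likely be expressed with the Distinct transform and a representative-value function that returns the ID.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical helper showing ID-based deduplication of exported records. */
final class DedupSketch {
  /** Keeps the first record seen for each ID, preserving input order. */
  static <T, K> List<T> dedupeById(List<T> records, Function<T, K> idFn) {
    Map<K, T> firstById = new LinkedHashMap<>();
    for (T record : records) {
      // putIfAbsent ignores later records that repeat an ID already seen.
      firstById.putIfAbsent(idFn.apply(record), record);
    }
    return new ArrayList<>(firstById.values());
  }
}
```

Deduplicating on the message ID rather than on the whole record avoids dropping two legitimately distinct messages that happen to have identical payloads.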

On Sun, Jan 21, 2024 at 2:45 AM Alex Van Boxel  wrote:

> There are some valid use cases where you want to process data that goes over
> Pubsub in batch. It's way too expensive to run a simple daily
> extract from the data over Pubsub; batch is a lot cheaper.
>
> What we do is back up the data to Cloud Storage; Pubsub has recently added
> a nice feature that can help you:
>
>- https://cloud.google.com/pubsub/docs/cloudstorage
>- https://cloud.google.com/pubsub/docs/create-cloudstorage-subscription#subscription_properties
>
> This reduced our cost dramatically. We had a Dataflow doing the backup to
> Cloud Storage, but the above feature is way cheaper. Use the export to Avro
> (the schema is in the second link), and then your batch beam pipeline input
> is a bounded input.
>
>  _/
> _/ Alex Van Boxel
>
>
> On Fri, Jan 19, 2024 at 12:18 AM Reuven Lax via user 
> wrote:
>
>> Some comments here:
>>1. "All messages in a Pub/Sub topic" is not a well-defined notion, as
>> there can always be more messages published. You may know that nobody will
>> publish any more messages, but the pipeline does not.
>>2. While it's possible to read from Pub/Sub in batch, it's usually not
>> recommended. For one thing I don't think that the batch runner can maintain
>> exactly-once processing when reading from Pub/Sub.
>>3. In Java you can turn an unbounded source (Pub/Sub) into a bounded
>> source that can in theory be used for batch jobs. However this is done by
>> specifying either the max time to read or the max number of messages. I
>> don't think there's any way to automatically read the Pub/Sub topic until
>> there are no more messages in it.
>>
>> Reuven
>>
>> On Thu, Jan 18, 2024 at 2:25 AM Sumit Desai via user <
>> user@beam.apache.org> wrote:
>>
>>> Hi all,
>>>
>>> I want to create a Dataflow pipeline using Pub/sub as an input connector
>>> but I want to run it in batch mode and not streaming mode. I know it's not
>>> possible in Python but how can I achieve this in Java? Basically, I want my
>>> pipeline to read all messages in a Pubsub topic, process and terminate.
>>> Please suggest.
>>>
>>> Thanks & Regards,
>>> Sumit Desai
>>>
>>


Re: Using Dataflow with Pubsub input connector in batch mode

2024-01-18 Thread Reuven Lax via user
Some comments here:
   1. "All messages in a Pub/Sub topic" is not a well-defined notion, as
there can always be more messages published. You may know that nobody will
publish any more messages, but the pipeline does not.
   2. While it's possible to read from Pub/Sub in batch, it's usually not
recommended. For one thing I don't think that the batch runner can maintain
exactly-once processing when reading from Pub/Sub.
   3. In Java you can turn an unbounded source (Pub/Sub) into a bounded
source that can in theory be used for batch jobs. However this is done by
specifying either the max time to read or the max number of messages. I
don't think there's any way to automatically read the Pub/Sub topic until
there are no more messages in it.

Reuven

On Thu, Jan 18, 2024 at 2:25 AM Sumit Desai via user 
wrote:

> Hi all,
>
> I want to create a Dataflow pipeline using Pub/sub as an input connector
> but I want to run it in batch mode and not streaming mode. I know it's not
> possible in Python but how can I achieve this in Java? Basically, I want my
> pipeline to read all messages in a Pubsub topic, process and terminate.
> Please suggest.
>
> Thanks & Regards,
> Sumit Desai
>


Re: [Dataflow][Java][2.52.0] Upgrading to 2.52.0 Surfaces Pubsub Coder Error

2023-12-12 Thread Reuven Lax via user
Are you setting the enable_custom_pubsub_source experiment by any chance?

On Tue, Dec 12, 2023 at 3:24 PM Evan Galpin  wrote:

> Hi all,
>
> When attempting to upgrade a running Dataflow pipeline from SDK 2.51.0 to
> 2.52.0, an incompatibility warning is surfaced that prevents pipeline
> upgrade:
>
>
>> The Coder or type for step .../PubsubUnboundedSource has changed
>
>
> Was there an intentional coder change introduced for PubsubMessage in
> 2.52.0?  I didn't note anything in the release notes
> https://beam.apache.org/blog/beam-2.52.0/ nor recent changes in
> PubsubMessageWithAttributesCoder[1].  Specifically the step uses
> `PubsubMessageWithAttributesCoder` via
> `PubsubIO.readMessagesWithAttributes()`
>
> Thanks!
>
>
> [1]
> https://github.com/apache/beam/blob/90e79ae373ab38cf4e48e9854c28aaffb0938458/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java#L36
>


Re: Questions about writing to BigQuery using storage api

2023-12-07 Thread Reuven Lax via user
This is the stack trace of the rethrown exception. The log should also
contain a "caused by" log somewhere detailing the original exception. Do
you happen to have that?

On Thu, Dec 7, 2023 at 8:46 AM hsy...@gmail.com  wrote:

> Here is the complete stack trace. It doesn't even hit my code and it
> happens consistently!
>
> Error message from worker: java.lang.RuntimeException:
> java.lang.IllegalStateException
> org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.lambda$process$12(StorageApiWritesShardedRecords.java:573)
> org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.process(StorageApiWritesShardedRecords.java:790)
> Caused by: java.lang.IllegalStateException
> org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:496)
> org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$1.pin(BigQueryServicesImpl.java:1403)
> org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.lambda$process$12(StorageApiWritesShardedRecords.java:565)
> org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.process(StorageApiWritesShardedRecords.java:790)
> org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn$DoFnInvoker.invokeProcessElement(Unknown
> Source)
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211)
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:185)
> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:54)
> org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:275)
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:85)
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:430)
> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:85)
> org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn.flushBatch(GroupIntoBatches.java:660)
> org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn.processElement(GroupIntoBatches.java:518)
> org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn$DoFnInvoker.invokeProcessElement(Unknown
> Source)
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211)
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:185)
> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:54)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:218)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1433)
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$800(StreamingDataflowWorker.java:155)
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$4.run(StreamingDataflowWorker.java:1056)
> org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:163)
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> java.base/java.lang.Thread.run(Thread.java:834)
>
> Regards,
> Siyuan
>
> On Wed, Dec 6, 2023 at 10:12 AM Ahmed Abualsaud 
> wrote:
>
>> Hey, can you provide the full stack trace for the error you're seeing?
>> Also is this happening consistently?
>>
>> *+1* to raising a Google ticket where we'll have more visibility.
>>
>> On Wed, Dec 6, 2023 at 11:33 AM John Casey 
>> wrote:
>>
>>> Hmm. It may be best if you raise a ticket with 

Re: simplest way to do exponential moving average?

2023-10-02 Thread Reuven Lax via user
On Mon, Oct 2, 2023 at 2:00 AM Jan Lukavský  wrote:

> Hi,
>
> this depends on how exactly you plan to calculate the average. The
> original definition is based on exponentially decreasing weight of more
> distant (older if time is on the x-axis) data points. This (technically)
> means that this average at any point X1 depends on all values X0 <= X1.
> This would therefore require buffering (using GroupByKey) all elements in
> global window, doing the sorting manually and then computing the new value
> of the average triggering after each element. This is probably the most
> technically correct, but also the most computationally intensive, variant.
>

To clarify - you would probably buffer the elements in OrderedListState,
and set periodic event-time timers to fetch them and compute the average.
OrderedListState will return the elements in order, so you wouldn't have to
sort. This is assuming you are talking about streaming pipelines.
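
As a sketch of the computation that would run when such a timer fires, here is the EMA recurrence in plain Java with no Beam dependency. The names (EmaSketch, Sample, ema) and the fixed smoothing factor alpha are assumptions made for this example; the explicit sort stands in for the ordering that OrderedListState would already provide.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical EMA helper: ema = alpha * x + (1 - alpha) * previousEma. */
final class EmaSketch {
  /** One timestamped sample of the signal. */
  static final class Sample {
    final long timestampMillis;
    final double value;

    Sample(long timestampMillis, double value) {
      this.timestampMillis = timestampMillis;
      this.value = value;
    }
  }

  /** Returns the running EMA after each sample, processed in event-time order. */
  static List<Double> ema(List<Sample> buffered, double alpha) {
    // Elements may arrive out of order, so order a copy by timestamp first.
    List<Sample> ordered = new ArrayList<>(buffered);
    ordered.sort(Comparator.comparingLong((Sample s) -> s.timestampMillis));

    List<Double> out = new ArrayList<>();
    boolean first = true;
    double current = 0.0;
    for (Sample s : ordered) {
      // The first sample seeds the average; later ones are blended in.
      current = first ? s.value : alpha * s.value + (1 - alpha) * current;
      first = false;
      out.add(current);
    }
    return out;
  }
}
```

For samples 10, 20, 30 (in timestamp order) and alpha = 0.5, this yields 10, 15, 22.5 regardless of arrival order. Note that a fixed alpha treats the samples as evenly spaced; for irregular spacing the weight would need to depend on the time gap between samples.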


> If the average is done over time intervals, then another option could be
> to define a cut-off interval T, i.e. set the exponentially vanishing weight
> of data points to zero at some T0 < T1 - T. If the data points
> come at some discrete time intervals (minutes, hours, days), then this
> could mean you can split the data into sliding time windows (the window
> interval being the cut-off interval, and the slide being the update interval) and
> assign a weight to each data point in the particular time interval, i.e.
> how much weight the data point has at the end of the sliding
> window. With this you could then use a CombineFn to count and sum the
> weighted averages, which would be much more efficient.
>
> Best,
>
>  Jan
> On 9/30/23 17:08, Balogh, György wrote:
>
> Hi,
> I want to calculate the exponential moving average of a signal using beam
> in java.
> I understand there is no time order guarantee on incoming data. What would
> be the simplest solution for this?
> Thank you,
>
> --
>
> György Balogh
> CTO
> E gyorgy.bal...@ultinous.com 
> M +36 30 270 8342
> A HU, 1117 Budapest, Budafoki út 209.
> W www.ultinous.com
>
>


Re: [QUESTION] Why no auto labels?

2023-10-01 Thread Reuven Lax via user
Are you talking about transform names? The main reason was because for
runners that support updating pipelines in place, the only way to do so
safely is if the runner can perfectly identify which transforms in the new
graph match the ones in the old graph. There's no good way to auto generate
names that will stay stable across updates - even small changes to the
pipeline might change the order of nodes in the graph, which could result
in a corrupted update.
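
To make the stability problem concrete, here is a small plain-Java model (not Beam code) of a hypothetical naming scheme that numbers duplicate transforms in graph order. The `AutoNamer` class is invented for this illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical auto-namer: appends a counter to repeated transform kinds. */
final class AutoNamer {
  private AutoNamer() {}

  /** Assigns names in graph order: the first "ParDo" keeps its name, later ones get a suffix. */
  static List<String> assign(List<String> kindsInGraphOrder) {
    Map<String, Integer> seen = new HashMap<>();
    List<String> names = new ArrayList<>();
    for (String kind : kindsInGraphOrder) {
      int n = seen.merge(kind, 1, Integer::sum);
      names.add(n == 1 ? kind : kind + n);
    }
    return names;
  }
}
```

With this scheme, the graph `Read, ParDo, ParDo` is named `Read, ParDo, ParDo2`. If a new ParDo is added ahead of the existing two, the step that used to be `ParDo` becomes `ParDo2`, so a runner matching old state to new steps by name would pair it with the wrong transform.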

However, if you don't care about update, Beam can auto generate these names
for you! When you call PCollection.apply (if using the Beam Java SDK), simply omit
the name parameter and Beam will auto generate a unique name for you.

Reuven

On Sat, Sep 30, 2023 at 11:54 AM Joey Tran 
wrote:

> After writing a few pipelines now, I keep getting tripped up by
> accidentally having duplicate labels from using several of the same
> transforms without labeling them. I figure this must be a common complaint,
> so I was just curious what the rationale behind this design was. My naive
> thought off the top of my head is that it'd be more user friendly to just
> auto increment duplicate transforms, but I figure I must be missing
> something.
>
> Cheers,
> Joey
>


Re: @FieldAccess parameter types not being enforced vs corresponding schema types in Java DoFn

2023-09-18 Thread Reuven Lax via user
Good question - I know it will be enforced at runtime (I think you'll get a
ClassCastException if things don't match) but I'd have to check to see if
we enforce it at graph-submission time.

If we don't have this validation in place, adding it would be an
improvement.
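
In the meantime, a check of this kind could be approximated outside Beam. The plain-Java sketch below compares declared parameter types against a schema described as a simple map. `FieldTypeValidator` and both maps are hypothetical stand-ins, and the wiring to Beam's `Schema` and to the `@FieldAccess` annotations is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical validator: field name -> Java type on both sides. */
final class FieldTypeValidator {
  private FieldTypeValidator() {}

  /** Returns one message per parameter whose field is missing or whose type cannot hold the schema type. */
  static List<String> validate(
      Map<String, Class<?>> schemaTypes, Map<String, Class<?>> declaredParams) {
    List<String> errors = new ArrayList<>();
    for (Map.Entry<String, Class<?>> p : declaredParams.entrySet()) {
      Class<?> schemaType = schemaTypes.get(p.getKey());
      if (schemaType == null) {
        errors.add("Unknown field: " + p.getKey());
      } else if (!p.getValue().isAssignableFrom(schemaType)) {
        errors.add("Field " + p.getKey() + " has schema type " + schemaType.getSimpleName()
            + " but parameter type " + p.getValue().getSimpleName());
      }
    }
    return errors;
  }
}
```

For the case described below, a schema with `field1` of type String and a parameter declared as Double would produce one error at validation time instead of a ClassCastException at runtime.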

On Mon, Sep 18, 2023 at 3:04 PM Varun Golani 
wrote:

> I've been looking at the following session from the 2019 Beam Summit
> covering Schema-aware PCollections (18:00 -
> https://www.youtube.com/watch?v=aRIZXtQiCHw). The specific part of the
> video mentions that for fields accessed via the @FieldAccess annotation,
> Beam checks for both the existence of the field AND matches the type during
> pipeline construction time.
>
> When trying this out on my application locally, I do see that Beam flags
> up incorrectly specified fields that don't exist in the PCollection's
> schema. However, it does not seem to check that the type of the
> parameter variable matches the type specified in the Schema.
>
> For instance if I have a PCollection containing an attribute "field1" with
> type String, when referring to this attribute in my DoFn using
> @FieldAccess("field1") Double field1, it does NOT flag this String/Double
> type mismatch during pipeline construction, similar to how it would if I
> refer to an attribute that doesn't exist in the schema.
>
> Is this expected behaviour? If so, would there be any way in which I could
> write a custom validator which strongly enforces @FieldAccess parameter
> types against the types in the corresponding schema?
>


Re: "Decorator" pattern for PTramsforms

2023-09-15 Thread Reuven Lax via user
Correct - I was referring to Java.

On Fri, Sep 15, 2023 at 9:55 AM Robert Bradshaw  wrote:

> On Fri, Sep 15, 2023 at 9:46 AM Reuven Lax via user 
> wrote:
>
>> Creating composite DoFns is tricky today due to how they are implemented
>> (via annotated methods).
>>
>
> Note that this depends on the language. This should be really easy to do
> from Python.
>
>
>> However providing such a method to compose DoFns would be very useful IMO.
>>
>
> +1
>
>
>> On Fri, Sep 15, 2023 at 9:33 AM Joey Tran 
>> wrote:
>>
>>> Yeah for (1) the concern would be adding a shuffle/fusion break and (2)
>>> sounds like the likely solution, was just hoping there'd be one that could
>>> wrap at the PTransform level but I realize now the PTransform abstraction
>>> is too general as you mentioned to do something like that.
>>>
>>> (2) will be likely what we do, though now I'm wondering if it might be
>>> possible to create a ParDo wrapper that can take a ParDo, extract its
>>> DoFn, wrap it, and return a new ParDo
>>>
>>> On Fri, Sep 15, 2023, 11:53 AM Robert Bradshaw via user <
>>> user@beam.apache.org> wrote:
>>>
>>>> +1 to looking at composite transforms. You could even have a composite
>>>> transform that takes another transform as one of its construction arguments
>>>> and whose expand method does pre- and post-processing to the inputs/outputs
>>>> before/after applying the transform in question. (You could even implement
>>>> this as a Python decorator if you really wanted, either decorating the
>>>> expand method itself or the full class...)
>>>>
>>>> One of the difficulties is that for a general transform there isn't
>>>> necessarily a 1:N relationship between outputs and inputs as one has for a
>>>> DoFn (especially if there is any aggregation involved). There are, however,
>>>> two partial solutions that might help.
>>>>
>>>> (1) You can do a CombineGlobally with a CombineFn (Like Sample) that
>>>> returns at most N elements. You could do this with a CombinePerKey if you
>>>> can come up with a reasonable key (e.g. the id of your input elements) that
>>>> the limit should be applied to. Note that this may cause a lot of data to
>>>> be shuffled (though due to combiner lifting, no more than N per bundle).
>>>>
>>>> (2) You could have a DoFn that limits to N per bundle by initializing a
>>>> counter in its start_bundle and passing elements through until the counter
>>>> reaches a threshold. (Again, one could do this per id if one is available.)
>>>> It wouldn't stop production of the elements, but if things get fused it
>>>> would still likely be fairly cheap.
>>>>
>>>> Both of these could be prepended to the problematic consuming
>>>> PTransform as well.
>>>>
>>>> - Robert
>>>>
>>>>
>>>>
>>>> On Fri, Sep 15, 2023 at 8:13 AM Joey Tran 
>>>> wrote:
>>>>
>>>>> I'm aware of composite transforms and of the distributed nature of
>>>>> PTransforms. I'm not suggesting limiting the entire set and my example was
>>>>> more illustrative than the actual use case.
>>>>>
>>>>> My actual use case is basically: I have multiple PTransforms, and
>>>>> let's say most of them average ~100 generated outputs for a single input.
>>>>> Most of these PTransforms will occasionally run into an input though that
>>>>> might output maybe 1M outputs. This can cause issues if for example there
>>>>> are transforms that follow it that require a lot of compute per input.
>>>>>
>>>>> The simplest way to deal with this is to modify the `DoFn`s in our
>>>>> Ptransforms and add a limiter in the logic (e.g. `if num_outputs_generated
>>>>> >= OUTPUTS_PER_INPUT_LIMIT: return`). We could duplicate this logic across
>>>>> our transforms, but it'd be much cleaner if we could lift up this limiting
>>>>> logic out of the application logic and have some generic wrapper that
>>>>> extends our transforms.
>>>>>
>>>>> Thanks for the discussion!
>>>>>
>>>>> On Fri, Sep 15, 2023 at 10:29 AM Alexey Romanenko <
>>>>> aromanenko@gmail.com> wrote:
>>>>>
>>>>>> I don’t think it’s possible to extend in a way that you are 

Re: "Decorator" pattern for PTramsforms

2023-09-15 Thread Reuven Lax via user
Creating composite DoFns is tricky today due to how they are implemented
(via annotated methods). However providing such a method to compose DoFns
would be very useful IMO.
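
Setting Beam's annotation-based DoFn aside, the decorator idea itself is easy to express for plain functions. The sketch below wraps a one-to-many function and caps the number of outputs per input, which is the limiter Joey describes in the quoted thread below. `OutputLimiter` and its signature are hypothetical, and this is not a DoFn or ParDo wrapper.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

/** Hypothetical decorator that caps the outputs a one-to-many function yields per input. */
final class OutputLimiter {
  private OutputLimiter() {}

  /** Wraps fn so that at most limit outputs are kept for each input. */
  static <I, O> Function<I, List<O>> limit(Function<I, ? extends Iterable<O>> fn, int limit) {
    return input -> {
      List<O> out = new ArrayList<>();
      Iterator<O> it = fn.apply(input).iterator();
      // Stop pulling from the iterator once the cap is reached, so a lazy
      // producer is not asked for more elements than needed.
      while (out.size() < limit && it.hasNext()) {
        out.add(it.next());
      }
      return out;
    };
  }
}
```

The limiting logic lives in one place and the wrapped function stays unaware of it, which is the separation the thread is after.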

On Fri, Sep 15, 2023 at 9:33 AM Joey Tran  wrote:

> Yeah for (1) the concern would be adding a shuffle/fusion break and (2)
> sounds like the likely solution, was just hoping there'd be one that could
> wrap at the PTransform level but I realize now the PTransform abstraction
> is too general as you mentioned to do something like that.
>
> (2) will be likely what we do, though now I'm wondering if it might be
> possible to create a ParDo wrapper that can take a ParDo, extract its
> DoFn, wrap it, and return a new ParDo
>
> On Fri, Sep 15, 2023, 11:53 AM Robert Bradshaw via user <
> user@beam.apache.org> wrote:
>
>> +1 to looking at composite transforms. You could even have a composite
>> transform that takes another transform as one of its construction arguments
>> and whose expand method does pre- and post-processing to the inputs/outputs
>> before/after applying the transform in question. (You could even implement
>> this as a Python decorator if you really wanted, either decorating the
>> expand method itself or the full class...)
>>
>> One of the difficulties is that for a general transform there isn't
>> necessarily a 1:N relationship between outputs and inputs as one has for a
>> DoFn (especially if there is any aggregation involved). There are, however,
>> two partial solutions that might help.
>>
>> (1) You can do a CombineGlobally with a CombineFn (Like Sample) that
>> returns at most N elements. You could do this with a CombinePerKey if you
>> can come up with a reasonable key (e.g. the id of your input elements) that
>> the limit should be applied to. Note that this may cause a lot of data to
>> be shuffled (though due to combiner lifting, no more than N per bundle).
>>
>> (2) You could have a DoFn that limits to N per bundle by initializing a
>> counter in its start_bundle and passing elements through until the counter
>> reaches a threshold. (Again, one could do this per id if one is available.)
>> It wouldn't stop production of the elements, but if things get fused it
>> would still likely be fairly cheap.
>>
>> Both of these could be prepended to the problematic consuming PTransform
>> as well.
>>
>> - Robert
>>
>>
>>
>> On Fri, Sep 15, 2023 at 8:13 AM Joey Tran 
>> wrote:
>>
>>> I'm aware of composite transforms and of the distributed nature of
>>> PTransforms. I'm not suggesting limiting the entire set and my example was
>>> more illustrative than the actual use case.
>>>
>>> My actual use case is basically: I have multiple PTransforms, and let's
>>> say most of them average ~100 generated outputs for a single input. Most of
>>> these PTransforms will occasionally run into an input though that might
>>> output maybe 1M outputs. This can cause issues if for example there are
>>> transforms that follow it that require a lot of compute per input.
>>>
>>> The simplest way to deal with this is to modify the `DoFn`s in our
>>> Ptransforms and add a limiter in the logic (e.g. `if num_outputs_generated
>>> >= OUTPUTS_PER_INPUT_LIMIT: return`). We could duplicate this logic across
>>> our transforms, but it'd be much cleaner if we could lift up this limiting
>>> logic out of the application logic and have some generic wrapper that
>>> extends our transforms.
>>>
>>> Thanks for the discussion!
>>>
>>> On Fri, Sep 15, 2023 at 10:29 AM Alexey Romanenko <
>>> aromanenko@gmail.com> wrote:
>>>
>>>> I don’t think it’s possible to extend in a way that you are asking
>>>> (like, Java classes “*extend*"). Though, you can create your own
>>>> composite PTransform that will incorporate one or several others inside
>>>> *“expand()”* method. Actually, most of the Beam native PTransforms are
>>>> composite transforms. Please, take a look at the docs and examples [1]
>>>>
>>>> Regarding your example, please, be aware that all PTransforms are
>>>> supposed to be executed in distributed environment and the order of records
>>>> is not guaranteed. So, limiting the whole output by fixed number of records
>>>> can be challenging - you’d need to make sure that it will be processed on
>>>> only one worker, that means that you’d need to shuffle all your records by
>>>> the same key and probably sort the records in way that you need.
>>>>
>>>> Did you consider using “*org.apache.beam.sdk.transforms.Top*” for
>>>> that? [2]
>>>>
>>>> If it doesn’t work for you, could you provide more details of your use
>>>> case? Then we probably can propose the more suitable solutions for that.
>>>>
>>>> [1]
>>>> https://beam.apache.org/documentation/programming-guide/#composite-transforms
>>>> [2]
>>>> https://beam.apache.org/releases/javadoc/2.50.0/org/apache/beam/sdk/transforms/Top.html
>>>>
>>>> —
>>>> Alexey
>>>>
>>>> On 15 Sep 2023, at 14:22, Joey Tran  wrote:
>>>>
>>>> Is there a way to extend already defined PTransforms? My question is
>>>> probably 

Re: How can we get multiple side inputs from a single pipeline ?

2023-08-28 Thread Reuven Lax via user
This looks fine. One caveat: there currently appears to be a bug in Beam
when you apply a combiner followed by View.asSingleton. I would
recommend replacing these lines:

.apply(Latest.globally())
.apply(View.asSingleton())

With the following:
.apply(Reify.timestamps())
.apply(Combine.globally(Latest.combineFn()).asSingletonView())

On Mon, Aug 28, 2023 at 8:30 AM Sachin Mittal  wrote:

> Hi,
>
> I was checking the code for side input patterns :
>
> https://beam.apache.org/documentation/patterns/side-inputs/
> Basically I need multiple side inputs from a slowly updating global
> window side input.
>
> So as per example pipeline is something like this:
>
> PCollectionView<Map<String, String>> map =
>     p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5L)))
>         .apply(ParDo.of(new DoFn<Long, Map<String, String>>() {
>           @ProcessElement
>           public void process(@Element Long input, @Timestamp Instant timestamp,
>               OutputReceiver<Map<String, String>> o) {
>             o.output(/* output a map */);
>             // also output another map and a list, is this possible ?
>           }
>         }))
>         .apply(Window.<Map<String, String>>into(new GlobalWindows())
>             .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
>             .discardingFiredPanes())
>         .apply(Latest.globally())
>         .apply(View.asSingleton());
>
>
> So as an extension of this example from the same DoFn which fetches the
> side input, alongside the map, I may also need another Map and another List.
> Reason I need to perform this in the same DoFn is that from this function
> we query external sources to get the side input and the other side inputs
> are also built from the same source.
>
> So I would like to avoid querying external sources multiple times to
> generate multiple side inputs from different DoFn and want to use the same
> function to generate multiple side inputs.
>
>  Can I achieve this by using  "Tags for multiple outputs" ?
>
> Thanks
> Sachin
>
>
>
>
>
>


Re: Apache Beam pipeline stuck indefinitely using Wait.on transform with JdbcIO

2023-04-22 Thread Reuven Lax via user
Oh - in that case it's possible that the problem may be the direct runner's
implementation of the pubsub source - especially the watermark. For a
direct-runner test, I recommend using TestStream (which allows you to
advance the watermark manually, so you can test windowing).

On Sat, Apr 22, 2023 at 10:28 AM Juan Cuzmar  wrote:

> I'm developing with the direct runner, but it should go to Dataflow when
> deployed.
>
>
>  Original Message 
> On Apr 22, 2023, 13:13, Reuven Lax via user < user@beam.apache.org> wrote:
>
>
> What runner are you using to run this pipeline?
>
> On Sat, Apr 22, 2023 at 9:47 AM Juan Cuzmar 
> wrote:
>
>> Same result:
>> PCollection result = p
>> .apply("Pubsub",
>> PubsubIO.readMessagesWithAttributes().fromSubscription(String.format("projects/%s/subscriptions/%s",
>> options.getProjectId(), subscription)))
>> .apply("Transform", ParDo.of(new MyTransformer()))
>> .apply("Windowing",
>> Window.into(FixedWindows.of(Duration.standardMinutes(1)))
>> .triggering(AfterWatermark.pastEndOfWindow()
>>
>> .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(30))))
>> .withAllowedLateness(Duration.standardMinutes(1))
>> .discardingFiredPanes());
>>
>> PCollection insert = result.apply("Inserting",
>> JdbcIO.write()
>>
>> .withDataSourceProviderFn(DataSourceProvider.of(authDatabaseConfig))
>> .withStatement("INSERT INTO person (first_name,
>> last_name) VALUES (?, 'doe')")
>> .withPreparedStatementSetter((element,
>> preparedStatement) -> {
>> log.info("Preparing statement to insert");
>> preparedStatement.setString(1, element);
>> })
>> .withResults()
>> );
>> result.apply(Wait.on(insert))
>>     .apply("Selecting", new SomeTransform())
>> .apply("PubsubMessaging", ParDo.of(new
>> NextTransformer()));
>> p.run();
>>
>> updated the github repo as well.
>>
>> --- Original Message ---
>> On Saturday, April 22nd, 2023 at 11:18 AM, Reuven Lax via user <
>> user@beam.apache.org> wrote:
>>
>>
>> > The other problem you have here is that you have not set a window.
>> Wait.on waits for the end of the current window before triggering. The
>> default Window is the GlobalWindow, so as written Wait.on will wait for the
>> end of time (or until you drain the pipeline, which will also trigger the
>> GlobalWindow).
>> > Try adding a 1-minute fixed window to the results you read from PubSub.
>> >
>> > On Sat, Apr 22, 2023 at 6:50 AM Juan Cuzmar 
>> wrote:
>> >
>> > > writeVoid() and write() plus withResults() return the same
>> PCollection AFAIK. In any case i updated the code and same thing
>> happens
>> > >
>> > > PCollection result = p.
>> > > apply("Pubsub",
>> PubsubIO.readMessagesWithAttributes().fromSubscription(String.format("projects/%s/subscriptions/%s",
>> options.getProjectId(), subscription)))
>> > > .apply("Transform", ParDo.of(new MyTransformer()));
>> > >
>> > > PCollection insert = result.apply("Inserting",
>> > > JdbcIO.write()
>> > > .withDataSourceProviderFn(DataSourceProvider.of(authDatabaseConfig))
>> > > .withStatement("INSERT INTO person (first_name, last_name) VALUES (?,
>> 'doe')")
>> > > .withPreparedStatementSetter((element, preparedStatement) -> {
>> > > log.info("Preparing statement to insert");
>> > > preparedStatement.setString(1, element);
>> > > })
>> > > .withResults()
>> > > );
>> > > result.apply(Wait.on(insert))
>> > > .apply("Selecting", new SomeTransform())
>> > > .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
>> > >
>> https://github.com/j1cs/app-beam/blob/main/src/main/java/me/jics/AppBeamCommand.java#L63
>> > >
>> > > --- Original Message ---
>> > > On Saturday, April 22nd, 2023 at 2:08 AM, Reuven Lax via user <
>> user@beam.apache.org> wrote:
>> > >
>> > >

Re: Apache Beam pipeline stuck indefinitely using Wait.on transform with JdbcIO

2023-04-22 Thread Reuven Lax via user
What runner are you using to run this pipeline?

On Sat, Apr 22, 2023 at 9:47 AM Juan Cuzmar  wrote:

> Same result:
> PCollection result = p
> .apply("Pubsub",
> PubsubIO.readMessagesWithAttributes().fromSubscription(String.format("projects/%s/subscriptions/%s",
> options.getProjectId(), subscription)))
> .apply("Transform", ParDo.of(new MyTransformer()))
> .apply("Windowing",
> Window.into(FixedWindows.of(Duration.standardMinutes(1)))
> .triggering(AfterWatermark.pastEndOfWindow()
>
> .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(30))))
> .withAllowedLateness(Duration.standardMinutes(1))
> .discardingFiredPanes());
>
> PCollection insert = result.apply("Inserting",
> JdbcIO.write()
>
> .withDataSourceProviderFn(DataSourceProvider.of(authDatabaseConfig))
> .withStatement("INSERT INTO person (first_name,
> last_name) VALUES (?, 'doe')")
> .withPreparedStatementSetter((element,
> preparedStatement) -> {
> log.info("Preparing statement to insert");
> preparedStatement.setString(1, element);
> })
> .withResults()
> );
> result.apply(Wait.on(insert))
> .apply("Selecting", new SomeTransform())
> .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
> p.run();
>
> updated the github repo as well.
>
> --- Original Message ---
> On Saturday, April 22nd, 2023 at 11:18 AM, Reuven Lax via user <
> user@beam.apache.org> wrote:
>
>
> > The other problem you have here is that you have not set a window.
> Wait.on waits for the end of the current window before triggering. The
> default Window is the GlobalWindow, so as written Wait.on will wait for the
> end of time (or until you drain the pipeline, which will also trigger the
> GlobalWindow).
> > Try adding a 1-minute fixed window to the results you read from PubSub.
> >
> > On Sat, Apr 22, 2023 at 6:50 AM Juan Cuzmar 
> wrote:
> >
> > > writeVoid() and write() plus withResults() return the same
> PCollection AFAIK. In any case i updated the code and same thing
> happens
> > >
> > > PCollection result = p.
> > > apply("Pubsub",
> PubsubIO.readMessagesWithAttributes().fromSubscription(String.format("projects/%s/subscriptions/%s",
> options.getProjectId(), subscription)))
> > > .apply("Transform", ParDo.of(new MyTransformer()));
> > >
> > > PCollection insert = result.apply("Inserting",
> > > JdbcIO.write()
> > > .withDataSourceProviderFn(DataSourceProvider.of(authDatabaseConfig))
> > > .withStatement("INSERT INTO person (first_name, last_name) VALUES (?,
> 'doe')")
> > > .withPreparedStatementSetter((element, preparedStatement) -> {
> > > log.info("Preparing statement to insert");
> > > preparedStatement.setString(1, element);
> > > })
> > > .withResults()
> > > );
> > > result.apply(Wait.on(insert))
> > > .apply("Selecting", new SomeTransform())
> > > .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
> > >
> https://github.com/j1cs/app-beam/blob/main/src/main/java/me/jics/AppBeamCommand.java#L63
> > >
> > > --- Original Message ---
> > > On Saturday, April 22nd, 2023 at 2:08 AM, Reuven Lax via user <
> user@beam.apache.org> wrote:
> > >
> > >
> > > > I believe you have to call withResults() on the JdbcIO transform in
> order for this to work.
> > > >
> > > > On Fri, Apr 21, 2023 at 10:35 PM Juan Cuzmar 
> wrote:
> > > >
> > > > > I hope you all are doing well. I am facing an issue with an Apache
> Beam pipeline that gets stuck indefinitely when using the Wait.on transform
> alongside JdbcIO. Here's a simplified version of my code, focusing on the
> relevant parts:
> > > > >
> > > > > PCollection result = p.
> > > > > apply("Pubsub",
> PubsubIO.readMessagesWithAttributes().fromSubscription(/*...*/))
> > > > > .apply("Transform", ParDo.of(new MyTransformer()));
> > > > >
> > > > > PCollection insert = result.apply("Inserting",
> > > > > J

Re: Apache Beam pipeline stuck indefinitely using Wait.on transform with JdbcIO

2023-04-22 Thread Reuven Lax via user
The other problem you have here is that you have not set a window. Wait.on
waits for the end of the current window before triggering. The default
Window is the GlobalWindow, so as written Wait.on will wait for the end of
time (or until you drain the pipeline, which will also trigger the
GlobalWindow).

Try adding a 1-minute fixed window to the results you read from PubSub.
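
For intuition about why a fixed window unblocks Wait.on, here is the arithmetic of fixed-window assignment in plain Java. This is a sketch, not Beam's implementation: `FixedWindowMath` is a hypothetical name and window offsets are ignored. Every element lands in a window with a finite end that the watermark can pass, whereas the global window only ends at the end of time.

```java
/** Hypothetical helper showing how a timestamp maps to a fixed window. */
final class FixedWindowMath {
  private FixedWindowMath() {}

  /** Start (inclusive) of the fixed window of the given size containing the timestamp. */
  static long windowStart(long timestampMillis, long sizeMillis) {
    // floorMod keeps the result correct for negative timestamps as well.
    return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
  }

  /** End (exclusive) of that window. */
  static long windowEnd(long timestampMillis, long sizeMillis) {
    return windowStart(timestampMillis, sizeMillis) + sizeMillis;
  }
}
```

With a 1-minute window, an element stamped at 90,500 ms falls in the window from 60,000 ms up to 120,000 ms, so anything waiting on that window can proceed once the watermark passes 120,000 ms.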

On Sat, Apr 22, 2023 at 6:50 AM Juan Cuzmar  wrote:

> writeVoid() and write() plus withResults() return the same
> PCollection AFAIK. In any case i updated the code and same thing
> happens
>
>  PCollection result = p.
> apply("Pubsub",
> PubsubIO.readMessagesWithAttributes().fromSubscription(String.format("projects/%s/subscriptions/%s",
> options.getProjectId(), subscription)))
> .apply("Transform", ParDo.of(new MyTransformer()));
>
> PCollection insert = result.apply("Inserting",
> JdbcIO.write()
>
> .withDataSourceProviderFn(DataSourceProvider.of(authDatabaseConfig))
> .withStatement("INSERT INTO person (first_name,
> last_name) VALUES (?, 'doe')")
> .withPreparedStatementSetter((element,
> preparedStatement) -> {
> log.info("Preparing statement to insert");
> preparedStatement.setString(1, element);
> })
> .withResults()
> );
> result.apply(Wait.on(insert))
> .apply("Selecting", new SomeTransform())
> .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
>
> https://github.com/j1cs/app-beam/blob/main/src/main/java/me/jics/AppBeamCommand.java#L63
>
> --- Original Message ---
> On Saturday, April 22nd, 2023 at 2:08 AM, Reuven Lax via user <
> user@beam.apache.org> wrote:
>
>
> > I believe you have to call withResults() on the JdbcIO transform in
> order for this to work.
> >
> > On Fri, Apr 21, 2023 at 10:35 PM Juan Cuzmar 
> wrote:
> >
> > > I hope you all are doing well. I am facing an issue with an Apache
> Beam pipeline that gets stuck indefinitely when using the Wait.on transform
> alongside JdbcIO. Here's a simplified version of my code, focusing on the
> relevant parts:
> > >
> > > PCollection result = p.
> > > apply("Pubsub",
> PubsubIO.readMessagesWithAttributes().fromSubscription(/*...*/))
> > > .apply("Transform", ParDo.of(new MyTransformer()));
> > >
> > > PCollection insert = result.apply("Inserting",
> > > JdbcIO.writeVoid()
> > > .withDataSourceProviderFn(/*...*/)
> > > .withStatement(/*...*/)
> > > .withPreparedStatementSetter(/*...*/)
> > > );
> > >
> > > result.apply(Wait.on(insert))
> > > .apply("Selecting", new SomeTransform())
> > > .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
> > > p.run();
> > >
> > > In the code, I'm using the Wait.on transform to make the pipeline wait
> until the insert transform (which uses JdbcIO to write data) is completed
> before executing the next steps. However, the pipeline gets stuck and
> doesn't progress further.
> > >
> > > I've tried adding logging messages in my transforms to track the
> progress and identify where it's getting stuck, but I haven't been able to
> pinpoint the issue. I've searched for solutions online, but none of them
> provided a successful resolution for my problem.
> > >
> > > Can anyone provide any insights or suggestions on how to debug and
> resolve this issue involving Wait.on and JdbcIO in my Apache Beam pipeline?
> > >
> > > You can find the sample code at: https://github.com/j1cs/app-beam
> > >
> > > Thank you for your help and support.
> > >
> > > Best regards,
> > >
> > > Juan Cuzmar.
>


Re: Apache Beam pipeline stuck indefinitely using Wait.on transform with JdbcIO

2023-04-22 Thread Reuven Lax via user
I believe you have to call withResults() on the JdbcIO transform in order
for this to work.

On Fri, Apr 21, 2023 at 10:35 PM Juan Cuzmar  wrote:

> I hope you all are doing well. I am facing an issue with an Apache Beam
> pipeline that gets stuck indefinitely when using the Wait.on transform
> alongside JdbcIO. Here's a simplified version of my code, focusing on the
> relevant parts:
>
> PCollection result = p.
> apply("Pubsub",
> PubsubIO.readMessagesWithAttributes().fromSubscription(/*...*/))
> .apply("Transform", ParDo.of(new MyTransformer()));
>
> PCollection insert = result.apply("Inserting",
> JdbcIO.writeVoid()
> .withDataSourceProviderFn(/*...*/)
> .withStatement(/*...*/)
> .withPreparedStatementSetter(/*...*/)
> );
>
> result.apply(Wait.on(insert))
> .apply("Selecting", new SomeTransform())
> .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
> p.run();
>
> In the code, I'm using the Wait.on transform to make the pipeline wait
> until the insert transform (which uses JdbcIO to write data) is completed
> before executing the next steps. However, the pipeline gets stuck and
> doesn't progress further.
>
> I've tried adding logging messages in my transforms to track the progress
> and identify where it's getting stuck, but I haven't been able to pinpoint
> the issue. I've searched for solutions online, but none of them provided a
> successful resolution for my problem.
>
> Can anyone provide any insights or suggestions on how to debug and resolve
> this issue involving Wait.on and JdbcIO in my Apache Beam pipeline?
>
> You can find the sample code at: https://github.com/j1cs/app-beam
>
> Thank you for your help and support.
>
> Best regards,
>
> Juan Cuzmar.
>


Re: Is there any way to set the parallelism of operators like group by, join?

2023-04-18 Thread Reuven Lax via user
Jeff - does setting the global default work for you, or do you need
per-operator control? Seems like it would be useful to add this to ResourceHints.

On Tue, Apr 18, 2023 at 12:35 PM Robert Bradshaw 
wrote:

> Yeah, I don't think we have a good per-operator API for this. If we were
> to add it, it probably belongs in ResourceHints.
>
> On Sun, Apr 16, 2023 at 11:28 PM Reuven Lax  wrote:
>
>> Looking at FlinkPipelineOptions, there is a parallelism option you can
>> set. I believe this sets the default parallelism for all Flink operators.
>>
>> On Sun, Apr 16, 2023 at 7:20 PM Jeff Zhang  wrote:
>>
>>> Thanks Holden, this would work for Spark, but Flink doesn't have such
>>> kind of mechanism, so I am looking for a general solution on the beam side.
>>>
>>> On Mon, Apr 17, 2023 at 10:08 AM Holden Karau 
>>> wrote:
>>>
>>>> To a (small) degree Spark's “new” AQE might be able to help depending on
>>>> what kind of operations Beam is compiling it down to.
>>>>
>>>> Have you tried setting spark.sql.adaptive.enabled &
>>>> spark.sql.adaptive.coalescePartitions.enabled?
>>>>
>>>>
>>>>
>>>> On Mon, Apr 17, 2023 at 10:34 AM Reuven Lax via user <
>>>> user@beam.apache.org> wrote:
>>>>
>>>>> I see. Robert - what is the story for parallelism controls on GBK with
>>>>> the Spark or Flink runners?
>>>>>
>>>>> On Sun, Apr 16, 2023 at 6:24 PM Jeff Zhang  wrote:
>>>>>
>>>>>> No, I don't use dataflow, I use Spark & Flink.
>>>>>>
>>>>>>
>>>>>> On Mon, Apr 17, 2023 at 8:08 AM Reuven Lax  wrote:
>>>>>>
>>>>>>> Are you running on the Dataflow runner? If so, Dataflow - unlike
>>>>>>> Spark and Flink - dynamically modifies the parallelism as the operator
>>>>>>> runs, so there is no need to have such controls. In fact these specific
>>>>>>> controls wouldn't make much sense for the way Dataflow implements these
>>>>>>> operators.
>>>>>>>
>>>>>>> On Sun, Apr 16, 2023 at 12:25 AM Jeff Zhang 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Just for performance tuning like in Spark and Flink.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sun, Apr 16, 2023 at 1:10 PM Robert Bradshaw via user <
>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>
>>>>>>>>> What are you trying to achieve by setting the parallelism?
>>>>>>>>>
>>>>>>>>> On Sat, Apr 15, 2023 at 5:13 PM Jeff Zhang 
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks Reuven, what I mean is to set the parallelism at the operator
>>>>>>>>>> level. The input size of an operator is unknown at the compile stage
>>>>>>>>>> unless it is a source operator.
>>>>>>>>>>
>>>>>>>>>> Here's an example for Flink:
>>>>>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level
>>>>>>>>>> Spark also supports setting operator-level parallelism (see groupByKey
>>>>>>>>>> and reduceByKey):
>>>>>>>>>> https://spark.apache.org/docs/latest/rdd-programming-guide.html
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Sun, Apr 16, 2023 at 1:42 AM Reuven Lax via user <
>>>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> The maximum parallelism is always determined by the parallelism
>>>>>>>>>>> of your data. If you do a GroupByKey for example, the number of
>>>>>>>>>>> keys in your data determines the maximum parallelism.
>>>>>>>>>>>
>>>>>>>>>>> Beyond the limitations in your data, it depends on your
>>>>>>>>>>> execution engine. If you're using Dataflow, Dataflow is designed to
>>>>>>>>>>> automatically determine the parallelism (e.g. work will be
>>>>>>>>>>> dynamically split and moved around between workers, the number of
>>>>>>>>>>> workers will autoscale, etc.), so there's no need to explicitly set
>>>>>>>>>>> the parallelism of the execution.
>>>>>>>>>>>
>>>>>>>>>>> On Sat, Apr 15, 2023 at 1:12 AM Jeff Zhang 
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Besides the global parallelism of a Beam job, is there any way to
>>>>>>>>>>>> set parallelism for individual operators like group by and join? I
>>>>>>>>>>>> understand the parallelism setting depends on the underlying
>>>>>>>>>>>> execution engine, but it is very common to set parallelism for
>>>>>>>>>>>> operators like group by and join in both Spark & Flink.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>
>>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Best Regards
>>>>>>>>>>
>>>>>>>>>> Jeff Zhang
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best Regards
>>>>>>>>
>>>>>>>> Jeff Zhang
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best Regards
>>>>>>
>>>>>> Jeff Zhang
>>>>>>
>>>>> --
>>>> Twitter: https://twitter.com/holdenkarau
>>>> Books (Learning Spark, High Performance Spark, etc.):
>>>> https://amzn.to/2MaRAG9
>>>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>>>>
>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>


Re: Loosing records when using BigQuery IO Connector

2023-04-17 Thread Reuven Lax via user
What version of Beam are you using? There are no known data-loss bugs in
the connector; however, if there has been a regression, we would like to
address it with high priority.

On Mon, Apr 17, 2023 at 12:47 AM Binh Nguyen Van  wrote:

> Hi,
>
> I have a job that uses BigQuery IO Connector to write to a BigQuery table.
> When I test it with a small number of records (100) it works as expected
> but when I tested it with a larger number of records (1), I don’t see
> all of the records written to the output table but only part of it. It
> changes from run to run but no more than 1000 records as I have seen so far.
>
> There are WARNING log entries in the job log like this
>
> by client #0 failed with error, operations will be retried.
> com.google.cloud.bigquery.storage.v1.Exceptions$OffsetAlreadyExists: 
> ALREADY_EXISTS: The offset is within stream, expected offset 933, received 0
>
> And one log entry like this
>
> Finalize of stream [stream-name] finished with row_count: 683
>
> If I sum all the numbers reported in the WARNING message with the one in
> the finalize of stream above, I get 1, which is exactly the number of
> input records.
>
> My pipeline uses 1 worker and it looks like this
>
> WriteResult writeResult = inputCollection.apply(
>     "Save Events To BigQuery",
>     BigQueryIO.write()
>         .to(options.getTable())
>         .withFormatFunction(TableRowMappers::toRow)
>         .withMethod(Method.STORAGE_WRITE_API)
>         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>         .withExtendedErrorInfo());
>
> writeResult
>     .getFailedStorageApiInserts()
>     .apply("Log failed inserts", ParDo.of(new PrintFn<>()));
>
> There are no log entries for the failed inserts.
>
> Is there anything wrong with my pipeline code or is it a bug in BigQuery
> IO Connector?
>
> Thanks
> -Binh
>


Re: Is there any way to set the parallelism of operators like group by, join?

2023-04-17 Thread Reuven Lax via user
Looking at FlinkPipelineOptions, there is a parallelism option you can set.
I believe this sets the default parallelism for all Flink operators.

On Sun, Apr 16, 2023 at 7:20 PM Jeff Zhang  wrote:

> Thanks Holden, this would work for Spark, but Flink doesn't have such kind
> of mechanism, so I am looking for a general solution on the beam side.
>
> On Mon, Apr 17, 2023 at 10:08 AM Holden Karau 
> wrote:
>
>> To a (small) degree Spark's “new” AQE might be able to help depending on
>> what kind of operations Beam is compiling it down to.
>>
>> Have you tried setting spark.sql.adaptive.enabled &
>> spark.sql.adaptive.coalescePartitions.enabled?
>>
>>
>>
>> On Mon, Apr 17, 2023 at 10:34 AM Reuven Lax via user <
>> user@beam.apache.org> wrote:
>>
>>> I see. Robert - what is the story for parallelism controls on GBK with
>>> the Spark or Flink runners?
>>>
>>> On Sun, Apr 16, 2023 at 6:24 PM Jeff Zhang  wrote:
>>>
>>>> No, I don't use dataflow, I use Spark & Flink.
>>>>
>>>>
>>>> On Mon, Apr 17, 2023 at 8:08 AM Reuven Lax  wrote:
>>>>
>>>>> Are you running on the Dataflow runner? If so, Dataflow - unlike Spark
>>>>> and Flink - dynamically modifies the parallelism as the operator runs, so
>>>>> there is no need to have such controls. In fact these specific controls
>>>>> wouldn't make much sense for the way Dataflow implements these operators.
>>>>>
>>>>> On Sun, Apr 16, 2023 at 12:25 AM Jeff Zhang  wrote:
>>>>>
>>>>>> Just for performance tuning like in Spark and Flink.
>>>>>>
>>>>>>
>>>>>> On Sun, Apr 16, 2023 at 1:10 PM Robert Bradshaw via user <
>>>>>> user@beam.apache.org> wrote:
>>>>>>
>>>>>>> What are you trying to achieve by setting the parallelism?
>>>>>>>
>>>>>>> On Sat, Apr 15, 2023 at 5:13 PM Jeff Zhang  wrote:
>>>>>>>
>>>>>>>> Thanks Reuven, what I mean is to set the parallelism at the operator
>>>>>>>> level. And the input size of the operator is unknown at the compiling
>>>>>>>> stage if it is not a source operator.
>>>>>>>>
>>>>>>>> Here's an example for Flink:
>>>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level
>>>>>>>> Spark also supports setting operator-level parallelism (see groupByKey
>>>>>>>> and reduceByKey):
>>>>>>>> https://spark.apache.org/docs/latest/rdd-programming-guide.html
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sun, Apr 16, 2023 at 1:42 AM Reuven Lax via user <
>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>
>>>>>>>>> The maximum parallelism is always determined by the parallelism of
>>>>>>>>> your data. If you do a GroupByKey for example, the number of keys in 
>>>>>>>>> your
>>>>>>>>> data determines the maximum parallelism.
>>>>>>>>>
>>>>>>>>> Beyond the limitations in your data, it depends on your execution
>>>>>>>>> engine. If you're using Dataflow, Dataflow is designed to 
>>>>>>>>> automatically
>>>>>>>>> determine the parallelism (e.g. work will be dynamically split and 
>>>>>>>>> moved
>>>>>>>>> around between workers, the number of workers will autoscale, etc.), 
>>>>>>>>> so
>>>>>>>>> there's no need to explicitly set the parallelism of the execution.
>>>>>>>>>
>>>>>>>>> On Sat, Apr 15, 2023 at 1:12 AM Jeff Zhang 
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Besides the global parallelism of beam job, is there any way to
>>>>>>>>>> set parallelism for individual operators like group by and join? I
>>>>>>>>>> understand the parallelism setting depends on the underlying 
>>>>>>>>>> execution
>>>>>>>>>> engine, but it is very common to set parallelism like group by and 
>>>>>>>>>> join in
>>>>>>>>>> both spark & flink.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Best Regards
>>>>>>>>>>
>>>>>>>>>> Jeff Zhang
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best Regards
>>>>>>>>
>>>>>>>> Jeff Zhang
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best Regards
>>>>>>
>>>>>> Jeff Zhang
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Best Regards
>>>>
>>>> Jeff Zhang
>>>>
>>> --
>> Twitter: https://twitter.com/holdenkarau
>> Books (Learning Spark, High Performance Spark, etc.):
>> https://amzn.to/2MaRAG9  <https://amzn.to/2MaRAG9>
>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: Is there any way to set the parallelism of operators like group by, join?

2023-04-16 Thread Reuven Lax via user
I see. Robert - what is the story for parallelism controls on GBK with the
Spark or Flink runners?

On Sun, Apr 16, 2023 at 6:24 PM Jeff Zhang  wrote:

> No, I don't use dataflow, I use Spark & Flink.
>
>
> On Mon, Apr 17, 2023 at 8:08 AM Reuven Lax  wrote:
>
>> Are you running on the Dataflow runner? If so, Dataflow - unlike Spark
>> and Flink - dynamically modifies the parallelism as the operator runs, so
>> there is no need to have such controls. In fact these specific controls
>> wouldn't make much sense for the way Dataflow implements these operators.
>>
>> On Sun, Apr 16, 2023 at 12:25 AM Jeff Zhang  wrote:
>>
>>> Just for performance tuning like in Spark and Flink.
>>>
>>>
>>> On Sun, Apr 16, 2023 at 1:10 PM Robert Bradshaw via user <
>>> user@beam.apache.org> wrote:
>>>
>>>> What are you trying to achieve by setting the parallelism?
>>>>
>>>> On Sat, Apr 15, 2023 at 5:13 PM Jeff Zhang  wrote:
>>>>
>>>>> Thanks Reuven, what I mean is to set the parallelism at the operator
>>>>> level. And the input size of the operator is unknown at the compiling
>>>>> stage if it is not a source operator.
>>>>>
>>>>> Here's an example for Flink:
>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level
>>>>> Spark also supports setting operator-level parallelism (see groupByKey
>>>>> and reduceByKey):
>>>>> https://spark.apache.org/docs/latest/rdd-programming-guide.html
>>>>>
>>>>>
>>>>> On Sun, Apr 16, 2023 at 1:42 AM Reuven Lax via user <
>>>>> user@beam.apache.org> wrote:
>>>>>
>>>>>> The maximum parallelism is always determined by the parallelism of
>>>>>> your data. If you do a GroupByKey for example, the number of keys in your
>>>>>> data determines the maximum parallelism.
>>>>>>
>>>>>> Beyond the limitations in your data, it depends on your execution
>>>>>> engine. If you're using Dataflow, Dataflow is designed to automatically
>>>>>> determine the parallelism (e.g. work will be dynamically split and moved
>>>>>> around between workers, the number of workers will autoscale, etc.), so
>>>>>> there's no need to explicitly set the parallelism of the execution.
>>>>>>
>>>>>> On Sat, Apr 15, 2023 at 1:12 AM Jeff Zhang  wrote:
>>>>>>
>>>>>>> Besides the global parallelism of beam job, is there any way to set
>>>>>>> parallelism for individual operators like group by and join? I
>>>>>>> understand the parallelism setting depends on the underlying execution
>>>>>>> engine, but it is very common to set parallelism like group by and join 
>>>>>>> in
>>>>>>> both spark & flink.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Best Regards
>>>>>>>
>>>>>>> Jeff Zhang
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Best Regards
>>>>>
>>>>> Jeff Zhang
>>>>>
>>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: Is there any way to set the parallelism of operators like group by, join?

2023-04-16 Thread Reuven Lax via user
Are you running on the Dataflow runner? If so, Dataflow - unlike Spark and
Flink - dynamically modifies the parallelism as the operator runs, so there
is no need to have such controls. In fact these specific controls wouldn't
make much sense for the way Dataflow implements these operators.

On Sun, Apr 16, 2023 at 12:25 AM Jeff Zhang  wrote:

> Just for performance tuning like in Spark and Flink.
>
>
> On Sun, Apr 16, 2023 at 1:10 PM Robert Bradshaw via user <
> user@beam.apache.org> wrote:
>
>> What are you trying to achieve by setting the parallelism?
>>
>> On Sat, Apr 15, 2023 at 5:13 PM Jeff Zhang  wrote:
>>
>>> Thanks Reuven, what I mean is to set the parallelism at the operator level.
>>> And the input size of the operator is unknown at the compiling stage if it
>>> is not a source operator.
>>>
>>> Here's an example for Flink:
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level
>>> Spark also supports setting operator-level parallelism (see groupByKey
>>> and reduceByKey):
>>> https://spark.apache.org/docs/latest/rdd-programming-guide.html
>>>
>>>
>>> On Sun, Apr 16, 2023 at 1:42 AM Reuven Lax via user <
>>> user@beam.apache.org> wrote:
>>>
>>>> The maximum parallelism is always determined by the parallelism of your
>>>> data. If you do a GroupByKey for example, the number of keys in your data
>>>> determines the maximum parallelism.
>>>>
>>>> Beyond the limitations in your data, it depends on your execution
>>>> engine. If you're using Dataflow, Dataflow is designed to automatically
>>>> determine the parallelism (e.g. work will be dynamically split and moved
>>>> around between workers, the number of workers will autoscale, etc.), so
>>>> there's no need to explicitly set the parallelism of the execution.
>>>>
>>>> On Sat, Apr 15, 2023 at 1:12 AM Jeff Zhang  wrote:
>>>>
>>>>> Besides the global parallelism of beam job, is there any way to set
>>>>> parallelism for individual operators like group by and join? I
>>>>> understand the parallelism setting depends on the underlying execution
>>>>> engine, but it is very common to set parallelism like group by and join in
>>>>> both spark & flink.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Best Regards
>>>>>
>>>>> Jeff Zhang
>>>>>
>>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: Is there any way to set the parallelism of operators like group by, join?

2023-04-15 Thread Reuven Lax via user
The maximum parallelism is always determined by the parallelism of your
data. If you do a GroupByKey for example, the number of keys in your data
determines the maximum parallelism.
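
To make that concrete, here is a toy sketch in plain Java (not Beam code; the
class and method names are made up for the example, and the modulo routing is
only an assumption about how a runner might place keys). Because all values
for one key have to be grouped in one place, at most one worker per distinct
key can be busy, no matter how many workers you provision:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class KeyParallelism {
  /** Returns how many of numWorkers receive at least one key under hash partitioning. */
  static int busyWorkers(List<String> keys, int numWorkers) {
    Set<Integer> busy = new HashSet<>();
    for (String key : keys) {
      // Every value for a given key is routed to the same worker.
      busy.add(Math.floorMod(key.hashCode(), numWorkers));
    }
    return busy.size();
  }

  public static void main(String[] args) {
    // Six elements but only three distinct keys.
    List<String> keys = List.of("a", "b", "c", "a", "b", "a");
    // Even with 100 workers, only 3 of them get any work.
    System.out.println(busyWorkers(keys, 100)); // prints 3
  }
}
```

Hash collisions can push the number of busy workers below the number of
distinct keys, but never above it.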

Beyond the limitations in your data, it depends on your execution engine.
If you're using Dataflow, Dataflow is designed to automatically determine
the parallelism (e.g. work will be dynamically split and moved around
between workers, the number of workers will autoscale, etc.), so there's no
need to explicitly set the parallelism of the execution.

On Sat, Apr 15, 2023 at 1:12 AM Jeff Zhang  wrote:

> Besides the global parallelism of beam job, is there any way to set
> parallelism for individual operators like group by and join? I
> understand the parallelism setting depends on the underlying execution
> engine, but it is very common to set parallelism like group by and join in
> both spark & flink.
>
>
>
>
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: major reduction is performance when using schema registry - KafkaIO

2023-04-09 Thread Reuven Lax via user
How are you using the schema registry? Do you have a code sample?

On Sun, Apr 9, 2023 at 3:06 AM Sigalit Eliazov  wrote:

> Hello,
>
> I am trying to understand the effect of a schema registry on our pipeline's
> performance. To do so, we created a very simple pipeline that reads
> from Kafka, runs a simple transformation that adds a new field, and writes to
> Kafka. The messages are in Avro format.
>
> I ran this pipeline with 3 different options on the same configuration: 1
> Kafka partition, 1 task manager, 1 slot, parallelism 1:
>
> * when I used Apicurio as the schema registry, I was able to process only
> 2000 messages per second
> * when I used Confluent Schema Registry, I was able to process 7000
> messages per second
> * when I did not use any schema registry and used a plain Avro
> deserializer/serializer, I was able to process *30K* messages per second.
>
> I understand that using a schema registry may cause a reduction in
> performance, but in my opinion the difference is too high.
> Any comments or suggestions about these results?
>
> Thanks in advance
> Sigalit
>


Re: Successful Inserts for Storage Write API?

2023-03-21 Thread Reuven Lax via user
FYI, I just checked in WriteResult.getSuccessfulStorageApiInserts, which
should give you what you need. This is now checked into Beam HEAD, and
should be included in the next Beam release.

On Fri, Mar 3, 2023 at 12:51 PM Matthew Ouyang 
wrote:

> I'm currently not using Wait.on.  I have a pipeline that uses BigQuery for
> checkpointing purposes.  I only want records that are
> successfully checkpointed in BigQuery to be eligible for the next stage in
> my pipeline.  With streaming inserts, I can use getSuccessfulInserts to
> achieve this and I was looking for something similar with Storage Write.
>
> On Thu, Mar 2, 2023 at 4:48 PM Reuven Lax via user 
> wrote:
>
>> Are you trying to do this in order to use Wait.on? getSuccessfulInserts
>> is not currently supported for Storage Write API.
>>
>> On Thu, Mar 2, 2023 at 1:44 PM Matthew Ouyang 
>> wrote:
>>
>>> Thank you to Ahmed and Reuven for the tip on
>>> WriteResult::getFailedStorageApiInserts.
>>>
>>> When I tried to get the successful inserts through the Storage Write
>>> API, I received an error message saying that "Retrieving successful inserts
>>> is only supported for streaming inserts. Make sure
>>> withSuccessfulInsertsPropagation is correctly configured for
>>> BigQueryIO.Write object."  Did I make a mistake, or is there a reason why
>>> this is not possible?  I tried setting triggeringFrequency +
>>> numStorageWriteApiStreams as required by Storage Write, and I tried to set
>>> successfulInsertsPropagation as directed in the error message.
>>>
>>


Re: Why is FlatMap different from composing Flatten and Map?

2023-03-15 Thread Reuven Lax via user
In Apache Beam, Flatten is a union operation - it takes multiple
PCollections (of the same type) and merges them into a single PCollection.
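
A rough way to see the difference in types, sketched in plain Java rather than
with the Beam API (ToyCollection is a made-up stand-in for a PCollection, not
a Beam class): flatten takes an ordinary list of collections and unions them,
while flatMap runs over the elements of one collection and unpacks whatever
iterable the function returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Toy stand-in for a PCollection; NOT the Beam class. */
final class ToyCollection<T> {
  final List<T> elements;

  ToyCollection(List<T> elements) {
    this.elements = List.copyOf(elements);
  }

  /** Like Flatten: union of several collections with the same element type. */
  static <T> ToyCollection<T> flatten(List<ToyCollection<T>> inputs) {
    List<T> out = new ArrayList<>();
    for (ToyCollection<T> input : inputs) {
      out.addAll(input.elements);
    }
    return new ToyCollection<>(out);
  }

  /** Like FlatMap: apply fn to each element and emit every item of the returned iterable. */
  <R> ToyCollection<R> flatMap(Function<? super T, ? extends Iterable<? extends R>> fn) {
    List<R> out = new ArrayList<>();
    for (T element : elements) {
      for (R item : fn.apply(element)) {
        out.add(item);
      }
    }
    return new ToyCollection<>(out);
  }

  public static void main(String[] args) {
    // Flatten: the outer container is a plain Java list of collections.
    ToyCollection<Integer> a = new ToyCollection<>(List.of(1, 2));
    ToyCollection<Integer> b = new ToyCollection<>(List.of(3));
    System.out.println(flatten(List.of(a, b)).elements); // prints [1, 2, 3]

    // FlatMap with an identity function: the iterables are the elements themselves.
    ToyCollection<List<Integer>> nested =
        new ToyCollection<>(List.of(List.of(1, 2), List.of(3)));
    ToyCollection<Integer> unnested = nested.flatMap(x -> x);
    System.out.println(unnested.elements); // prints [1, 2, 3]
  }
}
```

The results look the same, but the inputs have different shapes: the first is
a list of collections known when the pipeline is built, the second is a single
collection whose elements happen to be iterables.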

On Mon, Mar 13, 2023 at 11:32 AM Godefroy Clair 
wrote:

> Hi,
> I am wondering about the way `Flatten()` and `FlatMap()` are implemented
> in Apache Beam Python.
> In most functional languages, FlatMap() is the same as composing
> `Flatten()` and `Map()` as indicated by the name, so Flatten() and
> Flatmap() have the same input.
> But in Apache Beam, Flatten() is using _iterable of PCollections_ while
> FlatMap() is working with _PCollection of Iterables_.
>
> If I am not wrong, the signature of Flatten, Map and FlatMap are :
> ```
> Flatten:: Iterable[PCollections[A]] -> PCollection[A]
> Map:: (PCollection[A], (A-> B)) -> PCollection[B]
> FlatMap:: (PCollection[Iterable[A]], (A->B)) -> [A]
> ```
>
> So my question is: is there another "Flatten-like" function with this
> signature :
> ```
> anotherFlatten:: PCollection[Iterable[A]] -> PCollection[A]
> ```
>
> One of the reasons this would be useful is that when you just want to
> "flatten" a `PCollection` of `iterable` you have to use `FlatMap()` with an
> identity function.
>
> So instead of writing:
> `FlatMap(lambda e: e)`
> I would like to use a function
> `anotherFlatten()`
>
> Thanks,
> Godefroy
>


Re: Successful Inserts for Storage Write API?

2023-03-02 Thread Reuven Lax via user
Are you trying to do this in order to use Wait.on? getSuccessfulInserts is
not currently supported for the Storage Write API.

On Thu, Mar 2, 2023 at 1:44 PM Matthew Ouyang 
wrote:

> Thank you to Ahmed and Reuven for the tip on
> WriteResult::getFailedStorageApiInserts.
>
> When I tried to get the successful inserts through the Storage Write API,
> I received an error message saying that "Retrieving successful inserts is
> only supported for streaming inserts. Make sure
> withSuccessfulInsertsPropagation is correctly configured for
> BigQueryIO.Write object."  Did I make a mistake, or is there a reason why
> this is not possible?  I tried setting triggeringFrequency +
> numStorageWriteApiStreams as required by Storage Write, and I tried to set
> successfulInsertsPropagation as directed in the error message.
>


Re: Deduplicate usage

2023-03-02 Thread Reuven Lax via user
State is per-key, and keys are distributed across workers. Two workers
should not be working on the same state.
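
As a toy model of why the race described below cannot happen (plain Java, not
the actual Deduplicate implementation; the names and the modulo routing are
assumptions for illustration, and real runners assign keys differently): each
key is owned by exactly one worker, so only that worker ever reads or writes
the "seen" state for that key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class PerKeyDedup {
  /** Picks the single worker that owns a key; every element with that key goes there. */
  static int workerFor(String key, int numWorkers) {
    return Math.floorMod(key.hashCode(), numWorkers);
  }

  /** Deduplicates by key, keeping a separate "seen" set for each worker. */
  static List<String> dedup(List<String> keys, int numWorkers) {
    Map<Integer, Set<String>> seenByWorker = new HashMap<>();
    List<String> output = new ArrayList<>();
    for (String key : keys) {
      Set<String> seen =
          seenByWorker.computeIfAbsent(workerFor(key, numWorkers), w -> new HashSet<>());
      // add() returns true only the first time the owning worker sees this key.
      if (seen.add(key)) {
        output.add(key);
      }
    }
    return output;
  }

  public static void main(String[] args) {
    System.out.println(dedup(List.of("a", "b", "a", "c", "b"), 4)); // prints [a, b, c]
  }
}
```

Although the workers never share state, duplicates are still dropped, because
both copies of a key always land on the same worker.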

On Thu, Mar 2, 2023 at 10:48 AM Binh Nguyen Van  wrote:

> Thank you Ankur,
>
> This is the current source code of Deduplicate transform.
>
>   Boolean seen = seenState.read();
>   // Seen state is either set or not set so if it has been set then it must be true.
>   if (seen == null) {
>     // We don't want the expiry timer to hold up watermarks.
>     expiryTimer.offset(duration).withNoOutputTimestamp().setRelative();
>     seenState.write(true);
>     receiver.output(element);
>   }
>
> Could you please explain the synchronization for the following scenario?
>
>- There are two workers.
>- Both workers read the same state at the same time and the state was
>not set yet. In this case, both will get null in the response (I
>believe)
>- Both of them will try to set the state and send the output out.
>
> What will happen in this scenario?
>
> Thank you
> -Binh
>
> On Thu, Mar 2, 2023 at 10:29 AM Ankur Goenka 
> wrote:
>
>> Hi Binh, The Deduplicate transform uses state api to do the
>> de-duplication which should do the needful operations to work across
>> multiple concurrent workers.
>>
>> Thanks,
>> Ankur
>>
>> On Thu, 2 Mar 2023 at 10:18, Binh Nguyen Van  wrote:
>>
>>> Hi,
>>>
>>> I am writing a pipeline and want to apply deduplication. I look at
>>> Deduplicate transform that Beam provides and wonder about its usage. Do
>>> I need to shuffle input collection by key before calling this
>>> transformation? I look at its source code and it doesn’t do any shuffle so
>>> wonder how it works when let’s say there are duplicates and the duplicated
>>> elements are processed concurrently on multiple workers.
>>>
>>> Thank you
>>> -Binh
>>>
>>


Re: getFailedInsertsWithErr and Storage Write API

2023-03-01 Thread Reuven Lax via user
Correct. However, if you are using a recent version of Beam, you can call
WriteResult.getFailedStorageApiInserts.

On Wed, Mar 1, 2023 at 3:00 PM Matthew Ouyang 
wrote:

> The documentation says WriteResult.getFailedInserts won’t return anything
> when used with the Storage Write API (
> https://beam.apache.org/documentation/io/built-in/google-bigquery/) Is it
> the same for WriteResult.getFailedInsertsWithErr?
>


Re: Beam saves filepaths in Flink's state

2022-12-08 Thread Reuven Lax via user
This doesn't sound ideal to me. For contrast, Dataflow doesn't save any of
these things (coders, transforms, configs) in state, which makes it easier
for Dataflow to update pipelines.

On Thu, Dec 8, 2022 at 7:48 AM Cristian Constantinescu 
wrote:

> Hi everyone,
>
> I noticed that the Flink state contains KafkaIO's consumer config
> properties.
>
> When restarting the Beam pipeline (Java SDK) from state, the Flink
> Runner translation layer will deserialize the KafkaUnboundedReader (via
> UnboundedSourceWrapper) from Flink's state. This happens *before* the
> user written KafkaIO builder code is executed. Effectively what this
> means is that if the user has code that feeds KafkaIO correct file
> paths (probably fetched from configs), Beam still tries to use the
> ones that were saved in the Flink state and those may be outdated,
> hence preventing the pipeline from starting up properly.
>
> This is problematic if files get moved around on disk, or if we move
> the Flink state to another machine that may have different file
> configurations.
>
> Has anyone seen this problem before?
>
> Also, could anyone give me a quick overview of why Beam saves so many
> things in the Flink state (I'm aware of coders, transforms and
> transform configs) when those things can be materialized from the user
> code just like when the pipeline is started without a state. It would
> help me find a workaround for this issue.
>
> Thanks,
> Cristian
>


Re: Single side input to multiple transforms

2022-11-07 Thread Reuven Lax via user
Is this a Python job?

On Mon, Nov 7, 2022 at 12:38 AM Binh Nguyen Van  wrote:

> Hi,
>
> I am writing a pipeline where I have one singleton side input that I want
> to use in multiple different transforms. When I run the pipeline in Google
> Dataflow I see multiple entries in the logs that have a message like this
>
> Deduplicating side input tags, found non-unique side input key
> org.apache.beam.sdk.values.PCollectionViews$SimplePCollectionView.:1204#4663620f501c9270
>
> Is this something that I should avoid? If so how can I do that?
>
> Thanks
> -Binh
>


Re: Staging a PCollection in Beam | Dataflow Runner

2022-10-19 Thread Reuven Lax via user
PCollections are usually persistent within a pipeline, so you can reuse
them in other parts of a pipeline with no problem.

There is no notion of state across pipelines - every pipeline is
independent. If you want state across pipelines you can write the
PCollection out to a set of files which are read back in by the new
pipeline.

On Tue, Oct 18, 2022 at 11:45 PM Ravi Kapoor  wrote:

> Hi Team,
> Can we stage a PCollection or  PCollection data? Lets say
> to save  the expensive operations between two complex BQ tables time and
> again and materialize it in some temp view which will be deleted after the
> session.
>
> Is it possible to do that in the Beam Pipeline?
> We can later use the temp view in another pipeline to read the data from
> and do processing.
>
> Or In general I would like to know Do we ever stage the PCollection.
> Let's say I want to create another instance of the same job which has
> complex processing.
> Does the pipeline re perform the computation or would it pick the already
> processed data in the previous instance that must be staged somewhere?
>
> Like in spark we do have notions of createOrReplaceTempView which is used
> to create temp table from a spark dataframe or dataset.
>
> Please advise.
>
> --
> Thanks,
> Ravi Kapoor
> +91-9818764564 <+91%2098187%2064564>
> kapoorrav...@gmail.com
>


Re: Why is BigQueryIO.withMaxFileSize() not public?

2022-09-29 Thread Reuven Lax via user
The default max file size is 4 TiB. BigQuery supports files up to 5 TiB, but
there might be some slop in our file-size estimation, which is why Beam sets
a slightly lower limit. In any case, you won't be able to increase that
value by too much, or BigQuery will reject the load job.

The default max bytes per partition can perhaps be increased. When the code
was written, BigQuery's max limit was 12 TiB, but if it's now 15 TiB that
would be a reason to increase it.

BigQuery does not provide guarantees on scheduling load jobs (especially if
you don't have reserved slots). Some other ideas for how to improve things:
- If you are running in streaming mode, then consider increasing the
triggering duration so you generate load jobs less often.
- By default, files are written out in JSON format. This is inefficient
and tends to create many more files. There is currently partial support for
writing files in the more efficient Avro format, but it requires you to call
withAvroWriter to pass in a function that converts your records into Avro.
- I would also recommend trying the Storage Write API method. This does
not have the same issues with scheduling that load jobs have.

Reuven

On Thu, Sep 29, 2022 at 1:02 PM Julien Phalip  wrote:

> Hi all,
>
> Thanks for the replies.
>
> @Ahmed, you mentioned that one could hardcode another value
> for DEFAULT_MAX_FILE_SIZE. How may I do that from my own code?
>
> @Reuven, to give you more context on my use case: I'm running into an
> issue where a job that writes to BQ is taking an unexpectedly long time. It
> looks like things are slowing down on the BQ load job side of things. My
> theory is that the pipeline might generate too many BQ load job requests
> for BQ to handle in a timely manner. So I was thinking that this could be
> mitigated by increasing the file size, and therefore reducing the number of
> load job requests.
>
> That said, now that you've pointed at withMaxBytesPerPartition(), maybe
> that's what I should use instead? I see this defaults to 11TiB but perhaps
> I could try increasing it  to something closer to BQ's limit (15TiB)?
>
> Thanks,
>
> Julien
>
> On Thu, Sep 29, 2022 at 11:01 AM Ahmed Abualsaud via user <
> user@beam.apache.org> wrote:
>
>> That's right, if maxFileSize is made too small you may hit the default
>> maximum files per partition (10,000), in which case copy jobs will be
>> triggered. With that said though, BigQueryIO already has a public
>> withMaxBytesPerPartition() [1] method that controls the partition byte
>> size, which is arguably more influential in triggering this other codepath.
>>
>> [1]
>> https://github.com/apache/beam/blob/028c564b8ae1ba1ffa6aadb8212ec03555dd63b6/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L2623
>>
>> On Thu, Sep 29, 2022 at 12:24 PM Reuven Lax  wrote:
>>
>>> It's not public because it was added for use in unit tests, and
>>> modifying this value can have very unexpected results (e.g. making it
>>> smaller can trigger a completely different codepath that is triggered when
>>> there are too many files, leading to unexpected cost increases in the
>>> pipeline).
>>>
>>> Out of curiosity, what is your use case for needing to control this file
>>> size?
>>>
>>> On Thu, Sep 29, 2022 at 8:01 AM Ahmed Abualsaud <
>>> ahmedabuals...@google.com> wrote:
>>>
>>>> Hey Julien,
>>>>
>>>> I don't see a problem with exposing that method. That part of the code
>>>> was committed ~6 years ago, my guess is it wasn't requested to be public.
>>>>
>>>> One workaround is to hardcode another value for DEFAULT_MAX_FILE_SIZE [1].
>>>> Would this work temporarily? @Chamikara Jayalath 
>>>>  @Reuven Lax  other thoughts?
>>>>
>>>> [1]
>>>> https://github.com/apache/beam/blob/17453e71a81ba774ab451ad141fc8c21ea8770c9/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java#L109
>>>>
>>>> Best,
>>>> Ahmed
>>>>
>>>> On Wed, Sep 28, 2022 at 4:55 PM Julien Phalip 
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I'd like to control the size of files written to GCS when using
>>>>> BigQueryIO's FILE_LOAD write method.
>>>>>
>>>>> However, it looks like the withMaxFileSize method (
>>>>> https://github.com/apache/beam/blob/948af30a5b665fe74b7052b673e95ff5f5fc426a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L25

Re: Why is BigQueryIO.withMaxFileSize() not public?

2022-09-29 Thread Reuven Lax via user
It's not public because it was added for use in unit tests, and modifying
this value can have very unexpected results (e.g. making it smaller can
trigger a completely different codepath that is triggered when there are
too many files, leading to unexpected cost increases in the pipeline).

Out of curiosity, what is your use case for needing to control this file
size?

On Thu, Sep 29, 2022 at 8:01 AM Ahmed Abualsaud 
wrote:

> Hey Julien,
>
> I don't see a problem with exposing that method. That part of the code was
> committed ~6 years ago, my guess is it wasn't requested to be public.
>
> One workaround is to hardcode another value for DEFAULT_MAX_FILE_SIZE [1].
> Would this work temporarily? @Chamikara Jayalath  
> @Reuven
> Lax  other thoughts?
>
> [1]
> https://github.com/apache/beam/blob/17453e71a81ba774ab451ad141fc8c21ea8770c9/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java#L109
>
> Best,
> Ahmed
>
> On Wed, Sep 28, 2022 at 4:55 PM Julien Phalip  wrote:
>
>> Hi,
>>
>> I'd like to control the size of files written to GCS when using
>> BigQueryIO's FILE_LOAD write method.
>>
>> However, it looks like the withMaxFileSize method (
>> https://github.com/apache/beam/blob/948af30a5b665fe74b7052b673e95ff5f5fc426a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L2597)
>> is not public.
>>
>> Is that intentional? Is there a workaround to control the file size?
>>
>> Thanks,
>>
>> Julien
>>
>


Re: [Question] Using KafkaIO without a data loss

2022-09-25 Thread Reuven Lax via user
If you are using an exactly-once runner, it will guarantee every message is
consumed once (though the mechanism might not be obvious).

Generally what happens is that the messages are consumed into the system in
order. However if you have downstream ParDos, there is no guarantee that
they process the messages in the same order (especially if there is a
shuffle operation, such as GroupByKey, in between).
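
A small deterministic toy in plain Java shows the effect (this is not how a
runner is implemented; the class name and the offset-modulo key are
assumptions for the example). Messages enter in offset order, but once they
are grouped by key, a downstream step that handles one group at a time no
longer sees them in that order:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ShuffleOrder {
  /** Groups offsets by key (offset % numKeys) and returns them in group-by-group order. */
  static List<Integer> processingOrder(int numMessages, int numKeys) {
    Map<Integer, List<Integer>> groups = new LinkedHashMap<>();
    // Messages are consumed in offset order...
    for (int offset = 0; offset < numMessages; offset++) {
      groups.computeIfAbsent(offset % numKeys, k -> new ArrayList<>()).add(offset);
    }
    // ...but after grouping, each key's group is handled as a unit.
    List<Integer> order = new ArrayList<>();
    for (List<Integer> group : groups.values()) {
      order.addAll(group);
    }
    return order;
  }

  public static void main(String[] args) {
    System.out.println(processingOrder(6, 2)); // prints [0, 2, 4, 1, 3, 5]
  }
}
```

In a real runner the groups are processed concurrently, so the interleaving
is not even fixed from one run to the next.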

Now a future version of the source might decide to split the Kafka
partition if it's too large to handle on one thread (e.g. split it in half
where the first half is bounded and the second half is the growing
unbounded tail of the partition). In this case the source would keep two
checkpoints, one for the current position in each half of the partition (this
mode of operation probably wouldn't be compatible with checkpointing
offsets back to the broker, though). The source doesn't do this today; I'm
just mentioning it to point out another way in which things could be
consumed out of order.

On Sun, Sep 25, 2022 at 11:40 AM Yomal de Silva 
wrote:

> Hi Reuven,
> Thanks for those clarifications.
>
> For the 4th question that I raised, if A fails and B is committed,
> will those messages (A) get consumed again from Kafka, or will the messages
> get recovered from the checkpoint and retried in that specific operator?
>
> On Sun, Sep 25, 2022 at 10:45 PM Reuven Lax via user 
> wrote:
>
>>
>>
>> On Sun, Sep 25, 2022 at 4:56 AM Yomal de Silva 
>> wrote:
>>
>>> Hi all,
>>>
>>> I have started using KafkaIO to read a data stream and have the
>>> following questions. Appreciate it if you could provide a few
>>> clarifications on the following.
>>>
>>>
>>> 1. Does KafkaIO ignore the offset stored in the broker and use the
>>> offset stored during checkpointing when consuming messages?
>>>
>>
>> Generally yes, as that's the only way to guarantee consistency (we can't
>> atomically commit to the runner and to Kafka). However when starting a new
>> pipeline, you should be able to start reading at the broker checkpoint.
>>
>>
>>> 2. How many threads will be used by the Kafka consumer?
>>>
>>
>> This depends somewhat on the runner, but you can expect one thread per
>> partition.
>>
>>
>>> 3. If the consumer polls a set of messages A and then later B while A is
>>> still being processed, is there a possibility of set B finishing before A?
>>> Does parallelism control this?
>>>
>>
>> Yes. Beam doesn't currently have any notion of ordering. All messages are
>> independent and can be processed at different times (the source also
>> reserves the right to process different ranges of a single Kafka partition
>> on different threads, though it doesn't currently do this).
>>
>>
>>> 4. In the above scenario if B is committed back to the broker and
>>> somehow A failed, upon a restart is there any way we can consume A again
>>> without losing data?
>>>
>>
>> Data should never be lost. If B is processed, then you can assume that
>> the A data is checkpointed inside the Beam runner and will be processed too.
>>
>>
>>
>>>
>>> Thank you.
>>>
>>>
>>>
>>


Re: [Question] Using KafkaIO without a data loss

2022-09-25 Thread Reuven Lax via user
On Sun, Sep 25, 2022 at 4:56 AM Yomal de Silva 
wrote:

> Hi all,
>
> I have started using KafkaIO to read a data stream and have the following
> questions. Appreciate it if you could provide a few clarifications on the
> following.
>
>
> 1. Does KafkaIO ignore the offset stored in the broker and use the offset
> stored during checkpointing when consuming messages?
>

Generally yes, as that's the only way to guarantee consistency (we can't
atomically commit to the runner and to Kafka). However when starting a new
pipeline, you should be able to start reading at the broker checkpoint.
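
As a rough model of the rule above (this is not KafkaIO's actual code; the
class and method names are made up for illustration), the choice of starting
offset for one partition could be sketched like this:

```java
import java.util.OptionalLong;

// Hypothetical illustration only: models the rule described above, not KafkaIO internals.
final class StartOffsetSketch {
    private StartOffsetSketch() {}

    /**
     * Picks the offset a reader would resume from for one partition.
     *
     * @param runnerCheckpoint offset restored from the runner's own checkpoint, if any
     * @param brokerCommitted  offset committed to the Kafka broker for the group, if any
     * @param fallback         offset to use when neither exists (e.g. earliest or latest)
     */
    static long resolveStartOffset(
            OptionalLong runnerCheckpoint, OptionalLong brokerCommitted, long fallback) {
        if (runnerCheckpoint.isPresent()) {
            // A running (or restored) pipeline trusts its own checkpoint.
            return runnerCheckpoint.getAsLong();
        }
        // A brand-new pipeline has no checkpoint, so the broker offset is the starting point.
        return brokerCommitted.orElse(fallback);
    }

    public static void main(String[] args) {
        // Checkpoint present: the broker offset is ignored.
        System.out.println(resolveStartOffset(OptionalLong.of(120), OptionalLong.of(100), 0));
        // New pipeline: start from the broker offset.
        System.out.println(resolveStartOffset(OptionalLong.empty(), OptionalLong.of(100), 0));
    }
}
```

The point of the sketch is only the precedence: the runner checkpoint wins
whenever it exists, because it is the one the runner can keep consistent.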


> 2. How many threads will be used by the Kafka consumer?
>

This depends somewhat on the runner, but you can expect one thread per
partition.


> 3. If the consumer polls a set of messages A and then later B while A is
> still being processed, is there a possibility of set B finishing before A?
> Does parallelism control this?
>

Yes. Beam doesn't currently have any notion of ordering. All messages are
independent and can be processed at different times (the source also
reserves the right to process different ranges of a single Kafka partition
on different threads, though it doesn't currently do this).


> 4. In the above scenario if B is committed back to the broker and somehow
> A failed, upon a restart is there any way we can consume A again without
> losing data?
>

Data should never be lost. If B is processed, then you can assume that the
A data is checkpointed inside the Beam runner and will be processed too.



>
> Thank you.
>
>
>


Re: Checkpointing on Google Cloud Dataflow Runner

2022-08-30 Thread Reuven Lax via user
Snapshots are expected to happen nearly instantaneously. Although processing
is paused while the snapshot is in progress, the pause should usually be
very brief. It's true that Dataflow does not support automated snapshots -
you would have to create them yourself using a cron job.

Checkpoints on Flink aren't simply an automated snapshot mechanism.
Checkpoints are how Flink implements consistent, exactly-once processing.
Dataflow, on the other hand, continuously checkpoints records, so it doesn't
need global checkpoints for exactly-once processing.

Reuven

On Tue, Aug 30, 2022 at 5:10 AM Will Baker  wrote:

> I looked into snapshots and they do seem useful for providing a means
> to save state and resume, however they aren't as seamless as I was
> hoping for with the automatic checkpointing that is supported by other
> runners. It looked like snapshots would be user initiated and would
> pause the pipeline while the snapshot was being created. I could
> imagine how this would be set up on an automated schedule, but would
> still prefer something more light-weight like checkpoints.
>
> On Mon, Aug 29, 2022 at 8:11 PM Reuven Lax  wrote:
> >
> > Google Cloud Dataflow does support snapshots. Is this what you were
> looking for?
> >
> > On Mon, Aug 29, 2022 at 4:04 PM Kenneth Knowles  wrote:
> >>
> >> Hi Will, David,
> >>
> >> I think you'll find the best source of answers for this sort of question
> on the user@beam list. I've put that in the To: line with a BCC: to the
> dev@beam list so everyone knows they can find the thread there. If I have
> misunderstood, and your question has to do with building Beam itself, feel
> free to move it back.
> >>
> >> Kenn
> >>
> >> On Mon, Aug 29, 2022 at 2:24 PM Will Baker  wrote:
> >>>
> >>> Hello!
> >>>
> >>> I am wondering about using checkpoints with Beam running on Google
> >>> Cloud Dataflow.
> >>>
> >>> The docs indicate that checkpoints are not supported by Google Cloud
> >>> Dataflow:
> https://beam.apache.org/documentation/runners/capability-matrix/additional-common-features-not-yet-part-of-the-beam-model/
> >>>
> >>> Is there a recommended approach to handling checkpointing on Google
> >>> Cloud Dataflow when using streaming sources like Kinesis and Kafka, so
> >>> that a pipeline could be resumed from where it left off if it needs to
> >>> be stopped or crashes for some reason?
> >>>
> >>> Thanks!
> >>> Will Baker
>


Re: Checkpointing on Google Cloud Dataflow Runner

2022-08-29 Thread Reuven Lax via user
Google Cloud Dataflow does support snapshots. Is this what you were looking
for?

On Mon, Aug 29, 2022 at 4:04 PM Kenneth Knowles  wrote:

> Hi Will, David,
>
> I think you'll find the best source of answers for this sort of question on
> the user@beam list. I've put that in the To: line with a BCC: to the
> dev@beam list so everyone knows they can find the thread there. If I have
> misunderstood, and your question has to do with building Beam itself, feel
> free to move it back.
>
> Kenn
>
> On Mon, Aug 29, 2022 at 2:24 PM Will Baker  wrote:
>
>> Hello!
>>
>> I am wondering about using checkpoints with Beam running on Google
>> Cloud Dataflow.
>>
>> The docs indicate that checkpoints are not supported by Google Cloud
>> Dataflow:
>> https://beam.apache.org/documentation/runners/capability-matrix/additional-common-features-not-yet-part-of-the-beam-model/
>>
>> Is there a recommended approach to handling checkpointing on Google
>> Cloud Dataflow when using streaming sources like Kinesis and Kafka, so
>> that a pipeline could be resumed from where it left off if it needs to
>> be stopped or crashes for some reason?
>>
>> Thanks!
>> Will Baker
>>
>


Re: Using a non-AutoValue member with AutoValueSchema

2022-08-04 Thread Reuven Lax via user
That would be a nice feature, though maybe some work to implement.

On Thu, Aug 4, 2022 at 2:49 PM Brian Hulette  wrote:

> In some places (e.g. in AutoValueSchema) we assume that nested
> schema-inferred types are of the same "class". I filed [1] to track this a
> while back - I think we should support mixing and matching SchemaProviders
> for nested types.
>
> [1] https://github.com/apache/beam/issues/20359
>
> On Thu, Aug 4, 2022 at 2:45 PM Reuven Lax via user 
> wrote:
>
>> We do have JavaBeanSchema which might work, depending on whether your
>> thrift class conforms to java beans.
>>
>> On Thu, Aug 4, 2022 at 2:06 PM Binh Nguyen Van 
>> wrote:
>>
>>> Hi,
>>>
>>> I have an AutoValue class and it looks like this
>>>
>>> import com.google.auto.value.AutoValue;
>>> import org.apache.beam.sdk.schemas.AutoValueSchema;
>>> import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
>>>
>>> @AutoValue
>>> @DefaultSchema(AutoValueSchema.class)
>>> public abstract class MyClass {
>>>   public abstract String getField1();
>>>   public abstract MyThriftClass getField2();
>>>
>>>   public static Builder builder() {
>>>     return new AutoValue_MyClass.Builder();
>>>   }
>>>
>>>   @AutoValue.Builder
>>>   public abstract static class Builder {
>>>     public abstract Builder setField1(String field1);
>>>     public abstract Builder setField2(MyThriftClass field2);
>>>     public abstract MyClass build();
>>>   }
>>> }
>>>
>>> MyThriftClass is not an AutoValue class and it inherits from
>>> org.apache.thrift.TBase class.
>>>
>>> When I run a pipeline with a PCollection of elements that are instances
>>> of this class, I got this error java.lang.IllegalStateException:
>>> AutoValue generated class not found: com.foo.bar.AutoValue_MyThriftClass
>>> .
>>>
>>> My question is, is it possible to use a non-AutoValue member in an
>>> AutoValue class like what I am doing now? If yes then how can I do it? If
>>> no then what would be the alternatives?
>>>
>>> Thank you
>>>
>>> -Binh
>>>
>>>
>>>


Re: Using a non-AutoValue member with AutoValueSchema

2022-08-04 Thread Reuven Lax via user
We do have JavaBeanSchema which might work, depending on whether your
thrift class conforms to java beans.
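
If it helps, here is a rough way to check the "conforms to java beans" part
using only the JDK. This is a sketch, not what JavaBeanSchema does
internally; BeanShapeCheck and the two sample classes are hypothetical names,
and Beam's own rules may be stricter or looser than this check.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;

// Hypothetical helper: a rough JavaBeans shape check, not Beam's actual logic.
final class BeanShapeCheck {
    private BeanShapeCheck() {}

    /** Returns true if the class has at least one property and each has a getter and a setter. */
    static boolean looksLikeJavaBean(Class<?> type) {
        try {
            // Stop at Object so the synthetic "class" property is not reported.
            BeanInfo info = Introspector.getBeanInfo(type, Object.class);
            PropertyDescriptor[] props = info.getPropertyDescriptors();
            if (props.length == 0) {
                return false;
            }
            for (PropertyDescriptor p : props) {
                if (p.getReadMethod() == null || p.getWriteMethod() == null) {
                    return false;
                }
            }
            return true;
        } catch (IntrospectionException e) {
            return false;
        }
    }

    /** Sample class with a matching getter/setter pair. */
    public static class Good {
        private String name;
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    /** Sample class with a getter but no setter. */
    public static class GetterOnly {
        public String getName() { return "x"; }
    }

    public static void main(String[] args) {
        System.out.println(looksLikeJavaBean(Good.class));
        System.out.println(looksLikeJavaBean(GetterOnly.class));
    }
}
```

You could pass your generated thrift class to looksLikeJavaBean as a quick
first test before trying JavaBeanSchema on it.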

On Thu, Aug 4, 2022 at 2:06 PM Binh Nguyen Van  wrote:

> Hi,
>
> I have an AutoValue class and it looks like this
>
> import com.google.auto.value.AutoValue;
> import org.apache.beam.sdk.schemas.AutoValueSchema;
> import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
>
> @AutoValue
> @DefaultSchema(AutoValueSchema.class)
> public abstract class MyClass {
>   public abstract String getField1();
>   public abstract MyThriftClass getField2();
>
>   public static Builder builder() {
>     return new AutoValue_MyClass.Builder();
>   }
>
>   @AutoValue.Builder
>   public abstract static class Builder {
>     public abstract Builder setField1(String field1);
>     public abstract Builder setField2(MyThriftClass field2);
>     public abstract MyClass build();
>   }
> }
>
> MyThriftClass is not an AutoValue class and it inherits from
> org.apache.thrift.TBase class.
>
> When I run a pipeline with a PCollection of elements that are instances of
> this class, I got this error java.lang.IllegalStateException: AutoValue
> generated class not found: com.foo.bar.AutoValue_MyThriftClass.
>
> My question is, is it possible to use a non-AutoValue member in an
> AutoValue class like what I am doing now? If yes then how can I do it? If
> no then what would be the alternatives?
>
> Thank you
>
> -Binh
>
>
>


Re: GroupIntoBatches not working on Flink?

2022-07-26 Thread Reuven Lax via user
This might be a bug in the Flink runner, because it is implemented here.

On Tue, Jul 26, 2022 at 9:14 AM Cristian Constantinescu 
wrote:

> Hi everyone,
>
> Quick question about GroupIntoBatches.
>
> When running on Flink, eventually it hits an unsupported
> exception "Canceling a timer by ID is not yet supported." on this line [1].
> The source inputs are AVRO files for testing (batch) but will use kafka
> topics (streaming) when deployed.
>
> This happens when the batch is filled (10 items) and the max buffering
> time timer needs to be cancelled.
>
> Anyone else observed this issue?
>
> On a related note. Looks like the FlinkStatefulDoFnFunction [2] uses
> InMemoryTimerInternals. Curious why is FlinkTimerInternals not used? I
> would guess there's a difference between batch and streaming requirements?
>
> Thank you,
> Cristian
>
>
> [1]
> https://github.com/apache/beam/blob/b87f1f01197a78d60fb08a0ee7a4ef96bc4e08eb/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java#L157
> [2]
> https://github.com/apache/beam/blob/b87f1f01197a78d60fb08a0ee7a4ef96bc4e08eb/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java#L136
> [3]
> https://github.com/apache/beam/blob/b87f1f01197a78d60fb08a0ee7a4ef96bc4e08eb/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L1314
>


Re: Generating Hearbeats Using Looping Timer

2022-07-09 Thread Reuven Lax via user
On Fri, Jul 8, 2022 at 1:37 PM gaurav mishra 
wrote:

> Maybe the previous post was too verbose so I will try to summarize my
> question -
> If one instance of DoFn tries to set a timer for a time which is behind
> the pipeline's watermark, can this cause the pipeline to stall for other
> keys as well?
> "stall" meaning here - other keys' timers will start lagging behind.
>

It depends on the runner, but in general timers should be independent.
In practice, however, each worker has only so many threads for processing,
and timers are processed in order, so if a large number of these "old"
timers are set and they take a long time to process, this could cause some
delays.
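
To put numbers on how many "old" timer firings a single key can produce,
here is a small sketch of the boundary arithmetic from the pseudocode in the
original message. It assumes a timer becomes eligible once the watermark is
at or past its timestamp; the class and method names are mine, not Beam APIs.

```java
// Hypothetical helper: pure arithmetic, no Beam dependency.
final class LoopingTimerCatchUp {
    private LoopingTimerCatchUp() {}

    /** Next period boundary strictly after the given event time (all values in epoch seconds). */
    static long nextBoundary(long eventTimeSeconds, long periodSeconds) {
        return eventTimeSeconds - Math.floorMod(eventTimeSeconds, periodSeconds) + periodSeconds;
    }

    /**
     * Number of timer firings that are already due when the first timer is set,
     * i.e. boundaries after the event time that are at or before the watermark.
     */
    static long catchUpFirings(long eventTimeSeconds, long watermarkSeconds, long periodSeconds) {
        long firings = 0;
        for (long t = nextBoundary(eventTimeSeconds, periodSeconds);
                t <= watermarkSeconds;
                t += periodSeconds) {
            firings++;
        }
        return firings;
    }

    public static void main(String[] args) {
        long tenOhTwo = 10 * 3600 + 2 * 60;      // 10:02 as seconds since midnight
        long tenThirteen = 10 * 3600 + 13 * 60;  // 10:13 as seconds since midnight
        // Event at 10:02, watermark at 10:13: timers for 10:05 and 10:10 are already due.
        System.out.println(catchUpFirings(tenOhTwo, tenThirteen, 300)); // prints 2
        // Event one hour behind the same watermark, 5 minute period.
        System.out.println(catchUpFirings(tenThirteen - 3600, tenThirteen, 300)); // prints 12
    }
}
```

Each of those due firings is cheap on its own; the concern above is the total
when many keys are behind at once and all of their firings share the same
worker threads.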


> say there are 1 million DoFns running in a steady state(behaving as
> expected), where timers are firing at 5 min boundaries.
>

Do you mean 1 million keys? What do you mean by 1 million DoFns?


> 1 bad key comes which sets its timer to a time which is 1 hour older than
> the current watermark. What happens here? my understanding here is this -
>  the looping timer will fire back to back in quick succession for this bad
> key 12 times and after that this key also joins the group of 1 million keys
> which were firing regularly at 5 min boundaries.
>

Where does the number 12 come from?


> PS - Above DoFn is using default Global Windows and default trigger.
>
>
> On Thu, Jul 7, 2022 at 11:09 PM gaurav mishra <
> gauravmishra.it...@gmail.com> wrote:
>
>> Hello,
>> I have a pipeline which is generating heartbeats using looping timers in
>> a stateful dofn. Following is pseudo code for the process element and
>> onTimer methods
>>
>> StateSpec> lastSeenMsg = StateSpecs.value(...);
>> TimerSpec loopingTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>
>>
>> processElement(input) {
>>   // read event time from the message
>>   Instant currentEventTime = input.getEventTimeEpoc();
>>   if (input.state == ONLINE) {
>>     lastSeenMsg.write(input);
>>     // calculate start of looping timer,
>>     // which will be the next 5 min boundary
>>     long currentEventTimeEpocSeconds = currentEventTime.getMillis() / 1000;
>>     long offset = currentEventTimeEpocSeconds % 300;
>>     long nextFireTimeSeconds = currentEventTimeEpocSeconds - offset + 300;
>>     loopingTimer.set(Instant.ofEpochSecond(nextFireTimeSeconds));
>>   } else {
>>     // stop heartbeats when the entity goes offline
>>     loopingTimer.clear();
>>   }
>> }
>>
>>
>> onTimer() {
>>   // emit out the lastSeenMsg
>>   output(lastSeenMsg.read());
>>   // re-arm the timer 5 min after the firing timestamp
>>   loopingTimer.set(timerContext.timestamp().plus(Duration.standardSeconds(300)));
>> }
>>
>>
>> The above pipeline works well in low-load scenarios. But on one of my
>> heavy-traffic deployments the pipeline seems unable to keep up with
>> the load. Input messages from pubsub are state change events for an entity -
>> Entity Online or Entity Offline messages. Once an entity comes Online we
>> start generating a heartbeat every 5 min as long as we do not encounter an
>> Offline message for that entity. The number of online entities can be fairly
>> large; more than 10 million entities can be Online at a given time.
>>
>> I am seeing this particular DoFn start lagging behind as soon as it gets
>> started. The timers are firing pretty late. The lag went up to 48 hours
>> before I restarted the pipeline. Is there something wrong in what I am
>> doing?
>> Note - I am reading the eventTime embedded in the msg. The intent is to
>> fire a bunch of timers in quick succession if needed and fill up the DB
>> with heartbeats up to the current time.
>> So say a msg comes with state = Online and time = 10:02 AM, and the current
>> watermark is at 10:13 AM. I set the loopingTimer to start at 10:05, which I
>> expect to fire immediately since the watermark is already ahead of this
>> time (or is this a wrong understanding?). Similarly, the subsequent call to
>> the onTimer method will set the next timer to fire at 10:10, and I also
>> expect that to fire immediately. After this point this DoFn should start
>> emitting at the same time as all other instances of this DoFn. Is there a
>> mistake in this implementation?
>> Another thing I am noticing is that this pipeline is running on a single
>> Dataflow worker and not scaling up automatically. For such a large key
>> space (10 million DoFns and their timers) I expected the pipeline to use a
>> lot of CPU near the 5 minute boundaries and scale up, but that is also not
>> happening.
>>
>


Re: Building a Schema from a file

2021-06-22 Thread Reuven Lax
By proto, I meant using the messages in
beam/model/pipeline/src/proto/schema.proto to define a schema. You can then
use the classes in SchemaTranslation to convert that to a schema.

On Tue, Jun 22, 2021 at 8:06 PM Matthew Ouyang 
wrote:

> I am currently using BigQueryUtils to convert a BigQuery TableSchema to a
> Beam Schema, but I am looking to switch off that approach because I need
> nullable arrays (BigQueryUtils always makes arrays not nullable) and the
> ability to add my own logical types (one of my fields was unstructured
> JSON).
>
> I'm open to using proto or Avro since I would like to avoid the worst case
> scenario of building my own.  However it doesn't look like either has
> support to add logical types, and proto appears to be missing support for
> the Beam Row type.
>
> On Fri, Jun 18, 2021 at 1:56 PM Brian Hulette  wrote:
>
>> Are the files in some special format that you need to parse and
>> understand? Or could you opt to store the schemas as proto descriptors or
>> Avro avsc?
>>
>> On Fri, Jun 18, 2021 at 10:40 AM Matthew Ouyang 
>> wrote:
>>
>>> Hello Brian.  Thank you for the clarification request.  I meant the
>>> first case.  I have files that define field names and types.
>>>
>>> On Fri, Jun 18, 2021 at 12:12 PM Brian Hulette 
>>> wrote:
>>>
>>>> Could you clarify what you mean? I could interpret this two different
>>>> ways:
>>>> 1) Have a separate file that defines the literal schema (field names
>>>> and types).
>>>> 2) Infer a schema from data stored in some file in a structured format
>>>> (e.g. csv or parquet).
>>>>
>>>> For (1) Reuven's suggestion would work. You could also use an Avro avsc
>>>> file here, which we also support.
>>>> For (2) we don't have anything like this in the Java SDK. In the Python
>>>> SDK the DataFrame API can do this though. When you use one of the pandas
>>>> sources with the Beam DataFrame API [1] we peek at the file and infer the
>>>> schema so you don't need to specify it. You'd just need to use
>>>> to_pcollection to convert the dataframe to a schema-aware PCollection.
>>>>
>>>> Brian
>>>>
>>>> [1]
>>>> https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.io.html
>>>> [2]
>>>> https://beam.apache.org/releases/pydoc/2.30.0/apache_beam.dataframe.convert.html#apache_beam.dataframe.convert.to_pcollection
>>>>
>>>> On Fri, Jun 18, 2021 at 7:50 AM Reuven Lax  wrote:
>>>>
>>>>> There is a proto format for Beam schemas. You could define it as a
>>>>> proto in a file and then parse it.
>>>>>
>>>>> On Fri, Jun 18, 2021 at 7:28 AM Matthew Ouyang <
>>>>> matthew.ouy...@gmail.com> wrote:
>>>>>
>>>>>> I was wondering if there were any tools that would allow me to build
>>>>>> a Beam schema from a file?  I looked for it in the SDK but I couldn't
>>>>>> find anything that could do it.
>>>>>>
>>>>>


Re: RenameFields behaves differently in DirectRunner

2021-06-07 Thread Reuven Lax
FYI - this should be fixed by https://github.com/apache/beam/pull/14960

On Thu, Jun 3, 2021 at 10:00 AM Reuven Lax  wrote:

> Correct.
>
> On Thu, Jun 3, 2021 at 9:51 AM Kenneth Knowles  wrote:
>
>> I still don't quite grok the details of how this succeeds or fails in
>> different situations. The invalid row succeeds in serialization because the
>> coder is not sensitive to the way in which it is invalid?
>>
>> Kenn
>>
>> On Wed, Jun 2, 2021 at 2:54 PM Brian Hulette  wrote:
>>
>>> > One thing that's been on the back burner for a long time is making
>>> CoderProperties into a CoderTester like Guava's EqualityTester.
>>>
>>> Reuven's point still applies here though. This issue is not due to a bug
>>> in SchemaCoder, it's a problem with the Row we gave SchemaCoder to encode.
>>> I'm assuming a CoderTester would require manually generating inputs right?
>>> These input Rows represent an illegal state that we wouldn't test with.
>>> (That being said I like the idea of a CoderTester in general)
>>>
>>> Brian
>>>
>>> On Wed, Jun 2, 2021 at 12:11 PM Kenneth Knowles  wrote:
>>>
>>>> Mutability checking might catch that.
>>>>
>>>> I meant to suggest not putting the check in the pipeline, but offering
>>>> a testing discipline that will catch such issues. One thing that's been on
>>>> the back burner for a long time is making CoderProperties into a
>>>> CoderTester like Guava's EqualityTester. Then it can run through all the
>>>> properties without a user setting up test suites. Downside is that the test
>>>> failure signal gets aggregated.
>>>>
>>>> Kenn
>>>>
>>>> On Wed, Jun 2, 2021 at 12:09 PM Brian Hulette 
>>>> wrote:
>>>>
>>>>> Could the DirectRunner just do an equality check whenever it does an
>>>>> encode/decode? It sounds like it's already effectively performing
>>>>> a CoderProperties.coderDecodeEncodeEqual for every element, just omitting
>>>>> the equality check.
>>>>>
>>>>> On Wed, Jun 2, 2021 at 12:04 PM Reuven Lax  wrote:
>>>>>
>>>>>> There is no bug in the Coder itself, so that wouldn't catch it. We
>>>>>> could insert CoderProperties.coderDecodeEncodeEqual in a subsequent
>>>>>> ParDo, but if the Direct runner already does an encode/decode before
>>>>>> that ParDo, then that would have fixed the problem before we could see
>>>>>> it.
>>>>>>
>>>>>> On Wed, Jun 2, 2021 at 11:53 AM Kenneth Knowles 
>>>>>> wrote:
>>>>>>
>>>>>>> Would it be caught by CoderProperties?
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>> On Wed, Jun 2, 2021 at 8:16 AM Reuven Lax  wrote:
>>>>>>>
>>>>>>>> I don't think this bug is schema specific - we created a Java
>>>>>>>> object that is inconsistent with its encoded form, which could happen
>>>>>>>> to any transform.
>>>>>>>>
>>>>>>>> This does seem to be a gap in DirectRunner testing though. It also
>>>>>>>> makes it hard to test using PAssert, as I believe that puts everything
>>>>>>>> in a side input, forcing an encoding/decoding.
>>>>>>>>
>>>>>>>> On Wed, Jun 2, 2021 at 8:12 AM Brian Hulette 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> +dev 
>>>>>>>>>
>>>>>>>>> > I bet the DirectRunner is encoding and decoding in between,
>>>>>>>>> which fixes the object.
>>>>>>>>>
>>>>>>>>> Do we need better testing of schema-aware (and potentially other
>>>>>>>>> built-in) transforms in the face of fusion to root out issues like
>>>>>>>>> this?
>>>>>>>>>
>>>>>>>>> Brian
>>>>>>>>>
>>>>>>>>> On Wed, Jun 2, 2021 at 5:13 AM Matthew Ouyang <
>>>>>>>>> matthew.ouy...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I have some other work-related th

Re: [2.28.0] [Java] [protobuf] ProtoMessageSchema doesn't create fields as nullable

2021-06-07 Thread Reuven Lax
I understand. However if the field is marked as optional in the proto
definition, Beam should create a nullable field.

On Mon, Jun 7, 2021 at 10:54 AM Andrew Kettmann 
wrote:

> Unfortunately, the problem is that we are taking serialized protobuf
> messages from pubsub and writing them to Avro, so I was taking the message,
> using the payload to create the object, then converting -> (Beam) Row ->
> GenericRecord (Avro) -> Write to storage. I was using the
> ProtoMessageSchema.schemaFor to go from protobuf generated code object ->
> beam Row and any nulled fields make it complain. Was hoping to use schemas
> and the like to not have to write manual conversion code. Sadly, just not
> the case due to the java nullable issues and naming of fields and the like
> (trying to access getIde24 instead of getIdE24).
>
> Writing up some wrapper classes to deal with this for now.
> ------
> *From:* Reuven Lax 
> *Sent:* Monday, June 7, 2021 11:27 AM
> *To:* user 
> *Subject:* Re: [2.28.0] [Java] [protobuf] ProtoMessageSchema doesn't
> create fields as nullable
>
> That's why separate has_xxx methods are generated to test whether the
> specified field is present or not.
>
> On Mon, Jun 7, 2021 at 9:17 AM Thomas Fredriksen(External) <
> thomas.fredrik...@cognite.com> wrote:
>
> The problem is that protobuf primitives are represented in Java as
> primitives, which are not nullable.
>
> Ideally, they should be objects instead, but alas - no.
>
> The wrapper is a decent (but not perfect) workaround.
>
> On Mon, Jun 7, 2021, 18:01 Reuven Lax  wrote:
>
> I believe that as of proto 3.12, optional fields are supported directly -
> https://github.com/pseudomuto/protoc-gen-doc/issues/422 . I _think_ this
> should be supported by Beam (assuming Beam uses a new-enough proto
> library), but I'm not sure if it's been tested.
>
> On Mon, Jun 7, 2021 at 8:53 AM Andrew Kettmann 
> wrote:
>
> Thanks that looks like it is the issue, I appreciate the help.
> --
> *From:* Thomas Fredriksen(External) 
> *Sent:* Monday, June 7, 2021 12:53 AM
> *To:* user@beam.apache.org 
> *Subject:* Re: [2.28.0] [Java] [protobuf] ProtoMessageSchema doesn't
> create fields as nullable
>
> Hi Andrew,
>
> From the documentation (
> https://beam.apache.org/releases/javadoc/2.19.0/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslator.html
> ):
>
> Protobuf wrapper classes are translated to nullable types, as follows.
>
>- google.protobuf.Int32Value maps to a nullable FieldType.INT32
>- google.protobuf.Int64Value maps to a nullable FieldType.INT64
>- google.protobuf.UInt32Value maps to a nullable
>FieldType.logicalType(new UInt32())
>- google.protobuf.UInt64Value maps to a nullable Field.logicalType(new
>UInt64())
>- google.protobuf.FloatValue maps to a nullable FieldType.FLOAT
>- google.protobuf.DoubleValue maps to a nullable FieldType.DOUBLE
>- google.protobuf.BoolValue maps to a nullable FieldType.BOOLEAN
>- google.protobuf.StringValue maps to a nullable FieldType.STRING
>- google.protobuf.BytesValue maps to a nullable FieldType.BYTES
>
> This means that you should use the google wrapper-types in order to
> achieve nullable fields.
> The wrapper is available here:
> https://github.com/protocolbuffers/protobuf/blob/master/src/google/protobuf/wrappers.proto

Re: [2.28.0] [Java] [protobuf] ProtoMessageSchema doesn't create fields as nullable

2021-06-07 Thread Reuven Lax
That's why separate has_xxx methods are generated to test whether the
specified field is present or not.
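
In Java the generated presence methods are named hasXxx(). As a sketch of
how presence plus a primitive getter can be turned into a nullable value,
here is a hand-written stand-in for a generated message; FakeMessage and
nullableCount are hypothetical names, and real protoc output looks different.

```java
// Hand-written stand-in for a protoc-generated class, for illustration only.
final class PresenceSketch {
    private PresenceSketch() {}

    /** Mimics a generated message with one optional int32 field named "count". */
    static final class FakeMessage {
        private final boolean hasCount;
        private final int count;

        private FakeMessage(boolean hasCount, int count) {
            this.hasCount = hasCount;
            this.count = count;
        }

        static FakeMessage withCount(int count) {
            return new FakeMessage(true, count);
        }

        static FakeMessage withoutCount() {
            return new FakeMessage(false, 0);
        }

        /** True only if the field was explicitly set. */
        boolean hasCount() {
            return hasCount;
        }

        /** Primitive getter: returns the default value 0 when the field is unset. */
        int getCount() {
            return count;
        }
    }

    /** Combines the presence check and the primitive getter into a nullable boxed value. */
    static Integer nullableCount(FakeMessage message) {
        return message.hasCount() ? Integer.valueOf(message.getCount()) : null;
    }

    public static void main(String[] args) {
        System.out.println(nullableCount(FakeMessage.withCount(0)));    // prints 0
        System.out.println(nullableCount(FakeMessage.withoutCount()));  // prints null
    }
}
```

The getter alone cannot tell "unset" from "set to 0"; only the presence
method can, which is why a nullable schema field has to be built from both.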

On Mon, Jun 7, 2021 at 9:17 AM Thomas Fredriksen(External) <
thomas.fredrik...@cognite.com> wrote:

> The problem is that protobuf primitives are represented in Java as
> primitives, which are not nullable.
>
> Ideally, they should be objects instead, but alas - no.
>
> The wrapper is a decent (but not perfect) workaround.
>
> On Mon, Jun 7, 2021, 18:01 Reuven Lax  wrote:
>
>> I believe that as of proto 3.12, optional fields are supported directly -
>> https://github.com/pseudomuto/protoc-gen-doc/issues/422 . I _think_ this
>> should be supported by Beam (assuming Beam uses a new-enough proto
>> library), but I'm not sure if it's been tested.
>>
>> On Mon, Jun 7, 2021 at 8:53 AM Andrew Kettmann 
>> wrote:
>>
>>> Thanks that looks like it is the issue, I appreciate the help.
>>> --
>>> *From:* Thomas Fredriksen(External) 
>>> *Sent:* Monday, June 7, 2021 12:53 AM
>>> *To:* user@beam.apache.org 
>>> *Subject:* Re: [2.28.0] [Java] [protobuf] ProtoMessageSchema doesn't
>>> create fields as nullable
>>>
>>> Hi Andrew,
>>>
>>> From the documentation (
>>> https://beam.apache.org/releases/javadoc/2.19.0/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslator.html
>>> ):
>>>
>>> Protobuf wrapper classes are translated to nullable types, as follows.
>>>
>>>- google.protobuf.Int32Value maps to a nullable FieldType.INT32
>>>- google.protobuf.Int64Value maps to a nullable FieldType.INT64
>>>- google.protobuf.UInt32Value maps to a nullable
>>>FieldType.logicalType(new UInt32())
>>>- google.protobuf.UInt64Value maps to a nullable
>>>Field.logicalType(new UInt64())
>>>- google.protobuf.FloatValue maps to a nullable FieldType.FLOAT
>>>- google.protobuf.DoubleValue maps to a nullable FieldType.DOUBLE
>>>- google.protobuf.BoolValue maps to a nullable FieldType.BOOLEAN
>>>- google.protobuf.StringValue maps to a nullable FieldType.STRING
>>>- google.protobuf.BytesValue maps to a nullable FieldType.BYTES
>>>
>>> This means that you should use the google wrapper-types in order to
>>> achieve nullable fields.
>>> The wrapper is available here:
>>> https://github.com/protocolbuffers/protobuf/blob/master/src/google/protobuf/wrappers.proto
>>> .
>>>
>>> Hope it helps :)
>>>
>>> On Thu, Jun 3, 2021 at 6:48 PM Andrew Kettmann 
>>> wrote:
>>>
>>> Using org.apache.beam.sdk.extensions.protobuf.ProtoMessageSchema to
>>> create a beam schema from generated protobuf3 classes. However,
>>> org.apache.beam.sdk.extensions.protobuf.ProtoSchemaTranslator#beamFieldTypeFromSingularProtoField
>>> doesn't apply nullable to fields in the message. My understanding is that
>>> by default protobuf fields ARE optional, is that incorrect? Converting from
>>> a serialized message without values for some fields crashes when it tries
>>> to cast them to a Row since the Row is not expecting a field as nullable.
>>>
>>> Anyone have any advice regarding this? Modify the schema after it is
>>> generated by ProtoMessageSchema or is there another method/option I am
>>> missing?
>>>
>>>

Re: [2.28.0] [Java] [protobuf] ProtoMessageSchema doesn't create fields as nullable

2021-06-07 Thread Reuven Lax
I believe that as of proto 3.12, optional fields are supported directly -
https://github.com/pseudomuto/protoc-gen-doc/issues/422 . I _think_ this
should be supported by Beam (assuming Beam uses a new-enough proto
library), but I'm not sure if it's been tested.

On Mon, Jun 7, 2021 at 8:53 AM Andrew Kettmann 
wrote:

> Thanks that looks like it is the issue, I appreciate the help.
> --
> *From:* Thomas Fredriksen(External) 
> *Sent:* Monday, June 7, 2021 12:53 AM
> *To:* user@beam.apache.org 
> *Subject:* Re: [2.28.0] [Java] [protobuf] ProtoMessageSchema doesn't
> create fields as nullable
>
> Hi Andrew,
>
> From the documentation (
> https://beam.apache.org/releases/javadoc/2.19.0/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslator.html
> ):
>
> Protobuf wrapper classes are translated to nullable types, as follows.
>
>- google.protobuf.Int32Value maps to a nullable FieldType.INT32
>- google.protobuf.Int64Value maps to a nullable FieldType.INT64
>- google.protobuf.UInt32Value maps to a nullable
>FieldType.logicalType(new UInt32())
>- google.protobuf.UInt64Value maps to a nullable
>FieldType.logicalType(new UInt64())
>- google.protobuf.FloatValue maps to a nullable FieldType.FLOAT
>- google.protobuf.DoubleValue maps to a nullable FieldType.DOUBLE
>- google.protobuf.BoolValue maps to a nullable FieldType.BOOLEAN
>- google.protobuf.StringValue maps to a nullable FieldType.STRING
>- google.protobuf.BytesValue maps to a nullable FieldType.BYTES
>
> This means that you should use the google wrapper-types in order to
> achieve nullable fields.
> The wrapper is available here:
> https://github.com/protocolbuffers/protobuf/blob/master/src/google/protobuf/wrappers.proto
> .
>
> Hope it helps :)
>
> On Thu, Jun 3, 2021 at 6:48 PM Andrew Kettmann 
> wrote:
>
> Using org.apache.beam.sdk.extensions.protobuf.ProtoMessageSchema to create
> a beam schema from generated protobuf3 classes. However,
> org.apache.beam.sdk.extensions.protobuf.ProtoSchemaTranslator#beamFieldTypeFromSingularProtoField
> doesn't apply nullable to fields in the message. My understanding is that
> by default protobuf fields ARE optional, is that incorrect? Converting from
> a serialized message without values for some fields crashes when it tries
> to cast them to a Row since the Row is not expecting a field as nullable.
>
> Anyone have any advice regarding this? Modify the schema after it is
> generated by ProtoMessageSchema or is there another method/option I am
> missing?
>
>
> *Andrew Kettmann*
> DevOps Engineer

Re: Merging two rows

2021-06-03 Thread Reuven Lax
Do you want them to be flattened, or as two subschemas of a top-level
schema?

On Thu, Jun 3, 2021 at 12:28 PM Matthew Ouyang 
wrote:

> I know there is a method to merge two Beam Schemas into a new Schema.  (
> https://beam.apache.org/releases/javadoc/2.26.0/org/apache/beam/sdk/schemas/SchemaUtils.html#mergeWideningNullable-org.apache.beam.sdk.schemas.Schema-org.apache.beam.sdk.schemas.Schema-
> ).
>
> Is there a similar method for Beam Rows?
>


Re: RenameFields behaves differently in DirectRunner

2021-06-03 Thread Reuven Lax
Correct.

On Thu, Jun 3, 2021 at 9:51 AM Kenneth Knowles  wrote:

> I still don't quite grok the details of how this succeeds or fails in
> different situations. The invalid row succeeds in serialization because the
> coder is not sensitive to the way in which it is invalid?
>
> Kenn
>
> On Wed, Jun 2, 2021 at 2:54 PM Brian Hulette  wrote:
>
>> > One thing that's been on the back burner for a long time is making
>> CoderProperties into a CoderTester like Guava's EqualityTester.
>>
>> Reuven's point still applies here though. This issue is not due to a bug
>> in SchemaCoder, it's a problem with the Row we gave SchemaCoder to encode.
>> I'm assuming a CoderTester would require manually generating inputs right?
>> These input Rows represent an illegal state that we wouldn't test with.
>> (That being said I like the idea of a CoderTester in general)
>>
>> Brian
>>
>> On Wed, Jun 2, 2021 at 12:11 PM Kenneth Knowles  wrote:
>>
>>> Mutability checking might catch that.
>>>
>>> I meant to suggest not putting the check in the pipeline, but offering a
>>> testing discipline that will catch such issues. One thing that's been on
>>> the back burner for a long time is making CoderProperties into a
>>> CoderTester like Guava's EqualityTester. Then it can run through all the
>>> properties without a user setting up test suites. Downside is that the test
>>> failure signal gets aggregated.
>>>
>>> Kenn
>>>
>>> On Wed, Jun 2, 2021 at 12:09 PM Brian Hulette 
>>> wrote:
>>>
>>>> Could the DirectRunner just do an equality check whenever it does an
>>>> encode/decode? It sounds like it's already effectively performing
>>>> a CoderProperties.coderDecodeEncodeEqual for every element, just omitting
>>>> the equality check.
>>>>
>>>> On Wed, Jun 2, 2021 at 12:04 PM Reuven Lax  wrote:
>>>>
>>>>> There is no bug in the Coder itself, so that wouldn't catch it. We
>>>>> could insert CoderProperties.coderDecodeEncodeEqual in a subsequent ParDo,
>>>>> but if the Direct runner already does an encode/decode before that ParDo,
>>>>> then that would have fixed the problem before we could see it.
>>>>>
>>>>> On Wed, Jun 2, 2021 at 11:53 AM Kenneth Knowles 
>>>>> wrote:
>>>>>
>>>>>> Would it be caught by CoderProperties?
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Wed, Jun 2, 2021 at 8:16 AM Reuven Lax  wrote:
>>>>>>
>>>>>>> I don't think this bug is schema specific - we created a Java object
>>>>>>> that is inconsistent with its encoded form, which could happen to any
>>>>>>> transform.
>>>>>>>
>>>>>>> This does seem to be a gap in DirectRunner testing though. It also
>>>>>>> makes it hard to test using PAssert, as I believe that puts everything
>>>>>>> in a side input, forcing an encoding/decoding.
>>>>>>>
>>>>>>> On Wed, Jun 2, 2021 at 8:12 AM Brian Hulette 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> +dev 
>>>>>>>>
>>>>>>>> > I bet the DirectRunner is encoding and decoding in between, which
>>>>>>>> fixes the object.
>>>>>>>>
>>>>>>>> Do we need better testing of schema-aware (and potentially other
>>>>>>>> built-in) transforms in the face of fusion to root out issues like
>>>>>>>> this?
>>>>>>>>
>>>>>>>> Brian
>>>>>>>>
>>>>>>>> On Wed, Jun 2, 2021 at 5:13 AM Matthew Ouyang <
>>>>>>>> matthew.ouy...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I have some other work-related things I need to do this week, so I
>>>>>>>>> will likely report back on this over the weekend.  Thank you for the
>>>>>>>>> explanation.  It makes perfect sense now.
>>>>>>>>>
>>>>>>>>> On Tue, Jun 1, 2021 at 11:18 PM Reuven Lax 
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Some more context - the problem is that RenameFields outputs (in

Re: RenameFields behaves differently in DirectRunner

2021-06-02 Thread Reuven Lax
There is no bug in the Coder itself, so that wouldn't catch it. We could
insert CoderProperties.coderDecodeEncodeEqual in a subsequent ParDo, but if
the Direct runner already does an encode/decode before that ParDo, then
that would have fixed the problem before we could see it.
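
A rough sketch of the kind of check being discussed, in plain Java rather than Beam. Value, encode and decode are hypothetical stand-ins: only fieldName is encoded, while schemaFieldName is in-memory metadata that is supposed to agree with it. The check catches the inconsistent object only if it runs on the object as the transform produced it, before anything else has round-tripped it.

```java
import java.util.Objects;

final class RoundTripCheck {
  /** Hypothetical value whose in-memory metadata may disagree with its encoded form. */
  static final class Value {
    final String fieldName;
    final String schemaFieldName;

    Value(String fieldName, String schemaFieldName) {
      this.fieldName = fieldName;
      this.schemaFieldName = schemaFieldName;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof Value)) {
        return false;
      }
      Value other = (Value) o;
      return fieldName.equals(other.fieldName)
          && schemaFieldName.equals(other.schemaFieldName);
    }

    @Override
    public int hashCode() {
      return Objects.hash(fieldName, schemaFieldName);
    }
  }

  // Only the field name is written out; the metadata is not part of the encoding.
  static String encode(Value v) {
    return v.fieldName;
  }

  // Decoding always produces a consistent object.
  static Value decode(String encoded) {
    return new Value(encoded, encoded);
  }

  // True if the value is equal to its own decoded encoding.
  static boolean survivesRoundTrip(Value v) {
    return decode(encode(v)).equals(v);
  }

  public static void main(String[] args) {
    Value consistent = new Value("renamed", "renamed");
    Value stale = new Value("renamed", "field2");
    System.out.println(survivesRoundTrip(consistent));
    System.out.println(survivesRoundTrip(stale));
    // Once something has already round-tripped the stale value, the check passes.
    System.out.println(survivesRoundTrip(decode(encode(stale))));
  }
}
```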

On Wed, Jun 2, 2021 at 11:53 AM Kenneth Knowles  wrote:

> Would it be caught by CoderProperties?
>
> Kenn
>
> On Wed, Jun 2, 2021 at 8:16 AM Reuven Lax  wrote:
>
>> I don't think this bug is schema specific - we created a Java object that
>> is inconsistent with its encoded form, which could happen to any transform.
>>
>> This does seem to be a gap in DirectRunner testing though. It also makes
>> it hard to test using PAssert, as I believe that puts everything in a side
>> input, forcing an encoding/decoding.
>>
>> On Wed, Jun 2, 2021 at 8:12 AM Brian Hulette  wrote:
>>
>>> +dev 
>>>
>>> > I bet the DirectRunner is encoding and decoding in between, which
>>> fixes the object.
>>>
>>> Do we need better testing of schema-aware (and potentially other
>>> built-in) transforms in the face of fusion to root out issues like this?
>>>
>>> Brian
>>>
>>> On Wed, Jun 2, 2021 at 5:13 AM Matthew Ouyang 
>>> wrote:
>>>
>>>> I have some other work-related things I need to do this week, so I will
>>>> likely report back on this over the weekend.  Thank you for the
>>>> explanation.  It makes perfect sense now.
>>>>
>>>> On Tue, Jun 1, 2021 at 11:18 PM Reuven Lax  wrote:
>>>>
>>>>> Some more context - the problem is that RenameFields outputs (in this
>>>>> case) Java Row objects that are inconsistent with the actual schema.
>>>>> For example if you have the following schema:
>>>>>
>>>>> Row {
>>>>>   field1: Row {
>>>>>     field2: string
>>>>>   }
>>>>> }
>>>>>
>>>>> And rename field1.field2 -> renamed, you'll get the following schema
>>>>>
>>>>> Row {
>>>>>   field1: Row {
>>>>>     renamed: string
>>>>>   }
>>>>> }
>>>>>
>>>>> However the Java object for the _nested_ row will return the old
>>>>> schema if getSchema() is called on it. This is because we only update the
>>>>> schema on the top-level row.
>>>>>
>>>>> I think this explains why your test works in the direct runner. If the
>>>>> row ever goes through an encode/decode path, it will come back correct.
>>>>> The original incorrect Java objects are no longer around, and new
>>>>> (consistent) objects are constructed from the raw data and the
>>>>> PCollection schema. Dataflow tends to fuse ParDos together, so the
>>>>> following ParDo will see the incorrect Row object. I bet the
>>>>> DirectRunner is encoding and decoding in between, which fixes the object.
>>>>>
>>>>> You can validate this theory by forcing a shuffle after RenameFields
>>>>> using Reshuffle. It should fix the issue. If it does, let me know and
>>>>> I'll work on a fix to RenameFields.
>>>>>
>>>>> On Tue, Jun 1, 2021 at 7:39 PM Reuven Lax  wrote:
>>>>>
>>>>>> Aha, yes, this is indeed another bug in the transform. The schema is set
>>>>>> on the top-level Row but not on any nested rows.
>>>>>>
>>>>>> On Tue, Jun 1, 2021 at 6:37 PM Matthew Ouyang <
>>>>>> matthew.ouy...@gmail.com> wrote:
>>>>>>
>>>>>>> Thank you everyone for your input.  I believe it will be easiest to
>>>>>>> respond to all feedback in a single message rather than messages per
>>>>>>> person.
>>>>>>>
>>>>>>> - NeedsRunner - The tests are run eventually, so obviously all
>>>>>>>   good on my end.  I was trying to run the smallest subset of test
>>>>>>>   cases possible and didn't venture beyond `gradle test`.
>>>>>>> - Stack Trace - There wasn't any, unfortunately, because no
>>>>>>>   exception was thrown in the code.  The Beam Row was translated into a
>>>>>>>   BQ TableRow and an insertion was attempted.  The error "message" was

Re: RenameFields behaves differently in DirectRunner

2021-06-02 Thread Reuven Lax
I don't think this bug is schema specific - we created a Java object that
is inconsistent with its encoded form, which could happen to any transform.

This does seem to be a gap in DirectRunner testing though. It also makes it
hard to test using PAssert, as I believe that puts everything in a side
input, forcing an encoding/decoding.

On Wed, Jun 2, 2021 at 8:12 AM Brian Hulette  wrote:

> +dev 
>
> > I bet the DirectRunner is encoding and decoding in between, which fixes
> the object.
>
> Do we need better testing of schema-aware (and potentially other built-in)
> transforms in the face of fusion to root out issues like this?
>
> Brian
>
> On Wed, Jun 2, 2021 at 5:13 AM Matthew Ouyang 
> wrote:
>
>> I have some other work-related things I need to do this week, so I will
>> likely report back on this over the weekend.  Thank you for the
>> explanation.  It makes perfect sense now.
>>
>> On Tue, Jun 1, 2021 at 11:18 PM Reuven Lax  wrote:
>>
>>> Some more context - the problem is that RenameFields outputs (in this
>>> case) Java Row objects that are inconsistent with the actual schema.
>>> For example if you have the following schema:
>>>
>>> Row {
>>>   field1: Row {
>>>     field2: string
>>>   }
>>> }
>>>
>>> And rename field1.field2 -> renamed, you'll get the following schema
>>>
>>> Row {
>>>   field1: Row {
>>>     renamed: string
>>>   }
>>> }
>>>
>>> However the Java object for the _nested_ row will return the old schema
>>> if getSchema() is called on it. This is because we only update the schema
>>> on the top-level row.
>>>
>>> I think this explains why your test works in the direct runner. If the
>>> row ever goes through an encode/decode path, it will come back correct. The
>>> original incorrect Java objects are no longer around, and new (consistent)
>>> objects are constructed from the raw data and the PCollection schema.
>>> Dataflow tends to fuse ParDos together, so the following ParDo will see the
>>> incorrect Row object. I bet the DirectRunner is encoding and decoding in
>>> between, which fixes the object.
>>>
>>> You can validate this theory by forcing a shuffle after RenameFields
>>> using Reshuffle. It should fix the issue. If it does, let me know and I'll
>>> work on a fix to RenameFields.
>>>
>>> On Tue, Jun 1, 2021 at 7:39 PM Reuven Lax  wrote:
>>>
>>>> Aha, yes, this is indeed another bug in the transform. The schema is set
>>>> on the top-level Row but not on any nested rows.
>>>>
>>>> On Tue, Jun 1, 2021 at 6:37 PM Matthew Ouyang 
>>>> wrote:
>>>>
>>>>> Thank you everyone for your input.  I believe it will be easiest to
>>>>> respond to all feedback in a single message rather than messages per
>>>>> person.
>>>>>
>>>>> - NeedsRunner - The tests are run eventually, so obviously all
>>>>>   good on my end.  I was trying to run the smallest subset of test cases
>>>>>   possible and didn't venture beyond `gradle test`.
>>>>> - Stack Trace - There wasn't any, unfortunately, because no
>>>>>   exception was thrown in the code.  The Beam Row was translated into a
>>>>>   BQ TableRow and an insertion was attempted.  The error "message" was
>>>>>   part of the response JSON that came back as a result of a request
>>>>>   against the BQ API.
>>>>> - Desired Behaviour - (field0_1.field1_0, nestedStringField) ->
>>>>>   field0_1.nestedStringField is what I am looking for.
>>>>> - Info Logging Findings (In Lieu of a Stack Trace)
>>>>>   - The Beam Schema was as expected with all renames applied.
>>>>>   - The example I provided was heavily stripped down in order to
>>>>>     isolate the problem.  My work example, which is a bit impractical
>>>>>     because it's part of some generic tooling, has 4 levels of nesting
>>>>>     and also produces the correct output.
>>>>>   - BigQueryUtils.toTableRow(Row) returns the expected TableRow
>>>>>     in DirectRunner.  In DataflowRunner, however, only the top-level
>>>>>     renames were reflected in the TableRow; the renames in the nested
>>>>>     fields weren't.

Re: RenameFields behaves differently in DirectRunner

2021-06-01 Thread Reuven Lax
Some more context - the problem is that RenameFields outputs (in this case)
Java Row objects that are inconsistent with the actual schema. For example
if you have the following schema:

Row {
  field1: Row {
    field2: string
  }
}

And rename field1.field2 -> renamed, you'll get the following schema

Row {
  field1: Row {
    renamed: string
  }
}

However the Java object for the _nested_ row will return the old schema if
getSchema() is called on it. This is because we only update the schema on
the top-level row.

I think this explains why your test works in the direct runner. If the row
ever goes through an encode/decode path, it will come back correct. The
original incorrect Java objects are no longer around, and new (consistent)
objects are constructed from the raw data and the PCollection schema.
Dataflow tends to fuse ParDos together, so the following ParDo will see the
incorrect Row object. I bet the DirectRunner is encoding and decoding in
between, which fixes the object.

You can validate this theory by forcing a shuffle after RenameFields using
Reshuffle. It should fix the issue. If it does, let me know and I'll work
on a fix to RenameFields.
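
To make the mechanism concrete, here is a minimal sketch in plain Java. It does not use Beam at all: Schema, Row, encode and decode are hypothetical stand-ins, and each schema has exactly one field to keep it short. It only illustrates how a nested object can keep a stale schema while the top-level one is updated, and how rebuilding the objects from raw data plus the authoritative schema hides the problem.

```java
final class StaleNestedSchemaDemo {
  /** Hypothetical schema with a single field; 'nested' is null for a string leaf. */
  static final class Schema {
    final String fieldName;
    final Schema nested;

    Schema(String fieldName, Schema nested) {
      this.fieldName = fieldName;
      this.nested = nested;
    }
  }

  /** Hypothetical row: the schema object it carries plus a String or a nested Row. */
  static final class Row {
    final Schema schema;
    final Object value;

    Row(Schema schema, Object value) {
      this.schema = schema;
      this.value = value;
    }
  }

  // "Encoding" keeps only the raw leaf data and drops every schema reference.
  static String encode(Row row) {
    return row.value instanceof Row ? encode((Row) row.value) : (String) row.value;
  }

  // "Decoding" rebuilds the whole row tree from raw data and the given schema.
  static Row decode(String data, Schema schema) {
    Object value = schema.nested == null ? data : decode(data, schema.nested);
    return new Row(schema, value);
  }

  // The leaf field name as reported by the nested row object itself.
  static String nestedFieldName(Row top) {
    return ((Row) top.value).schema.fieldName;
  }

  // Models the buggy rename: new schema on the top-level row, old schema on the nested row.
  static Row buggyRename() {
    Schema oldNested = new Schema("field2", null);
    Row nestedRow = new Row(oldNested, "hello");
    return new Row(renamedSchema(), nestedRow);
  }

  // The schema the output collection is supposed to have after the rename.
  static Schema renamedSchema() {
    return new Schema("field1", new Schema("renamed", null));
  }

  public static void main(String[] args) {
    Row renamed = buggyRename();
    // A fused consumer sees the stale nested schema.
    System.out.println(nestedFieldName(renamed));
    // After an encode/decode round trip the nested schema is consistent again.
    Row roundTripped = decode(encode(renamed), renamedSchema());
    System.out.println(nestedFieldName(roundTripped));
  }
}
```

In this model the first line printed is the old nested name and the second is the renamed one, which mirrors the fused versus encode/decode difference described above.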

On Tue, Jun 1, 2021 at 7:39 PM Reuven Lax  wrote:

> Aha, yes, this is indeed another bug in the transform. The schema is set on
> the top-level Row but not on any nested rows.
>
> On Tue, Jun 1, 2021 at 6:37 PM Matthew Ouyang 
> wrote:
>
>> Thank you everyone for your input.  I believe it will be easiest to
>> respond to all feedback in a single message rather than messages per person.
>>
>> - NeedsRunner - The tests are run eventually, so obviously all good
>>   on my end.  I was trying to run the smallest subset of test cases
>>   possible and didn't venture beyond `gradle test`.
>> - Stack Trace - There wasn't any, unfortunately, because no exception
>>   was thrown in the code.  The Beam Row was translated into a BQ TableRow
>>   and an insertion was attempted.  The error "message" was part of the
>>   response JSON that came back as a result of a request against the BQ API.
>> - Desired Behaviour - (field0_1.field1_0, nestedStringField) ->
>>   field0_1.nestedStringField is what I am looking for.
>> - Info Logging Findings (In Lieu of a Stack Trace)
>>   - The Beam Schema was as expected with all renames applied.
>>   - The example I provided was heavily stripped down in order to
>>     isolate the problem.  My work example, which is a bit impractical
>>     because it's part of some generic tooling, has 4 levels of nesting and
>>     also produces the correct output.
>>   - BigQueryUtils.toTableRow(Row) returns the expected TableRow in
>>     DirectRunner.  In DataflowRunner, however, only the top-level renames
>>     were reflected in the TableRow; the renames in the nested fields
>>     weren't.
>>   - BigQueryUtils.toTableRow(Row) recurses on the Row values and
>>     uses the Row.schema to get the field names.  This makes sense to me,
>>     but if a value is actually a Row then its schema appears to be
>>     inconsistent with the top-level schema.
>> - My Current Workaround - I forked RenameFields and replaced the
>>   attachValues in the expand method with a "deep" rename.  This is
>>   obviously inefficient and I will not be submitting a PR for that.
>> - JIRA ticket - https://issues.apache.org/jira/browse/BEAM-12442
>>
>>
>> On Tue, Jun 1, 2021 at 5:51 PM Reuven Lax  wrote:
>>
>>> This transform is the same across all runners. A few comments on the
>>> test:
>>>
>>>   - Using attachValues directly is error prone (per the comment on the
>>> method). I recommend using the withFieldValue builders instead.
>>>   - I recommend capturing the RenameFields PCollection into a local
>>> variable of type PCollection<Row> and printing out the schema (which you
>>> can get using the PCollection.getSchema method) to ensure that the output
>>> schema looks like you expect.
>>>   - RenameFields doesn't flatten. So renaming field0_1.field1_0 ->
>>> nestedStringField results in field0_1.nestedStringField; if you wanted to
>>> flatten, then the better transform would be
>>> Select.fieldNameAs("field0_1.field1_0", "nestedStringField").
>>>
>>> This all being said, eyeballing the implementation of RenameFields makes
>>> me think that it is buggy in the case where you specify a top-level field
>>> multiple times like you do. I think it is simply adding the top-level field
>>> into the output schema multiple times, and the second time is with t

Re: RenameFields behaves differently in DirectRunner

2021-06-01 Thread Reuven Lax
Aha, yes, this is indeed another bug in the transform. The schema is set on
the top-level Row but not on any nested rows.

On Tue, Jun 1, 2021 at 6:37 PM Matthew Ouyang 
wrote:

> Thank you everyone for your input.  I believe it will be easiest to
> respond to all feedback in a single message rather than messages per person.
>
> - NeedsRunner - The tests are run eventually, so obviously all good on
>   my end.  I was trying to run the smallest subset of test cases possible
>   and didn't venture beyond `gradle test`.
> - Stack Trace - There wasn't any, unfortunately, because no exception was
>   thrown in the code.  The Beam Row was translated into a BQ TableRow and
>   an insertion was attempted.  The error "message" was part of the response
>   JSON that came back as a result of a request against the BQ API.
> - Desired Behaviour - (field0_1.field1_0, nestedStringField) ->
>   field0_1.nestedStringField is what I am looking for.
> - Info Logging Findings (In Lieu of a Stack Trace)
>   - The Beam Schema was as expected with all renames applied.
>   - The example I provided was heavily stripped down in order to
>     isolate the problem.  My work example, which is a bit impractical
>     because it's part of some generic tooling, has 4 levels of nesting and
>     also produces the correct output.
>   - BigQueryUtils.toTableRow(Row) returns the expected TableRow in
>     DirectRunner.  In DataflowRunner, however, only the top-level renames
>     were reflected in the TableRow; the renames in the nested fields
>     weren't.
>   - BigQueryUtils.toTableRow(Row) recurses on the Row values and uses
>     the Row.schema to get the field names.  This makes sense to me, but if
>     a value is actually a Row then its schema appears to be inconsistent
>     with the top-level schema.
> - My Current Workaround - I forked RenameFields and replaced the
>   attachValues in the expand method with a "deep" rename.  This is obviously
>   inefficient and I will not be submitting a PR for that.
> - JIRA ticket - https://issues.apache.org/jira/browse/BEAM-12442
>
>
> On Tue, Jun 1, 2021 at 5:51 PM Reuven Lax  wrote:
>
>> This transform is the same across all runners. A few comments on the test:
>>
>>   - Using attachValues directly is error prone (per the comment on the
>> method). I recommend using the withFieldValue builders instead.
>>   - I recommend capturing the RenameFields PCollection into a local
>> variable of type PCollection<Row> and printing out the schema (which you
>> can get using the PCollection.getSchema method) to ensure that the output
>> schema looks like you expect.
>>   - RenameFields doesn't flatten. So renaming field0_1.field1_0 ->
>> nestedStringField results in field0_1.nestedStringField; if you wanted to
>> flatten, then the better transform would be
>> Select.fieldNameAs("field0_1.field1_0", "nestedStringField").
>>
>> This all being said, eyeballing the implementation of RenameFields makes
>> me think that it is buggy in the case where you specify a top-level field
>> multiple times like you do. I think it is simply adding the top-level field
>> into the output schema multiple times, and the second time is with the
>> field0_1 base name; I have no idea why your test doesn't catch this in the
>> DirectRunner, as it's equally broken there. Could you file a JIRA about
>> this issue and assign it to me?
>>
>> Reuven
>>
>> On Tue, Jun 1, 2021 at 12:47 PM Kenneth Knowles  wrote:
>>
>>>
>>>
>>> On Tue, Jun 1, 2021 at 12:42 PM Brian Hulette 
>>> wrote:
>>>
>>>> Hi Matthew,
>>>>
>>>> > The unit tests also seem to be disabled for this as well and so I
>>>> don’t know if the PTransform behaves as expected.
>>>>
>>>> The exclusion for NeedsRunner tests is just a quirk in our testing
>>>> framework. NeedsRunner indicates that a test suite can't be executed with
>>>> the SDK alone, it needs a runner. So that exclusion just makes sure we
>>>> don't run the test when we're verifying the SDK by itself in the
>>>> :sdks:java:core:test task. The test is still run in other tasks where we
>>>> have a runner, most notably in the Java PreCommit [1], where we run it as
>>>> part of the :runners:direct-java:test task.
>>>>
>>>> That being said, we may only run these tests continuously with the
>>>> DirectRunner, I'm not sure if we test them on all the runners like we do
>>>> with ValidatesRunner tests.
>>>>
>>>
>>> T

Re: RenameFields behaves differently in DirectRunner

2021-06-01 Thread Reuven Lax
This transform is the same across all runners. A few comments on the test:

  - Using attachValues directly is error prone (per the comment on the
method). I recommend using the withFieldValue builders instead.
  - I recommend capturing the RenameFields PCollection into a local
variable of type PCollection<Row> and printing out the schema (which you
can get using the PCollection.getSchema method) to ensure that the output
schema looks like you expect.
  - RenameFields doesn't flatten. So renaming field0_1.field1_0 ->
nestedStringField results in field0_1.nestedStringField; if you wanted to
flatten, then the better transform would be
Select.fieldNameAs("field0_1.field1_0", "nestedStringField").

This all being said, eyeballing the implementation of RenameFields makes me
think that it is buggy in the case where you specify a top-level field
multiple times like you do. I think it is simply adding the top-level field
into the output schema multiple times, and the second time is with the
field0_1 base name; I have no idea why your test doesn't catch this in the
DirectRunner, as it's equally broken there. Could you file a JIRA about
this issue and assign it to me?
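
To illustrate the difference between renaming in place and flattening, here is a small sketch that models rows as nested Java maps. It is not Beam code: renameNested and selectAs are hypothetical helpers that only mimic the shape of the result you would expect from a nested rename versus a select-with-alias.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class RenameVersusFlatten {
  // Renames the leaf at a dotted path; the field stays nested under its parent.
  // Returns a copy along the path and leaves the input untouched.
  @SuppressWarnings("unchecked")
  static Map<String, Object> renameNested(Map<String, Object> row, String path, String newName) {
    Map<String, Object> copy = new LinkedHashMap<>(row);
    int dot = path.indexOf('.');
    if (dot < 0) {
      copy.put(newName, copy.remove(path));
    } else {
      String head = path.substring(0, dot);
      Map<String, Object> child = (Map<String, Object>) copy.get(head);
      copy.put(head, renameNested(child, path.substring(dot + 1), newName));
    }
    return copy;
  }

  // Picks the leaf at a dotted path and returns it as a single top-level field.
  @SuppressWarnings("unchecked")
  static Map<String, Object> selectAs(Map<String, Object> row, String path, String newName) {
    Object current = row;
    for (String part : path.split("\\.")) {
      current = ((Map<String, Object>) current).get(part);
    }
    Map<String, Object> out = new LinkedHashMap<>();
    out.put(newName, current);
    return out;
  }

  // Sample input shaped like the example in this thread: field0_1 { field1_0: "value" }.
  static Map<String, Object> sampleRow() {
    Map<String, Object> nested = new LinkedHashMap<>();
    nested.put("field1_0", "value");
    Map<String, Object> row = new LinkedHashMap<>();
    row.put("field0_1", nested);
    return row;
  }

  public static void main(String[] args) {
    Map<String, Object> row = sampleRow();
    // Rename: the result keeps the nesting.
    System.out.println(renameNested(row, "field0_1.field1_0", "nestedStringField"));
    // Select with alias: the result is flat.
    System.out.println(selectAs(row, "field0_1.field1_0", "nestedStringField"));
  }
}
```

The first call keeps nestedStringField under field0_1, while the second produces a flat row with only nestedStringField at the top level.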

Reuven

On Tue, Jun 1, 2021 at 12:47 PM Kenneth Knowles  wrote:

>
>
> On Tue, Jun 1, 2021 at 12:42 PM Brian Hulette  wrote:
>
>> Hi Matthew,
>>
>> > The unit tests also seem to be disabled for this as well and so I don’t
>> know if the PTransform behaves as expected.
>>
>> The exclusion for NeedsRunner tests is just a quirk in our testing
>> framework. NeedsRunner indicates that a test suite can't be executed with
>> the SDK alone, it needs a runner. So that exclusion just makes sure we
>> don't run the test when we're verifying the SDK by itself in the
>> :sdks:java:core:test task. The test is still run in other tasks where we
>> have a runner, most notably in the Java PreCommit [1], where we run it as
>> part of the :runners:direct-java:test task.
>>
>> That being said, we may only run these tests continuously with the
>> DirectRunner, I'm not sure if we test them on all the runners like we do
>> with ValidatesRunner tests.
>>
>
> That is correct. The tests are tests _of the transform_ so they run only
> on the DirectRunner. They are not tests of the runner, which is only
> responsible for correctly implementing Beam's primitives. The transform
> should not behave differently on different runners, except for fundamental
> differences in how they schedule work and checkpoint.
>
> Kenn
>
>
>> > The error message I’m receiving, : Error while reading data, error
>> message: JSON parsing error in row starting at position 0: No such field:
>> nestedField.field1_0, suggests the BigQuery is trying to use the
>> original name for the nested field and not the substitute name.
>>
>> Is there a stacktrace associated with this error? It would be helpful to
>> see where the error is coming from.
>>
>> Brian
>>
>>
>> [1]
>> https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/4101/testReport/org.apache.beam.sdk.schemas.transforms/RenameFieldsTest/
>>
>> On Mon, May 31, 2021 at 5:02 PM Matthew Ouyang 
>> wrote:
>>
>>> I’m trying to use the RenameFields transform prior to inserting into
>>> BigQuery on nested fields.  Insertion into BigQuery is successful with
>>> DirectRunner, but DataflowRunner has an issue with renamed nested fields.
>>> The error message I’m receiving, "Error while reading data, error
>>> message: JSON parsing error in row starting at position 0: No such field:
>>> nestedField.field1_0", suggests that BigQuery is trying to use the
>>> original name for the nested field and not the substitute name.
>>>
>>> The code for RenameFields seems simple enough but does it behave
>>> differently in different runners?  Will a deep attachValues be necessary in
>>> order to get the nested renames to work across all runners? Is there
>>> something wrong in my code?
>>>
>>>
>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java#L186
>>>
>>> The unit tests also seem to be disabled for this as well and so I don’t
>>> know if the PTransform behaves as expected.
>>>
>>>
>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/build.gradle#L67
>>>
>>>
>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/RenameFieldsTest.java
>>>
>>> package ca.loblaw.cerebro.PipelineControl;

 import com.google.api.services.bigquery.model.TableReference;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.transforms.RenameFields;
 import 

Re: Why is GroupBy involved in the file save operation?

2021-05-21 Thread Reuven Lax
On Fri, May 21, 2021 at 4:35 PM Tao Li  wrote:

> Reuven thanks for your response.  GroupBy is not involved if we are not
> specifying fixed number of files, correct?
>

Correct.

>
>
> And what’s the implication of not specifying the shard number? Is the
> parallelism determined by the number of spark executors that hold data to
> save? This is assuming we are using spark runner.
>

Each bundle becomes a file. I'm not entirely sure how the spark runner
determines what the bundles should be.


>
>
> What would be the best practice? Specifying the fixed shard number or
> asking beam to figure it out for us?
>
>
>
> *From: *Reuven Lax 
> *Reply-To: *"user@beam.apache.org" 
> *Date: *Friday, May 21, 2021 at 4:27 PM
> *To: *user 
> *Cc: *Lian Jiang 
> *Subject: *Re: Why is GroupBy involved in the file save operation?
>
>
>
> What you describe is what happens (at least in the Dataflow runner) if
> auto sharding is specified in batch. This mechanism tries to split the
> PCollection to fully utilize every worker, so it is not appropriate when a
> fixed number of shards is desired. A GroupByKey is also necessary in
> streaming in order to split an unbounded PCollection using
> windows/triggers, as windows and triggers are applied during GroupByKey.
>
>
>
> On Fri, May 21, 2021 at 4:16 PM Tao Li  wrote:
>
> Hi Beam community,
>
>
>
> I wonder why a GroupBy operation is involved in WriteFiles:
> https://beam.apache.org/releases/javadoc/2.29.0/org/apache/beam/sdk/io/WriteFiles.html
>
>
>
> This doc mentioned “The exact parallelism of the write stage can be
> controlled using withNumShards(int),
> typically used to control how many files are produced or to globally limit
> the number of workers connecting to an external service. However, this
> option can often hurt performance: it adds an additional GroupByKey to
> the pipeline.”
>
>
>
> When we are saving the PCollection into multiple files, why can’t we
> simply split the PCollection without a key and save each split as a file?
>
>
>
> Thanks!
>
>


Re: Why is GroupBy involved in the file save operation?

2021-05-21 Thread Reuven Lax
What you describe is what happens (at least in the Dataflow runner) if auto
sharding is specified in batch. This mechanism tries to split the
PCollection to fully utilize every worker, so it is not appropriate when a
fixed number of shards is desired. A GroupByKey is also necessary in
streaming in order to split an unbounded PCollection using
windows/triggers, as windows and triggers are applied during GroupByKey.
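
As a rough illustration of the fixed-shard case (plain Java, not Beam
code; FixedShardSketch, assignShards and the bundle lists are hypothetical
names), the sketch below tags each element with a shard number and then
brings all elements of a shard together. That "bring together" step is the
role the GroupByKey plays across workers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class FixedShardSketch {
  // Tags each element with a shard in [0, numShards) and groups by shard.
  // Each inner list stands for the elements one worker bundle received.
  static Map<Integer, List<String>> assignShards(
      List<List<String>> bundles, int numShards) {
    Map<Integer, List<String>> shards = new TreeMap<>();
    for (int s = 0; s < numShards; s++) {
      shards.put(s, new ArrayList<>());
    }
    for (List<String> bundle : bundles) {
      int shard = 0; // assumption: a real implementation might start randomly
      for (String element : bundle) {
        shards.get(shard).add(element); // the "group by shard" step
        shard = (shard + 1) % numShards;
      }
    }
    return shards;
  }
}
```

Without the grouping, each bundle would write its own output and the file
count would follow the number of bundles; with it, the count is exactly
numShards regardless of how the input was split.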

On Fri, May 21, 2021 at 4:16 PM Tao Li  wrote:

> Hi Beam community,
>
>
>
> I wonder why a GroupBy operation is involved in WriteFiles:
> https://beam.apache.org/releases/javadoc/2.29.0/org/apache/beam/sdk/io/WriteFiles.html
>
>
>
> This doc mentioned “ The exact parallelism of the write stage can be
> controlled using withNumShards(int),
> typically used to control how many files are produced or to globally limit
> the number of workers connecting to an external service. However, this
> option can often hurt performance: it adds an additional GroupByKey to
> the pipeline.”
>
>
>
> When we are saving the PCollection into multiple files, why can’t we
> simply split the PCollection without a key and save each split as a file?
>
>
>
> Thanks!
>


Re: Batch load with BigQueryIO fails because of a few bad records.

2021-05-07 Thread Reuven Lax
ignoreUnknownValues is supported for BATCH_LOADS as well.

On Fri, May 7, 2021 at 7:08 AM Matthew Ouyang 
wrote:

> Thank you for responding Evan.  It looks like these options will only work
> for STREAMING_INSERTS.  Are there any options for BATCH_LOADS, and if not
> are there any plans for it?
>
> On Thu, May 6, 2021 at 6:11 PM Evan Galpin  wrote:
>
>> Hey Matthew,
>>
>> I believe you might also need to use the “ignoreUnknownValues”[1] or
>> skipInvalidRows[2] options depending on your use case if your goal is to
>> allow valid entities to succeed even if invalid entities exist and
>> separately process failed entities via “getFailedResults”. You could also
>> consider introducing additional validation before the Write step if the
>> failure modes are predictable.
>>
>> Thanks,
>> Evan
>>
>> [1]
>>
>> https://beam.apache.org/releases/javadoc/2.29.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html#ignoreUnknownValues--
>> [2]
>>
>> https://beam.apache.org/releases/javadoc/2.29.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html#skipInvalidRows--
>>
>> On Thu, May 6, 2021 at 18:01 Matthew Ouyang 
>> wrote:
>>
>>> I am loading a batch load of records with BigQueryIO.Write, but because
>>> some records don't match the target table schema the entire write
>>> step fails and nothing gets written to the table.  Is there a way for
>>> records that do match the target table schema to be inserted, and the
>>> records that don't match don't cause the entire step to fail?  I noticed
>>> BigQueryIO.Write returns a WriteResult that has a method getFailedInserts.
>>> Will that meet my needs?
>>>
>>


Re: Question on late data handling in Beam streaming mode

2021-04-23 Thread Reuven Lax
You can definitely group by processing time. The way to do this in Beam is
as follows:

Window.into(new GlobalWindows())
    .triggering(AfterWatermark.pastEndOfWindow()
        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardSeconds(30))))
    .discardingFiredPanes();

The syntax is unfortunately a bit wordy, but the idea is that you are
creating a single event-time window that encompasses all time, and
"triggering" an aggregation every 30 seconds based on processing time.

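To make the processing-time grouping more tangible, here is a simplified
plain-Java model (not Beam code; ProcessingTimePanes and panes are
hypothetical names, and real runners fire triggers from their own timers).
It assumes a pane opens at its first element, fires once the delay has
passed, and discards its contents after firing.

```java
import java.util.ArrayList;
import java.util.List;

final class ProcessingTimePanes {
  // Splits arrival times (milliseconds, ascending) into panes. A pane opens
  // at its first element and closes delayMillis later; an element arriving
  // at or after that deadline starts the next pane.
  static List<List<Long>> panes(List<Long> arrivalMillis, long delayMillis) {
    List<List<Long>> result = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    long deadline = Long.MAX_VALUE;
    for (long arrival : arrivalMillis) {
      if (!current.isEmpty() && arrival >= deadline) {
        result.add(current);         // the trigger fires for the open pane
        current = new ArrayList<>(); // discarding mode: nothing carries over
      }
      if (current.isEmpty()) {
        deadline = arrival + delayMillis; // timer relative to first element
      }
      current.add(arrival);
    }
    if (!current.isEmpty()) {
      result.add(current); // the last pane fires once its deadline passes
    }
    return result;
  }
}
```

Grouping depends only on arrival time here, so an element whose event time
is old still lands in whichever pane is open when it shows up.
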
On Fri, Apr 23, 2021 at 8:14 AM Tao Li  wrote:

> Thanks @Kenneth Knowles . I understand we need to
> specify a window for groupby so that the app knows when processing is
> “done” to output result.
>
>
>
> Is it possible to specify an event arrival/processing time based window for
> groupby? The purpose is to avoid dropping of late events. With an event
> processing time based window, the app will periodically output the result
> based on all events that arrived in that window, and a late arriving event
> will fall into whatever window covers its arrival time and thus that late
> data will not get lost.
>
>
>
> Does Beam support this kind of mechanism? Thanks.
>
>
>
> *From: *Kenneth Knowles 
> *Reply-To: *"user@beam.apache.org" 
> *Date: *Thursday, April 22, 2021 at 1:49 PM
> *To: *user 
> *Cc: *Kelly Smith , Lian Jiang <
> li...@zillowgroup.com>
> *Subject: *Re: Question on late data handling in Beam streaming mode
>
>
>
> Hello!
>
>
>
> In a streaming app, you have two choices: wait forever and never have any
> output OR use some method to decide that aggregation is "done".
>
>
>
> In Beam, the way you decide that aggregation is "done" is the watermark.
> When the watermark predicts no more data for an aggregation, then the
> aggregation is done. For example GROUP BY <minute> is "done" when no more
> data will arrive for that minute. At this point, your result is produced.
> More data may arrive, and it is ignored. The watermark is determined by the
> IO connector to be the best heuristic available. You can configure "allowed
> lateness" for an aggregation to allow out of order data.
>
>
>
> Kenn
>
>
>
> On Thu, Apr 22, 2021 at 1:26 PM Tao Li  wrote:
>
> Hi Beam community,
>
>
>
> I am wondering if there is a risk of losing late data from a Beam stream
> app due to watermarking?
>
>
>
> I just went through this design doc and noticed the “droppable” definition
> there:
> https://docs.google.com/document/d/12r7frmxNickxB5tbpuEh_n35_IJeVZn1peOrBrhhP6Y/edit#
> 
>
>
>
> Can you please confirm if it’s possible for us to lose some data in a
> stream app in practice? If that’s possible, what would be the best practice
> to avoid data loss? Thanks!
>
>
>
>


Re: Running averages -- but global window state might not be enough

2021-04-21 Thread Reuven Lax
Can you store the history in an OrderedListState? This state should allow
you to efficiently delete old versions.
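
A rough plain-Java sketch of the idea, using java.util.TreeMap as a
stand-in for time-ordered state (this is not the OrderedListState API;
DailyScoreHistory and its methods are hypothetical names). It assumes a
bounded look-back is acceptable, keeps only the highest-version score per
day, and drops old days with a single range delete.

```java
import java.util.TreeMap;

final class DailyScoreHistory {
  private static final class Entry {
    final long version;
    final double score;

    Entry(long version, double score) {
      this.version = version;
      this.score = score;
    }
  }

  // Keyed by epoch day so entries stay ordered by time.
  private final TreeMap<Long, Entry> byDay = new TreeMap<>();

  // Keeps a score only if its version is at least the stored one for that day.
  void add(long epochDay, long version, double score) {
    Entry existing = byDay.get(epochDay);
    if (existing == null || version >= existing.version) {
      byDay.put(epochDay, new Entry(version, score));
    }
  }

  // Drops every day strictly before cutoffDay in one range operation.
  void expireBefore(long cutoffDay) {
    byDay.headMap(cutoffDay, false).clear();
  }

  // Average of the retained per-day scores, or 0.0 if none are stored.
  double runningAverage() {
    return byDay.values().stream().mapToDouble(e -> e.score).average().orElse(0.0);
  }

  int daysStored() {
    return byDay.size();
  }
}
```

Out-of-order or lower-version inputs for a day are ignored by add, and
expireBefore keeps the stored history from growing without bound.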

On Tue, Apr 20, 2021 at 9:58 AM Raman Gupta  wrote:

> I have a running average problem. As I understand it, the traditional Beam
> solution is state in a global window, but I'm not quite sure how to
> approach it for my use case, which is a bit more complex.
>
> I have a "score" output every 5 minutes based on a timer, up to a maximum
> of 1 hour after some time, depending on the arrival times of a few input
> events per day.
>
> The output of this initial part of the pipeline is 1) versioned, so when
> running the pipeline in batch mode, or dealing with up to 3-day late
> inputs, the score in the output system is continuously updated (and outputs
> from out-of-order inputs are ignored) and 2) aggregated into a daily score
> along with inputs coming from other pipeline branches, which is also
> continuously updated part-way through the day with early and late triggers.
>
> Now, I need to calculate the running average of the individual scores
> output every 5 minutes multiple times per day, and factor those into the
> overall aggregated daily score. The running average should consider only
> the highest version score for each day on which there are scores.
>
> I don't see how I can do this with global windows without keeping a full
> history of the latest score and version on every previous day, which will
> grow without bound. Or am I missing something?
>
> Thanks,
> Raman
>
>


Re: Write to multiple IOs in linear fashion

2021-03-24 Thread Reuven Lax
I believe that the Wait transform turns this output into a side input, so
outputting the input PCollection might be problematic.

On Wed, Mar 24, 2021 at 4:49 PM Kenneth Knowles  wrote:

> Alex's idea sounds good and like what Vincent maybe implemented. I am just
> reading really quickly so sorry if I missed something...
>
> Checking out the code for the WriteFn I see a big problem:
>
> @Setup
> public void setup() {
>   writer = new Mutator<>(spec, Mapper::saveAsync, "writes");
> }
>
> @ProcessElement
>   public void processElement(ProcessContext c) throws
> ExecutionException, InterruptedException {
>   writer.mutate(c.element());
> }
>
> @Teardown
> public void teardown() throws Exception {
>   writer.close();
>   writer = null;
> }
>
> It is only in writer.close() that all async writes are waited on. This
> needs to happen in @FinishBundle.
>
> Did you discover this when implementing your own Cassandra.Write?
>
> Until you have waited on the future, you should not output the element as
> "has been written". And you cannot output from the @Teardown method which
> is just for cleaning up resources.
>
> Am I reading this wrong?
>
> Kenn
>
> On Wed, Mar 24, 2021 at 4:35 PM Alex Amato  wrote:
>
>> How about a PCollection containing every element which was successfully
>> written?
>> Basically the same things which were passed into it.
>>
>> Then you could act on every element after it's been successfully written
>> to the sink.
>>
>> On Wed, Mar 24, 2021 at 3:16 PM Robert Bradshaw 
>> wrote:
>>
>>> On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía  wrote:
>>>
>>>> +dev
>>>>
>>>> Since we all agree that we should return something different than
>>>> PDone the real question is what should we return.
>>>>
>>>
>>> My proposal is that one returns a PCollection that consists,
>>> internally, of something contentless like nulls. This is future compatible
>>> with returning something more meaningful based on the source or write
>>> process itself, but at least this would be followable.
>>>
>>>
>>>> As a reminder we had a pretty interesting discussion about this
>>>> already in the past but uniformization of our return values has not
>>>> happened.
>>>> This thread is worth reading for Vincent or anyone who wants to
>>>> contribute Write transforms that return.
>>>>
>>>> https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E
>>>
>>>
>>> Yeah, we should go ahead and finally do something.
>>>
>>>
>>>>
>>>> > Returning PDone is an anti-pattern that should be avoided, but
>>>> > changing it now would be backwards incompatible.
>>>>
>>>> Periodic reminder: most IOs are still Experimental, so I suppose it is
>>>> up to the maintainers to judge whether the upgrade to return something
>>>> other than PDone is worth it; in that case we can deprecate and remove
>>>> the previous signature in a short time (2 releases was the average for
>>>> previous cases).
>>>>
>>>>
>>>> On Wed, Mar 24, 2021 at 10:24 PM Alexey Romanenko
>>>>  wrote:
>>>> >
>>>> > I thought that was said about returning a PCollection of write
>>>> > results as it’s done in other IOs (as I mentioned as examples) that have
>>>> > _additional_ write methods, like “withWriteResults()” etc, that return
>>>> > PTransform<…, PCollection>.
>>>> > In this case, we keep backwards compatibility and just add new
>>>> > functionality. Though, we need to follow the same pattern for user API and
>>>> > maybe even naming for this feature across different IOs (like we have for
>>>> > "readAll()” methods).
>>>> >
>>>> >  I agree that we have to avoid returning PDone for such cases.
>>>> >
>>>> > On 24 Mar 2021, at 20:05, Robert Bradshaw
>>>> > wrote:
>>>> >
>>>> > Returning PDone is an anti-pattern that should be avoided, but
>>>> > changing it now would be backwards incompatible. PRs to add non-PDone
>>>> > returning variants (probably as another option to the builders) that
>>>> > compose well with Wait, etc. would be welcome.
>>>> >
>>>> > On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko <
>>>> > aromanenko@gmail.com> wrote:
>>>> >>
>>>> >> In this way, I think “Wait” PTransform should work for you but, as
>>>> >> it was mentioned before, it doesn’t work with PDone, only with PCollection
>>>> >> as a signal.
>>>> >>
>>>> >> Since you already adjusted your own writer for that, it would be
>>>> >> great to contribute it back to Beam in the way as it was done for other IOs
>>>> >> (for example, JdbcIO [1] or BigtableIO [2])
>>>> >>
>>>> >> In general, I think we need to have it for all IOs, at least to use
>>>> >> with “Wait” because this pattern is quite often required.
>>>> >>
>>>> >> [1]
>>>> >> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1078
>>>> >> [2]
 

Re: Write to multiple IOs in linear fashion

2021-03-24 Thread Reuven Lax
Does that work if cassandra returns a PDone?

On Wed, Mar 24, 2021 at 10:00 AM Chamikara Jayalath 
wrote:

> If you want to wait until all records are written (per window) to Cassandra
> before writing that window to PubSub, you should be able to use the Wait
> transform:
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Wait.java
>
> Thanks,
> Cham
>
> On Wed, Mar 24, 2021 at 9:49 AM Alexey Romanenko 
> wrote:
>
>> Do you want to wait until ALL records are written to Cassandra and then
>> write all successfully written records to PubSub or it should be performed
>> "record by record"?
>>
>> On 24 Mar 2021, at 04:58, Vincent Marquez 
>> wrote:
>>
>> I have a common use case where my pipeline looks like this:
>> CassandraIO.readAll -> Aggregate -> CassandraIO.write -> PubSubIO.write
>>
>> I do NOT want my pipeline to look like the following:
>>
>> CassandraIO.readAll -> Aggregate -> CassandraIO.write
>>                              |
>>                               -> PubsubIO.write
>>
>> Because I need to ensure that only items written to Pubsub have
>> successfully finished a (quorum) write.
>>
>> Since CassandraIO.write is a PTransform I can't actually use it
>> here so I often roll my own 'writer', but maybe there is a recommended way
>> of doing this?
>>
>> Thanks in advance for any help.
>>
>> *~Vincent*
>>
>>
>>


Re: Merging panes of a window

2021-01-29 Thread Reuven Lax
I think it's possible to accomplish this without a custom WindowFn. Your
naming function in FileIO should be able to access the element itself, not
just the window. I would have your stateful DoFn attach a sequence number
to each element (store an extra ValueState or CombiningState, and increment
it on every element). Then attach this number to the output element. You
could do this by outputting a KV that pairs the sequence number with the
iterable, or you could use Beam schemas for your output type - your
choice. Then have your naming function use this sequence number to name the
output files.
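
A minimal plain-Java sketch of that bookkeeping (not Beam code; the
SequencedNaming class, its methods and the file-name pattern are
hypothetical). A map stands in for the per-key counter state, and the
naming function only needs the sequence number carried by each output.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

final class SequencedNaming {
  // Stand-in for per-key state: the next sequence number for each key.
  private final Map<String, Long> nextSeq = new HashMap<>();

  // Returns the sequence number to attach to the next batch for this key.
  long nextSequence(String key) {
    long seq = nextSeq.getOrDefault(key, 0L);
    nextSeq.put(key, seq + 1);
    return seq;
  }

  // Builds a file name from a window label and the batch's sequence number.
  static String fileName(String windowLabel, long seq) {
    return String.format(Locale.ROOT, "%s-part-%05d.txt", windowLabel, seq);
  }
}
```

Because the number travels with the element, two batches that happen to be
emitted in the same pane still get distinct file names.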

On Fri, Jan 29, 2021 at 2:32 AM Pradyumna Achar 
wrote:

>
> Thank you, that worked.
> One small glitch was that the trigger is not guaranteed to fire for every
> element. (AfterPane.elementCountAtLeast(1) might fire after one element, or
> more). When it fires for more than one of those iterables, I get
> double/triple/etc of the intended file sizes, which is undesirable.
> To overcome this, I created another window class, similar to the
> IntervalWindow (called it BespokeIntervalWindow, which is the same thing as
> the IntervalWindow except that there is an extra UUID field in it, in
> addition to the "start" and "end" fields, that keeps each such window
> distinct), and I have a WindowFn that gets the existing IntervalWindow from
> the AssignContext and assigns an equivalent BespokeIntervalWindow to each
> iterable in its assignWindow method. I then set
> the AfterPane.elementCountAtLeast(1) trigger on this WindowFn to make it
> not wait till the end of the interval to emit the pane.
>
>
>


Re: Merging panes of a window

2021-01-28 Thread Reuven Lax
You should be able to simply rewindow _after_ the Stateful DoFn. In this
window you do want to trigger on every element, as now each element is an
iterable.

On Thu, Jan 28, 2021 at 10:20 PM Pradyumna Achar 
wrote:

> Thank you. I tried it out and it does pass elements to the stateful DoFn
> as and when they arrive instead of waiting for the window to finish, the
> way you describe.
>
> However, with this, the FileIO does not write the file until the whole
> window is done, even though the stateful DoFn outputs iterables in chunks
> of 100MB. It only receives a single ON_TIME pane and writes a big file of
> the entire window.
>
> Wondering if there is a way to tell that each iterable is a separate pane
> or something like that..
>
>>


Re: Merging panes of a window

2021-01-28 Thread Reuven Lax
On Thu, Jan 28, 2021 at 8:42 PM Pradyumna Achar 
wrote:

> hmm, no I need the triggers on the window for two reasons:
>
>   1. Say I get ~6GB of data for each of those hourly windows, and I let
> the window fire only after the watermark naturally crosses, I would need to
> store that 6GB in memory. Whereas if I let early firings happen often, and
> let the stateful DoFn output whenever it has received 100MB worth of data,
> the memory requirement comes down significantly.
>

I think you're misunderstanding. Stateful DoFns receive elements as they
arrive, regardless of triggering and windowing. The only semantic meaning
of windows for stateful DoFns is garbage collection -  there is a separate
state per window, and when a window expires, the state for that window is
garbage collected. The stateful DoFn will not wait until the end of the
window to process the elements.

The behaviour you describe is how GroupByKey (and similar aggregating
transforms, such as count and other combiners) work. However it is not true
that all that data needs to be stored in memory. Generally runners shuffle
the data using disk or an external shuffle service. The grouped data is
then read by downstream transforms, but generally does not have to fit in
memory.
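
For the size-based batching discussed in this thread, a plain-Java sketch
of the per-element logic might look like the following (not Beam code;
SizeBatcher is a hypothetical name). In a real stateful DoFn the buffer
would presumably live in state and the final flush would run from a timer
at window expiry; both of those are assumptions here.

```java
import java.util.ArrayList;
import java.util.List;

final class SizeBatcher {
  private final long limitBytes;
  private final List<byte[]> buffer = new ArrayList<>();
  private long bufferedBytes = 0;

  SizeBatcher(long limitBytes) {
    this.limitBytes = limitBytes;
  }

  // Called once per element as it arrives. Returns a full batch when the
  // buffered size reaches the limit, otherwise an empty list.
  List<byte[]> add(byte[] element) {
    buffer.add(element);
    bufferedBytes += element.length;
    return bufferedBytes >= limitBytes ? flush() : new ArrayList<>();
  }

  // Emits whatever is buffered, e.g. when the window expires.
  List<byte[]> flush() {
    List<byte[]> batch = new ArrayList<>(buffer);
    buffer.clear();
    bufferedBytes = 0;
    return batch;
  }
}
```

Nothing here waits for a trigger: each call to add sees one element, and a
batch is emitted as soon as the size limit is reached.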


>
>  2. The window size could be larger than an hour, maybe a day. Early
> firings would let 100MB-pieces of the data be written and get picked up by
> downstream systems at a reduced latency instead of waiting for everything
> to arrive.
>
> GroupIntoBatches doesn't work on sizes (in terms of bytes) AFAIK
>


Re: Merging panes of a window

2021-01-28 Thread Reuven Lax
There is no need for a trigger here at all. The stateful DoFn will process
elements as they arrive, so you don't need to set any triggering.

Also, have you seen the GroupIntoBatches transform? It may already do what
you are trying to do.

On Thu, Jan 28, 2021 at 8:00 PM Pradyumna Achar 
wrote:

> Hello,
>
> I am running into a strange situation trying to use windows and FileIO
> properly.
>
> I have a KafkaIO source, followed by a DoFn that assigns timestamps based
> on a field in the input record using outputWithTimestamp. After that, I
> apply FixedWindows of 1 hour duration on these elements. I need to write
> these windows to disk in parts, with a constraint that each part be of a
> certain size (except the last part).
>
> So, I made the FixedWindow trigger repeatedly, once per element, and
> implemented a stateful DoFn that collects these elements until the size
> limit is reached and outputs an Iterable.
>
> However, now I see that these elements are in separate panes. FileIO's
> behavior is that "writing happens by default per window and pane" (per
> javadoc), and this is what I am observing too. I get a bunch of files,
> instead of one for the Iterable.
>
> Is there any way I can make FileIO write that Iterable to a single file?
>
> Thank you
> - Pradyumna
>


Re: Overwrite support from ParquetIO

2021-01-27 Thread Reuven Lax
Correct, but if you then have a subsequent DoFn write a new file, you might
be surprised if a zombie execution of the first DoFn reexecutes and deletes
that file!

On Wed, Jan 27, 2021 at 5:23 PM Robert Bradshaw  wrote:

> Fortunately making deleting files idempotent is much easier than writing
> them :). But one needs to handle the case of concurrent execution as well
> as sequential re-execution due to possible zombie workers.
>
> On Wed, Jan 27, 2021 at 5:04 PM Reuven Lax  wrote:
>
>> Keep in mind that DoFns might be reexecuted (even if you think they have
>> completed successfully). This makes DoFns with side effects such as
>> deleting files tricky to write correctly.
>>
>> On Wed, Jan 27, 2021 at 4:36 PM Tao Li  wrote:
>>
>>> Thanks @Chamikara Jayalath  I think it’s a good
>>> idea to define a DoFn for this deletion operation, or maybe a composite
>>> PTransform that does deletion first followed by ParquetIO.Write.
>>>
>>>
>>>
>>> *From: *Chamikara Jayalath 
>>> *Reply-To: *"user@beam.apache.org" 
>>> *Date: *Wednesday, January 27, 2021 at 3:45 PM
>>> *To: *user 
>>> *Cc: *Alexey Romanenko 
>>> *Subject: *Re: Overwrite support from ParquetIO
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Wed, Jan 27, 2021 at 12:06 PM Tao Li  wrote:
>>>
>>> @Alexey Romanenko  thanks for your response.
>>> Regarding your questions:
>>>
>>>
>>>
>>>1. Yes I can purge this directory (e.g. using s3 client from aws
>>>sdk) before using ParquetIO to save files. The caveat is that this
>>>deletion operation is not part of the beam pipeline, so it will kick
>>>off before the pipeline starts. More ideally, this purge operation
>>>could be baked into the write operation with ParquetIO so we will have
>>>the deletion happen right before the files writes.
>>>2. Regarding the naming strategy, yes the old files will be
>>>overwritten by the new files if they have the same file names. However
>>>this does not always guarantee that all the old files in this
>>>directory are wiped out (which is actually my requirement). For example
>>>we may change the shard count (through withNumShards() method) in
>>>different pipeline runs and there could be old files from previous run
>>>that won’t get overwritten in the current run.
>>>
>>>
>>>
>>> In general, Beam file-based sinks are intended  for writing new files.
>>> So I don't think existing file-based sinks (including Parquet) will work
>>> out of the box for replacing existing files or for appending to such files.
>>>
>>> But you should be able to delete existing files separately, for example.
>>>
>>> (1) As a function that is performed before executing the pipeline.
>>>
>>> (2) As a function that is performed from a ParDo step that is executed
>>> before the ParquetIO.Write step. Also you will have to make sure that the
>>> runner does not fuse the ParDo step and the Write step. Usually, this can
>>> be done by performing it in a side-input step (to a ParDo that precedes
>>> sink) or by adding a GBK/Reshuffle between the two steps.
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Cham
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> Please let me know if this makes sense to you. Thanks!
>>>
>>>
>>>
>>>
>>>
>>> *From: *Alexey Romanenko 
>>> *Reply-To: *"user@beam.apache.org" 
>>> *Date: *Wednesday, January 27, 2021 at 9:10 AM
>>> *To: *"user@beam.apache.org" 
>>> *Subject: *Re: Overwrite support from ParquetIO
>>>
>>>
>>>
>>> What do you mean by “wipe out all existing parquet files before a write
>>> operation”? Are these all files that already exist in the same output
>>> directory? Can you purge this directory before or just use a new output
>>> directory for every pipeline run?
>>>
>>>
>>>
>>> To write Parquet files you need to use ParquetIO.sink()
>>> with FileIO.write() and I don’t think it will clean up the output directory
>>> before write. Though, if there are the name collisions between existing and
>>> new output files (it depends on used naming strategy) then I think the old
>>> files will be overwritten by new ones.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On 25 Jan 2021, at 19:10, Tao Li  wrote:
>>>
>>>
>>>
>>> Hi Beam community,
>>>
>>>
>>>
>>> Does ParquetIO support an overwrite behavior when saving files? More
>>> specifically, I would like to wipe out all existing parquet files before a
>>> write operation. Is there a ParquetIO API to support that? Thanks!
>>>
>>>
>>>
>>>


Re: Overwrite support from ParquetIO

2021-01-27 Thread Reuven Lax
Keep in mind that DoFns might be reexecuted (even if you think they have
completed successfully). This makes DoFns with side effects such as
deleting files tricky to write correctly.
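
One piece of this is at least easy to get right: a delete that tolerates
being run more than once. The sketch below uses only java.nio.file on a
local filesystem (IdempotentDelete and deleteAll are hypothetical names; an
object store would need its own client). It does not protect against a
late re-execution removing files written afterwards.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

final class IdempotentDelete {
  // Deletes each path if it exists. A missing file is not an error, so
  // running this twice ends in the same state as running it once.
  static int deleteAll(List<Path> paths) {
    int deleted = 0;
    for (Path p : paths) {
      try {
        if (Files.deleteIfExists(p)) {
          deleted++;
        }
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }
    return deleted;
  }
}
```

The return value is only informational; callers should not treat a count
of zero as a failure, since that is the expected result of a retry.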

On Wed, Jan 27, 2021 at 4:36 PM Tao Li  wrote:

> Thanks @Chamikara Jayalath  I think it’s a good
> idea to define a DoFn for this deletion operation, or maybe a composite
> PTransform that does deletion first followed by ParquetIO.Write.
>
>
>
> *From: *Chamikara Jayalath 
> *Reply-To: *"user@beam.apache.org" 
> *Date: *Wednesday, January 27, 2021 at 3:45 PM
> *To: *user 
> *Cc: *Alexey Romanenko 
> *Subject: *Re: Overwrite support from ParquetIO
>
>
>
>
>
>
>
> On Wed, Jan 27, 2021 at 12:06 PM Tao Li  wrote:
>
> @Alexey Romanenko  thanks for your response.
> Regarding your questions:
>
>
>
>1. Yes I can purge this directory (e.g. using s3 client from aws sdk)
>before using ParquetIO to save files. The caveat is that this deletion
>operation is not part of the beam pipeline, so it will kick off before the
>pipeline starts. More ideally, this purge operation could be baked into the
>write operation with ParquetIO so we will have the deletion happen right
>before the files writes.
>2. Regarding the naming strategy, yes the old files will be
>overwritten by the new files if they have the same file names. However this
>does not always guarantee that all the old files in this directory are
>wiped out (which is actually my requirement). For example we may change the
>shard count (through withNumShards() method) in different pipeline runs and
>there could be old files from previous run that won’t get overwritten in
>the current run.
>
>
>
> In general, Beam file-based sinks are intended  for writing new files. So
> I don't think existing file-based sinks (including Parquet) will work out
> of the box for replacing existing files or for appending to such files.
>
> But you should be able to delete existing files separately, for example.
>
> (1) As a function that is performed before executing the pipeline.
>
> (2) As a function that is performed from a ParDo step that is executed
> before the ParquetIO.Write step. Also you will have to make sure that the
> runner does not fuse the ParDo step and the Write step. Usually, this can
> be done by performing it in a side-input step (to a ParDo that precedes
> sink) or by adding a GBK/Reshuffle between the two steps.
>
>
>
> Thanks,
>
> Cham
>
>
>
>
>
>
>
>
>
> Please let me know if this makes sense to you. Thanks!
>
>
>
>
>
> *From: *Alexey Romanenko 
> *Reply-To: *"user@beam.apache.org" 
> *Date: *Wednesday, January 27, 2021 at 9:10 AM
> *To: *"user@beam.apache.org" 
> *Subject: *Re: Overwrite support from ParquetIO
>
>
>
> What do you mean by “wipe out all existing parquet files before a write
> operation”? Are these all files that already exist in the same output
> directory? Can you purge this directory before or just use a new output
> directory for every pipeline run?
>
>
>
> To write Parquet files you need to use ParquetIO.sink()
> with FileIO.write() and I don’t think it will clean up the output directory
> before write. Though, if there are the name collisions between existing and
> new output files (it depends on used naming strategy) then I think the old
> files will be overwritten by new ones.
>
>
>
>
>
>
>
> On 25 Jan 2021, at 19:10, Tao Li  wrote:
>
>
>
> Hi Beam community,
>
>
>
> Does ParquetIO support an overwrite behavior when saving files? More
> specifically, I would like to wipe out all existing parquet files before a
> write operation. Is there a ParquetIO API to support that? Thanks!
>
>
>
>


Re: Is there an array explode function/transform?

2021-01-14 Thread Reuven Lax
And the result is essentially a cross product of all the different array
elements?
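
To illustrate the question with a toy model (plain Java maps standing in
for rows; UnnestSketch and unnest are hypothetical names, not a Beam
transform): unnesting one array field at a time, and stacking two such
steps, yields one output row per combination of elements.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class UnnestSketch {
  // Replaces the list stored under `field` with one row per list element.
  // All other fields are copied unchanged; nested lists are left as-is.
  static List<Map<String, Object>> unnest(
      List<Map<String, Object>> rows, String field) {
    List<Map<String, Object>> out = new ArrayList<>();
    for (Map<String, Object> row : rows) {
      for (Object element : (List<?>) row.get(field)) {
        Map<String, Object> copy = new LinkedHashMap<>(row);
        copy.put(field, element);
        out.add(copy);
      }
    }
    return out;
  }
}
```

A row with a two-element array "a" and a three-element array "b" becomes
six rows after unnest(unnest(rows, "a"), "b"). In this model a row whose
array is empty produces no output rows at all.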

On Thu, Jan 14, 2021 at 11:25 AM Robert Bradshaw 
wrote:

> I think it makes sense to allow specifying more than one, if desired. This
> is equivalent to just stacking multiple Unnests. (Possibly one could even
> have a special syntax like "*" for all array fields.)
>
> On Thu, Jan 14, 2021 at 10:05 AM Reuven Lax  wrote:
>
>> Should Unnest be allowed to specify multiple array fields, or just one?
>>
>> On Wed, Jan 13, 2021 at 11:59 PM Manninger, Matyas <
>> matyas.mannin...@veolia.com> wrote:
>>
>>> I would also not unnest arrays nested in arrays just the top-level array
>>> of the specified fields.
>>>
>>> On Wed, 13 Jan 2021 at 20:58, Reuven Lax  wrote:
>>>
>>>> Nested fields are not part of standard SQL AFAIK. Beam goes further and
>>>> supports array of array, etc.
>>>>
>>>> On Wed, Jan 13, 2021 at 11:42 AM Kenneth Knowles 
>>>> wrote:
>>>>
>>>>> Just the fields specified, IMO. When in doubt, copy SQL. (and I mean
>>>>> SQL generally, not just Beam SQL)
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Wed, Jan 13, 2021 at 11:17 AM Reuven Lax  wrote:
>>>>>
>>>>>> Definitely could be a top-level transform. Should it automatically
>>>>>> unnest all arrays, or just the fields specified?
>>>>>>
>>>>>> We do have to define the semantics for nested arrays as well.
>>>>>>
>>>>>> On Wed, Jan 13, 2021 at 10:57 AM Robert Bradshaw 
>>>>>> wrote:
>>>>>>
>>>>>>> Ah, thanks for the clarification. UNNEST does sound like what you
>>>>>>> want here, and would likely make sense as a top-level relational
>>>>>>> transform as well as being supported by SQL.
>>>>>>>
>>>>>>> On Wed, Jan 13, 2021 at 10:53 AM Tao Li  wrote:
>>>>>>>
>>>>>>>> @Kyle Weaver  sure thing! So the input/output
>>>>>>>> definition for the Flatten.Iterables
>>>>>>>> <https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/transforms/Flatten.Iterables.html>
>>>>>>>> is:
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Input: PCollection<Iterable<T>>
>>>>>>>>
>>>>>>>> Output: PCollection<T>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> The input/output for an explode transform would look like this:
>>>>>>>>
>>>>>>>> Input:  PCollection<Row>. The row schema has a field which is an
>>>>>>>> array of T.
>>>>>>>>
>>>>>>>> Output: PCollection<Row>. The array type field from the input schema
>>>>>>>> is replaced with a new field of type T. The elements from the array
>>>>>>>> type field are flattened into multiple rows in the new table (other
>>>>>>>> fields of the input table are just duplicated).
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Hope this clarification helps!
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> *From: *Kyle Weaver 
>>>>>>>> *Reply-To: *"user@beam.apache.org" 
>>>>>>>> *Date: *Tuesday, January 12, 2021 at 4:58 PM
>>>>>>>> *To: *"user@beam.apache.org" 
>>>>>>>> *Cc: *Reuven Lax 
>>>>>>>> *Subject: *Re: Is there an array explode function/transform?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> @Reuven Lax  yes I am aware of that transform,
>>>>>>>> but that’s different from the explode operation I was referring to:
>>>>>>>> https://spark.apache.org/docs/latest/api/sql/index.html#explode

Re: Is there an array explode function/transform?

2021-01-14 Thread Reuven Lax
Should Unnest be allowed to specify multiple array fields, or just one?

On Wed, Jan 13, 2021 at 11:59 PM Manninger, Matyas <
matyas.mannin...@veolia.com> wrote:

> I would also not unnest arrays nested in arrays just the top-level array
> of the specified fields.
>
> On Wed, 13 Jan 2021 at 20:58, Reuven Lax  wrote:
>
>> Nested fields are not part of standard SQL AFAIK. Beam goes further and
>> supports array of array, etc.
>>
>> On Wed, Jan 13, 2021 at 11:42 AM Kenneth Knowles  wrote:
>>
>>> Just the fields specified, IMO. When in doubt, copy SQL. (and I mean SQL
>>> generally, not just Beam SQL)
>>>
>>> Kenn
>>>
>>> On Wed, Jan 13, 2021 at 11:17 AM Reuven Lax  wrote:
>>>
>>>> Definitely could be a top-level transform. Should it automatically
>>>> unnest all arrays, or just the fields specified?
>>>>
>>>> We do have to define the semantics for nested arrays as well.
>>>>
>>>> On Wed, Jan 13, 2021 at 10:57 AM Robert Bradshaw 
>>>> wrote:
>>>>
>>>>> Ah, thanks for the clarification. UNNEST does sound like what you want
>>>>> here, and would likely make sense as a top-level relational transform as
>>>>> well as being supported by SQL.
>>>>>
>>>>> On Wed, Jan 13, 2021 at 10:53 AM Tao Li  wrote:
>>>>>
>>>>>> @Kyle Weaver  sure thing! So the input/output
>>>>>> definition for the Flatten.Iterables
>>>>>> <https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/transforms/Flatten.Iterables.html>
>>>>>> is:
>>>>>>
>>>>>>
>>>>>>
>>>>>> Input: PCollection<Iterable<T>>
>>>>>>
>>>>>> Output: PCollection<T>
>>>>>>
>>>>>>
>>>>>>
>>>>>> The input/output for an explode transform would look like this:
>>>>>>
>>>>>> Input:  PCollection<Row>. The row schema has a field which is an array
>>>>>> of T.
>>>>>>
>>>>>> Output: PCollection<Row>. The array type field from the input schema is
>>>>>> replaced with a new field of type T. The elements from the array type
>>>>>> field are flattened into multiple rows in the new table (other fields of
>>>>>> the input table are just duplicated).
>>>>>>
>>>>>>
>>>>>>
>>>>>> Hope this clarification helps!
>>>>>>
>>>>>>
>>>>>>
>>>>>> *From: *Kyle Weaver 
>>>>>> *Reply-To: *"user@beam.apache.org" 
>>>>>> *Date: *Tuesday, January 12, 2021 at 4:58 PM
>>>>>> *To: *"user@beam.apache.org" 
>>>>>> *Cc: *Reuven Lax 
>>>>>> *Subject: *Re: Is there an array explode function/transform?
>>>>>>
>>>>>>
>>>>>>
>>>>>> @Reuven Lax  yes I am aware of that transform, but
>>>>>> that’s different from the explode operation I was referring to:
>>>>>> https://spark.apache.org/docs/latest/api/sql/index.html#explode
>>>>>>
>>>>>>
>>>>>>
>>>>>> How is it different? It'd help if you could provide the signature
>>>>>> (input and output PCollection types) of the transform you have in mind.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Jan 12, 2021 at 4:49 PM Tao Li  wrote:
>>>>>>
>>>>>> @Reuven Lax  yes I am aware of that transform, but
>>>>>> that’s different from the explode operation I was referring to:
>>>>>> https://spark.apache.org/docs/latest/api/sql/index.html#explode
>>>>>>
>>>>>>
>>>>>>
>>>>>> *From: *Reuven Lax 
>>>>>> *Reply-To: *"user@beam.apache.org" 
>>>>>> *Date: *Tuesday, January 12, 2021 at 2:04 PM
>>>>>> *To: *user 
>>>>>> *Subject: *Re: Is there an array explode function/transform?
>>>>>>
>>>>>>
>>>>>>
>>>>>> Have you tried Flatten.iterables?
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Jan 12, 2021, 2:02 PM Tao Li  wrote:
>>>>>>
>>>>>> Hi community,
>>>>>>
>>>>>>
>>>>>>
>>>>>> Is there a beam function to explode an array (similarly to spark
>>>>>> sql’s explode())? I did some research but did not find anything.
>>>>>>
>>>>>>
>>>>>>
>>>>>> BTW I think we can potentially use FlatMap to implement the explode
>>>>>> functionality, but a Beam provided function would be very handy.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Thanks a lot!
>>>>>>
>>>>>>


Re: Is there an array explode function/transform?

2021-01-13 Thread Reuven Lax
Nested fields are not part of standard SQL AFAIK. Beam goes further and
supports array of array, etc.

On Wed, Jan 13, 2021 at 11:42 AM Kenneth Knowles  wrote:

> Just the fields specified, IMO. When in doubt, copy SQL. (and I mean SQL
> generally, not just Beam SQL)
>
> Kenn
>
> On Wed, Jan 13, 2021 at 11:17 AM Reuven Lax  wrote:
>
>> Definitely could be a top-level transform. Should it automatically unnest
>> all arrays, or just the fields specified?
>>
>> We do have to define the semantics for nested arrays as well.
>>
>> On Wed, Jan 13, 2021 at 10:57 AM Robert Bradshaw 
>> wrote:
>>
>>> Ah, thanks for the clarification. UNNEST does sound like what you want
>>> here, and would likely make sense as a top-level relational transform as
>>> well as being supported by SQL.
>>>
>>> On Wed, Jan 13, 2021 at 10:53 AM Tao Li  wrote:
>>>
>>>> @Kyle Weaver  sure thing! So the input/output
>>>> definition for the Flatten.Iterables
>>>> <https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/transforms/Flatten.Iterables.html>
>>>> is:
>>>>
>>>>
>>>>
>>>> Input: PCollection<Iterable<T>>
>>>>
>>>> Output: PCollection<T>
>>>>
>>>>
>>>>
>>>> The input/output for an explode transform would look like this:
>>>>
>>>> Input:  PCollection<Row>. The row schema has a field which is an array
>>>> of T.
>>>>
>>>> Output: PCollection<Row>. The array type field from the input schema is
>>>> replaced with a new field of type T. The elements from the array type field
>>>> are flattened into multiple rows in the new table (other fields of the
>>>> input table are just duplicated).
>>>>
>>>>
>>>>
>>>> Hope this clarification helps!
>>>>
>>>>
>>>>
>>>> *From: *Kyle Weaver 
>>>> *Reply-To: *"user@beam.apache.org" 
>>>> *Date: *Tuesday, January 12, 2021 at 4:58 PM
>>>> *To: *"user@beam.apache.org" 
>>>> *Cc: *Reuven Lax 
>>>> *Subject: *Re: Is there an array explode function/transform?
>>>>
>>>>
>>>>
>>>> @Reuven Lax  yes I am aware of that transform, but
>>>> that’s different from the explode operation I was referring to:
>>>> https://spark.apache.org/docs/latest/api/sql/index.html#explode
>>>>
>>>>
>>>>
>>>> How is it different? It'd help if you could provide the signature
>>>> (input and output PCollection types) of the transform you have in mind.
>>>>
>>>>
>>>>
>>>> On Tue, Jan 12, 2021 at 4:49 PM Tao Li  wrote:
>>>>
>>>> @Reuven Lax  yes I am aware of that transform, but
>>>> that’s different from the explode operation I was referring to:
>>>> https://spark.apache.org/docs/latest/api/sql/index.html#explode
>>>>
>>>>
>>>>
>>>> *From: *Reuven Lax 
>>>> *Reply-To: *"user@beam.apache.org" 
>>>> *Date: *Tuesday, January 12, 2021 at 2:04 PM
>>>> *To: *user 
>>>> *Subject: *Re: Is there an array explode function/transform?
>>>>
>>>>
>>>>
>>>> Have you tried Flatten.iterables?
>>>>
>>>>
>>>>
>>>> On Tue, Jan 12, 2021, 2:02 PM Tao Li  wrote:
>>>>
>>>> Hi community,
>>>>
>>>>
>>>>
>>>> Is there a beam function to explode an array (similarly to spark sql’s
>>>> explode())? I did some research but did not find anything.
>>>>
>>>>
>>>>
>>>> BTW I think we can potentially use FlatMap to implement the explode
>>>> functionality, but a Beam provided function would be very handy.
>>>>
>>>>
>>>>
>>>> Thanks a lot!
>>>>
>>>>


Re: Is there an array explode function/transform?

2021-01-13 Thread Reuven Lax
Definitely could be a top-level transform. Should it automatically unnest
all arrays, or just the fields specified?

We do have to define the semantics for nested arrays as well.
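
The explode (UNNEST) semantics discussed in this thread can be illustrated with a minimal sketch in plain Java, with no Beam dependency. It is not a Beam API: the class `ExplodeSketch`, the method `explode`, and the use of a `Map` to stand in for a schema'd Row are all assumptions made for illustration. It unnests only the top-level array of the one specified field, so an array nested inside that array is left intact as a single value.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; a Map stands in for a Beam Row.
public class ExplodeSketch {

  /**
   * Emits one output row per element of the list stored under arrayField.
   * All other fields are copied unchanged into every output row.
   * An empty list produces no output rows.
   */
  public static List<Map<String, Object>> explode(
      Map<String, Object> row, String arrayField) {
    Object value = row.get(arrayField);
    if (!(value instanceof List)) {
      throw new IllegalArgumentException(arrayField + " is not an array field");
    }
    List<Map<String, Object>> out = new ArrayList<>();
    for (Object element : (List<?>) value) {
      // Duplicate the other fields of the input row.
      Map<String, Object> copy = new LinkedHashMap<>(row);
      // Replace the array field with a single element (top level only).
      copy.put(arrayField, element);
      out.add(copy);
    }
    return out;
  }
}
```

Under these assumptions, a row with `id = 1` and `tags = ["a", "b"]` becomes two rows, each carrying `id = 1` and one of the tags. Whether an empty or null array should drop the row or keep it with a null value is one of the semantics that would still need to be decided.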

On Wed, Jan 13, 2021 at 10:57 AM Robert Bradshaw 
wrote:

> Ah, thanks for the clarification. UNNEST does sound like what you want
> here, and would likely make sense as a top-level relational transform as
> well as being supported by SQL.
>
> On Wed, Jan 13, 2021 at 10:53 AM Tao Li  wrote:
>
>> @Kyle Weaver  sure thing! So the input/output
>> definition for the Flatten.Iterables
>> <https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/transforms/Flatten.Iterables.html>
>> is:
>>
>>
>>
>> Input: PCollection<Iterable<T>>
>>
>> Output: PCollection<T>
>>
>>
>>
>> The input/output for an explode transform would look like this:
>>
>> Input:  PCollection<Row>. The row schema has a field which is an array of
>> T.
>>
>> Output: PCollection<Row>. The array type field from the input schema is
>> replaced with a new field of type T. The elements from the array type field
>> are flattened into multiple rows in the new table (other fields of the
>> input table are just duplicated).
>>
>>
>>
>> Hope this clarification helps!
>>
>>
>>
>> *From: *Kyle Weaver 
>> *Reply-To: *"user@beam.apache.org" 
>> *Date: *Tuesday, January 12, 2021 at 4:58 PM
>> *To: *"user@beam.apache.org" 
>> *Cc: *Reuven Lax 
>> *Subject: *Re: Is there an array explode function/transform?
>>
>>
>>
>> @Reuven Lax  yes I am aware of that transform, but
>> that’s different from the explode operation I was referring to:
>> https://spark.apache.org/docs/latest/api/sql/index.html#explode
>>
>>
>>
>> How is it different? It'd help if you could provide the signature (input
>> and output PCollection types) of the transform you have in mind.
>>
>>
>>
>> On Tue, Jan 12, 2021 at 4:49 PM Tao Li  wrote:
>>
>> @Reuven Lax  yes I am aware of that transform, but
>> that’s different from the explode operation I was referring to:
>> https://spark.apache.org/docs/latest/api/sql/index.html#explode
>>
>>
>>
>> *From: *Reuven Lax 
>> *Reply-To: *"user@beam.apache.org" 
>> *Date: *Tuesday, January 12, 2021 at 2:04 PM
>> *To: *user 
>> *Subject: *Re: Is there an array explode function/transform?
>>
>>
>>
>> Have you tried Flatten.iterables?
>>
>>
>>
>> On Tue, Jan 12, 2021, 2:02 PM Tao Li  wrote:
>>
>> Hi community,
>>
>>
>>
>> Is there a beam function to explode an array (similarly to spark sql’s
>> explode())? I did some research but did not find anything.
>>
>>
>>
>> BTW I think we can potentially use FlatMap to implement the explode
>> functionality, but a Beam provided function would be very handy.
>>
>>
>>
>> Thanks a lot!
>>
>>


Re: Is there an array explode function/transform?

2021-01-12 Thread Reuven Lax
Have you tried Flatten.iterables?

On Tue, Jan 12, 2021, 2:02 PM Tao Li  wrote:

> Hi community,
>
>
>
> Is there a beam function to explode an array (similarly to spark sql’s
> explode())? I did some research but did not find anything.
>
>
>
> BTW I think we can potentially use FlatMap to implement the explode
> functionality, but a Beam provided function would be very handy.
>
>
>
> Thanks a lot!
>


Re: About Beam SQL Schema Changes and Code generation

2020-12-08 Thread Reuven Lax
Talat, are you interested in writing a proposal and sending it to
d...@beam.apache.org? We could help advise on the options.

Reuven

On Tue, Dec 8, 2020 at 10:28 AM Andrew Pilloud  wrote:

> We could support EXCEPT statements in proposal 2 as long as we restricted
> it to known fields.
>
> We are getting into implementation details now. Making unknown fields just
> a normal column introduces a number of problems. ZetaSQL doesn't support
> Map type. All our IOs would need to explicitly deal with that special
> column. There would be a lack of consistency between the various types
> (Avro, Proto, Json) which should all support this.
>
> We might also want something even more invasive: everything is an unknown
> field unless it is referenced in the SQL query. All of these options are
> possible. I guess we need someone who has time to work on it to write a
> proposal.
>
> On Tue, Dec 8, 2020 at 10:03 AM Reuven Lax  wrote:
>
>> I'm not sure that we could support EXCEPT statements, as that would
>> require introspecting the unknown fields (what if the EXCEPT statement
>> matches a field that later is added as an unknown field?). IMO this sort of
>> behavior only makes sense on true pass-through queries. Anything that
>> modifies the input record would be tricky to support.
>>
>> Nested rows would work for proposal 2. You would need to make sure that
>> the unknown-fields map is recursively added to all nested rows, and you
>> would do this when you infer a schema from the avro schema.
>>
>> On Tue, Dec 8, 2020 at 9:58 AM Andrew Pilloud 
>> wrote:
>>
>>> Proposal 1 would also interact poorly with SELECT * EXCEPT ...
>>> statements, which returns all columns except specific ones. Adding an
>>> unknown field does seem like a reasonable way to handle this. It probably
>>> needs to be something that is native to the Row type, so columns added to
>>> nested rows also work.
>>>
>>> Andrew
>>>
>>> On Tue, Dec 8, 2020 at 9:50 AM Reuven Lax  wrote:
>>>
>>>> There's a difference between a fully dynamic schema and simply being
>>>> able to forward "unknown" fields to the output.
>>>>
>>>> A fully-dynamic schema is not really necessary unless we also had
>>>> dynamic SQL statements. Since the existing SQL statements do not reference
>>>> the new fields by name, there's no reason to add them to the main schema.
>>>>
>>>> However, if you have a SELECT * FROM WHERE  statement that does no
>>>> aggregation, there's fundamentally no reason we couldn't forward the
>>>> messages exactly. In theory we could forward the exact bytes that are in
>>>> the input PCollection, which would necessarily forward the new fields. In
>>>> practice I believe that we convert the input messages to Beam Row objects
>>>> in order to evaluate the WHERE clause, and then convert back to Avro to
>>>> output those messages. I believe this is where we "lose" the unknown
>>>> messages, but this is an implementation artifact - in theory we could output
>>>> the original bytes whenever we see a SELECT *. This is not truly a dynamic
>>>> schema, since you can't really do anything with these extra fields except
>>>> forward them to your output.
>>>>
>>>> I see two possible ways to address this.
>>>>
>>>> 1. As I mentioned above, in the case of a SELECT * we could output the
>>>> original bytes, and only use the Beam Row for evaluating the WHERE clause.
>>>> This might be very expensive though - we risk having to keep two copies of
>>>> every message around, one in the original Avro format and one in Row
>>>> format.
>>>>
>>>> 2. The other way would be to do what protocol buffers do. We could add
>>>> one extra field to the inferred Beam schema to store new, unknown fields
>>>> (probably this would be a map-valued field). This extra field would simply
>>>> store the raw bytes of these unknown fields, and then when converting back
>>>> to Avro they would be added to the output message. This might also add some
>>>> overhead to the pipeline, so might be best to make this behavior opt in.
>>>>
>>>> Reuven
>>>>
>>>> On Tue, Dec 8, 2020 at 9:33 AM Brian Hulette 
>>>> wrote:
>>>>
>>>>> Reuven, could you clarify what you have in mind? I know multiple times
>>>>> we've discussed the possibility of adding update compatibility support to
>

Re: About Beam SQL Schema Changes and Code generation

2020-12-08 Thread Reuven Lax
I'm not sure that we could support EXCEPT statements, as that would require
introspecting the unknown fields (what if the EXCEPT statement matches a
field that later is added as an unknown field?). IMO this sort of behavior
only makes sense on true pass-through queries. Anything that modifies the
input record would be tricky to support.

Nested rows would work for proposal 2. You would need to make sure that the
unknown-fields map is recursively added to all nested rows, and you would
do this when you infer a schema from the avro schema.

On Tue, Dec 8, 2020 at 9:58 AM Andrew Pilloud  wrote:

> Proposal 1 would also interact poorly with SELECT * EXCEPT ... statements,
> which returns all columns except specific ones. Adding an unknown field
> does seem like a reasonable way to handle this. It probably needs to be
> something that is native to the Row type, so columns added to nested rows
> also work.
>
> Andrew
>
> On Tue, Dec 8, 2020 at 9:50 AM Reuven Lax  wrote:
>
>> There's a difference between a fully dynamic schema and simply being able
>> to forward "unknown" fields to the output.
>>
>> A fully-dynamic schema is not really necessary unless we also had dynamic
>> SQL statements. Since the existing SQL statements do not reference the new
>> fields by name, there's no reason to add them to the main schema.
>>
>> However, if you have a SELECT * FROM WHERE  statement that does no
>> aggregation, there's fundamentally no reason we couldn't forward the
>> messages exactly. In theory we could forward the exact bytes that are in
>> the input PCollection, which would necessarily forward the new fields. In
>> practice I believe that we convert the input messages to Beam Row objects
>> in order to evaluate the WHERE clause, and then convert back to Avro to
>> output those messages. I believe this is where we "lose" the unknown
>> messages, but this is an implementation artifact - in theory we could output
>> the original bytes whenever we see a SELECT *. This is not truly a dynamic
>> schema, since you can't really do anything with these extra fields except
>> forward them to your output.
>>
>> I see two possible ways to address this.
>>
>> 1. As I mentioned above, in the case of a SELECT * we could output the
>> original bytes, and only use the Beam Row for evaluating the WHERE clause.
>> This might be very expensive though - we risk having to keep two copies of
>> every message around, one in the original Avro format and one in Row format.
>>
>> 2. The other way would be to do what protocol buffers do. We could add
>> one extra field to the inferred Beam schema to store new, unknown fields
>> (probably this would be a map-valued field). This extra field would simply
>> store the raw bytes of these unknown fields, and then when converting back
>> to Avro they would be added to the output message. This might also add some
>> overhead to the pipeline, so might be best to make this behavior opt in.
>>
>> Reuven
>>
>> On Tue, Dec 8, 2020 at 9:33 AM Brian Hulette  wrote:
>>
>>> Reuven, could you clarify what you have in mind? I know multiple times
>>> we've discussed the possibility of adding update compatibility support to
>>> SchemaCoder, including support for certain schema changes (field
>>> additions/deletions) - I think the most recent discussion was here [1].
>>>
>>> But it sounds like Talat is asking for something a little beyond that,
>>> effectively a dynamic schema. Is that something you think we can support?
>>>
>>> [1]
>>> https://lists.apache.org/thread.html/ref73a8c40e24e0b038b4e5b065cd502f4c5df2e5e15af6f7ea1cdaa7%40%3Cdev.beam.apache.org%3E
>>>
>>> On Tue, Dec 8, 2020 at 9:20 AM Reuven Lax  wrote:
>>>
>>>> Thanks. It might be theoretically possible to do this (at least for the
>>>> case where existing fields do not change). Whether anyone currently has
>>>> available time to do this is a different question, but it's something that
>>>> can be looked into.
>>>>
>>>> On Mon, Dec 7, 2020 at 9:29 PM Talat Uyarer <
>>>> tuya...@paloaltonetworks.com> wrote:
>>>>
>>>>> Adding new fields is more common than modifying existing fields. But
>>>>> type change is also possible for existing fields, such as regular
>>>>> mandatory field(string,integer) to union(nullable field). No field deletion.
>>>>>
>>>>> On Mon, Dec 7, 2020 at 9:22 PM Reuven Lax  wrote:
>>>>>
>>>>>> And when you say schema

Re: About Beam SQL Schema Changes and Code generation

2020-12-08 Thread Reuven Lax
There's a difference between a fully dynamic schema and simply being able
to forward "unknown" fields to the output.

A fully-dynamic schema is not really necessary unless we also had dynamic
SQL statements. Since the existing SQL statements do not reference the new
fields by name, there's no reason to add them to the main schema.

However, if you have a SELECT * FROM WHERE  statement that does no
aggregation, there's fundamentally no reason we couldn't forward the
messages exactly. In theory we could forward the exact bytes that are in
the input PCollection, which would necessarily forward the new fields. In
practice I believe that we convert the input messages to Beam Row objects
in order to evaluate the WHERE clause, and then convert back to Avro to
output those messages. I believe this is where we "lose" the unknown
messages, but this is an implementation artifact - in theory we could output
the original bytes whenever we see a SELECT *. This is not truly a dynamic
schema, since you can't really do anything with these extra fields except
forward them to your output.

I see two possible ways to address this.

1. As I mentioned above, in the case of a SELECT * we could output the
original bytes, and only use the Beam Row for evaluating the WHERE clause.
This might be very expensive though - we risk having to keep two copies of
every message around, one in the original Avro format and one in Row format.

2. The other way would be to do what protocol buffers do. We could add one
extra field to the inferred Beam schema to store new, unknown fields
(probably this would be a map-valued field). This extra field would simply
store the raw bytes of these unknown fields, and then when converting back
to Avro they would be added to the output message. This might also add some
overhead to the pipeline, so might be best to make this behavior opt in.
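
A minimal sketch of option 2, in plain Java with no Beam or Avro dependency. Everything named here is hypothetical: the class `UnknownFieldsSketch`, the reserved field name `__unknown_fields`, and the use of a `Map` to stand in for both the Avro record and the Beam Row. The proposal stores the raw bytes of unknown fields; this sketch carries the values as plain objects to keep the example short.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; Maps stand in for Avro records and Beam Rows.
public class UnknownFieldsSketch {

  // Assumed name for the extra map-valued field; not an existing Beam convention.
  public static final String UNKNOWN_FIELDS = "__unknown_fields";

  /** Splits a record into schema-known fields plus one map holding the rest. */
  public static Map<String, Object> toRow(
      Map<String, Object> record, Set<String> knownFields) {
    Map<String, Object> row = new LinkedHashMap<>();
    Map<String, Object> unknown = new LinkedHashMap<>();
    for (Map.Entry<String, Object> e : record.entrySet()) {
      if (knownFields.contains(e.getKey())) {
        row.put(e.getKey(), e.getValue());
      } else {
        unknown.put(e.getKey(), e.getValue());
      }
    }
    row.put(UNKNOWN_FIELDS, unknown);
    return row;
  }

  /** Converts back to a record, re-attaching whatever was carried along. */
  public static Map<String, Object> fromRow(Map<String, Object> row) {
    Map<String, Object> record = new LinkedHashMap<>();
    for (Map.Entry<String, Object> e : row.entrySet()) {
      if (!UNKNOWN_FIELDS.equals(e.getKey())) {
        record.put(e.getKey(), e.getValue());
      }
    }
    Object unknown = row.get(UNKNOWN_FIELDS);
    if (unknown instanceof Map) {
      for (Map.Entry<?, ?> e : ((Map<?, ?>) unknown).entrySet()) {
        record.put(String.valueOf(e.getKey()), e.getValue());
      }
    }
    return record;
  }
}
```

A filter that reads only the known fields never touches the extra map, so a pass-through query such as SELECT * with a WHERE clause would round-trip new fields unchanged. A query that projects or modifies columns would still need rules for what happens to the map.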

Reuven

On Tue, Dec 8, 2020 at 9:33 AM Brian Hulette  wrote:

> Reuven, could you clarify what you have in mind? I know multiple times
> we've discussed the possibility of adding update compatibility support to
> SchemaCoder, including support for certain schema changes (field
> additions/deletions) - I think the most recent discussion was here [1].
>
> But it sounds like Talat is asking for something a little beyond that,
> effectively a dynamic schema. Is that something you think we can support?
>
> [1]
> https://lists.apache.org/thread.html/ref73a8c40e24e0b038b4e5b065cd502f4c5df2e5e15af6f7ea1cdaa7%40%3Cdev.beam.apache.org%3E
>
> On Tue, Dec 8, 2020 at 9:20 AM Reuven Lax  wrote:
>
>> Thanks. It might be theoretically possible to do this (at least for the
>> case where existing fields do not change). Whether anyone currently has
>> available time to do this is a different question, but it's something that
>> can be looked into.
>>
>> On Mon, Dec 7, 2020 at 9:29 PM Talat Uyarer 
>> wrote:
>>
>>> Adding new fields is more common than modifying existing fields. But
>>> type change is also possible for existing fields, such as regular mandatory
>>> field(string,integer) to union(nullable field). No field deletion.
>>>
>>> On Mon, Dec 7, 2020 at 9:22 PM Reuven Lax  wrote:
>>>
>>>> And when you say schema changes, are these new fields being added to
>>>> the schema? Or are you making changes to the existing fields?
>>>>
>>>> On Mon, Dec 7, 2020 at 9:02 PM Talat Uyarer <
>>>> tuya...@paloaltonetworks.com> wrote:
>>>>
>>>>> Hi,
>>>>> For sure let me explain a little bit about my pipeline.
>>>>> My Pipeline is actually simple
>>>>> Read Kafka -> Convert Avro Bytes to Beam Row(DoFn,
>>>>> Row>) -> Apply Filter(SqlTransform.query(sql)) -> Convert back from
>>>>> Row to Avro (DoFn)-> Write DB/GCS/GRPC etc
>>>>>
>>>>> In our jobs we have three types of SQL:
>>>>> - SELECT * FROM PCOLLECTION
>>>>> - SELECT * FROM PCOLLECTION 
>>>>> - SQL Projection with or without Where clause  SELECT col1, col2 FROM
>>>>> PCOLLECTION
>>>>>
>>>>> We know writerSchema for each message. While deserializing avro binary
>>>>> we use writer schema and reader schema on Convert Avro Bytes to Beam Row
>>>>> step. It always produces a reader schema's generic record and we convert
>>>>> that generic record to Row.
>>>>> While submitting DF job we use latest schema to generate beamSchema.
>>>>>
>>>>> In the current scenario When we have schema changes first we restart
>>>>> all 15k jobs with the latest updated schema then whenever we are done we

Re: About Beam SQL Schema Changes and Code generation

2020-12-08 Thread Reuven Lax
Thanks. It might be theoretically possible to do this (at least for the
case where existing fields do not change). Whether anyone currently has
available time to do this is a different question, but it's something that
can be looked into.

On Mon, Dec 7, 2020 at 9:29 PM Talat Uyarer 
wrote:

> Adding new fields is more common than modifying existing fields. But type
> change is also possible for existing fields, such as regular mandatory
> field(string,integer) to union(nullable field). No field deletion.
>
> On Mon, Dec 7, 2020 at 9:22 PM Reuven Lax  wrote:
>
>> And when you say schema changes, are these new fields being added to the
>> schema? Or are you making changes to the existing fields?
>>
>> On Mon, Dec 7, 2020 at 9:02 PM Talat Uyarer 
>> wrote:
>>
>>> Hi,
>>> For sure let me explain a little bit about my pipeline.
>>> My Pipeline is actually simple
>>> Read Kafka -> Convert Avro Bytes to Beam Row(DoFn,
>>> Row>) -> Apply Filter(SqlTransform.query(sql)) -> Convert back from Row
>>> to Avro (DoFn)-> Write DB/GCS/GRPC etc
>>>
>>> In our jobs we have three types of SQL:
>>> - SELECT * FROM PCOLLECTION
>>> - SELECT * FROM PCOLLECTION 
>>> - SQL Projection with or without Where clause  SELECT col1, col2 FROM
>>> PCOLLECTION
>>>
>>> We know writerSchema for each message. While deserializing avro binary
>>> we use writer schema and reader schema on Convert Avro Bytes to Beam Row
>>> step. It always produces a reader schema's generic record and we convert
>>> that generic record to Row.
>>> While submitting DF job we use latest schema to generate beamSchema.
>>>
>>> In the current scenario, when we have schema changes, we first restart all
>>> 15k jobs with the latest updated schema, and once that is done we turn on
>>> the latest schema for writers. Because of Avro's GrammerResolver[1] we
>>> read different versions of the schema and always produce the latest
>>> schema's record. Without breaking our pipeline we are able to handle
>>> multiple versions of data in the same streaming pipeline. If we could
>>> generate the SQL's Java code when we get notified with the latest schema,
>>> we would handle all schema changes. The only remaining obstacle is Beam's
>>> SQL Java code. That's why I am looking for some solution. We don't need
>>> multiple versions of SQL. We only need to regenerate the SQL schema with
>>> the latest schema on the fly.
>>>
>>> I hope I can explain it :)
>>>
>>> Thanks
>>>
>>> [1]
>>> https://avro.apache.org/docs/1.7.2/api/java/org/apache/avro/io/parsing/doc-files/parsing.html
>>>
>>> On Mon, Dec 7, 2020 at 7:49 PM Reuven Lax  wrote:
>>>
>>>> Can you explain the use case some more? Are you wanting to change your
>>>> SQL statement as well when the schema changes? If not, what are those new
>>>> fields doing in the pipeline? What I mean is that your old SQL statement
>>>> clearly didn't reference those fields in a SELECT statement since they
>>>> didn't exist, so what are you missing by not having them unless you are
>>>> also changing the SQL statement?
>>>>
>>>> Is this a case where you have a SELECT *, and just want to make sure
>>>> those fields are included?
>>>>
>>>> Reuven
>>>>
>>>> On Mon, Dec 7, 2020 at 6:31 PM Talat Uyarer <
>>>> tuya...@paloaltonetworks.com> wrote:
>>>>
>>>>> Hi Andrew,
>>>>>
>>>>> I assume the SQL query is not going to change. What changes is the Row
>>>>> schema, by adding new columns or renaming columns. Suppose we keep version
>>>>> information somewhere, for example in a KV pair where the key is schema
>>>>> information and the value is the Row. Could we not generate the SQL code?
>>>>> The reason I am asking is that we have 15k pipelines. When we have a
>>>>> schema change we restart 15k DF jobs, which is painful. I am looking for a
>>>>> possible way to avoid the job restarts. Don't you think it is doable?
>>>>>
>>>>> Thanks
>>>>>
>>>>>
>>>>> On Mon, Dec 7, 2020

Re: About Beam SQL Schema Changes and Code generation

2020-12-07 Thread Reuven Lax
And when you say schema changes, are these new fields being added to the
schema? Or are you making changes to the existing fields?

On Mon, Dec 7, 2020 at 9:02 PM Talat Uyarer 
wrote:

> Hi,
> For sure let me explain a little bit about my pipeline.
> My Pipeline is actually simple
> Read Kafka -> Convert Avro Bytes to Beam Row(DoFn,
> Row>) -> Apply Filter(SqlTransform.query(sql)) -> Convert back from Row
> to Avro (DoFn)-> Write DB/GCS/GRPC etc
>
> In our jobs we have three types of SQL:
> - SELECT * FROM PCOLLECTION
> - SELECT * FROM PCOLLECTION 
> - SQL Projection with or without Where clause  SELECT col1, col2 FROM
> PCOLLECTION
>
> We know writerSchema for each message. While deserializing avro binary we
> use writer schema and reader schema on Convert Avro Bytes to Beam Row step.
> It always produces a reader schema's generic record and we convert that
> generic record to Row.
> While submitting DF job we use latest schema to generate beamSchema.
>
> In the current scenario, when we have schema changes, we first restart all
> 15k jobs with the latest updated schema, and once that is done we turn on
> the latest schema for writers. Because of Avro's GrammerResolver[1] we
> read different versions of the schema and always produce the latest
> schema's record. Without breaking our pipeline we are able to handle
> multiple versions of data in the same streaming pipeline. If we could
> generate the SQL's Java code when we get notified with the latest schema,
> we would handle all schema changes. The only remaining obstacle is Beam's
> SQL Java code. That's why I am looking for some solution. We don't need
> multiple versions of SQL. We only need to regenerate the SQL schema with
> the latest schema on the fly.
>
> I hope I can explain it :)
>
> Thanks
>
> [1]
> https://avro.apache.org/docs/1.7.2/api/java/org/apache/avro/io/parsing/doc-files/parsing.html
>
> On Mon, Dec 7, 2020 at 7:49 PM Reuven Lax  wrote:
>
>> Can you explain the use case some more? Are you wanting to change your
>> SQL statement as well when the schema changes? If not, what are those new
>> fields doing in the pipeline? What I mean is that your old SQL statement
>> clearly didn't reference those fields in a SELECT statement since they
>> didn't exist, so what are you missing by not having them unless you are
>> also changing the SQL statement?
>>
>> Is this a case where you have a SELECT *, and just want to make sure
>> those fields are included?
>>
>> Reuven
>>
>> On Mon, Dec 7, 2020 at 6:31 PM Talat Uyarer 
>> wrote:
>>
>>> Hi Andrew,
>>>
>>> I assume the SQL query is not going to change. What changes is the Row
>>> schema, by adding new columns or renaming columns. Suppose we keep version
>>> information somewhere, for example in a KV pair where the key is schema
>>> information and the value is the Row. Could we not generate the SQL code?
>>> The reason I am asking is that we have 15k pipelines. When we have a schema
>>> change we restart 15k DF jobs, which is painful. I am looking for a possible
>>> way to avoid the job restarts. Don't you think it is doable?
>>>
>>> Thanks
>>>
>>>
>>> On Mon, Dec 7, 2020 at 6:10 PM Andrew Pilloud 
>>> wrote:
>>>
>>>> Unfortunately we don't have a way to generate the SQL Java code on the
>>>> fly, even if we did, that wouldn't solve your problem. I believe our
>>>> recommended practice is to run both the old and new pipeline for some time,
>>>> then pick a window boundary to transition the output from the old pipeline
>>>> to the new one.
>>>>
>>>> Beam doesn't handle changing the format of data sent between
>>>> intermediate steps in a running pipeline. Beam uses "coders" to serialize
>>>> data between steps of the pipeline. The builtin coders (including the
>>>> Schema Row Coder used by SQL) have a fixed data format and don't handle
>>>> schema evolution. They are optimized for performance at all costs.
>>>>
>>>> If you worked around this, the Beam model doesn't support changing the
>>>> structure of the pipeline graph. This would significantly limit the changes
>>>> you can make. It would also require some changes to SQL to try to produce
>>>> the same plan for an updated SQL query.
>>>>
>>>> Andrew
>>>>
>>>> On Mon, Dec 7, 2020 at 5:44 PM Talat Uyarer <
>>>> tuya...@paloaltonetworks.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> We are using Beam SQL in our pipeline. Our data is written in Avro
>>>>> format, and we generate our rows based on our Avro schema. Over time the
>>>>> schema changes. I believe Beam SQL generates Java code based on what we
>>>>> define as the BeamSchema while submitting the pipeline. Do you have any
>>>>> idea how we can handle schema changes without resubmitting our Beam job?
>>>>> Is it possible to generate the SQL Java code on the fly?
>>>>>
>>>>> Thanks
>>>>>
>>>>


Re: About Beam SQL Schema Changes and Code generation

2020-12-07 Thread Reuven Lax
Can you explain the use case some more? Are you wanting to change your SQL
statement as well when the schema changes? If not, what are those new
fields doing in the pipeline? What I mean is that your old SQL statement
clearly didn't reference those fields in a SELECT statement since they
didn't exist, so what are you missing by not having them unless you are
also changing the SQL statement?

Is this a case where you have a SELECT *, and just want to make sure those
fields are included?

Reuven

On Mon, Dec 7, 2020 at 6:31 PM Talat Uyarer 
wrote:

> Hi Andrew,
>
> I assume the SQL query is not going to change. What changes is the Row
> schema, by adding new columns or renaming columns. Suppose we keep version
> information somewhere, for example in a KV pair where the key is the schema
> information and the value is the Row. Couldn't we then generate the SQL
> code? The reason I am asking is that we have 15k pipelines. When we have a
> schema change we restart 15k DF jobs, which is painful. I am looking for a
> possible way to avoid the job restarts. Do you still think it is not doable?
>
> Thanks
>
>
> On Mon, Dec 7, 2020 at 6:10 PM Andrew Pilloud  wrote:
>
>> Unfortunately we don't have a way to generate the SQL Java code on the
>> fly, even if we did, that wouldn't solve your problem. I believe our
>> recommended practice is to run both the old and new pipeline for some time,
>> then pick a window boundary to transition the output from the old pipeline
>> to the new one.
>>
>> Beam doesn't handle changing the format of data sent between intermediate
>> steps in a running pipeline. Beam uses "coders" to serialize data between
>> steps of the pipeline. The builtin coders (including the Schema Row Coder
>> used by SQL) have a fixed data format and don't handle schema evolution.
>> They are optimized for performance at all costs.
>>
>> If you worked around this, the Beam model doesn't support changing the
>> structure of the pipeline graph. This would significantly limit the changes
>> you can make. It would also require some changes to SQL to try to produce
>> the same plan for an updated SQL query.
>>
>> Andrew
>>
>> On Mon, Dec 7, 2020 at 5:44 PM Talat Uyarer 
>> wrote:
>>
>>> Hi,
>>>
>>> We are using Beam SQL in our pipeline. Our data is written in Avro
>>> format, and we generate our rows based on our Avro schema. Over time the
>>> schema changes. I believe Beam SQL generates Java code based on what we
>>> define as the BeamSchema while submitting the pipeline. Do you have any
>>> idea how we can handle schema changes without resubmitting our Beam job?
>>> Is it possible to generate the SQL Java code on the fly?
>>>
>>> Thanks
>>>
>>


Re: UnalignedFixedWindows - how to join to streams with unaligned fixed windows / full source code from the Streaming Systems book?

2020-10-02 Thread Reuven Lax
Have you considered using Session windows? The window would start at the
timestamp of the article, and the Session gap duration would be the
(event-time) timeout after which you stop waiting for assets to join that
article.
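
To make the difference concrete, here is a minimal plain-Java sketch (no Beam
dependency) of why the two timestamps from the question below land in
different fixed windows but merge into a single session. The class and method
names, the 30-second fixed window, and the 5-second gap are illustrative
assumptions, not Beam APIs; in a real pipeline the corresponding windowing
would be Sessions.withGapDuration(...).

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java illustration only; names are hypothetical, not Beam APIs.
class SessionSketch {
  // Start of the epoch-aligned fixed window that contains ts.
  static Instant fixedWindowStart(Instant ts, Duration size) {
    long sizeMs = size.toMillis();
    long ms = ts.toEpochMilli();
    return Instant.ofEpochMilli(ms - Math.floorMod(ms, sizeMs));
  }

  // Each element starts a proto-session [ts, ts + gap); overlapping
  // proto-sessions are merged. Returns the number of merged sessions.
  static int countSessions(List<Instant> timestamps, Duration gap) {
    List<Instant> sorted = new ArrayList<>(timestamps);
    Collections.sort(sorted);
    int sessions = 0;
    Instant currentEnd = null;
    for (Instant ts : sorted) {
      if (currentEnd == null || !ts.isBefore(currentEnd)) {
        sessions++; // ts falls outside the current session: start a new one
      }
      Instant end = ts.plus(gap);
      if (currentEnd == null || end.isAfter(currentEnd)) {
        currentEnd = end; // extend the current session
      }
    }
    return sessions;
  }
}
```

With the article at 2020-10-02T00:30:29.997Z and the asset at
2020-10-02T00:30:30.001Z, fixedWindowStart returns different windows for a
30-second size, while countSessions with a 5-second gap returns one session.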

On Fri, Oct 2, 2020 at 3:05 AM Kaymak, Tobias 
wrote:

> Hello,
>
> In chapter 4 of the Streaming Systems book by Tyler, Slava and Reuven
> there is an example 4-6 on page 111 about custom windowing that deals with
> UnalignedFixedWindows:
>
> https://www.oreilly.com/library/view/streaming-systems/9781491983867/ch04.html
>
> Unfortunately that example is abbreviated and the full source code is not
> published in this repo:
> https://github.com/takidau/streamingbook
>
> I am joining two Kafka Streams and I am currently windowing them by fixed
> time intervals. However the elements in stream one ("articles") are
> published first, then the assets for those articles are being published in
> the "assets" topic. Articles event timestamps are therefore slightly before
> those of assets.
>
> Now when doing a CoGroupByKey this can lead to a situation where an
> article is not being processed together with its assets, as
>
> - the article has a timestamp of 2020-10-02T00:30:29.997Z
> - the assets have a timestamp of 2020-10-02T00:30:30.001Z
>
> This is a must in my pipeline as I am relying on them to be processed
> together - otherwise I am publishing an article without its assets.
>
> My idea was therefore to apply UnalignedFixedWindows instead of fixed
> ones to the streams to circumvent this. What I am currently missing is the
> mergeWindows() implementation or the full source code to understand it. I
> am currently facing a java.lang.IllegalStateException
>
> TimestampCombiner moved element from 2020-10-02T09:32:36.079Z to earlier
> time 2020-10-02T09:32:03.365Z for window
> [2020-10-02T09:31:03.366Z..2020-10-02T09:32:03.366Z)
>
> Which gives me the impression that I am doing something wrong or have not
> fully understood the custom windowing topic.
>
> Am I on the wrong track here?
>
>
>
>


Re: Setting AWS endpoints per transform

2020-09-25 Thread Reuven Lax
It would also be ideal if someone extended the SqsIO transform to allow
specifying endpoints via a builder method.
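
The fallback idea in the quoted suggestion below (a service-specific endpoint
that defaults to the shared one) can be sketched in plain Java as follows.
This is only an illustration of the lookup logic; EndpointResolver and its
methods are hypothetical names and are not part of Beam or the AWS SDK.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java illustration only; not part of Beam or the AWS SDK.
class EndpointResolver {
  private final String defaultEndpoint;
  private final Map<String, String> perService = new HashMap<>();

  EndpointResolver(String defaultEndpoint) {
    this.defaultEndpoint = defaultEndpoint;
  }

  // Register an override for one service, e.g. "sqs" or "s3".
  EndpointResolver with(String service, String endpoint) {
    perService.put(service, endpoint);
    return this;
  }

  // Service-specific endpoint if one was set, otherwise the shared default.
  String endpointFor(String service) {
    return perService.getOrDefault(service, defaultEndpoint);
  }
}
```

Each IO instance would then be configured at pipeline construction time with
the value returned for its own service.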

On Fri, Sep 25, 2020 at 9:50 AM Luke Cwik  wrote:

> You can create a PipelineOptions interface with a unique endpoint
> identifier for each service you want to use like:
> public interface AwsEndpointOptions extends PipelineOptions {
>   @Default.InstanceFactory(FallbackToAwsOptions.class)
>   String getSnsEndpoint();
>   void setSnsEndpoint(String value);
>
>   @Default.InstanceFactory(FallbackToAwsOptions.class)
>   String getS3Endpoint();
>   void setS3Endpoint(String value);
>   ...
>
>   // Falls back to the endpoint configured on AwsOptions when no
>   // service-specific endpoint is set.
>   class FallbackToAwsOptions implements DefaultValueFactory<String> {
>     @Override
>     public String create(PipelineOptions options) {
>       return options.as(AwsOptions.class).getAwsServiceEndpoint();
>     }
>   }
> }
>
> It looks like AWS IOs support passing in a provider[1] and/or configuring
> the endpoint during pipeline construction. Then during pipeline
> creation/execution you can configure each AWS IO instance with the specific
> endpoint from this new PipelineOptions interface.
>
> 1:
> https://github.com/apache/beam/blob/6fdde4f4eab72b49b10a8bb1cb3be263c5c416b5/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsIO.java#L273
>
> On Thu, Sep 24, 2020 at 4:24 PM  wrote:
>
>> We're currently working on getting our Beam app working with localstack
>> (and potentially other AWS regions).  We're using SqsIO and S3 as part of
>> our pipeline (with other AWS components likely to come into the mix).
>> While I could cast the PipelineOptions to AwsOptions and then call
>> AwsOptions.setAwsServiceEndpoint() prior to pipeline construction, that
>> won't work as different AWS services make use of different endpoints --
>> e.g. the endpoint for SqsIO isn't going to work for S3.
>>
>> What I'd really like to do is provide a different set of AwsOptions per
>> AWS service.  What's the best means of accomplishing this?
>>
>> Tim.
>>
>>


Re: Out-of-orderness of window results when testing stateful operators with TextIO

2020-08-24 Thread Reuven Lax
Generally you should not rely on PCollection being ordered, though there
have been discussions about adding some time-ordering semantics.
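
One workaround when a stateful step needs event-time order is to buffer
incoming elements and release them sorted once the watermark has passed them.
The following plain-Java sketch shows only that buffering logic; the class,
its methods, and the explicit watermark argument are hypothetical stand-ins
for what a stateful DoFn might do with buffered state and an event-time timer.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Plain-Java illustration only; names are hypothetical, not Beam APIs.
class OrderingBuffer {
  // One buffered element: event timestamp (millis) plus payload.
  static final class Stamped {
    final long timestampMs;
    final String value;

    Stamped(long timestampMs, String value) {
      this.timestampMs = timestampMs;
      this.value = value;
    }
  }

  private final PriorityQueue<Stamped> buffer =
      new PriorityQueue<>(Comparator.comparingLong((Stamped s) -> s.timestampMs));

  void add(long timestampMs, String value) {
    buffer.add(new Stamped(timestampMs, value));
  }

  // Release, in event-time order, everything at or before the watermark.
  List<String> releaseUpTo(long watermarkMs) {
    List<String> out = new ArrayList<>();
    while (!buffer.isEmpty() && buffer.peek().timestampMs <= watermarkMs) {
      out.add(buffer.poll().value);
    }
    return out;
  }
}
```

Elements that arrive out of order are held back until the watermark says no
earlier element can still show up, at the cost of extra latency and state.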



On Sun, Aug 23, 2020 at 9:06 PM Rui Wang  wrote:

> The current Beam model does not guarantee ordering after a GBK (i.e.
> Combine.perKey() in your pipeline). So you cannot expect that the (C) step
> sees elements in a specific order.
>
> As I recall, on the Dataflow runner there is very limited ordering support.
> +Reuven Lax, can you share your insights about it?
>
>
> -Rui
>
>
>
> On Sun, Aug 23, 2020 at 8:32 PM Dongwon Kim  wrote:
>
>> Hi,
>>
>> My Beam pipeline is designed to work with an unbounded source KafkaIO.
>> It roughly looks like below:
>> p.apply(KafkaIO.read() ...)   // (A-1)
>>   .apply(WithKeys.of(...).withKeyType(...))
>>   .apply(Window.into(FixedWindows.of(...)))
>>   .apply(Combine.perKey(...))  // (B)
>>   .apply(Window.into(new GlobalWindows())) // to have per-key stats in (C)
>>   .apply(ParDo.of(new MyStatefulDoFn()))  // (C)
>> Note that (C) has its own state which is expected to be fetched and
>> updated by window results (B) in order of event-time.
>>
>> Now I'm writing an integration test where (A-1) is replaced by (A-2):
>>
>>> p.apply(TextIO.read().from("test.txt"))  // (A-2)
>>
>> "test.txt" contains samples having a single key.
>>
>> I get a wrong result and it turns out that window results didn't feed
>> into (C) in order.
>> Is it because (A-2) makes the pipeline a bounded one?
>>
>> Q1. How to prevent this from happening?
>> Q2. How do you guys usually write an integration test for an unbounded
>> one with stateful function?
>>
>> Best,
>>
>> Dongwon
>>
>


Re: Support of per-key state after windowing

2020-08-23 Thread Reuven Lax
Kenn - shouldn't the Reify happen before the rewindow?

On Sun, Aug 23, 2020 at 11:08 AM Kenneth Knowles  wrote:

>
>
> On Sun, Aug 23, 2020 at 1:12 PM Dongwon Kim  wrote:
>
>> Hi Reuven,
>>
>> You and Kenneth are right; I thought GlobalWindows in unbounded streams
>> need triggers.
>>
>> p.apply(WithKeys.of(...).withKeyType(...))   // (A)
>>   .apply(Window.into(FixedWindows.of(...)))  // (B)
>>   .apply(Combine.perKey(new MyCombinFn()))   // (C)
>>   .apply(Window.into(new GlobalWindows()))   // (E)
>>   .apply(ParDo.of(new MyDoFn()))             // (D)
>>
>>
>> So just adding (E) blurs windows and makes the state defined in MyDoFn
>> (D) a per-key state.
>> Hope I understand you and Kenneth correctly this time.
>>
>
> That is correct. However, I think you may want:
>
> p.apply(WithKeys.of(...).withKeyType(...))   // (A)
>   .apply(Window.into(FixedWindows.of(...)))  // (B)
>   .apply(Combine.perKey(new MyCombinFn()))   // (C)
>   .apply(Window.into(new GlobalWindows()))   // (E)
>   .apply(Reify.windowsInValue())             // (G)
>   .apply(ParDo.of(new MyDoFn()))             // (D)
>
> Reify.windowsInValue():
> <https://beam.apache.org/releases/javadoc/2.23.0/index.html?org/apache/beam/sdk/transforms/Reify.html>
>
>
> This will make the window information from (B) & (C) available to MyDoFn
> in (D)
>
> Kenn
>
>
>>
>> Best,
>>
>> Dongwon
>>
>> On Mon, Aug 24, 2020 at 1:51 AM Reuven Lax  wrote:
>>
>>> You could simply window into GlobalWindows and add a stateful DoFn
>>> afterwards. No need for the triggering and GroupByKey.
>>>
>>> On Sun, Aug 23, 2020 at 9:45 AM Dongwon Kim 
>>> wrote:
>>>
>>>> Hi Kenneth,
>>>>
>>>> According to your suggestion, I modified my pipeline as follows:
>>>>
>>>> p.apply(WithKeys.of(...).withKeyType(...))                    // (A)
>>>>   .apply(Window.into(FixedWindows.of(...)))                   // (B)
>>>>   .apply(Combine.perKey(new MyCombinFn()))                    // (C)
>>>>   .apply(
>>>>     Window
>>>>       .into(new GlobalWindows())                              // (E1)
>>>>       .triggering(
>>>>         Repeatedly.forever(AfterPane.elementCountAtLeast(1))) // (E2)
>>>>       .accumulatingFiredPanes()                               // (E3)
>>>>   )
>>>>   .apply(GroupByKey.create())                                 // (F)
>>>>   .apply(ParDo.of(new MyDoFn()))                              // (D)
>>>>
>>>>
>>>> I had to include (E1), (E2), (E3), and (F) so that MyDoFn (D) can
>>>> iterate over a list of output records from (C) sharing the same key.
>>>> This way I can achieve the same effect without having a per-key state
>>>> at (D).
>>>>
>>>> Do I understand your intention correctly?
>>>> If not, please advise me with some hints on it.
>>>>
>>>> Thanks,
>>>>
>>>> Dongwon
>>>>
>>>>
>>>> On Sun, Aug 23, 2020 at 5:10 AM Kenneth Knowles 
>>>> wrote:
>>>>
>>>>> Hi Dongwon,
>>>>>
>>>>> On Sat, Aug 22, 2020 at 2:46 PM Dongwon Kim 
>>>>> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I'm using Beam 2.23.0 with FlinkRunner and a part of my unbounded
>>>>>> pipeline looks like below:
>>>>>>
>>>>>> p.apply(WithKeys.of(...).withKeyType(...))   // (A)
>>>>>>   .apply(Window.into(FixedWindows.of(...)))  // (B)
>>>>>>   .apply(Combine.perKey(new MyCombinFn()))   // (C)
>>>>>>   .apply(ParDo.of(new MyDoFn()))             // (D)
>>>>>>
>>>>>>
>>>>>> What I want to do is
>>>>>> (1) to group data by key (A) and window (B),
>>>>>> (2) to do some aggregation (C)
>>>>>> (3) to perform the final computation on each group (D)
>>>>>>
>>>>>> I've noticed that a ValueState for a particular key is NULL whenever
>>>>>> a new window for the key is arriving, which gives me a feeling that Beam
>>>>>> seems to support only per-key+window state, not per-key state, after
>>>>>> windowing.
>>>>>>
>>>>>> I usually work with Flink DataStream API and Flink supports both
>>>>>> per-key state and per-key+window state [1].
>>>>>>
>>>>>> Does Beam support per-key states, not per-key+window states, after
>>>>>> windowing (D)? If I miss something, please correct me.
>>>>>>
>>>>>
>>>>> You understand correctly - Beam does not include per-key state that
>>>>> crosses window boundaries. If I understand your goal correctly, you can
>>>>> achieve the same effect by copying the window metadata into the element
>>>>> and then re-windowing into the global window before (D).
>>>>>
>>>>> Kenn
>>>>>
>>>>>
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-window-state-in-processwindowfunction
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Dongwon
>>>>>>
>>>>>>


Re: Support of per-key state after windowing

2020-08-23 Thread Reuven Lax
You could simply window into GlobalWindows and add a stateful DoFn
afterwards. No need for the triggering and GroupByKey.
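
The effect of re-windowing into the global window can be pictured with a
plain-Java sketch: state scoped to (key, window) starts empty for every new
window, while state scoped to the key alone survives across windows. The map
below is only a stand-in for Beam's state cells, and all names are
hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java illustration only; names are hypothetical, not Beam APIs.
class StateScopeSketch {
  private final Map<String, Integer> state = new HashMap<>();
  private final boolean global;

  // global = true models state after Window.into(new GlobalWindows()).
  StateScopeSketch(boolean global) {
    this.global = global;
  }

  // Adds value to the running sum stored in the state cell for this key
  // (and, unless global, this window) and returns the updated sum.
  int process(String key, long windowStartMs, int value) {
    String cell = global ? key : key + "@" + windowStartMs;
    return state.merge(cell, value, Integer::sum);
  }
}
```

With global = false the second window for the same key sees a fresh cell,
which matches the "ValueState is NULL for each new window" observation in the
thread; with global = true the sum carries over.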

On Sun, Aug 23, 2020 at 9:45 AM Dongwon Kim  wrote:

> Hi Kenneth,
>
> According to your suggestion, I modified my pipeline as follows:
>
> p.apply(WithKeys.of(...).withKeyType(...))                    // (A)
>   .apply(Window.into(FixedWindows.of(...)))                   // (B)
>   .apply(Combine.perKey(new MyCombinFn()))                    // (C)
>   .apply(
>     Window
>       .into(new GlobalWindows())                              // (E1)
>       .triggering(
>         Repeatedly.forever(AfterPane.elementCountAtLeast(1))) // (E2)
>       .accumulatingFiredPanes()                               // (E3)
>   )
>   .apply(GroupByKey.create())                                 // (F)
>   .apply(ParDo.of(new MyDoFn()))                              // (D)
>
>
> I had to include (E1), (E2), (E3), and (F) so that MyDoFn (D) can iterate
> over a list of output records from (C) sharing the same key.
> This way I can achieve the same effect without having a per-key state at
> (D).
>
> Do I understand your intention correctly?
> If not, please advise me with some hints on it.
>
> Thanks,
>
> Dongwon
>
>
> On Sun, Aug 23, 2020 at 5:10 AM Kenneth Knowles  wrote:
>
>> Hi Dongwon,
>>
>> On Sat, Aug 22, 2020 at 2:46 PM Dongwon Kim 
>> wrote:
>>
>>> Hi all,
>>>
>>> I'm using Beam 2.23.0 with FlinkRunner and a part of my unbounded
>>> pipeline looks like below:
>>>
>>> p.apply(WithKeys.of(...).withKeyType(...))   // (A)
>>>   .apply(Window.into(FixedWindows.of(...)))  // (B)
>>>   .apply(Combine.perKey(new MyCombinFn()))   // (C)
>>>   .apply(ParDo.of(new MyDoFn()))             // (D)
>>>
>>>
>>> What I want to do is
>>> (1) to group data by key (A) and window (B),
>>> (2) to do some aggregation (C)
>>> (3) to perform the final computation on each group (D)
>>>
>>> I've noticed that a ValueState for a particular key is NULL whenever a
>>> new window for the key is arriving, which gives me a feeling that Beam
>>> seems to support only per-key+window state, not per-key state, after
>>> windowing.
>>>
>>> I usually work with Flink DataStream API and Flink supports both per-key
>>> state and per-key+window state [1].
>>>
>>> Does Beam support per-key states, not per-key+window states, after
>>> windowing (D)? If I miss something, please correct me.
>>>
>>
>> You understand correctly - Beam does not include per-key state that
>> crosses window boundaries. If I understand your goal correctly, you can
>> achieve the same effect by copying the window metadata into the element and
>> then re-windowing into the global window before (D).
>>
>> Kenn
>>
>>
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-window-state-in-processwindowfunction
>>>
>>> Best,
>>>
>>> Dongwon
>>>
>>>


Re: Is there an equivalent for --numberOfWorkerHarnessThreads in Python SDK?

2020-08-21 Thread Reuven Lax
Streaming Dataflow relies on high thread count for performance. Streaming
threads spend a high percentage of time blocked on IO, so in order to get
decent CPU utilization we need a lot of threads. Limiting the thread count
risks causing performance issues.

On Fri, Aug 21, 2020 at 8:00 AM Kamil Wasilewski <
kamil.wasilew...@polidea.com> wrote:

> No, I'm not. But thanks anyway, I totally missed that option!
>
> It occurs in a simple pipeline that executes CoGroupByKey over two
> PCollections, read from a bounded source, with 20 million and 2 million
> elements, respectively. One global window. Here's a link to the code, it's
> one of our tests:
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/testing/load_tests/co_group_by_key_test.py
>
>
> On Thu, Aug 20, 2020 at 6:48 PM Luke Cwik  wrote:
>
>> +user 
>>
>> On Thu, Aug 20, 2020 at 9:47 AM Luke Cwik  wrote:
>>
>>> Are you using Dataflow runner v2[1]?
>>>
>>> If so, then you can use:
>>> --number_of_worker_harness_threads=X
>>>
>>> Do you know where/why the OOM is occurring?
>>>
>>> 1:
>>> https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2
>>> 2:
>>> https://github.com/apache/beam/blob/017936f637b119f0b0c0279a226c9f92a2cf4f15/sdks/python/apache_beam/options/pipeline_options.py#L834
>>>
>>> On Thu, Aug 20, 2020 at 7:33 AM Kamil Wasilewski <
>>> kamil.wasilew...@polidea.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> As I stated in the title, is there an equivalent for
>>>> --numberOfWorkerHarnessThreads in Python SDK? I've got a streaming pipeline
>>>> in Python which suffers from OutOfMemory exceptions (I'm using Dataflow).
>>>> Switching to highmem workers solved the issue, but I wonder if I can set a
>>>> limit of threads that will be used in a single worker to decrease memory
>>>> usage.
>>>>
>>>> Regards,
>>>> Kamil




Re: DoFn Timer fire multiple times

2020-07-16 Thread Reuven Lax
One thing to keep in mind - any bundle can be executed multiple times, and
that includes timer firings. The runners will guarantee that only one
execution of the timer will "win" (i.e. if you output to another
PCollection from the timer, only one output will end up in that
PCollection). However if you are logging from the timer callback, you can
see repeated logs. The same holds true for any side-effect operation - e.g.
if you call an external RPC from the timer callback, those calls may be
repeated.
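
One way to tolerate repeated timer executions is to make the side effect
idempotent, for example by deriving a deterministic id from the key and
window and skipping ids that were already handled. This plain-Java sketch
uses an in-memory set as a stand-in for whatever deduplication the external
system offers; the class and method names are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Plain-Java illustration only; names are hypothetical, not Beam APIs.
class IdempotentSink {
  private final Set<String> seen = ConcurrentHashMap.newKeySet();
  private int deliveries = 0;

  // Deterministic id: retries of the same timer produce the same string.
  static String requestId(String key, long windowEndMs) {
    return key + "/" + windowEndMs;
  }

  // Returns true only the first time a given id is delivered.
  synchronized boolean deliver(String requestId) {
    if (!seen.add(requestId)) {
      return false; // repeated execution of the same work: ignore
    }
    deliveries++;
    return true;
  }

  synchronized int deliveries() {
    return deliveries;
  }
}
```

The important part is that the id depends only on the key and window, never
on wall-clock time or a random value, so every retry maps to the same id.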

On Wed, Jul 15, 2020 at 8:23 PM Kenneth Knowles  wrote:

> Hello!
>
> What runner are you using? Does this reproduce on multiple runners? (it is
> very quick to try out your pipeline on DirectRunner and local versions of
> open source runners like Flink, Spark, etc)
>
> If you can produce a complete working reproduction it will be easier for
> someone to debug. I do not see anything wrong with your code. I assumed you
> got the `window` variable out of the ProcessContext (you can also make it
> a parameter to @ProcessElement)
>
> Kenn
>
> On Wed, Jul 15, 2020 at 4:38 PM Zhiheng Huang 
> wrote:
>
>> Hi,
>>
>> I am trying to set a timer at window expiration time for my use case and
>> expect it to fire just once per key per window.
>> But instead I observe that the onTimer() method gets called multiple
>> times almost all the time.
>>
>> Here's the relevant code snippet:
>>
>> @TimerId(WIN_EXP)
>> private final TimerSpec winexp = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>
>> @StateId(COUNTS)
>> private final StateSpec>> counts =
>> StateSpecs.value();
>>
>> @ProcessElement
>> public void process(
>> ProcessContext context,
>> @StateId(COUNTS) ValueState>
>> countsState,
>> @TimerId(WIN_EXP) Timer winExpTimer) {
>>
>>   ...
>>   Map counts = countsState.read();
>>   if (counts == null) {
>>     counts = new HashMap<>();
>>     // Only place where I set the timer
>>     winExpTimer.set(window.maxTimestamp().plus(Duration.standardMinutes(1)));
>>   }
>>   ... // no return here and I do not observe exception in the pipeline
>>   countsState.write(counts);
>>   ...
>> }
>>
>> I tried adding logs in OnTimer:
>>
>> String key = keyState.read();
>> if (key != null && key.equals("xxx")) {
>>   logger.error(String.format("fired for %s.",
>> context.window().maxTimestamp().toDateTime()));
>> }
>>
>> Output:
>>
>> E 2020-07-15T23:08:38.938Z fired for 2020-07-15T13:04:59.999Z.
>> E 2020-07-15T23:08:04.004Z fired for 2020-07-15T13:04:59.999Z.
>> E 2020-07-15T23:08:03.221Z fired for 2020-07-15T13:04:59.999Z.
>> E 2020-07-15T23:07:49.132Z fired for 2020-07-15T13:04:59.999Z.
>> E 2020-07-15T23:07:47.010Z fired for 2020-07-15T13:04:59.999Z.
>> E 2020-07-15T23:07:40.679Z fired for 2020-07-15T13:04:59.999Z.
>> E 2020-07-15T23:07:33.925Z fired for 2020-07-15T13:04:59.999Z.
>>
>> It seems this is not due to some contention: the first and the last log
>> entries are ~1 minute apart. BTW, my allowed lateness is also set to 1
>> minute.
>>
>> Can anyone let me know if I am missing something here? I am using Beam
>> 2.22 and the Dataflow runner.
>>
>> Thanks!
>>
>>


Re: Unable to read value from state/Unable to fetch data due to token mismatch for key

2020-07-08 Thread Reuven Lax
This error should be benign. It often means that ownership of the work item
was moved to a different worker (possibly caused by autoscaling or other
source of work rebalancing), so the in-progress work item on that worker
failed. However the work item will be processed successfully on the new
worker that owns it. This should not cause a persistent failure.

On Wed, Jul 8, 2020 at 9:53 PM Mohil Khare  wrote:

> Hello,
>
> I am using beam java sdk 2.19.0 (with enableStreamingEngine set as true)
> and very heavily use stateful beam processing model.
> However, sometimes I am seeing the following exception while reading value
> from state for a key (Please note : here my key is a POJO where fields
> create a kind of composite key. Also I am using AvroCoder for this key):
>
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.beam.runners.dataflow.worker.KeyTokenInvalidException: Unable to
> fetch data due to token mismatch for key
> 0ggadot_static@prosimo.ioHaa552bec-25f2-11ea-8705-267acc424a25H9219bdd5-335f-11ea-bd4f-de07a30b09ca
> @ OC-AU sydney
>
>1.
>   1. at
>   
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.
>   AbstractFuture.getDoneValue (AbstractFuture.java:531
>   
> 
>   )
>   2. at
>   
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.
>   AbstractFuture.get (AbstractFuture.java:492
>   
> 
>   )
>   3. at
>   
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.
>   AbstractFuture$TrustedFuture.get (AbstractFuture.java:83
>   
> 
>   )
>   4. at
>   
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.
>   ForwardingFuture.get (ForwardingFuture.java:62
>   
> 
>   )
>   5. atorg.apache.beam.runners.dataflow.worker.
>   WindmillStateReader$WrappedFuture.get (WindmillStateReader.java:316
>   
> 
>   )
>   6. atorg.apache.beam.runners.dataflow.worker.
>   WindmillStateInternals$WindmillValue.read (
>   WindmillStateInternals.java:385
>   
> 
>   )
>
> Caused by: org.apache.beam.runners.dataflow.worker.KeyTokenInvalidException:
> Unable to fetch data due to token mismatch for key 
>
>1.
>   1. atorg.apache.beam.runners.dataflow.worker.
>   WindmillStateReader.consumeResponse (WindmillStateReader.java:482
>   
> 
>   )
>   2. atorg.apache.beam.runners.dataflow.worker.
>   WindmillStateReader.startBatchAndBlock (WindmillStateReader.java:420
>   
> 
>   )
>   3. atorg.apache.beam.runners.dataflow.worker.
>   WindmillStateReader$WrappedFuture.get (WindmillStateReader.java:313
>   
> 
>   )
>
>
> Any help to fix this issue would be greatly appreciated.
>
> Thanks and Regards
> Mohil
>


Re: Processing data of different sizes

2020-06-26 Thread Reuven Lax
How are you grouping the elements today?

On Fri, Jun 26, 2020 at 10:27 AM André Missaglia <
andre.missag...@arquivei.com.br> wrote:

> Hi everyone
>
> I'm building a pipeline where I group the elements and then execute a
> CPU-intensive function on each group. This function performs a statistical
> analysis over the elements, only to return a single value on the end.
>
> But because each group has a different amount of elements, some groups are
> processed really quickly, others may take up to 30min to run. The problem
> is that the pipeline processes 99% of the groups in a couple of minutes,
> but then spends another 2 hours processing the big groups. The image below
> illustrates what I mean:
> [image: image.png]
>
>
>
> Even worse than that, if I use for example, 20 dataflow instances with 32
> cores, and the big groups end up each on different machines, I'm gonna pay
> for all those instances while the job isn't done.
>
> I know that one optimization would be to split the groups into
> equally-sized groups, but I'm not sure that is possible in this case given
> the calculation I'm performing.
>
> So I was thinking, is there any way I can "tell" the runner how long I
> think the DoFn is going to run, so that it can do a better job scheduling
> those elements?
>
> Thanks!
>
> --
> *André Badawi Missaglia*
> Data Engineer
> (16) 3509-5515 *|* www.arquivei.com.br
>


Re: Error restoring Flink checkpoint

2020-06-23 Thread Reuven Lax
Yes, I agree that serializing coders into the checkpoint creates problems.
I'm wondering whether it is possible to serialize the coder URN + args
instead.
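
A rough plain-Java sketch of the idea: the checkpoint stores only a stable
identifier and its arguments, and the coder is rebuilt from the currently
deployed code on restore, so evolving the POJO class does not invalidate the
snapshot. The registry, the URN string used in the example, and the
Function-based "coder" are all hypothetical; this is not how Beam's Flink
runner works today.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Plain-Java illustration only; names are hypothetical, not Beam or Flink APIs.
class CoderRegistrySketch {
  // What would be written to the checkpoint: plain strings only.
  static final class Snapshot {
    final String urn;
    final String arg;

    Snapshot(String urn, String arg) {
      this.urn = urn;
      this.arg = arg;
    }
  }

  // urn -> factory that builds an "encoder" from its argument.
  private final Map<String, Function<String, Function<Object, String>>> factories =
      new HashMap<>();

  void register(String urn, Function<String, Function<Object, String>> factory) {
    factories.put(urn, factory);
  }

  // Rebuild the encoder from the code that is deployed now.
  Function<Object, String> restore(Snapshot snapshot) {
    Function<String, Function<Object, String>> factory = factories.get(snapshot.urn);
    if (factory == null) {
      throw new IllegalStateException("Unknown coder URN: " + snapshot.urn);
    }
    return factory.apply(snapshot.arg);
  }
}
```

Because the snapshot holds no serialized Java object, a restore cannot fail
with java.io.InvalidClassException; it can only fail if the URN is unknown to
the new code.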

On Mon, Jun 22, 2020 at 11:00 PM Ivan San Jose 
wrote:

> Hi again, just replying here in case this could be useful for someone,
> as using Flink checkpoints on Beam is not reliable at all right
> now...
> Even though I removed the class references to the serialized object in
> AvroCoder, in the end I couldn't make AvroCoder work, as it infers the
> schema using the ReflectData class (through the Java reflection API) and
> that class has a lot of problems when dealing with classes that contain
> generics.
> So we ended up using the Beam Kryo extension available in the
> beam-sdks-java-extensions-kryo Maven package and it works like a charm. If
> you use Kryo's CompatibleFieldSerializer, you can add fields to your POJOs
> and recover from saved checkpoints containing old POJO versions.
>
> On Mon, 2020-06-08 at 14:37 +, Ivan San Jose wrote:
> > Hi Reuven, as far as I've understood, Apache Beam coders are wrapped into
> > Flink's TypeSerializers, so they are being serialized as part of the
> > checkpoint according to
> >
> > https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java#L95
> >
> > The problem was that this is done using Java serialization and, as
> > Beam Coders (at least ProtoCoder and AvroCoder) have references to the
> > POJO class to be encoded/decoded, if you evolve that POJO (even
> > following AVRO/Protobuf backward compatibility rules) then Flink
> > will raise a java.io.InvalidClassException when trying to restore the
> > checkpoint, because it is using Java serialization to
> > serialize/deserialize the Beam Coder.
> >
> > On Mon, 2020-06-08 at 07:25 -0700, Reuven Lax wrote:
> > > Max, can you explain why Flink serializes the coders in
> > > the checkpoint? Dataflow on update uses the new graph, so doesn't
> > > hit this problem.
> > > On Mon, Jun 8, 2020 at 7:21 AM Ivan San Jose <
> > > isanj...@theworkshop.com> wrote:
> > > > Finally I've managed to modify Beam's AvroCoder so that it does not
> > > > serialize any Class reference of the object to be encoded/decoded,
> > > > and could successfully restore a checkpoint after adding a field to
> > > > the POJO model.
> > > > I think it would be useful for everyone, as the current AvroCoder is
> > > > not really useful when working with checkpoints, to be honest. I will
> > > > try to create a pull request if you don't mind.
> > > >
> > > > Anyway, I've hit another issue, now with AVRO and java.time.Instant.
> > > > I'm getting the following error when decoding a POJO which has a
> > > > java.time.Instant field and was encoded with AvroCoder:
> > > > java.lang.RuntimeException: java.lang.RuntimeException:
> > > > java.lang.NoSuchMethodException: java.time.Instant.<init>()
> > > > at
> > > > org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:473)
> > > >
> > > > This is because java.time.Instant doesn't have a public constructor.
> > > > I've tried adding @AvroMeta(key = "logicalType", value =
> > > > "timestamp-millis") to the problematic POJO field, and it didn't work.
> > > > I also tried adding the time conversions that come with AVRO 1.9.2 to
> > > > AvroCoder:
> > > > reflectData.addLogicalTypeConversion(new
> > > > org.apache.avro.data.TimeConversions.TimestampMillisConversion());
> > > >
> > > > But that is not working either. Using JODA time works though.
> > > >
> > > > Does it ring any bell for you?
> > > > Thanks
> > > >
> > > > On Fri, 2020-06-05 at 13:15 +, Ivan San Jose wrote:
> > > > > Thank you so much for your detailed answers Max, I will try to
> > > > > achieve what you've suggested about creating a custom coder which
> > > > > doesn't have non-transient fields referencing the serialized Java
> > > > > model. My skills with Beam are not so advanced though, but will try
> > > > > my be
Re: Error restoring Flink checkpoint

2020-06-08 Thread Reuven Lax
Maybe instead of wrapping the serialized Coder in the TypeSerializer, we
could wrap the Coder URN instead?

On Mon, Jun 8, 2020 at 7:37 AM Ivan San Jose 
wrote:

> Hi Reuven, as far as I've understood, Apache Beam coders are wrapped into
> Flink's TypeSerializers, so they are being serialized as part of the
> checkpoint according to
>
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java#L95
>
> The problem was that this is done using Java serialization and, as
> Beam Coders (at least ProtoCoder and AvroCoder) have references to the
> POJO class to be encoded/decoded, if you evolve that POJO (even
> following AVRO/Protobuf backward compatibility rules) then Flink
> will raise a java.io.InvalidClassException when trying to restore the
> checkpoint, because it is using Java serialization to serialize/deserialize
> the Beam Coder.
>
> On Mon, 2020-06-08 at 07:25 -0700, Reuven Lax wrote:
> > Max, can you explain why Flink serializes the coders in
> > the checkpoint? Dataflow on update uses the new graph, so doesn't hit
> > this problem.
> >
> > On Mon, Jun 8, 2020 at 7:21 AM Ivan San Jose <
> > isanj...@theworkshop.com> wrote:
> > > Finally I've managed to modify Beam's AvroCoder in order not to
> > > serialize any Class reference of the object to be encoded/decoded,
> > > and
> > > could successfully restore a checkpoint after adding a field to the
> > > POJO model.
> > > I think it would be useful for everyone as current AvroCoder is not
> > > really useful when working with checkpoints to be honest. Will try
> > > to
> > > create a pull request if you don't mind.
> > >
> > > Anyway I've hit another issue, now with AVRO and java.time.Instant,
> > > I'm
> > > getting following error when decoding a POJO which has a
> > > java.time.Instant field and was encoded with AvroDecoder:
> > > java.lang.RuntimeException: java.lang.RuntimeException:
> > > > java.lang.NoSuchMethodException: java.time.Instant.<init>()
> > > > at
> > > > org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:473)
> > >
> > > > It's because java.time.Instant doesn't have a public constructor.
> > > > I've tried adding
> > > > @AvroMeta(key = "logicalType", value = "timestamp-millis")
> > > > to the problematic POJO field, and it didn't work. I also tried
> > > > adding the time conversions that come with AVRO 1.9.2 to AvroCoder:
> > > > reflectData.addLogicalTypeConversion(new
> > > > org.apache.avro.data.TimeConversions.TimestampMillisConversion());
> > > >
> > > > But that is not working either. Using JODA time works though.
> > >
> > > Does it ring any bell for you?
> > >
> > > Thanks
> > >
> > > On Fri, 2020-06-05 at 13:15 +, Ivan San Jose wrote:
> > > > Thank you so much for your detailed answers Max, I will try to
> > > > achieve
> > > > what you've suggested about creating a custom coder which doesn't
> > > > have
> > > > > non-transient fields referencing the serialized Java model. My
> > > skills
> > > > with Beam are not so advanced though, but will try my best hehehe
> > > >
> > > > On Fri, 2020-06-05 at 13:07 +0200, Maximilian Michels wrote:
> > > > > See my answers inline.
> > > > >
> > > > > > Sorry but I'm afraid I'm not understanding well the
> > > scenario...
> > > > > > What's
> > > > > > the point on keeping a reference of the serialized class if
> > > > > > AVRO/Protobuf are using schemas?
> > > > >
> > > > > They keep a reference to the class because they produce that
> > > type
> > > > > when
> > > > > they deserialize data.
> > > > >
> > > > > > I mean, I've understood that AVRO and Protobuf follow schemas
> > > in
> > > > > > order
> > > > > > to serialize/deserialize an object in a binary way. So I'd
> > > expect
> > > > > > to
> > > > > > have following things saved into the checkpoint:
> > > > > >   - AVRO/Protobuf schema of the serialized object stored in
> > > JSON
> > > > > > or
> > > > > > using Java serialization mechanism
> > > > > >   - Object binary serialized according to the schema
> > > > >

Re: Error restoring Flink checkpoint

2020-06-08 Thread Reuven Lax
Max, can you explain why Flink serializes the coders in the checkpoint?
Dataflow on update uses the new graph, so doesn't hit this problem.

On Mon, Jun 8, 2020 at 7:21 AM Ivan San Jose 
wrote:

> Finally I've managed to modify Beam's AvroCoder in order not to
> serialize any Class reference of the object to be encoded/decoded, and
> could successfully restore a checkpoint after adding a field to the
> POJO model.
> I think it would be useful for everyone as current AvroCoder is not
> really useful when working with checkpoints to be honest. Will try to
> create a pull request if you don't mind.
>
> Anyway I've hit another issue, now with AVRO and java.time.Instant, I'm
> getting following error when decoding a POJO which has a
> java.time.Instant field and was encoded with AvroDecoder:
> java.lang.RuntimeException: java.lang.RuntimeException:
> java.lang.NoSuchMethodException: java.time.Instant.<init>()
> at
> org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:473)
>
> It's because java.time.Instant doesn't have a public constructor. I've
> tried adding @AvroMeta(key = "logicalType", value = "timestamp-millis")
> to the problematic POJO field, and it didn't work. I also tried adding
> the time conversions that come with AVRO 1.9.2 to AvroCoder:
> reflectData.addLogicalTypeConversion(new
> org.apache.avro.data.TimeConversions.TimestampMillisConversion());
>
> But that is not working either. Using JODA time works though.
>
> Does it ring any bell for you?
>
> Thanks
>
> On Fri, 2020-06-05 at 13:15 +, Ivan San Jose wrote:
> > Thank you so much for your detailed answers Max, I will try to
> > achieve
> > what you've suggested about creating a custom coder which doesn't
> > have
> > non-transient fields referencing the serialized Java model. My skills
> > with Beam are not so advanced though, but will try my best hehehe
> >
> > On Fri, 2020-06-05 at 13:07 +0200, Maximilian Michels wrote:
> > > See my answers inline.
> > >
> > > > Sorry but I'm afraid I'm not understanding well the scenario...
> > > > What's
> > > > the point on keeping a reference of the serialized class if
> > > > AVRO/Protobuf are using schemas?
> > >
> > > They keep a reference to the class because they produce that type
> > > when
> > > they deserialize data.
> > >
> > > > I mean, I've understood that AVRO and Protobuf follow schemas in
> > > > order
> > > > to serialize/deserialize an object in a binary way. So I'd expect
> > > > to
> > > > have following things saved into the checkpoint:
> > > >   - AVRO/Protobuf schema of the serialized object stored in JSON
> > > > or
> > > > using Java serialization mechanism
> > > >   - Object binary serialized according to the schema
> > >
> > > By default we serialize the entire Coder because Flink expects it
> > > to
> > > be
> > > present when the savepoint/checkpoint is restored. It would be more
> > > convenient if we could just use the latest serializer instead but
> > > that
> > > does not seem possible because the loading of the
> > > savepoint/checkpoint
> > > is decoupled from loading new serializers.
> > >
> > > > So, then, when the checkpoint is going to be restored, it checks
> > > > if
> > > > new
> > > > generated schema, from object to be restored, is compatible with
> > > > the
> > > > old one, and, if it is compatible, then just read its
> > > > schema+binary
> > > > data saved into the checkpoint restoring the object.
> > >
> > > The compatibility check is up to the coder; Beam itself has nothing
> > > to do with this. Flink just loads the old coder and tries to use it
> > > to read the checkpoint data. Flink also has an interface for
> > > performing a serializer compatibility check. It allows coder
> > > migration by first reading the data with the old coder and then
> > > writing it with the new one. We currently do not make use of this
> > > because Beam lacks an interface to check compatibility. However, I
> > > imagine we could have a list of coders for which we implement such
> > > a check. That's also how Flink does it in their serializers.
> > >
> > > > Also I don't understand when you said that a reference to the
> > > > Beam
> > > > Coder is saved into the checkpoint, because the error I'm getting
> > > > is
> > > > referencing the java model class ("Caused by:
> > > > java.io.InvalidClassException:
> > > > internal.model.dimension.POJOModel;
> > > > local class incompatible: stream classdesc serialVersionUID =
> > > > -223148029368332375, local class serialVersionUID =
> > > > 4489864664852536553"), not the coder itself.
> > >
> > > It's because the coder needs to know the type it produces, so it
> > > keeps a
> > > class reference. Without this, the coder wouldn't be able to
> > > instantiate
> > > the correct type. It
> > >
> > > Both AvroCoder and ProtoCoder reference the class which makes the
> > > coder
> > > unusable if changes occur to the class. You need to use a coder
> > > which
> > > does not do 

Re: Error restoring Flink checkpoint

2020-06-04 Thread Reuven Lax
Could we try using schemas?

On Thu, Jun 4, 2020 at 9:40 AM Maximilian Michels  wrote:

> I was under the assumption that this should work but ProtoCoder keeps a
> reference of the class used to serialize. That causes the snapshot to
> break.
>
> We can fix this by:
>
> a) writing/using coders which do not keep instances of evolving classes
> b) adding an interface to Beam for Coder serialization/deserialization
> c) adding a mode to Flink which allows to use newly supplied coders
>instead of having to load the old coder
>
> From all the options (a) is the most feasible for you. It looks like
> neither ProtoCoder nor AvroCoder fall into this category.
>
> -Max
>
> On 04.06.20 16:22, Ivan San Jose wrote:
> > I've changed my Java model in order to use ProtoCoder (with
> > @DefaultCoder(ProtoCoder.class)), but I'm getting the same error when
> > trying to restore the last checkpoint taken after adding an attribute to
> > that model.
> >
> > What do you think could be happening? It seems that the state saved
> > within the checkpoint is still using the Java serialization mechanism...
> >
> > On Thu, 2020-06-04 at 13:05 +, Ivan San Jose wrote:
> >> Thanks Max for your response. I'd try with AvroCoder then. But I
> >> still
> >> have a question, I guess AvroCoder is generating the AVRO schema
> >> using
> >> Java reflection, and then that generated schema is saved within the
> >> Flink checkpoint, right?
> >>
> >> On Wed, 2020-06-03 at 18:00 +0200, Maximilian Michels wrote:
> >>> Hi Ivan,
> >>>
> >>> Moving to the new type serializer snapshot interface is not going
> >>> to
> >>> solve this problem because we cannot version the coder through the
> >>> Beam
> >>> coder interface. That is only possible through Flink. However, it
> >>> is
> >>> usually not trivial.
> >>>
> >>> In Beam, when you evolve your data model, the only way you can
> >>> maintain compatibility is to use a serialization format which can
> >>> evolve, e.g. KafkaIO or Protobuf.
> >>>
> >>> Cheers,
> >>> Max
> >>>
> >>> On 03.06.20 16:47, Ivan San Jose wrote:
>  Hi, we have a Beam application running with the Flink runner and we
>  are struggling with Flink checkpoints. Every time we evolve the
>  source code by modifying a Java model, an exception is thrown when
>  trying to restore the last checkpoint taken:
> 
>  Caused by: java.lang.IllegalStateException: Could not Java-deserialize
>  TypeSerializer while restoring checkpoint metadata for serializer snapshot
>  'org.apache.beam.runners.flink.translation.types.CoderTypeSerializer$LegacySnapshot'.
>  Please update to the TypeSerializerSnapshot interface that removes Java
>  Serialization to avoid this problem in the future.
>  at org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.restoreSerializer(TypeSerializerConfigSnapshot.java:138)
>  at org.apache.flink.runtime.state.StateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:189)
>  at org.apache.flink.runtime.state.StateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:164)
>  at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.getPartitionStateSerializer(RegisteredOperatorStateBackendMetaInfo.java:113)
>  at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:94)
>  at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
>  ... 12 more
>  Caused by: java.io.InvalidClassException:
>  internal.model.dimension.Dimension; local class incompatible: stream
>  classdesc serialVersionUID = -223148029368332375, local class
>  serialVersionUID = 4489864664852536553
> 
>  As you can see, the exception is complaining that the class has
>  evolved and the two versions are no longer compatible.
> 
>  After checking some documentation and Beam source code...
> 
> https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html
>  (Serializers
>  vs
>  Coders)
> 
> https://github.com/apache/beam/blob/785609f22d013411b7973bbf9e2d15c3c8171fb2/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
> 
>  It seems that Beam coders are wrapped in Flink's TypeSerializers,
>  and, in the end, Beam coders are the ones in charge of
>  serializing/deserializing objects.
> 
>  Also reading
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html#migrating-from-deprecated-serializer-snapshot-apis-before-flink-17
>  , 

Re: Pipeline Processing Time

2020-06-03 Thread Reuven Lax
Note: you need to tag the timestamp parameter to @ProcessElement with
the @Timestamp annotation.
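
The "now() - timestamp" computation suggested in the quoted message can be sketched in plain Java as follows. This is only an illustration: it uses java.time.Instant as a stand-in for the org.joda.time.Instant that Beam passes to the @Timestamp-annotated parameter, and the `ElementLatencySketch` class and the choice to clamp negative values to zero are assumptions, not part of Beam.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper: latency of one element, given its event timestamp. */
final class ElementLatencySketch {

  /**
   * Returns now - elementTimestamp, clamped at zero so that clock skew between
   * the timestamp source and this worker never yields a negative latency.
   */
  static Duration latency(Instant elementTimestamp, Instant now) {
    Duration elapsed = Duration.between(elementTimestamp, now);
    return elapsed.isNegative() ? Duration.ZERO : elapsed;
  }
}
```

In a DoFn, the first argument would be the value received through the @Timestamp parameter and the second the current wall-clock time, with the result written to a metric or log in a ParDo placed right before the sink.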

On Mon, Jun 1, 2020 at 3:31 PM Luke Cwik  wrote:

> You can configure KafkaIO to use some data from the record as the element's
> timestamp. See the KafkaIO javadoc around the TimestampPolicy[1]; the
> default is current processing time.
> You can access the timestamp of the element by adding
> "org.joda.time.Instant timestamp" as a parameter to your @ProcessElement,
> see this javadoc for additional details[2]. You could then compute now() -
> timestamp to calculate processing time.
>
> 1:
> https://beam.apache.org/releases/javadoc/2.21.0/org/apache/beam/sdk/io/kafka/TimestampPolicy.html
> 2:
> https://beam.apache.org/releases/javadoc/2.21.0/org/apache/beam/sdk/transforms/DoFn.ProcessElement.html
>
> On Mon, Jun 1, 2020 at 2:00 PM Talat Uyarer 
> wrote:
>
>> Sorry for the late response. Where does Beam set that timestamp field
>> on the element? Is it set whenever KafkaIO reads that element?
>>
>> Also, I have a windowing function in my pipeline. Does the timestamp
>> field change for any kind of operation? My pipeline has the
>> following steps: KafkaIO -> Format Conversion ParDo -> SQL Filter ->
>> Windowing Step -> Custom Sink. If the timestamp is set in KafkaIO, can I
>> see the processing time by computing now() - timestamp in the Custom Sink?
>>
>>
>> Thanks
>>
>> On Thu, May 28, 2020 at 2:07 PM Luke Cwik  wrote:
>>
>>> Dataflow provides msec counters for each transform that executes. You
>>> should be able to get them from stackdriver and see them from the Dataflow
>>> UI.
>>>
>>> You need to keep track of the timestamp of the element as it flows
>>> through the system as part of data that goes alongside the element. You can
>>> use the element's timestamp[1] if that makes sense (it might not if you
>>> intend to use a timestamp that is from the kafka record itself and the
>>> record's timestamp isn't the same as the ingestion timestamp). Unless you
>>> are writing your own sink, the sink won't track the processing time at all
>>> so you'll need to add a ParDo that goes right before it that writes the
>>> timing information to wherever you want (a counter, your own metrics
>>> database, logs, ...).
>>>
>>> 1:
>>> https://github.com/apache/beam/blob/018e889829e300ab9f321da7e0010ff0011a73b1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L257
>>> 
>>>
>>>
>>> On Thu, May 28, 2020 at 1:12 PM Talat Uyarer <
>>> tuya...@paloaltonetworks.com> wrote:
>>>
 Yes I am trying to track how long it takes for a single element to be
 ingested into the pipeline until it is output somewhere.

 My pipeline is unbounded. I am using KafkaIO. I did not think about CPU
 time. If there is a way to track it too, it would be useful to improve my
 metrics.

 On Thu, May 28, 2020 at 12:52 PM Luke Cwik  wrote:

> What do you mean by processing time?
>
> Are you trying to track how long it takes for a single element to be
> ingested into the pipeline until it is output somewhere?
> Do you have a bounded pipeline and want to know how long all the
> processing takes?
> Do you care about how much CPU time is being consumed in aggregate for
> all the processing that your pipeline is doing?
>
>
> On Thu, May 28, 2020 at 11:01 AM Talat Uyarer <
> tuya...@paloaltonetworks.com> wrote:
>
>> I am using Dataflow Runner. The pipeline reads from KafkaIO and sends
>> HTTP requests. I could not find any metadata field on the element to set
>> the first read time.
>>
>> On Thu, May 28, 2020 at 10:44 AM Kyle Weaver 
>> wrote:
>>
>>> Which runner are you using?
>>>
>>> On Thu, May 28, 2020 at 1:43 PM Talat Uyarer <
>>> tuya...@paloaltonetworks.com> wrote:
>>>
 Hi,

 I have a pipeline which has 5 steps. What is the best way to
 measure processing time for my pipeline?

 Thanks

>>>


Re: Read a file => process => write to multiple files

2020-05-29 Thread Reuven Lax
Which language are you using?

On Fri, May 29, 2020, 6:03 AM OrielResearch Eila Arich-Landkof <
e...@orielresearch.org> wrote:

> Hi all,
>
> I am looking for a way to read a large file and generate the following 3
> files:
> 1. extract header
> 2. extract column #1 from all lines
> 3. extract column #2 from all lines
>
> I use a DoFn to extract the values. I am looking for a way to redirect the
> output to three different files. My thought was something like this:
>
> One DoFn that iterates over every line and returns:
> 1. header file => returns the first line and an empty string for lines 2..n
> 2. column #1 from all lines => returns column #1 values for lines 1..n
> 3. column #2 from all lines => returns column #2 values for lines 1..n
>
> What is the correct way to split the output between files? Should the DoFn
> return a tuple? Should I process the line in three different DoFns instead
> of one DoFn?
>
> Thanks,
> Eila
>
> --
> Eila
>
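
If the Java SDK is in use, a single ParDo can emit to several tagged outputs (TupleTag with withOutputTags), and each output can then be written to its own file. The routing logic itself can be sketched in plain Java as below. The `ThreeWaySplitSketch` and `Outputs` names, the comma delimiter, and the rule that the header is the first line of an in-memory list are all assumptions made for illustration; in a real pipeline elements are not ordered, so the header would need to be recognized some other way.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of routing each input line to one of three outputs. */
final class ThreeWaySplitSketch {

  /** Holds the three logical outputs; in Beam each would be its own tagged output. */
  static final class Outputs {
    final List<String> header = new ArrayList<>();
    final List<String> column1 = new ArrayList<>();
    final List<String> column2 = new ArrayList<>();
  }

  /** Assumes comma-separated lines where the first line is the header. */
  static Outputs split(List<String> lines) {
    Outputs out = new Outputs();
    for (int i = 0; i < lines.size(); i++) {
      String line = lines.get(i);
      if (i == 0) {
        // The header goes to its own output only; nothing is emitted elsewhere.
        out.header.add(line);
        continue;
      }
      // Keep trailing empty cells so column positions stay stable.
      String[] cells = line.split(",", -1);
      if (cells.length >= 2) {
        out.column1.add(cells[0]);
        out.column2.add(cells[1]);
      }
    }
    return out;
  }
}
```

Emitting each value only to the output it belongs to avoids the empty-string placeholders described in the question.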


Re: Join daily update Bigquery table with pubsub topic message

2020-05-23 Thread Reuven Lax
How large is the BigQuery table? Does it fit in memory?

On Sat, May 23, 2020 at 7:01 PM 杨胜  wrote:

> Hi everyone,
>
> I am new to Apache Beam, but I have experience with Spark Streaming.
>
> I have a daily updated BigQuery table that I want to use as a lookup
> table: read it into Beam as a bounded PCollection and refresh that
> collection within Beam on a daily basis. I named this variable
> *bigqueryTableRows*. I also have messages from a pubsub topic that I
> want to read as an unbounded PCollection, which I named
> *pubsubTableRows*. I then want to join *bigqueryTableRows* with
> *pubsubTableRows* and finally write the result into BigQuery.
>
> I have checked all the examples under beam's github repository:
> https://github.com/apache/beam/tree/d906270f243bb4de20a7f0baf514667590c8c494/examples/java/src/main/java/org/apache/beam/examples.
> But none matches my case.
>
> Any suggestion on how I should implement my pipeline?
>
> Many Thanks,
> Steven
>
>
>
>
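
If the table does fit in memory, one possible approach is to keep it as an in-memory lookup that is reloaded once its contents are older than a chosen interval, and to enrich each streaming message against it. The sketch below shows only that refresh-on-read logic in plain Java. `RefreshingLookupSketch`, its loader and its injectable clock are hypothetical names; nothing here is a Beam or BigQuery API, and in Beam itself this role is usually played by a periodically refreshed side input.

```java
import java.util.Map;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical in-memory lookup table that reloads itself after a fixed interval. */
final class RefreshingLookupSketch<K, V> {
  private final Supplier<Map<K, V>> loader;  // e.g. a query against the lookup table
  private final LongSupplier clockMillis;    // injectable clock, mainly for testing
  private final long refreshIntervalMillis;

  private Map<K, V> snapshot;                // null until the first lookup
  private long loadedAtMillis;

  RefreshingLookupSketch(
      Supplier<Map<K, V>> loader, LongSupplier clockMillis, long refreshIntervalMillis) {
    this.loader = loader;
    this.clockMillis = clockMillis;
    this.refreshIntervalMillis = refreshIntervalMillis;
  }

  /** Returns the value for key, reloading the whole table first if it is stale. */
  synchronized V lookup(K key) {
    long now = clockMillis.getAsLong();
    if (snapshot == null || now - loadedAtMillis >= refreshIntervalMillis) {
      snapshot = loader.get();
      loadedAtMillis = now;
    }
    return snapshot.get(key);
  }
}
```

For a daily refresh the interval would be 24 hours in milliseconds; the reload happens lazily on the first lookup after the interval has elapsed.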


Re: PubsubUnboundedSink parameters on Dataflow runner

2020-05-19 Thread Reuven Lax
On Tue, May 19, 2020 at 4:14 AM Marcin Kuthan 
wrote:

> I'm looking for the Pubsub publication details on unbounded collections
> when Dataflow runner is used and streaming engine is on.
>
> If I understood correctly, the PubsubUnboundedSink transform is overridden
> by an internal implementation.
>
>
> https://lists.apache.org/thread.html/26e2bfdb6eaa7319ea3cc65f9d8a0bfeb7be6a6d88f0167ebad0591d%40%3Cuser.beam.apache.org%3E
>

Only the streaming runner.


>
>
> Questions:
>
> 1. Should I expect that parameters maxBatchByteSize, batchSize are
> respected, or Dataflow internal implementation just ignores them?
>

I don't think that Dataflow pays attention to this.


> 2. What about pubsubClientFactory? The default one is
> PubsubJsonClientFactory, and this is somehow important if I want to
> configure maxBatchByteSize under Pubsub 10MB limit. Json factory encodes
> messages using base64, so the limit should be lowered to 10MB * 0.75 (minus
> some safety margin).
>

Similarly, this has no meaning when using Dataflow streaming.
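
For runners where the Beam-side sink and the JSON client are actually used, the 0.75 factor in the question comes from base64 turning every 3 raw bytes into 4 encoded characters. A small sketch of that arithmetic follows; the class and method names are hypothetical, and it ignores the JSON envelope and attributes, which is why the question mentions an additional safety margin.

```java
/** Sketch of the base64 size arithmetic behind the "10MB * 0.75" estimate. */
final class Base64BudgetSketch {

  /** Length of the padded base64 encoding of rawBytes bytes: 4 * ceil(rawBytes / 3). */
  static long encodedLength(long rawBytes) {
    return 4 * ((rawBytes + 2) / 3);
  }

  /** Largest raw payload whose padded base64 encoding still fits in limitBytes. */
  static long maxRawBytes(long limitBytes) {
    return (limitBytes / 4) * 3;
  }
}
```

With a limit of 10,000,000 bytes, maxRawBytes gives 7,500,000, which lines up with the ~7.5MB default quoted for bounded collections.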


> 3. Should I expect any differences for bounded and unbounded collections?
> There are different defaults in the Beam code: e.g: maxBatchByteSize is
> ~7.5MB for bounded and ~400kB for unbounded collections, batchSize is 100
> for bounded, and 1000 for unbounded. I also don't understand the reasons
> behind default settings.
> 4. How to estimate streaming engine costs for internal shuffling in
> PubsubUnboundedSink, if any? The default PubsubUnboundedSink implementation
> shuffles data before publication but I don't know how it is done by the
> internal implementation. And I don't need to know, as long as it does not
> generate extra costs :)
>

The internal implementation does not add any extra cost. Dataflow charges
for every MB read from Streaming Engine as a "shuffle" charge, and this
includes the records read from PubSub. The external Beam implementation
includes a full shuffle, which would be more expensive as it includes both
a write and a read.



> Many questions about Dataflow internals but it would be nice to know some
> details, the details important from the performance and costs perspective.
>
> Thanks,
> Marcin
>


Re: TextIO. Writing late files

2020-05-18 Thread Reuven Lax
This is still confusing to me - why would the messages be dropped as late
in this case?
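
For reference, the drop decision visible in the quoted log lines further down (a window ending at 00:00:20 with the input watermark already at 00:00:24 and no allowed lateness) can be sketched as below. This is my reading of the rule, written in plain Java with java.time types; the `LatenessSketch` class is hypothetical and is not the runner's actual code.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical sketch of when an element's window counts as expired. */
final class LatenessSketch {

  /**
   * A window [start, end) is treated as expired once the input watermark has
   * passed its maximum timestamp (end minus 1 ms) plus the allowed lateness;
   * elements assigned to an expired window are dropped.
   */
  static boolean isDroppable(Instant windowEnd, Duration allowedLateness, Instant inputWatermark) {
    Instant maxTimestamp = windowEnd.minusMillis(1);
    Instant expiry = maxTimestamp.plus(allowedLateness);
    return expiry.isBefore(inputWatermark);
  }
}
```

With the values from the log and zero allowed lateness the window is droppable; with five seconds of allowed lateness the same element would be kept.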

On Mon, May 18, 2020 at 6:14 AM Maximilian Michels  wrote:

> All runners which use the Beam reference implementation drop the
> PaneInfo for WriteFilesResult#getPerDestinationOutputFilenames(). That's
> why we can observe this behavior not only in Flink but also Spark.
>
> The WriteFilesResult is returned here:
>
> https://github.com/apache/beam/blob/d773f8ca7a4d63d01472b5eaef8b67157d60f40e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L363
>
> GatherBundlesPerWindow will discard the pane information because all
> buffered elements are emitted in the FinishBundle method which always
> has a NO_FIRING (unknown) pane info:
>
> https://github.com/apache/beam/blob/d773f8ca7a4d63d01472b5eaef8b67157d60f40e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L895
>
> So this seems expected behavior. We would need to preserve the panes in
> the Multimap buffer.
>
> -Max
>
> On 15.05.20 18:34, Reuven Lax wrote:
> > Lateness should never be introduced inside a pipeline - generally late
> > data can only come from a source.  If data was not dropped as late
> > earlier in the pipeline, it should not be dropped after the file write.
> > I suspect that this is a bug in how the Flink runner handles the
> > Reshuffle transform, but I'm not sure what the exact bug is.
> >
> > Reuven
> >
> > On Fri, May 15, 2020 at 2:23 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
> >
> > Hi Jose,
> >
> > thank you for putting in the effort to build an example which
> > demonstrates your problem.
> >
> > You are using a streaming pipeline and it seems that watermark in
> > downstream already advanced further, so when your File pane arrives,
> > it is already late. Since you define that lateness is not tolerated,
> > it is dropped.
> > I myself have never had a requirement to specify zero allowed lateness
> > for streaming. It feels dangerous. Do you have a specific use case?
> > Also, in many cases, after windowed files are written, I usually
> > collect them into the global window and specify a different triggering
> > policy for collecting them. Both cases are why I never came across
> > this situation.
> >
> > I do not have an explanation if it is a bug or not. I would guess
> > that watermark can advance further, e.g. because elements can be
> > processed in arbitrary order. Not saying this is the case.
> > It needs someone with better understanding of how watermark advance
> > is / should be handled within pipelines.
> >
> >
> > P.S.: you can add `.withTimestampFn()` to your generate sequence, to
> > get more stable timing, which is also easier to reason about:
> >
> > Dropping element at 1970-01-01T00:00:19.999Z for key
> > ... window:[1970-01-01T00:00:15.000Z..1970-01-01T00:00:20.000Z)
> > since too far behind inputWatermark:1970-01-01T00:00:24.000Z;
> > outputWatermark:1970-01-01T00:00:24.000Z
> >
> >instead of
> >
> > Dropping element at 2020-05-15T08:52:34.999Z for key ...
> > window:[2020-05-15T08:52:30.000Z..2020-05-15T08:52:35.000Z) since
> > too far behind inputWatermark:2020-05-15T08:52:39.318Z;
> > outputWatermark:2020-05-15T08:52:39.318Z
> >
> >
> >
> >
> > In my
> >
> >
> >
> > On Thu, May 14, 2020 at 10:47 AM Jose Manuel <kiuby88@gmail.com> wrote:
> >
> > Hi again,
> >
> > I have simplified the example to reproduce the data loss. The
> > scenario is the following:
> >
> > - TextIO writes files.
> > - getPerDestinationOutputFilenames emits file names.
> > - File names are processed by an aggregator (combine, distinct,
> > GroupByKey...) with a window **without allowed lateness**.
> > - File names are discarded as late.
> >
> > Here you can see the data loss in the picture
> > in
> > https://github.com/kiuby88/windowing-textio/blob/master/README.md#showing-data-loss
> >
> > Please, follow README to run the pipeline and find log traces
> > that say data are dropped as late.
> > Remember, you can run the pipeline with another
> > window's  lateness values (check README.md)
> >
> > Kby.
> >
> > El mar., 12 may. 2020 a las 17:16, Jose Manuel
> > (mailto:kiuby88
