Re: DoFn @Setup with PipelineOptions

2021-03-01 Thread Xinyu Liu
The reason for not passing it in directly is that we have a large number of
configs here at LinkedIn, so we use an internal config management framework
to hide the detailed configs needed to construct these resources.
Internally we put a config map inside the PipelineOptions, and then during
@Setup we would like to initialize the config framework with the configs inside
the PipelineOptions. The user does not need to be aware of how the configs are
populated. They can use something like

  ConfigFramework.create(PipelineOptions).getInstance(SomeResourceFactory)

to create a resource instance they need.

On the other hand, even without this kind of use case, it still seems
simpler for users to read parameters from PipelineOptions if we provide it
directly to @Setup.
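
For illustration, here is a rough sketch of what a DoFn could look like if
@Setup accepted PipelineOptions. The @Setup signature below is the proposed
one (not what DoFn supports today), and ConfigFramework, SomeResourceFactory
and SomeResource are stand-ins for our internal classes:

class MyDoFn extends DoFn<String, String> {
  private transient SomeResource resource;

  @Setup
  public void setup(PipelineOptions options) {
    // Proposed signature: the runner would hand us the PipelineOptions here.
    resource =
        ConfigFramework.create(options).getInstance(SomeResourceFactory.class).create();
  }

  @ProcessElement
  public void process(@Element String element, OutputReceiver<String> out) {
    out.output(resource.lookup(element));
  }

  @Teardown
  public void teardown() throws Exception {
    resource.close();
  }
}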

Thanks,
Xinyu

On Mon, Mar 1, 2021 at 4:14 PM Kenneth Knowles  wrote:

> Why not just pass in the arguments to the DoFn constructor or as a
> variable in the containing scope? Do you only know the option after the
> pipeline is completely constructed so you need to make the switch at
> runtime? Makes sense. I think passing options to @Setup is useful and
> harmless.
>
> Kenn
>
> On Mon, Mar 1, 2021 at 3:42 PM Xinyu Liu  wrote:
>
>> Hi, all,
>>
>> Currently the @Setup method signature in DoFn does not support any
>> arguments. This is a bit cumbersome for use cases such as creating a DB
>> connection or a REST client, or fetching some resources, where we would like
>> to read configs from the PipelineOptions during setup. Shall we support
>> adding a DoFn SetupContext that lets users take the PipelineOptions as an
>> argument, similar to @StartBundle? It seems the PipelineOptions should always
>> be available when the DoFnRunner is created. Does anyone see a downside to
>> this?
>>
>> Thanks,
>> Xinyu
>>
>


Cancel unbounded pipeline on classic Flink runner

2021-03-01 Thread yu . b . zhang
Hi Beam community,

We are running an unbounded pipeline with the Flink runner. Is there any way
to stop/cancel the pipeline from within the pipeline code? Would this be
enabled if https://issues.apache.org/jira/browse/BEAM-593 is resolved? Thanks
for any responses.

Thanks, 
Yu

Re: DoFn @Setup with PipelineOptions

2021-03-01 Thread Kenneth Knowles
Why not just pass in the arguments to the DoFn constructor or as a variable
in the containing scope? Do you only know the option after the pipeline is
completely constructed so you need to make the switch at runtime? Makes
sense. I think passing options to @Setup is useful and harmless.

Kenn

On Mon, Mar 1, 2021 at 3:42 PM Xinyu Liu  wrote:

> Hi, all,
>
> Currently the @Setup method signature in DoFn does not support any
> arguments. This is a bit cumbersome for use cases such as creating a DB
> connection or a REST client, or fetching some resources, where we would like
> to read configs from the PipelineOptions during setup. Shall we support
> adding a DoFn SetupContext that lets users take the PipelineOptions as an
> argument, similar to @StartBundle? It seems the PipelineOptions should always
> be available when the DoFnRunner is created. Does anyone see a downside to
> this?
>
> Thanks,
> Xinyu
>


Re: DoFn @Setup with PipelineOptions

2021-03-01 Thread Robert Bradshaw
Any reason not to simply pass these parameters into the DoFn constructor?

On Mon, Mar 1, 2021 at 3:42 PM Xinyu Liu  wrote:

> Hi, all,
>
> Currently the @Setup method signature in DoFn does not support any
> arguments. This is a bit cumbersome for use cases such as creating a DB
> connection or a REST client, or fetching some resources, where we would like
> to read configs from the PipelineOptions during setup. Shall we support
> adding a DoFn SetupContext that lets users take the PipelineOptions as an
> argument, similar to @StartBundle? It seems the PipelineOptions should always
> be available when the DoFnRunner is created. Does anyone see a downside to
> this?
>
> Thanks,
> Xinyu
>


Re: BEAM-6855

2021-03-01 Thread Kenneth Knowles
Another workaround might be to create a PCollection that is the tagged
union of the main input and the side input. I think you can avoid the
per-element overhead of checking which input each element came from by setting
some sort of timer or threshold after which you switch a hardcoded lambda to
the "main input only" path.
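
Very roughly, the shape would be something like the sketch below. Everything
here is a hypothetical placeholder (UnionValue, the tagging DoFns, and the
stateful DoFn at the end); it is only meant to show the Flatten-then-branch
idea, not a worked implementation:

// Key both inputs the same way and wrap the values in a small tagged holder.
PCollection<KV<String, UnionValue>> taggedMain =
    mainInput.apply("TagMain", ParDo.of(new TagAsMainFn()));
PCollection<KV<String, UnionValue>> taggedCache =
    cacheRows.apply("TagCache", ParDo.of(new TagAsCacheFn()));

PCollectionList.of(taggedMain)
    .and(taggedCache)
    .apply(Flatten.pCollections())
    // One stateful DoFn: cache-tagged elements update per-key state,
    // main-tagged elements read it, so no side input is needed.
    .apply(ParDo.of(new StatefulUnionFn()));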

Kenn

On Tue, Feb 23, 2021 at 5:07 PM Ahmet Altay  wrote:

> Hemali, would this be a reasonable workaround for your problem?
>
> /cc +Kenneth Knowles  - In case there is an alternative
> workaround to BEAM-6855.
> /cc +Cosmin Arad 
>
> On Thu, Feb 18, 2021 at 1:27 PM Brian Hulette  wrote:
>
>> I added JvmInitializer [1] to do some one-time initialization per JVM
>> before processing starts. It might be useful here... the intended use-case
>> was to perform quick configuration functions, but I suppose you could use
>> it to pull some data that you can reference later.
>>
>> [1]
>> https://beam.apache.org/releases/javadoc/2.27.0/org/apache/beam/sdk/harness/JvmInitializer.html
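>>
>> A minimal sketch of that approach, assuming the JvmInitializer interface from
>> the javadoc above and AutoService for registration (CacheHolder and its
>> loading logic are hypothetical placeholders):
>>
>> @AutoService(JvmInitializer.class)
>> public class CachePreloader implements JvmInitializer {
>>   @Override
>>   public void beforeProcessing(PipelineOptions options) {
>>     // Runs once per worker JVM before element processing starts.
>>     CacheHolder.set(CacheHolder.loadFromCloudSql(options));
>>   }
>> }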
>>
>> On Thu, Feb 18, 2021 at 1:03 PM Pablo Estrada  wrote:
>>
>>> +Brian Hulette  I believe you worked on a way to
>>> load data on worker startup?
>>>
>>> On Thu, Feb 18, 2021 at 1:00 PM Daniel Collins 
>>> wrote:
>>>
 The getState function should be static, sorry. "synchronized static
 @NotNull MyState getState()"

 On Thu, Feb 18, 2021 at 3:41 PM Daniel Collins 
 wrote:

> > On every dataflow start, I want to read from CloudSQL and build the
> cache
>
> If you do this outside of dataflow, you can use a static to do this on
> every worker start. Is that what you're looking for? For example:
>
> final class StateLoader {
>   private StateLoader() {}
>
>   @GuardedBy("this")
>   private static @Nullable MyState state;
>
>   synchronized @NotNull MyState getState() {
> if (state == null) {
>   state = LoadStateFromSQL();
> }
> return state;
>   }
> }
>
> On Thu, Feb 18, 2021 at 2:50 PM Hemali Sutaria <
> hsuta...@paloaltonetworks.com> wrote:
>
>> Hi,
>>
>> I have one question. This is *kind of a blocker for our upcoming
>> release*. It would be great if you could reply at your earliest
>> convenience.
>>
>> My dataflow pipeline is stateful. I am using Beam SDK for stateful
>> processing (StateId, ValueState). I have also implemented OnTimer for my
>> stateful transformation. On every dataflow start, I want to read from
>> CloudSQL and build the cache. For that, I need to provide the pre-built
>> cache as a side input to my current transform. But it looks like there is
>> some issue when I add a side input to my stateful transform. I think I am
>> hitting the BEAM-6855 issue (
>> https://issues.apache.org/jira/browse/BEAM-6855). Is there any
>> workaround? Any help would be appreciated.
>>
>> The following is my transform definition. I am using Beam SDK 2.23.0 and
>> the GlobalWindow.
>>
>> // The key type of the input KV is assumed to be String below.
>> private class GetLatestState
>>     extends DoFn<KV<String, DataTunnelStatus>, DataTunnelStateRelational> {
>>   @TimerId("tunnelStatusExpiryTimer")
>>   private final TimerSpec tunnelStatusExpiryTimer =
>>       TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>
>>   @StateId("tunnelStatus")
>>   private final StateSpec<ValueState<DataTunnelStatus>> tunnelStatusCache =
>>       StateSpecs.value(AvroCoder.of(DataTunnelStatus.class));
>>
>>   @ProcessElement
>>   public void process(@Element KV<String, DataTunnelStatus> input,
>>       MultiOutputReceiver out,
>>       @StateId("tunnelStatus") ValueState<DataTunnelStatus> tunnelStatusCache,
>>       @TimerId("tunnelStatusExpiryTimer") Timer tunnelStatusExpiryTimer,
>>       ProcessContext c)
>>
>>
>>
>> Thanks,
>> Hemali Sutaria
>>
>>


DoFn @Setup with PipelineOptions

2021-03-01 Thread Xinyu Liu
Hi, all,

Currently the @Setup method signature in DoFn does not support any
arguments. This is a bit cumbersome for use cases such as creating a DB
connection or a REST client, or fetching some resources, where we would like
to read configs from the PipelineOptions during setup. Shall we support
adding a DoFn SetupContext that lets users take the PipelineOptions as an
argument, similar to @StartBundle? It seems the PipelineOptions should always
be available when the DoFnRunner is created. Does anyone see a downside to
this?

Thanks,
Xinyu


Re: Question about Schema Logical Types and RowWithGetters

2021-03-01 Thread Steve Niemitz
Ah, thanks. The class is marked as public and not annotated with @Internal,
but I did just now notice that FieldValueGetter is @Internal. Out of
curiosity then, is there a recommended way to implement a "row proxy"?
RowWithGetters has a lot of useful logic in it; maybe it should be
non-internal?

On Mon, Mar 1, 2021 at 2:54 PM Reuven Lax  wrote:

> RowWithGetters is used as an internal detail in Beam, so it special cases
> internal Beam-provided logical types. If we want it to work well with user
> logical types, we might need to redesign it a bit.
>
> On Mon, Mar 1, 2021 at 10:44 AM Steve Niemitz  wrote:
>
>> I'm working on a little library to wrap domain objects into Rows (using
>> RowWithGetters) and am trying to implement a new LogicalType.  I'm
>> struggling to understand how it's supposed to interact with RowWithGetters
>> however.
>>
>> Let's say that my LogicalType has a Java type of MyLogicalType.Value and a
>> base type of byte[]. This means I have something like:
>>
>> class MyLogicalType extends Schema.LogicalType<MyLogicalType.Value, byte[]>
>>
>> Importantly here, I need to implement a toInputType function that takes a
>> byte[] and returns MyLogicalType.Value.
>>
>> What I'm trying to figure out is, if I construct a RowWithGetters, it
>> seems like my getter for the logical type field needs to first convert from
>> my domain value to a byte[], which will next be converted from a byte[] ->
>> MyLogicalType.Value (in toInputType), and then (in the context of the
>> SchemaCoder), be one more time converted from MyLogicalType.Value -> byte[]
>> (in toBaseType).  This seems like there's an extra round trip in here that
>> there shouldn't be.  For reference, the first part takes place in
>> RowWithGetters.getValue [1]
>>
>> More confusingly, OneOfType seems to be special cased to NOT do that, and
>> instead expects the getter to directly return a OneOfType.Value.  If it
>> followed the generic case, I would expect the getter to return a Row here
>> instead.
>>
>> Any thoughts on this?
>>
>> [1]
>> https://github.com/apache/beam/blob/b853f9c7aa3662eb707290ded4c22f86befeda68/sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowWithGetters.java#L140
>>
>


Re: Question about Schema Logical Types and RowWithGetters

2021-03-01 Thread Reuven Lax
RowWithGetters is used as an internal detail in Beam, so it special cases
internal Beam-provided logical types. If we want it to work well with user
logical types, we might need to redesign it a bit.

On Mon, Mar 1, 2021 at 10:44 AM Steve Niemitz  wrote:

> I'm working on a little library to wrap domain objects into Rows (using
> RowWithGetters) and am trying to implement a new LogicalType.  I'm
> struggling to understand how it's supposed to interact with RowWithGetters
> however.
>
> Let's say that my LogicalType has a Java type of MyLogicalType.Value and a
> base type of byte[]. This means I have something like:
>
> class MyLogicalType extends Schema.LogicalType<MyLogicalType.Value, byte[]>
>
> Importantly here, I need to implement a toInputType function that takes a
> byte[] and returns MyLogicalType.Value.
>
> What I'm trying to figure out is, if I construct a RowWithGetters, it
> seems like my getter for the logical type field needs to first convert from
> my domain value to a byte[], which will next be converted from a byte[] ->
> MyLogicalType.Value (in toInputType), and then (in the context of the
> SchemaCoder), be one more time converted from MyLogicalType.Value -> byte[]
> (in toBaseType).  This seems like there's an extra round trip in here that
> there shouldn't be.  For reference, the first part takes place in
> RowWithGetters.getValue [1]
>
> More confusingly, OneOfType seems to be special cased to NOT do that, and
> instead expects the getter to directly return a OneOfType.Value.  If it
> followed the generic case, I would expect the getter to return a Row here
> instead.
>
> Any thoughts on this?
>
> [1]
> https://github.com/apache/beam/blob/b853f9c7aa3662eb707290ded4c22f86befeda68/sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowWithGetters.java#L140
>


Re: Random outputs for ARRAY_CONCAT_AGG fn zetasql

2021-03-01 Thread Kenneth Knowles
Yea, the reason is that SQL relations are not ordered. So any ordering of
[1, 2, 3, 4] and [5, 6] and [7, 8, 9] is possible and correct.
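
So rather than pinning one particular ordering, an order-insensitive assertion
along the lines of Tyson's suggestion should work. A sketch (not verified),
using the schema and expected values from your test:

PAssert.thatSingleton(stream)
    .satisfies(
        row -> {
          Collection<Long> actual = row.getArray("array_field");
          assertThat(actual, containsInAnyOrder(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L));
          return null;
        });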

Kenn

On Mon, Mar 1, 2021 at 11:01 AM Tyson Hamilton  wrote:

> I didn't find anything like that after a brief look. What you could do
> instead is something like:
>
> PAssert.thatSingleton(stream).satisfies(row -> {
>   assertThat("array_field containsInAnyOrder",
>       row.getArray("array_field"), containsInAnyOrder(Arrays.asList(...)));
>   return null;
> });
>
> using junit/hamcrest matchers. I didn't verify this works myself but it
> should give you an idea for some next steps.
>
>
> On Mon, Mar 1, 2021 at 12:37 AM Sonam Ramchand <
> sonam.ramch...@venturedive.com> wrote:
>
>> Hi Devs,
>> I have implemented the ARRAY_CONCAT_AGG function for zetasql dialect. I
>> am trying to validate the test as:
>>
>> @Test
>> public void testArrayConcatAggZetasql() {
>>   String sql =
>>   "SELECT ARRAY_CONCAT_AGG(x) AS array_concat_agg FROM (SELECT [1, 2, 3, 
>> 4] AS x UNION ALL SELECT [5, 6] UNION ALL SELECT [7, 8, 9])";
>>
>>   ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
>>   BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
>> PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
>>
>>   Schema schema = Schema.builder().addArrayField("array_field", 
>> FieldType.INT64).build();
>>   PAssert.that(stream)
>>   .containsInAnyOrder(
>>   Row.withSchema(schema).addArray(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 
>> 9L).build());
>>   
>> pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
>> }
>>
>> Expected Output is: 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L.
>> But I am getting randomly different outputs:
>> 1. 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L
>> 2. 5L, 6L, 7L, 8L, 9L, 1L, 2L, 3L, 4L
>> 3. 7L, 8L, 9L, 5L, 6L, 1L, 2L, 3L, 4L
>>
>> As per my understanding, it is because of containsInAnyOrder function. Is
>> there anything Like:
>>
>>PAssert.that(stream)
>> .containsAnyOfThem(
>> Row.withSchema(schema).addArray(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 
>> 9L).build(),
>> Row.withSchema(schema).addArray(5L, 6L, 7L, 8L, 9L, 1L, 2L, 
>> 3L, 4L).build(),
>> Row.withSchema(schema).addArray(7L, 8L, 9L, 5L, 6L, 1L, 2L, 
>> 3L, 4L).build());
>>
>> I would really appreciate it if anyone could help me figure out how to
>> handle such a scenario in Beam.
>>
>> Thanks!
>> --
>> Regards,
>> *Sonam*
>> Software Engineer
>> Mobile: +92 3088337296
>>
>> 
>>
>


Re: Random outputs for ARRAY_CONCAT_AGG fn zetasql

2021-03-01 Thread Tyson Hamilton
I didn't find anything like that after a brief look. What you could do
instead is something like:

PAssert.thatSingleton(stream).satisfies(row -> {
  assertThat("array_field containsInAnyOrder",
      row.getArray("array_field"), containsInAnyOrder(Arrays.asList(...)));
  return null;
});

using junit/hamcrest matchers. I didn't verify this works myself but it
should give you an idea for some next steps.


On Mon, Mar 1, 2021 at 12:37 AM Sonam Ramchand <
sonam.ramch...@venturedive.com> wrote:

> Hi Devs,
> I have implemented the ARRAY_CONCAT_AGG function for zetasql dialect. I am
> trying to validate the test as:
>
> @Test
> public void testArrayConcatAggZetasql() {
>   String sql =
>   "SELECT ARRAY_CONCAT_AGG(x) AS array_concat_agg FROM (SELECT [1, 2, 3, 
> 4] AS x UNION ALL SELECT [5, 6] UNION ALL SELECT [7, 8, 9])";
>
>   ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
>   BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
> PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
>
>   Schema schema = Schema.builder().addArrayField("array_field", 
> FieldType.INT64).build();
>   PAssert.that(stream)
>   .containsInAnyOrder(
>   Row.withSchema(schema).addArray(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 
> 9L).build());
>   
> pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
> }
>
> Expected Output is: 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L.
> But I am getting randomly different outputs:
> 1. 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L
> 2. 5L, 6L, 7L, 8L, 9L, 1L, 2L, 3L, 4L
> 3. 7L, 8L, 9L, 5L, 6L, 1L, 2L, 3L, 4L
>
> As per my understanding, it is because of containsInAnyOrder function. Is
> there anything Like:
>
>PAssert.that(stream)
> .containsAnyOfThem(
> Row.withSchema(schema).addArray(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 
> 9L).build(),
> Row.withSchema(schema).addArray(5L, 6L, 7L, 8L, 9L, 1L, 2L, 
> 3L, 4L).build(),
> Row.withSchema(schema).addArray(7L, 8L, 9L, 5L, 6L, 1L, 2L, 
> 3L, 4L).build());
>
> I would really appreciate it if anyone could help me figure out how to
> handle such a scenario in Beam.
>
> Thanks!
> --
> Regards,
> *Sonam*
> Software Engineer
> Mobile: +92 3088337296
>
> 
>


Question about Schema Logical Types and RowWithGetters

2021-03-01 Thread Steve Niemitz
I'm working on a little library to wrap domain objects into Rows (using
RowWithGetters) and am trying to implement a new LogicalType.  I'm
struggling to understand how it's supposed to interact with RowWithGetters
however.

Let's say that my LogicalType has a Java type of MyLogicalType.Value and a
base type of byte[]. This means I have something like:

class MyLogicalType extends Schema.LogicalType<MyLogicalType.Value, byte[]>

Importantly here, I need to implement a toInputType function that takes a
byte[] and returns MyLogicalType.Value.
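
Concretely, a minimal sketch of what I mean, assuming the Schema.LogicalType
methods in recent Beam releases (I use "implements" here, and
MyLogicalType.Value and its byte[] representation are simplified placeholders):

public class MyLogicalType implements Schema.LogicalType<MyLogicalType.Value, byte[]> {

  public static class Value {
    final byte[] payload;

    Value(byte[] payload) {
      this.payload = payload;
    }
  }

  @Override
  public String getIdentifier() {
    return "my-namespace:my-logical-type:v1";
  }

  @Override
  public Schema.FieldType getArgumentType() {
    return null; // no type argument
  }

  @Override
  public Schema.FieldType getBaseType() {
    return Schema.FieldType.BYTES;
  }

  @Override
  public byte[] toBaseType(Value input) {
    return input.payload; // domain value -> base byte[]
  }

  @Override
  public Value toInputType(byte[] base) {
    return new Value(base); // base byte[] -> domain value
  }
}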

What I'm trying to figure out is: if I construct a RowWithGetters, it seems
like my getter for the logical-type field needs to first convert from my
domain value to a byte[], which will next be converted from byte[] ->
MyLogicalType.Value (in toInputType), and then (in the context of the
SchemaCoder) be converted one more time from MyLogicalType.Value -> byte[]
(in toBaseType). This seems like an extra round trip that shouldn't be
there. For reference, the first part takes place in
RowWithGetters.getValue [1].

More confusingly, OneOfType seems to be special cased to NOT do that, and
instead expects the getter to directly return a OneOfType.Value.  If it
followed the generic case, I would expect the getter to return a Row here
instead.

Any thoughts on this?

[1]
https://github.com/apache/beam/blob/b853f9c7aa3662eb707290ded4c22f86befeda68/sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowWithGetters.java#L140


Beam Dependency Check Report (2021-03-01)

2021-03-01 Thread Apache Jenkins Server

High Priority Dependency Updates Of Beam Python SDK:

Dependency Name | Current Version | Latest Version | Release Date Of the Current Used Version | Release Date Of The Latest Release | JIRA Issue
chromedriver-binary | 88.0.4324.96.0 | 89.0.4389.23.0 | 2021-01-25 | 2021-02-01 | BEAM-10426
dill | 0.3.1.1 | 0.3.3 | 2019-10-07 | 2020-11-02 | BEAM-11167
google-cloud-bigquery | 1.28.0 | 2.10.0 | 2020-10-05 | 2021-03-01 | BEAM-5537
google-cloud-datastore | 1.15.3 | 2.1.0 | 2020-11-16 | 2020-12-07 | BEAM-8443
google-cloud-dlp | 1.0.0 | 3.0.1 | 2020-06-29 | 2021-02-01 | BEAM-10344
google-cloud-language | 1.3.0 | 2.0.0 | 2020-10-26 | 2020-10-26 | BEAM-8
google-cloud-pubsub | 1.7.0 | 2.4.0 | 2020-07-20 | 2021-03-01 | BEAM-5539
google-cloud-spanner | 1.19.1 | 3.1.0 | 2020-11-16 | 2021-03-01 | BEAM-10345
google-cloud-videointelligence | 1.16.1 | 2.0.0 | 2020-11-23 | 2020-11-23 | BEAM-11319
google-cloud-vision | 1.0.0 | 2.2.0 | 2020-03-24 | 2021-02-15 | BEAM-9581
grpcio-tools | 1.30.0 | 1.36.0 | 2020-06-29 | 2021-03-01 | BEAM-9582
idna | 2.10 | 3.1 | 2021-01-04 | 2021-01-11 | BEAM-9328
mock | 2.0.0 | 4.0.3 | 2019-05-20 | 2020-12-14 | BEAM-7369
mypy-protobuf | 1.18 | 2.4 | 2020-03-24 | 2021-02-08 | BEAM-10346
nbconvert | 5.6.1 | 6.0.7 | 2020-10-05 | 2020-10-05 | BEAM-11007
Pillow | 7.2.0 | 8.1.0 | 2020-10-19 | 2021-01-04 | BEAM-11071
PyHamcrest | 1.10.1 | 2.0.2 | 2020-01-20 | 2020-07-08 | BEAM-9155
pytest | 4.6.11 | 6.2.2 | 2020-07-08 | 2021-02-01 | BEAM-8606
pytest-xdist | 1.34.0 | 2.2.1 | 2020-08-17 | 2021-02-15 | BEAM-10713
setuptools | 52.0.0 | 54.0.0 | 2021-01-25 | 2021-03-01 | BEAM-10714
tenacity | 5.1.5 | 6.3.1 | 2019-11-11 | 2020-12-21 | BEAM-8607

High Priority Dependency Updates Of Beam Java SDK:

Dependency Name | Current Version | Latest Version | Release Date Of the Current Used Version | Release Date Of The Latest Release | JIRA Issue
com.amazonaws:aws-java-sdk-cloudwatch | 1.11.718 | 1.11.964 | 2020-02-07 | 2021-02-26 | BEAM-11763
com.amazonaws:aws-java-sdk-core | 1.11.718 | 1.11.964 | 2020-02-07 | 2021-02-26 | BEAM-11764
com.amazonaws:aws-java-sdk-dynamodb | 1.11.718 | 1.11.964 | 2020-02-07 | 2021-02-26 | BEAM-11765
com.amazonaws:aws-java-sdk-s3 | 1.11.718 | 1.11.964 | 2020-02-07 | 2021-02-26 | BEAM-11766
com.amazonaws:aws-java-sdk-sns | 1.11.718 | 1.11.964 | 2020-02-07 | 2021-02-26 | BEAM-11767
com.amazonaws:aws-java-sdk-sqs | 1.11.718 | 1.11.964 | 2020-02-07 | 2021-02-26 | BEAM-11768
com.amazonaws:aws-java-sdk-sts | 1.11.718 | 1.11.964 | 2020-02-07 | 2021-02-26 | BEAM-11769
com.azure:azure-core | 1.6.0 | 1.13.0 | 2020-07-02 | 2021-02-06 | BEAM-11888
com.azure:azure-identity | 1.0.8 | 1.3.0-beta.1 | 2020-07-07 | 2021-02-11 | BEAM-11814
com.azure:azure-storage-common | 12.8.0 | 12.11.0-beta.1 | 2020-08-13 | 2021-02-11 | BEAM-11889
com.datastax.cassandra:cassandra-driver-core | 3.10.2 | 4.0.0 | 2020-08-26 | 2019-03-18 | BEAM-8674
com.esotericsoftware:kryo | 4.0.2 | 5.0.3 | 2018-03-20 | 2020-12-15 | BEAM-5809
com.esotericsoftware.kryo:kryo | 2.21 | 2.24.0 | 2013-02-27 | 2014-05-04 | BEAM-5574
com.github.ben-manes.versions:com.github.ben-manes.versions.gradle.plugin | 0.33.0 | 0.36.0 | 2020-09-14 | 2020-11-09 | BEAM-6645
com.google.api.grpc:grpc-google-cloud-pubsublite-v1 | 0.7.0 | 0.11.0 | 2020-12-08 | 2021-02-25 | BEAM-11008
com.google.api.grpc:proto-google-cloud-bigquerystorage-v1 | 1.8.0 | 1.12.0 | 2021-01-05 | 2021-02-25 | BEAM-11890
com.google.api.grpc:proto-google-cloud-bigquerystorage-v1beta2 | 0.108.0 | 0.112.0 | 2021-01-05 | 2021-02-25 | BEAM-11891
com.google.api.grpc:proto-google-cloud-dlp-v2 | 1.1.4 | 2.2.9 | 2020-05-04 | 2021-02-26 | BEAM-11892
com.google.api.grpc:proto-google-cloud-pubsublite-v1 | 0.7.0 | 0.11.0 | 2020-12-08 | 2021-02-25 | BEAM-11009
com.google.api.grpc:proto-

Random outputs for ARRAY_CONCAT_AGG fn zetasql

2021-03-01 Thread Sonam Ramchand
Hi Devs,
I have implemented the ARRAY_CONCAT_AGG function for the ZetaSQL dialect. I am
trying to validate it with the following test:

@Test
public void testArrayConcatAggZetasql() {
  String sql =
      "SELECT ARRAY_CONCAT_AGG(x) AS array_concat_agg FROM (SELECT [1, 2, 3, 4] AS x UNION ALL SELECT [5, 6] UNION ALL SELECT [7, 8, 9])";

  ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
  BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
  PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);

  Schema schema =
      Schema.builder().addArrayField("array_field", FieldType.INT64).build();
  PAssert.that(stream)
      .containsInAnyOrder(
          Row.withSchema(schema).addArray(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L).build());
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}

The expected output is: 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L.
But I get a randomly different ordering on each run:
1. 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L
2. 5L, 6L, 7L, 8L, 9L, 1L, 2L, 3L, 4L
3. 7L, 8L, 9L, 5L, 6L, 1L, 2L, 3L, 4L

As per my understanding, this is because of the containsInAnyOrder function. Is
there anything like:

   PAssert.that(stream)
       .containsAnyOfThem(
           Row.withSchema(schema).addArray(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L).build(),
           Row.withSchema(schema).addArray(5L, 6L, 7L, 8L, 9L, 1L, 2L, 3L, 4L).build(),
           Row.withSchema(schema).addArray(7L, 8L, 9L, 5L, 6L, 1L, 2L, 3L, 4L).build());

I would really appreciate it if anyone could help me figure out how to handle
such a scenario in Beam.

Thanks!
-- 
Regards,
*Sonam*
Software Engineer
Mobile: +92 3088337296