Re: Beam Sql parse error: Cannot find a matching Calcite SqlTypeName for Beam logical type: OneOf

2023-04-24 Thread Andrew Pilloud via user
It means Beam SQL doesn't support the "OneOf" logical type: there is no Calcite
SqlTypeName to map it to, so a PCollection whose schema contains a OneOf field
can't be queried directly.
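
One possible workaround is to project the OneOf-typed column away before handing
the rows to SqlTransform. A minimal sketch, assuming the OneOf column is named
"payload" (hypothetical) and the remaining columns all map to Calcite types:

```java
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.schemas.transforms.DropFields;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class DropOneOfBeforeSql {
  static PCollection<Row> query(PCollection<Row> input) {
    // Drop the column whose logical type (OneOf) has no Calcite mapping, so the
    // remaining schema is something Beam SQL can plan against.
    PCollection<Row> sqlFriendly =
        input.apply("DropOneOfColumn", DropFields.fields("payload"));
    return sqlFriendly.apply("Query", SqlTransform.query("SELECT * FROM PCOLLECTION"));
  }
}
```

Alternatively, a MapElements step could convert the OneOf field into a plain
STRING or ROW column before the query, if its value is still needed.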

On Mon, Apr 24, 2023 at 1:42 AM Jeff Zhang  wrote:

>
> Hi all,
>
> I got the following error when running sql: select * from PCOLLECTION,
> what does this mean?
>
>
> Exception in thread "main"
> org.apache.beam.sdk.extensions.sql.impl.ParseException: Unable to parse
> query select * from PCOLLECTION
> at
> org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner.convertToBeamRel(CalciteQueryPlanner.java:216)
> at
> org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.parseQuery(BeamSqlEnv.java:112)
> at
> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:172)
> at
> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:110)
> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:499)
> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:373)
> at
> io.zjffdu.demos.beam.KafkaRiskExample.readKafkaJson(KafkaRiskExample.java:151)
> at io.zjffdu.demos.beam.KafkaRiskExample.main(KafkaRiskExample.java:169)
> Caused by:
> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.tools.ValidationException:
> java.lang.IllegalArgumentException: Cannot find a matching Calcite
> SqlTypeName for Beam logical type: OneOf
> at
> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.prepare.PlannerImpl.validate(PlannerImpl.java:226)
> at
> org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner.convertToBeamRel(CalciteQueryPlanner.java:184)
> ... 8 more
> Caused by: java.lang.IllegalArgumentException: Cannot find a matching
> Calcite SqlTypeName for Beam logical type: OneOf
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: Beam shell sql with zeta

2023-04-20 Thread Andrew Pilloud via user
Setting plannerName doesn't actually do anything in the SQL shell at query
parse time; it will still use the Calcite parser. Have you tried Calcite
SQL?

Support for structs is somewhat limited. I know there are bugs around nested
structs and structs with single values.

Andrew

On Thu, Apr 20, 2023 at 9:26 AM Wiśniowski Piotr <
contact.wisniowskipi...@gmail.com> wrote:

> Hi,
>
> I have a question regarding usage of Zeta with SQL extensions in SQL
> shell. I try to:
>
> ```
>
> SET runner = DirectRunner;
> SET tempLocation = `/tmp/test/`;
> SET streaming=`True`;
> SET plannerName =
> `org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner`;
>
> CREATE EXTERNAL TABLE etl_raw(
>  event_timestamp TIMESTAMP,
>  event_type VARCHAR,
>  message_id VARCHAR,
>  tracking_source VARCHAR,
>  tracking_version VARCHAR,
>  `repo_state` STRUCT<`head` STRUCT<`commit` VARCHAR ,`name` VARCHAR>>
> )
> TYPE pubsub
> LOCATION 'projects/xxx/topics/xxx'
> TBLPROPERTIES '{"format":"json"}';
>
> ```
>
> But I get the error `parse failed: Encountered "STRUCT"`.
>
> If I change the `STRUCT` to `ROW` (as in Calcite) the DDL passes, but
> I still fail to receive data on
>
> `SELECT * FROM etl_raw LIMIT 1;` with an exception of
> `java.lang.NoSuchFieldException: head (state=,code=0)`, when I am sure
> that the field is there in the json payload.
>
> With the `repo_state` field commented out I am able to retrieve the data.
> Unfortunately I do not have control over the payload structure (it is a
> 3rd-party hook), so I cannot make it flat.
>
> In general I am unable to parse a json message from pubsub that has a
> structured field.
>
> Is anyone familiar with this part of Beam functionalities?
>
> Best regards
>
> Wisniowski Piotr
>
>
>
>


Re: Beam SQL Alias issue while using With Clause

2023-03-02 Thread Andrew Pilloud via user
Hi Talat,

I managed to turn your test case into something against Calcite. It
looks like there is a bug affecting tables that contain one or more
single element structs and no multi element structs. I've sent the
details to the Calcite mailing list here.
https://lists.apache.org/thread/tlr9hsmx09by79h91nwp2d4nv8jfwsto

I'm experimenting with ideas on how to work around this but a fix will
likely require a Calcite upgrade, which is not something I'd have time
to help with. (I'm not on the Google Beam team anymore.)

Andrew

On Wed, Feb 22, 2023 at 12:18 PM Talat Uyarer
 wrote:
>
> Hi @Andrew Pilloud
>
> Sorry for the late response. Yes, your test is working fine. I changed the
> test input structure to match our input structure. Now this test also has the
> same exception.
>
> Feb 21, 2023 2:02:28 PM 
> org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner convertToBeamRel
> INFO: SQL:
> WITH `tempTable` AS (SELECT `panwRowTestTable`.`user_info`, 
> `panwRowTestTable`.`id`, `panwRowTestTable`.`value`
> FROM `beam`.`panwRowTestTable` AS `panwRowTestTable`
> WHERE `panwRowTestTable`.`user_info`.`name` = 'innerStr') (SELECT 
> `tempTable`.`user_info`, `tempTable`.`id`, `tempTable`.`value`
> FROM `tempTable` AS `tempTable`)
> Feb 21, 2023 2:02:28 PM 
> org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner convertToBeamRel
> INFO: SQLPlan>
> LogicalProject(user_info=[ROW($0)], id=[$1], value=[$2])
>   LogicalFilter(condition=[=($0.name, 'innerStr')])
> LogicalProject(name=[$0.name], id=[$1], value=[$2])
>   BeamIOSourceRel(table=[[beam, panwRowTestTable]])
>
>
> fieldList must not be null, type = VARCHAR
> java.lang.AssertionError: fieldList must not be null, type = VARCHAR
>
> I don't know what is different from yours. I am sharing my version of the test
> also.
>
>
> Index: 
> sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
> IDEA additional info:
> Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
> <+>UTF-8
> ===
> diff --git 
> a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
>  
> b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
> --- 
> a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
>  (revision fd383fae1adc545b6b6a22b274902cda956fec49)
> +++ 
> b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
>  (date 1677017032324)
> @@ -54,6 +54,9 @@
>private static final Schema innerRowSchema =
>
> Schema.builder().addStringField("string_field").addInt64Field("long_field").build();
>
> +  private static final Schema innerPanwRowSchema =
> +  Schema.builder().addStringField("name").build();
> +
>private static final Schema innerRowWithArraySchema =
>Schema.builder()
>.addStringField("string_field")
> @@ -127,8 +130,12 @@
>.build()))
>.put(
>"basicRowTestTable",
> -  TestBoundedTable.of(FieldType.row(innerRowSchema), "col")
> -  
> .addRows(Row.withSchema(innerRowSchema).addValues("innerStr", 1L).build()))
> +  TestBoundedTable.of(FieldType.row(innerRowSchema), "col", 
> FieldType.INT64, "field")
> +  
> .addRows(Row.withSchema(innerRowSchema).addValues("innerStr", 1L).build(), 
> 1L))
> +.put(
> +  "panwRowTestTable",
> +  TestBoundedTable.of(FieldType.row(innerPanwRowSchema), 
> "user_info", FieldType.INT64, "id", FieldType.STRING, "value")
> +  
> .addRows(Row.withSchema(innerRowSchema).addValues("name", 1L).build(), 1L, 
> "some_value"))
>.put(
>"rowWithArrayTestTable",
>TestBoundedTable.of(FieldType.row(rowWithArraySchema), 
> "col")
> @@ -219,6 +226,21 @@
>  .build());
>  pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
>}
> +
> +  @Test
> +  public void testBasicRowWhereField() {
> +BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(readOnlyTableProvider);
> +PCollection stream =
> +BeamSqlRelUtils.toPCollection(
> +pipeline, sqlEnv.parseQuery("WITH tempTable AS (SELECT * FROM 
> panwRowTestTable WHERE panwRowTestTable.`user_info`.`name` = 'in

Re: Beam SQL Alias issue while using With Clause

2023-02-10 Thread Andrew Pilloud via user
voke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> at
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
> at
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
> at
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
> at
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
> at com.sun.proxy.$Proxy5.processTestClass(Unknown Source)
> at
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> at
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
> at
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
> at
> org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:175)
> at
> org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:157)
> at
> org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
> at
> org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
> at
> org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> at
> org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
> at java.base/java.lang.Thread.run(Thread.java:829)
>
>
>
>
>
> On Thu, Feb 2, 2023 at 1:06 PM Andrew Pilloud  wrote:
>
>> It looks like Calcite stopped considering field names in RelNode equality
>> as of Calcite 2.22 (which we use in Beam v2.34.0+). This can result in a
>> planner state where two nodes that only differ by field name are considered
>> equivalent.
>>
>> I have a fix for Beam in https://github.com/apache/beam/pull/25290
>> and I'll send an email to the Calcite dev list with more details.
>>
>> Andrew
>>
>> On Fri, Jan 27, 2023 at 11:33 AM Andrew Pilloud 
>> wrote:
>>
>>> Also this is at very least a Beam bug. You can file a Beam issue if you
>>> want, otherwise I will when I get back.
>>>
>>> Andrew
>>>
>>> On Fri, Jan 27, 2023 at 11:27 AM Andrew Pilloud 
>>> wrote:
>>>
>>>> Hi Talat,
>>>>
>>>> I did get your test case running and added some logging to
>>>> RexProgramBuilder.mergePrograms. There is only one merge that occurs during
>>>> the test and it has an output type of RecordType(JavaType(int) ID,
>>>> JavaType(class java.lang.String) V). This does seem like the correct output
>>>> name but it doesn't match the final output name, so something is still
>>>> different than the Beam test case. I also modified mergePrograms to
>>>> purposely corrupt the output names, that did not cause the test to fail or
>>>> trip the 'assert mergedProg.getOutputRowType() ==
>>>> topProgram.getOutputRowType();' in mergePrograms. I could not find any
>>>> Calcite unit tests for RexProgramBuilder.mergePrograms or
>>>> CoreRules.CALC_MERGE rule so I think it is still probable that the problem
>>>> is in this area.
>>>>
>>>> One minor issue I encountered. It took me a while to get your test case
>>>> running, it doesn't appear there are any calcite gradle rules to run
>>>> CoreQuidemTest and constructing the classpath manually was tedious. Did I
>>>> miss something?
>>>>
>>>> I'm still working on this but I'm out today and Monday, it will
>>>> probably be Wednesday before I make any more progress.
>

Re: OpenJDK8 / OpenJDK11 container deprecation

2023-02-07 Thread Andrew Pilloud via user
This sounds reasonable to me as well.

I've made swaps like this in the past; the base image of each is probably a
bigger factor than the JDK. The openjdk images were based on Debian 11. The
default eclipse-temurin images are based on Ubuntu 22.04 with an alpine
option. Ubuntu is a Debian derivative but the versions and package names
aren't exact matches and Ubuntu tends to update a little faster. For most
users I don't think this will matter but users building custom containers
may need to make minor changes. The alpine option will be much smaller
(which could be a significant improvement) but would be a more significant
change to the environment.

On Tue, Feb 7, 2023 at 5:18 PM Robert Bradshaw via dev 
wrote:

> Seems reasonable to me.
>
> On Tue, Feb 7, 2023 at 4:19 PM Luke Cwik via user 
> wrote:
> >
> > As per [1], the JDK8 and JDK11 containers that Apache Beam uses have
> stopped being built and supported since July 2022. I have filed [2] to
> track the resolution of this issue.
> >
> > Based upon [1], almost everyone is swapping to the eclipse-temurin
> container[3] as their base based upon the linked issues from the
> deprecation notice[1]. The eclipse-temurin container is released under
> these licenses:
> > Apache License, Version 2.0
> > Eclipse Distribution License 1.0 (BSD)
> > Eclipse Public License 2.0
> > 一 (Secondary) GNU General Public License, version 2 with OpenJDK
> Assembly Exception
> > 一 (Secondary) GNU General Public License, version 2 with the GNU
> Classpath Exception
> >
> > I propose that we swap all our containers to the eclipse-temurin
> containers[3].
> >
> > Open to other ideas and also would be great to hear about your
> experience in any other projects that you have had to make a similar
> decision.
> >
> > 1: https://github.com/docker-library/openjdk/issues/505
> > 2: https://github.com/apache/beam/issues/25371
> > 3: https://hub.docker.com/_/eclipse-temurin
>


Re: Beam SQL Alias issue while using With Clause

2023-02-02 Thread Andrew Pilloud via user
It looks like Calcite stopped considering field names in RelNode equality
as of Calcite 2.22 (which we use in Beam v2.34.0+). This can result in a
planner state where two nodes that only differ by field name are considered
equivalent.

I have a fix for Beam in https://github.com/apache/beam/pull/25290 and I'll
send an email to the Calcite dev list with more details.

Andrew

On Fri, Jan 27, 2023 at 11:33 AM Andrew Pilloud  wrote:

> Also this is at very least a Beam bug. You can file a Beam issue if you
> want, otherwise I will when I get back.
>
> Andrew
>
> On Fri, Jan 27, 2023 at 11:27 AM Andrew Pilloud 
> wrote:
>
>> Hi Talat,
>>
>> I did get your test case running and added some logging to
>> RexProgramBuilder.mergePrograms. There is only one merge that occurs during
>> the test and it has an output type of RecordType(JavaType(int) ID,
>> JavaType(class java.lang.String) V). This does seem like the correct output
>> name but it doesn't match the final output name, so something is still
>> different than the Beam test case. I also modified mergePrograms to
>> purposely corrupt the output names, that did not cause the test to fail or
>> trip the 'assert mergedProg.getOutputRowType() ==
>> topProgram.getOutputRowType();' in mergePrograms. I could not find any
>> Calcite unit tests for RexProgramBuilder.mergePrograms or
>> CoreRules.CALC_MERGE rule so I think it is still probable that the problem
>> is in this area.
>>
>> One minor issue I encountered. It took me a while to get your test case
>> running, it doesn't appear there are any calcite gradle rules to run
>> CoreQuidemTest and constructing the classpath manually was tedious. Did I
>> miss something?
>>
>> I'm still working on this but I'm out today and Monday, it will probably
>> be Wednesday before I make any more progress.
>>
>> Andrew
>>
>> On Fri, Jan 27, 2023 at 10:40 AM Talat Uyarer <
>> tuya...@paloaltonetworks.com> wrote:
>>
>>> Hi Andrew,
>>>
>>> Yes This aligned also with my debugging. In My Kenn's reply you can see
>>> a sql test which I wrote in Calcite. Somehow Calcite does not have this
>>> issue with the 1.28 version.
>>>
>>> !use post
>>> !set outputformat mysql
>>>
>>> #Test aliases with with clause
>>> WITH tempTable(id, v) AS (select "hr"."emps"."empid" as id, 
>>> "hr"."emps"."name" as v from "hr"."emps")
>>> SELECT tempTable.id as id, tempTable.v as "value" FROM tempTable WHERE 
>>> tempTable.v <> '11' ;
>>> +-+---+
>>> | ID  | value |
>>> +-+---+
>>> | 100 | Bill  |
>>> | 110 | Theodore  |
>>> | 150 | Sebastian |
>>> | 200 | Eric  |
>>> +-+---+
>>> (4 rows)
>>>
>>> !ok
>>>
>>>
>>> On Wed, Jan 25, 2023 at 6:08 PM Andrew Pilloud 
>>> wrote:
>>>
>>>> Yes, that worked.
>>>>
>>>> The issue does not occur if I disable all of the following planner
>>>> rules: CoreRules.FILTER_CALC_MERGE, CoreRules.PROJECT_CALC_MERGE,
>>>> LogicalCalcMergeRule.INSTANCE (which wraps CoreRules.CALC_MERGE),
>>>> and BeamCalcMergeRule.INSTANCE (which wraps CoreRules.CALC_MERGE).
>>>>
>>>> All the rules share a common call to RexProgramBuilder.mergePrograms,
>>>> so I suspect the problem lies there. I spent some time looking but wasn't
>>>> able to find it by code inspection, it looks like this code path is doing
>>>> the right thing with names. I'll spend some time tomorrow trying to
>>>> reproduce this on pure Calcite.
>>>>
>>>> Andrew
>>>>
>>>>
>>>> On Tue, Jan 24, 2023 at 8:24 PM Talat Uyarer <
>>>> tuya...@paloaltonetworks.com> wrote:
>>>>
>>>>> Hi Andrew,
>>>>>
>>>>> Thanks for writing a test for this use case. Without Where clause it
>>>>> works as expected on our test cases also too. Please add where clause on
>>>>> second select. With the below query it does not return column names. I
>>>>> tested on my local also.
>>>>>
>>>>> WITH tempTable (id, v) AS (SELECT f_int as id, f_string as v FROM
>>>>> PCOLLECTION) SELECT id AS fout_int, v AS fout_string FROM tempTable WHERE
>>>>> id > 1
>>>>>
>>>>> Thanks
>>>>>

Re: Dataflow and mounting large data sets

2023-01-31 Thread Andrew Pilloud via user
I would guess that you have some existing code that expects random IO
access to the files via the Java IO or NIO interfaces (the common blocking
IO in a DoFn pattern), so using a Beam IO, which is what we recommend and
are discussing here, would be a significant rewrite?

I worked on Isilon from 6.5 - 7.2, and in those days NFS and SMB were the
high-performance options; however, these are complex protocols and userspace
implementations tend to be missing the features that enable that
performance. If you do go down the path of a userspace IO (either Beam or a
Java NIO wrapper) you'd possibly get better results with FTP, SFTP, or
HTTP. It looks like Isilon added the ability to expose the filesystem via
the S3 protocol in 9.0, and there is a Beam S3 connector, but I would be
amazed if you were on Isilon 9.0+.
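
If the Isilon S3 gateway were available, a Beam pipeline could point the S3
filesystem at it through S3Options. A rough sketch, assuming the
beam-sdks-java-io-amazon-web-services module is on the classpath; the endpoint
URL and bucket path are hypothetical:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.aws.options.S3Options;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class IsilonS3ReadExample {
  public static void main(String[] args) {
    S3Options options = PipelineOptionsFactory.fromArgs(args).as(S3Options.class);
    // Hypothetical on-prem S3 endpoint; credentials still come from the usual
    // AWS credential provider chain configured via AwsOptions.
    options.setAwsServiceEndpoint("https://isilon.example.internal:9021");
    Pipeline pipeline = Pipeline.create(options);
    // Hypothetical bucket/path exposed by the gateway.
    PCollection<String> lines =
        pipeline.apply("ReadFromIsilon", TextIO.read().from("s3://my-bucket/data/*.txt"));
    // ... downstream transforms ...
    pipeline.run().waitUntilFinish();
  }
}
```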

On Tue, Jan 31, 2023 at 3:56 PM Luke Cwik via user 
wrote:

> I would also suggest looking at NFS client implementations in Java that
> would allow you to talk to the NFS server without needing to mount it
> within the OS. A quick search yielded
> https://github.com/raisercostin/yanfs or
> https://github.com/EMCECS/nfs-client-java
>
> On Tue, Jan 31, 2023 at 3:31 PM Chad Dombrova  wrote:
>
>> Thanks for the info.  We are going to test this further and we'll let you
>> know how it goes.
>>
>> -chad
>>
>>
>> On Mon, Jan 30, 2023 at 2:14 PM Valentyn Tymofieiev 
>> wrote:
>>
>>> It applies to custom containers as well. You can find the container
>>> manifest in the GCE VM metadata, and it should have an entry for privileged
>>> mode. The reason for this was to enable GPU accelerator support, but agree
>>> with Robert that it is not part of any contracts, so in theory this could
>>> change or perhaps be more strictly limited to accelerator support. In fact,
>>> originally, this was only enabled for pipelines using accelerators but for
>>> purely internal implementation details I believe it is currently enabled
>>> for all pipelines.
>>>
>>> So for prototyping purposes I think you could try it, but I can't make
>>> any guarantees in this thread that privileged mode will continue to work.
>>>
>>> cc: @Aaron Li  FYI
>>>
>>>
>>> On Mon, Jan 30, 2023 at 12:16 PM Robert Bradshaw 
>>> wrote:
>>>
 I'm also not sure it's part of the contract that the containerization
 technology we use will always have these capabilities.

 On Mon, Jan 30, 2023 at 10:53 AM Chad Dombrova 
 wrote:
 >
 > Hi Valentyn,
 >
 >>
 >> Beam SDK docker containers on Dataflow VMs are currently launched in
 privileged mode.
 >
 >
 > Does this only apply to stock sdk containers?  I'm asking because we
 use a custom sdk container that we build.  We've tried various ways of
 running mount from within our custom beam container in Dataflow and we
 could not get it to work, while the same thing succeeds in local tests and
 in our CI (gitlab).  The assessment at the time (this was maybe a year ago)
 was that the container was not running in privileged mode, but if you think
 that's incorrect we can revisit this and report back with some error logs.
 >
 > -chad
 >

>>>


Re: Beam SQL Alias issue while using With Clause

2023-01-27 Thread Andrew Pilloud via user
Also this is at very least a Beam bug. You can file a Beam issue if you
want, otherwise I will when I get back.

Andrew

On Fri, Jan 27, 2023 at 11:27 AM Andrew Pilloud  wrote:

> Hi Talat,
>
> I did get your test case running and added some logging to
> RexProgramBuilder.mergePrograms. There is only one merge that occurs during
> the test and it has an output type of RecordType(JavaType(int) ID,
> JavaType(class java.lang.String) V). This does seem like the correct output
> name but it doesn't match the final output name, so something is still
> different than the Beam test case. I also modified mergePrograms to
> purposely corrupt the output names, that did not cause the test to fail or
> trip the 'assert mergedProg.getOutputRowType() ==
> topProgram.getOutputRowType();' in mergePrograms. I could not find any
> Calcite unit tests for RexProgramBuilder.mergePrograms or
> CoreRules.CALC_MERGE rule so I think it is still probable that the problem
> is in this area.
>
> One minor issue I encountered. It took me a while to get your test case
> running, it doesn't appear there are any calcite gradle rules to run
> CoreQuidemTest and constructing the classpath manually was tedious. Did I
> miss something?
>
> I'm still working on this but I'm out today and Monday, it will probably
> be Wednesday before I make any more progress.
>
> Andrew
>
> On Fri, Jan 27, 2023 at 10:40 AM Talat Uyarer <
> tuya...@paloaltonetworks.com> wrote:
>
>> Hi Andrew,
>>
>> Yes This aligned also with my debugging. In My Kenn's reply you can see a
>> sql test which I wrote in Calcite. Somehow Calcite does not have this issue
>> with the 1.28 version.
>>
>> !use post
>> !set outputformat mysql
>>
>> #Test aliases with with clause
>> WITH tempTable(id, v) AS (select "hr"."emps"."empid" as id, 
>> "hr"."emps"."name" as v from "hr"."emps")
>> SELECT tempTable.id as id, tempTable.v as "value" FROM tempTable WHERE 
>> tempTable.v <> '11' ;
>> +-+---+
>> | ID  | value |
>> +-+---+
>> | 100 | Bill  |
>> | 110 | Theodore  |
>> | 150 | Sebastian |
>> | 200 | Eric  |
>> +-+---+
>> (4 rows)
>>
>> !ok
>>
>>
>> On Wed, Jan 25, 2023 at 6:08 PM Andrew Pilloud 
>> wrote:
>>
>>> Yes, that worked.
>>>
>>> The issue does not occur if I disable all of the following planner
>>> rules: CoreRules.FILTER_CALC_MERGE, CoreRules.PROJECT_CALC_MERGE,
>>> LogicalCalcMergeRule.INSTANCE (which wraps CoreRules.CALC_MERGE),
>>> and BeamCalcMergeRule.INSTANCE (which wraps CoreRules.CALC_MERGE).
>>>
>>> All the rules share a common call to RexProgramBuilder.mergePrograms, so
>>> I suspect the problem lies there. I spent some time looking but wasn't able
>>> to find it by code inspection, it looks like this code path is doing the
>>> right thing with names. I'll spend some time tomorrow trying to reproduce
>>> this on pure Calcite.
>>>
>>> Andrew
>>>
>>>
>>> On Tue, Jan 24, 2023 at 8:24 PM Talat Uyarer <
>>> tuya...@paloaltonetworks.com> wrote:
>>>
>>>> Hi Andrew,
>>>>
>>>> Thanks for writing a test for this use case. Without Where clause it
>>>> works as expected on our test cases also too. Please add where clause on
>>>> second select. With the below query it does not return column names. I
>>>> tested on my local also.
>>>>
>>>> WITH tempTable (id, v) AS (SELECT f_int as id, f_string as v FROM
>>>> PCOLLECTION) SELECT id AS fout_int, v AS fout_string FROM tempTable WHERE
>>>> id > 1
>>>>
>>>> Thanks
>>>>
>>>> On Tue, Jan 24, 2023 at 5:28 PM Andrew Pilloud 
>>>> wrote:
>>>>
>>>>> +d...@beam.apache.org 
>>>>>
>>>>> I tried reproducing this but was not successful, the output schema was
>>>>> as expected. I added the following to BeamSqlMultipleSchemasTest.java at
>>>>> head. (I did discover that  
>>>>> PAssert.that(result).containsInAnyOrder(output)
>>>>> doesn't validate column names however.)
>>>>>
>>>>>   @Test
>>>>>   public void testSelectAs() {
>>>>> PCollection input = pipeline.apply(create(row(1, "strstr")));
>>>>>
>>>>> PCollection result =
>>>>> input.apply

Re: Beam SQL Alias issue while using With Clause

2023-01-27 Thread Andrew Pilloud via user
Hi Talat,

I did get your test case running and added some logging to
RexProgramBuilder.mergePrograms. There is only one merge that occurs during
the test and it has an output type of RecordType(JavaType(int) ID,
JavaType(class java.lang.String) V). This does seem like the correct output
name but it doesn't match the final output name, so something is still
different than the Beam test case. I also modified mergePrograms to
purposely corrupt the output names, that did not cause the test to fail or
trip the 'assert mergedProg.getOutputRowType() ==
topProgram.getOutputRowType();' in mergePrograms. I could not find any
Calcite unit tests for RexProgramBuilder.mergePrograms or
CoreRules.CALC_MERGE rule so I think it is still probable that the problem
is in this area.

One minor issue I encountered. It took me a while to get your test case
running, it doesn't appear there are any calcite gradle rules to run
CoreQuidemTest and constructing the classpath manually was tedious. Did I
miss something?

I'm still working on this but I'm out today and Monday, it will probably be
Wednesday before I make any more progress.

Andrew

On Fri, Jan 27, 2023 at 10:40 AM Talat Uyarer 
wrote:

> Hi Andrew,
>
> Yes This aligned also with my debugging. In My Kenn's reply you can see a
> sql test which I wrote in Calcite. Somehow Calcite does not have this issue
> with the 1.28 version.
>
> !use post
> !set outputformat mysql
>
> #Test aliases with with clause
> WITH tempTable(id, v) AS (select "hr"."emps"."empid" as id, 
> "hr"."emps"."name" as v from "hr"."emps")
> SELECT tempTable.id as id, tempTable.v as "value" FROM tempTable WHERE 
> tempTable.v <> '11' ;
> +-+---+
> | ID  | value |
> +-+---+
> | 100 | Bill      |
> | 110 | Theodore  |
> | 150 | Sebastian |
> | 200 | Eric  |
> +-+---+
> (4 rows)
>
> !ok
>
>
> On Wed, Jan 25, 2023 at 6:08 PM Andrew Pilloud 
> wrote:
>
>> Yes, that worked.
>>
>> The issue does not occur if I disable all of the following planner rules:
>> CoreRules.FILTER_CALC_MERGE, CoreRules.PROJECT_CALC_MERGE,
>> LogicalCalcMergeRule.INSTANCE (which wraps CoreRules.CALC_MERGE),
>> and BeamCalcMergeRule.INSTANCE (which wraps CoreRules.CALC_MERGE).
>>
>> All the rules share a common call to RexProgramBuilder.mergePrograms, so
>> I suspect the problem lies there. I spent some time looking but wasn't able
>> to find it by code inspection, it looks like this code path is doing the
>> right thing with names. I'll spend some time tomorrow trying to reproduce
>> this on pure Calcite.
>>
>> Andrew
>>
>>
>> On Tue, Jan 24, 2023 at 8:24 PM Talat Uyarer <
>> tuya...@paloaltonetworks.com> wrote:
>>
>>> Hi Andrew,
>>>
>>> Thanks for writing a test for this use case. Without Where clause it
>>> works as expected on our test cases also too. Please add where clause on
>>> second select. With the below query it does not return column names. I
>>> tested on my local also.
>>>
>>> WITH tempTable (id, v) AS (SELECT f_int as id, f_string as v FROM
>>> PCOLLECTION) SELECT id AS fout_int, v AS fout_string FROM tempTable WHERE
>>> id > 1
>>>
>>> Thanks
>>>
>>> On Tue, Jan 24, 2023 at 5:28 PM Andrew Pilloud 
>>> wrote:
>>>
>>>> +d...@beam.apache.org 
>>>>
>>>> I tried reproducing this but was not successful, the output schema was
>>>> as expected. I added the following to BeamSqlMultipleSchemasTest.java at
>>>> head. (I did discover that  PAssert.that(result).containsInAnyOrder(output)
>>>> doesn't validate column names however.)
>>>>
>>>>   @Test
>>>>   public void testSelectAs() {
>>>> PCollection input = pipeline.apply(create(row(1, "strstr")));
>>>>
>>>> PCollection result =
>>>> input.apply(SqlTransform.query("WITH tempTable (id, v) AS
>>>> (SELECT f_int as id, f_string as v FROM PCOLLECTION) SELECT id AS fout_int,
>>>> v AS fout_string FROM tempTable"));
>>>>
>>>> Schema output_schema =
>>>>
>>>> Schema.builder().addInt32Field("fout_int").addStringField("fout_string").build();
>>>> assertThat(result.getSchema(), equalTo(output_schema));
>>>>
>>>> Row output = Row.withSchema(output_schema).addValues(1,
>>>> "strstr").build();
>>>> PAssert.that(result).containsInAnyOrder(output

Re: Beam SQL Alias issue while using With Clause

2023-01-25 Thread Andrew Pilloud via user
Yes, that worked.

The issue does not occur if I disable all of the following planner rules:
CoreRules.FILTER_CALC_MERGE, CoreRules.PROJECT_CALC_MERGE,
LogicalCalcMergeRule.INSTANCE (which wraps CoreRules.CALC_MERGE),
and BeamCalcMergeRule.INSTANCE (which wraps CoreRules.CALC_MERGE).

All the rules share a common call to RexProgramBuilder.mergePrograms, so I
suspect the problem lies there. I spent some time looking but wasn't able
to find it by code inspection, it looks like this code path is doing the
right thing with names. I'll spend some time tomorrow trying to reproduce
this on pure Calcite.

Andrew


On Tue, Jan 24, 2023 at 8:24 PM Talat Uyarer 
wrote:

> Hi Andrew,
>
> Thanks for writing a test for this use case. Without Where clause it works
> as expected on our test cases also too. Please add where clause on second
> select. With the below query it does not return column names. I tested on
> my local also.
>
> WITH tempTable (id, v) AS (SELECT f_int as id, f_string as v FROM
> PCOLLECTION) SELECT id AS fout_int, v AS fout_string FROM tempTable WHERE
> id > 1
>
> Thanks
>
> On Tue, Jan 24, 2023 at 5:28 PM Andrew Pilloud 
> wrote:
>
>> +d...@beam.apache.org 
>>
>> I tried reproducing this but was not successful, the output schema was as
>> expected. I added the following to BeamSqlMultipleSchemasTest.java at head.
>> (I did discover that  PAssert.that(result).containsInAnyOrder(output)
>> doesn't validate column names however.)
>>
>>   @Test
>>   public void testSelectAs() {
>> PCollection input = pipeline.apply(create(row(1, "strstr")));
>>
>> PCollection result =
>> input.apply(SqlTransform.query("WITH tempTable (id, v) AS (SELECT
>> f_int as id, f_string as v FROM PCOLLECTION) SELECT id AS fout_int, v AS
>> fout_string FROM tempTable"));
>>
>> Schema output_schema =
>>
>> Schema.builder().addInt32Field("fout_int").addStringField("fout_string").build();
>> assertThat(result.getSchema(), equalTo(output_schema));
>>
>> Row output = Row.withSchema(output_schema).addValues(1,
>> "strstr").build();
>> PAssert.that(result).containsInAnyOrder(output);
>> pipeline.run();
>>   }
>>
>> On Tue, Jan 24, 2023 at 8:13 AM Talat Uyarer <
>> tuya...@paloaltonetworks.com> wrote:
>>
>>> Hi Kenn,
>>>
>>> Thank you for replying back to my email.
>>>
>>> I was under the same impression about Calcite. But I wrote a test on
>>> Calcite 1.28 too. It is working without issue that I see on BEAM
>>>
>>> Here is my test case. If you want you can also run on Calcite. Please
>>> put under core/src/test/resources/sql as text file. and Run CoreQuidemTest
>>> class.
>>>
>>> !use post
>>> !set outputformat mysql
>>>
>>> #Test aliases with with clause
>>> WITH tempTable(id, v) AS (select "hr"."emps"."empid" as id, 
>>> "hr"."emps"."name" as v from "hr"."emps")
>>> SELECT tempTable.id as id, tempTable.v as "value" FROM tempTable WHERE 
>>> tempTable.v <> '11' ;
>>> +-+---+
>>> | ID  | value |
>>> +-+---+
>>> | 100 | Bill  |
>>> | 110 | Theodore  |
>>> | 150 | Sebastian |
>>> | 200 | Eric  |
>>> +-+---+
>>> (4 rows)
>>>
>>> !ok
>>>
>>>
>>> On Mon, Jan 23, 2023 at 10:16 AM Kenneth Knowles 
>>> wrote:
>>>
>>>> Looking at the code that turns a logical CalcRel into a BeamCalcRel I
>>>> do not see any obvious cause for this:
>>>> https://github.com/apache/beam/blob/b3aa2e89489898f8c760294ba4dba2310ac53e70/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamCalcRule.java#L69
>>>>
>>>> I don't like to guess that upstream libraries have the bug, but in this
>>>> case I wonder if the alias is lost in the Calcite optimizer rule for
>>>> merging the projects and filters into a Calc.
>>>>

Re: Beam SQL Alias issue while using With Clause

2023-01-24 Thread Andrew Pilloud via user
+d...@beam.apache.org 

I tried reproducing this but was not successful, the output schema was as
expected. I added the following to BeamSqlMultipleSchemasTest.java at head.
(I did discover that  PAssert.that(result).containsInAnyOrder(output)
doesn't validate column names however.)

  @Test
  public void testSelectAs() {
PCollection input = pipeline.apply(create(row(1, "strstr")));

PCollection result =
input.apply(SqlTransform.query("WITH tempTable (id, v) AS (SELECT
f_int as id, f_string as v FROM PCOLLECTION) SELECT id AS fout_int, v AS
fout_string FROM tempTable"));

Schema output_schema =

Schema.builder().addInt32Field("fout_int").addStringField("fout_string").build();
assertThat(result.getSchema(), equalTo(output_schema));

Row output = Row.withSchema(output_schema).addValues(1,
"strstr").build();
PAssert.that(result).containsInAnyOrder(output);
pipeline.run();
  }

On Tue, Jan 24, 2023 at 8:13 AM Talat Uyarer 
wrote:

> Hi Kenn,
>
> Thank you for replying back to my email.
>
> I was under the same impression about Calcite. But I wrote a test on
> Calcite 1.28 too. It is working without issue that I see on BEAM
>
> Here is my test case. If you want you can also run on Calcite. Please put
> under core/src/test/resources/sql as text file. and Run CoreQuidemTest
> class.
>
> !use post
> !set outputformat mysql
>
> #Test aliases with with clause
> WITH tempTable(id, v) AS (select "hr"."emps"."empid" as id, 
> "hr"."emps"."name" as v from "hr"."emps")
> SELECT tempTable.id as id, tempTable.v as "value" FROM tempTable WHERE 
> tempTable.v <> '11' ;
> +-+---+
> | ID  | value |
> +-+---+
> | 100 | Bill  |
> | 110 | Theodore  |
> | 150 | Sebastian |
> | 200 | Eric  |
> +-+---+
> (4 rows)
>
> !ok
>
>
> On Mon, Jan 23, 2023 at 10:16 AM Kenneth Knowles  wrote:
>
>> Looking at the code that turns a logical CalcRel into a BeamCalcRel I do
>> not see any obvious cause for this:
>> https://github.com/apache/beam/blob/b3aa2e89489898f8c760294ba4dba2310ac53e70/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamCalcRule.java#L69
>>
>> I don't like to guess that upstream libraries have the bug, but in this
>> case I wonder if the alias is lost in the Calcite optimizer rule for
>> merging the projects and filters into a Calc.
>>
>> Kenn
>>
>> On Mon, Jan 23, 2023 at 10:13 AM Kenneth Knowles  wrote:
>>
>>> I am not sure I understand the question, but I do see an issue.
>>>
>>> Context: "CalcRel" is an optimized relational operation that is somewhat
>>> like ParDo, with a small snippet of a single-assignment DSL embedded in it.
>>> Calcite will choose to merge all the projects and filters into the node,
>>> and then generates Java bytecode to directly execute the DSL.
>>>
>>> Problem: it looks like the CalcRel has output columns with aliases "id"
>>> and "v" where it should have output columns with aliases "id" and "value".
>>>
>>> Kenn
>>>
>>> On Thu, Jan 19, 2023 at 6:01 PM Ahmet Altay  wrote:
>>>
>>>> Adding: @Andrew Pilloud  @Kenneth Knowles
>>>> 
>>>>
>>>> On Thu, Jan 12, 2023 at 12:31 PM Talat Uyarer via user <
>>>> user@beam.apache.org> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I am using Beam 2.43 with Calcite SQL with Java.
>>>>>
>>>>> I have a query with a WITH clause and some aliasing. Looks like Beam
>>>>> Query optimizer after optimizing my query, it drops Select statement's
>>>>> aliases. Can you help me to identify where the problem is ?
>>>>>
>>>>> This is my query
>>>>> INFO: SQL:
>>>>> WITH `tempTable` (`id`, `v`) AS (SELECT
>>>>> `PCOLLECTION`.`f_nestedRow`.`f_nestedInt` AS `id`,
>>>>> `PCOLLECTION`.`f_nestedRow`.`f_nestedString` AS `v`
>

Re: Dataflow SQL streaming extensions

2022-08-09 Thread Andrew Pilloud via user
Hi Marcin,

I'm having a little trouble understanding this. I think this is a
summary of your problem statement: You have a pipeline that windows
data on event time. Your event generator has an artificial 30 second
delay. The pipeline appears to be experiencing a 10-20 second delay
instead of the expected 30 second delay so you think it may be using
processing time instead of event time. You want some help
investigating the issue.

Is it possible that your clocks are not synchronised as well as you
think they are? The 30 second delay is somewhat small; does the issue
persist if you increase it to an hour?

This list isn't going to be much help in debugging Dataflow SQL
issues, you should contact GCP support for that, but we can help with
Beam SQL (which it is based on). Most Beam SQL pipelines only support
using an older syntax where the windows are in a GROUP BY clause. I
believe the GROUP BY format is supported by Dataflow SQL, can you try
that? Documentation is here:
https://beam.apache.org/documentation/dsls/sql/extensions/windowing-and-triggering/

Andrew
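
To make the GROUP BY form concrete in Beam SQL terms (the Java SDK, not the
Dataflow SQL UI), here is a rough sketch of the same 5-second average with the
window in the GROUP BY clause; the input PCollection and its column names are
assumptions carried over from the original query:

```java
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

// Assumes sensorReadings is a PCollection<Row> with an event_timestamp
// TIMESTAMP column (carrying the event time) and a sensor_value DOUBLE column.
PCollection<Row> averaged =
    sensorReadings.apply(
        "TumblingAvg",
        SqlTransform.query(
            "SELECT AVG(sensor_value) AS sensor_value_avg "
                + "FROM PCOLLECTION "
                + "GROUP BY TUMBLE(event_timestamp, INTERVAL '5' SECOND)"));
```

Each output row then corresponds to one 5-second event-time window rather than
to processing time.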


On Fri, Aug 5, 2022 at 8:15 AM Marcin Kuthan  wrote:
>
> Hi
>
>
>
> I'm experimenting with Dataflow SQL streaming extension and I observed that 
> the event_timestamp field in the payload is ignored.
>
>
>
> I would like to calculate the average value of the values reported by the 
> sensor every 5 seconds.
>
>
>
> SELECT CURRENT_TIMESTAMP() AS created_at, * FROM
>
> (SELECT
>
> s1.window_start AS window_start,
>
> s1.window_end AS window_end,
>
> MIN(event_timestamp) AS event_timestamp_min,
>
> MAX(event_timestamp) AS event_timestamp_max,
>
> AVG(s1.sensor_value) AS sensor_value_avg,
>
> FROM TUMBLE(
>
> (SELECT * FROM 
> pubsub.topic.`sc-9366-nga-dev`.`marcin-atm22-signal-1`),
>
> DESCRIPTOR(event_timestamp),
>
> "INTERVAL 5 SECOND"
>
> ) as s1
>
> GROUP BY window_start, window_end)
>
>
>
> For testing purposes sensor data is artificially generated, and 
> event_timestamp is always 30 seconds behind current time.
>
>
>
> current timestamp: 2022-08-05T15:00:24+00:00
>
> {'event_timestamp': '2022-08-05T14:59:54+00:00', 'sensor_value': 
> 0.4083962116009032}
>
>
>
> But I get the following result at 15:00:28 (the latest row stored in BQ) :
>
> [{
>
>   "created_at": "2022-08-05T15:00:20.170Z",
>
>   "window_start": "2022-08-05T15:00:05Z",
>
>   "window_end": "2022-08-05T15:00:10Z",
>
>   "event_timestamp_min": "2022-08-05T15:00:05.019Z",
>
>   "event_timestamp_max": "2022-08-05T15:00:09.035Z",
>
>   "sensor_value_avg": "0.1612730883"
>
> }]
>
>
>
> Why is there a record created at 15:00:20 with a window 15:00:05-15:00:10 if 
> the input event_time is always delayed by 30 seconds? At 15:00:20 the latest 
> emitted sensor event_timestamp is ~ 14:59:50.
>
>
>
> Moreover the watermark lag reported by dataflow is always 10-20 seconds, even 
> if the event_timestamp reported by the sensor is far behind the wallclock.
>
>
> Any ideas?
>
>
> Regards,
>
> Marcin
>
>


[ANNOUNCE] Beam 2.31.0 Released

2021-07-09 Thread Andrew Pilloud
The Apache Beam team is pleased to announce the release of version 2.31.0.

Apache Beam is an open source unified programming model to define and
execute data processing pipelines, including ETL, batch and stream
(continuous) processing.
See https://beam.apache.org

You can download the release here:
https://beam.apache.org/get-started/downloads/

This release includes bug fixes, features, and improvements detailed
on the Beam blog: https://beam.apache.org/blog/beam-2.31.0/

Thank you to everyone who contributed to this release, and we hope you
enjoy using Beam 2.31.0

-Andrew, on behalf of the Apache Beam community.


Re: A problem with calcite sql

2021-05-11 Thread Andrew Pilloud
If the type was just a nested row, this should work:
SELECT `market_transactionManagement_transactionManagers`.`email` FROM
PCOLLECTION
or this:
SELECT market_transactionManagement_transactionManagers.email FROM
PCOLLECTION

If you have exactly one element in the array something like this should
work:
SELECT market_transactionManagement_transactionManagers[1].email FROM
PCOLLECTION

If you want to extract the array, try something like this:
SELECT manager.email FROM
UNNEST(PCOLLECTION.market_transactionManagement_transactionManagers) AS
manager

On Tue, May 11, 2021 at 10:22 PM Tao Li  wrote:

> Thanks Andrew. With `id` syntax I am not seeing “Unhandled logical type
> SqlCharType” error any more. This is great progress!
>
>
>
> However I am still seeing an issue by querying a composite field. Below is
> the schema of the array type field:
>
>
>
> Field{name=market_transactionManagement_transactionManagers, description=,
> type=ARRAY>, options={{}}}
>
>
>
> My sql query is selecting a nested field: SELECT
> `market_transactionManagement_transactionManagers.email` FROM PCOLLECTION
>
>
>
> Error:
>
>
>
> Caused by:
> org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorException:
> Column 'market_transactionManagement_transactionManagers.email' not found
> in any table
>
>
>
> So what would be the right syntax? Thanks!
>
>
>
> *From: *Andrew Pilloud 
> *Date: *Tuesday, May 11, 2021 at 11:51 AM
> *To: *Tao Li 
> *Cc: *"user@beam.apache.org" , Yuan Feng <
> yua...@zillowgroup.com>
> *Subject: *Re: A problem with calcite sql
>
>
>
> SELECT CAST('CAST(id AS VARCHAR)' AS VARCHAR) FROM PCOLLECTION works for
> me, but I don't think that is what you wanted. Note that ' is for string
> literals and ` is for escaping names in Beam SQL's default dialect config.
>
>
>
> Try:
>
> SELECT `id` FROM PCOLLECTION
>
>
>
> On Tue, May 11, 2021 at 10:58 AM Tao Li  wrote:
>
> @Andrew Pilloud  thanks for your suggestions. I
> tried CAST and TRIM but it did not work:
>
>
>
> Sql Stmt I am using: SELECT 'CAST(id AS VARCHAR)' FROM PCOLLECTION
>
>
>
> Logs:
>
>
>
> [main] INFO org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner -
> SQL:
>
> SELECT 'CAST(id AS VARCHAR)'
>
> FROM `beam`.`PCOLLECTION` AS `PCOLLECTION`
>
> [main] INFO org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner -
> SQLPlan>
>
> LogicalProject(EXPR$0=['CAST(id AS VARCHAR)'])
>
>   BeamIOSourceRel(table=[[beam, PCOLLECTION]])
>
>
>
> [main] INFO org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner -
> BEAMPlan>
>
> BeamCalcRel(expr#0..44=[{inputs}], expr#45=['CAST(id AS VARCHAR)'],
> EXPR$0=[$t45])
>
>   BeamIOSourceRel(table=[[beam, PCOLLECTION]])
>
>
>
> Exception in thread "main" java.lang.RuntimeException: Unhandled logical
> type SqlCharType
>
> at
> org.apache.beam.sdk.schemas.utils.AvroUtils.getFieldSchema(AvroUtils.java:911)
>
> at
> org.apache.beam.sdk.schemas.utils.AvroUtils.toAvroField(AvroUtils.java:306)
>
> at
> org.apache.beam.sdk.schemas.utils.AvroUtils.toAvroSchema(AvroUtils.java:341)
>
> at
> org.apache.beam.sdk.schemas.utils.AvroUtils.toAvroSchema(AvroUtils.java:348)
>
>
>
> *From: *Andrew Pilloud 
> *Reply-To: *"user@beam.apache.org" 
> *Date: *Monday, May 10, 2021 at 7:46 PM
> *To: *user 
> *Cc: *Yuan Feng 
> *Subject: *Re: A problem with calcite sql
>
>
>
> For the first one you have https://issues.apache.org/jira/browse/BEAM-5251
>
> For the second, I opened a new issue for you:
> https://issues.apache.org/jira/browse/BEAM-12323
>
>
>
> Your second issue is because our Avro conversion library doesn't know how
> to handle fixed length strings. These normally show up in SQL when you are
> outputting 

Re: A problem with calcite sql

2021-05-11 Thread Andrew Pilloud
SELECT CAST('CAST(id AS VARCHAR)' AS VARCHAR) FROM PCOLLECTION works for
me, but I don't think that is what you wanted. Note that ' is for string
literals and ` is for escaping names in Beam SQL's default dialect config.

Try:

SELECT `id` FROM PCOLLECTION

On Tue, May 11, 2021 at 10:58 AM Tao Li  wrote:

> @Andrew Pilloud  thanks for your suggestions. I
> tried CAST and TRIM but it did not work:
>
>
>
> Sql Stmt I am using: SELECT 'CAST(id AS VARCHAR)' FROM PCOLLECTION
>
>
>
> Logs:
>
>
>
> [main] INFO org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner -
> SQL:
>
> SELECT 'CAST(id AS VARCHAR)'
>
> FROM `beam`.`PCOLLECTION` AS `PCOLLECTION`
>
> [main] INFO org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner -
> SQLPlan>
>
> LogicalProject(EXPR$0=['CAST(id AS VARCHAR)'])
>
>   BeamIOSourceRel(table=[[beam, PCOLLECTION]])
>
>
>
> [main] INFO org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner -
> BEAMPlan>
>
> BeamCalcRel(expr#0..44=[{inputs}], expr#45=['CAST(id AS VARCHAR)'],
> EXPR$0=[$t45])
>
>   BeamIOSourceRel(table=[[beam, PCOLLECTION]])
>
>
>
> Exception in thread "main" java.lang.RuntimeException: Unhandled logical
> type SqlCharType
>
> at
> org.apache.beam.sdk.schemas.utils.AvroUtils.getFieldSchema(AvroUtils.java:911)
>
> at
> org.apache.beam.sdk.schemas.utils.AvroUtils.toAvroField(AvroUtils.java:306)
>
> at
> org.apache.beam.sdk.schemas.utils.AvroUtils.toAvroSchema(AvroUtils.java:341)
>
> at
> org.apache.beam.sdk.schemas.utils.AvroUtils.toAvroSchema(AvroUtils.java:348)
>
>
>
> *From: *Andrew Pilloud 
> *Reply-To: *"user@beam.apache.org" 
> *Date: *Monday, May 10, 2021 at 7:46 PM
> *To: *user 
> *Cc: *Yuan Feng 
> *Subject: *Re: A problem with calcite sql
>
>
>
> For the first one you have https://issues.apache.org/jira/browse/BEAM-5251
>
> For the second, I opened a new issue for you:
> https://issues.apache.org/jira/browse/BEAM-12323
>
>
>
> Your second issue is because our Avro conversion library doesn't know how
> to handle fixed length strings. These normally show up in SQL when you are
> outputting a constant. I'm not sure exactly how to work around it, if you
> can get the output type to be a VARCHAR (instead of CHAR) this problem will
> go away. You might be able to do something like 'CAST("Your String Literal"
> AS VARCHAR)' , 'TRIM("Your String Literal")' or ' "Your String Literal" ||
> "" '.
>
>
>
> On Mon, May 10, 2021 at 7:25 PM Tao Li  wrote:
>
> Sorry to bug with another question. I was saving a data set with below
> schema (this dataset comes from sql query). Saw the SqlCharType issue.
> Did anyone see this issue before?
>
>
>
> [main] INFO com.zillow.pipeler.core.transform.DatasetFlattenerCore -
> Fields:
>
> Field{name=id, description=, type=LOGICAL_TYPE NOT NULL, options={{}}}
>
> Field{name=user_tmp, description=, type=LOGICAL_TYPE NOT NULL,
> options={{}}}
>
> Field{name=market_name, description=, type=LOGICAL_TYPE NOT NULL,
> options={{}}}
>
> Field{name=market_transactionManagement_transactionManagers_email,
> description=, type=LOGICAL_TYPE NOT NULL, options={{}}}
>
> Field{name=market_transactionManagement_transactionManagers_name,
> description=, type=LOGICAL_TYPE NOT NULL, options={{}}}
>
> Field{name=market_transactionManagement_transactionProfileId,
> description=, type=LOGICAL_TYPE NOT NULL, options={{}}}
>
> Options:{{}}
>
> Exception in thread "main" java.lang.RuntimeException: Unhandled logical
> type SqlCharType
>
> at
> org.apache.beam.sdk.schemas.utils.AvroUtils.getFieldSchema(AvroUtils.java:911)
>
> at
> org.apache.beam.sdk.schemas.utils.AvroUtils.toAvroField(AvroUtils.java:306)
>
> at
> org.apach

Re: A problem with calcite sql

2021-05-10 Thread Andrew Pilloud
For the first one you have https://issues.apache.org/jira/browse/BEAM-5251
For the second, I opened a new issue for you:
https://issues.apache.org/jira/browse/BEAM-12323

Your second issue is because our Avro conversion library doesn't know how
to handle fixed length strings. These normally show up in SQL when you are
outputting a constant. I'm not sure exactly how to work around it; if you
can get the output type to be a VARCHAR (instead of CHAR) this problem will
go away. You might be able to do something like 'CAST("Your String Literal"
AS VARCHAR)' , 'TRIM("Your String Literal")' or ' "Your String Literal" ||
"" '.

On Mon, May 10, 2021 at 7:25 PM Tao Li  wrote:

> Sorry to bug with another question. I was saving a data set with below
> schema (this dataset comes from sql query). Saw the SqlCharType issue.
> Did anyone see this issue before?
>
>
>
> [main] INFO com.zillow.pipeler.core.transform.DatasetFlattenerCore -
> Fields:
>
> Field{name=id, description=, type=LOGICAL_TYPE NOT NULL, options={{}}}
>
> Field{name=user_tmp, description=, type=LOGICAL_TYPE NOT NULL,
> options={{}}}
>
> Field{name=market_name, description=, type=LOGICAL_TYPE NOT NULL,
> options={{}}}
>
> Field{name=market_transactionManagement_transactionManagers_email,
> description=, type=LOGICAL_TYPE NOT NULL, options={{}}}
>
> Field{name=market_transactionManagement_transactionManagers_name,
> description=, type=LOGICAL_TYPE NOT NULL, options={{}}}
>
> Field{name=market_transactionManagement_transactionProfileId,
> description=, type=LOGICAL_TYPE NOT NULL, options={{}}}
>
> Options:{{}}
>
> Exception in thread "main" java.lang.RuntimeException: Unhandled logical
> type SqlCharType
>
> at
> org.apache.beam.sdk.schemas.utils.AvroUtils.getFieldSchema(AvroUtils.java:911)
>
> at
> org.apache.beam.sdk.schemas.utils.AvroUtils.toAvroField(AvroUtils.java:306)
>
> at
> org.apache.beam.sdk.schemas.utils.AvroUtils.toAvroSchema(AvroUtils.java:341)
>
> at
> org.apache.beam.sdk.schemas.utils.AvroUtils.toAvroSchema(AvroUtils.java:348)
>
>
>
>
>
> *From: *Tao Li 
> *Reply-To: *"user@beam.apache.org" 
> *Date: *Monday, May 10, 2021 at 7:19 PM
> *To: *"user@beam.apache.org" 
> *Cc: *Yuan Feng 
> *Subject: *Re: A problem with calcite sql
>
>
>
> Never mind. Looks like “user” is a reserved name.
>
>
>
> *From: *Tao Li 
> *Reply-To: *"user@beam.apache.org" 
> *Date: *Monday, May 10, 2021 at 7:10 PM
> *To: *"user@beam.apache.org" 
> *Cc: *Yuan Feng 
> *Subject: *A problem with calcite sql
>
>
>
> Hi Beam community,
>
>
>
> I am seeing a weird issue by using calcite sql. I don’t understand why
> it’s complaining my query is not valid. Once I removed “user AS user”, it
> worked fine. Please advise. Thanks.
>
>
>
> Exception in thread "main"
> org.apache.beam.sdk.extensions.sql.impl.ParseException: Unable to parse
> query SELECT id AS id, user AS user, market_name AS market_name,
> market_transactionManagement_transactionManagers.email AS
> market_transactionManagement_transactionManagers_email,
> market_transactionManagement_transactionManagers.name AS
> market_transactionManagement_transactionManagers_name,
> market_transactionManagement_transactionProfileId AS
> market_transactionManagement_transactionProfileId FROM PCOLLECTION
>
> at
> org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner.convertToBeamRel(CalciteQueryPlanner.java:214)
>
> at
> org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.parseQuery(BeamSqlEnv.java:111)
>
> at
> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:171)
>
> at
> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:109)
>
> at
> org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:547)
>
> at
> org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:498)
>
> at
> org.apache.beam.sdk.values.PCollection.apply(PCollection.java:370)
>
> at
> com.zillow.pipeler.core.transform.DatasetFlattenerCore.updateSchemaBasedOnAvroSchema(DatasetFlattenerCore.java:85)
>
> at
> com.zillow.pipeler.core.transform.DatasetFlattenerCore.execute(DatasetFlattenerCore.java:61)
>
> at
> com.zillow.pipeler.core.transform.DatasetFlattenerCore.execute(DatasetFlattenerCore.java:29)
>
> at
> com.zillow.pipeler.orchestrator.BaseOrchestrator.run(BaseOrchestrator.java:61)
>
> at
> com.zillow.pipeler.orchestrator.transform.DatasetFlattenerOrchestrator.main(DatasetFlattenerOrchestrator.java:71)
>
> Caused by:
> org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.parser.SqlParseException:
> Encountered "AS user" at line 1, column 23.
>
> Was expecting one of:
>
> 
>
> "ORDER" ...
>
> "LIMIT" ...
>


Re: How Beam SQL Side Input refresh/update

2021-05-07 Thread Andrew Pilloud
We call this a Slowly Changing Dimensions join; there was a previous effort
to add this to Beam that is partially implemented in Java. Unfortunately we
haven't finished the work on the SQL side, and I haven't looked into what is
involved to finish it. It might be possible to do this just using
PeriodicImpulse to refresh your static dataset, but it might also require
changes to SQL in BeamSideInputJoinRel.

You could also take a look at this stack overflow post for ideas:
https://stackoverflow.com/questions/41570276/joining-a-stream-against-a-table-in-dataflow

Andrew
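
As a non-SQL starting point for the PeriodicImpulse-style workaround, the usual
pattern is a slowly refreshed singleton side input. A rough sketch, where
`pipeline`, the 5-minute refresh interval, and the loadStaticData() helper are
assumptions:

```java
import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GenerateSequence;
import org.apache.beam.sdk.transforms.Latest;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;

// Re-read the static/enrichment table every 5 minutes and expose the latest
// snapshot as a singleton side input for the Kafka-processing DoFn.
PCollectionView<Map<String, String>> enrichmentView =
    pipeline
        .apply("Tick", GenerateSequence.from(0).withRate(1, Duration.standardMinutes(5)))
        .apply("RetriggerGlobally",
            Window.<Long>into(new GlobalWindows())
                .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes())
        .apply("LoadSnapshot", ParDo.of(new DoFn<Long, Map<String, String>>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            c.output(loadStaticData()); // hypothetical: fetch the current table
          }
        }))
        .apply("KeepLatest", Latest.globally())
        .apply("AsSideInput", View.asSingleton());
```

The main Kafka-reading ParDo can then take the view via withSideInputs and do
the lookup per element; note the side input refreshes on processing time, so
this only approximates a true slowly-changing-dimension join.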

On Fri, May 7, 2021 at 9:45 AM Talat Uyarer 
wrote:

> Hi,
>
> Based on Join documentation. If I have a Join with Unbounded and Bounded
>
>> For this type of JOIN bounded input is treated as a side-input by the
>> implementation. This means that window/trigger is inherented from upstreams.
>
>
> On my pipeline I dont have any triggering or window. I use a global window
> on the Unbounded side. Basically I read from kafka  data and I want to join
> with static data to enrich the kafka message. Not very frequently I want to
> update my static data. I am trying to understand How i can update when I
> update my static data.
>
> Thanks
>


Re: Query regarding support for ROLLUP

2021-05-06 Thread Andrew Pilloud
I'm not familiar with the semantics of ROLLUP, but the expected results look
like the output of this query, which might work:
select warehouse, SUM(quantity) as quantity from PCOLLECTION group by
warehouse
UNION select 'Warehouse_Total', SUM(quantity) as quantity from PCOLLECTION

On Thu, May 6, 2021 at 7:07 AM D, Anup (Nokia - IN/Bangalore) <
anu...@nokia.com> wrote:

> Thank you Brian, Andrew for your response.
>
> Do you see any alternatives currently in Beam SQL that could be used to
> achieve this ?
>
>
>
> *From:* Andrew Pilloud 
> *Sent:* Wednesday, May 5, 2021 10:39 PM
> *To:* Brian Hulette 
> *Cc:* user 
> *Subject:* Re: Query regarding support for ROLLUP
>
>
>
> I can confirm we don't have anything in Beam to support ROLLUP, it is
> silently dropped. I opened
> https://issues.apache.org/jira/browse/BEAM-12288, support needs to be
> implemented
> in 
> sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
>  For now I'll remove it from our docs and update the planner to reject the
> query (https://issues.apache.org/jira/browse/BEAM-12289).
>
>
>
> On Wed, May 5, 2021 at 8:16 AM Brian Hulette  wrote:
>
> +Andrew Pilloud  do you know if this is a bug?
>
>
>
> On Tue, May 4, 2021 at 7:38 AM D, Anup (Nokia - IN/Bangalore) <
> anu...@nokia.com> wrote:
>
> Hi All,
>
>
>
> I was trying to use “GROUP BY WITH ROLLUP” (2.29.0 version) which I saw
> here -
> https://beam.apache.org/documentation/dsls/sql/calcite/query-syntax/#
> <https://beam.apache.org/documentation/dsls/sql/calcite/query-syntax/>
>
>
>
> "select warehouse, SUM(quantity) as quantity from PCOLLECTION group by
> ROLLUP(warehouse)"));
>
>
>
> Warehouse | quantity
> ----------+---------
> Melbourne |      100
> New York  |      200
> New York  |      200
>
>
>
> Output below seems to ignore ROLLUP.
>
>
>
> Warehouse | quantity
> ----------+---------
> Melbourne |      100
> New York  |      400
>
> *Warehouse_Total | 500 => not generated*
>
>
>
> Could you please confirm if this is supported or I am missing something.
>
> I tried to search JIRA/documentation to get some pointers but could not
> find.
>
>
>
> Thanks
>
> Anup
>
>


Re: Query regarding support for ROLLUP

2021-05-05 Thread Andrew Pilloud
I can confirm we don't have anything in Beam to support ROLLUP, it is
silently dropped. I opened https://issues.apache.org/jira/browse/BEAM-12288,
support needs to be implemented
in 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
 For now I'll remove it from our docs and update the planner to reject the
query (https://issues.apache.org/jira/browse/BEAM-12289).

On Wed, May 5, 2021 at 8:16 AM Brian Hulette  wrote:

> +Andrew Pilloud  do you know if this is a bug?
>
> On Tue, May 4, 2021 at 7:38 AM D, Anup (Nokia - IN/Bangalore) <
> anu...@nokia.com> wrote:
>
>> Hi All,
>>
>>
>>
>> I was trying to use “GROUP BY WITH ROLLUP” (2.29.0 version) which I saw
>> here -
>> https://beam.apache.org/documentation/dsls/sql/calcite/query-syntax/#
>>
>>
>>
>> "select warehouse, SUM(quantity) as quantity from PCOLLECTION group by
>> ROLLUP(warehouse)"));
>>
>>
>>
>> Warehouse | quantity
>> ----------+---------
>> Melbourne |      100
>> New York  |      200
>> New York  |      200
>>
>>
>>
>> Output below seems to ignore ROLLUP.
>>
>>
>>
>> Warehouse | quantity
>> ----------+---------
>> Melbourne |      100
>> New York  |      400
>>
>> *Warehouse_Total | 500 => not generated*
>>
>>
>>
>> Could you please confirm if this is supported or I am missing something.
>>
>> I tried to search JIRA/documentation to get some pointers but could not
>> find.
>>
>>
>>
>> Thanks
>>
>> Anup
>>
>


Re: Select one field from an array of nested rows in beam SQL

2021-02-25 Thread Andrew Pilloud
There is some recent work to improve unnest that went into Beam 2.25.0+, it
might cover your use case. It looks like we have no support for the Collect
operator, which is your problem here. I verified 'SELECT ARRAY(SELECT f_int
FROM PCOLLECTION)' doesn't work and filed
https://issues.apache.org/jira/browse/BEAM-11872

For the UDF side of things, we haven't put much work into making nested
rows work well with UDFs. WrappedRow is intended to be an internal wrapper
for BeamCalcRel, we should probably be passing a schema Row, which gives
you access to fields by name. I filed a JIRA for this:
https://issues.apache.org/jira/browse/BEAM-11871

Andrew
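
As an untested aside: with the newer unnest support, a flattened query (one
output row per array element, instead of re-collecting an ARRAY column) is
the form most likely to plan. Whether the planner accepts it depends on the
Beam version; field names follow the BigQuery example in the question below:

// Untested sketch: flatten instead of rebuilding an ARRAY in the select list.
PCollection<Row> flattened =
    input.apply(
        SqlTransform.query(
            "SELECT p.otherScalarField, t.row_field1 "
                + "FROM PCOLLECTION AS p "
                + "CROSS JOIN UNNEST(p.array_of_nested_rows) AS t"));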

On Wed, Feb 24, 2021 at 10:33 PM Zhiheng Huang 
wrote:

> Hi beam users,
>
> We have a use case where we have a schema such as:
>
> Schema.of(
>     Field.of("array_of_nested_rows",
>         FieldType.array(FieldType.row(
>             Schema.of(Field.of("row_field1", FieldType.INT32),
>                       Field.of("otherScalarField", FieldType.STRING))))))
>
> We would like to select "array_of_nested_rows.row_field1" as a list of
> ints together with "otherScalarField" as the output. For example, in
> BigQuery we can achieve this with:
>
> SELECT
>   otherScalarField,
>   ARRAY(SELECT row_field1 FROM UNNEST(array_of_nested_rows))
> FROM
>   table
>
> Trying this query with beam SQL yields:
>
> Unable to convert query select array(select score from
> UNNEST(Yt8mAnnotation)) from PCOLLECTION
> org.apache.beam.sdk.extensions.sql.impl.SqlConversionException: Unable to
> convert query select array(select score from UNNEST(Yt8mAnnotation)) from
> PCOLLECTION
> at
> org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner.convertToBeamRel(CalciteQueryPlanner.java:181)
> at
> org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.parseQuery(BeamSqlEnv.java:109)
> at
> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:135)
> at
> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:86)
> ...
>
> Caused by:
> org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner$CannotPlanException:
> There are not enough rules to produce a node with desired properties:
> convention=BEAM_LOGICAL.
> Missing conversion is LogicalCorrelate[convention: NONE -> BEAM_LOGICAL]
> There is 1 empty subset: rel#63:Subset#6.BEAM_LOGICAL, the relevant part
> of the original plan is as follows
> 56:LogicalCorrelate(correlation=[$cor0], joinType=[inner],
> requiredColumns=[{0}])
>   8:BeamIOSourceRel(subset=[rel#46:Subset#0.BEAM_LOGICAL], table=[[beam,
> PCOLLECTION]])
>   54:Collect(subset=[rel#55:Subset#5.NONE], field=[EXPR$0])
> 52:LogicalProject(subset=[rel#53:Subset#4.NONE], score=[$2])
>   50:Uncollect(subset=[rel#51:Subset#3.NONE])
> ...
>
> We have also tried to define a UDF that takes in array_of_nested_rows.
> This doesn't work out either because the input param passed into the UDF
> eval function is a list of WrappedRow, which doesn't allow us to query
> field value with its name. It only supports
> getting the field value given an index. This is useless for us since we do
> not know how to get the row field schema in the eval function.
>
>  Do you have any suggestions about how to achieve this? We are using beam
> 2.22.0
>
> Thanks a lot!
>


Re: About Beam SQL Schema Changes and Code generation

2020-12-08 Thread Andrew Pilloud
We could support EXCEPT statements in proposal 2 as long as we restricted
it to known fields.

We are getting into implementation details now. Making unknown fields just
a normal column introduces a number of problems. ZetaSQL doesn't support
Map type. All our IOs would need to explicitly deal with that special
column. There would be a lack of consistency between the various types
(Avro, Proto, Json) which should all support this.

We might also want something even more invasive: everything is an unknown
field unless it is referenced in the SQL query. All of these options are
possible. I guess we need someone who has time to work on it to write a
proposal.
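
Purely as an illustration of proposal 2 at the schema level (the field name
and types here are invented): the inferred schema would carry one extra,
opt-in map-valued column that IOs fill with the raw bytes of unrecognized
fields and copy back out when re-encoding to Avro.

// Invented example of an inferred schema plus an opt-in unknown-fields column.
Schema inferred =
    Schema.builder()
        .addStringField("id")
        .addInt64Field("quantity")
        .addMapField("f_unknownFields", Schema.FieldType.STRING, Schema.FieldType.BYTES)
        .build();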

On Tue, Dec 8, 2020 at 10:03 AM Reuven Lax  wrote:

> I'm not sure that we could support EXCEPT statements, as that would
> require introspecting the unknown fields (what if the EXCEPT statement
> matches a field that later is added as an unknown field?). IMO this sort of
> behavior only makes sense on true pass-through queries. Anything that
> modifies the input record would be tricky to support.
>
> Nested rows would work for proposal 2. You would need to make sure that
> the unknown-fields map is recursively added to all nested rows, and you
> would do this when you infer a schema from the avro schema.
>
> On Tue, Dec 8, 2020 at 9:58 AM Andrew Pilloud  wrote:
>
>> Proposal 1 would also interact poorly with SELECT * EXCEPT ...
>> statements, which returns all columns except specific ones. Adding an
>> unknown field does seem like a reasonable way to handle this. It probably
>> needs to be something that is native to the Row type, so columns added to
>> nested rows also work.
>>
>> Andrew
>>
>> On Tue, Dec 8, 2020 at 9:50 AM Reuven Lax  wrote:
>>
>>> There's a difference between a fully dynamic schema and simply being
>>> able to forward "unknown" fields to the output.
>>>
>>> A fully-dynamic schema is not really necessary unless we also had
>>> dynamic SQL statements. Since the existing SQL statements do not reference
>>> the new fields by name, there's no reason to add them to the main schema.
>>>
>>> However, if you have a SELECT * FROM WHERE  statement that does no
>>> aggregation, there's fundamentally no reason we couldn't forward the
>>> messages exactly. In theory we could forward the exact bytes that are in
>>> the input PCollection, which would necessarily forward the new fields. In
>>> practice I believe that we convert the input messages to Beam Row objects
>>> in order to evaluate the WHERE clause, and then convert back to Avro to
>>> output those messages. I believe this is where we "lose" the unknown
>>> messages,but this is an implementation artifact - in theory we could output
>>> the original bytes whenever we see a SELECT *. This is not truly a dynamic
>>> schema, since you can't really do anything with these extra fields except
>>> forward them to your output.
>>>
>>> I see two possible ways to address this.
>>>
>>> 1. As I mentioned above, in the case of a SELECT * we could output the
>>> original bytes, and only use the Beam Row for evaluating the WHERE clause.
>>> This might be very expensive though - we risk having to keep two copies of
>>> every message around, one in the original Avro format and one in Row format.
>>>
>>> 2. The other way would be to do what protocol buffers do. We could add
>>> one extra field to the inferred Beam schema to store new, unknown fields
>>> (probably this would be a map-valued field). This extra field would simply
>>> store the raw bytes of these unknown fields, and then when converting back
>>> to Avro they would be added to the output message. This might also add some
>>> overhead to the pipeline, so might be best to make this behavior opt in.
>>>
>>> Reuven
>>>
>>> On Tue, Dec 8, 2020 at 9:33 AM Brian Hulette 
>>> wrote:
>>>
>>>> Reuven, could you clarify what you have in mind? I know multiple times
>>>> we've discussed the possibility of adding update compatibility support to
>>>> SchemaCoder, including support for certain schema changes (field
>>>> additions/deletions) - I think the most recent discussion was here [1].
>>>>
>>>> But it sounds like Talat is asking for something a little beyond that,
>>>> effectively a dynamic schema. Is that something you think we can support?
>>>>
>>>> [1]
>>>> https://lists.apache.org/thread.html/ref73a8c40e24e0b038b4e5b065cd502f4c5df2e5e15af6f7ea1cdaa7%40%3Cdev.beam.apache.or

Re: About Beam SQL Schema Changes and Code generation

2020-12-08 Thread Andrew Pilloud
>>>>>> - SQL Projection with or without Where clause  SELECT col1, col2 FROM
>>>>>> PCOLLECTION
>>>>>>
>>>>>> We know writerSchema for each message. While deserializing avro
>>>>>> binary we use writer schema and reader schema on Convert Avro Bytes to 
>>>>>> Beam
>>>>>> Row step. It always produces a reader schema's generic record and we
>>>>>> convert that generic record to Row.
>>>>>> While submitting DF job we use latest schema to generate beamSchema.
>>>>>>
>>>>>> In the current scenario When we have schema changes first we restart
>>>>>> all 15k jobs with the latest updated schema then whenever we are done we
>>>>>> turn on the latest schema for writers. Because of Avro's 
>>>>>> GrammerResolver[1]
>>>>>> we read different versions of the schema and we always produce the latest
>>>>>> schema's record. Without breaking our pipeline we are able to handle
>>>>>> multiple versions of data in the same streaming pipeline. If we can
>>>>>> generate SQL's java code when we get notified with the latest schema we will
>>>>>> handle all schema changes. The only remaining obstacle is Beam's SQL Java
>>>>>> code. That's why I am looking for some solution. We dont need multiple
>>>>>> versions of SQL. We only need to regenerate SQL schema with the latest
>>>>>> schema on the fly.
>>>>>>
>>>>>> I hope I can explain it :)
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> [1]
>>>>>> https://avro.apache.org/docs/1.7.2/api/java/org/apache/avro/io/parsing/doc-files/parsing.html
>>>>>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__avro.apache.org_docs_1.7.2_api_java_org_apache_avro_io_parsing_doc-2Dfiles_parsing.html=DwMFaQ=V9IgWpI5PvzTw83UyHGVSoW3Uc1MFWe5J8PTfkrzVSo=BkW1L6EF7ergAVYDXCo-3Vwkpy6qjsWAz7_GD7pAR8g=0qahAe7vDisJq_hMYGY8F-Bp7-_5lOwOKzNoQ3r3-IQ=lwwIMsJO9nmj6_xZcSG_7qkBIaxOwyUXry4st1q70Rc=>
>>>>>>
>>>>>> On Mon, Dec 7, 2020 at 7:49 PM Reuven Lax  wrote:
>>>>>>
>>>>>>> Can you explain the use case some more? Are you wanting to change
>>>>>>> your SQL statement as well when the schema changes? If not, what are 
>>>>>>> those
>>>>>>> new fields doing in the pipeline? What I mean is that your old SQL
>>>>>>> statement clearly didn't reference those fields in a SELECT statement 
>>>>>>> since
>>>>>>> they didn't exist, so what are you missing by not having them unless you
>>>>>>> are also changing the SQL statement?
>>>>>>>
>>>>>>> Is this a case where you have a SELECT *, and just want to make sure
>>>>>>> those fields are included?
>>>>>>>
>>>>>>> Reuven
>>>>>>>
>>>>>>> On Mon, Dec 7, 2020 at 6:31 PM Talat Uyarer <
>>>>>>> tuya...@paloaltonetworks.com> wrote:
>>>>>>>
>>>>>>>> Hi Andrew,
>>>>>>>>
>>>>>>>> I assume SQL query is not going to change. Changing things is the
>>>>>>>> Row schema by adding new columns or rename columns. if we keep a 
>>>>>>>> version
>>>>>>>> information on somewhere for example a KV pair. Key is schema 
>>>>>>>> information,
>>>>>>>> value is Row. Can not we generate SQL code ? Why I am asking We have 
>>>>>>>> 15k
>>>>>>>> pipelines. When we have a schema change we restart a 15k DF job which 
>>>>>>>> is
>>>>>>>> pain. I am looking for a possible way to avoid job restart. Dont you 
>>>>>>>> think
>>>>>>>> it is not still doable ?
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Dec 7, 2020 at 6:10 PM Andrew Pilloud 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Unfortunately we don't have a way to generate the SQL Java code on
>>>>>>>>> the fly, even if we did, that wouldn't solve your problem. I believe 
>>>>>>>>> our
>>>>>>>>> recommended practice is to run both the old and new pipeline for some 
>>>>>>>>> time,
>>>>>>>>> then pick a window boundary to transition the output from the old 
>>>>>>>>> pipeline
>>>>>>>>> to the new one.
>>>>>>>>>
>>>>>>>>> Beam doesn't handle changing the format of data sent between
>>>>>>>>> intermediate steps in a running pipeline. Beam uses "coders" to 
>>>>>>>>> serialize
>>>>>>>>> data between steps of the pipeline. The builtin coders (including the
>>>>>>>>> Schema Row Coder used by SQL) have a fixed data format and don't 
>>>>>>>>> handle
>>>>>>>>> schema evolution. They are optimized for performance at all costs.
>>>>>>>>>
>>>>>>>>> If you worked around this, the Beam model doesn't support changing
>>>>>>>>> the structure of the pipeline graph. This would significantly limit 
>>>>>>>>> the
>>>>>>>>> changes you can make. It would also require some changes to SQL to 
>>>>>>>>> try to
>>>>>>>>> produce the same plan for an updated SQL query.
>>>>>>>>>
>>>>>>>>> Andrew
>>>>>>>>>
>>>>>>>>> On Mon, Dec 7, 2020 at 5:44 PM Talat Uyarer <
>>>>>>>>> tuya...@paloaltonetworks.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> We are using Beamsql on our pipeline. Our Data is written in Avro
>>>>>>>>>> format. We generate our rows based on our Avro schema. Over time the 
>>>>>>>>>> schema
>>>>>>>>>> is changing. I believe Beam SQL generates Java code based on what we 
>>>>>>>>>> define
>>>>>>>>>> as BeamSchema while submitting the pipeline. Do you have any idea 
>>>>>>>>>> How can
>>>>>>>>>> we handle schema changes with resubmitting our beam job. Is it 
>>>>>>>>>> possible to
>>>>>>>>>> generate SQL java code on the fly ?
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>>
>>>>>>>>>


Re: About Beam SQL Schema Changes and Code generation

2020-12-07 Thread Andrew Pilloud
Unfortunately we don't have a way to generate the SQL Java code on the fly,
even if we did, that wouldn't solve your problem. I believe our recommended
practice is to run both the old and new pipeline for some time, then pick a
window boundary to transition the output from the old pipeline to the new
one.

Beam doesn't handle changing the format of data sent between intermediate
steps in a running pipeline. Beam uses "coders" to serialize data between
steps of the pipeline. The builtin coders (including the Schema Row Coder
used by SQL) have a fixed data format and don't handle schema evolution.
They are optimized for performance at all costs.

If you worked around this, the Beam model doesn't support changing the
structure of the pipeline graph. This would significantly limit the changes
you can make. It would also require some changes to SQL to try to produce
the same plan for an updated SQL query.

Andrew

On Mon, Dec 7, 2020 at 5:44 PM Talat Uyarer 
wrote:

> Hi,
>
> We are using Beamsql on our pipeline. Our Data is written in Avro format.
> We generate our rows based on our Avro schema. Over time the schema is
> changing. I believe Beam SQL generates Java code based on what we define as
> BeamSchema while submitting the pipeline. Do you have any idea How can we
> handle schema changes with resubmitting our beam job. Is it possible to
> generate SQL java code on the fly ?
>
> Thanks
>


Re: Beam-SQL: How to create a UDF that returns a Map ?

2020-02-07 Thread Andrew Pilloud
Thanks for reporting and finding the root cause! Last I heard Calcite was
going to start a release shortly. We plan to update once the next version
is out.

Andrew
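
For context, a UDF of the shape discussed below is registered on the query
with SqlTransform.registerUdf. A minimal sketch; the function name, the
"name" field, and the input PCollection are invented:

// Sketch: register the String-returning UDF from the message below and call it.
PCollection<Row> result =
    rows.apply(
        SqlTransform.query("SELECT reverse_str(name) AS rev FROM PCOLLECTION")
            .registerUdf("reverse_str", new BarString()));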

On Fri, Feb 7, 2020 at 4:38 AM Niels Basjes  wrote:

> Hi,
>
> I've done some serious debugging and traced the problem to what seems to
> be the root cause.
> The class
> org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.
> JavaToSqlTypeConversionRules
> does not have a mapping from java.util.Map to SqlTypeName.MAP.
>
> As a consequence the JavaType(MAP) is converted into SqlTypeName.OTHER which 
> breaks everything downstream.
>
> The nice thing is that this root cause seems to be fixed in 
> https://issues.apache.org/jira/browse/CALCITE-3429
>
> https://github.com/apache/calcite/commit/ff44204dc2899e0c34e94f70c2e0c301170daca3
>
> Which has not yet been released ...
>
> I created https://issues.apache.org/jira/browse/BEAM-9267 to track this.
>
> Niels Basjes
>
>
>
> On Fri, Feb 7, 2020 at 11:26 AM Niels Basjes  wrote:
>
>> Hi,
>>
>> My context: Java 8 , Beam 2.19.0
>>
>> *TLDR*: How do I create a Beam-SQL UDF that returns a Map<String, String>?
>>
>> I have a library ( https://yauaa.basjes.nl ) that people would like to
>> use in combination with Beam-SQL.
>> The essence of this library is that a String goes in and a Key-Value set
>> (Map<String, String>) comes out.
>>
>> I've already done this for Flink-SQL and there it was relatively easy:
>> Just implement the appropriate function and specify that the return type is
>> a Map.
>> See
>> https://github.com/nielsbasjes/yauaa/blob/master/udfs/flink-table/src/main/java/nl/basjes/parse/useragent/flink/table/AnalyzeUseragentFunction.java#L88
>>
>> Now for Beam-SQL I've been able to implement a function that returns a
>> String but if I try to return a Map I get a nasty error.
>>
>> If I use this smallest function possible in my SQL (Calcite)
>>
>> public class FooMap implements SerializableFunction<String, Map<String, String>> {
>>     @Override
>>     public Map<String, String> apply(String input) {
>>         final HashMap<String, String> hashMap = new HashMap<>();
>>         hashMap.put("Some", "Thing");
>>         return hashMap;
>>     }
>> }
>>
>> I get this error
>>
>>
>> java.lang.NullPointerException: Null type
>>
>> at
>> org.apache.beam.sdk.schemas.AutoValue_Schema_Field$Builder.setType(AutoValue_Schema_Field.java:84)
>> at org.apache.beam.sdk.schemas.Schema$Field.of(Schema.java:893)
>> at
>> org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:234)
>> at
>> org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:230)
>> at
>> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>> at java.util.Iterator.forEachRemaining(Iterator.java:116)
>> at
>> java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
>> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>> at
>> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>> at
>> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
>> at
>> org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toSchema(CalciteUtils.java:189)
>> at
>> org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:129)
>> at
>> org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:110)
>> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:539)
>> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:490)
>> at
>> org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:69)
>> at
>> org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:39)
>> at
>> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:125)
>> at
>> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:83)
>> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:539)
>> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:490)
>> at
>> org.apache.beam.sdk.values.PCollectionTuple.apply(PCollectionTuple.java:261)
>> at
>> nl.basjes.parse.useragent.beam.TestFunctionReturnsMap.testUserAgentAnalysisSQL(TestFunctionReturnsMap.java:81)
>>
>>
>> If I have a function that returns a String it all works as expected
>>
>>
>> public class BarString implements SerializableFunction<String, String> {
>>     @Override
>>     public String apply(String input) {
>>         return new StringBuilder(input).reverse().toString();
>>     }
>> }
>>
>> I already had a look at the Beam sourcecode and I have not yet been able to 
>> figure out how I can explicitly tell the framework the Schema that my 
>> function returns (like I did in the Flink implementation of the same).
>>
>> I found the built in functions that use a @UDF annotation that specifies the 

Re: Pipeline graph info

2019-11-21 Thread Andrew Pilloud
This is awesome, but not easily discoverable. Kirill was looking for the
same thing a few weeks ago. We should add a blog post or something to make
it easier to discover.

Andrew
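
A compact end-to-end sketch of what Luke describes below: render the
pipeline to DOT, save it to a file, then run graphviz on it (the file paths
are arbitrary):

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.beam.runners.core.construction.renderer.PipelineDotRenderer;

// Given a fully constructed Pipeline object `pipeline`:
String dotString = PipelineDotRenderer.toDotString(pipeline);
Files.write(Paths.get("/tmp/pipeline.dot"),
    dotString.getBytes(StandardCharsets.UTF_8));
// Then render it, e.g.:  dot -Tpng -o /tmp/pipeline.png /tmp/pipeline.dot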

On Thu, Nov 21, 2019 at 11:31 AM Rustam Mehmandarov 
wrote:

> Oh, that was much easier than I thought! Thanks a lot!
>
> Rustam
>
> On Thu, Nov 21, 2019, 18:07 Luke Cwik  wrote:
>
>> Pipeline pipeline = ... build my pipeline ...
>> String dotString = PipelineDotRenderer.toDotString(pipeline);
>>
>> Now that you have a string containing a DOT[1] graph, you can use any
>> graph layout application/library[2] to visualize it. For example with
>> graphviz[3] (after saving the dotString to /tmp/file.dot):
>> dot -Tpng -o /tmp/file.png /tmp/file.dot
>> Open /tmp/file.png with any image viewer.
>>
>> 1: https://en.wikipedia.org/wiki/DOT_(graph_description_language)
>> 2:
>> https://en.wikipedia.org/wiki/DOT_(graph_description_language)#Layout_programs
>> 3: https://en.wikipedia.org/wiki/Graphviz
>>
>> On Thu, Nov 21, 2019 at 1:33 AM Rustam Mehmandarov 
>> wrote:
>>
>>> Hi, Luke!
>>>
>>> Thanks a lot! Do you have a short code example of how the renderers work
>>> in Beam?
>>>
>>> Rustam
>>>
>>> On Mon, Nov 11, 2019, 17:16 Luke Cwik  wrote:
>>>
 Have you tried the PipelineDotRenderer[1]?

 It can create a dot graph of both a Java pipeline object and also the
 proto pipeline representation. It isn't very sophisticated so feel free to
 contribute to improve upon it.

 1:
 https://github.com/apache/beam/blob/95297dd82bd2fd3986900093cc1797c806c859e6/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/renderer/PipelineDotRenderer.java#L28

 On Sat, Nov 9, 2019 at 1:40 PM Rustam Mehmandarov <
 mehmanda...@gmail.com> wrote:

> Hi all,
>
> I am looking for a way to get the pipeline graph for Beam Java API,
> something similar to the info that is used to visualise the pipeline when
> submitting the job to Dataflow. Any help is greatly appreciated.
>
> Thanks,
> Rustam
>



Re:

2019-06-20 Thread Andrew Pilloud
Hi Alex,

Unfortunately you are receiving an UnsupportedOperationException because
selecting nested rows is not supported by Calcite. You can select fields out
of the nested row but not the row itself. There are some recent bug fixes in
this area in Calcite 1.20, so it might be worth trying that. There has been
a lot of work on this area in Calcite in the past year so it is also
possible the work that remains is in Beam. We have a bug open on the issue
in Beam: https://issues.apache.org/jira/browse/BEAM-5189

Beam is repackaging Calcite by copying and relocating the code using
the gradle relocate rule. The code is unchanged, but
'org.apache.beam.repackaged.beam_sdks_java_extensions_sql' is prepended to
the package names so it won't conflict with Calcite in use by some of the Beam
runners. If you want to change the version to a local snapshot of calcite
for development you can modify it here:
https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/build.gradle#L72

Andrew

On Thu, Jun 20, 2019 at 3:02 AM alex goos  wrote:

> Beam 2.13, I'm trying to do SqlTransform on Row records with hierarchical
> structures. Records having (nullable) arrays of (nullable) sub-records:
>
> "select p.recordType, p.listOfServiceData.seqOf[0].ratingGroup as
> ratingGroup, p.abcdAddress.seqOf[0] as ABCD_IP FROM  PCOLLECTION "
>
> The code responsible for the error when accessing substructures seems to
> come from a repackaged Calcite library: Where do I start asking
> around/fixing the issue?! Here? At the Apache Calcite mailing list? How is
> Beam "repackaging" Calcite-core? Manually, by importing code?!
>
> Exception in thread "main" java.lang.UnsupportedOperationException: class
> org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rex.RexFieldAccess:
> ITEM($28.seqOf, 0).ratingGroup
> at
> org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.util.Util.needToImplement(Util.java:955)
> at
> org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.sql2rel.RelStructuredTypeFlattener.flattenProjection(RelStructuredTypeFlattener.java:662)
> at
> org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.sql2rel.RelStructuredTypeFlattener.flattenProjections(RelStructuredTypeFlattener.java:587)
> at
> org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.sql2rel.RelStructuredTypeFlattener.rewriteRel(RelStructuredTypeFlattener.java:501)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.util.ReflectUtil.invokeVisitorInternal(ReflectUtil.java:257)
> at
> org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.util.ReflectUtil.invokeVisitor(ReflectUtil.java:214)
> at
> org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.util.ReflectUtil$1.invokeVisitor(ReflectUtil.java:464)
> at
> org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.sql2rel.RelStructuredTypeFlattener$RewriteRelVisitor.visit(RelStructuredTypeFlattener.java:768)
> at
> org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.sql2rel.RelStructuredTypeFlattener.rewrite(RelStructuredTypeFlattener.java:195)
> at
> org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.sql2rel.SqlToRelConverter.flattenTypes(SqlToRelConverter.java:468)
> at
> org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.prepare.PlannerImpl.rel(PlannerImpl.java:236)
> at
> org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner.convertToBeamRel(CalciteQueryPlanner.java:129)
> at
> org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.parseQuery(BeamSqlEnv.java:87)
> at
> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:122)
> at
> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:82)
> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
>
> Many thanks!
>


[ANNOUNCE] Apache Beam 2.12.0 released!

2019-04-25 Thread Andrew Pilloud
The Apache Beam team is pleased to announce the release of version 2.12.0!

Apache Beam is an open source unified programming model to define and
execute data processing pipelines, including ETL, batch and stream
(continuous) processing. See https://beam.apache.org

You can download the release here:

https://beam.apache.org/get-started/downloads/

This release includes bugfixes, features, and improvements detailed on
the Beam blog: https://beam.apache.org/blog/2019/04/25/beam-2.12.0.html

Thanks to everyone who contributed to this release, and we hope you enjoy
using Beam 2.12.0.

-- Andrew Pilloud, on behalf of The Apache Beam team


Re:

2018-12-10 Thread Andrew Pilloud
Ouch. Someone runs into this every few months. Beam SQL has this pattern in
a few different places. I really wish we could fix this, but it is a hard
problem. There was a nice thread describing why over on dev:

https://lists.apache.org/thread.html/b5a37ef32d892fdecd1fc0b16b24fdc934cd2b0c0c77c193431739e0@%3Cdev.beam.apache.org%3E

Andrew

On Mon, Dec 10, 2018 at 3:58 AM Matt Casters  wrote:

> So, on the off-chance someone else bumps into this.
> The actual real fix for this error I got:
>
> java.lang.IllegalArgumentException: No filesystem found for scheme
>
> was for me to set the correct ClassLoader for the current Thread:
>
> ClassLoader oldContextClassLoader =
> Thread.currentThread().getContextClassLoader();
> try {
>
>   Thread.currentThread().setContextClassLoader( yourClassLoader );
>
>   // Build/Run Pipeline
>
> } finally {
>
>   Thread.currentThread().setContextClassLoader( oldContextClassLoader );
>
> }
>
> It allows org.apache.beam.sdk.io.FileSystems to pick up the correct
> classloader.
>
> HTH,
> Matt
> ---
> Matt Casters attcast...@gmail.com>
> Senior Solution Architect, Kettle Project Founder
>
>
>
> Op vr 30 nov. 2018 om 12:51 schreef Matt Casters :
>
>> I just wanted to thank you again.  I split up my project in a beam core
>> stuff and my plugin.  This got rid of a number of circular dependency
>> issues and lib conflicts.
>> I also gave the Dataflow PipelineOptions the list of files to stage.
>>
>> That has made things work and much quicker than I anticipated I must
>> admit.
>> I'm in awe of how clean and intuitive the Beam API is (once you get the
>> hang of it).
>> Thanks for everything!
>>
>> https://github.com/mattcasters/kettle-beam-core
>> https://github.com/mattcasters/kettle-beam
>>
>> Cheers,
>>
>> Matt
>> ---
>> Matt Casters attcast...@gmail.com>
>> Senior Solution Architect, Kettle Project Founder
>>
>>
>> Op do 29 nov. 2018 om 19:03 schreef Matt Casters :
>>
>>> Thanks a lot for the replies. The problem is not that the jar files
>>> aren't in the classloader, it's that something somewhere insists on using
>>> the parent classloader.
>>> I guess it makes sense since I noticed that running in my IDEA Beam
>>> copied all required runtime binaries into GCP Storage so it must have an
>>> idea of what to pick up.
>>> I'm guessing it tries to pick up everything in the classpath.
>>>
>>> Throwing all the generated maven jar files into the main classpath of
>>> Kettle in this case is a bit messy, so I'm going to look for an alternative
>>> like an application alongside to communicate with.
>>>
>>> I'll report back once I get a bit further along.
>>>
>>> Cheers,
>>> Matt
>>>
>>> Op do 29 nov. 2018 om 17:10 schreef Juan Carlos Garcia <
>>> jcgarc...@gmail.com>:
>>>
 If you are using Gradle for packaging, make sure your final jar
 (fat-jar) contains all the services files merged.

 Using the Gradle shadowJar plugin include "*mergeServiceFiles()*"
 instruction like:

 apply plugin: 'com.github.johnrengelman.shadow'
 shadowJar {
 mergeServiceFiles()

 zip64 true
 classifier = 'bundled'
 }

 If you are using Maven then use the Shade plugin.

 On Thu, Nov 29, 2018 at 4:50 PM Robert Bradshaw 
 wrote:

> BeamJava uses com.google.auto.service.AutoService which, at the end of
> the day, is shorthand for Java's standard ServiceLoader mechanisms
> (e.g. see [1]). I'm not an expert on the details of how this works,
> but you'll probably have to make sure these filesystem dependencies
> are in your custom classloader's jar.
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java
> On Thu, Nov 29, 2018 at 3:57 PM Matt Casters 
> wrote:
> >
> > Hello Beam,
> >
> > I've been taking great steps forward in having Kettle generate Beam
> pipelines and they actually execute just find in unit testing in IntelliJ.
> > The problem starts when I collect all the libraries needed for Beam
> and the Runners and throw them into the Kettle project as a plugin.
> >
> > Caused by: java.lang.IllegalArgumentException: No filesystem found
> for scheme gs
> > at org.apache.beam.sdk.io
> .FileSystems.getFileSystemInternal(FileSystems.java:456)
> > at org.apache.beam.sdk.io
> .FileSystems.matchNewResource(FileSystems.java:526)
> > at org.apache.beam.sdk.io
> .FileBasedSink.convertToFileResourceIfPossible(FileBasedSink.java:213)
> > at org.apache.beam.sdk.io.TextIO$TypedWrite.to(TextIO.java:700)
> > at org.apache.beam.sdk.io.TextIO$Write.to(TextIO.java:1028)
> > at
> org.kettle.beam.core.transform.BeamOutputTransform.expand(BeamOutputTransform.java:87)
> > ... 32 more
> >
> > This also happens for local file execution ("scheme file" in that
> case).
> >
> > So the questions are: how is Beam bootstrapped? How does Beam

Re: RabbitMqIO missing in Maven Central

2018-11-08 Thread Andrew Pilloud
RabbitMqIO wasn't merged into Beam until after the 2.8.0 release was cut.
The first release it will appear in is 2.9.0.

Andrew

On Thu, Nov 8, 2018 at 1:10 PM Jeroen Steggink | knowsy 
wrote:

> Hi guys,
>
> I tried getting the new RabbitMqIO, however, it's not present in Maven
> Central (http://central.maven.org/maven2/org/apache/beam/), while other
> 2.8.0 sdks are.
>
> Any reason why?
>
> Best,
>
> Jeroen
>
>


Re: Large CSV files

2018-07-23 Thread Andrew Pilloud
Hi Kelsey,

I posted a reply on stackoverflow. It sounds like you might be using the
DirectRunner, which isn't meant to handle datasets that are too big to fit
into memory. If that is the case, have you tried the Flink local runner or
the Dataflow runner?

Andrew
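
For reference, switching runners is just a pipeline option; a sketch
(Dataflow additionally needs its usual project/region/tempLocation flags):

// Sketch: the same pipeline code, with the runner chosen on the command line,
// e.g.  --runner=FlinkRunner   or   --runner=DataflowRunner
PipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().create();
Pipeline p = Pipeline.create(options);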

On Mon, Jul 23, 2018 at 4:06 AM Kelsey RIDER 
wrote:

> Hello,
>
>
>
> SO question here :
> https://stackoverflow.com/questions/51439189/how-to-read-large-csv-with-beam
>
> Anybody have any ideas? Am I missing something?
>
>
>
> Thanks
> Following changes in working-time regulations, if you receive this email
> before 7:00 a.m., in the evening, during the weekend or your holidays,
> please do not process it or reply to it immediately, except in cases of
> exceptional urgency.
>


Re: BigQueryIO Failed Insert

2018-05-08 Thread Andrew Pilloud
Hi Strahinja,

Looking at the code, not found is considered a transient error which will be 
retried until the table is created. Try using neverRetry and see if that gives 
you the results you expect?

Andrew
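
A hedged sketch of the two pieces involved, the insert retry policy and the
failed-inserts output; the destination and the input rows are placeholders:

// Sketch only: stop retrying failed streaming inserts and capture them instead.
WriteResult result =
    rows.apply(
        BigQueryIO.writeTableRows()
            .to(tableDestination)  // placeholder for the custom destination
            .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
            .withFailedInsertRetryPolicy(InsertRetryPolicy.neverRetry()));

// Rows the policy gives up on arrive here and can be routed to a dead-letter sink.
PCollection<TableRow> failedRows = result.getFailedInserts();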

On 2018/05/04 17:16:39, Strahinja Vladetic  
wrote: 
> Hey all,
> 
> I have been running into an issue with streaming writes to Big Query using
> a custom destination and I can't seem to find an appropriate solution in
> the docs so I was hoping someone here could help.
> 
> The Issue: Specifying a table destination which does not exist causes the
> message to fail to be inserted and there is no way to handle the error.
> 
> The problem is that the message does not appear in the `getFailedInserts()`
> function so I don't see a way of getting a handle on this error so that I
> could, for example, direct the message to a poison or deadletter queue.
> 
> Some code for reference
> 
> 
> 
> 
> Is there a way of handling this use case in the current beam SDK?
> 
> Thanks,
> Strahinja Vladetic
>