Hi Andrew,

Thank you so much for your help. Sorry to hear you changed teams :(  I can
handle the Calcite upgrade if there is a fix. I was working on the Calcite
upgrade earlier, but we ran into so many issues that I stopped.

Talat
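
(For anyone skimming the thread: the failing shape, distilled from the test
case quoted below, is a table containing a struct column with exactly one
field. A minimal sketch of the query that trips the planner follows — the
table and column names come from the test in this thread, not from any
general API:)

```sql
-- panwRowTestTable: user_info ROW<name VARCHAR>, id BIGINT, value VARCHAR
-- Filtering on the single-field struct inside a WITH clause fails during
-- planning with: "fieldList must not be null, type = VARCHAR"
WITH tempTable AS (
  SELECT * FROM panwRowTestTable
  WHERE panwRowTestTable.`user_info`.`name` = 'innerStr')
SELECT * FROM tempTable
```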

On Thu, Mar 2, 2023 at 11:56 AM Andrew Pilloud <apill...@google.com> wrote:

> Hi Talat,
>
> I managed to turn your test case into something against Calcite. It
> looks like there is a bug affecting tables that contain one or more
> single element structs and no multi element structs. I've sent the
> details to the Calcite mailing list here.
>
> https://lists.apache.org/thread/tlr9hsmx09by79h91nwp2d4nv8jfwsto
>
> I'm experimenting with ideas on how to work around this but a fix will
> likely require a Calcite upgrade, which is not something I'd have time
> to help with. (I'm not on the Google Beam team anymore.)
>
> Andrew
>
> On Wed, Feb 22, 2023 at 12:18 PM Talat Uyarer
> <tuya...@paloaltonetworks.com> wrote:
> >
> > Hi @Andrew Pilloud
> >
> > Sorry for the late response. Yes, your test works fine as-is. I changed
> the test's input structure to match our input structure, and now this test
> hits the same exception.
> >
> > Feb 21, 2023 2:02:28 PM
> org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner convertToBeamRel
> > INFO: SQL:
> > WITH `tempTable` AS (SELECT `panwRowTestTable`.`user_info`,
> `panwRowTestTable`.`id`, `panwRowTestTable`.`value`
> > FROM `beam`.`panwRowTestTable` AS `panwRowTestTable`
> > WHERE `panwRowTestTable`.`user_info`.`name` = 'innerStr') (SELECT
> `tempTable`.`user_info`, `tempTable`.`id`, `tempTable`.`value`
> > FROM `tempTable` AS `tempTable`)
> > Feb 21, 2023 2:02:28 PM
> org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner convertToBeamRel
> > INFO: SQLPlan>
> > LogicalProject(user_info=[ROW($0)], id=[$1], value=[$2])
> >   LogicalFilter(condition=[=($0.name, 'innerStr')])
> >     LogicalProject(name=[$0.name], id=[$1], value=[$2])
> >       BeamIOSourceRel(table=[[beam, panwRowTestTable]])
> >
> >
> > fieldList must not be null, type = VARCHAR
> > java.lang.AssertionError: fieldList must not be null, type = VARCHAR
> >
> > I don't know what is different from yours. I am sharing my version of
> the test as well.
> >
> >
> > Index:
> sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
> > ===================================================================
> > diff --git
> a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
> b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
> > ---
> a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
> (revision fd383fae1adc545b6b6a22b274902cda956fec49)
> > +++
> b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
> (date 1677017032324)
> > @@ -54,6 +54,9 @@
> >    private static final Schema innerRowSchema =
> >
> Schema.builder().addStringField("string_field").addInt64Field("long_field").build();
> >
> > +  private static final Schema innerPanwRowSchema =
> > +          Schema.builder().addStringField("name").build();
> > +
> >    private static final Schema innerRowWithArraySchema =
> >        Schema.builder()
> >            .addStringField("string_field")
> > @@ -127,8 +130,12 @@
> >                                .build()))
> >                .put(
> >                    "basicRowTestTable",
> > -                  TestBoundedTable.of(FieldType.row(innerRowSchema),
> "col")
> > -
> .addRows(Row.withSchema(innerRowSchema).addValues("innerStr", 1L).build()))
> > +                  TestBoundedTable.of(FieldType.row(innerRowSchema),
> "col", FieldType.INT64, "field")
> > +
> .addRows(Row.withSchema(innerRowSchema).addValues("innerStr", 1L).build(),
> 1L))
> > +                .put(
> > +                  "panwRowTestTable",
> > +
> TestBoundedTable.of(FieldType.row(innerPanwRowSchema), "user_info",
> FieldType.INT64, "id", FieldType.STRING, "value")
> > +
> .addRows(Row.withSchema(innerRowSchema).addValues("name", 1L).build(), 1L,
> "some_value"))
> >                .put(
> >                    "rowWithArrayTestTable",
> >
> TestBoundedTable.of(FieldType.row(rowWithArraySchema), "col")
> > @@ -219,6 +226,21 @@
> >                  .build());
> >      pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
> >    }
> > +
> > +  @Test
> > +  public void testBasicRowWhereField() {
> > +    BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(readOnlyTableProvider);
> > +    PCollection<Row> stream =
> > +        BeamSqlRelUtils.toPCollection(
> > +            pipeline, sqlEnv.parseQuery("WITH tempTable AS (SELECT *
> FROM panwRowTestTable WHERE panwRowTestTable.`user_info`.`name` =
> 'innerStr') SELECT * FROM tempTable"));
> > +    Schema outputSchema = Schema.builder().addRowField("col",
> innerRowSchema).addInt64Field("field").build();
> > +    PAssert.that(stream)
> > +        .containsInAnyOrder(
> > +            Row.withSchema(outputSchema)
> > +
> .addValues(Row.withSchema(innerRowSchema).addValues("name", 1L).build(), 1L)
> > +                .build());
> > +    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
> > +  }
> >
> >    @Test
> >    public void testArrayConstructor() {
> >
> >
> >
> >
> > On Fri, Feb 10, 2023 at 6:14 PM Andrew Pilloud <apill...@google.com>
> wrote:
> >>
> >> I have a test case that I believe should reproduce this on both head
> and 2.43 but it ends up with a different logical plan. Can you provide your
> input types?
> >>
> >> We have a class of issues around complex types:
> https://github.com/apache/beam/issues/19009
> >> I don't believe the "LogicalFilter(condition=[=($2.name, 'User1')])",
> particularly "$2.name", is something that works. In my test it seems that
> the planner has flattened the complex input and reproduced a ROW at the
> output.
> >>
> >>     INFO: SQLPlan>
> >>     LogicalProject(col=[ROW($0, $1)], field=[$2])
> >>       LogicalFilter(condition=[=($0, 'innerStr')])
> >>         LogicalProject(string_field=[$0.string_field],
> long_field=[$0.long_field], field=[$1])
> >>           BeamIOSourceRel(table=[[beam, basicRowTestTable]])
> >>
> >>     Feb 10, 2023 6:07:35 PM
> org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner convertToBeamRel
> >>     INFO: BEAMPlan>
> >>     BeamCalcRel(expr#0..1=[{inputs}], expr#2=[$t0.string_field],
> expr#3=[$t0.long_field], expr#4=[ROW($t2, $t3)],
> expr#5=['innerStr':VARCHAR], expr#6=[=($t2, $t5)], col=[$t4], field=[$t1],
> $condition=[$t6])
> >>       BeamIOSourceRel(table=[[beam, basicRowTestTable]])
> >>
> >> ---
> a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
> >> +++
> b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
> >> @@ -127,8 +127,8 @@ public class BeamComplexTypeTest {
> >>                                .build()))
> >>                .put(
> >>                    "basicRowTestTable",
> >> -                  TestBoundedTable.of(FieldType.row(innerRowSchema),
> "col")
> >> -
> .addRows(Row.withSchema(innerRowSchema).addValues("innerStr", 1L).build()))
> >> +                  TestBoundedTable.of(FieldType.row(innerRowSchema),
> "col", FieldType.INT64, "field")
> >> +
> .addRows(Row.withSchema(innerRowSchema).addValues("innerStr", 1L).build(),
> 1L))
> >>                .put(
> >>                    "rowWithArrayTestTable",
> >>
> TestBoundedTable.of(FieldType.row(rowWithArraySchema), "col")
> >> @@ -220,6 +220,21 @@ public class BeamComplexTypeTest {
> >>      pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
> >>    }
> >>
> >> +  @Test
> >> +  public void testBasicRowWhereField() {
> >> +    BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(readOnlyTableProvider);
> >> +    PCollection<Row> stream =
> >> +        BeamSqlRelUtils.toPCollection(
> >> +            pipeline, sqlEnv.parseQuery("WITH tempTable AS (SELECT *
> FROM basicRowTestTable WHERE basicRowTestTable.col.string_field =
> 'innerStr') SELECT * FROM tempTable"));
> >> +    Schema outputSchema = Schema.builder().addRowField("col",
> innerRowSchema).addInt64Field("field").build();
> >> +    PAssert.that(stream)
> >> +        .containsInAnyOrder(
> >> +            Row.withSchema(outputSchema)
> >> +
> .addValues(Row.withSchema(innerRowSchema).addValues("innerStr",
> 1L).build(), 1L)
> >> +                .build());
> >> +    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
> >> +  }
> >> +
> >>    @Test
> >>    public void testArrayConstructor() {
> >>      BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(readOnlyTableProvider);
> >>
> >>
> >> On Fri, Feb 3, 2023 at 2:06 PM Talat Uyarer <
> tuya...@paloaltonetworks.com> wrote:
> >>>
> >>> Hi Andrew,
> >>>
> >>> Thank you for your PR. I appreciate your help in solving the issue.
> I reran our tests and they are partially passing now with your fix.
> However, there is one more issue with the WITH clause.
> >>>
> >>> When I run the following query, Beam somehow loses the type of a column.
> >>>
> >>> WITH tempTable AS (SELECT * FROM PCOLLECTION WHERE
> PCOLLECTION.`user_info`.`name` = 'User1') SELECT * FROM tempTable
> >>>
> >>> I haven't tested on Beam master. I ran with your latest patch on our
> code base. This is the output:
> >>>
> >>> 14:00:30.095 [Test worker] INFO
> o.a.b.sdk.extensions.sql.impl.CalciteQueryPlanner - SQL:
> >>> WITH `tempTable` AS (SELECT `PCOLLECTION`.`id`, `PCOLLECTION`.`value`,
> `PCOLLECTION`.`user_info`
> >>> FROM `beam`.`PCOLLECTION` AS `PCOLLECTION`
> >>> WHERE `PCOLLECTION`.`user_info`.`name` = 'User1') (SELECT
> `tempTable`.`id`, `tempTable`.`value`, `tempTable`.`user_info`
> >>> FROM `tempTable` AS `tempTable`)
> >>> 14:00:30.106 [Test worker] DEBUG 
> >>> o.a.b.v.calcite.v1_28_0.org.apache.calcite.sql2rel
> - Plan after converting SqlNode to RelNode
> >>> LogicalProject(id=[$0], value=[$1], user_info=[$2])
> >>>   LogicalFilter(condition=[=($2.name, 'User1')])
> >>>     BeamIOSourceRel(table=[[beam, PCOLLECTION]])
> >>>
> >>> 14:00:30.107 [Test worker] DEBUG 
> >>> o.a.b.v.calcite.v1_28_0.org.apache.calcite.sql2rel
> - Plan after converting SqlNode to RelNode
> >>> LogicalProject(id=[$0], value=[$1], user_info=[$2])
> >>>   LogicalFilter(condition=[=($2.name, 'User1')])
> >>>     BeamIOSourceRel(table=[[beam, PCOLLECTION]])
> >>>
> >>> 14:00:30.109 [Test worker] INFO
> o.a.b.sdk.extensions.sql.impl.CalciteQueryPlanner - SQLPlan>
> >>> LogicalProject(id=[$0], value=[$1], user_info=[ROW($2)])
> >>>   LogicalFilter(condition=[=($2.name, 'User1')])
> >>>     LogicalProject(id=[$0], value=[$1], name=[$2.name])
> >>>       BeamIOSourceRel(table=[[beam, PCOLLECTION]])
> >>>
> >>> 14:00:30.173 [Test worker] DEBUG
> o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - PLANNER =
> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.volcano.IterativeRuleDriver@1c510081;
> COST = {inf}
> >>> 14:00:30.173 [Test worker] DEBUG
> o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - Pop match: rule
> [BeamEnumerableConverterRule(in:BEAM_LOGICAL,out:ENUMERABLE)] rels [#27]
> >>> 14:00:30.173 [Test worker] DEBUG
> o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - call#118: Apply rule
> [BeamEnumerableConverterRule(in:BEAM_LOGICAL,out:ENUMERABLE)] to
> [rel#27:BeamIOSourceRel.BEAM_LOGICAL(table=[beam, PCOLLECTION])]
> >>> 14:00:30.174 [Test worker] DEBUG
> o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - Transform to: rel#41
> via BeamEnumerableConverterRule(in:BEAM_LOGICAL,out:ENUMERABLE)
> >>> 14:00:30.175 [Test worker] DEBUG
> o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - call#118 generated 1
> successors:
> [rel#41:BeamEnumerableConverter.ENUMERABLE(input=BeamIOSourceRel#27)]
> >>> 14:00:30.175 [Test worker] DEBUG
> o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - PLANNER =
> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.volcano.IterativeRuleDriver@1c510081;
> COST = {inf}
> >>> 14:00:30.175 [Test worker] DEBUG
> o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - Pop match: rule
> [ProjectToCalcRule] rels [#33]
> >>> 14:00:30.175 [Test worker] DEBUG
> o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - call#136: Apply rule
> [ProjectToCalcRule] to
> [rel#33:LogicalProject.NONE(input=RelSubset#32,inputs=0..1,exprs=[$2.name
> ])]
> >>> 14:00:30.177 [Test worker] DEBUG
> o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - Transform to: rel#44
> via ProjectToCalcRule
> >>> 14:00:30.178 [Test worker] DEBUG
> o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - call#136 generated 1
> successors:
> [rel#44:LogicalCalc.NONE(input=RelSubset#32,expr#0..2={inputs},expr#3=$
> t2.name,proj#0..1={exprs},2=$t3)]
> >>> 14:00:30.178 [Test worker] DEBUG
> o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - PLANNER =
> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.volcano.IterativeRuleDriver@1c510081;
> COST = {inf}
> >>> 14:00:30.178 [Test worker] DEBUG
> o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - Pop match: rule
> [FilterToCalcRule] rels [#35]
> >>> 14:00:30.178 [Test worker] DEBUG
> o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - call#160: Apply rule
> [FilterToCalcRule] to
> [rel#35:LogicalFilter.NONE(input=RelSubset#34,condition==($2.name,
> 'User1'))]
> >>>
> >>> fieldList must not be null, type = VARCHAR
> >>> java.lang.AssertionError: fieldList must not be null, type = VARCHAR
> >>> at
> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rel.type.RelDataTypeImpl.getFieldList(RelDataTypeImpl.java:164)
> >>> at
> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexFieldAccess.checkValid(RexFieldAccess.java:76)
> >>> at
> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexFieldAccess.<init>(RexFieldAccess.java:64)
> >>> at
> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexShuttle.visitFieldAccess(RexShuttle.java:208)
> >>> at
> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitFieldAccess(RexProgramBuilder.java:911)
> >>> at
> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitFieldAccess(RexProgramBuilder.java:894)
> >>> at
> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexFieldAccess.accept(RexFieldAccess.java:94)
> >>> at
> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:161)
> >>> at
> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:113)
> >>> at
> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:896)
> >>> at
> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:894)
> >>> at
> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexCall.accept(RexCall.java:189)
> >>> at
> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexProgramBuilder.registerInput(RexProgramBuilder.java:302)
> >>> at
> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexProgramBuilder.addCondition(RexProgramBuilder.java:277)
> >>> at
> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rel.rules.FilterToCalcRule.onMatch(FilterToCalcRule.java:76)
> >>> at
> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:239)
> >>> at
> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:61)
> >>> at
> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:523)
> >>> at
> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:317)
> >>> at
> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.prepare.PlannerImpl.transform(PlannerImpl.java:373)
> >>> at
> org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner.convertToBeamRel(CalciteQueryPlanner.java:211)
> >>> at
> org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.parseQuery(BeamSqlEnv.java:112)
> >>> at
> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:171)
> >>> at
> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:110)
> >>> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
> >>> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:499)
> >>> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:373)
> >>> at
> com.paloaltonetworks.cortex.streamcompute.filter.Filter.expand(Filter.java:126)
> >>> at
> com.paloaltonetworks.cortex.streamcompute.filter.Filter.expand(Filter.java:49)
> >>> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
> >>> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:499)
> >>> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:373)
> >>> at
> com.paloaltonetworks.cortex.streamcompute.filter.WithClauseFilterComplexBulkTest.testIt(WithClauseFilterComplexBulkTest.java:149)
> >>> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
> >>> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >>> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> >>> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> >>> at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> >>> at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> >>> at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> >>> at
> org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:323)
> >>> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> >>> at
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> >>> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> >>> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> >>> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> >>> at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> >>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> >>> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> >>> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> >>> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> >>> at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> >>> at org.junit.runners.Suite.runChild(Suite.java:128)
> >>> at org.junit.runners.Suite.runChild(Suite.java:27)
> >>> at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> >>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> >>> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> >>> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> >>> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> >>> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> >>> at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> >>> at
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
> >>> at
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
> >>> at
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
> >>> at
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
> >>> at
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
> >>> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
> >>> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >>> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> >>> at
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
> >>> at
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
> >>> at
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
> >>> at
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
> >>> at com.sun.proxy.$Proxy5.processTestClass(Unknown Source)
> >>> at
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
> >>> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
> >>> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >>> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> >>> at
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
> >>> at
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
> >>> at
> org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:175)
> >>> at
> org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:157)
> >>> at
> org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
> >>> at
> org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
> >>> at
> org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46)
> >>> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> >>> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> >>> at
> org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
> >>> at java.base/java.lang.Thread.run(Thread.java:829)
> >>>
> >>>
> >>>
> >>>
> >>>
> >>> On Thu, Feb 2, 2023 at 1:06 PM Andrew Pilloud <apill...@google.com>
> wrote:
> >>>>
> >>>> It looks like Calcite stopped considering field names in RelNode
> equality as of Calcite 1.22 (which we use in Beam v2.34.0+). This can
> result in a planner state where two nodes that only differ by field name
> are considered equivalent.
> >>>>
> >>>> I have a fix for Beam in
> https://github.com/apache/beam/pull/25290
> and I'll send an email to the Calcite dev list with more details.
> >>>>
> >>>> Andrew
> >>>>
> >>>> On Fri, Jan 27, 2023 at 11:33 AM Andrew Pilloud <apill...@google.com>
> wrote:
> >>>>>
> >>>>> Also, this is at the very least a Beam bug. You can file a Beam
> issue if you want; otherwise I will when I get back.
> >>>>>
> >>>>> Andrew
> >>>>>
> >>>>> On Fri, Jan 27, 2023 at 11:27 AM Andrew Pilloud <apill...@google.com>
> wrote:
> >>>>>>
> >>>>>> Hi Talat,
> >>>>>>
> >>>>>> I did get your test case running and added some logging to
> RexProgramBuilder.mergePrograms. There is only one merge that occurs during
> the test and it has an output type of RecordType(JavaType(int) ID,
> JavaType(class java.lang.String) V). This does seem like the correct output
> name but it doesn't match the final output name, so something is still
> different than the Beam test case. I also modified mergePrograms to
> purposely corrupt the output names, that did not cause the test to fail or
> trip the 'assert mergedProg.getOutputRowType() ==
> topProgram.getOutputRowType();' in mergePrograms. I could not find any
> Calcite unit tests for RexProgramBuilder.mergePrograms or
> CoreRules.CALC_MERGE rule so I think it is still probable that the problem
> is in this area.
> >>>>>>
> >>>>>> One minor issue I encountered: it took me a while to get your test
> case running. It doesn't appear there are any Calcite Gradle rules to run
> CoreQuidemTest, and constructing the classpath manually was tedious. Did I
> miss something?
> >>>>>>
> >>>>>> I'm still working on this but I'm out today and Monday, it will
> probably be Wednesday before I make any more progress.
> >>>>>>
> >>>>>> Andrew
> >>>>>>
> >>>>>> On Fri, Jan 27, 2023 at 10:40 AM Talat Uyarer <
> tuya...@paloaltonetworks.com> wrote:
> >>>>>>>
> >>>>>>> Hi Andrew,
> >>>>>>>
> >>>>>>> Yes, this aligns with my debugging as well. In my reply to Kenn you
> can see a SQL test which I wrote in Calcite. Somehow Calcite does not have
> this issue in version 1.28.
> >>>>>>>
> >>>>>>> !use post
> >>>>>>> !set outputformat mysql
> >>>>>>>
> >>>>>>> #Test aliases with with clause
> >>>>>>> WITH tempTable(id, v) AS (select "hr"."emps"."empid" as id,
> "hr"."emps"."name" as v from "hr"."emps")
> >>>>>>> SELECT tempTable.id as id, tempTable.v as "value" FROM tempTable
> WHERE tempTable.v <> '11' ;
> >>>>>>> +-----+-----------+
> >>>>>>> | ID  | value     |
> >>>>>>> +-----+-----------+
> >>>>>>> | 100 | Bill      |
> >>>>>>> | 110 | Theodore  |
> >>>>>>> | 150 | Sebastian |
> >>>>>>> | 200 | Eric      |
> >>>>>>> +-----+-----------+
> >>>>>>> (4 rows)
> >>>>>>>
> >>>>>>> !ok
> >>>>>>>
> >>>>>>>
> >>>>>>> On Wed, Jan 25, 2023 at 6:08 PM Andrew Pilloud <
> apill...@google.com> wrote:
> >>>>>>>>
> >>>>>>>> Yes, that worked.
> >>>>>>>>
> >>>>>>>> The issue does not occur if I disable all of the following
> planner rules: CoreRules.FILTER_CALC_MERGE, CoreRules.PROJECT_CALC_MERGE,
> LogicalCalcMergeRule.INSTANCE (which wraps CoreRules.CALC_MERGE), and
> BeamCalcMergeRule.INSTANCE (which wraps CoreRules.CALC_MERGE).
> >>>>>>>>
> >>>>>>>> All the rules share a common call to
> RexProgramBuilder.mergePrograms, so I suspect the problem lies there. I
> spent some time looking but wasn't able to find it by code inspection, it
> looks like this code path is doing the right thing with names. I'll spend
> some time tomorrow trying to reproduce this on pure Calcite.
> >>>>>>>>
> >>>>>>>> Andrew
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Tue, Jan 24, 2023 at 8:24 PM Talat Uyarer <
> tuya...@paloaltonetworks.com> wrote:
> >>>>>>>>>
> >>>>>>>>> Hi Andrew,
> >>>>>>>>>
> >>>>>>>>> Thanks for writing a test for this use case. Without the WHERE
> clause it works as expected in our test cases too. Please add a WHERE
> clause to the second select; with the query below it does not return the
> column names. I tested locally as well.
> >>>>>>>>>
> >>>>>>>>> WITH tempTable (id, v) AS (SELECT f_int as id, f_string as v
> FROM PCOLLECTION) SELECT id AS fout_int, v AS fout_string FROM tempTable
> WHERE id > 1
> >>>>>>>>>
> >>>>>>>>> Thanks
> >>>>>>>>>
> >>>>>>>>> On Tue, Jan 24, 2023 at 5:28 PM Andrew Pilloud <
> apill...@google.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>> +d...@beam.apache.org
> >>>>>>>>>>
> >>>>>>>>>> I tried reproducing this but was not successful; the output
> schema was as expected. I added the following to
> BeamSqlMultipleSchemasTest.java at head. (I did discover that
> PAssert.that(result).containsInAnyOrder(output) doesn't validate column
> names however.)
> >>>>>>>>>>
> >>>>>>>>>>   @Test
> >>>>>>>>>>   public void testSelectAs() {
> >>>>>>>>>>     PCollection<Row> input = pipeline.apply(create(row(1,
> "strstr")));
> >>>>>>>>>>
> >>>>>>>>>>     PCollection<Row> result =
> >>>>>>>>>>         input.apply(SqlTransform.query("WITH tempTable (id, v)
> AS (SELECT f_int as id, f_string as v FROM PCOLLECTION) SELECT id AS
> fout_int, v AS fout_string FROM tempTable"));
> >>>>>>>>>>
> >>>>>>>>>>     Schema output_schema =
> >>>>>>>>>>
>  
> Schema.builder().addInt32Field("fout_int").addStringField("fout_string").build();
> >>>>>>>>>>     assertThat(result.getSchema(), equalTo(output_schema));
> >>>>>>>>>>
> >>>>>>>>>>     Row output = Row.withSchema(output_schema).addValues(1,
> "strstr").build();
> >>>>>>>>>>     PAssert.that(result).containsInAnyOrder(output);
> >>>>>>>>>>     pipeline.run();
> >>>>>>>>>>   }
> >>>>>>>>>>
> >>>>>>>>>> On Tue, Jan 24, 2023 at 8:13 AM Talat Uyarer <
> tuya...@paloaltonetworks.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Hi Kenn,
> >>>>>>>>>>>
> >>>>>>>>>>> Thank you for replying back to my email.
> >>>>>>>>>>>
> >>>>>>>>>>> I was under the same impression about Calcite, but I wrote a
> test on Calcite 1.28 too, and it works without the issue that I see on Beam.
> >>>>>>>>>>>
> >>>>>>>>>>> Here is my test case. If you want, you can also run it on
> Calcite: put it under core/src/test/resources/sql as a text file and run
> the CoreQuidemTest class.
> >>>>>>>>>>>
> >>>>>>>>>>> !use post
> >>>>>>>>>>> !set outputformat mysql
> >>>>>>>>>>>
> >>>>>>>>>>> #Test aliases with with clause
> >>>>>>>>>>> WITH tempTable(id, v) AS (select "hr"."emps"."empid" as id,
> "hr"."emps"."name" as v from "hr"."emps")
> >>>>>>>>>>> SELECT tempTable.id as id, tempTable.v as "value" FROM
> tempTable WHERE tempTable.v <> '11' ;
> >>>>>>>>>>> +-----+-----------+
> >>>>>>>>>>> | ID  | value     |
> >>>>>>>>>>> +-----+-----------+
> >>>>>>>>>>> | 100 | Bill      |
> >>>>>>>>>>> | 110 | Theodore  |
> >>>>>>>>>>> | 150 | Sebastian |
> >>>>>>>>>>> | 200 | Eric      |
> >>>>>>>>>>> +-----+-----------+
> >>>>>>>>>>> (4 rows)
> >>>>>>>>>>>
> >>>>>>>>>>> !ok
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Mon, Jan 23, 2023 at 10:16 AM Kenneth Knowles <
> k...@apache.org> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> Looking at the code that turns a logical CalcRel into a
> BeamCalcRel I do not see any obvious cause for this:
> https://github.com/apache/beam/blob/b3aa2e89489898f8c760294ba4dba2310ac53e70/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamCalcRule.java#L69
> >>>>>>>>>>>>
> >>>>>>>>>>>> I don't like to guess that upstream libraries have the bug,
> but in this case I wonder if the alias is lost in the Calcite optimizer
> rule for merging the projects and filters into a Calc.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Kenn
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Mon, Jan 23, 2023 at 10:13 AM Kenneth Knowles <
> k...@apache.org> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I am not sure I understand the question, but I do see an
> issue.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Context: "CalcRel" is an optimized relational operation that
> is somewhat like ParDo, with a small snippet of a single-assignment DSL
> embedded in it. Calcite will choose to merge all the projects and filters
> into the node, and then generates Java bytecode to directly execute the DSL.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Problem: it looks like the CalcRel has output columns with
> aliases "id" and "v" where it should have output columns with aliases "id"
> and "value".
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Kenn
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Thu, Jan 19, 2023 at 6:01 PM Ahmet Altay <
> al...@google.com> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Adding: @Andrew Pilloud @Kenneth Knowles
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Thu, Jan 12, 2023 at 12:31 PM Talat Uyarer via user <
> user@beam.apache.org> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi All,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I am using Beam 2.43 with Calcite SQL with Java.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I have a query with a WITH clause and some aliasing. It
> looks like the Beam query optimizer drops the SELECT statement's aliases
> after optimizing my query. Can you help me identify where the problem is?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> This is my query
> >>>>>>>>>>>>>>> INFO: SQL:
> >>>>>>>>>>>>>>> WITH `tempTable` (`id`, `v`) AS (SELECT
> `PCOLLECTION`.`f_nestedRow`.`f_nestedInt` AS `id`,
> `PCOLLECTION`.`f_nestedRow`.`f_nestedString` AS `v`
> >>>>>>>>>>>>>>> FROM `beam`.`PCOLLECTION` AS `PCOLLECTION`) (SELECT
> `tempTable`.`id` AS `id`, `tempTable`.`v` AS `value`
> >>>>>>>>>>>>>>> FROM `tempTable` AS `tempTable`
> >>>>>>>>>>>>>>> WHERE `tempTable`.`v` <> '11')
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> This is the Calcite plan; note the LogicalProject(id=[$0],
> value=[$1]) in the SQL plan.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Jan 12, 2023 12:19:08 PM
> org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner convertToBeamRel
> >>>>>>>>>>>>>>> INFO: SQLPlan>
> >>>>>>>>>>>>>>> LogicalProject(id=[$0], value=[$1])
> >>>>>>>>>>>>>>>   LogicalFilter(condition=[<>($1, '11')])
> >>>>>>>>>>>>>>>     LogicalProject(id=[$1.f_nestedInt],
> v=[$1.f_nestedString])
> >>>>>>>>>>>>>>>       BeamIOSourceRel(table=[[beam, PCOLLECTION]])
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> But the Beam plan does not have a LogicalProject(id=[$0],
> value=[$1]) or anything similar.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Jan 12, 2023 12:19:08 PM
> org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner convertToBeamRel
> >>>>>>>>>>>>>>> INFO: BEAMPlan>
> >>>>>>>>>>>>>>> BeamCalcRel(expr#0..1=[{inputs}],
> expr#2=[$t1.f_nestedInt], expr#3=[$t1.f_nestedString],
> expr#4=['11':VARCHAR], expr#5=[<>($t3, $t4)], id=[$t2], v=[$t3],
> $condition=[$t5])
> >>>>>>>>>>>>>>>   BeamIOSourceRel(table=[[beam, PCOLLECTION]])
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks
>
