Thank you!
Please let me know if the workaround works for you.

Best, Fabian

On Fri, Dec 6, 2019 at 16:11, Chris Miller <chris...@gmail.com> wrote:

> Hi Fabian,
>
> Thanks for confirming the issue and suggesting a workaround - I'll give
> that a try. I've created a JIRA issue as you suggested,
> https://issues.apache.org/jira/browse/FLINK-15112
>
> Many thanks,
> Chris
>
>
> ------ Original Message ------
> From: "Fabian Hueske" <fhue...@gmail.com>
> To: "Chris Miller" <chris...@gmail.com>
> Cc: "user@flink.apache.org" <user@flink.apache.org>
> Sent: 06/12/2019 14:52:16
> Subject: Re: Joining multiple temporal tables
>
> Hi Chris,
>
> Your query looks OK to me.
> Moreover, you should get a SqlParseException (or something similar) if it
> weren't valid SQL.
>
> Hence, I assume you are running into a bug in one of the optimizer rules.
> I tried to reproduce the problem in the SQL training environment, and I
> also couldn't get a query that joins two temporal tables to work.
> What did work, though, was to first create a view that joins the stream
> with one temporal table, and then join that view with the second temporal
> table. Maybe that workaround also works for you?
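>
> Roughly, the idea looks like the following (an untested sketch that reuses
> the names from your query; "tableEnv" stands for your
> StreamTableEnvironment and "OrdersWithFx" is just an illustrative view name):
>
> // Step 1: join the orders stream with the first temporal table and
> // register the result as a view. Keeping o.rowtime in the view should
> // allow the second temporal table join to use it.
> Table ordersWithFx = tableEnv.sqlQuery(
>     "SELECT o.id, o.productId, o.currency, o.quantity, o.rowtime, f.rate " +
>     "FROM Orders AS o, LATERAL TABLE (FxRateLookup(o.rowtime)) AS f " +
>     "WHERE o.currency = f.currency");
> tableEnv.registerTable("OrdersWithFx", ordersWithFx);
>
> // Step 2: join the view with the second temporal table.
> Table result = tableEnv.sqlQuery(
>     "SELECT v.id AS orderId, v.productId, v.currency, v.quantity, v.rate, p.price " +
>     "FROM OrdersWithFx AS v, LATERAL TABLE (PriceLookup(v.rowtime)) AS p " +
>     "WHERE v.productId = p.productId");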
>
> It would be great if you could open a Jira issue for this bug, including
> your program to reproduce it.
>
> Thank you,
> Fabian
>
> On Thu, Dec 5, 2019 at 16:47, Chris Miller <chris...@gmail.com> wrote:
>
>> I want to decorate/enrich a stream by joining it with "lookups" to the
>> most recent data available in other streams. For example, suppose I have a
>> stream of product orders. For each order, I want to add price and FX rate
>> information based on the order's product ID and order currency.
>>
>> Is it possible to join a table with two other temporal tables to achieve
>> this? I'm trying to, but I'm getting a NullPointerException deep inside
>> Flink's Calcite code. I've attached some sample code that demonstrates the
>> problem. Is my SQL incorrect/invalid (in which case Flink should ideally
>> detect the problem and provide a better error message), or is the SQL
>> correct and this is a bug/limitation in Flink? If it's the latter, how can
>> I achieve a similar result?
>>
>> The SQL I'm trying to run:
>>
>> SELECT o.id AS orderId, o.productId, o.currency, o.quantity, f.rate, p.price
>>   FROM Orders AS o,
>>   LATERAL TABLE (FxRateLookup(o.rowtime)) AS f,
>>   LATERAL TABLE (PriceLookup(o.rowtime)) AS p
>>   WHERE o.currency = f.currency
>>   AND o.productId = p.productId
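>>
>> For reference, the lookup functions are registered roughly as follows (a
>> simplified sketch; the field names here are illustrative, and the attached
>> sample code has the exact setup):
>>
>> import org.apache.flink.table.api.Table;
>> import org.apache.flink.table.functions.TemporalTableFunction;
>>
>> // Temporal table function over the FX rate history, keyed by currency.
>> Table fxHistory = tableEnv.fromDataStream(fxRateStream, "currency, rate, rowtime.rowtime");
>> TemporalTableFunction fxLookup = fxHistory.createTemporalTableFunction("rowtime", "currency");
>> tableEnv.registerFunction("FxRateLookup", fxLookup);
>>
>> // Temporal table function over the price history, keyed by product ID.
>> Table priceHistory = tableEnv.fromDataStream(priceStream, "productId, price, rowtime.rowtime");
>> TemporalTableFunction priceLookup = priceHistory.createTemporalTableFunction("rowtime", "productId");
>> tableEnv.registerFunction("PriceLookup", priceLookup);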
>>
>> The exception I get:
>>
>> Exception in thread "main" java.lang.NullPointerException
>>     at org.apache.flink.table.calcite.FlinkRelBuilder$.of(FlinkRelBuilder.scala:129)
>>     at org.apache.flink.table.plan.rules.logical.LogicalCorrelateToTemporalTableJoinRule.onMatch(LogicalCorrelateToTemporalTableJoinRule.scala:91)
>>     at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
>>     at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560)
>>     at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419)
>>     at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:284)
>>     at org.apache.calcite.plan.hep.HepInstruction$RuleCollection.execute(HepInstruction.java:74)
>>     at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)
>>     at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202)
>>     at org.apache.flink.table.plan.Optimizer.runHepPlanner(Optimizer.scala:228)
>>     at org.apache.flink.table.plan.Optimizer.runHepPlannerSimultaneously(Optimizer.scala:212)
>>     at org.apache.flink.table.plan.Optimizer.optimizeExpandPlan(Optimizer.scala:138)
>>     at org.apache.flink.table.plan.StreamOptimizer.optimize(StreamOptimizer.scala:61)
>>     at org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:410)
>>     at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:187)
>>     at org.apache.flink.table.planner.StreamPlanner.$anonfun$translate$1(StreamPlanner.scala:127)
>>     at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>>     at scala.collection.Iterator.foreach(Iterator.scala:937)
>>     at scala.collection.Iterator.foreach$(Iterator.scala:937)
>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>>     at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>>     at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>     at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>>     at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>     at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:127)
>>     at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:319)
>>     at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:227)
>>     at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:218)
>>     at test.PojoTest.run(PojoTest.java:96)
>>     at test.PojoTest.main(PojoTest.java:23)
>>
>
