Thank you! Please let me know if the workaround works for you. Best, Fabian
Am Fr., 6. Dez. 2019 um 16:11 Uhr schrieb Chris Miller <chris...@gmail.com>: > Hi Fabian, > > Thanks for confirming the issue and suggesting a workaround - I'll give > that a try. I've created a JIRA issue as you suggested, > https://issues.apache.org/jira/browse/FLINK-15112 > > Many thanks, > Chris > > > ------ Original Message ------ > From: "Fabian Hueske" <fhue...@gmail.com> > To: "Chris Miller" <chris...@gmail.com> > Cc: "user@flink.apache.org" <user@flink.apache.org> > Sent: 06/12/2019 14:52:16 > Subject: Re: Joining multiple temporal tables > > Hi Chris, > > Your query looks OK to me. > Moreover, you should get a SQLParseException (or something similar) if it > wouldn't be valid SQL. > > Hence, I assume you are running in a bug in one of the optimizer rules. > I tried to reproduce the problem on the SQL training environment and > couldn't write a query that joins two temporal tables. > What worked though was to first create a view of a query that joins the > stream with one temporal table and then join the view with the second one. > Maybe that workaround also works for you? > > It would be great if you could open a Jira issue for this bug including > your program to reproduce the bug. > > Thank you, > Fabian > > Am Do., 5. Dez. 2019 um 16:47 Uhr schrieb Chris Miller <chris...@gmail.com > >: > >> I want to decorate/enrich a stream by joining it with "lookups" to the >> most recent data available in other streams. For example, suppose I have a >> stream of product orders. For each order, I want to add price and FX rate >> information based on the order's product ID and order currency. >> >> Is it possible to join a table with two other temporal tables to achieve >> this? I'm trying but getting a NullPointerException deep inside Flink's >> Calcite code. I've attached some sample code that demonstrates the problem. >> Is my SQL incorrect/invalid (in which case Flink ideally should detect the >> problem and provide a better error message), or is the SQL correct and this >> a bug/limitation in Flink? If it's the latter, how do I achieve a similar >> result? >> >> The SQL I'm trying to run: >> >> SELECT o.id AS orderId, o.productId, o.currency, o.quantity, f.rate, p.price >> FROM Orders AS o, >> LATERAL TABLE (FxRateLookup(o.rowtime)) AS f, >> LATERAL TABLE (PriceLookup(o.rowtime)) AS p >> WHERE o.currency = f.currency >> AND o.productId = p.productId >> >> The exception I get: >> >> Exception in thread "main" java.lang.NullPointerException >> at >> org.apache.flink.table.calcite.FlinkRelBuilder$.of(FlinkRelBuilder.scala:129) >> at >> org.apache.flink.table.plan.rules.logical.LogicalCorrelateToTemporalTableJoinRule.onMatch(LogicalCorrelateToTemporalTableJoinRule.scala:91) >> at >> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319) >> at >> org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560) >> at >> org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419) >> at >> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:284) >> at >> org.apache.calcite.plan.hep.HepInstruction$RuleCollection.execute(HepInstruction.java:74) >> at >> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215) >> at >> org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202) >> at >> org.apache.flink.table.plan.Optimizer.runHepPlanner(Optimizer.scala:228) >> at >> org.apache.flink.table.plan.Optimizer.runHepPlannerSimultaneously(Optimizer.scala:212) >> at >> org.apache.flink.table.plan.Optimizer.optimizeExpandPlan(Optimizer.scala:138) >> at >> org.apache.flink.table.plan.StreamOptimizer.optimize(StreamOptimizer.scala:61) >> at >> org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:410) >> at >> org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:187) >> at >> org.apache.flink.table.planner.StreamPlanner.$anonfun$translate$1(StreamPlanner.scala:127) >> at >> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) >> at scala.collection.Iterator.foreach(Iterator.scala:937) >> at scala.collection.Iterator.foreach$(Iterator.scala:937) >> at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) >> at scala.collection.IterableLike.foreach(IterableLike.scala:70) >> at scala.collection.IterableLike.foreach$(IterableLike.scala:69) >> at scala.collection.AbstractIterable.foreach(Iterable.scala:54) >> at scala.collection.TraversableLike.map(TraversableLike.scala:233) >> at scala.collection.TraversableLike.map$(TraversableLike.scala:226) >> at scala.collection.AbstractTraversable.map(Traversable.scala:104) >> at >> org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:127) >> at >> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:319) >> at >> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:227) >> at >> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:218) >> at test.PojoTest.run(PojoTest.java:96) >> at test.PojoTest.main(PojoTest.java:23) >> >