Sorry to reply to my own post but I wasn't able to figure out a solution
for this. Does anyone have any suggestions I could try?
------ Original Message ------
From: "Chris Miller" <chris...@gmail.com>
To: "Timo Walther" <twal...@apache.org>; "user" <user@flink.apache.org>
Sent: 29/01/2019 10:06:47
Subject: Re: AssertionError: mismatched type $5 TIMESTAMP(3)
Thanks Timo, I didn't realise supplying Row could automatically apply
the correct types. In this case your suggestion doesn't solve the
problem though, I still get the exact same error. I assume that's
because there isn't a time attribute type on the tradesByInstr table
itself, but rather on the groupedTrades table that it joins with.
System.out.println(tradesByInstr.getSchema().toRowType()) outputs:
-> Row(InstrumentId: Integer, Name: String, ClosePrice: Double,
TradeCount: Long, Quantity: Double, Cost: Double)
System.out.println(groupedTrades.getSchema().toRowType()) outputs:
-> Row(t_InstrumentId: Integer, t_CounterpartyId: Integer, TradeCount:
Long, Quantity: Double, Cost: Double, LastTrade_EventTime:
TimeIndicatorTypeInfo(rowtime))
Looking at the stack trace it seems the query optimiser is tripping up
on the LastTrade_EventTime column, but that is required for the
temporal table join.
Any other ideas on how I can work around this problem?
Many thanks,
Chris
------ Original Message ------
From: "Timo Walther" <twal...@apache.org>
To: "Chris Miller" <chris...@gmail.com>; "user" <user@flink.apache.org>
Sent: 29/01/2019 09:44:14
Subject: Re: AssertionError: mismatched type $5 TIMESTAMP(3)
Hi Chris,
the exception message is a bit misleading. The time attribute (time
indicator) type is an internal type and should not be used by users.
The following line should solve your issue. Instead of:
DataStream<Tuple2<Boolean, Row>> tradesByInstrStream =
tableEnv.toRetractStream(tradesByInstr, typeInfo);
You can do
DataStream<Tuple2<Boolean, Row>> tradesByInstrStream =
tableEnv.toRetractStream(tradesByInstr, Row.class);
The API will automatically insert the right types for the table passed
when using a plain `Row.class`.
I hope this helps.
Regards,
Timo
Am 25.01.19 um 20:14 schrieb Chris Miller:
I'm trying to group some data and then enrich it by joining with a
temporal table function, however my test code (attached) is failing
with the error shown below. Can someone please give me a clue as to
what I'm doing wrong?
Exception in thread "main" java.lang.AssertionError: mismatched type
$5 TIMESTAMP(3)
at
org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2481)
at
org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2459)
at
org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112)
at
org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:151)
at
org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:100)
at
org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:34)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:279)
at org.apache.calcite.rex.RexShuttle.mutate(RexShuttle.java:241)
at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:259)
at org.apache.calcite.rex.RexUtil.fixUp(RexUtil.java:1605)
at
org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:230)
at
org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:344)
at
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
at
org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646)
at
org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
at
org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:374)
at
org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
at
org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812)
at
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
at
org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:340)
at
org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:272)
at test.Test.main(Test.java:78)