Flink version: 1.12.0 

在使用 Flink 执行 Flink SQL  流表 join 维表, 运行报错(流表SQL 和维表SQL单独运行都没有问题), 错误堆栈信息如下:

Exception in thread "main" java.lang.RuntimeException: 
org.apache.flink.table.planner.codegen.CodeGenException: Unable to find common 
type of GeneratedExpression(field$18,isNull$17,,STRING,None) and 
ArrayBuffer(GeneratedExpression(((int) 4),false,,INT NOT NULL,Some(4)), 
GeneratedExpression(((int) 8),false,,INT NOT NULL,Some(8))).
        at com.hmd.stream.SqlSubmit.main(SqlSubmit.java:47)
Caused by: org.apache.flink.table.planner.codegen.CodeGenException: Unable to 
find common type of GeneratedExpression(field$18,isNull$17,,STRING,None) and 
ArrayBuffer(GeneratedExpression(((int) 4),false,,INT NOT NULL,Some(4)), 
GeneratedExpression(((int) 8),false,,INT NOT NULL,Some(8))).
        at 
org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens$.$anonfun$generateIn$2(ScalarOperatorGens.scala:307)
        at scala.Option.orElse(Option.scala:289)
        at 
org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens$.generateIn(ScalarOperatorGens.scala:307)
        at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:724)
        at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:507)
        at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56)
        at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
        at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$visitCall$2(ExprCodeGenerator.scala:526)
        at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
        at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
        at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike.map(TraversableLike.scala:233)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:517)
        at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56)
        at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
        at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:155)
        at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.$anonfun$generateProcessCode$5(CalcCodeGenerator.scala:143)
        at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
        at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
        at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike.map(TraversableLike.scala:233)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.produceProjectionCode$1(CalcCodeGenerator.scala:143)
        at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:190)
        at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:59)
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:84)
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
        at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
        at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLookupJoin.translateToPlanInternal(StreamExecLookupJoin.scala:84)
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLookupJoin.translateToPlanInternal(StreamExecLookupJoin.scala:38)
        at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
        at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLookupJoin.translateToPlan(StreamExecLookupJoin.scala:38)
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
        at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
        at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:79)
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43)
        at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
        at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:43)
        at 
org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:66)
        at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
        at scala.collection.Iterator.foreach(Iterator.scala:937)
        at scala.collection.Iterator.foreach$(Iterator.scala:937)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
        at scala.collection.IterableLike.foreach(IterableLike.scala:70)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike.map(TraversableLike.scala:233)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:65)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:167)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1267)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:675)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:759)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:665)
        at com.hmd.stream.SqlSubmit.callInsertInto(SqlSubmit.java:110)
        at com.hmd.stream.SqlSubmit.callCommand(SqlSubmit.java:85)
        at com.hmd.stream.SqlSubmit.run(SqlSubmit.java:71)
        at com.hmd.stream.SqlSubmit.main(SqlSubmit.java:45)

运行的 SQL 如下:
--  流表 source kafka
CREATE TABLE `t_Order_Order` (
                                 id BIGINT,
                                 type INT,
                                 amount VARCHAR,
                                 receivedAmount VARCHAR,
                                 channelType VARCHAR,
                                 accountId BIGINT,
                                 isCreditPeriod VARCHAR,
                                 isCyclePeriod VARCHAR,
                                 originalOrderId VARCHAR,
                                 status VARCHAR,
                                 insertTime VARCHAR,
                                 `mark` INT,
                                 isNegotiation VARCHAR,
                                 statusTime1 VARCHAR,
                                 statusTime2 VARCHAR,
                                 statusTime4 VARCHAR,
                                 statusTime8 VARCHAR,
                                 statusTime16 VARCHAR,
                                 isFirstOrder VARCHAR,
                                 isReturn VARCHAR ,
                                 orgCode VARCHAR ,
                                 proctime AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = 'ods_Homedo_t_Order_Order',
    'properties.bootstrap.servers' = '10.0.15.130:9092',
    'properties.group.id' = 'test-homodo',
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset',
    'json.ignore-parse-errors' = 'true'
);

-- dim_finance_account_fortest
CREATE TABLE `dim_finance_account_fortest`(
    `id` BIGINT,
    `mark` INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector'='jdbc',
    'table-name'='t_finance_account_fortest',
    'url'='jdbc:mysql://10.0.0.29:3306/hmd_ods',
    'username'='read',
    'password'='si7v3#,a',
    'lookup.cache.max-rows'='5000',
    'lookup.cache.ttl'='600s'
);



-- sink
CREATE TABLE `sink_kafka` (
                            order_id BIGINT ,
                            account_id BIGINT  ,
                            original_order_id VARCHAR ,
                            org_code VARCHAR ,
                            order_type_id INT,
                            order_status_id VARCHAR,
                            order_status_name VARCHAR,
                            channel_type_id VARCHAR,
                            channel_type_name VARCHAR,
                            is_credit_order INT,
                            is_credit_period VARCHAR,
                            is_cycle_period VARCHAR,
                            is_negotiation VARCHAR,
                            is_firstorder VARCHAR,
                            is_return VARCHAR,
                            order_gmv VARCHAR,
                            reder_total_received_amount VARCHAR,
                            order_date VARCHAR,
                            order_time VARCHAR,
                            order_cancel_time VARCHAR,
                            order_complete_time VARCHAR,
                            order_pend_payment VARCHAR,
                            order_confirm_time VARCHAR
) WITH (
    'connector' = 'print'
);

INSERT INTO `sink_kafka`
SELECT
    oo.id AS order_id ,
    oo.accountId AS account_id,
    oo.originalOrderId AS original_order_id,
    oo.orgCode AS org_code,
    oo.type AS order_type_id,
    oo.status AS order_status_id,
    ( case
          when oo.status = 1 then '交易取消'
          when oo.status = 2 then '交易完成'
          when oo.status = 4 then '等待客服处理'
          when oo.status = 8 then '等待客户付款'
          when oo.status =16 then '等待订单发出'
          when oo.status =32 then '等待客户收货'
          else '其他'
        end
    ) AS order_status_name,
    oo.channelType AS channel_type_id,
    ( case oo.channelType
          when 1 then 'OMS'
          when 2 then 'PC'
          when 4 then 'M站'
          when 8 then 'APP'
          when 16 then 'APP'
          when 32 then '小程序'
          else '其他'
        end
    )  AS channel_type_name,
    ( case
          when oo.isCreditPeriod =1 and oo.isCyclePeriod =1 then 1
          else 0
        end
    ) AS is_credit_order,
    oo.isCreditPeriod AS is_credit_period,
    oo.isCyclePeriod AS is_cycle_period,
    oo.isNegotiation AS is_negotiation,
    oo.isFirstOrder AS is_firstorder,
    oo.isReturn AS is_return,
    oo.amount AS order_gmv,
    oo.receivedAmount AS reder_total_received_amount,
    SUBSTR(oo.insertTime,1,10) AS order_date,
    SUBSTR(oo.insertTime,1,19) AS order_time,
    oo.statusTime1 AS order_cancel_time,
    oo.statusTime2 AS order_complete_time,
    ( case
         when oo.isCreditPeriod =1 and oo.isCyclePeriod =1 then oo.statusTime4
         else oo.statusTime8
        end
    ) AS order_pend_payment,
    oo.statusTime16 AS order_confirm_time
FROM
    (SELECT
        id ,
        type ,
        amount ,
        receivedAmount ,
        channelType ,
        accountId ,
        isCreditPeriod ,
        isCyclePeriod ,
        originalOrderId ,
        status,
        insertTime ,
        isNegotiation ,
        statusTime1 ,
        statusTime2 ,
        statusTime4 ,
        statusTime8 ,
        statusTime16 ,
        isFirstOrder ,
        isReturn ,
        orgCode,
        proctime
    FROM `t_Order_Order` WHERE mark > 0) AS oo
LEFT JOIN `dim_finance_account_fortest` FOR SYSTEM_TIME AS OF oo.proctime AS 
dfat
ON oo.accountId = dfat.id <http://dfat.id/>;

回复