hello
我在使用 Flink SQL 1.11 将数据写出到 JDBC 时遇到了 field type 类型不匹配的问题，是我的类型设置有问题吗？
下面是我的异常日志以及sql文件

-- Job-level settings consumed by the submitting program, not by Flink SQL itself.
-- NOTE(review): presumably a checkpoint interval in milliseconds (1000*60 = 60 s); confirm
-- that the submitter evaluates the arithmetic expression rather than reading it as text.
SET stream.enableCheckpointing=1000*60;
-- NOTE(review): presumably the default job parallelism; confirm against the submitter.
SET stream.setParallelism=3;

-- Kafka source table (cdbp zdao): behavior events read from topic 'cloud_behavior',
-- decoded as Avro, starting from the earliest offset.
-- `time` is back-quoted because it is a reserved word.
-- NOTE(review): the column name `languaage` looks like a misspelling of "language". It is
-- kept unchanged because the Avro format presumably resolves fields by name; confirm
-- against the writer schema before renaming.
create TABLE cloud_behavior_source(
    operation STRING,
    operation_channel STRING,
    `time` STRING,
    ip STRING,
    lat STRING,
    lng STRING,
    user_id STRING,
    device_id STRING,
    imei STRING,
    targets ARRAY<ROW<`type` STRING, `value` STRING>>,
    product_name STRING,
    product_version STRING,
    product_vendor STRING,
    platform STRING,
    platform_version STRING,
    `languaage` STRING,
    locale STRING,
    other_para MAP<STRING, STRING NULL>
) with (
    'connector'='kafka',
    'topic'='cloud_behavior',
    'properties.bootstrap.servers'='',
    'properties.group.id'='testGroup',
    'format'='avro',
    'scan.startup.mode'='earliest-offset'
);

-- Sink table for zdao behavior records, written through the JDBC connector
-- to table 'flink_sql_test', flushing after at most 100 buffered rows.
-- NOTE(review): this table was previously labelled an HBase UV-statistics sink,
-- but the connector configured here is 'jdbc' with a MySQL URL.
CREATE TABLE cloud_behavior_sink (
    operation         STRING,
    operation_channel STRING,
    ip                STRING,
    lat               STRING,
    lng               STRING,
    user_id           STRING,
    device_id         STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://hosts:3306/d_bigdata',
    'table-name' = 'flink_sql_test',
    'username' = '',
    'password' = '',
    'sink.buffer-flush.max-rows' = '100'
);

-- Business logic: copy behavior events from the Kafka source into the JDBC sink.
-- The select list must match the sink schema exactly (same column count, order and types).
-- `select *` would emit all 18 source columns against the sink's 7 and fail validation with
-- "Field types of query result and registered TableSink ... do not match".
insert into cloud_behavior_sink
select
    operation,
    operation_channel,
    ip,
    lat,
    lng,
    user_id,
    device_id
from cloud_behavior_source;

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/data1/flink/flink-1.11.1-log/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type
[org.apache.logging.slf4j.Log4jLoggerFactory]

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Field types of query result and registered TableSink
default_catalog.default_database.cloud_behavior_sink do not match.
Query schema: [operation: VARCHAR(2147483647), operation_channel:
VARCHAR(2147483647), time: VARCHAR(2147483647), ip: VARCHAR(2147483647),
lat: VARCHAR(2147483647), lng: VARCHAR(2147483647), user_id:
VARCHAR(2147483647), device_id: VARCHAR(2147483647), imei:
VARCHAR(2147483647), targets: ARRAY<ROW<`type` VARCHAR(2147483647),
`value` VARCHAR(2147483647)>>, product_name: VARCHAR(2147483647),
product_version: VARCHAR(2147483647), product_vendor: VARCHAR(2147483647),
platform: VARCHAR(2147483647), platform_version: VARCHAR(2147483647),
languaage: VARCHAR(2147483647), locale: VARCHAR(2147483647), other_para:
MAP<VARCHAR(2147483647), VARCHAR(2147483647)>]
Sink schema: [operation: VARCHAR(2147483647), operation_channel:
VARCHAR(2147483647), ip: VARCHAR(2147483647), lat: VARCHAR(2147483647), lng:
VARCHAR(2147483647), user_id: VARCHAR(2147483647), device_id:
VARCHAR(2147483647)]
        at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
        at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
        at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
        at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
        at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
        at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
        at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: org.apache.flink.table.api.ValidationException: Field types of
query result and registered TableSink
default_catalog.default_database.cloud_behavior_sink do not match.
Query schema: [operation: VARCHAR(2147483647), operation_channel:
VARCHAR(2147483647), time: VARCHAR(2147483647), ip: VARCHAR(2147483647),
lat: VARCHAR(2147483647), lng: VARCHAR(2147483647), user_id:
VARCHAR(2147483647), device_id: VARCHAR(2147483647), imei:
VARCHAR(2147483647), targets: ARRAY<ROW<`type` VARCHAR(2147483647),
`value` VARCHAR(2147483647)>>, product_name: VARCHAR(2147483647),
product_version: VARCHAR(2147483647), product_vendor: VARCHAR(2147483647),
platform: VARCHAR(2147483647), platform_version: VARCHAR(2147483647),
languaage: VARCHAR(2147483647), locale: VARCHAR(2147483647), other_para:
MAP<VARCHAR(2147483647), VARCHAR(2147483647)>]
Sink schema: [operation: VARCHAR(2147483647), operation_channel:
VARCHAR(2147483647), ip: VARCHAR(2147483647), lat: VARCHAR(2147483647), lng:
VARCHAR(2147483647), user_id: VARCHAR(2147483647), device_id:
VARCHAR(2147483647)]
        at
org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:100)
        at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:229)
        at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:204)
        at scala.Option.map(Option.scala:146)
        at
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
        at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
        at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
        at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
        at
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
        at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
        at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)
        at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)
        at
com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.callDML(SqlSubmit.java:97)
        at
com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.callCommand(SqlSubmit.java:72)
        at
com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.run(SqlSubmit.java:53)
        at
com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.main(SqlSubmit.java:24)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
        ... 11 more



--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复