[ https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Terry Wang updated FLINK-17313:
-------------------------------
    Description: 
Test code like the following (in the blink planner):
{code:java}
                tEnv.sqlUpdate("create table randomSource (" +
                                                "               a varchar(10)," 
+
                                                "               b 
decimal(20,2)" +
                                                "       ) with (" +
                                                "               'type' = 
'random'," +
                                                "               'count' = '10'" 
+
                                                "       )");
                tEnv.sqlUpdate("create table printSink (" +
                                                "               a varchar(10)," 
+
                                                "               b 
decimal(22,2)," +
                                                "               c 
timestamp(3)," +
                                                "       ) with (" +
                                                "       'type' = 'print'" +
                                                "       )");
                tEnv.sqlUpdate("insert into printSink select *, 
current_timestamp from randomSource");
                tEnv.execute("");
{code}

The print TableSink implements UpsertStreamTableSink, and its getRecordType() is as follows:


{code:java}
public TypeInformation<Row> getRecordType() {
    return getTableSchema().toRowType();
}
{code}
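
For reference, here is a minimal sketch (assuming the Flink 1.10 table API; the schema and helper method below are made up for illustration, not the print sink's actual code) of how a record type derived this way ends up without the DDL precision. TableSchema#toRowType() converts through the legacy TypeInformation stack, so e.g. VARCHAR(10) comes back as plain STRING type information, which is the physical type the validation below complains about:

{code:java}
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;

// Hypothetical helper, illustration only: builds a schema mirroring the
// printSink DDL above and derives the record type the same way the print
// sink's getRecordType() does.
public static TypeInformation<Row> recordTypeFromDdlSchema() {
    TableSchema schema = TableSchema.builder()
            .field("a", DataTypes.VARCHAR(10))
            .field("b", DataTypes.DECIMAL(22, 2))
            .field("c", DataTypes.TIMESTAMP(3))
            .build();
    // toRowType() goes through legacy TypeInformation, so the DDL precision is
    // not preserved: field 'a' comes back as plain String type information
    // (no VARCHAR(10) length), and the decimal/timestamp fields similarly map
    // to legacy type informations.
    return schema.toRowType();
}
{code}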


The validation exception for the varchar column is:

org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table field 'a' does not match with the physical type STRING of the 'a' field of the TableSink consumed type.

        at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165)
        at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278)
        at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255)
        at org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67)
        at org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157)
        at org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255)
        at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161)
        at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
        at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191)
        at scala.Option.map(Option.scala:146)
        at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:863)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:855)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:822)

The validation exceptions for the other types are similar. I dug into it and think the problem is caused by TypeMappingUtils#checkPhysicalLogicalTypeCompatible: the method doesn't take into account that the validation logic for physical vs. logical types differs between sources and sinks. For a source, the logical type should be able to cover the physical type, whereas for a sink the physical type should be able to cover the logical type. Besides, the decimal type should be handled more carefully: when the target type is a legacy DECIMAL, it should accept a decimal of any precision.
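
To make the intended direction concrete, here is a hypothetical sketch (not the actual Flink change; the class and method names are made up, and it ignores the legacy DECIMAL special case mentioned above) of what a direction-aware check could look like, based on LogicalTypeCasts#supportsAvoidingCast:

{code:java}
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.logical.LogicalType;

import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsAvoidingCast;

// Hypothetical helper, illustration only.
public final class DirectionAwareTypeCheck {

    public static void checkCompatible(LogicalType physicalType, LogicalType logicalType, boolean isSink) {
        // Sink: data flows from the logical (query/DDL) type into the physical
        // (consumed) type, so the physical type must cover the logical one.
        // Source: data flows from the physical (produced) type into the logical
        // type, so the logical type must cover the physical one.
        boolean compatible = isSink
                ? supportsAvoidingCast(logicalType, physicalType)
                : supportsAvoidingCast(physicalType, logicalType);
        if (!compatible) {
            throw new ValidationException(String.format(
                    "Physical type %s and logical type %s are not compatible for a %s.",
                    physicalType, logicalType, isSink ? "sink" : "source"));
        }
    }
}
{code}

With a check like this, writing the logical VARCHAR(10) field into the physical STRING field would be accepted on the sink side, because the physical type can hold every value of the logical type.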






  was:
Test code like the following (in the blink planner):
{code:java}
                tEnv.sqlUpdate("create table randomSource (" +
                                                "               a varchar(10)," 
+
                                                "               b 
decimal(20,2)" +
                                                "       ) with (" +
                                                "               'type' = 
'random'," +
                                                "               'count' = '10'" 
+
                                                "       )");
                tEnv.sqlUpdate("create table printSink (" +
                                                "               a varchar(10)," 
+
                                                "               b 
decimal(22,2)," +
                                                "               c 
timestamp(3)," +
                                                "       ) with (" +
                                                "       'type' = 'print'" +
                                                "       )");
                tEnv.sqlUpdate("insert into printSink select *, 
current_timestamp from randomSource");
                tEnv.execute("");
{code}

The print TableSink implements UpsertStreamTableSink, and its getRecordType() is as follows:


{code:java}
public TypeInformation<Row> getRecordType() {
    return getTableSchema().toRowType();
}
{code}


The validation exception for the varchar column is:

org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table field 'a' does not match with the physical type STRING of the 'a' field of the TableSink consumed type.

        at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165)
        at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278)
        at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255)
        at org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67)
        at org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157)
        at org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255)
        at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161)
        at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
        at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191)
        at scala.Option.map(Option.scala:146)
        at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:863)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:855)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:822)

The validation exceptions for the other types are similar. I dug into it and think the problem is caused by TypeMappingUtils#checkPhysicalLogicalTypeCompatible: the method doesn't take into account that the validation logic for physical vs. logical types differs between sources and sinks.







> Validation error when insert decimal/timestamp/varchar with precision into 
> sink using TypeInformation of row
> ------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-17313
>                 URL: https://issues.apache.org/jira/browse/FLINK-17313
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Terry Wang
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
