Hi,

I have two problems:

1. When I stop a Flink job with "yarn application -kill <appId>" or
"echo 'stop' | ./bin/yarn-session.sh -id <appId>", a new Flink job is
sometimes started automatically. How can I avoid this? Flink version is 1.11.


2. How should I write the DDL for a sink table when one of the column types is
a POJO (such as JSONObject)?

I tried this:
CREATE TABLE test (
  name STRING,
  age INT,
  json ANY
) WITH (
  'connector' = 'print',
  'hostname' = 'localhost',
  'port' = '9999',
  'byte-delimiter' = '10',
  'format' = 'changelog-csv',
  'changelog-csv.column-delimiter' = '|'
)

and got:

Exception in thread "main" org.apache.flink.table.api.TableException: Type is not supported: ANY
    at org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalType(FlinkTypeFactory.scala:551)
    at org.apache.flink.table.planner.calcite.FlinkTypeFactory.toLogicalType(FlinkTypeFactory.scala)
    at org.apache.flink.table.planner.operations.MergeTableLikeUtil$SchemaBuilder.appendDerivedColumns(MergeTableLikeUtil.java:408)
    at org.apache.flink.table.planner.operations.MergeTableLikeUtil$SchemaBuilder.access$000(MergeTableLikeUtil.java:201)
    at org.apache.flink.table.planner.operations.MergeTableLikeUtil.mergeTables(MergeTableLikeUtil.java:143)
    at org.apache.flink.table.planner.operations.SqlCreateTableConverter.createCatalogTable(SqlCreateTableConverter.java:117)
    at org.apache.flink.table.planner.operations.SqlCreateTableConverter.convertCreateTable(SqlCreateTableConverter.java:76)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:190)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
    at userSink.TestSink.main(TestSink.java:29)
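For context, the statement does parse if I avoid the POJO type altogether and
declare the column as STRING, serializing the JSONObject to a JSON string
before it reaches the sink. A minimal sketch of that variant (the column name
json_str is made up, and I stripped the WITH clause down to the print
connector's required option):

-- Parses under Flink 1.11: the POJO column is replaced by a STRING column
-- that would hold the serialized JSON, e.g. the result of json.toJSONString().
CREATE TABLE test (
  name STRING,
  age INT,
  json_str STRING
) WITH (
  'connector' = 'print'
)

But then the column is just a string on the table side, which is why I am
asking whether a POJO column type can be declared in DDL at all.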
