I want to register a table from a MySQL binlog like this: 
tEnv.sqlUpdate("CREATE TABLE `order`(\n"
    + "    order_id BIGINT,\n"
    + "    order_no VARCHAR\n"
    + ") WITH (\n"
    + "    'connector.type' = 'kafka',\n"
    ...........
    + "    'update-mode' = 'append',\n"
    + "    'format.type' = 'json',\n"
    + "    'format.derive-schema' = 'true'\n"
    + ")");

The Kafka topic uses the following log format: 
{
  "type" : "update",
  "timestamp" : 1583373066000,
  "binlog_filename" : "mysql-bin.000453",
  "binlog_position" : 923020943,
  "database" : "wms",
  "table_name" : "t_pick_order",
  "table_id" : 131936,
  "columns" : [ {
    "id" : 1,
    "name" : "order_id",
    "column_type" : -5,
    "last_value" : 4606458,
    "value" : 4606458
  }, {
    "id" : 2,
    "name" : "order_no",
    "column_type" : 12,
    "last_value" : "EDBMFSJ00001S2003050006628",
    "value" : "EDBMFSJ00001S2003050006628"
  }]
}

Clearly, 'format.type' = 'json' will not parse this record the way I expect,
because the column values are nested inside the "columns" array.
Is there any way to implement this, for example with a custom format class?
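
To make the goal concrete, here is a rough sketch in plain Java of the flattening I am after: each entry of "columns" becomes one field, keyed by its "name" and holding its "value". It assumes the JSON has already been parsed into java.util.Map and java.util.List objects by some JSON library. BinlogFlattener and flatten are names I made up for illustration, not Flink APIs, and a real custom format would still have to wrap logic like this in whatever deserialization interface the connector expects.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of Flink): turns one parsed binlog record
// into a flat row such as {order_id=4606458, order_no=EDBMFSJ...}.
final class BinlogFlattener {

    // Assumption: "record" is the already-parsed JSON object, with
    // "columns" as a List of Maps that each carry "name" and "value".
    static Map<String, Object> flatten(Map<String, Object> record) {
        Map<String, Object> row = new LinkedHashMap<>();
        Object columns = record.get("columns");
        if (!(columns instanceof List)) {
            return row; // no usable "columns" array: return an empty row
        }
        for (Object item : (List<?>) columns) {
            if (!(item instanceof Map)) {
                continue; // skip malformed entries
            }
            Map<?, ?> column = (Map<?, ?>) item;
            Object name = column.get("name");
            if (name != null) {
                // Keep the new value; "last_value" is ignored in this sketch.
                row.put(name.toString(), column.get("value"));
            }
        }
        return row;
    }

    public static void main(String[] args) {
        Map<String, Object> orderId = new LinkedHashMap<>();
        orderId.put("name", "order_id");
        orderId.put("value", 4606458L);

        Map<String, Object> orderNo = new LinkedHashMap<>();
        orderNo.put("name", "order_no");
        orderNo.put("value", "EDBMFSJ00001S2003050006628");

        List<Object> columns = new ArrayList<>();
        columns.add(orderId);
        columns.add(orderNo);

        Map<String, Object> record = new LinkedHashMap<>();
        record.put("type", "update");
        record.put("columns", columns);

        System.out.println(flatten(record));
    }
}
```

The resulting map has exactly the two fields declared in the table above (order_id and order_no), which is the shape I would like the format to produce.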

Thanks,
Lei



wangl...@geekplus.com.cn
