使用pyflink的table api 能否根据 filed的值进行判断插入到不同的sink表中?


场景:使用pyflink通过filter进行条件过滤后插入到sink中,
比如以下两条消息,logType不同,可以使用filter接口进行过滤后插入到sink表中:
{
    "logType":"syslog",
    "message":"sla;flkdsjf"
}
{
    "logType":"alarm",
    "message":"sla;flkdsjf"
}
      t_env.from_path("source")\
          .filter("logType=syslog")\
          .insert_into("sink1")
有方法直接在上面的代码中通过判断logType字段的类型实现类似if else的逻辑吗:
if logType=="syslog":
   insert_into(sink1)
elif logType=="alarm":
   insert_into(sink2)


如果insert_into 有.filter .select等接口的返回值的话就好办了,可以接着往下通过filter进行判断,代码类似以下:


      t_env.from_path("source")\
          .filter("logType=syslog")\
          .insert_into("sink1")\
          .filter("logType=alarm")\
          .insert_into("sink2")
请各位大牛指点,感谢





Reply via email to