You can try to use UDTF
------------------ Original ------------------ From: srikanth flink <flink.d...@gmail.com> Date: Mon,Sep 16,2019 9:23 PM To: dev <dev@flink.apache.org>, user <u...@flink.apache.org>, user-ml <user...@flink.apache.org> Subject: Re: Can I do a lookup in FlinkSQL? Hi there, I'm working with streaming in FlinkSQL. I've two tables created one with dynamic stream and the other a periodic updates. I would like to keep the periodic table a static(but updates with new data every day or so by flushing the old), So at any point of time the static table should contain new set of data. With dynamic table being populated with stream data, could I do a lookup on a column of static table to find if the value exists. This is what I have done: dynamic table: sourceKafka static table: badips Trying to build a list, kind of using ROW() function and done. From dynamic table, trying to lookup into the list if the value exists. Query: INSERT INTO sourceKafkaMalicious select s.* from sourceKafka as s where s.`source.ip` OR s.`destination.ip` IN (select ROW(ip) from badips); Resonse: [INFO] Submitting SQL update statement to the cluster... [ERROR] Could not execute SQL statement. Reason: org.apache.calcite.sql.validate.SqlValidatorException: Values passed to IN operator must have compatible types Is it possible to solve my use case? If so, where am I going wrong? Thanks Srikanth