Hi Fabian,

Thanks for the information. I have been reading about it and have been implementing the same as part of a Flink job written in Java.
I am using proctime for both tables. Can you please verify the implementation of the temporal tables once? Here is the snippet:

----------------------------

public class StreamingJob {

    public static void main(String[] args) throws Exception {

        ParameterTool params = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Kafka consumer configuration (shared by both sources)
        Properties kafkaConsumerProperties = new Properties();
        kafkaConsumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "X.X.X.X:9092");
        kafkaConsumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "cg54");
        kafkaConsumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        kafkaConsumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaConsumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        // Stream of known-bad IPs; records are tab-separated lines, first field is the IP
        DataStream<String> badipStream = env.addSource(
                new FlinkKafkaConsumer<>("badips", new SimpleStringSchema(), kafkaConsumerProperties));

        DataStream<String> badipStreamM = badipStream
                .map(new MapFunction<String, String>() {
                    private static final long serialVersionUID = -686775202L;

                    @Override
                    public String map(String value) throws Exception {
                        try {
                            String[] v = value.split("\\t");
                            if (v.length > 1) {
                                return v[0];
                            } else {
                                return "0.0.0.0";
                            }
                        } catch (Exception e) {
                            System.err.println(e);
                            return "0.0.0.0";
                        }
                    }
                });

        // Register the bad-IP table with a processing-time attribute and create a
        // temporal table function versioned by r_proctime and keyed by bad_ip
        Table badipTable = tableEnv.fromDataStream(badipStreamM, "bad_ip, r_proctime.proctime");
        tableEnv.registerTable("BadIP", badipTable);

        TemporalTableFunction badIPTT = badipTable.createTemporalTableFunction("r_proctime", "bad_ip");
        tableEnv.registerFunction("BadIPTT", badIPTT);

        // High-volume stream of events to be checked against the bad-IP table
        DataStream<ObjectNode> inKafkaStream = env.addSource(
                new FlinkKafkaConsumer<>("tests", new JSONKeyValueDeserializationSchema(false), kafkaConsumerProperties));

        DataStream<Tuple2<String, String>> inKafkaStreamM = inKafkaStream
                .rebalance()
                .filter(value -> value != null)
                .map(new MapFunction<ObjectNode, Tuple2<String, String>>() {
                    private static final long serialVersionUID = -6867120202L;

                    @Override
                    public Tuple2<String, String> map(ObjectNode node) throws Exception {
                        try {
                            ObjectNode nodeValue = (ObjectNode) node.get("value");
                            return new Tuple2<>(nodeValue.get("source.ip").asText(),
                                                nodeValue.get("destination.ip").asText());
                        } catch (Exception e) {
                            System.err.println(e);
                            System.out.println(node);
                            return null;
                        }
                    }
                });

        Table kafkaSource = tableEnv.fromDataStream(inKafkaStreamM, "sourceIp, destinationIp, k_proctime.proctime");
        tableEnv.registerTable("KafkaSource", kafkaSource);

        // Processing-time temporal table join: keep only events whose source IP is in the bad-IP table
        Table resultKafkaMalicious = tableEnv.sqlQuery(
                "SELECT K.sourceIp, K.destinationIp " +
                "FROM KafkaSource AS K, LATERAL TABLE (BadIPTT(K.k_proctime)) AS B " +
                "WHERE K.sourceIp = B.bad_ip");

        TupleTypeInfo<Tuple2<String, String>> tupleType = new TupleTypeInfo<>(
                Types.STRING(),
                Types.STRING());
        DataStream<Tuple2<String, String>> outStreamMalicious =
                tableEnv.toAppendStream(resultKafkaMalicious, tupleType);

        // Kafka producer configuration for the sink
        Properties kafkaProducerProperties = new Properties();
        kafkaProducerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "X.X.X.X:9092");
        kafkaProducerProperties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
        kafkaProducerProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");

        ObjectMapper mapper = new ObjectMapper();
        DataStream<String> sinkStreamMaliciousData = outStreamMalicious
                .map(new MapFunction<Tuple2<String, String>, String>() {
                    private static final long serialVersionUID = -6347120202L;

                    @Override
                    public String map(Tuple2<String, String> tuple) throws Exception {
                        try {
                            ObjectNode node = mapper.createObjectNode();
                            node.put("source.ip", tuple.f0);
                            node.put("destination.ip", tuple.f1);
                            return node.toString();
                        } catch (Exception e) {
                            System.err.println(e);
                            System.out.println(tuple);
                            return null;
                        }
                    }
                });

        sinkStreamMaliciousData.addSink(
                new FlinkKafkaProducer<>("recon-data-malicious", new SimpleStringSchema(), kafkaProducerProperties));

        env.execute("Flink List Matching");
    }
}

-------------------------------------------------------
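As a side question: if I read the temporal table docs correctly, the same join can presumably also be written with the Table API's joinLateral instead of SQL. A minimal, untested sketch against the tables registered above, just to check that I understood the temporal table function:

        // Sketch only: Table API form of the same processing-time temporal join,
        // following the pattern from the temporal table documentation.
        Table resultViaTableApi = kafkaSource
                .joinLateral("BadIPTT(k_proctime)", "sourceIp = bad_ip")
                .select("sourceIp, destinationIp");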
On Wed, Sep 18, 2019 at 6:09 PM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Nishant,
>
> You should model the query as a join with a time-versioned table [1].
> The bad-ips table would be the time-versioned table [2].
> Since it is a time-versioned table, it could even be updated with new IPs.
>
> This type of join will only keep the time-versioned table (the bad-ips) in
> state and not the other (high-volume) table.
>
> Best,
> Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/joins.html#join-with-a-temporal-table
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html
>
> On Wed, Sep 18, 2019 at 14:34, Nishant Gupta <nishantgupta1...@gmail.com> wrote:
>
>> Hi Fabian,
>>
>> Thanks for your reply.
>> I have a continuous Kafka stream coming in and a static table of bad IPs,
>> and I wanted to segregate the records that have a bad IP.
>>
>> That is why I was joining them, but with that join the 60 GB of memory
>> runs out, so I used the query below.
>> Can you please guide me in this regard?
>>
>> On Wed, 18 Sep 2019 at 5:53 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> The query that you wrote is not a time-windowed join.
>>>
>>> INSERT INTO sourceKafkaMalicious
>>> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
>>> sourceKafka.`source.ip`=badips.ip
>>> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP -
>>> INTERVAL '15' MINUTE AND CURRENT_TIMESTAMP;
>>>
>>> The problem is the use of CURRENT_TIMESTAMP instead of a processing time
>>> (or event time) attribute of badips.
>>>
>>> What exactly are you trying to achieve with the query?
>>>
>>> Best, Fabian
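As I understand the point above, a query only counts as a time-windowed join when it bounds the time attributes of the two inputs against each other, roughly along these lines (table and attribute names here are only illustrative, not taken from the job above):

        // Illustrative sketch of a time-windowed (interval) join: both inputs are
        // constrained by their own time attributes, so Flink can expire old join state.
        // "LeftStream"/"RightStream" and the *_proctime attributes are placeholders.
        Table timeWindowedJoin = tableEnv.sqlQuery(
                "SELECT L.sourceIp, R.ip " +
                "FROM LeftStream AS L, RightStream AS R " +
                "WHERE L.sourceIp = R.ip " +
                "AND L.l_proctime BETWEEN R.r_proctime - INTERVAL '15' MINUTE AND R.r_proctime");

With such a bound on both time attributes, rows that fall out of the interval can be dropped from state instead of being kept forever.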
>>>
>>> On Wed, Sep 18, 2019 at 14:02, Nishant Gupta <nishantgupta1...@gmail.com> wrote:
>>>
>>>> Hi Team,
>>>>
>>>> I am running a query for a time-windowed join as below:
>>>>
>>>> INSERT INTO sourceKafkaMalicious
>>>> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
>>>> sourceKafka.`source.ip`=badips.ip
>>>> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP -
>>>> INTERVAL '15' MINUTE AND CURRENT_TIMESTAMP;
>>>>
>>>> For a time-windowed join, Flink SQL should automatically clear older
>>>> records, but somehow the query does not clear the heap space and fails
>>>> with an error after some time.
>>>>
>>>> Can you please let me know what could go wrong, or whether this is an issue.
>>>>
>>>> Environment file chunks:
>>>>
>>>> --------------------------------------------------------------------------------------------------------------------------------------------------------------
>>>> tables:
>>>>   - name: sourceKafka
>>>>     type: source-table
>>>>     update-mode: append
>>>>     connector:
>>>>       type: kafka
>>>>       version: "universal"
>>>>       topic: test-data-flatten
>>>>       properties:
>>>>         - key: zookeeper.connect
>>>>           value: x.x.x.x:2181
>>>>         - key: bootstrap.servers
>>>>           value: x.x.x.x:9092
>>>>         - key: group.id
>>>>           value: testgroup
>>>>     format:
>>>>       type: json
>>>>       fail-on-missing-field: false
>>>>       json-schema: >
>>>>         {
>>>>           type: 'object',
>>>>           properties: {
>>>>             'source.ip': {
>>>>               type: 'string'
>>>>             },
>>>>             'source.port': {
>>>>               type: 'string'
>>>>             }
>>>>           }
>>>>         }
>>>>       derive-schema: false
>>>>     schema:
>>>>       - name: 'source.ip'
>>>>         type: VARCHAR
>>>>       - name: 'source.port'
>>>>         type: VARCHAR
>>>>
>>>>   - name: sourceKafkaMalicious
>>>>     type: sink-table
>>>>     update-mode: append
>>>>     connector:
>>>>       type: kafka
>>>>       version: "universal"
>>>>       topic: test-data-mal
>>>>       properties:
>>>>         - key: zookeeper.connect
>>>>           value: x.x.x.x:2181
>>>>         - key: bootstrap.servers
>>>>           value: x.x.x.x:9092
>>>>         - key: group.id
>>>>           value: testgroupmal
>>>>     format:
>>>>       type: json
>>>>       fail-on-missing-field: false
>>>>       json-schema: >
>>>>         {
>>>>           type: 'object',
>>>>           properties: {
>>>>             'source.ip': {
>>>>               type: 'string'
>>>>             },
>>>>             'source.port': {
>>>>               type: 'string'
>>>>             }
>>>>           }
>>>>         }
>>>>       derive-schema: false
>>>>     schema:
>>>>       - name: 'source.ip'
>>>>         type: VARCHAR
>>>>       - name: 'source.port'
>>>>         type: VARCHAR
>>>>
>>>>   - name: badips
>>>>     type: source-table
>>>>     #update-mode: append
>>>>     connector:
>>>>       type: filesystem
>>>>       path: "/home/cyanadmin/ipsum/levels/badips.csv"
>>>>     format:
>>>>       type: csv
>>>>       fields:
>>>>         - name: ip
>>>>           type: VARCHAR
>>>>       comment-prefix: "#"
>>>>     schema:
>>>>       - name: ip
>>>>         type: VARCHAR
>>>>
>>>> execution:
>>>>   planner: blink
>>>>   type: streaming
>>>>   time-characteristic: event-time
>>>>   periodic-watermarks-interval: 200
>>>>   result-mode: table
>>>>   max-table-result-rows: 1000000
>>>>   parallelism: 3
>>>>   max-parallelism: 128
>>>>   min-idle-state-retention: 0
>>>>   max-idle-state-retention: 0
>>>>   restart-strategy:
>>>>     type: fallback
>>>>
>>>> configuration:
>>>>   table.optimizer.join-reorder-enabled: true
>>>>   table.exec.spill-compression.enabled: true
>>>>   table.exec.spill-compression.block-size: 128kb
>>>>
>>>> # Properties that describe the cluster to which table programs are submitted to.
>>>>
>>>> deployment:
>>>>   response-timeout: 5000
>>>>
>>>> --------------------------------------------------------------------------------------------------------------------------------------------------------------
>>>
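A side note on the quoted environment file: min-idle-state-retention and max-idle-state-retention are both set to 0 there, which means state of non-windowed queries is never cleaned up. If the plain join were kept instead of the temporal join, a non-zero idle state retention is the usual way to bound its state. A rough sketch of the programmatic equivalent, going by my reading of the Flink 1.9 query configuration docs (so the exact API calls are an assumption on my part):

        // Sketch only: bound the state of non-windowed queries via idle state retention.
        // queryConfig()/withIdleStateRetentionTime() are taken from the Flink 1.9
        // query-configuration docs; please correct me if the API differs.
        StreamQueryConfig qConfig = tableEnv.queryConfig();
        // State that was not touched for 6 hours becomes eligible for cleanup
        // and is removed at the latest after 12 hours.
        qConfig.withIdleStateRetentionTime(Time.hours(6), Time.hours(12));

        // The config is passed when converting the result table back to a DataStream.
        DataStream<Tuple2<String, String>> boundedOutStream =
                tableEnv.toAppendStream(resultKafkaMalicious, tupleType, qConfig);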