Hi Fabian,

I am facing an issue when I run multiple queries like this:

Table resultKafkaMalicious = tableEnv.sqlQuery(
    "SELECT K.sourceIp, K.destinationIp FROM KafkaSource AS K, LATERAL TABLE (BadIPTT(K.k_proctime)) AS B WHERE K.sourceIp=B.bad_ip");

Table resultKafkaSafe = tableEnv.sqlQuery(
    "SELECT K.sourceIp, K.destinationIp FROM KafkaSource AS K, LATERAL TABLE (BadIPTT(K.k_proctime)) AS B WHERE K.sourceIp<>B.bad_ip");

(Also, the heap space issue no longer occurs with temporal tables.)

Error:
-------
Exception in thread "main" org.apache.flink.table.api.ValidationException: Only single column join key is supported. Found [] in [InnerJoin(where: (AND(__TEMPORAL_JOIN_CONDITION(k_proctime, bad_ip), <>(sourceIp, bad_ip))), join: (sourceIp, destinationIp, k_proctime, bad_ip))]
    at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.validateRightPrimaryKey(DataStreamTemporalJoinToCoProcessTranslator.scala:215)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.visitCall(DataStreamTemporalJoinToCoProcessTranslator.scala:198)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.visitCall(DataStreamTemporalJoinToCoProcessTranslator.scala:152)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
    at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:149)
    at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:101)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.visitCall(DataStreamTemporalJoinToCoProcessTranslator.scala:168)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.visitCall(DataStreamTemporalJoinToCoProcessTranslator.scala:152)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
    at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:277)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$.create(DataStreamTemporalJoinToCoProcessTranslator.scala:117)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalTableJoin.createTranslator(DataStreamTemporalTableJoin.scala:77)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamJoin.translateToPlan(DataStreamJoin.scala:110)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:98)
    at org.apache.flink.table.planner.StreamPlanner.translateToCRow(StreamPlanner.scala:250)
    at org.apache.flink.table.planner.StreamPlanner.translateOptimized(StreamPlanner.scala:431)
    at org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:421)
    at org.apache.flink.table.planner.StreamPlanner.org$apache$flink$table$planner$StreamPlanner$$translate(StreamPlanner.scala:182)
    at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127)
    at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:127)
    at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:319)
    at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:227)
    at flink.StreamingJob.main(StreamingJob.java:140)
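The exception is raised while translating the second query: the temporal-join translator derives the join key solely from equality predicates on the temporal table's key, and `K.sourceIp <> B.bad_ip` yields none (that is the "Found []" in the message). So the first query is a valid temporal table join, but the second cannot be planned as one. One way to still get the "safe" records is to drop to the DataStream API and split the stream with the bad-IP set held in broadcast state. The sketch below assumes the same Tuple2<sourceIp, destinationIp> events and bad-IP strings as in the job further down; the class, state, and tag names are made up for illustration:

----------------------------
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class IpSplitSketch {

    // Broadcast state holding the current set of bad IPs (value unused).
    static final MapStateDescriptor<String, Boolean> BAD_IPS =
        new MapStateDescriptor<>("bad-ips", Types.STRING, Types.BOOLEAN);

    // Side output for records whose source IP is NOT in the bad-IP set.
    static final OutputTag<Tuple2<String, String>> SAFE =
        new OutputTag<Tuple2<String, String>>("safe") {};

    static SingleOutputStreamOperator<Tuple2<String, String>> split(
            DataStream<Tuple2<String, String>> events, DataStream<String> badIps) {

        // Replicate the bad-IP stream to all parallel instances.
        BroadcastStream<String> badIpBroadcast = badIps.broadcast(BAD_IPS);

        return events.connect(badIpBroadcast).process(
            new BroadcastProcessFunction<Tuple2<String, String>, String, Tuple2<String, String>>() {

                @Override
                public void processElement(Tuple2<String, String> event, ReadOnlyContext ctx,
                        Collector<Tuple2<String, String>> out) throws Exception {
                    if (ctx.getBroadcastState(BAD_IPS).contains(event.f0)) {
                        out.collect(event);      // main output: malicious
                    } else {
                        ctx.output(SAFE, event); // side output: safe
                    }
                }

                @Override
                public void processBroadcastElement(String badIp, Context ctx,
                        Collector<Tuple2<String, String>> out) throws Exception {
                    ctx.getBroadcastState(BAD_IPS).put(badIp, true); // grow the set
                }
            });
    }
}
----------------------------

The malicious records come out of the main output, the safe ones via split(inKafkaStreamM, badipStreamM).getSideOutput(SAFE). Note that, unlike the temporal table join, this keeps a copy of every broadcast IP in state on every parallel instance.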
On Fri, Sep 20, 2019 at 8:26 PM Nishant Gupta <nishantgupta1...@gmail.com> wrote:

> The use case is similar, but I am not able to reproduce the heap space issue, as the data size is small. I thought I would check with you in the meantime.
>
> Thanks for looking into it. Really appreciate it.
> I have marked the usage of temporal tables in bold red for ease of reference.
>
> On Fri, Sep 20, 2019, 8:18 PM Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi,
>>
>> This looks OK at first sight.
>> Is it doing what you expect?
>>
>> Fabian
>>
>> Am Fr., 20. Sept. 2019 um 16:29 Uhr schrieb Nishant Gupta <nishantgupta1...@gmail.com>:
>>
>>> Hi Fabian,
>>>
>>> Thanks for the information.
>>> I have been reading about it and have implemented it as part of a Flink job written in Java.
>>>
>>> I am using proctime for both tables. Could you please verify the implementation of the temporal tables?
>>>
>>> Here is the snippet:
>>> ----------------------------
>>> public class StreamingJob {
>>>
>>>     public static void main(String[] args) throws Exception {
>>>
>>>         ParameterTool params = ParameterTool.fromArgs(args);
>>>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>>>
>>>         Properties kafkaConsumerProperties = new Properties();
>>>         kafkaConsumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "X.X.X.X:9092");
>>>         kafkaConsumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "cg54");
>>>         kafkaConsumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
>>>         kafkaConsumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
>>>         kafkaConsumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
>>>
>>>         // Stream of known-bad IPs (tab-separated lines, first field is the IP)
>>>         DataStream<String> badipStream = env.addSource(new FlinkKafkaConsumer<>("badips", new SimpleStringSchema(), kafkaConsumerProperties));
>>>
>>>         DataStream<String> badipStreamM = badipStream
>>>             .map(new MapFunction<String, String>() {
>>>                 private static final long serialVersionUID = -686775202L;
>>>                 @Override
>>>                 public String map(String value) throws Exception {
>>>                     try {
>>>                         String[] v = value.split("\\t");
>>>                         if (v.length > 1) {
>>>                             return v[0].toString();
>>>                         } else {
>>>                             return "0.0.0.0";
>>>                         }
>>>                     } catch (Exception e) {
>>>                         System.err.println(e);
>>>                         return "0.0.0.0";
>>>                     }
>>>                 }
>>>             });
>>>
>>>         // Register the bad-IP stream as a table with a processing-time attribute
>>>         Table badipTable = tableEnv.fromDataStream(badipStreamM, *"bad_ip, r_proctime.proctime"*);
>>>         tableEnv.registerTable("BadIP", badipTable);
>>>         // Temporal table function versioned on r_proctime and keyed on bad_ip
>>>         TemporalTableFunction badIPTT = badipTable.createTemporalTableFunction("r_proctime", "bad_ip");
>>>         tableEnv.registerFunction("BadIPTT", badIPTT);
>>>
>>>         DataStream<ObjectNode> inKafkaStream = env
>>>             .addSource(new FlinkKafkaConsumer<>("tests", new JSONKeyValueDeserializationSchema(false), kafkaConsumerProperties));
>>>         DataStream<Tuple2<String, String>> inKafkaStreamM = inKafkaStream
>>>             .rebalance()
>>>             .filter(value -> value != null)
>>>             .map(new MapFunction<ObjectNode, Tuple2<String, String>>() {
>>>                 private static final long serialVersionUID = -6867120202L;
>>>                 @Override
>>>                 public Tuple2<String, String> map(ObjectNode node) throws Exception {
>>>                     try {
>>>                         ObjectNode nodeValue = (ObjectNode) node.get("value");
>>>                         return new Tuple2<>(nodeValue.get("source.ip").asText(), nodeValue.get("destination.ip").asText());
>>>                     } catch (Exception e) {
>>>                         System.err.println(e);
>>>                         System.out.println(node);
>>>                         return null;
>>>                     }
>>>                 }
>>>             });
>>>
>>>         // Main event table with its own processing-time attribute
>>>         Table kafkaSource = tableEnv.fromDataStream(inKafkaStreamM, *"sourceIp, destinationIp, k_proctime.proctime"*);
>>>         tableEnv.registerTable("KafkaSource", kafkaSource);
>>>         // Temporal table join on the bad-IP key
>>>         *Table resultKafkaMalicious = tableEnv.sqlQuery(
>>>             "SELECT K.sourceIp, K.destinationIp FROM KafkaSource AS K, LATERAL TABLE (BadIPTT(K.k_proctime)) AS B WHERE K.sourceIp=B.bad_ip");*
>>>
>>>         TupleTypeInfo<Tuple2<String, String>> tupleType = new TupleTypeInfo<>(
>>>             Types.STRING(),
>>>             Types.STRING());
>>>
>>>         DataStream<Tuple2<String, String>> outStreamMalicious = tableEnv.toAppendStream(resultKafkaMalicious, tupleType);
>>>
>>>         Properties kafkaProducerProperties = new Properties();
>>>         kafkaProducerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "X.X.X.X:9092");
>>>         kafkaProducerProperties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
>>>         kafkaProducerProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
>>>
>>>         ObjectMapper mapper = new ObjectMapper();
>>>         DataStream<String> sinkStreamMaliciousData = outStreamMalicious
>>>             .map(new MapFunction<Tuple2<String, String>, String>() {
>>>                 private static final long serialVersionUID = -6347120202L;
>>>                 @Override
>>>                 public String map(Tuple2<String, String> tuple) throws Exception {
>>>                     try {
>>>                         ObjectNode node = mapper.createObjectNode();
>>>                         node.put("source.ip", tuple.f0);
>>>                         node.put("destination.ip", tuple.f1);
>>>                         return node.toString();
>>>                     } catch (Exception e) {
>>>                         System.err.println(e);
>>>                         System.out.println(tuple);
>>>                         return null;
>>>                     }
>>>                 }
>>>             });
>>>
>>>         sinkStreamMaliciousData.addSink(new FlinkKafkaProducer<>("recon-data-malicious", new SimpleStringSchema(), kafkaProducerProperties));
>>>         env.execute("Flink List Matching");
>>>     }
>>> -------------------------------------------------------
>>>
>>> On Wed, Sep 18, 2019 at 6:09 PM Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi Nishant,
>>>>
>>>> You should model the query as a join with a time-versioned table [1].
>>>> The bad-ips table would be the time-versioned table [2].
>>>> Since it is a time-versioned table, it could even be updated with new IPs.
>>>>
>>>> This type of join will only keep the time-versioned table (the bad-ips) in state, and not the other (high-volume) table.
>>>>
>>>> Best,
>>>> Fabian
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/joins.html#join-with-a-temporal-table
>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html
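For reference, the join pattern described in [1] has roughly this shape, where Orders is the high-volume stream, Rates is the versioned table, and Rates(...) is the temporal table function registered over it (these are the placeholder names used in the documentation):

    SELECT o.amount * r.rate AS amount
    FROM Orders AS o,
         LATERAL TABLE (Rates(o.rowtime)) AS r
    WHERE r.currency = o.currency

Only the versioned side (Rates, or the bad-ips here) is kept in state, which is what makes this join attractive for a high-volume stream.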
>>>> Am Mi., 18. Sept. 2019 um 14:34 Uhr schrieb Nishant Gupta <nishantgupta1...@gmail.com>:
>>>>
>>>>> Hi Fabian,
>>>>>
>>>>> Thanks for your reply.
>>>>> I have a continuous Kafka stream coming in and a static table of bad IPs.
>>>>> I wanted to segregate the records that have a bad IP.
>>>>>
>>>>> So I was joining them, but with that, the 60 GB of memory gets exhausted.
>>>>>
>>>>> So I used the query below.
>>>>> Can you please guide me in this regard?
>>>>>
>>>>> On Wed, 18 Sep 2019 at 5:53 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> The query that you wrote is not a time-windowed join.
>>>>>>
>>>>>> INSERT INTO sourceKafkaMalicious
>>>>>> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON sourceKafka.`source.ip`=badips.ip
>>>>>> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP - INTERVAL '15' MINUTE AND CURRENT_TIMESTAMP;
>>>>>>
>>>>>> The problem is the use of CURRENT_TIMESTAMP instead of a processing-time (or event-time) attribute of badips.
>>>>>>
>>>>>> What exactly are you trying to achieve with the query?
>>>>>>
>>>>>> Best, Fabian
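A time-windowed join needs a predicate that relates the time attributes of both inputs; that relation is what lets Flink bound the state it keeps for each side. Assuming both tables declared event-time attributes (the columns s.ts and b.ts below are hypothetical), a sketch would be:

    INSERT INTO sourceKafkaMalicious
    SELECT s.*
    FROM sourceKafka AS s INNER JOIN badips AS b
      ON s.`source.ip` = b.ip
    WHERE s.ts BETWEEN b.ts - INTERVAL '15' MINUTE AND b.ts + INTERVAL '15' MINUTE;

With bounds like these, records on both sides can be dropped from state once their window has passed. With CURRENT_TIMESTAMP there is no relation between the two inputs' time attributes, so the query falls back to a regular join that keeps both inputs in state indefinitely, which matches the observed heap exhaustion.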
>>>>>> Am Mi., 18. Sept. 2019 um 14:02 Uhr schrieb Nishant Gupta <nishantgupta1...@gmail.com>:
>>>>>>
>>>>>>> Hi Team,
>>>>>>>
>>>>>>> I am running a query for a time-windowed join as below:
>>>>>>>
>>>>>>> INSERT INTO sourceKafkaMalicious
>>>>>>> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON sourceKafka.`source.ip`=badips.ip
>>>>>>> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP - INTERVAL '15' MINUTE AND CURRENT_TIMESTAMP;
>>>>>>>
>>>>>>> In a time-windowed join, Flink SQL should automatically clear older records. Somehow the query does not clear the heap space and fails with an error after some time.
>>>>>>>
>>>>>>> Can you please let me know what could go wrong, or whether this is an issue?
>>>>>>>
>>>>>>> Environment file chunks:
>>>>>>> --------------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>>>> tables:
>>>>>>>   - name: sourceKafka
>>>>>>>     type: source-table
>>>>>>>     update-mode: append
>>>>>>>     connector:
>>>>>>>       type: kafka
>>>>>>>       version: "universal"
>>>>>>>       topic: test-data-flatten
>>>>>>>       properties:
>>>>>>>         - key: zookeeper.connect
>>>>>>>           value: x.x.x.x:2181
>>>>>>>         - key: bootstrap.servers
>>>>>>>           value: x.x.x.x:9092
>>>>>>>         - key: group.id
>>>>>>>           value: testgroup
>>>>>>>     format:
>>>>>>>       type: json
>>>>>>>       fail-on-missing-field: false
>>>>>>>       json-schema: >
>>>>>>>         {
>>>>>>>           type: 'object',
>>>>>>>           properties: {
>>>>>>>             'source.ip': { type: 'string' },
>>>>>>>             'source.port': { type: 'string' }
>>>>>>>           }
>>>>>>>         }
>>>>>>>       derive-schema: false
>>>>>>>     schema:
>>>>>>>       - name: 'source.ip'
>>>>>>>         type: VARCHAR
>>>>>>>       - name: 'source.port'
>>>>>>>         type: VARCHAR
>>>>>>>
>>>>>>>   - name: sourceKafkaMalicious
>>>>>>>     type: sink-table
>>>>>>>     update-mode: append
>>>>>>>     connector:
>>>>>>>       type: kafka
>>>>>>>       version: "universal"
>>>>>>>       topic: test-data-mal
>>>>>>>       properties:
>>>>>>>         - key: zookeeper.connect
>>>>>>>           value: x.x.x.x:2181
>>>>>>>         - key: bootstrap.servers
>>>>>>>           value: x.x.x.x:9092
>>>>>>>         - key: group.id
>>>>>>>           value: testgroupmal
>>>>>>>     format:
>>>>>>>       type: json
>>>>>>>       fail-on-missing-field: false
>>>>>>>       json-schema: >
>>>>>>>         {
>>>>>>>           type: 'object',
>>>>>>>           properties: {
>>>>>>>             'source.ip': { type: 'string' },
>>>>>>>             'source.port': { type: 'string' }
>>>>>>>           }
>>>>>>>         }
>>>>>>>       derive-schema: false
>>>>>>>     schema:
>>>>>>>       - name: 'source.ip'
>>>>>>>         type: VARCHAR
>>>>>>>       - name: 'source.port'
>>>>>>>         type: VARCHAR
>>>>>>>
>>>>>>>   - name: badips
>>>>>>>     type: source-table
>>>>>>>     #update-mode: append
>>>>>>>     connector:
>>>>>>>       type: filesystem
>>>>>>>       path: "/home/cyanadmin/ipsum/levels/badips.csv"
>>>>>>>     format:
>>>>>>>       type: csv
>>>>>>>       fields:
>>>>>>>         - name: ip
>>>>>>>           type: VARCHAR
>>>>>>>       comment-prefix: "#"
>>>>>>>     schema:
>>>>>>>       - name: ip
>>>>>>>         type: VARCHAR
>>>>>>>
>>>>>>> execution:
>>>>>>>   planner: blink
>>>>>>>   type: streaming
>>>>>>>   time-characteristic: event-time
>>>>>>>   periodic-watermarks-interval: 200
>>>>>>>   result-mode: table
>>>>>>>   max-table-result-rows: 1000000
>>>>>>>   parallelism: 3
>>>>>>>   max-parallelism: 128
>>>>>>>   min-idle-state-retention: 0
>>>>>>>   max-idle-state-retention: 0
>>>>>>>   restart-strategy:
>>>>>>>     type: fallback
>>>>>>>
>>>>>>> configuration:
>>>>>>>   table.optimizer.join-reorder-enabled: true
>>>>>>>   table.exec.spill-compression.enabled: true
>>>>>>>   table.exec.spill-compression.block-size: 128kb
>>>>>>>
>>>>>>> # Properties that describe the cluster to which table programs are submitted.
>>>>>>> deployment:
>>>>>>>   response-timeout: 5000
>>>>>>> --------------------------------------------------------------------------------------------------------------------------------------------------------------
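One observation on the environment file: execution is configured with time-characteristic: event-time, yet neither table declares a time attribute in its schema. In the Flink 1.9 SQL-client YAML, an event-time attribute is declared on a schema field, roughly as in the sketch below (assuming the JSON records carry the timestamp_received field the query references; the watermark delay is an arbitrary example value):

    schema:
      - name: timestamp_received
        type: TIMESTAMP
        rowtime:
          timestamps:
            type: "from-field"
            from: "timestamp_received"
          watermarks:
            type: "periodic-bounded"
            delay: "5000"

Without such a declaration, timestamp_received is a plain column, and the query cannot be planned as a time-windowed join at all.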