Hi Fabian,

I am facing an issue when I run multiple queries like the following:
Table resultKafkaMalicious = tableEnv.sqlQuery(
    "SELECT K.sourceIp, K.destinationIp FROM KafkaSource AS K, LATERAL TABLE (BadIPTT(K.k_proctime)) AS B WHERE K.sourceIp = B.bad_ip");
Table resultKafkaSafe = tableEnv.sqlQuery(
    "SELECT K.sourceIp, K.destinationIp FROM KafkaSource AS K, LATERAL TABLE (BadIPTT(K.k_proctime)) AS B WHERE K.sourceIp <> B.bad_ip");

(Also, the heap space issue no longer occurs now that I am using temporal tables.)
Error:
-------
Exception in thread "main" org.apache.flink.table.api.ValidationException: Only single column join key is supported. Found [] in [InnerJoin(where: (AND(__TEMPORAL_JOIN_CONDITION(k_proctime, bad_ip), <>(sourceIp, bad_ip))), join: (sourceIp, destinationIp, k_proctime, bad_ip))]
    at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.validateRightPrimaryKey(DataStreamTemporalJoinToCoProcessTranslator.scala:215)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.visitCall(DataStreamTemporalJoinToCoProcessTranslator.scala:198)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.visitCall(DataStreamTemporalJoinToCoProcessTranslator.scala:152)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
    at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:149)
    at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:101)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.visitCall(DataStreamTemporalJoinToCoProcessTranslator.scala:168)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.visitCall(DataStreamTemporalJoinToCoProcessTranslator.scala:152)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
    at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:277)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$.create(DataStreamTemporalJoinToCoProcessTranslator.scala:117)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalTableJoin.createTranslator(DataStreamTemporalTableJoin.scala:77)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamJoin.translateToPlan(DataStreamJoin.scala:110)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:98)
    at org.apache.flink.table.planner.StreamPlanner.translateToCRow(StreamPlanner.scala:250)
    at org.apache.flink.table.planner.StreamPlanner.translateOptimized(StreamPlanner.scala:431)
    at org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:421)
    at org.apache.flink.table.planner.StreamPlanner.org$apache$flink$table$planner$StreamPlanner$$translate(StreamPlanner.scala:182)
    at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127)
    at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:127)
    at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:319)
    at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:227)
    at flink.StreamingJob.main(StreamingJob.java:140)

On Fri, Sep 20, 2019 at 8:26 PM Nishant Gupta <nishantgupta1...@gmail.com>
wrote:

> The use case is similar, but I am not able to reproduce the heap space issue
> because the data size is small. I thought I would check with you in the
> meantime.
>
>
> Thanks for looking into it. Really appreciate it.
> I have marked the usage of temporal tables in bold red for ease of
> reference.
>
> On Fri, Sep 20, 2019, 8:18 PM Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi,
>>
>> This looks OK at first sight.
>> Is it doing what you expect?
>>
>> Fabian
>>
>> On Fri, Sep 20, 2019 at 16:29, Nishant Gupta <
>> nishantgupta1...@gmail.com> wrote:
>>
>>> Hi Fabian,
>>>
>>> Thanks for the information.
>>> I have been reading about it and have implemented it as part of a Flink job
>>> written in Java.
>>>
>>> I am using proctime for both tables. Could you please verify the
>>> implementation of the temporal tables?
>>>
>>>
>>> Here is the snippet:
>>> ----------------------------
>>> public class StreamingJob {
>>>
>>> public static void main(String[] args) throws Exception {
>>>
>>> ParameterTool params = ParameterTool.fromArgs(args);
>>> final StreamExecutionEnvironment env =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>>>
>>> Properties kafkaConsumerProperties = new Properties();
>>>
>>> kafkaConsumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"X.X.X.X:9092");
>>> kafkaConsumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "cg54");
>>> kafkaConsumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
>>>
>>>
>>> kafkaConsumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
>>>
>>> kafkaConsumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
>>>
>>> DataStream<String> badipStream = env.addSource(
>>>     new FlinkKafkaConsumer<>("badips", new SimpleStringSchema(), kafkaConsumerProperties));
>>>
>>> DataStream<String> badipStreamM = badipStream
>>>     .map(new MapFunction<String, String>() {
>>>         private static final long serialVersionUID = -686775202L;
>>>
>>>         @Override
>>>         public String map(String value) throws Exception {
>>>             try {
>>>                 String[] v = value.split("\\t");
>>>                 if (v.length > 1) {
>>>                     return v[0];
>>>                 } else {
>>>                     return "0.0.0.0";
>>>                 }
>>>             } catch (Exception e) {
>>>                 System.err.println(e);
>>>                 return "0.0.0.0";
>>>             }
>>>         }
>>>     });
>>>
>>> Table badipTable = tableEnv.fromDataStream(badipStreamM, *"bad_ip, r_proctime.proctime");*
>>>
>>> tableEnv.registerTable("BadIP", badipTable);
>>> TemporalTableFunction badIPTT = badipTable.createTemporalTableFunction("r_proctime", "bad_ip");
>>> tableEnv.registerFunction("BadIPTT", badIPTT);
>>>
>>>
>>>
>>> DataStream<ObjectNode> inKafkaStream = env.addSource(
>>>     new FlinkKafkaConsumer<>("tests", new JSONKeyValueDeserializationSchema(false), kafkaConsumerProperties));
>>>
>>> DataStream<Tuple2<String, String>> inKafkaStreamM = inKafkaStream
>>>     .rebalance()
>>>     .filter(value -> value != null)
>>>     .map(new MapFunction<ObjectNode, Tuple2<String, String>>() {
>>>         private static final long serialVersionUID = -6867120202L;
>>>
>>>         @Override
>>>         public Tuple2<String, String> map(ObjectNode node) throws Exception {
>>>             try {
>>>                 ObjectNode nodeValue = (ObjectNode) node.get("value");
>>>                 return new Tuple2<>(nodeValue.get("source.ip").asText(),
>>>                         nodeValue.get("destination.ip").asText());
>>>             } catch (Exception e) {
>>>                 System.err.println(e);
>>>                 System.out.println(node);
>>>                 return null;
>>>             }
>>>         }
>>>     });
>>>
>>> Table kafkaSource = tableEnv.fromDataStream(inKafkaStreamM, *"sourceIp, destinationIp, k_proctime.proctime"*);
>>> tableEnv.registerTable("KafkaSource", kafkaSource);
>>> *Table resultKafkaMalicious = tableEnv.sqlQuery(
>>>     "SELECT K.sourceIp, K.destinationIp FROM KafkaSource AS K, LATERAL TABLE (BadIPTT(K.k_proctime)) AS B WHERE K.sourceIp = B.bad_ip");*
>>>
>>> TupleTypeInfo<Tuple2<String, String>> tupleType = new TupleTypeInfo<>(
>>>  Types.STRING(),
>>>  Types.STRING());
>>>
>>> DataStream<Tuple2<String, String>> outStreamMalicious = tableEnv.toAppendStream(resultKafkaMalicious, tupleType);
>>>
>>> Properties kafkaProducerProperties = new Properties();
>>>
>>> kafkaProducerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"X.X.X.X:9092");
>>> kafkaProducerProperties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
>>> kafkaProducerProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
>>>
>>> ObjectMapper mapper = new ObjectMapper();
>>> DataStream<String> sinkStreamMaliciousData = outStreamMalicious
>>>     .map(new MapFunction<Tuple2<String, String>, String>() {
>>>         private static final long serialVersionUID = -6347120202L;
>>>
>>>         @Override
>>>         public String map(Tuple2<String, String> tuple) throws Exception {
>>>             try {
>>>                 ObjectNode node = mapper.createObjectNode();
>>>                 node.put("source.ip", tuple.f0);
>>>                 node.put("destination.ip", tuple.f1);
>>>                 return node.toString();
>>>             } catch (Exception e) {
>>>                 System.err.println(e);
>>>                 System.out.println(tuple);
>>>                 return null;
>>>             }
>>>         }
>>>     });
>>>
>>>
>>> sinkStreamMaliciousData.addSink(
>>>     new FlinkKafkaProducer<>("recon-data-malicious", new SimpleStringSchema(), kafkaProducerProperties));
>>> env.execute("Flink List Matching");
>>> }
>>> }
>>> -------------------------------------------------------
>>>
>>> On Wed, Sep 18, 2019 at 6:09 PM Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi Nishant,
>>>>
>>>> You should model the query as a join with a time-versioned table [1].
>>>> The bad-ips table would be the time-versioned table [2].
>>>> Since it is a time-versioned table, it could even be updated with new
>>>> IPs.
>>>>
>>>> This type of join will only keep the time-versioned table (the bad-ips) in
>>>> state, and not the other (high-volume) table.
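>>>>
>>>> Roughly, such a query could look like the sketch below (untested; it assumes
>>>> a temporal table function BadIPs registered over the bad-ips table, keyed on
>>>> ip and versioned by a processing-time attribute, plus a processing-time
>>>> attribute s_proctime on sourceKafka -- illustrative names only):
>>>>
>>>> -- BadIPs and s_proctime are illustrative names, not part of the current job
>>>> SELECT s.`source.ip`, s.`source.port`
>>>> FROM sourceKafka AS s,
>>>>   LATERAL TABLE (BadIPs(s.s_proctime)) AS b
>>>> WHERE s.`source.ip` = b.ip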
>>>>
>>>> Best,
>>>> Fabian
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/joins.html#join-with-a-temporal-table
>>>> [2]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html
>>>>
>>>> On Wed, Sep 18, 2019 at 14:34, Nishant Gupta <
>>>> nishantgupta1...@gmail.com> wrote:
>>>>
>>>>> Hi Fabian,
>>>>>
>>>>> Thanks for your reply.
>>>>> I have a continuous Kafka stream coming in and a static table of bad IPs.
>>>>> I want to segregate the records that have a bad IP.
>>>>>
>>>>> That is why I was joining them. But with that join, even 60 GB of memory
>>>>> runs out.
>>>>>
>>>>> So I used the query below.
>>>>> Can you please guide me in this regard?
>>>>>
>>>>> On Wed, 18 Sep 2019 at 5:53 PM, Fabian Hueske <fhue...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> The query that you wrote is not a time-windowed join.
>>>>>>
>>>>>> INSERT INTO sourceKafkaMalicious
>>>>>> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
>>>>>> sourceKafka.`source.ip`=badips.ip
>>>>>> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP -
>>>>>> INTERVAL '15' MINUTE AND CURRENT_TIMESTAMP;
>>>>>>
>>>>>> The problem is the use of CURRENT_TIMESTAMP instead of a processing
>>>>>> time (or event time) attribute of badips.
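>>>>>>
>>>>>> For comparison, a time-windowed join bounds the join by the time attributes
>>>>>> of both inputs, roughly like the sketch below (only an illustration: it
>>>>>> assumes badips also had a time attribute, e.g. a hypothetical update_time
>>>>>> column, which it currently does not have):
>>>>>>
>>>>>> -- update_time is a hypothetical time attribute of badips
>>>>>> SELECT sourceKafka.*
>>>>>> FROM sourceKafka, badips
>>>>>> WHERE sourceKafka.`source.ip` = badips.ip
>>>>>>   AND sourceKafka.timestamp_received
>>>>>>       BETWEEN badips.update_time - INTERVAL '15' MINUTE AND badips.update_time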
>>>>>>
>>>>>> What exactly are you trying to achieve with the query?
>>>>>>
>>>>>> Best, Fabian
>>>>>>
>>>>>> On Wed, Sep 18, 2019 at 14:02, Nishant Gupta <
>>>>>> nishantgupta1...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Team,
>>>>>>>
>>>>>>> I am running a time-windowed join query, as shown below:
>>>>>>>
>>>>>>> INSERT INTO sourceKafkaMalicious
>>>>>>> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
>>>>>>> sourceKafka.`source.ip`=badips.ip
>>>>>>> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP -
>>>>>>> INTERVAL '15' MINUTE AND CURRENT_TIMESTAMP;
>>>>>>>
>>>>>>> With a time-windowed join, Flink SQL should automatically clear older
>>>>>>> records. Somehow the query does not clear the heap space and fails with an
>>>>>>> error after some time.
>>>>>>>
>>>>>>> Can you please let me know what could go wrong, or whether this is an issue.
>>>>>>>
>>>>>>> Environment File chunks
>>>>>>>
>>>>>>> --------------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>>>> tables:
>>>>>>>   - name: sourceKafka
>>>>>>>     type: source-table
>>>>>>>     update-mode: append
>>>>>>>     connector:
>>>>>>>       type: kafka
>>>>>>>       version: "universal"
>>>>>>>       topic: test-data-flatten
>>>>>>>       properties:
>>>>>>>         - key: zookeeper.connect
>>>>>>>           value: x.x.x.x:2181
>>>>>>>         - key: bootstrap.servers
>>>>>>>           value: x.x.x.x:9092
>>>>>>>         - key: group.id
>>>>>>>           value: testgroup
>>>>>>>     format:
>>>>>>>       type: json
>>>>>>>       fail-on-missing-field: false
>>>>>>>       json-schema: >
>>>>>>>         {
>>>>>>>           type: 'object',
>>>>>>>           properties: {
>>>>>>>             'source.ip': {
>>>>>>>                type: 'string'
>>>>>>>             },
>>>>>>>             'source.port': {
>>>>>>>                type: 'string'
>>>>>>>             }
>>>>>>>           }
>>>>>>>         }
>>>>>>>       derive-schema: false
>>>>>>>     schema:
>>>>>>>       - name: ' source.ip '
>>>>>>>         type: VARCHAR
>>>>>>>       - name: 'source.port'
>>>>>>>         type: VARCHAR
>>>>>>>
>>>>>>>   - name: sourceKafkaMalicious
>>>>>>>     type: sink-table
>>>>>>>     update-mode: append
>>>>>>>     connector:
>>>>>>>       type: kafka
>>>>>>>       version: "universal"
>>>>>>>       topic: test-data-mal
>>>>>>>       properties:
>>>>>>>         - key: zookeeper.connect
>>>>>>>           value: x.x.x.x:2181
>>>>>>>         - key: bootstrap.servers
>>>>>>>           value: x.x.x.x:9092
>>>>>>>         - key: group.id
>>>>>>>           value: testgroupmal
>>>>>>>     format:
>>>>>>>       type: json
>>>>>>>       fail-on-missing-field: false
>>>>>>>       json-schema: >
>>>>>>>         {
>>>>>>>           type: 'object',
>>>>>>>           properties: {
>>>>>>>             'source.ip': {
>>>>>>>                type: 'string'
>>>>>>>             },
>>>>>>>             'source.port': {
>>>>>>>                type: 'string'
>>>>>>>             }
>>>>>>>           }
>>>>>>>         }
>>>>>>>       derive-schema: false
>>>>>>>     schema:
>>>>>>>       - name: ' source.ip '
>>>>>>>         type: VARCHAR
>>>>>>>       - name: 'source.port'
>>>>>>>         type: VARCHAR
>>>>>>>
>>>>>>>   - name: badips
>>>>>>>     type: source-table
>>>>>>>     #update-mode: append
>>>>>>>     connector:
>>>>>>>       type: filesystem
>>>>>>>       path: "/home/cyanadmin/ipsum/levels/badips.csv"
>>>>>>>     format:
>>>>>>>       type: csv
>>>>>>>       fields:
>>>>>>>         - name: ip
>>>>>>>           type: VARCHAR
>>>>>>>       comment-prefix: "#"
>>>>>>>     schema:
>>>>>>>       - name: ip
>>>>>>>         type: VARCHAR
>>>>>>>
>>>>>>> execution:
>>>>>>>   planner: blink
>>>>>>>   type: streaming
>>>>>>>   time-characteristic: event-time
>>>>>>>   periodic-watermarks-interval: 200
>>>>>>>   result-mode: table
>>>>>>>   max-table-result-rows: 1000000
>>>>>>>   parallelism: 3
>>>>>>>   max-parallelism: 128
>>>>>>>   min-idle-state-retention: 0
>>>>>>>   max-idle-state-retention: 0
>>>>>>>   restart-strategy:
>>>>>>>     type: fallback
>>>>>>>
>>>>>>> configuration:
>>>>>>>   table.optimizer.join-reorder-enabled: true
>>>>>>>   table.exec.spill-compression.enabled: true
>>>>>>>   table.exec.spill-compression.block-size: 128kb
>>>>>>> # Properties that describe the cluster to which table programs are
>>>>>>> # submitted.
>>>>>>>
>>>>>>> deployment:
>>>>>>>   response-timeout: 5000
>>>>>>>
>>>>>>>
>>>>>>> --------------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>>>>
>>>>>>
