Hi Fabian,

Thanks for the information.
I have been reading about it and doing the same as a part of flink job
written in Java

I am using proctime for both the tables. Can you please verify once the
implementation of temporal tables


here is the snippet.
----------------------------
public class StreamingJob {

public static void main(String[] args) throws Exception {

ParameterTool params = ParameterTool.fromArgs(args);
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

Properties kafkaConsumerProperties = new Properties();
kafkaConsumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"X.X.X.X:9092");
kafkaConsumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "cg54");
kafkaConsumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"latest");

kafkaConsumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
kafkaConsumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");

DataStream<String> badipStream = env.addSource(new
FlinkKafkaConsumer<>("badips", new SimpleStringSchema(),
kafkaConsumerProperties));

DataStream<String> badipStreamM =
badipStream
.map(new MapFunction<String, String>() {
       private static final long serialVersionUID = -686775202L;
       @Override
       public String map(String value) throws Exception {
        try {
        String[] v = value.split("\\t");
    if(v.length > 1) {
    return v[0].toString();
    } else
    return "0.0.0.0";
        } catch (Exception e) {
System.err.println(e);
return "0.0.0.0";
        }
       }
   });

Table  badipTable = tableEnv.fromDataStream(badipStreamM, *"bad_ip,
r_proctime.proctime");*

tableEnv.registerTable("BadIP", badipTable);
TemporalTableFunction badIPTT =
badipTable.createTemporalTableFunction("r_proctime", "bad_ip");
tableEnv.registerFunction("BadIPTT", badIPTT);



DataStream<ObjectNode> inKafkaStream = env
.addSource(new FlinkKafkaConsumer<>("tests", new
JSONKeyValueDeserializationSchema(false), kafkaConsumerProperties));
DataStream<Tuple2<String,String>> inKafkaStreamM =
inKafkaStream
.rebalance()
.filter(value -> value != null)
.map(new MapFunction<ObjectNode, Tuple2<String,String>>() {
       private static final long serialVersionUID = -6867120202L;
       @Override
       public Tuple2<String,String> map(ObjectNode node) throws Exception {
        try {
        ObjectNode nodeValue = (ObjectNode) node.get("value");
            return new Tuple2<>(nodeValue.get("source.ip").asText(),
nodeValue.get("destination.ip").asText());
        } catch (Exception e) {
System.err.println(e);
System.out.println(node);
return null;
        }
       }
   });

Table  kafkaSource = tableEnv.fromDataStream(inKafkaStreamM, *"sourceIp,
destinationIp, k_proctime.proctime"*);
tableEnv.registerTable("KafkaSource", kafkaSource);
* Table resultKafkaMalicious = tableEnv.sqlQuery( "SELECT K.sourceIp,
K.destinationIp FROM KafkaSource AS K, LATERAL TABLE
(BadIPTT(K.k_proctime)) AS B WHERE K.sourceIp=B.bad_ip");*

TupleTypeInfo<Tuple2<String, String>> tupleType = new TupleTypeInfo<>(
 Types.STRING(),
 Types.STRING());

DataStream<Tuple2<String,String>> outStreamMalicious =
tableEnv.toAppendStream(resultKafkaMalicious, tupleType);

Properties kafkaProducerProperties = new Properties();
kafkaProducerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"X.X.X.X:9092");
kafkaProducerProperties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
kafkaProducerProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");

ObjectMapper mapper = new ObjectMapper();
DataStream<String> sinkStreamMaliciousData = outStreamMalicious
.map(new MapFunction<Tuple2<String,String>,String>() {
private static final long serialVersionUID = -6347120202L;
@Override
public String map(Tuple2<String,String> tuple) throws Exception {
try {
ObjectNode node = mapper.createObjectNode();
node.put("source.ip", tuple.f0);
node.put("destination.ip", tuple.f1);
return node.toString();
} catch (Exception e) {
System.err.println(e);
System.out.println(tuple);
return null;
}
}
});


sinkStreamMaliciousData.addSink(new
FlinkKafkaProducer<>("recon-data-malicious", new SimpleStringSchema(),
kafkaProducerProperties));
env.execute("Flink List Matching");
}
-------------------------------------------------------

On Wed, Sep 18, 2019 at 6:09 PM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Nishant,
>
> You should model the query as a join with a time-versioned table [1].
> The bad-ips table would be the time-time versioned table [2].
> Since it is a time-versioned table, it could even be updated with new IPs.
>
> This type of join will only keep the time-versioned table (the bad-ips in
> state) and not the other (high-volume) table.
>
> Best,
> Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/joins.html#join-with-a-temporal-table
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html
>
> Am Mi., 18. Sept. 2019 um 14:34 Uhr schrieb Nishant Gupta <
> nishantgupta1...@gmail.com>:
>
>> Hi Fabian,
>>
>> Thanks for your reply
>> I have a continuous stream of kafka coming and static table of badips. I
>> wanted to segregate records having bad ip.
>>
>> So therefore i was joining it. But with that 60 gb memory getting run out
>>
>> So i used below query.
>> Can u please guide me in this regard
>>
>> On Wed, 18 Sep 2019 at 5:53 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> The query that you wrote is not a time-windowed join.
>>>
>>> INSERT INTO sourceKafkaMalicious
>>> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
>>> sourceKafka.`source.ip`=badips.ip
>>> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP -
>>> INTERVAL '15' MINUTE AND CURRENT_TIMESTAMP;
>>>
>>> The problem is the use of CURRENT_TIMESTAMP instead of a processing time
>>> (or event time) attribute of badips.
>>>
>>> What exactly are you trying to achieve with the query?
>>>
>>> Best, Fabian
>>>
>>> Am Mi., 18. Sept. 2019 um 14:02 Uhr schrieb Nishant Gupta <
>>> nishantgupta1...@gmail.com>:
>>>
>>>> Hi Team,
>>>>
>>>> I am running a query for Time Window Join as below
>>>>
>>>> INSERT INTO sourceKafkaMalicious
>>>> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
>>>> sourceKafka.`source.ip`=badips.ip
>>>> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP -
>>>> INTERVAL '15' MINUTE AND CURRENT_TIMESTAMP;
>>>>
>>>> Time windowed join, Flink SQL should automatically clear older records, 
>>>> Some
>>>> how the query does not clear the heapspace and fails with error after
>>>> sometime.
>>>>
>>>> Can you please let me know what could go wrong, or is it a issue
>>>>
>>>> Environment File chunks
>>>>
>>>> --------------------------------------------------------------------------------------------------------------------------------------------------------------
>>>> tables:
>>>>   - name: sourceKafka
>>>>     type: source-table
>>>>     update-mode: append
>>>>     connector:
>>>>       type: kafka
>>>>       version: "universal"
>>>>       topic: test-data-flatten
>>>>       properties:
>>>>         - key: zookeeper.connect
>>>>           value: x.x.x.x:2181
>>>>         - key: bootstrap.servers
>>>>           value: x.x.x.x:9092
>>>>         - key: group.id
>>>>           value: testgroup
>>>>     format:
>>>>       type: json
>>>>       fail-on-missing-field: false
>>>>       json-schema: >
>>>>         {
>>>>           type: 'object',
>>>>           properties: {
>>>>             'source.ip': {
>>>>                type: 'string'
>>>>             },
>>>>             'source.port': {
>>>>                type: 'string'
>>>>             }
>>>>           }
>>>>         }
>>>>       derive-schema: false
>>>>     schema:
>>>>       - name: ' source.ip '
>>>>         type: VARCHAR
>>>>       - name: 'source.port'
>>>>         type: VARCHAR
>>>>
>>>>   - name: sourceKafkaMalicious
>>>>     type: sink-table
>>>>     update-mode: append
>>>>     connector:
>>>>       type: kafka
>>>>       version: "universal"
>>>>       topic: test-data-mal
>>>>       properties:
>>>>         - key: zookeeper.connect
>>>>           value: x.x.x.x:2181
>>>>         - key: bootstrap.servers
>>>>           value: x.x.x.x:9092
>>>>         - key: group.id
>>>>           value: testgroupmal
>>>>     format:
>>>>       type: json
>>>>       fail-on-missing-field: false
>>>>       json-schema: >
>>>>         {
>>>>           type: 'object',
>>>>           properties: {
>>>>             'source.ip': {
>>>>                type: 'string'
>>>>             },
>>>>             'source.port': {
>>>>                type: 'string'
>>>>             }
>>>>           }
>>>>         }
>>>>       derive-schema: false
>>>>     schema:
>>>>       - name: ' source.ip '
>>>>         type: VARCHAR
>>>>       - name: 'source.port'
>>>>         type: VARCHAR
>>>>
>>>>   - name: badips
>>>>     type: source-table
>>>>     #update-mode: append
>>>>     connector:
>>>>       type: filesystem
>>>>       path: "/home/cyanadmin/ipsum/levels/badips.csv"
>>>>     format:
>>>>       type: csv
>>>>       fields:
>>>>         - name: ip
>>>>           type: VARCHAR
>>>>       comment-prefix: "#"
>>>>     schema:
>>>>       - name: ip
>>>>         type: VARCHAR
>>>>
>>>> execution:
>>>>   planner: blink
>>>>   type: streaming
>>>>   time-characteristic: event-time
>>>>   periodic-watermarks-interval: 200
>>>>   result-mode: table
>>>>   max-table-result-rows: 1000000
>>>>   parallelism: 3
>>>>   max-parallelism: 128
>>>>   min-idle-state-retention: 0
>>>>   max-idle-state-retention: 0
>>>>   restart-strategy:
>>>>     type: fallback
>>>>
>>>> configuration:
>>>>   table.optimizer.join-reorder-enabled: true
>>>>   table.exec.spill-compression.enabled: true
>>>>   table.exec.spill-compression.block-size: 128kb
>>>>  Properties that describe the cluster to which table programs are
>>>> submitted to.
>>>>
>>>> deployment:
>>>>   response-timeout: 5000
>>>>
>>>>
>>>> --------------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>
>>>

Reply via email to