????
??????????????????????????????????????????????kafka????????????3??????????????????????????????????????????????????????????????????140????????????????????????????????????1100??????????????????



????
userbehavior????????
-- Kafka source table of raw user-behavior events (legacy Flink connector properties).
CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3),
    proctime AS PROCTIME(),  -- processing-time attribute column
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND  -- event-time watermark on ts; tolerates 5s of out-of-order data
) WITH (
    'connector.type' = 'kafka',  -- use the kafka connector
    'connector.version' = 'universal',  -- kafka version; 'universal' supports brokers 0.11 and later
    'connector.topic' = 'user_behavior',  -- kafka topic to consume
    'connector.startup-mode' = 'earliest-offset',  -- start reading from the earliest offset
    'connector.properties.zookeeper.connect' = '192.168.0.150:2181',  -- zookeeper address
    'connector.properties.bootstrap.servers' = '192.168.0.150:9092',  -- kafka broker address
    'format.type' = 'json'  -- records are JSON encoded
)

????????????????????
-- Elasticsearch sink table for the hourly buy-count aggregate.
CREATE TABLE buy_cnt_per_hour (
    hour_of_day BIGINT,
    buy_cnt BIGINT
) WITH (
    'connector.type' = 'elasticsearch',  -- use the elasticsearch connector
    'connector.version' = '6',  -- es major version: '6' for es 6+, '7' for es 7+
    'connector.hosts' = 'http://192.168.0.150:9200',  -- elasticsearch address
    'connector.index' = 'buy_cnt_per_hour',  -- target index
    'connector.document-type' = 'user_behavior',  -- elasticsearch document type
    'connector.bulk-flush.max-actions' = '1',  -- flush after every single record
    'format.type' = 'json',  -- documents are JSON encoded
    'update-mode' = 'append'
)

????????
-- Hourly tumbling-window count of 'buy' events, written to the sink table.
INSERT INTO buy_cnt_per_hour
SELECT
    HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)) AS hour_of_day,
    COUNT(*) AS buy_cnt
FROM user_behavior
WHERE behavior = 'buy'
GROUP BY TUMBLE(ts, INTERVAL '1' HOUR)

kafka????????????

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.text.SimpleDateFormat;
import java.util.*;


public class UserBehaviorProducer {
    public static final String brokerList = "192.168.0.150:9092";

    //    public static final String topic="user_behavior";
    public static final String topic = "user_behavior";

    public static void main(String args[]) {

        //????????????????????
        //????????????
        Properties properties = new Properties();
        properties.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
        properties.put("bootstrap.servers", brokerList);
        //????KafkaProducer ????
        KafkaProducer<String, String&gt; producer = new 
KafkaProducer<&gt;(properties);
        //????????????????
        //{"user_id": "952483", "item_id":"310884", "category_id": "4580532", 
"behavior": "pv", "ts": "2017-11-27T00:00:00Z"}
        //{"user_id": "794777", "item_id":"5119439", "category_id": "982926", 
"behavior": "pv", "ts": "2017-11-27T00:00:00Z"}
        String[] behaviors = {"pv", "buy", "coll", 
"cart"};//????????????????????????????
        JSONObject jsonObject = new JSONObject();
        HashMap<String, String&gt; info = new HashMap<&gt;();
        Random random = new Random();
        SimpleDateFormat format = new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
        long date_long=getDate();
        while (true) {
            jsonObject.put("user_id", random.nextInt(900000) + 100000 + "");
            jsonObject.put("item_id", random.nextInt(900000) + 100000 + "");
            jsonObject.put("category_id", random.nextInt(1000) + "");
            jsonObject.put("behavior", behaviors[random.nextInt(4)]);
            jsonObject.put("ts", format.format(new Date(date_long)));
            String msg = jsonObject.toString();
            System.out.println(msg);
            ProducerRecord<String, String&gt; record = new 
ProducerRecord<&gt;(topic, msg);
            producer.send(record);
//            date_long +=500+random.nextGaussian()*1000;
            date_long +=800+random.nextGaussian()*1500;
            try {
                Thread.sleep(60);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }

    private static long getDate() {
        Date date = new Date();
        Calendar c = Calendar.getInstance();
        c.setTime(date);
        //??????1??,??????????????????????
        c.set(Calendar.DAY_OF_MONTH, 1);
        //????????0
        c.set(Calendar.HOUR_OF_DAY, 0);
        //????????0
        c.set(Calendar.MINUTE, 0);
        //??????0
        c.set(Calendar.SECOND,0);
        //????????0
        c.set(Calendar.MILLISECOND, 0);
        // ??????????????????????????????
        return c.getTimeInMillis();
    }
}

------------------&nbsp;????????&nbsp;------------------
??????:&nbsp;"Jark Wu"<imj...@gmail.com&gt;;
????????:&nbsp;2020??4??18??(??????) ????10:08
??????:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

????:&nbsp;Re: ????????-flinksql??kafkasource????



Hi,

????????????????Flink 1.10 ???? SQL CLI ?????? CREATE VIEW ?????? 
TableEnvironment ????????????
??????????????????????????????????"????????????????/partition??????????????????????"
 ????????????????????????????????

Best,
Jark

On Sat, 18 Apr 2020 at 18:38, ?????????????? <cai...@qq.com> wrote:

> ????????
>
> ????????????????????????????????flinkSQL????????????????????????????????????????????????????????????????????????
>
> ??????????????????sql-client??????kafkasource??????????????kafka????????????????????????????????????????????????????kafka??????????????????????????kafka????????????????????????????????????kafkasource????????????????????????????????????????????????????/partition??????????????????????????????kafkasource??????????watermark??partition????????????????????????????????????
>
> ????????????sql-client??????????view??????????????tableEnv.sqlUpdate("create
> view as ...")????????????????????
&gt; Exception in thread "main" org.apache.flink.table.api.TableException:
&gt; Unsupported query: CREATE VIEW rich_user_behavior AS
&gt; SELECT U.user_id, U.item_id, U.behavior,&amp;nbsp;
&gt; &amp;nbsp; CASE C.parent_category_id
&gt; &amp;nbsp; &amp;nbsp; WHEN 1 THEN '????????'
&gt; &amp;nbsp; &amp;nbsp; WHEN 2 THEN '????????'
&gt; &amp;nbsp; &amp;nbsp; WHEN 3 THEN '????'
&gt; &amp;nbsp; &amp;nbsp; WHEN 4 THEN '????'
&gt; &amp;nbsp; &amp;nbsp; WHEN 5 THEN '????'
&gt; &amp;nbsp; &amp;nbsp; WHEN 6 THEN '3C????'
&gt; &amp;nbsp; &amp;nbsp; WHEN 7 THEN '????????'
&gt; &amp;nbsp; &amp;nbsp; WHEN 8 THEN '????'
&gt; &amp;nbsp; &amp;nbsp; ELSE '????'
&gt; &amp;nbsp; END AS category_name
&gt; FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF
&gt; U.proctime AS C
&gt; ON U.category_id = C.sub_category_id
>         at
> org.apache.flink.table.planner.delegation.ParserImpl.lambda$parse$0(ParserImpl.java:67)
>         at
> org.apache.flink.table.planner.delegation.ParserImpl$$Lambda$213/1241938981.get(Unknown
> Source)
>         at java.util.Optional.orElseThrow(Optional.java:290)
>         at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:67)
>         at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
>         at
> com.cmb.sql_train.myexample.MyUserBehaviorExample.main(MyUserBehaviorExample.java:51)
&gt;
&gt;
&gt;
&gt;
&gt;
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; ??????????????????

Reply via email to