Hi Rakesh,

Could you explain in a bit more detail what the actual problem is? What
output do you expect, and what actually happens? It is hard to guess
which problem you are facing.
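
In the meantime, one thing that may be worth checking (only a guess until
you describe the symptoms): Flink event-time timestamps are expected in
milliseconds since the epoch, while both mappers divide getTime() by 1000.
The sketch below is plain Java with no Flink dependency. The class and
method names are made up for illustration, the fixed UTC zone is an
assumption to keep the numbers reproducible, and the window-start
arithmetic is assumed to mirror what a tumbling window with zero offset
does for non-negative timestamps.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.TimeZone;

public class WindowStartSketch {

    // Start of the tumbling window containing the timestamp
    // (zero offset, non-negative timestamps only).
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }

    // Parses the tstamp_trans format from the mappers into epoch milliseconds.
    static long parseMillis(String tstamp) {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyyMMddHHmmss");
        fmt.setTimeZone(TimeZone.getTimeZone("UTC")); // assumption: fixed zone
        try {
            return fmt.parse(tstamp).getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException("Bad timestamp: " + tstamp, e);
        }
    }

    public static void main(String[] args) {
        long windowSize = 60_000L; // Time.seconds(60) expressed in milliseconds
        long a = parseMillis("20181130090300");
        long b = parseMillis("20181130090500"); // two minutes after a

        // Millisecond timestamps: do the two events share a 60 s window?
        System.out.println(windowStart(a, windowSize) == windowStart(b, windowSize));

        // Second-resolution values, as produced by getTime() / 1000,
        // interpreted as if they were milliseconds.
        System.out.println(
                windowStart(a / 1000, windowSize) == windowStart(b / 1000, windowSize));
    }
}
```

With millisecond values the two events fall into different windows; with
the divided values they fall into the same one, because 60000 units of
"seconds" span roughly 16.7 hours. Whether this has anything to do with
what you observe depends on the answers to the questions above.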

Best,

Dawid

On 03/12/2018 12:19, Rakesh Kumar wrote:
>
> Hello Team,
>
>
> public class FlinkJoinDataStream {
>
>
> @SuppressWarnings("serial")
>
> public static void main(String[] args) {
>
>
> Properties props = new Properties();
>
> props.setProperty("zookeeper.connect", "localhost:2181");
>
> props.setProperty("bootstrap.servers", "localhost:9092");
>
> props.setProperty("group.id", "myGroup");
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> env.enableCheckpointing(1000);
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>
> DataStream<Tuple4<Integer, Integer, String, Long>> order_details =
> env.addSource(new FlinkKafkaConsumer010<String>("test1", new
> SimpleStringSchema(), props)).map(new Mapper1());
>
>
> DataStream<Tuple4<Integer, Integer, String, Long>> invoice_details =
> env.addSource(new FlinkKafkaConsumer010<String>("test2", new
> SimpleStringSchema(), props)).map(new Mapper2());
>
> long maxOutOfOrderness = 550000L;
>
>
> DataStream<Tuple4<Integer, Integer, String, Long>> invoice_watermark =
> invoice_details.assignTimestampsAndWatermarks(new
> AssignerWithPeriodicWatermarks<Tuple4<Integer, Integer, String, Long>>(){
>
>
> long currentTimestamp;
>
>
> @Override
>
> public long extractTimestamp(Tuple4<Integer, Integer, String, Long>
> element, long previousElementTimestamp) {
>
> currentTimestamp = element.f3;
>
> return currentTimestamp;
>
> }
>
>
> @Override
>
> public Watermark getCurrentWatermark() {
>
> return new Watermark(currentTimestamp);
>
> }
>
> });
>
> invoice_watermark.print();
>
> DataStream<Tuple4<Integer, Integer, String, Long>> order_watermark =
> order_details.assignTimestampsAndWatermarks(new
> AssignerWithPeriodicWatermarks<Tuple4<Integer,Integer,String,Long>>() {
>
> long currentTimestamp;
>
> @Override
>
> public long extractTimestamp(Tuple4<Integer, Integer, String, Long>
> element, long previousElementTimestamp) {
>
> currentTimestamp = element.f3;
>
> return currentTimestamp;
>
> }
>
> @Override
>
> public Watermark getCurrentWatermark() {
>
> return new Watermark(currentTimestamp-maxOutOfOrderness);
>
> }
>
> });
>
> order_watermark.print();
>
> DataStream<Tuple4<Integer, Integer, String, Integer>> joinedData =
> order_watermark.keyBy(0).join(invoice_watermark.keyBy(0))
>
> .where(new KeySelector<Tuple4<Integer, Integer, String, Long>,
> Integer>() {
>
> @Override
>
> public Integer getKey(
>
> Tuple4<Integer, Integer, String, Long> value)
>
> throws Exception {
>
> return value.f0;
>
> }
>
> })
>
> .equalTo(new KeySelector<Tuple4<Integer, Integer, String, Long>,
> Integer>() {
>
>
> @Override
>
> public Integer getKey(Tuple4<Integer, Integer, String, Long> value)
> throws Exception {
>
> return value.f0;
>
> }
>
> })
>
> .window(TumblingEventTimeWindows.of(Time.seconds(60)))
>
> .apply(new JoinFunction<Tuple4<Integer, Integer, String, Long>,
> Tuple4<Integer, Integer, String, Long>, Tuple4<Integer, Integer,
> String,Integer>>() {
>
>
> @Override
>
> public Tuple4<Integer, Integer, String,Integer> join(
>
> Tuple4<Integer, Integer, String, Long> first,
>
> Tuple4<Integer, Integer, String, Long> second) throws Exception {
>
> return new Tuple4<Integer, Integer,
> String,Integer>(first.f0,first.f1,first.f2,second.f1);
>
> }
>
> });
>
> joinedData.print();
>
> try {
>
> env.execute();
>
> } catch (Exception e) {
>
> e.printStackTrace();
>
> }
>
> }
>
> private static class Mapper1 implements MapFunction<String,
> Tuple4<Integer, Integer, String, Long>>{
>
>
> private static final long serialVersionUID = 1L;
>
> //{"order_id":317,"customer_id":654,"tstamp_trans":"20181130090300"}
>
> @Override
>
> public Tuple4<Integer, Integer, String, Long> map(String value) throws
> Exception {
>
> JSONObject jsonObject = new JSONObject(value);
>
> final DateFormat dfm = new SimpleDateFormat("yyyyMMddHHmmss");
>
>
> return new Tuple4<Integer, Integer, String, Long>(
>
> jsonObject.getInt("order_id"), jsonObject.getInt("customer_id"),
>
> jsonObject.getString("tstamp_trans"),
>
> dfm.parse(jsonObject.getString("tstamp_trans")).getTime() / 1000);
>
> }
>
> }
>
> private static class Mapper2 implements MapFunction<String,
> Tuple4<Integer, Integer, String, Long>>{
>
>
> private static final long serialVersionUID = 1L;
>
> //{"order_id":317,"invoice_status":1,"tstamp_trans":"20181130090300"}
>
>
> @Override
>
> public Tuple4<Integer, Integer, String, Long> map(String value) throws
> Exception {
>
> JSONObject jsonObject = new JSONObject(value);
>
> final DateFormat dfm = new SimpleDateFormat("yyyyMMddHHmmss");
>
>
> return new Tuple4<Integer, Integer, String, Long>(
>
> jsonObject.getInt("order_id"), jsonObject.getInt("invoice_status"),
>
> jsonObject.getString("tstamp_trans"),
>
> dfm.parse(jsonObject.getString("tstamp_trans")).getTime() / 1000);
>
> }
>
> }
>
>
> }
>
>
> If I read the same data from a collection instead, everything works
> fine:
>
>
> private static List<String> createOrderRecords() {
>
> List<String> orderRecords = new ArrayList<>();
>
> orderRecords.add("{\"order_id\":312,\"customer_id\":654,\"tstamp_trans\":\"20181130090300\"}");
>
> orderRecords.add("{\"order_id\":314,\"customer_id\":654,\"tstamp_trans\":\"20181130090300\"}");
>
> orderRecords.add("{\"order_id\":316,\"customer_id\":654,\"tstamp_trans\":\"20181130090300\"}");
>
> orderRecords.add("{\"order_id\":317,\"customer_id\":654,\"tstamp_trans\":\"20181130096300\"}");
>
> orderRecords.add("{\"order_id\":315,\"customer_id\":654,\"tstamp_trans\":\"20181130090300\"}");
>
> orderRecords.add("{\"order_id\":318,\"customer_id\":654,\"tstamp_trans\":\"20181130099000\"}");
>
> return orderRecords;
>
> }
>
> private static List<String> createInvoiceRecords() {
>
> List<String> invoiceRecords = new ArrayList<>();
>
> invoiceRecords.add("{\"order_id\":312,\"invoice_status\":1,\"tstamp_trans\":\"20181130090300\"}");
>
> invoiceRecords.add("{\"order_id\":318,\"invoice_status\":1,\"tstamp_trans\":\"20181130099000\"}");
>
> invoiceRecords.add("{\"order_id\":317,\"invoice_status\":1,\"tstamp_trans\":\"20181130096300\"}");
>
> invoiceRecords.add("{\"order_id\":311,\"invoice_status\":1,\"tstamp_trans\":\"20181130050300\"}");
>
> return invoiceRecords;
>
> }
>
>
> If I exclude Kafka as the data source and use these collections as the
> data source instead, everything works fine.
>
>
> Thank you,
>
> Rakesh Kumar
>
>

