If I go with the Table API, can I write the streams to Hive, or is that only 
supported for batch processing as of now?

________________________________
From: Khachatryan Roman <khachatryan.ro...@gmail.com>
Sent: Tuesday, May 12, 2020 1:49:10 AM
To: Jaswin Shah <jaswin.s...@outlook.com>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: Not able to implement an usecase

Hi Jaswin,

Currently, the DataStream API doesn't support outer joins.
As a workaround, you can use the coGroup function [1].
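
To illustrate why coGroup can stand in for an outer join: a coGroup function receives all left elements and all right elements for a key (per window, in the DataStream API), and it is still called when one side is empty. The sketch below is plain Java with no Flink dependency. The class name, method names, and output labels are hypothetical, and unlike Flink's CoGroupFunction the key is passed in explicitly to keep the example short.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Plain-Java sketch of outer-join logic in coGroup style (not Flink code).
class CoGroupOuterJoinSketch {

    // Called once per key with every left and right element for that key.
    // Assumption: in Flink the elements carry the key themselves; here it is a parameter.
    static void coGroup(String orderId, List<String> left, List<String> right, List<String> out) {
        if (right.isEmpty()) {
            // Left messages with no partner on the right.
            for (String l : left) {
                out.add("LEFT_ONLY " + orderId + " " + l);
            }
        } else if (left.isEmpty()) {
            // Right messages with no partner on the left.
            for (String r : right) {
                out.add("RIGHT_ONLY " + orderId + " " + r);
            }
        } else {
            // Both sides present: emit every pair, as an inner join would.
            for (String l : left) {
                for (String r : right) {
                    out.add("MATCHED " + orderId + " " + l + "+" + r);
                }
            }
        }
    }

    // Simulates the grouping step: visits the union of keys in sorted order.
    static List<String> outerJoin(Map<String, List<String>> leftByKey,
                                  Map<String, List<String>> rightByKey) {
        TreeSet<String> keys = new TreeSet<>(leftByKey.keySet());
        keys.addAll(rightByKey.keySet());
        List<String> out = new ArrayList<>();
        for (String key : keys) {
            coGroup(key,
                    leftByKey.getOrDefault(key, Collections.emptyList()),
                    rightByKey.getOrDefault(key, Collections.emptyList()),
                    out);
        }
        return out;
    }
}
```

In a real job the same branching would live inside a CoGroupFunction applied to a windowed coGroup of the two keyed streams, so the window size plays the role that the interval bounds play in the interval join.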

Hive is also not supported by the DataStream API, though it is supported by 
the Table API [2].

[1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/CoGroupFunction.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/hive/read_write_hive.html

Regards,
Roman


On Mon, May 11, 2020 at 6:03 PM Jaswin Shah <jaswin.s...@outlook.com> wrote:
Hi,
I want to implement the following use case in my application:
I am doing an interval join between two data streams, keyed on orderId, and 
then catching the discrepant join results in a process function. Now I want to 
identify all the messages in both streams that are not joined. That is, if a 
message in the left stream has no matching message in the right stream within 
the defined interval, that message should be caught, and likewise for messages 
in the right stream that have no corresponding message in the left stream. I 
need help with how to achieve this. I know it can be done with an outer join, 
but as far as I know, interval joins and tumbling event-time window joins only 
support inner joins. I do not want to use the Table/SQL API here; I want to 
stay with the DataStream API.

Currently I am using the implementation below, which works for about 90% of 
the cases. The remaining 10%, where a large delay can occur or where messages 
are missing from the left or right stream, are not handled by it:

/**
 * Joins the cart and PG streams on mid and orderId within the configured interval.
 *
 * @param leftStream  cart messages
 * @param rightStream PG messages
 * @param parameter   supplies the lower and upper interval bounds, in milliseconds
 * @return the output of CartPGProcessFunction for each joined pair
 */
public SingleOutputStreamOperator<ResultMessage> intervalJoinCartAndPGStreams(
        DataStream<CartMessage> leftStream,
        DataStream<PGMessage> rightStream,
        ParameterTool parameter) {
    // Interval bounds in milliseconds, read from the job parameters.
    long lowerBound = Long.parseLong(parameter.get(Constants.INTERVAL_JOIN_LOWERBOUND));
    long upperBound = Long.parseLong(parameter.get(Constants.INTERVAL_JOIN_UPPERBOUND));

    // Discrepant results are sent to Kafka from CartPGProcessFunction.
    return leftStream
        .keyBy(new CartJoinColumnsSelector())
        .intervalJoin(rightStream.keyBy(new PGJoinColumnsSelector()))
        .between(Time.milliseconds(lowerBound), Time.milliseconds(upperBound))
        .process(new CartPGProcessFunction());
}
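
For reference, the interval join above emits a pair only when the right element's event timestamp lies within [left timestamp + lower bound, left timestamp + upper bound], with both bounds inclusive by default. A message whose partner arrives outside that range, or never arrives, produces no output at all, which would be consistent with the unhandled cases described here. A minimal plain-Java sketch of that condition follows; the class and method names are hypothetical and it does not use Flink.

```java
// Plain-Java sketch of the interval join matching condition (not Flink code).
class IntervalJoinBoundsSketch {

    // True when the right element's timestamp falls inside
    // [leftTs + lowerBoundMs, leftTs + upperBoundMs], both ends inclusive.
    static boolean inInterval(long leftTs, long rightTs, long lowerBoundMs, long upperBoundMs) {
        return rightTs >= leftTs + lowerBoundMs && rightTs <= leftTs + upperBoundMs;
    }
}
```

With bounds of -1000 ms and +1000 ms, a right message 1000 ms after the left one still matches, while one arriving 1001 ms after does not and is silently left unjoined.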


Secondly, I cannot find streaming support for writing the data streams I read 
from Kafka to Hive, which I then want to batch process with Flink.

Please help me resolve these use cases.

Thanks,
Jaswin

