Hi,
I am running flink job with following functionality:
1. I consume stream1 and stream2 from two kafka topics and assign the
watermarks to the events of two streams by extracting the timestamps from the
events in streams.
2. Then, I am connecting two streams and calling KeyedCoProcessFunction on
connectedStream.
3. I have processElement1 method and processElement2 methods which receive
the events of two streams 1 and 2 and do the join logic as shown in below code
snippet.
4. I have shared mapstate for two streams.
5. When an event comes to processElement method, I register the callback
time for that message to ensure if corresponding matching message is not
arrived from other stream, I will send the message to sideOutput on invocation
of callback method i.e. onTimer.
Something is getting wrong in the callback times registrations for events due
to which for many messages of stream2 the callback is coming earlier than
registered callback timeout.
Also, the events from stream 2 are based on GMT times +5:30 as I can see in the
timevalue in event message, for stream1 it;s normal TZ only. Though I am weak
in analysing the timeout formats so could be wrong in analysis this side.
Below is code snippets I have implemented for KeyedCoProcessFunctions and
timestamp calculations and watermarks registrations.
/**
* CoProcessFuntion to process cart and pg messages connected using connect
operator.
* @author jaswin.shah
* @version $Id: CartPGCoprocessFunction.java, v 0.1 2020-05-15 11:52 PM
jaswin.shah Exp $$
*/
public class CartPGCoprocessFunction extends
KeyedCoProcessFunction<String,CartMessage, PaymentNotifyRequestWrapper,
ResultMessage> {
private static final Logger logger =
LoggerFactory.getLogger(CartPGCoprocessFunction.class);
/**
* Map state for cart messages, orderId+mid is key and cartMessage is value.
*/
private static MapState<String, CartPG> cartPgState = null;
/**
* Intializations for cart and pg mapStates
*
* @param config
*/
@Override
public void open(Configuration config) {
MapStateDescriptor<String, CartPG> cartPgMapStateDescriptor = new
MapStateDescriptor<> (
Constants.CART_DATA,
TypeInformation.of(String.class),
TypeInformation.of(CartPG.class)
);
cartPgState = getRuntimeContext().getMapState(cartPgMapStateDescriptor);
}
/**
*
* @return
*/
@Override
public void onTimer(long timestamp, OnTimerContext ctx,
Collector<ResultMessage> out) throws Exception {
logger.info("On timer called key is {}",ctx.getCurrentKey());
String searchKey = ctx.getCurrentKey();
CartPG cartPg = cartPgState.get(searchKey);
if(Objects.nonNull(cartPg)) {
ctx.output(CartPGSideOutput.getOutputTag(),
cartPgState.get(ctx.getCurrentKey()));
cartPgState.remove(searchKey);
}
}
/**
* 1. Get orderId+mid from cartMessage and check in PGMapState if an entry
is present.
* 2. If present, match, checkDescripancy, process and delete entry from
pgMapState.
* 3. If not present, add orderId+mid as key and cart object as value in
cartMapState.
* @param cartMessage
* @param context
* @param collector
* @throws Exception
*/
@Override
public void processElement1(CartMessage cartMessage, Context context,
Collector<ResultMessage> collector) throws Exception {
Long cartEventTimeStamp = context.timestamp();
logger.info("cart time : {} ",cartEventTimeStamp);
context.timerService().registerProcessingTimeTimer(cartEventTimeStamp+
ConfigurationsManager.getMaxWaitTimeForPGMessage());
String searchKey = cartMessage.createJoinStringCondition();
CartPG cartPG = cartPgState.get(searchKey);
if(Objects.nonNull(cartPG) && Objects.nonNull(cartPG.getPgMessage())) {
generateResultMessage(cartMessage,cartPG.getPgMessage(),collector);
cartPgState.remove(searchKey);
} else {
cartPG = new CartPG();
cartPG.setCartMessage(cartMessage);
cartPgState.put(searchKey,cartPG);
}
}
/**
* 1. Get orderId+mid from pgMessage and check in cartMapState if an entry
is present.
* 2. If present, match, checkDescripancy, process and delete entry from
cartMapState.
* 3. If not present, add orderId+mid as key and cart object as value in
pgMapState.
* @param pgMessage
* @param context
* @param collector
* @throws Exception
*/
@Override
public void processElement2(PaymentNotifyRequestWrapper pgMessage, Context
context, Collector<ResultMessage> collector) throws Exception {
Long pgEventTimeStamp = context.timestamp();
logger.info("pg time : {} ",pgEventTimeStamp);
context.timerService().registerProcessingTimeTimer(pgEventTimeStamp+ConfigurationsManager.getMaxWaitTimeForCartMessage());
String searchKey = pgMessage.createJoinStringCondition();
CartPG cartPG = cartPgState.get(searchKey);
if(Objects.nonNull(cartPG) && Objects.nonNull(cartPG.getCartMessage()))
{
generateResultMessage(cartPG.getCartMessage(),pgMessage,collector);
cartPgState.remove(searchKey);
} else {
cartPG = new CartPG();
cartPG.setPgMessage(pgMessage);
cartPgState.put(searchKey,cartPG);
}
}
/**
* Create ResultMessage from cart and pg messages.
*
* @param cartMessage
* @param pgMessage
* @return
*/
private void generateResultMessage(CartMessage cartMessage,
PaymentNotifyRequestWrapper pgMessage,Collector<ResultMessage> collector) {
ResultMessage resultMessage = new ResultMessage();
Payment payment = null;
//Logic should be in cart: check
for (Payment pay : cartMessage.getPayments()) {
if (StringUtils.equals(Constants.FORWARD_PAYMENT,
pay.mapToPaymentTypeInPG()) && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER,
pay.getProvider())) {
payment = pay;
break;
}
}
if(Objects.isNull(payment)) {
return;
}
resultMessage.setOrderId(cartMessage.getId());
resultMessage.setMid(payment.getMid());
resultMessage.setCartOrderStatus(cartMessage.mapToOrderStatus().getCode());
resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());
resultMessage.setCartOrderCompletionTime(payment.getUpdated_at());
resultMessage.setPgOrderCompletionTime(pgMessage.getCreatedTime());
resultMessage.setPgOrderAmount(pgMessage.getOrderAmount().getValue());
resultMessage.setCartOrderAmount(String.valueOf(Math.round(cartMessage.getGrandtotal())));
resultMessage.setCartPaymethod(payment.getPayment_method());
resultMessage.setPgPaymethod(pgMessage.getPaymentView().getPayOptionInfos()[0].getPayMethod());
checkDescripancyAndCollectResult(resultMessage,collector);
}
/**
* Evaluate if there is descripancy of any fields between the messages from
two different systems.
* Write all the descripancy logic here.
*
* @param resultMessage
*/
private void checkDescripancyAndCollectResult(ResultMessage resultMessage,
Collector<ResultMessage> collector) {
if (!StringUtils.equalsIgnoreCase(resultMessage.getCartOrderStatus(),
resultMessage.getPgOrderStatus())) {
resultMessage.setDescripancyType(DescripancyTypeEnum.ORDER_STATUS_DESCRIPANCY);
collector.collect(resultMessage.clone());
}
if
(!resultMessage.getCartOrderAmount().equals(resultMessage.getPgOrderAmount())) {
resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_AMOUNT_DESCRIPANCY);
collector.collect(resultMessage.clone());
}
if (!StringUtils.equalsIgnoreCase(resultMessage.getCartPaymethod(),
resultMessage.getPgPaymethod())) {
resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_METHOD_DESCRIPANCY);
collector.collect(resultMessage.clone());
}
}
}
/**
* Connect to cart and pg streams and process
*
* @param cartStream
* @param pgStream
* @return
*/
private SingleOutputStreamOperator<ResultMessage>
connectCartPGStreamsAndProcess(SingleOutputStreamOperator<CartMessage>
cartStream, SingleOutputStreamOperator<PaymentNotifyRequestWrapper> pgStream) {
return cartStream.connect(pgStream).keyBy(new CartJoinColumnsSelector(),new
PGJoinColumnsSelector())
.process(new CartPGCoprocessFunction());
}
private final static SimpleDateFormat cartInputFormat = new
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
private final static SimpleDateFormat pgInputFormat = new
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
public static Long extractCartTimeStamp(CartMessage cartMessage){
try {
Date orderTimeStamp =
cartInputFormat.parse(cartMessage.fetchOrderCompletionTime());
return orderTimeStamp.getTime();
} catch (ParseException e) {
logger.error("Exception in converting cart message timeStamp..",e);
}
return Instant.now().toEpochMilli();
}
public static Long extractPGTimeStamp(PaymentNotifyRequestWrapper pgMessage){
try {
Date orderTimeStamp =
pgInputFormat.parse(pgMessage.getPaymentView().getPaidTime());
return orderTimeStamp.getTime();
} catch (ParseException e) {
logger.error("Exception in converting pg message timeStamp..",e);
}
return Instant.now().toEpochMilli();
}
private SingleOutputStreamOperator<CartMessage> processCartStream(ParameterTool
parameter, StreamExecutionEnvironment executionEnvironment) {
//1. Consume cartStream
SingleOutputStreamOperator<CartMessage> cartStream =
executionEnvironment.addSource(createCartConsumer());
cartStream.name(Constants.CART_SYSTEM);
//2. Filter cart messages
SingleOutputStreamOperator<CartMessage> filteredCartStream =
cartStream.filter(new CartFilterFunction()) ;
//3. Map carts data
filteredCartStream = CartMappingService.mapCartsData(filteredCartStream);
//4. Assign timestamps and watermarks
filteredCartStream.assignTimestampsAndWatermarks(new
BoundedOutOfOrdernessTimestampExtractor<CartMessage>(Time.milliseconds(Long.parseLong(parameter.get(Constants.CART_MAX_OUT_OF_ORDERNESS))))
{
@Override
public long extractTimestamp(CartMessage cartMessage) {
return DateTimeUtils.extractCartTimeStamp(cartMessage);
}
});
return filteredCartStream;
}
private SingleOutputStreamOperator<PaymentNotifyRequestWrapper>
processPgStream(ParameterTool parameter, StreamExecutionEnvironment
executionEnvironment) {
//1. Consume pg streams
SingleOutputStreamOperator<PaymentNotifyRequestWrapper> pgStream =
executionEnvironment.addSource(createPGConsumer());
pgStream.name(Constants.PG_SYSTEM);
//2. Assign timestamps and watermarks to pg messages
pgStream.assignTimestampsAndWatermarks(new
BoundedOutOfOrdernessTimestampExtractor<PaymentNotifyRequestWrapper>(Time.milliseconds(Long.parseLong(parameter.get(Constants.PG_MAX_OUT_OF_ORDERNESS))))
{
@Override
public long extractTimestamp(PaymentNotifyRequestWrapper pgMessage) {
return DateTimeUtils.extractPGTimeStamp(pgMessage);
}
});
return pgStream;
}
Can anyone please help what can be the issue here and if there is somewrong
time values handled in the code here.
Help will be highly appreciated.