Re: 想知道local,flink 在window完成时,发送给sink的数据顺序,这个顺序是怎么确定的?

2019-03-01 文章 thinktothi...@gmail.com
 ).明白了整理笔记如下: 
https://github.com/opensourceteams/fink-maven-scala-2/blob/master/md/miniCluster/flink-sink-order.md

Sink 接收数据的顺序(Window发送数据顺序)

概述
   
   -
InternalTimerServiceImpl.processingTimeTimersQueue存储着同一个Window中所有Key,取第一个key,调用WindowOperator.onProcessingTime进行处理,并发送给Sink

   -
InternalTimerServiceImpl.processingTimeTimersQueue 
key处理的顺序是,先处理第一个,然后依次把最后一个元素放到第一个元素进行处理

   -
Key,处理的顺序,如 1 2 1 3 2 4 5,就会变成
1
 5
 4
 3
 2


输入数据
1 2 1 3 2 4 5

源码分析

RecordWriter.emit
   
   -
当WordCount中的数据经过Operator(Source,FlatMap,Map) 处理后,通过RecordWriter.emit()函数发射数据

   -
此时发这样的数据格式发送
   WordWithCount(1,1)
WordWithCount(2,1)
WordWithCount(1,1)
WordWithCount(3,1)
WordWithCount(2,1)
WordWithCount(4,1)
WordWithCount(5,1)


   -
WindowOperator.processElement会接收并处理


private void emit(T record, int[] targetChannels) throws IOException, 
InterruptedException {
serializer.serializeRecord(record);

boolean pruneAfterCopying = false;
for (int channel : targetChannels) {
if (copyFromSerializerToTargetChannel(channel)) {
pruneAfterCopying = true;
}
}

// Make sure we don't hold onto the large intermediate 
serialization buffer for too long
if (pruneAfterCopying) {
serializer.prune();
}
}



WindowOperator.processElement(StreamRecord element)
   
   -
WindowOperator.processElement,给每一个WordWithCount(1,1) 
这样的元素分配window,也就是确认每一个元素属于哪一个窗口,因为需要对同一个窗口的相同key进行聚合操作
   final Collection elementWindows = windowAssigner.assignWindows(
element.getValue(), element.getTimestamp(), windowAssignerContext);

   -
把当前元素增加到state中保存,add函数中会对相同key进行聚合操作(reduce),对同一个window中相同key进行求和就是在这个方法中进行的
   windowState.add(element.getValue());

   -
triggerContext.onElement(element),对当前元素设置trigger,也就是当前元素的window在哪个时间点触发(结束的时间点),
 
把当前元素的key,增加到InternalTimerServiceImpl.processingTimeTimersQueue中,每一条数据会加一次,加完后会去重,相当于Set,对相同Key的处理,
 
后面发送给Sink的数据,就是遍历这个processingTimeTimersQueue中的数据,当然,每次发送第一个元素,发送后,会把最后一个元素放到第一个元素
   TriggerResult triggerResult = triggerContext.onElement(element);


public void processElement(StreamRecord element) throws Exception {
final Collection elementWindows = 
windowAssigner.assignWindows(
element.getValue(), element.getTimestamp(), 
windowAssignerContext);

//if element is handled by none of assigned elementWindows
boolean isSkippedElement = true;

final K key = this.getKeyedStateBackend().getCurrentKey();

if (windowAssigner instanceof MergingWindowAssigner) {
MergingWindowSet mergingWindows = 
getMergingWindowSet();

for (W window: elementWindows) {

// adding the new window might result in a 
merge, in that case the actualWindow
// is the merged window and we work with that. 
If we don't merge then
// actualWindow == window
W actualWindow = 
mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction() {
@Override
public void merge(W mergeResult,
Collection 
mergedWindows, W stateWindowResult,
Collection 
mergedStateWindows) throws Exception {

if 
((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness 
<= internalTimerService.currentWatermark())) {
throw new 
UnsupportedOperationException("The end timestamp of an " +

"event-time window cannot become earlier than the current watermark " +
"by 
merging. Current watermark: " + internalTimerService.currentWatermark() +
" 
window: " + mergeResult);
} else if 
(!windowAssigner.isEventTime() && mergeResult.maxTimestamp() <= 
internalTimerService.currentProcessingTime()) {
throw new 
UnsupportedOperationException("The end timestamp of a " +

"processing-time window cannot become earlier than the current processing time 
" +
"by 
merging. Current processing time: " + 
internalTimerService.currentProcessingTime() +
  

想知道local,flink 在window完成时,发送给sink的数据顺序,这个顺序是怎么确定的?

2019-03-01 文章 thinktothi...@gmail.com
【问题】).想知道local,flink 
在window完成时,发送给sink的数据顺序,这个顺序是怎么确定的?---).输入数据:1 2 1 3 
2).程序:Flink 1.7.2  local wordCount,  dataStream.timeWindow(Time.seconds(10))
).WindowOperator.onProcessingTime    windowState.stateTable.primaryTable 数据结构   
       167 = {CopyOnWriteStateTable$StateTableEntry@7257} 
"((1)|TimeWindow{start=155150034, end=155150035})=WordWithCount(1,2)"   
 419 = {CopyOnWriteStateTable$StateTableEntry@7258} 
"((3)|TimeWindow{start=155150034, end=155150035})=WordWithCount(3,1)"   
 767 = {CopyOnWriteStateTable$StateTableEntry@7259} 
"((2)|TimeWindow{start=155150034, end=155150035})=WordWithCount(2,2)"   
 ). 发送给sink时  发送的顺序是:      WordWithCount(1,2)      WordWithCount(3,1)      
WordWithCount(2,2)  ??问题是,这个顺序是怎么确定的?     ).keyContext.getCurrentKey() 的顺序是     
  1       3       2  keyContext.getCurrentKey()  这个key是怎遍历顺序的?      
---             

Re: 订阅

2019-03-01 文章 Evans Ye
要訂閱請寄信到user-zh-subscr...@flink.apache.org
收到確認信後按照指示回覆信件,即可完成訂閱

刘文  於 2019年3月2日 週六 上午1:41寫道:

> 订阅
>
>
>
>
> *姓名* 刘文
> * thinktothi...@163.com  *
> *公司名称:*
> 地址:
> 电话
> 手机:15910540132
> QQ:372065525
> [image: 二维码]
>
> 扫描该二维码,可以将电子名片迅速保存到手机 使用帮助
> 
>
>
>
>


订阅

2019-03-01 文章 刘文
订阅





|
姓名刘文
thinktothi...@163.com
公司名称:
地址:
电话
手机:15910540132
QQ:372065525
|

扫描该二维码,可以将电子名片迅速保存到手机 使用帮助

|