Re: Re: 实现 KafkaUpsertTableSink

2020-03-30 文章 wangl...@geekplus.com.cn

我只保留 KafkaRetractTableSourceSinkFactory  一个, KafkaRetractTableSinkBase 实现 
RetractStreamTableSink 接口,在 consumeDataStream 实现只有 True 才发送,最终 work 了。
@Override
public DataStreamSink consumeDataStream(DataStream> 
dataStream) {
   DataStream dtNeed = dataStream.filter(x -> x.f0 == Boolean.TRUE).map(x -> 
x.f1);

INSERT INTO table1 SELCET field, count(*) from table2 group by field 这是 一个 
RetractStream,结果里面会有 True/False, 通过这个过滤是可以的。
INSERT INTO table1 SELECT feild, 1 from table2  
  我理解这不是一个 RetractStream, 上面  dataStream.filter(x -> x.f0 == 
Boolean.TRUE) 的代码应该会出错,但实际上没有出错

还不是完全能理解,我再看一下吧。

谢谢,
王磊


wangl...@geekplus.com.cn 

Sender: Benchao Li
Send Time: 2020-03-31 12:02
Receiver: user-zh
Subject: Re: Re: 实现 KafkaUpsertTableSink
我理解你可以让KafkaRetractTableSourceSinkFactory的参数跟KafkaTableSourceSinkFactory的参数有不同的地方,
然后通过这个参数来区分两个不同的factory。比如加一个参数,表示这个sink是retract还是append类型之类的?
 
wangl...@geekplus.com.cn  于2020年3月31日周二 上午11:17写道:
 
> 这个应该是得到两个 tableFactories 了。 我直接平行的把 KafkaTableSourceSinkFactory 那一套变成
> KafkaRetractTableSourceSinkFactory 写了一遍
> 但这个应该怎样改才合适呢?
>
> 137 private static  T
> findSingleInternal(
> 138 Class factoryClass,
> 139 Map properties,
> 140 Optional classLoader) {
> 141
> 142 List tableFactories =
> discoverFactories(classLoader);
> 143 List filtered = filter(tableFactories,
> factoryClass, properties);
> 144
> 145 if (filtered.size() > 1) {
> 146 throw new AmbiguousTableFactoryException(
> 147 filtered,
> 148 factoryClass,
> 149 tableFactories,
> 150 properties);
> 151 } else {
> 152 return filtered.get(0);
> 153 }
> 154 }
>
>
> 谢谢,
> 王磊
>
>
> wangl...@geekplus.com.cn
>
>
> Sender: wangl...@geekplus.com.cn
> Send Time: 2020-03-31 10:50
> Receiver: user-zh
> Subject: Re: RE: 实现 KafkaUpsertTableSink
>
> 我以相同的方式 实现了一个  KafkaRetractTableSink, 然后打成 jar 包放在 lib 目录下启动 sql-client:
>
> org.apache.flink.table.planner.delegation.BlinkExecutorFactory
> at
> org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:146)
> at
> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:377)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:559)
> at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:557)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:494)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:159)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:118)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:742)
> ... 3 more
>
> 这个改怎样解决呢?
>
> 谢谢,
> 王磊
>
>
>
> wangl...@geekplus.com.cn
>
> Sender: wxchunj...@163.com
> Send Time: 2020-03-29 10:32
> Receiver: user-zh@flink.apache.org
> Subject: RE: 实现 KafkaUpsertTableSink
> Benchao,非常感谢大佬,之前不知道需要添加 Factory 到文件中,添加之后,可以正常运行了。
> -Original Message-
> From: user-zh-return-2640-wxchunjhyy=163@flink.apache.org
>  On Behalf Of
> Benchao Li
> Sent: Saturday, March 28, 2020 6:28 PM
> To: user-zh 
> Subject: Re: 实现 KafkaUpsertTableSink
> Hi,
> 你需要把你新增的Factory添加到 resources下的
>
> META-INF/services/org.apache.flink.table.factories.TableFactory文件中,不知道这一步你是否已经做过了呢?
>  于2020年3月28日周六 下午5:38写道:
> > 各位大佬:
> >
> > 由于现在的 KafkaTableSink 不支持 sql 中有group ,参照
> > KafkaTableSink 和 HbaseUpsertTableSink  的一套逻辑实现了一套
> > KafkaUpsertTableSink:
> >
> > KafkaUpsertTableSink
> >
> > KafkaUpsertTableSinkBase
> >
> > KafkaUpsertTableSourceSinkFactory
> >
> > KafkaUpsertTableSourceSinkFactoryBase
> >
> > MyKafkaValidator
> >
> > 但是在 TableFactoryService. discoverFactories 的方法中不能加载我定义的
> > KafkaUpsertTableSourceSinkFactory ,请问各位大佬,是否是需要在什么地方注册
> > 呢?
> >
> >
> >
> >
> > /**
> > * Searches for factories using Java service providers.
> > *
> > * @return all factories in the classpath */ private static
> > List discoverFactories(Optional
> > classLoader) {
> >try {
> >   List result = new LinkedList<>();
> >   ClassLoader cl =
> > classLoader.orElse(Thread.currentThread().getContextClassLoader());
> >   ServiceLoader
> >  

Re: keyby的乱序处理

2020-03-30 文章 jun su
hi,
keyby后的watermark应该是上游多个线程中最小的watermark , 所以数据虽然可能乱序, 但是watermark并不会乱,
不会影响后续的窗口触发

tingli ke  于2020年3月31日周二 上午9:54写道:

> 您好,
> 针对您的回复,现在的场景是这样子的
> 1、kafka存在多个partition,针对多个Partiton,flink watermark assiger会对每个Partiton
> 发射 watermark;
> 2、在第一个前提下,水位已经设置好了,还可以在keyby之后在次设置watermark吗?
> 3、是否存在可以不用经过第一个前提的方案,直接在keyby之后设置watermark?
>
> Jimmy Wong  于2020年3月30日周一 下午9:13写道:
>
> > Hi,
> > watermark 可以在 keyBy 后分配,但是最好紧跟 SourceFunction。经过 KeyBy
> > 或其他分配策略,可能导致数据更大的延迟(EventTime)。
> >
> >
> > “想做key化的乱序处理” 这句没太理解,麻烦解释下。
> >
> >
> > | |
> > Jimmy Wong
> > |
> > |
> > wangzmk...@163.com
> > |
> > 签名由网易邮箱大师定制
> >
> >
> > 在2020年03月30日 20:58,tingli ke 写道:
> > 请教一个问题:kafka-per-partition 的watermark的分配,可以在keyby之后分配吗,想做key化的乱序处理能支持吗
> >
>


-- 
Best,
Jun Su


Re: Re: 实现 KafkaUpsertTableSink

2020-03-30 文章 Benchao Li
我理解你可以让KafkaRetractTableSourceSinkFactory的参数跟KafkaTableSourceSinkFactory的参数有不同的地方,
然后通过这个参数来区分两个不同的factory。比如加一个参数,表示这个sink是retract还是append类型之类的?

wangl...@geekplus.com.cn  于2020年3月31日周二 上午11:17写道:

> 这个应该是得到两个 tableFactories 了。 我直接平行的把 KafkaTableSourceSinkFactory 那一套变成
> KafkaRetractTableSourceSinkFactory 写了一遍
> 但这个应该怎样改才合适呢?
>
> 137 private static  T
> findSingleInternal(
> 138 Class factoryClass,
> 139 Map properties,
> 140 Optional classLoader) {
> 141
> 142 List tableFactories =
> discoverFactories(classLoader);
> 143 List filtered = filter(tableFactories,
> factoryClass, properties);
> 144
> 145 if (filtered.size() > 1) {
> 146 throw new AmbiguousTableFactoryException(
> 147 filtered,
> 148 factoryClass,
> 149 tableFactories,
> 150 properties);
> 151 } else {
> 152 return filtered.get(0);
> 153 }
> 154 }
>
>
> 谢谢,
> 王磊
>
>
> wangl...@geekplus.com.cn
>
>
> Sender: wangl...@geekplus.com.cn
> Send Time: 2020-03-31 10:50
> Receiver: user-zh
> Subject: Re: RE: 实现 KafkaUpsertTableSink
>
> 我以相同的方式 实现了一个  KafkaRetractTableSink, 然后打成 jar 包放在 lib 目录下启动 sql-client:
>
> org.apache.flink.table.planner.delegation.BlinkExecutorFactory
> at
> org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:146)
> at
> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:377)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:559)
> at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:557)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:494)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:159)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:118)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:742)
> ... 3 more
>
> 这个改怎样解决呢?
>
> 谢谢,
> 王磊
>
>
>
> wangl...@geekplus.com.cn
>
> Sender: wxchunj...@163.com
> Send Time: 2020-03-29 10:32
> Receiver: user-zh@flink.apache.org
> Subject: RE: 实现 KafkaUpsertTableSink
> Benchao,非常感谢大佬,之前不知道需要添加 Factory 到文件中,添加之后,可以正常运行了。
> -Original Message-
> From: user-zh-return-2640-wxchunjhyy=163@flink.apache.org
>  On Behalf Of
> Benchao Li
> Sent: Saturday, March 28, 2020 6:28 PM
> To: user-zh 
> Subject: Re: 实现 KafkaUpsertTableSink
> Hi,
> 你需要把你新增的Factory添加到 resources下的
>
> META-INF/services/org.apache.flink.table.factories.TableFactory文件中,不知道这一步你是否已经做过了呢?
>  于2020年3月28日周六 下午5:38写道:
> > 各位大佬:
> >
> > 由于现在的 KafkaTableSink 不支持 sql 中有group ,参照
> > KafkaTableSink 和 HbaseUpsertTableSink  的一套逻辑实现了一套
> > KafkaUpsertTableSink:
> >
> > KafkaUpsertTableSink
> >
> > KafkaUpsertTableSinkBase
> >
> > KafkaUpsertTableSourceSinkFactory
> >
> > KafkaUpsertTableSourceSinkFactoryBase
> >
> > MyKafkaValidator
> >
> > 但是在 TableFactoryService. discoverFactories 的方法中不能加载我定义的
> > KafkaUpsertTableSourceSinkFactory ,请问各位大佬,是否是需要在什么地方注册
> > 呢?
> >
> >
> >
> >
> > /**
> > * Searches for factories using Java service providers.
> > *
> > * @return all factories in the classpath */ private static
> > List discoverFactories(Optional
> > classLoader) {
> >try {
> >   List result = new LinkedList<>();
> >   ClassLoader cl =
> > classLoader.orElse(Thread.currentThread().getContextClassLoader());
> >   ServiceLoader
> >  .load(TableFactory.class, cl)
> >  .iterator()
> >  .forEachRemaining(result::add);
> >   //todo add
> >   result.add(new KafkaUpsertTableSourceSinkFactory());
> >   return result;
> >} catch (ServiceConfigurationError e) {
> >   LOG.error("Could not load service provider for table factories.",
> e);
> >   throw new TableException("Could not load service provider for
> > table factories.", e);
> >}
> >
> > }
> >
> >
> >
> >
> >
> > 直接在返回的 result 里面添加对应的  KafkaUpsertTableSourceSinkFactory  是可
> > 以成功运行的。
> >
> > 非常感谢
> >
> >
> >
> >
> >
> > --
> >
> > Thanks
> >
> > venn
> >
> >
> >
> >
> --
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; 

Re: Re: 实现 KafkaUpsertTableSink

2020-03-30 文章 wangl...@geekplus.com.cn
这个应该是得到两个 tableFactories 了。 我直接平行的把 KafkaTableSourceSinkFactory 那一套变成 
KafkaRetractTableSourceSinkFactory 写了一遍
但这个应该怎样改才合适呢?

137 private static  T findSingleInternal(
138 Class factoryClass,
139 Map properties,
140 Optional classLoader) {
141
142 List tableFactories = 
discoverFactories(classLoader);
143 List filtered = filter(tableFactories, factoryClass, 
properties);
144
145 if (filtered.size() > 1) {
146 throw new AmbiguousTableFactoryException(
147 filtered,
148 factoryClass,
149 tableFactories,
150 properties);
151 } else {
152 return filtered.get(0);
153 }
154 }


谢谢,
王磊


wangl...@geekplus.com.cn 

 
Sender: wangl...@geekplus.com.cn
Send Time: 2020-03-31 10:50
Receiver: user-zh
Subject: Re: RE: 实现 KafkaUpsertTableSink
 
我以相同的方式 实现了一个  KafkaRetractTableSink, 然后打成 jar 包放在 lib 目录下启动 sql-client:
 
org.apache.flink.table.planner.delegation.BlinkExecutorFactory
at 
org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:146)
at 
org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:377)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:559)
at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:557)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:494)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:159)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:118)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:742)
... 3 more
 
这个改怎样解决呢?
 
谢谢,
王磊
 
 
 
wangl...@geekplus.com.cn 
 
Sender: wxchunj...@163.com
Send Time: 2020-03-29 10:32
Receiver: user-zh@flink.apache.org
Subject: RE: 实现 KafkaUpsertTableSink
Benchao,非常感谢大佬,之前不知道需要添加 Factory 到文件中,添加之后,可以正常运行了。
-Original Message-
From: user-zh-return-2640-wxchunjhyy=163@flink.apache.org 
 On Behalf Of Benchao 
Li
Sent: Saturday, March 28, 2020 6:28 PM
To: user-zh 
Subject: Re: 实现 KafkaUpsertTableSink
Hi,
你需要把你新增的Factory添加到 resources下的
META-INF/services/org.apache.flink.table.factories.TableFactory文件中,不知道这一步你是否已经做过了呢?
 于2020年3月28日周六 下午5:38写道:
> 各位大佬:
>
> 由于现在的 KafkaTableSink 不支持 sql 中有group ,参照
> KafkaTableSink 和 HbaseUpsertTableSink  的一套逻辑实现了一套
> KafkaUpsertTableSink:
>
> KafkaUpsertTableSink
>
> KafkaUpsertTableSinkBase
>
> KafkaUpsertTableSourceSinkFactory
>
> KafkaUpsertTableSourceSinkFactoryBase
>
> MyKafkaValidator
>
> 但是在 TableFactoryService. discoverFactories 的方法中不能加载我定义的
> KafkaUpsertTableSourceSinkFactory ,请问各位大佬,是否是需要在什么地方注册
> 呢?
>
>
>
>
> /**
> * Searches for factories using Java service providers.
> *
> * @return all factories in the classpath */ private static 
> List discoverFactories(Optional
> classLoader) {
>try {
>   List result = new LinkedList<>();
>   ClassLoader cl =
> classLoader.orElse(Thread.currentThread().getContextClassLoader());
>   ServiceLoader
>  .load(TableFactory.class, cl)
>  .iterator()
>  .forEachRemaining(result::add);
>   //todo add
>   result.add(new KafkaUpsertTableSourceSinkFactory());
>   return result;
>} catch (ServiceConfigurationError e) {
>   LOG.error("Could not load service provider for table factories.", e);
>   throw new TableException("Could not load service provider for 
> table factories.", e);
>}
>
> }
>
>
>
>
>
> 直接在返回的 result 里面添加对应的  KafkaUpsertTableSourceSinkFactory  是可
> 以成功运行的。
>
> 非常感谢
>
>
>
>
>
> --
>
> Thanks
>
> venn
>
>
>
>
-- 
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: RE: 实现 KafkaUpsertTableSink

2020-03-30 文章 wangl...@geekplus.com.cn

我以相同的方式 实现了一个  KafkaRetractTableSink, 然后打成 jar 包放在 lib 目录下启动 sql-client:

org.apache.flink.table.planner.delegation.BlinkExecutorFactory
at 
org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:146)
at 
org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:377)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:559)
at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:557)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:494)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:159)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:118)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:742)
... 3 more

这个改怎样解决呢?

谢谢,
王磊



wangl...@geekplus.com.cn 

 
Sender: wxchunj...@163.com
Send Time: 2020-03-29 10:32
Receiver: user-zh@flink.apache.org
Subject: RE: 实现 KafkaUpsertTableSink
Benchao,非常感谢大佬,之前不知道需要添加 Factory 到文件中,添加之后,可以正常运行了。
 
 
-Original Message-
From: user-zh-return-2640-wxchunjhyy=163@flink.apache.org 
 On Behalf Of Benchao 
Li
Sent: Saturday, March 28, 2020 6:28 PM
To: user-zh 
Subject: Re: 实现 KafkaUpsertTableSink
 
Hi,
 
你需要把你新增的Factory添加到 resources下的
META-INF/services/org.apache.flink.table.factories.TableFactory文件中,不知道这一步你是否已经做过了呢?
 
 于2020年3月28日周六 下午5:38写道:
 
> 各位大佬:
>
> 由于现在的 KafkaTableSink 不支持 sql 中有group ,参照
> KafkaTableSink 和 HbaseUpsertTableSink  的一套逻辑实现了一套
> KafkaUpsertTableSink:
>
> KafkaUpsertTableSink
>
> KafkaUpsertTableSinkBase
>
> KafkaUpsertTableSourceSinkFactory
>
> KafkaUpsertTableSourceSinkFactoryBase
>
> MyKafkaValidator
>
> 但是在 TableFactoryService. discoverFactories 的方法中不能加载我定义的
> KafkaUpsertTableSourceSinkFactory ,请问各位大佬,是否是需要在什么地方注册
> 呢?
>
>
>
>
> /**
> * Searches for factories using Java service providers.
> *
> * @return all factories in the classpath */ private static 
> List discoverFactories(Optional
> classLoader) {
>try {
>   List result = new LinkedList<>();
>   ClassLoader cl =
> classLoader.orElse(Thread.currentThread().getContextClassLoader());
>   ServiceLoader
>  .load(TableFactory.class, cl)
>  .iterator()
>  .forEachRemaining(result::add);
>   //todo add
>   result.add(new KafkaUpsertTableSourceSinkFactory());
>   return result;
>} catch (ServiceConfigurationError e) {
>   LOG.error("Could not load service provider for table factories.", e);
>   throw new TableException("Could not load service provider for 
> table factories.", e);
>}
>
> }
>
>
>
>
>
> 直接在返回的 result 里面添加对应的  KafkaUpsertTableSourceSinkFactory  是可
> 以成功运行的。
>
> 非常感谢
>
>
>
>
>
> --
>
> Thanks
>
> venn
>
>
>
>
 
-- 
 
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: keyby的乱序处理

2020-03-30 文章 tingli ke
您好,
针对您的回复,现在的场景是这样子的
1、kafka存在多个partition,针对多个Partiton,flink watermark assiger会对每个Partiton
发射 watermark;
2、在第一个前提下,水位已经设置好了,还可以在keyby之后在次设置watermark吗?
3、是否存在可以不用经过第一个前提的方案,直接在keyby之后设置watermark?

Jimmy Wong  于2020年3月30日周一 下午9:13写道:

> Hi,
> watermark 可以在 keyBy 后分配,但是最好紧跟 SourceFunction。经过 KeyBy
> 或其他分配策略,可能导致数据更大的延迟(EventTime)。
>
>
> “想做key化的乱序处理” 这句没太理解,麻烦解释下。
>
>
> | |
> Jimmy Wong
> |
> |
> wangzmk...@163.com
> |
> 签名由网易邮箱大师定制
>
>
> 在2020年03月30日 20:58,tingli ke 写道:
> 请教一个问题:kafka-per-partition 的watermark的分配,可以在keyby之后分配吗,想做key化的乱序处理能支持吗
>


Re: keyby的乱序处理

2020-03-30 文章 tingli ke
您好,
非常感谢您的回复!
key化就是keyby之后的,个人理解为keyed(key 化),和您回答的“watermark 可以在 keyBy 后分配”是同一个话题。

Jimmy Wong  于2020年3月30日周一 下午9:13写道:

> Hi,
> watermark 可以在 keyBy 后分配,但是最好紧跟 SourceFunction。经过 KeyBy
> 或其他分配策略,可能导致数据更大的延迟(EventTime)。
>
>
> “想做key化的乱序处理” 这句没太理解,麻烦解释下。
>
>
> | |
> Jimmy Wong
> |
> |
> wangzmk...@163.com
> |
> 签名由网易邮箱大师定制
>
>
> 在2020年03月30日 20:58,tingli ke 写道:
> 请教一个问题:kafka-per-partition 的watermark的分配,可以在keyby之后分配吗,想做key化的乱序处理能支持吗
>


回复:keyby的乱序处理

2020-03-30 文章 Jimmy Wong
Hi, 
watermark 可以在 keyBy 后分配,但是最好紧跟 SourceFunction。经过 KeyBy 
或其他分配策略,可能导致数据更大的延迟(EventTime)。


“想做key化的乱序处理” 这句没太理解,麻烦解释下。


| |
Jimmy Wong
|
|
wangzmk...@163.com
|
签名由网易邮箱大师定制


在2020年03月30日 20:58,tingli ke 写道:
请教一个问题:kafka-per-partition 的watermark的分配,可以在keyby之后分配吗,想做key化的乱序处理能支持吗


keyby的乱序处理

2020-03-30 文章 tingli ke
请教一个问题:kafka-per-partition 的watermark的分配,可以在keyby之后分配吗,想做key化的乱序处理能支持吗


Flink(≥1.9) Table/SQL Trigger

2020-03-30 文章 Jimmy Wong
Hi,all:
我记得 Flink ( ≥1.9) 的 SQL/Table 是不支持 CountTrigger.of(1),这种自定义Trigger的吧


请问对于  Flink ( ≥1.9) 的 SQL/Table 如何实现自定义 Trigger?比如 CountTrigger (per-record 
Trigger),ContinuousEventTimeTrigger(specifical-time Trigger) 等。


| |
Jimmy Wong
|
|
wangzmk...@163.com
|
签名由网易邮箱大师定制



Re: Re:Re: flink savepoint问题

2020-03-30 文章 Yun Tang
Hi

首先,如果这个问题很容易复现的话,我们需要定位到是什么导致了OOMkilled。

  1.  打开block-cache usage [1] 观察metrics中block cache的使用量。
  2.  麻烦回答一下几个问题,有助于进一步定位
 *   单个TM有几个slot
 *   单个TM的managed memory配置了多少
 *   一共声明了多少个keyed  state,(如果使用了window,也相当于会使用一个state),其中有多少个map 
state,是否经常遍历那个map state
 *   被kill的container内一共有几个rocksDB 实例,可以通过搜索日志 "Obtained shared RocksDB 
cache of size" 计数
 *   是否对RocksDB单独配置了options factory或者相关options

state.backend.rocksdb.memory.managed 
这个参数的语义是RocksDB使用的内存从Flink来,一个slot内的若干RocksDB实例会共享一块share 
cache。如果将这个参数设置为false,那么就回退到1.9以前的场景,rocksDB的内存将完全不由Flink管理,在某种程度上来说,更容易被conatiner
 kill。

如果想要快速缓解这个问题,一种办法是增大 taskmanager.memory.task.off-heap.size 
[2],使得提供多一部分内存以供RocksDB超用。其他的缓解办法需要根据您对上面问题的回答来实施

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#state-backend-rocksdb-metrics-block-cache-usage
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#taskmanager-memory-task-off-heap-size

Best
唐云


From: xyq 
Sent: Monday, March 30, 2020 10:41
To: user-zh@flink.apache.org 
Subject: Re:Re: flink savepoint问题

Hi,您好:
我这边有个小流 left join大流的需求,小流的数据夜间基本没有 
可能会4-5个小时没数据,目前的情况是一到晚上container老是被kill掉,报的是内存溢出。我想问下,我想把托管内存这设置成false,会有什么弊端吗?或者该问题怎么解决?困扰了好久了,请您指点一谢谢。
state.backend.rocksdb.memory.managed : false

















在 2020-03-28 11:04:09,"Congxian Qiu"  写道:
>Hi
>
>对于问题 1 在反压的情况下,可能导致 Savepoint 做不成功从而超时,这个暂时没法解决,现在有一个 issue[1] 在做 Unalign
>Checkpoint 可以解决反压情况下的 checkpoint
>对于问题 3,checkpoint 超时了,超时的定义:在设置的时间内(比如你这里 5 分钟),有 task 没有完成
>snapshot。调长超时时间能够一定的缓解这个问题,不过你最好找到超时的原因,然后针对性的优化。
>[1] https://issues.apache.org/jira/browse/FLINK-14551
>Best,
>Congxian
>
>
>大数据开发面试_夏永权  于2020年3月27日周五 下午4:19写道:
>
>> Hi,您好,在使用flink的过程中遇到如下问题,个人未能解决,所以请求您指导一下,谢谢!
>>
>> 1. flink cancel -s $SAVEPOINT_DIR $job_id -yid $application_id
>> 在程序有背压的时候停不掉
>>
>>
>>  The program finished with the following exception:
>> org.apache.flink.util.FlinkException: Could not cancel job
>> 1f768e4ca9ad5792a4844a5d12163b73.
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$cancel$6(CliFrontend.java:523)
>> at
>> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>> at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:515)
>> at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:904)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
>> at
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>> Caused by: java.util.concurrent.TimeoutException
>> at
>> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$cancel$6(CliFrontend.java:521)
>> ... 9 more
>> stop flink job failed!!!
>>
>>
>>
>>
>> 2.再用flink
>> sql的ddl时候增加一个字段后,程序启动失败,需要删除savepoint才能启动(ddl后双流join的逻辑,其中一个流加了一个字段)
>>
>>
>>  The program finished with the following exception:
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error:
>> org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not
>> complete the operation. Number of retries has been exhausted.
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>> at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>> at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
>> at
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>> Caused by: java.util.concurrent.ExecutionException:
>> org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not
>> complete the operation. Number of retries has been exhausted.
>> at
>> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>> at
>> 

请教两个关于 kafka sink 的问题

2020-03-30 文章 whirly
Hi,大家好:
我在使用 flink kafka sink 时遇到几个问题/疑惑,请教大家。


1. kafka sink 没有像 elasticsearch sink 一样提供一个 
ActionRequestFailureHandler,在遇到异常怎么办呢? 而且不确定到底会有哪些异常?
在 FlinkKafkaProducer 的 open中的回调是这样的,onCompletion 只有 RecordMetadata 和 Exception 
,不能拿到 Record,而且 callback 是private的,无法通过继承重写
if (logFailuresOnly) {
callback = new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
LOG.error("Error while sending record to Kafka: " + e.getMessage(), e);
}
acknowledgeMessage();
}
};
}
如果 kafkaSink.setLogFailuresOnly(true); 
那么只打印异常信息,消息将丢失了,消息丢失是不能容忍的,需要写到错误队列,但是拿不到消息。
如果 kafkaSink.setLogFailuresOnly(false) 作业将异常重启,但是又会重复消费,又遇到相同的消息,又异常重启。
注:目前只遇到 消息太大 的异常,默认最大消息为1M,已经调大,但是这样没有治本。


2. 在给 kafka sink 设置了 exactly-once 语义之后,作业默认并行度设置为2,启动后打印一百多次的 kafka config 
连接信息,很是疑惑为什么会有这么多?
FlinkKafkaProducer 中有这两个参数,在生成 事务ID时,一个并行度应该只生成 5 * 5 等于 25个事务ID?
public static final int SAFE_SCALE_DOWN_FACTOR = 5;
public static final int DEFAULT_KAFKA_PRODUCERS_POOL_SIZE = 5;
默认的checkpoint并行度是1,所以 poolsize应该可以调小点,但是对 SAFE_SCALE_DOWN_FACTOR  还不是很清楚功能


感谢您

flink无法提交jars

2020-03-30 文章 Jerome



在local模式下也无法提交jar, 一提交就出错了,请问什么原因呢?unbuntu18.04版本,请教一下,谢谢~!