Flink SQL中动态嵌套字段如何定义DDL

2020-03-30 文章 111
Hi, 我们在使用streamsets作为CDC工具,输出到kafka中的内容是嵌套多变的类型,如: {database:a, table: b, type:update, data:{a:1,b:2,c:3}} {database:a, table: c, type:update, data:{c:1,d:2}} 请问这种类型该如何定义DDL? Best, Xinghalo

Re: Re: 实现 KafkaUpsertTableSink

2020-03-30 文章 wangl...@geekplus.com.cn
我只保留 KafkaRetractTableSourceSinkFactory 一个, KafkaRetractTableSinkBase 实现 RetractStreamTableSink 接口,在 consumeDataStream 实现只有 True 才发送,最终 work 了。 @Override public DataStreamSink consumeDataStream(DataStream> dataStream) { DataStream dtNeed = dataStream.filter(x -> x.f0 == Boolean.TRUE).map(x -

Re: keyby的乱序处理

2020-03-30 文章 jun su
hi, keyby后的watermark应该是上游多个线程中最小的watermark , 所以数据虽然可能乱序, 但是watermark并不会乱, 不会影响后续的窗口触发 tingli ke 于2020年3月31日周二 上午9:54写道: > 您好, > 针对您的回复,现在的场景是这样子的 > 1、kafka存在多个partition,针对多个Partiton,flink watermark assiger会对每个Partiton > 发射 watermark; > 2、在第一个前提下,水位已经设置好了,还可以在keyby之后在次设置watermark吗? > 3、是否存在可以不用经过

ProcessWindowFunction??????????????state??

2020-03-30 文章 ????
-- FLINK 1.10.0 ON YARN -- 1.  .window(TumblingProcessingTimeWindows.of(Time.days(1))) 2.new Trigger(.trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(10))) 3.new ProcessWindowFunction(),0?

Re: Re: 实现 KafkaUpsertTableSink

2020-03-30 文章 Benchao Li
我理解你可以让KafkaRetractTableSourceSinkFactory的参数跟KafkaTableSourceSinkFactory的参数有不同的地方, 然后通过这个参数来区分两个不同的factory。比如加一个参数,表示这个sink是retract还是append类型之类的? wangl...@geekplus.com.cn 于2020年3月31日周二 上午11:17写道: > 这个应该是得到两个 tableFactories 了。 我直接平行的把 KafkaTableSourceSinkFactory 那一套变成 > KafkaRetractTableSourceSin

Re: Re: 实现 KafkaUpsertTableSink

2020-03-30 文章 wangl...@geekplus.com.cn
这个应该是得到两个 tableFactories 了。 我直接平行的把 KafkaTableSourceSinkFactory 那一套变成 KafkaRetractTableSourceSinkFactory 写了一遍 但这个应该怎样改才合适呢? 137 private static T findSingleInternal( 138 Class factoryClass, 139 Map properties, 140

Re: RE: 实现 KafkaUpsertTableSink

2020-03-30 文章 wangl...@geekplus.com.cn
我以相同的方式 实现了一个 KafkaRetractTableSink, 然后打成 jar 包放在 lib 目录下启动 sql-client: org.apache.flink.table.planner.delegation.BlinkExecutorFactory at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:146) at org.apache.flink.table.factories.Ta

?????? keyby??????????

2020-03-30 文章 ??????(Jiacheng Jiang)
keybywatermark??windowAll??keyby?? --  -- ??: "tingli ke"

Re: keyby的乱序处理

2020-03-30 文章 tingli ke
您好, 针对您的回复,现在的场景是这样子的 1、kafka存在多个partition,针对多个Partiton,flink watermark assiger会对每个Partiton 发射 watermark; 2、在第一个前提下,水位已经设置好了,还可以在keyby之后在次设置watermark吗? 3、是否存在可以不用经过第一个前提的方案,直接在keyby之后设置watermark? Jimmy Wong 于2020年3月30日周一 下午9:13写道: > Hi, > watermark 可以在 keyBy 后分配,但是最好紧跟 SourceFunction。经过 KeyBy >

Re: keyby的乱序处理

2020-03-30 文章 tingli ke
您好, 非常感谢您的回复! key化就是keyby之后的,个人理解为keyed(key 化),和您回答的“watermark 可以在 keyBy 后分配”是同一个话题。 Jimmy Wong 于2020年3月30日周一 下午9:13写道: > Hi, > watermark 可以在 keyBy 后分配,但是最好紧跟 SourceFunction。经过 KeyBy > 或其他分配策略,可能导致数据更大的延迟(EventTime)。 > > > “想做key化的乱序处理” 这句没太理解,麻烦解释下。 > > > | | > Jimmy Wong > | > | > wangzmk...@1

回复:keyby的乱序处理

2020-03-30 文章 Jimmy Wong
Hi, watermark 可以在 keyBy 后分配,但是最好紧跟 SourceFunction。经过 KeyBy 或其他分配策略,可能导致数据更大的延迟(EventTime)。 “想做key化的乱序处理” 这句没太理解,麻烦解释下。 | | Jimmy Wong | | wangzmk...@163.com | 签名由网易邮箱大师定制 在2020年03月30日 20:58,tingli ke 写道: 请教一个问题:kafka-per-partition 的watermark的分配,可以在keyby之后分配吗,想做key化的乱序处理能支持吗

keyby的乱序处理

2020-03-30 文章 tingli ke
请教一个问题:kafka-per-partition 的watermark的分配,可以在keyby之后分配吗,想做key化的乱序处理能支持吗

Flink(≥1.9) Table/SQL Trigger

2020-03-30 文章 Jimmy Wong
Hi,all: 我记得 Flink ( ≥1.9) 的 SQL/Table 是不支持 CountTrigger.of(1),这种自定义Trigger的吧 请问对于 Flink ( ≥1.9) 的 SQL/Table 如何实现自定义 Trigger?比如 CountTrigger (per-record Trigger),ContinuousEventTimeTrigger(specifical-time Trigger) 等。 | | Jimmy Wong | | wangzmk...@163.com | 签名由网易邮箱大师定制

Re: Re:Re: flink savepoint问题

2020-03-30 文章 Yun Tang
Hi 首先,如果这个问题很容易复现的话,我们需要定位到是什么导致了OOMkilled。 1. 打开block-cache usage [1] 观察metrics中block cache的使用量。 2. 麻烦回答一下几个问题,有助于进一步定位 * 单个TM有几个slot * 单个TM的managed memory配置了多少 * 一共声明了多少个keyed state,(如果使用了window,也相当于会使用一个state),其中有多少个map state,是否经常遍历那个map state * 被kill的contai

请教两个关于 kafka sink 的问题

2020-03-30 文章 whirly
Hi,大家好: 我在使用 flink kafka sink 时遇到几个问题/疑惑,请教大家。 1. kafka sink 没有像 elasticsearch sink 一样提供一个 ActionRequestFailureHandler,在遇到异常怎么办呢? 而且不确定到底会有哪些异常? 在 FlinkKafkaProducer 的 open中的回调是这样的,onCompletion 只有 RecordMetadata 和 Exception ,不能拿到 Record,而且 callback 是private的,无法通过继承重写 if (logFailuresOnly) { call

flink无法提交jars

2020-03-30 文章 Jerome
在local模式下也无法提交jar, 一提交就出错了,请问什么原因呢?unbuntu18.04版本,请教一下,谢谢~!