Flink SQL中动态嵌套字段如何定义DDL
Hi, 我们在使用streamsets作为CDC工具,输出到kafka中的内容是嵌套多变的类型,如: {database:a, table: b, type:update, data:{a:1,b:2,c:3}} {database:a, table: c, type:update, data:{c:1,d:2}} 请问这种类型该如何定义DDL? Best, Xinghalo
Re: Re: 实现 KafkaUpsertTableSink
我只保留 KafkaRetractTableSourceSinkFactory 一个, KafkaRetractTableSinkBase 实现 RetractStreamTableSink 接口,在 consumeDataStream 实现只有 True 才发送,最终 work 了。 @Override public DataStreamSink consumeDataStream(DataStream> dataStream) { DataStream dtNeed = dataStream.filter(x -> x.f0 == Boolean.TRUE).map(x -> x.f1); INSERT INTO table1 SELCET field, count(*) from table2 group by field 这是 一个 RetractStream,结果里面会有 True/False, 通过这个过滤是可以的。 INSERT INTO table1 SELECT feild, 1 from table2 我理解这不是一个 RetractStream, 上面 dataStream.filter(x -> x.f0 == Boolean.TRUE) 的代码应该会出错,但实际上没有出错 还不是完全能理解,我再看一下吧。 谢谢, 王磊 wangl...@geekplus.com.cn Sender: Benchao Li Send Time: 2020-03-31 12:02 Receiver: user-zh Subject: Re: Re: 实现 KafkaUpsertTableSink 我理解你可以让KafkaRetractTableSourceSinkFactory的参数跟KafkaTableSourceSinkFactory的参数有不同的地方, 然后通过这个参数来区分两个不同的factory。比如加一个参数,表示这个sink是retract还是append类型之类的? wangl...@geekplus.com.cn 于2020年3月31日周二 上午11:17写道: > 这个应该是得到两个 tableFactories 了。 我直接平行的把 KafkaTableSourceSinkFactory 那一套变成 > KafkaRetractTableSourceSinkFactory 写了一遍 > 但这个应该怎样改才合适呢? > > 137 private static T > findSingleInternal( > 138 Class factoryClass, > 139 Map properties, > 140 Optional classLoader) { > 141 > 142 List tableFactories = > discoverFactories(classLoader); > 143 List filtered = filter(tableFactories, > factoryClass, properties); > 144 > 145 if (filtered.size() > 1) { > 146 throw new AmbiguousTableFactoryException( > 147 filtered, > 148 factoryClass, > 149 tableFactories, > 150 properties); > 151 } else { > 152 return filtered.get(0); > 153 } > 154 } > > > 谢谢, > 王磊 > > > wangl...@geekplus.com.cn > > > Sender: wangl...@geekplus.com.cn > Send Time: 2020-03-31 10:50 > Receiver: user-zh > Subject: Re: RE: 实现 KafkaUpsertTableSink > > 我以相同的方式 实现了一个 KafkaRetractTableSink, 然后打成 jar 包放在 lib 目录下启动 sql-client: > > org.apache.flink.table.planner.delegation.BlinkExecutorFactory > at > org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:146) > at > org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:377) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:559) > at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:557) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:494) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:159) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:118) > at > org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:742) > ... 3 more > > 这个改怎样解决呢? > > 谢谢, > 王磊 > > > > wangl...@geekplus.com.cn > > Sender: wxchunj...@163.com > Send Time: 2020-03-29 10:32 > Receiver: user-zh@flink.apache.org > Subject: RE: 实现 KafkaUpsertTableSink > Benchao,非常感谢大佬,之前不知道需要添加 Factory 到文件中,添加之后,可以正常运行了。 > -Original Message- > From: user-zh-return-2640-wxchunjhyy=163@flink.apache.org > On Behalf Of > Benchao Li > Sent: Saturday, March 28, 2020 6:28 PM > To: user-zh > Subject: Re: 实现 KafkaUpsertTableSink > Hi, > 你需要把你新增的Factory添加到 resources下的 > > META-INF/services/org.apache.flink.table.factories.TableFactory文件中,不知道这一步你是否已经做过了呢? > 于2020年3月28日周六 下午5:38写道: > > 各位大佬: > > > > 由于现在的 KafkaTableSink 不支持 sql 中有group ,参照 > > KafkaTableSink 和 HbaseUpsertTableSink 的一套逻辑实现了一套 > > KafkaUpsertTableSink: > > > > KafkaUpsertTableSink > > > > KafkaUpsertTableSinkBase > > > > KafkaUpsertTableSourceSinkFactory > > > > KafkaUpsertTableSourceSinkFactoryBase > > > > MyKafkaValidator > > > > 但是在 TableFactoryService. discoverFactories 的方法中不能加载我定义的 > > KafkaUpsertTableSourceSinkFactory ,请问各位大佬,是否是需要在什么地方注册 > > 呢? > > > > > > > > > > /** > > * Searches for factories using Java service providers. > > * > > * @return all factories in the classpath */ private static > > List discoverFactories(Optional > > classLoader) { > >try { > > List result = new LinkedList<>(); > > ClassLoader cl = > > classLoader.orElse(Thread.currentThread().getContextClassLoader()); > > ServiceLoader > > .load(TableFactory.cl
Re: keyby的乱序处理
hi, keyby后的watermark应该是上游多个线程中最小的watermark , 所以数据虽然可能乱序, 但是watermark并不会乱, 不会影响后续的窗口触发 tingli ke 于2020年3月31日周二 上午9:54写道: > 您好, > 针对您的回复,现在的场景是这样子的 > 1、kafka存在多个partition,针对多个Partiton,flink watermark assiger会对每个Partiton > 发射 watermark; > 2、在第一个前提下,水位已经设置好了,还可以在keyby之后在次设置watermark吗? > 3、是否存在可以不用经过第一个前提的方案,直接在keyby之后设置watermark? > > Jimmy Wong 于2020年3月30日周一 下午9:13写道: > > > Hi, > > watermark 可以在 keyBy 后分配,但是最好紧跟 SourceFunction。经过 KeyBy > > 或其他分配策略,可能导致数据更大的延迟(EventTime)。 > > > > > > “想做key化的乱序处理” 这句没太理解,麻烦解释下。 > > > > > > | | > > Jimmy Wong > > | > > | > > wangzmk...@163.com > > | > > 签名由网易邮箱大师定制 > > > > > > 在2020年03月30日 20:58,tingli ke 写道: > > 请教一个问题:kafka-per-partition 的watermark的分配,可以在keyby之后分配吗,想做key化的乱序处理能支持吗 > > > -- Best, Jun Su
ProcessWindowFunction??????????????state??
-- FLINK 1.10.0 ON YARN -- 1. .window(TumblingProcessingTimeWindows.of(Time.days(1))) 2.new Trigger(.trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(10))) 3.new ProcessWindowFunction(),0?? -- ?? new ProcessWindowFunction()ValueState0ValueState??ValueState??.clear()?? -- .window(TumblingProcessingTimeWindows.of(Time.days(1))) .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(10))) .process(new ProcessWindowFunction[(String,String,Long), String, Tuple, TimeWindow] { private var pv_st: ValueState[Long] = _ override def open(parameters: Configuration): Unit = { pv_st = getRuntimeContext.getState[Long](new ValueStateDescriptor[Long]("pv_stCount", classOf[Long])) } override def process(key: Tuple, context: Context, elements: Iterable[(String,String,Long)], out: Collector[String]): Unit = { var c_st = 0 val elementsIterator = elements.iterator // ??word while (elementsIterator.hasNext) { val ac_name = elementsIterator.next()._2 if(!ac_name.isEmpty && ac_name.equals("listentime")){ c_st +=1 } } val time: Date = new Date() val dateFormat: SimpleDateFormat = new SimpleDateFormat("-MM-dd") val date = dateFormat.format(time) // add current pv_st.update(pv_st.value() + c_st) var jsonStr = ""+key.getField(0)+"_"+date+"&" // json jsonStr += "{"+ "\"yesterday_foreground_play_pv\":\""+pv_st.value()+ "\"}"; //?? if(stateDate.equals("") || stateDate.equals(date)){ stateDate=date out.collect(jsonStr) }else{ out.collect(jsonStr) pv_st.clear() stateDate=date } } })
Re: Re: 实现 KafkaUpsertTableSink
我理解你可以让KafkaRetractTableSourceSinkFactory的参数跟KafkaTableSourceSinkFactory的参数有不同的地方, 然后通过这个参数来区分两个不同的factory。比如加一个参数,表示这个sink是retract还是append类型之类的? wangl...@geekplus.com.cn 于2020年3月31日周二 上午11:17写道: > 这个应该是得到两个 tableFactories 了。 我直接平行的把 KafkaTableSourceSinkFactory 那一套变成 > KafkaRetractTableSourceSinkFactory 写了一遍 > 但这个应该怎样改才合适呢? > > 137 private static T > findSingleInternal( > 138 Class factoryClass, > 139 Map properties, > 140 Optional classLoader) { > 141 > 142 List tableFactories = > discoverFactories(classLoader); > 143 List filtered = filter(tableFactories, > factoryClass, properties); > 144 > 145 if (filtered.size() > 1) { > 146 throw new AmbiguousTableFactoryException( > 147 filtered, > 148 factoryClass, > 149 tableFactories, > 150 properties); > 151 } else { > 152 return filtered.get(0); > 153 } > 154 } > > > 谢谢, > 王磊 > > > wangl...@geekplus.com.cn > > > Sender: wangl...@geekplus.com.cn > Send Time: 2020-03-31 10:50 > Receiver: user-zh > Subject: Re: RE: 实现 KafkaUpsertTableSink > > 我以相同的方式 实现了一个 KafkaRetractTableSink, 然后打成 jar 包放在 lib 目录下启动 sql-client: > > org.apache.flink.table.planner.delegation.BlinkExecutorFactory > at > org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:146) > at > org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:377) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:559) > at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:557) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:494) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:159) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:118) > at > org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:742) > ... 3 more > > 这个改怎样解决呢? > > 谢谢, > 王磊 > > > > wangl...@geekplus.com.cn > > Sender: wxchunj...@163.com > Send Time: 2020-03-29 10:32 > Receiver: user-zh@flink.apache.org > Subject: RE: 实现 KafkaUpsertTableSink > Benchao,非常感谢大佬,之前不知道需要添加 Factory 到文件中,添加之后,可以正常运行了。 > -Original Message- > From: user-zh-return-2640-wxchunjhyy=163@flink.apache.org > On Behalf Of > Benchao Li > Sent: Saturday, March 28, 2020 6:28 PM > To: user-zh > Subject: Re: 实现 KafkaUpsertTableSink > Hi, > 你需要把你新增的Factory添加到 resources下的 > > META-INF/services/org.apache.flink.table.factories.TableFactory文件中,不知道这一步你是否已经做过了呢? > 于2020年3月28日周六 下午5:38写道: > > 各位大佬: > > > > 由于现在的 KafkaTableSink 不支持 sql 中有group ,参照 > > KafkaTableSink 和 HbaseUpsertTableSink 的一套逻辑实现了一套 > > KafkaUpsertTableSink: > > > > KafkaUpsertTableSink > > > > KafkaUpsertTableSinkBase > > > > KafkaUpsertTableSourceSinkFactory > > > > KafkaUpsertTableSourceSinkFactoryBase > > > > MyKafkaValidator > > > > 但是在 TableFactoryService. discoverFactories 的方法中不能加载我定义的 > > KafkaUpsertTableSourceSinkFactory ,请问各位大佬,是否是需要在什么地方注册 > > 呢? > > > > > > > > > > /** > > * Searches for factories using Java service providers. > > * > > * @return all factories in the classpath */ private static > > List discoverFactories(Optional > > classLoader) { > >try { > > List result = new LinkedList<>(); > > ClassLoader cl = > > classLoader.orElse(Thread.currentThread().getContextClassLoader()); > > ServiceLoader > > .load(TableFactory.class, cl) > > .iterator() > > .forEachRemaining(result::add); > > //todo add > > result.add(new KafkaUpsertTableSourceSinkFactory()); > > return result; > >} catch (ServiceConfigurationError e) { > > LOG.error("Could not load service provider for table factories.", > e); > > throw new TableException("Could not load service provider for > > table factories.", e); > >} > > > > } > > > > > > > > > > > > 直接在返回的 result 里面添加对应的 KafkaUpsertTableSourceSinkFactory 是可 > > 以成功运行的。 > > > > 非常感谢 > > > > > > > > > > > > -- > > > > Thanks > > > > venn > > > > > > > > > -- > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: libenc...@gmail.com; lib
Re: Re: 实现 KafkaUpsertTableSink
这个应该是得到两个 tableFactories 了。 我直接平行的把 KafkaTableSourceSinkFactory 那一套变成 KafkaRetractTableSourceSinkFactory 写了一遍 但这个应该怎样改才合适呢? 137 private static T findSingleInternal( 138 Class factoryClass, 139 Map properties, 140 Optional classLoader) { 141 142 List tableFactories = discoverFactories(classLoader); 143 List filtered = filter(tableFactories, factoryClass, properties); 144 145 if (filtered.size() > 1) { 146 throw new AmbiguousTableFactoryException( 147 filtered, 148 factoryClass, 149 tableFactories, 150 properties); 151 } else { 152 return filtered.get(0); 153 } 154 } 谢谢, 王磊 wangl...@geekplus.com.cn Sender: wangl...@geekplus.com.cn Send Time: 2020-03-31 10:50 Receiver: user-zh Subject: Re: RE: 实现 KafkaUpsertTableSink 我以相同的方式 实现了一个 KafkaRetractTableSink, 然后打成 jar 包放在 lib 目录下启动 sql-client: org.apache.flink.table.planner.delegation.BlinkExecutorFactory at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:146) at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113) at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:377) at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:559) at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684) at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:557) at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:494) at org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:159) at org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:118) at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:742) ... 3 more 这个改怎样解决呢? 谢谢, 王磊 wangl...@geekplus.com.cn Sender: wxchunj...@163.com Send Time: 2020-03-29 10:32 Receiver: user-zh@flink.apache.org Subject: RE: 实现 KafkaUpsertTableSink Benchao,非常感谢大佬,之前不知道需要添加 Factory 到文件中,添加之后,可以正常运行了。 -Original Message- From: user-zh-return-2640-wxchunjhyy=163@flink.apache.org On Behalf Of Benchao Li Sent: Saturday, March 28, 2020 6:28 PM To: user-zh Subject: Re: 实现 KafkaUpsertTableSink Hi, 你需要把你新增的Factory添加到 resources下的 META-INF/services/org.apache.flink.table.factories.TableFactory文件中,不知道这一步你是否已经做过了呢? 于2020年3月28日周六 下午5:38写道: > 各位大佬: > > 由于现在的 KafkaTableSink 不支持 sql 中有group ,参照 > KafkaTableSink 和 HbaseUpsertTableSink 的一套逻辑实现了一套 > KafkaUpsertTableSink: > > KafkaUpsertTableSink > > KafkaUpsertTableSinkBase > > KafkaUpsertTableSourceSinkFactory > > KafkaUpsertTableSourceSinkFactoryBase > > MyKafkaValidator > > 但是在 TableFactoryService. discoverFactories 的方法中不能加载我定义的 > KafkaUpsertTableSourceSinkFactory ,请问各位大佬,是否是需要在什么地方注册 > 呢? > > > > > /** > * Searches for factories using Java service providers. > * > * @return all factories in the classpath */ private static > List discoverFactories(Optional > classLoader) { >try { > List result = new LinkedList<>(); > ClassLoader cl = > classLoader.orElse(Thread.currentThread().getContextClassLoader()); > ServiceLoader > .load(TableFactory.class, cl) > .iterator() > .forEachRemaining(result::add); > //todo add > result.add(new KafkaUpsertTableSourceSinkFactory()); > return result; >} catch (ServiceConfigurationError e) { > LOG.error("Could not load service provider for table factories.", e); > throw new TableException("Could not load service provider for > table factories.", e); >} > > } > > > > > > 直接在返回的 result 里面添加对应的 KafkaUpsertTableSourceSinkFactory 是可 > 以成功运行的。 > > 非常感谢 > > > > > > -- > > Thanks > > venn > > > > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: libenc...@gmail.com; libenc...@pku.edu.cn
Re: RE: 实现 KafkaUpsertTableSink
我以相同的方式 实现了一个 KafkaRetractTableSink, 然后打成 jar 包放在 lib 目录下启动 sql-client: org.apache.flink.table.planner.delegation.BlinkExecutorFactory at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:146) at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113) at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:377) at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:559) at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684) at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:557) at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:494) at org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:159) at org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:118) at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:742) ... 3 more 这个改怎样解决呢? 谢谢, 王磊 wangl...@geekplus.com.cn Sender: wxchunj...@163.com Send Time: 2020-03-29 10:32 Receiver: user-zh@flink.apache.org Subject: RE: 实现 KafkaUpsertTableSink Benchao,非常感谢大佬,之前不知道需要添加 Factory 到文件中,添加之后,可以正常运行了。 -Original Message- From: user-zh-return-2640-wxchunjhyy=163@flink.apache.org On Behalf Of Benchao Li Sent: Saturday, March 28, 2020 6:28 PM To: user-zh Subject: Re: 实现 KafkaUpsertTableSink Hi, 你需要把你新增的Factory添加到 resources下的 META-INF/services/org.apache.flink.table.factories.TableFactory文件中,不知道这一步你是否已经做过了呢? 于2020年3月28日周六 下午5:38写道: > 各位大佬: > > 由于现在的 KafkaTableSink 不支持 sql 中有group ,参照 > KafkaTableSink 和 HbaseUpsertTableSink 的一套逻辑实现了一套 > KafkaUpsertTableSink: > > KafkaUpsertTableSink > > KafkaUpsertTableSinkBase > > KafkaUpsertTableSourceSinkFactory > > KafkaUpsertTableSourceSinkFactoryBase > > MyKafkaValidator > > 但是在 TableFactoryService. discoverFactories 的方法中不能加载我定义的 > KafkaUpsertTableSourceSinkFactory ,请问各位大佬,是否是需要在什么地方注册 > 呢? > > > > > /** > * Searches for factories using Java service providers. > * > * @return all factories in the classpath */ private static > List discoverFactories(Optional > classLoader) { >try { > List result = new LinkedList<>(); > ClassLoader cl = > classLoader.orElse(Thread.currentThread().getContextClassLoader()); > ServiceLoader > .load(TableFactory.class, cl) > .iterator() > .forEachRemaining(result::add); > //todo add > result.add(new KafkaUpsertTableSourceSinkFactory()); > return result; >} catch (ServiceConfigurationError e) { > LOG.error("Could not load service provider for table factories.", e); > throw new TableException("Could not load service provider for > table factories.", e); >} > > } > > > > > > 直接在返回的 result 里面添加对应的 KafkaUpsertTableSourceSinkFactory 是可 > 以成功运行的。 > > 非常感谢 > > > > > > -- > > Thanks > > venn > > > > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: libenc...@gmail.com; libenc...@pku.edu.cn
?????? keyby??????????
keybywatermark??windowAll??keyby?? -- -- ??: "tingli ke"
Re: keyby的乱序处理
您好, 针对您的回复,现在的场景是这样子的 1、kafka存在多个partition,针对多个Partiton,flink watermark assiger会对每个Partiton 发射 watermark; 2、在第一个前提下,水位已经设置好了,还可以在keyby之后在次设置watermark吗? 3、是否存在可以不用经过第一个前提的方案,直接在keyby之后设置watermark? Jimmy Wong 于2020年3月30日周一 下午9:13写道: > Hi, > watermark 可以在 keyBy 后分配,但是最好紧跟 SourceFunction。经过 KeyBy > 或其他分配策略,可能导致数据更大的延迟(EventTime)。 > > > “想做key化的乱序处理” 这句没太理解,麻烦解释下。 > > > | | > Jimmy Wong > | > | > wangzmk...@163.com > | > 签名由网易邮箱大师定制 > > > 在2020年03月30日 20:58,tingli ke 写道: > 请教一个问题:kafka-per-partition 的watermark的分配,可以在keyby之后分配吗,想做key化的乱序处理能支持吗 >
Re: keyby的乱序处理
您好, 非常感谢您的回复! key化就是keyby之后的,个人理解为keyed(key 化),和您回答的“watermark 可以在 keyBy 后分配”是同一个话题。 Jimmy Wong 于2020年3月30日周一 下午9:13写道: > Hi, > watermark 可以在 keyBy 后分配,但是最好紧跟 SourceFunction。经过 KeyBy > 或其他分配策略,可能导致数据更大的延迟(EventTime)。 > > > “想做key化的乱序处理” 这句没太理解,麻烦解释下。 > > > | | > Jimmy Wong > | > | > wangzmk...@163.com > | > 签名由网易邮箱大师定制 > > > 在2020年03月30日 20:58,tingli ke 写道: > 请教一个问题:kafka-per-partition 的watermark的分配,可以在keyby之后分配吗,想做key化的乱序处理能支持吗 >
回复:keyby的乱序处理
Hi, watermark 可以在 keyBy 后分配,但是最好紧跟 SourceFunction。经过 KeyBy 或其他分配策略,可能导致数据更大的延迟(EventTime)。 “想做key化的乱序处理” 这句没太理解,麻烦解释下。 | | Jimmy Wong | | wangzmk...@163.com | 签名由网易邮箱大师定制 在2020年03月30日 20:58,tingli ke 写道: 请教一个问题:kafka-per-partition 的watermark的分配,可以在keyby之后分配吗,想做key化的乱序处理能支持吗
keyby的乱序处理
请教一个问题:kafka-per-partition 的watermark的分配,可以在keyby之后分配吗,想做key化的乱序处理能支持吗
Flink(≥1.9) Table/SQL Trigger
Hi,all: 我记得 Flink ( ≥1.9) 的 SQL/Table 是不支持 CountTrigger.of(1),这种自定义Trigger的吧 请问对于 Flink ( ≥1.9) 的 SQL/Table 如何实现自定义 Trigger?比如 CountTrigger (per-record Trigger),ContinuousEventTimeTrigger(specifical-time Trigger) 等。 | | Jimmy Wong | | wangzmk...@163.com | 签名由网易邮箱大师定制
Re: Re:Re: flink savepoint问题
Hi 首先,如果这个问题很容易复现的话,我们需要定位到是什么导致了OOMkilled。 1. 打开block-cache usage [1] 观察metrics中block cache的使用量。 2. 麻烦回答一下几个问题,有助于进一步定位 * 单个TM有几个slot * 单个TM的managed memory配置了多少 * 一共声明了多少个keyed state,(如果使用了window,也相当于会使用一个state),其中有多少个map state,是否经常遍历那个map state * 被kill的container内一共有几个rocksDB 实例,可以通过搜索日志 "Obtained shared RocksDB cache of size" 计数 * 是否对RocksDB单独配置了options factory或者相关options state.backend.rocksdb.memory.managed 这个参数的语义是RocksDB使用的内存从Flink来,一个slot内的若干RocksDB实例会共享一块share cache。如果将这个参数设置为false,那么就回退到1.9以前的场景,rocksDB的内存将完全不由Flink管理,在某种程度上来说,更容易被conatiner kill。 如果想要快速缓解这个问题,一种办法是增大 taskmanager.memory.task.off-heap.size [2],使得提供多一部分内存以供RocksDB超用。其他的缓解办法需要根据您对上面问题的回答来实施 [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#state-backend-rocksdb-metrics-block-cache-usage [2] https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#taskmanager-memory-task-off-heap-size Best 唐云 From: xyq Sent: Monday, March 30, 2020 10:41 To: user-zh@flink.apache.org Subject: Re:Re: flink savepoint问题 Hi,您好: 我这边有个小流 left join大流的需求,小流的数据夜间基本没有 可能会4-5个小时没数据,目前的情况是一到晚上container老是被kill掉,报的是内存溢出。我想问下,我想把托管内存这设置成false,会有什么弊端吗?或者该问题怎么解决?困扰了好久了,请您指点一谢谢。 state.backend.rocksdb.memory.managed : false 在 2020-03-28 11:04:09,"Congxian Qiu" 写道: >Hi > >对于问题 1 在反压的情况下,可能导致 Savepoint 做不成功从而超时,这个暂时没法解决,现在有一个 issue[1] 在做 Unalign >Checkpoint 可以解决反压情况下的 checkpoint >对于问题 3,checkpoint 超时了,超时的定义:在设置的时间内(比如你这里 5 分钟),有 task 没有完成 >snapshot。调长超时时间能够一定的缓解这个问题,不过你最好找到超时的原因,然后针对性的优化。 >[1] https://issues.apache.org/jira/browse/FLINK-14551 >Best, >Congxian > > >大数据开发面试_夏永权 于2020年3月27日周五 下午4:19写道: > >> Hi,您好,在使用flink的过程中遇到如下问题,个人未能解决,所以请求您指导一下,谢谢! >> >> 1. flink cancel -s $SAVEPOINT_DIR $job_id -yid $application_id >> 在程序有背压的时候停不掉 >> >> >> The program finished with the following exception: >> org.apache.flink.util.FlinkException: Could not cancel job >> 1f768e4ca9ad5792a4844a5d12163b73. >> at >> org.apache.flink.client.cli.CliFrontend.lambda$cancel$6(CliFrontend.java:523) >> at >> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843) >> at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:515) >> at >> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:904) >> at >> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968) >> at java.security.AccessController.doPrivileged(Native Method) >> at javax.security.auth.Subject.doAs(Subject.java:422) >> at >> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692) >> at >> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) >> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968) >> Caused by: java.util.concurrent.TimeoutException >> at >> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771) >> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915) >> at >> org.apache.flink.client.cli.CliFrontend.lambda$cancel$6(CliFrontend.java:521) >> ... 9 more >> stop flink job failed!!! >> >> >> >> >> 2.再用flink >> sql的ddl时候增加一个字段后,程序启动失败,需要删除savepoint才能启动(ddl后双流join的逻辑,其中一个流加了一个字段) >> >> >> The program finished with the following exception: >> org.apache.flink.client.program.ProgramInvocationException: The main >> method caused an error: >> org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not >> complete the operation. Number of retries has been exhausted. >> at >> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335) >> at >> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205) >> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138) >> at >> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664) >> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) >> at >> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895) >> at >> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968) >> at java.security.AccessController.doPrivileged(Native Method) >> at javax.security.auth.Subject.doAs(Subject.java:422) >> at >> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692) >> at >> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) >> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968) >> Caused by: java.util.concurrent.ExecutionException: >> org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not >> complete the operation. Number of retries has been exhausted. >> at >> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) >> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) >> at >> org.apache.flink.streaming.api.envi
请教两个关于 kafka sink 的问题
Hi,大家好: 我在使用 flink kafka sink 时遇到几个问题/疑惑,请教大家。 1. kafka sink 没有像 elasticsearch sink 一样提供一个 ActionRequestFailureHandler,在遇到异常怎么办呢? 而且不确定到底会有哪些异常? 在 FlinkKafkaProducer 的 open中的回调是这样的,onCompletion 只有 RecordMetadata 和 Exception ,不能拿到 Record,而且 callback 是private的,无法通过继承重写 if (logFailuresOnly) { callback = new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception e) { if (e != null) { LOG.error("Error while sending record to Kafka: " + e.getMessage(), e); } acknowledgeMessage(); } }; } 如果 kafkaSink.setLogFailuresOnly(true); 那么只打印异常信息,消息将丢失了,消息丢失是不能容忍的,需要写到错误队列,但是拿不到消息。 如果 kafkaSink.setLogFailuresOnly(false) 作业将异常重启,但是又会重复消费,又遇到相同的消息,又异常重启。 注:目前只遇到 消息太大 的异常,默认最大消息为1M,已经调大,但是这样没有治本。 2. 在给 kafka sink 设置了 exactly-once 语义之后,作业默认并行度设置为2,启动后打印一百多次的 kafka config 连接信息,很是疑惑为什么会有这么多? FlinkKafkaProducer 中有这两个参数,在生成 事务ID时,一个并行度应该只生成 5 * 5 等于 25个事务ID? public static final int SAFE_SCALE_DOWN_FACTOR = 5; public static final int DEFAULT_KAFKA_PRODUCERS_POOL_SIZE = 5; 默认的checkpoint并行度是1,所以 poolsize应该可以调小点,但是对 SAFE_SCALE_DOWN_FACTOR 还不是很清楚功能 感谢您
flink无法提交jars
在local模式下也无法提交jar, 一提交就出错了,请问什么原因呢?unbuntu18.04版本,请教一下,谢谢~!