Re:Re: Re: Re: Flink 读取 Kafka 多个 Partition 问题,

2020-10-22 文章 Lynn Chen



hi, all


My problem is solved. The cause was the following:


The connection goes through port forwarding on a bastion host, so all Kafka brokers have to be listed in bootstrap.servers.


1. Update the Kafka external-listener configuration:
>> broker1 configuration:
>> listeners=PLAINTEXT://xxx-a-1:9092,EXTERNAL://:9797
>> advertised.listeners=PLAINTEXT://xxx-a-1:9092,EXTERNAL://:9797
>> listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
>> security.inter.broker.protocol=PLAINTEXT

broker2 and broker3 are configured the same way, using ports 9798 and 9799 respectively.



2. Have the ops team map and forward all three ports:
xxx-b-1:9797 --> xxx-a-1:9797
xxx-b-1:9798 --> xxx-a-2:9798
xxx-b-1:9799 --> xxx-a-3:9799



3. properties.setProperty("bootstrap.servers",
"xxx-b-1:9797,xxx-b-1:9798,xxx-b-1:9799")





It was indeed the missing forwarding of the other data nodes that caused only one node's data to be readable (that node happened to host partition 2, which is what finally made me realize some nodes were not being forwarded).


Thanks to zhisheng for the help!



在 2020-10-23 11:56:08,"zhisheng"  写道:
>hi
>
>既然你只能消费到一个分区的数据,那么可以肯定的是消费能拿到的只是一个分区的数据,另外看到你说
>
>> 用于本地开发调试 kafka(连到一个堡垒机xxx-b-1,转发 9797 到 xxx-a-1)
>
>建议看看是不是这个转发有问题,只转发了一个节点
>
>Best
>zhisheng
>
>Lynn Chen  于2020年10月23日周五 上午11:01写道:
>
>>
>>
>>
>> hi, zhisheng:
>>
>>
>> 我解析 json 后:
>> (xxx, xxx, xxx, topic, partition, offset)
>> =>
>>
>>
>> (false,1603420582310,"INSERT","test3.order",2,75)
>> (false,1603421312803,"INSERT","test3.order",2,76)
>> (false,1603421344819,"INSERT","test3.order",2,77)
>> (false,1603421344819,"INSERT","test3.order",2,78)
>>
>>
>> 我增加十几条数据,   拿到的都是 partition 2 的数据(4 条),  1跟 3 的没有拿到
>>
>>
>> 我的猜想:
>>
>>
>> 我做了一个 9797 外网端口, 用于本地开发调试 kafka(连到一个堡垒机xxx-b-1,转发 9797 到 xxx-a-1)
>>
>>
>> broker1 配置:
>>
>>
>> listeners=PLAINTEXT://xxx-a-1:9092,EXTERNAL://:9797
>> advertised.listeners=PLAINTEXT://xxx-a-1:9092,EXTERNAL://:9797
>> listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
>> security.inter.broker.protocol=PLAINTEXT
>>
>>
>> broker2 配置:
>>
>>
>> listeners=PLAINTEXT://xxx-a-2:9092,EXTERNAL://:9797
>> advertised.listeners=PLAINTEXT://xxx-a-2:9092,EXTERNAL://:9797
>> listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
>> security.inter.broker.protocol=PLAINTEXT
>>
>>
>>
>>
>>
>>
>>
>> broker3 配置:
>>
>>
>> listeners=PLAINTEXT://xxx-a-3:9092,EXTERNAL://:9797
>> advertised.listeners=PLAINTEXT://xxx-a-3:9092,EXTERNAL://:9797
>> listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
>> security.inter.broker.protocol=PLAINTEXT
>>
>>
>> 本机连接kafka:
>> properties.setProperty("bootstrap.servers", "xxx-b-1:9797")
>>
>>
>> 是跟这个配置有关吗?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-10-23 08:37:14,"zhisheng"  写道:
>> >hi
>> >
>> >如果是要排查问题的话可以在消费 kafka 的时候通过 JSONKeyValueDeserializationSchema
>> >来将数据的元数据(topic/parttion/offset)获取,这样可以排查你的数据到底来自哪些分区,这样你就不会再有困惑了。
>> >
>> >eg:
>> >
>> >  env.addSource(new FlinkKafkaConsumer011<>(
>> >parameters.get("topic"),new
>> >JSONKeyValueDeserializationSchema(true),
>> >buildKafkaProps(parameters))).flatMap(new
>> >FlatMapFunction() {
>> >@Overridepublic void flatMap(ObjectNode jsonNodes,
>> >Collector collector) throws Exception {
>> >System.out.println(jsonNodes.get("value"));
>> >System.out.println(jsonNodes.get("metadata").get("topic").asText());
>> >
>> >System.out.println(jsonNodes.get("metadata").get("offset").asText());
>> >
>> >System.out.println(jsonNodes.get("metadata").get("partition").asText());
>> >   collector.collect(jsonNodes);
>> >}}).print();
>> >
>> >Best
>> >
>> >zhisheng
>> >
>> >
>> >Lynn Chen  于2020年10月23日周五 上午12:13写道:
>> >
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> hi,  Qijun Feng:
>> >>
>> >>
>> >> 我也遇到了类似的问题, 请问您后来是怎么解决的哈?
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> 在 2020-04-03 09:27:52,"LakeShen"  写道:
>> >> >Hi Qijun,
>> >> >
>> >> >看下 kafka 是不是所有分区都有数据呢,或者在这个时间截后:158567040L,后面是不是只有分区3写入数据,个人的想法。
>> >> >
>> >> >Best,
>> >> >LakeShen
>> >> >
>> >> >Qijun Feng  于2020年4月2日周四 下午5:44写道:
>> >> >
>> >> >> Dear All,
>> >> >>
>> >> >> 我的 Kafka cluster 有三个机器,topic 也分了三个 partition, 我用 Flink 读取 Kafka
>> >> >> 消息的时候,感觉只从一个 partition 读取了东西, 一开始我的 bootstrap.servers 只写了一个地址,
>> >> >>  现在改成了所有地址,也换了 group.id
>> >> >>
>> >> >>
>> >> >> Properties properties = new Properties();
>> >> >> properties.setProperty("bootstrap.servers", "10.216.85.201:9092,
>> >> >> 10.216.77.170:9092,10.216.77.188:9092");
>> >> >> properties.setProperty("group.id", "behavior-logs-aggregator");
>> >> >>
>> >> >> FlinkKafkaConsumer010 kafkaConsumer010 =
>> >> >>new FlinkKafkaConsumer010("behavior-logs_dev",
>> new
>> >> >> BehaviorLogDeserializationSchema(), properties);
>> >> >> kafkaConsumer010.setStartFromTimestamp(158567040L); //2020/04/01
>> >> >>
>> >> >> 处理完的数据,写到数据库里,看下了感觉少数据, 从 Log 里看到,也是。。,只有 partition=3, 没有
>> partiton=1,或者
>> >> 2
>> >> >> 的,
>> >> >>
>> >> >> 2020-04-02 14:54:58,532 INFO
>> >> >> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  -
>> >> >> Consumer subtask 0 creating fetcher with offsets
>> >> >> {KafkaTopicPartition{topic='behavior-logs_dev', partition=3}=38}.
>> >> >>
>> >> >>
>> >> >> 是哪里有问题吗?
>> >> >>
>> >> >

答复: Flink 1.11里如何parse出未解析的执行计划

2020-10-22 文章 刘首维
Hi,


As I said, my approach is to pull the parser out via reflection. It is a bit hacky, but very simple and reliable.


The code looks roughly like this:


Flink version: a custom version based on 1.11.x


@PostConstruct
private void setup() throws NoSuchFieldException, IllegalAccessException {
    final StreamTableEnvironmentImpl env =
            (StreamTableEnvironmentImpl) support.getStreamTableEnvironment();
    final Field field =
            env.getParser().getClass().getDeclaredField("calciteParserSupplier");
    field.setAccessible(true);
    // the default (Flink dialect) parser
    final Supplier<CalciteParser> defaultSupplier =
            (Supplier<CalciteParser>) field.get(env.getParser());
    this.defaultSupplier = defaultSupplier;

    env.getConfig().setSqlDialect(SqlDialect.HIVE);
    final Field field2 =
            env.getParser().getClass().getDeclaredField("calciteParserSupplier");
    field2.setAccessible(true);
    // the Hive dialect parser
    final Supplier<CalciteParser> hiveSupplier =
            (Supplier<CalciteParser>) field2.get(env.getParser());
    this.hiveSupplier = hiveSupplier;
}

WARN:
The downside of this approach is that it pulls in quite a few extra dependencies. In my case this is done inside a separate service that already contains the Flink dependencies, which sidesteps that problem.
If you want to follow this approach, make sure to evaluate the dependency impact first.
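For completeness, a rough sketch of how the captured suppliers might then be used, assuming CalciteParser#parse(String) from the blink planner (adjust if your custom build exposes a different method):

// hypothetical usage of the suppliers captured in setup():
// parse a statement into a SqlNode with the default (Flink) dialect, without executing it
String stmt = "SELECT * FROM my_table";   // placeholder statement
SqlNode sqlNode = defaultSupplier.get().parse(stmt);
System.out.println(sqlNode.getKind());

// the Hive-dialect parser can be used the same way via hiveSupplier.get()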





发件人: silence 
发送时间: 2020年10月22日 12:05:38
收件人: user-zh@flink.apache.org
主题: Re: Flink 1.11里如何parse出未解析的执行计划

Here is a quick example, for reference only:

import org.apache.calcite.config.Lex;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;
import org.apache.flink.sql.parser.validate.FlinkSqlConformance;

/**
 * @author: silence
 * @date: 2020/10/22
 */
public class Test {
    public static void main(String[] args) throws SqlParseException {
        String sql = "xxx";
        SqlParser.Config sqlParserConfig = SqlParser
                .configBuilder()
                .setParserFactory(FlinkSqlParserImpl.FACTORY)
                .setConformance(FlinkSqlConformance.DEFAULT)
                .setLex(Lex.JAVA)
                .setIdentifierMaxLength(256)
                .build();
        SqlParser sqlParser = SqlParser.create(sql, sqlParserConfig);
        SqlNodeList sqlNodes = sqlParser.parseStmtList();
        for (SqlNode sqlNode : sqlNodes) {
            // do something with each parsed statement
        }
    }
}





Flink mysqlCDC ,然后jdbc sink 到mysql 乱序问题

2020-10-22 文章 air23
Hi,
We found an out-of-order problem when reading MySQL with the CDC connector and writing back to MySQL with the JDBC sink.
When a row is updated in the upstream MySQL, connector=print shows one DELETE record followed by one INSERT record,
but after the JDBC sink writes to MySQL the row is sometimes there and sometimes missing; with the parallelism set to 1 the result is always correct.
We suspect the records get reordered (the INSERT is applied before the DELETE), so the row ends up missing from the result table. How can Flink SQL / Flink CDC guarantee the ordering here?

请问flink1.11版本如何设置checkpoint的默认保存个数

2020-10-22 文章 chenxuying
According to the official docs [1], the option should be state.checkpoints.num-retained, whose default is 1. Setting it has no effect, though: the docs say the default is 1, but in practice it looks like about 10 checkpoints are retained.
Other properties set the same way do work, for example
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
takes effect, so my configuration itself should be fine.


[1]:https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/checkpointing.html#state-checkpoints-num-retained
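For reference, a minimal flink-conf.yaml sketch with the two options mentioned above (the value 5 is only illustrative):

# keep the 5 most recent completed checkpoints (the documented default is 1)
state.checkpoints.num-retained: 5
# keep externalized checkpoints when the job is cancelled
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION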



Re: pyflink和flink版本的兼容性问题

2020-10-22 文章 zhisheng
There will probably be compatibility problems; a lot has changed between those versions.

whh_960101  于2020年10月23日周五 上午11:41写道:

> Hi,各位大佬,
>  想请教一下,我的flink的版本是1.10.0,pyflink版本是1.11.1,目前使用pyflink没有兼容性问题,想问一下,马上要更新的flink
> 1.12,如果在正式发布后,我只是pip install --upgrade apache-flink==1.12
> 升级pyflink到1.12.0,flink 1.10.0 版本保持不变,会存在很多兼容性问题吗


Re: Re: Re: Flink 读取 Kafka 多个 Partition 问题,

2020-10-22 文章 zhisheng
hi

既然你只能消费到一个分区的数据,那么可以肯定的是消费能拿到的只是一个分区的数据,另外看到你说

> 用于本地开发调试 kafka(连到一个堡垒机xxx-b-1,转发 9797 到 xxx-a-1)

建议看看是不是这个转发有问题,只转发了一个节点

Best
zhisheng

Lynn Chen  于2020年10月23日周五 上午11:01写道:

>
>
>
> hi, zhisheng:
>
>
> 我解析 json 后:
> (xxx, xxx, xxx, topic, partition, offset)
> =>
>
>
> (false,1603420582310,"INSERT","test3.order",2,75)
> (false,1603421312803,"INSERT","test3.order",2,76)
> (false,1603421344819,"INSERT","test3.order",2,77)
> (false,1603421344819,"INSERT","test3.order",2,78)
>
>
> 我增加十几条数据,   拿到的都是 partition 2 的数据(4 条),  1跟 3 的没有拿到
>
>
> 我的猜想:
>
>
> 我做了一个 9797 外网端口, 用于本地开发调试 kafka(连到一个堡垒机xxx-b-1,转发 9797 到 xxx-a-1)
>
>
> broker1 配置:
>
>
> listeners=PLAINTEXT://xxx-a-1:9092,EXTERNAL://:9797
> advertised.listeners=PLAINTEXT://xxx-a-1:9092,EXTERNAL://:9797
> listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
> security.inter.broker.protocol=PLAINTEXT
>
>
> broker2 配置:
>
>
> listeners=PLAINTEXT://xxx-a-2:9092,EXTERNAL://:9797
> advertised.listeners=PLAINTEXT://xxx-a-2:9092,EXTERNAL://:9797
> listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
> security.inter.broker.protocol=PLAINTEXT
>
>
>
>
>
>
>
> broker3 配置:
>
>
> listeners=PLAINTEXT://xxx-a-3:9092,EXTERNAL://:9797
> advertised.listeners=PLAINTEXT://xxx-a-3:9092,EXTERNAL://:9797
> listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
> security.inter.broker.protocol=PLAINTEXT
>
>
> 本机连接kafka:
> properties.setProperty("bootstrap.servers", "xxx-b-1:9797")
>
>
> 是跟这个配置有关吗?
>
>
>
>
>
>
>
>
>
>
> 在 2020-10-23 08:37:14,"zhisheng"  写道:
> >hi
> >
> >如果是要排查问题的话可以在消费 kafka 的时候通过 JSONKeyValueDeserializationSchema
> >来将数据的元数据(topic/parttion/offset)获取,这样可以排查你的数据到底来自哪些分区,这样你就不会再有困惑了。
> >
> >eg:
> >
> >  env.addSource(new FlinkKafkaConsumer011<>(
> >parameters.get("topic"),new
> >JSONKeyValueDeserializationSchema(true),
> >buildKafkaProps(parameters))).flatMap(new
> >FlatMapFunction() {
> >@Overridepublic void flatMap(ObjectNode jsonNodes,
> >Collector collector) throws Exception {
> >System.out.println(jsonNodes.get("value"));
> >System.out.println(jsonNodes.get("metadata").get("topic").asText());
> >
> >System.out.println(jsonNodes.get("metadata").get("offset").asText());
> >
> >System.out.println(jsonNodes.get("metadata").get("partition").asText());
> >   collector.collect(jsonNodes);
> >}}).print();
> >
> >Best
> >
> >zhisheng
> >
> >
> >Lynn Chen  于2020年10月23日周五 上午12:13写道:
> >
> >>
> >>
> >>
> >>
> >>
> >>
> >> hi,  Qijun Feng:
> >>
> >>
> >> 我也遇到了类似的问题, 请问您后来是怎么解决的哈?
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-04-03 09:27:52,"LakeShen"  写道:
> >> >Hi Qijun,
> >> >
> >> >看下 kafka 是不是所有分区都有数据呢,或者在这个时间截后:158567040L,后面是不是只有分区3写入数据,个人的想法。
> >> >
> >> >Best,
> >> >LakeShen
> >> >
> >> >Qijun Feng  于2020年4月2日周四 下午5:44写道:
> >> >
> >> >> Dear All,
> >> >>
> >> >> 我的 Kafka cluster 有三个机器,topic 也分了三个 partition, 我用 Flink 读取 Kafka
> >> >> 消息的时候,感觉只从一个 partition 读取了东西, 一开始我的 bootstrap.servers 只写了一个地址,
> >> >>  现在改成了所有地址,也换了 group.id
> >> >>
> >> >>
> >> >> Properties properties = new Properties();
> >> >> properties.setProperty("bootstrap.servers", "10.216.85.201:9092,
> >> >> 10.216.77.170:9092,10.216.77.188:9092");
> >> >> properties.setProperty("group.id", "behavior-logs-aggregator");
> >> >>
> >> >> FlinkKafkaConsumer010 kafkaConsumer010 =
> >> >>new FlinkKafkaConsumer010("behavior-logs_dev",
> new
> >> >> BehaviorLogDeserializationSchema(), properties);
> >> >> kafkaConsumer010.setStartFromTimestamp(158567040L); //2020/04/01
> >> >>
> >> >> 处理完的数据,写到数据库里,看下了感觉少数据, 从 Log 里看到,也是。。,只有 partition=3, 没有
> partiton=1,或者
> >> 2
> >> >> 的,
> >> >>
> >> >> 2020-04-02 14:54:58,532 INFO
> >> >> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  -
> >> >> Consumer subtask 0 creating fetcher with offsets
> >> >> {KafkaTopicPartition{topic='behavior-logs_dev', partition=3}=38}.
> >> >>
> >> >>
> >> >> 是哪里有问题吗?
> >> >>
> >> >>
> >>
>


pyflink和flink版本的兼容性问题

2020-10-22 文章 whh_960101
Hi all,
My Flink version is 1.10.0 and my PyFlink version is 1.11.1, and so far I have not run into compatibility problems. With Flink 1.12 about to be released, if afterwards I only run pip install --upgrade apache-flink==1.12 to upgrade PyFlink to 1.12.0 while keeping Flink 1.10.0 unchanged, will there be many compatibility issues?

Re:Re: Re: Flink 读取 Kafka 多个 Partition 问题,

2020-10-22 文章 Lynn Chen



hi, zhisheng:


我解析 json 后:
(xxx, xxx, xxx, topic, partition, offset)
=>


(false,1603420582310,"INSERT","test3.order",2,75)
(false,1603421312803,"INSERT","test3.order",2,76)
(false,1603421344819,"INSERT","test3.order",2,77)
(false,1603421344819,"INSERT","test3.order",2,78)


我增加十几条数据,   拿到的都是 partition 2 的数据(4 条),  1跟 3 的没有拿到


我的猜想:


我做了一个 9797 外网端口, 用于本地开发调试 kafka(连到一个堡垒机xxx-b-1,转发 9797 到 xxx-a-1)


broker1 配置:


listeners=PLAINTEXT://xxx-a-1:9092,EXTERNAL://:9797
advertised.listeners=PLAINTEXT://xxx-a-1:9092,EXTERNAL://:9797
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
security.inter.broker.protocol=PLAINTEXT


broker2 配置:


listeners=PLAINTEXT://xxx-a-2:9092,EXTERNAL://:9797
advertised.listeners=PLAINTEXT://xxx-a-2:9092,EXTERNAL://:9797
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
security.inter.broker.protocol=PLAINTEXT







broker3 配置:


listeners=PLAINTEXT://xxx-a-3:9092,EXTERNAL://:9797
advertised.listeners=PLAINTEXT://xxx-a-3:9092,EXTERNAL://:9797
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
security.inter.broker.protocol=PLAINTEXT


本机连接kafka:
properties.setProperty("bootstrap.servers", "xxx-b-1:9797")


是跟这个配置有关吗? 










在 2020-10-23 08:37:14,"zhisheng"  写道:
>hi
>
>如果是要排查问题的话可以在消费 kafka 的时候通过 JSONKeyValueDeserializationSchema
>来将数据的元数据(topic/parttion/offset)获取,这样可以排查你的数据到底来自哪些分区,这样你就不会再有困惑了。
>
>eg:
>
>  env.addSource(new FlinkKafkaConsumer011<>(
>parameters.get("topic"),new
>JSONKeyValueDeserializationSchema(true),
>buildKafkaProps(parameters))).flatMap(new
>FlatMapFunction() {
>@Overridepublic void flatMap(ObjectNode jsonNodes,
>Collector collector) throws Exception {
>System.out.println(jsonNodes.get("value"));
>System.out.println(jsonNodes.get("metadata").get("topic").asText());
>
>System.out.println(jsonNodes.get("metadata").get("offset").asText());
>
>System.out.println(jsonNodes.get("metadata").get("partition").asText());
>   collector.collect(jsonNodes);
>}}).print();
>
>Best
>
>zhisheng
>
>
>Lynn Chen  于2020年10月23日周五 上午12:13写道:
>
>>
>>
>>
>>
>>
>>
>> hi,  Qijun Feng:
>>
>>
>> 我也遇到了类似的问题, 请问您后来是怎么解决的哈?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-04-03 09:27:52,"LakeShen"  写道:
>> >Hi Qijun,
>> >
>> >看下 kafka 是不是所有分区都有数据呢,或者在这个时间截后:158567040L,后面是不是只有分区3写入数据,个人的想法。
>> >
>> >Best,
>> >LakeShen
>> >
>> >Qijun Feng  于2020年4月2日周四 下午5:44写道:
>> >
>> >> Dear All,
>> >>
>> >> 我的 Kafka cluster 有三个机器,topic 也分了三个 partition, 我用 Flink 读取 Kafka
>> >> 消息的时候,感觉只从一个 partition 读取了东西, 一开始我的 bootstrap.servers 只写了一个地址,
>> >>  现在改成了所有地址,也换了 group.id
>> >>
>> >>
>> >> Properties properties = new Properties();
>> >> properties.setProperty("bootstrap.servers", "10.216.85.201:9092,
>> >> 10.216.77.170:9092,10.216.77.188:9092");
>> >> properties.setProperty("group.id", "behavior-logs-aggregator");
>> >>
>> >> FlinkKafkaConsumer010 kafkaConsumer010 =
>> >>new FlinkKafkaConsumer010("behavior-logs_dev", new
>> >> BehaviorLogDeserializationSchema(), properties);
>> >> kafkaConsumer010.setStartFromTimestamp(158567040L); //2020/04/01
>> >>
>> >> 处理完的数据,写到数据库里,看下了感觉少数据, 从 Log 里看到,也是。。,只有 partition=3, 没有 partiton=1,或者
>> 2
>> >> 的,
>> >>
>> >> 2020-04-02 14:54:58,532 INFO
>> >> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  -
>> >> Consumer subtask 0 creating fetcher with offsets
>> >> {KafkaTopicPartition{topic='behavior-logs_dev', partition=3}=38}.
>> >>
>> >>
>> >> 是哪里有问题吗?
>> >>
>> >>
>>


Re: pyflink和pyspark中引用包不兼容,li例如pyspark 中pandas版本1.1.4 而pyflink是>=0.23<0.25

2020-10-22 文章 Xingbo Huang
Hi,
pyspark only requires pandas>=0.23.2, so a default install pulls in the latest pandas, which carries quite a bit of potential risk.
In PyFlink 1.11 the pandas constraint is pandas>=0.23.4,<=0.25.3; sticking to a more stable pandas version avoids many of those risks. That range also falls inside pyspark's range, so the two can be used together.

Best,
Xingbo

xuzh  于2020年10月23日周五 上午9:39写道:

> pyflink和pyspark中引用包不兼容,li例如pyspark 中pandas版本1.1.4 而pyflink是>=0.23<0.25.
> 官方有没有升级pandas版的计划。
> 为了能让pyflink和pyspak兼容。在某些主版本上的包能保持一致嘛
> 个人建议


pyflink和pyspark中引用包不兼容,li例如pyspark 中pandas版本1.1.4 而pyflink是>=0.23<0.25

2020-10-22 文章 xuzh
The packages referenced by pyflink and pyspark are incompatible, e.g. pyspark uses pandas 1.1.4 while pyflink requires >=0.23,<0.25.
Is there an official plan to upgrade the pandas version?
It would help if the shared packages could be kept consistent across major versions so that pyflink and pyspark can coexist. Just a personal suggestion.


Re: Re: Flink 读取 Kafka 多个 Partition 问题,

2020-10-22 文章 zhisheng
hi

If you want to troubleshoot this, consume Kafka with a JSONKeyValueDeserializationSchema so that each record carries its metadata (topic/partition/offset). That lets you check exactly which partitions your data actually comes from, and the confusion should go away.

eg:

  env.addSource(new FlinkKafkaConsumer011<>(
          parameters.get("topic"),
          new JSONKeyValueDeserializationSchema(true),
          buildKafkaProps(parameters)))
     .flatMap(new FlatMapFunction<ObjectNode, ObjectNode>() {
         @Override
         public void flatMap(ObjectNode jsonNodes, Collector<ObjectNode> collector) throws Exception {
             System.out.println(jsonNodes.get("value"));
             System.out.println(jsonNodes.get("metadata").get("topic").asText());
             System.out.println(jsonNodes.get("metadata").get("offset").asText());
             System.out.println(jsonNodes.get("metadata").get("partition").asText());
             collector.collect(jsonNodes);
         }
     }).print();

Best

zhisheng


Lynn Chen  于2020年10月23日周五 上午12:13写道:

>
>
>
>
>
>
> hi,  Qijun Feng:
>
>
> 我也遇到了类似的问题, 请问您后来是怎么解决的哈?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-04-03 09:27:52,"LakeShen"  写道:
> >Hi Qijun,
> >
> >看下 kafka 是不是所有分区都有数据呢,或者在这个时间截后:158567040L,后面是不是只有分区3写入数据,个人的想法。
> >
> >Best,
> >LakeShen
> >
> >Qijun Feng  于2020年4月2日周四 下午5:44写道:
> >
> >> Dear All,
> >>
> >> 我的 Kafka cluster 有三个机器,topic 也分了三个 partition, 我用 Flink 读取 Kafka
> >> 消息的时候,感觉只从一个 partition 读取了东西, 一开始我的 bootstrap.servers 只写了一个地址,
> >>  现在改成了所有地址,也换了 group.id
> >>
> >>
> >> Properties properties = new Properties();
> >> properties.setProperty("bootstrap.servers", "10.216.85.201:9092,
> >> 10.216.77.170:9092,10.216.77.188:9092");
> >> properties.setProperty("group.id", "behavior-logs-aggregator");
> >>
> >> FlinkKafkaConsumer010 kafkaConsumer010 =
> >>new FlinkKafkaConsumer010("behavior-logs_dev", new
> >> BehaviorLogDeserializationSchema(), properties);
> >> kafkaConsumer010.setStartFromTimestamp(158567040L); //2020/04/01
> >>
> >> 处理完的数据,写到数据库里,看下了感觉少数据, 从 Log 里看到,也是。。,只有 partition=3, 没有 partiton=1,或者
> 2
> >> 的,
> >>
> >> 2020-04-02 14:54:58,532 INFO
> >> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  -
> >> Consumer subtask 0 creating fetcher with offsets
> >> {KafkaTopicPartition{topic='behavior-logs_dev', partition=3}=38}.
> >>
> >>
> >> 是哪里有问题吗?
> >>
> >>
>


Re: Re: flink1.11加载外部jar包进行UDF注册

2020-10-22 文章 amen...@163.com
Yes, exactly as @chenxuying and @zhisheng said.

The approach I took is to add the required UDF jars to the classpath via the pipeline.classpaths option. The tasks still need to find those UDF jars once they are scheduled to the TMs, so on 1.11 I also upload the /plugins directory to HDFS with the -yt option, which solves the problem.

best,
amenhub



 
发件人: zhisheng
发送时间: 2020-10-22 23:28
收件人: user-zh
主题: Re: flink1.11加载外部jar包进行UDF注册
hi
 
flink  1.11 如果是要管理 udf jar 的话应该是可以通过 yarn-provided-lib-dirs [1] 这个参数去控制 udf
jar 的路径地址,ps,这个参数只在 1.11 才支持
 
[1]
https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/config.html#yarn-provided-lib-dirs
 
Best
zhisheng
 
Husky Zeng <568793...@qq.com> 于2020年10月22日周四 上午11:31写道:
 
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Is-there-a-way-to-avoid-submit-hive-udf-s-resources-when-we-submit-a-job-td38204.html
>
>
>
> https://issues.apache.org/jira/browse/FLINK-19335?focusedCommentId=17199927&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17199927
>
>
>
> 我们也在搞一个从hdfs路径加载udf的功能,你看下是不是同一个问题?可以交流一下。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


请问Oracle作为维表注册到flinksql环境怎么支持?

2020-10-22 文章 Bruce
Hi, as far as I can see the JDBC connector currently only supports MySQL and PostgreSQL for registration in Flink SQL. I would like to register an Oracle dimension table; how should the connector be extended to make that possible?

Sent from my iPhone

Re: Re: Flink-1.11.1 Rest API使用

2020-10-22 文章 amen...@163.com
You are right, it really is not supported. Thanks for clearing that up!



 
发件人: Peidian Li
发送时间: 2020-10-22 19:13
收件人: user-zh
主题: Re: Flink-1.11.1 Rest API使用
Yarn 的proxy server不支持POST请求,这是前几天yarn同事给我截的图:
我们改了下proxy server的逻辑来支持POST请求就可以了


一个stop with savepoint例子:
http://zjy-hadoop-prc-ct11.bj:21001/proxy/application_1600936402499_375893/jobs/790e4740baa52b43c0ceb9a5cdaf6135/stop?proxyapproved=true

Request body:
{
"drain" : true,
"targetDirectory" : "hdfs://zjyprc-hadoop/user/s_flink_tst/checkpoints4"
}

Response:
{
"request-id": "69416efc4538f56759f77a3001c38ff8"
}

2020年10月22日 下午2:30,Husky Zeng <568793...@qq.com> 写道:

其他接口大多不是post类型,你要修改成get或者其他的。可以先仔细阅读一下你发的这个页面上面的介绍,看看部署有没有出错。






Re:FlinkSQL 窗口使用问题

2020-10-22 文章 hailongwang
Hi Roc,
This comes down to how ORDER BY and LIMIT are executed when used together. Two semantics are possible:
1. Find the LIMIT n rows first and only then apply ORDER BY to those rows, not to the whole data set;
2. ORDER BY the whole data set first and then apply LIMIT.
As far as I can tell, Flink's `StreamExecSortLimit` only guarantees that LIMIT n rows are emitted; it does not guarantee that those n rows come out in sorted order.
If the business logic allows it, you can add an OFFSET after the LIMIT; that path uses `emitRecordsWithRowNumber`, which does guarantee that the final LIMIT n rows respect the ORDER BY.
Personally I think `outputRankNumber` of `StreamExecSortLimit` should be set to true.
Not sure whether my understanding is correct, CC @Jark @godfrey




Best,
Hailong Wang




在 2020-10-22 10:09:09,"Roc Marshal"  写道:
>Hi,
>
>
>
>
>SELECT
>
>TUMBLE_START(ts, INTERVAL '1' day) as window_start,
>
>TUMBLE_END(ts, INTERVAL '1' day) as window_end,
>
>c1,
>
>sum(c2) as sc2
>
>FROM sourcetable
>
>GROUP BY TUMBLE(ts, INTERVAL '1' day), c1
>
>ORDER BY window_start, sc2 desc limit 10
>
>
>这个sql希望能够以一天为窗口(翻滚)进行计算窗口  
>按照c1分组,并对c2列求和(sc2)后对sc2进行窗口内排序。但是结果看起来,结果集中窗口内的数据列sc2并没有有序(降序/或者升序)排列。
>能不能根据我的需求和sql的写法诊断一下问题出在哪里?或者说给一下建议,好让我定位到自己对flinksql使用的误区在哪?
>
>
>谢谢!
>
>
>Best Roc.


Re:Re: Flink 读取 Kafka 多个 Partition 问题,

2020-10-22 文章 Lynn Chen






hi,  Qijun Feng:


我也遇到了类似的问题, 请问您后来是怎么解决的哈?

















在 2020-04-03 09:27:52,"LakeShen"  写道:
>Hi Qijun,
>
>看下 kafka 是不是所有分区都有数据呢,或者在这个时间截后:158567040L,后面是不是只有分区3写入数据,个人的想法。
>
>Best,
>LakeShen
>
>Qijun Feng  于2020年4月2日周四 下午5:44写道:
>
>> Dear All,
>>
>> 我的 Kafka cluster 有三个机器,topic 也分了三个 partition, 我用 Flink 读取 Kafka
>> 消息的时候,感觉只从一个 partition 读取了东西, 一开始我的 bootstrap.servers 只写了一个地址,
>>  现在改成了所有地址,也换了 group.id
>>
>>
>> Properties properties = new Properties();
>> properties.setProperty("bootstrap.servers", "10.216.85.201:9092,
>> 10.216.77.170:9092,10.216.77.188:9092");
>> properties.setProperty("group.id", "behavior-logs-aggregator");
>>
>> FlinkKafkaConsumer010 kafkaConsumer010 =
>>new FlinkKafkaConsumer010("behavior-logs_dev", new
>> BehaviorLogDeserializationSchema(), properties);
>> kafkaConsumer010.setStartFromTimestamp(158567040L); //2020/04/01
>>
>> 处理完的数据,写到数据库里,看下了感觉少数据, 从 Log 里看到,也是。。,只有 partition=3, 没有 partiton=1,或者 2
>> 的,
>>
>> 2020-04-02 14:54:58,532 INFO
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  -
>> Consumer subtask 0 creating fetcher with offsets
>> {KafkaTopicPartition{topic='behavior-logs_dev', partition=3}=38}.
>>
>>
>> 是哪里有问题吗?
>>
>>


Re:使用Flink Table API & SQL编写流应用,SQL中的NOW()等时间函数如何理解

2020-10-22 文章 hailongwang
Hi Longdexin,
According to the documentation [1], the NOW() function is non-deterministic, meaning it is not constant-folded away during RelNode optimization. It is therefore re-evaluated as the job runs; it is not fixed at startup and left unchanged.
When writing a custom UDF, you can override the method `default boolean isDeterministic()` to declare whether the function is deterministic; the default is true.




[1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html#temporal-functions
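For illustration, a minimal sketch of a scalar UDF that declares itself non-deterministic (the class and its logic are made up; the point is only the isDeterministic() override):

import org.apache.flink.table.functions.ScalarFunction;

// hypothetical UDF returning the current time; marking it non-deterministic
// prevents the planner from constant-folding it during optimization
public class CurrentMillis extends ScalarFunction {

    public long eval() {
        return System.currentTimeMillis();
    }

    @Override
    public boolean isDeterministic() {
        return false;   // the default is true
    }
}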



Best,
Hailong Wang

在 2020-10-22 19:13:03,"Longdexin" <274522...@qq.com> 写道:
>请问,当流应用运行起来后,随着时间的推移,比如,到第二天了,SQL中的NOW()会随着处理时间不断更新,从而保证处理逻辑的正确性吗?在我的理解中,在流应用启动的那一刻,NOW()的值就确定了,以后也不会再改变了,那么,使用什么方式可以让SQL中的时间比较逻辑与时俱进呢?非常感谢。
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.11加载外部jar包进行UDF注册

2020-10-22 文章 zhisheng
hi

For Flink 1.11, if you need to manage UDF jars, you should be able to control where they are loaded from with the yarn-provided-lib-dirs option [1]. Note that this option is only available starting from 1.11.

 [1]
https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/config.html#yarn-provided-lib-dirs

Best
zhisheng

Husky Zeng <568793...@qq.com> 于2020年10月22日周四 上午11:31写道:

>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Is-there-a-way-to-avoid-submit-hive-udf-s-resources-when-we-submit-a-job-td38204.html
>
>
>
> https://issues.apache.org/jira/browse/FLINK-19335?focusedCommentId=17199927&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17199927
>
>
>
> 我们也在搞一个从hdfs路径加载udf的功能,你看下是不是同一个问题?可以交流一下。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Flink1.10 history server无法监控 FlinkSQL任务

2020-10-22 文章 zhisheng
Hi Robin:

1. Did you change the refresh interval? Does the job never show up at all?

2. Running jobs are not displayed; you can check those directly in YARN. The history server only shows jobs that have finished or failed.

PS: a few history server questions of my own:

1. Could the list of completed jobs support pagination? Right now all historical jobs are rendered on one page, which makes it very slow to open.

2. Is there a way to view the JM and TM logs of completed jobs? The logs do exist on HDFS, so in principle they could be fetched, parsed and displayed; the Spark history server can show logs of completed jobs.


Best!
zhisheng

Robin Zhang  于2020年10月22日周四 下午6:11写道:

>
> 如下图,Flink 1.10 on yarn per job提交方式,如果是java datastream 以及table
> api开发的应用,能够被jm正常拉取统计信息,但是sql化的job没有办法被历史服务器监控。
> 使用的sql不完全是官网的,但是是经过转化为datastream,以on yarn per
> job方式提交到yarn运行的,只是多了个sql解析动作。不能理解
>
> ,为什么历史服务器没有加载job信息到hdfs上的目标目录。查看jobmanager日志以及configuration都能确定jm加载到了历史服务器的相关配置。
>
> <
> http://apache-flink.147419.n8.nabble.com/file/t447/%E5%8E%86%E5%8F%B2%E6%9C%8D%E5%8A%A1%E5%99%A8.png>
>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


使用Flink Table API & SQL编写流应用,SQL中的NOW()等时间函数如何理解

2020-10-22 文章 Longdexin
Once a streaming application has been running for a while, say into the next day, will NOW() in the SQL keep advancing with processing time so that the processing logic stays correct? My understanding was that NOW() is fixed at the moment the streaming job starts and never changes afterwards. If that is the case, how can the time-comparison logic in the SQL keep up with the current time? Many thanks.




Re: Re: pyflink1.11.0 如果elasticsearch host有访问权限,connector如何写入用户名密码

2020-10-22 文章 Xingbo Huang
Hi,

You could build and install it from source; see the documentation [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/flinkDev/building.html#build-pyflink

Best,
Xingbo

whh_960101  于2020年10月22日周四 下午6:47写道:

> 现在能更新到1.12吗,好像还没有发布,有什么其他的解决办法吗?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-10-22 16:34:56,"Yangze Guo"  写道:
> >1.11版本中尚不支持username和password的设置,这两个配置在1.12中加入了新的es connector[1]
> >
> >[1] https://issues.apache.org/jira/browse/FLINK-18361
> >
> >Best,
> >Yangze Guo
> >
> >On Thu, Oct 22, 2020 at 3:47 PM whh_960101  wrote:
> >>
> >> Hi,各位大佬们:如果要sink数据到elasticsearch host有访问权限,elasticsearch
> connector如何写入用户名密码我按照官网里的样例格式来写的,没有找到options写入用户名密码,程序报错Unauthorizedhttps://
> ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.htmlCREATE
> TABLE myUserTable (
> >>   user_id STRING,
> >>   user_name STRING
> >>   uv BIGINT,
> >>   pv BIGINT,
> >>   PRIMARY KEY (user_id) NOT ENFORCED
> >> ) WITH (
> >>   'connector' = 'elasticsearch-7',
> >>   'hosts' = 'http://localhost:9200',
> >>   'index' = 'users'
> >> );Connector Options
> >> | Option | Required | Default | Type | Description |
> >> |
> >> connector
> >> | required | (none) | String | Specify what connector to use, valid
> values are:
> >> elasticsearch-6: connect to Elasticsearch 6.x cluster
> >> elasticsearch-7: connect to Elasticsearch 7.x and later versions cluster
> >> |
> >> |
> >> hosts
> >> | required | (none) | String | One or more Elasticsearch hosts to
> connect to, e.g. 'http://host_name:9092;http://host_name:9093'. |
> >> |
> >> index
> >> | required | (none) | String | Elasticsearch index for every record.
> Can be a static index (e.g. 'myIndex') or a dynamic index (e.g.
> 'index-{log_ts|-MM-dd}'). See the following Dynamic Indexsection for
> more details. |
> >> |
> >> document-type
> >> | required in 6.x | (none) | String | Elasticsearch document type. Not
> necessary anymore in elasticsearch-7. |
> >> |
> >> document-id.key-delimiter
> >> | optional | _ | String | Delimiter for composite keys ("_" by
> default), e.g., "$" would result in IDs "KEY1$KEY2$KEY3"." |
> >> |
> >> failure-handler
> >> | optional | fail | String | Failure handling strategy in case a
> request to Elasticsearch fails. Valid strategies are:
> >> fail: throws an exception if a request fails and thus causes a job
> failure.
> >> ignore: ignores failures and drops the request.
> >> retry_rejected: re-adds requests that have failed due to queue capacity
> saturation.
> >> custom class name: for failure handling with a
> ActionRequestFailureHandler subclass.
> >> |
> >> |
> >> sink.flush-on-checkpoint
> >> | optional | true | Boolean | Flush on checkpoint or not. When
> disabled, a sink will not wait for all pending action requests to be
> acknowledged by Elasticsearch on checkpoints. Thus, a sink does NOT provide
> any strong guarantees for at-least-once delivery of action requests. |
> >> |
> >> sink.bulk-flush.max-actions
> >> | optional | 1000 | Integer | Maximum number of buffered actions per
> bulk request. Can be set to '0' to disable it. |
> >> |
> >> sink.bulk-flush.max-size
> >> | optional | 2mb | MemorySize | Maximum size in memory of buffered
> actions per bulk request. Must be in MB granularity. Can be set to '0' to
> disable it. |
> >> |
> >> sink.bulk-flush.interval
> >> | optional | 1s | Duration | The interval to flush buffered actions.
> Can be set to '0' to disable it. Note, both 'sink.bulk-flush.max-size' and
> 'sink.bulk-flush.max-actions' can be set to '0' with the flush interval set
> allowing for complete async processing of buffered actions. |
> >> |
> >> sink.bulk-flush.backoff.strategy
> >> | optional | DISABLED | String | Specify how to perform retries if any
> flush actions failed due to a temporary request error. Valid strategies are:
> >> DISABLED: no retry performed, i.e. fail after the first request error.
> >> CONSTANT: wait for backoff delay between retries.
> >> EXPONENTIAL: initially wait for backoff delay and increase
> exponentially between retries.
> >> |
> >> |
> >> sink.bulk-flush.backoff.max-retries
> >> | optional | 8 | Integer | Maximum number of backoff retries. |
> >> |
> >> sink.bulk-flush.backoff.delay
> >> | optional | 50ms | Duration | Delay between each backoff attempt. For
> CONSTANT backoff, this is simply the delay between each retry. For
> EXPONENTIAL backoff, this is the initial base delay. |
> >> |
> >> connection.max-retry-timeout
> >> | optional | (none) | Duration | Maximum timeout between retries. |
> >> |
> >> connection.path-prefix
> >> | optional | (none) | String | Prefix string to be added to every REST
> communication, e.g., '/v1' |
> >> |
> >> format
> >> | optional | json | String | Elasticsearch connector supports to
> specify a format. The format must produce a valid json document. By default
> uses built-in 'json' format. Please refer to JSON Format page for more
> details. |
> >>
> >>
> >>
> >>
> >>
> >>
> >>
>


Re:Re: pyflink1.11.0 如果elasticsearch host有访问权限,connector如何写入用户名密码

2020-10-22 文章 whh_960101
Can I upgrade to 1.12 now? It does not seem to have been released yet. Is there any other workaround?















在 2020-10-22 16:34:56,"Yangze Guo"  写道:
>1.11版本中尚不支持username和password的设置,这两个配置在1.12中加入了新的es connector[1]
>
>[1] https://issues.apache.org/jira/browse/FLINK-18361
>
>Best,
>Yangze Guo
>
>On Thu, Oct 22, 2020 at 3:47 PM whh_960101  wrote:
>>
>> Hi,各位大佬们:如果要sink数据到elasticsearch host有访问权限,elasticsearch 
>> connector如何写入用户名密码我按照官网里的样例格式来写的,没有找到options写入用户名密码,程序报错Unauthorizedhttps://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.htmlCREATE
>>  TABLE myUserTable (
>>   user_id STRING,
>>   user_name STRING
>>   uv BIGINT,
>>   pv BIGINT,
>>   PRIMARY KEY (user_id) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'elasticsearch-7',
>>   'hosts' = 'http://localhost:9200',
>>   'index' = 'users'
>> );Connector Options
>> | Option | Required | Default | Type | Description |
>> |
>> connector
>> | required | (none) | String | Specify what connector to use, valid values 
>> are:
>> elasticsearch-6: connect to Elasticsearch 6.x cluster
>> elasticsearch-7: connect to Elasticsearch 7.x and later versions cluster
>> |
>> |
>> hosts
>> | required | (none) | String | One or more Elasticsearch hosts to connect 
>> to, e.g. 'http://host_name:9092;http://host_name:9093'. |
>> |
>> index
>> | required | (none) | String | Elasticsearch index for every record. Can be 
>> a static index (e.g. 'myIndex') or a dynamic index (e.g. 
>> 'index-{log_ts|-MM-dd}'). See the following Dynamic Indexsection for 
>> more details. |
>> |
>> document-type
>> | required in 6.x | (none) | String | Elasticsearch document type. Not 
>> necessary anymore in elasticsearch-7. |
>> |
>> document-id.key-delimiter
>> | optional | _ | String | Delimiter for composite keys ("_" by default), 
>> e.g., "$" would result in IDs "KEY1$KEY2$KEY3"." |
>> |
>> failure-handler
>> | optional | fail | String | Failure handling strategy in case a request to 
>> Elasticsearch fails. Valid strategies are:
>> fail: throws an exception if a request fails and thus causes a job failure.
>> ignore: ignores failures and drops the request.
>> retry_rejected: re-adds requests that have failed due to queue capacity 
>> saturation.
>> custom class name: for failure handling with a ActionRequestFailureHandler 
>> subclass.
>> |
>> |
>> sink.flush-on-checkpoint
>> | optional | true | Boolean | Flush on checkpoint or not. When disabled, a 
>> sink will not wait for all pending action requests to be acknowledged by 
>> Elasticsearch on checkpoints. Thus, a sink does NOT provide any strong 
>> guarantees for at-least-once delivery of action requests. |
>> |
>> sink.bulk-flush.max-actions
>> | optional | 1000 | Integer | Maximum number of buffered actions per bulk 
>> request. Can be set to '0' to disable it. |
>> |
>> sink.bulk-flush.max-size
>> | optional | 2mb | MemorySize | Maximum size in memory of buffered actions 
>> per bulk request. Must be in MB granularity. Can be set to '0' to disable 
>> it. |
>> |
>> sink.bulk-flush.interval
>> | optional | 1s | Duration | The interval to flush buffered actions. Can be 
>> set to '0' to disable it. Note, both 'sink.bulk-flush.max-size' and 
>> 'sink.bulk-flush.max-actions' can be set to '0' with the flush interval set 
>> allowing for complete async processing of buffered actions. |
>> |
>> sink.bulk-flush.backoff.strategy
>> | optional | DISABLED | String | Specify how to perform retries if any flush 
>> actions failed due to a temporary request error. Valid strategies are:
>> DISABLED: no retry performed, i.e. fail after the first request error.
>> CONSTANT: wait for backoff delay between retries.
>> EXPONENTIAL: initially wait for backoff delay and increase exponentially 
>> between retries.
>> |
>> |
>> sink.bulk-flush.backoff.max-retries
>> | optional | 8 | Integer | Maximum number of backoff retries. |
>> |
>> sink.bulk-flush.backoff.delay
>> | optional | 50ms | Duration | Delay between each backoff attempt. For 
>> CONSTANT backoff, this is simply the delay between each retry. For 
>> EXPONENTIAL backoff, this is the initial base delay. |
>> |
>> connection.max-retry-timeout
>> | optional | (none) | Duration | Maximum timeout between retries. |
>> |
>> connection.path-prefix
>> | optional | (none) | String | Prefix string to be added to every REST 
>> communication, e.g., '/v1' |
>> |
>> format
>> | optional | json | String | Elasticsearch connector supports to specify a 
>> format. The format must produce a valid json document. By default uses 
>> built-in 'json' format. Please refer to JSON Format page for more details. |
>>
>>
>>
>>
>>
>>
>>


Flink1.10 history server无法监控 FlinkSQL任务

2020-10-22 文章 Robin Zhang

As shown in the screenshot below, with Flink 1.10 on YARN in per-job mode, applications written with the Java DataStream and Table APIs have their statistics pulled by the JM as expected, but SQL-based jobs are not picked up by the history server.
The SQL is not exactly the one from the official docs, but it is converted into a DataStream program and submitted to YARN in per-job mode; the only extra step is SQL parsing. I cannot understand why the history server does not load the job information into the target directory on HDFS. Both the JobManager log and the configuration confirm that the JM picked up the history server settings.


 






Re: 求教,如果想要从flinksql中提取出列级别的血缘关系,请问有什么好的方案吗

2020-10-22 文章 Danny Chan
You could look into Calcite's metadata system: one of its metadata handlers, RelMdColumnOrigins, gives you column-level lineage, provided you first obtain the relational expression tree for the SQL.
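A rough sketch of what that lookup could look like, assuming you already have the optimized RelNode of the query (Calcite's RelMetadataQuery exposes getColumnOrigins, which is backed by RelMdColumnOrigins):

// ask Calcite which source table/column each output column of relNode comes from
RelMetadataQuery mq = relNode.getCluster().getMetadataQuery();
for (int i = 0; i < relNode.getRowType().getFieldCount(); i++) {
    Set<RelColumnOrigin> origins = mq.getColumnOrigins(relNode, i);
    if (origins != null) {
        for (RelColumnOrigin origin : origins) {
            System.out.println(relNode.getRowType().getFieldNames().get(i)
                    + " <- " + origin.getOriginTable().getQualifiedName()
                    + " (column #" + origin.getOriginColumnOrdinal() + ")");
        }
    }
}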

Best,
Danny Chan
在 2020年10月20日 +0800 PM8:43,dawangli ,写道:
> 求教,如果想要从flinksql中提取出列级别的血缘关系,请问有什么好的方案吗


Re: flink sql 写入hive问题

2020-10-22 文章 Jingsong Li
The writer parallelism follows the parallelism of the upstream operator.

It is the committer whose parallelism is 1.

On Thu, Oct 22, 2020 at 5:22 PM 酷酷的浑蛋  wrote:

> 我用flink sql实时写入hive表时发现sink的并行度为1?
> 我看了FileSystemTableSink类的226行,确实设置了1,这是为什么呢?  并行度1的写入速度很慢
>
>
>
>

-- 
Best, Jingsong Lee


flink sql 写入hive问题

2020-10-22 文章 酷酷的浑蛋
When writing to a Hive table in real time with Flink SQL, I noticed the sink parallelism is 1. I checked line 226 of FileSystemTableSink and it is indeed set to 1. Why is that? Writing with parallelism 1 is very slow.





Re: pyflink1.11.0 如果elasticsearch host有访问权限,connector如何写入用户名密码

2020-10-22 文章 Yangze Guo
Version 1.11 does not yet support setting a username and password; these two options were added to the new ES connector in 1.12 [1].

[1] https://issues.apache.org/jira/browse/FLINK-18361

Best,
Yangze Guo

On Thu, Oct 22, 2020 at 3:47 PM whh_960101  wrote:
>
> Hi,各位大佬们:如果要sink数据到elasticsearch host有访问权限,elasticsearch 
> connector如何写入用户名密码我按照官网里的样例格式来写的,没有找到options写入用户名密码,程序报错Unauthorizedhttps://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.htmlCREATE
>  TABLE myUserTable (
>   user_id STRING,
>   user_name STRING
>   uv BIGINT,
>   pv BIGINT,
>   PRIMARY KEY (user_id) NOT ENFORCED
> ) WITH (
>   'connector' = 'elasticsearch-7',
>   'hosts' = 'http://localhost:9200',
>   'index' = 'users'
> );Connector Options
> | Option | Required | Default | Type | Description |
> |
> connector
> | required | (none) | String | Specify what connector to use, valid values 
> are:
> elasticsearch-6: connect to Elasticsearch 6.x cluster
> elasticsearch-7: connect to Elasticsearch 7.x and later versions cluster
> |
> |
> hosts
> | required | (none) | String | One or more Elasticsearch hosts to connect to, 
> e.g. 'http://host_name:9092;http://host_name:9093'. |
> |
> index
> | required | (none) | String | Elasticsearch index for every record. Can be a 
> static index (e.g. 'myIndex') or a dynamic index (e.g. 
> 'index-{log_ts|-MM-dd}'). See the following Dynamic Indexsection for more 
> details. |
> |
> document-type
> | required in 6.x | (none) | String | Elasticsearch document type. Not 
> necessary anymore in elasticsearch-7. |
> |
> document-id.key-delimiter
> | optional | _ | String | Delimiter for composite keys ("_" by default), 
> e.g., "$" would result in IDs "KEY1$KEY2$KEY3"." |
> |
> failure-handler
> | optional | fail | String | Failure handling strategy in case a request to 
> Elasticsearch fails. Valid strategies are:
> fail: throws an exception if a request fails and thus causes a job failure.
> ignore: ignores failures and drops the request.
> retry_rejected: re-adds requests that have failed due to queue capacity 
> saturation.
> custom class name: for failure handling with a ActionRequestFailureHandler 
> subclass.
> |
> |
> sink.flush-on-checkpoint
> | optional | true | Boolean | Flush on checkpoint or not. When disabled, a 
> sink will not wait for all pending action requests to be acknowledged by 
> Elasticsearch on checkpoints. Thus, a sink does NOT provide any strong 
> guarantees for at-least-once delivery of action requests. |
> |
> sink.bulk-flush.max-actions
> | optional | 1000 | Integer | Maximum number of buffered actions per bulk 
> request. Can be set to '0' to disable it. |
> |
> sink.bulk-flush.max-size
> | optional | 2mb | MemorySize | Maximum size in memory of buffered actions 
> per bulk request. Must be in MB granularity. Can be set to '0' to disable it. 
> |
> |
> sink.bulk-flush.interval
> | optional | 1s | Duration | The interval to flush buffered actions. Can be 
> set to '0' to disable it. Note, both 'sink.bulk-flush.max-size' and 
> 'sink.bulk-flush.max-actions' can be set to '0' with the flush interval set 
> allowing for complete async processing of buffered actions. |
> |
> sink.bulk-flush.backoff.strategy
> | optional | DISABLED | String | Specify how to perform retries if any flush 
> actions failed due to a temporary request error. Valid strategies are:
> DISABLED: no retry performed, i.e. fail after the first request error.
> CONSTANT: wait for backoff delay between retries.
> EXPONENTIAL: initially wait for backoff delay and increase exponentially 
> between retries.
> |
> |
> sink.bulk-flush.backoff.max-retries
> | optional | 8 | Integer | Maximum number of backoff retries. |
> |
> sink.bulk-flush.backoff.delay
> | optional | 50ms | Duration | Delay between each backoff attempt. For 
> CONSTANT backoff, this is simply the delay between each retry. For 
> EXPONENTIAL backoff, this is the initial base delay. |
> |
> connection.max-retry-timeout
> | optional | (none) | Duration | Maximum timeout between retries. |
> |
> connection.path-prefix
> | optional | (none) | String | Prefix string to be added to every REST 
> communication, e.g., '/v1' |
> |
> format
> | optional | json | String | Elasticsearch connector supports to specify a 
> format. The format must produce a valid json document. By default uses 
> built-in 'json' format. Please refer to JSON Format page for more details. |
>
>
>
>
>
>
>


pyflink1.11.0 如果elasticsearch host有访问权限,connector如何写入用户名密码

2020-10-22 文章 whh_960101
Hi everyone: if the Elasticsearch host requires authentication, how do I provide the username and password in the Elasticsearch connector? I followed the sample format from the official docs but could not find any option for a username/password, and the job fails with an Unauthorized error.

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.html

CREATE TABLE myUserTable (
  user_id STRING,
  user_name STRING
  uv BIGINT,
  pv BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://localhost:9200',
  'index' = 'users'
);Connector Options
| Option | Required | Default | Type | Description |
|
connector
| required | (none) | String | Specify what connector to use, valid values are:
elasticsearch-6: connect to Elasticsearch 6.x cluster
elasticsearch-7: connect to Elasticsearch 7.x and later versions cluster
|
|
hosts
| required | (none) | String | One or more Elasticsearch hosts to connect to, 
e.g. 'http://host_name:9092;http://host_name:9093'. |
|
index
| required | (none) | String | Elasticsearch index for every record. Can be a 
static index (e.g. 'myIndex') or a dynamic index (e.g. 
'index-{log_ts|-MM-dd}'). See the following Dynamic Indexsection for more 
details. |
|
document-type
| required in 6.x | (none) | String | Elasticsearch document type. Not 
necessary anymore in elasticsearch-7. |
|
document-id.key-delimiter
| optional | _ | String | Delimiter for composite keys ("_" by default), e.g., 
"$" would result in IDs "KEY1$KEY2$KEY3"." |
|
failure-handler
| optional | fail | String | Failure handling strategy in case a request to 
Elasticsearch fails. Valid strategies are:
fail: throws an exception if a request fails and thus causes a job failure.
ignore: ignores failures and drops the request.
retry_rejected: re-adds requests that have failed due to queue capacity 
saturation.
custom class name: for failure handling with a ActionRequestFailureHandler 
subclass.
|
|
sink.flush-on-checkpoint
| optional | true | Boolean | Flush on checkpoint or not. When disabled, a sink 
will not wait for all pending action requests to be acknowledged by 
Elasticsearch on checkpoints. Thus, a sink does NOT provide any strong 
guarantees for at-least-once delivery of action requests. |
|
sink.bulk-flush.max-actions
| optional | 1000 | Integer | Maximum number of buffered actions per bulk 
request. Can be set to '0' to disable it. |
|
sink.bulk-flush.max-size
| optional | 2mb | MemorySize | Maximum size in memory of buffered actions per 
bulk request. Must be in MB granularity. Can be set to '0' to disable it. |
|
sink.bulk-flush.interval
| optional | 1s | Duration | The interval to flush buffered actions. Can be set 
to '0' to disable it. Note, both 'sink.bulk-flush.max-size' and 
'sink.bulk-flush.max-actions' can be set to '0' with the flush interval set 
allowing for complete async processing of buffered actions. |
|
sink.bulk-flush.backoff.strategy
| optional | DISABLED | String | Specify how to perform retries if any flush 
actions failed due to a temporary request error. Valid strategies are:
DISABLED: no retry performed, i.e. fail after the first request error.
CONSTANT: wait for backoff delay between retries.
EXPONENTIAL: initially wait for backoff delay and increase exponentially 
between retries.
|
|
sink.bulk-flush.backoff.max-retries
| optional | 8 | Integer | Maximum number of backoff retries. |
|
sink.bulk-flush.backoff.delay
| optional | 50ms | Duration | Delay between each backoff attempt. For CONSTANT 
backoff, this is simply the delay between each retry. For EXPONENTIAL backoff, 
this is the initial base delay. |
|
connection.max-retry-timeout
| optional | (none) | Duration | Maximum timeout between retries. |
|
connection.path-prefix
| optional | (none) | String | Prefix string to be added to every REST 
communication, e.g., '/v1' |
|
format
| optional | json | String | Elasticsearch connector supports to specify a 
format. The format must produce a valid json document. By default uses built-in 
'json' format. Please refer to JSON Format page for more details. |