stream sink hive 在hdfs ha模式下

2020-08-02 文章 air23
hi 你好
  我这边集群是cdh的。 配置了hdfs ha模式
 在使用 kafka sink 到hive 时候找不到nameservices
java.lang.IllegalArgumentException: java.net.UnknownHostException: nameservices1


请问 在ha模式下 应该怎么配置

Re: pyflink 消费kafka protobuf数据

2020-08-02 文章 Benchao Li
Hi,

Flink暂时还没有内置的protobuf format,社区正在讨论实现一个protobuf format[1],预期在1.12来支持。

目前来讲,你可以考虑自定义一个format,或者直接定义一个没有任何解析直接转发byte[] 数据的format,
然后用UDF来解析。

[1] https://issues.apache.org/jira/browse/FLINK-18202

stephenlee <871826...@qq.com> 于2020年7月31日周五 下午10:27写道:

> hi,各位大佬好:
> 我是flink新手,我想问一下如何使用pyflink 消费kafka protobuf数据?我试了当做string
> 读取没有成功,查了下官方的pyflink文档,没有找到相关资料。还望大佬们帮忙看看
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



-- 

Best,
Benchao Li


flink??ScalarFunction????????

2020-08-02 文章 ??????
  
flinkudf??ScalarFunctionkafkaScalarFunction??udf1??flinkkafka??flinkudf1kafka??flink??udf1??udf1flink
  

Re: flink 1.11.0 conenctor-jdbc 报错

2020-08-02 文章 song wang
对,就是这个原因

Leonard Xu  于2020年8月3日周一 上午10:26写道:

> Hi
>
> > 在 2020年8月3日,10:16,song wang  写道:
> >
> > 查询 integer
>
> 如果MySQL中数据类型是 INT UNSIGNED,Flink 中 对应的类型是 BIGINT,
> 你检查下是不是这个原因,类型映射可以参考[1]
>
> Best
> Leonard
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#data-type-mapping
> <
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#data-type-mapping
> >


Re:Re: flinksql insert overwrite 报错 INSERT OVERWRITE requires JDBC:MySQL DynamicTableSink to implement SupportsOverwrite interface

2020-08-02 文章 chenxuying
谢谢, 明白了








在 2020-08-03 10:42:53,"Leonard Xu"  写道:
>如果 DB 支持 upsert 语法,执行的是update, 如果不支持 upsert语法, 则是 delete + insert,MySQL 和 PG 
>都支持 upsert, 底层对应的sql语句是
>
>Database   Upsert Grammar
>MySQL  INSERT .. ON DUPLICATE KEY UPDATE ..
>PostgreSQL INSERT .. ON CONFLICT .. DO UPDATE SET ..
>
>MySQL connector 不支持 replace into, 用的是 on duplicate key update.
>
>祝好
>Leonard 
>[1] 
>https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#idempotent-writes
> 
>
>
>
>> 在 2020年8月3日,10:33,chenxuying  写道:
>> 
>> 你好,我这边只是做一个尝试 , 因为之前使用了insert into + 主键做到更新db记录的操作 , 然后现在看到INSERT 
>> OVERWRITE的语法就试了一下 , 原来OVERWRITE目前只支持 Filesystem connector 和 Hive table
>> 然后还想问下在使用insert into + 主键 时,如果主键存在 , 则在底层执行sql时是update语句是吗, 还是delete+update 
>> , 我们这边之前有个需求是update执行效率太低 , 然后需要用到replace into , 不知道flink是否支持
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 在 2020-08-02 09:48:04,"Leonard Xu"  写道:
>>> Hi,
>>> 
>>> 这个错是因为JDBC connector 不支持INSERT OVERWRITE, 你看的文档是列出了目前 Flink SQL 
>>> 支持的INSERT语法,但是不是所有的 connector 都支持  INSERT OVERWRITE, 目前支持的只有 Filesystem 
>>> connector 和 Hive table, 这些表一般不会有主键。其他connector 如 JDBC\ES\HBase 目前不支持  
>>> INSERT OVERWRITE,现在 JDBC\ES\HBase connector都是支持upsert 插入的[1],
>>> 就是在connector 表上定义了PK,结果可以按照PK更新,对于DB类的系统应该都是可以满足业务需求的。 可以分享下需要INSERT 
>>> OVERWRITE到DB的场景吗?
>>> 
>>> Best
>>> Leonard
>>> [1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling
>>>  
>>> 
>>> 
 在 2020年8月1日,19:20,chenxuying  写道:
 
 Hello
 请问在flink 1.11.0文档[1]中发现可以使用时可以使用INSERTOVERWRITE... 尝试了一下,但是在执行语句"insert 
 overwrite mysqlsink select a,cast(b as varchar) b from mySource"时报如下错误
 Exception in thread "main" org.apache.flink.table.api.ValidationException: 
 INSERT OVERWRITE requires JDBC:MySQL DynamicTableSink to implement 
 SupportsOverwrite interface.
 是得自定义connector吗,实现DynamicTableSink?
 
 
 祝好
 chenxuying
 [1] 
 https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#syntax
>>> 
>


Flink 1.11.1 消费带SASL的Kafka报错: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config

2020-08-02 文章 RS
Hi, 
我尝试消费SASL机制的Kafka集群


jaas.conf 文件内容:
KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin001"
  password="123456";
};


执行命令如下:
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/jaas.conf"
./bin/sql-client.sh embedded



CREATE TABLE t1

(vendor STRING)

WITH (

'connector' = 'kafka',

'topic' = 'test',

'properties.bootstrap.servers' = '127.0.0.1:9092',

'properties.group.id' = 'g1',

'properties.sasl.mechanisms'='PLAIN',

'properties.sasl.username'='admin001',

'properties.sasl.password'='123456',

'properties.security.protocol'='SASL_PLAINTEXT',

'format' = 'json',

'scan.startup.mode' = 'earliest-offset',

'json.fail-on-missing-field' = 'false',

'json.ignore-parse-errors' = 'true'

);



然后报错提示:
Flink SQL> select * from t1;
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalArgumentException: No serviceName defined in either JAAS or 
Kafka config


请教下, 这个该如何解决?


Thx



UDF:Type is not supported: ANY

2020-08-02 文章 zilong xiao
最近在写Flink SQL处理数据的时候发现Flink自带的function不太能满足使用,像很常见的Json2Array
Json2Map貌似官方都没有实现,所以需要自定义函数来实现,然后我自己尝试用Jackson来实现时发现在语法检查时总是会报 `Type is not
supported:
ANY`,个人猜想这个是不是和Java泛型的特性有关,由于Java是假泛型,最终泛型会被擦除编程Object才会引发这个异常呢?想到Flink本身也有一个字符串转容器的函数
STR_TO_MAP,看了下该函数实现是用Scala编写,不确定该异常是否真是由泛型引起,如果是,如果想要Java写Json2Array Json2Map
udf应该怎么操作呢?求前辈指导

udfd代码如下:

public class Json2List extends ScalarFunction {

   private static final Logger LOG = LoggerFactory.getLogger(Json2List.class);

   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
  .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true)
  .configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, true) ;

   public Json2List(){}

   public List eval(String param) {
  List result = new ArrayList<>();
  try {
 List> list =
OBJECT_MAPPER.readValue(param, List.class);
 for(Map map : list){
result.add(OBJECT_MAPPER.writeValueAsString(map));
 }
 return result;
  } catch (JsonProcessingException e){
 LOG.error("failed to convert json to array, param is: {}", param, e);
  }
  return result;
   }


   @Override
   public TypeInformation> getResultType(Class[] signature) {
  return Types.LIST(Types.STRING);
   }

}


Re:Re: flinksql insert overwrite 报错 INSERT OVERWRITE requires JDBC:MySQL DynamicTableSink to implement SupportsOverwrite interface

2020-08-02 文章 chenxuying
你好,我这边只是做一个尝试 , 因为之前使用了insert into + 主键做到更新db记录的操作 , 然后现在看到INSERT 
OVERWRITE的语法就试了一下 , 原来OVERWRITE目前只支持 Filesystem connector 和 Hive table
然后还想问下在使用insert into + 主键 时,如果主键存在 , 则在底层执行sql时是update语句是吗, 还是delete+update , 
我们这边之前有个需求是update执行效率太低 , 然后需要用到replace into , 不知道flink是否支持

















在 2020-08-02 09:48:04,"Leonard Xu"  写道:
>Hi,
>
>这个错是因为JDBC connector 不支持INSERT OVERWRITE, 你看的文档是列出了目前 Flink SQL 
>支持的INSERT语法,但是不是所有的 connector 都支持  INSERT OVERWRITE, 目前支持的只有 Filesystem 
>connector 和 Hive table, 这些表一般不会有主键。其他connector 如 JDBC\ES\HBase 目前不支持  INSERT 
>OVERWRITE,现在 JDBC\ES\HBase connector都是支持upsert 插入的[1],
>就是在connector 表上定义了PK,结果可以按照PK更新,对于DB类的系统应该都是可以满足业务需求的。 可以分享下需要INSERT 
>OVERWRITE到DB的场景吗?
>
>Best
>Leonard
>[1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling
> 
>
>
>> 在 2020年8月1日,19:20,chenxuying  写道:
>> 
>> Hello
>> 请问在flink 1.11.0文档[1]中发现可以使用时可以使用INSERTOVERWRITE... 尝试了一下,但是在执行语句"insert 
>> overwrite mysqlsink select a,cast(b as varchar) b from mySource"时报如下错误
>> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>> INSERT OVERWRITE requires JDBC:MySQL DynamicTableSink to implement 
>> SupportsOverwrite interface.
>> 是得自定义connector吗,实现DynamicTableSink?
>> 
>> 
>> 祝好
>> chenxuying
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#syntax
>


Re: flink 1.11.0 conenctor-jdbc 报错

2020-08-02 文章 Leonard Xu
Hi

> 在 2020年8月3日,10:16,song wang  写道:
> 
> 查询 integer

如果MySQL中数据类型是 INT UNSIGNED,Flink 中 对应的类型是 BIGINT,
你检查下是不是这个原因,类型映射可以参考[1]

Best
Leonard
[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#data-type-mapping
 


Re: 数据预览

2020-08-02 文章 godfrey he
如果你想在client端拿到query的结果做preview的话,目前API层面支持直接collect或者print执行结果,可以参考 [1]

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#execute-a-query

Jeff Zhang  于2020年8月1日周六 下午11:01写道:

> Apache Zeppelin有自己的rest api,你可以用rest api来提交flink sql
> 以及拿sql结果,目前Zeppelin社区正在做一个Client API (Zeppelin SDK),
> 用户可以更加方便的调用Zeppelin的功能。具体可以参考
> https://issues.apache.org/jira/browse/ZEPPELIN-4981
>
> 这里有Sample code 可以参考
>
> https://github.com/zjffdu/zeppelin/blob/ZEPPELIN-4981/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java#L298
>
> 对于Flink on Zeppelin感兴趣的,可以加入钉钉群:32803524
>
>
>
> forideal  于2020年8月1日周六 下午7:49写道:
>
> > 你好,我的朋友
> >
> >
> >最近我看 Flink doc 中的文档中有了如下 connector
> >   DataGen
> >   Print
> >   BlackHole
> >这大大的方便了开发和调试。不过,我还是不太满足,想了解一下数据预览相关的做法。
> >比如我想,如果我有一个 Flink 的 `driver` ,然后,我使用这个 driver 提交一条 SQL,我从
> ResultSet
> > 中获取数据。这样又可以大大的方面我们的 Flink SQL 开发者。
> >在社区中,我已经体验了 Apache Zeppelin ,他可以让我提交 Flink SQL,然后在页面上面等待刷新的结果,但是
> > Zeppelin 目前不能很好的集成到我们的 Flink web IDE 中。想了解一下如何实现数据预览。
> >
> >
> >Best forideal
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: [DISCUSS] FLIP-133: Rework PyFlink Documentation

2020-08-02 文章 Shuiqiang Chen
Hi jincheng,

Thanks for the discussion. +1 for the FLIP.

A well-organized documentation will greatly improve the efficiency and
experience for developers.

Best,
Shuiqiang

Hequn Cheng  于2020年8月1日周六 上午8:42写道:

> Hi Jincheng,
>
> Thanks a lot for raising the discussion. +1 for the FLIP.
>
> I think this will bring big benefits for the PyFlink users. Currently, the
> Python TableAPI document is hidden deeply under the TableAPI tab which
> makes it quite unreadable. Also, the PyFlink documentation is mixed with
> Java/Scala documentation. It is hard for users to have an overview of all
> the PyFlink documents. As more and more functionalities are added into
> PyFlink, I think it's time for us to refactor the document.
>
> Best,
> Hequn
>
>
> On Fri, Jul 31, 2020 at 3:43 PM Marta Paes Moreira 
> wrote:
>
>> Hi, Jincheng!
>>
>> Thanks for creating this detailed FLIP, it will make a big difference in
>> the experience of Python developers using Flink. I'm interested in
>> contributing to this work, so I'll reach out to you offline!
>>
>> Also, thanks for sharing some information on the adoption of PyFlink, it's
>> great to see that there are already production users.
>>
>> Marta
>>
>> On Fri, Jul 31, 2020 at 5:35 AM Xingbo Huang  wrote:
>>
>> > Hi Jincheng,
>> >
>> > Thanks a lot for bringing up this discussion and the proposal.
>> >
>> > Big +1 for improving the structure of PyFlink doc.
>> >
>> > It will be very friendly to give PyFlink users a unified entrance to
>> learn
>> > PyFlink documents.
>> >
>> > Best,
>> > Xingbo
>> >
>> > Dian Fu  于2020年7月31日周五 上午11:00写道:
>> >
>> >> Hi Jincheng,
>> >>
>> >> Thanks a lot for bringing up this discussion and the proposal. +1 to
>> >> improve the Python API doc.
>> >>
>> >> I have received many feedbacks from PyFlink beginners about
>> >> the PyFlink doc, e.g. the materials are too few, the Python doc is
>> mixed
>> >> with the Java doc and it's not easy to find the docs he wants to know.
>> >>
>> >> I think it would greatly improve the user experience if we can have one
>> >> place which includes most knowledges PyFlink users should know.
>> >>
>> >> Regards,
>> >> Dian
>> >>
>> >> 在 2020年7月31日,上午10:14,jincheng sun  写道:
>> >>
>> >> Hi folks,
>> >>
>> >> Since the release of Flink 1.11, users of PyFlink have continued to
>> grow.
>> >> As far as I know there are many companies have used PyFlink for data
>> >> analysis, operation and maintenance monitoring business has been put
>> into
>> >> production(Such as 聚美优品[1](Jumei),  浙江墨芷[2] (Mozhi) etc.).  According
>> to
>> >> the feedback we received, current documentation is not very friendly to
>> >> PyFlink users. There are two shortcomings:
>> >>
>> >> - Python related content is mixed in the Java/Scala documentation,
>> which
>> >> makes it difficult for users who only focus on PyFlink to read.
>> >> - There is already a "Python Table API" section in the Table API
>> document
>> >> to store PyFlink documents, but the number of articles is small and the
>> >> content is fragmented. It is difficult for beginners to learn from it.
>> >>
>> >> In addition, FLIP-130 introduced the Python DataStream API. Many
>> >> documents will be added for those new APIs. In order to increase the
>> >> readability and maintainability of the PyFlink document, Wei Zhong and
>> me
>> >> have discussed offline and would like to rework it via this FLIP.
>> >>
>> >> We will rework the document around the following three objectives:
>> >>
>> >> - Add a separate section for Python API under the "Application
>> >> Development" section.
>> >> - Restructure current Python documentation to a brand new structure to
>> >> ensure complete content and friendly to beginners.
>> >> - Improve the documents shared by Python/Java/Scala to make it more
>> >> friendly to Python users and without affecting Java/Scala users.
>> >>
>> >> More detail can be found in the FLIP-133:
>> >>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-133%3A+Rework+PyFlink+Documentation
>> >>
>> >> Best,
>> >> Jincheng
>> >>
>> >> [1] https://mp.weixin.qq.com/s/zVsBIs1ZEFe4atYUYtZpRg
>> >> [2] https://mp.weixin.qq.com/s/R4p_a2TWGpESBWr3pLtM2g
>> >>
>> >>
>> >>
>>
>


Re: RocksDBKeyedStateBackend如何写磁盘

2020-08-02 文章 jun su
hi congxian,

感谢回复 , 我会再调查下 , 多谢

Congxian Qiu  于2020年8月2日周日 下午2:11写道:

> Hi  jiafu
> RocksDB 刷磁盘是它自己负责的(writebuffer 满了,就会 flush 到磁盘,具体的可以看下 RocksDB 的文档),另外在
> snapshot 的时候会 flush 一次 writebuffer 来保证一致性。
> 你上面截图的是 Flink 中 wrapper 的一个 write batch 操作,这个方法的含义是,积攒一批 操作 再去写
> rocksdb,和 rocksdb 刷磁盘还不是一回事。
> Best,
> Congxian
>
>
> jun su  于2020年7月31日周五 下午4:57写道:
>
> > hi,
> >
> > 看到 RocksDBWriteBatchWrapper类有 flushIfNeeded()方法 , 是这个么?
> >
> >  private void flushIfNeeded() throws RocksDBException {
> > boolean needFlush = batch.count() == capacity || (batchSize > 0 &&
> > getDataSize() >= batchSize);
> > if (needFlush) {
> > flush();
> > }
> > }
> >
> > batchSize 来自 state.backend.rocksdb.write-batch-size 参数的配置
> >
> > jiafu <530496...@qq.com> 于2020年7月31日周五 下午4:41写道:
> >
> > >
> > >
> >
> writerbuffer写满会flush到磁盘,checkpoint启动的时候会有一次snapshot过程,会让rocksdb做checkpoint,然后将数据刷到磁盘形成sst文件。
> > >
> > >
> > >
> > >
> > > --原始邮件--
> > > 发件人:
> > >   "user-zh"
> > > <
> > > sujun891...@gmail.com;
> > > 发送时间:2020年7月31日(星期五) 下午4:37
> > > 收件人:"user-zh" > >
> > > 主题:RocksDBKeyedStateBackend如何写磁盘
> > >
> > >
> > >
> > > hi all,
> > >
> > > 请问RocksDBKeyedStateBackend是何时将state序列化到磁盘的, 窗口结束时间?还是配置的checkpoint周期,谢谢
> > >
> > > --
> > > Best,
> > > Jun Su
> >
> >
> >
> > --
> > Best,
> > Jun Su
> >
>


-- 
Best,
Jun Su


??????????: Re: Flink????Kafka??Mysql?? End-To-End Exactly-Once????????????

2020-08-02 文章 ??????
MySQL??Connection??
??Connection A T ??commit()Connection 
A??A TwoPhaseCommitSinkFunction 
pendingCommitTransactions??Connection 
BcommitT


??MySQL??2PC??





----
??: 
   "user-zh"

https://github.com/lusecond/flink_help --depth=1
 gt;
 gt;
 gt; 
TwoPhaseCommitSinkFunction4??beginTransaction??preCommit??commit??abort
 gt; 
jdbc

Re: RocksDBKeyedStateBackend如何写磁盘

2020-08-02 文章 Congxian Qiu
Hi  jiafu
RocksDB 刷磁盘是它自己负责的(writebuffer 满了,就会 flush 到磁盘,具体的可以看下 RocksDB 的文档),另外在
snapshot 的时候会 flush 一次 writebuffer 来保证一致性。
你上面截图的是 Flink 中 wrapper 的一个 write batch 操作,这个方法的含义是,积攒一批 操作 再去写
rocksdb,和 rocksdb 刷磁盘还不是一回事。
Best,
Congxian


jun su  于2020年7月31日周五 下午4:57写道:

> hi,
>
> 看到 RocksDBWriteBatchWrapper类有 flushIfNeeded()方法 , 是这个么?
>
>  private void flushIfNeeded() throws RocksDBException {
> boolean needFlush = batch.count() == capacity || (batchSize > 0 &&
> getDataSize() >= batchSize);
> if (needFlush) {
> flush();
> }
> }
>
> batchSize 来自 state.backend.rocksdb.write-batch-size 参数的配置
>
> jiafu <530496...@qq.com> 于2020年7月31日周五 下午4:41写道:
>
> >
> >
> writerbuffer写满会flush到磁盘,checkpoint启动的时候会有一次snapshot过程,会让rocksdb做checkpoint,然后将数据刷到磁盘形成sst文件。
> >
> >
> >
> >
> > --原始邮件--
> > 发件人:
> >   "user-zh"
> > <
> > sujun891...@gmail.com;
> > 发送时间:2020年7月31日(星期五) 下午4:37
> > 收件人:"user-zh" >
> > 主题:RocksDBKeyedStateBackend如何写磁盘
> >
> >
> >
> > hi all,
> >
> > 请问RocksDBKeyedStateBackend是何时将state序列化到磁盘的, 窗口结束时间?还是配置的checkpoint周期,谢谢
> >
> > --
> > Best,
> > Jun Su
>
>
>
> --
> Best,
> Jun Su
>