Re: flink-1.10-sql dimension table question

2020-04-15 by Zhenghua Gao
The JDBC connector can be used as a dimension (lookup) table, and no special field needs to be declared in the DDL. A few optional parameters control the temporal join behavior [1].
When joining against it as a dimension table, you need the dedicated temporal join syntax [2].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#jdbc-connector
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#joins
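
For illustration, a minimal sketch of the temporal join syntax from [2] (table and column names are placeholders; o.proctime is assumed to be a processing-time attribute and dim_jdbc a JDBC-backed dimension table):

  SELECT o.order_id, d.name
  FROM orders AS o
  JOIN dim_jdbc FOR SYSTEM_TIME AS OF o.proctime AS d
    ON o.dim_id = d.id;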

*Best Regards,*
*Zhenghua Gao*


On Wed, Apr 15, 2020 at 7:48 PM guaishushu1...@163.com <
guaishushu1...@163.com> wrote:

> Hi all,
> I'd like to ask whether Flink 1.10 SQL supports dimension-table DDL. The community docs seem to say MySQL and HBase are supported, but which field do I have to declare explicitly to mark the created table as a dimension table?
>
>
>
> guaishushu1...@163.com
>


Re: JDBCLookupFunction caching causes stale data

2020-04-14 by Zhenghua Gao
Two parameters control the cache size and the cache expiration time [1]; you can use them to trade off performance against freshness.

  -- lookup options, optional, used in temporal join
  'connector.lookup.cache.max-rows' = '5000', -- optional, max number of rows of lookup cache;
                                              -- over this value, the oldest rows will be eliminated.
                                              -- "cache.max-rows" and "cache.ttl" must both be specified
                                              -- if either of them is specified. Cache is not enabled by default.
  'connector.lookup.cache.ttl' = '10s',       -- optional, the max time to live for each row in the lookup cache;
                                              -- over this time, the oldest rows will be expired.
                                              -- "cache.max-rows" and "cache.ttl" must both be specified
                                              -- if either of them is specified. Cache is not enabled by default.
  'connector.lookup.max-retries' = '3'        -- optional, max retry times if the lookup against the database failed

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#jdbc-connector
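
For illustration, a sketch of where these options sit in a Flink 1.10 JDBC dimension-table DDL (the connection values and columns below are placeholders, not taken from this thread):

  CREATE TABLE dim_mysql (
    id BIGINT,
    name STRING
  ) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://localhost:3306/mydb',
    'connector.table' = 'dim_table',
    'connector.driver' = 'com.mysql.jdbc.Driver',
    'connector.username' = 'user',
    'connector.password' = 'pass',
    'connector.lookup.cache.max-rows' = '5000',
    'connector.lookup.cache.ttl' = '10s',
    'connector.lookup.max-retries' = '3'
  );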

*Best Regards,*
*Zhenghua Gao*


On Wed, Apr 15, 2020 at 11:28 AM Dino Zhang 
wrote:

> You could consider lowering cache.ttl.
>
> On Wed, Apr 15, 2020 at 11:22 AM tingli ke  wrote:
>
> > Is there any other way to join the MySQL dimension table data in real time?
> >
> >
> > > 13122260...@163.com <13122260...@163.com> wrote on Wed, Apr 15, 2020 at 11:08 AM:
> >
> > > There is a setCacheMaxSize(1000); you can set it to -1 to disable the cache.
> > > The org.apache.flink.api.java.io.jdbc.JDBCLookupFunction class explains this:
> > > a cacheMaxSize of -1 means the cache is not used.
> > >
> > >
> > >
> > > 13122260...@163.com
> > >
> > > From: tingli ke
> > > Sent: 2020-04-15 10:55
> > > To: user-zh
> > > Subject: JDBCLookupFunction caching causes stale data
> > > Hi,
> > >
> > >
> >
> The stream table joins the MySQL dimension table in real time via JDBCLookupFunction, but JDBCLookupFunction caches the data, so after the MySQL dimension table is updated Flink still sees the old data (the cache is needed for performance).
> > > Is there any other way to join the MySQL dimension table data in real time?
> > >
> >
>
>
> --
> Regards,
> DinoZhang
>


Re: Flink 1.10 JSON parsing

2020-03-26 by Zhenghua Gao
Hi 张宇,

This looks like a bug in TypeMappingUtils when checking the compatibility of a field's physical and logical types.
I have opened an issue: https://issues.apache.org/jira/browse/FLINK-16800

*Best Regards,*
*Zhenghua Gao*


On Fri, Mar 20, 2020 at 5:28 PM 宇张  wrote:

> Hi,
> Understood. Let me lay it out again:
> streamTableEnv
>     .connect(
>         new Kafka()
>             .version("0.11")
>             .topic("mysql_binlog_test")
>             .startFromEarliest()
>             .property("zookeeper.connect", "localhost:2181")
>             .property("bootstrap.servers", "localhost:9092")
>     )
>     .withFormat(
>         new Json()
>     )
>     .withSchema(
>         new Schema()
>             .field("business", DataTypes.STRING())
>             .field("data", DataTypes.ARRAY(DataTypes.ROW(
>                 DataTypes.FIELD("id", DataTypes.BIGINT()),
>                 DataTypes.FIELD("vendor_id", DataTypes.DOUBLE()),
>                 DataTypes.FIELD("status", DataTypes.BIGINT()),
>                 DataTypes.FIELD("create_time", DataTypes.BIGINT()),
>                 DataTypes.FIELD("tracking_number", DataTypes.STRING()),
>                 DataTypes.FIELD("invoice_no", DataTypes.STRING()),
>                 DataTypes.FIELD("parent_id", DataTypes.BIGINT()))))
>             .field("database", DataTypes.STRING())
>             .field("old", DataTypes.ARRAY(DataTypes.ROW(
>                 DataTypes.FIELD("logistics_status", DataTypes.DECIMAL(38, 18)))))
>             .field("table", DataTypes.STRING())
>             .field("ts", DataTypes.BIGINT())
>             .field("type", DataTypes.STRING())
>             .field("putRowNum", DataTypes.BIGINT())
>     )
>     .createTemporaryTable("Test");
> Using DECIMAL for the type of the sub-field inside the composite field 'old' throws an exception; other types work fine.
> Exception:
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Type ARRAY> of table field 'old'
> does not match with the physical type ARRAY LEGACY('DECIMAL', 'DECIMAL')>> of the 'old' field of the TableSource return
> type.
> at
>
> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:164)
> at
>
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:277)
> at
>
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:254)
> at
>
> org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:157)
> at
> org.apache.flink.table.types.logical.ArrayType.accept(ArrayType.java:110)
> at
>
> org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:254)
> at
>
> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:160)
> at
>
> org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:232)
> at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
> at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
> at
>
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
> at
>
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> at
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
> at
>
> org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:214)
> at
>
> org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:192)
> at
>
> org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:112)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212)
> at
>
>

Re: Thoughts on time zone handling for SQL DATE_FORMAT

2020-03-25 by Zhenghua Gao
Hi Jark,

There is indeed a problem here.
The current issue is that Calcite itself does not support TIMESTAMP WITH TIME ZONE.

*Best Regards,*
*Zhenghua Gao*


On Tue, Mar 24, 2020 at 11:00 PM Jark Wu  wrote:

> Thanks for reporting this Weike.
>
> First of all, I think it is problematic that Flink currently returns TIMESTAMP WITHOUT TIME ZONE,
> because the SQL standard (SQL:2011 Part 2 Section 6.32) defines the return type as WITH TIME ZONE.
> The Calcite documentation [1] also says it returns TIMESTAMP WITH TIME ZONE (although the implementation seems to disagree),
> and several other databases behave similarly: MySQL [2], Oracle [3].
>
> Best,
> Jark
>
> [1]: https://calcite.apache.org/docs/reference.html#datetime-functions
> [2]:
>
> https://dev.mysql.com/doc/refman/8.0/en/date-and-time-functions.html#function_current-timestamp
> [3]:
>
> https://docs.oracle.com/cd/B28359_01/server.111/b28286/functions038.htm#SQLRF00629
>
>
>
> On Tue, 24 Mar 2020 at 17:00, DONG, Weike  wrote:
>
> > Hi Zhenghua,
> >
> > Thanks for your reply. Since Flink already provides a time zone setting, this area could perhaps be strengthened further.
> > Users easily forget or omit CONVERT_TZ, so there is still quite a bit of room for improvement here.
> >
> > Best,
> > Weike
> >
> > On Tue, Mar 24, 2020 at 4:20 PM Zhenghua Gao  wrote:
> >
> > > The return type of CURRENT_TIMESTAMP is TIMESTAMP (WITHOUT TIME ZONE);
> > > its semantics correspond to java.time.LocalDateTime.
> > > Its string representation does not change with the time zone (as you observed, it matches UTC+0).
> > >
> > > Your requirement can be met with CONVERT_TZ(timestamp_string, time_zone_from_string,
> > > time_zone_to_string)
> > >
> > > *Best Regards,*
> > > *Zhenghua Gao*
> > >
> > >
> > > On Mon, Mar 23, 2020 at 10:12 PM DONG, Weike 
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I recently noticed that when Flink's Blink planner uses DATE_FORMAT to format CURRENT_TIMESTAMP
> > > > into a string, it does so in UTC+0 by default.
> > > >
> > > > The TableConfig class has long had a setLocalTimeZone method; after setting it to UTC+8, the formatted
> > > > string is still UTC+0. Digging deeper, the code generation logic for time formatting (time.scala) does
> > > > not take the time zone setting into account.
> > > >
> > > > Since most users are not in UTC+0 (GMT/UTC), wouldn't Flink be more user-friendly if time formatting
> > > > and display respected the time zone configured in TableConfig? Of course this would be an incompatible
> > > > change and needs to be handled carefully.
> > > >
> > > > Perhaps providing a DATE_FORMAT_WITH_TIMEZONE built-in function would be easier for the community to accept?
> > > >
> > > > Just some personal thoughts, thanks :)
> > > >
> > >
> >
>


Re: Question about automatic source parallelism inference in Flink SQL 1.10

2020-03-25 by Zhenghua Gao
Hi Chief,

Currently the Hive connector reads data through InputFormatSourceFunction.
InputFormatSourceFunction does not pre-assign splits; instead, each source task requests splits from the master.
If some source tasks are scheduled earlier and consume all the splits, the source tasks scheduled later have nothing left to read.
You can check the JM/TM logs to confirm whether the first ten scheduled source tasks read all of the data.

*Best Regards,*
*Zhenghua Gao*


On Wed, Mar 25, 2020 at 3:31 PM Chief  wrote:

> Hi Jun Zhang,
> I understand what you said; what I don't quite get is why, after the parallelism is automatically inferred from the number of files, each parallel task does not read exactly one file.
>
>
>
>
>
> -- Original message --
> From: "Jun Zhang"  Sent: Wednesday, March 25, 2020, 9:08 AM
> To: "user-zh"
> Subject: Re: Question about automatic source parallelism inference in Flink SQL 1.10
>
>
>
> Hi Chief,
>
>
> Currently, when Flink reads from Hive with automatic inference enabled, the system infers the parallelism from the number of files read; if that does not exceed the maximum parallelism (1000 by default), the source parallelism equals the number of files.
> You can set the maximum source parallelism via table.exec.hive.infer-source-parallelism.max.
>
> Kurt Young wrote:
>  How large is your data set? One possible reason is that by the time the other source subtasks were scheduled, the data had already been read by the subtasks scheduled first.
> 
>  Best,
>  Kurt
> 
> 
>  On Tue, Mar 24, 2020 at 10:39 PM Chief  wrote:
> 
>   Hi all,
>   I queried Hive data with Flink SQL; there are 150 Hive data files, and the parallelism set in the SQL
>   client config file is 10. The source's parallelism was automatically inferred to be 150, but in the web
>   UI only the first ten subtasks read any data while the others show none. Is something wrong with my setup?
> 


Re: Re: Does the Flink JDBC Driver support creating streaming tables?

2020-03-25 by Zhenghua Gao
Please confirm that the Kafka connector jar is under flink/lib.
The current error looks like the Kafka connector jar cannot be found.

*Best Regards,*
*Zhenghua Gao*


On Wed, Mar 25, 2020 at 4:18 PM 赵峰  wrote:

> It's not a syntax issue; creating the table works fine for me as well, and the error occurs on the query. Have you tried querying the data, or writing data into a filesystem table?
>
>
> 
>
> Refer to this document:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
> The following syntax is probably not supported:
>   'format.type' = 'csv',\n" +
> "'format.field-delimiter' = '|'\n"
>
> Below is code that works for me; the data in Kafka needs to be in this format: {"order_no":"abcdefg","status":90}
> tEnv.sqlUpdate("CREATE TABLE pick_order (\n"
> + "order_no VARCHAR,\n"
> + "status INT\n"
> + ") WITH (\n"
> + "'connector.type' = 'kafka',\n"
> + "'connector.version' = 'universal',\n"
> + "'connector.topic' = 'wanglei_test',\n"
> + "'connector.startup-mode' = 'latest-offset',\n"
> + "'connector.properties.0.key' = 'zookeeper.connect',\n"
> + "'connector.properties.0.value' = 'xxx:2181',\n"
> + "'connector.properties.1.key' = 'bootstrap.servers',\n"
> + "'connector.properties.1.value' = 'xxx:9092',\n"
> + "'update-mode' = 'append',\n"
> + "'format.type' = 'json',\n"
> + "'format.derive-schema' = 'true'\n"
> + ")");
>
> 王磊
>
>
> wangl...@geekplus.com.cn
> From: 赵峰
> Sent: 2020-03-24 21:28
> To: user-zh
> Subject: Does the Flink JDBC Driver support creating streaming tables?
> Hi,
>
> Creating a Kafka table with the Flink JDBC Driver fails. Is my table-creation code incorrect? The code is as follows:
> Connection connection =
> DriverManager.getConnection("jdbc:flink://localhost:8083?planner=blink");
> Statement statement = connection.createStatement();
> statement.executeUpdate(
> "CREATE TABLE table_kafka (\n" +
> "user_id BIGINT,\n" +
> "item_id BIGINT,\n" +
> "category_id BIGINT,\n" +
> "behavior STRING,\n" +
> "ts TIMESTAMP(3),\n" +
> "proctime as PROCTIME(),\n" +
> "WATERMARK FOR ts as ts - INTERVAL '5' SECOND\n" +
> ") WITH (\n" +
> "'connector.type' = 'kafka', \n" +
> "'connector.version' = 'universal', \n" +
> "'connector.topic' = 'flink_im02', \n" +
> "'connector.properties.group.id' = 'flink_im02_new',\n" +
> "'connector.startup-mode' = 'earliest-offset', \n" +
> "'connector.properties.zookeeper.connect' = '*.*.*.*:2181', \n" +
> "'connector.properties.bootstrap.servers' = '*.*.*.*:9092', \n" +
> "'format.type' = 'csv',\n" +
> "'format.field-delimiter' = '|'\n" +
> ")");
> ResultSet rs1 = statement.executeQuery("SELECT * FROM table_kafka");
> while (rs1.next()) {
> System.out.println(rs1.getInt(1) + ", " + rs1.getInt(2));
> }
> statement.close();
> connection.close();
> Error:
> Reason: Required context properties mismatch.
> The matching candidates:
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> Mismatched properties:
> 'connector.type' expects 'filesystem', but is 'kafka'
> 赵峰
>
> 
> Quoted from:
> http://apache-flink.147419.n8.nabble.com/Flink-JDBC-Driver-tp2103p2104.html
>
>
>
>
> 赵峰


Re: Thoughts on time zone handling for SQL DATE_FORMAT

2020-03-24 by Zhenghua Gao
The return type of CURRENT_TIMESTAMP is TIMESTAMP (WITHOUT TIME ZONE);
its semantics correspond to java.time.LocalDateTime.
Its string representation does not change with the time zone (as you observed, it matches UTC+0).

Your requirement can be met with CONVERT_TZ(timestamp_string, time_zone_from_string,
time_zone_to_string)
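
For illustration, a minimal sketch of this approach ('Asia/Shanghai' stands in for the target time zone, and MyTable is a placeholder source table):

  SELECT CONVERT_TZ(
    DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss'),
    'UTC',
    'Asia/Shanghai'
  ) AS local_time_string
  FROM MyTable;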

*Best Regards,*
*Zhenghua Gao*


On Mon, Mar 23, 2020 at 10:12 PM DONG, Weike 
wrote:

> Hi,
>
> I recently noticed that when Flink's Blink planner uses DATE_FORMAT to format CURRENT_TIMESTAMP into a string,
> it does so in UTC+0 by default.
>
> The TableConfig class has long had a setLocalTimeZone method; after setting it to UTC+8, the formatted string is still UTC+0.
> Digging deeper, the code generation logic for time formatting (time.scala) does not take the time zone setting into account.
>
> Since most users are not in UTC+0 (GMT/UTC), wouldn't Flink be more user-friendly if time formatting and display respected
> the time zone configured in TableConfig? Of course this would be an incompatible change and needs to be handled carefully.
>
> Perhaps providing a DATE_FORMAT_WITH_TIMEZONE built-in function would be easier for the community to accept?
>
> Just some personal thoughts, thanks :)
>


Re: How to handle timestamps in Flink SQL

2020-03-23 by Zhenghua Gao
The Kafka source you defined uses JsonRowDeserializationSchema to parse the JSON strings and convert them into Flink types [1].
Currently JsonRowDeserializationSchema only supports RFC 3339-compatible time strings [2].

[1]
https://github.com/apache/flink/blob/master/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java#L446
[2]
https://github.com/apache/flink/blob/master/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/TimeFormats.java#L38
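
Concretely, for the schema below to parse the timestamp field as TIMESTAMP(3), the JSON value would have to be an RFC 3339-compatible string rather than epoch milliseconds, e.g. (an illustrative record, not a conversion of your data):

  {"id":5,"price":40,"timestamp":"2020-03-23T05:50:26.828Z","type":"math"}

Alternatively, if the producer cannot be changed, the field can be declared as BIGINT and converted to a timestamp later in the query.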

*Best Regards,*
*Zhenghua Gao*


On Mon, Mar 23, 2020 at 4:27 PM 吴志勇 <1154365...@qq.com> wrote:

> As the subject says:
> I wrote JSON-formatted data into Kafka:
> {"id":5,"price":40,"timestamp":1584942626828,"type":"math"}
> {"id":2,"price":70,"timestamp":1584942629638,"type":"math"}
> {"id":2,"price":70,"timestamp":1584942634951,"type":"math"}
>
> 
> The timestamp field is a 13-digit epoch timestamp in milliseconds; how should it be turned into a time type in the corresponding SQL table?
> -name:bookpojo
> type:source-table
> connector:
> property-version:1
> type:kafka
> version:"universal"
> topic:pojosource
> startup-mode:earliest-offset
> properties:
>
> zookeeper.connect:localhost:2181
>
> bootstrap.servers:localhost:9092
> group.id:testGroup
> format:
> property-version:1
> type:json
>
> schema:"ROW schema:
> -name:id
> data-type:INT
> -name:type
> data-type:STRING
> -name:price
> data-type:INT
> -name:timestamp
>
> data-type:TIMESTAMP(3)
>
>
>
>
> The configuration above seems to have a problem.
>
>
> I found this note in the official docs:
> String and time types: values are not trimmed. The literal "null" is also understood. Time types must be formatted according to the
> Java SQL time format, with millisecond precision. For example: 2018-01-01 for a date, 20:43:59 for a time, and 2018-01-01
> 20:43:59.999 for a timestamp.
> Does the time really have to be a string type with milliseconds?
>
>
> Thanks.


Re: How to set a TTL in code for checkpointed state

2020-03-12 by Zhenghua Gao
DataStream users can set a TTL directly on the StateDescriptor [1];
see the code in that section of the documentation.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/state.html#state-time-to-live-ttl
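
A minimal sketch, mirroring the example in [1] (the one-day TTL and the state name are placeholders); the TTL is configured on the descriptor and applies regardless of the backend, RocksDBStateBackend included:

  import org.apache.flink.api.common.state.StateTtlConfig;
  import org.apache.flink.api.common.state.ValueStateDescriptor;
  import org.apache.flink.api.common.time.Time;

  StateTtlConfig ttlConfig = StateTtlConfig
      .newBuilder(Time.days(1))                                   // time-to-live of the state
      .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)  // refresh TTL on create and write
      .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
      .build();

  ValueStateDescriptor<String> descriptor =
      new ValueStateDescriptor<>("my-state", String.class);
  descriptor.enableTimeToLive(ttlConfig);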

*Best Regards,*
*Zhenghua Gao*


On Thu, Mar 12, 2020 at 9:25 PM 潘明文  wrote:

> Hello,
> How do I set a TTL for checkpointed state based on RocksDBStateBackend?


Re: Question about the TIMESTAMPDIFF examples in the Flink SQL docs

2019-12-16 by Zhenghua Gao
1) Screenshots sent directly to the mailing list are not displayed; you can upload to a third-party image host and link to it.
2) Please confirm whether time1/time2 are of type TIMESTAMP.
3) TIMESTAMP '2003-01-02 10:00:00' in the docs is a standard SQL timestamp literal; your
TIMESTAMP time1 cannot be treated as a timestamp literal (see the example below).
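
To illustrate the difference described in 3) (MyTable here is a hypothetical table with TIMESTAMP columns time1/time2):

  -- TIMESTAMP '...' is a literal: only a constant string may follow the keyword
  SELECT TIMESTAMPDIFF(DAY, TIMESTAMP '2003-01-02 10:00:00', TIMESTAMP '2003-01-03 10:00:00') FROM MyTable;

  -- for columns, cast the values instead (as in your working query)
  SELECT TIMESTAMPDIFF(DAY, CAST(time1 AS TIMESTAMP), CAST(time2 AS TIMESTAMP)) FROM MyTable;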

*Best Regards,*
*Zhenghua Gao*


On Mon, Dec 16, 2019 at 3:51 PM 1530130567 <1530130...@qq.com> wrote:

> Hi all,
> My business logic needs to compute the difference between two dates, so as usual I opened the docs, looked at the built-in functions, did Ctrl-C/Ctrl-V, and it ran fine!
>
>   Then I changed it to:
>   TIMESTAMPDIFF(DAY, TIMESTAMP time1, TIMESTAMP time2)
>   and SQL validation failed!
>   Then I changed it again:
>   TIMESTAMPDIFF(DAY, cast(time1 as timestamp), cast(time2 as timestamp))
>   and it passed!
>
>   I'm a bit confused: does TIMESTAMP only work with a fixed string, and not with a variable such as a column name?
>
>


Re: How to ensure high availability for Flink on YARN in per-job mode?

2019-08-13 by Zhenghua Gao
JM is restarted by YARN on failure [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/jobmanager_high_availability.html#yarn-cluster-high-availability
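
As a rough sketch of the flink-conf.yaml settings described in [1] (the ZooKeeper hosts and paths below are placeholders):

  high-availability: zookeeper
  high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
  high-availability.storageDir: hdfs:///flink/ha/
  high-availability.zookeeper.path.root: /flink
  yarn.application-attempts: 10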

*Best Regards,*
*Zhenghua Gao*


On Tue, Aug 13, 2019 at 4:51 PM 陈帅  wrote:

> A question: with Flink on YARN in per-job submission mode, how do I ensure high availability?
>


Re: How to obtain the code generated by the Flink Table API/SQL

2019-08-08 by Zhenghua Gao
Currently Flink does not provide a direct way to get the generated code, but there are
indirect ways to try.

1) Debug in an IDE
Flink uses Janino to compile all generated code, and there is a single entry point:
[1] for the Blink planner, [2] for the old planner. You can set a breakpoint there and
inspect the code.

2) Enable debug logging
The Blink planner logs the generated code in CompileUtils, and the old planner logs it
in subclasses of Compiler.

3) Use Janino options
Janino caches code in a tmp directory, and you can enable these options [3].
Note: org.codehaus.janino.source_debugging.keep is not supported in the current Janino
version, which means this method can only be used while debugging in an IDE (a
breakpoint is needed to keep the source code).

[1]
https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/CompileUtils.java
[2]
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/Compiler.scala
[3]
https://github.com/janino-compiler/janino/blob/master/janino/src/main/java/org/codehaus/janino/Scanner.java#L71

*Best Regards,*
*Zhenghua Gao*


On Wed, Aug 7, 2019 at 12:02 AM Vincent Cai  wrote:

> Hi all,
> 在Spark中,可以通过调用Dataset的queryExecution.debug.codegen() 方法获得 Catalyst 产生的代码。
> 在Flink是否有类似的方法可以获得code gen的代码?
>
>
> Reference:
> https://medium.com/virtuslab/spark-sql-under-the-hood-part-i-26077f85ebf0
>
>
> Regards
> Vincent  Cai


Re: Flink SQL join question

2019-08-08 by Zhenghua Gao
You could try whether the firstRow/lastRow optimization [1] in the latest Flink 1.9 Blink
planner meets your needs; the current limitation is that deduplication can only be based on proctime.

* e.g.
* 1. {{{
* SELECT a, b, c FROM (
*   SELECT a, b, c, proctime,
*  ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime ASC) as row_num
*   FROM MyTable
* ) WHERE row_num <= 1
* }}} will be converted to StreamExecDeduplicate which keeps first row.
* 2. {{{
* SELECT a, b, c FROM (
*   SELECT a, b, c, proctime,
*  ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime DESC) as row_num
*   FROM MyTable
* ) WHERE row_num <= 1
* }}} will be converted to StreamExecDeduplicate which keeps last row.


[1]
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecDeduplicateRule.scala

*Best Regards,*
*Zhenghua Gao*


On Tue, Aug 6, 2019 at 2:28 PM huang  wrote:

> Hi all,
>
>
> When doing a dual-stream join with Flink SQL, if I want both streams to keep only the latest record per key,
> so that each join only emits the latest record, is this scenario supported in SQL?
>
>
> thanks


Re: [Flink SQL] Cannot start with env.yaml

2019-04-01 by Zhenghua Gao
format and schema should be at the same level.
See the configuration of TableNumber1 in the flink-sql-client tests: test-sql-client-defaults.yaml
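
For illustration, a sketch of the table definition from this thread with format and schema as siblings (based on the config quoted below; not verified against your environment):

  tables:
    - name: MyTableSource
      type: source-table
      update-mode: append
      connector:
        type: filesystem
        path: "/home/hadoop/flink_test/input.csv"
      format:
        type: csv
        fields:
          - name: MyField1
            type: INT
          - name: MyField2
            type: VARCHAR
        line-delimiter: "\n"
        comment-prefix: "#"
      schema:
        - name: MyField1
          type: INT
        - name: MyField2
          type: VARCHAR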

*Best Regards,*
*Zhenghua Gao*


On Mon, Apr 1, 2019 at 4:09 PM 曾晓勇 <469663...@qq.com> wrote:

> @1543332...@qq.com
> Thanks. I checked the formatting issue afterwards and have already fixed it. Using the latest version downloaded directly from the
> Flink website, I still get an error on startup; the details are below. Right now I want to see whether a few of our production
> scripts can be replaced directly with Flink SQL rather than going through Java programming, since changing the code would be a lot of work.
>  Flink version: flink-1.7.2-bin-hadoop28-scala_2.11
>  Launch command: /home/hadoop/flink-1.7.2/bin/sql-client.sh embedded -e
> /home/hadoop/flink_test/env.yaml
>
>
> [hadoop@server2 bin]$ /home/hadoop/flink-1.7.2/bin/sql-client.sh embedded
> -e /home/hadoop/flink_test/env.yaml
> Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was
> set.
> No default environment specified.
> Searching for
> '/home/hadoop/flink-1.7.2/conf/sql-client-defaults.yaml'...found.
> Reading default environment from:
> file:/home/hadoop/flink-1.7.2/conf/sql-client-defaults.yaml
> Reading session environment from: file:/home/hadoop/flink_test/env.yaml
> Validating current environment...
>
>
> Exception in thread "main"
> org.apache.flink.table.client.SqlClientException: The configured
> environment is invalid. Please check your environment files again.
> at
> org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:140)
> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:99)
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:187)
> Caused by: org.apache.flink.table.client.gateway.SqlExecutionException:
> Could not create execution context.
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:488)
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.validateSession(LocalExecutor.java:316)
> at
> org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:137)
> ... 2 more
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> Could not find a suitable table factory for
> 'org.apache.flink.table.factories.StreamTableSourceFactory' in
> the classpath.
>
>
> Reason:
> The matching factory
> 'org.apache.flink.table.sources.CsvAppendTableSourceFactory' doesn't
> support 'format.schema.#.type'.
>
>
> Supported properties of this factory are:
> connector.path
> connector.path
> format.comment-prefix
> format.field-delimiter
> format.fields.#.name
> format.fields.#.type
> format.ignore-first-line
> format.ignore-parse-errors
> format.line-delimiter
> format.quote-character
> schema.#.name
> schema.#.type
>
>
> The following properties are requested:
> connector.path=/home/hadoop/flink_test/input.csv
> connector.type=filesystem
> format.comment-prefix=#
> format.fields.0.name=MyField1
> format.fields.0.type=INT
> format.fields.1.name=MyField2
> format.fields.1.type=VARCHAR
> format.line-delimiter=\n
> format.schema.0.name=MyField1
> format.schema.0.type=INT
> format.schema.1.name=MyField2
> format.schema.1.type=VARCHAR
> format.type=csv
> update-mode=append
>
>
> The following factories have been considered:
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
> org.apache.flink.table.sinks.CsvAppendTableSinkFactory
>
>
> at
> org.apache.flink.table.factories.TableFactoryService$.filterBySupportedProperties(TableFactoryService.scala:277)
> at
> org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:136)
> at
> org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:100)
> at
> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.scala)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:236)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$new$0(ExecutionContext.java:121)
> at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:119)
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:484)
> ... 4 more
> [hadoop@server2 bin]$
>
>
>
>
>
>
>
>
>
>
>
> -- The configuration after adjustment:
> tables:
>  - name: MyTableSource
>type: source-table
>update-mode: append
>connector:
>  type: filesystem
>  path: "/home/hadoop/flink_test/input.csv"
>format:
> type: csv
>

Re: [Flink SQL] Cannot start with env.yaml

2019-04-01 by Zhenghua Gao
The YAML format is invalid; it looks like an indentation problem.
You can verify it with an online YAML editor, for example [1].
For more on the YAML format, see [2][3].

[1] http://nodeca.github.io/js-yaml/
[2] http://www.ruanyifeng.com/blog/2016/07/yaml.html
[3] https://en.wikipedia.org/wiki/YAML

*Best Regards,*
*Zhenghua Gao*


On Mon, Apr 1, 2019 at 11:51 AM 曾晓勇 <469663...@qq.com> wrote:

> Hi all,
>
> Today, while testing, Flink SQL would not start; the error log is below. What do I need to pay attention to in the format of the YAML config file, and can the delimiter be a special character such as
> the '\036' delimiter used in Hive table DDL? The detailed error log follows.
>
> [root@server2 bin]# /home/hadoop/flink-1.7.2/bin/sql-client.sh embedded
> -e /home/hadoop/flink_test/env.yaml
> Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was
> set.
> No default environment specified.
> Searching for
> '/home/hadoop/flink-1.7.2/conf/sql-client-defaults.yaml'...found.
> Reading default environment from:
> file:/home/hadoop/flink-1.7.2/conf/sql-client-defaults.yaml
> Reading session environment from: file:/home/hadoop/flink_test/env.yaml
>
>
> Exception in thread "main"
> org.apache.flink.table.client.SqlClientException: Could not parse
> environment file. Cause: YAML decoding problem: while parsing a block
> collection
>  in 'reader', line 2, column 2:
>  - name: MyTableSource
>  ^
expected <block end>, but found BlockMappingStart
>  in 'reader', line 17, column 3:
>   schema:
>   ^
>  (through reference chain:
> org.apache.flink.table.client.config.Environment["tables"])
> at
> org.apache.flink.table.client.config.Environment.parse(Environment.java:146)
> at
> org.apache.flink.table.client.SqlClient.readSessionEnvironment(SqlClient.java:162)
> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:90)
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:187)
>
>
>
>
> -- config file env.yaml
> tables:
>  - name: MyTableSource
>type: source-table
>update-mode: append
>connector:
>  type: filesystem
>  path: "/home/hadoop/flink_test/input.csv"
>format:
> type: csv
> fields:
> - name: MyField1
>   type: INT
> - name: MyField2
>   type: VARCHAR
> line-delimiter: "\n"
> comment-prefix: "#"
>   schema:
> - name: MyField1
> type: INT
> - name: MyField2
> type: VARCHAR
>  - name: MyCustomView
>type: view
>query: "SELECT MyField2 FROM MyTableSource"
> # Execution properties allow for changing the behavior of a table program.
> execution:
>  type: streaming # required: execution mode either 'batch' or 'streaming'
>  result-mode: table # required: either 'table' or 'changelog'
>  max-table-result-rows: 100 # optional: maximum number of maintained
> rows in
>  # 'table' mode (100 by default, smaller 1 means unlimited)
>  time-characteristic: event-time # optional: 'processing-time' or
> 'event-time' (default)
>  parallelism: 1 # optional: Flink's parallelism (1 by default)
>  periodic-watermarks-interval: 200 # optional: interval for periodic
> watermarks(200 ms by default)
>  max-parallelism: 16 # optional: Flink's maximum parallelism (128by
> default)
>  min-idle-state-retention: 0 # optional: table program's minimum idle
> state time
>  max-idle-state-retention: 0 # optional: table program's maximum idle
> state time
>  restart-strategy: # optional: restart strategy
>type: fallback # "fallback" to global restart strategy by
> default
> # Deployment properties allow for describing the cluster to which table
> programsare submitted to.
> deployment:
>   response-timeout: 5000
>
>


Re: Flink Table API sorting by column: can it only sort by one column, not by multiple columns?

2019-03-18 by Zhenghua Gao
Try:

xx.orderBy('id.desc, 'value1.asc)


*Best Regards,*
*Zhenghua Gao*


On Sat, Mar 16, 2019 at 10:28 AM 刘 文 
wrote:

>
> The output is only sorted by id in descending order, not by value1 in ascending order.
>
>
>
>
>
>
>
> package
> com.opensourceteams.module.bigdata.flink.example.tableapi.operation.orderBy
>
> import org.apache.flink.api.scala.{ExecutionEnvironment, _}
> import org.apache.flink.table.api.TableEnvironment
> import org.apache.flink.table.api.scala._
>
> object Run {
>
>
>   def main(args: Array[String]): Unit = {
>
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tableEnv = TableEnvironment.getTableEnvironment(env)
>
> env.setParallelism(1)
>
> val dataSet = env.fromElements( (1,"a",10),(2,"b",20)
> ,(20,"f",200),(3,"c",30) )
>
>
>
> // convert the DataSet into a Table
> val table = tableEnv.fromDataSet(dataSet)
>
> // register the Table
> tableEnv.registerTable("user1",table)
>
>
> // query all data of the table
> tableEnv.scan("user1").as('id,'name,'value1)
>   //.orderBy('id.asc)  // sort by the id column in ascending order (note: sorting happens per partition)
>   .orderBy('id.desc)
>   .orderBy('value1.asc)
>
>   .first(1000)
>
>   // print the output (acts as the sink)
>   .print()
>
>
> /**
>   * output:
>   *
>   * 20,f,200
>   * 3,c,30
>   * 2,b,20
>   * 1,a,10
>   */
>
>
>
>   }
>
> }
>
>


Re: [Blink] SQL client Kafka sink failure

2019-02-26 by Zhenghua Gao
>
> >>
> /bigdata/flink-1.5.1/opt/connectors/kafka09/flink-connector-kafka-0.9_2.11-1.5.1-sql-jar.jar
> >> > --jar
> >> >
> >>
> /bigdata/flink-1.5.1/opt/connectors/kafka08/flink-connector-kafka-0.8_2.11-1.5.1.jar
> >> > --jar /bigdata/flink-1.5.1/opt/connectors/flink-hbase_2.11-1.5.1.jar
> >> --jar
> >> >
> >>
> /bigdata/flink-1.5.1/opt/connectors/flink-connector-hadoop-compatibility_2.11-1.5.1.jar
> >> > --jar
> >> >
> /bigdata/flink-1.5.1/opt/connectors/flink-connector-hive_2.11-1.5.1.jar
> >> > --jar /bigdata/flink-1.5.1/opt/sql-client/flink-sql-client-1.5.1.jar
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> > On 2019-02-22 19:32:18, "Becket Qin" wrote:
> >> > > Could you check the arguments sql-client.sh is running with? Specifically:
> >> > >
> >> > > run sql-client.sh
> >> > > ps | grep sql-client
> >> > >
> >> > > and check whether the flink-connector-kafka-0.11 jar is among them.
> >> > >
> >> > >Jiangjie (Becket) Qin
> >> > >
> >> > >On Fri, Feb 22, 2019 at 6:54 PM 张洪涛  wrote:
> >> > >
> >> > >>
> >> > >>
> >> > >> It does contain this class:
> >> > >>
> >> > >>
> >> > >> jar -tf flink-connector-kafka-0.11_2.11-*.jar | grep Crc32C
> >> > >>
> >> > >>
> >> >
> >>
> org/apache/flink/kafka011/shaded/org/apache/kafka/common/utils/Crc32C$1.class
> >> > >>
> >> > >>
> >> >
> >>
> org/apache/flink/kafka011/shaded/org/apache/kafka/common/utils/Crc32C$ChecksumFactory.class
> >> > >>
> >> > >>
> >> >
> >>
> org/apache/flink/kafka011/shaded/org/apache/kafka/common/utils/Crc32C$Java9ChecksumFactory.class
> >> > >>
> >> > >>
> >> >
> >>
> org/apache/flink/kafka011/shaded/org/apache/kafka/common/utils/Crc32C$PureJavaChecksumFactory.class
> >> > >>
> >> >
> >>
> org/apache/flink/kafka011/shaded/org/apache/kafka/common/utils/Crc32C.class
> >> > >>
> >> > >>
> >> >
> >>
> org/apache/flink/kafka011/shaded/org/apache/kafka/common/utils/PureJavaCrc32C.class
> >> > >>
> >> > >>
> >> > >>
> >> > >>
> >> > >>
> >> > >>
> >> > >> On 2019-02-22 18:03:18, "Zhenghua Gao" wrote:
> >> > >> > Could you check whether the corresponding jar contains this class? For example (assuming your Blink distribution is under /tmp/blink):
> >> > >> >
> >> > >> >cd /tmp/blink/opt/connectors/kafka011
> >> > >> >jar -tf flink-connector-kafka-0.11_2.11-*.jar | grep Crc32C
> >> > >> >
> >> > >> >On Fri, Feb 22, 2019 at 2:56 PM 张洪涛  wrote:
> >> > >> >
> >> > >> >>
> >> > >> >>
> >> > >> >> Hi all!
> >> > >> >>
> >> > >> >>
> >> > >> >> I am testing the Blink SQL client Kafka sink connector, but writes fail. Here are my steps.
> >> > >> >>
> >> > >> >>
> >> > >> >> Environment:
> >> > >> >> Blink standalone mode
> >> > >> >>
> >> > >> >>
> >> > >> >>
> >> > >> >>
> >> > >> >> 1. Configure the environment and start the SQL client
> >> > >> >>
> >> > >> >>
> >> > >> >> 2. Create the Kafka sink table
> >> > >> >> CREATE TABLE kafka_sink (
> >> > >> >>   messageKey VARBINARY,
> >> > >> >>   messageValue VARBINARY,
> >> > >> >>   PRIMARY KEY (messageKey))
> >> > >> >> with (
> >> > >> >>   type = 'KAFKA011',
> >> > >> >>   topic = 'sink-topic',
> >> > >> >>   `bootstrap.servers` = '172.19.0.108:9092',
> >> > >> >>   retries = '3'
> >> > >> >> );
> >> > >> >>
> >> > >> >>
> >> > >> >> 3. 创建查询语句并执行
> >> > >> >> INSERT INTO kafka_sink
> >> > >> >> SELECT CAST('123' AS VARBINARY) AS key,
> >

Re: Re: Re: [Blink] SQL client Kafka sink failure

2019-02-24 by Zhenghua Gao
Please confirm that the standalone cluster and the SQL client are using the same flink/blink distribution.
As I recall, a mismatch between the two can cause some strange problems.


On Mon, Feb 25, 2019 at 9:56 AM 张洪涛  wrote:

>
>
> The sql-client.sh launch arguments include the Kafka-related jars on the classpath, and in addition --jar
> includes all the connector jars.
>
>
> These jars are uploaded to the cluster's blob store when the SQL client submits the job, so it is odd that they cannot be found.
>
>
>  00:00:06 /usr/lib/jvm/java-1.8.0-openjdk/bin/java
> -Dlog.file=/bigdata/flink-1.5.1/log/flink-root-sql-client-gpu06.log
> -Dlog4j.configuration=file:/bigdata/flink-1.5.1/conf/log4j-cli.properties
> -Dlogback.configurationFile=file:/bigdata/flink-1.5.1/conf/logback.xml
> -classpath
> /bigdata/flink-1.5.1/lib/flink-python_2.11-1.5.1.jar:/bigdata/flink-1.5.1/lib/flink-shaded-hadoop2-uber-1.5.1.jar:/bigdata/flink-1.5.1/lib/log4j-1.2.17.jar:/bigdata/flink-1.5.1/lib/slf4j-log4j12-1.7.7.jar:/bigdata/flink-1.5.1/lib/flink-dist_2.11-1.5.1.jar::/bigdata/hadoop-2.7.5/etc/hadoop::/bigdata/flink-1.5.1/opt/connectors/kafka011/flink-connector-kafka-0.11_2.11-1.5.1-sql-jar.jar:/bigdata/flink-1.5.1/opt/connectors/kafka010/flink-connector-kafka-0.10_2.11-1.5.1-sql-jar.jar:/bigdata/flink-1.5.1/opt/connectors/kafka09/flink-connector-kafka-0.9_2.11-1.5.1-sql-jar.jar:/bigdata/flink-1.5.1/opt/connectors/kafka08/flink-connector-kafka-0.8_2.11-1.5.1.jar:/bigdata/flink-1.5.1/opt/connectors/flink-hbase_2.11-1.5.1.jar:/bigdata/flink-1.5.1/opt/connectors/flink-connector-hadoop-compatibility_2.11-1.5.1.jar:/bigdata/flink-1.5.1/opt/connectors/flink-connector-hive_2.11-1.5.1.jar:/bigdata/flink-1.5.1/opt/sql-client/datanucleus-api-jdo-4.2.4.jar:/bigdata/flink-1.5.1/opt/sql-client/javax.jdo-3.2.0-m3.jar:/bigdata/flink-1.5.1/opt/sql-client/datanucleus-core-4.1.17.jar:/bigdata/flink-1.5.1/opt/sql-client/datanucleus-rdbms-4.1.19.jar:/bigdata/flink-1.5.1/opt/sql-client/flink-sql-client-1.5.1.jar
> org.apache.flink.table.client.SqlClient embedded -d
> conf/sql-client-defaults.yaml --jar
> /bigdata/flink-1.5.1/opt/connectors/kafka011/flink-connector-kafka-0.11_2.11-1.5.1-sql-jar.jar
> --jar
> /bigdata/flink-1.5.1/opt/connectors/kafka010/flink-connector-kafka-0.10_2.11-1.5.1-sql-jar.jar
> --jar
> /bigdata/flink-1.5.1/opt/connectors/kafka09/flink-connector-kafka-0.9_2.11-1.5.1-sql-jar.jar
> --jar
> /bigdata/flink-1.5.1/opt/connectors/kafka08/flink-connector-kafka-0.8_2.11-1.5.1.jar
> --jar /bigdata/flink-1.5.1/opt/connectors/flink-hbase_2.11-1.5.1.jar --jar
> /bigdata/flink-1.5.1/opt/connectors/flink-connector-hadoop-compatibility_2.11-1.5.1.jar
> --jar
> /bigdata/flink-1.5.1/opt/connectors/flink-connector-hive_2.11-1.5.1.jar
> --jar /bigdata/flink-1.5.1/opt/sql-client/flink-sql-client-1.5.1.jar
>
>
>
>
>
>
> On 2019-02-22 19:32:18, "Becket Qin" wrote:
> > Could you check the arguments sql-client.sh is running with? Specifically:
> >
> > run sql-client.sh
> > ps | grep sql-client
> >
> > and check whether the flink-connector-kafka-0.11 jar is among them.
> >
> >Jiangjie (Becket) Qin
> >
> >On Fri, Feb 22, 2019 at 6:54 PM 张洪涛  wrote:
> >
> >>
> >>
> >> It does contain this class:
> >>
> >>
> >> jar -tf flink-connector-kafka-0.11_2.11-*.jar | grep Crc32C
> >>
> >>
> org/apache/flink/kafka011/shaded/org/apache/kafka/common/utils/Crc32C$1.class
> >>
> >>
> org/apache/flink/kafka011/shaded/org/apache/kafka/common/utils/Crc32C$ChecksumFactory.class
> >>
> >>
> org/apache/flink/kafka011/shaded/org/apache/kafka/common/utils/Crc32C$Java9ChecksumFactory.class
> >>
> >>
> org/apache/flink/kafka011/shaded/org/apache/kafka/common/utils/Crc32C$PureJavaChecksumFactory.class
> >>
> org/apache/flink/kafka011/shaded/org/apache/kafka/common/utils/Crc32C.class
> >>
> >>
> org/apache/flink/kafka011/shaded/org/apache/kafka/common/utils/PureJavaCrc32C.class
> >>
> >>
> >>
> >>
> >>
> >>
> >> On 2019-02-22 18:03:18, "Zhenghua Gao" wrote:
> >> > Could you check whether the corresponding jar contains this class? For example (assuming your Blink distribution is under /tmp/blink):
> >> >
> >> >cd /tmp/blink/opt/connectors/kafka011
> >> >jar -tf flink-connector-kafka-0.11_2.11-*.jar | grep Crc32C
> >> >
> >> >On Fri, Feb 22, 2019 at 2:56 PM 张洪涛  wrote:
> >> >
> >> >>
> >> >>
> >> >> Hi all!
> >> >>
> >> >>
> >> >> I am testing the Blink SQL client Kafka sink connector, but writes fail. Here are my steps.
> >> >>
> >> >>
> >> >> Environment:
> >> >> Blink standalone mode
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> 1. Configure the environment and start the SQL client
> >> >>
> >> >>
> >> >> 2. Create the Kafka sink table
> >> >> CREATE TABLE kafka_sink (
> >>