Re: JDBC table api questions

2020-04-23 Thread Zhenghua Gao
FLINK-16471 introduce a JDBCCatalog, which implements Catalog interface.
Currently we only support PostgresCatalog and listTables().
If you want to get the list of views, you can implement listViews()
(currently return an empty list).

*Best Regards,*
*Zhenghua Gao*


On Thu, Apr 23, 2020 at 8:48 PM Flavio Pompermaier 
wrote:

> Hi all,
> is there a way to get the list of existing views in a JDBC database?
> Is this something that could be supported somehow?
>
> Moreover, it would be interesting for us to also know the original field
> type of a table..is there a way to get it (without implementing a dedicated
> API)? Do you think it makes sense to expose it in the Table API?
>
> Best,
> Flavio
>


Re: flink-1.10-sql 维表问题

2020-04-15 Thread Zhenghua Gao
JDBC connector 支持作为维表,DDL无需特殊字段指定。部分可选的参数可以控制temporary join行为[1]。
用作维表join时,需要使用特殊的join语法 [2]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#jdbc-connector
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#joins

*Best Regards,*
*Zhenghua Gao*


On Wed, Apr 15, 2020 at 7:48 PM guaishushu1...@163.com <
guaishushu1...@163.com> wrote:

> hi 大家
> 想问下flink-1.10-sql支持维表DDL吗,看社区文档好像mysql和hbase支持,但是需要什么字段显示声明为创建的表是维表呀?
>
>
>
> guaishushu1...@163.com
>


Re: JDBCLookupFunction被缓存导致数据的不及时性

2020-04-14 Thread Zhenghua Gao
有两个参数可以控制cache大小和cache失效时间 [1],你可以在性能和准确性上做权衡

  -- lookup options, optional, used in temporary join
  'connector.lookup.cache.max-rows' = '5000', -- optional, max number
of rows of lookup cache, over this value, the oldest rows will
  -- be eliminated.
"cache.max-rows" and "cache.ttl" options must all be specified if any
  -- of them is specified.
Cache is not enabled as default.
  'connector.lookup.cache.ttl' = '10s', -- optional, the max time to
live for each rows in lookup cache, over this time, the oldest rows
-- will be expired.
"cache.max-rows" and "cache.ttl" options must all be specified if any
of
-- them is specified. Cache is
not enabled as default.
  'connector.lookup.max-retries' = '3', -- optional, max retry times
if lookup database failed

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#jdbc-connector

*Best Regards,*
*Zhenghua Gao*


On Wed, Apr 15, 2020 at 11:28 AM Dino Zhang 
wrote:

> 可以考虑调小cache.ttl
>
> On Wed, Apr 15, 2020 at 11:22 AM tingli ke  wrote:
>
> > 是否有其他的方式来对mysql维表数据进行实时join
> >
> >
> > 13122260...@163.com <13122260...@163.com> 于2020年4月15日周三 上午11:08写道:
> >
> > > 有个setCacheMaxSize(1000),可以改成 -1 表示不使用cache
> > > org.apache.flink.api.java.io.jdbc.JDBCLookupFunction 这个方法有解释
> > > The cacheMaxSize is -1 means not use cache
> > >
> > >
> > >
> > > 13122260...@163.com
> > >
> > > 发件人: tingli ke
> > > 发送时间: 2020-04-15 10:55
> > > 收件人: user-zh
> > > 主题: JDBCLookupFunction被缓存导致数据的不及时性
> > > Hi,
> > >
> > >
> >
> 流表通过JDBCLookupFunction来对mysql的维表数据进行实时join,但是JDBCLookupFunction会对数据进行缓存,导致mysql的维表数据被更新,但是flink还是老的数据,(考虑性能需要被缓存)
> > > 是否有其他的方式来对mysql维表数据进行实时join
> > >
> >
>
>
> --
> Regards,
> DinoZhang
>


Re: Registering UDAF in blink batch app

2020-04-14 Thread Zhenghua Gao
`StreamTableEnvironment.create()` yields a `StreamTableEnvironmentImpl`
object,
which has several `registerFunction` interface for
ScalarFunction/TableFunction/AggregateFunction/TableAggregateFunction.

`TableEnvironment.create()` yields a `TableEnvironmentImpl` object, which
is a unify entry point for Table/SQL programs.
And it only has a deprecated `registerFunction` interface for
ScalarFunction.  You should use `createTemporarySystemFunction` instead.

A workaround for batch mode of blink planner is: You can use the public
constructor of `StreamTableEnvironmentImpl` to create
the TableEnvironment and use `registerFunction`s. Pls make sure you pass in
the correct `isStreamingMode = false`

*Best Regards,*
*Zhenghua Gao*


On Tue, Apr 14, 2020 at 5:58 PM Dmytro Dragan 
wrote:

> Hi All,
>
>
>
> Could you please tell how to register custom Aggregation function in blink
> batch app?
>
> In case of streaming mode:
>
> We create
>
> EnvironmentSettings bsSettings = 
> EnvironmentSettings.*newInstance*().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment tableEnv = StreamTableEnvironment.*create*(env, 
> bsSettings);
>
>
>
> Which has:
>
>  void registerFunction(String name, AggregateFunction
> aggregateFunction);
>
>
>
> But in case of batchMode, we need to create TableEnvironment:
>
>
>
> EnvironmentSettings bsSettings = 
> EnvironmentSettings.*newInstance*().useBlinkPlanner().inBatchMode().build();
> tEnv = TableEnvironment.*create*(bsSettings);
>
>
>
> Which does not have this function to register AggregationFunction, only
> Scalar one.
>
>
>
> Details: Flink 1.10, Java API
>
>
>
>
>


Re: Flink 1.10 JSON 解析

2020-03-26 Thread Zhenghua Gao
Hi 张宇

看起来是TypeMappingUtils中校验字段物理类型和逻辑类型的bug。
开了一个issue: https://issues.apache.org/jira/browse/FLINK-16800

*Best Regards,*
*Zhenghua Gao*


On Fri, Mar 20, 2020 at 5:28 PM 宇张  wrote:

> hi,
> 了解了,我重新整理一下:
> streamTableEnv
> .connect(
> new Kafka()
> .version("0.11")
> .topic("mysql_binlog_test")
> .startFromEarliest()
> .property("zookeeper.connect",
> "localhost:2181")
> .property("bootstrap.servers",
> "localhost:9092")
> )
> .withFormat(
> new Json()
> )
> .withSchema(
> new Schema()
> .field("business", DataTypes.STRING())
> .field("data", DataTypes.ARRAY(
> DataTypes.ROW(DataTypes.FIELD("id", DataTypes.BIGINT()),
> DataTypes.FIELD("vendor_id",
> DataTypes.DOUBLE()),
> DataTypes.FIELD("status",
> DataTypes.BIGINT()),
> DataTypes.FIELD("create_time",
> DataTypes.BIGINT()),
> DataTypes.FIELD("tracking_number",
> DataTypes.STRING()),
> DataTypes.FIELD("invoice_no",
> DataTypes.STRING()),
> DataTypes.FIELD("parent_id",
> DataTypes.BIGINT()
> .field("database", DataTypes.STRING())
> .field("old",
> DataTypes.ARRAY(DataTypes.ROW(DataTypes.FIELD("logistics_status",
> DataTypes.DECIMAL(38,18)
> .field("table", DataTypes.STRING())
> .field("ts", DataTypes.BIGINT())
> .field("type", DataTypes.STRING())
> .field("putRowNum", DataTypes.BIGINT())
> )
> .createTemporaryTable("Test");
> 这里面old复合字段里面子字段的类型使用DECIMAL时抛出异常,采用其他类型是可以的;
> 异常:
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Type ARRAY> of table field 'old'
> does not match with the physical type ARRAY LEGACY('DECIMAL', 'DECIMAL')>> of the 'old' field of the TableSource return
> type.
> at
>
> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:164)
> at
>
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:277)
> at
>
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:254)
> at
>
> org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:157)
> at
> org.apache.flink.table.types.logical.ArrayType.accept(ArrayType.java:110)
> at
>
> org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:254)
> at
>
> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:160)
> at
>
> org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:232)
> at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
> at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
> at
>
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
> at
>
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> at
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
> at
>
> org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:214)
> at
>
> org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:192)
> at
>
> org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:112)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212)
> at
>
>

Re: How to make two insert-into sqls orderly

2020-03-25 Thread Zhenghua Gao
Hi izual,
There is a  workaround that you could implement your own sink which write
record sink1 and sink2 in turn.

*Best Regards,*
*Zhenghua Gao*


On Wed, Mar 25, 2020 at 10:41 PM Benchao Li  wrote:

> Hi izual,
>
> AFAIK, there is no way to to this in pure SQL.
>
>
>
>
> izual  于2020年3月25日周三 下午10:33写道:
>
>> We have two output sinks, and the order assurance is achieved by code
>> like this:
>>
>> record.apply(insert_into_sink1).thenApply(
>>
>> recorder_2 = foo(record)
>>
>> recorder_2.insert_into_sink2
>>
>> )
>>
>> So when sink2 receives the record_2, record must be existed in sink1,
>> then we can seek corresponding value of record from sink1, and do next.
>>
>>
>> Is this could be implemented on SQL? It seems like all is done by
>> FLINK-SQL
>>
>>
>> INSERT INTO sink1 SELECT ...
>> INSERT INTO sink2 SELECT ...
>>
>> Is there anyway to make sure the order of these two insert-into sqls.
>>
>>
>>
>>
>>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>


Re: 关于 SQL DATE_FORMAT 的时区设置的构想

2020-03-25 Thread Zhenghua Gao
Hi Jark,

这里的确是有问题的。
目前的问题是Calcite本身并不支持TIMESTAMP WITH TIME ZONE.

*Best Regards,*
*Zhenghua Gao*


On Tue, Mar 24, 2020 at 11:00 PM Jark Wu  wrote:

> Thanks for reporting this Weike.
>
> 首先,我觉得目前 Flink 返回 TIMESTAMP WITHOUT TIME ZONE 应该是有问题的。
> 因为 SQL 标准(SQL:2011 Part 2 Section 6.32)定义了返回类型是 WITH TIME ZONE。
> 另外 Calcite 文档中 [1] 也说返回的是 TIMESTAMP WITH TIME ZONE (虽然好像和实现不一致)
> 其他的一些数据库也都差不多:mysql [2], oracle[3]
>
> Best,
> Jark
>
> [1]: https://calcite.apache.org/docs/reference.html#datetime-functions
> [2]:
>
> https://dev.mysql.com/doc/refman/8.0/en/date-and-time-functions.html#function_current-timestamp
> [3]:
>
> https://docs.oracle.com/cd/B28359_01/server.111/b28286/functions038.htm#SQLRF00629
>
>
>
> On Tue, 24 Mar 2020 at 17:00, DONG, Weike  wrote:
>
> > Hi Zhenghua,
> >
> > 感谢您的回复。感觉既然 Flink 已经提供了时区的设定,这方面也许可以进一步增强一些。CONVERT_TZ
> > 用户很容易忘记或者漏掉,这里还是有不少完善的空间。
> >
> > Best,
> > Weike
> >
> > On Tue, Mar 24, 2020 at 4:20 PM Zhenghua Gao  wrote:
> >
> > > CURRENT_TIMESTAMP 返回值类型是 TIMESTAMP (WITHOUT TIME ZONE),
> > > 其语义可参考 java.time.LocalDateTime。
> > > 其字符形式的表示并不随着时区变化而变化(如你所见,和UTC+0 一致)。
> > >
> > > 你的需求可以通过 CONVERT_TZ(timestamp_string, time_zone_from_string,
> > > time_zone_to_string)
> > >
> > > *Best Regards,*
> > > *Zhenghua Gao*
> > >
> > >
> > > On Mon, Mar 23, 2020 at 10:12 PM DONG, Weike 
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > 近期发现 Flink 的 Blink Planner 在 DATE_FORMAT 对 CURRENT_TIMESTAMP
> > > > 做时间格式化为字符串时,默认以 UTC+0 为准。
> > > >
> > > > 长期以来,TableConfig 类里面有一个 setLocalTimeZone 方法;将其设置为东八区以后,发现格式化后的字符串仍然是
> > > UTC+0
> > > > 的。而深入来看,Flink 的时间格式化时的代码生成逻辑(time.scala)并未考虑时区的设置。
> > > >
> > > > 由于大多数用户的时区均不是 UTC+0(GMT、UTC),如果时间格式化、显示等都可以考虑到 TableConfig 中的时区设置,那么
> > > Flink
> > > > 是否会更用户友好一些呢?当然这个会涉及到不兼容的变更,需要谨慎一些。
> > > >
> > > > 也许提供一个 DATE_FORMAT_WITH_TIMEZONE 的内置函数,社区是否会更容易接受一些呢?
> > > >
> > > > 仅仅是个人一点想法,感谢 :)
> > > >
> > >
> >
>


Re: 关于flink sql 1.10 source并行度自动推断的疑问

2020-03-25 Thread Zhenghua Gao
Hi Chief,

目前Hive connector读取数据是通过 InputFormatSourceFunction 来实现的。
InputFormatSourceFunction 的工作模式不是预分配的模式,而是每个source task向master请求split。
如果某些source task提前调度起来且读完了所有的split,后调度起来的source task就没有数据可读了。
你可以看看JM/TM日志,确认下是不是前十个调度起来的source task读完了所有的数据。

*Best Regards,*
*Zhenghua Gao*


On Wed, Mar 25, 2020 at 3:31 PM Chief  wrote:

> hiJun Zhang
> 您说的我明白,就是不太理解,为什么根据文件数量自动推断任务并行后,不是每个并行任务读取一个文件?
>
>
>
>
>
> --原始邮件--
> 发件人:"Jun Zhang" 发送时间:2020年3月25日(星期三) 上午9:08
> 收件人:"user-zh"
> 主题:Re: 关于flink sql 1.10 source并行度自动推断的疑问
>
>
>
> hi,Chief:
>
>
> 目前flink读取hive的时候,如果开启了自动推断,系统会根据所读取的文件数来推断并发,如果没有超过最大并发数(默认1000),source的并行度就等于你文件的个数,
> 你可以通过table.exec.hive.infer-source-parallelism.max来设置source的最大并发度。
>
> Kurt Young 
>  你的数据量有多大?有一个可能的原因是source的其他并发调度起来的时候,数据已经被先调度起来的并发读完了。
> 
>  Best,
>  Kurt
> 
> 
>  On Tue, Mar 24, 2020 at 10:39 PM Chief  wrote:
> 
>   hi all:
>   之前用flink sql查询hive的数据,hive的数据文件是150个,sql
>   client配置文件设置的并行度是10,source通过自动推断生成了150并发,但是通过看web
>   ui发现只有前十个子任务是读到数据了,其他的任务显示没有读到数据,请问是我设置有问题吗?
> 


Re: 回复: Flink JDBC Driver是否支持创建流数据表

2020-03-25 Thread Zhenghua Gao
请确认一下 kafka connector 的jar包是否在 flink/lib 下。
目前的报错看起来是找不到kafka connector的jar包。

*Best Regards,*
*Zhenghua Gao*


On Wed, Mar 25, 2020 at 4:18 PM 赵峰  wrote:

> 不是语法问题,我建表也没有问题,是查询报错。你有没有试查询数据或者数据写人文件表中
>
>
> 
>
> 参考下这个文档:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
> 下面的语法应该是不支持的:
>   'format.type' = 'csv',\n" +
> "'format.field-delimiter' = '|'\n"
>
> 下面是我可以跑通的代码, kafka 里的数据需要是这种格式:{"order_no":"abcdefg","status":90}
> tEnv.sqlUpdate("CREATE TABLE pick_order (\n"
> + "order_no VARCHAR,\n"
> + "status INT\n"
> + ") WITH (\n"
> + "'connector.type' = 'kafka',\n"
> + "'connector.version' = 'universal',\n"
> + "'connector.topic' = 'wanglei_test',\n"
> + "'connector.startup-mode' = 'latest-offset',\n"
> + "'connector.properties.0.key' = 'zookeeper.connect',\n"
> + "'connector.properties.0.value' = 'xxx:2181',\n"
> + "'connector.properties.1.key' = 'bootstrap.servers',\n"
> + "'connector.properties.1.value' = 'xxx:9092',\n"
> + "'update-mode' = 'append',\n"
> + "'format.type' = 'json',\n"
> + "'format.derive-schema' = 'true'\n"
> + ")");
>
> 王磊
>
>
> wangl...@geekplus.com.cn
> 发件人: 赵峰
> 发送时间: 2020-03-24 21:28
> 收件人: user-zh
> 主题: Flink JDBC Driver是否支持创建流数据表
> hi
>
> Flink JDBC Driver创建kafka表报错,是我的建表代码不正确?代码如下:
> Connection connection =
> DriverManager.getConnection("jdbc:flink://localhost:8083?planner=blink");
> Statement statement = connection.createStatement();
> statement.executeUpdate(
> "CREATE TABLE table_kafka (\n" +
> "user_id BIGINT,\n" +
> "item_id BIGINT,\n" +
> "category_id BIGINT,\n" +
> "behavior STRING,\n" +
> "ts TIMESTAMP(3),\n" +
> "proctime as PROCTIME(),\n" +
> "WATERMARK FOR ts as ts - INTERVAL '5' SECOND\n" +
> ") WITH (\n" +
> "'connector.type' = 'kafka', \n" +
> "'connector.version' = 'universal', \n" +
> "'connector.topic' = 'flink_im02', \n" +
> "'connector.properties.group.id' = 'flink_im02_new',\n" +
> "'connector.startup-mode' = 'earliest-offset', \n" +
> "'connector.properties.zookeeper.connect' = '*.*.*.*:2181', \n" +
> "'connector.properties.bootstrap.servers' = '*.*.*.*:9092', \n" +
> "'format.type' = 'csv',\n" +
> "'format.field-delimiter' = '|'\n" +
> ")");
> ResultSet rs1 = statement.executeQuery("SELECT * FROM table_kafka");
> while (rs1.next()) {
> System.out.println(rs1.getInt(1) + ", " + rs1.getInt(2));
> }
> statement.close();
> connection.close();
> 报错:
> Reason: Required context properties mismatch.
> The matching candidates:
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> Mismatched properties:
> 'connector.type' expects 'filesystem', but is 'kafka'
> 赵峰
>
> 
> Quoted from:
> http://apache-flink.147419.n8.nabble.com/Flink-JDBC-Driver-tp2103p2104.html
>
>
>
>
> 赵峰


Re: 关于 SQL DATE_FORMAT 的时区设置的构想

2020-03-24 Thread Zhenghua Gao
CURRENT_TIMESTAMP 返回值类型是 TIMESTAMP (WITHOUT TIME ZONE),
其语义可参考 java.time.LocalDateTime。
其字符形式的表示并不随着时区变化而变化(如你所见,和UTC+0 一致)。

你的需求可以通过 CONVERT_TZ(timestamp_string, time_zone_from_string,
time_zone_to_string)

*Best Regards,*
*Zhenghua Gao*


On Mon, Mar 23, 2020 at 10:12 PM DONG, Weike 
wrote:

> Hi,
>
> 近期发现 Flink 的 Blink Planner 在 DATE_FORMAT 对 CURRENT_TIMESTAMP
> 做时间格式化为字符串时,默认以 UTC+0 为准。
>
> 长期以来,TableConfig 类里面有一个 setLocalTimeZone 方法;将其设置为东八区以后,发现格式化后的字符串仍然是 UTC+0
> 的。而深入来看,Flink 的时间格式化时的代码生成逻辑(time.scala)并未考虑时区的设置。
>
> 由于大多数用户的时区均不是 UTC+0(GMT、UTC),如果时间格式化、显示等都可以考虑到 TableConfig 中的时区设置,那么 Flink
> 是否会更用户友好一些呢?当然这个会涉及到不兼容的变更,需要谨慎一些。
>
> 也许提供一个 DATE_FORMAT_WITH_TIMEZONE 的内置函数,社区是否会更容易接受一些呢?
>
> 仅仅是个人一点想法,感谢 :)
>


Re: FLINK SQL中时间戳怎么处理处理

2020-03-23 Thread Zhenghua Gao
你定义的Kafka source使用JsonRowDeserializationSchema 解析json字符串并将其转换为Flink types
[1]。
目前JsonRowDeserializationSchema 仅支持 RFC 3339兼容的时间字符串 [2]。

[1]
https://github.com/apache/flink/blob/master/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java#L446
[2]
https://github.com/apache/flink/blob/master/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/TimeFormats.java#L38

*Best Regards,*
*Zhenghua Gao*


On Mon, Mar 23, 2020 at 4:27 PM 吴志勇 <1154365...@qq.com> wrote:

> 如题:
> 我向kafka中输出了json格式的数据
> {"id":5,"price":40,"timestamp":1584942626828,"type":"math"}
> {"id":2,"price":70,"timestamp":1584942629638,"type":"math"}
> {"id":2,"price":70,"timestamp":1584942634951,"type":"math"}
>
> 
> 其中timestamp字段是13位时间戳,对应的SQL表中应该怎么处理成时间格式呢?
> -name:bookpojo
> type:source-table
> connector:
> property-version:1
> type:kafka
> version:"universal"
> topic:pojosource
> startup-mode:earliest-offset
> properties:
>
> zookeeper.connect:localhost:2181
>
> bootstrap.servers:localhost:9092
> group.id:testGroup
> format:
> property-version:1
> type:json
>
> schema:"ROW schema:
> -name:id
> data-type:INT
> -name:type
> data-type:STRING
> -name:price
> data-type:INT
> -name:timestamp
>
> data-type:TIMESTAMP(3)
>
>
>
>
> 上述配置,好像有问题。
>
>
> 我在官网中找到这样一句说明:
> 字符串和时间类型:不修剪值。文字"null"也可以理解。时间类型必须根据Java
> SQL时间格式进行格式化,并以毫秒为单位。例如:2018-01-01日期,20:43:59时间和2018-01-01
> 20:43:59.999时间戳。
> 时间一定得是字符串类型且带毫秒吗?
>
>
> 谢谢。


Re: Re: dimention table join not work under sql-client fink-1.10.0

2020-03-13 Thread Zhenghua Gao
You are right.
The product on alibaba cloud is based on an internal branch.
There are much discrepancy between them.

*Best Regards,*
*Zhenghua Gao*


On Fri, Mar 13, 2020 at 1:09 PM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

> Thanks, works now.
>
> Seems the open source version is  different from alibaba cloud:
> https://www.alibabacloud.com/help/doc-detail/62506.htm
>
>
> --
> wangl...@geekplus.com.cn
>
> *From:* Zhenghua Gao 
> *Date:* 2020-03-13 12:12
> *To:* wangl...@geekplus.com.cn
> *CC:* user 
> *Subject:* Re: dimention table join not work under sql-client fink-1.10.0
> We don't support 'PROCTIME()' in a temporal table join. Please use a left
> table's proctime field. [1]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#usage-1
>
> *Best Regards,*
> *Zhenghua Gao*
>
>
> On Fri, Mar 13, 2020 at 11:57 AM wangl...@geekplus.com.cn <
> wangl...@geekplus.com.cn> wrote:
>
>>
>> Kafka source table:
>>
>> CREATE TABLE out_order (
>>   out_order_code VARCHAR,
>>   intput_date BIGINT,
>>   owner_code VARCHAR
>>   ) WITH (
>>   'connector.type' = 'kafka',
>>
>> MySQL dimention table:
>>
>> CREATE TABLE dim_owner (
>>   owner_code VARCHAR,
>>   owner_name VARCHAR
>>   ) WITH (
>>   'connector.type' = 'jdbc',
>>
>> When i submit the sql:  SELECT o.out_order_code, o.input_date, o.owner_code, 
>> d.owner_name FROM out_order as o
>>
>> JOIN dim_owner FOR SYSTEM_TIME AS OF PROCTIME()  as 
>> d ON o.owner_code = d.owner_code;
>>
>> There's error:
>> [ERROR] Could not execute SQL statement. Reason:
>> org.apache.flink.table.api.TableException: Temporal table join currently
>> only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't
>> support 'PROCTIME()'
>>
>> Thanks,
>> Lei
>>
>> --
>> wangl...@geekplus.com.cn
>>
>>


Re: dimention table join not work under sql-client fink-1.10.0

2020-03-12 Thread Zhenghua Gao
We don't support 'PROCTIME()' in a temporal table join. Please use a left
table's proctime field. [1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#usage-1

*Best Regards,*
*Zhenghua Gao*


On Fri, Mar 13, 2020 at 11:57 AM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

>
> Kafka source table:
>
> CREATE TABLE out_order (
>   out_order_code VARCHAR,
>   intput_date BIGINT,
>   owner_code VARCHAR
>   ) WITH (
>   'connector.type' = 'kafka',
>
> MySQL dimention table:
>
> CREATE TABLE dim_owner (
>   owner_code VARCHAR,
>   owner_name VARCHAR
>   ) WITH (
>   'connector.type' = 'jdbc',
>
> When i submit the sql:  SELECT o.out_order_code, o.input_date, o.owner_code, 
> d.owner_name FROM out_order as o
>
> JOIN dim_owner FOR SYSTEM_TIME AS OF PROCTIME()  as d 
> ON o.owner_code = d.owner_code;
>
> There's error:
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.TableException: Temporal table join currently
> only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't
> support 'PROCTIME()'
>
> Thanks,
> Lei
>
> --
> wangl...@geekplus.com.cn
>
>


Re: checkpoint 代码中如何设置TTL

2020-03-12 Thread Zhenghua Gao
DataStream 用户可以直接在StateDescriptor中设置TTL [1]
详见文档中的代码。

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/state.html#state-time-to-live-ttl

*Best Regards,*
*Zhenghua Gao*


On Thu, Mar 12, 2020 at 9:25 PM 潘明文  wrote:

> 您好,
>checkpoint  基于RocksDBStateBackend的如何设置TTL?


Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer

2020-02-20 Thread Zhenghua Gao
Congrats Jingsong!


*Best Regards,*
*Zhenghua Gao*


On Fri, Feb 21, 2020 at 11:59 AM godfrey he  wrote:

> Congrats Jingsong! Well deserved.
>
> Best,
> godfrey
>
> Jeff Zhang  于2020年2月21日周五 上午11:49写道:
>
>> Congratulations!Jingsong. You deserve it
>>
>> wenlong.lwl  于2020年2月21日周五 上午11:43写道:
>>
>>> Congrats Jingsong!
>>>
>>> On Fri, 21 Feb 2020 at 11:41, Dian Fu  wrote:
>>>
>>> > Congrats Jingsong!
>>> >
>>> > > 在 2020年2月21日,上午11:39,Jark Wu  写道:
>>> > >
>>> > > Congratulations Jingsong! Well deserved.
>>> > >
>>> > > Best,
>>> > > Jark
>>> > >
>>> > > On Fri, 21 Feb 2020 at 11:32, zoudan  wrote:
>>> > >
>>> > >> Congratulations! Jingsong
>>> > >>
>>> > >>
>>> > >> Best,
>>> > >> Dan Zou
>>> > >>
>>> >
>>> >
>>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>


Re: [DISCUSS] Set default planner for SQL Client to Blink planner in 1.10 release

2020-01-05 Thread Zhenghua Gao
+1 for making blink planner as the default planner for SQL Client since we
have made a huge improvement in 1.10.

*Best Regards,*
*Zhenghua Gao*


On Sun, Jan 5, 2020 at 2:42 PM Benchao Li  wrote:

> +1
>
> We have used blink planner since 1.9.0 release in our production
> environment, and it behaves really impressive.
>
> Hequn Cheng  于2020年1月5日周日 下午1:58写道:
>
>> +1 to make blink planner as the default planner for SQL Client, hence we
>> can give the blink planner a bit more exposure.
>>
>> Best, Hequn
>>
>> On Fri, Jan 3, 2020 at 6:32 PM Jark Wu  wrote:
>>
>>> Hi Benoît,
>>>
>>> Thanks for the reminder. I will look into the issue and hopefully we can
>>> target it into 1.9.2 and 1.10.
>>>
>>> Cheers,
>>> Jark
>>>
>>> On Fri, 3 Jan 2020 at 18:21, Benoît Paris <
>>> benoit.pa...@centraliens-lille.org> wrote:
>>>
>>>> >  If anyone finds that blink planner has any significant defects and
>>>> has a larger regression than the old planner, please let us know.
>>>>
>>>> Overall, the Blink-exclusive features are must (TopN, deduplicate,
>>>> LAST_VALUE, plan reuse, etc)! But all use cases of the Legacy planner in
>>>> production are not covered:
>>>> An edge case of Temporal Table Functions does not allow computed Tables
>>>> (as opposed to TableSources) to be used on the query side in Blink (
>>>> https://issues.apache.org/jira/browse/FLINK-14200)
>>>>
>>>> Cheers
>>>> Ben
>>>>
>>>>
>>>> On Fri, Jan 3, 2020 at 10:00 AM Jeff Zhang  wrote:
>>>>
>>>>> +1, I have already made blink as the default planner of flink
>>>>> interpreter in Zeppelin
>>>>>
>>>>>
>>>>> Jingsong Li  于2020年1月3日周五 下午4:37写道:
>>>>>
>>>>>> Hi Jark,
>>>>>>
>>>>>> +1 for default blink planner in SQL-CLI.
>>>>>> I believe this new planner can be put into practice in production.
>>>>>> We've worked hard for nearly a year, but the old planner didn't move
>>>>>> on.
>>>>>>
>>>>>> And I'd like to cc to user@flink.apache.org.
>>>>>> If anyone finds that blink planner has any significant defects and
>>>>>> has a larger regression than the old planner, please let us know. We will
>>>>>> be very grateful.
>>>>>>
>>>>>> Best,
>>>>>> Jingsong Lee
>>>>>>
>>>>>> On Fri, Jan 3, 2020 at 4:14 PM Leonard Xu  wrote:
>>>>>>
>>>>>>> +1 for this.
>>>>>>> We bring many SQL/API features and enhance stability in 1.10
>>>>>>> release, and almost all of them happens in Blink planner.
>>>>>>> SQL CLI is the most convenient entrypoint for me, I believe many
>>>>>>> users will have a better experience If we set Blink planner as default
>>>>>>> planner.
>>>>>>>
>>>>>>> Best,
>>>>>>> Leonard
>>>>>>>
>>>>>>> > 在 2020年1月3日,15:16,Terry Wang  写道:
>>>>>>> >
>>>>>>> > Since what blink planner can do is a superset of flink planner,
>>>>>>> big +1 for changing the default planner to Blink planner from my side.
>>>>>>> >
>>>>>>> > Best,
>>>>>>> > Terry Wang
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>> >> 2020年1月3日 15:00,Jark Wu  写道:
>>>>>>> >>
>>>>>>> >> Hi everyone,
>>>>>>> >>
>>>>>>> >> In 1.10 release, Flink SQL supports many awesome features and
>>>>>>> improvements,
>>>>>>> >> including:
>>>>>>> >> - support watermark statement and computed column in DDL
>>>>>>> >> - fully support all data types in Hive
>>>>>>> >> - Batch SQL performance improvements (TPC-DS 7x than Hive MR)
>>>>>>> >> - support INSERT OVERWRITE and INSERT PARTITION
>>>>>>> >>
>>>>>>> >> However, all the features and improvements are only avaiable in
>>>>>>> Blink
>>&g

Re: FlinkSQL中关于TIMESTAMPDIFF函数官网E.g的疑问

2019-12-16 Thread Zhenghua Gao
1) ML里直接发截图无法展示,可以用第三方图床,然后链接过来。
2) 请确认 time1/time2 类型是否是 TIMESTAMP
3) 文档中的 TIMESTAMP '2003-01-02 10:00:00' 代表标准SQL的时间常量(timestamp literal),你的
TIMESTAMP time1 无法被视作时间常量。

*Best Regards,*
*Zhenghua Gao*


On Mon, Dec 16, 2019 at 3:51 PM 1530130567 <1530130...@qq.com> wrote:

> 各位大佬好:
>由于业务处理逻辑需要计算两个日期的时间差,按照惯例打开官网,查看buildin Functions,ctrC, ctrV,跑起来没问题!
>
>   然后我就改了一下:
>   TIMESTAMPDIFF(DAY, TIMESTAMP time1,TIMESTAMP time2)
>   SQL validate报错!
>   然后我又改了一下:
>   TIMESTAMPDIFF(DAY, cast(time1 as timestamp),cast(time2 as timestamp))
>   通过了!
>
>   我有点疑惑,TIMESTAMP是不是只适用于固定的字符串,而不能用列名这种变量?
>
>


Re: [DISCUSS] Drop Kafka 0.8/0.9

2019-12-05 Thread Zhenghua Gao
+1 for dropping.

*Best Regards,*
*Zhenghua Gao*


On Thu, Dec 5, 2019 at 11:08 AM Dian Fu  wrote:

> +1 for dropping them.
>
> Just FYI: there was a similar discussion few months ago [1].
>
> [1]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DISCUSS-Drop-older-versions-of-Kafka-Connectors-0-9-0-10-for-Flink-1-10-td29916.html#a29997
>
> 在 2019年12月5日,上午10:29,vino yang  写道:
>
> +1
>
> jincheng sun  于2019年12月5日周四 上午10:26写道:
>
>> +1  for drop it, and Thanks for bring up this discussion Chesnay!
>>
>> Best,
>> Jincheng
>>
>> Jark Wu  于2019年12月5日周四 上午10:19写道:
>>
>>> +1 for dropping, also cc'ed user mailing list.
>>>
>>>
>>> Best,
>>> Jark
>>>
>>> On Thu, 5 Dec 2019 at 03:39, Konstantin Knauf 
>>> wrote:
>>>
>>> > Hi Chesnay,
>>> >
>>> > +1 for dropping. I have not heard from any user using 0.8 or 0.9 for a
>>> long
>>> > while.
>>> >
>>> > Cheers,
>>> >
>>> > Konstantin
>>> >
>>> > On Wed, Dec 4, 2019 at 1:57 PM Chesnay Schepler 
>>> > wrote:
>>> >
>>> > > Hello,
>>> > >
>>> > > What's everyone's take on dropping the Kafka 0.8/0.9 connectors from
>>> the
>>> > > Flink codebase?
>>> > >
>>> > > We haven't touched either of them for the 1.10 release, and it seems
>>> > > quite unlikely that we will do so in the future.
>>> > >
>>> > > We could finally close a number of test stability tickets that have
>>> been
>>> > > lingering for quite a while.
>>> > >
>>> > >
>>> > > Regards,
>>> > >
>>> > > Chesnay
>>> > >
>>> > >
>>> >
>>> > --
>>> >
>>> > Konstantin Knauf | Solutions Architect
>>> >
>>> > +49 160 91394525
>>> >
>>> >
>>> > Follow us @VervericaData Ververica <https://www.ververica.com/>
>>> >
>>> >
>>> > --
>>> >
>>> > Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>> > Conference
>>> >
>>> > Stream Processing | Event Driven | Real Time
>>> >
>>> > --
>>> >
>>> > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>> >
>>> > --
>>> > Ververica GmbH
>>> > Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>> > Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>>> > (Tony) Cheng
>>> >
>>>
>>
>


Re: Table/SQL API to read and parse JSON, Java.

2019-12-03 Thread Zhenghua Gao
the kafka connector jar is missing in your class path

*Best Regards,*
*Zhenghua Gao*


On Mon, Dec 2, 2019 at 2:14 PM srikanth flink  wrote:

> Hi there,
>
> I'm following the link
> <https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html>
> to read JSON data from Kafka and convert to table, programmatically. I'd
> try and succeed declarative using SQL client.
>
> My Json data is nested like: {a:1,b,2,c:{x:1,y:2}}.
> Code:
>
>> String schema = "{type: 'object', properties: {'message': {type:
>> 'string'},'@timestamp': {type: 'string'}}}";
>> final StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> env.enableCheckpointing(6, CheckpointingMode.EXACTLY_ONCE);
>> env.getCheckpointConfig().getCheckpointTimeout();
>> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>>
>> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>>
>> tableEnv.connect(new
>> Kafka().version("universal").topic("recon-data").startFromEarliest()
>> .property("zookeeper.connect", "localhost:2181")
>> .property("bootstrap.servers", "localhost:9092"))
>> .withFormat(new
>> Json().failOnMissingField(false).jsonSchema(schema).deriveSchema())
>> .withSchema(new Schema().field("message",
>> Types.STRING()).field("@timestamp", Types.LOCAL_DATE_TIME()))
>> .inAppendMode().registerTableSource("reconTableS");
>>
>> Table t = tableEnv.sqlQuery("select * from reconTableS");
>> DataStream out = tableEnv.toAppendStream(t, Row.class);
>> out.print();
>>
>> try {
>> env.execute("Flink Example Json");
>> } catch (Exception e) {
>> e.printStackTrace();
>> }
>> }
>>
>
> pom.xml:
>
>> 
>> UTF-8
>> 1.9.0
>> 1.8
>> 2.11
>> ${java.version}
>> ${java.version}
>> 
>>
> 
>> 
>> org.apache.flink
>> flink-streaming-scala_2.11
>> ${flink.version}
>> 
>> 
>> 
>> org.apache.flink
>> flink-table-common
>> ${flink.version}
>> 
>> 
>> org.apache.flink
>> flink-table-planner_2.11
>> ${flink.version}
>> 
>> 
>> org.apache.flink
>> flink-table-api-java-bridge_2.11
>> ${flink.version}
>> 
>> 
>> org.apache.flink
>> flink-java
>> ${flink.version}
>> 
>> 
>> 
>> org.apache.flink
>> flink-streaming-java_${scala.binary.version}
>> ${flink.version}
>> 
>> 
>> 
>> org.apache.flink
>> flink-connector-kafka_2.12
>> ${flink.version}
>> 
>> 
>> org.apache.flink
>> flink-json
>> ${flink.version}
>> 
>> 
>> org.apache.flink
>> flink-cep_2.11
>> ${flink.version}
>> 
>> 
>> mysql
>> mysql-connector-java
>> 5.1.39
>> 
>> 
>>
>
> The code threw the following error:
>
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error: findAndCreateTableSource failed.
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>> at
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
>> at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
>> at
>> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
>> at
>> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
>> at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>> at
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> at
>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
>> Caused by: org.apache.flink.table.api.TableException:
>> findAndCreateTableSource failed.
>> at
>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFa

Re: [DISCUSS] Disable conversion between TIMESTAMP and Long in parameters and results of UDXs

2019-12-01 Thread Zhenghua Gao
Since it is unanimously agreed that we should disable conversion between
Timestmap and
long in parameters and results of UDXs, in PR [1] we will disable it in
blink planner. And we
will add a release note in FLINK-14599 [2] of this incompatible
modification.

<https://github.com/apache/flink/pull/10268>

[1] https://github.com/apache/flink/pull/10268
[2] https://issues.apache.org/jira/browse/FLINK-14599

*Best Regards,*
*Zhenghua Gao*


On Sun, Nov 24, 2019 at 8:44 PM Jark Wu  wrote:

> Hi,
>
> +1 to disable it in 1.10. I think it's time to disable and correct the
> behavior now.
>
> Also cc'ed user mailing list to have broader audiences.
>
> Best,
> Jark
>
> On Sat, 23 Nov 2019 at 16:59, Timo Walther  wrote:
>
>> Hi,
>>
>> +1 for disabling it in the Blink planner. Once FLIP-65 is implemented
>> and a UDX is registered with the new
>> TableEnvironment.createTemporaryFunction() we will also have the
>> possibility to be fully compliant with the new type system because we
>> can advertise a new UDF stack with new behavior.
>>
>> Also the mentioned documentation page will be updated as part of FLIP-65.
>>
>> Regards,
>> Timo
>>
>>
>> On 22.11.19 11:08, Jingsong Li wrote:
>> > +1 to disable, It is already introduced by new type system in
>> TimestampType.
>> > I think it is time to update document too.
>> >
>> > Best,
>> > Jingsong Lee
>> >
>> > On Fri, Nov 22, 2019 at 6:05 PM Kurt Young  wrote:
>> >
>> >> +1 to disable, we also need to highlight this in 1.10 release notes.
>> >>
>> >> Best,
>> >> Kurt
>> >>
>> >>
>> >> On Fri, Nov 22, 2019 at 5:56 PM Zhenghua Gao  wrote:
>> >>
>> >>> Hi,
>> >>>
>> >>> I wanted to bring up the discuss of Disable conversion between
>> TIMESTAMP
>> >>> and Long in parameters and results of UDXs.
>> >>>
>> >>> Since FLINK-12253[1] introduce the new TimestampType and conversion
>> from
>> >>> and
>> >>> to long is not supported, the UDXs with Long parameters should not
>> >> receive
>> >>> TIMESTAMP fields and vice versa.
>> >>>
>> >>> The current situation is we use long as internal representation of
>> >>> TIMESTAMP, the legacy planner and blink planner DO NOT DISABLE this
>> >>> conversion. Now FLINK-14599[2] would introduce a new internal
>> >>> representation of TIMESTAMP and it's time to make a decision to
>> DISABLE
>> >> it.
>> >>>
>> >>> In addition, our document[3] recommends UDXs users use long as
>> >>> representation of SQL_TIMESTAMP, which is obsolete too.
>> >>>
>> >>> Please let me know what you think!
>> >>>
>> >>> [1] https://issues.apache.org/jira/browse/FLINK-12253
>> >>> [2] https://issues.apache.org/jira/browse/FLINK-14599
>> >>> [3]
>> >>>
>> >>>
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/udfs.html#best-practices-for-implementing-udfs
>> >>>
>> >>> *Best Regards,*
>> >>> *Zhenghua Gao*
>> >>>
>> >>
>> >
>> >
>>
>>


Re: Does apache flink support stream input from Postgresql ?

2019-11-20 Thread Zhenghua Gao
The jdbc connector can read data from PostgreSQL for Table/SQL users.
For pyflink, cc @Hequn

*Best Regards,*
*Zhenghua Gao*


On Wed, Nov 20, 2019 at 7:56 PM Yu Watanabe  wrote:

> Hello .
>
> I would like to ask question about possibility of stream read table rows
> from PostgresQL using pyflink.
>
> In use cases in online document, there is a example which is reading data
> from transactional database.
>
> https://flink.apache.org/img/usecases-datapipelines.png
> <https://flink.apache.org/usecases.html>
>
> Is it possible to stream read from PostgresQL  using pyflink ?
>
> Thanks,
> Yu Watanabe
>
> --
> Yu Watanabe
> Weekend Freelancer who loves to challenge building data platform
> yu.w.ten...@gmail.com
> [image: LinkedIn icon] <https://www.linkedin.com/in/yuwatanabe1>  [image:
> Twitter icon] <https://twitter.com/yuwtennis>
>


Re: Documentation issue maybe

2019-11-06 Thread Zhenghua Gao
Your are right that it's not thread-safety.
I think we can use Collections.synchronizedList() to get a thread-safety
list[1].
And remove the synchronized keyword from the invoke interface.

I have created a ticket to track this[2], please feel free to fix it by
make a pull request.

[1]
https://docs.oracle.com/javase/7/docs/api/java/util/Collections.html#synchronizedList(java.util.List)
[2] https://issues.apache.org/jira/browse/FLINK-14650

*Best Regards,*
*Zhenghua Gao*


On Thu, Nov 7, 2019 at 12:12 AM Romain Gilles 
wrote:

> Hi all,
> I think the code example in following section has a thread safety issue:
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/testing.html#junit-rule-miniclusterwithclientresource
>
> The class CollectSink is not thread safe as only the write to the values
> collection are synchronized but not the read:
> CollectSink.values.containsAll(...). Maybe if you use
> a CopyOnWriteArrayList instead of an ArrayList and remove the synchronized
> key work it should be ok.
>
> Romain
>


Re: low performance in running queries

2019-11-03 Thread Zhenghua Gao
Hi,

I ran the streaming WordCount with a 2GB text file(copied
/usr/share/dict/words 400 times) last weekend and didn't reproduce your
result(16 minutes in my case).
But i find some clues may help you:

The streaming WordCount job would output all intermedia result in your
output file(if specified) or taskmanager.out.
It's large (about 4GB in my case) and causes the disk writes high.


*Best Regards,*
*Zhenghua Gao*


On Fri, Nov 1, 2019 at 4:40 PM Habib Mostafaei 
wrote:

> I used streaming WordCount provided by Flink and the file contains text
> like "This is some text...". I just copied several times.
>
> Best,
>
> Habib
> On 11/1/2019 6:03 AM, Zhenghua Gao wrote:
>
> 2019-10-30 15:59:52,122 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Split Reader: Custom File Source -> Flat Map (1/1) 
> (6a17c410c3e36f524bb774d2dffed4a4) switched from DEPLOYING to RUNNING.
>
> 2019-10-30 17:45:10,943 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Split Reader: Custom File Source -> Flat Map (1/1) 
> (6a17c410c3e36f524bb774d2dffed4a4) switched from RUNNING to FINISHED.
>
> It's surprise that the source task uses 95 mins to read a 2G file.
>
> Could you give me your code snippets and some sample lines of the 2G file?
>
> I will try to reproduce your scenario and dig the root causes.
>
> *Best Regards,*
> *Zhenghua Gao*
>
>
> On Thu, Oct 31, 2019 at 9:05 PM Habib Mostafaei 
> wrote:
>
>> I enclosed all logs from the run and for this run I used parallelism one.
>> However, for other runs I checked and found that all parallel workers were
>> working properly. Is there a simple way to get profiling information in
>> Flink?
>>
>> Best,
>>
>> Habib
>> On 10/31/2019 2:54 AM, Zhenghua Gao wrote:
>>
>> I think more runtime information would help figure out where the problem
>>  is.
>> 1) how many parallelisms actually working
>> 2) the metrics for each operator
>> 3) the jvm profiling information, etc
>>
>> *Best Regards,*
>> *Zhenghua Gao*
>>
>>
>> On Wed, Oct 30, 2019 at 8:25 PM Habib Mostafaei 
>> wrote:
>>
>>> Thanks Gao for the reply. I used the parallelism parameter with
>>> different values like 6 and 8 but still the execution time is not
>>> comparable with a single threaded python script. What would be the
>>> reasonable value for the parallelism?
>>>
>>> Best,
>>>
>>> Habib
>>> On 10/30/2019 1:17 PM, Zhenghua Gao wrote:
>>>
>>> The reason might be the parallelism of your task is only 1, that's too
>>> low.
>>> See [1] to specify proper parallelism  for your job, and the execution
>>> time should be reduced significantly.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html
>>>
>>> *Best Regards,*
>>> *Zhenghua Gao*
>>>
>>>
>>> On Tue, Oct 29, 2019 at 9:27 PM Habib Mostafaei 
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I am running Flink on a standalone cluster and getting very long
>>>> execution time for the streaming queries like WordCount for a fixed
>>>> text
>>>> file. My VM runs on a Debian 10 with 16 cpu cores and 32GB of RAM. I
>>>> have a text file with size of 2GB. When I run the Flink on a standalone
>>>> cluster, i.e., one JobManager and one taskManager with 25GB of
>>>> heapsize,
>>>> it took around two hours to finish counting this file while a simple
>>>> python script can do it in around 7 minutes. Just wondering what is
>>>> wrong with my setup. I ran the experiments on a cluster with six
>>>> taskManagers, but I still get very long execution time like 25 minutes
>>>> or so. I tried to increase the JVM heap size to have lower execution
>>>> time but it did not help. I attached the log file and the Flink
>>>> configuration file to this email.
>>>>
>>>> Best,
>>>>
>>>> Habib
>>>>
>>>>
>>


Re: low performance in running queries

2019-10-31 Thread Zhenghua Gao
2019-10-30 15:59:52,122 INFO
org.apache.flink.runtime.taskmanager.Task - Split
Reader: Custom File Source -> Flat Map (1/1)
(6a17c410c3e36f524bb774d2dffed4a4) switched from DEPLOYING to RUNNING.

2019-10-30 17:45:10,943 INFO
org.apache.flink.runtime.taskmanager.Task - Split
Reader: Custom File Source -> Flat Map (1/1)
(6a17c410c3e36f524bb774d2dffed4a4) switched from RUNNING to FINISHED.


It's surprise that the source task uses 95 mins to read a 2G file.

Could you give me your code snippets and some sample lines of the 2G file?

I will try to reproduce your scenario and dig the root causes.


*Best Regards,*
*Zhenghua Gao*


On Thu, Oct 31, 2019 at 9:05 PM Habib Mostafaei 
wrote:

> I enclosed all logs from the run and for this run I used parallelism one.
> However, for other runs I checked and found that all parallel workers were
> working properly. Is there a simple way to get profiling information in
> Flink?
>
> Best,
>
> Habib
> On 10/31/2019 2:54 AM, Zhenghua Gao wrote:
>
> I think more runtime information would help figure out where the problem
>  is.
> 1) how many parallelisms actually working
> 2) the metrics for each operator
> 3) the jvm profiling information, etc
>
> *Best Regards,*
> *Zhenghua Gao*
>
>
> On Wed, Oct 30, 2019 at 8:25 PM Habib Mostafaei 
> wrote:
>
>> Thanks Gao for the reply. I used the parallelism parameter with different
>> values like 6 and 8 but still the execution time is not comparable with a
>> single threaded python script. What would be the reasonable value for the
>> parallelism?
>>
>> Best,
>>
>> Habib
>> On 10/30/2019 1:17 PM, Zhenghua Gao wrote:
>>
>> The reason might be the parallelism of your task is only 1, that's too
>> low.
>> See [1] to specify proper parallelism  for your job, and the execution
>> time should be reduced significantly.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html
>>
>> *Best Regards,*
>> *Zhenghua Gao*
>>
>>
>> On Tue, Oct 29, 2019 at 9:27 PM Habib Mostafaei 
>> wrote:
>>
>>> Hi all,
>>>
>>> I am running Flink on a standalone cluster and getting very long
>>> execution time for the streaming queries like WordCount for a fixed text
>>> file. My VM runs on a Debian 10 with 16 cpu cores and 32GB of RAM. I
>>> have a text file with size of 2GB. When I run the Flink on a standalone
>>> cluster, i.e., one JobManager and one taskManager with 25GB of heapsize,
>>> it took around two hours to finish counting this file while a simple
>>> python script can do it in around 7 minutes. Just wondering what is
>>> wrong with my setup. I ran the experiments on a cluster with six
>>> taskManagers, but I still get very long execution time like 25 minutes
>>> or so. I tried to increase the JVM heap size to have lower execution
>>> time but it did not help. I attached the log file and the Flink
>>> configuration file to this email.
>>>
>>> Best,
>>>
>>> Habib
>>>
>>>
>


Re: low performance in running queries

2019-10-30 Thread Zhenghua Gao
I think more runtime information would help figure out where the problem is.
1) how many parallelisms actually working
2) the metrics for each operator
3) the jvm profiling information, etc

*Best Regards,*
*Zhenghua Gao*


On Wed, Oct 30, 2019 at 8:25 PM Habib Mostafaei 
wrote:

> Thanks Gao for the reply. I used the parallelism parameter with different
> values like 6 and 8 but still the execution time is not comparable with a
> single threaded python script. What would be the reasonable value for the
> parallelism?
>
> Best,
>
> Habib
> On 10/30/2019 1:17 PM, Zhenghua Gao wrote:
>
> The reason might be the parallelism of your task is only 1, that's too
> low.
> See [1] to specify proper parallelism  for your job, and the execution
> time should be reduced significantly.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html
>
> *Best Regards,*
> *Zhenghua Gao*
>
>
> On Tue, Oct 29, 2019 at 9:27 PM Habib Mostafaei 
> wrote:
>
>> Hi all,
>>
>> I am running Flink on a standalone cluster and getting very long
>> execution time for the streaming queries like WordCount for a fixed text
>> file. My VM runs on a Debian 10 with 16 cpu cores and 32GB of RAM. I
>> have a text file with size of 2GB. When I run the Flink on a standalone
>> cluster, i.e., one JobManager and one taskManager with 25GB of heapsize,
>> it took around two hours to finish counting this file while a simple
>> python script can do it in around 7 minutes. Just wondering what is
>> wrong with my setup. I ran the experiments on a cluster with six
>> taskManagers, but I still get very long execution time like 25 minutes
>> or so. I tried to increase the JVM heap size to have lower execution
>> time but it did not help. I attached the log file and the Flink
>> configuration file to this email.
>>
>> Best,
>>
>> Habib
>>
>> --
> Habib Mostafaei, Ph.D.
> Postdoctoral researcher
> TU Berlin,
> FG INET, MAR 4.003
> Marchstraße 23, 10587 Berlin
>
>


Re: low performance in running queries

2019-10-30 Thread Zhenghua Gao
The reason might be the parallelism of your task is only 1, that's too low.
See [1] to specify proper parallelism  for your job, and the execution time
should be reduced significantly.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html

*Best Regards,*
*Zhenghua Gao*


On Tue, Oct 29, 2019 at 9:27 PM Habib Mostafaei 
wrote:

> Hi all,
>
> I am running Flink on a standalone cluster and getting very long
> execution time for the streaming queries like WordCount for a fixed text
> file. My VM runs on a Debian 10 with 16 cpu cores and 32GB of RAM. I
> have a text file with size of 2GB. When I run the Flink on a standalone
> cluster, i.e., one JobManager and one taskManager with 25GB of heapsize,
> it took around two hours to finish counting this file while a simple
> python script can do it in around 7 minutes. Just wondering what is
> wrong with my setup. I ran the experiments on a cluster with six
> taskManagers, but I still get very long execution time like 25 minutes
> or so. I tried to increase the JVM heap size to have lower execution
> time but it did not help. I attached the log file and the Flink
> configuration file to this email.
>
> Best,
>
> Habib
>
>


Re: Joining Pojos

2019-09-17 Thread Zhenghua Gao
POJO is available in KeySelector[1].
Could you provide more information about your problem? Version of Flink?
Error messages?

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/api_concepts.html#define-keys-using-key-selector-functions

*Best Regards,*
*Zhenghua Gao*


On Mon, Sep 16, 2019 at 11:16 PM Benjamin Wootton <
benjamin.wootton.perso...@gmail.com> wrote:

> Hi All
>
> I'm new to Flink.  I am having a lot of success but I'm struggling with
> Windowed joins over Pojos.
>
> In a toy example I am trying to respond to flight delay events and pull
> some fields from flight details:
>
> flightDelaysStream
> .map(new FlightDelaysMapper())
> .join(flightDetailsStream)
>
> * .where( new FlightDelayKeySelector() ) .equalTo(new MyKeySelector() )*
> .window(TumblingEventTimeWindows.of(Time.seconds(10)))
> .apply(new JF())
> .print();
>
> My problem is in the where and equalTo clauses.  I can't seem to specify a
> Key selector for flight details as the equalTo clause doesn't accept
> anything related to FlightDetails.
>
> All of the examples I can find online are Tuples.  Should this kind of
> thing be possible with Pojos?  Much prefer to stay in the domain objects if
> possible.
>
> Thanks
> Ben
>
>
>
>
>
>
>


Re: Can I do a lookup in FlinkSQL?

2019-09-16 Thread Zhenghua Gao
The lookup fashion Temporal Join[1] should be a solution for your case and
there is an ITCase as an example[2]

[1]
https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/LookupableTableSource.java
[2]
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala


*Best Regards,*
*Zhenghua Gao*


On Mon, Sep 16, 2019 at 9:23 PM srikanth flink  wrote:

> Hi there,
>
> I'm working with streaming in FlinkSQL. I've two tables created one with
> dynamic stream and the other a periodic updates.
> I would like to keep the periodic table a static(but updates with new data
> every day or so by flushing the old), So at any point of time the static
> table should contain new set of data.
> With dynamic table being populated with stream data, could I do a lookup on
> a column of static table to find if the value exists.
>
> This is what I have done:
> dynamic table: sourceKafka
> static table: badips
>
> Trying to build a list, kind of using ROW() function and done. From dynamic
> table, trying to lookup into the list if the value exists.
> Query: INSERT INTO sourceKafkaMalicious select s.* from sourceKafka as s
> where s.`source.ip` OR s.`destination.ip` IN (select ROW(ip) from badips);
> Resonse:
> [INFO] Submitting SQL update statement to the cluster...
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.calcite.sql.validate.SqlValidatorException: Values passed to IN
> operator must have compatible types
>
> Is it possible to solve my use case? If so, where am I going wrong?
>
> Thanks
> Srikanth
>


Re: processing avro data source using DataSet API and output to parquet

2019-08-19 Thread Zhenghua Gao
the DataStream API should fully subsume the DataSet API (through bounded
streams) in the long run [1]
And you can consider use Table/SQL API in your project.

[1]
https://flink.apache.org/roadmap.html#analytics-applications-and-the-roles-of-datastream-dataset-and-table-api

*Best Regards,*
*Zhenghua Gao*


On Fri, Aug 16, 2019 at 11:52 PM Lian Jiang  wrote:

> Thanks. Which api (dataset or datastream) is recommended for file handling
> (no window operation required)?
>
> We have similar scenario for real-time processing. May it make sense to
> use datastream api for both batch and real-time for uniformity?
>
> Sent from my iPhone
>
> On Aug 16, 2019, at 00:38, Zhenghua Gao  wrote:
>
> Flink allows hadoop (mapreduce) OutputFormats in Flink jobs[1]. You can
> have a try with Parquet OutputFormat[2].
>
> And if you can turn to DataStream API,
> StreamingFileSink + ParquetBulkWriter meets your requirement[3][4].
>
> [1]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/example/WordCount.java
> [2]
> https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
> [3]
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
> [4]
> https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetBulkWriter.java
>
>
> *Best Regards,*
> *Zhenghua Gao*
>
>
> On Fri, Aug 16, 2019 at 1:04 PM Lian Jiang  wrote:
>
>> Hi,
>>
>> I am using Flink 1.8.1 DataSet for a batch processing. The data source is
>> avro files and I want to output the result into parquet.
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/batch/
>> only has no related information. What's the recommended way for doing this?
>> Do I need to write adapters? Appreciate your help!
>>
>>
>>


Re: processing avro data source using DataSet API and output to parquet

2019-08-16 Thread Zhenghua Gao
Flink allows hadoop (mapreduce) OutputFormats in Flink jobs[1]. You can
have a try with Parquet OutputFormat[2].

And if you can turn to DataStream API,
StreamingFileSink + ParquetBulkWriter meets your requirement[3][4].

[1]
https://github.com/apache/flink/blob/master/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/example/WordCount.java
[2]
https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
[3]
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
[4]
https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetBulkWriter.java


*Best Regards,*
*Zhenghua Gao*


On Fri, Aug 16, 2019 at 1:04 PM Lian Jiang  wrote:

> Hi,
>
> I am using Flink 1.8.1 DataSet for a batch processing. The data source is
> avro files and I want to output the result into parquet.
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/batch/
> only has no related information. What's the recommended way for doing this?
> Do I need to write adapters? Appreciate your help!
>
>
>


Re: I'm not able to make a stream-stream Time windows JOIN in Flink SQL

2019-08-13 Thread Zhenghua Gao
I wrote a demo example for time windowed join which you can pick up [1]

[1] https://gist.github.com/docete/8e78ff8b5d0df69f60dda547780101f1

*Best Regards,*
*Zhenghua Gao*


On Tue, Aug 13, 2019 at 4:13 PM Zhenghua Gao  wrote:

> You can check the plan after optimize to verify it's a regular join or
> time-bounded join(Should have a WindowJoin). The most direct way is
> breakpoint at optimizing phase [1][2].
> And you can use your TestData and create an ITCase for debugging [3]
>
>
> [1]
> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala#L148
> [2]
> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/StreamOptimizer.scala#L68
> [3]
> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowJoinITCase.scala
>
> *Best Regards,*
> *Zhenghua Gao*
>
>
> On Mon, Aug 12, 2019 at 10:49 PM Theo Diefenthal <
> theo.diefent...@scoop-software.de> wrote:
>
>> Hi there,
>>
>> Currently, I'm trying to write a SQL query which shall executed a time
>> windowed/bounded JOIN on two data streams.
>>
>> Suppose I have stream1 with attribute id, ts, user and stream2 with
>> attribute id, ts, userName. I want to receive the natural JOIN of both
>> streams with events of the same day.
>>
>> In Oracle (With a ts column as number instead of Timestamp, for
>> historical reasons), I do the following:
>>
>> SELECT *
>>   FROM STREAM1
>>   JOIN STREAM2 ON STREAM1."user" = STREAM2."userName"
>> AND TRUNC(TO_DATE('19700101', 'MMDD') + ( 1 / 24 / 60 / 
>> 60 / 1000 ) * STREAM1."ts") = TRUNC(TO_DATE('19700101', 'MMDD') + ( 1 / 
>> 24 / 60 / 60 / 1000 ) * STREAM2."ts");
>>
>> which yields 294 rows with my test data (14 elements from stream1 match
>> to 21 elements in stream2 on the one day of test data). Now I want to query
>> the same in Flink. So I registered both streams as table and properly
>> registered the even-time (by specifying ts.rowtime as table column).
>>
>> My goal is to produce a time-windowed JOIN so that, if both streams
>> advance their watermark far enough, an element is written out into an
>> append only stream.
>>
>> First try (to conform time-bounded-JOIN conditions):
>>
>> SELECT s1.id, s2.id
>>   FROM STREAM1 AS s1
>>   JOIN STREAM2 AS s2
>> ON s1.`user` = s2.userName
>>AND s1.ts BETWEEN s2.ts - INTERVAL '24' HOUR AND s2.ts + INTERVAL 
>> '24' HOUR
>>AND s2.ts BETWEEN s1.ts - INTERVAL '24' HOUR AND s1.ts + INTERVAL 
>> '24' HOUR
>>AND TUMBLE_START(s1.ts, INTERVAL '1' DAY) = TUMBLE_START(s2.ts, 
>> INTERVAL'1' DAY) -- Reduce to matchings on the same day.
>>
>> This yielded in the exception "Rowtime attributes must not be in the
>> input rows of a regular join. As a workaround you can cast the time
>> attributes of input tables to TIMESTAMP before.". So I'm still in the area
>> of regular joins, not time-windowed JOINs, even though I made the explicit
>> BETWEEN for both input streams!
>>
>> Then I found [1], which really is my query but without the last condition
>> (reduce to matching on the same day). I tried this one as well, just to
>> have a starting point, but the error is the same.
>> I then reduced the Condition to just one time bound:
>>
>> SELECT s1.id, s2.id
>>   FROM STREAM1 AS s1
>>   JOIN STREAM2 AS s2
>> ON s1.`user` = s2.userName
>>AND s1.ts BETWEEN s2.ts - INTERVAL '24' HOUR AND s2.ts + INTERVAL 
>> '24' HOUR
>>
>> which runs as a query but doesn't produce any results. Most likely
>> because Flink still thinks of a regular join instead of a time-window JOIN
>> and doesn't emit any resutls. (FYI interest, after executing the query, I
>> convert the Table back to a stream via tEnv.toAppendStream and I use Flink
>> 1.8.0 for tests).
>>
>> My questions are now:
>> 1. How do I see if Flink treats my table result as a regular JOIN result
>> or a time-bounded JOIN?
>> 2. What is the proper way to formulate my initial query, finding all
>> matching events within the same tumbling window?
>>
>> Best regards
>> Theo Diefenthal
>>
>> [1]
>> https://de.slideshare.net/FlinkForward/flink-forward-berlin-2018-xingcan-cui-stream-join-in-flink-from-discrete-to-continuous-115374183
>> Slide 18
>>
>


Re: flink on yarn,提交方式是per job的话,如何保证高可用?

2019-08-13 Thread Zhenghua Gao
JM is restarted by YARN on failure [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/jobmanager_high_availability.html#yarn-cluster-high-availability

*Best Regards,*
*Zhenghua Gao*


On Tue, Aug 13, 2019 at 4:51 PM 陈帅  wrote:

> 请教一下:flink on yarn,提交方式是per job的话,如何保证高可用?
>


Re: I'm not able to make a stream-stream Time windows JOIN in Flink SQL

2019-08-13 Thread Zhenghua Gao
You can check the plan after optimize to verify it's a regular join or
time-bounded join(Should have a WindowJoin). The most direct way is
breakpoint at optimizing phase [1][2].
And you can use your TestData and create an ITCase for debugging [3]


[1]
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala#L148
[2]
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/StreamOptimizer.scala#L68
[3]
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowJoinITCase.scala

*Best Regards,*
*Zhenghua Gao*


On Mon, Aug 12, 2019 at 10:49 PM Theo Diefenthal <
theo.diefent...@scoop-software.de> wrote:

> Hi there,
>
> Currently, I'm trying to write a SQL query which shall executed a time
> windowed/bounded JOIN on two data streams.
>
> Suppose I have stream1 with attribute id, ts, user and stream2 with
> attribute id, ts, userName. I want to receive the natural JOIN of both
> streams with events of the same day.
>
> In Oracle (With a ts column as number instead of Timestamp, for historical
> reasons), I do the following:
>
> SELECT *
>   FROM STREAM1
>   JOIN STREAM2 ON STREAM1."user" = STREAM2."userName"
> AND TRUNC(TO_DATE('19700101', 'MMDD') + ( 1 / 24 / 60 / 
> 60 / 1000 ) * STREAM1."ts") = TRUNC(TO_DATE('19700101', 'MMDD') + ( 1 / 
> 24 / 60 / 60 / 1000 ) * STREAM2."ts");
>
> which yields 294 rows with my test data (14 elements from stream1 match to
> 21 elements in stream2 on the one day of test data). Now I want to query
> the same in Flink. So I registered both streams as table and properly
> registered the even-time (by specifying ts.rowtime as table column).
>
> My goal is to produce a time-windowed JOIN so that, if both streams
> advance their watermark far enough, an element is written out into an
> append only stream.
>
> First try (to conform time-bounded-JOIN conditions):
>
> SELECT s1.id, s2.id
>   FROM STREAM1 AS s1
>   JOIN STREAM2 AS s2
> ON s1.`user` = s2.userName
>AND s1.ts BETWEEN s2.ts - INTERVAL '24' HOUR AND s2.ts + INTERVAL '24' 
> HOUR
>AND s2.ts BETWEEN s1.ts - INTERVAL '24' HOUR AND s1.ts + INTERVAL '24' 
> HOUR
>AND TUMBLE_START(s1.ts, INTERVAL '1' DAY) = TUMBLE_START(s2.ts, 
> INTERVAL'1' DAY) -- Reduce to matchings on the same day.
>
> This yielded in the exception "Rowtime attributes must not be in the input
> rows of a regular join. As a workaround you can cast the time attributes of
> input tables to TIMESTAMP before.". So I'm still in the area of regular
> joins, not time-windowed JOINs, even though I made the explicit BETWEEN for
> both input streams!
>
> Then I found [1], which really is my query but without the last condition
> (reduce to matching on the same day). I tried this one as well, just to
> have a starting point, but the error is the same.
> I then reduced the Condition to just one time bound:
>
> SELECT s1.id, s2.id
>   FROM STREAM1 AS s1
>   JOIN STREAM2 AS s2
> ON s1.`user` = s2.userName
>AND s1.ts BETWEEN s2.ts - INTERVAL '24' HOUR AND s2.ts + INTERVAL '24' 
> HOUR
>
> which runs as a query but doesn't produce any results. Most likely because
> Flink still thinks of a regular join instead of a time-window JOIN and
> doesn't emit any resutls. (FYI interest, after executing the query, I
> convert the Table back to a stream via tEnv.toAppendStream and I use Flink
> 1.8.0 for tests).
>
> My questions are now:
> 1. How do I see if Flink treats my table result as a regular JOIN result
> or a time-bounded JOIN?
> 2. What is the proper way to formulate my initial query, finding all
> matching events within the same tumbling window?
>
> Best regards
> Theo Diefenthal
>
> [1]
> https://de.slideshare.net/FlinkForward/flink-forward-berlin-2018-xingcan-cui-stream-join-in-flink-from-discrete-to-continuous-115374183
> Slide 18
>


Re: How to make two SQLs use the same KafkaTableSource?

2019-08-08 Thread Zhenghua Gao
Blink planner support lazy translation for multiple SQLs, and the common
nodes will be reused in a single job.
The only thing you need note here is the unified TableEnvironmentImpl do
not support conversions between Table(s) and Stream(s).
U must use pure SQL api (DDL/DML by sqlUpdate, DQL by sqlQuery).

*Best Regards,*
*Zhenghua Gao*


On Fri, Aug 9, 2019 at 12:38 PM Tony Wei  wrote:

> forgot to send to user mailing list.
>
> Tony Wei  於 2019年8月9日 週五 下午12:36寫道:
>
>> Hi Zhenghua,
>>
>> I didn't get your point. It seems that `isEagerOperationTranslation` is
>> always return false. Is that
>> means even I used Blink planner, the sql translation is still in a lazy
>> manner?
>>
>> Or do you mean Blink planner will recognize and link two SQLs to the same
>> kafka source, if
>> they both use the same kafka table, even if the translation is lazy?
>>
>> I'm not familiar with the details of translation process, but I guessed
>> the translating eagerly is not
>> be an only solution. If the translation of the second SQL can reuse the
>> operators from the first SQL,
>> then it is possible to link them to the same kafka source operator.
>>
>> Best,
>> Tony Wei
>>
>> Zhenghua Gao  於 2019年8月9日 週五 上午11:57寫道:
>>
>>> This needs EagerOperationTranslation[1]
>>> <https://github.com/apache/flink/blob/master/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java#L413>
>>> support. you can try in Blink planner in 1.9.0.
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/master/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java#L413
>>>
>>> *Best Regards,*
>>> *Zhenghua Gao*
>>>
>>>
>>> On Fri, Aug 9, 2019 at 10:37 AM Tony Wei  wrote:
>>>
>>>> Hi,
>>>>
>>>> I used `flinkTableEnv.connect(new Kafka()...).registerTableSource(...)`
>>>> to register my kafka table.
>>>> However, I found that because SQL is a lazy operation, it will convert
>>>> to DataStream under some
>>>> criteria. For example, `Table#toRetractStream`.
>>>>
>>>> So, when I used two SQLs in one application job, the same kafka table
>>>> will be constructed twice. It
>>>> is not a problem from flink side, because two operators held their own
>>>> state for offsets. But from
>>>> kafka side, they will have the same group_id.
>>>>
>>>> I want to make sure that only one kafka source will commit group_id's
>>>> offsets back to kafka. A
>>>> workaround might be registering the same kafka topic twice with
>>>> different name, group_id for
>>>> two SQLs. But I would still like to know if there is any way to make
>>>> two SQLs just read from the
>>>> same KafkaTableSource? Thanks in advance.
>>>>
>>>> Best,
>>>> Tony Wei
>>>>
>>>


Re: 如何获取Flink table api/sql code gen 代码

2019-08-08 Thread Zhenghua Gao
Currently Flink DO NOT provides a direct way to get code gen code. But
there are indirect ways to try.
1) debug in IDE
Flink use Janino to compile all code gen code, and there is a single entry
point [1]
<https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/CompileUtils.java>
for
Blink planner, [2]
<https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/Compiler.scala>
for
old planner, you can set breakpoint there and get the code.

2) enable debug logging
Blink planner logging code in CompileUtils, and old planner logging code in
subclass of Compiler

3) use Janino options
Janino caches code in tmp directory, and you can enable these options[3]
<https://github.com/janino-compiler/janino/blob/master/janino/src/main/java/org/codehaus/janino/Scanner.java#L71>.
Note: org.codehaus.janino.source_debugging.keep is not supported in current
Janino version, which means this method can only be used to debug in
IDE(need breakpoint to keep source code)

[1]
https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/CompileUtils.java
[2]
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/Compiler.scala
[3]
https://github.com/janino-compiler/janino/blob/master/janino/src/main/java/org/codehaus/janino/Scanner.java#L71

*Best Regards,*
*Zhenghua Gao*


On Wed, Aug 7, 2019 at 12:02 AM Vincent Cai  wrote:

> Hi all,
> 在Spark中,可以通过调用Dataset的queryExecution.debug.codegen() 方法获得 Catalyst 产生的代码。
> 在Flink是否有类似的方法可以获得code gen的代码?
>
>
> 参考链接:
> https://medium.com/virtuslab/spark-sql-under-the-hood-part-i-26077f85ebf0
>
>
> Regards
> Vincent  Cai


Re: Flink sql join问题

2019-08-08 Thread Zhenghua Gao
可以试下最新flink 1.9 blink
planner的firstRow/lastRow优化[1]能否满足你的需求,目前的限制是只能基于procTime来去重。

* e.g.
* 1. {{{
* SELECT a, b, c FROM (
*   SELECT a, b, c, proctime,
*  ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime ASC) as row_num
*   FROM MyTable
* ) WHERE row_num <= 1
* }}} will be converted to StreamExecDeduplicate which keeps first row.
* 2. {{{
* SELECT a, b, c FROM (
*   SELECT a, b, c, proctime,
*  ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime DESC) as row_num
*   FROM MyTable
* ) WHERE row_num <= 1
* }}} will be converted to StreamExecDeduplicate which keeps last row.


[1]
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecDeduplicateRule.scala

*Best Regards,*
*Zhenghua Gao*


On Tue, Aug 6, 2019 at 2:28 PM huang  wrote:

> Hi all,
>
>
> 请问用Flink
> sql做双流join。如果希望两个流都只保存每个key的最新的数据,这样相当于每次join都只输出最新的一条记录。请问这种场景sql支持吗
>
>
> thanks


Re: 【Flink SQL】无法启动env.yaml

2019-04-01 Thread Zhenghua Gao
format 和 schema 应该在同一层。
参考一下 flink-sql-client 测试里TableNumber1的配置文件: test-sql-client-defaults.yaml

*Best Regards,*
*Zhenghua Gao*


On Mon, Apr 1, 2019 at 4:09 PM 曾晓勇 <469663...@qq.com> wrote:

> @1543332...@qq.com
>  
> 谢谢,格式问题后面我检查了也已经调整正确了,直接从flink官网下载最新的版本在启动的时候报错,具体报错如下,目前想调试下能否将生产的个别脚本直接换成FLINKSQL
> 而不走java编程。如果走程序调整的量很大。
>  FLINK 版本:flink-1.7.2-bin-hadoop28-scala_2.11
>  启动命令:/home/hadoop/flink-1.7.2/bin/sql-client.sh embedded -e
> /home/hadoop/flink_test/env.yaml
>
>
> [hadoop@server2 bin]$ /home/hadoop/flink-1.7.2/bin/sql-client.sh embedded
> -e /home/hadoop/flink_test/env.yaml
> Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was
> set.
> No default environment specified.
> Searching for
> '/home/hadoop/flink-1.7.2/conf/sql-client-defaults.yaml'...found.
> Reading default environment from:
> file:/home/hadoop/flink-1.7.2/conf/sql-client-defaults.yaml
> Reading session environment from: file:/home/hadoop/flink_test/env.yaml
> Validating current environment...
>
>
> Exception in thread "main"
> org.apache.flink.table.client.SqlClientException: The configured
> environment is invalid. Please check your environment files again.
> at
> org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:140)
> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:99)
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:187)
> Caused by: org.apache.flink.table.client.gateway.SqlExecutionException:
> Could not create execution context.
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:488)
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.validateSession(LocalExecutor.java:316)
> at
> org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:137)
> ... 2 more
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> Could not find a suitable table factory for
> 'org.apache.flink.table.factories.StreamTableSourceFactory' in
> the classpath.
>
>
> Reason:
> The matching factory
> 'org.apache.flink.table.sources.CsvAppendTableSourceFactory' doesn't
> support 'format.schema.#.type'.
>
>
> Supported properties of this factory are:
> connector.path
> connector.path
> format.comment-prefix
> format.field-delimiter
> format.fields.#.name
> format.fields.#.type
> format.ignore-first-line
> format.ignore-parse-errors
> format.line-delimiter
> format.quote-character
> schema.#.name
> schema.#.type
>
>
> The following properties are requested:
> connector.path=/home/hadoop/flink_test/input.csv
> connector.type=filesystem
> format.comment-prefix=#
> format.fields.0.name=MyField1
> format.fields.0.type=INT
> format.fields.1.name=MyField2
> format.fields.1.type=VARCHAR
> format.line-delimiter=\n
> format.schema.0.name=MyField1
> format.schema.0.type=INT
> format.schema.1.name=MyField2
> format.schema.1.type=VARCHAR
> format.type=csv
> update-mode=append
>
>
> The following factories have been considered:
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
> org.apache.flink.table.sinks.CsvAppendTableSinkFactory
>
>
> at
> org.apache.flink.table.factories.TableFactoryService$.filterBySupportedProperties(TableFactoryService.scala:277)
> at
> org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:136)
> at
> org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:100)
> at
> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.scala)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:236)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$new$0(ExecutionContext.java:121)
> at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:119)
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:484)
> ... 4 more
> [hadoop@server2 bin]$
>
>
>
>
>
>
>
>
>
>
>
> -- 调整后的格式问题。
> tables:
>  - name: MyTableSource
>type: source-table
>update-mode: append
>connector:
>  type: filesystem
>  path: "/home/hadoop/flink_test/input.csv"
>format:
> type: csv
>

Re: 【Flink SQL】无法启动env.yaml

2019-04-01 Thread Zhenghua Gao
yaml格式不对,看起来是缩进导致的。
你可以找个在线yaml编辑器验证一下, 比如 [1]
更多yaml格式的说明,参考 [2][3]

[1] http://nodeca.github.io/js-yaml/
[2] http://www.ruanyifeng.com/blog/2016/07/yaml.html
[3] https://en.wikipedia.org/wiki/YAML

*Best Regards,*
*Zhenghua Gao*


On Mon, Apr 1, 2019 at 11:51 AM 曾晓勇 <469663...@qq.com> wrote:

> 各位好,
>
>今天在测试Flink SQL 无法启动,错误日志如下。请问下配置yaml文件的格式需要注意下什么,分割符号能否支持特殊的符号如
> hive建表语句中的分隔符'\036',详细报错日志如下。
>
> [root@server2 bin]# /home/hadoop/flink-1.7.2/bin/sql-client.sh embedded
> -e /home/hadoop/flink_test/env.yaml
> Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was
> set.
> No default environment specified.
> Searching for
> '/home/hadoop/flink-1.7.2/conf/sql-client-defaults.yaml'...found.
> Reading default environment from:
> file:/home/hadoop/flink-1.7.2/conf/sql-client-defaults.yaml
> Reading session environment from: file:/home/hadoop/flink_test/env.yaml
>
>
> Exception in thread "main"
> org.apache.flink.table.client.SqlClientException: Could not parse
> environment file. Cause: YAML decoding problem: while parsing a block
> collection
>  in 'reader', line 2, column 2:
>  - name: MyTableSource
>  ^
> expected , but found BlockMappingStart
>  in 'reader', line 17, column 3:
>   schema:
>   ^
>  (through reference chain:
> org.apache.flink.table.client.config.Environment["tables"])
> at
> org.apache.flink.table.client.config.Environment.parse(Environment.java:146)
> at
> org.apache.flink.table.client.SqlClient.readSessionEnvironment(SqlClient.java:162)
> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:90)
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:187)
>
>
>
>
> --配置文件env.yaml
> tables:
>  - name: MyTableSource
>type: source-table
>update-mode: append
>connector:
>  type: filesystem
>  path: "/home/hadoop/flink_test/input.csv"
>format:
> type: csv
> fields:
> - name: MyField1
>   type: INT
> - name: MyField2
>   type: VARCHAR
> line-delimiter: "\n"
> comment-prefix: "#"
>   schema:
> - name: MyField1
> type: INT
> - name: MyField2
> type: VARCHAR
>  - name: MyCustomView
>type: view
>query: "SELECT MyField2 FROM MyTableSource"
> # Execution properties allow for changing the behavior of a table program.
> execution:
>  type: streaming # required: execution mode either 'batch' or 'streaming'
>  result-mode: table # required: either 'table' or 'changelog'
>  max-table-result-rows: 100 # optional: maximum number of maintained
> rows in
>  # 'table' mode (100 by default, smaller 1 means unlimited)
>  time-characteristic: event-time # optional: 'processing-time' or
> 'event-time' (default)
>  parallelism: 1 # optional: Flink's parallelism (1 by default)
>  periodic-watermarks-interval: 200 # optional: interval for periodic
> watermarks(200 ms by default)
>  max-parallelism: 16 # optional: Flink's maximum parallelism (128by
> default)
>  min-idle-state-retention: 0 # optional: table program's minimum idle
> state time
>  max-idle-state-retention: 0 # optional: table program's maximum idle
> state time
>  restart-strategy: # optional: restart strategy
>type: fallback # "fallback" to global restart strategy by
> default
> # Deployment properties allow for describing the cluster to which table
> programsare submitted to.
> deployment:
>   response-timeout: 5000
>
>


Re: Flink tableApi 按列排序,只能按一列,不能按多列排序吗?

2019-03-18 Thread Zhenghua Gao
Try:

xx.orderBy('id.desc, 'value1.asc)


*Best Regards,*
*Zhenghua Gao*


On Sat, Mar 16, 2019 at 10:28 AM 刘 文 
wrote:

>
> 输出结果,只按id降序排序,没有按value1升序排序。
>
>
>
>
>
>
>
> package
> com.opensourceteams.module.bigdata.flink.example.tableapi.operation.orderBy
>
> import org.apache.flink.api.scala.{ExecutionEnvironment, _}
> import org.apache.flink.table.api.TableEnvironment
> import org.apache.flink.table.api.scala._
>
> object Run {
>
>
>   def main(args: Array[String]): Unit = {
>
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tableEnv = TableEnvironment.getTableEnvironment(env)
>
> env.setParallelism(1)
>
> val dataSet = env.fromElements( (1,"a",10),(2,"b",20)
> ,(20,"f",200),(3,"c",30) )
>
>
>
> //从dataset转化为 table
> val table = tableEnv.fromDataSet(dataSet)
>
> //注册table
> tableEnv.registerTable("user1",table)
>
>
> //查询table 所有数据
> tableEnv.scan("user1").as('id,'name,'value1)
>   //.orderBy('id.asc)  //按id列,升序排序(注意是按分区来排序)
>   .orderBy('id.desc)
>   .orderBy('value1.asc)
>
>   .first(1000)
>
>   //print 输出 (相当于sink)
>   .print()
>
>
> /**
>   * 输出结果
>   *
>   * 20,f,200
>   * 3,c,30
>   * 2,b,20
>   * 1,a,10
>   */
>
>
>
>   }
>
> }
>
>


Re: ProgramInvocationException when I submit job by 'flink run' after running Flink stand-alone more than 1 month?

2019-02-26 Thread Zhenghua Gao
Seem like there is something wrong with RestServer and the RestClient
didn't connect to it.
U can check the standalonesession log for investigating causes.

btw: The cause of  "no cluster was found"  is ur pid information was
cleaned for some reason.
The pid information is stored in ur TMP directory, it should look like
/tmp/flink-user-taskexecutor.pid or /tmp/flink-user-standalonesession.pid

On Wed, Feb 27, 2019 at 10:27 AM Son Mai  wrote:

> Hi,
> I'm having a question regarding Flink.
> I'm running Flink in stand-alone mode on 1 host (JobManager, TaskManager
> on the same host). At first, I'm able to submit and cancel jobs normally,
> the jobs showed up in the web UI and ran.
> However, after ~1month, when I canceled the old job and submitting a new
> one, I faced *org.apache.flink.client.program.ProgramInvocationException:
> Could not retrieve the execution result.*
> At this moment, I was able to run *flink list* to list current jobs and *flink
> cancel* to cancel the job, but *flink run* failed. Exception was thrown
> and the job was now shown in the web UI.
> When I tried to stop the current stand-alone cluster using *stop-cluster*,
> it said 'no cluster was found'. Then I had to find the pid of flink
> processes and stop them manually. Then if I run *start-cluster* to create
> a new stand-alone cluster, I was able to submit jobs normally.
> The shortened stack-trace: (full stack-trace at google docs link
> 
> )
> org.apache.flink.client.program.ProgramInvocationException: Could not
> retrieve the execution result. (JobID: 7ef1cbddb744cd5769297f4059f7c531)
> at org.apache.flink.client.program.rest.RestClusterClient.submitJob
> (RestClusterClient.java:261)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed
> to submit JobGraph.
> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException:
> Could not complete the operation. Number of retries has been exhausted.
> Caused by: java.util.concurrent.CompletionException:
> org.apache.flink.runtime.rest.ConnectionClosedException: Channel became
> inactive.
> Caused by: org.apache.flink.runtime.rest.ConnectionClosedException:
> Channel became inactive.
> ... 37 more
> The error is consistent. It always happens after I let Flink run for a
> while, usually more than 1 month). Why am I not able to submit job to flink
> after a while? What happened here?
> Regards,
>
> Son
>


Re: [Blink]sql client kafka sink 失败

2019-02-26 Thread Zhenghua Gao
gt;
> >>
> /bigdata/flink-1.5.1/opt/connectors/kafka09/flink-connector-kafka-0.9_2.11-1.5.1-sql-jar.jar
> >> > --jar
> >> >
> >>
> /bigdata/flink-1.5.1/opt/connectors/kafka08/flink-connector-kafka-0.8_2.11-1.5.1.jar
> >> > --jar /bigdata/flink-1.5.1/opt/connectors/flink-hbase_2.11-1.5.1.jar
> >> --jar
> >> >
> >>
> /bigdata/flink-1.5.1/opt/connectors/flink-connector-hadoop-compatibility_2.11-1.5.1.jar
> >> > --jar
> >> >
> /bigdata/flink-1.5.1/opt/connectors/flink-connector-hive_2.11-1.5.1.jar
> >> > --jar /bigdata/flink-1.5.1/opt/sql-client/flink-sql-client-1.5.1.jar
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> > 在 2019-02-22 19:32:18,"Becket Qin"  写道:
> >> > >能不能看一下运行sql-client.sh的运行参数。具体做法是:
> >> > >
> >> > >运行sql-client.sh
> >> > >ps | grep sql-client
> >> > >
> >> > >查看一下其中是不是有这个 flink-connector-kafka-0.11 的 jar.
> >> > >
> >> > >Jiangjie (Becket) Qin
> >> > >
> >> > >On Fri, Feb 22, 2019 at 6:54 PM 张洪涛  wrote:
> >> > >
> >> > >>
> >> > >>
> >> > >> 是包含这个类的
> >> > >>
> >> > >>
> >> > >> jar -tf flink-connector-kafka-0.11_2.11-*.jar | grep Crc32C
> >> > >>
> >> > >>
> >> >
> >>
> org/apache/flink/kafka011/shaded/org/apache/kafka/common/utils/Crc32C$1.class
> >> > >>
> >> > >>
> >> >
> >>
> org/apache/flink/kafka011/shaded/org/apache/kafka/common/utils/Crc32C$ChecksumFactory.class
> >> > >>
> >> > >>
> >> >
> >>
> org/apache/flink/kafka011/shaded/org/apache/kafka/common/utils/Crc32C$Java9ChecksumFactory.class
> >> > >>
> >> > >>
> >> >
> >>
> org/apache/flink/kafka011/shaded/org/apache/kafka/common/utils/Crc32C$PureJavaChecksumFactory.class
> >> > >>
> >> >
> >>
> org/apache/flink/kafka011/shaded/org/apache/kafka/common/utils/Crc32C.class
> >> > >>
> >> > >>
> >> >
> >>
> org/apache/flink/kafka011/shaded/org/apache/kafka/common/utils/PureJavaCrc32C.class
> >> > >>
> >> > >>
> >> > >>
> >> > >>
> >> > >>
> >> > >>
> >> > >> 在 2019-02-22 18:03:18,"Zhenghua Gao"  写道:
> >> > >> >能否看一下对应的包里是否有这个类, 方法如下(假设你的blink安装包在 /tmp/blink):
> >> > >> >
> >> > >> >cd /tmp/blink/opt/connectors/kafka011
> >> > >> >jar -tf flink-connector-kafka-0.11_2.11-*.jar | grep Crc32C
> >> > >> >
> >> > >> >On Fri, Feb 22, 2019 at 2:56 PM 张洪涛  wrote:
> >> > >> >
> >> > >> >>
> >> > >> >>
> >> > >> >> 大家好!
> >> > >> >>
> >> > >> >>
> >> > >> >> 我正在测试Blink sql client kafka sink connector ,但是发现写入失败,以下是我的步骤
> >> > >> >>
> >> > >> >>
> >> > >> >> 环境配置
> >> > >> >> blink standalone 模式
> >> > >> >>
> >> > >> >>
> >> > >> >>
> >> > >> >>
> >> > >> >> 1. 配置environment 启动sql client
> >> > >> >>
> >> > >> >>
> >> > >> >> 2. 创建kafka sink table
> >> > >> >> CREATETABLEkafka_sink(
> >> > >> >>messageKeyVARBINARY,
> >> > >> >>messageValueVARBINARY,
> >> > >> >>PRIMARYKEY(messageKey))
> >> > >> >> with(
> >> > >> >>type='KAFKA011',
> >> > >> >>topic='sink-topic',
> >> > >> >>`bootstrap.servers`='172.19.0.108:9092',
> >> > >> >>retries='3'
> >> > >> >> );
> >> > >> >>
> >> > >> >>
> >> > >> >> 3. 创建查询语句并执行
> >> > >> >> INSERT INTO kafka_sink
> >> > >> >> SELECT CAST('123' AS VARBINARY) AS key,
> >

Re: Re: Re: [Blink]sql client kafka sink 失败

2019-02-24 Thread Zhenghua Gao
确认一下standalone cluster 和 sql client 用的是同一份 flink/blink bin
印象中两者不一致会有一些奇怪的问题。


On Mon, Feb 25, 2019 at 9:56 AM 张洪涛  wrote:

>
>
> sql-client.sh 的启动参数首先在classpath里面会包含kafka相关的jar  另外会有--jar
> 包含所有connector的jar
>
>
> 这些jars在sql-client提交job时候会上传到cluster的blob store 但是很奇怪为啥找不到
>
>
>  00:00:06 /usr/lib/jvm/java-1.8.0-openjdk/bin/java
> -Dlog.file=/bigdata/flink-1.5.1/log/flink-root-sql-client-gpu06.log
> -Dlog4j.configuration=file:/bigdata/flink-1.5.1/conf/log4j-cli.properties
> -Dlogback.configurationFile=file:/bigdata/flink-1.5.1/conf/logback.xml
> -classpath
> /bigdata/flink-1.5.1/lib/flink-python_2.11-1.5.1.jar:/bigdata/flink-1.5.1/lib/flink-shaded-hadoop2-uber-1.5.1.jar:/bigdata/flink-1.5.1/lib/log4j-1.2.17.jar:/bigdata/flink-1.5.1/lib/slf4j-log4j12-1.7.7.jar:/bigdata/flink-1.5.1/lib/flink-dist_2.11-1.5.1.jar::/bigdata/hadoop-2.7.5/etc/hadoop::/bigdata/flink-1.5.1/opt/connectors/kafka011/flink-connector-kafka-0.11_2.11-1.5.1-sql-jar.jar:/bigdata/flink-1.5.1/opt/connectors/kafka010/flink-connector-kafka-0.10_2.11-1.5.1-sql-jar.jar:/bigdata/flink-1.5.1/opt/connectors/kafka09/flink-connector-kafka-0.9_2.11-1.5.1-sql-jar.jar:/bigdata/flink-1.5.1/opt/connectors/kafka08/flink-connector-kafka-0.8_2.11-1.5.1.jar:/bigdata/flink-1.5.1/opt/connectors/flink-hbase_2.11-1.5.1.jar:/bigdata/flink-1.5.1/opt/connectors/flink-connector-hadoop-compatibility_2.11-1.5.1.jar:/bigdata/flink-1.5.1/opt/connectors/flink-connector-hive_2.11-1.5.1.jar:/bigdata/flink-1.5.1/opt/sql-client/datanucleus-api-jdo-4.2.4.jar:/bigdata/flink-1.5.1/opt/sql-client/javax.jdo-3.2.0-m3.jar:/bigdata/flink-1.5.1/opt/sql-client/datanucleus-core-4.1.17.jar:/bigdata/flink-1.5.1/opt/sql-client/datanucleus-rdbms-4.1.19.jar:/bigdata/flink-1.5.1/opt/sql-client/flink-sql-client-1.5.1.jar
> org.apache.flink.table.client.SqlClient embedded -d
> conf/sql-client-defaults.yaml --jar
> /bigdata/flink-1.5.1/opt/connectors/kafka011/flink-connector-kafka-0.11_2.11-1.5.1-sql-jar.jar
> --jar
> /bigdata/flink-1.5.1/opt/connectors/kafka010/flink-connector-kafka-0.10_2.11-1.5.1-sql-jar.jar
> --jar
> /bigdata/flink-1.5.1/opt/connectors/kafka09/flink-connector-kafka-0.9_2.11-1.5.1-sql-jar.jar
> --jar
> /bigdata/flink-1.5.1/opt/connectors/kafka08/flink-connector-kafka-0.8_2.11-1.5.1.jar
> --jar /bigdata/flink-1.5.1/opt/connectors/flink-hbase_2.11-1.5.1.jar --jar
> /bigdata/flink-1.5.1/opt/connectors/flink-connector-hadoop-compatibility_2.11-1.5.1.jar
> --jar
> /bigdata/flink-1.5.1/opt/connectors/flink-connector-hive_2.11-1.5.1.jar
> --jar /bigdata/flink-1.5.1/opt/sql-client/flink-sql-client-1.5.1.jar
>
>
>
>
>
>
> 在 2019-02-22 19:32:18,"Becket Qin"  写道:
> >能不能看一下运行sql-client.sh的运行参数。具体做法是:
> >
> >运行sql-client.sh
> >ps | grep sql-client
> >
> >查看一下其中是不是有这个 flink-connector-kafka-0.11 的 jar.
> >
> >Jiangjie (Becket) Qin
> >
> >On Fri, Feb 22, 2019 at 6:54 PM 张洪涛  wrote:
> >
> >>
> >>
> >> 是包含这个类的
> >>
> >>
> >> jar -tf flink-connector-kafka-0.11_2.11-*.jar | grep Crc32C
> >>
> >>
> org/apache/flink/kafka011/shaded/org/apache/kafka/common/utils/Crc32C$1.class
> >>
> >>
> org/apache/flink/kafka011/shaded/org/apache/kafka/common/utils/Crc32C$ChecksumFactory.class
> >>
> >>
> org/apache/flink/kafka011/shaded/org/apache/kafka/common/utils/Crc32C$Java9ChecksumFactory.class
> >>
> >>
> org/apache/flink/kafka011/shaded/org/apache/kafka/common/utils/Crc32C$PureJavaChecksumFactory.class
> >>
> org/apache/flink/kafka011/shaded/org/apache/kafka/common/utils/Crc32C.class
> >>
> >>
> org/apache/flink/kafka011/shaded/org/apache/kafka/common/utils/PureJavaCrc32C.class
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2019-02-22 18:03:18,"Zhenghua Gao"  写道:
> >> >能否看一下对应的包里是否有这个类, 方法如下(假设你的blink安装包在 /tmp/blink):
> >> >
> >> >cd /tmp/blink/opt/connectors/kafka011
> >> >jar -tf flink-connector-kafka-0.11_2.11-*.jar | grep Crc32C
> >> >
> >> >On Fri, Feb 22, 2019 at 2:56 PM 张洪涛  wrote:
> >> >
> >> >>
> >> >>
> >> >> 大家好!
> >> >>
> >> >>
> >> >> 我正在测试Blink sql client kafka sink connector ,但是发现写入失败,以下是我的步骤
> >> >>
> >> >>
> >> >> 环境配置
> >> >> blink standalone 模式
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> 1. 配置environment 启动sql client
> >> >>
> >> >>
> >> >> 2. 创建kafka sink table
> >> >> CREATETABLEkafka_sink(
> >> 

Re: Filter Date type in Table API

2019-01-30 Thread Zhenghua Gao
Just try: filter("f_date <= '1998-10-02'.toDate")



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink Jdbc streaming source support in 1.7.1 or in future?

2019-01-22 Thread Zhenghua Gao
Actually flink-connectors/flink-jdbc module provided a JDBCInputFormat to
read data from a database.
u can have a try.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Issue with counter metrics for large number of keys

2019-01-16 Thread Zhenghua Gao
So what you want is the counts of every keys ?
Why didn't you use count aggregation?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: NoMatchingTableFactoryException when test flink sql with kafka in flink 1.7

2019-01-15 Thread Zhenghua Gao
May be you're generating non-standard JSON record.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/