Re: is there any detrimental side-effect if i set the max parallelism as 32768

2023-03-13 Thread Tony Wei
Hi Hangxiang, David,

Thank you for your replies. Your responses are very helpful.

Best regards,
Tony Wei

David Anderson  於 2023年3月14日 週二 下午12:12寫道:

> I believe there is some noticeable overhead if you are using the
> heap-based state backend, but with RocksDB I think the difference is
> negligible.
>
> David
>
> On Tue, Mar 7, 2023 at 11:10 PM Hangxiang Yu  wrote:
> >
> > Hi, Tony.
> > "be detrimental to performance" means that some extra space overhead of
> the field of the key-group may influence performance.
> > As we know, Flink will write the key group as the prefix of the key to
> speed up rescaling.
> > So the format will be like: key group | key len | key | ..
> > You could check the relationship between max parallelism and bytes of
> key group as below:
> > --
> > max parallelism   bytes of key group
> >1281
> >   32768 2
> > --
> > So I think the cost will be very small if the real key length >> 2 bytes.
> >
> > On Wed, Mar 8, 2023 at 1:06 PM Tony Wei  wrote:
> >>
> >> Hi experts,
> >>
> >>> Setting the maximum parallelism to a very large value can be
> detrimental to performance because some state backends have to keep
> internal data structures that scale with the number of key-groups (which
> are the internal implementation mechanism for rescalable state).
> >>>
> >>> Changing the maximum parallelism explicitly when recovery from
> original job will lead to state incompatibility.
> >>
> >>
> >> I read the section above from Flink official document [1], and I'm
> wondering what the detail is regarding to the side-effect.
> >>
> >> Suppose that I have a Flink SQL job with large state, large parallelism
> and using RocksDB as my state backend.
> >> I would like to set the max parallelism as 32768, so that I don't
> bother if the max parallelism can be divided by the parallelism whenever I
> want to scale my job,
> >> because the number of key groups will not differ too much between each
> subtask.
> >>
> >> I'm wondering if this is a good practice, because based on the official
> document it is not recommended actually.
> >> If possible, I would like to know the detail about this side-effect.
> Which state backend will have this issue? and Why?
> >> Please give me an advice. Thanks in advance.
> >>
> >> Best regards,
> >> Tony Wei
> >>
> >> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#setting-the-maximum-parallelism
> >
> >
> >
> > --
> > Best,
> > Hangxiang.
>


is there any detrimental side-effect if i set the max parallelism as 32768

2023-03-07 Thread Tony Wei
Hi experts,

Setting the maximum parallelism to a very large value can be detrimental to
> performance because some state backends have to keep internal data
> structures that scale with the number of key-groups (which are the internal
> implementation mechanism for rescalable state).
>
> Changing the maximum parallelism explicitly when recovery from original
> job will lead to state incompatibility.
>

I read the section above from Flink official document [1], and I'm
wondering what the detail is regarding to the side-effect.

Suppose that I have a Flink SQL job with large state, large parallelism and
using RocksDB as my state backend.
I would like to set the max parallelism as 32768, so that I don't bother if
the max parallelism can be divided by the parallelism whenever I want to
scale my job,
because the number of key groups will not differ too much between each
subtask.

I'm wondering if this is a good practice, because based on the
official document it is not recommended actually.
If possible, I would like to know the detail about this side-effect. Which
state backend will have this issue? and Why?
Please give me an advice. Thanks in advance.

Best regards,
Tony Wei

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#setting-the-maximum-parallelism


Re: flink 1.16 kafka 流和自定义流collect后,watermark 消失

2022-11-14 Thread Tony Wei
Hi Peihui,

確認下你想調用的方法是不是 connect?因為看起來 stream1.collect(stream2) 不像是 DataStream 支援的 API
如果是的話,想請問你 ConfigSource() 有沒有配置 WatermarkStrategy?connect 後的算子是透過上游兩個算子的
watermark 取最小作為輸出。
因此,如果只定義其中一邊的 WatermarkStrategy 會導致這個算子的 watermark 無法推進。

詳細可以參考這個章節
https://nightlies.apache.org/flink/flink-docs-master/docs/concepts/time/#watermarks-in-parallel-streams

Best regards,

Peihui He  於 2022年11月14日 週一 下午2:40寫道:

> Hi,
>
> 如题,代码大概如下:
>
> stream1  = env.fromSource(kafkaSource, wartermarkStrategy)
> stream2 = env.addSource(ConfigSource())
>
> stream1.collect(stream2).process(ProcessFunction()).print()
>
> 这种情况下在collect时没有watermark, 是什么原因呢?
>


Re: Re: flink1.14 注册mysql connector报错

2022-02-24 Thread Tony Wei
Hi xiaoyue,

看起來是這行造成的 tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
你可能需要在執行下沉操作前將 SqlDialect 更換回 SqlDialect.DEFAULT。

best regards,

xiaoyue  於 2022年2月25日 週五 下午2:36寫道:

> Hi tony,
>完整代码,是从hive取数据,执行flatmap, aggregate操作后再下沉到mysql。 由于篇幅, 中间的udf
> function定义过程不完整贴出了,您可以参考下,非常感谢您的帮助,麻烦啦。
>
> 代码:
> # 执行环境
> env = StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings Settings =
> EnvironmentSettings.newInstance().inBatchMode().build();
> tEnv = StreamTableEnvironment.create(env, Settings);
>
> # hive源
> tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
>
> String confSite = "src\\main\\resources";
>
> String version = "3.1.2";
>
> String defaultDatabase = "fund_analysis";
>
> HiveCatalog hiveCat = new HiveCatalog("hive", defaultDatabase,
> confSite, confSite, version);
>
> tEnv.registerCatalog("hive", hiveCat);
>
> tEnv.useCatalog("hive");
> # hive 取数SQL
> String biz_date = "20211130";
> String tblSource = String.format("select " +
> "coalesce(a.rate,0) as yldrate, " +
> "coalesce(c.rate,0) as riskless_yldrate, " +
> "a.ccy_type, " +
> "a.biz_date, " +
> "b.is_exch_dt, " +
> "a.pf_id " +
> "from " +
> "ts_pf_yldrate a " +
> "inner join td_gl_day b on b.dt = a.biz_date " +
> "inner join ts_pf_bm_yldrate c on c.biz_date = a.biz_date
> and c.pf_id = a.pf_id " +
> "where a.biz_date <= '%s'", biz_date);
> Table table = tEnv.sqlQuery(tblSource);
>
> // 注册flatmap函数
> tEnv.createTemporarySystemFunction("RowFlatMap",
> SharpeRatioFlatMap.class);
> // 注册聚合函数
> tEnv.createTemporarySystemFunction("SharpeRatioAgg",
> SharpeRatioAggregate.class);
>
> // 执行flatmap操作
> Table tagTbl = table.flatMap(call("RowFlatMap",$("yldrate"),
> $("riskless_yldrate"),$("ccy_type"),$("biz_date"),
> $("is_exch_dt"),$("pf_id"), biz_date));
>
>  // 切换catalog,并注册表
> tEnv.useCatalog("default_catalog");
> tEnv.createTemporaryView("tagTable",tagTbl);
>
> // 调用函数SharpeRatioAgg 计算结果
>  Table result = tEnv.sqlQuery(String.format("select '%s' as
> biz_date, dmo_index_code, SharpeRatioAgg(yldrate, yldrate_riskless,
> dmo_index_code) as index_value from tagTable group by dmo_index_code",
> biz_date));
> // result.execute().print(); (--> 该步 result 可成功打印)
>
> // 下沉操作
> String mysqlSink = "create table bulk_index_sink(" +
> "  biz_date string, " +
> "  dmo_index_code string, " +
> "  index_value string" +
> ") with (" +
> "   'connector' = 'jdbc', " +
> "   'username' = 'root', " +
> "   'password' = 'xxx', " +
> "   'driver' = 'com.mysql.cj.jdbc.Driver', " +
> "   'url' =
> 'jdbc:mysql://hadoop104:3306/test?useSSL=False', " +
> "   'table-name' = 'bulk_index_sink')";
> tEnv.executeSql(mysqlSink);
>
>
> result.select("biz_date,dmo_index_code,index_value").insertInto("bulk_index_sink");
> tEnv.execute("mysql_sink_test");
>
>
> xiao...@ysstech.com
>
> 发件人: Tony Wei
> 发送时间: 2022-02-25 14:13
> 收件人: user-zh
> 主题: Re: flink1.14 注册mysql connector报错
> Hi xiaoyue,
>
> 請問你的測試代碼有其他額外的代碼或配置嗎?能否分享下完整的代碼和配置文件?
> 我嘗試在我的 IDE 上運行了下列代碼是能成功運行的。
>
> public static void main(String[] args) {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings Settings =
> EnvironmentSettings.newInstance().inBatchMode().build();
> StreamTableEnvironment tEnv =
> StreamTableEnvironment.create(env, Settings);
>
> String mysqlSink = "create table bulk_index_sink(" +
> "  biz_date string, " +
> "  dmo_index_code string, " +
> "  index_value string, " +
> 

Re: flink1.14 注册mysql connector报错

2022-02-24 Thread Tony Wei
Hi xiaoyue,

請問你的測試代碼有其他額外的代碼或配置嗎?能否分享下完整的代碼和配置文件?
我嘗試在我的 IDE 上運行了下列代碼是能成功運行的。

public static void main(String[] args) {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings Settings =
EnvironmentSettings.newInstance().inBatchMode().build();
StreamTableEnvironment tEnv =
StreamTableEnvironment.create(env, Settings);

String mysqlSink = "create table bulk_index_sink(" +
"  biz_date string, " +
"  dmo_index_code string, " +
"  index_value string, " +
"  primary key(dmo_index_code) not enforced) " +
"  with (" +
"   'connector' = 'jdbc', " +
"   'username' = 'root', " +
"   'password' = 'yss300377@ZT', " +
"   'driver' = 'com.mysql.cj.jdbc.Driver', " +
"   'url' =
'jdbc:mysql://192.168.100.104:3306/test?useSSL=False', " +
"   'table-name' = 'bulk_index_sink')";
tEnv.executeSql(mysqlSink).print();
//tEnv.execute("mysql_sink_test");
}

輸出的結果為:
++
| result |
++
| OK |
++
1 row in set

best regards,

xiaoyue  於 2022年2月25日 週五 下午1:37寫道:

> flink1.14  注册mysql下车Connector报错,检查多次未发现语法错误,求助!
>
> 代码:
> env = StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings Settings =
> EnvironmentSettings.newInstance().inBatchMode().build();
> tEnv = StreamTableEnvironment.create(env, Settings);
>
>String mysqlSink = "create table bulk_index_sink(" +
> "  biz_date string, " +
> "  dmo_index_code string, " +
> "  index_value string, " +
> "  primary key(dmo_index_code) not enforced) " +
> "  with (" +
> "   'connector' = 'jdbc', " +
> "   'username' = 'root', " +
> "   'password' = 'yss300377@ZT', " +
> "   'driver' = 'com.mysql.cj.jdbc.Driver', " +
> "   'url' = 'jdbc:mysql://
> 192.168.100.104:3306/test?useSSL=False', " +
> "   'table-name' = 'bulk_index_sink')";
>  tEnv.executeSql(mysqlSink);
>  tEnv.execute("mysql_sink_test");
>
> 报错:
> org.apache.flink.table.api.SqlParserException: SQL parse failed.
> Encountered "not" at line 1, column 126.
> Was expecting one of:
> "DISABLE" ...
> "ENABLE" ...
> "NORELY" ...
> "NOVALIDATE" ...
> "RELY" ...
> "VALIDATE" ...
> ")" ...
> "," ...
>
>
> at
> org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:56)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:98)
> at
> org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:195)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:736)
> at
> com.yss.datamiddle.index.c001.SharpeRatioTest.udfFlatMapTest(SharpeRatioTest.java:175)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> at
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
> at
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
> at
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220)
> at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53)
> Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered
> "not" at line 1, column 126.
>
>
>
> xiao...@ysstech.com
>


Re: flink 不触发checkpoint

2022-02-20 Thread Tony Wei
Hi,

有考慮升級 1.14 嗎?Flink 1.14 支持了 FLIP-147,讓 Flink 在 task 為 finished 狀態時仍能觸發
checkpoint [1, 2]。

[1]
https://flink.apache.org/news/2021/09/29/release-1.14.0.html#checkpointing-and-bounded-streams
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished

best regards,

RS  於 2022年2月18日 週五 下午5:27寫道:

> 1. 图片挂了,看不到,尽量用文字,或者用图床等工具
> 2. 启动任务有配置checkpoint吗?
>
>
>
>
>
>
>
>
>
>
> 在 2022-02-17 11:40:04,"董少杰"  写道:
>
>
> flink读取csv文件建表,同时消费kafka数据建表,两张表join之后写入hdfs(hudi),读取csv数据的任务已经是finished状态,就会触发不了checkpoint,看有什么办法能让它正常触发checkpoint?
> flink版本1.12.2。
> 谢谢!
>
>
>
>
>
>
> | |
> 董少杰
> |
> |
> eric21...@163.com
> |


Re: flink sql lookup join中维表不可以是视图吗?

2021-12-01 Thread Tony Wei
Hi,

如果兩次 left join 的話是否滿足你的需求呢?
然後在取 temporal table 的字段時,用 IF 去判斷取值。參考 SQL 如下

SELECT
  c.mer_cust_id,
  *IF(k.mer_cust_id IS NOT NULL AND a.mercust_id IS NOT NULL AND
k.mer_cust_id <> '', k.update_time, NULL) AS update_time*
FROM charge_log as c
  LEFT JOIN ka_mer_info FOR SYSTEM_TIME AS OF c.proc_time AS k
ON c.mer_cust_id = k.mer_cust_id
  LEFT JOIN adp_mer_user_info FOR SYSTEM_TIME AS OF c.proc_time AS a
ON c.mer_cust_id = a.mer_cust_id

不過,這種寫法只能適用在兩張 MySQL 表都保證 mer_cust_id 是唯一主鍵的狀況下。如果 mer_cust_id
不是唯一的話,輸出的結果數量會跟原本提供的 SQL 期望的輸出不一致
比如說 ka_mer_info 有 0 筆數據, adp_mer_user_info 有 2 筆數據,原先的 SQL 會得到 1 筆 left
join 沒成功的數據,上面提供的 SQL 則會輸出 2 筆。


casel.chen  於 2021年12月1日 週三 下午6:33寫道:

> lookup join用的维表需要从两张mysql表做关联后得到,因此创建了一个视图。但发现flink sql不支持lookup
> join关联视图,会抛
> Temporal Table Join requires primary key in versioned table, but no
> primary key can be found.
>
>
> 请问这种情况要怎么解决?
>
>
> CREATE VIEW query_mer_view (mer_cust_id, update_time) AS
> SELECT a.mer_cust_id, k.update_time
> FROM ka_mer_info k INNER JOIN adp_mer_user_info a on k.mer_cust_id =
> a.mer_cust_id
> where k.mer_cust_id <> '';
>
>
> SELECT
> DATE_FORMAT(c.create_time, '-MM-dd') AS payment_date,
> c.mer_cust_id,
>
>
> c.trans_amt,
> CASE c.trans_stat WHEN 'S' THEN c.trans_amt ELSE 0 END as
> succ_amt ,
>
>
> 1 as trans_cnt,
> CASE c.trans_stat WHEN 'S' THEN 1 ELSE 0  END as succ_cnt ,
> CASE c.trans_stat WHEN 'F' THEN 1 ELSE 0  END as fail_cnt
>
>
> FROM charge_log as c
>  LEFT JOIN query_mer_view FOR SYSTEM_TIME AS OF
> c.proc_time AS q
>  ON c.mer_cust_id = q.mer_cust_id;


Re: FlinkSql回撤流

2021-11-24 Thread Tony Wei
上一封的 sql 稍微有誤,不需要 group by user, ord 才對:

select user, sum(num) as num
> from (
>   select user, ord, num * IF(flag=1, 1, -1) as num
>   from tmpTable
>   ) t1
> group by user


或者也可以考慮這種寫法:

select user, sum(num * IF(flag=1, 1, 0)) as num
> from (
>   select user, ord, LAST_VALUE(num) as num, LAST_VALUE(flag) as flag
>   from tmpTable,
>   group by user, ord
>   ) t1
> group by user


best regards,

Tony Wei  於 2021年11月25日 週四 上午11:01寫道:

> Hi,
>
> 對於這個 sql 的回撤流,我感覺沒什麼問題。原因是如果在 batch 執行模式下,首三行的輸出結果應該為:
>
> +--+---+
> | user | num |
> +--+---+
> | b  | 20|
> +--+---+
>
> 因為 user a 被 `where flag = 1` 過濾掉了,所以實際下游應該是撤回了 insert 的操作才對,而不是更新成 num = 0。
>
> 或許可以考慮把 sql 寫法改為這樣試試?
>
> select user, sum(num) as num
>> from (
>>   select user, ord, num * IF(flag=1, 1, -1) as num
>>   from tmpTable,
>>   group by user, ord
>>   ) t1
>> group by user
>
>
> best regards,
>
> wushijjian5  於 2021年11月25日 週四 上午10:21寫道:
>
>> Hi,
>> 这三条数据的话:
>> new Tuple4<>("a", "a1",30,1),new Tuple4<>("b", "b1",20,1),new
>> Tuple4<>("a","a1",30,0)
>>
>> 计算结果是:
>> | +I |  a |  30 |
>> | +I |  b |  20 |
>> | -D |  a |  30 |
>>
>> 实际想要的是
>> a 30
>> b   20
>> a   0
>> 就是a用户下单30块,b下单20块,a取消下单30块 , 最后a应该返回0 , 想要根据-D标识下发0值数据,下游直接覆盖。
>> 不知道说的清不清楚。
>> 只通过flink-sql方式好像实现不了
>>
>> > 2021年11月25日 09:45,Caizhi Weng  写道:
>> >
>> > Hi!
>> >
>> > 无法在 SQL 里获得第一列的操作符。但可以通过 table.execute().collect() 获得产生的
>> CloseableIterator,然后通过 Row#getKind 获得该 row 对应的 op。
>> >
>> > 顺便问一下,为什么需要在 SQL 里获得 op 呢?因为这个 op 应该只对内部算子以及 sink 有用,用户一般来说是不需要感知的。
>> >
>> > wushijjian5 mailto:wsjwoods...@163.com>>
>> 于2021年11月24日周三 下午9:05写道:
>> >
>> >
>> > DataStream> dataStream =
>> env.fromElements(
>> > new Tuple4<>("a", "a1",30,1),
>> > new Tuple4<>("b", "b1",20,1),new Tuple4<>("a","a1",30,0),
>> > new Tuple4<>("a","a2",30,1),
>> > new Tuple4<>("a","a3",30,1));
>> > tEnv.createTemporaryView("tmpTable", dataStream, $("user"),$("ord"),
>> $("num"), $("flag"));
>> > Table table = tEnv.sqlQuery(
>> > " select user,sum(num) as num" +
>> > " from (" +
>> > "   select user,ord,LAST_VALUE(num) as num,LAST_VALUE(flag) as
>> flag " +
>> > "from tmpTable " +
>> > "group by user,ord " +
>> > ") t1" +
>> > " where flag=1 " +
>> > " group by user" +
>> > "");
>> > table.execute().print();
>> >
>> > 这样一个回撤流,我能在sql里获取到第一列的操作符么,比如 select op,user,num from xxx   只通过sql的方式
>> >
>>
>>


Re: FlinkSql回撤流

2021-11-24 Thread Tony Wei
Hi,

對於這個 sql 的回撤流,我感覺沒什麼問題。原因是如果在 batch 執行模式下,首三行的輸出結果應該為:

+--+---+
| user | num |
+--+---+
| b  | 20|
+--+---+

因為 user a 被 `where flag = 1` 過濾掉了,所以實際下游應該是撤回了 insert 的操作才對,而不是更新成 num = 0。

或許可以考慮把 sql 寫法改為這樣試試?

select user, sum(num) as num
> from (
>   select user, ord, num * IF(flag=1, 1, -1) as num
>   from tmpTable,
>   group by user, ord
>   ) t1
> group by user


best regards,

wushijjian5  於 2021年11月25日 週四 上午10:21寫道:

> Hi,
> 这三条数据的话:
> new Tuple4<>("a", "a1",30,1),new Tuple4<>("b", "b1",20,1),new
> Tuple4<>("a","a1",30,0)
>
> 计算结果是:
> | +I |  a |  30 |
> | +I |  b |  20 |
> | -D |  a |  30 |
>
> 实际想要的是
> a 30
> b   20
> a   0
> 就是a用户下单30块,b下单20块,a取消下单30块 , 最后a应该返回0 , 想要根据-D标识下发0值数据,下游直接覆盖。
> 不知道说的清不清楚。
> 只通过flink-sql方式好像实现不了
>
> > 2021年11月25日 09:45,Caizhi Weng  写道:
> >
> > Hi!
> >
> > 无法在 SQL 里获得第一列的操作符。但可以通过 table.execute().collect() 获得产生的
> CloseableIterator,然后通过 Row#getKind 获得该 row 对应的 op。
> >
> > 顺便问一下,为什么需要在 SQL 里获得 op 呢?因为这个 op 应该只对内部算子以及 sink 有用,用户一般来说是不需要感知的。
> >
> > wushijjian5 mailto:wsjwoods...@163.com>>
> 于2021年11月24日周三 下午9:05写道:
> >
> >
> > DataStream> dataStream =
> env.fromElements(
> > new Tuple4<>("a", "a1",30,1),
> > new Tuple4<>("b", "b1",20,1),new Tuple4<>("a","a1",30,0),
> > new Tuple4<>("a","a2",30,1),
> > new Tuple4<>("a","a3",30,1));
> > tEnv.createTemporaryView("tmpTable", dataStream, $("user"),$("ord"),
> $("num"), $("flag"));
> > Table table = tEnv.sqlQuery(
> > " select user,sum(num) as num" +
> > " from (" +
> > "   select user,ord,LAST_VALUE(num) as num,LAST_VALUE(flag) as
> flag " +
> > "from tmpTable " +
> > "group by user,ord " +
> > ") t1" +
> > " where flag=1 " +
> > " group by user" +
> > "");
> > table.execute().print();
> >
> > 这样一个回撤流,我能在sql里获取到第一列的操作符么,比如 select op,user,num from xxx   只通过sql的方式
> >
>
>


Re: 如何实现event triggered window?

2021-11-22 Thread Tony Wei
Hi Pinjie,

如果是需要 event triggered 的累計統計更新的話,可以考慮使用 SQL over aggregation
[1]。例如文件中提供的如下範例,計算當前 row  往前一小時內的加總結果。

> SELECT order_id, order_time, amount,
>   SUM(amount) OVER (
> PARTITION BY product
> ORDER BY order_time
> RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
>   ) AS one_hour_prod_amount_sumFROM Orders
>
> 但是這種作法只能根據收到的事件來觸發,無法根據處理時間。換句話說,如果 t=X 沒有數據進來的話,就不會有 t=(X-1) ~ X 的累計統計輸出。
考慮更複雜的情況需要結合事件和處理時間來觸發的話,需要透過 Process Function API 或者用 DataStream API 自定義
Trigger 的方式實現。

best regards,

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/over-agg/

tison  於 2021年11月23日 週二 下午2:03寫道:

> 如果你是想每时每刻(实际上开销很大,假设是每 1 分钟),那就用 Sliding Window
>
> Best,
> tison.
>
>
> tison  于2021年11月23日周二 下午2:00写道:
>
> > 你的理解里就是必须整点对齐嘛,那其实是可以加个 offset 不整点对齐的捏。
> >
> > Best,
> > tison.
> >
> >
> > tison  于2021年11月23日周二 下午1:59写道:
> >
> >>
> >>
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-tvf/
> >>
> >> 你可以关注一下这里有个 offset 参数,这样就能用 tumbling window 了 =。=
> >>
> >> Best,
> >> tison.
> >>
> >>
> >> Pinjie Huang  于2021年11月23日周二
> 下午1:18写道:
> >>
> >>> Hi Yidan,
> >>>
> >>> Tumbling window 只有
> >>> t=0~1h
> >>> t=1~2h
> >>> 等等的window
> >>>
> >>> 我现在需要在 t=X 时刻,得到 t=(X-1) ~ X 的window 比如
> >>> t=1.5h 时刻 我需要 t=0.5~1.5h 这个window
> >>>
> >>> On Tue, Nov 23, 2021 at 12:32 PM yidan zhao 
> wrote:
> >>>
> >>> > 其实问题还是没描述清楚,所以遇到问题究竟是啥,没发现tumbling window不行。
> >>> >
> >>> > zhiyuan su  于2021年11月22日周一 下午4:59写道:
> >>> >
> >>> > > 感觉你的场景是近实时累计统计,你可以参考下window TVF 看下是否满足
> >>> > >
> >>> > >
> >>> >
> >>>
> https://flink-learning.org.cn/article/detail/a8b0895d4271bf6b770927eea214612d?tab=SQL=1
> >>> > > 具体在第2章第一节
> >>> > >
> >>> > > Pinjie Huang  于2021年11月22日周一
> >>> > 下午3:52写道:
> >>> > >
> >>> > > > Hi friends,
> >>> > > >
> >>> > > > Flink 自带的window 有tumlbing sliding 和 session 但是似乎没有event
> triggerred。
> >>> > > >
> >>> > > > 比如说 想知道过去1小时event A trigger的次数,
> >>> > > >
> >>> > > > 如果使用tumbling window和1h window
> >>> > > > |1h | 1h |
> >>> > > > t=0
> >>> > > > 在t=1.5h时刻,读取数据,是t=1h 时刻过去一小时的数据,而不是实时的。
> >>> > > >
> >>> > > > 使用sliding window 的话需要define非常小的slide,而且依旧可能有延迟。
> >>> > > >
> >>> > > > 如果想知道实时的数据,需要基于event来更新state,如何实现event triggered window?
> >>> > > >
> >>> > >
> >>> >
> >>>
> >>>
> >>> --
> >>> Thanks,
> >>> Pinjie Huang
> >>>
> >>
>


Re: 关于FlinkSQL从kafka读取数据写到hive的一些问题

2021-11-01 Thread Tony Wei
Hi yidan,

你可以試試 SQL Hints [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sql/queries/hints/


yidan zhao  於 2021年11月2日 週二 下午1:03寫道:

> 嗯嗯,hive catalog的确不需要重新建表,但是我的场景是:我需要通过 flinkSQL 流式将 kafka 表数据写入 hive 表。
> 因此就需要有如下属性等,而原先的hive表式spark-sql中创建的,肯定不可能带有这种属性。我目前想法是,比如针对原表 t1,我重新在
> flinkSQL 中创建个hive表t2,但是指定location为原t1的location,同时带上如下相关属性,这样就达到目的了。
> 或者说,基于现有的hive表,有什么不重定义的方法,仍然可以通过sql流式将kafka表数据写进去不。
> >   'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
> >   'sink.partition-commit.trigger'='partition-time',
> >   'sink.partition-commit.delay'='1 h',
> >   'sink.partition-commit.policy.kind'='metastore,success-file');
>
> Caizhi Weng  于2021年11月2日周二 上午10:47写道:
>
> > Hi!
> >
> > hive catalog 是不需要重新在 Flink SQL 里写一遍表定义的,连接到 hive catalog 的时候 Flink 就会自动读取
> > hive 里表的结构等信息。但 kafka 的表定义仍然要写。
> >
> > 你的邮件里的内容具体来自哪个文档界面呢?文档里应该是想要从 Flink 里建立 hive 表,如果已经在 hive 里建过表了就不用再建了。
> >
> > yidan zhao  于2021年11月1日周一 下午3:05写道:
> >
> > > 如题,我看了官方文档,定义好kafka和hive表。
> > > 写的时候提示要指定提交策略,就又看了看文档,如下为文档实例。
> > >
> > > SET table.sql-dialect=hive;CREATE TABLE hive_table (
> > >   user_id STRING,
> > >   order_amount DOUBLE) PARTITIONED BY (dt STRING, hr STRING) STORED AS
> > > parquet TBLPROPERTIES (
> > >   'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
> > >   'sink.partition-commit.trigger'='partition-time',
> > >   'sink.partition-commit.delay'='1 h',
> > >   'sink.partition-commit.policy.kind'='metastore,success-file');
> > > SET table.sql-dialect=default;CREATE TABLE kafka_table (
> > >   user_id STRING,
> > >   order_amount DOUBLE,
> > >   log_ts TIMESTAMP(3),
> > >   WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- Define
> > > watermark on TIMESTAMP column) WITH (...);
> > >
> > >
> > >
> >
> 如上,如果是这样的话,那就会出现个问题。所有需要写入的hive表其实都需要重新定义一次,部分原先的表是hive中定义的。现在我需要重新定义一次可能。
> > >
> > > 其次,为了避免重新定义表有问题啥的,我可能会重新定义另一个数据库中同名表,但指定到和hive表相同的存储路径。
> > > 但如果hive中修改原表,我这边不改变flink hive表定义,又会出现不一致的情况。
> > >
> > >
> > > 此外,flink这样定义的hive表和hive自己定义的肯定意义一致吗,不会影响hive自身的读写吧。
> > >
> >
>


Re: Flink Stream + StreamTableEnvironment 结合使用时checkpoint异常问题

2021-09-12 Thread Tony Wei
Hi

從代碼上來看是使用了 regular join 關聯了 kafka source 和 hbase source,hbase connector
目前是不支持流式數據源的
你可以從任務儀表板上確認下提交的任務,hbase source 的部分應該在執行一段時間後狀態會變更為 FINISHED,目前 flink
checkpoint 還不支持在 FINISHED task 上執行

你可以考慮改寫 sql 使用 processing time temporal join [1] 的方式來關聯 hbase table,從 kafka
消費的數據會實時的去查 hbase table 的當前數據做關聯。

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#processing-time-temporal-join

chang li  於 2021年9月10日 週五 下午7:40寫道:

> 没有开启Checkpoint
> execEnv.enableCheckpointing(checkpointInterval);
>
> On 2021/09/10 07:41:10, "xia_...@163.com"  wrote:
> > Hi:
> > 有个问题想请教一下大佬们:正在研究流上join操作,使用FlinkKafkaConsume
> 消费kafka数据作为数据源,随后关联hbase维度表数据,可以成功关联,但是KafkaSource缺始终没有进行checkpoint,代码中是有设置checkpint的,我想请问一下是需要做其他什么配置吗?代码如下
> >
> > DataStream kafkaSource = env.addSource(source);
> > Map> sideOutStreamMap = new HashMap<>();
> > for (RowToColumnBean bean : lists) {
> > OutputTag app = new
> OutputTag(bean.getMainTable()) {
> > };
> > sideOutStreamMap.put(bean.getMainTable(), app);
> > }
> >
> > RowToNumberProcessFunction rowToNumberProcessFunction = new
> RowToNumberProcessFunction(sideOutStreamMap, lists);
> > SingleOutputStreamOperator process =
> kafkaSource.process(rowToNumberProcessFunction);
> >
> > EnvironmentSettings settings = EnvironmentSettings.newInstance()
> > .useBlinkPlanner()
> > .inStreamingMode()
> > .build();
> >
> > StreamTableEnvironment tableEnv = StreamTableEnvironmentImpl.create(env,
> settings, new TableConfig());
> > //设置checkpoint
> >
> tableEnv.getConfig().getConfiguration().setString("execution.checkpointing.interval",
> "10 s");
> >
> > for (RowToColumnBean bean : lists) {
> > DataStream dataStream =
> process.getSideOutput(sideOutStreamMap.get(bean.getMainTable()));
> >
> > String mainTable = bean.getMainTable().split("
> ")[0].split("\\.")[1].toLowerCase();
> >
> > //Table tmpTable = tableEnv.fromDataStream(dataStream,
> StrUtil.list2Str(bean.getQueryColumns()));
> >
> > tableEnv.createTemporaryView(mainTable, dataStream);
> >
> > String joinTable = mainTable + "_join";
> > tableEnv.executeSql("CREATE TABLE " + joinTable + "(\n" +
> > "rowkey STRING,\n" +
> > "info ROW,\n" +
> > "PRIMARY KEY (rowkey) NOT ENFORCED\n" +
> > ") WITH (\n" +
> > "'connector' = 'hbase-2.2',\n" +
> > "'table-name' =
> 'hid0101_cache_his_dhcapp_nemrforms:dformfiled',\n" +
> > "'zookeeper.quorum' = '192.168.0.115:2181',\n" +
> > "'zookeeper.znode.parent' = '/hbase'\n" +
> > ")");
> >
> >
> > //查询数据
> > //Table table = tableEnv.sqlQuery("select b.* from tmp a left join
> dformfiled b on a.key = b.rowkey");
> > Table table = tableEnv.sqlQuery("select a.*,b.* from " + mainTable +
> " a left join " + joinTable + " b on a.key = lower(b.rowkey) and
> b.formid='550' where b.rowkey is not null");
> >
> > TableSchema schema = table.getSchema();
> > schema.getTableColumns().forEach(column -> {
> >
> > System.err.println(column.asSummaryString());
> > });
> >
> > DataStream> tuple2DataStream =
> tableEnv.toRetractStream(table, Row.class);
> > tuple2DataStream.print(mainTable);
> > dataStream.print(mainTable);
> > }
> >
> >
> > xia_...@163.com
> >
>


Re: 请问同一个flink history server能够支持多个flink application cluster吗?

2021-08-20 Thread Tony Wei
Hi Chenyu,

這確實是目前尚未解決的一個問題,相關的 jira issue 可以看這 [1]。
jira issue 底下的討論串有提到一個替代方案是:使用 -D\$internal.pipeline.job-id=$(cat
/proc/sys/kernel/random/uuid|tr -d "-") 主動為 application 模式的任務產生隨機的 jobid。
但由於此配置參數屬於 flink 內部參數,可能不保證未來任何改動後的向後兼容性,請謹慎考慮後再使用。

[1] https://issues.apache.org/jira/browse/FLINK-19358


Chenyu Zheng  於 2021年8月20日 週五 下午12:16寫道:

> History Server的API也是使用jobid作为区分
>
>   *   /config
>   *   /jobs/overview
>   *   /jobs/
>   *   /jobs//vertices
>   *   /jobs//config
>   *   /jobs//exceptions
>   *   /jobs//accumulators
>   *   /jobs//vertices/
>   *   /jobs//vertices//subtasktimes
>   *   /jobs//vertices//taskmanagers
>   *   /jobs//vertices//accumulators
>   *   /jobs//vertices//subtasks/accumulators
>   *   /jobs//vertices//subtasks/
>   *
>  /jobs//vertices//subtasks//attempts/
>   *
>  
> /jobs//vertices//subtasks//attempts//accumulators
>   *   /jobs//plan
>
>
> From: Chenyu Zheng 
> Reply-To: "user-zh@flink.apache.org" 
> Date: Friday, August 20, 2021 at 11:43 AM
> To: "user-zh@flink.apache.org" 
> Subject: 请问同一个flink history server能够支持多个flink application cluster吗?
>
> 您好,
>
> 我们目前在k8s上以flink application模式运行作业,现在希望部署一个history
> server方便debug。但是根据文档,flink
> historyserver貌似只支持单个cluster下不同job的使用方法,如果存在多个cluster,相同的jobID将会出现错误。
>
> 请问对于多个application cluster,history使用的最佳姿势是什么样的?
>
> 谢谢[cid:image001.png@01D795B8.6430A670]
>


Re: sql hints 在寫入透過 hive 創建的表時沒有作用

2021-08-04 Thread Tony Wei
Hi Caizhi,

我測試了 sink.rolling-policy.rollover-interval 這個配置,並且改使用 csv hive table 作為
sink table,結果是符合預期的。再次謝謝你的幫忙。

Tony Wei  於 2021年8月5日 週四 上午10:40寫道:

> Hi,
>
> 感謝指正,我的目的是為了測試 sql hints 是否生效,選擇 `sink.parallelism` 是
> 單純因為這個配置比較好觀察結果。
> 我會再嘗試其他 hive streaming sink 的配置測試看看。謝謝。
>
> Caizhi Weng  於 2021年8月5日 週四 上午10:36寫道:
>
>> Hi!
>>
>> 单独设置 sink 并发是 1.13 的新特性。可以升级到 1.13 试试看。
>>
>> Tony Wei  于2021年8月5日周四 上午10:21写道:
>>
>> > Hi Experts,
>> >
>> > 我嘗試使用 sql hints 去覆寫 hive sink table 的配置,發現 sql hints 沒有生效。
>> > 我測試了修改 `sink.parallelism` 來調整寫入的並行度,但並行度仍舊為 `1`。(sql 任務配置的並行度)
>> >
>> > 我運行的環境版本是 Flink 1.12.2,底下是我在 flink sql client 的操作流程和結果的截圖。
>> > 寫入的 `user_hive_2` 表是透過 hive beeline 創建的,不是透過 flink sql ddl。
>> > 同時我也確認了 `table.dynamic-table-options.enabled` 的配置是啟用的。
>> >
>> > 請問是否是我在配置上或是使用上弄錯了什麼嗎?感謝解答。
>> >
>> > CREATE CATALOG MyCatalog
>> >> WITH (
>> >>   'type' = 'hive',
>> >>   'hive-conf-dir' = '/etc/hive/conf',
>> >>   'hadoop-conf-dir' = '/etc/hadoop/conf'
>> >> );
>> >>
>> >
>> >
>> > CREATE TABLE gen_users (
>> >>   name STRING,
>> >>   age INT
>> >> ) WITH (
>> >>   'connector' = 'datagen',
>> >>   'rows-per-second' = '50'
>> >> );
>> >
>> >
>> >
>> > insert into `MyCatalog`.`default`.`user_hive_2` /*+
>> >> OPTIONS('sink.parallelism'='2') */ select * from gen_users;
>> >
>> >
>> > [image: Screen Shot 2021-08-04 at 4.33.20 PM.png]
>> > [image: Screen Shot 2021-08-04 at 4.33.32 PM.png]
>> > [image: Screen Shot 2021-08-04 at 4.34.13 PM.png]
>> >
>> >
>>
>


Re: sql hints 在寫入透過 hive 創建的表時沒有作用

2021-08-04 Thread Tony Wei
Hi,

感謝指正,我的目的是為了測試 sql hints 是否生效,選擇 `sink.parallelism` 是
單純因為這個配置比較好觀察結果。
我會再嘗試其他 hive streaming sink 的配置測試看看。謝謝。

Caizhi Weng  於 2021年8月5日 週四 上午10:36寫道:

> Hi!
>
> 单独设置 sink 并发是 1.13 的新特性。可以升级到 1.13 试试看。
>
> Tony Wei  于2021年8月5日周四 上午10:21写道:
>
> > Hi Experts,
> >
> > 我嘗試使用 sql hints 去覆寫 hive sink table 的配置,發現 sql hints 沒有生效。
> > 我測試了修改 `sink.parallelism` 來調整寫入的並行度,但並行度仍舊為 `1`。(sql 任務配置的並行度)
> >
> > 我運行的環境版本是 Flink 1.12.2,底下是我在 flink sql client 的操作流程和結果的截圖。
> > 寫入的 `user_hive_2` 表是透過 hive beeline 創建的,不是透過 flink sql ddl。
> > 同時我也確認了 `table.dynamic-table-options.enabled` 的配置是啟用的。
> >
> > 請問是否是我在配置上或是使用上弄錯了什麼嗎?感謝解答。
> >
> > CREATE CATALOG MyCatalog
> >> WITH (
> >>   'type' = 'hive',
> >>   'hive-conf-dir' = '/etc/hive/conf',
> >>   'hadoop-conf-dir' = '/etc/hadoop/conf'
> >> );
> >>
> >
> >
> > CREATE TABLE gen_users (
> >>   name STRING,
> >>   age INT
> >> ) WITH (
> >>   'connector' = 'datagen',
> >>   'rows-per-second' = '50'
> >> );
> >
> >
> >
> > insert into `MyCatalog`.`default`.`user_hive_2` /*+
> >> OPTIONS('sink.parallelism'='2') */ select * from gen_users;
> >
> >
> > [image: Screen Shot 2021-08-04 at 4.33.20 PM.png]
> > [image: Screen Shot 2021-08-04 at 4.33.32 PM.png]
> > [image: Screen Shot 2021-08-04 at 4.34.13 PM.png]
> >
> >
>


sql hints 在寫入透過 hive 創建的表時沒有作用

2021-08-04 Thread Tony Wei
Hi Experts,

我嘗試使用 sql hints 去覆寫 hive sink table 的配置,發現 sql hints 沒有生效。
我測試了修改 `sink.parallelism` 來調整寫入的並行度,但並行度仍舊為 `1`。(sql 任務配置的並行度)

我運行的環境版本是 Flink 1.12.2,底下是我在 flink sql client 的操作流程和結果的截圖。
寫入的 `user_hive_2` 表是透過 hive beeline 創建的,不是透過 flink sql ddl。
同時我也確認了 `table.dynamic-table-options.enabled` 的配置是啟用的。

請問是否是我在配置上或是使用上弄錯了什麼嗎?感謝解答。

CREATE CATALOG MyCatalog
> WITH (
>   'type' = 'hive',
>   'hive-conf-dir' = '/etc/hive/conf',
>   'hadoop-conf-dir' = '/etc/hadoop/conf'
> );
>


CREATE TABLE gen_users (
>   name STRING,
>   age INT
> ) WITH (
>   'connector' = 'datagen',
>   'rows-per-second' = '50'
> );



insert into `MyCatalog`.`default`.`user_hive_2` /*+
> OPTIONS('sink.parallelism'='2') */ select * from gen_users;


[image: Screen Shot 2021-08-04 at 4.33.20 PM.png]
[image: Screen Shot 2021-08-04 at 4.33.32 PM.png]
[image: Screen Shot 2021-08-04 at 4.34.13 PM.png]


Re: 回复:回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?

2021-08-02 Thread Tony Wei
你好,

如果我沒有理解錯你的應用場景的話,你想達成的結果應該是類似這篇討論 [1] 裡提到的問題對吧?
從最新的 flink 文檔 [2] 中來看應該無法透過你期望的 on duplicate key 語句來實現,
或許可以嘗試在 SELECT 語句上達成,舉例來說你可以在原有的 select 語句之外多添加 group by,如下:

insert into t select a, last_value(b ignore nulls) as b, last_value(c
> ignore nulls) as c from $(original_select_statement) group by a;


不過目前 last_value 似乎不支持 ignore nulls,你可以考慮自己實現一個 UDAF 來達成。
另外,這樣的做法也會造成 flink state 不斷增長 (由於 group by 的緣故),所以需要多加小心,比如適當的配置 state ttl。

best regards,

[1]
https://stackoverflow.com/questions/48144641/mysql-using-on-duplicate-key-update-coalesce
[2]
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sql/insert/#syntax

Ye Chen  於 2021年8月2日 週一 下午4:08寫道:

> 你好,我们用的1.11版本。
>
> 需求:table t 有三个字段(a,b,c)
> 我们的场景只想根据主键a更新部分字段b,其余的字段c原有的值保持不变,
>
> 例如mysql 支持:
> insert into t(a,b) select 1,2 on duplicate key update b=2;
> 主键重复的时候只更新字段b,字段c的值不变。
> 但是flink sql 目前只支持全字段更新:insert into t(a,b,c) select 1,2,3 。
> 我在sql-client测试了一下:insert into t(a,b) select 1,2 on duplicate key update
> b=2;  会报错 不支持 on duplicate key update
> 同时也测试了一下:insert into t(a,b) select 1,2  也会报错,字段数量不匹配;
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.ValidationException: Column types of query
> result and sink for registered table 'default_catalog.default_database.t'
> do not match.
> Cause: Different number of columns.
> 我查看了https://issues.apache.org/jira/browse/FLINK-18726
> 使用最新版的1.13  sql-client测试了一下 insert into t(a,b) select 1,2
> ,相比1.11版本报错,1.13可以执行成功.但是发现和文档中描述的一样,字段c会插入null。如果原有的一条数据是1,2,3执行sql后会变成1,2,null,会造成字段c的数据丢失,这是不允许的。
> 请问这种根据主键更新部分字段的场景 使用flink sql应该怎么处理?
>
> 在 2021-08-02 15:39:09,"silence"  写道:
> >用的什么版本这个应该已经在1.13中修复了https://issues.apache.org/jira/browse/FLINK-18726
> >不行的话可以在ddl中限制列的数量
> >
> >
> >--
> >发件人:Ye Chen 
> >发送时间:2021年8月2日(星期一) 11:37
> >收件人:user-zh ; silence 
> >主 题:Re:回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?
> >
> >你好,我试了一下,如果表的ddl是三个字段,但是insert只指定两个字段的话,会报错:
> >[ERROR] Could not execute SQL statement. Reason:
> >org.apache.flink.table.api.ValidationException: Column types of query
> result and sink for registered table 'default_catalog.default_database.t'
> do not match.
> >Cause: Different number of columns.
> >我们的需求是想根据主键更新部分字段
> >-
> >需求:现有table
> >CREATE TABLE t (
> > abigint,
> > bbigint,
> > cbigint,
> >  PRIMARY KEY (a) NOT ENFORCED
> >) WITH (
> >...
> >);
> >我们的场景只想根据主键a更新部分字段b,其余的字段c保持不变,
> >例如mysql 支持   insert into t(a,b,c) select '1','2','3' on duplicate key
> update b='4';主键重复的时候只更新字段b,字段c的值不变。
> >我在官方文档中没找到这个用法,sql-client也测试了一下也不支持 on duplicate key update,会报错。
> >请问这种根据主键更新部分字段的场景 使用flink sql应该怎么处理?
> >
> >
> >在 2021-08-02 10:47:55,"silence"  写道:
> >>如果只想更新部分字段的话可以试下
> >>insert into t(a,b) select a,b from x
> >>
> >>
> >>--
> >>发件人:Ye Chen 
> >>发送时间:2021年7月30日(星期五) 17:57
> >>收件人:user-zh 
> >>主 题:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?
> >>
> >>现有table
> >>CREATE TABLE t (
> >> abigint,
> >> bbigint,
> >> cbigint,
> >>  PRIMARY KEY (a) NOT ENFORCED
> >>) WITH (
> >>...
> >>);
> >>
> >>
> >>我们的场景只想根据主键a更新部分字段b,其余的字段保持不变,例如
> >>mysql 支持   insert into t(a,b,c) select '1','2','3' on duplicate key
> update b='4';
> >>主键重复的时候只更新字段b,字段c的值不变
> >>
> >>
> >>我在官方文档中没找到这个用法,sql-client也测试了一下也不支持 on duplicate key update,会报错。
> >>请问这种部分字段更新的场景 使用flink sql应该怎么处理?
> >>
> >>
> >
> >
> >
>


Re: confused about `TO_TIMESTAMP` document description

2021-06-10 Thread Tony Wei
Hi Leonard,

Thanks for confirmation. I have created the jira ticket [1]. The pull
request will be submitted later.

best regards,

[1] https://issues.apache.org/jira/browse/FLINK-22970

Leonard Xu  於 2021年6月10日 週四 下午8:58寫道:

> Hi,Tony
>
> > I found this code snippet [2] might be related to `TO_TIMESTAMP` udf,
> and seems like it won't set use any timezone configuration, so maybe the
> document might be wrong.
>
> Your analysis is right,the document is wrong, we should correct it.
>  Would you like to create an jira ticket and fix this?
>
> Best,
> Leonard
>
>


confused about `TO_TIMESTAMP` document description

2021-06-09 Thread Tony Wei
Hi Expert,

this document [1] said `TO_TIMESTAMP` will use the session time zone to
convert date time string into a timestamp.
If I understand correctly, when I set session time zone to `Asia/Shanghai`
and query `SELECT TO_TIMESTAMP('1970-01-01 08:00:00');`,
the result should be epoch timestamp `0` (i.e. '1970-01-01 08:00:00 UTC+8').

TO_TIMESTAMP(string1[, string2])

Converts date time string *string1* with format *string2* (by default:
'-MM-dd HH:mm:ss') under the session time zone (specified by
TableConfig) to a timestamp.

Only supported in blink planner.
However, I found that result is not same as I expected. I tested it by
running the below query under the `Asia/Shanghai` timezone:

SELECT
> CAST(TO_TIMESTAMP(FROM_UNIXTIME(0)) AS BIGINT),

FROM_UNIXTIME(0),

TO_TIMESTAMP(FROM_UNIXTIME(0));


and I got the result like

  EXPR$0EXPR$1EXPR$2
> 28800   1970-01-01 08:00:00  1970-01-01T08:00


The `FROM_UNIXTIME` did convert the epoch timestamp to string format based
on session time zone, but `FROM_UNIXTIME` didn't.
Therefore, I got the `28800` when I cast timestamp into bigint. The result
actually shift 8 hours.

I found this code snippet [2] might be related to `TO_TIMESTAMP` udf, and
seems like it won't set use any timezone configuration, so maybe the
document might be wrong.

Please correct me if I misunderstood something. Thank you.

best regards,

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html#temporal-functions
[2]
https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlDateTimeUtils.java#L322:L377


Re: prometheus metric中如何设置label

2021-05-11 Thread Tony Wei
Hi,

1、是否是这种方式增加label
>
是的,MetricGroup#addGroup(key, value) 的設計其中一個目的就是為了支援 prometheus 的 label 。


> 2、由于采用了add group的方式,导致exp对应的值里面的 ‘.’ 变成了下划线,是否有办法保持为'.'

可以透過配置 filterLabelValueCharacters: false 來關閉過濾功能 [1],但使用者需要自行確保不會有非法字元混入
label value 之中。詳細可參考文檔說明。

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/metric_reporters/#prometheus


best regards,

suisuimu <726400...@qq.com> 於 2021年5月10日 週一 下午6:32寫道:

> 基于RichMapFunction自定义Prometheus metric时增加label
> ```
> counter = this.metricGroup
> .addGroup("app", value.getAppName())
> .addGroup("exp", value.getExceptionName())
> .counter("myExpCounter");
> ```
> 通过add group可以在metric中看到label信息
> flink_taskmanager_job_task_operator_app_exp_myExpCounter{app="",endpoint="pushgateway",exp="java_io_IOException",flink_namespace="xxx",host="11_7_9_11",job="xxx",job_id="xxx",job_name="custom_log",namespace="monitoring",operator_id="cf155f65686cb012844f7c745ec70a3c",operator_name="Map",pod="pushgateway-c7648cd5c-tvfb9",service="pushgateway",subtask_index="0",task_attempt_id="7d6fd088c0628eb564753939978086eb",task_attempt_num="0",task_id="cbc357ccb763df2852fee8c4fc7d55f2",task_name="Source:_Custom_SourceMapProcessMapSink:_Print_to_Std__Out",tm_id="10_7_9_71:6122_96edca"}
>
> 想问下:
> 1、是否是这种方式增加label
> 2、由于采用了add group的方式,导致exp对应的值里面的 ‘.’ 变成了下划线,是否有办法保持为'.'
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: when should `FlinkYarnSessionCli` be included for parsing CLI arguments?

2021-04-26 Thread Tony Wei
Hi Till,

I have created the ticket to extend the description of `execution.targe`.
https://issues.apache.org/jira/browse/FLINK-22476

best regards,

Tony Wei  於 2021年4月26日 週一 下午5:24寫道:

> Hi Till, Yangze,
>
> I think FLINK-15852 should solve my problem.
> It is my fault that my flink version is not 100% consistent with the
> community version, and FLINK-15852 is the one I missed.
> Thanks for your information.
>
> best regards,
>
> Till Rohrmann  於 2021年4月26日 週一 下午5:14寫道:
>
>> I think you are right that the `GenericCLI` should be the first choice.
>> From the top of my head I do not remember why FlinkYarnSessionCli is still
>> used. Maybe it is in order to support some Yarn specific cli option
>> parsing. I assume it is either an oversight or some parsing has not been
>> completely migrated to the GenericCLI.
>>
>> Cheers,
>> Till
>>
>> On Mon, Apr 26, 2021 at 11:07 AM Yangze Guo  wrote:
>>
>>> Hi, Till,
>>>
>>> I agree that we need to resolve the issue by overriding the
>>> configuration before selecting the CustomCommandLines. However, IIUC,
>>> after FLINK-15852 the GenericCLI should always be the first choice.
>>> Could you help me to understand why the FlinkYarnSessionCli can be
>>> activated?
>>>
>>>
>>> Best,
>>> Yangze Guo
>>>
>>> On Mon, Apr 26, 2021 at 4:48 PM Till Rohrmann 
>>> wrote:
>>> >
>>> > Hi Tony,
>>> >
>>> > I think you are right that Flink's cli does not behave super
>>> consistent at the moment. Case 2. should definitely work because `-t
>>> yarn-application` should overwrite what is defined in the Flink
>>> configuration. The problem seems to be that we don't resolve the
>>> configuration wrt the specified command line options before calling into
>>> `CustomCommandLine.isActive`. If we parsed first the command line
>>> configuration options which can overwrite flink-conf.yaml options and then
>>> replaced them, then the custom command lines (assuming that they use the
>>> Configuration as the ground truth) should behave consistently.
>>> >
>>> > For your questions:
>>> >
>>> > 1. I am not 100% sure. I think the FlinkYarnSessionCli wasn't used on
>>> purpose when introducing the yarn application mode.
>>> > 2. See answer 1.
>>> >
>>> > I think it is a good idea to extend the description of the config
>>> option `execution.target`. Do you want to create a ticket and a PR for it?
>>> >
>>> > Cheers,
>>> > Till
>>> >
>>> > On Mon, Apr 26, 2021 at 8:37 AM Yangze Guo  wrote:
>>> >>
>>> >> Hi, Tony.
>>> >>
>>> >> What is the version of your flink-dist. AFAIK, this issue should be
>>> >> addressed in FLINK-15852[1]. Could you give the client log of case
>>> >> 2(set the log level to DEBUG would be better).
>>> >>
>>> >> [1] https://issues.apache.org/jira/browse/FLINK-15852
>>> >>
>>> >> Best,
>>> >> Yangze Guo
>>> >>
>>> >> On Sun, Apr 25, 2021 at 11:33 AM Tony Wei 
>>> wrote:
>>> >> >
>>> >> > Hi Experts,
>>> >> >
>>> >> > I recently tried to run yarn-application mode on my yarn cluster,
>>> and I had a problem related to configuring `execution.target`.
>>> >> > After reading the source code and doing some experiments, I found
>>> that there should be some room of improvement for `FlinkYarnSessionCli` or
>>> `AbstractYarnCli`.
>>> >> >
>>> >> > My experiments are:
>>> >> >
>>> >> > setting `execution.target: yarn-application` in flink-conf.yaml and
>>> run `flink run-application -t yarn-application`: run job successfully.
>>> >> >
>>> >> > `FlinkYarnSessionCli` is not active
>>> >> > `GenericCLI` is active
>>> >> >
>>> >> > setting `execution.target: yarn-per-job` in flink-conf.yaml and run
>>> `flink run-application -t yarn-application`: run job failed
>>> >> >
>>> >> > failed due to `ClusterDeploymentException` [1]
>>> >> > `FlinkYarnSessionCli` is active
>>> >> >
>>> >> > setting `execution.target: yarn-application` in flink-conf.yaml and
>>> run `flink run -t yarn-per-job`: run job successfully.

Re: when should `FlinkYarnSessionCli` be included for parsing CLI arguments?

2021-04-26 Thread Tony Wei
Hi Till, Yangze,

I think FLINK-15852 should solve my problem.
It is my fault that my flink version is not 100% consistent with the
community version, and FLINK-15852 is the one I missed.
Thanks for your information.

best regards,

Till Rohrmann  於 2021年4月26日 週一 下午5:14寫道:

> I think you are right that the `GenericCLI` should be the first choice.
> From the top of my head I do not remember why FlinkYarnSessionCli is still
> used. Maybe it is in order to support some Yarn specific cli option
> parsing. I assume it is either an oversight or some parsing has not been
> completely migrated to the GenericCLI.
>
> Cheers,
> Till
>
> On Mon, Apr 26, 2021 at 11:07 AM Yangze Guo  wrote:
>
>> Hi, Till,
>>
>> I agree that we need to resolve the issue by overriding the
>> configuration before selecting the CustomCommandLines. However, IIUC,
>> after FLINK-15852 the GenericCLI should always be the first choice.
>> Could you help me to understand why the FlinkYarnSessionCli can be
>> activated?
>>
>>
>> Best,
>> Yangze Guo
>>
>> On Mon, Apr 26, 2021 at 4:48 PM Till Rohrmann 
>> wrote:
>> >
>> > Hi Tony,
>> >
>> > I think you are right that Flink's cli does not behave super consistent
>> at the moment. Case 2. should definitely work because `-t yarn-application`
>> should overwrite what is defined in the Flink configuration. The problem
>> seems to be that we don't resolve the configuration wrt the specified
>> command line options before calling into `CustomCommandLine.isActive`. If
>> we parsed first the command line configuration options which can overwrite
>> flink-conf.yaml options and then replaced them, then the custom command
>> lines (assuming that they use the Configuration as the ground truth) should
>> behave consistently.
>> >
>> > For your questions:
>> >
>> > 1. I am not 100% sure. I think the FlinkYarnSessionCli wasn't used on
>> purpose when introducing the yarn application mode.
>> > 2. See answer 1.
>> >
>> > I think it is a good idea to extend the description of the config
>> option `execution.target`. Do you want to create a ticket and a PR for it?
>> >
>> > Cheers,
>> > Till
>> >
>> > On Mon, Apr 26, 2021 at 8:37 AM Yangze Guo  wrote:
>> >>
>> >> Hi, Tony.
>> >>
>> >> What is the version of your flink-dist. AFAIK, this issue should be
>> >> addressed in FLINK-15852[1]. Could you give the client log of case
>> >> 2(set the log level to DEBUG would be better).
>> >>
>> >> [1] https://issues.apache.org/jira/browse/FLINK-15852
>> >>
>> >> Best,
>> >> Yangze Guo
>> >>
>> >> On Sun, Apr 25, 2021 at 11:33 AM Tony Wei 
>> wrote:
>> >> >
>> >> > Hi Experts,
>> >> >
>> >> > I recently tried to run yarn-application mode on my yarn cluster,
>> and I had a problem related to configuring `execution.target`.
>> >> > After reading the source code and doing some experiments, I found
>> that there should be some room of improvement for `FlinkYarnSessionCli` or
>> `AbstractYarnCli`.
>> >> >
>> >> > My experiments are:
>> >> >
>> >> > setting `execution.target: yarn-application` in flink-conf.yaml and
>> run `flink run-application -t yarn-application`: run job successfully.
>> >> >
>> >> > `FlinkYarnSessionCli` is not active
>> >> > `GenericCLI` is active
>> >> >
>> >> > setting `execution.target: yarn-per-job` in flink-conf.yaml and run
>> `flink run-application -t yarn-application`: run job failed
>> >> >
>> >> > failed due to `ClusterDeploymentException` [1]
>> >> > `FlinkYarnSessionCli` is active
>> >> >
>> >> > setting `execution.target: yarn-application` in flink-conf.yaml and
>> run `flink run -t yarn-per-job`: run job successfully.
>> >> >
>> >> > `FlinkYarnSessionCli` is not active
>> >> > `GenericCLI` is active
>> >> >
>> >> > setting `execution.target: yarn-per-job` in flink-conf.yaml and run
>> `flink run -t yarn-per-job`: run job successfully.
>> >> >
>> >> > `FlinkYarnSessionCli` is active
>> >> >
>> >> > From `AbstractYarnCli#isActive` [2] and
>> `FlinkYarnSessionCli#isActive` [3], `FlinkYarnSessionCli` will be active
>> when `execution.target` is specified with

when should `FlinkYarnSessionCli` be included for parsing CLI arguments?

2021-04-24 Thread Tony Wei
Hi Experts,

I recently tried to run yarn-application mode on my yarn cluster, and I had
a problem related to configuring `execution.target`.
After reading the source code and doing some experiments, I found that
there should be some room of improvement for `FlinkYarnSessionCli` or
`AbstractYarnCli`.

My experiments are:

   1. setting `execution.target: yarn-application` in flink-conf.yaml and
   run `flink run-application -t yarn-application`: run job successfully.
  1. `FlinkYarnSessionCli` is not active
  2. `GenericCLI` is active
   2. setting `execution.target: yarn-per-job` in flink-conf.yaml and
run `flink
   run-application -t yarn-application`: run job failed
  1. failed due to `ClusterDeploymentException` [1]
  2. `FlinkYarnSessionCli` is active
   3. setting `execution.target: yarn-application` in flink-conf.yaml and
   run `flink run -t yarn-per-job`: run job successfully.
  1. `FlinkYarnSessionCli` is not active
  2. `GenericCLI` is active
   4. setting `execution.target: yarn-per-job` in flink-conf.yaml and
run `flink
   run -t yarn-per-job`: run job successfully.
  1. `FlinkYarnSessionCli` is active

>From `AbstractYarnCli#isActive` [2] and `FlinkYarnSessionCli#isActive` [3],
`FlinkYarnSessionCli` will be active when `execution.target` is specified
with `yarn-per-job` or `yarn-session`.

According to the flink official document [4], I thought the 2nd experiment
should also work well, but it didn't.

> The --target will overwrite the execution.target
> 
>  specified
> in the config/flink-config.yaml.


The root cause is that `FlinkYarnSessionCli` only overwrite the
`execution.target` with `yarn-session` or `yarn-per-job` [5], but no
`yarn-application`.
So, my question is

   1. should we use `FlinkYarnSessionCli` in case 2?
   2. if we should, how we can improve `FlinkYarnSessionCli` so that we can
   overwrite `execution.target` via `--target`?

and one more improvement, the config description for `execution.target` [6]
should include `yarn-application` as well.

[1]
https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java#L439-L447
[2]
https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/cli/AbstractYarnCli.java#L54-L66
[3]
https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java#L373-L377
[4]
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#selecting-deployment-targets
[5]
https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java#L397-L413
[6]
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java#L41-L46

best regards,


Re: ***UNCHECKED*** Error while confirming Checkpoint

2019-11-27 Thread Tony Wei
Hi Piotrek,

There was already an issue [1] and PR for this thread. Should we mark it as
duplicated or related issue?

Best,
Tony Wei

[1] https://issues.apache.org/jira/browse/FLINK-10377

Piotr Nowojski  於 2019年11月28日 週四 上午12:17寫道:

> Hi Tony,
>
> Thanks for the explanation. Assuming that’s what’s happening, then I
> agree, this checkStyle should be removed. I created a ticket for this issue
> https://issues.apache.org/jira/browse/FLINK-14979
>
> Piotrek
>
> On 27 Nov 2019, at 16:28, Tony Wei  wrote:
>
> Hi Piotrek,
>
> The case here was that the first snapshot is a savepoint. I know that if
> the following checkpoint succeeded before the previous one, the previous
> one will be subsumed by JobManager. However, if that previous one is a
> savepoint, it won't be subsumed. That leads to the case that Chesney said.
> The following checkpoint succeeded before the previous savepoint, handling
> both of their pending transaction, but savepoint still succeeded and sent
> the notification to each TaskManager. That led to this exception. Could you
> double check if this is the case? Thank you.
>
> Best,
> Tony Wei
>
> Piotr Nowojski  於 2019年11月27日 週三 下午8:50 寫道:
>
>> Hi,
>>
>> Maybe Chesney you are right, but I’m not sure. TwoPhaseCommitSink was
>> based on Pravega’s sink for Flink, which was implemented by Stephan, and it
>> has the same logic [1]. If I remember the discussions with Stephan/Till,
>> the way how Flink is using Akka probably guarantees that messages will be
>> always delivered, except of some failure, so `notifyCheckpointComplete`
>> could be missed probably only if a failure happens between snapshot and
>> arrival of the notification. Receiving the same notification twice should
>> be impossible (based on the knowledge passed to me from Till/Stephan).
>>
>> However, for one thing, if that’s possible, then the code should adjusted
>> accordingly. On the other hand, maybe there is no harm in relaxing the
>> contract? Even if we miss this notification (because of some re-ordering?),
>> next one will subsume the missed one and commit everything.
>>
>> Piotrek
>>
>> [1]
>> https://github.com/pravega/flink-connectors/blob/master/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java#L567
>>
>> On 27 Nov 2019, at 13:02, Chesnay Schepler  wrote:
>>
>> This looks to me like the TwoPhaseCommitSinkFunction is a bit too strict.
>> The notification for complete checkpoints is not reliable; it may be late,
>> not come at all, possibly even in different order than expected.
>>
>> As such, if you a simple case of snapshot -> snapshot -> notify -> notify
>> the sink will always fail with an exception.
>>
>> What it should do imo is either a) don't check that there is a pending
>> transaction or b) track the highest checkpoint id received and optionally
>> don't fail if the notification is for an older CP.
>>
>> @piotr WDYT?
>>
>> On 27/11/2019 08:59, Tony Wei wrote:
>>
>> Hi,
>>
>> As the follow up, it seem that savepoint can't be subsumed, so that its
>> notification could still be send to each TMs.
>> Is this a bug that need to be fixed in TwoPhaseCommitSinkFunction?
>>
>> Best,
>> Tony Wei
>>
>> Tony Wei  於 2019年11月27日 週三 下午3:43寫道:
>>
>>> Hi,
>>>
>>> I want to raise this question again, since I have had this exception on
>>> my production job.
>>>
>>> The exception is as follows
>>>
>>>
>>>> 2019-11-27 14:47:29
>>>
>>>
>>>
>>> java.lang.RuntimeException: Error while confirming checkpoint at
>>>> org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1205) at
>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor
>>>> .java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>>>> ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:
>>>> 748) Caused by: java.lang.IllegalStateException: checkpoint completed,
>>>> but no transaction pending at org.apache.flink.util.Preconditions
>>>> .checkState(Preconditions.java:195) at
>>>> org.apache.flink.streaming.api.functions.sink.
>>>> TwoPhaseCommitSinkFunction.notifyCheckpointComplete(
>>>> TwoPhaseCommitSinkFunction.java:267) at
>>>> org.apache.flink.streaming.api.operators.AbstractUdfS

Re: ***UNCHECKED*** Error while confirming Checkpoint

2019-11-27 Thread Tony Wei
Hi Piotrek,

The case here was that the first snapshot is a savepoint. I know that if
the following checkpoint succeeded before the previous one, the previous
one will be subsumed by JobManager. However, if that previous one is a
savepoint, it won't be subsumed. That leads to the case that Chesney said.
The following checkpoint succeeded before the previous savepoint, handling
both of their pending transaction, but savepoint still succeeded and sent
the notification to each TaskManager. That led to this exception. Could you
double check if this is the case? Thank you.

Best,
Tony Wei

Piotr Nowojski  於 2019年11月27日 週三 下午8:50 寫道:

> Hi,
>
> Maybe Chesney you are right, but I’m not sure. TwoPhaseCommitSink was
> based on Pravega’s sink for Flink, which was implemented by Stephan, and it
> has the same logic [1]. If I remember the discussions with Stephan/Till,
> the way how Flink is using Akka probably guarantees that messages will be
> always delivered, except of some failure, so `notifyCheckpointComplete`
> could be missed probably only if a failure happens between snapshot and
> arrival of the notification. Receiving the same notification twice should
> be impossible (based on the knowledge passed to me from Till/Stephan).
>
> However, for one thing, if that’s possible, then the code should adjusted
> accordingly. On the other hand, maybe there is no harm in relaxing the
> contract? Even if we miss this notification (because of some re-ordering?),
> next one will subsume the missed one and commit everything.
>
> Piotrek
>
> [1]
> https://github.com/pravega/flink-connectors/blob/master/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java#L567
>
> On 27 Nov 2019, at 13:02, Chesnay Schepler  wrote:
>
> This looks to me like the TwoPhaseCommitSinkFunction is a bit too strict.
> The notification for complete checkpoints is not reliable; it may be late,
> not come at all, possibly even in different order than expected.
>
> As such, if you a simple case of snapshot -> snapshot -> notify -> notify
> the sink will always fail with an exception.
>
> What it should do imo is either a) don't check that there is a pending
> transaction or b) track the highest checkpoint id received and optionally
> don't fail if the notification is for an older CP.
>
> @piotr WDYT?
>
> On 27/11/2019 08:59, Tony Wei wrote:
>
> Hi,
>
> As the follow up, it seem that savepoint can't be subsumed, so that its
> notification could still be send to each TMs.
> Is this a bug that need to be fixed in TwoPhaseCommitSinkFunction?
>
> Best,
> Tony Wei
>
> Tony Wei  於 2019年11月27日 週三 下午3:43寫道:
>
>> Hi,
>>
>> I want to raise this question again, since I have had this exception on
>> my production job.
>>
>> The exception is as follows
>>
>>
>>> 2019-11-27 14:47:29
>>
>>
>>
>> java.lang.RuntimeException: Error while confirming checkpoint at
>>> org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1205) at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor
>>> .java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>>> ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.IllegalStateException: checkpoint completed, but
>>> no transaction pending at org.apache.flink.util.Preconditions
>>> .checkState(Preconditions.java:195) at
>>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction
>>> .notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:267) at
>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator
>>> .notifyCheckpointComplete(AbstractUdfStreamOperator.java:130) at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask
>>> .notifyCheckpointComplete(StreamTask.java:822) at
>>> org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1200) ...
>>> 5 more
>>
>>
>> And these are the checkpoint / savepoint before the job failed.
>> 
>>
>> It seems that checkpoint # 675's notification handled the savepoint #
>> 674's pending transaction holder, but savepoint #674's notification didn't
>> be subsumed or be ignored by JM.
>> Therefore, during the checkpoint #676, some tasks got notification before
>> getting the checkpoint barrier and led to this exception happened, because
>> there was no pending transaction in queue.
>>
>> Does anyone know the details about subsum

Re: ***UNCHECKED*** Error while confirming Checkpoint

2019-11-27 Thread Tony Wei
Hi,

As the follow up, it seem that savepoint can't be subsumed, so that its
notification could still be send to each TMs.
Is this a bug that need to be fixed in TwoPhaseCommitSinkFunction?

Best,
Tony Wei

Tony Wei  於 2019年11月27日 週三 下午3:43寫道:

> Hi,
>
> I want to raise this question again, since I have had this exception on my
> production job.
>
> The exception is as follows
>
>
>> 2019-11-27 14:47:29
>
>
>
> java.lang.RuntimeException: Error while confirming checkpoint
>> at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1205)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors
>> .java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(
>> ThreadPoolExecutor.java:1149)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>> ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.IllegalStateException: checkpoint completed, but no
>> transaction pending
>> at org.apache.flink.util.Preconditions.checkState(Preconditions.java:
>> 195)
>> at org.apache.flink.streaming.api.functions.sink.
>> TwoPhaseCommitSinkFunction.notifyCheckpointComplete(
>> TwoPhaseCommitSinkFunction.java:267)
>> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator
>> .notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask
>> .notifyCheckpointComplete(StreamTask.java:822)
>> at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1200)
>> ... 5 more
>
>
> And these are the checkpoint / savepoint before the job failed.
> [image: checkoint.png]
>
> It seems that checkpoint # 675's notification handled the savepoint #
> 674's pending transaction holder, but savepoint #674's notification didn't
> be subsumed or be ignored by JM.
> Therefore, during the checkpoint #676, some tasks got notification before
> getting the checkpoint barrier and led to this exception happened, because
> there was no pending transaction in queue.
>
> Does anyone know the details about subsumed notifications mechanism and
> how checkpoint coordinator handle this situation? Please correct me if I'm
> wrong. Thanks.
>
> Best,
> Tony Wei
>
> Stefan Richter  於 2018年10月8日 週一 下午5:03寫道:
>
>> Hi Pedro,
>>
>> unfortunately the interesting parts are all removed from the log, we
>> already know about the exception itself. In particular, what I would like
>> to see is what checkpoints have been triggered and completed before the
>> exception happens.
>>
>> Best,
>> Stefan
>>
>> > Am 08.10.2018 um 10:23 schrieb PedroMrChaves > >:
>> >
>> > Hello,
>> >
>> > Find attached the jobmanager.log. I've omitted the log lines from other
>> > runs, only left the job manager info and the run with the error.
>> >
>> > jobmanager.log
>> > <
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t612/jobmanager.log>
>>
>> >
>> >
>> >
>> > Thanks again for your help.
>> >
>> > Regards,
>> > Pedro.
>> >
>> >
>> >
>> > -
>> > Best Regards,
>> > Pedro Chaves
>> > --
>> > Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>>


Re: ***UNCHECKED*** Error while confirming Checkpoint

2019-11-26 Thread Tony Wei
Hi,

I want to raise this question again, since I have had this exception on my
production job.

The exception is as follows


> 2019-11-27 14:47:29



java.lang.RuntimeException: Error while confirming checkpoint
> at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1205)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:
> 511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalStateException: checkpoint completed, but no
> transaction pending
> at org.apache.flink.util.Preconditions.checkState(Preconditions.java:
> 195)
> at org.apache.flink.streaming.api.functions.sink.
> TwoPhaseCommitSinkFunction.notifyCheckpointComplete(
> TwoPhaseCommitSinkFunction.java:267)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator
> .notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
> at org.apache.flink.streaming.runtime.tasks.StreamTask
> .notifyCheckpointComplete(StreamTask.java:822)
> at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1200)
> ... 5 more


And these are the checkpoint / savepoint before the job failed.
[image: checkoint.png]

It seems that checkpoint # 675's notification handled the savepoint # 674's
pending transaction holder, but savepoint #674's notification didn't be
subsumed or be ignored by JM.
Therefore, during the checkpoint #676, some tasks got notification before
getting the checkpoint barrier and led to this exception happened, because
there was no pending transaction in queue.

Does anyone know the details about subsumed notifications mechanism and how
checkpoint coordinator handle this situation? Please correct me if I'm
wrong. Thanks.

Best,
Tony Wei

Stefan Richter  於 2018年10月8日 週一 下午5:03寫道:

> Hi Pedro,
>
> unfortunately the interesting parts are all removed from the log, we
> already know about the exception itself. In particular, what I would like
> to see is what checkpoints have been triggered and completed before the
> exception happens.
>
> Best,
> Stefan
>
> > Am 08.10.2018 um 10:23 schrieb PedroMrChaves  >:
> >
> > Hello,
> >
> > Find attached the jobmanager.log. I've omitted the log lines from other
> > runs, only left the job manager info and the run with the error.
> >
> > jobmanager.log
> > <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t612/jobmanager.log>
>
> >
> >
> >
> > Thanks again for your help.
> >
> > Regards,
> > Pedro.
> >
> >
> >
> > -
> > Best Regards,
> > Pedro Chaves
> > --
> > Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>


Re: Flink DataStream KeyedStream 与 AggregateFunction

2019-11-11 Thread Tony Wei
Hi,

補充一下第三個問題,merge 的用途是給所謂的 mergeable window 使用的 (i.e. SessionWindow)
舉個例子,當 SessionWindow 的時間間隔設定為 10s,如果收到 event A (ts = t1), event B (ts =
t1 + 15s),
這個狀況下 event A, event B 會被當作是兩個 session。假如此時收到了一個 event C (ts = t1 + 5s),
且前兩個 window 都還沒被觸發處理,這個時候 event A, B, C 就會被 merge 成同一個 session。
這時調用的就會是 merge function 去把三者合併。

Best
Tony Wei

Px New <15701181132mr@gmail.com> 於 2019年11月10日 週日 上午10:58寫道:

> [image: image.png]建议深入解下 keyWindow,NoKeyWindow 与Assigner TimeWindow
> And WindowsFunction 
>
> Yuan,Youjun  于2019年11月9日周六 下午7:46写道:
>
>> 1, 是
>> 2,没有标准答案,是否可以本地先聚合?
>> 3,AggFunc是指定做何种聚合,是sum, 还是avg, 还是count。不指定的话,Flink哪里指导你要计算啥?
>>
>> -邮件原件-
>> 发件人: 王佩 
>> 发送时间: Saturday, November 9, 2019 11:45 AM
>> 收件人: user-zh 
>> 主题: Flink DataStream KeyedStream 与 AggregateFunction
>>
>> 请教下:
>>
>> 1、DataStream 如按用户ID KeyBy后,同一个用户ID的数据最终会被分到一个Partition中吗?
>>
>> 2、假设1成立,这样就会有数据倾斜的问题。该如何解决?
>>
>> 3、假设1成立,如: DataStream
>>.keyBy(userID)
>>.timeWindow()
>>.aggregate(new
>> AggregateFunction(...)),这里的AggregateFunction
>> 为啥还需要merge呢。因为同一个Key的数据只会在同一个Partition中被计算,觉得不需要merge啊。
>>
>> 这三个问题有点疑惑,大神们帮忙看下!
>> 感谢!
>>
>


Questions about how to use State Processor API

2019-10-04 Thread Tony Wei
Hi,

I'm recently trying to use State Processor API, but I have some questions
during the development.

1. Does `OperatorTransformation#bootstrapWith` support scala api `DataSet`?
I tried, but IDE showed that it will have compile error on that line.

2. It seems that creating RocksDBStateBackend should provide a checkpoint
path, but by using
`Savepoint#create` we didn't read or write anything from or into this
checkpoint path. This is for
constructing RocksDBStateBackend object only. Am I right?
```
Savepoint
.create(new RocksDBStateBackend(checkpointPath), maxParallelism)
```
Thanks in advance.

Best,
Tony Wei


Re: Kafka producer failed with InvalidTxnStateException when performing commit transaction

2019-09-24 Thread Tony Wei
Hi Becket,

I have read kafka source code and found that the error won't be propagated
to client if the list of
topic-partition is empty [1], because it bind the error with each
topic-partition. If this list is empty,
then that error won't be packaged into response body. That made the client
didn't get the error
message to find the newer coordinator.

Back to this problem, I think the original design of kafka client might not
prefer to execute
`enqueueNewPartitions` if there is no added topic-partition. It might be a
bug here, and we should
first check if `newPartitionsInTransaction` list is empty before executing
`enqueueNewPartitions`
function. Am I right?

If it can be confirmed as a bug, I would like to submit my patch to fix it.
Thanks for your help.

Best,
Tony Wei

[1]
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L2042

Tony Wei  於 2019年9月20日 週五 下午2:57寫道:

> Hi,
>
> I found that the source code [1] in kafka showed that it always check if
> `newPartitionsInTransaction`
> is empty before calling `enqueueRequest(addPartitionsToTransactionHandler())`,
> that is not
> applied to flink kafka producer code [2].
>
> I wrote a simple producer with the `flushNewPartitions` copied from flink
> kafka producer, and
> successfully reproduce this exception. Then, I modified the logic in
> `enqueueNewPartitions` to check
> if there is any `newPartitionsInTransaction` before make this request. And
> this would work well even
> if I restarted the broker who owned this transaction's coordinator, since
> the empty transaction won't
> make any request to server.
>
> The attachments are my simple producer code. Please help to verify what I
> thought is correct. Thanks.
>
> Best,
> Tony Wei
>
> [1]
> https://github.com/apache/kafka/blob/c0019e653891182d7a95464175c9b4ef63f8bae1/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L316
> [2]
> https://github.com/apache/flink/blob/09f96b339f4890d7a44ae92c915ea8c0f6f244cb/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java#L273
>
> Tony Wei  於 2019年9月20日 週五 上午11:56寫道:
>
>> Hi,
>>
>> Trying to dig out why `Error.NOT_COORDINATOR` happened in broker, I
>> opened
>> flink's log level to DEBUG for producer. And I found some logs from flink
>> side
>> regarding this error. Below is some log snippet.
>>
>> It seems that producer client didn't catch this error and retry to find
>> new coordinator.
>> This caused the transaction state is inconsistent between client side and
>> server side.
>> Would it be possible that the problem is caused
>> by FlinkKafkaInternalProducer using
>> java reflection to send `addPartitionsToTransactionHandler` request in
>> `FlinkKafkaInternalProducer#flushNewPartitions`? Is there any expert who
>> is familiar
>> with both kafka and flink's kafka connector could help me solve this?
>> Thanks very much.
>>
>> The attachment is my code to reproduce this problem.
>> The cluster's versions are the same as I mentioned in my first email.
>>
>> Best,
>> Tony Wei
>>
>> *flink taskmanager:*
>>
>>> 2019-09-20 02:32:45,927 INFO
>>>  
>>> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer
>>>  - Flushing new partitions
>>> 2019-09-20 02:32:45,927 DEBUG
>>> org.apache.kafka.clients.producer.internals.TransactionManager  - [Producer
>>> clientId=producer-29, transactionalId=map -> Sink:
>>> sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Enqueuing transactional request
>>> (type=AddPartitionsToTxnRequest, transactionalId=map -> Sink:
>>> sink-2e588ce1c86a9d46e2e85186773ce4fd-3, producerId=1008, producerEpoch=1,
>>> partitions=[])
>>>
>> 2019-09-20 02:32:45,931 DEBUG
>>> org.apache.kafka.clients.producer.internals.Sender- [Producer
>>> clientId=producer-29, transactionalId=map -> Sink:
>>> sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Sending transactional request
>>> (type=AddPartitionsToTxnRequest, transactionalId=map -> Sink:
>>> sink-2e588ce1c86a9d46e2e85186773ce4fd-3, producerId=1008, producerEpoch=1,
>>> partitions=[]) to node *kafka-broker-1:9092* (id: 1 rack: null)
>>> 2019-09-20 02:32:45,931 DEBUG org.apache.kafka.clients.NetworkClient
>>>- [Producer clientId=producer-29, transactionalId=map ->
>>> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Using older server API v0 to
>>> send ADD_PARTITIONS_TO_TXN {transactional_id=map -> Sink:
>>> sink-2e588ce1c86a9d46e2

Re: Kafka producer failed with InvalidTxnStateException when performing commit transaction

2019-09-20 Thread Tony Wei
Hi,

I found that the source code [1] in kafka showed that it always check if
`newPartitionsInTransaction`
is empty before calling `enqueueRequest(addPartitionsToTransactionHandler())`,
that is not
applied to flink kafka producer code [2].

I wrote a simple producer with the `flushNewPartitions` copied from flink
kafka producer, and
successfully reproduce this exception. Then, I modified the logic in
`enqueueNewPartitions` to check
if there is any `newPartitionsInTransaction` before make this request. And
this would work well even
if I restarted the broker who owned this transaction's coordinator, since
the empty transaction won't
make any request to server.

The attachments are my simple producer code. Please help to verify what I
thought is correct. Thanks.

Best,
Tony Wei

[1]
https://github.com/apache/kafka/blob/c0019e653891182d7a95464175c9b4ef63f8bae1/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L316
[2]
https://github.com/apache/flink/blob/09f96b339f4890d7a44ae92c915ea8c0f6f244cb/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java#L273

Tony Wei  於 2019年9月20日 週五 上午11:56寫道:

> Hi,
>
> Trying to dig out why `Error.NOT_COORDINATOR` happened in broker, I opened
> flink's log level to DEBUG for producer. And I found some logs from flink
> side
> regarding this error. Below is some log snippet.
>
> It seems that producer client didn't catch this error and retry to find
> new coordinator.
> This caused the transaction state is inconsistent between client side and
> server side.
> Would it be possible that the problem is caused
> by FlinkKafkaInternalProducer using
> java reflection to send `addPartitionsToTransactionHandler` request in
> `FlinkKafkaInternalProducer#flushNewPartitions`? Is there any expert who
> is familiar
> with both kafka and flink's kafka connector could help me solve this?
> Thanks very much.
>
> The attachment is my code to reproduce this problem.
> The cluster's versions are the same as I mentioned in my first email.
>
> Best,
> Tony Wei
>
> *flink taskmanager:*
>
>> 2019-09-20 02:32:45,927 INFO
>>  
>> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer
>>  - Flushing new partitions
>> 2019-09-20 02:32:45,927 DEBUG
>> org.apache.kafka.clients.producer.internals.TransactionManager  - [Producer
>> clientId=producer-29, transactionalId=map -> Sink:
>> sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Enqueuing transactional request
>> (type=AddPartitionsToTxnRequest, transactionalId=map -> Sink:
>> sink-2e588ce1c86a9d46e2e85186773ce4fd-3, producerId=1008, producerEpoch=1,
>> partitions=[])
>>
> 2019-09-20 02:32:45,931 DEBUG
>> org.apache.kafka.clients.producer.internals.Sender- [Producer
>> clientId=producer-29, transactionalId=map -> Sink:
>> sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Sending transactional request
>> (type=AddPartitionsToTxnRequest, transactionalId=map -> Sink:
>> sink-2e588ce1c86a9d46e2e85186773ce4fd-3, producerId=1008, producerEpoch=1,
>> partitions=[]) to node *kafka-broker-1:9092* (id: 1 rack: null)
>> 2019-09-20 02:32:45,931 DEBUG org.apache.kafka.clients.NetworkClient
>>- [Producer clientId=producer-29, transactionalId=map ->
>> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Using older server API v0 to
>> send ADD_PARTITIONS_TO_TXN {transactional_id=map -> Sink:
>> sink-2e588ce1c86a9d46e2e85186773ce4fd-3,producer_id=1008,producer_epoch=1,topics=[]}
>> with correlation id 12 to node 1
>> 2019-09-20 02:32:45,937 DEBUG
>> org.apache.kafka.clients.producer.internals.TransactionManager  - [Producer
>> clientId=producer-29, transactionalId=map -> Sink:
>> sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Successfully added partitions []
>> to transaction
>
>
> *kafka-broker-1:*
>
>>  [2019-09-20 02:31:46,182] INFO [TransactionCoordinator id=1] Initialized
>> transactionalId map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3 with
>> producerId 1008 and producer epoch 1 on partition __transaction_state-37
>> (kafka.coordinator.transaction.TransactionCoordinator)
>
> [2019-09-20 02:32:45,962] DEBUG [TransactionCoordinator id=1] Returning
>> NOT_COORDINATOR error code to client for map -> Sink:
>> sink-2e588ce1c86a9d46e2e85186773ce4fd-3's AddPartitions request
>> (kafka.coordinator.transaction.TransactionCoordinator)
>> [2019-09-20 02:32:46,453] DEBUG [TransactionCoordinator id=1] Aborting
>> append of COMMIT to transaction log with coordinator and returning
>> NOT_COORDINATOR error to client for map -> Sink:
>> sink-2e588ce1c86a9d46e2

Re: Kafka producer failed with InvalidTxnStateException when performing commit transaction

2019-09-19 Thread Tony Wei
Hi,

Trying to dig out why `Error.NOT_COORDINATOR` happened in broker, I opened
flink's log level to DEBUG for producer. And I found some logs from flink
side
regarding this error. Below is some log snippet.

It seems that producer client didn't catch this error and retry to find new
coordinator.
This caused the transaction state is inconsistent between client side and
server side.
Would it be possible that the problem is caused
by FlinkKafkaInternalProducer using
java reflection to send `addPartitionsToTransactionHandler` request in
`FlinkKafkaInternalProducer#flushNewPartitions`? Is there any expert who is
familiar
with both kafka and flink's kafka connector could help me solve this?
Thanks very much.

The attachment is my code to reproduce this problem.
The cluster's versions are the same as I mentioned in my first email.

Best,
Tony Wei

*flink taskmanager:*

> 2019-09-20 02:32:45,927 INFO
>  
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer
>  - Flushing new partitions
> 2019-09-20 02:32:45,927 DEBUG
> org.apache.kafka.clients.producer.internals.TransactionManager  - [Producer
> clientId=producer-29, transactionalId=map -> Sink:
> sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Enqueuing transactional request
> (type=AddPartitionsToTxnRequest, transactionalId=map -> Sink:
> sink-2e588ce1c86a9d46e2e85186773ce4fd-3, producerId=1008, producerEpoch=1,
> partitions=[])
>
2019-09-20 02:32:45,931 DEBUG
> org.apache.kafka.clients.producer.internals.Sender- [Producer
> clientId=producer-29, transactionalId=map -> Sink:
> sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Sending transactional request
> (type=AddPartitionsToTxnRequest, transactionalId=map -> Sink:
> sink-2e588ce1c86a9d46e2e85186773ce4fd-3, producerId=1008, producerEpoch=1,
> partitions=[]) to node *kafka-broker-1:9092* (id: 1 rack: null)
> 2019-09-20 02:32:45,931 DEBUG org.apache.kafka.clients.NetworkClient
>  - [Producer clientId=producer-29, transactionalId=map ->
> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Using older server API v0 to
> send ADD_PARTITIONS_TO_TXN {transactional_id=map -> Sink:
> sink-2e588ce1c86a9d46e2e85186773ce4fd-3,producer_id=1008,producer_epoch=1,topics=[]}
> with correlation id 12 to node 1
> 2019-09-20 02:32:45,937 DEBUG
> org.apache.kafka.clients.producer.internals.TransactionManager  - [Producer
> clientId=producer-29, transactionalId=map -> Sink:
> sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Successfully added partitions []
> to transaction


*kafka-broker-1:*

>  [2019-09-20 02:31:46,182] INFO [TransactionCoordinator id=1] Initialized
> transactionalId map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3 with
> producerId 1008 and producer epoch 1 on partition __transaction_state-37
> (kafka.coordinator.transaction.TransactionCoordinator)

[2019-09-20 02:32:45,962] DEBUG [TransactionCoordinator id=1] Returning
> NOT_COORDINATOR error code to client for map -> Sink:
> sink-2e588ce1c86a9d46e2e85186773ce4fd-3's AddPartitions request
> (kafka.coordinator.transaction.TransactionCoordinator)
> [2019-09-20 02:32:46,453] DEBUG [TransactionCoordinator id=1] Aborting
> append of COMMIT to transaction log with coordinator and returning
> NOT_COORDINATOR error to client for map -> Sink:
> sink-2e588ce1c86a9d46e2e85186773ce4fd-3's EndTransaction request
> (kafka.coordinator.transaction.TransactionCoordinator)




Tony Wei  於 2019年9月19日 週四 下午6:25寫道:

> Hi Becket,
>
> I found that those transactions were tend to be failed
> with InvalidTxnStateException if
> they never sent any records but committed after some brokers being
> restarted.
>
> Because the error state transition always failed from EMPTY to COMMIT, I
> run a
> job with only one parallelism with or without output to Kafka. I tried to
> restart brokers
> and see what happened on these two situations and found that I couldn't
> make job failed
> when job continuously emitted output to Kafka, but it could fail when it
> didn't send any
> output to Kafka.
>
> I'm not familiar with FlinkKafkaProducer's behavior. I tried to use kafka
> java producer
> to reproduce the exception, but it worked well. Maybe my observation is
> not correct,
> but the experiment result seems like that. Do you have any thoughts on
> this?
>
> Best,
> Tony Wei
>
> Tony Wei  於 2019年9月19日 週四 上午11:08寫道:
>
>> Hi Becket,
>>
>> One more thing, I have tried to restart other brokers without active
>> controller, but
>> this exception might happen as well. So it should be independent  of the
>> active
>> controller like you said.
>>
>> Best,
>> Tony Wei
>>
>> Tony Wei  於 2019年9月18日 週三 下午6:14寫道:
>>
>>> Hi Becket,
>>>
>&g

Re: Kafka producer failed with InvalidTxnStateException when performing commit transaction

2019-09-19 Thread Tony Wei
Hi Becket,

I found that those transactions were tend to be failed
with InvalidTxnStateException if
they never sent any records but committed after some brokers being
restarted.

Because the error state transition always failed from EMPTY to COMMIT, I
run a
job with only one parallelism with or without output to Kafka. I tried to
restart brokers
and see what happened on these two situations and found that I couldn't
make job failed
when job continuously emitted output to Kafka, but it could fail when it
didn't send any
output to Kafka.

I'm not familiar with FlinkKafkaProducer's behavior. I tried to use kafka
java producer
to reproduce the exception, but it worked well. Maybe my observation is not
correct,
but the experiment result seems like that. Do you have any thoughts on this?

Best,
Tony Wei

Tony Wei  於 2019年9月19日 週四 上午11:08寫道:

> Hi Becket,
>
> One more thing, I have tried to restart other brokers without active
> controller, but
> this exception might happen as well. So it should be independent  of the
> active
> controller like you said.
>
> Best,
> Tony Wei
>
> Tony Wei  於 2019年9月18日 週三 下午6:14寫道:
>
>> Hi Becket,
>>
>> I have reproduced this problem in our development environment. Below is
>> the log message with debug level.
>> Seems that the exception was from broker-3, and I also found other error
>> code in broker-2 during the time.
>>
>> There are others INVALID_TXN_STATE error for other transaction id. I just
>> list one of them. Above log messages only
>> shows message with `kafka-sink--eba862242e60de7e4744f3307058f865-7's`
>> substring before `2019-09-18 07:14`.
>>
>> I didn't see other information to find out why producer tried to make
>> transaction state from EMPTY to COMMIT, and what
>> made NOT_COORDINATOR happened. Do you have any thought about what's
>> happening? Thanks.
>>
>> *Number of Kafka brokers: 3*
>> *logging config for kafka:*
>>
>>> log4j.appender.transactionAppender=org.apache.log4j.RollingFileAppender
>>>
>>> log4j.appender.transactionAppender.File=${kafka.logs.dir}/kafka-transaction.log
>>> log4j.appender.transactionAppender.layout=org.apache.log4j.PatternLayout
>>> log4j.appender.transactionAppender.layout.ConversionPattern=[%d] %p %m
>>> (%c)%n
>>> log4j.appender.transactionAppender.MaxFileSize=10MB
>>> log4j.appender.transactionAppender.MaxBackupIndex=10
>>> log4j.logger.kafka.coordinator.transaction=DEBUG, transactionAppender
>>> log4j.additivity.kafka.coordinator.transaction=true
>>>
>>
>>
>> *flink-ui*
>>>
>>> Timestamp: 2019-09-18, 07:13:43
>>>
>>
>>
>> java.lang.RuntimeException: Error while confirming checkpoint
>>> at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218)
>>> at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one
>>> of transactions failed, logging first encountered failure
>>> at
>>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684)
>>> at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213)
>>> ... 5 more
>>> Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The
>>> producer attempted a transactional operation in an invalid state
>>>
>>
>>
>> *broker-3*
>>>
>>> [2019-09-18 07:13:43,768] DEBUG [TransactionCoordinator id=3]
>>> TransactionalId: blacklist -> Sink:
>>> kafka-sink--eba862242e60de7e4744f3307058f865-7's state is Empty, but
>>> received transaction marker result to send: COMMIT
>>> (kafka.coordinator.transaction.TransactionCoordinator)
>>> [2019-09-18 07:13:43,769] DEBUG [TransactionCoordinator id=3] Aborting
>>> append of COMMIT to transaction log with coordinator and returning
>>> INVALID_TXN_STATE error to client for b

Re: Kafka producer failed with InvalidTxnStateException when performing commit transaction

2019-09-18 Thread Tony Wei
Hi Becket,

One more thing, I have tried to restart other brokers without active
controller, but
this exception might happen as well. So it should be independent  of the
active
controller like you said.

Best,
Tony Wei

Tony Wei  於 2019年9月18日 週三 下午6:14寫道:

> Hi Becket,
>
> I have reproduced this problem in our development environment. Below is
> the log message with debug level.
> Seems that the exception was from broker-3, and I also found other error
> code in broker-2 during the time.
>
> There are others INVALID_TXN_STATE error for other transaction id. I just
> list one of them. Above log messages only
> shows message with `kafka-sink--eba862242e60de7e4744f3307058f865-7's`
> substring before `2019-09-18 07:14`.
>
> I didn't see other information to find out why producer tried to make
> transaction state from EMPTY to COMMIT, and what
> made NOT_COORDINATOR happened. Do you have any thought about what's
> happening? Thanks.
>
> *Number of Kafka brokers: 3*
> *logging config for kafka:*
>
>> log4j.appender.transactionAppender=org.apache.log4j.RollingFileAppender
>>
>> log4j.appender.transactionAppender.File=${kafka.logs.dir}/kafka-transaction.log
>> log4j.appender.transactionAppender.layout=org.apache.log4j.PatternLayout
>> log4j.appender.transactionAppender.layout.ConversionPattern=[%d] %p %m
>> (%c)%n
>> log4j.appender.transactionAppender.MaxFileSize=10MB
>> log4j.appender.transactionAppender.MaxBackupIndex=10
>> log4j.logger.kafka.coordinator.transaction=DEBUG, transactionAppender
>> log4j.additivity.kafka.coordinator.transaction=true
>>
>
>
> *flink-ui*
>>
>> Timestamp: 2019-09-18, 07:13:43
>>
>
>
> java.lang.RuntimeException: Error while confirming checkpoint
>> at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218)
>> at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of
>> transactions failed, logging first encountered failure
>> at
>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296)
>> at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684)
>> at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213)
>> ... 5 more
>> Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The
>> producer attempted a transactional operation in an invalid state
>>
>
>
> *broker-3*
>>
>> [2019-09-18 07:13:43,768] DEBUG [TransactionCoordinator id=3]
>> TransactionalId: blacklist -> Sink:
>> kafka-sink--eba862242e60de7e4744f3307058f865-7's state is Empty, but
>> received transaction marker result to send: COMMIT
>> (kafka.coordinator.transaction.TransactionCoordinator)
>> [2019-09-18 07:13:43,769] DEBUG [TransactionCoordinator id=3] Aborting
>> append of COMMIT to transaction log with coordinator and returning
>> INVALID_TXN_STATE error to client for blacklist -> Sink:
>> kafka-sink--eba862242e60de7e4744f3307058f865-7's EndTransaction request
>> (kafka.coordinator.transaction.TransactionCoordinator)
>> [2019-09-18 07:13:45,896] DEBUG [TransactionCoordinator id=3]
>> TransactionalId: blacklist -> Sink:
>> kafka-sink--eba862242e60de7e4744f3307058f865-7's state is Empty, but
>> received transaction marker result to send: COMMIT
>> (kafka.coordinator.transaction.TransactionCoordinator)
>> [2019-09-18 07:13:45,896] DEBUG [TransactionCoordinator id=3] Aborting
>> append of COMMIT to transaction log with coordinator and returning
>> INVALID_TXN_STATE error to client for blacklist -> Sink:
>> kafka-sink--eba862242e60de7e4744f3307058f865-7's EndTransaction request
>> (kafka.coordinator.transaction.TransactionCoordinator)
>> [2019-09-18 07:13:46,840] DEBUG [Transaction State Manager 3]: Updating
>> blacklist -> Sink: kafka-sink--eba862242e60de7e4744f3307058f865-7's
>> transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=4,
>> txnTimeoutMs=540, txnState=Empty, topicPartitions=Set(),
>> txnStartT

Re: Kafka producer failed with InvalidTxnStateException when performing commit transaction

2019-09-18 Thread Tony Wei
oordinator epoch 0 for blacklist -> Sink: kafka-sink--eba862242e6
> 0de7e4744f3307058f865-7 succeeded
> (kafka.coordinator.transaction.TransactionStateManager)
> [2019-09-18 06:54:27,981] DEBUG [Transaction State Manager 2]: Updating
> blacklist -> Sink: kafka-sink--eba862242e60de7e4744f3307058f865-7's
> transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=1,
> txnTimeoutMs=540, txnState=Empty, topicPartitions=Set(),
> txnStartTimestamp=-1, txnLastUpdateTimestamp=1568789667979) with
> coordinator epoch 0 for blacklist -> Sink:
> kafka-sink--eba862242e60de7e4744f3307058f865-7 succeeded
> (kafka.coordinator.transaction.TransactionStateManager)
> [2019-09-18 07:06:25,419] DEBUG [Transaction State Manager 2]: Updating
> blacklist -> Sink: kafka-sink--eba862242e60de7e4744f3307058f865-7's
> transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=2,
> txnTimeoutMs=540, txnState=Empty, topicPartitions=Set(),
> txnStartTimestamp=-1, txnLastUpdateTimestamp=1568790385417) with
> coordinator epoch 0 for blacklist -> Sink:
> kafka-sink--eba862242e60de7e4744f3307058f865-7 succeeded
> (kafka.coordinator.transaction.TransactionStateManager)
> [2019-09-18 07:11:42,981] DEBUG [Transaction State Manager 2]: Updating
> blacklist -> Sink: kafka-sink--eba862242e60de7e4744f3307058f865-7's
> transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=3,
> txnTimeoutMs=540, txnState=Empty, topicPartitions=Set(),
> txnStartTimestamp=-1, txnLastUpdateTimestamp=1568790702969) with
> coordinator epoch 0 for blacklist -> Sink:
> kafka-sink--eba862242e60de7e4744f3307058f865-7 succeeded
> (kafka.coordinator.transaction.TransactionStateManager)
> [2019-09-18 07:13:42,779] DEBUG [TransactionCoordinator id=2] Returning
> NOT_COORDINATOR error code to client for blacklist -> Sink:
> kafka-sink--eba862242e60de7e4744f3307058f865-7's AddPartitions request
> (kafka.coordinator.transaction.TransactionCoordinator)
> [2019-09-18 07:13:43,633] DEBUG [TransactionCoordinator id=2] Aborting
> append of COMMIT to transaction log with coordinator and returning
> NOT_COORDINATOR error to client for blacklist -> Sink:
> kafka-sink--eba862242e60de7e4744f3307058f865-7's EndTransaction request
> (kafka.coordinator.transaction.TransactionCoordinator)
>

Best,
Tony Wei


Becket Qin  於 2019年9月2日 週一 下午10:03寫道:

> Hi Tony,
>
> From the symptom it is not quite clear to me what may cause this issue.
> Supposedly the TransactionCoordinator is independent of the active
> controller, so bouncing the active controller should not have special
> impact on the transactions (at least not every time). If this is stably
> reproducible, is it possible to turn on debug level logging
> on kafka.coordinator.transaction.TransactionCoordinator to see what does
> the broker say?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Thu, Aug 29, 2019 at 3:55 PM Tony Wei  wrote:
>
>> Hi,
>>
>> Has anyone run into the same problem? I have updated my producer
>> transaction timeout to 1.5 hours,
>> but the problem sill happened when I restarted broker with active
>> controller. It might not due to the
>> problem that checkpoint duration is too long causing transaction timeout.
>> I had no more clue to find out
>> what's wrong about my kafka producer. Could someone help me please?
>>
>> Best,
>> Tony Wei
>>
>> Fabian Hueske  於 2019年8月16日 週五 下午4:10寫道:
>>
>>> Hi Tony,
>>>
>>> I'm sorry I cannot help you with this issue, but Becket (in CC) might
>>> have an idea what went wrong here.
>>>
>>> Best, Fabian
>>>
>>> Am Mi., 14. Aug. 2019 um 07:00 Uhr schrieb Tony Wei <
>>> tony19920...@gmail.com>:
>>>
>>>> Hi,
>>>>
>>>> Currently, I was trying to update our kafka cluster with larger `
>>>> transaction.max.timeout.ms`. The
>>>> original setting is kafka's default value (i.e. 15 minutes) and I tried
>>>> to set as 3 hours.
>>>>
>>>> When I was doing rolling-restart for my brokers, this exception came to
>>>> me on the next checkpoint
>>>> after I restarted the broker with active controller.
>>>>
>>>> java.lang.RuntimeException: Error while confirming checkpoint at
>>>>> org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218) at
>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at
>>>>> java.util.concurrent.FutureTask.run(FutureTask.java:266) at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(Thread

Re: Kafka producer failed with InvalidTxnStateException when performing commit transaction

2019-08-29 Thread Tony Wei
Hi,

Has anyone run into the same problem? I have updated my producer
transaction timeout to 1.5 hours,
but the problem sill happened when I restarted broker with active
controller. It might not due to the
problem that checkpoint duration is too long causing transaction timeout. I
had no more clue to find out
what's wrong about my kafka producer. Could someone help me please?

Best,
Tony Wei

Fabian Hueske  於 2019年8月16日 週五 下午4:10寫道:

> Hi Tony,
>
> I'm sorry I cannot help you with this issue, but Becket (in CC) might have
> an idea what went wrong here.
>
> Best, Fabian
>
> Am Mi., 14. Aug. 2019 um 07:00 Uhr schrieb Tony Wei <
> tony19920...@gmail.com>:
>
>> Hi,
>>
>> Currently, I was trying to update our kafka cluster with larger `
>> transaction.max.timeout.ms`. The
>> original setting is kafka's default value (i.e. 15 minutes) and I tried
>> to set as 3 hours.
>>
>> When I was doing rolling-restart for my brokers, this exception came to
>> me on the next checkpoint
>> after I restarted the broker with active controller.
>>
>> java.lang.RuntimeException: Error while confirming checkpoint at
>>> org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218) at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at
>>> java.util.concurrent.FutureTask.run(FutureTask.java:266) at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> at java.lang.Thread.run(Thread.java:748) Caused by:
>>> org.apache.flink.util.FlinkRuntimeException: Committing one of transactions
>>> failed, logging first encountered failure at
>>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684)
>>> at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213) ... 5
>>> more Caused by: org.apache.kafka.common.errors.InvalidTxnStateException:
>>> The producer attempted a transactional operation in an invalid state
>>
>>
>> I have no idea why it happened, and I didn't find any error log from
>> brokers. Does anyone have
>> this exception before? How can I prevent from this exception when I tried
>> to restart kafka cluster?
>> Does this exception mean that I will lost data in some of these
>> transactions?
>>
>> flink cluster version: 1.8.1
>> kafka cluster version: 1.0.1
>> flink kafka producer version: universal
>> producer transaction timeout: 15 minutes
>> checkpoint interval: 5 minutes
>> number of concurrent checkpoint: 1
>> max checkpoint duration before and after the exception occurred:  < 2
>> minutes
>>
>> Best,
>> Tony Wei
>>
>


Kafka producer failed with InvalidTxnStateException when performing commit transaction

2019-08-13 Thread Tony Wei
Hi,

Currently, I was trying to update our kafka cluster with larger `
transaction.max.timeout.ms`. The
original setting is kafka's default value (i.e. 15 minutes) and I tried to
set as 3 hours.

When I was doing rolling-restart for my brokers, this exception came to me
on the next checkpoint
after I restarted the broker with active controller.

java.lang.RuntimeException: Error while confirming checkpoint at
> org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218) at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at
> java.util.concurrent.FutureTask.run(FutureTask.java:266) at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748) Caused by:
> org.apache.flink.util.FlinkRuntimeException: Committing one of transactions
> failed, logging first encountered failure at
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684)
> at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213) ... 5
> more Caused by: org.apache.kafka.common.errors.InvalidTxnStateException:
> The producer attempted a transactional operation in an invalid state


I have no idea why it happened, and I didn't find any error log from
brokers. Does anyone have
this exception before? How can I prevent from this exception when I tried
to restart kafka cluster?
Does this exception mean that I will lost data in some of these
transactions?

flink cluster version: 1.8.1
kafka cluster version: 1.0.1
flink kafka producer version: universal
producer transaction timeout: 15 minutes
checkpoint interval: 5 minutes
number of concurrent checkpoint: 1
max checkpoint duration before and after the exception occurred:  < 2
minutes

Best,
Tony Wei


Re: Kafka ProducerFencedException after checkpointing

2019-08-12 Thread Tony Wei
Hi Piotr,

Thanks a lot. I need exactly once in my use case, but instead of having the
risk of losing data, at least once is more acceptable when error occurred.

Best,
Tony Wei

Piotr Nowojski  於 2019年8月12日 週一 下午3:27寫道:

> Hi,
>
> Yes, if it’s due to transaction timeout you will lose the data.
>
> Whether can you fallback to at least once, that depends on Kafka, not on
> Flink, since it’s the Kafka that timeouts those transactions and I don’t
> see in the documentation anything that could override this [1]. You might
> try disabling the mechanism via setting `
> transaction.abort.timed.out.transaction.cleanup.interval.ms` or `
> transaction.remove.expired.transaction.cleanup.interval.ms`, but that’s
> question more to Kafka guys. Maybe Becket could help with this.
>
> Also it MIGHT be that Kafka doesn’t remove records from the topics when
> aborting the transaction and MAYBE you can still access them via
> “READ_UNCOMMITTED” mode. But that’s again, question to Kafka.
>
> Sorry that I can not help more.
>
> If you do not care about exactly once, why don’t you just set the
> connector to at least once mode?
>
> Piotrek
>
> On 12 Aug 2019, at 06:29, Tony Wei  wrote:
>
> Hi,
>
> I had the same exception recently. I want to confirm that if it is due to
> transaction timeout,
> then I will lose those data. Am I right? Can I make it fall back to at
> least once semantic in
> this situation?
>
> Best,
> Tony Wei
>
> Piotr Nowojski  於 2018年3月21日 週三 下午10:28寫道:
>
>> Hi,
>>
>> But that’s exactly the case: producer’s transaction timeout starts when
>> the external transaction starts - but FlinkKafkaProducer011 keeps an active
>> Kafka transaction for the whole period between checkpoints.
>>
>> As I wrote in the previous message:
>>
>> > in case of failure, your timeout must also be able to cover the
>> additional downtime required for the successful job restart. Thus you
>> should increase your timeout accordingly.
>>
>> I think that 15 minutes timeout is a way too small value. If your job
>> fails because of some intermittent failure (for example worker
>> crash/restart), you will only have a couple of minutes for a successful
>> Flink job restart. Otherwise you will lose some data (because of the
>> transaction timeouts).
>>
>> Piotrek
>>
>> On 21 Mar 2018, at 10:30, Dongwon Kim  wrote:
>>
>> Hi Piotr,
>>
>> Now my streaming pipeline is working without retries.
>> I decreased Flink's checkpoint interval from 15min to 10min as you
>> suggested [see screenshot_10min_ckpt.png].
>>
>> I though that producer's transaction timeout starts when the external
>> transaction starts.
>> The truth is that Producer's transaction timeout starts after the last
>> external checkpoint is committed.
>> Now that I have 15min for Producer's transaction timeout and 10min for
>> Flink's checkpoint interval, and every checkpoint takes less than 5
>> minutes, everything is working fine.
>> Am I right?
>>
>> Anyway thank you very much for the detailed explanation!
>>
>> Best,
>>
>> Dongwon
>>
>>
>>
>> On Tue, Mar 20, 2018 at 8:10 PM, Piotr Nowojski 
>> wrote:
>>
>>> Hi,
>>>
>>> Please increase transaction.timeout.ms to a greater value or decrease
>>> Flink’s checkpoint interval, I’m pretty sure the issue here is that those
>>> two values are overlapping. I think that’s even visible on the screenshots.
>>> First checkpoint completed started at 14:28:48 and ended at 14:30:43, while
>>> the second one started at 14:45:53 and ended at 14:49:16. That gives you
>>> minimal transaction duration of 15 minutes and 10 seconds, with maximal
>>> transaction duration of 21 minutes.
>>>
>>> In HAPPY SCENARIO (without any failure and restarting), you should
>>> assume that your timeout interval should cover with some safety margin the
>>> period between start of a checkpoint and end of the NEXT checkpoint, since
>>> this is the upper bound how long the transaction might be used. In your
>>> case at least ~25 minutes.
>>>
>>> On top of that, as described in the docs,
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producers-and-fault-tolerance
>>>  ,
>>> in case of failure, your timeout must also be able to cover the additional
>>> downtime required for the successful job restart. Thus you should increase
>>> your timeout accordingly.
>>>
>>> Piotrek
>>>
>>>
>>> O

Re: Kafka ProducerFencedException after checkpointing

2019-08-11 Thread Tony Wei
Hi,

I had the same exception recently. I want to confirm that if it is due to
transaction timeout,
then I will lose those data. Am I right? Can I make it fall back to at
least once semantic in
this situation?

Best,
Tony Wei

Piotr Nowojski  於 2018年3月21日 週三 下午10:28寫道:

> Hi,
>
> But that’s exactly the case: producer’s transaction timeout starts when
> the external transaction starts - but FlinkKafkaProducer011 keeps an active
> Kafka transaction for the whole period between checkpoints.
>
> As I wrote in the previous message:
>
> > in case of failure, your timeout must also be able to cover the
> additional downtime required for the successful job restart. Thus you
> should increase your timeout accordingly.
>
> I think that 15 minutes timeout is a way too small value. If your job
> fails because of some intermittent failure (for example worker
> crash/restart), you will only have a couple of minutes for a successful
> Flink job restart. Otherwise you will lose some data (because of the
> transaction timeouts).
>
> Piotrek
>
> On 21 Mar 2018, at 10:30, Dongwon Kim  wrote:
>
> Hi Piotr,
>
> Now my streaming pipeline is working without retries.
> I decreased Flink's checkpoint interval from 15min to 10min as you
> suggested [see screenshot_10min_ckpt.png].
>
> I though that producer's transaction timeout starts when the external
> transaction starts.
> The truth is that Producer's transaction timeout starts after the last
> external checkpoint is committed.
> Now that I have 15min for Producer's transaction timeout and 10min for
> Flink's checkpoint interval, and every checkpoint takes less than 5
> minutes, everything is working fine.
> Am I right?
>
> Anyway thank you very much for the detailed explanation!
>
> Best,
>
> Dongwon
>
>
>
> On Tue, Mar 20, 2018 at 8:10 PM, Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> Please increase transaction.timeout.ms to a greater value or decrease
>> Flink’s checkpoint interval, I’m pretty sure the issue here is that those
>> two values are overlapping. I think that’s even visible on the screenshots.
>> First checkpoint completed started at 14:28:48 and ended at 14:30:43, while
>> the second one started at 14:45:53 and ended at 14:49:16. That gives you
>> minimal transaction duration of 15 minutes and 10 seconds, with maximal
>> transaction duration of 21 minutes.
>>
>> In HAPPY SCENARIO (without any failure and restarting), you should assume
>> that your timeout interval should cover with some safety margin the period
>> between start of a checkpoint and end of the NEXT checkpoint, since this is
>> the upper bound how long the transaction might be used. In your case at
>> least ~25 minutes.
>>
>> On top of that, as described in the docs,
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producers-and-fault-tolerance
>>  ,
>> in case of failure, your timeout must also be able to cover the additional
>> downtime required for the successful job restart. Thus you should increase
>> your timeout accordingly.
>>
>> Piotrek
>>
>>
>> On 20 Mar 2018, at 11:58, Dongwon Kim  wrote:
>>
>> Hi Piotr,
>>
>> We have set producer's [transaction.timeout.ms] to 15 minutes and have
>> used the default setting for broker (15 mins).
>> As Flink's checkpoint interval is 15 minutes, it is not a situation where
>> Kafka's timeout is smaller than Flink's checkpoint interval.
>> As our first checkpoint just takes 2 minutes, it seems like transaction
>> is not committed properly.
>>
>> Best,
>>
>> - Dongwon
>>
>>
>>
>>
>>
>> On Tue, Mar 20, 2018 at 6:32 PM, Piotr Nowojski 
>> wrote:
>>
>>> Hi,
>>>
>>> What’s your Kafka’s transaction timeout setting? Please both check Kafka
>>> producer configuration (transaction.timeout.ms property) and Kafka
>>> broker configuration. The most likely cause of such error message is when
>>> Kafka's timeout is smaller then Flink’s checkpoint interval and
>>> transactions are not committed quickly enough before timeout occurring.
>>>
>>> Piotrek
>>>
>>> On 17 Mar 2018, at 07:24, Dongwon Kim  wrote:
>>>
>>>
>>> Hi,
>>>
>>> I'm faced with the following ProducerFencedException after 1st, 3rd,
>>> 5th, 7th, ... checkpoints:
>>>
>>> --
>>>
>>> java.lang.RuntimeException: Error while confirming checkpoint
>>> at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
>>> at 
>

Re: How to make two SQLs use the same KafkaTableSource?

2019-08-09 Thread Tony Wei
Hi Zhenghua,

Blink planner support lazy translation for multiple SQLs, and the common
> nodes will be reused in a single job.
>

It is very helpful, and thanks for your clarification.


> The only thing you need note here is the unified TableEnvironmentImpl do
> not support conversions between Table(s) and Stream(s).
> U must use pure SQL api (DDL/DML by sqlUpdate, DQL by sqlQuery).


Does this mean that only common nodes that generated from pure SQL api can
be reused. Operator nodes
created from DataStream api will not be recognized by Blink planner? If
this is the case, it is fine with me. My
original question just focused on reused nodes in SQL api, and seems Blink
planner is what I need. Thanks
for your help again.

Best,
Tony Wei

Zhenghua Gao  於 2019年8月9日 週五 下午1:54寫道:

> Blink planner support lazy translation for multiple SQLs, and the common
> nodes will be reused in a single job.
> The only thing you need note here is the unified TableEnvironmentImpl do
> not support conversions between Table(s) and Stream(s).
> U must use pure SQL api (DDL/DML by sqlUpdate, DQL by sqlQuery).
>
> *Best Regards,*
> *Zhenghua Gao*
>
>
> On Fri, Aug 9, 2019 at 12:38 PM Tony Wei  wrote:
>
>> forgot to send to user mailing list.
>>
>> Tony Wei  於 2019年8月9日 週五 下午12:36寫道:
>>
>>> Hi Zhenghua,
>>>
>>> I didn't get your point. It seems that `isEagerOperationTranslation` is
>>> always return false. Is that
>>> means even I used Blink planner, the sql translation is still in a lazy
>>> manner?
>>>
>>> Or do you mean Blink planner will recognize and link two SQLs to the
>>> same kafka source, if
>>> they both use the same kafka table, even if the translation is lazy?
>>>
>>> I'm not familiar with the details of translation process, but I guessed
>>> the translating eagerly is not
>>> be an only solution. If the translation of the second SQL can reuse the
>>> operators from the first SQL,
>>> then it is possible to link them to the same kafka source operator.
>>>
>>> Best,
>>> Tony Wei
>>>
>>> Zhenghua Gao  於 2019年8月9日 週五 上午11:57寫道:
>>>
>>>> This needs EagerOperationTranslation[1]
>>>> <https://github.com/apache/flink/blob/master/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java#L413>
>>>> support. you can try in Blink planner in 1.9.0.
>>>>
>>>> [1]
>>>> https://github.com/apache/flink/blob/master/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java#L413
>>>>
>>>> *Best Regards,*
>>>> *Zhenghua Gao*
>>>>
>>>>
>>>> On Fri, Aug 9, 2019 at 10:37 AM Tony Wei 
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I used `flinkTableEnv.connect(new
>>>>> Kafka()...).registerTableSource(...)` to register my kafka table.
>>>>> However, I found that because SQL is a lazy operation, it will convert
>>>>> to DataStream under some
>>>>> criteria. For example, `Table#toRetractStream`.
>>>>>
>>>>> So, when I used two SQLs in one application job, the same kafka table
>>>>> will be constructed twice. It
>>>>> is not a problem from flink side, because two operators held their own
>>>>> state for offsets. But from
>>>>> kafka side, they will have the same group_id.
>>>>>
>>>>> I want to make sure that only one kafka source will commit group_id's
>>>>> offsets back to kafka. A
>>>>> workaround might be registering the same kafka topic twice with
>>>>> different name, group_id for
>>>>> two SQLs. But I would still like to know if there is any way to make
>>>>> two SQLs just read from the
>>>>> same KafkaTableSource? Thanks in advance.
>>>>>
>>>>> Best,
>>>>> Tony Wei
>>>>>
>>>>


Re: How to make two SQLs use the same KafkaTableSource?

2019-08-08 Thread Tony Wei
forgot to send to user mailing list.

Tony Wei  於 2019年8月9日 週五 下午12:36寫道:

> Hi Zhenghua,
>
> I didn't get your point. It seems that `isEagerOperationTranslation` is
> always return false. Is that
> means even I used Blink planner, the sql translation is still in a lazy
> manner?
>
> Or do you mean Blink planner will recognize and link two SQLs to the same
> kafka source, if
> they both use the same kafka table, even if the translation is lazy?
>
> I'm not familiar with the details of translation process, but I guessed
> the translating eagerly is not
> be an only solution. If the translation of the second SQL can reuse the
> operators from the first SQL,
> then it is possible to link them to the same kafka source operator.
>
> Best,
> Tony Wei
>
> Zhenghua Gao  於 2019年8月9日 週五 上午11:57寫道:
>
>> This needs EagerOperationTranslation[1]
>> <https://github.com/apache/flink/blob/master/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java#L413>
>> support. you can try in Blink planner in 1.9.0.
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java#L413
>>
>> *Best Regards,*
>> *Zhenghua Gao*
>>
>>
>> On Fri, Aug 9, 2019 at 10:37 AM Tony Wei  wrote:
>>
>>> Hi,
>>>
>>> I used `flinkTableEnv.connect(new Kafka()...).registerTableSource(...)`
>>> to register my kafka table.
>>> However, I found that because SQL is a lazy operation, it will convert
>>> to DataStream under some
>>> criteria. For example, `Table#toRetractStream`.
>>>
>>> So, when I used two SQLs in one application job, the same kafka table
>>> will be constructed twice. It
>>> is not a problem from flink side, because two operators held their own
>>> state for offsets. But from
>>> kafka side, they will have the same group_id.
>>>
>>> I want to make sure that only one kafka source will commit group_id's
>>> offsets back to kafka. A
>>> workaround might be registering the same kafka topic twice with
>>> different name, group_id for
>>> two SQLs. But I would still like to know if there is any way to make two
>>> SQLs just read from the
>>> same KafkaTableSource? Thanks in advance.
>>>
>>> Best,
>>> Tony Wei
>>>
>>


如何讓兩個 SQL 使用相同的 KafkaTableSource

2019-08-08 Thread Tony Wei
Hi

我在我的 flink job 中透過 `flinkTableEnv.connect(new
Kafka()...).registerTableSource(...)` 註冊了
一張 kafka table。但從文件上我才知道 SQL 只會在特定的條件下才會真正的轉為 DataStream,比
如說呼叫了Table#toRetractStream`。

因為如此,我發現當我嘗試在同一個 flink job 中使用了不同的 SQL 時,他們會同時產生各自的
kafka source operator。從 flink 的角度來說可能不是什麼大問題,各自獨立的 operator 會各自管理
好自己的 offset state,也不會互相影響。但是從 kafka 方面來看,因為兩邊都是使用相同的
group_id,當 offset 被 commit 回 kafka 時,就會在 kafka 端有衝突。

我想要確保每個 group_id 只會被一個 operator 負責執行 commit 的動作。最簡單的做法可能是故意
為相同的 kafka topic 註冊兩個名稱不同的 table, group_id,分別給兩個 SQL 使用。但我想知道是
不是有更好的做法,可以讓兩個 SQL 是真正的從同一個 kafka operator 讀取資料?這樣也不需要同
時存在兩個做一樣事情的 kafka operator 。先謝謝各位的幫助。

Best,
Tony Wei


How to make two SQLs use the same KafkaTableSource?

2019-08-08 Thread Tony Wei
Hi,

I used `flinkTableEnv.connect(new Kafka()...).registerTableSource(...)` to
register my kafka table.
However, I found that because SQL is a lazy operation, it will convert to
DataStream under some
criteria. For example, `Table#toRetractStream`.

So, when I used two SQLs in one application job, the same kafka table will
be constructed twice. It
is not a problem from flink side, because two operators held their own
state for offsets. But from
kafka side, they will have the same group_id.

I want to make sure that only one kafka source will commit group_id's
offsets back to kafka. A
workaround might be registering the same kafka topic twice with different
name, group_id for
two SQLs. But I would still like to know if there is any way to make two
SQLs just read from the
same KafkaTableSource? Thanks in advance.

Best,
Tony Wei


Re: Is it possible to decide the order of where conditions in Flink SQL

2019-07-28 Thread Tony Wei
Hi Hequn,

Thank you very much. It's helpful to me.

For clarification, I think the code should look like the following snippet,
since original query was an `AND` operator. Am I right?

> CASE
> WHEN user_robot THEN false
> WHEN !UDF_NEED_TO_QUERY_DB(user) THEN false
> ELSE true
> END


Best regards,
Tony Wei


Hequn Cheng  於 2019年7月28日 週日 下午3:30寫道:

> Hi Tony,
>
> There is no order guarantee for filter conditions. The conditions would be
> pushed down or merged during query optimization.
>
> However, you can use the case when[1] to achieve what you want. The code
> looks like:
> CASE
> WHEN !user.is_robot THEN true
> WHEN UDF_NEED_TO_QUERY_DB(user) THEN true
> ELSE false
> END
>
> For case when, it evaluates the conditions in order.
>
> Note: The UDF_NEED_TO_QUERY_DB must be a nonDeterministic udf, or the case
> when would also be optimized and changed to an OR by the query optimizer.
> You can override the isDeterministic method of ScalarFunction to make it
> nonDeterministic, i.e., override def isDeterministic: Boolean = false
>
> Best, Hequn
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions.html#conditional-functions
>
> On Sat, Jul 27, 2019 at 4:35 PM Tony Wei  wrote:
>
>> Hi,
>>
>> Thanks for your reply. I have tried both CTE and sql subquery, but it
>> seems that sql plan
>> optimizer will do filter pushdown. Therefore, where conditions will end
>> up being together in
>> physical plan.
>>
>> However, the visualization of physical plans on Flink UI were different
>> for these three SQL
>> query on their operations' name.
>>
>> For the original SQL, it showed:
>>
>>> where: (AND(UDF_NEED_TO_QUERY_DB(user), NOT(user.is_robot))), select:
>>> (...)
>>
>>
>> For the CTE and subquery , it showed:
>>
>>> where: (AND(NOT(user.is_robot), UDF_NEED_TO_QUERY_DB(user))), select:
>>> (...)
>>
>>
>> Does this name for each operator of physical plan have any meaning to
>> represent the
>> execution order of `where` conditions?
>>
>> Best,
>> Tony Wei
>>
>> sri hari kali charan Tummala  於 2019年7月27日 週六
>> 上午3:02寫道:
>>
>>> try cte common table expressions if it supports or sql subquery.
>>>
>>> On Fri, Jul 26, 2019 at 1:00 PM Fanbin Bu 
>>> wrote:
>>>
>>>> how about move query db filter to the outer select.
>>>>
>>>> On Fri, Jul 26, 2019 at 9:31 AM Tony Wei 
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> If I have multiple where conditions in my SQL, is it possible to
>>>>> specify its order, so that the query
>>>>> can be executed more efficiently?
>>>>>
>>>>> For example, if I have the following SQL, it used a heavy UDF that
>>>>> needs to access database.
>>>>> However, if I can specify the order of conditions is executing
>>>>> `!user.is_robot` first then executing
>>>>> UDF, it will reduce the number of database access. Those records with
>>>>> `true` in `user.is_robot` will
>>>>> be dropped earlier and don't need to access database.
>>>>>
>>>>> select *
>>>>>
>>>>> from users
>>>>>
>>>>> where !user.is_robot and UDF_NEED_TO_QUERY_DB(user)
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Tony Wei
>>>>>
>>>>
>>>
>>> --
>>> Thanks & Regards
>>> Sri Tummala
>>>
>>>


Re: Is it possible to decide the order of where conditions in Flink SQL

2019-07-27 Thread Tony Wei
Hi,

Thanks for your reply. I have tried both CTE and sql subquery, but it seems
that sql plan
optimizer will do filter pushdown. Therefore, where conditions will end up
being together in
physical plan.

However, the visualization of physical plans on Flink UI were different for
these three SQL
query on their operations' name.

For the original SQL, it showed:

> where: (AND(UDF_NEED_TO_QUERY_DB(user), NOT(user.is_robot))), select: (...)


For the CTE and subquery , it showed:

> where: (AND(NOT(user.is_robot), UDF_NEED_TO_QUERY_DB(user))), select: (...)


Does this name for each operator of physical plan have any meaning to
represent the
execution order of `where` conditions?

Best,
Tony Wei

sri hari kali charan Tummala  於 2019年7月27日 週六
上午3:02寫道:

> try cte common table expressions if it supports or sql subquery.
>
> On Fri, Jul 26, 2019 at 1:00 PM Fanbin Bu  wrote:
>
>> how about move query db filter to the outer select.
>>
>> On Fri, Jul 26, 2019 at 9:31 AM Tony Wei  wrote:
>>
>>> Hi,
>>>
>>> If I have multiple where conditions in my SQL, is it possible to specify
>>> its order, so that the query
>>> can be executed more efficiently?
>>>
>>> For example, if I have the following SQL, it used a heavy UDF that needs
>>> to access database.
>>> However, if I can specify the order of conditions is executing
>>> `!user.is_robot` first then executing
>>> UDF, it will reduce the number of database access. Those records with
>>> `true` in `user.is_robot` will
>>> be dropped earlier and don't need to access database.
>>>
>>> select *
>>>
>>> from users
>>>
>>> where !user.is_robot and UDF_NEED_TO_QUERY_DB(user)
>>>
>>>
>>> Thanks,
>>> Tony Wei
>>>
>>
>
> --
> Thanks & Regards
> Sri Tummala
>
>


請問在 Flink SQL 上能不能指定 WHERE 裡的判斷式的執行順序?

2019-07-26 Thread Tony Wei
Hi,

想請問是否有辦法在 Flink SQL 上指明 WHERE 裡的判斷式的執行順序,來做到一些特定情況下的
查詢優化?

舉例來說,在下面的 SQL,假如有個很耗時的 UDF 需要每次都去查詢資料庫。在這樣的狀況下,
如果可以確保優先執行 `!user.is_robot` 的判斷,再去執行後面的 UDF 的話,就能減少許多的資料
庫查詢。因為那些在 `user.is_robot` 裡得到 `true` 的數據就可以提早被丟棄,而不用去執行後面
較為花費時間的 UDF 了。

select *

from users

where !user.is_robot and UDF_NEED_TO_QUERY_DB(user)


謝謝大家的幫忙。

Best regards,
Tony Wei


Is it possible to decide the order of where conditions in Flink SQL

2019-07-26 Thread Tony Wei
Hi,

If I have multiple where conditions in my SQL, is it possible to specify
its order, so that the query
can be executed more efficiently?

For example, if I have the following SQL, it used a heavy UDF that needs to
access database.
However, if I can specify the order of conditions is executing
`!user.is_robot` first then executing
UDF, it will reduce the number of database access. Those records with
`true` in `user.is_robot` will
be dropped earlier and don't need to access database.

select *

from users

where !user.is_robot and UDF_NEED_TO_QUERY_DB(user)


Thanks,
Tony Wei


Re: left join failing with FlinkLogicalJoinConverter NPE

2019-07-19 Thread Tony Wei
Hi,

I also found the similar issue here [1].

Best,
Tony Wei
[1] https://issues.apache.org/jira/browse/FLINK-11433

Tony Wei  於 2019年7月19日 週五 下午5:38寫道:

> Hi,
>
> Is there any update for this issue? I have had the same problem just like
> Karl's.
> After I remove query like "select collect(data) ..." from one of the
> joined tables,
> the sql can be executed correctly without throwing any NPE.
>
> Best regards,
> Tony Wei
>
> Xingcan Cui  於 2019年2月27日 週三 下午12:53寫道:
>
>> Hi Karl,
>>
>> I think this is a bug and created FLINK-11769
>> <https://issues.apache.org/jira/browse/FLINK-11769> to track it.
>>
>> Best,
>> Xingcan
>>
>> On Feb 26, 2019, at 2:02 PM, Karl Jin  wrote:
>>
>> I removed the multiset> field and the join worked
>> fine. The field was created from a Kafka source through a query that looks
>> like "select collect(data) as i_data from ... group by pk"
>>
>> Do you think this is a bug or is this something I can get around using
>> some configuration?
>>
>> On Tue, Feb 26, 2019 at 1:20 AM Xingcan Cui  wrote:
>>
>>> Yes. Please check that. If it's the nested type's problem, this might be
>>> a bug.
>>>
>>> On Mon, Feb 25, 2019, 21:50 Karl Jin  wrote:
>>>
>>>> Do you think something funky might be happening with Map/Multiset
>>>> types? If so how do I deal with it (I think I can verify by removing those
>>>> columns and retry?)?
>>>>
>>>> On Mon, Feb 25, 2019 at 6:28 PM Karl Jin  wrote:
>>>>
>>>>> Thanks for checking in quickly,
>>>>>
>>>>> Below is what I got on printSchema on the two tables (left joining the
>>>>> second one to the first one on uc_pk = i_uc_pk). rowtime in both are
>>>>> extracted from the string field uc_update_ts
>>>>>
>>>>> root
>>>>>  |-- uc_pk: String
>>>>>  |-- uc_update_ts: String
>>>>>  |-- rowtime: TimeIndicatorTypeInfo(rowtime)
>>>>>  |-- uc_version: String
>>>>>  |-- uc_type: String
>>>>>  |-- data_parsed: Map
>>>>>
>>>>> root
>>>>>  |-- i_uc_pk: String
>>>>>  |-- i_uc_update_ts: TimeIndicatorTypeInfo(rowtime)
>>>>>  |-- image_count: Long
>>>>>  |-- i_data: Multiset>
>>>>>
>>>>> On Mon, Feb 25, 2019 at 4:54 PM Xingcan Cui 
>>>>> wrote:
>>>>>
>>>>>> Hi Karl,
>>>>>>
>>>>>> It seems that some field types of your inputs were not properly
>>>>>> extracted.
>>>>>> Could you share the result of `printSchema()` for your input tables?
>>>>>>
>>>>>> Best,
>>>>>> Xingcan
>>>>>>
>>>>>> > On Feb 25, 2019, at 4:35 PM, Karl Jin  wrote:
>>>>>> >
>>>>>> > Hello,
>>>>>> >
>>>>>> > First time posting, so please let me know if the formatting isn't
>>>>>> correct, etc.
>>>>>> >
>>>>>> > I'm trying to left join two Kafka sources, running 1.7.2 locally,
>>>>>> but getting the below exception. Looks like some sort of query 
>>>>>> optimization
>>>>>> process but I'm not sure where to start investigating/debugging. I see
>>>>>> things are marked as NONE in the object so that's a bit of a flag to me,
>>>>>> although I don't know for sure. Any pointer would be much appreciated:
>>>>>> >
>>>>>> > Exception in thread "main" java.lang.RuntimeException: Error while
>>>>>> applying rule FlinkLogicalJoinConverter, args
>>>>>> [rel#94:LogicalJoin.NONE(left=rel#84:Subset#0.NONE,right=rel#93:Subset#5.NONE,condition==($0,
>>>>>> $6),joinType=left)]
>>>>>> >   at
>>>>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:236)
>>>>>> >   at
>>>>>> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646)
>>>>>> >   at
>>>>>> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
>>>>>> >   at
>>>>>> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:373)
>&

Re: left join failing with FlinkLogicalJoinConverter NPE

2019-07-19 Thread Tony Wei
Hi,

Is there any update for this issue? I have had the same problem just like
Karl's.
After I remove query like "select collect(data) ..." from one of the joined
tables,
the sql can be executed correctly without throwing any NPE.

Best regards,
Tony Wei

Xingcan Cui  於 2019年2月27日 週三 下午12:53寫道:

> Hi Karl,
>
> I think this is a bug and created FLINK-11769
> <https://issues.apache.org/jira/browse/FLINK-11769> to track it.
>
> Best,
> Xingcan
>
> On Feb 26, 2019, at 2:02 PM, Karl Jin  wrote:
>
> I removed the multiset> field and the join worked fine.
> The field was created from a Kafka source through a query that looks like
> "select collect(data) as i_data from ... group by pk"
>
> Do you think this is a bug or is this something I can get around using
> some configuration?
>
> On Tue, Feb 26, 2019 at 1:20 AM Xingcan Cui  wrote:
>
>> Yes. Please check that. If it's the nested type's problem, this might be
>> a bug.
>>
>> On Mon, Feb 25, 2019, 21:50 Karl Jin  wrote:
>>
>>> Do you think something funky might be happening with Map/Multiset types?
>>> If so how do I deal with it (I think I can verify by removing those columns
>>> and retry?)?
>>>
>>> On Mon, Feb 25, 2019 at 6:28 PM Karl Jin  wrote:
>>>
>>>> Thanks for checking in quickly,
>>>>
>>>> Below is what I got on printSchema on the two tables (left joining the
>>>> second one to the first one on uc_pk = i_uc_pk). rowtime in both are
>>>> extracted from the string field uc_update_ts
>>>>
>>>> root
>>>>  |-- uc_pk: String
>>>>  |-- uc_update_ts: String
>>>>  |-- rowtime: TimeIndicatorTypeInfo(rowtime)
>>>>  |-- uc_version: String
>>>>  |-- uc_type: String
>>>>  |-- data_parsed: Map
>>>>
>>>> root
>>>>  |-- i_uc_pk: String
>>>>  |-- i_uc_update_ts: TimeIndicatorTypeInfo(rowtime)
>>>>  |-- image_count: Long
>>>>  |-- i_data: Multiset>
>>>>
>>>> On Mon, Feb 25, 2019 at 4:54 PM Xingcan Cui  wrote:
>>>>
>>>>> Hi Karl,
>>>>>
>>>>> It seems that some field types of your inputs were not properly
>>>>> extracted.
>>>>> Could you share the result of `printSchema()` for your input tables?
>>>>>
>>>>> Best,
>>>>> Xingcan
>>>>>
>>>>> > On Feb 25, 2019, at 4:35 PM, Karl Jin  wrote:
>>>>> >
>>>>> > Hello,
>>>>> >
>>>>> > First time posting, so please let me know if the formatting isn't
>>>>> correct, etc.
>>>>> >
>>>>> > I'm trying to left join two Kafka sources, running 1.7.2 locally,
>>>>> but getting the below exception. Looks like some sort of query 
>>>>> optimization
>>>>> process but I'm not sure where to start investigating/debugging. I see
>>>>> things are marked as NONE in the object so that's a bit of a flag to me,
>>>>> although I don't know for sure. Any pointer would be much appreciated:
>>>>> >
>>>>> > Exception in thread "main" java.lang.RuntimeException: Error while
>>>>> applying rule FlinkLogicalJoinConverter, args
>>>>> [rel#94:LogicalJoin.NONE(left=rel#84:Subset#0.NONE,right=rel#93:Subset#5.NONE,condition==($0,
>>>>> $6),joinType=left)]
>>>>> >   at
>>>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:236)
>>>>> >   at
>>>>> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646)
>>>>> >   at
>>>>> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
>>>>> >   at
>>>>> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:373)
>>>>> >   at
>>>>> org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
>>>>> >   at
>>>>> org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812)
>>>>> >   at
>>>>> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
>>>>> >   at
>>>>> org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableE

Re: 關於如何在流數據上計算 Top K 的應用問題

2019-07-11 Thread Tony Wei
Hi Caizhi,

謝謝你的回答。你的第三點想法給了我蠻大的啟發,我本來設想的情況是能否避免把全部使用者
資料都存放在 state 來解決這個問題,但聽起來這部分是避免不了的。如果我沒有理解錯,你的
作法比較像是將全部使用者的排名資訊都存放在 state,在使用了 rocksdb state backend 的狀況
下,這些資料都不需要在記憶體中實體化,所以不會受限記憶體的擴展性,只有那些需要被放入
Top-N 的資料會被讀取出來存放在一個 in-memory 的堆中做為加速運算的優化。

在我們目前的應用場景中,精確排名不是必要的資訊,可能還有一些不是硬性的需求來鬆綁這個
問題的限制,雖然沒有很有把握但或許可以根據你的想法實現一個專門針對我們應用情境的優化。

撇開上述特殊的情況,我另外好奇的是第一點中維護的 map state 要記錄精確的排名這件事的細
節,想知道如果更新是循序變化的,如果添加了一筆新的紀錄,可能會導致多個紀錄的排名需要
加一或是減一,這部分是不是也需要遍歷整個 map 去判斷是否有增減,針對變動的部分通知下
游?

Best Regards,
Tony Wei

Caizhi Weng  於 2019年7月12日 週五 上午11:36寫道:

> Hi Tony!
>
> 其实 Flink 对 Top-N 问题并没有很 fancy 的实现... Flink 把 Top-N 问题分成三种情况:
>
> 1. 数据只添加,不更新不删除(就像 batch mode)
> 这种情况的实现是 AppendOnlyTopNFunction,就像你说的一样使用一个 Map
> 来维护。不能直接使用堆来维护的原因是:因为要告知下游每一条记录的精确排名。
>
> 2. 数据可能有添加和更新
> 这种情况的实现是 UpdatableTopNFunction,但是这个类开头的注释表明了它只能用于以下特殊情况:
> * 数据更新后排名只能变小不能变大;
> * 数据的 sort key 要 unique;
> * 不能删数据或者撤回数据。
> 这种情况就避免了你上面说的排名变大,导致掉出 Top-N 的情况。还是可以用一个 Map 来维护。
>
> 3. 数据可以添加、更新和删除
> 这种情况的实现是 RetractableTopNFunction。因为数据更新 / 删除后可能会掉出 Top-N,要找新数据补进来,那么只能从
> state 里捞应该补进来的数据。当前由于社区没有 SortedMapState 的实现,现在是用 ValueState> 存
> state。每次读 state 都是把整个 state 拿出来读的,所以数据量大了其实没办法用... 等社区引入了 SortedMapState
> 以后,就可以用 iterator 只读取前面一些我们想要补进来的数据。
>
> Tony Wei  于2019年7月11日周四 上午9:49写道:
>
> > Hi,
> >
> > 最近正在研究 Top K 的問題,在研究中找到了 Blink SQL 可以透過維護一個儲存 K 的最大紀錄的
> > ”堆”來優化底下這類 SQL,不過我認為這只能針對 `score` 只會增加不減少的情況。
> >
> > > SELECT user_id, score
> > > FROM (
> > >   SELECT *,
> > > ROW_NUMBER() OVER (ORDER BY score DESC) AS row_num
> > >   FROM user_scores)
> > > WHERE row_num <= 3
> > >
> > >
> > 我的問題是當如果這樣的計算是應用在流數據上,且 `score` 可能隨時間增加或是“減少”的話,例
> > 如底下這類的 SQL,能有什麼樣的優化?
> >
> > > SELECT user_id, score
> > > FROM (
> > >   SELECT *,
> > > ROW_NUMBER() OVER (ORDER BY score DESC) AS row_num
> > >   FROM (
> > >   SELECT user_id, LAST_VAL(score) AS score
> > >   FROM user_scores
> > >   GROUP BY user_id))
> > > WHERE row_num <= 3
> > >
> > > SQL 中的 `user_scores` 可以當作是從 DataStream 直接轉換過來的 Dynamic Table,
> > `LAST_VAL`假設是一種 UDAF,可以挑出目前最新的值。所以,可以想像這張 table 的 user's
> > `score` 是會隨時間變化增減。
> >
> > 上面所說堆的優化無法處理這樣的問題,底下舉個例子。假設今天有一個 top-3 的堆中已經存放
> > 了三個使用者:A, B, C,各自的 scores 是:4, 3, 2,接下來收到了一個使用者 D 和他的分數是
> > 1 的話,這個時候演算法會直接忽略掉 D,因為他不在 top-3 的範圍內。但是當下一個如果收到
> > 的是一個更新 A 使用者的 score 為 0 的紀錄的話,這個時候理論上我們知道 top-3 會改為 B, C,
> > D,但是在維護 top-3 的堆中我們無力找回被忽略的使用者 D。這樣的優化在 batch mode 是沒有
> > 問題的,因為最新的 score 在有限的數據中會是固定的不動的。
> >
> > 不過當處理流數據,我目前只想到這種應用最終可能需要退回成存放全部使用者 scores 才有辦
> > 法處理,才能隨時計算出正確的 top-k。所以我想請教各位大牛有沒有什麼樣的優化方式可以處
> > 理這樣的問題,讓狀態不需要存到全部資料?當然這個問題不侷限在 SQL,如果有任何實作在
> > DataStream 上的優化都是可接受。感謝大家幫忙。
> >
> > Best Regards,
> > Tony Wei
> >
>


Question about counting top k on streaming data

2019-07-10 Thread Tony Wei
Hi,

I know that there is an improvement in Blink SQL that can deal with the top
k problem like SQL
showed below by maintaining an in-memory "heap" to store top k records.
That is not a problem
when user's score will only grow up.

> SELECT user_id, score
> FROM (
>   SELECT *,
> ROW_NUMBER() OVER (ORDER BY score DESC) AS row_num
>   FROM user_scores)
> WHERE row_num <= 3
>
>
My question is how to deal with such top k problem when user's score will
decrease as well.
Suppose there is a SQL like this.

> SELECT user_id, score
> FROM (
>   SELECT *,
> ROW_NUMBER() OVER (ORDER BY score DESC) AS row_num
>   FROM (
>   SELECT user_id, LAST_VAL(score) AS score
>   FROM user_scores
>   GROUP BY user_id))
> WHERE row_num <= 3
>
> `user_scores` is a dynamic table converted from a DataStream, `LAST_VAL`
is a UDAF to get
the latest value. So, the user's score will increase but also decrease from
time to time.

So, if we only maintain a heap to store top k elements, there will be a
problem. For example, if
there is already three users: A, B, C with score: 4, 3, 2 stored in a top-3
heap. If the next record
is a D user with score 1, it will be dropped due to the score is less than
A, B and C. However, if
the next record comes after that with an updated score 0 for user A. In
reality, we know that
top-3 users will become B, C and D, but it is no chance to get user D back
if using heap in this
case. Using heap works fine if it is running on batch mode because the
users' score won't
change from time to time.

In this case, I think it should fall back to store all users and their
scores. Update top-k every time
when receive a new record. If the heap optimization won't work here in
streaming mode, is there
any other optimization can apply in this case? It is not necessary to focus
on SQL only. Any
improvement on DataStream is also welcome. Thank you.

Best Regards,
Tony Wei


Re: What does "Continuous incremental cleanup" mean in Flink 1.8 release notes

2019-03-10 Thread Tony Wei
Hi Konstantin,

That is really helpful. Thanks.

Another follow-up question: The document said "Cleanup in full snapshot" is
not applicable for
the incremental checkpointing in the RocksDB state backend. However, when
user manually
trigger a savepoint and restart job from it, the expired states should be
clean up as well based
on Flink 1.6's implementation. Am I right?

Best,
Tony Wei

Konstantin Knauf  於 2019年3月9日 週六 上午7:00寫道:

> Hi Tony,
>
> before Flink 1.8 expired state is only cleaned up, when you try to access
> it after expiration, i.e. when user code tries to access the expired state,
> the state value is cleaned and "null" is returned. There was also already
> the option to clean up expired state during full snapshots (
> https://github.com/apache/flink/pull/6460). With Flink 1.8 expired state
> is cleaned up continuously in the background regardless of checkpointing or
> any attempt to access it after expiration.
>
> As a reference the linked JIRA tickets should be a good starting point.
>
> Hope this helps.
>
> Konstantin
>
>
>
>
> On Fri, Mar 8, 2019 at 10:45 AM Tony Wei  wrote:
>
>> Hi everyone,
>>
>> I read the Flink 1.8 release notes about state [1], and it said
>>
>> *Continuous incremental cleanup of old Keyed State with TTL*
>>> We introduced TTL (time-to-live) for Keyed state in Flink 1.6 (
>>> FLINK-9510 <https://issues.apache.org/jira/browse/FLINK-9510>). This
>>> feature allowed to clean up and make inaccessible keyed state entries when
>>> accessing them. In addition state would now also being cleaned up when
>>> writing a savepoint/checkpoint.
>>> Flink 1.8 introduces continous cleanup of old entries for both the
>>> RocksDB state backend (FLINK-10471
>>> <https://issues.apache.org/jira/browse/FLINK-10471>) and the heap state
>>> backend (FLINK-10473 <https://issues.apache.org/jira/browse/FLINK-10473>).
>>> This means that old entries (according to the ttl setting) are continously
>>> being cleanup up.
>>
>>
>> I'm not familiar with TTL's implementation in Flink 1.6 and what new
>> features introduced in Flink
>> 1.8. I don't understand what difference between these two release version
>> after reading the
>> release notes. Did they change the outcome of TTL feature, or provide new
>> TTL features, or just
>> change the behavior of executing TTL mechanism.
>>
>> Could you give me more references to learn about it? A simple example
>> to illustrate it is more
>> appreciated. Thank you.
>>
>> Best,
>> Tony Wei
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/release-notes/flink-1.8.html#state
>>
>
>
> --
>
> Konstantin Knauf | Solutions Architect
>
> +49 160 91394525
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Data Artisans GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>


What does "Continuous incremental cleanup" mean in Flink 1.8 release notes

2019-03-08 Thread Tony Wei
Hi everyone,

I read the Flink 1.8 release notes about state [1], and it said

*Continuous incremental cleanup of old Keyed State with TTL*
> We introduced TTL (time-to-live) for Keyed state in Flink 1.6 (FLINK-9510
> <https://issues.apache.org/jira/browse/FLINK-9510>). This feature allowed
> to clean up and make inaccessible keyed state entries when accessing them.
> In addition state would now also being cleaned up when writing a
> savepoint/checkpoint.
> Flink 1.8 introduces continous cleanup of old entries for both the RocksDB
> state backend (FLINK-10471
> <https://issues.apache.org/jira/browse/FLINK-10471>) and the heap state
> backend (FLINK-10473 <https://issues.apache.org/jira/browse/FLINK-10473>).
> This means that old entries (according to the ttl setting) are continously
> being cleanup up.


I'm not familiar with TTL's implementation in Flink 1.6 and what new
features introduced in Flink
1.8. I don't understand what difference between these two release version
after reading the
release notes. Did they change the outcome of TTL feature, or provide new
TTL features, or just
change the behavior of executing TTL mechanism.

Could you give me more references to learn about it? A simple example
to illustrate it is more
appreciated. Thank you.

Best,
Tony Wei

[1]
https://ci.apache.org/projects/flink/flink-docs-master/release-notes/flink-1.8.html#state


Re: how to override s3 key config in flink job

2018-11-29 Thread Tony Wei
Hi Andrey,

Thanks for your detailed answer, and I have created a JIRA issue to discuss
it [1].
Please check the description and help me to fill the details, like
component/s, since
I'm not sure where it should be put. Thank you very much.

Best,
Tony Wei

[1] https://issues.apache.org/jira/browse/FLINK-11034


Andrey Zagrebin  於 2018年11月27日 週二 下午10:43寫道:

> Hi Tony,
>
> File system factories are class-loaded in running JVMs of task executors.
> That is why their configured objects are shared by different Flink jobs.
> It is not possible to change their options per created file system and per
> job at the moment.
>
> This could be changed, e.g. for s3, by providing "rewriting config” to
> file system factory “get" method,
> but this method is not usually called by users directly in user facing
> components, like checkpointing or file sink. The user API is now mainly the
> file system URI string without any specific config.
>
> I see that making it possible has value but it would require some
> involving changes in file system dependent APIs or changing the way how
> file systems are created in general.
> You could create a JIRA issue to discuss it.
>
> Best,
> Andrey
>
> > On 27 Nov 2018, at 10:06, yinhua.dai  wrote:
> >
> > It might be difficult as you the task manager and job manager are
> pre-started
> > in a session mode.
> >
> > It seems that flink http server will always use the configuration that
> you
> > specified when you start your flink cluster, i.e. start-cluster.sh, I
> don't
> > find a way to override it.
> >
> >
> >
> > --
> > Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>


Re: how to override s3 key config in flink job

2018-11-26 Thread Tony Wei
Hi yinhua,

Our flink version is 1.6.0.

Best,
Tony Wei

yinhua.dai  於 2018年11月27日 週二 下午2:32寫道:

> Which flink version are you using.
> I know how it works in yarn, but not very clear with standalone mode.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: how to override s3 key config in flink job

2018-11-26 Thread Tony Wei
Hi yinhua,

I didn't try this yet, but I didn't see this option in both flink cli tool
and rest api either.
Could you please provide more details about how to use this option to
submit flink
application?

BTW, we are using standalone session cluster, not yarn session cluster. And
I need
to submit different flink applications with different s3 key for flink
presto s3 filesystem.

Any other suggestions are also welcome. Thank you.

Best,
Tony Wei

yinhua.dai  於 2018年11月27日 週二 上午11:37寫道:

> Did you try "-Dkey=value"?
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Reset kafka offets to latest on restart

2018-11-25 Thread Tony Wei
Hi Vishal,

Sorry, I didn't notice this requirement, but I can't come up with another
solution, unless disable checkpointing or customize your own
kafka source function. For the first case, you may have to give up storing
states in flink's state backend. For the second one, write
your own implementation with kafka client and always seek to the latest
position when the job begin to run.

Best,
Tony Wei

Vishal Santoshi  於 2018年11月25日 週日 上午4:51寫道:

> I think I can set . a new uuid but it seems `allowNonRestoreState` is a
> CLI hint. I need the "automatic" restart on failure to use the new uuid.
> Our use case has no use of data on Kafka that is not current.
>
> On Thu, Nov 22, 2018 at 11:16 PM Tony Wei  wrote:
>
>> Hi Vishal,
>>
>> AFAIK, the current behavior of kafka source will always use checkpoint
>> state as the start position, ignoring other configuration.
>> A workaround solution I can come up with is to set a new uuid to your
>> kafka source and restore your job with `allowNonRestoreState`.
>> Therefore, you can use the way that Rong provided to set the initial
>> start position.
>>
>> cc. Gordon who know more about the details of kafka source.
>>
>> Best,
>> Tony Wei
>>
>> Rong Rong  於 2018年11月22日 週四 上午8:23寫道:
>>
>>> Hi Vishal,
>>>
>>> You can probably try using similar offset configuration as a service
>>> consumer.
>>> Maybe this will be useful to look at [1]
>>>
>>> Thanks,
>>> Rong
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>>>
>>> On Wed, Nov 21, 2018 at 1:32 PM Jamie Grier  wrote:
>>>
>>>> Hi Vishal,
>>>>
>>>> No, there is no way to do this currently.
>>>>
>>>>
>>>> On Wed, Nov 21, 2018 at 10:22 AM Vishal Santoshi <
>>>> vishal.santo...@gmail.com> wrote:
>>>>
>>>>> Any one ?
>>>>>
>>>>> On Tue, Nov 20, 2018 at 12:48 PM Vishal Santoshi <
>>>>> vishal.santo...@gmail.com> wrote:
>>>>>
>>>>>> Is it possible to have checkpointing but reset the kafka offsets to
>>>>>> latest on restart on failure ?
>>>>>>
>>>>>


Re: how to override s3 key config in flink job

2018-11-23 Thread Tony Wei
Hi,

Is there anyone can answer me?

Thanks,
Tony Wei

Tony Wei  於 2018年11月20日 週二 下午7:39寫道:

> Hi,
>
> Is there any way to provide s3.access-key and s3.secret-key in flink
> application, instead of setting
> them in flink-conf.yaml?
>
> In our use case, my team provide a flink standalone cluster to users.
> However, we don't want to let
> each user use the same s3 bucket as filesystem to store checkpoints. So,
> we want to know if is it
> feasible to let users provide their checkpoint path and corresponding aws
> key to access their own
> s3 bucket?
>
> If not, could you show me why it doesn't work currently? And, is it
> possible to become a new
> feature?
>
> Thanks in advance for your help.
>
> Best,
> Tony Wei
>


Re: Reset kafka offets to latest on restart

2018-11-22 Thread Tony Wei
Hi Vishal,

AFAIK, the current behavior of kafka source will always use checkpoint
state as the start position, ignoring other configuration.
A workaround solution I can come up with is to set a new uuid to your kafka
source and restore your job with `allowNonRestoreState`.
Therefore, you can use the way that Rong provided to set the initial start
position.

cc. Gordon who know more about the details of kafka source.

Best,
Tony Wei

Rong Rong  於 2018年11月22日 週四 上午8:23寫道:

> Hi Vishal,
>
> You can probably try using similar offset configuration as a service
> consumer.
> Maybe this will be useful to look at [1]
>
> Thanks,
> Rong
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>
> On Wed, Nov 21, 2018 at 1:32 PM Jamie Grier  wrote:
>
>> Hi Vishal,
>>
>> No, there is no way to do this currently.
>>
>>
>> On Wed, Nov 21, 2018 at 10:22 AM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Any one ?
>>>
>>> On Tue, Nov 20, 2018 at 12:48 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> Is it possible to have checkpointing but reset the kafka offsets to
>>>> latest on restart on failure ?
>>>>
>>>


how to override s3 key config in flink job

2018-11-20 Thread Tony Wei
Hi,

Is there any way to provide s3.access-key and s3.secret-key in flink
application, instead of setting
them in flink-conf.yaml?

In our use case, my team provide a flink standalone cluster to users.
However, we don't want to let
each user use the same s3 bucket as filesystem to store checkpoints. So, we
want to know if is it
feasible to let users provide their checkpoint path and corresponding aws
key to access their own
s3 bucket?

If not, could you show me why it doesn't work currently? And, is it
possible to become a new
feature?

Thanks in advance for your help.

Best,
Tony Wei


sys.exist(1) led to standalonesession daemon closed

2018-11-04 Thread Tony Wei
Hi,

I used a scala library called scallop[1] to parse my job’s arguments. When
the argument didn’t
exist in the config setting, the default behavior of scallop would call
sys.exit(1).

It is not a problem when I’m using flink cli to submit job. However, when I
used rest api to submit
job, it seems that sys.exit(1) will leads to standalonesession daemon
closed. Maybe the reason is
that rest server is also in the same process as standalonesession daemon.
Am I correct?

If this is the root cause, is this an expected behavior and users should be
aware of not using
sys.exit(1) in their jobs?

I tested this on 1.6.0 standalone session cluster with flip-6 mode. And
here are my testing job
and logs before and after the submission.

package com.appier.rt.rt_match
> import org.apache.flink.api.scala.createTypeInformation
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.rogach.scallop.{ScallopConf, ScallopOption}
> object TestMain {
>   def main(args: Array[String]): Unit = {
> object Args extends ScallopConf(args) {
>   val mode: ScallopOption[String] = opt[String](default =
> Some("development"))
>   verify
> }
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.fromElements(Args.mode()).map(a => a)
> env.execute()
>   }
> }


Submit by flink-cli

> $ ./bin/flink run -c com.appier.rt.rt_match.TestMain -p 2 -d
> rt-match-assembly-4.5.1-SNAPSHOT.jar --mo xyz
> Starting execution of program
> [scallop] Error: Unknown option 'mo'


Submit by rest-api

> 2018-11-05 13:27:58,800 TRACE
> org.apache.flink.runtime.webmonitor.handlers.JarListHandler   - Received
> request /jars/.
> 2018-11-05 13:27:59,679 TRACE
> org.apache.flink.runtime.rest.FileUploadHandler   - Received
> request. URL:/jobs/overview Method:GET
> 2018-11-05 13:27:59,680 TRACE
> org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler  - Received
> request /jobs/overview.
> 2018-11-05 13:28:01,752 TRACE
> org.apache.flink.runtime.rest.FileUploadHandler   - Received
> request. URL:/jars/ Method:GET
> 2018-11-05 13:28:01,753 TRACE
> org.apache.flink.runtime.webmonitor.handlers.JarListHandler   - Received
> request /jars/.
> 2018-11-05 13:28:02,682 TRACE
> org.apache.flink.runtime.rest.FileUploadHandler   - Received
> request. URL:/jobs/overview Method:GET
> 2018-11-05 13:28:02,683 TRACE
> org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler  - Received
> request /jobs/overview.
> 2018-11-05 13:28:03,899 TRACE
> org.apache.flink.runtime.rest.FileUploadHandler   - Received
> request.
> URL:/jars/7413f82a-d650-4729-873e-a94150ffe9d0_rt-match-assembly-4.5.1-SNAPSHOT.jar/run?entry
> class=com.appier.rt.rt_match.TestMain=2=--mo+xyz
> Method:POST
> 2018-11-05 13:28:03,902 TRACE
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler- Received
> request
> /jars/7413f82a-d650-4729-873e-a94150ffe9d0_rt-match-assembly-4.5.1-SNAPSHOT.jar/run?entry
> class=com.appier.rt.rt_match.TestMain=2=--mo+xyz.
> 2018-11-05 13:28:04,751 TRACE
> org.apache.flink.runtime.rest.FileUploadHandler   - Received
> request. URL:/jars/ Method:GET
> 2018-11-05 13:28:04,752 TRACE
> org.apache.flink.runtime.webmonitor.handlers.JarListHandler   - Received
> request /jars/.
> 2018-11-05 13:28:04,760 INFO
> org.apache.flink.runtime.blob.TransientBlobCache  - Shutting
> down BLOB cache
> 2018-11-05 13:28:04,761 INFO  org.apache.flink.runtime.blob.BlobServer
>   - Stopped BLOB server at 0.0.0.0:42075


Best,
Tony Wei.

[1] https://github.com/scallop/scallop


Re: How to get Cluster metrics in FLIP-6 mode

2018-09-11 Thread Tony Wei
Hi Gary,

Thanks for your information.

Best,
Tony Wei

2018-09-11 20:26 GMT+08:00 Gary Yao :

> Hi Tony,
>
> You are right that these metrics are missing. There is already a ticket for
> that [1]. At the moment you can obtain these information from the REST API
> (/overview) [2].
>
> Since FLIP-6, the JM is no longer responsible for these metrics but for
> backwards compatibility we can leave them in the JM scope for now.
>
> Best,
> Gary
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-10135
> [2] https://ci.apache.org/projects/flink/flink-docs-
> release-1.5/monitoring/rest_api.html#available-requests
>
> On Tue, Sep 11, 2018 at 12:19 PM, Tony Wei  wrote:
>
>> Hi,
>>
>> I found that these metrics[1] disappeared in my JM's prometheus reporter
>> when I used FLIP-6 to
>>  deploy standalone cluster. (flink 1.5.3 release)
>>
>> Cluster
>> ScopeMetricsDescriptionType
>> *JobManager* numRegisteredTaskManagers The number of registered
>> taskmanagers. Gauge
>> numRunningJobs The number of running jobs. Gauge
>> taskSlotsAvailable The number of available task slots. Gauge
>> taskSlotsTotal The total number of task slots. GaugeI guessed maybe JM
>> is no longer responsible to these metrics, but I still need these metrics
>> on my
>> dashboard. Do anyone know how to let my metric reporter get these
>> metrics? Or did I miss something?
>> Thank you.
>>
>> Best Regards,
>> Tony Wei
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-
>> 1.6/monitoring/metrics.html#cluster
>>
>>
>


How to get Cluster metrics in FLIP-6 mode

2018-09-11 Thread Tony Wei
Hi,

I found that these metrics[1] disappeared in my JM's prometheus reporter
when I used FLIP-6 to
 deploy standalone cluster. (flink 1.5.3 release)

Cluster
ScopeMetricsDescriptionType
*JobManager* numRegisteredTaskManagers The number of registered
taskmanagers. Gauge
numRunningJobs The number of running jobs. Gauge
taskSlotsAvailable The number of available task slots. Gauge
taskSlotsTotal The total number of task slots. GaugeI guessed maybe JM is
no longer responsible to these metrics, but I still need these metrics on my
dashboard. Do anyone know how to let my metric reporter get these metrics?
Or did I miss something?
Thank you.

Best Regards,
Tony Wei

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/metrics.html#cluster


Question about akka configuration for FLIP-6

2018-09-09 Thread Tony Wei
Hi,

I'm going to migrate my flink cluster from 1.4.0 to 1.5.3, and I have been
trying to map config file
to the latest version. I used to use these three configuration. Are they
still needed in FLIP-6 mode?
Moreover, is any akka config still needed in FLIP-6 mode? Since I had a
impression that FLIP-6
tried to get rid of akka and use its own rpc interface. Please correct me
if I misunderstood. Thanks.

akka.watch.heartbeat.interval
akka.watch.heartbeat.pause
taskmanager.exit-on-fatal-akka-error

Best Regards,
Tony Wei


Re: checkpoint failed due to s3 exception: request timeout

2018-08-29 Thread Tony Wei
Hi Andrey,

Cool! I will add it in my flink-conf.yaml. However, I'm still wondering if
anyone is familiar with this
problem or has any idea to find the root cause. Thanks.

Best,
Tony Wei

2018-08-29 16:20 GMT+08:00 Andrey Zagrebin :

> Hi,
>
> the current Flink 1.6.0 version uses Presto Hive s3 connector 0.185 [1],
> which has this option:
> S3_MAX_CLIENT_RETRIES = "presto.s3.max-client-retries”;
>
> If you add “s3.max-client-retries” to flink conf, flink-s3-fs-presto [2]
> should automatically prefix it and configure PrestoS3FileSystem correctly.
>
> Cheers,
> Andrey
>
> [1] https://github.com/prestodb/presto/blob/0.185/
> presto-hive/src/main/java/com/facebook/presto/hive/PrestoS3FileSystem.java
> [2] https://ci.apache.org/projects/flink/flink-docs-
> stable/ops/deployment/aws.html#shaded-hadooppresto-s3-
> file-systems-recommended
>
>
> On 29 Aug 2018, at 08:49, vino yang  wrote:
>
> Hi Tony,
>
> Maybe you can consider looking at the doc information for this class, this
> class comes from flink-s3-fs-presto.[1]
>
> [1]: https://ci.apache.org/projects/flink/flink-docs-
> release-1.6/api/java/org/apache/hadoop/conf/Configuration.html
>
> Thanks, vino.
>
> Tony Wei  于2018年8月29日周三 下午2:18写道:
>
>> Hi Vino,
>>
>> I thought this config is for aws s3 client, but this client is inner
>> flink-s3-fs-presto.
>> So, I guessed I should find a way to pass this config to this library.
>>
>> Best,
>> Tony Wei
>>
>> 2018-08-29 14:13 GMT+08:00 vino yang :
>>
>>> Hi Tony,
>>>
>>> Sorry, I just saw the timeout, I thought they were similar because they
>>> both happened on aws s3.
>>> Regarding this setting, isn't "s3.max-client-retries: xxx" set for the
>>> client?
>>>
>>> Thanks, vino.
>>>
>>> Tony Wei  于2018年8月29日周三 下午1:17写道:
>>>
>>>> Hi Vino,
>>>>
>>>> Thanks for your quick reply, but I think these two questions are
>>>> different. The checkpoint in that question
>>>> finally finished, but my checkpoint failed due to s3 client timeout.
>>>> You can see from my screenshot that
>>>> showed the checkpoint failed in a short time.
>>>>
>>>> According to configuration, do you mean pass the configuration as
>>>> program's input arguments? I don't
>>>> think it will work. At least I need to find a way to pass it to s3
>>>> filesystem builder in my program. However,
>>>> I will ask for help to pass it by flink-conf.yaml, because I used that
>>>> to config the global setting for s3
>>>> filesystem and I thought it might have a simple way to support this
>>>> setting like other s3.xxx config.
>>>>
>>>> Very much appreciate for your answer and help.
>>>>
>>>> Best,
>>>> Tony Wei
>>>>
>>>> 2018-08-29 11:51 GMT+08:00 vino yang :
>>>>
>>>>> Hi Tony,
>>>>>
>>>>> A while ago, I have answered a similar question.[1]
>>>>>
>>>>> You can try to increase this value appropriately. You can't put this
>>>>> configuration in flink-conf.yaml, you can put it in the submit command of
>>>>> the job[2], or in the configuration file you specify.
>>>>>
>>>>> [1]: http://apache-flink-user-mailing-list-archive.2336050.
>>>>> n4.nabble.com/Why-checkpoint-took-so-long-td22364.html#a22375
>>>>> [2]: https://ci.apache.org/projects/flink/flink-docs-
>>>>> release-1.6/ops/cli.html
>>>>>
>>>>> Thanks, vino.
>>>>>
>>>>> Tony Wei  于2018年8月29日周三 上午11:36写道:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I met checkpoint failure problem that cause by s3 exception.
>>>>>>
>>>>>> org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
>>>>>>> Your socket connection to the server was not read from or written to 
>>>>>>> within
>>>>>>> the timeout period. Idle connections will be closed. (Service: Amazon 
>>>>>>> S3;
>>>>>>> Status Code: 400; Error Code: RequestTimeout; Request ID:
>>>>>>> B8BE8978D3EFF3F5), S3 Extended Request ID: ePKce/
>>>>>>> MjMFPPNYi90rGdYmDw3blfvi0xR2CcJpCISEgxM92/
>>>>>>> 6JZAU4whpfXeV6SfG62cnts0NBw=
>>>>>>
>>>>>>
>>>>>

Re: checkpoint failed due to s3 exception: request timeout

2018-08-29 Thread Tony Wei
Hi Vino,

I thought this config is for aws s3 client, but this client is inner
flink-s3-fs-presto.
So, I guessed I should find a way to pass this config to this library.

Best,
Tony Wei

2018-08-29 14:13 GMT+08:00 vino yang :

> Hi Tony,
>
> Sorry, I just saw the timeout, I thought they were similar because they
> both happened on aws s3.
> Regarding this setting, isn't "s3.max-client-retries: xxx" set for the
> client?
>
> Thanks, vino.
>
> Tony Wei  于2018年8月29日周三 下午1:17写道:
>
>> Hi Vino,
>>
>> Thanks for your quick reply, but I think these two questions are
>> different. The checkpoint in that question
>> finally finished, but my checkpoint failed due to s3 client timeout. You
>> can see from my screenshot that
>> showed the checkpoint failed in a short time.
>>
>> According to configuration, do you mean pass the configuration as
>> program's input arguments? I don't
>> think it will work. At least I need to find a way to pass it to s3
>> filesystem builder in my program. However,
>> I will ask for help to pass it by flink-conf.yaml, because I used that to
>> config the global setting for s3
>> filesystem and I thought it might have a simple way to support this
>> setting like other s3.xxx config.
>>
>> Very much appreciate for your answer and help.
>>
>> Best,
>> Tony Wei
>>
>> 2018-08-29 11:51 GMT+08:00 vino yang :
>>
>>> Hi Tony,
>>>
>>> A while ago, I have answered a similar question.[1]
>>>
>>> You can try to increase this value appropriately. You can't put this
>>> configuration in flink-conf.yaml, you can put it in the submit command of
>>> the job[2], or in the configuration file you specify.
>>>
>>> [1]: http://apache-flink-user-mailing-list-archive.2336050.
>>> n4.nabble.com/Why-checkpoint-took-so-long-td22364.html#a22375
>>> [2]: https://ci.apache.org/projects/flink/flink-docs-
>>> release-1.6/ops/cli.html
>>>
>>> Thanks, vino.
>>>
>>> Tony Wei  于2018年8月29日周三 上午11:36写道:
>>>
>>>> Hi,
>>>>
>>>> I met checkpoint failure problem that cause by s3 exception.
>>>>
>>>> org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
>>>>> Your socket connection to the server was not read from or written to 
>>>>> within
>>>>> the timeout period. Idle connections will be closed. (Service: Amazon S3;
>>>>> Status Code: 400; Error Code: RequestTimeout; Request ID:
>>>>> B8BE8978D3EFF3F5), S3 Extended Request ID: ePKce/
>>>>> MjMFPPNYi90rGdYmDw3blfvi0xR2CcJpCISEgxM92/6JZAU4whpfXeV6SfG62cnts0NBw=
>>>>
>>>>
>>>> The full stack trace and screenshot is provided in the attachment.
>>>>
>>>> My setting for flink cluster and job:
>>>>
>>>>- flink version 1.4.0
>>>>- standalone mode
>>>>- 4 slots for each TM
>>>>- presto s3 filesystem
>>>>- rocksdb statebackend
>>>>- local ssd
>>>>- enable incremental checkpoint
>>>>
>>>> No weird message beside the exception in the log file. No high ratio of
>>>> GC during the checkpoint
>>>> procedure. And still 3 of 4 parts uploaded successfully on that TM. I
>>>> didn't find something that
>>>> would related to this failure. Did anyone meet this problem before?
>>>>
>>>> Besides, I also found an issue in other aws sdk[1] that mentioned this
>>>> s3 exception as well. One
>>>> reply said you can passively avoid the problem by raising the max
>>>> client retires config. So I found
>>>> that config in presto[2]. Can I just add s3.max-client-retries: xxx in
>>>> flink-conf.yaml to config
>>>> it? If not, how should I do to overwrite the default value of this
>>>> configuration? Thanks in advance.
>>>>
>>>> Best,
>>>> Tony Wei
>>>>
>>>> [1] https://github.com/aws/aws-sdk-php/issues/885
>>>> [2] https://github.com/prestodb/presto/blob/master/
>>>> presto-hive/src/main/java/com/facebook/presto/hive/s3/
>>>> HiveS3Config.java#L218
>>>>
>>>
>>


Re: checkpoint failed due to s3 exception: request timeout

2018-08-28 Thread Tony Wei
Hi Vino,

Thanks for your quick reply, but I think these two questions are different.
The checkpoint in that question
finally finished, but my checkpoint failed due to s3 client timeout. You
can see from my screenshot that
showed the checkpoint failed in a short time.

According to configuration, do you mean pass the configuration as program's
input arguments? I don't
think it will work. At least I need to find a way to pass it to s3
filesystem builder in my program. However,
I will ask for help to pass it by flink-conf.yaml, because I used that to
config the global setting for s3
filesystem and I thought it might have a simple way to support this setting
like other s3.xxx config.

Very much appreciate for your answer and help.

Best,
Tony Wei

2018-08-29 11:51 GMT+08:00 vino yang :

> Hi Tony,
>
> A while ago, I have answered a similar question.[1]
>
> You can try to increase this value appropriately. You can't put this
> configuration in flink-conf.yaml, you can put it in the submit command of
> the job[2], or in the configuration file you specify.
>
> [1]: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/Why-checkpoint-took-so-long-td22364.html#a22375
> [2]: https://ci.apache.org/projects/flink/flink-docs-
> release-1.6/ops/cli.html
>
> Thanks, vino.
>
> Tony Wei  于2018年8月29日周三 上午11:36写道:
>
>> Hi,
>>
>> I met checkpoint failure problem that cause by s3 exception.
>>
>> org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
>>> Your socket connection to the server was not read from or written to within
>>> the timeout period. Idle connections will be closed. (Service: Amazon S3;
>>> Status Code: 400; Error Code: RequestTimeout; Request ID:
>>> B8BE8978D3EFF3F5), S3 Extended Request ID: ePKce/
>>> MjMFPPNYi90rGdYmDw3blfvi0xR2CcJpCISEgxM92/6JZAU4whpfXeV6SfG62cnts0NBw=
>>
>>
>> The full stack trace and screenshot is provided in the attachment.
>>
>> My setting for flink cluster and job:
>>
>>- flink version 1.4.0
>>- standalone mode
>>- 4 slots for each TM
>>- presto s3 filesystem
>>- rocksdb statebackend
>>- local ssd
>>- enable incremental checkpoint
>>
>> No weird message beside the exception in the log file. No high ratio of
>> GC during the checkpoint
>> procedure. And still 3 of 4 parts uploaded successfully on that TM. I
>> didn't find something that
>> would related to this failure. Did anyone meet this problem before?
>>
>> Besides, I also found an issue in other aws sdk[1] that mentioned this s3
>> exception as well. One
>> reply said you can passively avoid the problem by raising the max client
>> retires config. So I found
>> that config in presto[2]. Can I just add s3.max-client-retries: xxx in
>> flink-conf.yaml to config
>> it? If not, how should I do to overwrite the default value of this
>> configuration? Thanks in advance.
>>
>> Best,
>> Tony Wei
>>
>> [1] https://github.com/aws/aws-sdk-php/issues/885
>> [2] https://github.com/prestodb/presto/blob/master/
>> presto-hive/src/main/java/com/facebook/presto/hive/s3/
>> HiveS3Config.java#L218
>>
>


Re: Need a clarification about removing a stateful operator

2018-08-17 Thread Tony Wei
Hi Stefan,

Thanks for your detailed explanation.

Best,
Tony Wei

2018-08-17 15:56 GMT+08:00 Stefan Richter :

> Hi,
>
> it will not be transported. The JM does the state assignment to create the
> deployment information for all tasks. If will just exclude the state for
> operators that are not present. So in your next checkpoints they will no
> longer be contained.
>
> Best,
> Stefan
>
>
> Am 17.08.2018 um 09:26 schrieb Tony Wei :
>
> Hi Chesnay,
>
> Thanks for your quick reply. I have another question. Will the state,
> which is ignored, be transported
> to TMs from DFS? Or will it be detected by JM's checkpoint coordinator and
> only those states reuired
> by operators be transported to each TM?
>
> Best,
> Tony Wei
>
> 2018-08-17 14:38 GMT+08:00 Chesnay Schepler :
>
>> The state won't exist in the snapshot.
>>
>>
>> On 17.08.2018 04:38, Tony Wei wrote:
>>
>> Hi all,
>>
>> I'm confused about the description in documentation. [1]
>>
>>
>>- *Removing a stateful operator:* The state of the removed operator
>>is lost unless
>>another operator takes it over. When starting the upgraded
>>application, you have
>>to explicitly agree to discard the state.
>>
>> Does that mean if I take a full snapshot (e.g. savepoint) after restoring
>> by explicitly agreeing to
>> discard the state, then the state won't exist in that snapshot? Or does
>> it just mean ignore the
>> state but the state still exist forever, unless I explicitly purge that
>> state by using state operator?
>>
>> And could this behavior differ between different state backend (Memory,
>> FS, RocksDB) ?
>>
>> Many thanks,
>> Tony Wei
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-master/
>> ops/upgrading.html#application-topology
>>
>>
>>
>
>


Re: Need a clarification about removing a stateful operator

2018-08-17 Thread Tony Wei
Hi Chesnay,

Thanks for your quick reply. I have another question. Will the state, which
is ignored, be transported
to TMs from DFS? Or will it be detected by JM's checkpoint coordinator and
only those states reuired
by operators be transported to each TM?

Best,
Tony Wei

2018-08-17 14:38 GMT+08:00 Chesnay Schepler :

> The state won't exist in the snapshot.
>
>
> On 17.08.2018 04:38, Tony Wei wrote:
>
> Hi all,
>
> I'm confused about the description in documentation. [1]
>
>
>- *Removing a stateful operator:* The state of the removed operator is
>lost unless
>another operator takes it over. When starting the upgraded
>application, you have
>to explicitly agree to discard the state.
>
> Does that mean if I take a full snapshot (e.g. savepoint) after restoring
> by explicitly agreeing to
> discard the state, then the state won't exist in that snapshot? Or does it
> just mean ignore the
> state but the state still exist forever, unless I explicitly purge that
> state by using state operator?
>
> And could this behavior differ between different state backend (Memory,
> FS, RocksDB) ?
>
> Many thanks,
> Tony Wei
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> master/ops/upgrading.html#application-topology
>
>
>


Need a clarification about removing a stateful operator

2018-08-16 Thread Tony Wei
Hi all,

I'm confused about the description in documentation. [1]


   - *Removing a stateful operator:* The state of the removed operator is
   lost unless
   another operator takes it over. When starting the upgraded application,
   you have
   to explicitly agree to discard the state.

Does that mean if I take a full snapshot (e.g. savepoint) after restoring
by explicitly agreeing to
discard the state, then the state won't exist in that snapshot? Or does it
just mean ignore the
state but the state still exist forever, unless I explicitly purge that
state by using state operator?

And could this behavior differ between different state backend (Memory, FS,
RocksDB) ?

Many thanks,
Tony Wei

[1]
https://ci.apache.org/projects/flink/flink-docs-master/ops/upgrading.html#application-topology


Re: flink_taskmanager_job_task_operator_records_lag_max == -Inf on Flink 1.4.2

2018-07-30 Thread Tony Wei
Hi Juilo,

As Gordon said, the `records_lag_max` metric is a Kafka-shipped metric [1].
And I also found this thread [2] in Kafka mailing list. It seems that it is
the
design inner Kafka. So I think there is nothing we can do in
Flink-Kafka-Connector.

BTW, the Kafka document [1] said `records_lag_max` is the maximum lag
in terms of number of records for any partition in this "window". I'm not
sure
what this "window" means and if it is configurable. If it is configurable,
then
you can directly pass the config argument to Flink-Kafka-Connector to set
kafka consumer.

Best,
Tony Wei

[1] https://docs.confluent.io/current/kafka/monitoring.html#fetch-metrics
[2] https://lists.apache.org/thread.html/92475e5eb0c1a5fd08e49c30b3fef4
5213b8626e8fea8d52993c0d8c@%3Cusers.kafka.apache.org%3E

2018-07-31 1:36 GMT+08:00 Julio Biason :

> Hey Gordon,
>
> (Reviving this long thread) I think I found part of the problem: It seems
> the metric is capturing the lag from time to time and reseting the value
> in-between. I managed to replicate this attaching a SQL Sink
> (JDBCOutputFormat) connecting to an outside database -- something that took
> about 2 minutes to write 500 records.
>
> I opened the ticket https://issues.apache.org/jira/browse/FLINK-9998 with
> a bit more information about this ('cause I completely forgot to open a
> ticket a month ago about this).
>
> On Thu, Jun 14, 2018 at 11:31 AM, Julio Biason 
> wrote:
>
>> Hey Gordon,
>>
>> The job restarted somewhere in the middle of the night (I haven't checked
>> why yet) and now I have this weird status of the first TaskManager with
>> only one valid lag, the second with 2 and the third with none.
>>
>> I dunno if I could see the partition in the logs, but all "numRecordsOut"
>> are increasing over time (attached the screenshot of the graphs).
>>
>> On Thu, Jun 14, 2018 at 5:27 AM, Tzu-Li (Gordon) Tai > > wrote:
>>
>>> Hi,
>>>
>>> Thanks for the extra information. So, there seems to be 2 separate
>>> issues here. I’ll go through them one by one.
>>>
>>> I was also checking our Grafana and the metric we were using was
>>> "flink_taskmanager_job_task_operator_KafkaConsumer_records_lag_max",
>>> actually. "flink_taskmanager_job_task_operator_records_lag_max" seems
>>> to be new (with the attempt thingy).
>>>
>>>
>>> After looking at the code changes in FLINK-8419, this unfortunately is a
>>> accidental “break” in the scope of the metric.
>>> In 1.4.0, the Kafka-shipped metrics were exposed under the
>>> “KafkaConsumer” metrics group. After FLINK-8419, this was changed, as you
>>> observed.
>>> In 1.5.0, however, I think the metrics are exposed under both patterns.
>>>
>>> Now, with the fact that some subtasks are returning -Inf for
>>> ‘record-lag-max’:
>>> If I understood the metric semantics correctly, this metric represents
>>> the "max record lag across **partitions subscribed by a Kafka consumer
>>> client**.
>>> So, the only possibility that could think of causing this, is that
>>> either the subtask does not have any partitions assigned to it, or simply
>>> there is a bug with the Kafka client returning this value.
>>>
>>> Is it possible that you verify that all subtasks have a partition
>>> assigned to it? That should be possible by just checking the job status in
>>> the Web UI, and observe the numRecordsOut value for each source subtask.
>>>
>>> Cheers,
>>> Gordon
>>>
>>>
>>> On 13 June 2018 at 2:05:17 PM, Julio Biason (julio.bia...@azion.com)
>>> wrote:
>>>
>>> Hi Gordon,
>>>
>>> We have Kafka 0.10.1.1 running and use the flink-connector-kafka-0.10
>>> driver.
>>>
>>> There are a bunch of flink_taskmanager_job_task_operator_* metrics,
>>> including some about the committed offset for each partition. It seems I
>>> have 4 different records_lag_max with different attempt_id, though, 3 with
>>> -Inf and 1 with a value -- which will give me some more understand of
>>> Prometheus to extract this properly.
>>>
>>> I was also checking our Grafana and the metric we were using was
>>> "flink_taskmanager_job_task_operator_KafkaConsumer_records_lag_max",
>>> actually. "flink_taskmanager_job_task_operator_records_lag_max" seems
>>> to be new (with the attempt thingy).
>>>
>>> On the "KafkaConsumer" front, but it only has the "commited_offset" for
>>> each partit

Re: Conceptual question

2018-06-12 Thread Tony Wei
Hi David,

I have read the document for `Context.applyToKeyedState()`, but I still
have some questions for using it to implement keyed state migration.
`Context.applyToKeyedState()` can only be called in
`processBoradcaseElement()`, so it won't have any key information.
It looks like I can use `KeyedStateFunction` to get, update or clear my
keyed states. Am I right?
If I want to migrate to different type, e.g. change `string` type to `int`
type, how do I archive by using this functionality?
It seems that I can't use `key` parameter in `KeyedStateFunction` to access
the other state, generated by another state descriptor.
Please correct me if I misunderstood. Thank you.

Best Regards,
Tony Wei



2018-06-09 9:45 GMT+08:00 TechnoMage :

> Thank you all.  This discussion is very helpful.  It sounds like I can
> wait for 1.6 though given our development status.
>
> Michael
>
>
> On Jun 8, 2018, at 1:08 PM, David Anderson 
> wrote:
>
> Hi all,
>
> I think I see a way to eagerly do full state migration without writing
> your own Operator, but it's kind of hacky and may have flaws I'm not aware
> of.
>
> In Flink 1.5 we now have the possibility to connect BroadcastStreams to
> KeyedStreams and apply a KeyedBroadcastProcessFunction. This is relevant
> because in the processBroadcastElement() method you can supply a
> KeyedStateFunction to the Context.applyToKeyedState() method, and this 
> KeyedStateFunction
> will be applied every item of keyed state associated with the state
> descriptor you specify. I've been doing some experiments with this, and
> it's quite powerful in cases where it's useful to operate on all of your
> application's state.
>
> I believe this was intended for cases where an update to an item of
> broadcast state has implications for associated keyed state, but I see
> nothing that prevents you from essentially ignoring the broadcast stream
> and using this mechanism to implement keyed state migration.
>
> David
>
>
>
> On Fri, Jun 8, 2018 at 9:27 AM, Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> Yes it should be feasible. As I said before, with Flink 1.6 there will be
>> better way for migrating a state, but for now you either need to lazily
>> convert the state, or iterate over the keys and do the job manually.
>>
>> Piotrek
>>
>>
>> On 7 Jun 2018, at 15:52, Tony Wei  wrote:
>>
>> Hi Piotrek,
>>
>> So my question is: is that feasible to migrate state from
>> `ProcessFunction` to my own operator then use `getKeyedStateBackend()` to
>> migrate the states?
>> If yes, is there anything I need to be careful with? If no, why and can
>> it be available in the future? Thank you.
>>
>> Best Regards,
>> Tony Wei
>>
>> 2018-06-07 21:43 GMT+08:00 Piotr Nowojski :
>>
>>> Hi,
>>>
>>> Oh, I see now. Yes indeed getKeyedStateBackened() is not exposed to the
>>> function and you can not migrate your state that way.
>>>
>>> As far as I know yes, at the moment in order to convert everything at
>>> once (without getKeyes you still can implement lazy conversion) you would
>>> have to write your own operator.
>>>
>>> Piotrek
>>>
>>>
>>> On 7 Jun 2018, at 15:26, Tony Wei  wrote:
>>>
>>> Hi Piotrek,
>>>
>>> I used `ProcessFunction` to implement it, but it seems that I can't call
>>> `getKeyedStateBackend()` like `WindowOperator` did.
>>> I found that `getKeyedStateBackend()` is the method in
>>> `AbstractStreamOperator` and `ProcessFunction` API didn't extend it.
>>> Dose that mean I can't look up all keys and migrate the entire previous
>>> states to the new states in `ProcessFunction#open()`?
>>> As I said, do I need to port `ProcessFunction` to `KeyedProcessOperator`
>>> to migration state like the manner showed in `WindowOperator`?
>>>
>>> Best Regards,
>>> Tony Wei
>>>
>>> 2018-06-07 20:28 GMT+08:00 Piotr Nowojski :
>>>
>>>> What function are you implementing and how are you using it?
>>>>
>>>> Usually it’s enough if your function implements RichFunction (or rather
>>>> extend from AbstractRichFunction) and then you could use RichFunction#open
>>>> in the similar manner as in the code that I posted in previous message.
>>>> Flink in many places performs instanceof chekcs like:
>>>> org.apache.flink.api.common.functions.util.FunctionUtils#openFunction
>>>>
>>>> public static void openFunction(Function fun
>>>> ction, Configuration parameters) throws Exception{
>>>>  

Re: Conceptual question

2018-06-07 Thread Tony Wei
Hi Piotrek,

So my question is: is that feasible to migrate state from `ProcessFunction`
to my own operator then use `getKeyedStateBackend()` to migrate the states?
If yes, is there anything I need to be careful with? If no, why and can it
be available in the future? Thank you.

Best Regards,
Tony Wei

2018-06-07 21:43 GMT+08:00 Piotr Nowojski :

> Hi,
>
> Oh, I see now. Yes indeed getKeyedStateBackened() is not exposed to the
> function and you can not migrate your state that way.
>
> As far as I know yes, at the moment in order to convert everything at once
> (without getKeyes you still can implement lazy conversion) you would have
> to write your own operator.
>
> Piotrek
>
>
> On 7 Jun 2018, at 15:26, Tony Wei  wrote:
>
> Hi Piotrek,
>
> I used `ProcessFunction` to implement it, but it seems that I can't call
> `getKeyedStateBackend()` like `WindowOperator` did.
> I found that `getKeyedStateBackend()` is the method in
> `AbstractStreamOperator` and `ProcessFunction` API didn't extend it.
> Dose that mean I can't look up all keys and migrate the entire previous
> states to the new states in `ProcessFunction#open()`?
> As I said, do I need to port `ProcessFunction` to `KeyedProcessOperator`
> to migration state like the manner showed in `WindowOperator`?
>
> Best Regards,
> Tony Wei
>
> 2018-06-07 20:28 GMT+08:00 Piotr Nowojski :
>
>> What function are you implementing and how are you using it?
>>
>> Usually it’s enough if your function implements RichFunction (or rather
>> extend from AbstractRichFunction) and then you could use RichFunction#open
>> in the similar manner as in the code that I posted in previous message.
>> Flink in many places performs instanceof chekcs like:
>> org.apache.flink.api.common.functions.util.FunctionUtils#openFunction
>>
>> public static void openFunction(Function fun
>> ction, Configuration parameters) throws Exception{
>>if (function instanceof RichFunction) {
>>       RichFunction richFunction = (RichFunction) function;
>>   richFunction.open(parameters);
>>}
>> }
>>
>> Piotrek
>>
>>
>> On 7 Jun 2018, at 11:07, Tony Wei  wrote:
>>
>> Hi Piotrek,
>>
>> It seems that this was implemented by `Operator` API, which is a more low
>> level api compared to `Function` API.
>> Since in `Function` API level we can only migrate state by event
>> triggered, it is more convenient in this way to migrate state by foreach
>> all keys in `open()` method.
>> If I was implemented state operator by `ProcessFunction` API, is it
>> possible to port it to `KeyedProcessOperator` and do the state migration
>> that you mentioned?
>> And are there something concerned and difficulties that will leads to
>> restored state failed or other problems? Thank you!
>>
>> Best Regards,
>> Tony Wei
>>
>> 2018-06-07 16:10 GMT+08:00 Piotr Nowojski :
>>
>>> Hi,
>>>
>>> General solution for state/schema migration is under development and it
>>> might be released with Flink 1.6.0.
>>>
>>> Before that, you need to manually handle the state migration in your
>>> operator’s open method. Lets assume that your OperatorV1 has a state field
>>> “stateV1”. Your OperatorV2 defines field “stateV2”, which is incompatible
>>> with previous version. What you can do, is to add a logic in open method,
>>> to check:
>>> 1. If “stateV2” is non empty, do nothing
>>> 2. If there is no “stateV2”, iterate over all of the keys and manually
>>> migrate “stateV1” to “stateV2”
>>>
>>> In your OperatorV3 you could drop the support for “stateV1”.
>>>
>>> I have once implemented something like that here:
>>>
>>> https://github.com/pnowojski/flink/blob/bfc8858fc4b9125b8fc7
>>> acd03cb3f95c000926b2/flink-streaming-java/src/main/java/org/
>>> apache/flink/streaming/runtime/operators/windowing/WindowOpe
>>> rator.java#L258
>>>
>>> Hope that helps!
>>>
>>> Piotrek
>>>
>>>
>>> On 6 Jun 2018, at 17:04, TechnoMage  wrote:
>>>
>>> We are still pretty new to Flink and I have a conceptual / DevOps
>>> question.
>>>
>>> When a job is modified and we want to deploy the new version, what is
>>> the preferred method?  Our jobs have a lot of keyed state.
>>>
>>> If we use snapshots we have old state that may no longer apply to the
>>> new pipeline.
>>> If we start a new job we can reprocess historical data from Kafka, but
>>> that can be very resource heavy for a while.
>>>
>>> Is there an option I am missing?  Are there facilities to “patch” or
>>> “purge” selectively the keyed state?
>>>
>>> Michael
>>>
>>>
>>>
>>
>>
>
>


Re: Conceptual question

2018-06-07 Thread Tony Wei
Hi Piotrek,

I used `ProcessFunction` to implement it, but it seems that I can't call
`getKeyedStateBackend()` like `WindowOperator` did.
I found that `getKeyedStateBackend()` is the method in
`AbstractStreamOperator` and `ProcessFunction` API didn't extend it.
Dose that mean I can't look up all keys and migrate the entire previous
states to the new states in `ProcessFunction#open()`?
As I said, do I need to port `ProcessFunction` to `KeyedProcessOperator` to
migration state like the manner showed in `WindowOperator`?

Best Regards,
Tony Wei

2018-06-07 20:28 GMT+08:00 Piotr Nowojski :

> What function are you implementing and how are you using it?
>
> Usually it’s enough if your function implements RichFunction (or rather
> extend from AbstractRichFunction) and then you could use RichFunction#open
> in the similar manner as in the code that I posted in previous message.
> Flink in many places performs instanceof chekcs like: org.apache.flink.api.
> common.functions.util.FunctionUtils#openFunction
>
> public static void openFunction(Function function, Configuration
> parameters) throws Exception{
>if (function instanceof RichFunction) {
>   RichFunction richFunction = (RichFunction) function;
>   richFunction.open(parameters);
>}
> }
>
> Piotrek
>
>
> On 7 Jun 2018, at 11:07, Tony Wei  wrote:
>
> Hi Piotrek,
>
> It seems that this was implemented by `Operator` API, which is a more low
> level api compared to `Function` API.
> Since in `Function` API level we can only migrate state by event
> triggered, it is more convenient in this way to migrate state by foreach
> all keys in `open()` method.
> If I was implemented state operator by `ProcessFunction` API, is it
> possible to port it to `KeyedProcessOperator` and do the state migration
> that you mentioned?
> And are there something concerned and difficulties that will leads to
> restored state failed or other problems? Thank you!
>
> Best Regards,
> Tony Wei
>
> 2018-06-07 16:10 GMT+08:00 Piotr Nowojski :
>
>> Hi,
>>
>> General solution for state/schema migration is under development and it
>> might be released with Flink 1.6.0.
>>
>> Before that, you need to manually handle the state migration in your
>> operator’s open method. Lets assume that your OperatorV1 has a state field
>> “stateV1”. Your OperatorV2 defines field “stateV2”, which is incompatible
>> with previous version. What you can do, is to add a logic in open method,
>> to check:
>> 1. If “stateV2” is non empty, do nothing
>> 2. If there is no “stateV2”, iterate over all of the keys and manually
>> migrate “stateV1” to “stateV2”
>>
>> In your OperatorV3 you could drop the support for “stateV1”.
>>
>> I have once implemented something like that here:
>>
>> https://github.com/pnowojski/flink/blob/bfc8858fc4b9125b8fc7
>> acd03cb3f95c000926b2/flink-streaming-java/src/main/java/
>> org/apache/flink/streaming/runtime/operators/windowing/Wi
>> ndowOperator.java#L258
>>
>> Hope that helps!
>>
>> Piotrek
>>
>>
>> On 6 Jun 2018, at 17:04, TechnoMage  wrote:
>>
>> We are still pretty new to Flink and I have a conceptual / DevOps
>> question.
>>
>> When a job is modified and we want to deploy the new version, what is the
>> preferred method?  Our jobs have a lot of keyed state.
>>
>> If we use snapshots we have old state that may no longer apply to the new
>> pipeline.
>> If we start a new job we can reprocess historical data from Kafka, but
>> that can be very resource heavy for a while.
>>
>> Is there an option I am missing?  Are there facilities to “patch” or
>> “purge” selectively the keyed state?
>>
>> Michael
>>
>>
>>
>
>


Re: Conceptual question

2018-06-07 Thread Tony Wei
Hi Piotrek,

It seems that this was implemented by `Operator` API, which is a more low
level api compared to `Function` API.
Since in `Function` API level we can only migrate state by event triggered,
it is more convenient in this way to migrate state by foreach all keys in
`open()` method.
If I was implemented state operator by `ProcessFunction` API, is it
possible to port it to `KeyedProcessOperator` and do the state migration
that you mentioned?
And are there something concerned and difficulties that will leads to
restored state failed or other problems? Thank you!

Best Regards,
Tony Wei

2018-06-07 16:10 GMT+08:00 Piotr Nowojski :

> Hi,
>
> General solution for state/schema migration is under development and it
> might be released with Flink 1.6.0.
>
> Before that, you need to manually handle the state migration in your
> operator’s open method. Lets assume that your OperatorV1 has a state field
> “stateV1”. Your OperatorV2 defines field “stateV2”, which is incompatible
> with previous version. What you can do, is to add a logic in open method,
> to check:
> 1. If “stateV2” is non empty, do nothing
> 2. If there is no “stateV2”, iterate over all of the keys and manually
> migrate “stateV1” to “stateV2”
>
> In your OperatorV3 you could drop the support for “stateV1”.
>
> I have once implemented something like that here:
>
> https://github.com/pnowojski/flink/blob/bfc8858fc4b9125b8fc7acd03cb3f9
> 5c000926b2/flink-streaming-java/src/main/java/org/apache/
> flink/streaming/runtime/operators/windowing/WindowOperator.java#L258
>
> Hope that helps!
>
> Piotrek
>
>
> On 6 Jun 2018, at 17:04, TechnoMage  wrote:
>
> We are still pretty new to Flink and I have a conceptual / DevOps question.
>
> When a job is modified and we want to deploy the new version, what is the
> preferred method?  Our jobs have a lot of keyed state.
>
> If we use snapshots we have old state that may no longer apply to the new
> pipeline.
> If we start a new job we can reprocess historical data from Kafka, but
> that can be very resource heavy for a while.
>
> Is there an option I am missing?  Are there facilities to “patch” or
> “purge” selectively the keyed state?
>
> Michael
>
>
>


Re: Question about the behavior of TM when it lost the zookeeper client session in HA mode

2018-05-16 Thread Tony Wei
Hi Ufuk, Piotr

Thanks for all of your replies. I knew that jobs are cancelled if the JM
looses the connection to ZK, but JM didn't loose connection in my case.
My job failed because of the exception from KafkaProducer. However, it
happened before and after that exception that TM lost ZK connection.
So, as Piotr said, it looks like an error in Kafka producer and I will pay
more attention on it to see if there is something unexpected happens again.

Best Regards,
Tony Wei

2018-05-15 19:56 GMT+08:00 Piotr Nowojski <pi...@data-artisans.com>:

> Hi,
>
> It looks like there was an error in asynchronous job of sending the
> records to Kafka. Probably this is a collateral damage of loosing
> connection to zookeeper.
>
> Piotrek
>
> On 15 May 2018, at 13:33, Ufuk Celebi <u...@apache.org> wrote:
>
> Hey Tony,
>
> thanks for the detailed report.
>
> - In Flink 1.4, jobs are cancelled if the JM looses the connection to ZK
> and recovered when the connection is re-established (and one JM becomes
> leader again).
>
> - Regarding the KafkaProducer: I'm not sure from the log message whether
> Flink closes the KafkaProducer because the job is cancelled or because
> there is a connectivity issue to the Kafka cluster. Including Piotr (cc) in
> this thread who has worked on the KafkaProducer in the past. If it is a
> connectivity issue, it might also explain why you lost the connection to ZK.
>
> Glad to hear that everything is back to normal. Keep us updated if
> something unexpected happens again.
>
> – Ufuk
>
>
> On Tue, May 15, 2018 at 6:28 AM, Tony Wei <tony19920...@gmail.com> wrote:
>
>> Hi all,
>>
>> I restarted the cluster and changed the log level to DEBUG, and raised
>> the parallelism of my streaming job from 32 to 40.
>> However, the problem just disappeared and I don't know why.
>> I will remain these settings for a while. If the error happen again, I
>> will bring more informations back for help. Thank you.
>>
>> Best Regards,
>> Tony Wei
>>
>> 2018-05-14 14:24 GMT+08:00 Tony Wei <tony19920...@gmail.com>:
>>
>>> Hi all,
>>>
>>> After I changed the *`h*
>>> *igh-availability.zookeeper.client.session-timeout`* and
>>> *`maxSessionTimeout`* to 12ms, the exception still occurred.
>>>
>>> Here is the log snippet. It seems this is nothing to do with zookeeper
>>> client timeout, but I still don't know why kafka producer would be closed
>>> without any task state changed.
>>>
>>> ```
>>> 2018-05-14 05:18:53,468 WARN  org.apache.flink.shaded.zookee
>>> per.org.apache.zookeeper.ClientCnxn  - Client session timed out, have
>>> not heard from server in 82828ms for sessionid 0x305f957eb8d000a
>>> 2018-05-14 05:18:53,468 INFO  org.apache.flink.shaded.zookee
>>> per.org.apache.zookeeper.ClientCnxn  - Client session timed out, have
>>> not heard from server in 82828ms for sessionid 0x305f957eb8d000a, closing
>>> socket connection and attempting reconnect
>>> 2018-05-14 05:18:53,571 INFO  org.apache.flink.shaded.curato
>>> r.org.apache.curator.framework.state.ConnectionStateManager  - State
>>> change: SUSPENDED
>>> 2018-05-14 05:18:53,574 WARN  org.apache.flink.runtime.leade
>>> rretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper
>>> suspended. Can no longer retrieve the leader from ZooKeeper.
>>> 2018-05-14 05:18:53,850 WARN  org.apache.flink.shaded.zookee
>>> per.org.apache.zookeeper.ClientCnxn  - SASL configuration failed:
>>> javax.security.auth.login.LoginException: No JAAS configuration section
>>> named 'Client' was found in specified JAAS configuration file:
>>> '/mnt/jaas-466390940757021791.conf'. Will continue connection to
>>> Zookeeper server without SASL authentication, if Zookeeper server allows it.
>>> 2018-05-14 05:18:53,850 INFO  org.apache.flink.shaded.zookee
>>> per.org.apache.zookeeper.ClientCnxn  - Opening socket connection to
>>> server XXX.XXX.XXX.XXX:2181
>>> 2018-05-14 05:18:53,852 ERROR org.apache.flink.shaded.curato
>>> r.org.apache.curator.ConnectionState  - Authentication failed
>>> 2018-05-14 05:18:53,853 INFO  org.apache.flink.shaded.zookee
>>> per.org.apache.zookeeper.ClientCnxn  - Socket connection established to
>>> XXX.XXX.XXX.XXX:2181, initiating session
>>> 2018-05-14 05:18:53,859 INFO  org.apache.flink.shaded.zookee
>>> per.org.apache.zookeeper.ClientCnxn  - Session establishment complete
>>> on server XXX.XXX.XXX.XXX:2181, sessionid = 0x305f957eb8d000a, negotiated
>>> timeout = 12

Question about the behavior of TM when it lost the zookeeper client session in HA mode

2018-05-13 Thread Tony Wei
Hi all,

Recently, my flink job met a problem that caused the job failed and
restarted.

The log is list this screen snapshot



or this

```
2018-05-11 13:21:04,582 WARN
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Client
session timed out, have not heard from server in 61054ms for sessionid
0x3054b165fe2006a
2018-05-11 13:21:04,583 INFO
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Client
session timed out, have not heard from server in 61054ms for sessionid
0x3054b165fe2006a, closing socket connection and attempting reconnect
2018-05-11 13:21:04,683 INFO
org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager
- State change: SUSPENDED
2018-05-11 13:21:04,686 WARN
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
Connection to ZooKeeper suspended. Can no longer retrieve the leader from
ZooKeeper.
2018-05-11 13:21:04,689 INFO
org.apache.kafka.clients.producer.KafkaProducer   - Closing the
Kafka producer with timeoutMillis = 9223372036854775807 ms.
2018-05-11 13:21:04,694 INFO
org.apache.kafka.clients.producer.KafkaProducer   - Closing the
Kafka producer with timeoutMillis = 9223372036854775807 ms.
2018-05-11 13:21:04,698 INFO  org.apache.flink.runtime.taskmanager.Task
 - match-rule -> (get-ordinary -> Sink: kafka-sink, get-cd
-> Sink: kafka-sink-cd) (4/32) (65a4044ac963e083f2635fe24e7f2403) switched
from RUNNING to FAILED.
java.lang.Exception: Failed to send data to Kafka: The server disconnected
before a response was received.
```

Logs showed *`org.apache.kafka.clients.producer.KafkaProducer - Closing the
Kafka producer with timeoutMillis = 9223372036854775807 ms.`* This timeout
value is *Long.MAX_VALUE*. It happened when someone called
*`producer.close()`*.

And I also saw the log said
*`org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn
- Client session timed out, have not heard from server in 61054ms for
sessionid 0x3054b165fe2006a, closing socket connection and attempting
reconnect`*
and *`org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService
- Connection to ZooKeeper suspended. Can no longer retrieve the leader from
ZooKeeper.`*

I have checked zookeeper and kafka and there was no error during that
period.
I was wondering if TM will stop the tasks when it lost zookeeper client in
HA mode. Since I didn't see any document or mailing thread discuss this,
I'm not sure if this is the reason that made kafka producer closed.
Could someone who know HA well? Or someone know what happened in my job?

My flink cluster version is 1.4.0 with 2 masters and 10 slaves. My
zookeeper cluster version is 3.4.11 with 3 nodes.
The *`high-availability.zookeeper.client.session-timeout`* is default
value: 6 ms.
The *`maxSessionTimeout`* in zoo.cfg is 4ms.
I have already change the *maxSessionTimeout* to 12ms this morning.

This problem happened many many times during the last weekend and made my
kafka log delay grew up. Please help me. Thank you very much!

Best Regards,
Tony Wei


Re: checkpoint stuck with rocksdb statebackend and s3 filesystem

2018-05-04 Thread Tony Wei
Hi Stefan, Sihua,

We finally found out the root cause. Just as you said, why the performance
had been downgraded is due to EBS.
My team and I wasn't familiar with EBS before. We thought its performance
is not so weak as the monitor showed us.
But we visited this page [1]
<https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/EBSVolumeTypes.html#EBSVolumeTypes_gp2>
and found that we had a big misunderstanding about EBS.

All in all, our checkpoint procedure hit the burst IO performance over the
maximum duration and made the IO performance downgraded.
We have replaced to local SSDs and enabled incremental checkpoint mechanism
as well. Our job has run healthily for more than two weeks.

Thank you all for helping me to investigate and solve this issue.

Best Regards,
Tony Wei

[1] EBS: I/O Credits and Burst Performance
<https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/EBSVolumeTypes.html#EBSVolumeTypes_gp2>

2018-03-09 17:47 GMT+08:00 Tony Wei <tony19920...@gmail.com>:

> Hi Stefan,
>
> We prepared to  run it on local SSDs yesterday. However, as I said, the
> problem just disappeared. Of course we will replace it to local SSDs, but
> I'm afraid that I might be able to reproduce the same situation for both
> environments to compare the difference.
>
> Best Regards,
> Tony Wei.
>
> 2018-03-09 16:59 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>
>> Hi,
>>
>> if processing and checkpointing are stuck in RocksDB, this could indeed
>> hint to a problem with your IO system. The comment about using EBS could be
>> important, as it might be a bad idea from a performance point of view to
>> run RocksDB on EBS; did you ever compare against running it on local SSDs?
>>
>> Best,
>> Stefan
>>
>> Am 09.03.2018 um 05:08 schrieb Tony Wei <tony19920...@gmail.com>:
>>
>> Hi Stefan, Sihua,
>>
>> TLDR; after the experiment, I found that this problem might not about s3
>> filesystem or network io with s3. It might caused by rocksdb and io
>> performance, but I still can't recognize who caused this problem.
>>
>> For more specific details, I have to introduce my flink application's
>> detail and what I found in the experiment. The disks I used for EC2 are
>> SSD, but they are EBS.
>>
>> For the application detail, there is only one keyed ProcessFunction with
>> ValueState with scala collection data type, which represents the counting
>> by event and date
>> This operator with receive two type of message: one is event message and
>> the other is overwrite state message
>> When operator received an event message, it would update the
>> corresponding value by event and client time and emit the event to the next
>> operator with the "whole" collection, that's why I used ValueState not
>> MapState or ListState.
>> When operator received a overwrite state message, it would overwrite the
>> corresponding value in the state. This is the design that we want to replay
>> the state constructed by the new rules.
>> Moreover, my key is something like user cookie, and I have a timer
>> callback to remove those out-dated state, for example: I'm only care about
>> the state in 6 months.
>>
>> For the experiment, I tried to catch the first failure to find out some
>> cues, so I extended the checkpoint interval to a long time and use
>> savepoint. I know savepoint is not actually same as checkpoint, but I guess
>> the parts of store state and upload to remote filesystem are similar.
>> After some savepoints triggered, I found that asynchronous part was stuck
>> in full snapshot operation and time triggers in that machine were also
>> stock and blocked the operator to process element.
>> I recalled that I had replayed lots of states for 60 days in 4 ~ 5 hours,
>> and the first problem happened during the replay procedure. It is just a
>> coincidence that the callback from those keys that I replayed happened when
>> I run the experiment.
>> However, when I tried to disable all checkpoints and savepoints to
>> observed if the massive keys to access rocksdb get stuck, I found the
>> problem was disappeared. Moreover, I roll back to the original setting that
>> checkpoint got stuck. The problem didn't happened again yet.
>>
>> In summary, I sill can't tell where the problem happened, since the io
>> performance didn't touch the limitation and the problem couldn't reproduce
>> based on the same initial states.
>> I decide to try out incremental checkpoint to reduce this risk. I will
>> reopen this thread  with more information I can provide when the problem
>> happen again. If you have any suggestion about my impl

Re: checkpoint stuck with rocksdb statebackend and s3 filesystem

2018-03-09 Thread Tony Wei
Hi Stefan,

We prepared to  run it on local SSDs yesterday. However, as I said, the
problem just disappeared. Of course we will replace it to local SSDs, but
I'm afraid that I might be able to reproduce the same situation for both
environments to compare the difference.

Best Regards,
Tony Wei.

2018-03-09 16:59 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:

> Hi,
>
> if processing and checkpointing are stuck in RocksDB, this could indeed
> hint to a problem with your IO system. The comment about using EBS could be
> important, as it might be a bad idea from a performance point of view to
> run RocksDB on EBS; did you ever compare against running it on local SSDs?
>
> Best,
> Stefan
>
> Am 09.03.2018 um 05:08 schrieb Tony Wei <tony19920...@gmail.com>:
>
> Hi Stefan, Sihua,
>
> TLDR; after the experiment, I found that this problem might not about s3
> filesystem or network io with s3. It might caused by rocksdb and io
> performance, but I still can't recognize who caused this problem.
>
> For more specific details, I have to introduce my flink application's
> detail and what I found in the experiment. The disks I used for EC2 are
> SSD, but they are EBS.
>
> For the application detail, there is only one keyed ProcessFunction with
> ValueState with scala collection data type, which represents the counting
> by event and date
> This operator with receive two type of message: one is event message and
> the other is overwrite state message
> When operator received an event message, it would update the corresponding
> value by event and client time and emit the event to the next operator with
> the "whole" collection, that's why I used ValueState not MapState or
> ListState.
> When operator received a overwrite state message, it would overwrite the
> corresponding value in the state. This is the design that we want to replay
> the state constructed by the new rules.
> Moreover, my key is something like user cookie, and I have a timer
> callback to remove those out-dated state, for example: I'm only care about
> the state in 6 months.
>
> For the experiment, I tried to catch the first failure to find out some
> cues, so I extended the checkpoint interval to a long time and use
> savepoint. I know savepoint is not actually same as checkpoint, but I guess
> the parts of store state and upload to remote filesystem are similar.
> After some savepoints triggered, I found that asynchronous part was stuck
> in full snapshot operation and time triggers in that machine were also
> stock and blocked the operator to process element.
> I recalled that I had replayed lots of states for 60 days in 4 ~ 5 hours,
> and the first problem happened during the replay procedure. It is just a
> coincidence that the callback from those keys that I replayed happened when
> I run the experiment.
> However, when I tried to disable all checkpoints and savepoints to
> observed if the massive keys to access rocksdb get stuck, I found the
> problem was disappeared. Moreover, I roll back to the original setting that
> checkpoint got stuck. The problem didn't happened again yet.
>
> In summary, I sill can't tell where the problem happened, since the io
> performance didn't touch the limitation and the problem couldn't reproduce
> based on the same initial states.
> I decide to try out incremental checkpoint to reduce this risk. I will
> reopen this thread  with more information I can provide when the problem
> happen again. If you have any suggestion about my implementation that might
> leads to some problems or about this issue, please advice me.
> Thank you ver much for taking your time to pay attention on this issue!! =
> )
>
> p.s. the attachment is about the experiment I mentioned above. I didn't
> record the stack trace because the only difference is only Time Trigger's
> state were runnable and the operator were blocked.
>
> Best Regards,
> Tony Wei
>
>
> 2018-03-06 17:00 GMT+08:00 周思华 <summerle...@163.com>:
>
>> Hi Tony,
>>
>> I agree with you.
>>
>> Best Regards,
>>
>> Sihua Zhou
>>
>>
>> 发自网易邮箱大师
>>
>> On 03/6/2018 15:34,Tony Wei<tony19920...@gmail.com>
>> <tony19920...@gmail.com> wrote:
>>
>> Hi Sihua,
>>
>> You are right. The incremental checkpoint might release machine from high
>> cpu loading and make the bad machines recover quickly, but I was wondering
>> why the first checkpoint failed by timeout. You can see when the bad
>> machine recovered, the cpu loading for each checkpoint is not so high,
>> although there were still peeks in each checkpoint happened. I think the
>> high cpu loading that might be caused by those timeou

Re: Fwd: checkpoint stuck with rocksdb statebackend and s3 filesystem

2018-03-05 Thread Tony Wei
Hi Sihua,

You are right. The incremental checkpoint might release machine from high
cpu loading and make the bad machines recover quickly, but I was wondering
why the first checkpoint failed by timeout. You can see when the bad
machine recovered, the cpu loading for each checkpoint is not so high,
although there were still peeks in each checkpoint happened. I think the
high cpu loading that might be caused by those timeout checkpointing
threads is not the root cause. I will use the incremental checkpoint
eventually but I will decide if change my persistence filesystem after we
find out the root cause or stop the investigation and make the
conclusion in this mailing thread. What do you think?

Best Regards,
Tony Wei

2018-03-06 15:13 GMT+08:00 周思华 <summerle...@163.com>:

> Hi Tony,
>
> Sorry for missing the factor of cpu, I found that the "bad tm"'s cpu load
> is so much higher that the 'good tm', so I think maybe it also a reason
> that could lead to timeout. Since you were using "full checkpoint", it need
> to iterate all the records in the RocksDB with some `if` check, when the
> state is huge this is cpu costly. Let me try to explain the full checkpoint
> a bit more, it contains two parts.
>
> Part 1. Take snapshot of the RocksDB. (This can map to the "Checkpoint
> Duration (sync) " on the checkpoint detail page)
>
> Part2. Loop the records of the snapshot, along with some `if` check to ensure
> that the data is sent to s3 in the order of the key group. (This can map to
> the "Checkpoint Duration(Async)").
>
> So part2 could be cpu costly and network costly, if the CPU load is too
> high, then sending data will slow down, because there are in a single loop.
> If cpu is the reason, this phenomenon will disappear if you use increment
> checkpoint, because it almost only send data to s3. In the all, for now
> trying out the incremental checkpoint is the best thing to do I think.
>
> Best Regards,
> Sihua Zhou
>
>
> 发自网易邮箱大师
>
> On 03/6/2018 14:45,Tony Wei<tony19920...@gmail.com>
> <tony19920...@gmail.com> wrote:
>
> Sent to the wrong mailing list. Forward it to the correct one.
>
> -- Forwarded message --
> From: Tony Wei <tony19920...@gmail.com>
> Date: 2018-03-06 14:43 GMT+08:00
> Subject: Re: checkpoint stuck with rocksdb statebackend and s3 filesystem
> To: 周思华 <summerle...@163.com>, Stefan Richter <s.rich...@data-artisans.com
> >
> Cc: "user-subscr...@flink.apache.org" <user-subscr...@flink.apache.org>
>
>
> Hi Sihua,
>
> Thanks a lot. I will try to find out the problem from machines'
> environment. If you and Stefan have any new suggestions or thoughts, please
> advise me. Thank you !
>
> Best Regards,
> Tony Wei
>
> 2018-03-06 14:34 GMT+08:00 周思华 <summerle...@163.com>:
>
>> Hi Tony,
>>
>> I think the two things you mentioned can both lead to a bad network. But
>> from my side, I think it more likely that it is the unstable network env
>> that cause the poor network performance itself, because as far as I know
>> I can't found out the reason that the flink would slow down the network so
>> much (even It does, the effect should not be that so much).
>>
>> Maybe stefan could tell more about that. ;)
>>
>> Best Regards,
>> Sihua Zhou
>>
>> 发自网易邮箱大师
>>
>> On 03/6/2018 14:04,Tony Wei<tony19920...@gmail.com>
>> <tony19920...@gmail.com> wrote:
>>
>> Hi Sihua,
>>
>>
>>> Hi Tony,
>>>
>>> About to your question: average end to end latency of checkpoint is less
>>> than 1.5 mins, doesn't means that checkpoint won't timeout. indeed, it
>>> determined byt the max end to end latency (the slowest one), a checkpoint
>>> truly completed only after all task's checkpoint have completed.
>>>
>>
>> Sorry for my poor expression. What I mean is the average duration of
>> "completed" checkpoints, so I guess there are some problems that make some
>> subtasks of checkpoint take so long, even more than 10 mins.
>>
>>
>>>
>>> About to the problem: after a second look at the info you privode, we
>>> can found from the checkpoint detail picture that there is one task which
>>> cost 4m20s to transfer it snapshot (about 482M) to s3 and there are 4
>>> others tasks didn't complete the checkpoint yet. And from the
>>> bad_tm_pic.png vs good_tm_pic.png, we can found that on "bad tm" the
>>> network performance is far less than the "good tm" (-15 MB vs -50MB). So I
>>> guss the network is a problem, sometimes it failed to send 50

Fwd: checkpoint stuck with rocksdb statebackend and s3 filesystem

2018-03-05 Thread Tony Wei
Sent to the wrong mailing list. Forward it to the correct one.

-- Forwarded message --
From: Tony Wei <tony19920...@gmail.com>
Date: 2018-03-06 14:43 GMT+08:00
Subject: Re: checkpoint stuck with rocksdb statebackend and s3 filesystem
To: 周思华 <summerle...@163.com>, Stefan Richter <s.rich...@data-artisans.com>
Cc: "user-subscr...@flink.apache.org" <user-subscr...@flink.apache.org>


Hi Sihua,

Thanks a lot. I will try to find out the problem from machines'
environment. If you and Stefan have any new suggestions or thoughts, please
advise me. Thank you !

Best Regards,
Tony Wei

2018-03-06 14:34 GMT+08:00 周思华 <summerle...@163.com>:

> Hi Tony,
>
> I think the two things you mentioned can both lead to a bad network. But
> from my side, I think it more likely that it is the unstable network env
> that cause the poor network performance itself, because as far as I know
> I can't found out the reason that the flink would slow down the network so
> much (even It does, the effect should not be that so much).
>
> Maybe stefan could tell more about that. ;)
>
> Best Regards,
> Sihua Zhou
>
> 发自网易邮箱大师
>
> On 03/6/2018 14:04,Tony Wei<tony19920...@gmail.com>
> <tony19920...@gmail.com> wrote:
>
> Hi Sihua,
>
>
>> Hi Tony,
>>
>> About to your question: average end to end latency of checkpoint is less
>> than 1.5 mins, doesn't means that checkpoint won't timeout. indeed, it
>> determined byt the max end to end latency (the slowest one), a checkpoint
>> truly completed only after all task's checkpoint have completed.
>>
>
> Sorry for my poor expression. What I mean is the average duration of
> "completed" checkpoints, so I guess there are some problems that make some
> subtasks of checkpoint take so long, even more than 10 mins.
>
>
>>
>> About to the problem: after a second look at the info you privode, we can
>> found from the checkpoint detail picture that there is one task which cost
>> 4m20s to transfer it snapshot (about 482M) to s3 and there are 4 others
>> tasks didn't complete the checkpoint yet. And from the bad_tm_pic.png vs
>> good_tm_pic.png, we can found that on "bad tm" the network performance is
>> far less than the "good tm" (-15 MB vs -50MB). So I guss the network is a
>> problem, sometimes it failed to send 500M data to s3 in 10 minutes. (maybe
>> you need to check whether the network env is stable)
>>
>
> That is what I concerned. Because I can't determine if checkpoint is stuck
> makes network performance worse or network performance got worse makes
> checkpoint stuck.
> Although I provided one "bad machine" and one "good machine", that doesn't
> mean bad machine is always bad and good machine is always good. See the
> attachments.
> All of my TMs met this problem at least once from last weekend until now.
> Some machines recovered by themselves and some recovered after I restarted
> them.
>
> Best Regards,
> Tony Wei
>
> 2018-03-06 13:41 GMT+08:00 周思华 <summerle...@163.com>:
>
>>
>> Hi Tony,
>>
>> About to your question: average end to end latency of checkpoint is less
>> than 1.5 mins, doesn't means that checkpoint won't timeout. indeed, it
>> determined byt the max end to end latency (the slowest one), a checkpoint
>> truly completed only after all task's checkpoint have completed.
>>
>> About to the problem: after a second look at the info you privode, we can
>> found from the checkpoint detail picture that there is one task which cost
>> 4m20s to transfer it snapshot (about 482M) to s3 and there are 4 others
>> tasks didn't complete the checkpoint yet. And from the bad_tm_pic.png vs
>> good_tm_pic.png, we can found that on "bad tm" the network performance is
>> far less than the "good tm" (-15 MB vs -50MB). So I guss the network is a
>> problem, sometimes it failed to send 500M data to s3 in 10 minutes. (maybe
>> you need to check whether the network env is stable)
>>
>> About the solution: I think incremental checkpoint can help you a lot, it
>> will only send the new data each checkpoint, but you are right if the
>> increment state size is huger than 500M, it might cause the timeout problem
>> again (because of the bad network performance).
>>
>> Best Regards,
>> Sihua Zhou
>>
>> 发自网易邮箱大师
>>
>> On 03/6/2018 13:02,Tony Wei<tony19920...@gmail.com>
>> <tony19920...@gmail.com> wrote:
>>
>> Hi Sihua,
>>
>> Thanks for your suggestion. "incremental checkpoint" is what I will try
>> out next and I know it will giv

Re: checkpoint stuck with rocksdb statebackend and s3 filesystem

2018-03-05 Thread Tony Wei
Hi Sihua,

Thanks for your suggestion. "incremental checkpoint" is what I will try out
next and I know it will give a better performance. However, it might not
solve this issue completely, because as I said, the average end to end
latency of checkpointing is less than 1.5 mins currently, and it is far
from my timeout configuration. I believe "incremental checkpoint" will
reduce the latency and make this issue might occur seldom, but I can't
promise it won't happen again if I have bigger states growth in the future.
Am I right?

Best Regards,
Tony Wei

2018-03-06 10:55 GMT+08:00 周思华 <summerle...@163.com>:

> Hi Tony,
>
> Sorry for jump into, one thing I want to remind is that from the log you
> provided it looks like you are using "full checkpoint", this means that the
> state data that need to be checkpointed and transvered to s3 will grow over
> time, and even for the first checkpoint it performance is slower that
> incremental checkpoint (because it need to iterate all the record from the
> rocksdb using the RocksDBMergeIterator). Maybe you can try out "incremental
> checkpoint", it could help you got a better performance.
>
> Best Regards,
> Sihua Zhou
>
> 发自网易邮箱大师
>
> On 03/6/2018 10:34,Tony Wei<tony19920...@gmail.com>
> <tony19920...@gmail.com> wrote:
>
> Hi Stefan,
>
> I see. That explains why the loading of machines grew up. However, I think
> it is not the root cause that led to these consecutive checkpoint timeout.
> As I said in my first mail, the checkpointing progress usually took 1.5
> mins to upload states, and this operator and kafka consumer are only two
> operators that have states in my pipeline. In the best case, I should never
> encounter the timeout problem that only caused by lots of pending
> checkpointing threads that have already timed out. Am I right?
>
> Since these logging and stack trace was taken after nearly 3 hours from
> the first checkpoint timeout, I'm afraid that we couldn't actually find out
> the root cause for the first checkpoint timeout. Because we are preparing
> to make this pipeline go on production, I was wondering if you could help
> me find out where the root cause happened: bad machines or s3 or
> flink-s3-presto packages or flink checkpointing thread. It will be great if
> we can find it out from those informations the I provided, or a
> hypothesis based on your experience is welcome as well. The most important
> thing is that I have to decide whether I need to change my persistence
> filesystem or use another s3 filesystem package, because it is the last
> thing I want to see that the checkpoint timeout happened very often.
>
> Thank you very much for all your advices.
>
> Best Regards,
> Tony Wei
>
> 2018-03-06 1:07 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>
>> Hi,
>>
>> thanks for all the info. I had a look into the problem and opened
>> https://issues.apache.org/jira/browse/FLINK-8871 to fix this. From your
>> stack trace, you can see many checkpointing threads are running on your TM
>> for checkpoints that have already timed out, and I think this cascades and
>> slows down everything. Seems like the implementation of some features like
>> checkpoint timeouts and not failing tasks from checkpointing problems
>> overlooked that we also require to properly communicate that checkpoint
>> cancellation to all task, which was not needed before.
>>
>> Best,
>> Stefan
>>
>>
>> Am 05.03.2018 um 14:42 schrieb Tony Wei <tony19920...@gmail.com>:
>>
>> Hi Stefan,
>>
>> Here is my checkpointing configuration.
>>
>> Checkpointing Mode Exactly Once
>> Interval 20m 0s
>> Timeout 10m 0s
>> Minimum Pause Between Checkpoints 0ms
>> Maximum Concurrent Checkpoints 1
>> Persist Checkpoints Externally Enabled (delete on cancellation)
>> Best Regards,
>> Tony Wei
>>
>> 2018-03-05 21:30 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>>
>>> Hi,
>>>
>>> quick question: what is your exact checkpointing configuration? In
>>> particular, what is your value for the maximum parallel checkpoints and the
>>> minimum time interval to wait between two checkpoints?
>>>
>>> Best,
>>> Stefan
>>>
>>> > Am 05.03.2018 um 06:34 schrieb Tony Wei <tony19920...@gmail.com>:
>>> >
>>> > Hi all,
>>> >
>>> > Last weekend, my flink job's checkpoint start failing because of
>>> timeout. I have no idea what happened, but I collect some informations
>>> about my cluster and job. Hope someone can give me advices or hin

Re: checkpoint stuck with rocksdb statebackend and s3 filesystem

2018-03-05 Thread Tony Wei
Hi Stefan,

I see. That explains why the loading of machines grew up. However, I think
it is not the root cause that led to these consecutive checkpoint timeout.
As I said in my first mail, the checkpointing progress usually took 1.5
mins to upload states, and this operator and kafka consumer are only two
operators that have states in my pipeline. In the best case, I should never
encounter the timeout problem that only caused by lots of pending
checkpointing threads that have already timed out. Am I right?

Since these logging and stack trace was taken after nearly 3 hours from the
first checkpoint timeout, I'm afraid that we couldn't actually find out the
root cause for the first checkpoint timeout. Because we are preparing to
make this pipeline go on production, I was wondering if you could help me
find out where the root cause happened: bad machines or s3 or
flink-s3-presto packages or flink checkpointing thread. It will be great if
we can find it out from those informations the I provided, or a
hypothesis based on your experience is welcome as well. The most important
thing is that I have to decide whether I need to change my persistence
filesystem or use another s3 filesystem package, because it is the last
thing I want to see that the checkpoint timeout happened very often.

Thank you very much for all your advices.

Best Regards,
Tony Wei

2018-03-06 1:07 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:

> Hi,
>
> thanks for all the info. I had a look into the problem and opened
> https://issues.apache.org/jira/browse/FLINK-8871 to fix this. From your
> stack trace, you can see many checkpointing threads are running on your TM
> for checkpoints that have already timed out, and I think this cascades and
> slows down everything. Seems like the implementation of some features like
> checkpoint timeouts and not failing tasks from checkpointing problems
> overlooked that we also require to properly communicate that checkpoint
> cancellation to all task, which was not needed before.
>
> Best,
> Stefan
>
>
> Am 05.03.2018 um 14:42 schrieb Tony Wei <tony19920...@gmail.com>:
>
> Hi Stefan,
>
> Here is my checkpointing configuration.
>
> Checkpointing Mode Exactly Once
> Interval 20m 0s
> Timeout 10m 0s
> Minimum Pause Between Checkpoints 0ms
> Maximum Concurrent Checkpoints 1
> Persist Checkpoints Externally Enabled (delete on cancellation)
> Best Regards,
> Tony Wei
>
> 2018-03-05 21:30 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>
>> Hi,
>>
>> quick question: what is your exact checkpointing configuration? In
>> particular, what is your value for the maximum parallel checkpoints and the
>> minimum time interval to wait between two checkpoints?
>>
>> Best,
>> Stefan
>>
>> > Am 05.03.2018 um 06:34 schrieb Tony Wei <tony19920...@gmail.com>:
>> >
>> > Hi all,
>> >
>> > Last weekend, my flink job's checkpoint start failing because of
>> timeout. I have no idea what happened, but I collect some informations
>> about my cluster and job. Hope someone can give me advices or hints about
>> the problem that I encountered.
>> >
>> > My cluster version is flink-release-1.4.0. Cluster has 10 TMs, each has
>> 4 cores. These machines are ec2 spot instances. The job's parallelism is
>> set as 32, using rocksdb as state backend and s3 presto as checkpoint file
>> system.
>> > The state's size is nearly 15gb and still grows day-by-day. Normally,
>> It takes 1.5 mins to finish the whole checkpoint process. The timeout
>> configuration is set as 10 mins.
>> >
>> > 
>> >
>> > As the picture shows, not each subtask of checkpoint broke caused by
>> timeout, but each machine has ever broken for all its subtasks during last
>> weekend. Some machines recovered by themselves and some machines recovered
>> after I restarted them.
>> >
>> > I record logs, stack trace and snapshot for machine's status (CPU, IO,
>> Network, etc.) for both good and bad machine. If there is a need for more
>> informations, please let me know. Thanks in advance.
>> >
>> > Best Regards,
>> > Tony Wei.
>> > > log.log>
>>
>>
>
>


Re: checkpoint stuck with rocksdb statebackend and s3 filesystem

2018-03-05 Thread Tony Wei
Hi Stefan,

Here is my checkpointing configuration.

Checkpointing Mode Exactly Once
Interval 20m 0s
Timeout 10m 0s
Minimum Pause Between Checkpoints 0ms
Maximum Concurrent Checkpoints 1
Persist Checkpoints Externally Enabled (delete on cancellation)
Best Regards,
Tony Wei

2018-03-05 21:30 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:

> Hi,
>
> quick question: what is your exact checkpointing configuration? In
> particular, what is your value for the maximum parallel checkpoints and the
> minimum time interval to wait between two checkpoints?
>
> Best,
> Stefan
>
> > Am 05.03.2018 um 06:34 schrieb Tony Wei <tony19920...@gmail.com>:
> >
> > Hi all,
> >
> > Last weekend, my flink job's checkpoint start failing because of
> timeout. I have no idea what happened, but I collect some informations
> about my cluster and job. Hope someone can give me advices or hints about
> the problem that I encountered.
> >
> > My cluster version is flink-release-1.4.0. Cluster has 10 TMs, each has
> 4 cores. These machines are ec2 spot instances. The job's parallelism is
> set as 32, using rocksdb as state backend and s3 presto as checkpoint file
> system.
> > The state's size is nearly 15gb and still grows day-by-day. Normally, It
> takes 1.5 mins to finish the whole checkpoint process. The timeout
> configuration is set as 10 mins.
> >
> > 
> >
> > As the picture shows, not each subtask of checkpoint broke caused by
> timeout, but each machine has ever broken for all its subtasks during last
> weekend. Some machines recovered by themselves and some machines recovered
> after I restarted them.
> >
> > I record logs, stack trace and snapshot for machine's status (CPU, IO,
> Network, etc.) for both good and bad machine. If there is a need for more
> informations, please let me know. Thanks in advance.
> >
> > Best Regards,
> > Tony Wei.
> >  tm_log.log>
>
>


Re: Will that be a problem if POJO key type didn't override equals?

2018-02-06 Thread Tony Wei
Hi Timo,

Thanks a lot. I will try it out.

Best Regards,
Tony Wei

2018-02-06 17:25 GMT+08:00 Timo Walther <twal...@apache.org>:

> With heap-based state I meant state that is stored using the
> MemoryStateBackend or FsStateBackend [1]. In general, even if you are just
> using ValueState, the key might be used internally to store your value
> state in hash table.
>
> I think the migration should work in your case. Otherwise feel free to let
> us know.
>
> Regards,
> Timo
>
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> release-1.4/ops/state/state_backends.html#the-memorystatebackend
>
>
> Am 2/6/18 um 8:54 AM schrieb Tony Wei:
>
> Hi Timo,
>
> Thanks for your response. I will implement equals for my POJO directly. Is
> that be okay instead of wrap it into another class?
> Furthermore, I want to migrate the states from the previous job. Will it
> lead to state lost? I run my job on Flink 1.4.0. I used RocksDBStateBackend
> and only ValueState as key state.
>
> BTW, could you please give more explanations about what heap-based state
> is? Since I'm not familiar with the details below the state
> implementations, it will be great if you can share more technical details
> or some references to me. Thank you!
>
> Best Regards,
> Tony Wei
>
> 2018-02-06 15:24 GMT+08:00 Timo Walther <twal...@apache.org>:
>
>> Hi Tony,
>>
>> not having a proper equals() method might work for a keyBy()
>> (partitioning operation) but it can lead to unexpected side effects when
>> dealing with state. If not now, then maybe in the future. For example,
>> heap-based state uses a hash table data structures such that your key might
>> never be found again. I would recommend to wrap your POJO into another
>> class that implements a proper hashCode/equals.
>>
>> Regards,
>> Timo
>>
>>
>> Am 2/6/18 um 4:16 AM schrieb Tony Wei:
>>
>> Hi all,
>>>
>>> I have defined a POJO class that override Object#hashCode and used it in
>>> keyBy().
>>> The pipeline looks good (i.e. no exception that told me it is
>>> UNSUPPORTED key types), but I'm afraid that it will leads to a problem that
>>> elements that I think have the same key will not get the same state because
>>> I didn't override Object#equals.
>>>
>>> Is it necessary that POJO key type overrides Object#equals? Or
>>> PojoTypeInfo didn't rely on MyClass#equals? Or keyBy() didn't rely on
>>> equals?
>>>
>>> Thank you very much.
>>>
>>> Best Regards,
>>> Tony Wei
>>>
>>
>>
>>
>
>


Re: Will that be a problem if POJO key type didn't override equals?

2018-02-05 Thread Tony Wei
Hi Timo,

Thanks for your response. I will implement equals for my POJO directly. Is
that be okay instead of wrap it into another class?
Furthermore, I want to migrate the states from the previous job. Will it
lead to state lost? I run my job on Flink 1.4.0. I used RocksDBStateBackend
and only ValueState as key state.

BTW, could you please give more explanations about what heap-based state
is? Since I'm not familiar with the details below the state
implementations, it will be great if you can share more technical details
or some references to me. Thank you!

Best Regards,
Tony Wei

2018-02-06 15:24 GMT+08:00 Timo Walther <twal...@apache.org>:

> Hi Tony,
>
> not having a proper equals() method might work for a keyBy() (partitioning
> operation) but it can lead to unexpected side effects when dealing with
> state. If not now, then maybe in the future. For example, heap-based state
> uses a hash table data structures such that your key might never be found
> again. I would recommend to wrap your POJO into another class that
> implements a proper hashCode/equals.
>
> Regards,
> Timo
>
>
> Am 2/6/18 um 4:16 AM schrieb Tony Wei:
>
> Hi all,
>>
>> I have defined a POJO class that override Object#hashCode and used it in
>> keyBy().
>> The pipeline looks good (i.e. no exception that told me it is UNSUPPORTED
>> key types), but I'm afraid that it will leads to a problem that elements
>> that I think have the same key will not get the same state because I didn't
>> override Object#equals.
>>
>> Is it necessary that POJO key type overrides Object#equals? Or
>> PojoTypeInfo didn't rely on MyClass#equals? Or keyBy() didn't rely on
>> equals?
>>
>> Thank you very much.
>>
>> Best Regards,
>> Tony Wei
>>
>
>
>


Will that be a problem if POJO key type didn't override equals?

2018-02-05 Thread Tony Wei
Hi all,

I have defined a POJO class that override Object#hashCode and used it in
keyBy().
The pipeline looks good (i.e. no exception that told me it is UNSUPPORTED
key types), but I'm afraid that it will leads to a problem that elements
that I think have the same key will not get the same state because I didn't
override Object#equals.

Is it necessary that POJO key type overrides Object#equals? Or PojoTypeInfo
didn't rely on MyClass#equals? Or keyBy() didn't rely on equals?

Thank you very much.

Best Regards,
Tony Wei


Re: does the flink sink only support bio?

2018-01-08 Thread Tony Wei
Hi Stefan,

Your reply really helps me a lot. Thank you.

2018-01-08 19:38 GMT+08:00 Stefan Richter :

> Hi,
>
> 1.  If `snapshotState` failed at the first checkpoint, does it mean there
> is no state and no transaction can be aborted by default?
>
>
> This is a general problem and not only limited to the first checkpoint.
> Whenever you open a transaction, there is no guaranteed way to store it in
> persistent state to abort it in case of failure. In theory, your job can
> crash at any point after you just opened a transaction. So in the end I
> guess you must rely on something like e.g. timeout based mechanism. You can
> do some _best effort_ to proactively cancel uncommitted transactions
> through methods like states, listing them in files, or having a fixed pool
> of transaction ids and iterate them all for cancellation on a restart.
>
> 2. I saw FlinkKafkaProducer011 has a transaction id pool, which has
> multiple ids to be reused by producer, and it aborts all ids in this pool
> in the `initializeState`. Is this pool designed for the situation in the
> first problem or something I haven't noticed?
>
>
> This implementation is very specific for KafkaProducer and is not
> necessarily a good blueprint for what you are planning. In particular, in
> this case there is a fixed and limited universe of all potential
> transaction ids that a task can potentially (re)use, so after a restart
> without state we can simply iterate all possible transaction ids and issue
> a cancel for all of them. In general, you don’t always know all possible
> transaction ids in a way that allows you to opportunistically cancel all
> potential orphaned transactions.
>
> 2018-01-04 22:15 GMT+08:00 Stefan Richter :
>
>> Yes, that is how it works.
>>
>> > Am 04.01.2018 um 14:47 schrieb Jinhua Luo :
>> >
>> > The TwoPhaseCommitSinkFunction seems to record the transaction status
>> > in the state just like what I imagine above, correct?
>> > and if the progress fails before commit, in the later restart, the
>> > commit would be triggered again, correct? So the commit would not be
>> > forgotten, correct?
>> >
>> > 2018-01-03 22:54 GMT+08:00 Stefan Richter > >:
>> >> I think a mix of async UPDATES and exactly-once all this might be
>> tricky,
>> >> and the typical use case for async IO is more about reads. So let’s
>> take a
>> >> step back: what would you like to achieve with this? Do you want a
>> >> read-modify-update (e.g. a map function that queries and updates a DB)
>> or
>> >> just updates (like a sink based that goes against a DB). From the
>> previous
>> >> question, I assume the second case applies, in which case I wonder why
>> you
>> >> even need to be async for a sink? I think a much better approach could
>> be
>> >> based on Flink's TwoPhaseCommitSinkFunction, and maybe use some some
>> >> batching to lower update costs.
>> >>
>> >> On top of the TwoPhaseCommitSinkFunction, you could implement
>> transactions
>> >> against your DB, similar to e.g. this example with Postgres:
>> >> http://hlinnaka.iki.fi/2013/04/11/how-to-write-a-java-transa
>> ction-manager-that-works-with-postgresql/
>> >> .
>> >>
>> >> Does this help or do you really need async read-modify-update?
>> >>
>> >> Best,
>> >> Stefan
>> >>
>> >>
>> >> Am 03.01.2018 um 15:08 schrieb Jinhua Luo :
>> >>
>> >> No, I mean how to implement exactly-once db commit (given our async io
>> >> target is mysql), not the state used by flink.
>> >> As mentioned in previous mail, if I commit db in
>> >> notifyCheckpointComplete, we have a risk to lost data (lost commit,
>> >> and flink restart would not trigger notifyCheckpointComplete for the
>> >> last checkpoint again).
>> >> On the other hand, if I update and commit per record, the sql/stored
>> >> procedure have to handle duplicate updates at failure restart.
>> >>
>> >> So, when or where to commit so that we could get exactly-once db
>> ingress.
>> >>
>> >> 2018-01-03 21:57 GMT+08:00 Stefan Richter > >:
>> >>
>> >>
>> >> Hi,
>> >>
>> >>
>> >> Then how to implement exactly-once async io? That is, neither missing
>> >> data or duplicating data.
>> >>
>> >>
>> >> From the docs about async IO here
>> >> https://ci.apache.org/projects/flink/flink-docs-release-1.3/
>> dev/stream/asyncio.html
>> >> :
>> >>
>> >> "Fault Tolerance Guarantees:
>> >> The asynchronous I/O operator offers full exactly-once fault tolerance
>> >> guarantees. It stores the records for in-flight asynchronous requests
>> in
>> >> checkpoints and restores/re-triggers the requests when recovering from
>> a
>> >> failure.“
>> >>
>> >> So it is already handled by Flink in a way that supports exactly-once.
>> >>
>> >> Is there some way to index data by checkpoint id and records which
>> >> checkpoints already commit to db? But that means we need MapState,
>> >> right?
>> >>
>> >>
>> >> The 

Re: does the flink sink only support bio?

2018-01-08 Thread Tony Wei
Hi Stefan,

Since TwoPhaseCommitSinkFunction is new to me, I would like to know more
details.

There are two more questions:
1.  If `snapshotState` failed at the first checkpoint, does it mean there
is no state and no transaction can be aborted by default?
2. I saw FlinkKafkaProducer011 has a transaction id pool, which has
multiple ids to be reused by producer, and it aborts all ids in this pool
in the `initializeState`. Is this pool designed for the situation in the
first problem or something I haven't noticed?

Thank you.

Best Regards,
Tony Wei

2018-01-04 22:15 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:

> Yes, that is how it works.
>
> > Am 04.01.2018 um 14:47 schrieb Jinhua Luo <luajit...@gmail.com>:
> >
> > The TwoPhaseCommitSinkFunction seems to record the transaction status
> > in the state just like what I imagine above, correct?
> > and if the progress fails before commit, in the later restart, the
> > commit would be triggered again, correct? So the commit would not be
> > forgotten, correct?
> >
> > 2018-01-03 22:54 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
> >> I think a mix of async UPDATES and exactly-once all this might be
> tricky,
> >> and the typical use case for async IO is more about reads. So let’s
> take a
> >> step back: what would you like to achieve with this? Do you want a
> >> read-modify-update (e.g. a map function that queries and updates a DB)
> or
> >> just updates (like a sink based that goes against a DB). From the
> previous
> >> question, I assume the second case applies, in which case I wonder why
> you
> >> even need to be async for a sink? I think a much better approach could
> be
> >> based on Flink's TwoPhaseCommitSinkFunction, and maybe use some some
> >> batching to lower update costs.
> >>
> >> On top of the TwoPhaseCommitSinkFunction, you could implement
> transactions
> >> against your DB, similar to e.g. this example with Postgres:
> >> http://hlinnaka.iki.fi/2013/04/11/how-to-write-a-java-
> transaction-manager-that-works-with-postgresql/
> >> .
> >>
> >> Does this help or do you really need async read-modify-update?
> >>
> >> Best,
> >> Stefan
> >>
> >>
> >> Am 03.01.2018 um 15:08 schrieb Jinhua Luo <luajit...@gmail.com>:
> >>
> >> No, I mean how to implement exactly-once db commit (given our async io
> >> target is mysql), not the state used by flink.
> >> As mentioned in previous mail, if I commit db in
> >> notifyCheckpointComplete, we have a risk to lost data (lost commit,
> >> and flink restart would not trigger notifyCheckpointComplete for the
> >> last checkpoint again).
> >> On the other hand, if I update and commit per record, the sql/stored
> >> procedure have to handle duplicate updates at failure restart.
> >>
> >> So, when or where to commit so that we could get exactly-once db
> ingress.
> >>
> >> 2018-01-03 21:57 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com
> >:
> >>
> >>
> >> Hi,
> >>
> >>
> >> Then how to implement exactly-once async io? That is, neither missing
> >> data or duplicating data.
> >>
> >>
> >> From the docs about async IO here
> >> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/
> asyncio.html
> >> :
> >>
> >> "Fault Tolerance Guarantees:
> >> The asynchronous I/O operator offers full exactly-once fault tolerance
> >> guarantees. It stores the records for in-flight asynchronous requests in
> >> checkpoints and restores/re-triggers the requests when recovering from a
> >> failure.“
> >>
> >> So it is already handled by Flink in a way that supports exactly-once.
> >>
> >> Is there some way to index data by checkpoint id and records which
> >> checkpoints already commit to db? But that means we need MapState,
> >> right?
> >>
> >>
> >> The information required depends a bit on the store that you are using,
> >> maybe the last confirmed checkpoint id is enough, but maybe you require
> >> something more. This transaction information is probably not „by-key“,
> but
> >> „per-operator“, so I would suggest to use operator state (see next
> answer).
> >> Btw the implementation of async operators does something very similar to
> >> restore pending requests, and you can see the code in
> „AsyncWaitOperator".
> >>
> >>
> >

Re: Using latency markers

2017-12-26 Thread Tony Wei
Hi,

I think FLINK-7692 won't solve this problem. It is just a new feature to
provide more functionality for user to use metric system.

Regards,
Tony Wei

2017-12-26 18:23 GMT+08:00 Marvin777 <xymaqingxiang...@gmail.com>:

> Hi,
>
> I just want to say we're having the same issues. Using latency markers
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Using-latency-markers-td14791.html#a14799>
>
> I get NaN in the Task metrics. The version is 1.3.1.
>
> Maybe I'm completely missing something...
>
> And there are questions about FLINK-7692
> <https://issues.apache.org/jira/browse/FLINK-7692>, it can Solve the
> problem above? What should I do in my version.
>
> thanks.
>


Re: user driven stream processing

2017-11-28 Thread Tony Wei
Hi KZ,

https://data-artisans.com/blog/real-time-fraud-detection-ing-bank-apache-flink
This article seems to be a good example to trigger a new calculation on a
running job. Maybe you can get some help from it.

Best Regards,
Tony Wei

2017-11-29 4:53 GMT+08:00 zanqing zhang <zanqingzh...@gmail.com>:

> Hi All,
>
> Has anyone done any stream processing driven by a user request? What's the
> recommended way of doing this? Or is this completely wrong direction to go
> for applications running on top of Flink?
>
> Basically we need to tweak the stream processing based on parameters
> provided by a user, e.g. show me the total # of application failures due to
> "ABC", which is provided by the user. We are thinking of starting a flink
> job with "ABC" as a parameter but this would result in a huge number of
> flink jobs, is there a better way for this? Can we trigger the calculation
> on a running job?
>
> Thanks in advance.
>
> KZ
>
>


kafka consumer client seems not auto commit offset

2017-11-15 Thread Tony Wei
Hi Gordon,

When I used FlinkKafkaConsumer010 to consume log from Kafka, I found that
if I used `setStartFromLatest()` the kafka consumer api didn't auto commit
offsets back to consumer group, but if I used `setStartFromGroupOffsets()`
it worked fine.

I am sure that the configuration for Kafka has `auto.commit.interval.ms =
5000` and `enable.auto.commit = true` and I didn't enable checkpointing.

All the difference is only the change from `setStartFromGroupOffsets()` to
`setStartFromLatest()`, but the auto commit mechanism just stopped working.

My Flink cluster version is 1.3.2.
My Kafka cluster version is 0.10.2.1.
My Zookeeper version for Kafka is 3.4.6-1569965, built on 02/20/2014 09:09
GMT.
My Kafka connector library is "org.apache.flink" %
"flink-connector-kafka-0.10_2.10" % "1.3.2"

Thanks for your help in advance.

Best Regards,
Tony Wei


Re: Broadcast to all the other operators

2017-11-09 Thread Tony Wei
Hi Sadok,

The sample code is just an example to show you how to broadcast the rules
to all subtasks, but the output from CoFlatMap is not necessary to be
Tuple2<Rule, Record>. It depends on what you actually need in your Rule
Engine project.
For example, if you can apply rule on each record directly, you can emit
processed records to keyed operator.
IMHO, the scenario in the article you mentioned is having serval
well-prepared rules to enrich data, and using DSL files to decide what
rules that incoming event needs. After enriching, the features for the
particular event will be grouped by its random id and be calculated by the
models.
I think this approach might be close to the solution in that article, but
it could have some difference according to different use cases.

Best Regards,
Tony Wei


2017-11-09 17:27 GMT+08:00 Ladhari Sadok <laadhari.sa...@gmail.com>:

>
> -- Forwarded message --
> From: Ladhari Sadok <laadhari.sa...@gmail.com>
> Date: 2017-11-09 10:26 GMT+01:00
> Subject: Re: Broadcast to all the other operators
> To: Tony Wei <tony19920...@gmail.com>
>
>
> Thanks Tony for your very fast answer ,
>
> Yes it resolves my problem that way, but with flatMap I will get
> Tuple2<Rule, Record> always in the processing function ( in
> case of no rules update available and <newRule,Record> in the other case ).
> There is no optimization of this solution ? Do you think it is the same
> solution in this picture : https://data-artisans.com/wp-c
> ontent/uploads/2017/10/streaming-in-definitions.png ?
>
> Best regards,
> Sadok
>
>
> Le 9 nov. 2017 9:21 AM, "Tony Wei" <tony19920...@gmail.com> a écrit :
>
> Hi Sadok,
>
> What I mean is to keep the rules in the operator state. The event in Rule
> Stream is just the change log about rules.
> For more specific, you can fetch the rules from Redis in the open step of
> CoFlatMap and keep them in the operator state, then use Rule Stream to
> notify the CoFlatMap to 1. update some rules or 2. refetch all rules from
> Redis.
> Is that what you want?
>
> Best Regards,
> Tony Wei
>
> 2017-11-09 15:52 GMT+08:00 Ladhari Sadok <laadhari.sa...@gmail.com>:
>
>> Thank you for the answer, I know that solution, but I don't want to
>> stream the rules all time.
>> In my case I have the rules in Redis and at startup of flink they are
>> loaded.
>>
>> I want to broadcast changes just when it occurs.
>>
>> Thanks.
>>
>> Le 9 nov. 2017 7:51 AM, "Tony Wei" <tony19920...@gmail.com> a écrit :
>>
>>> Hi Sadok,
>>>
>>> Since you want to broadcast Rule Stream to all subtasks, it seems that
>>> it is not necessary to use KeyedStream.
>>> How about use broadcast partitioner, connect two streams to attach the
>>> rule on each record or imply rule on them directly, and do the key operator
>>> after that?
>>> If you need to do key operator and apply the rules, it should work by
>>> changing the order.
>>>
>>> The code might be something like this, and you can change the rules'
>>> state in the CoFlatMapFunction.
>>>
>>> DataStream rules = ...;
>>> DataStream records = ...;
>>> DataStream<Tuple2<Rule, Record>> recordWithRule =
>>> rules.broadcast().connect(records).flatMap(...);
>>> dataWithRule.keyBy(...).process(...);
>>>
>>> Hope this will make sense to you.
>>>
>>> Best Regards,
>>> Tony Wei
>>>
>>> 2017-11-09 6:25 GMT+08:00 Ladhari Sadok <laadhari.sa...@gmail.com>:
>>>
>>>> Hello,
>>>>
>>>> I'm working on Rules Engine project with Flink 1.3, in this project I
>>>> want to update some keyed operator state when external event occurred.
>>>>
>>>> I have a Datastream of updates (from kafka) I want to broadcast the
>>>> data contained in this stream to all keyed operator so I can change the
>>>> state in all operators.
>>>>
>>>> It is like this use case :
>>>> Image : https://data-artisans.com/wp-content/uploads/2017/10/streami
>>>> ng-in-definitions.png
>>>> All article : https://data-artisans.com/blog
>>>> /real-time-fraud-detection-ing-bank-apache-flink
>>>>
>>>> I founded it in the DataSet API but not in the DataStream API !
>>>>
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/
>>>> dev/batch/index.html#broadcast-variables
>>>>
>>>> Can some one explain to me who to solve this problem ?
>>>>
>>>> Thanks a lot.
>>>>
>>>> Flinkly regards,
>>>> Sadok
>>>>
>>>
>>>
>
>
>


Re: Broadcast to all the other operators

2017-11-09 Thread Tony Wei
Hi Sadok,

What I mean is to keep the rules in the operator state. The event in Rule
Stream is just the change log about rules.
For more specific, you can fetch the rules from Redis in the open step of
CoFlatMap and keep them in the operator state, then use Rule Stream to
notify the CoFlatMap to 1. update some rules or 2. refetch all rules from
Redis.
Is that what you want?

Best Regards,
Tony Wei

2017-11-09 15:52 GMT+08:00 Ladhari Sadok <laadhari.sa...@gmail.com>:

> Thank you for the answer, I know that solution, but I don't want to stream
> the rules all time.
> In my case I have the rules in Redis and at startup of flink they are
> loaded.
>
> I want to broadcast changes just when it occurs.
>
> Thanks.
>
> Le 9 nov. 2017 7:51 AM, "Tony Wei" <tony19920...@gmail.com> a écrit :
>
>> Hi Sadok,
>>
>> Since you want to broadcast Rule Stream to all subtasks, it seems that it
>> is not necessary to use KeyedStream.
>> How about use broadcast partitioner, connect two streams to attach the
>> rule on each record or imply rule on them directly, and do the key operator
>> after that?
>> If you need to do key operator and apply the rules, it should work by
>> changing the order.
>>
>> The code might be something like this, and you can change the rules'
>> state in the CoFlatMapFunction.
>>
>> DataStream rules = ...;
>> DataStream records = ...;
>> DataStream<Tuple2<Rule, Record>> recordWithRule =
>> rules.broadcast().connect(records).flatMap(...);
>> dataWithRule.keyBy(...).process(...);
>>
>> Hope this will make sense to you.
>>
>> Best Regards,
>> Tony Wei
>>
>> 2017-11-09 6:25 GMT+08:00 Ladhari Sadok <laadhari.sa...@gmail.com>:
>>
>>> Hello,
>>>
>>> I'm working on Rules Engine project with Flink 1.3, in this project I
>>> want to update some keyed operator state when external event occurred.
>>>
>>> I have a Datastream of updates (from kafka) I want to broadcast the data
>>> contained in this stream to all keyed operator so I can change the state in
>>> all operators.
>>>
>>> It is like this use case :
>>> Image : https://data-artisans.com/wp-content/uploads/2017/10/streami
>>> ng-in-definitions.png
>>> All article : https://data-artisans.com/blog
>>> /real-time-fraud-detection-ing-bank-apache-flink
>>>
>>> I founded it in the DataSet API but not in the DataStream API !
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/
>>> dev/batch/index.html#broadcast-variables
>>>
>>> Can some one explain to me who to solve this problem ?
>>>
>>> Thanks a lot.
>>>
>>> Flinkly regards,
>>> Sadok
>>>
>>
>>


Re: Broadcast to all the other operators

2017-11-08 Thread Tony Wei
Hi Sadok,

Since you want to broadcast Rule Stream to all subtasks, it seems that it
is not necessary to use KeyedStream.
How about use broadcast partitioner, connect two streams to attach the rule
on each record or imply rule on them directly, and do the key operator
after that?
If you need to do key operator and apply the rules, it should work by
changing the order.

The code might be something like this, and you can change the rules' state
in the CoFlatMapFunction.

DataStream rules = ...;
DataStream records = ...;
DataStream<Tuple2<Rule, Record>> recordWithRule =
rules.broadcast().connect(records).flatMap(...);
dataWithRule.keyBy(...).process(...);

Hope this will make sense to you.

Best Regards,
Tony Wei

2017-11-09 6:25 GMT+08:00 Ladhari Sadok <laadhari.sa...@gmail.com>:

> Hello,
>
> I'm working on Rules Engine project with Flink 1.3, in this project I want
> to update some keyed operator state when external event occurred.
>
> I have a Datastream of updates (from kafka) I want to broadcast the data
> contained in this stream to all keyed operator so I can change the state in
> all operators.
>
> It is like this use case :
> Image : https://data-artisans.com/wp-content/uploads/2017/10/
> streaming-in-definitions.png
> All article : https://data-artisans.com/blog/real-time-fraud-
> detection-ing-bank-apache-flink
>
> I founded it in the DataSet API but not in the DataStream API !
>
> https://ci.apache.org/projects/flink/flink-docs-
> release-1.3/dev/batch/index.html#broadcast-variables
>
> Can some one explain to me who to solve this problem ?
>
> Thanks a lot.
>
> Flinkly regards,
> Sadok
>


Re: Use keyBy to deterministically hash each record to a processor/task/slot

2017-10-30 Thread Tony Wei
Hi Max,

The way that Flink to assign key to which subtask is based on
`KeyGroupRangeAssignment.assignKeyToParallelOperator`.
Its first step is to assign key to a key group based on the max parallelism
[2]. Then, assign each key group to a specific subtask based on the current
parallelism [3].

The question that you asked is if the keyBy in Flink is deterministic. I
think the answer is yes, but the problem is that assignment to key group is
not just `obj.hashCode()`, but `murmurhash(obj.hashCode())` instead.
If you can know the output from murmurhash on the each object, you can
determine which subtask that operator will go to.
I'm not sure if this is a good solution and I am also wondering if it can
be fulfilled.

Best Regards,
Tony Wei

[1]
https://github.com/apache/flink/blob/0182141d41be52ae0cf4a3563d4b8c6f3daca02e/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java#L47
[2]
https://github.com/apache/flink/blob/0182141d41be52ae0cf4a3563d4b8c6f3daca02e/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java#L58
[3]
https://github.com/apache/flink/blob/0182141d41be52ae0cf4a3563d4b8c6f3daca02e/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java#L115

2017-10-30 23:20 GMT+08:00 m@xi <makisnt...@gmail.com>:

> Hi all,
>
> After trying to understand exactly how keyBy works internally, I did not
> get
> anything more than "it applies obj.hashcode() % n", where n is the number
> of
> tasks/processors.
>
> This post for example
> https://stackoverflow.com/questions/45062061/why-is-
> keyed-stream-on-a-keyby-creating-skewed-downstream-execution,
> suggest to implement a KeySelector and write our own hashcode function.
> Though none of the above is clear, especially the hashcode part.
>
> I am running a pc with 4 slots/processors and I would like to hash each
> record based on a certain field to a specific processor. Ideally, lets say
> that the 4 processors have ids: 0, 1, 2, 3. Then I would like to send the
> tuples whose (key % 4) = 0 to the proc with id 0,  (key % 4) = 1 to the
> proc
> with id 1 etc etc.
>
> I would like to know exactly to which processor/task each tuple goes.
> Can I do that deterministically with keyBy in Flink??
>
> Thanks in advance.
> Max
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: count and window question with kafka

2017-10-30 Thread Tony Wei
Hi,

I think ProcessFunction[1] is what you want. You can add it after keyBy and
emit the result to sink after timeout or buffer filled.
The reference has a good example that show you how to use it.

Best Regards,
Tony Wei

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/process_function.html


2017-10-30 23:56 GMT+08:00 Telco Phone <tel...@yahoo.com>:

> I have a process that will take 250,000 records from kafka and produce a
> file. (Using a CustomFileSync)
>
> Currently I just have the following:
>
>
> DataStream stream =
> env.addSource(new FlinkKafkaConsumer010("topic"", schema,
> properties)).setParallelism(40).flatMap(new SchemaRecordSplit()).
> setParallelism(40).name("Splitter").keyBy("partition", "randomkey",
> "schemaId");
>
> stream.addSink(new CustomFileSystemSink()).setParallelism(40);
>
>
> In my CustomFileSystemSink I have a for..next loop which closes the file
> off at 250K rows.
>
>
> What I am looking to do is to close off the file every 5 min OR 250K
> rows...
>
>
> As I read the window types is it possible to read from kafka and have the
> sink close every 5 min OR 250K rows ?
>
> Hope this makes sense
>
>
>
>
>


Re: StreamTransformation object

2017-10-27 Thread Tony Wei
Hi Andrea,

I don't think you need to touch `StreamTransformation`. If you can get the
result from build(), you can do the same thing I mentioned above: casting
it as SingleOutputStreamOperator.
Then, you can pass it to select function to add the next operator, and get
the result to add another slotSharing group.

Best Regards,
Tony Wei

2017-10-27 17:18 GMT+08:00 AndreaKinn <kinn6...@hotmail.it>:

> I'm trying to create an API to get results from build() but anyway I'm
> stuck
> on the StreamTransformation which is essential to implement
> slotSharingGroup(...). I have to provide it from the main class.
>
>
> Tony Wei wrote
> > Hi Andrea,
> >
> > The `learn` operator is defined in this method [1]. If you need to set
> its
> > slotSharing group, you should add `slotSharingGroup(...)` behind line 97
> > [2] or a new API to get the result from `inferenceStreamBuilder.build(
> )`.
> >
> > Best Regards,
> > Tony Wei
> >
> > [1]
> > https://github.com/htm-community/flink-htm/blob/
> master/flink-htm-streaming-java/src/main/java/org/
> numenta/nupic/flink/streaming/api/HTMStream.java#L148
> > [2]
> > https://github.com/htm-community/flink-htm/blob/
> master/flink-htm-streaming-java/src/main/java/org/
> numenta/nupic/flink/streaming/api/HTMStream.java#L97
> >
> > 2017-10-26 17:36 GMT+08:00 AndreaKinn 
>
> > kinn6aer@
>
> > :
> >
> >> Can you be clearer about this part?
> >>
> >> I'm really appreciating your help
> >>
> >>
> >> Tony Wei wrote
> >> > you need to refactor `HTMStream` to expose
> >> > `InferenceStreamBuilder.build()`.
> >>
> >>
> >>
> >>
> >>
> >> --
> >> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> >> n4.nabble.com/
> >>
>
>
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: StreamTransformation object

2017-10-26 Thread Tony Wei
Hi Andrea,

The `learn` operator is defined in this method [1]. If you need to set its
slotSharing group, you should add `slotSharingGroup(...)` behind line 97
[2] or a new API to get the result from `inferenceStreamBuilder.build()`.

Best Regards,
Tony Wei

[1]
https://github.com/htm-community/flink-htm/blob/master/flink-htm-streaming-java/src/main/java/org/numenta/nupic/flink/streaming/api/HTMStream.java#L148
[2]
https://github.com/htm-community/flink-htm/blob/master/flink-htm-streaming-java/src/main/java/org/numenta/nupic/flink/streaming/api/HTMStream.java#L97

2017-10-26 17:36 GMT+08:00 AndreaKinn <kinn6...@hotmail.it>:

> Can you be clearer about this part?
>
> I'm really appreciating your help
>
>
> Tony Wei wrote
> > you need to refactor `HTMStream` to expose
> > `InferenceStreamBuilder.build()`.
>
>
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: StreamTransformation object

2017-10-26 Thread Tony Wei
Hi Andrea,

In this way, you will only set a slotSharing group on select operator and
learn operator will remain in the default group.
If you want to set lean operator as well, I am afraid that you need to
refactor `HTMStream` to expose `InferenceStreamBuilder.build()`.

Best Regards,
Tony Wei

2017-10-26 17:01 GMT+08:00 AndreaKinn <kinn6...@hotmail.it>:

> Mmm looks good. This solution would be great.
> In this way am I setting a slotSharing group for both learn and select
> method and not only on select?
> I believed I need to call slotSharingGroup exactly on the return type of
> learn.
>
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


  1   2   >