Re: How to debug checkpoints failing to complete

2020-03-27 Thread Congxian Qiu
Hi

From my experience, you can first check the jobmanager.log to find out
whether the checkpoint expired or was declined by some task. If it expired,
you can follow the advice seeksst gave above (enabling debug logging may
help here); if it was declined, then go to the taskmanager.log to find out
the reason.
Best,
Congxian


On Wed, Mar 25, 2020 at 11:21 PM David Anderson  wrote:

> seeksst has already covered many of the relevant points, but a few more
> thoughts:
>
> I would start by checking to see if the checkpoints are failing because
> they time out, or for some other reason. Assuming they are timing out, then
> a good place to start is to look and see if this can be explained by data
> skew (which you can see in the metrics in the Flink dashboard). Common
> causes of data skew include hot key(s), and joins between streams where one
> stream is significantly behind the other.
>
> Another likely cause of checkpoint trouble is back pressure, which is
> most often caused by slow or unavailable connections between Flink and
> external systems, such as sinks, async I/O operators, filesystems, the network,
> etc.
>
> --david
>
>
>
> On Tue, Mar 24, 2020 at 2:59 AM seeksst  wrote:
>
>> Hi:
>> In my experience, there are several possible reasons for checkpoint
>> failures.
>> 1. If you use RocksDB as the backend, insufficient disk space can cause it,
>> because the files are saved on the local disk, and you may see an exception.
>> 2. The sink cannot be written to. None of the parallel subtasks can complete,
>> and there is often no other visible symptom.
>> 3. Back pressure. Data skew causes one subtask to take on more of the
>> computation, so the checkpoint cannot finish.
>> Here is my advice:
>> 1. Learn more about how checkpointing works.
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/internals/stream_checkpointing.html
>> 2. Try to test for back pressure.
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/back_pressure.html
>> 3. If there is no data skew, you can increase the parallelism, or you can
>> adjust the checkpoint parameters (see the sketch below).
>> On my machine I have a Hadoop environment, so I submit the job to YARN and
>> can use the dashboard to check back pressure.
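>>
>> For item 3, a minimal sketch of adjusting the checkpoint parameters from the
>> DataStream API (the values here are only placeholders, not recommendations):
>>
>> import org.apache.flink.streaming.api.environment.CheckpointConfig;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>
>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> env.enableCheckpointing(60_000);                         // trigger a checkpoint every 60s
>> CheckpointConfig checkpointConfig = env.getCheckpointConfig();
>> checkpointConfig.setCheckpointTimeout(10 * 60_000);      // give slow checkpoints more time before they expire
>> checkpointConfig.setMinPauseBetweenCheckpoints(30_000);  // let the job make progress between checkpoints
>> checkpointConfig.setMaxConcurrentCheckpoints(1);         // avoid overlapping checkpoints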
>>
>> On 2020/03/23 15:14:33, Stephen Connolly  wrote:
>> > We have a topology and the checkpoints fail to complete a *lot* of the
>> > time.
>> >
>> > Typically it is just one subtask that fails.
>> >
>> > We have a parallelism of 2 on this topology at present, and the other
>> > subtask will complete in 3ms, though the end-to-end duration on the rare
>> > occasions when the checkpointing completes is around 4m30.
>> >
>> > How can I start debugging this? When I run locally on my development
>> > cluster I have no issues; the issues only seem to show in production.
>> >
>>
>


Re: End to End Latency Tracking in flink

2020-03-27 Thread Congxian Qiu
Hi
As far as I know, the latency-tracking feature is meant for debugging: you
can use it to debug, and disable it when running the job in production.
From my side, using $current_processing - $event_time is a reasonable approach,
but keep one thing in mind: the event time may not be the time the record was
ingested into Flink.
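
A minimal sketch of that idea: a ProcessFunction placed right after the source
(and another right before the sink) that reports processing time minus event time
as a custom gauge metric. The class and metric names here are made up for
illustration, not an official feature.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Forwards every record unchanged and exposes (processing time - event time) as "eventTimeLag".
public class EventTimeLagGauge<T> extends ProcessFunction<T, T> {

    private transient long lastLagMs;

    @Override
    public void open(Configuration parameters) {
        getRuntimeContext().getMetricGroup().gauge("eventTimeLag", (Gauge<Long>) () -> lastLagMs);
    }

    @Override
    public void processElement(T value, Context ctx, Collector<T> out) {
        if (ctx.timestamp() != null) {
            // Assumes event-time timestamps have been assigned upstream.
            lastLagMs = ctx.timerService().currentProcessingTime() - ctx.timestamp();
        }
        out.collect(value);
    }
}

Attaching it is then just stream.process(new EventTimeLagGauge<>()) after the source
and before the sink; the gauge is picked up by whatever metrics reporter is configured.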

Best,
Congxian


On Sat, Mar 28, 2020 at 6:25 AM Lu Niu  wrote:

> Hi,
>
> I am looking for end-to-end latency monitoring of a Flink job. Based on my
> study, I have two options:
>
> 1. Flink provides a latency tracking feature. However, the documentation
> says it cannot show the actual latency of the business logic, as it bypasses all
> operators.
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#latency-tracking
> Also, the feature can significantly impact performance, so I assume it's not
> for usage in production. What do users use latency tracking for?
> Sounds like only back pressure could affect that latency.
>
> 2. I found another stackoverflow question on this:
> https://stackoverflow.com/questions/56578919/latency-monitoring-in-flink-application
> The answer suggests exposing (current processing time - the event time)
> after the source and before the sink for end-to-end latency monitoring. Is this a
> good solution? If not, what's the official solution for end-to-end latency
> tracking?
>
> Thank you!
>
> Best
> Lu
>
>


Re: Flink 1.9 with FsStateBackend: warning when modifying a checkpoint

2020-03-27 Thread Congxian Qiu
Hi
From the error, you are using the State Processor API, and some functions of the
State Processor API (here getMetricGroup) are not supported, hence this warning.
If you did not explicitly call this function, it may be a bug.

Best,
Congxian


On Wed, Mar 25, 2020 at 3:24 PM guanyq  wrote:



Re: Flink 1.10: question about the RocksDBStateBackend in-background state cleanup mechanism

2020-03-27 Thread Congxian Qiu
Hi

My understanding of this is: each time a certain number of state entries has been
processed, the current timestamp is obtained from Flink, and then during RocksDB
compaction all state entries are run through the TTL filter.
> Calling of TTL filter during compaction slows it down.
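
For reference, a minimal sketch of how this cleanup strategy is enabled on a state
descriptor (the TTL of one day and the entry count of 1000 are placeholder values):

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(1))               // entries expire one day after the last write
    .cleanupInRocksdbCompactFilter(1000)    // re-query the current timestamp from Flink after every 1000 processed entries
    .build();

ValueStateDescriptor<String> descriptor = new ValueStateDescriptor<>("my-state", String.class);
descriptor.enableTimeToLive(ttlConfig);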

Best,
Congxian


On Thu, Mar 26, 2020 at 8:55 PM LakeShen  wrote:

> Hi community,
>
> I have a question about the state cleanup mechanism of the RocksDBStateBackend in
> Flink 1.10. In 1.10, RocksDB cleans up state in the background by default, using
> the compaction filter approach. As the official documentation says:
>
> > RocksDB compaction filter will query current timestamp, used to check
> > expiration, from Flink every time after processing certain number of
> > state entries.
>
> Now my question is: RocksDB runs the compaction filter after processing a certain
> number of state entries. Is the compaction filter applied only to that fixed number
> of state entries, checking whether they have expired?
> Or is it applied to all of a task's current state files, checking every entry during
> the compaction merge and filtering out the expired state keys?
>
> I have not been able to figure this out and am looking forward to your reply.
>
> Best wishes,
> 沈磊
>


Re: How to control the result output frequency in Flink SQL

2020-03-27 Thread Benchao Li
Jark, we actually use this feature quite a lot~
One remaining pain point is that the window operator does not support retract input,
so once emit is used, cascading windows are no longer possible.

On Fri, Mar 27, 2020 at 8:01 PM Jark Wu  wrote:

> Nice one, Benchao. You even discovered such a well-hidden experimental feature :D
>
> On Fri, 27 Mar 2020 at 15:24, Benchao Li  wrote:
>
> > Hi,
> >
> > For the second scenario, you can try fast emit:
> > table.exec.emit.early-fire.enabled = true
> > table.exec.emit.early-fire.delay = 5min
> >
> > PS:
> > 1. This feature is not described in the official documentation; it should currently be considered experimental.
> > 2. Once emit is enabled, the window switches from producing append results to producing retract results.
> >
> > On Fri, Mar 27, 2020 at 2:51 PM Jingsong Li  wrote:
> >
> > > Hi,
> > >
> > > For #1:
> > > Create two cascaded windows:
> > > - a 1-minute window
> > > - a 5-minute window whose computation only buffers the data and then emits the detailed (per-minute) results
> > >
> > > Best,
> > > Jingsong Lee
> > >
> >
> >
> > --
> >
> > Benchao Li
> > School of Electronics Engineering and Computer Science, Peking University
> > Tel:+86-15650713730
> > Email: libenc...@gmail.com; libenc...@pku.edu.cn
> >
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: Flink savepoint issues

2020-03-27 Thread Congxian Qiu
Hi

Regarding question 1: under back pressure the savepoint may fail to complete and thus
time out. There is no fix for this at the moment; there is an ongoing issue [1] on
unaligned checkpoints that will address checkpointing under back pressure.
Regarding question 3: the checkpoint timed out. A timeout means that within the
configured time (5 minutes in your case) some task did not finish its snapshot.
Increasing the timeout can alleviate the problem to some extent, but you had better
find the cause of the timeout and optimize for it specifically.
[1] https://issues.apache.org/jira/browse/FLINK-14551
Best,
Congxian


On Fri, Mar 27, 2020 at 4:19 PM 大数据开发面试_夏永权  wrote:

> Hi, I ran into the following problems while using Flink and could not solve them
> myself, so I am asking for your guidance. Thanks!
>
> 1. flink cancel -s $SAVEPOINT_DIR $job_id -yid $application_id
> cannot stop the job when it is under back pressure
>
>
>  The program finished with the following exception:
> org.apache.flink.util.FlinkException: Could not cancel job
> 1f768e4ca9ad5792a4844a5d12163b73.
> at
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$6(CliFrontend.java:523)
> at
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
> at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:515)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:904)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.util.concurrent.TimeoutException
> at
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$6(CliFrontend.java:521)
> ... 9 more
> stop flink job failed!!!
>
>
>
>
> 2. After adding a field in a Flink SQL DDL, the program fails to start and can only
> be started after deleting the savepoint (the DDL feeds a two-stream join, and a field
> was added to one of the streams)
>
>
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error:
> org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not
> complete the operation. Number of retries has been exhausted.
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not
> complete the operation. Number of retries has been exhausted.
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
> at
> com.etl.DwdBaseCarlifeTIotBindGeoManage.main(DwdBaseCarlifeTIotBindGeoManage.java:265)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
> ... 11 more
> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException:
> Could not complete the operation. Number of retries has been exhausted.
> at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:284)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at
> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:342)
> at
> 

Re: How to control the result output frequency in Flink SQL

2020-03-27 Thread Tianwang Li
For the first scenario: from the SQL side, add a time field truncated to the minute (formatted like yyyy-MM-dd HH:mm) as part of the key. Wouldn't that achieve the effect you want?

On Fri, Mar 27, 2020 at 11:29 AM flink小猪 <18579099...@163.com> wrote:

> I have two requirements:
> 1. If I open a one-minute window but want to emit results only every 5 minutes
> (emitting the per-minute aggregates, i.e. 5 rows of aggregated results at once), what should I do?
> 2. If I open a one-hour window but want to emit a result every 5 minutes (before all
> of the window's data has arrived, emitting the currently aggregated partial result), what should I do?



-- 
**
 tivanli
**


Re: Flink 1.10 SQL job exceeding memory limits and being killed by YARN

2020-03-27 Thread faaron zheng
Hi, thanks everyone for the replies. Based on my analysis and testing, I suspect it is
related to taskmanager.network.blocking-shuffle.type=mmap. Looking at the monitoring,
the mapped (mmap) memory gradually climbs to 20+ GB or even higher, which eventually
exceeds the YARN limit and gets the container killed. On the other hand, if I configure
taskmanager.network.blocking-shuffle.type=file, the mapped memory stays at 0 and the
job eventually fails with OutOfMemoryError: direct buffer memory, which suggests that
mmap and file use different kinds of storage.
I still have two questions. First: which part of direct memory does the file mode use?
Second: for a single table of several TB, how can the memory shortage be mitigated in
either mode?

Original message
From: Xintong Song
Date: Tue, Mar 24, 2020, 9:58 AM
To: user-zh
Subject: Re: Flink 1.10 SQL job exceeding memory limits and being killed by YARN

Hi Faaron,
Being killed for exceeding the memory limit means more native memory is used than
configured. Common possibilities:
- The JVM overhead is configured too small. Its default is 10% of the TM size, capped
at 1 GB. Since your TM memory is fairly large, try increasing it. Relevant options:
taskmanager.memory.jvm-overhead.[min|max|fraction]
- A UDF uses native memory, either in user code or in a third-party dependency. This
counts as task off-heap memory, whose default size is 0. Relevant option:
taskmanager.memory.task.off-heap.size
- If you use the RocksDBStateBackend, RocksDB itself may be overusing memory. Flink
sets the RocksDB cache size to the managed memory size, but we have found that in
rare cases RocksDB fails to stay within that limit. You can try turning off RocksDB's
memory control, so that RocksDB uses its default cache size and does not grow with
the Flink TM size. Option: state.backend.rocksdb.memory.managed

Thank you~
Xintong Song

On Mon, Mar 23, 2020 at 10:15 PM LakeShen  wrote:
> Hi Faaron,
>
> Could you describe the logic of your SQL in more detail?
>
> On Mon, Mar 23, 2020 at 10:12 PM faaron zheng  wrote:
> >
> > Hi all, when I run SQL with Flink 1.10 on a fairly large data set (about 3 TB,
> > more than 10 billion rows), the job often exceeds the memory limit during the
> > hash and sort phases and is killed by YARN. My TMs have 40 GB of memory each,
> > with 10 slots per TM and 3 GB per slot. I also tried giving them more memory,
> > but it didn't help. What could be the reason?
> >
> > faaron zheng  Email: faaronzh...@gmail.com  (signature generated by NetEase Mail Master)

End to End Latency Tracking in flink

2020-03-27 Thread Lu Niu
Hi,

I am looking for end-to-end latency monitoring of a Flink job. Based on my
study, I have two options:

1. Flink provides a latency tracking feature. However, the documentation
says it cannot show the actual latency of the business logic, as it bypasses all
operators.
https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#latency-tracking
Also, the feature can significantly impact performance, so I assume it's not
for usage in production. What do users use latency tracking for?
Sounds like only back pressure could affect that latency.

2. I found another stackoverflow question on this:
https://stackoverflow.com/questions/56578919/latency-monitoring-in-flink-application
The answer suggests exposing (current processing time - the event time) after
the source and before the sink for end-to-end latency monitoring. Is this a good
solution? If not, what's the official solution for end-to-end latency
tracking?

Thank you!

Best
Lu


Re: Testing RichAsyncFunction with TestHarness

2020-03-27 Thread KristoffSC
I've debugged it a little and found that it fails in the
InstantiationUtil.readObjectFromConfig method when we execute
byte[] bytes = config.getBytes(key, (byte[]) null); which returns null.

The key that it is looking for is "edgesInOrder". There are only two entries in the
config map, though:
"checkpointing -> {Boolean@6347} true" and "operatorID -> {byte[16]@6351}".
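
One thing that may be worth trying, assuming the harness constructor that takes the
input TypeSerializer is available in 1.9.2: passing the serializer explicitly populates
the operator config that AsyncWaitOperator#setup reads, which is where the null comes
from. The String type here is only an illustration of the function's input type.

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.AsyncDataStream.OutputMode;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;

this.testHarness = new OneInputStreamOperatorTestHarness<>(
        new AsyncWaitOperator<>(new MyRichAsyncFunction(), 2000, 1, OutputMode.ORDERED),
        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()));
this.testHarness.open();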




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Testing RichAsyncFunction with TestHarness

2020-03-27 Thread KristoffSC
Hi,
I'm trying to test my RichAsyncFunction implementation with
OneInputStreamOperatorTestHarness based on [1]. I'm using Flink 1.9.2.

My test setup is:
this.processFunction = new MyRichAsyncFunction();
this.testHarness = new OneInputStreamOperatorTestHarness<>(
        new AsyncWaitOperator<>(processFunction, 2000, 1, OutputMode.ORDERED));

this.testHarness.open();

I get the exception below when calling this.testHarness.open():

java.lang.NullPointerException
at java.base/java.util.Objects.requireNonNull(Objects.java:221)
at
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.(StreamElementSerializer.java:64)
at
org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.setup(AsyncWaitOperator.java:142)
at
org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:287)
at
org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:275)
at
org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:393)
at
org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:300)
at
org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeEmptyState(AbstractStreamOperatorTestHarness.java:308)
at
org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.open(AbstractStreamOperatorTestHarness.java:483)


I would appreciate help with this one.

Additionally, even though I added all the necessary dependencies defined in [1], I
cannot see the ProcessFunctionTestHarnesses class.

Thanks.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#unit-testing-stateful-or-timely-udfs--custom-operators



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Support of custom JDBC dialects in flink-jdbc

2020-03-27 Thread Dongwon Kim
Hi Jark,
Many thanks for creating the issue on Jira and nice summarization :-)

Best,
Dongwon

On Sat, Mar 28, 2020 at 12:37 AM Jark Wu  wrote:

> Hi Dongwon,
>
> I have seen many requests for this, and I'm a big +1 for it.
> I created https://issues.apache.org/jira/browse/FLINK-16833 to track this
> effort. Hope this can be done before 1.11 release.
>
> Best,
> Jark
>
> On Fri, 27 Mar 2020 at 22:22, Dongwon Kim  wrote:
>
>> Hi, I tried flink-jdbc [1] to read data from Druid, because Druid
>> implements Calcite Avatica [2], but the connection string,
>> jdbc:avatica:remote:url=http://BROKER:8082/druid/v2/sql/avatica/, is not
>> supported by any of the built-in JDBCDialects [3].
>>
>> I implemented a custom JDBCDialect [4], a custom StreamTableSourceFactory [5],
>> etc. in my own repository [6], but it seems like overkill.
>>
>> How about supporting custom JDBC dialects in flink-jdbc based on SPI?
>>
>> Best,
>> - Dongwon
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#jdbc-connector
>> [2] https://druid.apache.org/docs/latest/querying/sql.html#jdbc
>> [3]
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java
>> [4]
>> https://github.com/eastcirclek/flink-avatica-source/blob/master/src/main/java/io/github/eastcirclek/AvaticaJDBCDialect.java
>> [5]
>> https://github.com/eastcirclek/flink-avatica-source/blob/master/src/main/java/io/github/eastcirclek/AvaticaJDBCSourceFactory.java
>> [6] https://github.com/eastcirclek/flink-avatica-source
>>
>


Re: Support of custom JDBC dialects in flink-jdbc

2020-03-27 Thread Jark Wu
Hi Dongwon,

I have seen many requests for this, and I'm a big +1 for it.
I created https://issues.apache.org/jira/browse/FLINK-16833 to track this
effort. Hope this can be done before 1.11 release.

Best,
Jark

On Fri, 27 Mar 2020 at 22:22, Dongwon Kim  wrote:

> Hi, I tried flink-jdbc [1] to read data from Druid, because Druid
> implements Calcite Avatica [2], but the connection string,
> jdbc:avatica:remote:url=http://BROKER:8082/druid/v2/sql/avatica/, is not
> supported by any of the built-in JDBCDialects [3].
>
> I implemented a custom JDBCDialect [4], a custom StreamTableSourceFactory [5],
> etc. in my own repository [6], but it seems like overkill.
>
> How about supporting custom JDBC dialects in flink-jdbc based on SPI?
>
> Best,
> - Dongwon
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#jdbc-connector
> [2] https://druid.apache.org/docs/latest/querying/sql.html#jdbc
> [3]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java
> [4]
> https://github.com/eastcirclek/flink-avatica-source/blob/master/src/main/java/io/github/eastcirclek/AvaticaJDBCDialect.java
> [5]
> https://github.com/eastcirclek/flink-avatica-source/blob/master/src/main/java/io/github/eastcirclek/AvaticaJDBCSourceFactory.java
> [6] https://github.com/eastcirclek/flink-avatica-source
>


Re: Issue with Could not resolve ResourceManager address akka.tcp://flink

2020-03-27 Thread Yang Wang
Could you also check in the jobmanager logs whether the Flink Akka endpoint is also
bound to and listening at the hostname "prod-bigd-dn11"? Otherwise, all packets from
the taskmanager will be discarded.


Best,
Yang

On Fri, Mar 27, 2020 at 3:35 PM Vitaliy Semochkin  wrote:

> Hello Zhu,
>
> The host can be resolved and there are no firewalls in the cluster, so
> all ports are open.
>
> Regards,
> Vitaliy
>
> On Fri, Mar 27, 2020 at 8:32 AM Zhu Zhu  wrote:
>
>> Hi Vitaliy,
>>
>> >> *Cannot serve slot request, no ResourceManager connected*
>> This is not a problem, just that the JM needs RM to be connected to send
>> slot requests.
>>
>> >> *Could not resolve ResourceManager address
>> akka.tcp://flink@prod-bigd-dn11:43757/user/resourcemanager*
>> This should be the root cause. Would you check whether the hostname
>> *prod-bigd-dn11* is resolvable? And whether the port 43757 of that
>> machine is permitted to be accessed?
>>
>> Thanks,
>> Zhu Zhu
>>
>> On Fri, Mar 27, 2020 at 1:54 AM Vitaliy Semochkin  wrote:
>>
>>> Hi,
>>>
>>> I'm facing an issue similar to
>>> https://issues.apache.org/jira/browse/FLINK-14074
>>> Job starts and then yarn logs report "*Could not resolve
>>> ResourceManager address akka.tcp://flink*"
>>>
>>> A fragment from yarn logs looks like this:
>>>
>>> LazyFromSourcesSchedulingStrategy]
>>> 16:54:21,279 INFO
>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Flink
>>> Java Job at Thu Mar 26 16:54:09 CET 2020 (9817283f911d83a6d278cc39d17d6b11)
>>> switched from state CREATED to RUNNING.
>>> 16:54:21,287 INFO
>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph- CHAIN
>>> DataSource (MailEvent; EMC; 2019-12-01 - 2020-01-01; null - 157818240)
>>> -> FlatMap (SplitDuplicate) -> FlatMap (Create MailEvent) -> Filter
>>> (EventDateTimeRangeFilter) -> Filter (TrackingStatusesFilter) -> FlatMap
>>> (Get mail item by EMC event) -> Map (Map IntraregionalVolumeItem data set
>>> from EMC events) (1/3) (5482b0e6ae1d64d9b0918ec15599211f) switched from
>>> CREATED to SCHEDULED.
>>> 16:54:21,287 INFO
>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph- CHAIN
>>> DataSource (MailEvent; EMC; 2019-12-01 - 2020-01-01; null - 157818240)
>>> -> FlatMap (SplitDuplicate) -> FlatMap (Create MailEvent) -> Filter
>>> (EventDateTimeRangeFilter) -> Filter (TrackingStatusesFilter) -> FlatMap
>>> (Get mail item by EMC event) -> Map (Map IntraregionalVolumeItem data set
>>> from EMC events) (2/3) (5c993710423eea47ae66f833b2999530) switched from
>>> CREATED to SCHEDULED.
>>> 16:54:21,287 INFO
>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph- CHAIN
>>> DataSource (MailEvent; EMC; 2019-12-01 - 2020-01-01; null - 157818240)
>>> -> FlatMap (SplitDuplicate) -> FlatMap (Create MailEvent) -> Filter
>>> (EventDateTimeRangeFilter) -> Filter (TrackingStatusesFilter) -> FlatMap
>>> (Get mail item by EMC event) -> Map (Map IntraregionalVolumeItem data set
>>> from EMC events) (3/3) (23cfa30fba857b2c75ba76a21c7d4972) switched from
>>> CREATED to SCHEDULED.
>>> 16:54:21,287 INFO
>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph- CHAIN
>>> DataSource (MailEvent; EMD; 2019-12-01 - 2020-01-01; null - 157818240)
>>> -> FlatMap (SplitDuplicate) -> FlatMap (Create MailEvent) -> Filter
>>> (EventDateTimeRangeFilter) -> Filter (TrackingStatusesFilter) -> FlatMap
>>> (Get mail item by EMD event) -> Map (Map IntraregionalVolumeItem data set
>>> from EMD events) (1/3) (7cc8a395b87e82000184724eb1698ace) switched from
>>> CREATED to SCHEDULED.
>>> 16:54:21,288 INFO
>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph- CHAIN
>>> DataSource (MailEvent; EMD; 2019-12-01 - 2020-01-01; null - 157818240)
>>> -> FlatMap (SplitDuplicate) -> FlatMap (Create MailEvent) -> Filter
>>> (EventDateTimeRangeFilter) -> Filter (TrackingStatusesFilter) -> FlatMap
>>> (Get mail item by EMD event) -> Map (Map IntraregionalVolumeItem data set
>>> from EMD events) (2/3) (5edfe3d1f509856d17fa0da078cb3f7e) switched from
>>> CREATED to SCHEDULED.
>>> 16:54:21,288 INFO
>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph- CHAIN
>>> DataSource (MailEvent; EMD; 2019-12-01 - 2020-01-01; null - 157818240)
>>> -> FlatMap (SplitDuplicate) -> FlatMap (Create MailEvent) -> Filter
>>> (EventDateTimeRangeFilter) -> Filter (TrackingStatusesFilter) -> FlatMap
>>> (Get mail item by EMD event) -> Map (Map IntraregionalVolumeItem data set
>>> from EMD events) (3/3) (dd3397f889a3fad1acf4c59f59a93d92) switched from
>>> CREATED to SCHEDULED.
>>> 16:54:21,297 INFO
>>>  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  - Cannot
>>> serve slot request, no ResourceManager connected. Adding as pending request
>>> [SlotRequestId{b4c6e7357e4620bf2e997c46d7723eb1}]
>>> 16:54:21,301 INFO
>>>  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  - Cannot
>>> serve slot request, no ResourceManager connected. Adding as pending request
>>> 

Support of custom JDBC dialects in flink-jdbc

2020-03-27 Thread Dongwon Kim
Hi, I tried flink-jdbc [1] to read data from Druid, because Druid implements
Calcite Avatica [2], but the connection string, jdbc:avatica:remote:url=
http://BROKER:8082/druid/v2/sql/avatica/, is not supported by any of the
built-in JDBCDialects [3].

I implemented a custom JDBCDialect [4], a custom StreamTableSourceFactory [5],
etc. in my own repository [6], but it seems like overkill.

How about supporting custom JDBC dialects in flink-jdbc based on SPI?
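
For reference, a rough sketch of what such a dialect could look like, assuming the
shape of the JDBCDialect interface next to [3]; the Avatica driver class and URL prefix
are illustrative and would need to be checked against the actual setup:

import java.util.Optional;
import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect;

public class AvaticaDialect implements JDBCDialect {

    @Override
    public boolean canHandle(String url) {
        // Claim any jdbc:avatica: connection string (e.g. the Druid broker URL above).
        return url.startsWith("jdbc:avatica:");
    }

    @Override
    public Optional<String> defaultDriverName() {
        return Optional.of("org.apache.calcite.avatica.remote.Driver");
    }

    @Override
    public String quoteIdentifier(String identifier) {
        return "\"" + identifier + "\"";
    }
}

With SPI support, such a class could then be picked up from a service file instead of
requiring a custom StreamTableSourceFactory.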

Best,
- Dongwon

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#jdbc-connector
[2] https://druid.apache.org/docs/latest/querying/sql.html#jdbc
[3]
https://github.com/apache/flink/blob/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java
[4]
https://github.com/eastcirclek/flink-avatica-source/blob/master/src/main/java/io/github/eastcirclek/AvaticaJDBCDialect.java
[5]
https://github.com/eastcirclek/flink-avatica-source/blob/master/src/main/java/io/github/eastcirclek/AvaticaJDBCSourceFactory.java
[6] https://github.com/eastcirclek/flink-avatica-source


Re: "Legacy Source Thread" line in logs

2020-03-27 Thread Arvid Heise
Hi KristoffSC,

the short answer is: you probably have differently configured loggers. They
log in a different format or at a different level.

The longer answer: all source connectors currently use the legacy source
thread. That will only change once FLIP-27 [1] is widely adopted. It was
originally planned to come sooner; that's why the name of the source thread
contains "legacy".
Even a bit further into the details: in Flink <1.9, each task used several
threads for doing its work. Since 1.9, all tasks use a single thread with
a mailbox model (kind of like a java.util.concurrent.Executor). However,
one type of task couldn't be refactored: source tasks. They had to stick
with the old model, because the source interfaces assume that each source
connector spawns its own thread and pushes its messages. The new
interfaces coming with FLIP-27 will be pull-based, so that we can also use the
mailbox model for them.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface

On Fri, Mar 27, 2020 at 12:40 PM KristoffSC 
wrote:

> Hi all,
> When I run Flink from IDE i can see this prefix in logs
> "Legacy Source Thread"
>
> Running the same job as JobCluster on docker, this prefix is not present.
> What this prefix means?
> Btw, I'm using [1] as ActiveMQ connector.
>
> Thanks.
>
> [1]
> https://github.com/apache/bahir-flink/tree/master/flink-connector-activemq
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: How to control the result output frequency in Flink SQL

2020-03-27 Thread Jark Wu
Nice one, Benchao. You even discovered such a well-hidden experimental feature :D

On Fri, 27 Mar 2020 at 15:24, Benchao Li  wrote:

> Hi,
>
> For the second scenario, you can try fast emit:
> table.exec.emit.early-fire.enabled = true
> table.exec.emit.early-fire.delay = 5min
>
> PS:
> 1. This feature is not described in the official documentation; it should currently be considered experimental.
> 2. Once emit is enabled, the window switches from producing append results to producing retract results.
>
> On Fri, Mar 27, 2020 at 2:51 PM Jingsong Li  wrote:
>
> > Hi,
> >
> > For #1:
> > Create two cascaded windows:
> > - a 1-minute window
> > - a 5-minute window whose computation only buffers the data and then emits the detailed (per-minute) results
> >
> > Best,
> > Jingsong Lee
> >
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


Re: Oracle TIMESTAMP type conversion exception in the Flink SQL JDBC connector

2020-03-27 Thread sunfulin



Hi,
As far as I know, the Oracle driver is not directly supported yet, right? How are you
using Flink SQL to read from and write to Oracle?

On 2020-03-27 17:21:21, "111"  wrote:
>Hi,
>When using Flink SQL to read from and write to an Oracle JDBC table, I hit a timestamp conversion exception:
>Caused by: java.lang.ClassCastException: oracle.sql.TIMESTAMP cannot be cast 
>to java.sql.Timestamp at 
>org.apache.flink.table.dataformat.DataFormatConverters$TimestampConverter.toInternalImpl(DataFormatConverters.java:860)
> at 
>org.apache.flink.table.dataformat.DataFormatConverters$DataFormatConverter.toInternal(DataFormatConverters.java:344)
> at 
>org.apache.flink.table.dataformat.DataFormatConverters$RowConverter.toInternalImpl(DataFormatConverters.java:1377)
> at 
>org.apache.flink.table.dataformat.DataFormatConverters$RowConverter.toInternalImpl(DataFormatConverters.java:1365)
>From the error, the Oracle JDBC driver returns oracle.sql.TIMESTAMP, while Flink's runtime-blink expects java.sql.Timestamp. I found a workaround online:
>https://stackoverflow.com/questions/13269564/java-lang-classcastexception-oracle-sql-timestamp-cannot-be-cast-to-java-sql-ti/22055190#22055190
>However, we start the cluster in yarn session mode, and changing the system property would require changing it on every node of the cluster. Has the community run into this problem, and is there a more elegant way to solve it?
>Best, Xinghalo


"Legacy Source Thread" line in logs

2020-03-27 Thread KristoffSC
Hi all,
When I run Flink from the IDE I can see this prefix in the logs:
"Legacy Source Thread"

Running the same job as a JobCluster on Docker, this prefix is not present.
What does this prefix mean?
Btw, I'm using [1] as the ActiveMQ connector.

Thanks.

[1]
https://github.com/apache/bahir-flink/tree/master/flink-connector-activemq





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


[Third-party Tool] Flink memory calculator

2020-03-27 Thread Yangze Guo
Hi, there.

In release-1.10, the memory setup of task managers has changed a lot.
I would like to provide here a third-party tool to simulate and get
the calculation result of Flink's memory configuration.

Although there is already a detailed official setup guide [1] and migration
guide [2], the calculator additionally allows users to:
- Verify whether there is any conflict in their configuration. The
calculator is more lightweight than starting a Flink cluster,
especially when running Flink on Yarn/Kubernetes. Users can make sure
their configuration is correct locally before deploying it to external
resource managers.
- Get all of the memory configuration values before deploying. Users may set
taskmanager.memory.task.heap.size and taskmanager.memory.managed.size,
but also want to know the total memory consumption of Flink. With
this tool, users can get all of the memory configuration values they are
interested in. If anything is unexpected, they do not need to
re-deploy a Flink cluster.

The repo link of this tool is
https://github.com/KarmaGYZ/flink-memory-calculator. It reuses the
BashJavaUtils.jar of Flink and ensures the calculation result is
exactly the same as your Flink dist. For more details, please take a
look at the README.

Any feedback or suggestions are welcome!

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/memory/mem_setup.html
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/memory/mem_migration.html

Best,
Yangze Guo



Oracle TIMESTAMP type conversion exception in the Flink SQL JDBC connector

2020-03-27 Thread 111
Hi,
When using Flink SQL to read from and write to an Oracle JDBC table, I hit a timestamp conversion exception:
Caused by: java.lang.ClassCastException: oracle.sql.TIMESTAMP cannot be cast to 
java.sql.Timestamp at 
org.apache.flink.table.dataformat.DataFormatConverters$TimestampConverter.toInternalImpl(DataFormatConverters.java:860)
 at 
org.apache.flink.table.dataformat.DataFormatConverters$DataFormatConverter.toInternal(DataFormatConverters.java:344)
 at 
org.apache.flink.table.dataformat.DataFormatConverters$RowConverter.toInternalImpl(DataFormatConverters.java:1377)
 at 
org.apache.flink.table.dataformat.DataFormatConverters$RowConverter.toInternalImpl(DataFormatConverters.java:1365)
From the error, the Oracle JDBC driver returns oracle.sql.TIMESTAMP, while Flink's runtime-blink expects java.sql.Timestamp. I found a workaround online:
https://stackoverflow.com/questions/13269564/java-lang-classcastexception-oracle-sql-timestamp-cannot-be-cast-to-java-sql-ti/22055190#22055190
However, we start the cluster in yarn session mode, and changing the system property would require changing it on every node of the cluster. Has the community run into this problem, and is there a more elegant way to solve it?
Best, Xinghalo
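
For reference, the workaround in the linked answer is the JVM system property
oracle.jdbc.J2EE13Compliant. Assuming that route is taken, one way to avoid editing
every node by hand is to set it once in flink-conf.yaml, so it is passed to the JVMs
Flink launches for the session, e.g.:

env.java.opts: "-Doracle.jdbc.J2EE13Compliant=true"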

Re: Dynamic Flink SQL

2020-03-27 Thread Krzysztof Zarzycki
I want to do a somewhat different, hacky PoC:
* I will write a sink that caches the results in "JVM global" memory. Then
I will write a source that reads this cache.
* I will launch one job that reads from the Kafka source, shuffles the data to
the desired partitioning and then sinks into that cache.
* Then I will launch multiple jobs (DataStream-based or Flink SQL-based)
that use the cache-backed source to read the data out and then reinterpret
it as a keyed stream [1].
* Using JVM global memory is necessary because, AFAIK, the jobs use
different classloaders. The class of the cached object also needs to be
available in the parent classloader, i.e. on the cluster's classpath.
This is just to prove the idea and its performance and usefulness. All
the problems of checkpointing this data I will leave for later.

I'm very interested in the community's comments about this idea and
its later productization.
Thanks!

Answering your comments:

> Unless you need reprocessing for newly added rules, I'd probably just
> cancel with savepoint and restart the application with the new rules. Of
> course, it depends on the rules themselves and how much state they require
> if a restart is viable. That's up to a POC.
>
No, I don't need reprocessing (yet). A rule starts processing the data
from the moment it is defined.
Cancellation with savepoint was considered, but because the number of
new rules defined or changed daily can be large, that would generate
too much downtime. There is a lot of state kept in those rules, making
the restart heavy. What's worse, that would be cross-tenant downtime,
unless the job were somehow per team/tenant. Therefore we rejected this option.
BTW, the current design of our system is similar to the one from the blog
series by Alexander Fedulov about the dynamic rules pattern [2] that he's just
publishing.


> They will consume the same high intensive source(s) therefore I want to
>> optimize for that by consuming the messages in Flink only once.
>>
> That's why I proposed to run one big query instead of 500 small ones. Have
> a POC where you add two of your rules manually to a Table and see what the
> optimized logical plan looks like. I'd bet that the source is only tapped
> once.
>

I can do that PoC, no problem. But AFAIK it will only work with the
"restart with savepoint" pattern discussed above.

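A minimal sketch of such a PoC with the Table API (the schema and the two rules are
made up for illustration; the "events" table is assumed to be registered over the
shared Kafka source, e.g. via DDL):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(
        env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

// Two "rules" expressed as independent SQL queries over the same source table.
Table rule1 = tEnv.sqlQuery("SELECT 'rule1' AS rule, user_id FROM events WHERE amount > 100");
Table rule2 = tEnv.sqlQuery("SELECT 'rule2' AS rule, user_id FROM events WHERE country = 'PL'");

Table alerts = rule1.unionAll(rule2);

// Inspect whether the optimizer reads the Kafka source only once.
System.out.println(tEnv.explain(alerts));
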

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
[2] https://flink.apache.org/news/2020/03/24/demo-fraud-detection-2.html



> On Wed, Mar 25, 2020 at 6:15 PM Krzysztof Zarzycki 
> wrote:
>
>> Hello Arvid,
>> Thanks for joining the thread!
>> First, did you take into consideration that I would like to dynamically
>> add queries on the same source? That means first defining one query, later
>> in the day adding another one, then another one, and so on. A week later killing
>> one of those, starting yet another one, etc. There will be hundreds of these
>> queries running at once, but the set of queries changes several times a day.
>> They will consume the same high-volume source(s); therefore I want to
>> optimize for that by consuming the messages in Flink only once.
>>
>> Regarding temporary tables, AFAIK they are only metadata (let's
>> say Kafka topic details) stored in the scope of a SQL session.
>> Therefore multiple queries against such a temporary table behave the same way
>> as querying a normal table, that is, they will read the data source multiple times.
>>
>> It looks like the feature I want or could use is described by
>> FLIP-36 about Interactive Programming, more precisely caching a stream
>> table [1].
>> That said, I wouldn't like to limit the discussion to that not-yet-existing
>> feature. Maybe there are other ways of achieving this dynamic querying
>> capability.
>>
>> Kind Regards,
>> Krzysztof
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink#FLIP-36:SupportInteractiveProgramminginFlink-Cacheastreamtable
>>
>>
>>
>> * You want to use primarily the Table API, as that allows you to
>>> programmatically introduce structural variance (changing rules).
>>
>> * You start by registering the source as a temporary table.
>>>
>> * Then you add your rules as SQL through `TableEnvironment#sqlQuery`.
>>> * Lastly you unionAll the results.
>>>
>>> Then I'd perform some experiment if indeed the optimizer figured out
>>> that it needs to only read the source once. The resulting code would be
>>> minimal and easy to maintain. If the performance is not satisfying, you can
>>> always make it more complicated.
>>>
>>> Best,
>>>
>>> Arvid
>>>
>>>
>>> On Mon, Mar 23, 2020 at 7:02 PM Krzysztof Zarzycki 
>>> wrote:
>>>
 Dear Flink community!

 In our company we have implemented a system that realize the dynamic
 business rules pattern. We spoke about it during Flink Forward 2019
 https://www.youtube.com/watch?v=CyrQ5B0exqU.
 

Re: Request to join the Chinese-language community

2020-03-27 Thread Yangze Guo
Hello, if you want to subscribe to user-zh, you can send an email to user-zh-subscr...@flink.apache.org

Best,
Yangze Guo

On Fri, Mar 27, 2020 at 4:45 PM 大数据开发面试_夏永权  wrote:
>
> I am requesting to join the Chinese-language community, but I am not sure whether it went through. I would very much like to join the community. Thank you very much.


Request to join the Chinese-language community

2020-03-27 Thread 大数据开发面试_夏永权
I am requesting to join the Chinese-language community, but I am not sure whether it went through. I would very much like to join the community. Thank you very much.

Flink savepoint issues

2020-03-27 Thread 大数据开发面试_夏永权
Hi, I ran into the following problems while using Flink and could not solve them
myself, so I am asking for your guidance. Thanks!

1. flink cancel -s $SAVEPOINT_DIR $job_id -yid $application_id  cannot stop the job when it is under back pressure


 The program finished with the following exception:
org.apache.flink.util.FlinkException: Could not cancel job 
1f768e4ca9ad5792a4844a5d12163b73.
at org.apache.flink.client.cli.CliFrontend.lambda$cancel$6(CliFrontend.java:523)
at 
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:515)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:904)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.util.concurrent.TimeoutException
at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at org.apache.flink.client.cli.CliFrontend.lambda$cancel$6(CliFrontend.java:521)
... 9 more
stop flink job failed!!!




2. After adding a field in a Flink SQL DDL, the program fails to start and can only be started after deleting the savepoint (the DDL feeds a two-stream join, and a field was added to one of the streams)


 The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: 
org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not 
complete the operation. Number of retries has been exhausted.
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not 
complete the operation. Number of retries has been exhausted.
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
at 
com.etl.DwdBaseCarlifeTIotBindGeoManage.main(DwdBaseCarlifeTIotBindGeoManage.java:265)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
... 11 more
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: 
Could not complete the operation. Number of retries has been exhausted.
at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:284)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:342)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:500)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:493)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:472)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:413)
at 

Re: state schema evolution for case classes

2020-03-27 Thread Tzu-Li (Gordon) Tai
Hi Apoorv,

Sorry for the late reply, have been quite busy with backlog items the past
days.

On Fri, Mar 20, 2020 at 4:37 PM Apoorv Upadhyay <
apoorv.upadh...@razorpay.com> wrote:

> Thanks Gordon for the suggestion,
>
> I am going by this repo :
> https://github.com/mrooding/flink-avro-state-serialization
>
> So far I am able to alter the Scala case classes and to restore from a
> savepoint using the memory state backend, but when I use RocksDB as the
> state backend and try to restore from a savepoint, it breaks with the following
> error:
>

When you say restoring it with the RocksDB backend, was the savepoint you
are attempting to restore from taken with the RocksDB backend as well?
I'm asking that, because currently you cannot change the state backend
across restores, as they have different savepoint binary formats.
This is also the case when you use the State Processor API - when you load
an existing savepoint, you first have to load it with the same state
backend that was used to create the savepoint. You can change the state
backend using the State Processor API, by creating a new savepoint with
your desired target backend, and dumping all state data extracted from the
loaded savepoint into the new fresh savepoint.
There have been previous proposals (FLIP-41) [1] to unify the savepoint
formats, which would make a lot of this easier, but AFAIK this isn't on the
roadmap for the near future.

Best Regards,
Gordon

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-41%3A+Unify+Binary+format+for+Keyed+State
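
For the State Processor API route, a minimal sketch of that first step, assuming the
savepoint was written by the RocksDB backend (paths are placeholders):

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;

ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();

// Load the savepoint with the SAME backend that produced it
// (the RocksDBStateBackend(String) constructor declares IOException).
ExistingSavepoint savepoint = Savepoint.load(
    bEnv,
    "hdfs://namenode/path/to/savepoint",
    new RocksDBStateBackend("hdfs://namenode/path/to/checkpoints"));

// From here the state can be read, transformed, and written into a new savepoint
// created for the desired target backend.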


>
> org.apache.flink.util.FlinkRuntimeException: Error while retrieving data from 
> RocksDB.
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:92)
>   at 
> nl.mrooding.ProductProcessor.processElement1(ProductProcessor.scala:14)
>   at 
> nl.mrooding.ProductProcessor.processElement1(ProductProcessor.scala:8)
>   at 
> org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.processElement1(KeyedCoProcessOperator.java:78)
>   at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:238)
>   at 
> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:117)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.EOFException
>   at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:473)
>   at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:128)
>   at org.apache.avro.io.BinaryDecoder.readIndex(BinaryDecoder.java:423)
>   at 
> org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
>   at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
>   at 
> org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
>   at 
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>   at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>   at 
> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>   at 
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>   at 
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>   at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>   at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>   at 
> nl.mrooding.state.CustomAvroSerializer$class.deserialize(CustomAvroSerializer.scala:42)
>   at 
> nl.mrooding.state.ProductSerializer.deserialize(ProductSerializer.scala:9)
>   at 
> nl.mrooding.state.ProductSerializer.deserialize(ProductSerializer.scala:9)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:90)
>   ... 8 more
>
>
>
>
> On Wed, Mar 18, 2020 at 10:56 AM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi Apoorv,
>>
>> Flink currently does not natively support schema evolution for state
>> types using Scala case classes [1].
>>
>> So, as Roman has pointed out, there are 2 possible ways for you to do
>> that:
>> - Implementing a custom serializer that support schema evolution for your
>> specific Scala case classes, as Roman suggested.
>> - or, using the State Processor API [2] to migrate your case classes
>> offline as a batch job
>>
>> For your question on how to implement a schema-evolution supporting
>> serializer, can you share with me the problems you have met so far?
>> Otherwise, if you take a look at the PojoSerializerSnapshot class, that
>> would be a starting point to implement something similar for your case
>> classes.
>>
>> As you will quickly realize, it's not simple, so I would strongly suggest
>> trying out the 

Re: Issue with Could not resolve ResourceManager address akka.tcp://flink

2020-03-27 Thread Vitaliy Semochkin
Hello Zhu,

The host can be resolved and there are no firewalls in the cluster, so all
ports are open.

Regards,
Vitaliy

On Fri, Mar 27, 2020 at 8:32 AM Zhu Zhu  wrote:

> Hi Vitaliy,
>
> >> *Cannot serve slot request, no ResourceManager connected*
> This is not a problem, just that the JM needs RM to be connected to send
> slot requests.
>
> >> *Could not resolve ResourceManager address
> akka.tcp://flink@prod-bigd-dn11:43757/user/resourcemanager*
> This should be the root cause. Would you check whether the hostname
> *prod-bigd-dn11* is resolvable? And whether the port 43757 of that
> machine is permitted to be accessed?
>
> Thanks,
> Zhu Zhu
>
> On Fri, Mar 27, 2020 at 1:54 AM Vitaliy Semochkin  wrote:
>
>> Hi,
>>
>> I'm facing an issue similar to
>> https://issues.apache.org/jira/browse/FLINK-14074
>> Job starts and then yarn logs report "*Could not resolve ResourceManager
>> address akka.tcp://flink*"
>>
>> A fragment from yarn logs looks like this:
>>
>> LazyFromSourcesSchedulingStrategy]
>> 16:54:21,279 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>- Job Flink Java Job at Thu Mar 26 16:54:09 CET 2020
>> (9817283f911d83a6d278cc39d17d6b11) switched from state CREATED to RUNNING.
>> 16:54:21,287 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>- CHAIN DataSource (MailEvent; EMC; 2019-12-01 - 2020-01-01; null -
>> 157818240) -> FlatMap (SplitDuplicate) -> FlatMap (Create MailEvent) ->
>> Filter (EventDateTimeRangeFilter) -> Filter (TrackingStatusesFilter) ->
>> FlatMap (Get mail item by EMC event) -> Map (Map IntraregionalVolumeItem
>> data set from EMC events) (1/3) (5482b0e6ae1d64d9b0918ec15599211f) switched
>> from CREATED to SCHEDULED.
>> 16:54:21,287 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>- CHAIN DataSource (MailEvent; EMC; 2019-12-01 - 2020-01-01; null -
>> 157818240) -> FlatMap (SplitDuplicate) -> FlatMap (Create MailEvent) ->
>> Filter (EventDateTimeRangeFilter) -> Filter (TrackingStatusesFilter) ->
>> FlatMap (Get mail item by EMC event) -> Map (Map IntraregionalVolumeItem
>> data set from EMC events) (2/3) (5c993710423eea47ae66f833b2999530) switched
>> from CREATED to SCHEDULED.
>> 16:54:21,287 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>- CHAIN DataSource (MailEvent; EMC; 2019-12-01 - 2020-01-01; null -
>> 157818240) -> FlatMap (SplitDuplicate) -> FlatMap (Create MailEvent) ->
>> Filter (EventDateTimeRangeFilter) -> Filter (TrackingStatusesFilter) ->
>> FlatMap (Get mail item by EMC event) -> Map (Map IntraregionalVolumeItem
>> data set from EMC events) (3/3) (23cfa30fba857b2c75ba76a21c7d4972) switched
>> from CREATED to SCHEDULED.
>> 16:54:21,287 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>- CHAIN DataSource (MailEvent; EMD; 2019-12-01 - 2020-01-01; null -
>> 157818240) -> FlatMap (SplitDuplicate) -> FlatMap (Create MailEvent) ->
>> Filter (EventDateTimeRangeFilter) -> Filter (TrackingStatusesFilter) ->
>> FlatMap (Get mail item by EMD event) -> Map (Map IntraregionalVolumeItem
>> data set from EMD events) (1/3) (7cc8a395b87e82000184724eb1698ace) switched
>> from CREATED to SCHEDULED.
>> 16:54:21,288 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>- CHAIN DataSource (MailEvent; EMD; 2019-12-01 - 2020-01-01; null -
>> 157818240) -> FlatMap (SplitDuplicate) -> FlatMap (Create MailEvent) ->
>> Filter (EventDateTimeRangeFilter) -> Filter (TrackingStatusesFilter) ->
>> FlatMap (Get mail item by EMD event) -> Map (Map IntraregionalVolumeItem
>> data set from EMD events) (2/3) (5edfe3d1f509856d17fa0da078cb3f7e) switched
>> from CREATED to SCHEDULED.
>> 16:54:21,288 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>- CHAIN DataSource (MailEvent; EMD; 2019-12-01 - 2020-01-01; null -
>> 157818240) -> FlatMap (SplitDuplicate) -> FlatMap (Create MailEvent) ->
>> Filter (EventDateTimeRangeFilter) -> Filter (TrackingStatusesFilter) ->
>> FlatMap (Get mail item by EMD event) -> Map (Map IntraregionalVolumeItem
>> data set from EMD events) (3/3) (dd3397f889a3fad1acf4c59f59a93d92) switched
>> from CREATED to SCHEDULED.
>> 16:54:21,297 INFO
>>  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  - Cannot
>> serve slot request, no ResourceManager connected. Adding as pending request
>> [SlotRequestId{b4c6e7357e4620bf2e997c46d7723eb1}]
>> 16:54:21,301 INFO
>>  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  - Cannot
>> serve slot request, no ResourceManager connected. Adding as pending request
>> [SlotRequestId{841bbb79b01b5e0d9ae749a03f65c303}]
>> 16:54:21,301 INFO
>>  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  - Cannot
>> serve slot request, no ResourceManager connected. Adding as pending request
>> [SlotRequestId{496120465d541ea9fd2ffcec89e2ac3b}]
>> 16:54:21,304 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>>- Connecting to ResourceManager akka.tcp://
>> 

Re: How to control the result output frequency in Flink SQL

2020-03-27 Thread Benchao Li
Hi,

For the second scenario, you can try fast emit:
table.exec.emit.early-fire.enabled = true
table.exec.emit.early-fire.delay = 5min

PS:
1. This feature is not described in the official documentation; it should currently be considered experimental.
2. Once emit is enabled, the window switches from producing append results to producing retract results.
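
A minimal sketch of setting these options programmatically on the TableConfig,
assuming the blink planner on Flink 1.10 (the 5-minute delay is just the value from
the example above):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings =
    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

// Experimental, undocumented options, as noted above.
tEnv.getConfig().getConfiguration().setString("table.exec.emit.early-fire.enabled", "true");
tEnv.getConfig().getConfiguration().setString("table.exec.emit.early-fire.delay", "5min");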

On Fri, Mar 27, 2020 at 2:51 PM Jingsong Li  wrote:

> Hi,
>
> For #1:
> Create two cascaded windows:
> - a 1-minute window
> - a 5-minute window whose computation only buffers the data and then emits the detailed (per-minute) results
>
> Best,
> Jingsong Lee
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Keyed Watermarks in Flink

2020-03-27 Thread Jiacheng Jiang

------------------ Original message ------------------
From: "Utopia"
https://bigdata.cs.ut.ee/keyed-watermarks-partition-aware-watermark-generation-apache-flink

Best regards
Utopia

Re: Ask for reason for choice of S3 plugins

2020-03-27 Thread David Anderson
If you are using both the Hadoop S3 and Presto S3 filesystems, you should
use s3p:// and s3a:// to distinguish between the two.

Presto is recommended for checkpointing because the Hadoop implementation
has very high latency when creating files, and because it hits request rate
limits very quickly. The Hadoop S3 filesystem tries to imitate a normal
filesystem on top of S3:

 - before writing a key it checks if the "parent directory" exists by
checking for a key with the prefix up to the last "/"
 - it creates empty marker files to mark the existence of such a parent
directory
 - all these existence requests are S3 HEAD requests, which have very low
request rate limits
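
A minimal sketch of splitting the two schemes (the bucket name and paths are
placeholders): point checkpointing at the Presto plugin via s3p:// and keep s3a://
for sources/sinks served by the Hadoop plugin.

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpoints go through flink-s3-fs-presto (s3p://)...
env.setStateBackend(new FsStateBackend("s3p://my-bucket/checkpoints"));
// ...while e.g. a StreamingFileSink keeps writing through flink-s3-fs-hadoop (s3a://my-bucket/output).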


*David Anderson* | Training Coordinator

Follow us @VervericaData
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time


On Fri, Mar 27, 2020 at 7:10 AM  wrote:

> Hi,
>
>
>
> In this document
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/s3.html#hadooppresto-s3-file-systems-plugins,
> it mentioned that
>
>- Presto is the recommended file system for checkpointing to S3.
>
> Is there a reason for that? Is there some bottleneck for s3 hadoop plugin
> that can’t support checkpoint storage well?
>
>
>
> And if I have the s3:// scheme with both plugins loaded, is there a class
> loading order or just random for accessing S3? Which plugin will take
> charge?
>
>
>
> Best Regards,
>
> Brian
>
>
>


Re: How to control the result output frequency in Flink SQL

2020-03-27 Thread Jingsong Li
Hi,

For #1:
Create two cascaded windows:
- a 1-minute window
- a 5-minute window whose computation only buffers the data and then emits the detailed (per-minute) results

Best,
Jingsong Lee


Ask for reason for choice of S3 plugins

2020-03-27 Thread B.Zhou
Hi,

In this document
https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/s3.html#hadooppresto-s3-file-systems-plugins,
it mentions that

  *   Presto is the recommended file system for checkpointing to S3.
Is there a reason for that? Is there some bottleneck in the s3-hadoop plugin that
makes it unsuitable for checkpoint storage?

And if I use the s3:// scheme with both plugins loaded, is there a deterministic
class-loading order for accessing S3, or is it random? Which plugin will take charge?

Best Regards,
Brian