Re: Latency Tracking in Apache Flink

2021-10-20 Thread Puneet Duggal
Hi,

Yes, it is a simple ETL job and I thought of using the start_time/end_time 
concept… but I just wanted to know whether Flink or any other third-party monitoring 
tools like Datadog etc. provide out-of-the-box functionality to report latency. 

Thanks and regards,
Puneet Duggal

> On 21-Oct-2021, at 8:01 AM, JING ZHANG  wrote:
> 
> job



Mesos deploy starts Mesos framework but does not start job managers

2021-10-20 Thread Javier Vegas
I am trying to deploy a Flink cluster via Mesos following
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/mesos/
(I know Mesos support has been deprecated, and I am planning to migrate my
deployment tools to Kubernetes, but for now I am stuck using Mesos). To
deploy, I am using a custom Docker image that contains both Flink and my
user binaries. The command I am using to start the cluster is

/opt/flink/bin/mesos-appmaster.sh \
  -Djobmanager.rpc.address=$HOST \
  -Dmesos.resourcemanager.framework.user=flink \
  -Dmesos.resourcemanager.framework.name=timeline-flink-populator \
  -Dmesos.master=10.0.25.139:5050 \
  -Dmesos.resourcemanager.tasks.cpus=4 \
  -Dmesos.resourcemanager.tasks.container.type=docker \
  -Dmesos.resourcemanager.tasks.container.image.name=docker.strava.com/strava/flink:jv-mesos \
  -Dtaskmanager.numberOfTaskSlots=4 ;

mesos-appmaster.sh is able to start a Mesos framework and a Flink job
manager, but fails to start task managers. Looking in the Mesos syslog I
see that the Mesos framework was sending offers that were being declined
very quickly, and the agents ended in LOST state. I am attaching all the
relevant lines in the syslog.

Any ideas what the problem could be or what else I could check to see what
is happening?

Thanks,

Javier Vegas


(Attachment: syslog, binary data)


Unsubscribe

2021-10-20 Thread aegean0...@163.com


aegean0933
Email: aegean0...@163.com

Unsubscribe

Re: Using the flink CLI option --pyRequirements

2021-10-20 Thread Dian Fu
Hi Francis Conroy,

Do you want to debug the PyFlink job submitted via `flink run`? There is
documentation [1] on how to debug it via `PyCharm`.

PS: PyFlink supports loopback mode, which is enabled in local deployments,
that is, when you execute PyFlink jobs locally, e.g. directly in the IDE. With
loopback mode, you can simply set a breakpoint in the source code in the IDE.
More information can be found in [2].

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/debugging/#remote-debug
[2]
https://flink.apache.org/news/2021/09/29/release-1.14.0.html#loopback-mode-for-debugging

On Tue, Oct 19, 2021 at 1:34 PM Francis Conroy 
wrote:

> Hi,
>
> I'm trying to install some required modules by supplying a requirements
> file when submitting to the cluster and the CLI just seems to stall. I've
> built 1.15-SNAPSHOT@7578758fa8c84314b8b3206629b3afa9ff41b636 and have run
> the wordcount example, everything else seems to work, I just can't submit a
> pyflink job to my cluster when using the --pyRequirements option.
>
> I started going down the line of debugging the flink CLI using intellij
> idea, but wasn't able to figure out how to make my venv with pyflink
> installed available to the debug environment.
>
> Thanks,
> Francis Conroy
>
> This email and any attachments are proprietary and confidential and are
> intended solely for the use of the individual to whom it is addressed. Any
> views or opinions expressed are solely those of the author and do not
> necessarily reflect or represent those of SwitchDin Pty Ltd. If you have
> received this email in error, please let us know immediately by reply email
> and delete it from your system. You may not use, disseminate, distribute or
> copy this message nor disclose its contents to anyone.
> SwitchDin Pty Ltd (ABN 29 154893857) PO Box 1165, Newcastle NSW 2300
> Australia
>


Re: Latency Tracking in Apache Flink

2021-10-20 Thread JING ZHANG
Hi Puneet,

> I read about latency markers, but they are not very useful as they just skip
> the time taken by each operator.


Yes, latency tracking indeed has the problem you described.

> Is there any way to track the latency / time taken for each event's processing?


I'm afraid there is no built-in way to track the latency / time taken for each
event's processing.
If the job is a simple ETL job, you could track the latency of each element by:
1. recording a start_time in the source,
2. recording an end_time in the sink,
3. computing the latency as end_time - start_time (a minimal sketch follows below).
But if the job has an aggregate operator, it's hard to track the latency of each
event, because a group of raw records is aggregated into one accumulator
result.
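
A minimal sketch of steps 1-3 in the DataStream API (the `Event` type, the metric
name, and the use of `DescriptiveStatisticsHistogram` from flink-runtime are
assumptions for illustration, not taken from this thread). The reported histogram
can then be exported to Datadog or other systems through Flink's metric reporters:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;

// 1. Directly after the source: remember when the record entered the pipeline.
//    `Event` stands in for the job's own record type.
public class AttachStartTime implements MapFunction<Event, Tuple2<Event, Long>> {
    @Override
    public Tuple2<Event, Long> map(Event value) {
        return Tuple2.of(value, System.currentTimeMillis());
    }
}

// 2./3. Directly before the sink: report end_time - start_time as a histogram metric.
public class ReportLatency extends RichMapFunction<Tuple2<Event, Long>, Event> {
    private transient Histogram latencyMs;

    @Override
    public void open(Configuration parameters) {
        latencyMs = getRuntimeContext()
                .getMetricGroup()
                .histogram("endToEndLatencyMs", new DescriptiveStatisticsHistogram(1_000));
    }

    @Override
    public Event map(Tuple2<Event, Long> value) {
        // Measure the end-to-end processing-time latency for this record.
        latencyMs.update(System.currentTimeMillis() - value.f1);
        return value.f0;
    }
}

Usage would be roughly source.map(new AttachStartTime()) ... .map(new ReportLatency()).addSink(...).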

Best,
JING ZHANG


Puneet Duggal wrote on Thursday, October 21, 2021 at 1:21 AM:

> Hi,
>
> Is there any way to track the latency / time taken for each event's processing?
> I read about latency markers, but they are not very useful as they just skip
> the time taken by each operator.
>
> Thanks,
> Puneet


Unable to restore from a checkpoint after changing the sink parallelism in Flink

2021-10-20 Thread kong
Hi, I have run into a problem where a Flink job cannot be restored from a checkpoint after the sink parallelism was changed.


Flink version: 1.13.1
Flink on YARN
A Java job written with the DataStream API


Experiment 1: without modifying any code, after cancelling the job it can be restored from the specified checkpoint:
   dataStream.addSink(new Sink(config)).name("").uid("");
Experiment 2: after only changing the parallelism of the sink, the job cannot start and stays in the Initiating state:
   dataStream.addSink(new Sink(config)).name("").uid("").setParallelism(2);


Exception in the logs:
2021-10-21 09:52:57,076 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
   [] - Starting RPC endpoint for 
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager at 
akka://flink/user/rpc/resourcemanager_0 .
2021-10-21 09:52:57,132 INFO  
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - 
Starting DefaultLeaderElectionService with 
ZooKeeperLeaderElectionDriver{leaderPath='/leader/dispatcher_lock'}.
2021-10-21 09:52:57,133 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Starting the resource manager.
2021-10-21 09:52:57,134 INFO  
org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - 
Starting DefaultLeaderRetrievalService with 
ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/resource_manager_lock'}.
2021-10-21 09:52:57,135 INFO  
org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - 
Starting DefaultLeaderRetrievalService with 
ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/dispatcher_lock'}.
2021-10-21 09:52:57,142 INFO  
org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcess [] - 
Start JobDispatcherLeaderProcess.
2021-10-21 09:52:57,151 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
   [] - Starting RPC endpoint for 
org.apache.flink.runtime.dispatcher.MiniDispatcher at 
akka://flink/user/rpc/dispatcher_1 .
2021-10-21 09:52:57,228 INFO  
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - 
Starting DefaultLeaderElectionService with 
ZooKeeperLeaderElectionDriver{leaderPath='/leader/57645e97919d2efebfab67e2846696e7/job_manager_lock'}.
2021-10-21 09:52:57,312 WARN  akka.remote.ReliableDeliverySupervisor
   [] - Association with remote system 
[akka.tcp://flink@node2.hadoop:42535] has failed, address is now gated for [50] 
ms. Reason: [Association failed with [akka.tcp://flink@node2.hadoop:42535]] 
Caused by: [java.net.ConnectException: Connection refused: 
node2.hadoop/xx.xx.xx.xx:42535]
2021-10-21 09:52:57,313 WARN  akka.remote.transport.netty.NettyTransport
   [] - Remote connection to [null] failed with 
java.net.ConnectException: Connection refused: node2.hadoop/xx.xx.xx.xx:42535
2021-10-21 09:52:57,352 INFO  org.apache.flink.yarn.YarnResourceManagerDriver   
   [] - Recovered 0 containers from previous attempts ([]).
2021-10-21 09:52:57,353 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Recovered 0 workers from previous attempt.
2021-10-21 09:52:57,379 INFO  org.apache.hadoop.conf.Configuration  
   [] - resource-types.xml not found
2021-10-21 09:52:57,379 INFO  
org.apache.hadoop.yarn.util.resource.ResourceUtils   [] - Unable to 
find 'resource-types.xml'.
2021-10-21 09:52:57,394 INFO  
org.apache.flink.runtime.externalresource.ExternalResourceUtils [] - Enabled 
external resources: []
2021-10-21 09:52:57,395 WARN  akka.remote.transport.netty.NettyTransport
   [] - Remote connection to [null] failed with 
java.net.ConnectException: Connection refused: node2.hadoop/xx.xx.xx.xx:42535
2021-10-21 09:52:57,396 WARN  akka.remote.ReliableDeliverySupervisor
   [] - Association with remote system 
[akka.tcp://flink@node2.hadoop:42535] has failed, address is now gated for [50] 
ms. Reason: [Association failed with [akka.tcp://flink@node2.hadoop:42535]] 
Caused by: [java.net.ConnectException: Connection refused: 
node2.hadoop/xx.xx.xx.xx:42535]
2021-10-21 09:52:57,400 INFO  
org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl [] - Upper bound 
of the thread pool size is 500
2021-10-21 09:52:57,403 INFO  
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - 
Starting DefaultLeaderElectionService with 
ZooKeeperLeaderElectionDriver{leaderPath='/leader/resource_manager_lock'}.
2021-10-21 09:52:57,407 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
ResourceManager akka.tcp://flink@node3.hadoop:43978/user/rpc/resourcemanager_0 
was granted leadership with fencing token ae27185bb3fda634d2c109510ab54ba4







Re: pyflink 1.14.0 UDF execution error (code written following the official docs)

2021-10-20 Thread Dian Fu
The image didn't come through; the mailing list cannot carry images directly. Could you share more detailed log output?

On Tue, Oct 19, 2021 at 6:34 PM xuzh  wrote:

> Error log
> Exception in thread Thread-14:
> Traceback (most recent call last):
>  File "D:\Anaconda3\envs\py37\lib\threading.py", line 926, in
> _bootstrap_inner
>   self.run()
>  File
> "D:\Anaconda3\envs\py37\lib\site-packages\apache_beam\runners\worker\data_plane.py",
> line 218, in run
>   while not self._finished.wait(next_call - time.time()):
>  File "D:\Anaconda3\envs\py37\lib\threading.py", line 552, in wait
>   signaled = self._cond.wait(timeout)
>  File "D:\Anaconda3\envs\py37\lib\threading.py", line 300, in wait
>   gotit = waiter.acquire(True, timeout)
> OverflowError: timeout value is too large


Re: Apache Flink - Using upsert JDBC sink for DataStream

2021-10-20 Thread M Singh
 Thanks Jing for your references.  I will check them.  Mans
On Monday, October 18, 2021, 11:24:13 PM EDT, JING ZHANG 
 wrote:  
 
 Hi Mans,
Is there a DataStream API for using the upsert functionality?
You could try the `JdbcSink#sink` method and pass an upsert query as the first 
parameter. However, there is no standard syntax for upsert, so you need to 
check whether the external database supports upsert and, if so, what its 
upsert grammar is. The following table describes the database-specific DML that 
is used [1].
| Database | Upsert Grammar |
| MySQL | INSERT .. ON DUPLICATE KEY UPDATE .. |
| PostgreSQL | INSERT .. ON CONFLICT .. DO UPDATE SET .. |
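
On the DataStream side, a minimal sketch of this approach against MySQL (the
target table, the POJO fields `word`/`cnt`, and the connection settings are
made-up placeholders, not from this thread):

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;

// MySQL-style upsert statement; other databases need their own syntax (see the table above).
dataStream.addSink(
        JdbcSink.sink(
                "INSERT INTO word_count (word, cnt) VALUES (?, ?) "
                        + "ON DUPLICATE KEY UPDATE cnt = VALUES(cnt)",
                (statement, wc) -> {
                    // Bind the fields of the incoming record to the prepared statement.
                    statement.setString(1, wc.word);
                    statement.setLong(2, wc.cnt);
                },
                JdbcExecutionOptions.builder().withBatchSize(100).build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://localhost:3306/mydb")
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .build()));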



Also, is there any reason why the TableJdbcUpsertOutputFormat constructors are not public?
`TableJdbcUpsertOutputFormat` is designed as an internal class of the JDBC table 
connector. When building a `JdbcOutputFormat`, `JdbcOutputFormat.Builder` chooses 
to create either a `TableJdbcUpsertOutputFormat` or a `JdbcOutputFormat` instance 
depending on whether key fields are defined in the DML. 
[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/jdbc/#idempotent-writes
Best,
JING ZHANG
M Singh wrote on Tuesday, October 19, 2021 at 7:00 AM:

 Hi Jing:
Thanks for your response and example.
Is there a DataStream API for using the upsert functionality?
Also, is there any reason why the TableJdbcUpsertOutputFormat constructors 
are not public? 
Thanks again for your help.
Mans
On Monday, October 18, 2021, 12:30:36 AM EDT, JING ZHANG 
 wrote:  
 
Hi,
If you need JDBC upsert functionality, it's easier to implement the application using 
Flink SQL. You could use the JDBC table connector [1] and define a primary key 
in the DDL when writing data to the external database. See the CREATE TABLE DDL [2] for more 
details about the PRIMARY KEY syntax. I found an example in 
`JdbcUpsertTableSinkITCase` of flink-connector-jdbc; hope this helps.
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

Table t =
        tEnv.fromDataStream(
                get4TupleDataStream(env)
                        .assignTimestampsAndWatermarks(
                                new AscendingTimestampExtractor<
                                        Tuple4<Integer, Long, String, Timestamp>>() {
                                    @Override
                                    public long extractAscendingTimestamp(
                                            Tuple4<Integer, Long, String, Timestamp> element) {
                                        return element.f0;
                                    }
                                }),
                $("id"),
                $("num"),
                $("text"),
                $("ts"));

tEnv.createTemporaryView("T", t);
tEnv.executeSql(
        "CREATE TABLE upsertSink ("
                + "  cnt BIGINT,"
                + "  lencnt BIGINT,"
                + "  cTag INT,"
                + "  ts TIMESTAMP(3)"
                + ") WITH ("
                + "  'connector.type'='jdbc',"
                + "  'connector.url'='',"
                + "  'connector.table'='upsertSink'"
                + ")");

tEnv.executeSql(
        "INSERT INTO upsertSink \n"
                + "SELECT cnt, COUNT(len) AS lencnt, cTag, MAX(ts) AS ts\n"
                + "FROM (\n"
                + "  SELECT len, COUNT(id) as cnt, cTag, MAX(ts) AS ts\n"
                + "  FROM (SELECT id, CHAR_LENGTH(text) AS len, (CASE WHEN id > 0 THEN 1 ELSE 0 END) cTag, ts FROM T)\n"
                + "  GROUP BY len, cTag\n"
                + ")\n"
                + "GROUP BY cnt, cTag")
        .await();
 
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/jdbc/#key-handling
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/create/#create-table

Best,
JING ZHANG
M Singh wrote on Sunday, October 17, 2021 at 12:59 AM:

Hi Folks:
I am working on a Flink DataStream pipeline and would like to use JDBC upsert 
functionality. I found a class TableJdbcUpsertOutputFormat but am not sure how 
to use it with the JdbcSink as shown in the documentation 
(https://nightlies.apache.org/flink/flink-docs-master/api/java//org/apache/flink/connector/jdbc/JdbcSink.html).

I could not find how to pass an OutputFormat argument to the JDBC sink.
Please let me know if there is any documentation or example of using the JDBC sink 
with upsert for DataStreams.
Thanks


 

  
  

High availability data clean up

2021-10-20 Thread Weiqing Yang
Hi,

Per the documentation, `kubernetes.jobmanager.owner.reference` can be used to set up the owners of
the job manager Deployment. If the owner is deleted, then the job manager
and its related pods will be deleted. How about the HA-related ConfigMaps?
Are they also deleted when deleting the owner of the job manager
Deployment? Per the wiki, the HA data will be retained when deleting the
job manager Deployment. If we want to delete these HA-related ConfigMaps
as well when deleting the job manager, what is the suggested way to do that?

Thanks,
weiqing


Re: Problem with Flink job and Kafka.

2021-10-20 Thread Marco Villalobos
Hi Qingsheng,

Thank you.

I am running Flink on EMR 6.3.0, which uses Flink version 1.12.1. We use AWS
MSK for Kafka, and we are currently on Kafka 2.2.1.

The stack trace seems empty, it only states this error:

org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to
write a non-default producerId at version 1

I notice that this configuration uses Kafka client 2.5.0, which is ahead of Kafka
2.2.1. I take your word that this is a version issue.

I am going to try and upgrade our Kafka clusters and see if that makes a
difference.

Thank you. Any further insight will be highly appreciated.

-Marco


On Mon, Oct 18, 2021 at 8:03 PM Qingsheng Ren  wrote:

> Hi Marco,
>
> Sorry I forgot to cc the user mailing list just now.
>
> From the exception message it looks like a versioning issue. Could you
> provide some additional information, such as Flink & Kafka connector
> version, Kafka broker version, and full exception stack? Also it will be
> helpful to paste part of your code (on DataStream API) or SQL (on Table &
> SQL API).
>
> --
> Best Regards,
>
> Qingsheng Ren
> Email: renqs...@gmail.com
> On Oct 19, 2021, 9:28 AM +0800, Marco Villalobos <
> mvillalo...@kineteque.com>, wrote:
>
> I have the simplest Flink job that simply deques off of a kafka topic and
> writes to another kafka topic, but with headers, and manually copying the
> event time into the kafka sink.
>
> It works as intended, but sometimes I am getting this error:
>
> org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to
> write a non-default producerId at version 1.
>
> Does anybody know what this means and how to fix this?
>
> Thank you.
>
>


Re: [DISCUSS] Creating an external connector repository

2021-10-20 Thread Thomas Weise
Hi,

I see the stable core Flink API as a prerequisite for modularity. And
for connectors it is not just the source and sink API (source being
stable as of 1.14), but everything that is required to build and
maintain a connector downstream, such as the test utilities and
infrastructure.

Without the stable surface of core Flink, changes will leak into
downstream dependencies and force lock step updates. Refactoring
across N repos is more painful than a single repo. Those with
experience developing downstream of Flink will know the pain, and that
isn't limited to connectors. I don't remember a Flink "minor version"
update that was just a dependency version change and did not force
other downstream changes.

Imagine a project with a complex set of dependencies. Let's say Flink
version A plus Flink reliant dependencies released by other projects
(Flink-external connectors, Beam, Iceberg, Hudi, ..). We don't want a
situation where we bump the core Flink version to B and things fall
apart (interface changes, utilities that were useful but not public,
transitive dependencies etc.).

The discussion here also highlights the benefits of keeping certain
connectors outside Flink. Whether that is due to difference in
developer community, maturity of the connectors, their
specialized/limited usage etc. I would like to see that as a sign of a
growing ecosystem and most of the ideas that Arvid has put forward
would benefit further growth of the connector ecosystem.

As for keeping connectors within Apache Flink: I prefer that as the
path forward for "essential" connectors like FileSource, KafkaSource,
... And we can still achieve a more flexible and faster release cycle.

Thanks,
Thomas





On Wed, Oct 20, 2021 at 3:32 AM Jark Wu  wrote:
>
> Hi Konstantin,
>
> > the connectors need to be adapted and require at least one release per
> Flink minor release.
> However, this will make the releases of connectors slower, e.g. maintaining
> features for multiple branches and releasing from multiple branches.
> I think the main purpose of having an external connector repository is in
> order to have "faster releases of connectors"?
>
>
> From the perspective of CDC connector maintainers, the biggest advantage of
> maintaining it outside of the Flink project is that:
> 1) we can have a more flexible and faster release cycle
> 2) we can be more liberal with committership for connector maintainers
> which can also attract more committers to help the release.
>
> Personally, I think maintaining one connector repository under the ASF may
> not have the above benefits.
>
> Best,
> Jark
>
> On Wed, 20 Oct 2021 at 15:14, Konstantin Knauf  wrote:
>
> > Hi everyone,
> >
> > regarding the stability of the APIs. I think everyone agrees that
> > connector APIs which are stable across minor versions (1.13->1.14) are the
> > mid-term goal. But:
> >
> > a) These APIs are still quite young, and we shouldn't make them @Public
> > prematurely either.
> >
> > b) Isn't this *mostly* orthogonal to where the connector code lives? Yes,
> > as long as there are breaking changes, the connectors need to be adapted
> > and require at least one release per Flink minor release.
> > Documentation-wise this can be addressed via a compatibility matrix for
> > each connector as Arvid suggested. IMO we shouldn't block this effort on
> > the stability of the APIs.
> >
> > Cheers,
> >
> > Konstantin
> >
> >
> >
> > On Wed, Oct 20, 2021 at 8:56 AM Jark Wu  wrote:
> >
> >> Hi,
> >>
> >> I think Thomas raised very good questions and would like to know your
> >> opinions if we want to move connectors out of flink in this version.
> >>
> >> (1) is the connector API already stable?
> >> > Separate releases would only make sense if the core Flink surface is
> >> > fairly stable though. As evident from Iceberg (and also Beam), that's
> >> > not the case currently. We should probably focus on addressing the
> >> > stability first, before splitting code. A success criteria could be
> >> > that we are able to build Iceberg and Beam against multiple Flink
> >> > versions w/o the need to change code. The goal would be that no
> >> > connector breaks when we make changes to Flink core. Until that's the
> >> > case, code separation creates a setup where 1+1 or N+1 repositories
> >> > need to move lock step.
> >>
> >> From another discussion thread [1], connector API is far from stable.
> >> Currently, it's hard to build connectors against multiple Flink versions.
> >> There are breaking API changes both in 1.12 -> 1.13 and 1.13 -> 1.14 and
> >>  maybe also in the future versions,  because Table related APIs are still
> >> @PublicEvolving and new Sink API is still @Experimental.
> >>
> >>
> >> (2) Flink testability without connectors.
> >> > Flink w/o Kafka connector (and few others) isn't
> >> > viable. Testability of Flink was already brought up, can we really
> >> > certify a Flink core release without Kafka connector? Maybe those
> >> > connectors that are used in Flink e2e tests to 

Latency Tracking in Apache Flink

2021-10-20 Thread Puneet Duggal
Hi,

Is there any way to track the latency / time taken for each event's processing? I read 
about latency markers, but they are not very useful as they just skip the time taken by each 
operator.

Thanks,
Puneet

Flink on kubernetes HA ::Renew deadline reached

2021-10-20 Thread marco



Hello Flink community,

I am deploying a Flink application cluster in standalone mode on Kubernetes, but I 
am facing some problems.

The job starts normally and continues to run, but at some point in time it 
crashes and gets restarted.

Is anyone facing the same problem, or does anyone know how to resolve it?

I am using Flink version 1.12.5.

here's the log of the JM:

{"@timestamp":"2021-10-19T04:47:29.887+02:00","@version":"1","message":"WebSocket
 successfully 
opened","logger_name":"io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager","thread_name":"OkHttp
 https://172.31.64.1/...","level":"DEBUG","level_value":1}
{"@timestamp":"2021-10-19T04:49:06.833+02:00","@version":"1","message":"Triggering
 checkpoint 2930 (type=CHECKPOINT) @ 1634611746678 for job 
.","logger_name":"org.apache.flink.runtime.checkpoint.CheckpointCoordinator","thread_name":"Checkpoint
 Timer","level":"INFO","level_value":2}
{"@timestamp":"2021-10-19T04:49:07.000+02:00","@version":"1","message":"Completed
 checkpoint 2930 for job  (474 bytes in 229 
ms).","logger_name":"org.apache.flink.runtime.checkpoint.CheckpointCoordinator","thread_name":"jobmanager-future-thread-1","level":"INFO","level_value":2}
{"@timestamp":"2021-10-19T04:51:40.861+02:00","@version":"1","message":"WebSocket
 close received. code: 1000, reason: 
","logger_name":"io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager","thread_name":"OkHttp
 WebSocket https://172.31.64.1/...","level":"DEBUG","level_value":1}
{"@timestamp":"2021-10-19T04:51:40.862+02:00","@version":"1","message":"Submitting
 reconnect task to the 
executor","logger_name":"io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager","thread_name":"OkHttp
 WebSocket https://172.31.64.1/...","level":"DEBUG","level_value":1}
{"@timestamp":"2021-10-19T04:51:40.862+02:00","@version":"1","message":"Scheduling
 reconnect 
task","logger_name":"io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager","thread_name":"scheduleReconnect|Executor
 for Watch 1899640261","level":"DEBUG","level_value":1}
{"@timestamp":"2021-10-19T04:51:40.862+02:00","@version":"1","message":"Current 
reconnect backoff is 1000 milliseconds 
(T0)","logger_name":"io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager","thread_name":"scheduleReconnect|Executor
 for Watch 1899640261","level":"DEBUG","level_value":1}
{"@timestamp":"2021-10-19T04:51:41.863+02:00","@version":"1","message":"Connecting
 websocket ... 
io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@713a35c5","logger_name":"io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager","thread_name":"reconnectAttempt|Executor
 for Watch 1899640261","level":"DEBUG","level_value":1}
{"@timestamp":"2021-10-19T04:51:41.864+02:00","@version":"1","message":"WebSocket
 successfully 
opened","logger_name":"io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager","thread_name":"OkHttp
 https://172.31.64.1/...","level":"DEBUG","level_value":1}
{"@timestamp":"2021-10-19T04:53:16.916+02:00","@version":"1","message":"Renew 
deadline reached after 120 seconds while renewing lock ConfigMapLock: flink-dev 
- job1-dispatcher-leader 
(8f7dddbb-236a-43fe-a706-1795b494838d)","logger_name":"io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector","thread_name":"pool-19-thread-1","level":"DEBUG","level_value":1}
{"@timestamp":"2021-10-19T04:53:16.918+02:00","@version":"1","message":"Stopping
 
SessionDispatcherLeaderProcess.","logger_name":"org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess","thread_name":"KubernetesLeaderElector-ExecutorService-thread-1","level":"INFO","level_value":2}
{"@timestamp":"2021-10-19T04:53:16.919+02:00","@version":"1","message":"Stopping
 dispatcher 
akka.tcp://flink@172.24.10.160:6123/user/rpc/dispatcher_1.","logger_name":"org.apache.flink.runtime.dispatcher.StandaloneDispatcher","thread_name":"flink-akka.actor.default-dispatcher-61","level":"INFO","level_value":2}
{"@timestamp":"2021-10-19T04:53:16.919+02:00","@version":"1","message":"Stopping
 all currently running jobs of dispatcher 
akka.tcp://flink@172.24.10.160:6123/user/rpc/dispatcher_1.","logger_name":"org.apache.flink.runtime.dispatcher.StandaloneDispatcher","thread_name":"flink-akka.actor.default-dispatcher-61","level":"INFO","level_value":2}
{"@timestamp":"2021-10-19T04:53:16.920+02:00","@version":"1","message":"Stopping
 the JobMaster for job 
flinkJob().","logger_name":"org.apache.flink.runtime.jobmaster.JobMaster","thread_name":"flink-akka.actor.default-dispatcher-68","level":"INFO","level_value":2}
{"@timestamp":"2021-10-19T04:53:16.920+02:00","@version":"1","message":"Stopping
 

RE: Huge backpressure when using AggregateFunction with Session Window

2021-10-20 Thread Schwalbe Matthias
Hi Ori,

Just a couple of comments (some code is missing for a concise explanation):

  *   SimpleAggregator is not used in the job setup below (assuming another job 
setup)
  *   SimpleAggregator is called for each event that goes into a specific 
session window, however
 *   The Scala vectors keep growing with the number of events that end up 
in a single window, hence
 *   your big-O complexity will be O(n^2) or worse (n: number of events in the 
window)
 *   For each event the accumulator is retrieved from window state and 
stored to window state (and serialized, if on RocksDB Backend)
  *   On the other hand when you use a process function
 *   Flink keeps a list state of events belonging to the session window, and
 *   Only when the window is triggered (on session gap timeout) all events 
are retrieved from window state and processed
 *   On RocksDbBackend the new events added to the window are appended to 
the existing window state key without touching the previously stored events, 
hence
 *   Serialization is only done once per incoming event, and
 *   BigO complexity is around O(n)

… much simplified

When I started with similar questions I spent quite some time in the debugger, 
breaking into the windowing functions and going up the call stack, in order to 
understand how Flink works … time well spent


I hope this helps …

I won’t be able to follow up for the next 1 ½ weeks, unless you happen to meet me 
at the Flink Forward conference …

Thias

From: Ori Popowski 
Sent: Mittwoch, 20. Oktober 2021 16:17
To: user 
Subject: Huge backpressure when using AggregateFunction with Session Window

I have a simple Flink application with a simple keyBy, a SessionWindow, and I 
use an AggregateFunction to incrementally aggregate a result, and write to a 
Sink.

Some of the requirements involve accumulating lists of fields from the events 
(for example, all URLs), so not all the values in the end should be primitives 
(although some are, like total number of events, and session duration).

This job is experiencing a huge backpressure 40 minutes after launching.

I've found out that the append and concatenate operations in the logic of my 
AggregateFunction's add() and merge() functions are what's ruining the job 
(i.e. causing the backpressure).

I've managed to create a reduced version of my job, where I just append and 
concatenate some of the event values and I can confirm that a backpressure 
starts just 40 minutes after launching the job:


class SimpleAggregator extends AggregateFunction[Event, Accumulator, 
Session] with LazyLogging {

  override def createAccumulator(): Accumulator = (
Vector.empty,
Vector.empty,
Vector.empty,
Vector.empty,
Vector.empty
  )

  override def add(value: Event, accumulator: Accumulator): Accumulator = {
(
  accumulator._1 :+ value.getEnvUrl,
  accumulator._2 :+ value.getCtxVisitId,
  accumulator._3 :+ value.getVisionsSId,
  accumulator._4 :+ value.getTime.longValue(),
  accumulator._5 :+ value.getTime.longValue()
)
  }

  override def merge(a: Accumulator, b: Accumulator): Accumulator = {
(
  a._1 ++ b._1,
  a._2 ++ b._2,
  a._3 ++ b._3,
  a._4 ++ b._4,
  a._5 ++ b._5
)
  }

  override def getResult(accumulator: Accumulator): Session = {
Session.newBuilder()
  .setSessionDuration(1000)
  .setSessionTotalEvents(1000)
  .setSId("-" + UUID.randomUUID().toString)
  .build()
  }
}

This is the job overall (simplified version):


class App(
  source: SourceFunction[Event],
  sink: SinkFunction[Session]
) {

  def run(config: Config): Unit = {
val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setMaxParallelism(256)
val dataStream = senv.addSource(source).uid("source")
dataStream
  .assignAscendingTimestamps(_.getTime)
  .keyBy(event => (event.getWmUId, event.getWmEnv, 
event.getSId).toString())
  
.window(EventTimeSessionWindows.withGap(config.sessionGap.asFlinkTime))
  .allowedLateness(0.seconds.asFlinkTime)
  .process(new ProcessFunction).uid("process-session")
  .addSink(sink).uid("sink")

senv.execute("session-aggregation")
  }
}

After 3 weeks of grueling debugging, profiling, checking the serialization and 
more I couldn't solve the backpressure issue.
However, I got an idea and used Flink's ProcessWindowFunction which just 
aggregates all the events behind the scenes and just gives them to me as an 
iterator, where I can then do all my calculations.
Surprisingly, there's no backpressure. So even though the ProcessWindowFunction 
actually aggregates more data, and also does concatenations and appends, for 
some reason there's no backpressure.

To finish this long post, what I'm trying to 

Re: Fwd: How to deserialize Avro enum type in Flink SQL?

2021-10-20 Thread Timo Walther

Hi Peter,

as a temporary workaround I would simply implement a UDF like:

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.functions.ScalarFunction;

public class EverythingToString extends ScalarFunction {

    public String eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
        return o.toString();
    }
}
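
A brief usage sketch (the registered function name and the table in the query are 
illustrative only, not from this thread):

// Register the UDF so it can be used in SQL, then wrap columns that would
// otherwise surface as RAW types.
tEnv.createTemporarySystemFunction("TO_STRING", EverythingToString.class);
tEnv.executeSql("SELECT TO_STRING(`type`) AS type_str FROM some_table").print();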

For the Utf8 issue, you can instruct Avro to generate Java classes with 
String instead using the `avro.java.string` option.


The rework of the type system messed up the Avro support in Flink. This 
is a known issue that is tracked under


https://issues.apache.org/jira/browse/FLINK-8183

Regards,
Timo

On 20.10.21 17:30, Peter Schrott wrote:

Hi Timo,

thanks a lot for your suggestion.

I also considered this workaround but when going from the DataStream API to 
the Table API (using the POJO generated by the Maven Avro plugin) types are not 
mapped correctly, esp. Utf8 (Avro's implementation of CharSequence) and 
also enums. In the table I then have mostly RAW types, which are not 
handy to run SQL statements on. It is already discussed here: 
https://www.mail-archive.com/user@flink.apache.org/msg9.html 



Best, Peter

On Wed, Oct 20, 2021 at 5:21 PM Timo Walther > wrote:


A current workaround is to use DataStream API to read the data and
provide your custom Avro schema to configure the format. Then switch to
Table API.

StreamTableEnvironment.fromDataStream(...) accepts all data types. Enum
classes will be represented as RAW types but you can forward them as
blackboxes or convert them in a UDF.

We will further improve the support of external types in the Table API
type system in the near future.

Regards,
Timo

On 20.10.21 15:51, Peter Schrott wrote:
 > Hi people!
 >
 > I was digging deeper this days and found the "root cause" of the
issue and the difference between avro reading from files and avro
reading from Kafka & SR.
 >
 > plz see:

https://lists.apache.org/x/thread.html/r8ad7bd574f7dc4904139295c7de612a35438571c5b9caac673521d22@%3Cuser.flink.apache.org%3E


 >
 > The main problem with Kafka & SR is, that the
"org.apache.avro.generic.GenericDatumReader" is initialized with and
"expected" schema which is taken from the flinks sql table
definition. When it comes to deserializing the and attribute with
type "enum" it does not match with the expected schema where this
same attribute is typed as "string". Hence avro deserializer breaks
here.
 >
 > Not sure how to tackle that issue. The functioning of the
"GeneraticDatumReader" can not really be changed. A solution could
be to create an analogues reader for reading data based on SQL ddl.
 >
 > Cheers, Peter
 >
 > On 2021/10/12 16:18:30 Dongwon Kim wrote:
 >> Hi community,
 >>
 >> Can I get advice on this question?
 >>
 >> Another user just sent me an email asking whether I found a
solution or a
 >> workaround for this question, but I'm still stuck there.
 >>
 >> Any suggestions?
 >>
 >> Thanks in advance,
 >>
 >> Dongwon
 >>
 >> -- Forwarded message -
 >> From: Dongwon Kim mailto:eastcirc...@gmail.com>>
 >> Date: Mon, Aug 9, 2021 at 7:26 PM
 >> Subject: How to deserialize Avro enum type in Flink SQL?
 >> To: user mailto:user@flink.apache.org>>
 >>
 >>
 >> Hi community,
 >>
 >> I have a Kafka topic where the schema of its values is defined
by the
 >> "MyRecord" record in the following Avro IDL and registered to
the Confluent
 >> Schema Registry.
 >>
 >>> @namespace("my.type.avro")
 >>> protocol MyProtocol {
 >>>    enum MyEnumType {
 >>>      TypeVal1, TypeVal2
 >>>    }
 >>>    record MyEntry {
 >>>      MyEnumType type;
 >>>    }
 >>>    record MyRecord {
 >>>      array<MyEntry> entries;
 >>>    }
 >>> }
 >>
 >>
 >> To read from the topic, I've defined the following DDL:
 >>
 >>> CREATE TABLE my_table (
 >>>      `entries` ARRAY<ROW<
 >>>          *`type` ??? (This is the main question)*
 >>>      >>
 >>> ) WITH (
 >>>      'connector' = 'kafka',
 >>>      'topic' = 'my-topic',
 >>>      'properties.bootstrap.servers' = '...:9092',
 >>>      'scan.startup.mode' = 'latest-offset',
 >>>      'value.format' = 'avro-confluent',
 >>>      'value.avro-confluent.schema-registry.url' = 'http://...:8081'
 >>>
 >> )
 >>
 >>
 >> And I run the following query :
 >>
 >>> SELECT * FROM my_table
 >>
 >>
 >> Now I got the following messages in Flink-1.13.1 when I use
*STRING* for
 >> the type:
 >>
 >>> *Caused by: java.io.IOException: Failed to deserialize Avro
record.*
 >>>    at
 >>>
 

Re: Huge backpressure when using AggregateFunction with Session Window

2021-10-20 Thread Timo Walther

Hi Ori,

this sounds indeed strange. Can you also reproduce this behavior locally 
with a faker source? We should definitely add a profiler and see where 
the bottleneck lies.


Which Flink version and state backend are you using?

Regards,
Timo

On 20.10.21 16:17, Ori Popowski wrote:
I have a simple Flink application with a simple keyBy, a SessionWindow, 
and I use an AggregateFunction to incrementally aggregate a result, and 
write to a Sink.


Some of the requirements involve accumulating lists of fields from the 
events (for example, all URLs), so not all the values in the end should 
be primitives (although some are, like total number of events, and 
session duration).


This job is experiencing a huge backpressure 40 minutes after launching.

I've found out that the append and concatenate operations in the logic 
of my AggregateFunction's add() and merge() functions are what's ruining 
the job (i.e. causing the backpressure).


I've managed to create a reduced version of my job, where I just append 
and concatenate some of the event values and I can confirm that a 
backpressure starts just 40 minutes after launching the job:


class SimpleAggregator extends AggregateFunction[Event, Accumulator, 
Session] with LazyLogging {


override def createAccumulator(): Accumulator = (
Vector.empty,
Vector.empty,
Vector.empty,
Vector.empty,
Vector.empty
)

override def add(value: Event, accumulator: Accumulator): Accumulator = {
(
accumulator._1 :+ value.getEnvUrl,
accumulator._2 :+ value.getCtxVisitId,
accumulator._3 :+ value.getVisionsSId,
accumulator._4 :+ value.getTime.longValue(),
accumulator._5 :+ value.getTime.longValue()
)
}

override def merge(a: Accumulator, b: Accumulator): Accumulator = {
(
a._1 ++ b._1,
a._2 ++ b._2,
a._3 ++ b._3,
a._4 ++ b._4,
a._5 ++ b._5
)
}

override def getResult(accumulator: Accumulator): Session = {
Session.newBuilder()
.setSessionDuration(1000)
.setSessionTotalEvents(1000)
.setSId("-" + UUID.randomUUID().toString)
.build()
}
}


This is the job overall (simplified version):

class App(
source: SourceFunction[Event],
sink: SinkFunction[Session]
) {

def run(config: Config): Unit = {
val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setMaxParallelism(256)
val dataStream = senv.addSource(source).uid("source")
dataStream
.assignAscendingTimestamps(_.getTime)
.keyBy(event => (event.getWmUId, event.getWmEnv, event.getSId).toString())
.window(EventTimeSessionWindows.withGap(config.sessionGap.asFlinkTime))
.allowedLateness(0.seconds.asFlinkTime)
.process(new ProcessFunction).uid("process-session")
.addSink(sink).uid("sink")

senv.execute("session-aggregation")
}
}


After 3 weeks of grueling debugging, profiling, checking the 
serialization and more I couldn't solve the backpressure issue.
However, I got an idea and used Flink's ProcessWindowFunction which just 
aggregates all the events behind the scenes and just gives them to me as 
an iterator, where I can then do all my calculations.
Surprisingly, there's no backpressure. So even though the 
ProcessWindowFunction actually aggregates more data, and also does 
concatenations and appends, for some reason there's no backpressure.


To finish this long post, what I'm trying to understand here is why when 
I collected the events using an AggregateFunction there was a 
backpressure, and when Flink does this for me with ProcessWindowFunction 
there's no backpressure? It seems to me something is fundamentally wrong 
here, since it means I cannot do any non-reducing operations without 
creating backpressure. I think it shouldn't cause the backpressure I 
experienced. I'm trying to understand what I did wrong here.


Thanks!




(no subject)

2021-10-20 Thread TROY

Unsubscribe

Re: Fwd: How to deserialize Avro enum type in Flink SQL?

2021-10-20 Thread Peter Schrott
Hi Timo,

thanks a lot for your suggestion.

I also considered this workaround, but when going from the DataStream API to
the Table API (using the POJO generated by the Maven Avro plugin) types are not
mapped correctly, especially Utf8 (Avro's implementation of CharSequence) and also
enums. In the table I then have mostly RAW types, which are not handy to
run SQL statements on. It is already discussed here:
https://www.mail-archive.com/user@flink.apache.org/msg9.html

Best, Peter

On Wed, Oct 20, 2021 at 5:21 PM Timo Walther  wrote:

> A current workaround is to use DataStream API to read the data and
> provide your custom Avro schema to configure the format. Then switch to
> Table API.
>
> StreamTableEnvironment.fromDataStream(...) accepts all data types. Enum
> classes will be represented as RAW types but you can forward them as
> blackboxes or convert them in a UDF.
>
> We will further improve the support of external types in the Table API
> type system in the near future.
>
> Regards,
> Timo
>
> On 20.10.21 15:51, Peter Schrott wrote:
> > Hi people!
> >
> > I was digging deeper this days and found the "root cause" of the issue
> and the difference between avro reading from files and avro reading from
> Kafka & SR.
> >
> > plz see:
> https://lists.apache.org/x/thread.html/r8ad7bd574f7dc4904139295c7de612a35438571c5b9caac673521d22@%3Cuser.flink.apache.org%3E
> >
> > The main problem with Kafka & SR is, that the
> "org.apache.avro.generic.GenericDatumReader" is initialized with and
> "expected" schema which is taken from the flinks sql table definition. When
> it comes to deserializing the and attribute with type "enum" it does not
> match with the expected schema where this same attribute is typed as
> "string". Hence avro deserializer breaks here.
> >
> > Not sure how to tackle that issue. The functioning of the
> "GeneraticDatumReader" can not really be changed. A solution could be to
> create an analogues reader for reading data based on SQL ddl.
> >
> > Cheers, Peter
> >
> > On 2021/10/12 16:18:30 Dongwon Kim wrote:
> >> Hi community,
> >>
> >> Can I get advice on this question?
> >>
> >> Another user just sent me an email asking whether I found a solution or
> a
> >> workaround for this question, but I'm still stuck there.
> >>
> >> Any suggestions?
> >>
> >> Thanks in advance,
> >>
> >> Dongwon
> >>
> >> -- Forwarded message -
> >> From: Dongwon Kim 
> >> Date: Mon, Aug 9, 2021 at 7:26 PM
> >> Subject: How to deserialize Avro enum type in Flink SQL?
> >> To: user 
> >>
> >>
> >> Hi community,
> >>
> >> I have a Kafka topic where the schema of its values is defined by the
> >> "MyRecord" record in the following Avro IDL and registered to the
> Confluent
> >> Schema Registry.
> >>
> >>> @namespace("my.type.avro")
> >>> protocol MyProtocol {
> >>>enum MyEnumType {
> >>>  TypeVal1, TypeVal2
> >>>}
> >>>record MyEntry {
> >>>  MyEnumType type;
> >>>}
> >>>record MyRecord {
> >>> array<MyEntry> entries;
> >>>}
> >>> }
> >>
> >>
> >> To read from the topic, I've defined the following DDL:
> >>
> >>> CREATE TABLE my_table (
> >>>  `entries` ARRAY<ROW<
> >>>  *`type` ??? (This is the main question)*
> >>>  >>
> >>> ) WITH (
> >>>  'connector' = 'kafka',
> >>>  'topic' = 'my-topic',
> >>>  'properties.bootstrap.servers' = '...:9092',
> >>>  'scan.startup.mode' = 'latest-offset',
> >>>  'value.format' = 'avro-confluent',
> >>>  'value.avro-confluent.schema-registry.url' = 'http://...:8081'
> >>>
> >> )
> >>
> >>
> >> And I run the following query :
> >>
> >>> SELECT * FROM my_table
> >>
> >>
> >> Now I got the following messages in Flink-1.13.1 when I use *STRING* for
> >> the type:
> >>
> >>> *Caused by: java.io.IOException: Failed to deserialize Avro record.*
> >>>at
> >>>
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
> >>>at
> >>>
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
> >>>at
> >>>
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
> >>>at
> >>>
> org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
> >>>at
> >>>
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
> >>>at
> >>>
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
> >>>at
> >>>
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
> >>>at
> >>>
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
> >>>at
> >>>
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
> >>>at
> >>>
> 

Re: Fwd: How to deserialize Avro enum type in Flink SQL?

2021-10-20 Thread Timo Walther
A current workaround is to use DataStream API to read the data and 
provide your custom Avro schema to configure the format. Then switch to 
Table API.


StreamTableEnvironment.fromDataStream(...) accepts all data types. Enum 
classes will be represented as RAW types but you can forward them as 
blackboxes or convert them in a UDF.
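
A minimal sketch of that workaround, assuming the Avro-generated `MyRecord` class 
from this thread, the KafkaSource connector, and the Confluent schema-registry 
deserializer; the addresses are placeholders, and this is just one possible setup:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// Deserialize with the writer schema generated from the Avro IDL, so the enum
// field keeps its specific type instead of being coerced to STRING.
KafkaSource<MyRecord> source =
        KafkaSource.<MyRecord>builder()
                .setBootstrapServers("...:9092")
                .setTopics("my-topic")
                .setValueOnlyDeserializer(
                        ConfluentRegistryAvroDeserializationSchema.forSpecific(
                                MyRecord.class, "http://...:8081"))
                .build();

DataStream<MyRecord> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "my-topic");

// Enum fields show up as RAW types in the table; convert them in a UDF before querying.
Table table = tEnv.fromDataStream(stream);
tEnv.createTemporaryView("my_table", table);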


We will further improve the support of external types in the Table API 
type system in the near future.


Regards,
Timo

On 20.10.21 15:51, Peter Schrott wrote:

Hi people!

I was digging deeper this days and found the "root cause" of the issue and the 
difference between avro reading from files and avro reading from Kafka & SR.

plz see: 
https://lists.apache.org/x/thread.html/r8ad7bd574f7dc4904139295c7de612a35438571c5b9caac673521d22@%3Cuser.flink.apache.org%3E

The main problem with Kafka & SR is, that the "org.apache.avro.generic.GenericDatumReader" is initialized 
with and "expected" schema which is taken from the flinks sql table definition. When it comes to deserializing 
the and attribute with type "enum" it does not match with the expected schema where this same attribute is typed 
as "string". Hence avro deserializer breaks here.

Not sure how to tackle that issue. The functioning of the 
"GeneraticDatumReader" can not really be changed. A solution could be to create 
an analogues reader for reading data based on SQL ddl.

Cheers, Peter

On 2021/10/12 16:18:30 Dongwon Kim wrote:

Hi community,

Can I get advice on this question?

Another user just sent me an email asking whether I found a solution or a
workaround for this question, but I'm still stuck there.

Any suggestions?

Thanks in advance,

Dongwon

-- Forwarded message -
From: Dongwon Kim 
Date: Mon, Aug 9, 2021 at 7:26 PM
Subject: How to deserialize Avro enum type in Flink SQL?
To: user 


Hi community,

I have a Kafka topic where the schema of its values is defined by the
"MyRecord" record in the following Avro IDL and registered to the Confluent
Schema Registry.


@namespace("my.type.avro")
protocol MyProtocol {
   enum MyEnumType {
     TypeVal1, TypeVal2
   }
   record MyEntry {
     MyEnumType type;
   }
   record MyRecord {
     array<MyEntry> entries;
   }
}



To read from the topic, I've defined the following DDL:


CREATE TABLE my_table (
 `entries` ARRAY<ROW<
   `type` ??? (This is the main question)
 >>
) WITH (
 'connector' = 'kafka',
 'topic' = 'my-topic',
 'properties.bootstrap.servers' = '...:9092',
 'scan.startup.mode' = 'latest-offset',
 'value.format' = 'avro-confluent',
 'value.avro-confluent.schema-registry.url' = 'http://...:8081'


)


And I run the following query :


SELECT * FROM my_table



Now I got the following messages in Flink-1.13.1 when I use *STRING* for
the type:


*Caused by: java.io.IOException: Failed to deserialize Avro record.*
   at
org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
   at
org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
   at
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
   at
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
   at
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
   at
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
   at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
   at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
   at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
   at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
*Caused by: org.apache.avro.AvroTypeException: Found
my.type.avro.MyEnumType, expecting union*
   at
org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
   at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
   at
org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
   at
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
   at
org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298)
   at
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
   at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
   at
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
   at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
   at
org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
   at
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
 

Re: Metric Scopes purpose

2021-10-20 Thread Chesnay Schepler
The metric scope options are used by reporters that do not rely on tags 
to generate a fully-qualified metric name.
The Prometheus reporter instead identifies metrics in a different way (a generic 
scope like jobmanager.job.myMetric plus a bunch of tags to select a 
specific instance) and ignores the metric scope entirely.


On 20/10/2021 16:28, JP MB wrote:

Hi,
Sorry for highlighting this one again.
Can someone provide me some light on the purpose of metric scope 
configurations and where can I see the immediate results of changing 
these properties?


Regards,
José Brandão

Em ter., 19 de out. de 2021 às 18:19, JP MB 
 escreveu:


Hello,
I have been playing with metric scopes and I'm not sure if I
understood them correctly.
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/metrics/

For instance,

  * |metrics.scope.task|
  o Default:
.taskmanager
  o Applied to all metrics that were scoped to a task.

*What is supposed to happen when if remove the 
scope from there? *

  * |metrics.scope.task=|.taskmanager...


I have performed this change but was unable to see any differences
in the metrics that are being exported by the reporter I'm
currently using which is the PrometheusReporter.

*Also, is there any place where I can see the "raw"/pre-reporter
metrics?
*
Regards,
José Brandão



Re: Metric Scopes purpose

2021-10-20 Thread JP MB
Hi,
Sorry for highlighting this one again.
Can someone shed some light on the purpose of the metric scope
configurations, and where can I see the immediate results of changing these
properties?

Regards,
José Brandão

Em ter., 19 de out. de 2021 às 18:19, JP MB 
escreveu:

> Hello,
> I have been playing with metric scopes and I'm not sure if I understood
> them correctly.
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/metrics/
>
> For instance,
>
>- metrics.scope.task
>   - Default:
>   <host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>
>   - Applied to all metrics that were scoped to a task.
>
> *What is supposed to happen if I remove one of those scope variables from
> there? *
>
>- metrics.scope.task=.taskmanager...
>
>
> I have performed this change but was unable to see any differences in the
> metrics that are being exported by the reporter I'm currently using which
> is the PrometheusReporter.
>
>
> *Also, is there any place where I can see the "raw"/pre-reporter metrics?*
> Regards,
> José Brandão
>
>


Huge backpressure when using AggregateFunction with Session Window

2021-10-20 Thread Ori Popowski
I have a simple Flink application with a simple keyBy, a SessionWindow, and
I use an AggregateFunction to incrementally aggregate a result, and write
to a Sink.

Some of the requirements involve accumulating lists of fields from the
events (for example, all URLs), so not all the values in the end should be
primitives (although some are, like total number of events, and session
duration).

This job is experiencing a huge backpressure 40 minutes after launching.

I've found out that the append and concatenate operations in the logic of
my AggregateFunction's add() and merge() functions are what's ruining the
job (i.e. causing the backpressure).

I've managed to create a reduced version of my job, where I just append and
concatenate some of the event values and I can confirm that a backpressure
starts just 40 minutes after launching the job:

class SimpleAggregator extends AggregateFunction[Event,
Accumulator, Session] with LazyLogging {

  override def createAccumulator(): Accumulator = (
Vector.empty,
Vector.empty,
Vector.empty,
Vector.empty,
Vector.empty
  )

  override def add(value: Event, accumulator: Accumulator): Accumulator = {
(
  accumulator._1 :+ value.getEnvUrl,
  accumulator._2 :+ value.getCtxVisitId,
  accumulator._3 :+ value.getVisionsSId,
  accumulator._4 :+ value.getTime.longValue(),
  accumulator._5 :+ value.getTime.longValue()
)
  }

  override def merge(a: Accumulator, b: Accumulator): Accumulator = {
(
  a._1 ++ b._1,
  a._2 ++ b._2,
  a._3 ++ b._3,
  a._4 ++ b._4,
  a._5 ++ b._5
)
  }

  override def getResult(accumulator: Accumulator): Session = {
Session.newBuilder()
  .setSessionDuration(1000)
  .setSessionTotalEvents(1000)
  .setSId("-" + UUID.randomUUID().toString)
  .build()
  }
}


This is the job overall (simplified version):

class App(
  source: SourceFunction[Event],
  sink: SinkFunction[Session]
) {

  def run(config: Config): Unit = {
val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setMaxParallelism(256)
val dataStream = senv.addSource(source).uid("source")
dataStream
  .assignAscendingTimestamps(_.getTime)
  .keyBy(event => (event.getWmUId, event.getWmEnv,
event.getSId).toString())
  
.window(EventTimeSessionWindows.withGap(config.sessionGap.asFlinkTime))
  .allowedLateness(0.seconds.asFlinkTime)
  .process(new ProcessFunction).uid("process-session")
  .addSink(sink).uid("sink")

senv.execute("session-aggregation")
  }
}


After 3 weeks of grueling debugging, profiling, checking the serialization
and more I couldn't solve the backpressure issue.
However, I got an idea and used Flink's ProcessWindowFunction which just
aggregates all the events behind the scenes and just gives them to me as an
iterator, where I can then do all my calculations.
Surprisingly, there's no backpressure. So even though the
ProcessWindowFunction actually aggregates more data, and also does
concatenations and appends, for some reason there's no backpressure.

To finish this long post, what I'm trying to understand here is why when I
collected the events using an AggregateFunction there was a backpressure,
and when Flink does this for me with ProcessWindowFunction there's no
backpressure? It seems to me something is fundamentally wrong here, since
it means I cannot do any non-reducing operations without creating
backpressure. I think it shouldn't cause the backpressure I experienced.
I'm trying to understand what I did wrong here.

Thanks!


Looking back at the Apache Flink 1.14 development cycle / getting better for 1.15

2021-10-20 Thread Johannes Moser
Dear Flink community,

In preparation for the 1.15 development cycle of Apache Flink (it already 
started) and preparing the release management we are collecting feedback from 
the community.
If you didn’t have a chance to look at the release announcement you might want 
to do that now [1]

Also watch out for the 1.14 release meetup [2] tomorrow and Flink Forward [3] 
next week.

-

We’d love to improve the experience for contributors and users going forward 
and if you want to help us do so I’d be very thankful to get some answers to 
the following questions.

** What features/changes will make your life easier/harder?

** Would you like to be more/less informed about the development process?

** In case you have contributed to that release, how was your experience?

** Do you have any questions regarding the 1.14 release?

-

I will share an anonymous overview of the feedback at a later stage.
Thanks for the answers in advance.

Best,
Joe


[1] https://flink.apache.org/news/2021/09/29/release-1.14.0.html
[2] https://www.meetup.com/Bay-Area-Apache-Flink-Meetup/events/281406310
[3] https://www.flink-forward.org/global-2021



Re: Fwd: How to deserialize Avro enum type in Flink SQL?

2021-10-20 Thread Peter Schrott
Hi people!

I was digging deeper these days and found the "root cause" of the issue and the 
difference between reading Avro from files and reading Avro from Kafka & SR.

Please see: 
https://lists.apache.org/x/thread.html/r8ad7bd574f7dc4904139295c7de612a35438571c5b9caac673521d22@%3Cuser.flink.apache.org%3E

The main problem with Kafka & SR is that the 
"org.apache.avro.generic.GenericDatumReader" is initialized with an "expected" 
schema which is taken from Flink's SQL table definition. When it comes to 
deserializing an attribute of type "enum", it does not match the 
expected schema, where this same attribute is typed as "string". Hence the Avro 
deserializer breaks here.
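
To make the mismatch concrete, here is a stand-alone sketch (the schemas are
inlined only for illustration; this is not the actual Flink code path):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

import java.io.ByteArrayOutputStream;

public class EnumVsStringResolution {
    public static void main(String[] args) throws Exception {
        // Writer schema (what the producer registered in SR): the field is an enum.
        Schema writer = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"MyEntry\",\"fields\":["
            + "{\"name\":\"type\",\"type\":{\"type\":\"enum\",\"name\":\"MyEnumType\","
            + "\"symbols\":[\"TypeVal1\",\"TypeVal2\"]}}]}");
        // "Expected" reader schema (what Flink derives from the DDL): the field is a string.
        Schema reader = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"MyEntry\",\"fields\":["
            + "{\"name\":\"type\",\"type\":\"string\"}]}");

        GenericRecord record = new GenericData.Record(writer);
        record.put("type",
            new GenericData.EnumSymbol(writer.getField("type").schema(), "TypeVal1"));

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(bytes, null);
        new GenericDatumWriter<GenericRecord>(writer).write(record, encoder);
        encoder.flush();

        // Resolving the enum (writer) against the string (reader) fails here with an
        // AvroTypeException analogous to the one in the stack trace quoted below.
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes.toByteArray(), null);
        new GenericDatumReader<GenericRecord>(writer, reader).read(null, decoder);
    }
}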

Not sure how to tackle that issue. The behaviour of the 
"GenericDatumReader" cannot really be changed. A solution could be to create 
an analogous reader that reads data based on the SQL DDL.

Cheers, Peter

On 2021/10/12 16:18:30 Dongwon Kim wrote:
> Hi community,
> 
> Can I get advice on this question?
> 
> Another user just sent me an email asking whether I found a solution or a
> workaround for this question, but I'm still stuck there.
> 
> Any suggestions?
> 
> Thanks in advance,
> 
> Dongwon
> 
> -- Forwarded message -
> From: Dongwon Kim 
> Date: Mon, Aug 9, 2021 at 7:26 PM
> Subject: How to deserialize Avro enum type in Flink SQL?
> To: user 
> 
> 
> Hi community,
> 
> I have a Kafka topic where the schema of its values is defined by the
> "MyRecord" record in the following Avro IDL and registered to the Confluent
> Schema Registry.
> 
> > @namespace("my.type.avro")
> > protocol MyProtocol {
> >   enum MyEnumType {
> > TypeVal1, TypeVal2
> >   }
> >   record MyEntry {
> > MyEnumType type;
> >   }
> >   record MyRecord {
> > array entries;
> >   }
> > }
> 
> 
> To read from the topic, I've defined the following DDL:
> 
> > CREATE TABLE my_table
> 
> (
> > `entries` ARRAY > *`type` ??? (This is the main question)*
> > >>
> > ) WITH (
> > 'connector' = 'kafka',
> > 'topic' = 'my-topic',
> > 'properties.bootstrap.servers' = '...:9092',
> > 'scan.startup.mode' = 'latest-offset',
> > 'value.format' = 'avro-confluent',
> > 'value.avro-confluent.schema-registry.url' = 'http://...:8081'
> >
> )
> 
> 
> And I run the following query :
> 
> > SELECT * FROM my_table
> 
> 
> Now I got the following messages in Flink-1.13.1 when I use *STRING* for
> the type:
> 
> > *Caused by: java.io.IOException: Failed to deserialize Avro record.*
> >   at
> > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
> >   at
> > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
> >   at
> > org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
> >   at
> > org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
> >   at
> > org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
> >   at
> > org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
> >   at
> > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
> >   at
> > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
> >   at
> > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
> >   at
> > org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
> > *Caused by: org.apache.avro.AvroTypeException: Found
> > my.type.avro.MyEnumType, expecting union*
> >   at
> > org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
> >   at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
> >   at
> > org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
> >   at
> > org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
> >   at
> > org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298)
> >   at
> > org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
> >   at
> > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
> >   at
> > org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
> >   at
> > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
> >   at
> > org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
> >   at
> > org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
> >   at
> > org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
> >   at
> > 

Re: Stopping a Flink job

2021-10-20 Thread lei-tian
I'm not using SQL, but I'm hitting the same problem.




totorobabyfans
Email: totorobabyf...@163.com

On Oct 20, 2021 at 16:21, Kenyore Woo wrote:
I ran into exactly the same problem.
If you are also using Flink SQL, you can add table.dml_sync=true to the configuration. That setting worked for me.
See TableEnvironment.executeInternal for details.
On Oct 20, 2021 at 09:06:54, lei-tian  wrote:

> Hi , yuepeng-pan:
> Hello, I submit the job in YARN per-job mode. While the job is in running state, the Flink UI shows both the jobmanager and taskmanager logs. After the job finishes or fails, it shows up in the Completed
> Job List in the UI, but the TM entries on the left no longer show any information when clicked; only the JM still has information,
> so it seems the JM resources were not released.
>
>
> lei-tian
> totorobabyf...@163.com
> On Oct 19, 2021 at 10:53, Yuepeng Pan wrote:
> Hi,
> lei-tian.
> Based on your description, I suspect (for Flink 1.10+) there are a few possibilities.
> 1. You used Flink's yarn-session mode and then submitted the job to it. In that case,
> after the job finally completes, the taskmanager containers of the corresponding Flink
> cluster in YARN are released, but the container holding the JobManager component is
> kept. For the yarn-session deployment mode this is normal.
> 2. In either the per-job or the yarn-session deployment mode on YARN, if the operator
> responsible for the HBase source/sink is combined with any other streaming operator
> through connect, union or similar multi-stream operations, then after the HBase IO
> finishes, the remaining streaming operators keep running; in that case neither the
> Flink taskmanager containers nor the jobmanager container in YARN is released.
> 3. Something else.
> If all the sources of the job read "batch-style" data, such as MySQL/HBase, and none
> of them is Kafka/Pulsar etc., you could try running the job with the Flink-on-YARN
> per-job deployment mode.
>
> Best regards,
> Roc
>
>
>
>
>
>
>
>
>
>
>
> On 2021-10-18 21:31:21, "lei-tian" wrote:
> Hello:
>
> I use Flink to read data from HBase or from files. After the data has been read, the UI shows the job state as finished, but the job's resources on YARN have not been released and the job is still in YARN's running list; I have to kill the job from the command line. Is there a way to solve this? I did not set batch or streaming mode, I use the default.
>
>
> lei-tian
> totorobabyf...@163.com
>


Re: Reset of transient variables in state to default values.

2021-10-20 Thread Yun Tang
Hi,

For the RocksDB state backend, it will pick the registered Kryo serializer both for 
normal read/write access and for checkpoint/restore. Moreover, since key-values are 
serialized before being stored in RocksDB, they are effectively deep-copied, which 
avoids later unexpected modification.

For the FileSystem/HashMap state backend, it will pick the registered Kryo 
serializer only for checkpoint/restore. Since the Java heap-based state backends do 
not deep-copy key-values for performance reasons, the stored objects might be changed 
unexpectedly if misused, which might make the field appear reset to its default value.

Best,
Yun Tang

From: Arvid Heise 
Sent: Monday, October 18, 2021 20:30
To: Alex Drobinsky 
Cc: JING ZHANG ; Yun Tang ; User-Flink 

Subject: Re: Reset of transient variables in state to default values.

That's what I would try out, but I'm not sure if the statebackend would pick 
that up. @Yun Tang do you know more?

On Mon, Oct 18, 2021 at 9:37 AM Alex Drobinsky <alex.drobin...@gmail.com> wrote:
Hi Arvid,

It sounds like a good direction. Do I need to register my state class with a 
KryoSerializer, similar to this?

env.getConfig().registerTypeWithKryoSerializer(IPSessionOrganizer.proto.SourceOutput.class, ProtobufSerializer.class);


On Mon, Oct 18, 2021 at 10:32, Arvid Heise <ar...@apache.org> wrote:
Hi Alex,

could you also log the identity hashcode (or something similar) of the 
related instance? I suspect that it's not the field that is set to null but 
that you get a clone where the field is null. In that case, you need to add a 
specific KryoSerializer to initialize it (or just go with a lazy access pattern 
all the way).
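
A sketch of what such a serializer could look like, assuming the state keeps a
file path next to the transient handle (the class and field names below are made
up for illustration, not the actual MultiStorePacketState):

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import java.io.IOException;
import java.io.RandomAccessFile;

// Hypothetical stand-in for the state class: one serializable field plus the
// transient handle that must be re-created after deserialization.
public class StateWithFile {
    public String currentFilePath;
    public transient RandomAccessFile currentFile;

    public static class StateWithFileSerializer extends Serializer<StateWithFile> {
        @Override
        public void write(Kryo kryo, Output output, StateWithFile state) {
            output.writeString(state.currentFilePath); // persist only the serializable part
        }

        @Override
        public StateWithFile read(Kryo kryo, Input input, Class<StateWithFile> type) {
            StateWithFile state = new StateWithFile();
            state.currentFilePath = input.readString();
            try {
                // re-create the transient handle instead of leaving it at its default (null)
                state.currentFile = new RandomAccessFile(state.currentFilePath, "rw");
            } catch (IOException e) {
                state.currentFile = null; // or keep a lazy re-open in the job code
            }
            return state;
        }
    }
}

// registered once on the environment, analogous to the ProtobufSerializer line above:
// env.getConfig().registerTypeWithKryoSerializer(StateWithFile.class, StateWithFile.StateWithFileSerializer.class);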

On Tue, Oct 12, 2021 at 2:55 PM Alex Drobinsky <alex.drobin...@gmail.com> wrote:
Hi Jing,

Job doesn't restart from the checkpoint, it's a brand new clean job , no 
exceptions happened during execution, no restarts :)
The state is a Keyed State so a new key means a new State - in this situation a 
currentFile is equal to null - as expected and handled without issues.
Before I even thought to inquire about my questions, the first thing I did - I 
added log messages with the value of currentFile in any place it could be 
changed.
So I checked that before I release my control to Flink, currentFile has the 
correct value and after I receive value from state in the next iteration it's 
set to null.
The checkpoints by themselves could be irrelevant to the problem, the only 
indication of connection is my assumption based on observation that the 
interval between first event and first occurrence of nullification is exactly 
the same as the checkpoint interval.

Yun Tang - you are correct, it's a KryoSerializer, if I remove the "transient" 
qualifier from currentFile, it crashes inside of KryoSerializer because 
RandomAccessFile isn't serializable.
Which also points to the fact that at least once serialization was actually 
executed. I will try an alternative approach - I will add my own writeObject 
implementation, it should work :)

Best regards,
Alex






On Tue, Oct 12, 2021 at 15:07, JING ZHANG <beyond1...@gmail.com> wrote:
Hi Alex,
Since you use `FileSystemStateBackend`, I think currentFile became nullified 
once in a while is not caused by period checkpoint.

Because if job is running without failover or restore from checkpoint, 
read/write value state on `FileSystemStateBackend` does not cause serialization 
and deserialization at all. I have already simplify your coding and verify this 
point. If you are interested, you can find this simplified code in the 
attachment of the email.

There are some possible reasons come to my mind, I hope this helps.
1. Does job restore from checkpoint/savepoint? This may caused by failover or 
user trigger stop-with-savepoint.
2. If job does not restore from checkpoint or savepoint.
 2.1 After read the MultiStorePacketState from ValueState, is there 
somewhere in your program to update the currentFile field to null again? 
Because the state stored in heap,  it may be changed if program updates its 
value somewhere.
 2.2 When the currentFile became null, is there any possible that current 
key never appear before? that is it's the first time that the current key 
appears, so get state would return default value(a new MultiStorePacketState 
instance with null currentFile)

Best,
JING ZHANG

Yun Tang <myas...@live.com> wrote on Tue, Oct 12, 2021 at 4:41 PM:
Hi Alex,

Since you use customized MultiStorePacketState class as the value state type, 
it should use kryo serializer [1] to serialize your class via accessing RocksDB 
state or checkpoint via FileSystemStateBackend, and I don't know whether Kryo 
would serialize your transient field.
If you're not familiar with Flink's serialization stack, I think you could 
check behaviors below:

  1.  Without any checkpoint restore, use FileSystemStateBackend to see whether 
the transient field could be read as expected, the answer should be yes.
  2.  After restoring from 

Re: Weird rows in SQL Query Result (Changelog)

2021-10-20 Thread JING ZHANG
Hi kingofthecity,
>
> But when I change the result-mode from TABLE to CHANGELOG,
> there are 4 rows for each update like:
>
 The results are expected when the result-mode is `CHANGELOG`.

> SELECT sector, avg(`value`) as `index` FROM stock INNER JOIN metadata ON
> stock.id=metadata.id GROUP BY sector;
>
In this case, the topology is source -> join -> aggregate.
For an update on the stock source, let's say (id1, 1) -> (id1, 2):
when the input record (id1, 2) arrives at the join operator, the join emits two
records:
-U (id1, 1, 'Technology'),
+U (id1, 2, 'Technology')
Those two records then arrive at the downstream aggregate, and each of them leads
to two output records (-U, +U), so each update at the source leads to 4
output records.
Hope this helps.

Best,
JING ZHANG



thekingofcity wrote on Wed, Oct 20, 2021 at 5:52 PM:

> Hi,
>
> I'm working on a simple sql that contains aggregation.
>
> The results looks fine in the `SQL Query Result (Table)`
> but looks weird when I change the result-mode to CHANGELOG.
>
> ```
> CREATE TABLE stock (
>   id VARCHAR(10) NOT NULL PRIMARY KEY,
>   `value` DOUBLE NOT NULL
> ) WITH (
>   'connector' = 'pravega',
>   'controller-uri' = 'tcp://pravega:9090',
>   'scope' = 'stock',
>   'scan.execution.type' = 'streaming',
>   'scan.streams' = 'dbserver1.stock.stock',
>   'format' = 'debezium-json'
> );
>
> CREATE TABLE metadata (
>   id VARCHAR(10) NOT NULL PRIMARY KEY,
>   sector STRING NOT NULL
> ) WITH (
>   'connector' = 'pravega',
>   'controller-uri' = 'tcp://pravega:9090',
>   'scope' = 'stock',
>   'scan.execution.type' = 'streaming',
>   'scan.streams' = 'dbserver1.stock.metadata',
>   'format' = 'debezium-json'
> );
>
> SELECT sector, avg(`value`) as `index` FROM stock INNER JOIN metadata ON
> stock.id=metadata.id GROUP BY sector;
> ```
>
> The sql should return a continuously updating stock average numbers with
> small changes:
>
> And this is exactly what I see in the `SQL Query Result (Table)`.
>
> ```
>  sector  index
>  Technology 115.95986429850261
>   Consumer Cyclical  482.5149917602539
> Industrials 167.49693044026694
> ```
>
> But when I change the result-mode from TABLE to CHANGELOG,
> there are 4 rows for each update like:
>
> ```
> -U[Technology, 115.77986653645833]
> +U[Technology, 103.54979705810547]
> -U[Technology, 103.54979705810547]
> +U[Technology, 115.81319681803386]
>
> -U[Technology, 115.81319681803386]
> +U[Technology, 103.42479705810547]
> -U[Technology, 103.42479705810547]
> +U[Technology, 115.82653299967448]
>
> -U[Technology, 115.82653299967448]
> +U[Technology, 140.48730087280273]
> -U[Technology, 140.48730087280273]
> +U[Technology, 115.84153493245442]
>
> -U[Industrials, 167.40963745117188]
> +U[Industrials, 140.22450256347656]
> -U[Industrials, 140.22450256347656]
> +U[Industrials, 167.43633524576822]
> ```
>
> > I insert blank line every 4 lines to get a better visualization.
>
> The technology sector should mostly be around 115 +/- 5,
> but there are 103, 140, and 167 which are wired since stock value can't
> fluctuate like this
> and I can be sure because TABLE and my own python code shows the same
> correct (small changes) outcome.
>
> With many thanks,
> kingofthecity
>
>


Re: [DISCUSS] Creating an external connector repository

2021-10-20 Thread Jark Wu
Hi Konstantin,

> the connectors need to be adopted and require at least one release per
Flink minor release.
However, this will make connector releases slower, e.g. maintaining
features for multiple branches and releasing from multiple branches.
I think the main purpose of having an external connector repository is
to have "faster releases of connectors"?


From the perspective of CDC connector maintainers, the biggest advantage of
maintaining it outside of the Flink project is that:
1) we can have a more flexible and faster release cycle
2) we can be more liberal with committership for connector maintainers
which can also attract more committers to help the release.

Personally, I think maintaining one connector repository under the ASF may
not have the above benefits.

Best,
Jark

On Wed, 20 Oct 2021 at 15:14, Konstantin Knauf  wrote:

> Hi everyone,
>
> regarding the stability of the APIs. I think everyone agrees that
> connector APIs which are stable across minor versions (1.13->1.14) are the
> mid-term goal. But:
>
> a) These APIs are still quite young, and we shouldn't make them @Public
> prematurely either.
>
> b) Isn't this *mostly* orthogonal to where the connector code lives? Yes,
> as long as there are breaking changes, the connectors need to be adopted
> and require at least one release per Flink minor release.
> Documentation-wise this can be addressed via a compatibility matrix for
> each connector as Arvid suggested. IMO we shouldn't block this effort on
> the stability of the APIs.
>
> Cheers,
>
> Konstantin
>
>
>
> On Wed, Oct 20, 2021 at 8:56 AM Jark Wu  wrote:
>
>> Hi,
>>
>> I think Thomas raised very good questions and would like to know your
>> opinions if we want to move connectors out of flink in this version.
>>
>> (1) is the connector API already stable?
>> > Separate releases would only make sense if the core Flink surface is
>> > fairly stable though. As evident from Iceberg (and also Beam), that's
>> > not the case currently. We should probably focus on addressing the
>> > stability first, before splitting code. A success criteria could be
>> > that we are able to build Iceberg and Beam against multiple Flink
>> > versions w/o the need to change code. The goal would be that no
>> > connector breaks when we make changes to Flink core. Until that's the
>> > case, code separation creates a setup where 1+1 or N+1 repositories
>> > need to move lock step.
>>
>> From another discussion thread [1], connector API is far from stable.
>> Currently, it's hard to build connectors against multiple Flink versions.
>> There are breaking API changes both in 1.12 -> 1.13 and 1.13 -> 1.14 and
>>  maybe also in the future versions,  because Table related APIs are still
>> @PublicEvolving and new Sink API is still @Experimental.
>>
>>
>> (2) Flink testability without connectors.
>> > Flink w/o Kafka connector (and few others) isn't
>> > viable. Testability of Flink was already brought up, can we really
>> > certify a Flink core release without Kafka connector? Maybe those
>> > connectors that are used in Flink e2e tests to validate functionality
>> > of core Flink should not be broken out?
>>
>> This is a very good question. How can we guarantee the new Source and Sink
>> API are stable with only test implementation?
>>
>>
>> Best,
>> Jark
>>
>>
>>
>>
>>
>> On Tue, 19 Oct 2021 at 23:56, Chesnay Schepler 
>> wrote:
>>
>> > Could you clarify what release cadence you're thinking of? There's quite
>> > a big range that fits "more frequent than Flink" (per-commit, daily,
>> > weekly, bi-weekly, monthly, even bi-monthly).
>> >
>> > On 19/10/2021 14:15, Martijn Visser wrote:
>> > > Hi all,
>> > >
>> > > I think it would be a huge benefit if we can achieve more frequent
>> > releases
>> > > of connectors, which are not bound to the release cycle of Flink
>> itself.
>> > I
>> > > agree that in order to get there, we need to have stable interfaces
>> which
>> > > are trustworthy and reliable, so they can be safely used by those
>> > > connectors. I do think that work still needs to be done on those
>> > > interfaces, but I am confident that we can get there from a Flink
>> > > perspective.
>> > >
>> > > I am worried that we would not be able to achieve those frequent
>> releases
>> > > of connectors if we are putting these connectors under the Apache
>> > umbrella,
>> > > because that means that for each connector release we have to follow
>> the
>> > > Apache release creation process. This requires a lot of manual steps
>> and
>> > > prohibits automation and I think it would be hard to scale out
>> frequent
>> > > releases of connectors. I'm curious how others think this challenge
>> could
>> > > be solved.
>> > >
>> > > Best regards,
>> > >
>> > > Martijn
>> > >
>> > > On Mon, 18 Oct 2021 at 22:22, Thomas Weise  wrote:
>> > >
>> > >> Thanks for initiating this discussion.
>> > >>
>> > >> There are definitely a few things that are not optimal with our
>> > >> current management of connectors. I 

Weird rows in SQL Query Result (Changelog)

2021-10-20 Thread thekingofcity
Hi,

I'm working on a simple SQL query that contains an aggregation.

The results look fine in the `SQL Query Result (Table)`
but look weird when I change the result-mode to CHANGELOG.

```
CREATE TABLE stock (
id VARCHAR(10) NOT NULL PRIMARY KEY,
`value` DOUBLE NOT NULL
) WITH (
'connector' = 'pravega',
'controller-uri' = 'tcp://pravega:9090',
'scope' = 'stock',
'scan.execution.type' = 'streaming',
'scan.streams' = 'dbserver1.stock.stock',
'format' = 'debezium-json'
);

CREATE TABLE metadata (
id VARCHAR(10) NOT NULL PRIMARY KEY,
sector STRING NOT NULL
) WITH (
'connector' = 'pravega',
'controller-uri' = 'tcp://pravega:9090',
'scope' = 'stock',
'scan.execution.type' = 'streaming',
'scan.streams' = 'dbserver1.stock.metadata',
'format' = 'debezium-json'
);

SELECT sector, avg(`value`) as `index` FROM stock INNER JOIN metadata ON 
stock.id=metadata.id GROUP BY sector;
```

The SQL should return continuously updating stock average numbers with small 
changes:

And this is exactly what I see in the `SQL Query Result (Table)`.

```
sector             index
Technology         115.95986429850261
Consumer Cyclical  482.5149917602539
Industrials        167.49693044026694
```

But when I change the result-mode from TABLE to CHANGELOG,
there are 4 rows for each update like:

```
-U[Technology, 115.77986653645833]
+U[Technology, 103.54979705810547]
-U[Technology, 103.54979705810547]
+U[Technology, 115.81319681803386]

-U[Technology, 115.81319681803386]
+U[Technology, 103.42479705810547]
-U[Technology, 103.42479705810547]
+U[Technology, 115.82653299967448]

-U[Technology, 115.82653299967448]
+U[Technology, 140.48730087280273]
-U[Technology, 140.48730087280273]
+U[Technology, 115.84153493245442]

-U[Industrials, 167.40963745117188]
+U[Industrials, 140.22450256347656]
-U[Industrials, 140.22450256347656]
+U[Industrials, 167.43633524576822]
```

> I inserted a blank line every 4 lines for better visualization.

The Technology sector should mostly be around 115 +/- 5,
but there are values like 103, 140, and 167, which is weird since the stock value can't 
fluctuate like this,
and I can be sure because TABLE mode and my own Python code show the same correct 
(small changes) outcome.

With many thanks,
kingofthecity

Re: Stopping a Flink job

2021-10-20 Thread Kenyore Woo
I ran into exactly the same problem.
If you are also using Flink SQL, you can add table.dml_sync=true to the configuration. That setting worked for me.
See TableEnvironment.executeInternal for details.
On Oct 20, 2021 at 09:06:54, lei-tian  wrote:

> Hi , yuepeng-pan:
> Hello, I submit the job in YARN per-job mode. While the job is in running state, the Flink UI shows both the jobmanager and taskmanager logs. After the job finishes or fails, it shows up in the Completed
> Job List in the UI, but the TM entries on the left no longer show any information when clicked; only the JM still has information,
> so it seems the JM resources were not released.
>
>
> lei-tian
> totorobabyf...@163.com
> On Oct 19, 2021 at 10:53, Yuepeng Pan wrote:
> Hi,
> lei-tian.
> Based on your description, I suspect (for Flink 1.10+) there are a few possibilities.
> 1. You used Flink's yarn-session mode and then submitted the job to it. In that case,
> after the job finally completes, the taskmanager containers of the corresponding Flink
> cluster in YARN are released, but the container holding the JobManager component is
> kept. For the yarn-session deployment mode this is normal.
> 2. In either the per-job or the yarn-session deployment mode on YARN, if the operator
> responsible for the HBase source/sink is combined with any other streaming operator
> through connect, union or similar multi-stream operations, then after the HBase IO
> finishes, the remaining streaming operators keep running; in that case neither the
> Flink taskmanager containers nor the jobmanager container in YARN is released.
> 3. Something else.
> If all the sources of the job read "batch-style" data, such as MySQL/HBase, and none
> of them is Kafka/Pulsar etc., you could try running the job with the Flink-on-YARN
> per-job deployment mode.
>
> Best regards,
> Roc
>
>
>
>
>
>
>
>
>
>
>
> On 2021-10-18 21:31:21, "lei-tian" wrote:
> Hello:
>
> I use Flink to read data from HBase or from files. After the data has been read, the UI shows the job state as finished, but the job's resources on YARN have not been released and the job is still in YARN's running list; I have to kill the job from the command line. Is there a way to solve this? I did not set batch or streaming mode, I use the default.
>
>
> lei-tian
> totorobabyf...@163.com
>


Re: [External] metric collision using datadog and standalone Kubernetes HA mode

2021-10-20 Thread Chesnay Schepler
What version are you using, and if you are using 1.13+, are you using 
the adaptive scheduler or reactive mode?


On 20/10/2021 07:39, Clemens Valiente wrote:

Hi Chesnay,
thanks a lot for the clarification.
We managed to resolve the collision, and isolated a problem to the 
metrics themselves.


Using the REST API at /jobs/<job-id>/metrics?get=uptime
the response is [{"id":"uptime","value":"-1"}]
despite the job running and processing data for 5 days at that point. 
All task-, taskmanager-, and jobmanager-related metrics seem fine; only 
the job metrics are incorrect. Basically none of these report 
correct values:

[{"id":"numberOfFailedCheckpoints"},{"id":"lastCheckpointSize"},{"id":"lastCheckpointExternalPath"},{"id":"totalNumberOfCheckpoints"},{"id":"lastCheckpointRestoreTimestamp"},{"id":"uptime"},{"id":"restartingTime"},{"id":"numberOfInProgressCheckpoints"},{"id":"downtime"},{"id":"numberOfCompletedCheckpoints"},{"id":"lastCheckpointProcessedData"},{"id":"fullRestarts"},{"id":"lastCheckpointDuration"},{"id":"lastCheckpointPersistedData"}]
Looking at the Gauge, the only way it can return -1 is 
when isTerminalState() is true, which I don't think can be the case in 
a running application.

Do you know where we can check on what went wrong?

Best Regards
Clemens


On Thu, Oct 14, 2021 at 8:55 PM Chesnay Schepler  
wrote:


I think you are misunderstanding a few things.

a) when you include a variable in the scope format, then Flink
fills that in /before/ it reaches Datadog. If you set it to
"flink.", then what we send to Datadog is
"flink.myAwesomeJob".
b) the exception you see is not coming from Datadog. They occur
because, based on the configured scope formats, metrics from
different jobs running in the same JobManager resolve to the same
name (the standby jobmanger is irrelevant). Flink rejects these
metrics, because if were to send these out you'd get funny results
in Datadog because all jobs would try to report the same metric.

In short, you need to include the job id or job name in the
metrics.scope.jm.job scope formats.
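
For example (sketch only; Flink substitutes <job_name> before the metric ever
reaches Datadog, so no placeholder shows up there):

metrics.scope.jm: flink.jobmanager
metrics.scope.jm.job: flink.jobmanager.<job_name>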

On 13/10/2021 06:39, Clemens Valiente wrote:

Hi,

we are using datadog as our metrics reporter as documented here:

https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/metric_reporters/#datadog

our jobmanager scope is
metrics.scope.jm : flink.jobmanager
    metrics.scope.jm.job: flink.jobmanager
since datadog doesn't allow placeholder in metric names, we
cannot include the  or  placeholder in the scope.

This setup worked nicely on our standalone kubernetes application
deployment without using HA.
But when we set up HA, we lost checkpointing metrics in datadog,
and see this warning in the jobmanager log:
2021-10-01 04:22:09,920 WARN  org.apache.flink.metrics.MetricGroup  
   [] - Name collision: Group already contains a Metric with the 
name'totalNumberOfCheckpoints'. Metric will not be reported.[flink, jobmanager]
2021-10-01 04:22:09,920 WARN  org.apache.flink.metrics.MetricGroup  
   [] - Name collision: Group already contains a Metric with the 
name'numberOfInProgressCheckpoints'. Metric will not be reported.[flink, 
jobmanager]
2021-10-01 04:22:09,920 WARN  org.apache.flink.metrics.MetricGroup  
   [] - Name collision: Group already contains a Metric with the 
name'numberOfCompletedCheckpoints'. Metric will not be reported.[flink, 
jobmanager]
2021-10-01 04:22:09,921 WARN  org.apache.flink.metrics.MetricGroup  
   [] - Name collision: Group already contains a Metric with the 
name'numberOfFailedCheckpoints'. Metric will not be reported.[flink, jobmanager]
2021-10-01 04:22:09,921 WARN  org.apache.flink.metrics.MetricGroup  
   [] - Name collision: Group already contains a Metric with the 
name'lastCheckpointRestoreTimestamp'. Metric will not be reported.[flink, 
jobmanager]
2021-10-01 04:22:09,921 WARN  org.apache.flink.metrics.MetricGroup  
   [] - Name collision: Group already contains a Metric with the 
name'lastCheckpointSize'. Metric will not be reported.[flink, jobmanager]
2021-10-01 04:22:09,921 WARN  org.apache.flink.metrics.MetricGroup  
   [] - Name collision: Group already contains a Metric with the 
name'lastCheckpointDuration'. Metric will not be reported.[flink, jobmanager]
2021-10-01 04:22:09,921 WARN  org.apache.flink.metrics.MetricGroup  
   [] - Name collision: Group already contains a Metric with the 
name'lastCheckpointProcessedData'. Metric will not be reported.[flink, 
jobmanager]
2021-10-01 04:22:09,921 WARN  org.apache.flink.metrics.MetricGroup  
   [] - Name collision: Group already contains a Metric with the 
name'lastCheckpointPersistedData'. Metric will not be reported.[flink, 
jobmanager]

Re: Programmatically configuring S3 settings

2021-10-20 Thread Arvid Heise
We had a similar thread on this ML where a user was executing through the IDE.
It seems that FileSystems are not automatically initialized by the LocalExecutor,
so as a workaround [1] you should initialize them manually in your main method
before accessing the FileSystems.

[1]
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java#L319-L319
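
A rough sketch of that workaround for your local setup (shown in Java; the same
two calls work from Scala, and the config keys are the ones you already set):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalS3Job {
    public static void main(String[] args) throws Exception {
        Configuration flinkConf = new Configuration();
        flinkConf.setString("s3.endpoint", "http://127.0.0.1:9000");
        flinkConf.setString("s3.aws.credentials.provider",
                "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider");

        // Hand the options to the FileSystem factories before any s3:// path is resolved.
        // The second argument is the PluginManager; null is fine when flink-s3-fs-hadoop
        // is on the classpath instead of being loaded as a plugin.
        FileSystem.initialize(flinkConf, null);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(flinkConf);
        // ... build the pipeline and call env.execute() as before
    }
}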

On Tue, Oct 19, 2021 at 6:36 PM Pavel Penkov  wrote:

> I've placed a flink-conf.yaml file in conf dir but
> StreamExecutionEnvironment.getExecutionEnvironment doesn't pick it up. If
> set programmatically keys are visible in Flink Web UI, they are just not
> passed to Hadoop FS.
>
> On 2021/10/18 03:04:04, Yangze Guo  wrote:
> > Hi, Pavel.>
> >
> > From my understanding of the doc[1], you need to set it in>
> > flink-conf.yaml instead of your job.>
> >
> > [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins>
>
> >
> > Best,>
> > Yangze Guo>
> >
> > On Sat, Oct 16, 2021 at 5:46 AM Pavel Penkov  wrote:>
> > >>
> > > Apparently Flink 1.14.0 doesn't correctly translate S3 options when
> they are set programmatically. I'm creating a local environment like this
> to connect to local MinIO instance:>
> > >>
> > >   val flinkConf = new Configuration()>
> > >   flinkConf.setString("s3.endpoint", "http://127.0.0.1:9000")>
> > >   flinkConf.setString("s3.aws.credentials.provider",
> "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")>
> > >>
> > >   val env =
> StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(flinkConf)>
> > >>
> > > Then StreamingFileSink fails with a huge stack trace with most
> relevant messages being Caused by:
> org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException: No AWS Credentials
> provided by SimpleAWSCredentialsProvider
> EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider :
> com.amazonaws.SdkClientException: Failed to connect to service endpoint:
>  which means that Hadoop tried to enumerate all of the credential providers
> instead of using the one set in configuration. What am I doing wrong?>
> >
>


Re: Unable to create connection to Azure Data Lake Gen2 with abfs: "Configuration property {storage_account}.dfs.core.windows.net not found"

2021-10-20 Thread Arvid Heise
Hi Preston,

Before FileSystems are accessed, they are first initialized with the
configuration coming from the flink-conf. [1]
It seems that your local execution in the IDE is bypassing that, so I'd call
it manually. Which entry point are you using? We should probably fix it,
so that it also calls initialize.
Also make sure your config options are prefixed with (fs.)azure. because
only they are relayed.

[1]
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java#L319-L319
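
As a sketch for the local/IDE case (the account key property follows the Hadoop
ABFS naming; the storage account and key below are placeholders):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;

Configuration conf = new Configuration();
conf.setString(
        "fs.azure.account.key.<storage_account>.dfs.core.windows.net",
        "<storage_account_key>");  // same key you would otherwise put into flink-conf.yaml
FileSystem.initialize(conf, null); // must run before the first abfs:// path is accessed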

On Wed, Oct 20, 2021 at 12:30 AM Preston Price  wrote:

> Some details about my runtime/environment:
> Java 11
> Flink version 1.14.0
> Running locally in IntelliJ
>
> The error message that I am getting is: Configuration property
> {storage_account}.dfs.core.windows.net not found.
> Reading through all the docs hasn't yielded much help.
>
> In the Flink docs here
> ,
> it's claimed that we can set the credentials for ABFS by specifying the
> value in flink-conf.yaml, so this is what I am trying. However, in the
> code path expressed in the stack trace, I don't believe the configuration
> loaded from flink-conf.yaml is ever consulted. Here are the relevant
> parts of the stack trace:
> Caused by:
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConfigurationPropertyNotFoundException:
> Configuration property {storage_account}.dfs.core.windows.net not found.
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.AbfsConfiguration.getStorageAccountKey(AbfsConfiguration.java:372)
> ~[flink-azure-fs-hadoop-1.14.0.jar:1.14.0]
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.initializeClient(AzureBlobFileSystemStore.java:1133)
> ~[flink-azure-fs-hadoop-1.14.0.jar:1.14.0]
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.(AzureBlobFileSystemStore.java:174)
> ~[flink-azure-fs-hadoop-1.14.0.jar:1.14.0]
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.initialize(AzureBlobFileSystem.java:110)
> ~[flink-azure-fs-hadoop-1.14.0.jar:1.14.0]
> at
> org.apache.flink.fs.azurefs.AbstractAzureFSFactory.create(AbstractAzureFSFactory.java:79)
> ~[flink-azure-fs-hadoop-1.14.0.jar:1.14.0]
> at
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:508)
> ~[flink-core-1.14.0.jar:1.14.0]
> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
> ~[flink-core-1.14.0.jar:1.14.0]
>
> When we first call org.apache.flink.core.fs.FileSystem.get the
> FileSystemFactories are initialized with a new/empty configuration
> ,
> not the configuration loaded from flink-conf.yaml Therefore, later on
> when we're calling AbstractAzureFSFactory.create
> 
>  we
> have an empty config, so the call
> to HadoopConfigLoader.getOrLoadHadoopConfig() then
> HadoopConfigLoader.loadHadoopConfigFromFlink
> 
>  can't
> merge in our config from flink-conf.yaml
>
> So if the Configuration loaded from flink-conf.yaml isn't supplied to the
> AbstractAzureFSFactory, how do we configure Flink to connect to Azure Data
> Lake?
>
> Thanks!
>


RE: Troubleshooting checkpoint timeout

2021-10-20 Thread Alexis Sarda-Espinosa
Currently the windows are 10 minutes in size with a 1-minute slide time. The 
approximate 500 event/minute throughput is already rather high for my use case, 
so I don’t expect it to be higher, but I would imagine that’s still pretty low.

I did have some issues with storage space, and I wouldn’t be surprised if there 
is an IO bottleneck in my dev environment, but then my main question would be: 
if IO is being throttled, could that result in the high “start delay” times I 
observe? That seems to be the main slowdown, so I just want to be sure I’m 
looking in the right direction.

I’d like to mention another thing about my pipeline’s structure in case it’s 
relevant, although it may be completely unrelated. I said that I specify the 
windowing properties once (windowedStream in my 1st e-mail) and use it twice, 
but it’s actually used 3 times. In addition to the 2 ProcessWindowFunctions 
that end in sinks, the stream is also joined with a side output:

openedEventsTimestamped = openedEvents
    .getSideOutput(…)
    .keyBy(keySelector)
    .assignTimestampsAndWatermarks(watermarkStrategy)

windowedStream
    .process(ProcessWindowFunction3())
    .keyBy(keySelector)
    .connect(DataStreamUtils.reinterpretAsKeyedStream(openedEventsTimestamped, keySelector))
    .process(...)

Could this lead to delays or alignment issues?

Regards,
Alexis.

From: Parag Somani 
Sent: Mittwoch, 20. Oktober 2021 09:22
To: Caizhi Weng 
Cc: Alexis Sarda-Espinosa ; Flink ML 

Subject: Re: Troubleshooting checkpoint timeout

I had similar problem, where i have concurrent two checkpoints were configured. 
Also, i used to save it in S3(using minio) on k8s 1.18 env.

Flink service were getting restarted and timeout was happening. It got resolved:
1. As minio ran out of disk space, caused failure of checkpoints(this was the 
main cause).
2. Added duration/interval of checkpoint parameter to address it
execution.checkpointing.max-concurrent-checkpoints and 
execution.checkpointing.min-pause
Details of same at:
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#checkpointing


On Wed, Oct 20, 2021 at 7:50 AM Caizhi Weng <tsreape...@gmail.com> wrote:
Hi!

I see you're using sliding event time windows. What's the exact value of 
windowLengthMinutes and windowSlideTimeMinutes? If windowLengthMinutes is large 
and windowSlideTimeMinutes is small then each record may be assigned to a large 
number of windows as the pipeline proceeds, thus gradually slows down 
checkpointing and finally causes a timeout.

Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com> wrote on Tue, Oct 19, 2021 at 7:29 PM:
Hello everyone,

I am doing performance tests for one of our streaming applications and, after 
increasing the throughput a bit (~500 events per minute), it has started 
failing because checkpoints cannot be completed within 10 minutes. The Flink 
cluster is not exactly under my control and is running on Kubernetes with 
version 1.11.3 and RocksDB backend.

I can access the UI and logs and have confirmed:


  *   Logs do indicate expired checkpoints.
  *   There is no backpressure in any operator.
  *   When checkpoints do complete (seemingly at random):

 *   Size is 10-20MB.
 *   Sync and Async durations are at most 1-2 seconds.
 *   In one of the tasks, alignment takes 1-3 minutes, but start delays 
grow to up to 5 minutes.

  *   The aforementioned task (the one with 5-minute start delay) has 8 
sub-tasks and I see no indication of data skew. When the checkpoint times out, 
none of the sub-tasks have acknowledged the checkpoint.

The problematic task that is failing very often (and holding back downstream 
tasks) consists of the following operations:

timestampedEventStream = events
.keyBy(keySelector)
.assignTimestampsAndWatermarks(watermarkStrategy);

windowedStream = 
DataStreamUtils.reinterpretAsKeyedStream(timestampedEventStream, keySelector)
.window(SlidingEventTimeWindows.of(
Time.minutes(windowLengthMinutes),
Time.minutes(windowSlideTimeMinutes)))
.allowedLateness(Time.minutes(allowedLatenessMinutes));

windowedStream
.process(new ProcessWindowFunction1(config))
// add sink

windowedStream
.process(new ProcessWindowFunction2(config))
// add sink

Both window functions are using managed state, but nothing out of the ordinary 
(as mentioned above, state size is actually very small). Do note that the same 
windowedStream is used twice.

I don’t see any obvious runtime issues and I don’t think the load is 
particularly high, but maybe there’s something wrong in my pipeline definition? 
What else could cause these timeouts?

Regards,
Alexis.



--
Regards,
Parag Surajmal Somani.


Re: Troubleshooting checkpoint timeout

2021-10-20 Thread Parag Somani
I had a similar problem, where two concurrent checkpoints were
configured. Also, I was saving the checkpoints to S3 (using MinIO) on a k8s 1.18 env.

The Flink service kept getting restarted and the checkpoint timeout kept happening.
It got resolved by:
1. Fixing the storage: MinIO had run out of disk space, which caused the checkpoint
failures (this was the main cause).
2. Adding the checkpoint pacing parameters to address it:
execution.checkpointing.max-concurrent-checkpoints and
execution.checkpointing.min-pause
Details of the same at:
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#checkpointing
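
If you prefer setting these from the job itself, a rough sketch of the equivalent
calls inside the job's main() (the values are only examples, not recommendations):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);                                 // checkpoint every 60 s
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);        // execution.checkpointing.max-concurrent-checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000); // execution.checkpointing.min-pause, in ms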


On Wed, Oct 20, 2021 at 7:50 AM Caizhi Weng  wrote:

> Hi!
>
> I see you're using sliding event time windows. What's the exact value of
> windowLengthMinutes and windowSlideTimeMinutes? If windowLengthMinutes is
> large and windowSlideTimeMinutes is small then each record may be assigned
> to a large number of windows as the pipeline proceeds, thus gradually slows
> down checkpointing and finally causes a timeout.
>
> Alexis Sarda-Espinosa wrote on Tue, Oct 19, 2021 at 7:29 PM:
>
>> Hello everyone,
>>
>>
>>
>> I am doing performance tests for one of our streaming applications and,
>> after increasing the throughput a bit (~500 events per minute), it has
>> started failing because checkpoints cannot be completed within 10 minutes.
>> The Flink cluster is not exactly under my control and is running on
>> Kubernetes with version 1.11.3 and RocksDB backend.
>>
>>
>>
>> I can access the UI and logs and have confirmed:
>>
>>
>>
>>- Logs do indicate expired checkpoints.
>>- There is no backpressure in any operator.
>>- When checkpoints do complete (seemingly at random):
>>   - Size is 10-20MB.
>>   - Sync and Async durations are at most 1-2 seconds.
>>   - In one of the tasks, alignment takes 1-3 minutes, but start
>>   delays grow to up to 5 minutes.
>>- The aforementioned task (the one with 5-minute start delay) has 8
>>sub-tasks and I see no indication of data skew. When the checkpoint times
>>out, none of the sub-tasks have acknowledged the checkpoint.
>>
>>
>>
>> The problematic task that is failing very often (and holding back
>> downstream tasks) consists of the following operations:
>>
>>
>>
>> timestampedEventStream = events
>>
>> .keyBy(keySelector)
>>
>> .assignTimestampsAndWatermarks(watermarkStrategy);
>>
>>
>>
>> windowedStream =
>> DataStreamUtils.reinterpretAsKeyedStream(timestampedEventStream,
>> keySelector)
>>
>> .window(SlidingEventTimeWindows.of(
>>
>> Time.minutes(windowLengthMinutes),
>>
>> Time.minutes(windowSlideTimeMinutes)))
>>
>> .allowedLateness(Time.minutes(allowedLatenessMinutes));
>>
>>
>>
>> windowedStream
>>
>> .process(new ProcessWindowFunction1(config))
>>
>> // add sink
>>
>>
>>
>> windowedStream
>>
>> .process(new ProcessWindowFunction2(config))
>>
>> // add sink
>>
>>
>>
>> Both window functions are using managed state, but nothing out of the
>> ordinary (as mentioned above, state size is actually very small). Do note
>> that the same windowedStream is used twice.
>>
>>
>>
>> I don’t see any obvious runtime issues and I don’t think the load is
>> particularly high, but maybe there’s something wrong in my pipeline
>> definition? What else could cause these timeouts?
>>
>>
>>
>> Regards,
>>
>> Alexis.
>>
>>
>>
>

-- 
Regards,
Parag Surajmal Somani.


Re: [DISCUSS] Creating an external connector repository

2021-10-20 Thread Konstantin Knauf
Hi everyone,

regarding the stability of the APIs. I think everyone agrees that
connector APIs which are stable across minor versions (1.13->1.14) are the
mid-term goal. But:

a) These APIs are still quite young, and we shouldn't make them @Public
prematurely either.

b) Isn't this *mostly* orthogonal to where the connector code lives? Yes,
as long as there are breaking changes, the connectors need to be adopted
and require at least one release per Flink minor release.
Documentation-wise this can be addressed via a compatibility matrix for
each connector as Arvid suggested. IMO we shouldn't block this effort on
the stability of the APIs.

Cheers,

Konstantin



On Wed, Oct 20, 2021 at 8:56 AM Jark Wu  wrote:

> Hi,
>
> I think Thomas raised very good questions and would like to know your
> opinions if we want to move connectors out of flink in this version.
>
> (1) is the connector API already stable?
> > Separate releases would only make sense if the core Flink surface is
> > fairly stable though. As evident from Iceberg (and also Beam), that's
> > not the case currently. We should probably focus on addressing the
> > stability first, before splitting code. A success criteria could be
> > that we are able to build Iceberg and Beam against multiple Flink
> > versions w/o the need to change code. The goal would be that no
> > connector breaks when we make changes to Flink core. Until that's the
> > case, code separation creates a setup where 1+1 or N+1 repositories
> > need to move lock step.
>
> From another discussion thread [1], connector API is far from stable.
> Currently, it's hard to build connectors against multiple Flink versions.
> There are breaking API changes both in 1.12 -> 1.13 and 1.13 -> 1.14 and
>  maybe also in the future versions,  because Table related APIs are still
> @PublicEvolving and new Sink API is still @Experimental.
>
>
> (2) Flink testability without connectors.
> > Flink w/o Kafka connector (and few others) isn't
> > viable. Testability of Flink was already brought up, can we really
> > certify a Flink core release without Kafka connector? Maybe those
> > connectors that are used in Flink e2e tests to validate functionality
> > of core Flink should not be broken out?
>
> This is a very good question. How can we guarantee the new Source and Sink
> API are stable with only test implementation?
>
>
> Best,
> Jark
>
>
>
>
>
> On Tue, 19 Oct 2021 at 23:56, Chesnay Schepler  wrote:
>
> > Could you clarify what release cadence you're thinking of? There's quite
> > a big range that fits "more frequent than Flink" (per-commit, daily,
> > weekly, bi-weekly, monthly, even bi-monthly).
> >
> > On 19/10/2021 14:15, Martijn Visser wrote:
> > > Hi all,
> > >
> > > I think it would be a huge benefit if we can achieve more frequent
> > releases
> > > of connectors, which are not bound to the release cycle of Flink
> itself.
> > I
> > > agree that in order to get there, we need to have stable interfaces
> which
> > > are trustworthy and reliable, so they can be safely used by those
> > > connectors. I do think that work still needs to be done on those
> > > interfaces, but I am confident that we can get there from a Flink
> > > perspective.
> > >
> > > I am worried that we would not be able to achieve those frequent
> releases
> > > of connectors if we are putting these connectors under the Apache
> > umbrella,
> > > because that means that for each connector release we have to follow
> the
> > > Apache release creation process. This requires a lot of manual steps
> and
> > > prohibits automation and I think it would be hard to scale out frequent
> > > releases of connectors. I'm curious how others think this challenge
> could
> > > be solved.
> > >
> > > Best regards,
> > >
> > > Martijn
> > >
> > > On Mon, 18 Oct 2021 at 22:22, Thomas Weise  wrote:
> > >
> > >> Thanks for initiating this discussion.
> > >>
> > >> There are definitely a few things that are not optimal with our
> > >> current management of connectors. I would not necessarily characterize
> > >> it as a "mess" though. As the points raised so far show, it isn't easy
> > >> to find a solution that balances competing requirements and leads to a
> > >> net improvement.
> > >>
> > >> It would be great if we can find a setup that allows for connectors to
> > >> be released independently of core Flink and that each connector can be
> > >> released separately. Flink already has separate releases
> > >> (flink-shaded), so that by itself isn't a new thing. Per-connector
> > >> releases would need to allow for more frequent releases (without the
> > >> baggage that a full Flink release comes with).
> > >>
> > >> Separate releases would only make sense if the core Flink surface is
> > >> fairly stable though. As evident from Iceberg (and also Beam), that's
> > >> not the case currently. We should probably focus on addressing the
> > >> stability first, before splitting code. A success criteria could be
> > >> that we are able to build 

Re: [DISCUSS] Creating an external connector repository

2021-10-20 Thread Jark Wu
Hi,

I think Thomas raised very good questions and would like to know your
opinions if we want to move connectors out of flink in this version.

(1) is the connector API already stable?
> Separate releases would only make sense if the core Flink surface is
> fairly stable though. As evident from Iceberg (and also Beam), that's
> not the case currently. We should probably focus on addressing the
> stability first, before splitting code. A success criteria could be
> that we are able to build Iceberg and Beam against multiple Flink
> versions w/o the need to change code. The goal would be that no
> connector breaks when we make changes to Flink core. Until that's the
> case, code separation creates a setup where 1+1 or N+1 repositories
> need to move lock step.

From another discussion thread [1], connector API is far from stable.
Currently, it's hard to build connectors against multiple Flink versions.
There are breaking API changes both in 1.12 -> 1.13 and 1.13 -> 1.14 and
 maybe also in the future versions,  because Table related APIs are still
@PublicEvolving and new Sink API is still @Experimental.


(2) Flink testability without connectors.
> Flink w/o Kafka connector (and few others) isn't
> viable. Testability of Flink was already brought up, can we really
> certify a Flink core release without Kafka connector? Maybe those
> connectors that are used in Flink e2e tests to validate functionality
> of core Flink should not be broken out?

This is a very good question. How can we guarantee the new Source and Sink
API are stable with only test implementation?


Best,
Jark





On Tue, 19 Oct 2021 at 23:56, Chesnay Schepler  wrote:

> Could you clarify what release cadence you're thinking of? There's quite
> a big range that fits "more frequent than Flink" (per-commit, daily,
> weekly, bi-weekly, monthly, even bi-monthly).
>
> On 19/10/2021 14:15, Martijn Visser wrote:
> > Hi all,
> >
> > I think it would be a huge benefit if we can achieve more frequent
> releases
> > of connectors, which are not bound to the release cycle of Flink itself.
> I
> > agree that in order to get there, we need to have stable interfaces which
> > are trustworthy and reliable, so they can be safely used by those
> > connectors. I do think that work still needs to be done on those
> > interfaces, but I am confident that we can get there from a Flink
> > perspective.
> >
> > I am worried that we would not be able to achieve those frequent releases
> > of connectors if we are putting these connectors under the Apache
> umbrella,
> > because that means that for each connector release we have to follow the
> > Apache release creation process. This requires a lot of manual steps and
> > prohibits automation and I think it would be hard to scale out frequent
> > releases of connectors. I'm curious how others think this challenge could
> > be solved.
> >
> > Best regards,
> >
> > Martijn
> >
> > On Mon, 18 Oct 2021 at 22:22, Thomas Weise  wrote:
> >
> >> Thanks for initiating this discussion.
> >>
> >> There are definitely a few things that are not optimal with our
> >> current management of connectors. I would not necessarily characterize
> >> it as a "mess" though. As the points raised so far show, it isn't easy
> >> to find a solution that balances competing requirements and leads to a
> >> net improvement.
> >>
> >> It would be great if we can find a setup that allows for connectors to
> >> be released independently of core Flink and that each connector can be
> >> released separately. Flink already has separate releases
> >> (flink-shaded), so that by itself isn't a new thing. Per-connector
> >> releases would need to allow for more frequent releases (without the
> >> baggage that a full Flink release comes with).
> >>
> >> Separate releases would only make sense if the core Flink surface is
> >> fairly stable though. As evident from Iceberg (and also Beam), that's
> >> not the case currently. We should probably focus on addressing the
> >> stability first, before splitting code. A success criteria could be
> >> that we are able to build Iceberg and Beam against multiple Flink
> >> versions w/o the need to change code. The goal would be that no
> >> connector breaks when we make changes to Flink core. Until that's the
> >> case, code separation creates a setup where 1+1 or N+1 repositories
> >> need to move lock step.
> >>
> >> Regarding some connectors being more important for Flink than others:
> >> That's a fact. Flink w/o Kafka connector (and few others) isn't
> >> viable. Testability of Flink was already brought up, can we really
> >> certify a Flink core release without Kafka connector? Maybe those
> >> connectors that are used in Flink e2e tests to validate functionality
> >> of core Flink should not be broken out?
> >>
> >> Finally, I think that the connectors that move into separate repos
> >> should remain part of the Apache Flink project. Larger organizations
> >> tend to approve the use of and contribution to open