Re: Re: Re: Question about using the Flink SQL collect function

2021-12-03 Thread RS
SELECT class_no, collect(info)

FROM (

SELECT class_no, ROW(student_no, name, age) AS info

FROM source_table

)

GROUP BY class_no;


This is the closest approach I can think of at the SQL level, but a multiset cannot be converted to an array.


From your requirement description, this per-class design of the MongoDB target table may not really be needed. If the goal is just to look up all students of one class, adding a WHERE condition at query time is enough; there is no need to put the detail records into an array as well.
It feels like the issue may lie in how the table schema is defined versus how it is actually used; it could help to look at this from a different angle.
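
For the ARRAY-of-rows output discussed here, one option along the lines of the UDAF suggestion quoted further down is a custom AggregateFunction. A rough, untested sketch in Java (class and field names are illustrative; for Row-typed results Flink usually also needs explicit type information, e.g. via getTypeInference or @DataTypeHint):

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.types.Row;

// Collects all rows of a group into one array, e.g. all students of a class.
public class CollectRows extends AggregateFunction<Row[], List<Row>> {

    @Override
    public List<Row> createAccumulator() {
        return new ArrayList<>();
    }

    // Called once per input row of the group.
    public void accumulate(List<Row> acc, Row value) {
        acc.add(value);
    }

    // Merges partial accumulators (needed e.g. for bounded jobs and session windows).
    public void merge(List<Row> acc, Iterable<List<Row>> others) {
        for (List<Row> other : others) {
            acc.addAll(other);
        }
    }

    @Override
    public Row[] getValue(List<Row> acc) {
        return acc.toArray(new Row[0]);
    }
}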

On 2021-12-03 08:36:57, "casel.chen" wrote:
>But the final result I want is not a string; ideally it would be a generic Row type, so that the next time I aggregate on other dimensions I don't have to develop a UDF again.
>Other people have probably run into a requirement like mine as well, right?
>Desired functionality: collect produces a Multiset, i.e. a map whose keys are the records themselves and whose values are their occurrence counts, which can then be sorted by count, etc.
>   The output could be an Array, deduplicated or not (sorted by occurrence count or not), or simply the map itself.
>
>
>Currently the collect function can output a Multiset (a map), but how do I sort by value (the occurrence count) and output only the sorted key set? How would I write that in Flink SQL?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>On 2021-12-02 09:58:28, "cyril cui" wrote:
>>In the UDAF, make the accumulator a list, merge the lists in merge(), and concatenate the list into a string when emitting the result.
>>
>>casel.chen wrote on Thu, Dec 2, 2021 at 9:46 AM:
>>
>>> The use case: a Kafka source table is processed with Flink SQL and written into a MongoDB sink table. The data is grouped
>>> by class, and the output is the collection of all student records for each class. Can this be done with Flink SQL's built-in collect function? If so, how should the SQL be written?
>>> If not, how should a UDAF be written, and is there an example to reference? Thanks!
>>>
>>> Kafka source table:
>>> class_no  student_no  name  age
>>> 1         20001       张三   15
>>> 2         20011       李四   16
>>> 1         20002       王五   16
>>> 2         20012       吴六   15
>>>
>>> create table source_table (
>>>     class_no INT,
>>>     student_no INT,
>>>     name STRING,
>>>     age INT
>>> ) with (
>>>     'connector' = 'kafka',
>>>     ...
>>> );
>>>
>>>
>>>
>>> Processed with Flink SQL and output to ==>
>>>
>>>
>>> MongoDB target table:
>>> class_no  students
>>> 1 [{"student_no": 20001, "name":"张三", "age": 15}, {"student_no":
>>> 20002, "name":"王五", "age": 16}]
>>> 2 [{"student_no": 20011, "name":"李四", "age": 16}, {"student_no":
>>> 20012, "name":"吴六", "age": 15}]
>>>
>>> create table sink_table (
>>>   class_no INT,
>>>   students ARRAY<ROW<student_no INT, name STRING, age INT>>
>>> ) with (
>>>   'connector' = 'mongodb',
>>>   ...
>>> );
>>>
>>>


Re: Order of events in Broadcast State

2021-12-03 Thread Alexey Trenikhun
[1] - 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/



From: Alexey Trenikhun 
Sent: Friday, December 3, 2021 4:33 PM
To: Flink User Mail List 
Subject: Order of events in Broadcast State

Hello,
I am trying to understand what the statement "Order of events in Broadcast State may
differ across tasks" in [1] means.
Let's say I have a keyed function "A" which broadcasts a stream of rules, and a
KeyedBroadcastProcessFunction "B" which receives the rules and updates broadcast state,
like the example in [1]. Say "A" broadcasts "rule 1" with name X, then "A"
(same key) broadcasts "rule 2" with the same name X. Is there a guarantee that
eventually the broadcast state will contain "rule 2", or, since there is no ordering,
could B receive "rule 2" and then "rule 1", so that the broadcast state ends up with
{X="rule 1"} forever?

Thanks,
Alexey
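
For context, a minimal sketch of the pattern being asked about, following the broadcast state documentation in [1]; rules are modelled as simple (name, body) string pairs purely for illustration:

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

// "B" from the question: the broadcast side receives (ruleName, ruleBody) pairs and
// keeps only the latest body per name; the keyed side reads the broadcast state.
public class RuleApplier
        extends KeyedBroadcastProcessFunction<String, String, Tuple2<String, String>, String> {

    // The same descriptor is used for writing and reading the broadcast state.
    private final MapStateDescriptor<String, String> rulesDescriptor =
            new MapStateDescriptor<>(
                    "rules", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

    @Override
    public void processBroadcastElement(
            Tuple2<String, String> rule, Context ctx, Collector<String> out) throws Exception {
        // put() overwrites any earlier rule stored under the same name, so the final
        // content depends on the order in which this task sees the broadcast events.
        ctx.getBroadcastState(rulesDescriptor).put(rule.f0, rule.f1);
    }

    @Override
    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out)
            throws Exception {
        String rule = ctx.getBroadcastState(rulesDescriptor).get(value);
        out.collect(value + " -> " + (rule == null ? "no rule" : rule));
    }
}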


Log level for insufficient task slots message

2021-12-03 Thread Mason Chen
Hi all,

java.util.concurrent.CompletionException: 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
Could not acquire the minimum required resources.

This is the exception/message that is thrown when the user misconfigures the job
with insufficient task slots. Currently, it is logged at INFO level in Flink
1.13.3, making it hard to find the error in our Splunk logging infrastructure.

What do people think about changing this log message to ERROR level?

Best,
Mason

Order of events in Broadcast State

2021-12-03 Thread Alexey Trenikhun
Hello,
I am trying to understand what the statement "Order of events in Broadcast State may
differ across tasks" in [1] means.
Let's say I have a keyed function "A" which broadcasts a stream of rules, and a
KeyedBroadcastProcessFunction "B" which receives the rules and updates broadcast state,
like the example in [1]. Say "A" broadcasts "rule 1" with name X, then "A"
(same key) broadcasts "rule 2" with the same name X. Is there a guarantee that
eventually the broadcast state will contain "rule 2", or, since there is no ordering,
could B receive "rule 2" and then "rule 1", so that the broadcast state ends up with
{X="rule 1"} forever?

Thanks,
Alexey


enable.auto.commit=true and checkpointing turned on

2021-12-03 Thread Vishal Santoshi
Hello folks,

2 questions
 1. If we have enabled enable.auto.commit and enabled checkpointing, and we
restart a Flink application (without a checkpoint or savepoint), would the
Kafka consumer start consuming from the last offset committed to Kafka?

2. What if, in the above scenario, we have "auto.offset.reset" set to
"latest"? Would that ignore the consumer group offset in Kafka?


Regards.
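
For reference, a hedged sketch of the setup being asked about, using the FlinkKafkaConsumer (topic, servers and group id are placeholders). As documented for the Kafka connector, with checkpointing enabled offsets are committed on checkpoint completion, the committed group offsets are only used as the start position when the job is not restored from a checkpoint or savepoint, and auto.offset.reset is the fallback when no committed offset exists:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaStartPositionExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // offsets are committed as part of checkpoints

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "my-group");                // placeholder
        // Only used when Flink checkpointing is disabled; with checkpointing enabled,
        // the connector commits offsets on completed checkpoints instead.
        props.setProperty("enable.auto.commit", "true");
        // Fallback when the group has no committed offset for a partition.
        props.setProperty("auto.offset.reset", "latest");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
        // Start from the group's committed offsets when not restoring from a
        // checkpoint/savepoint; auto.offset.reset only applies if none are committed.
        consumer.setStartFromGroupOffsets();

        env.addSource(consumer).print();
        env.execute("kafka start position example");
    }
}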


Re: Parquet schema per bucket in Streaming File Sink

2021-12-03 Thread Zack Loebel
Unfortunately this does not solve my use case, because I want to be able to
create and change the various outputs at runtime (the partition keys would
be dynamic), and as such the sql/extraction would have to change during
execution, which I did not believe to be supported. I'm also operating at the
datastream level (although of course I could move the datastream into
sql-land).

Best,
Zack
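
Not an answer to the per-bucket schema question itself, but to make the String bucket id constraint discussed below concrete, a minimal sketch of a custom BucketAssigner for the file sink; records are modelled as (partitionKey, payload) tuples purely for illustration:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

// Routes each (partitionKey, payload) record into a bucket named after its key.
// As noted in the thread, the default builder components assume a String bucket id.
public class PartitionKeyBucketAssigner
        implements BucketAssigner<Tuple2<String, String>, String> {

    @Override
    public String getBucketId(Tuple2<String, String> element, Context context) {
        return element.f0; // dynamic partition key carried by the record
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}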


On Tue, Nov 30, 2021 at 2:41 AM Francesco Guardiani 
wrote:

> Hi Zack,
>
> > I want to customize this job to "explode" the map as column names and
> values
>
> You can do this in a select statement, extracting the map values manually
> using the map access built-in function,
> e.g.:
>
> SELECT mymap['a'] AS a, mymap['b'] AS b
>
> > specifically the BucketAssigner and the CheckpointRollingPolicy both
> appear to be required to have a bucketId of a String.
>
> I wonder if what you're looking for is the PARTITIONED BY feature:
>
> CREATE TABLE MySinkTable (
>   ...) PARTITIONED BY (partitionKey1, partitionKey2)
>
> Does this solve your use case?
>
> FG
>
>
> On Tue, Nov 30, 2021 at 7:13 AM Zack Loebel  wrote:
>
>> Hey all,
>>
>> I have a job which writes data that is a similar shape to a location in
>> s3. Currently it writes a map of data with each row. I want to customize
>> this job to "explode" the map as column names and values, these are
>> consistent for a single bucket. Is there any way to do this? Provide a
>> custom parquet schema per bucket within a single dynamic sink?
>>
>> I've started looking at the changes within the main codebase to make this
>> feasible. It seems straightforward to provide the bucketId to the
>> writerFactory, and the bucketId could be a type containing the relevant
>> schema information.
>> Although it appears that the BulkFormatBuilder has several spots where
>> BucketId appears to be required to be a String: specifically
>> the BucketAssigner and the CheckpointRollingPolicy both appear to be
>> required to have a bucketId of a String.
>>
>> I'm curious if this is a change the community would be open to, and or if
>> there is another way to accomplish what I'm looking for that I've missed.
>>
>> Thanks,
>> Zack
>>
>>

-- 
Have a great day!
Zack Loebel-Begelman


Re: Stateful functions module configurations (module.yaml) per deployment environment

2021-12-03 Thread Deniz Koçak
Hi Igal,

We are using official images from Ververica as the Flink installation.
Actually, I was hoping to specify the names of the module files to use at
runtime via `mainArgs` in the deployment configuration (or in some
other way). That way we could specify the target yaml files,
but I think this is not possible?

===
kind: JAR
mainArgs: '--active-profile nxt'
===

Therefore, it's easier to use a single jar in our pipelines instead of
creating a different jar file for each environment (at least for development
and production).

For solution 2, do you mean the Flink distribution, like the /flink/lib folder
in the official Docker image?

Thanks,
Deniz

On Fri, Dec 3, 2021 at 3:06 PM Igal Shilman  wrote:
>
> Hi Deniz,
>
> StateFun would be looking for module.yaml(s) in the classpath.
> If you are submitting the job to an existing Flink cluster this really means 
> that it needs to be either:
> 1. packaged with the jar (like you are already doing)
> 2. be present at the classpath, this means that you can place your 
> module.yaml at the /lib directory of your Flink installation, I suppose that 
> you have different installations in different environments.
>
> I'm not aware of a way to submit any additional files with the jar via the 
> flink cli, but perhaps someone else can chime in :-)
>
> Cheers,
> Igal.
>
>
> On Thu, Dec 2, 2021 at 3:29 PM Deniz Koçak  wrote:
>>
>> Hi,
>>
>> We have a simple stateful-function job, consuming from Kafka, calling
>> an HTTP endpoint (on AWS via an Elastic Load Balancer) and publishing
>> the result back via Kafka again.
>>
>> * We created a jar file to be deployed on a standalone cluster (it's
>> not a docker Image), therefore we add `statefun-flink-distribution`
>> version 3.0.0 as a dependency in that jar file.
>> * Entry class in our job configuration is
>> `org.apache.flink.statefun.flink.core.StatefulFunctionsJob` and we
>> simply keep a single module.yaml file in resources folder for the
>> module configuration.
>>
>> My question here is, we would like to deploy that jar to different
>> environments (dev. and prod.) and not sure how we can pass different
>> module configurations (module.yaml or module_nxt.yaml/module_prd.yaml)
>> to the job during startup without creating separate jar files for
>> different environments?
>>
>> Thanks,
>> Deniz


Re: Stateful function endpoint self-signed certificate problem

2021-12-03 Thread Deniz Koçak
Hi Igal,

Thanks for the response, we sorted it out by deploying the required
certs to our images.

Thanks,
Deniz
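
For anyone hitting the same PKIX error who cannot bake the certificate into the image, a hedged sketch of the standard JVM trust-store properties that StateFun picks up (path and password are placeholders; on a cluster these properties must reach the TaskManager JVMs, e.g. via env.java.opts, not just the client):

public final class TrustStoreSettings {
    public static void main(String[] args) {
        // Standard JVM properties; StateFun/Flink use whatever trust store the JVM is configured with.
        // Placeholder path to a store that contains the self-signed CA certificate.
        System.setProperty("javax.net.ssl.trustStore", "/etc/flink/certs/truststore.jks");
        System.setProperty("javax.net.ssl.trustStorePassword", "changeit"); // placeholder
        // ... build and submit the StateFun job as usual ...
    }
}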

On Fri, Dec 3, 2021 at 3:15 PM Igal Shilman  wrote:
>
> Hi Deniz,
> My apologies for the late reply, I assume that by now you have figured this 
> out since I've seen your followup question :-)
>
> StateFun uses the trust store configured in the JVM, so if you can install 
> your certificate there, StateFun should transparently pick it up.
>
> Good luck,
> Igal.
>
> On Fri, Nov 26, 2021 at 10:23 AM Deniz Koçak  wrote:
>>
>> Hi,
>>
>> We have been running a simple stateful functions (version 3.0.0) job,
>> which simply forwards the incoming messages via Kafka source to an
>> HTTPS endpoint on AWS. Our HTTP endpoint is behind a Load Balancer on
>> AWS and this Load Balancer is listening on 443 for incoming HTTPS
>> traffic. The certificate used by the LB was created by our organization,
>> so it is a self-signed one.
>>
>> Therefore, whenever the stateful function tries to make a call to the load
>> balancer on port 443, it gives the exception below
>>
>> javax.net.ssl.SSLHandshakeException: PKIX path building failed:
>> sun.security.provider.certpath.SunCertPathBuilderException: unable to
>> find valid certification path to requested target
>>
>> I wonder how can I solve that problem or at least is it possible to
>> ignore the self-signed cert. issue via module configuration?
>>
>> spec:
>>   endpoints:
>>     - endpoint:
>>         meta:
>>           kind: http
>>         spec:
>>           functions: prebet/*
>>           urlPathTemplate: https://AWS-LoadBalancer-Internal-Hostname
>>           call: 10 min
>>
>> Thanks,


Re: Stateful function endpoint self-signed certificate problem

2021-12-03 Thread Igal Shilman
Hi Deniz,
My apologies for the late reply, I assume that by now you have figured this
out since I've seen your followup question :-)

StateFun uses the trust store configured in the JVM, so if you can install
your certificate there, StateFun should transparently pick it up.

Good luck,
Igal.

On Fri, Nov 26, 2021 at 10:23 AM Deniz Koçak  wrote:

> Hi,
>
> We have been running a simple stateful functions (version 3.0.0) job,
> which simply forwards the incoming messages via Kafka source to an
> HTTPS endpoint on AWS. Our HTTP endpoint is behind a Load Balancer on
> AWS and this Load Balancer is listening on 443 for incoming HTTPS
> traffic. The certificate used by the LB was created by our organization,
> so it is a self-signed one.
>
> Therefore, whenever the stateful function tries to make a call to the load
> balancer on port 443, it gives the exception below
>
> javax.net.ssl.SSLHandshakeException: PKIX path building failed:
> sun.security.provider.certpath.SunCertPathBuilderException: unable to
> find valid certification path to requested target
>
> I wonder how can I solve that problem or at least is it possible to
> ignore the self-signed cert. issue via module configuration?
>
> spec:
>   endpoints:
>     - endpoint:
>         meta:
>           kind: http
>         spec:
>           functions: prebet/*
>           urlPathTemplate: https://AWS-LoadBalancer-Internal-Hostname
>           call: 10 min
>
> Thanks,
>


Re: Stateful functions module configurations (module.yaml) per deployment environment

2021-12-03 Thread Igal Shilman
Hi Deniz,

StateFun would be looking for module.yaml(s) in the classpath.
If you are submitting the job to an existing Flink cluster this really
means that it needs to be either:
1. packaged with the jar (like you are already doing)
2. be present on the classpath; this means that you can place your
module.yaml in the /lib directory of your Flink installation. I
suppose that you have different installations in different environments.

I'm not aware of a way to submit any additional files with the jar via the
flink cli, but perhaps someone else can chime in :-)

Cheers,
Igal.


On Thu, Dec 2, 2021 at 3:29 PM Deniz Koçak  wrote:

> Hi,
>
> We have a simple stateful-function job, consuming from Kafka, calling
> an HTTP endpoint (on AWS via an Elastic Load Balancer) and publishing
> the result back via Kafka again.
>
> * We created a jar file to be deployed on a standalone cluster (it's
> not a docker Image), therefore we add `statefun-flink-distribution`
> version 3.0.0 as a dependency in that jar file.
> * Entry class in our job configuration is
> `org.apache.flink.statefun.flink.core.StatefulFunctionsJob` and we
> simply keep a single module.yaml file in resources folder for the
> module configuration.
>
> My question here is, we would like to deploy that jar to different
> environments (dev. and prod.) and not sure how we can pass different
> module configurations (module.yaml or module_nxt.yaml/module_prd.yaml)
> to the job during startup without creating separate jar files for
> different environments?
>
> Thanks,
> Deniz
>


Re: Cannot consum from Kinesalite using FlinkKinesisConsumer

2021-12-03 Thread Mika Naylor

Hey Jonas,

May I ask what version of Kinesalite you're targeting? With 3.3.3 and
STREAM_INITIAL_POSITION = "LATEST", I received a "The timestampInMillis
parameter cannot be greater than the currentTimestampInMillis" which may
be a misconfiguration on my setup, but with STREAM_INITIAL_POSITION =
"TRIM_HORIZON" I was able to consume events from the stream.

This was with 1.14.0 of the Kinesis Flink connector.

Kind regards,
Mika


On 02.12.2021 23:05, jonas eyob wrote:

Hi all, I have a really simple pipeline to consume events from a local
kinesis (kinesalite) and print them out to stdout, but I'm struggling to make
sense of why it's failing almost immediately.

The pipeline code:

/* Added this to verify it wasn't a problem with AWS CBOR, which needs
   to be disabled */
System.setProperty(com.amazonaws.SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true")
System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true")
System.setProperty("org.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking", "true")

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

val consumerConfig = new Properties()

consumerConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1")
consumerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO")
consumerConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "FAKE_ACCESS_KEY")
consumerConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "FAKE_SECRET_ACCESS_KEY")
consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST")
consumerConfig.setProperty(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567")

env
  .addSource(
    new FlinkKinesisConsumer[String](
      "user-profile-events-local",
      new SimpleStringSchema,
      consumerConfig
    )
  )
  .print()

env.execute("echo stream")

When running this locally, I get the following error:

22:27:23.372 [flink-akka.actor.default-dispatcher-5] INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom
Source -> Sink: Print to Std. Out (1/1) (7e920c6918655278fbd09e7658847264)
switched from INITIALIZING to RUNNING.
Dec 02, 2021 10:27:23 PM
org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.internal.BasicProfileConfigLoader
loadProfiles
WARNING: Your profile name includes a 'profile ' prefix. This is considered
part of the profile name in the Java SDK, so you will need to include this
prefix in your profile name when you reference this profile from your Java
code.
Dec 02, 2021 10:27:23 PM
org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.internal.BasicProfileConfigLoader
loadProfiles
WARNING: Your profile name includes a 'profile ' prefix. This is considered
part of the profile name in the Java SDK, so you will need to include this
prefix in your profile name when you reference this profile from your Java
code.
Dec 02, 2021 10:27:23 PM
org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.internal.BasicProfileConfigLoader
loadProfiles
WARNING: Your profile name includes a 'profile ' prefix. This is considered
part of the profile name in the Java SDK, so you will need to include this
prefix in your profile name when you reference this profile from your Java
code.
Dec 02, 2021 10:27:24 PM
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient
createSocketFactoryRegistry
WARNING: SSL Certificate checking for endpoints has been explicitly
disabled.
Dec 02, 2021 10:27:24 PM
org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.internal.BasicProfileConfigLoader
loadProfiles
WARNING: Your profile name includes a 'profile ' prefix. This is considered
part of the profile name in the Java SDK, so you will need to include this
prefix in your profile name when you reference this profile from your Java
code.
Dec 02, 2021 10:27:24 PM
org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.internal.BasicProfileConfigLoader
loadProfiles
WARNING: Your profile name includes a 'profile ' prefix. This is considered
part of the profile name in the Java SDK, so you will need to include this
prefix in your profile name when you reference this profile from your Java
code.
Dec 02, 2021 10:27:24 PM
org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.internal.BasicProfileConfigLoader
loadProfiles
WARNING: Your profile name includes a 'profile ' prefix. This is considered
part of the profile name in the Java SDK, so you will need to include this
prefix in your profile name when you reference this profile from your Java
code.
22:27:24.328 [Source: Custom Source -> Sink: Print to Std. Out (1/1)#0]
WARN  org.apache.flink.runtime.taskmanager.Task - Source: Custom Source ->
Sink: Print to Std. Out (1/1)#0 (7e920c6918655278fbd09e7658847264) switched
from RUNNING to FAILED with failure cause:
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.AmazonKinesisException:
null (Service: AmazonKinesis; Status Code: 400; Error Code:

Re: custom metrics in elasticsearch ActionRequestFailureHandler

2021-12-03 Thread Lars Bachmann
Hi Alexander,

Yes, in the first iteration the use case is to get visibility on failed ES
requests. Usually we expose metrics to count failures, integrate them into
dashboards, and set up alerting rules which fire in case they hit a certain
threshold.

In non-Flink-based applications which index data into ES we also applied
failure handlers which evaluated the kind of error and triggered different
actions. For example, there are errors from which you can recover
(ConnectException because of some network issues) - in this case we retried the
indexing request or eventually even forced a stop of the client until the
failure was resolved. On the other hand, there can be errors from which you
cannot recover. In our scenario this was the case for indexing requests with
malformed data (mapping failures) - here we just ignored the failure and
dropped the messages.

I’m not sure if we want to move this logic as well to the Flink applications, 
but I just wanted to mention that there can be situations where you might want 
to control the behavior of your application depending on the failure.

Best Regards,

Lars
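
To make that concrete, a rough sketch of such a handler against the current ActionRequestFailureHandler interface, adapted from the example in the Elasticsearch connector docs. The error classification is simplified, and, as this thread discusses, no RuntimeContext is available here, so counters for these cases cannot be registered inside the handler:

import java.net.ConnectException;
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.util.ExceptionUtils;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionRequest;

// Retries requests that failed due to transient connectivity problems, drops
// requests with malformed documents, and fails the sink for everything else.
public class RetryOrDropFailureHandler implements ActionRequestFailureHandler {

    @Override
    public void onFailure(
            ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer)
            throws Throwable {
        if (ExceptionUtils.findThrowable(failure, ConnectException.class).isPresent()) {
            // Recoverable network issue: queue the request again.
            indexer.add(action);
        } else if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class)
                .isPresent()) {
            // Malformed document / mapping problem: drop it. No RuntimeContext is
            // available here, so a failure counter cannot be registered -- which is
            // exactly the gap discussed in this thread.
        } else {
            // Anything else: fail the sink and let Flink restart from the last checkpoint.
            throw failure;
        }
    }
}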


> On 03.12.2021 at 09:05, Alexander Preuß wrote:
> 
> Hi Lars,
> 
> What is your use case for the failure handler, just collecting metrics? We 
> want to remove the configurable failure handler in the new Sink API 
> implementation of the Elasticsearch connector in Flink 1.15 because it can be 
> a huge footgun with regards to delivery guarantees.
> 
> Best Regards,
> Alexander
> 
> On Thu, Dec 2, 2021 at 6:23 PM Lars Bachmann  > wrote:
> Hi David,
> 
> Thanks for the reply. I think especially in an error/failure handler metrics 
> are important in order to have proper monitoring/alerting in such cases. 
> Would be awesome if this could be added to Flink at some point :).
> 
> Regards,
> 
> Lars
> 
>> Am 02.12.2021 um 18:13 schrieb David Morávek > >:
>> 
>> Hi Lars,
>> 
>> quickly looking at the ES connector code, I think you're right and there is 
>> no way to do that :(  In general I'd say that being able to expose metrics 
>> is a valid request.
>> 
>> I can imagine having some kind of `RichActionRequestFailureHandler` with 
>> `{get|set}RuntimeContext` methods. More or less the same thing we already do 
>> with for example the `RichFunction`. This unfortunately requires some work 
>> on the Flink side.
>> 
>> cc @Arvid
>> 
>> On Thu, Dec 2, 2021 at 5:52 PM > > wrote:
>> Hi,
>> 
>> is there a way to expose custom metrics within an elasticsearch failure 
>> handler (ActionRequestFailureHandler)? To register custom metrics I need 
>> access to the runtime context but I don't see a way to access the 
>> context in the failure handler.
>> 
>> Thanks and regards,
>> 
>> Lars
> 



Re: [DISCUSS] Change some default config values of blocking shuffle

2021-12-03 Thread Till Rohrmann
Thanks for starting this discussion Yingjie,

How will our tests be affected by these changes? Will Flink require more
resources and, thus, will it risk destabilizing our testing infrastructure?

I would propose to create a FLIP for these changes since you propose to
change the default behaviour. It can be a very short one, though.

Cheers,
Till

On Fri, Dec 3, 2021 at 10:02 AM Yingjie Cao  wrote:

> Hi dev & users,
>
> We propose to change some default values of blocking shuffle to improve
> the user out-of-box experience (not influence streaming). The default
> values we want to change are as follows:
>
> 1. Data compression
> (taskmanager.network.blocking-shuffle.compression.enabled): Currently, the
> default value is 'false'.  Usually, data compression can reduce both disk
> and network IO which is good for performance. At the same time, it can save
> storage space. We propose to change the default value to true.
>
> 2. Default shuffle implementation
> (taskmanager.network.sort-shuffle.min-parallelism): Currently, the default
> value is 'Integer.MAX', which means by default, Flink jobs will always use
> hash-shuffle. In fact, for high parallelism, sort-shuffle is better for
> both stability and performance. So we propose to reduce the default value
> to a proper smaller one, for example, 128. (We tested 128, 256, 512 and
> 1024 with a tpc-ds and 128 is the best one.)
>
> 3. Read buffer of sort-shuffle
> (taskmanager.memory.framework.off-heap.batch-shuffle.size): Currently, the
> default value is '32M'. Previously, when choosing the default value, both
> ‘32M' and '64M' are OK for tests and we chose the smaller one in a cautious
> way. However, recently, it is reported in the mailing list that the default
> value is not enough which caused a buffer request timeout issue. We already
> created a ticket to improve the behavior. At the same time, we propose to
> increase this default value to '64M' which can also help.
>
> 4. Sort buffer size of sort-shuffle
> (taskmanager.network.sort-shuffle.min-buffers): Currently, the default
> value is '64' which means '64' network buffers (32k per buffer by default).
> This default value is quite modest and the performance can be influenced.
> We propose to increase this value to a larger one, for example, 512 (the
> default TM and network buffer configuration can serve more than 10
> result partitions concurrently).
>
> We already tested these default values together with tpc-ds benchmark in a
> cluster and both the performance and stability improved a lot. These
> changes can help to improve the out-of-box experience of blocking shuffle.
> What do you think about these changes? Is there any concern? If there are
> no objections, I will make these changes soon.
>
> Best,
> Yingjie
>
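
For anyone who wants to try these values before any defaults change, a hedged sketch of setting them explicitly from code; this works as-is for a local/MiniCluster run, while on a real cluster these TaskManager-level options belong in flink-conf.yaml:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BlockingShuffleSettingsExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Values proposed in this thread; the keys are the ones named above.
        conf.setBoolean("taskmanager.network.blocking-shuffle.compression.enabled", true);
        conf.setInteger("taskmanager.network.sort-shuffle.min-parallelism", 128);
        conf.setString("taskmanager.memory.framework.off-heap.batch-shuffle.size", "64m");
        conf.setInteger("taskmanager.network.sort-shuffle.min-buffers", 512);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... define a batch-mode job here; the settings above only affect blocking shuffles ...
        env.fromElements(1, 2, 3).print();
        env.execute("blocking shuffle settings example");
    }
}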


Re: [DISCUSS] Change some default config values of blocking shuffle

2021-12-03 Thread Till Rohrmann
Thanks for starting this discussion Yingjie,

How will our tests be affected by these changes? Will Flink require more
resources and, thus, will it risk destabilizing our testing infrastructure?

I would propose to create a FLIP for these changes since you propose to
change the default behaviour. It can be a very short one, though.

Cheers,
Till

On Fri, Dec 3, 2021 at 10:02 AM Yingjie Cao  wrote:

> Hi dev & users,
>
> We propose to change some default values of blocking shuffle to improve
> the user out-of-box experience (not influence streaming). The default
> values we want to change are as follows:
>
> 1. Data compression
> (taskmanager.network.blocking-shuffle.compression.enabled): Currently, the
> default value is 'false'.  Usually, data compression can reduce both disk
> and network IO which is good for performance. At the same time, it can save
> storage space. We propose to change the default value to true.
>
> 2. Default shuffle implementation
> (taskmanager.network.sort-shuffle.min-parallelism): Currently, the default
> value is 'Integer.MAX', which means by default, Flink jobs will always use
> hash-shuffle. In fact, for high parallelism, sort-shuffle is better for
> both stability and performance. So we propose to reduce the default value
> to a proper smaller one, for example, 128. (We tested 128, 256, 512 and
> 1024 with a tpc-ds and 128 is the best one.)
>
> 3. Read buffer of sort-shuffle
> (taskmanager.memory.framework.off-heap.batch-shuffle.size): Currently, the
> default value is '32M'. Previously, when choosing the default value, both
> ‘32M' and '64M' are OK for tests and we chose the smaller one in a cautious
> way. However, recently, it is reported in the mailing list that the default
> value is not enough which caused a buffer request timeout issue. We already
> created a ticket to improve the behavior. At the same time, we propose to
> increase this default value to '64M' which can also help.
>
> 4. Sort buffer size of sort-shuffle
> (taskmanager.network.sort-shuffle.min-buffers): Currently, the default
> value is '64' which means '64' network buffers (32k per buffer by default).
> This default value is quite modest and the performance can be influenced.
> We propose to increase this value to a larger one, for example, 512 (the
> default TM and network buffer configuration can serve more than 10
> result partitions concurrently).
>
> We already tested these default values together with tpc-ds benchmark in a
> cluster and both the performance and stability improved a lot. These
> changes can help to improve the out-of-box experience of blocking shuffle.
> What do you think about these changes? Is there any concern? If there are
> no objections, I will make these changes soon.
>
> Best,
> Yingjie
>


Re: ERROR org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState - Authentication failed

2021-12-03 Thread su wenwen
Judging from the error, it is related to the log4j configuration file format; try changing the log4j.properties file to log4j2.xml.


From: summer 
Sent: December 2, 2021 11:32
To: user-zh@flink.apache.org 
Subject: ERROR org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState -
Authentication failed

When I run flink-sql on Flink 1.13.3 integrated with CDH 6.3.2, this error appears in the logs:


ERROR StatusLogger No Log4j 2 configuration file found. Using default
configuration (logging only errors to the console), or user
programmatically provided configurations. Set system property
'log4j2.debug' to show Log4j 2 internal initialization logging. See
https://logging.apache.org/log4j/2.x/manual/configuration.html for
instructions on how to configure Log4j 2
10:45:50.236 [main-EventThread] ERROR
org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState -
Authentication failed
JobManager Web Interface: http://lo-t-work3:8081
The Flink Yarn cluster has failed.
10:56:08.877 [main-EventThread] ERROR
org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState -
Authentication failed


What could be causing this?


Re: PyFlink import internal packages

2021-12-03 Thread Shuiqiang Chen
Hi,

Actually, you are able to develop your app in the clean python way. It's
fine to split the code into multiple files and there is no need to call
`env.add_python_file()` explicitly. When submitting the PyFlink job you can
specify python files and the entry main module with the options --pyFiles and
--pyModule [1], like:

$ ./bin/flink run --pyModule flink_app.main --pyFiles
${WORKSPACE}/flink_app

In this way, all files under the directory will be added to the PYTHONPATH
of both the local client and the remote python UDF worker.

Hope this helps!

Best,
Shuiqiang

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/cli/#submitting-pyflink-jobs

Королькевич Михаил wrote on Fri, Dec 3, 2021 at 5:23 PM:

> Hi Flink Team,
>
> Im trying to implement app on pyflink.
> I would like to structure the directory as follows:
>
>
> flink_app/
> data_service/
> s3.py
> filesystem.py
> validator/
> validator.py
> metrics/
> statictic.py
> quality.py
> common/
> constants.py
> main.py <- entry job
>
> Two questions:
> 1) is it possible to import constants from common in the data_service
> package? In clean python we can use an absolute path like "from
> flink_app.common import constants".
> All files imported to flink "
> env = StreamExecutionEnvironment.get_execution_environment()
> env.add_python_file('/path_to_flink_app/flink_app')
> "
> 2) Can I split the pipeline from main.py into many files, e.g. import env into
> other files and return a datastream/table back?
>


Re: Unsubscribe

2021-12-03 Thread su wenwen
To unsubscribe, send an email to user-zh-unsubscr...@flink.apache.org


From: ™薇维苿尉℃ 
Sent: December 3, 2021 17:34
To: user-zh 
Subject: Unsubscribe

Unsubscribe



PyFlink import internal packages

2021-12-03 Thread Королькевич Михаил
Hi Flink Team,

I'm trying to implement an app on pyflink. I would like to structure the directory as follows:

flink_app/
data_service/
s3.py
filesystem.py
validator/
validator.py
metrics/
statictic.py
quality.py
common/
constants.py
main.py <- entry job

Two questions:
1) is it possible to import constants from common in the data_service package? In clean python we can use an absolute path like "from flink_app.common import constants".
All files imported to flink "
env = StreamExecutionEnvironment.get_execution_environment()
env.add_python_file('/path_to_flink_app/flink_app')
"
2) Can I split the pipeline from main.py into many files, e.g. import env into other files and return a datastream/table back?

[DISCUSS] Change some default config values of blocking shuffle

2021-12-03 Thread Yingjie Cao
Hi dev & users,

We propose to change some default values of blocking shuffle to improve the
user out-of-box experience (not influence streaming). The default values we
want to change are as follows:

1. Data compression
(taskmanager.network.blocking-shuffle.compression.enabled): Currently, the
default value is 'false'.  Usually, data compression can reduce both disk
and network IO which is good for performance. At the same time, it can save
storage space. We propose to change the default value to true.

2. Default shuffle implementation
(taskmanager.network.sort-shuffle.min-parallelism): Currently, the default
value is 'Integer.MAX', which means by default, Flink jobs will always use
hash-shuffle. In fact, for high parallelism, sort-shuffle is better for
both stability and performance. So we propose to reduce the default value
to a proper smaller one, for example, 128. (We tested 128, 256, 512 and
1024 with a tpc-ds and 128 is the best one.)

3. Read buffer of sort-shuffle
(taskmanager.memory.framework.off-heap.batch-shuffle.size): Currently, the
default value is '32M'. Previously, when choosing the default value, both
‘32M' and '64M' are OK for tests and we chose the smaller one in a cautious
way. However, recently, it is reported in the mailing list that the default
value is not enough which caused a buffer request timeout issue. We already
created a ticket to improve the behavior. At the same time, we propose to
increase this default value to '64M' which can also help.

4. Sort buffer size of sort-shuffle
(taskmanager.network.sort-shuffle.min-buffers): Currently, the default
value is '64' which means '64' network buffers (32k per buffer by default).
This default value is quite modest and the performance can be influenced.
We propose to increase this value to a larger one, for example, 512 (the
default TM and network buffer configuration can serve more than 10
result partitions concurrently).

We already tested these default values together with tpc-ds benchmark in a
cluster and both the performance and stability improved a lot. These
changes can help to improve the out-of-box experience of blocking shuffle.
What do you think about these changes? Is there any concern? If there are
no objections, I will make these changes soon.

Best,
Yingjie


[DISCUSS] Change some default config values of blocking shuffle

2021-12-03 Thread Yingjie Cao
Hi dev & users,

We propose to change some default values of blocking shuffle to improve the
user out-of-box experience (not influence streaming). The default values we
want to change are as follows:

1. Data compression
(taskmanager.network.blocking-shuffle.compression.enabled): Currently, the
default value is 'false'.  Usually, data compression can reduce both disk
and network IO which is good for performance. At the same time, it can save
storage space. We propose to change the default value to true.

2. Default shuffle implementation
(taskmanager.network.sort-shuffle.min-parallelism): Currently, the default
value is 'Integer.MAX', which means by default, Flink jobs will always use
hash-shuffle. In fact, for high parallelism, sort-shuffle is better for
both stability and performance. So we propose to reduce the default value
to a proper smaller one, for example, 128. (We tested 128, 256, 512 and
1024 with a tpc-ds and 128 is the best one.)

3. Read buffer of sort-shuffle
(taskmanager.memory.framework.off-heap.batch-shuffle.size): Currently, the
default value is '32M'. Previously, when choosing the default value, both
‘32M' and '64M' are OK for tests and we chose the smaller one in a cautious
way. However, recently, it is reported in the mailing list that the default
value is not enough which caused a buffer request timeout issue. We already
created a ticket to improve the behavior. At the same time, we propose to
increase this default value to '64M' which can also help.

4. Sort buffer size of sort-shuffle
(taskmanager.network.sort-shuffle.min-buffers): Currently, the default
value is '64' which means '64' network buffers (32k per buffer by default).
This default value is quite modest and the performance can be influenced.
We propose to increase this value to a larger one, for example, 512 (the
default TM and network buffer configuration can serve more than 10
result partitions concurrently).

We already tested these default values together with tpc-ds benchmark in a
cluster and both the performance and stability improved a lot. These
changes can help to improve the out-of-box experience of blocking shuffle.
What do you think about these changes? Is there any concern? If there are
no objections, I will make these changes soon.

Best,
Yingjie


Re: Re: how to run streaming process after batch process is completed?

2021-12-03 Thread Joern Kottmann
Hello,

Are there plans to support checkpoints for batch mode? I currently load the
state back via the DataStream API, but this gets more and more complicated
and doesn't always lead to a perfect state restore (as flink could have
done).

This is one of my most wanted Flink features these days.

Regards,
Jörn




On Thu, Dec 2, 2021 at 9:24 AM Yun Gao  wrote:

> Hi Vtygoss,
>
> Very thanks for sharing the scenarios!
>
> Currently for batch mode checkpoint is not support, thus it could not
> create a snapshot after the job is finished. However, there might be some
> alternative solutions:
>
> 1. Hybrid source [1] aims to allow first reading from a bounded source, then
> switching to an unbounded source, which seems to work in this case. However, it
> might not support the Table / SQL API yet; that might be done in 1.15.
> 2. The batch job might first write the result to an intermediate table,
> then for the unbounded
> stream job, it might first load the table into state with DataStream API
> on startup or use dimension
> join to continue processing new records.
>
> Best,
> Yun
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source
>
> --Original Mail --
> *Sender:*vtygoss 
> *Send Date:*Wed Dec 1 17:52:17 2021
> *Recipients:*Alexander Preuß 
> *CC:*user@flink.apache.org 
> *Subject:*Re: how to run streaming process after batch process is
> completed?
>
>> Hi Alexander,
>>
>>
>> This is my ideal data pipeline.
>>
>> - 1. Sqoop transfers bounded data from the database to Hive. I think Flink
>> batch processing is more efficient than stream processing, so I want to
>> process this bounded data in batch mode and write the result to HiveTable2.
>>
>> - 2. There are some tools to transfer CDC / BINLOG data to Kafka and to
>> write the incremental unbounded data to HiveTable1. I want to process this
>> unbounded data in streaming mode and update the incremental result in
>> HiveTable2.
>>
>>
>> So this is the problem: the Flink streaming SQL application cannot be
>> restored from the batch application. E.g. SQL: insert into table_2
>> select count(1) from table_1. In batch mode, the result stored in table_2
>> is N, and I expect the accumulator to start from N, not 0, when the
>> streaming process starts.
>>
>>
>> Thanks for your reply.
>>
>>
>> Best Regard!
>>
>>
>> (sending again because I accidentally left out the user ml in the reply
>> on the first try)...
>>
>> On Nov 30, 2021 at 21:42, Alexander Preuß wrote:
>>
>> Hi Vtygoss,
>>
>> Can you explain a bit more about your ideal pipeline? Is the batch data
>> bounded data or could you also process it in streaming execution mode? And
>> is the streaming data derived from the batch data or do you just want to
>> ensure that the batch has been finished before running the processing of
>> the streaming data?
>>
>> Best Regards,
>> Alexander
>>
>> (sending again because I accidentally left out the user ml in the reply
>> on the first try)
>>
>> On Tue, Nov 30, 2021 at 12:38 PM vtygoss  wrote:
>>
>>> Hi, community!
>>>
>>>
>>> With Flink, I want to unify batch processing and stream processing in a data
>>> production pipeline. Batch processing is used to process inventory data, then
>>> stream processing is used to process incremental data. But I have hit a
>>> problem: there is no state in batch mode, and the result is wrong if I run the
>>> stream processing directly.
>>>
>>>
>>> So how can I run the streaming process accurately after the batch process has
>>> completed? Is there any doc or demo covering this scenario?
>>>
>>>
>>> Thanks for your any reply or suggestion!
>>>
>>>
>>> Best Regards!
>>>
>>>
>>>
>>>
>>>
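
A minimal sketch of the hybrid-source alternative mentioned above (Flink 1.14 DataStream API; paths, topic and servers are placeholders, the bounded part is shown as a plain text FileSource only for illustration, and class names such as TextLineFormat may differ slightly between Flink versions):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class InventoryThenIncremental {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Bounded "inventory" data, e.g. the files produced by the batch load.
        FileSource<String> inventory =
                FileSource.forRecordStreamFormat(new TextLineFormat(), new Path("hdfs:///inventory/"))
                        .build();

        // Unbounded incremental data, e.g. the CDC/binlog topic.
        KafkaSource<String> incremental =
                KafkaSource.<String>builder()
                        .setBootstrapServers("kafka:9092")
                        .setTopics("binlog-topic")
                        .setGroupId("inventory-then-incremental")
                        .setStartingOffsets(OffsetsInitializer.earliest())
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .build();

        // Read the bounded source to completion first, then switch over to Kafka.
        HybridSource<String> source =
                HybridSource.<String>builder(inventory).addSource(incremental).build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "inventory-then-incremental")
                .print();
        env.execute("hybrid source example");
    }
}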


Re: Pyflink/Flink Java parquet streaming file sink for a dynamic schema stream

2021-12-03 Thread Georg Heiler
Hi,

the schema of the "after" part depends on the table, i.e. it holds different
columns for each table.
So do you receive debezium changelog statements for all / more than one table? I.e. is
the schema in the "after" part different?

Best,
Georg

On Fri, Dec 3, 2021 at 08:35, Kamil ty wrote:

> Yes the general JSON schema should follow a debezium JSON schema. The
> fields that need to be saved to the parquet file are in the "after" key.
>
> On Fri, 3 Dec 2021, 07:10 Georg Heiler,  wrote:
>
>> Do the JSONs have the same schema overall? Or is each potentially
>> structured differently?
>>
>> Best,
>> Georg
>>
>> On Fri, Dec 3, 2021 at 00:12, Kamil ty wrote:
>>
>>> Hello,
>>>
>>> I'm wondering if there is a possibility to create a parquet streaming
>>> file sink in Pyflink (in Table API) or in Java Flink (in Datastream api).
>>>
>>> To give an example of the expected behaviour. Each element of the stream
>>> is going to contain a json string. I want to save this stream to parquet
>>> files without having to explicitly define the schema/types of the messages
>>> (also using a single sink).
>>>
>>> If this is possible, (might be in Java Flink using a custom
>>> ParquetBulkWriterFactory etc.) any direction for the implementation would
>>> be appreciated.
>>>
>>> Best regards
>>> Kamil
>>>
>>


Re: custom metrics in elasticsearch ActionRequestFailureHandler

2021-12-03 Thread Alexander Preuß
Hi Lars,

What is your use case for the failure handler, just collecting metrics? We
want to remove the configurable failure handler in the new Sink API
implementation of the Elasticsearch connector in Flink 1.15 because it can
be a huge footgun with regards to delivery guarantees.

Best Regards,
Alexander

On Thu, Dec 2, 2021 at 6:23 PM Lars Bachmann 
wrote:

> Hi David,
>
> Thanks for the reply. I think especially in an error/failure handler
> metrics are important in order to have proper monitoring/alerting in such
> cases. Would be awesome if this could be added to Flink at some point :).
>
> Regards,
>
> Lars
>
> On 02.12.2021 at 18:13, David Morávek wrote:
>
> Hi Lars,
>
> quickly looking at the ES connector code, I think you're right and there
> is no way to do that :(  In general I'd say that being able to expose
> metrics is a valid request.
>
> I can imagine having some kind of `RichActionRequestFailureHandler` with
> `{get|set}RuntimeContext` methods. More or less the same thing we already
> do with for example the `RichFunction`. This unfortunately requires some
> work on the Flink side.
>
> cc @Arvid
>
> On Thu, Dec 2, 2021 at 5:52 PM  wrote:
>
>> Hi,
>>
>> is there a way to expose custom metrics within an elasticsearch failure
>> handler (ActionRequestFailureHandler)? To register custom metrics I need
>> access to the runtime context but I don't see a way to access the
>> context in the failure handler.
>>
>> Thanks and regards,
>>
>> Lars
>>
>
>