Flink 1.14 Table API & SQL: how is TTL handled for aggregations over ever-growing dimensions?

2021-12-10 Thread guanyq
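Could someone please advise?
Requirement: use Flink SQL to count the number of accepted orders per province per day. The set of dimension keys in such a statistic obviously keeps growing. How can I set a TTL so that only one week of dimension data is kept?
Ever-growing dimensions may well cause an out-of-memory error. Is the Flink SQL TTL configuration documented anywhere on the official website?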
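For reference, in Flink 1.14 idle-state retention for SQL jobs is controlled by the `table.exec.state.ttl` option (equivalently `TableConfig#setIdleStateRetention(Duration)`). State for a key that has not been accessed within that period becomes eligible for cleanup. The plain-Java sketch below is not Flink code. It only models that behavior for a hypothetical per-day, per-province counter, assuming a 7-day retention measured on a processing-time clock. Because the key includes the day, keys for past days stop being touched and age out on their own.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of idle-state retention; NOT Flink code.
// Hypothetical names: OrderCountState, add(), expire().
class OrderCountState {
    private static final class Entry {
        long count;
        long lastAccessMillis;
    }

    private final long retentionMillis;
    private final Map<String, Entry> state = new HashMap<>();

    OrderCountState(Duration retention) {
        this.retentionMillis = retention.toMillis();
    }

    // Counts one accepted order for (day, province); touching a key refreshes its TTL.
    long add(String day, String province, long nowMillis) {
        Entry e = state.computeIfAbsent(day + "|" + province, k -> new Entry());
        e.count++;
        e.lastAccessMillis = nowMillis;
        return e.count;
    }

    // Drops every key that has been idle for longer than the retention period.
    void expire(long nowMillis) {
        state.values().removeIf(e -> nowMillis - e.lastAccessMillis > retentionMillis);
    }

    int size() {
        return state.size();
    }

    public static void main(String[] args) {
        long day = Duration.ofDays(1).toMillis();
        OrderCountState s = new OrderCountState(Duration.ofDays(7));
        s.add("2021-12-01", "Zhejiang", 0);
        s.add("2021-12-10", "Zhejiang", 9 * day);
        s.expire(9 * day);
        // The 2021-12-01 key has been idle for 9 days and was dropped.
        System.out.println(s.size()); // 1
    }
}
```

In Flink itself the cleanup is performed by the state backend, so memory is bounded by roughly one week of keys rather than by the full history.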



 





 

Flink 1.13.3, k8s HA - ResourceManager was revoked leadership

2021-12-10 Thread Alexey Trenikhun
Hello,
I'm running Flink 1.13.3 with Kubernetes HA. The JM periodically restarts: in the
log below, the job runs for ~8 minutes, then leadership is suddenly revoked, the
job reaches a terminal state, and K8s restarts the failed JM:

{"timestamp":"2021-12-11T04:51:53.697Z","message":"Agent Info (1/1) 
(47e6706e52ad96111a3d722cc56b5752) switched from INITIALIZING to 
RUNNING.","logger_name":"org.apache.flink.runtime.executiongraph.ExecutionGraph","thread_name":"flink-akka.actor.default-dispatcher-2","level":"INFO","level_value":2}
{"timestamp":"2021-12-11T05:06:10.483Z","message":"ResourceManager 
akka.tcp://flink@10.244.104.239:6123/user/rpc/resourcemanager_0 was revoked 
leadership. Clearing fencing 
token.","logger_name":"org.apache.flink.runtime.resourcemanager.StandaloneResourceManager","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":2}
{"timestamp":"2021-12-11T05:06:10.484Z","message":"Stopping 
DefaultLeaderRetrievalService.","logger_name":"org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":2}
{"timestamp":"2021-12-11T05:06:10.484Z","message":"Stopping 
KubernetesLeaderRetrievalDriver{configMapName='gsp--jobmanager-leader'}.","logger_name":"org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":2}
{"timestamp":"2021-12-11T05:06:10.485Z","message":"The watcher is 
closing.","logger_name":"org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapWatcher","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":2}
{"timestamp":"2021-12-11T05:06:10.487Z","message":"Suspending the slot 
manager.","logger_name":"org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":2}
{"timestamp":"2021-12-11T05:06:10.555Z","message":"DefaultDispatcherRunner was 
revoked the leadership with leader id 138b4029-88eb-409f-98cc-e296fe400eb8. 
Stopping the 
DispatcherLeaderProcess.","logger_name":"org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunner","thread_name":"KubernetesLeaderElector-ExecutorService-thread-1","level":"INFO","level_value":2}
{"timestamp":"2021-12-11T05:06:10.556Z","message":"Stopping 
SessionDispatcherLeaderProcess.","logger_name":"org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess","thread_name":"KubernetesLeaderElector-ExecutorService-thread-1","level":"INFO","level_value":2}
{"timestamp":"2021-12-11T05:06:10.557Z","message":"Stopping dispatcher 
akka.tcp://flink@10.244.104.239:6123/user/rpc/dispatcher_1.","logger_name":"org.apache.flink.runtime.dispatcher.StandaloneDispatcher","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":2}
{"timestamp":"2021-12-11T05:06:10.558Z","message":"Stopping all currently 
running jobs of dispatcher 
akka.tcp://flink@10.244.104.239:6123/user/rpc/dispatcher_1.","logger_name":"org.apache.flink.runtime.dispatcher.StandaloneDispatcher","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":2}
{"timestamp":"2021-12-11T05:06:10.560Z","message":"Stopping the JobMaster for 
job 
gim().","logger_name":"org.apache.flink.runtime.jobmaster.JobMaster","thread_name":"flink-akka.actor.default-dispatcher-2","level":"INFO","level_value":2}
{"timestamp":"2021-12-11T05:06:10.565Z","message":"Job gim 
() switched from state RUNNING to 
SUSPENDED.","logger_name":"org.apache.flink.runtime.executiongraph.ExecutionGraph","thread_name":"flink-akka.actor.default-dispatcher-2","level":"INFO","level_value":2,"stack_trace":"org.apache.flink.util.FlinkException:
 Scheduler is being stopped.\n\tat 
org.apache.flink.runtime.scheduler.SchedulerBase.closeAsync(SchedulerBase.java:607)\n\tat
org.apache.flink.runtime.jobmaster.JobMaster.stopScheduling(JobMaster.java:962)\n\tat
org.apache.flink.runtime.jobmaster.JobMaster.stopJobExecution(JobMaster.java:926)\n\tat
org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:398)\n\tat
org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:214)\n\tat
org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:563)\n\tat
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:186)\n\tat
 akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)\n\tat 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)\n\tat 
scala.PartialFunction.applyOrElse(PartialFunction.scala:123)\n\tat 
scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)\n\tat 
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)\n\tat 

Re: CVE-2021-44228 - Log4j2 vulnerability

2021-12-10 Thread narasimha
Folks, what about Ververica Platform? Is there any mitigation for it?

On Fri, Dec 10, 2021 at 3:32 PM Chesnay Schepler  wrote:

> I would recommend modifying your log4j configurations to set
> log4j2.formatMsgNoLookups to true.
>
> As far as I can tell this is equivalent to upgrading log4j, which just
> disabled this lookup by default.
>
> On 10/12/2021 10:21, Richard Deurwaarder wrote:
>
> Hello,
>
> There has been a log4j2 vulnerability made public
> https://www.randori.com/blog/cve-2021-44228/ which is making some waves :)
> This post even explicitly mentions Apache Flink:
> https://securityonline.info/apache-log4j2-remote-code-execution-vulnerability-alert/
>
> And fortunately, I saw this was already on your radar:
> https://issues.apache.org/jira/browse/FLINK-25240
>
> What would the advice be for flink users? Do you expect to push a minor to
> fix this? Or is it advisable to upgrade to the latest log4j2 version
> manually for now?
>
> Thanks for any advice!
>
>
>

-- 
A.Narasimha Swamy


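Re: Local Flink build hangs

2021-12-10 Thread Yuepeng Pan

The image is broken. You could upload it to an image host or include some of the original output messages.

On 2021-12-11 11:19:51, "Jeff" wrote:

Following the officially recommended Maven build command, mvn install -Dfast -DskipTests -Dscala-2.12 -T 1C,
my local build always hangs at flink-table-runtime-blink without any error message, as shown in the screenshot below.
Is there any way to deal with this?
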

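Local Flink build hangs

2021-12-10 Thread Jeff
Following the officially recommended Maven build command, mvn install -Dfast -DskipTests -Dscala-2.12 -T 1C,
my local build always hangs at flink-table-runtime-blink without any error message, as shown in the screenshot below.
Is there any way to deal with this?
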

Passing arbitrary Hadoop s3a properties from FileSystem SQL Connector options

2021-12-10 Thread Timothy James
Hi,

The Hadoop s3a library itself supports some properties we need, but the
"FileSystem SQL Connector" (via FileSystemTableFactory) does not pass
connector options for these to the "Hadoop/Presto S3 File Systems plugins"
(via S3FileSystemFactory).

Instead, only Job-global Flink config values are passed to Hadoop s3a.
That won't work for us: we need to vary these values per Flink SQL table,
and not override our config for other use of S3 (such as Flink
checkpointing).

Contrast this with the Kafka connector, which supports an analogous
"properties.*" prefixed pass-through mechanism, and the Kinesis connector,
which supports all the specific properties we would need out of the box.
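A minimal sketch of the "properties.*" pass-through idea, assuming the connector would copy every option under a given prefix into the S3A/Hadoop configuration with the prefix stripped. The `properties.` prefix mirrors the Kafka connector; the helper name `extractPrefixed` is hypothetical, a plain `Map` stands in for Hadoop's `Configuration`, and the ARN value is a placeholder.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: collects per-table options such as
// 'properties.fs.s3a.assumed.role.arn' and strips the prefix so they
// could be applied to a table-scoped Hadoop/S3A configuration.
class PrefixedOptions {
    static Map<String, String> extractPrefixed(Map<String, String> tableOptions, String prefix) {
        Map<String, String> result = new TreeMap<>();
        for (Map.Entry<String, String> e : tableOptions.entrySet()) {
            String key = e.getKey();
            // skip the bare prefix itself; it names no property
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                result.put(key.substring(prefix.length()), e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> options = new TreeMap<>();
        options.put("connector", "filesystem");
        options.put("path", "s3a://bucket/table");
        options.put("properties.fs.s3a.assumed.role.arn", "arn:aws:iam::123456789012:role/example");
        // Only the prefixed key survives, renamed to the plain s3a key.
        System.out.println(extractPrefixed(options, "properties."));
    }
}
```

The extraction itself is the easy part. The harder part of the proposal is making sure these per-table values reach a table-scoped file system instance instead of the job-global one used for checkpointing.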

Our current intent is to alter FileSystemTableFactory to follow the
"properties.*" approach used by the Kafka connector.

*** ➡️ Our questions for you: ⬅️
- Know of anything like this? Anybody solved this?
- Know of anything that's going to break this approach?
- What are we missing?

For context, our particular use case requires options like:
- fs.s3a.assumed.role.arn
- fs.s3a.aws.credentials.provider (or some other mechanism to pass an
externalId)

We imagine there would be other use cases for this, and if we build it
ourselves there's the possibility of contributing it to the Flink repo for
everybody.

Relevant documentation:
-
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/filesystem/
-
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
-
https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html
-
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#properties
-
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kinesis/#aws-credentials-role-externalid

Thank you!

Tim James
Decodable.co


Re: broadcast() without arguments

2021-12-10 Thread Alexey Trenikhun
Thank you Roman

From: Roman Khachatryan 
Sent: Friday, December 10, 2021 1:48:08 AM
To: Alexey Trenikhun 
Cc: Flink User Mail List 
Subject: Re: broadcast() without arguments

Hello,

The broadcast() without arguments can be used the same way as a
regular data stream, i.e.  regular transformations can be applied to
it. The difference is that every element will be sent to all
downstream subtasks and not just one.

The difference from broadcast() with arguments is that the latter only
allows connecting the broadcast stream with another one, for example
to match the elements from two streams against each other.

Regards,
Roman

On Fri, Dec 10, 2021 at 6:37 AM Alexey Trenikhun  wrote:
>
> Hello,
> How should the broadcast() method without arguments be used?
>
> Thanks,
> Alexey


Latency monitoring in Flink 1.14.0

2021-12-10 Thread Geldenhuys, Morgan Karl
Greetings all,


I am attempting to set up latency monitoring for a Flink 1.14.0 job. According
to the documentation, I have done the following:

In my Kubernetes setup I have added the following to the kubernetes-session.sh
command:

-Dmetrics.latency.granularity=\"operator\" \
-Dmetrics.latency.interval=1000 \

However, when looking at Prometheus, I do not see any latency histograms
(Prometheus is configured correctly).


I have added the following to my job pom and am using the new 
KafkaSource/KafkaSink classes:



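<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.11</artifactId>
  <version>1.14.0</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_2.11</artifactId>
  <version>1.14.0</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.14.0</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
  <version>1.14.0</version>
</dependency>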



Would really appreciate some help here. Thanks in advance!


Regards,

Morgan.






Confusion about rebalance bytes sent metric in Flink UI

2021-12-10 Thread tao xiao
Hi team,

I have one operator that is connected to 9 downstream operators
using rebalance. Each operator has a parallelism of 150 [1]. I assume each
message in the upstream operator is sent to one parallel instance of each
of the 9 receiving operators, so the total bytes sent should be roughly 9
times the bytes received in the upstream operator metric. However, the Flink
UI shows that bytes sent is much higher than 9 times: it is about 150 * 9 *
bytes received [2]. It looks to me as if every message is duplicated to
each parallel instance of all receiving operators, like broadcast
does. Is this correct?



[1] https://imgur.com/cGyb0QO
[2] https://imgur.com/SFqPiJA
-- 
Regards,
Tao
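The expectation in the question can be written down as arithmetic. Assuming each output record has the same size as the input record and is serialized once per receiving subtask, the ratio of bytes sent to bytes received is the fan-out under rebalance (one copy per downstream operator) and fan-out times parallelism under broadcast. The helper below is a hypothetical model, not Flink's metric code.

```java
// Hypothetical model: expected (bytes sent) / (bytes received) for one
// upstream operator whose output records are the same size as its input.
class FanOutRatio {
    static long expectedRatio(int downstreamOperators, int downstreamParallelism, boolean broadcast) {
        // rebalance: one copy per downstream operator; broadcast: one copy per downstream subtask
        return broadcast ? (long) downstreamOperators * downstreamParallelism : downstreamOperators;
    }

    public static void main(String[] args) {
        System.out.println("rebalance: " + expectedRatio(9, 150, false)); // 9
        System.out.println("broadcast: " + expectedRatio(9, 150, true));  // 1350
    }
}
```

An observed ratio near 1350 is therefore what this model predicts for broadcast-like behavior, which is why the metric looks surprising for a rebalance edge.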


Advise on Apache Log4j Zero Day (CVE-2021-44228)

2021-12-10 Thread Konstantin Knauf
Dear Flink Community,

Yesterday, a new Zero Day for Apache Log4j was reported [1]. It is now
tracked under CVE-2021-44228 [2].

Apache Flink bundles a version of Log4j that is affected by this
vulnerability. We recommend that users follow the advisory [3] of the Apache
Log4j community. For Apache Flink this currently translates to “setting
system property log4j2.formatMsgNoLookups to true” until Log4j has been
upgraded to 2.15.0 in Apache Flink.
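As a sketch of how this advice can be applied, assuming a standard flink-conf.yaml deployment: the property can be passed to every Flink JVM by adding `-Dlog4j2.formatMsgNoLookups=true` to `env.java.opts`. Log4j reads the property when it initializes, so setting it from job code is likely too late; a job can, however, check that it is present and fail fast. The class below is hypothetical and only reads the system property; it does not change Log4j's behavior.

```java
// Hypothetical fail-fast guard; reads the JVM system property only.
class Log4jLookupGuard {
    static final String PROPERTY = "log4j2.formatMsgNoLookups";

    // Boolean.parseBoolean(null) is false, so a missing property counts as "not disabled".
    static boolean lookupsDisabled() {
        return Boolean.parseBoolean(System.getProperty(PROPERTY));
    }

    static void requireLookupsDisabled() {
        if (!lookupsDisabled()) {
            throw new IllegalStateException(
                "JVM started without -D" + PROPERTY + "=true (CVE-2021-44228 mitigation)");
        }
    }

    public static void main(String[] args) {
        System.out.println(PROPERTY + " = " + lookupsDisabled());
    }
}
```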

This effort is tracked in FLINK-25240 [4]. It will be included in Flink
1.15.0, Flink 1.14.1 and Flink 1.13.3. We expect Flink 1.14.1 to be
released in the next 1-2 weeks. The other releases will follow in their
regular cadence.

This advice has also been published on the Apache Flink blog
https://flink.apache.org/2021/12/10/log4j-cve.html.

Best,

Konstantin

[1]
https://www.cyberkendra.com/2021/12/apache-log4j-vulnerability-details-and.html
[2] https://nvd.nist.gov/vuln/detail/CVE-2021-44228
[3] https://logging.apache.org/log4j/2.x/security.html
[4] https://issues.apache.org/jira/browse/FLINK-25240

-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk


WindowOperator TestHarness

2021-12-10 Thread Lars Skjærven
Hello,

We're trying to write a test for an implementation of *AggregateFunction*
applied after *EventTimeSessionWindows.withGap*. We tried using
*WindowOperator*(), which we hoped could be passed as an argument to
*KeyedOneInputStreamOperatorTestHarness*. We're a bit stuck and are
hoping someone has a tip or two. Specifically, we can't find the right
*InternalWindowFunction* to pass to WindowOperator(). Below, *MyAggregator*
is our implementation of the *AggregateFunction*.

Does anyone have a template or guide for testing a windowed aggregate
function?

Kind regards,
Lars


val myWindowOperator = new WindowOperator(
  EventTimeSessionWindows.withGap(Time.seconds(10)),
  new TimeWindow.Serializer(),
  new KeySelector[MyInputType, (String, String)] {
    override def getKey(value: MyInputType): (String, String) = {
      (value.a, value.b)
    }
  },
  // the key is a (String, String) tuple, so the key serializer must match it
  createTuple2TypeInformation(Types.STRING, Types.STRING).createSerializer(
    new ExecutionConfig
  ),
  new AggregatingStateDescriptor[MyInputType, MyAggregateState, MyOutputType](
    "test", new MyAggregator, classOf[MyAggregateState]
  ),
  ???, // the InternalWindowFunction we are looking for
  EventTimeTrigger.create(),
  0,
  null
)

testHarness = new KeyedOneInputStreamOperatorTestHarness[(String, String), MyInputType, MyOutputType](
  myWindowOperator,
  new KeySelector[MyInputType, (String, String)] {
    override def getKey(value: MyInputType): (String, String) = {
      (value.a, value.b)
    }
  },
  createTuple2TypeInformation(Types.STRING, Types.STRING)
)
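Independently of the harness, it can help to work out the expected sessions by hand before asserting on the harness output. The plain-Java sketch below is not Flink code. It models session windows as they are usually described (each element opens a window [t, t + gap), and overlapping windows of the same key merge) and counts elements per session as a stand-in for `MyAggregator`. Merging windows that merely touch is an assumption here, chosen to mirror our reading of `TimeWindow#intersects`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of session windows for a single key; not Flink code.
// Each session is reported as {start, end, count}, with end = lastTimestamp + gap.
class SessionModel {
    static List<long[]> sessions(long[] timestamps, long gapMillis) {
        long[] ts = timestamps.clone();
        Arrays.sort(ts); // event-time order, as if the watermark has passed all of them
        List<long[]> result = new ArrayList<>();
        long start = 0, end = 0, count = 0;
        for (long t : ts) {
            if (count > 0 && t <= end) {
                // window [t, t + gap) overlaps or touches the current session: merge
                end = Math.max(end, t + gapMillis);
                count++;
            } else {
                if (count > 0) result.add(new long[] {start, end, count});
                start = t;
                end = t + gapMillis;
                count = 1;
            }
        }
        if (count > 0) result.add(new long[] {start, end, count});
        return result;
    }

    public static void main(String[] args) {
        long[] timestamps = {32_000, 0, 5_000, 30_000, 3_000};
        for (long[] s : sessions(timestamps, 10_000)) {
            System.out.println(Arrays.toString(s)); // [0, 15000, 3] then [30000, 42000, 2]
        }
    }
}
```

The same timestamps can then be fed to the harness with `processElement` and a final watermark, and the emitted results compared against these sessions.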


Re: FileSource with Parquet Format - parallelism level

2021-12-10 Thread Arvid Heise
Yes, Parquet files can be read in splits (=in parallel). Which enumerator
is used is determined here [1].

[1]
https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java#L170-L170
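A rough model of how a single Parquet file can be read in parallel without merging: the file is cut into byte-range splits, and each row group is read by exactly one split, the one containing the row group's midpoint. That midpoint rule is how parquet-mr's range-based footer filtering is commonly described; whether the reader in [1] applies it exactly this way is an assumption. The file and row-group sizes are made up.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: assign each row group to the split whose byte range
// [splitStart, splitStart + splitSize) contains the row group's midpoint.
class RowGroupAssignment {
    // rowGroups[i] = {offset, length}; returns, per split, the indexes of its row groups.
    static List<List<Integer>> assign(long fileLength, long splitSize, long[][] rowGroups) {
        int numSplits = (int) ((fileLength + splitSize - 1) / splitSize);
        List<List<Integer>> splits = new ArrayList<>();
        for (int i = 0; i < numSplits; i++) {
            splits.add(new ArrayList<>());
        }
        for (int rg = 0; rg < rowGroups.length; rg++) {
            long midpoint = rowGroups[rg][0] + rowGroups[rg][1] / 2;
            splits.get((int) (midpoint / splitSize)).add(rg);
        }
        return splits;
    }

    public static void main(String[] args) {
        // Hypothetical 300-byte file, 100-byte splits, two row groups.
        long[][] rowGroups = {{4, 180}, {184, 110}};
        System.out.println(assign(300, 100, rowGroups)); // [[0], [], [1]]
    }
}
```

A split can end up empty, which is harmless; the point is that no row group is assigned twice, so each reader can emit its rows directly.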

On Fri, Dec 10, 2021 at 11:44 AM Krzysztof Chmielewski <
krzysiek.chmielew...@gmail.com> wrote:

> Hi Roman,
> Thank you.
>
> I'm familiar with FLIP-27 and I was analyzing the new File Source.
>
> From there I saw that there are two FileEnumerators: one that allows for
> file splits and one that does not, BlockSplittingRecursiveEnumerator
> and NonSplittingRecursiveEnumerator.
> I was wondering if BlockSplittingRecursiveEnumerator can be used for
> Parquet files.
>
> Actually, does the Parquet format support reading a file in blocks by different
> threads? Do those blocks have to be "merged" later, or can I just read them
> row by row?
>
> Regards,
> Krzysztof Chmielewski
>
> pt., 10 gru 2021 o 09:27 Roman Khachatryan  napisał(a):
>
>> Hi,
>>
>> Yes, file source does support DoP > 1.
>> And in general, a single file can be read in parallel after FLIP-27.
>> However, parallel reading of a single Parquet file is currently not
>> supported AFAIK.
>>
>> Maybe Arvid or Fabian could shed more light here.
>>
>> Regards,
>> Roman
>>
>> On Thu, Dec 9, 2021 at 12:03 PM Krzysztof Chmielewski
>>  wrote:
>> >
>> > Hi,
>> > can I have a File DataStream Source that will work with Parquet Format
>> and have parallelism level higher than one?
>> >
>> > Is it possible to read  Parquet  file in chunks by multiple threads?
>> >
>> > Regards,
>> > Krzysztof Chmielewski
>>
>


Re: Regarding the size of Flink cluster

2021-12-10 Thread Timo Walther

Hi Jessy,

let me try to answer some of your questions.

> 16 Task Managers with 1 task slot and 1 CPU each

Every additional task manager also involves management overhead, so I 
would suggest option 1. But in the end you need to perform some 
benchmarks yourself. I could also imagine that a mixture could be 
beneficial, depending on the isolation level of your pipelines.


> there will be a single copy in the HEAP

From the JavaDocs of BroadcastState:

"Each operator instance individually maintains and stores elements in 
the broadcast state."


I would assume that the HEAP contains n copies where n is the parallelism.

> I can process only n events/seconds(if the latency of the pipeline is 1s.)


Latency is not necessarily throughput. This depends on the pipeline. For 
example, if the pipeline contains only map functions without any keyBy, 
the operators are "chained" together. You can also influence the 
chaining [1] for better resource utilization. If your pipeline contains 
I/O to external systems, you can use async I/O to increase the 
throughput [2].
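One way to see why latency and throughput differ is Little's law: sustained throughput is roughly (requests in flight) / latency. With purely synchronous processing, each subtask has about one request in flight, so n subtasks at 1 s latency give about n events/s; with async I/O, each subtask can keep up to `capacity` requests in flight (the capacity parameter of the async I/O operator). The helper below is only that back-of-the-envelope formula; real pipelines are also bounded by CPU, the external system, and backpressure.

```java
// Back-of-the-envelope estimate via Little's law: throughput = inFlight / latency.
class ThroughputEstimate {
    static double eventsPerSecond(int parallelism, int inFlightPerSubtask, double latencySeconds) {
        if (latencySeconds <= 0) {
            throw new IllegalArgumentException("latency must be positive");
        }
        return parallelism * (double) inFlightPerSubtask / latencySeconds;
    }

    public static void main(String[] args) {
        System.out.println(eventsPerSecond(16, 1, 1.0));   // synchronous: about 16 events/s
        System.out.println(eventsPerSecond(16, 100, 1.0)); // async I/O, capacity 100: about 1600 events/s
    }
}
```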


I would also recommend this section here:

https://nightlies.apache.org/flink/flink-docs-master/docs/concepts/flink-architecture/#task-slots-and-resources

Maybe you can elaborate on the operations in your pipeline.

> Can Flink process multiple events from the same key at the same time?

No, every key is routed to and executed by the same thread. However, you 
can create an artificial key to spread the load more evenly.
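A minimal sketch of the artificial-key idea (often called key salting), assuming the aggregation can be split into two phases: first aggregate per salted key, then strip the salt and combine the partial results per original key. The salt here is derived from a hash of a hypothetical event id so that it is deterministic across replays; the names and the bucket count are illustrative, and the code is plain Java rather than a Flink job.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical two-phase aggregation with salted keys; plain Java, not Flink.
class KeySalting {
    static String salt(String key, String eventId, int buckets) {
        // floorMod keeps the bucket non-negative even for negative hash codes
        return key + "#" + Math.floorMod(eventId.hashCode(), buckets);
    }

    static String unsalt(String saltedKey) {
        return saltedKey.substring(0, saltedKey.lastIndexOf('#'));
    }

    // Phase 1: count per salted key (in Flink, keyBy the salted key).
    // Phase 2: sum the partial counts per original key (keyBy the original key).
    static Map<String, Long> countPerKey(String[][] events, int buckets) {
        Map<String, Long> partial = new HashMap<>();
        for (String[] e : events) { // e = {key, eventId}
            partial.merge(salt(e[0], e[1], buckets), 1L, Long::sum);
        }
        Map<String, Long> total = new HashMap<>();
        partial.forEach((salted, count) -> total.merge(unsalt(salted), count, Long::sum));
        return total;
    }

    public static void main(String[] args) {
        String[][] events = {{"hot", "e1"}, {"hot", "e2"}, {"hot", "e3"}, {"cold", "e4"}};
        System.out.println(countPerKey(events, 4)); // hot=3, cold=1 (map order may vary)
    }
}
```

The trade-off is an extra shuffle for phase 2, in exchange for spreading a hot key over up to `buckets` subtasks in phase 1.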


> any blogs regarding the results of Flink's load testing

I would also recommend the FlinkForward YouTube channel. A lot of user 
stories, including actual numbers and configurations, are presented there.


Regards,
Timo


[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview/#task-chaining-and-resource-groups
[2] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/



On 10.12.21 11:46, Jessy Ping wrote:

Hi All,


I have the following questions regarding the sizing of the Flink cluster 
doing stateful computation using Datastream API. It will be better if 
the community can answer the below questions or doubts.




Suppose we have a pipeline as follows,


*Kafka real time events source1 & Kafka rules source 2 -> 
KeyedBroadcastProcessFunction -> Kafka Sink*



As you can see, we will be processing the real-time events from the 
Kafka source using the rules broadcasted from the rule source with the 
help of keyed broadcast function.



_Questions_


  * I have a machine with 16 CPUs and 32 GB Ram. Which configuration is
efficient for achieving the target parallelism of 16?

 1. A single task manager with 16 task slots
 2. 16 Task Managers with 1 task slot and 1 CPU each.



  * If I have a broadcast state in my pipeline and I have a single task
manager with 16 task slots for achieving the target parallelism of
16. Does Flink keep 16 copies of broadcast state in the single task
manager or there will be a single copy in the HEAP for the entire
task slots?


  * If a parallelism of n means, I can process only n events/seconds(if
the latency of the pipeline is 1s.). How many requests a single task
slot (containing a single task) can execute at a time ? 



  * Can Flink process multiple events from the same key at the same time?


  * I have found the following blog regarding the Flink cluster size,

https://www.ververica.com/blog/how-to-size-your-apache-flink-cluster-general-guidelines

.
Do we have some other blogs, testimonials, or books regarding the
sample production setup/configuration of a Flink cluster for
achieving different ranges of throughput ? 



  * Are there any blogs regarding the results of Flink's load testing
results ?


Thanks

Jessy





flinksql????????

2021-12-10 Thread ??????
?? 
  
flinksqlA_now:AA_now??A??
 
 
//sql

StatementSet stmtSet = tenv.createStatementSet();
stmtSet.addInsertSql(insertSqlMongoDB);
stmtSet.addInsertSql(insertSql);
stmtSet.execute();
//
/**  ?? */
{
MongoUtil2  = MongoUtil2??  ();
MongoCollection<Document> oldData = instance.getCollection(db, "t_up_tag_data_" + mongoKey);
MongoCollection

Regarding the size of Flink cluster

2021-12-10 Thread Jessy Ping
Hi All,


I have the following questions regarding the sizing of a Flink cluster
doing stateful computation using the DataStream API. It would be great if
the community could answer the questions below.



Suppose we have a pipeline as follows,


*Kafka real time events source1 & Kafka rules source 2 ->
KeyedBroadcastProcessFunction -> Kafka Sink*


As you can see, we will be processing the real-time events from the Kafka
source using the rules broadcasted from the rule source with the help of
keyed broadcast function.


*Questions*



   - I have a machine with 16 CPUs and 32 GB RAM. Which configuration is more
   efficient for achieving a target parallelism of 16?


   1. A single task manager with 16 task slots
   2. 16 Task Managers with 1 task slot and 1 CPU each.




   - If I have broadcast state in my pipeline and a single task manager
   with 16 task slots to achieve the target parallelism of 16, does Flink
   keep 16 copies of the broadcast state in that task manager, or will there
   be a single copy on the heap shared by all task slots?



   - If a parallelism of n means I can process only n events/second (if
   the latency of the pipeline is 1 s), how many requests can a single task
   slot (containing a single task) execute at a time?



   - Can Flink process multiple events from the same key at the same time?



   - I have found the following blog post about Flink cluster sizing:
   https://www.ververica.com/blog/how-to-size-your-apache-flink-cluster-general-guidelines
   Are there other blogs, testimonials, or books describing sample
   production setups/configurations of a Flink cluster for achieving
   different ranges of throughput?



   - Are there any blogs about the results of Flink load testing?


Thanks

Jessy


Re: FileSource with Parquet Format - parallelism level

2021-12-10 Thread Krzysztof Chmielewski
Hi Roman,
Thank you.

I'm familiar with FLIP-27 and I was analyzing the new File Source.

From there I saw that there are two FileEnumerators: one that allows for
file splits and one that does not, BlockSplittingRecursiveEnumerator
and NonSplittingRecursiveEnumerator.
I was wondering if BlockSplittingRecursiveEnumerator can be used for
Parquet files.

Actually, does the Parquet format support reading a file in blocks by different
threads? Do those blocks have to be "merged" later, or can I just read them
row by row?

Regards,
Krzysztof Chmielewski

pt., 10 gru 2021 o 09:27 Roman Khachatryan  napisał(a):

> Hi,
>
> Yes, file source does support DoP > 1.
> And in general, a single file can be read in parallel after FLIP-27.
> However, parallel reading of a single Parquet file is currently not
> supported AFAIK.
>
> Maybe Arvid or Fabian could shed more light here.
>
> Regards,
> Roman
>
> On Thu, Dec 9, 2021 at 12:03 PM Krzysztof Chmielewski
>  wrote:
> >
> > Hi,
> > can I have a File DataStream Source that will work with Parquet Format
> and have parallelism level higher than one?
> >
> > Is it possible to read  Parquet  file in chunks by multiple threads?
> >
> > Regards,
> > Krzysztof Chmielewski
>


Re: [DISCUSS] Change some default config values of blocking shuffle

2021-12-10 Thread 刘建刚
Glad to see the suggestion. In our tests, we also found that for small jobs
the changed configs do not improve performance much, just as in your test. I
have some suggestions:

   - The configs can affect memory usage. Will the related memory
   configs be changed?
   - Can you share the TPC-DS results for the different configs? Even after we
   change the default values, it is helpful for different users to tune them
   for their own jobs. In that case, your experience can help a lot.

Best,
Liu Jiangang

Yun Gao wrote on Fri, Dec 10, 2021 at 17:20:

> Hi Yingjie,
>
> Many thanks for drafting the FLIP and initiating the discussion!
>
> May I double-check taskmanager.network.sort-shuffle.min-parallelism?
> Since other frameworks like Spark use sort-based shuffle in all
> cases, does our current situation still differ from theirs?
>
> Best,
> Yun
>
>
>
>
> --
> From:Yingjie Cao 
> Send Time:2021 Dec. 10 (Fri.) 16:17
> To:dev ; user ; user-zh <
> user-zh@flink.apache.org>
> Subject:Re: [DISCUSS] Change some default config values of blocking shuffle
>
> Hi dev & users:
>
> I have created a FLIP [1] for it; feedback is highly appreciated.
>
> Best,
> Yingjie
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-199%3A+Change+some+default+config+values+of+blocking+shuffle+for+better+usability
> Yingjie Cao wrote on Fri, Dec 3, 2021 at 17:02:
>
> Hi dev & users,
>
> We propose to change some default values of blocking shuffle to improve
> the out-of-the-box user experience (streaming is not affected). The default
> values we want to change are as follows:
>
> 1. Data compression
> (taskmanager.network.blocking-shuffle.compression.enabled): Currently, the
> default value is 'false'.  Usually, data compression can reduce both disk
> and network IO which is good for performance. At the same time, it can save
> storage space. We propose to change the default value to true.
>
> 2. Default shuffle implementation
> (taskmanager.network.sort-shuffle.min-parallelism): Currently, the default
> value is 'Integer.MAX_VALUE', which means that by default Flink jobs always use
> hash-shuffle. In fact, for high parallelism, sort-shuffle is better for
> both stability and performance. So we propose to reduce the default value
> to a proper smaller one, for example, 128. (We tested 128, 256, 512 and
> 1024 with TPC-DS, and 128 performed best.)
>
> 3. Read buffer of sort-shuffle
> (taskmanager.memory.framework.off-heap.batch-shuffle.size): Currently, the
> default value is '32M'. Previously, when choosing the default value, both
> '32M' and '64M' were OK in tests, and we cautiously chose the smaller one.
> However, it was recently reported on the mailing list that the default
> value is not enough, which caused a buffer request timeout issue. We already
> created a ticket to improve the behavior. At the same time, we propose to
> increase this default value to '64M', which can also help.
>
> 4. Sort buffer size of sort-shuffle
> (taskmanager.network.sort-shuffle.min-buffers): Currently, the default
> value is '64' which means '64' network buffers (32k per buffer by default).
> This default value is quite modest and can limit performance.
> We propose to increase this value to a larger one, for example, 512 (the
> default TM and network buffer configuration can serve more than 10 result
> partitions concurrently).
>
> We have already tested these default values together with the TPC-DS benchmark
> in a cluster, and both performance and stability improved a lot. These
> changes can help improve the out-of-the-box experience of blocking shuffle.
> What do you think about these changes? Is there any concern? If there are
> no objections, I will make these changes soon.
>
> Best,
> Yingjie
>
>
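Two of the numeric proposals quoted above can be sanity-checked with a little arithmetic. The sketch below assumes, as the option's documentation describes, that sort-shuffle is used when the parallelism is greater than or equal to `taskmanager.network.sort-shuffle.min-parallelism`, and that a network buffer is 32 KiB (the default mentioned in item 4). The class is a hypothetical illustration, not Flink code.

```java
// Hypothetical illustration of the proposed blocking-shuffle defaults; not Flink code.
class BlockingShuffleMath {
    static final long BUFFER_BYTES = 32 * 1024; // default network buffer size (32 KiB)

    // Sort-shuffle for parallelism >= threshold, hash-shuffle otherwise.
    static boolean usesSortShuffle(int parallelism, int minParallelism) {
        return parallelism >= minParallelism;
    }

    // Minimum sort-buffer memory per sort-merge result partition.
    static long sortBufferBytes(int minBuffers) {
        return minBuffers * BUFFER_BYTES;
    }

    public static void main(String[] args) {
        // Old threshold Integer.MAX_VALUE: hash-shuffle for any realistic parallelism.
        System.out.println(usesSortShuffle(1000, Integer.MAX_VALUE)); // false
        System.out.println(usesSortShuffle(1000, 128));               // true
        System.out.println(sortBufferBytes(64) / (1024 * 1024) + " MiB");  // 2 MiB (current)
        System.out.println(sortBufferBytes(512) / (1024 * 1024) + " MiB"); // 16 MiB (proposed)
    }
}
```

So the proposal raises the per-partition sort buffer from 2 MiB to 16 MiB, which is why the related network memory configuration matters for the memory question raised above.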


PyFlink accumulate streaming data

2021-12-10 Thread Королькевич Михаил
Hello Flink team! How do I properly accumulate streaming data into Avro files partitioned by hour? In my current implementation, data from the data stream is converted to a table and saved to an Avro file, similar to this:

    t_env.execute_sql("""
        CREATE TABLE mySink (
            id STRING,
            name STRING,
            data_ranges ARRAY>,
            meta ARRAY>,
            current_hour INT
        ) partitioned by(current_hour) WITH (
            'connector' = 'filesystem',
            'format' = 'avro',
            'path' = '/opt/pyflink-walkthrough/output/table',
            'sink.rolling-policy.rollover-interval' = '1 hour',
            'partition.time-extractor.timestamp-pattern'='$current_hour',
            'sink.partition-commit.delay'='1 hour',
            'sink.partition-commit.trigger'='process-time',
            'sink.partition-commit.policy.kind'='success-file'
        )
    """)

Maybe it can be done better? (I'm not sure this works properly at all.)
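For comparison, the filesystem connector documentation's full example partitions by a date column and an hour column and uses `'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00'`. As we read the docs, the extractor matters for the `partition-time` commit trigger; with `process-time`, commits are driven by the machine clock plus the delay. The hypothetical Java helper below only shows how such `dt`/`hour` values and the resulting partition timestamp relate to an event time, assuming UTC; in a PyFlink job the same values would typically be computed in SQL, for example with `DATE_FORMAT`.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: derives 'dt' and 'hour' partition values from epoch millis (UTC assumed).
class HourlyPartition {
    private static final DateTimeFormatter DT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);
    private static final DateTimeFormatter HOUR =
        DateTimeFormatter.ofPattern("HH").withZone(ZoneOffset.UTC);

    static String dt(long epochMillis) {
        return DT.format(Instant.ofEpochMilli(epochMillis));
    }

    static String hour(long epochMillis) {
        return HOUR.format(Instant.ofEpochMilli(epochMillis));
    }

    // What a '$dt $hour:00:00' pattern would turn the two partition values into.
    static String partitionTimestamp(long epochMillis) {
        return dt(epochMillis) + " " + hour(epochMillis) + ":00:00";
    }

    public static void main(String[] args) {
        long t = Instant.parse("2021-12-10T04:30:00Z").toEpochMilli();
        System.out.println(dt(t) + " / " + hour(t) + " -> " + partitionTimestamp(t));
    }
}
```

With only an hour-of-day column, as in the table above, the same partition value recurs every day, which is another reason to include the date in the partition path.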

Re: broadcast() without arguments

2021-12-10 Thread Roman Khachatryan
Hello,

broadcast() without arguments can be used the same way as a regular
data stream, i.e. regular transformations can be applied to it. The
difference is that every element is sent to all downstream subtasks,
not just one.
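To make the routing difference concrete, here is a plain-Python model (no Flink involved) of how elements reach downstream subtasks: with broadcast() every subtask receives every element, whereas a round-robin redistribution (similar to rebalance()) sends each element to exactly one subtask. Both functions are hypothetical stand-ins for Flink's partitioners.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def broadcast_route(elements: Sequence[T], parallelism: int) -> List[List[T]]:
    """Every downstream subtask receives a copy of every element."""
    return [list(elements) for _ in range(parallelism)]


def round_robin_route(elements: Sequence[T], parallelism: int) -> List[List[T]]:
    """Each element goes to exactly one subtask, cycling through them."""
    subtasks: List[List[T]] = [[] for _ in range(parallelism)]
    for i, element in enumerate(elements):
        subtasks[i % parallelism].append(element)
    return subtasks


events = ["a", "b", "c", "d"]
print(broadcast_route(events, 2))    # [['a', 'b', 'c', 'd'], ['a', 'b', 'c', 'd']]
print(round_robin_route(events, 2))  # [['a', 'c'], ['b', 'd']]
```

Downstream operators of a broadcast() stream therefore see the full stream in each parallel instance, so the work is multiplied by the parallelism.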

By contrast, broadcast() with arguments only allows connecting the
broadcast stream with another stream, for example to match the
elements of the two streams against each other.
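The connect-and-match use case can be modelled in plain Python as follows. This is a sketch under assumptions: each parallel instance keeps its own copy of the broadcast state (here a dict of rules), every rule update reaches every copy, and each regular event goes to one instance and is matched against that instance's copy. The class and method names are hypothetical; they only loosely mirror `processBroadcastElement` / `processElement` of Flink's `BroadcastProcessFunction`.

```python
from typing import Dict, List, Tuple


class BroadcastMatcherInstance:
    """One parallel instance: a local copy of the broadcast state plus matching logic."""

    def __init__(self) -> None:
        self.rules: Dict[str, str] = {}  # rule name -> keyword to look for

    def process_broadcast_element(self, name: str, keyword: str) -> None:
        # Broadcast side: every instance applies the same update.
        self.rules[name] = keyword

    def process_element(self, event: str) -> List[Tuple[str, str]]:
        # Regular side: match this event against the local rule copy.
        return [(name, event) for name, kw in self.rules.items() if kw in event]


instances = [BroadcastMatcherInstance() for _ in range(2)]
for inst in instances:  # the rule stream is broadcast to all instances
    inst.process_broadcast_element("error-rule", "ERROR")

events = ["INFO start", "ERROR disk full", "ERROR timeout"]
matches = []
for i, event in enumerate(events):  # each event goes to one instance only
    matches.extend(instances[i % len(instances)].process_element(event))
print(matches)  # [('error-rule', 'ERROR disk full'), ('error-rule', 'ERROR timeout')]
```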

Regards,
Roman

On Fri, Dec 10, 2021 at 6:37 AM Alexey Trenikhun  wrote:
>
> Hello,
> How should the broadcast() method without arguments be used?
>
> Thanks,
> Alexey


Re: CVE-2021-44228 - Log4j2 vulnerability

2021-12-10 Thread Chesnay Schepler
I would recommend modifying your log4j configuration to set
log4j2.formatMsgNoLookups to true.

As far as I can tell, this is equivalent to upgrading log4j, which just
disables this lookup by default.
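log4j2.formatMsgNoLookups can be passed as a JVM system property. As far as I know, one common way to hand it to every Flink JVM is the env.java.opts option in flink-conf.yaml (e.g. `env.java.opts: -Dlog4j2.formatMsgNoLookups=true`); treat that as an assumption to verify against the Flink documentation for your version. The hypothetical helper below adds the flag to existing flink-conf.yaml text without clobbering JVM options that are already set. It uses simple line-based parsing and does not handle quoted or multi-line YAML values.

```python
FLAG = "-Dlog4j2.formatMsgNoLookups=true"
KEY = "env.java.opts:"


def add_no_lookups_flag(conf_text: str) -> str:
    """Return flink-conf.yaml text with FLAG added to env.java.opts.

    Idempotent: if the flag is already present, the text is returned unchanged.
    Only single-line, unquoted values are handled.
    """
    lines = conf_text.splitlines()
    for i, line in enumerate(lines):
        if line.strip().startswith(KEY):
            if FLAG in line:
                return conf_text
            existing = line.split(":", 1)[1].strip()
            lines[i] = f"{KEY} {existing} {FLAG}" if existing else f"{KEY} {FLAG}"
            return "\n".join(lines) + "\n"
    # No env.java.opts entry yet: append one at the end.
    lines.append(f"{KEY} {FLAG}")
    return "\n".join(lines) + "\n"


conf = "jobmanager.rpc.port: 6123\nenv.java.opts: -XX:+UseG1GC\n"
print(add_no_lookups_flag(conf), end="")
```

Appending rather than overwriting matters because env.java.opts may already carry tuning flags such as GC options.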

On 10/12/2021 10:21, Richard Deurwaarder wrote:

Hello,

There has been a log4j2 vulnerability made public
https://www.randori.com/blog/cve-2021-44228/ which is making some waves :)
This post even explicitly mentions Apache Flink:
https://securityonline.info/apache-log4j2-remote-code-execution-vulnerability-alert/

And fortunately, I saw this was already on your radar:
https://issues.apache.org/jira/browse/FLINK-25240

What would the advice be for Flink users? Do you expect to push a minor
release to fix this? Or is it advisable to upgrade to the latest log4j2
version manually for now?

Thanks for any advice!


CVE-2021-44228 - Log4j2 vulnerability

2021-12-10 Thread Richard Deurwaarder
Hello,

There has been a log4j2 vulnerability made public
https://www.randori.com/blog/cve-2021-44228/ which is making some waves :)
This post even explicitly mentions Apache Flink:
https://securityonline.info/apache-log4j2-remote-code-execution-vulnerability-alert/

And fortunately, I saw this was already on your radar:
https://issues.apache.org/jira/browse/FLINK-25240

What would the advice be for Flink users? Do you expect to push a minor
release to fix this? Or is it advisable to upgrade to the latest log4j2
version manually for now?

Thanks for any advice!


Re: [DISCUSS] Change some default config values of blocking shuffle

2021-12-10 Thread Yun Gao
Hi Yingjie,

Many thanks for drafting the FLIP and initiating the discussion!

I'd like to double-check one point about
taskmanager.network.sort-shuffle.min-parallelism: since other frameworks
like Spark use sort-based shuffle in all cases, is our situation still
different from theirs?

Best,
Yun




--
From: Yingjie Cao
Send Time: 2021 Dec. 10 (Fri.) 16:17
To: dev; user; user-zh
Subject: Re: [DISCUSS] Change some default config values of blocking shuffle

Hi dev & users:

I have created a FLIP [1] for it; feedback is highly appreciated.

Best,
Yingjie

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-199%3A+Change+some+default+config+values+of+blocking+shuffle+for+better+usability

Yingjie Cao wrote on Fri, Dec 3, 2021 at 17:02:

Hi dev & users,

We propose to change some default values of blocking shuffle to improve the
out-of-the-box user experience (streaming is not affected). The default
values we want to change are as follows:

1. Data compression (taskmanager.network.blocking-shuffle.compression.enabled):
Currently, the default value is 'false'. Data compression usually reduces both
disk and network IO, which is good for performance, and it also saves storage
space. We propose to change the default value to true.
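A rough illustration of why compression cuts shuffle IO, using Python's zlib purely as a stand-in codec. Nothing here reflects Flink's actual codec or its compression ratios; the record layout is hypothetical. Repetitive record data, which is common in shuffled batch data, shrinks considerably before it would be written to disk or sent over the network.

```python
import zlib


def compressed_ratio(payload: bytes, level: int = 6) -> float:
    """Compressed size divided by original size (smaller is better)."""
    return len(zlib.compress(payload, level)) / len(payload)


# Hypothetical shuffle data: many similar CSV-like records.
records = b"".join(b"%d,key_%d,ORDER_ACCEPTED\n" % (i, i % 34) for i in range(10_000))
ratio = compressed_ratio(records)
print(f"{len(records)} bytes -> ratio {ratio:.2f}")
```

The gain depends entirely on how redundant the data is; already-compressed or random payloads would see little benefit while still paying the CPU cost.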

2. Default shuffle implementation
(taskmanager.network.sort-shuffle.min-parallelism): Currently, the default
value is 'Integer.MAX_VALUE', which means that by default Flink jobs always use
hash-shuffle. In fact, for high parallelism, sort-shuffle is better for both
stability and performance, so we propose to reduce the default value to a
smaller one, for example 128. (We tested 128, 256, 512 and 1024 with TPC-DS,
and 128 performed best.)
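The threshold can be sketched as a simple decision function. This assumes, as I understand the Flink configuration docs for taskmanager.network.sort-shuffle.min-parallelism, that sort-shuffle is used when the relevant parallelism is greater than or equal to the threshold and hash-shuffle otherwise. The function name is hypothetical; Flink makes this choice internally.

```python
INTEGER_MAX_VALUE = 2**31 - 1  # Java's Integer.MAX_VALUE, the current default


def blocking_shuffle_impl(parallelism: int,
                          min_parallelism: int = INTEGER_MAX_VALUE) -> str:
    """Return which blocking-shuffle implementation the threshold selects."""
    return "sort" if parallelism >= min_parallelism else "hash"


for p in (64, 128, 1000):
    print(p, blocking_shuffle_impl(p), blocking_shuffle_impl(p, min_parallelism=128))
```

With the current default the sort branch is effectively unreachable, which is why jobs always use hash-shuffle; a default of 128 would switch jobs with parallelism of 128 or more to sort-shuffle.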

3. Read buffer of sort-shuffle
(taskmanager.memory.framework.off-heap.batch-shuffle.size): Currently, the
default value is '32M'. When we originally chose the default, both '32M' and
'64M' were fine in tests, and we cautiously chose the smaller one. However, it
was recently reported on the mailing list that the default value is not
enough, which caused a buffer request timeout issue. We have already created
a ticket to improve the behavior. At the same time, we propose to increase
this default value to '64M', which can also help.

4. Sort buffer size of sort-shuffle
(taskmanager.network.sort-shuffle.min-buffers): Currently, the default value is
'64', which means 64 network buffers (32k per buffer by default). This default
is quite modest and can limit performance. We propose to increase it to a
larger value, for example 512 (the default TM and network buffer configuration
can serve more than 10 result partitions concurrently).
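The buffer counts in item 4 translate into memory as follows, assuming the default 32 KiB network buffer size mentioned there. This is plain per-partition arithmetic, not a model of Flink's allocation logic.

```python
KIB = 1024
MIB = 1024 * KIB


def sort_buffer_bytes(min_buffers: int, buffer_size: int = 32 * KIB) -> int:
    """Minimum sort-buffer memory for one result partition."""
    return min_buffers * buffer_size


for n in (64, 512):
    print(f"{n} buffers -> {sort_buffer_bytes(n) // MIB} MiB")
# 64 buffers -> 2 MiB
# 512 buffers -> 16 MiB
```

So the proposal raises the per-partition minimum from 2 MiB to 16 MiB; ten concurrent result partitions would then need about 160 MiB of network memory.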

We have already tested these default values with the TPC-DS benchmark on a
cluster, and both performance and stability improved significantly. These
changes can improve the out-of-the-box experience of blocking shuffle. What do
you think about these changes? Are there any concerns? If there are no
objections, I will make these changes soon.

Best,
Yingjie



Re: Hybrid Source with Parquet Files from GCS + KafkaSource

2021-12-10 Thread Roman Khachatryan
Hi,

Have you tried constructing a HybridSource directly from a FileSource
created with FileSource.forBulkFileFormat [1] and a path using the
"gs://bucket" scheme [2]?

[1]
https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/file/src/FileSource.html#forBulkFileFormat-org.apache.flink.connector.file.src.reader.BulkFormat-org.apache.flink.core.fs.Path...-
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/gcs/
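Conceptually, a hybrid source reads a bounded source to completion and then switches to an unbounded one. The plain-Python generator below models only that switchover, with in-memory data standing in for the GCS Parquet files and the Kafka topic. It does not use Flink's HybridSource API, and all names are hypothetical.

```python
from itertools import chain, islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def hybrid(bounded: Iterable[T], unbounded: Iterable[T]) -> Iterator[T]:
    """Yield everything from the bounded source first, then switch to the unbounded one."""
    return chain(bounded, unbounded)


def fake_kafka() -> Iterator[str]:
    """Stand-in for an endless topic."""
    n = 0
    while True:
        yield f"kafka-{n}"
        n += 1


historical = ["parquet-0", "parquet-1"]  # stands in for gs://... Parquet files
print(list(islice(hybrid(historical, fake_kafka()), 4)))
# ['parquet-0', 'parquet-1', 'kafka-0', 'kafka-1']
```

Because the bounded phase is just another source, reading it from gs:// instead of a local path changes only where the file bytes come from, not the switchover.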

Regards,
Roman

On Thu, Dec 9, 2021 at 1:04 PM Meghajit Mazumdar
 wrote:
>
> Hello,
>
> We have a requirement as follows:
>
> We want to stream events from 2 sources: Parquet files stored in a GCS
> bucket, and a Kafka topic.
> With the release of Hybrid Source in Flink 1.14, we were able to construct a
> Hybrid Source which produces events from two sources: a FileSource which
> reads data from a locally saved Parquet file, and a KafkaSource consuming
> events from a remote Kafka broker.
>
> I was wondering whether, instead of using a local Parquet file, it is
> possible to stream the file directly from a GCS bucket and construct a
> FileSource from it at runtime. The Parquet files are quite big and
> somewhat expensive to download.
>
> Does Flink have such functionality? Or has anyone come across such a use
> case before? I would greatly appreciate some help on this.
>
> Looking forward to hearing from you.
>
> Thanks,
> Megh


Re: FileSource with Parquet Format - parallelism level

2021-12-10 Thread Roman Khachatryan
Hi,

Yes, the file source does support a degree of parallelism (DoP) > 1.
In general, since FLIP-27 a single file can be read in parallel.
However, AFAIK parallel reading of a single Parquet file is currently
not supported.

Maybe Arvid or Fabian could shed more light here.
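A simplified model of why parallelism helps with many files but not with one large Parquet file, assuming FLIP-27-style split assignment: a splittable format can be cut into byte-range splits that different readers process, while a format read as one split per file gives at most one busy reader per file. The functions, the split size, and the round-robin assignment are illustrative assumptions, not Flink's split enumerator.

```python
from typing import Dict, List, Tuple

Split = Tuple[str, int, int]  # (file name, start offset, end offset)


def make_splits(files: Dict[str, int], splittable: bool, split_size: int) -> List[Split]:
    """Cut files into byte-range splits, or one whole-file split each if not splittable."""
    splits: List[Split] = []
    for name, size in files.items():
        if not splittable:
            splits.append((name, 0, size))
            continue
        for start in range(0, size, split_size):
            splits.append((name, start, min(start + split_size, size)))
    return splits


def assign(splits: List[Split], readers: int) -> List[List[Split]]:
    """Round-robin the splits over the parallel readers."""
    out: List[List[Split]] = [[] for _ in range(readers)]
    for i, split in enumerate(splits):
        out[i % readers].append(split)
    return out


def busy_readers(assignment: List[List[Split]]) -> int:
    """Number of readers that received at least one split."""
    return sum(1 for reader in assignment if reader)


one_big_file = {"data.parquet": 1000}
print(busy_readers(assign(make_splits(one_big_file, False, 256), 4)))  # 1
print(busy_readers(assign(make_splits(one_big_file, True, 256), 4)))   # 4
```

Under this model, a DoP above 1 still pays off for Parquet when the input consists of several files, since each file becomes its own split.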

Regards,
Roman

On Thu, Dec 9, 2021 at 12:03 PM Krzysztof Chmielewski
 wrote:
>
> Hi,
> Can I have a file DataStream source that works with the Parquet format and
> has a parallelism higher than one?
>
> Is it possible to read a Parquet file in chunks with multiple threads?
>
> Regards,
> Krzysztof Chmielewski


Re: stateSerializer(1.14.0) not compatible with previous stateSerializer(1.13.1)

2021-12-10 Thread Roman Khachatryan
Hi,

Compatibility might depend on the specific serializers. Could you please
share which serializers you use to access the state?

Regards,
Roman

On Fri, Dec 10, 2021 at 3:41 AM 李诗君  wrote:
>
> I am trying to upgrade my Flink cluster from version 1.13.1 to 1.14.0. I
> followed these steps:
>
> 1. Take savepoints of the running tasks on version 1.13.1.
> 2. Stop the tasks and upgrade the cluster to version 1.14.0.
> 3. Recover the tasks from the savepoints.
>
> Then this happened:
>
>
> java.lang.RuntimeException: Error while getting state
>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:203) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.open(GroupAggFunction.java:119) ~[flink-table_2.12-1.14.0.jar:1.14.0]
>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[flink-fsp-connector-rksc-1.0-SNAPSHOT.jar:?]
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:55) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_282]
> Caused by: org.apache.flink.util.StateMigrationException: The new state serializer (org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializer@a508b39e) must not be incompatible with the old state serializer (org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializer@a508b39e).
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:704) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:225) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:148) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:132) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:302) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:353) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     ... 15 more


Re: [DISCUSS] Change some default config values of blocking shuffle

2021-12-10 Thread Yingjie Cao
Hi dev & users:

I have created a FLIP [1] for it; feedback is highly appreciated.

Best,
Yingjie

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-199%3A+Change+some+default+config+values+of+blocking+shuffle+for+better+usability

Yingjie Cao wrote on Fri, Dec 3, 2021 at 17:02:

> Hi dev & users,
>
> We propose to change some default values of blocking shuffle to improve
> the out-of-the-box user experience (streaming is not affected). The
> default values we want to change are as follows:
>
> 1. Data compression
> (taskmanager.network.blocking-shuffle.compression.enabled): Currently, the
> default value is 'false'. Data compression usually reduces both disk and
> network IO, which is good for performance, and it also saves storage
> space. We propose to change the default value to true.
>
> 2. Default shuffle implementation
> (taskmanager.network.sort-shuffle.min-parallelism): Currently, the default
> value is 'Integer.MAX_VALUE', which means that by default Flink jobs always
> use hash-shuffle. In fact, for high parallelism, sort-shuffle is better for
> both stability and performance, so we propose to reduce the default value
> to a smaller one, for example 128. (We tested 128, 256, 512 and 1024 with
> TPC-DS, and 128 performed best.)
>
> 3. Read buffer of sort-shuffle
> (taskmanager.memory.framework.off-heap.batch-shuffle.size): Currently, the
> default value is '32M'. When we originally chose the default, both '32M'
> and '64M' were fine in tests, and we cautiously chose the smaller one.
> However, it was recently reported on the mailing list that the default
> value is not enough, which caused a buffer request timeout issue. We have
> already created a ticket to improve the behavior. At the same time, we
> propose to increase this default value to '64M', which can also help.
>
> 4. Sort buffer size of sort-shuffle
> (taskmanager.network.sort-shuffle.min-buffers): Currently, the default
> value is '64', which means 64 network buffers (32k per buffer by default).
> This default is quite modest and can limit performance. We propose to
> increase it to a larger value, for example 512 (the default TM and network
> buffer configuration can serve more than 10 result partitions
> concurrently).
>
> We have already tested these default values with the TPC-DS benchmark on a
> cluster, and both performance and stability improved significantly. These
> changes can improve the out-of-the-box experience of blocking shuffle.
> What do you think about these changes? Are there any concerns? If there
> are no objections, I will make these changes soon.
>
> Best,
> Yingjie
>

