Re: Elasticsearch sink connector timeout

2021-06-06 Thread Jacky Yin 殷传旺
In the flink-es connector 6.*, you can set the socket timeout by implementing a
customized RestClientFactory. Here is the code snippet.

@Override
public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
    restClientBuilder
        .setRequestConfigCallback(new ElasticSearchRequestConfigCallback());
}

class ElasticSearchRequestConfigCallback implements RestClientBuilder.RequestConfigCallback {
    @Override
    public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder builder) {
        // requestTimeout is the desired socket timeout in milliseconds
        return builder.setSocketTimeout(requestTimeout);
    }
}
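For completeness, below is a minimal sketch of how such a factory could be wired into
the DataStream Elasticsearch sink builder, assuming the elasticsearch6 connector. The
class name, the timeout value and the esSinkBuilder variable are placeholders, not part
of the snippet above.

import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
import org.elasticsearch.client.RestClientBuilder;

// A self-contained RestClientFactory that only adjusts the socket timeout.
public class TimeoutRestClientFactory implements RestClientFactory {

    private final int requestTimeoutMs;

    public TimeoutRestClientFactory(int requestTimeoutMs) {
        this.requestTimeoutMs = requestTimeoutMs;
    }

    @Override
    public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
        restClientBuilder.setRequestConfigCallback(
                builder -> builder.setSocketTimeout(requestTimeoutMs));
    }
}

// Wiring it into the sink builder (esSinkBuilder is assumed to exist already):
// esSinkBuilder.setRestClientFactory(new TimeoutRestClientFactory(60_000));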


The default socket timeout is 30 seconds, which should be fine for most cases. So
if you find that the ES load is normal but the responses are slow, or the number
of rejected requests is high, you should probably check:
1) whether the number of concurrent ES bulk requests is too high;
2) whether the bulk size is too big;
3) whether too many indexes are included in one bulk request.
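For reference, these bulk knobs live on the DataStream ElasticsearchSink builder; a
minimal sketch follows. The values are placeholders, not recommendations, and
esSinkBuilder is assumed to be an existing ElasticsearchSink.Builder.

// Illustrative values only; tune them against your own ES cluster.
esSinkBuilder.setBulkFlushMaxActions(500);     // cap the number of actions per bulk request
esSinkBuilder.setBulkFlushMaxSizeMb(5);        // cap the size of a single bulk request
esSinkBuilder.setBulkFlushInterval(1000);      // flush at least once per second
esSinkBuilder.setBulkFlushBackoff(true);       // back off and retry rejected bulk requests
esSinkBuilder.setBulkFlushBackoffRetries(3);
esSinkBuilder.setBulkFlushBackoffDelay(500);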


BR,
Jacky

From: Yangze Guo 
Sent: June 7, 2021 11:41
To: Kai Fu 
Cc: user 
Subject: Re: Elasticsearch sink connector timeout

Hi, Kai,

I think the exception should be thrown from
RetryRejectedExecutionFailureHandler as you configure the
'failure-handler' to 'retry-rejected'. It will retry the action that
failed with EsRejectedExecutionException and throw all other failures.

AFAIK, there is no way to configure the connection/socket timeout in the
Elasticsearch SQL connector. However, if the root cause is network jitter, you
may increase sink.bulk-flush.backoff.delay and
sink.bulk-flush.backoff.max-retries.


Best,
Yangze Guo


Re: Question about the starting consumption offset of the Flink SQL Kafka source

2021-06-06 Thread JasonLee
hi

Then you just need to configure the source to start consuming from latest-offset and disable
checkpointing. As for the number of restarts, you can get it from the numRestarts metric.
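For illustration, a minimal Table API sketch of a Kafka source declared with
'scan.startup.mode' = 'latest-offset'; the table name, topic, schema and connection
details are made up.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LatestOffsetKafkaSource {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Hypothetical topic and schema; the relevant line is 'scan.startup.mode'.
        tEnv.executeSql(
                "CREATE TABLE source_events (\n"
                + "  id STRING,\n"
                + "  ts TIMESTAMP(3)\n"
                + ") WITH (\n"
                + "  'connector' = 'kafka',\n"
                + "  'topic' = 'events',\n"
                + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                + "  'scan.startup.mode' = 'latest-offset',\n"
                + "  'format' = 'json'\n"
                + ")");
    }
}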



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Log cleanup for Flink on YARN

2021-06-06 Thread zjfpla...@hotmail.com
Hi all,
I have the following questions:
In Flink on YARN mode, is there a log cleanup mechanism?
Is it based on the cleanup/rolling mechanisms of log4j/logback/log4j2, or is it configured on the YARN side?
These are real-time streaming jobs that run continuously, not one-off offline jobs.



zjfpla...@hotmail.com


Re: Elasticsearch sink connector timeout

2021-06-06 Thread Yangze Guo
Hi, Kai,

I think the exception should be thrown from
RetryRejectedExecutionFailureHandler as you configure the
'failure-handler' to 'retry-rejected'. It will retry the action that
failed with EsRejectedExecutionException and throw all other failures.

AFAIK, there is no way to configure the connection/socket timeout in the
Elasticsearch SQL connector. However, if the root cause is network jitter, you
may increase sink.bulk-flush.backoff.delay and
sink.bulk-flush.backoff.max-retries.
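For illustration, a minimal sketch showing where these options go in a SQL sink table
definition; the table name, index, host, schema and values are made up.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class EsBackoffOptions {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Hypothetical index and schema; the point is the failure-handler and backoff options.
        tEnv.executeSql(
                "CREATE TABLE es_sink (\n"
                + "  id STRING,\n"
                + "  cnt BIGINT\n"
                + ") WITH (\n"
                + "  'connector' = 'elasticsearch-7',\n"
                + "  'hosts' = 'http://localhost:9200',\n"
                + "  'index' = 'my_index',\n"
                + "  'failure-handler' = 'retry-rejected',\n"
                + "  'sink.bulk-flush.backoff.strategy' = 'CONSTANT',\n"
                + "  'sink.bulk-flush.backoff.delay' = '1s',\n"
                + "  'sink.bulk-flush.backoff.max-retries' = '5'\n"
                + ")");
    }
}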


Best,
Yangze Guo

On Sat, Jun 5, 2021 at 2:28 PM Kai Fu  wrote:
>
> With some investigation in the task manager's log, the exception was raised 
> from RetryRejectedExecutionFailureHandler path, the related logs are showing 
> below, not sure why it's that.
>
>
> 5978 2021-06-05 05:31:31,529 INFO 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler
>  [] - Bulk request 1033 has been cancelled.
> 5979 java.lang.InterruptedException: null
> 5980 at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
>  ~[?:1.8.0_272]
> 5981 at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  ~[?:1.8.0_272]
> 5982 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) 
> ~[?:1.8.0_272]
> 5983 at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:78)
>  ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar: 1.13.1]
> 5984 at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455)
>  ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 5985 at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:464)
>  ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 5986 at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.awaitClose(BulkProcessor.java:330)
>  ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13. 1]
> 5987 at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.close(BulkProcessor.java:300)
>  ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 5988 at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:354)
>  ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 5989 at 
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> 5990 at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> 5991 at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:861)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> 5992 at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:840)
>  [flink-dist_2.11-1.13.1.jar:1.13.1]
> 5993 at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:753)
>  [flink-dist_2.11-1.13.1.jar:1.13.1]
> 5994 at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:659)
>  [flink-dist_2.11-1.13.1.jar:1.13.1]
> 5995 at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
>  [flink-dist_2.11-1.13.1.jar:1.13.1]
> 5996 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) 
> [flink-dist_2.11-1.13.1.jar:1.13.1]
> 5997 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) 
> [flink-dist_2.11-1.13.1.jar:1.13.1]
> 5998 at java.lang.Thread.run(Thread.java:748) [?:1.8.0_272]
> 5999 2021-06-05 05:31:31,530 ERROR 
> org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler
>  [] - Failed Elasticsearch item request: null
> 6000 java.lang.InterruptedException: null
> 6001 at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
>  ~[?:1.8.0_272]
> 6002 at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  ~[?:1.8.0_272]
> 6003 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) 
> ~[?:1.8.0_272]
> 6004 at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:78)
>  ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar: 1.13.1]
> 6005 at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455)
>  ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6006 at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:464)
>  

Re: Question about State TTL and Interval Join

2021-06-06 Thread Yun Tang
Hi Chris,

Interval join cleans up state once records fall outside the join interval, so you
don't need to set a state TTL. (Actually, the states used in the interval join are
not exposed, and you cannot set a TTL on them, as TTL is only available for
user-defined states.)
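For context, here is a minimal, self-contained sketch of an interval join in the
DataStream API; record state outside the configured bounds is dropped by the operator
itself. The toy elements and bounds are made up.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class IntervalJoinSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Toy (key, timestamp) streams; real jobs would read from Kinesis and assign event time there.
        WatermarkStrategy<Tuple2<String, Long>> wm =
                WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
                        .withTimestampAssigner((event, ts) -> event.f1);

        DataStream<Tuple2<String, Long>> left = env
                .fromElements(Tuple2.of("a", 1000L), Tuple2.of("b", 2000L))
                .assignTimestampsAndWatermarks(wm);
        DataStream<Tuple2<String, Long>> right = env
                .fromElements(Tuple2.of("a", 1500L), Tuple2.of("b", 2500L))
                .assignTimestampsAndWatermarks(wm);

        left.keyBy(t -> t.f0)
                .intervalJoin(right.keyBy(t -> t.f0))
                .between(Time.minutes(-5), Time.minutes(5)) // state outside these bounds is cleaned up
                .process(new ProcessJoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                    @Override
                    public void processElement(Tuple2<String, Long> l, Tuple2<String, Long> r,
                                               Context ctx, Collector<String> out) {
                        out.collect(l.f0 + " joined at " + l.f1 + "/" + r.f1);
                    }
                })
                .print();

        env.execute("interval-join-sketch");
    }
}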

The fact that the checkpoint size continues to increase does not mean your actual state
also increases. RocksDB writes a delete marker (tombstone) when removing an element, and
the stale data is physically cleared after compaction. You could judge whether the state
really grows by using non-incremental checkpoints and checking how large the state size
is then.

Moreover, the OOM should not be related to RocksDB, as it uses off-heap native memory;
you might need to do some work to dig into what occupied the JVM memory during
checkpoints.

Best
Yun Tang

From: McBride, Chris 
Sent: Saturday, June 5, 2021 3:17
To: user@flink.apache.org 
Subject: Question about State TTL and Interval Join


We currently have a Flink 1.8 application deployed on Kinesis Data Analytics
using the RocksDB state backend. Our application is joining across 3 different
Kinesis streams using an interval join. We noticed that our checkpoint sizes
continue to increase over time; we eventually have OOM failures writing
checkpoints and need to restart the application without restoring from a
savepoint.



Does this kind of application require a state TTL on the join operator? I 
assumed since it was an interval join, events that fell outside of the lower 
timebound would automatically be expired from the state. Is that a correct 
assumption?



Thanks,

Chris




Re: Multiple Exceptions during Load Test in State Access APIs with RocksDB

2021-06-06 Thread Chirag Dewan
Thanks for the reply Yun. Strangely, I don't see any nulls. And in fact this
exception comes on the first few records, and then the job starts processing
normally.
Also, I don't see any reason for concurrent access to the state in my code.
Could having more CPU cores than task slots on the Task Manager be the reason for it?
On Saturday, 5 June, 2021, 06:40:27 pm IST, Yun Gao  wrote:
 
Hi Chirag,
If you are able to reproduce the exception, could you first add some logs to print the
value of valueState, valueState.value(), inEvent and inEvent.getPriceDelta()? I think
either object being null would cause a NullPointerException here.
For the second exception, I found a similar issue [1], caused by concurrent
access to the value state. Do we have a similar situation here?

[1] https://issues.apache.org/jira/browse/FLINK-18587

Best,
Yun


------ Original Mail ------
Sender: Chirag Dewan
Send Date: Sat Jun 5 20:29:37 2021
Recipients: User
Subject: Multiple Exceptions during Load Test in State Access APIs with RocksDB
Hi,

I am getting multiple exceptions while trying to use RocksDB as a state backend.

I have 2 Task Managers with 2 task slots and 4 cores each.

Below is our setup:

Kafka (topic with 2 partitions) --> FlinkKafkaConsumer (parallelism 2) -->
KeyedProcessFunction (parallelism 4) --> FlinkKafkaProducer (parallelism 1) -->
Kafka topic

public class Aggregator_KeyedExpression extends KeyedProcessFunction {

    private ValueState valueState;

    @Override
    public void open() throws Exception {
        ValueStateDescriptor descriptor =
            new ValueStateDescriptor("totalPrize", Integer.class);

        valueState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(GameZoneInput inEvent, Context ctx, final List outEvents)
            throws Exception {

        if (valueState.value() == null) {
            valueState.update(0);
        }

        valueState.update(valueState.value() + inEvent.getPrizeDelta()); // -> NullPointerException on this line

        int sum = valueState.value();

        GameZoneOutput output = new GameZoneOutput();
        output.setPlayerId(inEvent.getPlayerId());
        output.setNetPrize(sum);
        outEvents.add(output);
    }

    @Override
    public void close() throws Exception {
        valueState.clear();
    }
}
While doing a load test, I get a NullPointerException in valueState.value(), which
seems strange as we would have updated the value state just above.
Another strange thing is that this is observed only under load and works fine otherwise.
We also see some serialization exceptions:

Suppressed: java.lang.IllegalArgumentException: Position out of bounds.

at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)

at org.apache.flink.core.memory.DataOutputSerializer.setPosition(DataOutputSerializer.java:352)

at org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.resetToKey(RocksDBSerializedCompositeKeyBuilder.java:185)

at org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.buildCompositeKeyNamespace(RocksDBSerializedCompositeKeyBuilder.java:114)

at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeCurrentKeyWithGroupAndNamespace(AbstractRocksDBState.java:163)

at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.clear(AbstractRocksDBState.java:113)
Any leads would be appreciated. Thanks
Chirag



  

Re: Question about the starting consumption offset of the Flink SQL Kafka source

2021-06-06 Thread Yun Tang
hi,

Essentially, your approach is a bit of a hack and is not really recommended. If you have to do it this way, you can also check how many times the job has restarted via the numRestarts metric [1].


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/metrics/#availability

Best,
Yun Tang

From: yidan zhao 
Sent: Friday, June 4, 2021 11:52
To: user-zh 
Subject: Re: Question about the starting consumption offset of the Flink SQL Kafka source

The essential requirement is that I have a forwarding job. Checkpoint failures and job failures usually happen under high load, so on restart I want to ignore the backlog and start consuming from the latest data. I want the job, when it fails, to restart automatically and continue forwarding from the latest offset.

yidan zhao  wrote on Fri, Jun 4, 2021 at 11:51 AM:
>
> That is quite annoying. An approach I had considered before was simply to disable checkpointing for the SQL job. But once checkpointing is disabled, the Flink web
> UI can no longer show the restart count; the monitoring of job restarts can currently only be reflected through enabling checkpoints (number of failed checkpoints, number of restored checkpoints).
>
> JasonLee <17610775...@163.com> wrote on Fri, Jun 4, 2021 at 11:49 AM:
> >
> > hi
> >
> > SQL jobs also resume consumption from the offsets saved in the last successful checkpoint.
> >
> >
> >
> > -
> > Best Wishes
> > JasonLee
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink app performance test framework

2021-06-06 Thread Yangze Guo
Hi, Luck,

I may not fully understand your requirements. If you just want to test
the performance of typical streaming jobs with Flink, you can
refer to Nexmark [1]. If you just care about the performance
regression of your specific production jobs, I don't know of
such a framework.

[1] https://github.com/nexmark/nexmark


Best,
Yangze Guo

On Sun, Jun 6, 2021 at 7:35 AM luck li  wrote:
>
> Hi flink community,
>
> Is there any test framework that we can use to test Flink job performance?
> We would like to automate the process of regression testing during Flink version
> upgrades and of job performance testing when rolling out new changes to prod.
>
> Any suggestions would be appreciated!
>
> Thank you
> Best regards
> Luck


Re: Add control mode for flink

2021-06-06 Thread 刘建刚
Thank you for the reply. I have checked the post you mentioned. A dynamic
config may be useful sometimes, but it is hard to keep data consistent in
Flink; for example, what should happen to the dynamic config on failover?
Since dynamic config is something users want, maybe Flink can
support it in some way.

For the control mode, dynamic config is just one of the control modes. In
the Google doc, I have listed some other cases. For example, control events
can be generated in operators or by external services. Besides the user's dynamic
config, the Flink system could support some common dynamic configuration, like
a qps limit, checkpoint control and so on.

It needs a good design to handle the control mode structure. Based on that,
other control features can be added easily later, like changing the log level
while the job is running. In the end, Flink will not just process data, but also
interact with users, receiving control events like a service.
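For the filter-condition case specifically, one pattern that already works today is to
deliver the dynamic config through a broadcast stream and keep it in broadcast state, so
it is checkpointed together with the job. A minimal sketch, with all class and variable
names made up:

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class DynamicFilterSketch {
    // Single-key broadcast state holding the current filter threshold.
    private static final MapStateDescriptor<String, Long> CONFIG_DESC =
        new MapStateDescriptor<>("filter-config", Types.STRING, Types.LONG);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Long> events = env.fromElements(1L, 5L, 10L, 50L);   // stand-in for the real source
        DataStream<Long> configUpdates = env.fromElements(3L);          // stand-in for a config topic/endpoint

        BroadcastStream<Long> configBroadcast = configUpdates.broadcast(CONFIG_DESC);

        events.connect(configBroadcast)
            .process(new BroadcastProcessFunction<Long, Long, Long>() {
                @Override
                public void processElement(Long value, ReadOnlyContext ctx, Collector<Long> out) throws Exception {
                    Long threshold = ctx.getBroadcastState(CONFIG_DESC).get("threshold");
                    // Pass everything through until the first config record arrives.
                    if (threshold == null || value >= threshold) {
                        out.collect(value);
                    }
                }

                @Override
                public void processBroadcastElement(Long newThreshold, Context ctx, Collector<Long> out) throws Exception {
                    ctx.getBroadcastState(CONFIG_DESC).put("threshold", newThreshold);
                }
            })
            .print();

        env.execute("dynamic-filter-sketch");
    }
}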

Steven Wu  wrote on Fri, Jun 4, 2021 at 11:11 PM:

> I am not sure if we should solve this problem in Flink. This is more like
> a dynamic config problem that probably should be solved by some
> configuration framework. Here is one post from google search:
> https://medium.com/twodigits/dynamic-app-configuration-inject-configuration-at-run-time-using-spring-boot-and-docker-ffb42631852a
>
> On Fri, Jun 4, 2021 at 7:09 AM 刘建刚  wrote:
>
>> Hi everyone,
>>
>>   Flink jobs are always long-running. While the job is running, users
>> may want to control the job without stopping it. The control reasons can
>> vary, for example:
>>
>>    1. Change the data processing logic, such as a filter condition.
>>    2. Send trigger events to move the progress forward.
>>    3. Define some tools to degrade the job, such as limiting input qps or
>>       sampling data.
>>    4. Change the log level to debug a current problem.
>>
>>   The common way to do this is to stop the job, make the modifications and
>> start the job again. It may take a long time to recover. In some situations,
>> stopping the job is intolerable, for example when the job is related to money or
>> important activities. So we need some technologies to control the running
>> job without stopping it.
>>
>>
>> We propose to add control mode for flink. A control mode based on the
>> restful interface is first introduced. It works by these steps:
>>
>>
>>1. The user can predefine some logic which supports config control,
>>such as filter condition.
>>2. Run the job.
>>3. If the user wants to change the job's running logic, just send a
>>restful request with the responding config.
>>
>> Other control modes will also be considered in the future. More
>> introduction can refer to the doc
>> https://docs.google.com/document/d/1WSU3Tw-pSOcblm3vhKFYApzVkb-UQ3kxso8c8jEzIuA/edit?usp=sharing
>> . If the community likes the proposal, more discussion is needed and a more
>> detailed design will be given later. Any suggestions and ideas are welcome.
>>
>>


Re: Flink 1.12, yarn-application mode: cannot see logs in the Flink web UI

2021-06-06 Thread smq
Hi. Normally a Flink-on-YARN container should have three log files: .err, .out and .log. The log.file path you pasted should be the location of jobmanager.log. However, my job has no .log file, only .err and .out. I looked at the logs inside the containers and found that the containers whose logs display correctly have all three files, while the jobs whose logs do not display correctly have only the two files and are missing the .log file, so I also cannot see the runtime INFO messages. I think these log files are created by YARN, and I have not found the cause yet. I have tried running two identical programs packaged as two separate jars, but one of them is normal and the other is not. This does not affect the job itself; it just lacks some logs.





------ Original message ------
From: r pp
Sent from: http://apache-flink.147419.n8.nabble.com/

--
Best,
pp

Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

2021-06-06 Thread Jingsong Li
Thanks Yingjie for the great effort!

This is really helpful to Flink Batch users!

Best,
Jingsong


-- 
Best, Jingsong Lee


flink sql cdc: too many jobs when used for data synchronization

2021-06-06 Thread casel.chen
When using Flink SQL CDC for data synchronization, because it works at the database + table level, having too many tables leads to too many jobs. Is it possible to use Flink SQL
CDC to synchronize at the database level? That would greatly reduce the number of jobs.

Re: Troubleshooting very slow Flink checkpoints

2021-06-06 Thread yidan zhao
Yes, that works. Asynchronous operations are essentially backed by a thread pool anyway. It doesn't matter whether you provide your own thread pool to execute a synchronous operation, or directly use the default internal thread pool of the async methods provided by the client/SDK.
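For reference, here is a minimal sketch of Flink's Async I/O with a user-supplied thread
pool wrapping a blocking lookup; the lookup logic and all names are made up.
AsyncDataStream.orderedWait preserves the input order of results, while unorderedWait
does not.

import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class AsyncLookupSketch {

    // Wraps a blocking lookup (e.g. a Redis/HBase client call) in a private thread pool.
    static class AsyncLookup extends RichAsyncFunction<String, String> {
        private transient ExecutorService executor;

        @Override
        public void open(Configuration parameters) {
            executor = Executors.newFixedThreadPool(10);
        }

        @Override
        public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
            executor.submit(() -> {
                String value = blockingLookup(key);            // synchronous client call
                resultFuture.complete(Collections.singleton(key + "=" + value));
            });
        }

        @Override
        public void close() {
            if (executor != null) {
                executor.shutdown();
            }
        }

        private String blockingLookup(String key) {
            return "value-of-" + key;                           // placeholder for the real lookup
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> keys = env.fromElements("a", "b", "c");

        // orderedWait keeps result order aligned with input order; use unorderedWait for lower latency.
        AsyncDataStream
            .orderedWait(keys, new AsyncLookup(), 30, TimeUnit.SECONDS, 100)
            .print();

        env.execute("async-lookup-sketch");
    }
}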

Jacob <17691150...@163.com> wrote on Sat, Jun 5, 2021 at 1:15 PM:
>
> thanks,
>
> I have read the related documentation [1]. There are many scattered places interacting with Redis and HBase, not only lookups but also writes back to Redis.
>
> I plan to move the whole logic of the previous map operator into the asyncInvoke() method, executed by a thread pool. I am not sure whether this is appropriate; this way the ordering of the data can no longer be guaranteed, right?
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/datastream/operators/asyncio/
> 
>
>
>
> -
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

2021-06-06 Thread Yingjie Cao
Hi devs & users,

The FLIP-148[1] has been released with Flink 1.13 and the final
implementation has some differences compared with the initial proposal in
the FLIP document. To avoid potential misunderstandings, I have updated the
FLIP document[1] accordingly and I also drafted another document[2] which
contains more implementation details.  FYI.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Based+Blocking+Shuffle+to+Flink
[2]
https://docs.google.com/document/d/1j12TkSqgf6dg3J48udA2MFrDOQccW24tzjn5pJlTQaQ/edit?usp=sharing

Best,
Yingjie

Yingjie Cao  wrote on Thu, Oct 15, 2020 at 11:02 AM:

> Hi devs,
>
> Currently, Flink adopts a hash-style blocking shuffle implementation which
> writes data sent to different reducer tasks into separate files
> concurrently. Compared to the sort-merge based approach, which writes those data
> together into a single file and merges those small files into bigger ones, the
> hash-based approach has several weak points when it comes to running large
> scale batch jobs:
>
>1. *Stability*: For high parallelism (tens of thousands) batch job,
>current hash-based blocking shuffle implementation writes too many files
>concurrently which gives high pressure to the file system, for example,
>maintenance of too many file metas, exhaustion of inodes or file
>descriptors. All of these can be potential stability issues. Sort-Merge
>based blocking shuffle doesn't have this problem because for one result
>partition, only one file is written at the same time.
>2. *Performance*: Large amounts of small shuffle files and random IO
>can influence shuffle performance a lot especially for hdd (for ssd,
>sequential read is also important because of read ahead and cache). For
>batch jobs processing massive data, small amount of data per subpartition
>is common because of high parallelism. Besides, data skew is another cause
>of small subpartition files. By merging data of all subpartitions together
>in one file, more sequential read can be achieved.
>3. *Resource*: For current hash-based implementation, each
>subpartition needs at least one buffer. For large scale batch shuffles, the
>memory consumption can be huge. For example, we need at least 320M network
>memory per result partition if parallelism is set to 1 and because of
>the huge network consumption, it is hard to config the network memory for
>large scale batch job and  sometimes parallelism can not be increased just
>because of insufficient network memory  which leads to bad user experience.
>
> To improve Flink’s capability of running large scale batch jobs, we would
> like to introduce sort-merge based blocking shuffle to Flink[1]. Any
> feedback is appreciated.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
>
> Best,
> Yingjie
>


ubsubscribe

2021-06-06 Thread Zhipeng Zhang
-- 
best,
Zhipeng


Re: Re: Is it possible to use OperatorState, when NOT implementing a source or sink function?

2021-06-06 Thread Yun Gao
Hi Marco,

It seems to me that the imbalance problem and the state are independent issues here: the data distribution
is decided only by the KeySelector used. The only limitation for state is that keyed state is bound to the
KeySelector used across the tasks. If the imbalance is the root problem, have you checked
how many keys in total the job has?

Best,
Yun



------ Original Mail ------
Sender: Marco Villalobos
Send Date: Sat Jun 5 23:26:09 2021
Recipients: JING ZHANG
CC: user
Subject: Re: Is it possible to use OperatorState, when NOT implementing a source or sink function?

Oh, that won't work for me either. I needed to use MapState.

Perhaps I should describe my problem. I am using a keyed process
function, but the workload that it is processing is not distributing well
across the cluster. I have four task managers, but with the way my data is keyed in
this operator, it only goes to one task manager node.

I need state, but I don't really need it keyed.
On Sat, Jun 5, 2021 at 4:56 AM Marco Villalobos  
wrote:

Does that work in the DataStream API in Batch Execution Mode?
On Sat, Jun 5, 2021 at 12:04 AM JING ZHANG  wrote:

Hi, please use `CheckpointedFunction`. You can initialize your operator state
in the `initializeState` method by using context.getOperatorStateStore().***
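A minimal sketch of that pattern, using a simple map-style function with a single piece
of operator state (all names are made up):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

// A MapFunction that also keeps operator (non-keyed) state via CheckpointedFunction.
public class CountingMapper implements MapFunction<String, String>, CheckpointedFunction {

    private transient ListState<Long> checkpointedCount;   // operator state, re-distributed on rescale
    private long localCount;                                // working copy used at runtime

    @Override
    public String map(String value) {
        localCount++;
        return value;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedCount.clear();
        checkpointedCount.add(localCount);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedCount = context.getOperatorStateStore()
            .getListState(new ListStateDescriptor<>("count", Types.LONG));
        for (Long c : checkpointedCount.get()) {
            localCount += c;                                // restore after failure
        }
    }
}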

Best regards,
JING ZHANG


Marco Villalobos  wrote on Sat, Jun 5, 2021 at 1:55 PM:

Is it possible to use OperatorState, when NOT implementing a source or sink 
function?

If yes, then how?

Re: Re: Re: Failed to cancel a job using the STOP rest API

2021-06-06 Thread Yun Gao
Hi Thomas,

Many thanks for reporting the exceptions; it does not seem to work as
expected to me...
Could you also show us the DAG of the job? And do some operators in the
source task use multiple threads to emit records?

Best,
Yun



------ Original Mail ------
Sender: Thomas Wang
Send Date: Sun Jun 6 04:02:20 2021
Recipients: Yun Gao
CC: user
Subject: Re: Re: Failed to cancel a job using the STOP rest API

One thing I noticed is that if I set drain = true, the job could be stopped 
correctly. Maybe that's because I'm using a Parquet file sink which is a 
bulk-encoded format and only writes to disk during checkpoints?

Thomas
On Sat, Jun 5, 2021 at 10:06 AM Thomas Wang  wrote:

Hi Yun,

Thanks for the tips. Yes, I do see some exceptions as copied below. I'm not 
quite sure what they mean though. Any hints?

Thanks.

Thomas

```
2021-06-05 10:02:51
java.util.concurrent.ExecutionException: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161)
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:130)
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:80)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:302)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:576)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:544)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
Caused by: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:642)
at 
org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
at 
org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator$WatermarkEmitter.emitWatermark(TimestampsAndWatermarksOperator.java:165)
at 
org.apache.flink.api.common.eventtime.BoundedOutOfOrdernessWatermarks.onPeriodicEmit(BoundedOutOfOrdernessWatermarks.java:69)
at 
org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.close(TimestampsAndWatermarksOperator.java:125)
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$closeOperator$5(StreamOperatorWrapper.java:205)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.closeOperator(StreamOperatorWrapper.java:203)
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:177)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:90)
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:155)
... 9 more
Caused by: java.lang.RuntimeException
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:123)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitWatermark(OperatorChain.java:762)
at 
org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:570)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:638)
... 21 more
Caused by: java.lang.IllegalStateException
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
at 
org.apache.flink.runtime.io.network.buffer.BufferBuilder.append(BufferBuilder.java:83)
at 
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.copyToBufferBuilder(SpanningRecordSerializer.java:90)
at 

Is it possible to customize avro schema name when using SQL

2021-06-06 Thread tao xiao
Hi team,

I want to use avro-confluent to encode the data using SQL, but the schema
registered by the encoder hard-codes the schema name to 'record'. Is it
possible to specify the name?

-- 
Regards,
Tao