Flink SQL + savepoint

2019-10-30 Thread Fanbin Bu
Hi,

it is highly recommended that we assign a uid to each operator for the
sake of savepoints. How do we do this for Flink SQL? According to
https://stackoverflow.com/questions/55464955/how-to-add-uid-to-operator-in-flink-table-api,
it is not possible.

Does that mean I can't use savepoints to restart my program if I use Flink
SQL?

Thanks,

Fanbin


Preserving (best effort) messages order between operators

2019-10-30 Thread Averell
Hi, 

I have a source function with parallelism = 1, sending out records ordered
by event-time. These records are then rebalanced to the next operator, which
has parallelism > 1. I observed that within each subtask of the 2nd
operator, the order of the messages is not maintained. Is this behaviour
expected? If it is, is there any way to avoid it, or at least to reduce it?
I have high back-pressure on that 2nd operator because the one after it is
slow. There is also high back-pressure on the 1st operator, which makes my
problem more severe (the out-of-orderness mentioned above is high). If I could
throttle the 1st operator when back-pressure is high, I could mitigate
the problem, but I could not find any guide on doing that.

Could you please help?

Thanks.
Averell





Re: How to filter out abnormal timestamps?

2019-10-30 Thread 邢瑞斌
Hi 唐云,

Thanks for the pointers, I'll give it a try. Actually, I don't fully understand the TimeCharacteristic set on the env; I previously understood it as a global setting. If I set it to IngestionTime, can later operators still use EventTime?


On Thu, Oct 31, 2019 at 2:26 AM Yun Tang  wrote:

> Hi 瑞斌
>
> If you use Flink's IngestionTime, there is actually no mixing with Flink's EventTime: the IngestionTime at the source is simply the source's system time. You can add a filter
> operator right after the source that compares the ingestion time with the event time in the message and drops records exceeding a given threshold so they are not sent downstream.
>
> Best,
> 唐云
> 
> From: 邢瑞斌 
> Sent: Wednesday, October 30, 2019 17:57
> To: user-zh@flink.apache.org 
> Subject: How to filter out abnormal timestamps?
>
> Hi:
>
>
> When collecting log events from clients, we always run into some abnormal local timestamps, some of which are many days beyond the correct date. Such timestamps affect the watermark. How do you all handle logs like this?
>
> My current idea is:
>
> Compare the log's timestamp with Flink's own time and filter the record out if the difference exceeds a threshold. But this seems to make the processing result non-deterministic. My planned improvement is
> to compare IngestionTime with the log's timestamp. However, I'm not sure: can IngestionTime and EventTime be mixed?
>
> Any advice would be appreciated. Thanks, everyone!
>


Re: Flink S3 error

2019-10-30 Thread vino yang
Hi Harrison,

So did you check whether the file exists or not? And what's your question?

Best,
Vino

On Thu, Oct 31, 2019 at 5:24 AM Harrison Xu  wrote:

> I'm seeing this exception with the S3 uploader - it claims a previously
> written part file was not found. Full jobmanager logs attached. (Flink 1.8)
>
> java.io.FileNotFoundException: No such file or directory: 
> s3a://qcache/tmp/kafka/meta/rq_features/dt=2019-10-30T15/partition_1/_part-4-1169_tmp_21400e5e-3921-4f33-a980-ac953b50b4b7
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2255)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:699)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.open(FileSystem.java:950)
>   at 
> org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.getObject(HadoopS3AccessHelper.java:98)
>   at 
> org.apache.flink.fs.s3.common.writer.S3RecoverableMultipartUploadFactory.recoverInProgressPart(S3RecoverableMultipartUploadFactory.java:97)
>   at 
> org.apache.flink.fs.s3.common.writer.S3RecoverableMultipartUploadFactory.recoverRecoverableUpload(S3RecoverableMultipartUploadFactory.java:75)
>   at 
> org.apache.flink.fs.s3.common.writer.S3RecoverableWriter.recover(S3RecoverableWriter.java:95)
>   at 
> org.apache.flink.fs.s3.common.writer.S3RecoverableWriter.recover(S3RecoverableWriter.java:50)
>   at 
> com.quora.dataInfra.s3connector.flink.filesystem.Bucket.restoreInProgressFile(Bucket.java:146)
>   at 
> com.quora.dataInfra.s3connector.flink.filesystem.Bucket.(Bucket.java:133)
>   at 
> com.quora.dataInfra.s3connector.flink.filesystem.Bucket.restore(Bucket.java:404)
>   at 
> com.quora.dataInfra.s3connector.flink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:67)
>   at 
> com.quora.dataInfra.s3connector.flink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:182)
>   at 
> com.quora.dataInfra.s3connector.flink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:170)
>   at 
> com.quora.dataInfra.s3connector.flink.filesystem.Buckets.initializeState(Buckets.java:154)
>   at 
> com.quora.dataInfra.s3connector.flink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:344)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:748)
>
>


Re: How to periodically write state to external storage

2019-10-30 Thread Jun Zhang

https://mp.weixin.qq.com/s/VUEvvoHoupZMpxWQsEeHEA

On Oct 31, 2019, at 10:16, wangl...@geekplus.com.cn wrote:

Flink state TTL expiration cleanup issue

2019-10-30 Thread wangl...@geekplus.com.cn
flink-1.7.2, with the following cleanup policy:

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(3))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
ValueStateDescriptor<OrderState> descriptor =
    new ValueStateDescriptor<>("OrderState", TypeInformation.of(OrderState.class));
descriptor.enableTimeToLive(ttlConfig);

For program updates I use savepoints: flink cancel -s writes a savepoint to the savepoint directory, and I then restore from that directory.
The program has now been running for far longer than 3 days, and the savepoint produced by each flink cancel -s keeps growing. Is the expiration cleanup policy not taking effect?

Thanks,
王磊



wangl...@geekplus.com.cn


How to periodically write state to external storage

2019-10-30 Thread wangl...@geekplus.com.cn

The job is message-driven with very high QPS, and every incoming message updates a state value. If every message also triggered a write to external storage, the downstream system couldn't keep up.
Is there any way to periodically read the state and write it to external storage?
I'm currently on Flink 1.7.2.





wangl...@geekplus.com.cn


Re: low performance in running queries

2019-10-30 Thread Zhenghua Gao
I think more runtime information would help figure out where the problem is:
1) how many parallel subtasks are actually doing work
2) the metrics for each operator
3) the JVM profiling information, etc.

*Best Regards,*
*Zhenghua Gao*


On Wed, Oct 30, 2019 at 8:25 PM Habib Mostafaei 
wrote:

> Thanks Gao for the reply. I used the parallelism parameter with different
> values like 6 and 8 but still the execution time is not comparable with a
> single threaded python script. What would be the reasonable value for the
> parallelism?
>
> Best,
>
> Habib
> On 10/30/2019 1:17 PM, Zhenghua Gao wrote:
>
> The reason might be the parallelism of your task is only 1, that's too
> low.
> See [1] to specify proper parallelism  for your job, and the execution
> time should be reduced significantly.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html
>
> *Best Regards,*
> *Zhenghua Gao*
>
>
> On Tue, Oct 29, 2019 at 9:27 PM Habib Mostafaei 
> wrote:
>
>> Hi all,
>>
>> I am running Flink on a standalone cluster and getting very long
>> execution time for the streaming queries like WordCount for a fixed text
>> file. My VM runs on a Debian 10 with 16 cpu cores and 32GB of RAM. I
>> have a text file with size of 2GB. When I run the Flink on a standalone
>> cluster, i.e., one JobManager and one taskManager with 25GB of heapsize,
>> it took around two hours to finish counting this file while a simple
>> python script can do it in around 7 minutes. Just wondering what is
>> wrong with my setup. I ran the experiments on a cluster with six
>> taskManagers, but I still get very long execution time like 25 minutes
>> or so. I tried to increase the JVM heap size to have lower execution
>> time but it did not help. I attached the log file and the Flink
>> configuration file to this email.
>>
>> Best,
>>
>> Habib
>>
>> --
> Habib Mostafaei, Ph.D.
> Postdoctoral researcher
> TU Berlin,
> FG INET, MAR 4.003
> Marchstraße 23, 10587 Berlin
>
>


Re: [DISCUSS] Semantic and implementation of per-job mode

2019-10-30 Thread tison
Thanks for your attention!

@shixiaoga...@gmail.com 

Yes, correct. We try to avoid jobs affecting one another. Also, a local
ClusterClient in this case saves the overhead of retrying before the leader is
elected and of persisting the JobGraph before submission in RestClusterClient,
as well as the network cost.

@Paul Lam 

1. There is already a note [1] about multi-part jobs. I was also a bit confused
by this concept at first :-) Things go the same way if your program contains
only one JobGraph, so I think per-program acts like per-job-graph in this case,
which keeps compatibility for the many programs that have a single job graph.

Besides, we have to respect the user program, which the current implementation
doesn't, because we return abruptly when env#execute is called. This hijacks
user control, so users cannot deal with the job result or the future of it.
I think this is why we have to add a detach/attach option.

2. For the compilation part, a workaround could be to upload those resources
to a commonly known address such as HDFS so that the compilation can read them
from either the client or the cluster.

Best,
tison.

[1]
https://issues.apache.org/jira/browse/FLINK-14051?focusedCommentId=16927430=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16927430


On Wed, Oct 30, 2019 at 10:41 PM Newport, Billy  wrote:

> We execute multiple job graphs routinely because we cannot submit a single
> graph without it blowing up. I believe Regina spoke to this in Berlin
> during her talk. Instead, if we are processing a database ingestion with
> 200 tables in it, we do a job graph per table rather than a single job
> graph that handles all tables. A single job graph can be in the tens
> of thousands of nodes in our largest cases and we have found Flink (as of
> 1.3/1.6.4) cannot handle graphs of that size. We’re currently testing 1.9.1
> but have not retested the large graph scenario.
>
>
>
> Billy
>
>
>
>
>
> *From:* Paul Lam [mailto:paullin3...@gmail.com]
> *Sent:* Wednesday, October 30, 2019 8:41 AM
> *To:* SHI Xiaogang
> *Cc:* tison; dev; user
> *Subject:* Re: [DISCUSS] Semantic and implementation of per-job mode
>
>
>
> Hi,
>
>
>
> Thanks for starting the discussion.
>
>
>
> WRT the per-job semantic, it looks natural to me that per-job means
> per-job-graph,
>
> because in my understanding JobGraph is the representation of a job. Could
> you
>
> share some use case in which a user program should contain multiple job
> graphs?
>
>
>
> WRT the per-program mode, I’m also in favor of a unified cluster-side
> execution
>
> for user program, so +1 from my side.
>
>
>
> But I think there may be some values for the current per-job mode: we now
> have
>
> some common resources available on the client machine that would be read
> by main
>
> methods in user programs. If migrated to per-program mode, we must
> explicitly
>
> set the specific resources for each user program and ship them to the
> cluster,
>
> it would be a bit inconvenient.  Also, as the job graph is compiled at the
> client,
>
> we can recognize the errors caused by user code before starting the
> cluster
>
> and easily get access to the logs.
>
>
>
> Best,
>
> Paul Lam
>
>
>
On 30 Oct 2019, at 16:22, SHI Xiaogang  wrote:
>
>
>
> Hi
>
>
>
> Thanks for bringing this.
>
>
>
> The design looks very nice to me in that
>
> 1. In the new per-job mode, we don't need to compile user programs in the
> client and can directly run user programs with user jars. That way, it's
> easier for resource isolation in multi-tenant platforms and is much safer.
>
> 2. The execution of user programs can be unified in session and per-job
> modes. In session mode, user jobs are submitted via a remote ClusterClient
> while in per-job mode user jobs are submitted via a local ClusterClient.
>
>
>
> Regards,
>
> Xiaogang
>
>
>
On Wed, Oct 30, 2019 at 3:30 PM tison  wrote:
>
> (CC user list because I think users may have ideas on how per-job mode
> should look like)
>
>
>
> Hi all,
>
> In the discussion about Flink on k8s[1] we encounter a problem that
> opinions
> diverge in how so-called per-job mode works. This thread is aimed at
> stating
> a dedicated discussion about per-job semantic and how to implement it.
>
> **The AS IS per-job mode**
>
> * in standalone deployment, we bundle user jar with Flink jar, retrieve
> JobGraph which is the very first JobGraph from user program in classpath,
> and then start a Dispatcher with this JobGraph preconfigured, which
> launches it as "recovered" job.
>
> * in YARN deployment, we accept submission via CliFrontend, extract
> JobGraph
> which is the very first JobGraph from user program submitted, serialize
> the JobGraph and upload it to YARN as resource, and then when AM starts,
> retrieve the JobGraph as resource and start Dispatcher with this JobGraph
> preconfigured, follows are the same.
>
> Specifically, in order to support multi-part jobs, if the YARN deployment is
> configured as "attached", it starts a SessionCluster, drives the progress,
> and shuts down the cluster when the job finishes.

Re: Streaming File Sink - Parquet File Writer

2019-10-30 Thread Kostas Kloudas
Hi Vinay,

You are correct when saying that the bulk formats only support the
OnCheckpointRollingPolicy.

The reason for this has to do with the fact that currently Flink
relies on the Hadoop writer for Parquet.

Bulk formats keep important details about how they write the actual
data (such as compression schemes, offsets, etc.) in metadata, and they
write this metadata with the file (e.g. Parquet writes it as a footer).
The Hadoop writer gives no access to this metadata. Given this, there is
no way for Flink to checkpoint a part file safely without closing it.

The solution would be to write our own writer and not go through the
Hadoop one, but there are no concrete plans for this, as far as I know.

Cheers,
Kostas
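
For reference, a minimal sketch of a Parquet bulk-format StreamingFileSink that rolls on checkpoints (not taken from this thread; the MyRecord Avro class, the MyRecordSource source, and the output path are placeholder assumptions):

import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class ParquetSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Bulk part files are only finalized when a checkpoint completes,
        // so checkpointing must be enabled.
        env.enableCheckpointing(60_000);

        // Hypothetical source of Avro-generated MyRecord objects.
        DataStream<MyRecord> records = env.addSource(new MyRecordSource());

        // forBulkFormat uses the OnCheckpointRollingPolicy internally;
        // there is no withRollingPolicy(...) hook for bulk formats.
        StreamingFileSink<MyRecord> sink = StreamingFileSink
                .forBulkFormat(
                        new Path("s3a://bucket/output"),
                        ParquetAvroWriters.forSpecificRecord(MyRecord.class))
                .build();

        records.addSink(sink);
        env.execute("parquet-bulk-sink-sketch");
    }
}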


On Tue, Oct 29, 2019 at 12:57 PM Vinay Patil  wrote:
>
> Hi,
>
> I am not able to roll the files based on file size as the bulkFormat has 
> onCheckpointRollingPolicy.
>
> One way is to write CustomStreamingFileSink and provide RollingPolicy like 
> RowFormatBuilder. Is this the correct way to go ahead ?
>
> Another way is to write ParquetEncoder and use RowFormatBuilder.
>
> P.S. Curious to know Why was the RollingPolicy not exposed in case of 
> BulkFormat ?
>
> Regards,
> Vinay Patil


Flink S3 error

2019-10-30 Thread Harrison Xu
I'm seeing this exception with the S3 uploader - it claims a previously
written part file was not found. Full jobmanager logs attached. (Flink 1.8)

java.io.FileNotFoundException: No such file or directory:
s3a://qcache/tmp/kafka/meta/rq_features/dt=2019-10-30T15/partition_1/_part-4-1169_tmp_21400e5e-3921-4f33-a980-ac953b50b4b7
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2255)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:699)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.open(FileSystem.java:950)
at 
org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.getObject(HadoopS3AccessHelper.java:98)
at 
org.apache.flink.fs.s3.common.writer.S3RecoverableMultipartUploadFactory.recoverInProgressPart(S3RecoverableMultipartUploadFactory.java:97)
at 
org.apache.flink.fs.s3.common.writer.S3RecoverableMultipartUploadFactory.recoverRecoverableUpload(S3RecoverableMultipartUploadFactory.java:75)
at 
org.apache.flink.fs.s3.common.writer.S3RecoverableWriter.recover(S3RecoverableWriter.java:95)
at 
org.apache.flink.fs.s3.common.writer.S3RecoverableWriter.recover(S3RecoverableWriter.java:50)
at 
com.quora.dataInfra.s3connector.flink.filesystem.Bucket.restoreInProgressFile(Bucket.java:146)
at 
com.quora.dataInfra.s3connector.flink.filesystem.Bucket.(Bucket.java:133)
at 
com.quora.dataInfra.s3connector.flink.filesystem.Bucket.restore(Bucket.java:404)
at 
com.quora.dataInfra.s3connector.flink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:67)
at 
com.quora.dataInfra.s3connector.flink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:182)
at 
com.quora.dataInfra.s3connector.flink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:170)
at 
com.quora.dataInfra.s3connector.flink.filesystem.Buckets.initializeState(Buckets.java:154)
at 
com.quora.dataInfra.s3connector.flink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:344)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)


Re: How to stream intermediate data that is stored in external storage?

2019-10-30 Thread kant kodali
"I think you would have to implement your own custom operator that would
output changes to it’s internal state as a side output"

Yes, I am looking for this but I am not sure how to do this? Should I use
the processFunction(like the event-driven applications) ?

On Wed, Oct 30, 2019 at 8:53 AM Piotr Nowojski  wrote:

> Hi Kant,
>
> Checkpointing interval is configurable, but I wouldn’t count on it working
> well with even 10s intervals.
>
> I think what you are asking for is not supported by Flink generically. Maybe
> Queryable state I mentioned before? But I have never used it.
>
> I think you would have to implement your own custom operator that would
> output changes to its internal state as a side output.
>
> Piotrek
>
> On 30 Oct 2019, at 16:14, kant kodali  wrote:
>
> Hi Piotr,
>
> I am talking about the internal state. How often this state gets
> checkpointed? if it is every few seconds then it may not meet our real-time
> requirement(sub second).
>
> The question really is can I read this internal state in a streaming
> fashion in an update mode? The state processor API seems to expose DataSet
> but not DataStream so I am not sure how to read internal state in
> streaming fashion in an update mode?
>
> Thanks!
>
> On Wed, Oct 30, 2019 at 7:25 AM Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> I’m not sure what are you trying to achieve. What do you mean by “state
>> of full outer join”? The result of it? Or it’s internal state? Also keep in
>> mind, that internal state of the operators in Flink is already
>> snapshoted/written down to an external storage during checkpointing
>> mechanism.
>>
>> The result should be simple, just write it to some Sink.
>>
>> For the internal state, it sounds like you are doing something not the
>> way it was intended… having said that, you can try one of the following
>> options:
>> a) Implement your own outer join operator (might not be as easy if you
>> are using Table API/SQL) and just create a side output for the state
>> changes.
>> b) Use state processor API to read the content of a savepoint/checkpoint
>> [1][2]
>> c) Use queryable state [3] (I’m not sure about this, I have never used
>> queryable state)
>>
>> Piotrek
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>> [2] https://flink.apache.org/feature/2019/09/13/state-processor-api.html
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html
>>
>> On 29 Oct 2019, at 16:42, kant kodali  wrote:
>>
>> Hi All,
>>
>> I want to do a full outer join on two streaming data sources and store
>> the state of full outer join in some external storage like rocksdb or
>> something else. And then want to use this intermediate state as a streaming
>> source again, do some transformation and write it to some external store.
>> is that possible with Flink 1.9?
>>
>> Also what storage systems support push mechanism for the intermediate
>> data? For example, In the use case above does rocksdb support push/emit
>> events in a streaming fashion?
>>
>> Thanks!
>>
>>
>>
>


Re: No FileSystem for scheme "file" for S3A in and state processor api in 1.9

2019-10-30 Thread Piotr Nowojski
But from the stack trace that you have posted it looks like you are using 
Hadoop’s S3 implementation for the checkpointing? If so, can you try using 
Presto and check whether you still encounter the same issue?

Also, could you explain how to reproduce the issue? What configuration are you 
using? Does it happen always? On the first checkpoint?

Piotrek 

> On 30 Oct 2019, at 17:43, spoganshev  wrote:
> 
> Actually, I forgot to mention that it happens when there's also a presto
> library in plugins folder (we are using presto for checkpoints and hadoop
> for file sinks in the job itself)
> 
> 
> 



Re: How to filter out abnormal timestamps?

2019-10-30 Thread Yun Tang
Hi 瑞斌

If you use Flink's IngestionTime, there is actually no mixing with Flink's EventTime: the IngestionTime at the source is simply the source's system time. You can add a filter
operator right after the source that compares the ingestion time with the event time in the message and drops records exceeding a given threshold so they are not sent downstream.

Best,
唐云
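
A minimal sketch of the filter described above (not part of the original reply; the LogEvent POJO, its eventTime field in epoch milliseconds, and the one-day threshold are placeholder assumptions):

import org.apache.flink.api.common.functions.FilterFunction;

public class TimestampSanityFilter implements FilterFunction<LogEvent> {

    // Maximum allowed drift between ingestion time and event time; tune as needed.
    private static final long MAX_DRIFT_MS = 24 * 60 * 60 * 1000L;

    @Override
    public boolean filter(LogEvent event) {
        // Chained directly after the source, the wall clock here approximates
        // the source-side ingestion time.
        long ingestionTime = System.currentTimeMillis();
        return Math.abs(event.eventTime - ingestionTime) <= MAX_DRIFT_MS;
    }
}

// Usage (hypothetical): stream.filter(new TimestampSanityFilter()) right after the
// source, before assigning timestamps and watermarks.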

From: 邢瑞斌 
Sent: Wednesday, October 30, 2019 17:57
To: user-zh@flink.apache.org 
Subject: How to filter out abnormal timestamps?

Hi:

When collecting log events from clients, we always run into some abnormal local timestamps, some of which are many days beyond the correct date. Such timestamps affect the watermark. How do you all handle logs like this?

My current idea is:

Compare the log's timestamp with Flink's own time and filter the record out if the difference exceeds a threshold. But this seems to make the processing result non-deterministic. My planned improvement is
to compare IngestionTime with the log's timestamp. However, I'm not sure: can IngestionTime and EventTime be mixed?

Any advice would be appreciated. Thanks, everyone!


Re: The RMClient's and YarnResourceManagers internal state about the number of pending container requests has diverged

2019-10-30 Thread Till Rohrmann
Hi Regina, sorry for not getting back to you earlier. I've gone through the
logs and I couldn't find something suspicious. What I can see though is the
following:

When you start the cluster, you submit a couple of jobs. This starts at
9:20. In total 120 slots are being required to run these jobs. Since you
start a TaskExecutor with a single slot, you need 120 containers to run all
jobs. Flink has sent all container requests by 9:21:40. So far so good.

Shortly after, the cluster receives the first allocated containers.
However, it lasts until 9:29:58 that Flink has received all 120 containers.
I assume it is because the Hadoop cluster is quite contested. Afterwards
one sees that 230 excess containers are being returned.

Given that the cluster is configured
with yarn.heartbeat.container-request-interval: 6, the Hadoop RM
heartbeat is set to one minute. Hence, we report every minute the current
number of required containers. Due to Yarn adding these requests up [1], it
does not seem surprising that we end up with 230 excess containers.
Assuming a constant container fulfillment we end up with 120 + 108 + 96 +
... + 12 + 0 = 540 requested containers on the Yarn RM side. This roughly
matches the 120 + 230 (excess containers).

I'm not exactly sure how we can solve the Yarn problem. Flink would have to
make sure that every container request is only sent once to the Yarn RM
(e.g. canceling the container request after one heartbeat interval has been
passed). However, this is not super trivial and might end up being super
brittle.

Another idea could be to add a minimum and maximum number of
`TaskExecutors` the cluster should keep alive. That way one would only pay
the price of too many excess containers at startup but then the system
would keep at least minimum number of TaskExecutors alive. If you wish this
would imitate a bit the legacy mode Yarn behaviour where you start the
cluster with a fixed number of TaskExecutors.

[1] https://issues.apache.org/jira/browse/YARN-1902

Cheers,
Till

On Wed, Oct 30, 2019 at 4:11 AM Yang Wang  wrote:

> Hi Chan,
>
> If it is a bug, i think it is critical. Could you share the job manager
> logs with me too? I have some time to
> analyze and hope to find the root cause.
>
>
> Best,
> Yang
>
On Wed, Oct 30, 2019 at 10:55 AM Chan, Regina  wrote:
>
>> Till, were you able find anything? Do you need more logs?
>>
>>
>>
>>
>>
>> *From:* Till Rohrmann 
>> *Sent:* Saturday, October 26, 2019 1:17 PM
>> *To:* Chan, Regina [Engineering] 
>> *Cc:* Yang Wang ; user 
>> *Subject:* Re: The RMClient's and YarnResourceManagers internal state
>> about the number of pending container requests has diverged
>>
>>
>>
>> Forget my last email. I received the on time code and could access the
>> logs.
>>
>>
>>
>> Cheers,
>>
>> Till
>>
>>
>>
>> On Sat, Oct 26, 2019 at 6:49 PM Till Rohrmann 
>> wrote:
>>
>> Hi Regina,
>>
>>
>>
>> I couldn't access the log files because LockBox asked to create a new
>> password and now it asks me for the one time code to confirm this change.
>> It says that it will send the one time code to my registered email which I
>> don't have.
>>
>>
>>
>> Cheers,
>>
>> Till
>>
>>
>>
>> On Fri, Oct 25, 2019 at 10:14 PM Till Rohrmann 
>> wrote:
>>
>> Great, thanks a lot Regina. I'll check the logs tomorrow. If info level
>> is not enough, then I'll let you know.
>>
>>
>>
>> Cheers,
>>
>> Till
>>
>>
>>
>> On Fri, Oct 25, 2019, 21:20 Chan, Regina  wrote:
>>
>> Till, I added you to this lockbox area where you should be able to
>> download the logs. You should have also received an email with an account
>> created in lockbox where you can set a password. Let me know if you have
>> any issues.
>>
>>
>>
>>
>>
>>
>>
>> *From:* Till Rohrmann 
>> *Sent:* Friday, October 25, 2019 1:24 PM
>> *To:* Chan, Regina [Engineering] 
>> *Cc:* Yang Wang ; user 
>> *Subject:* Re: The RMClient's and YarnResourceManagers internal state
>> about the number of pending container requests has diverged
>>
>>
>>
>> Could you provide me with the full logs of the cluster
>> entrypoint/JobManager. I'd like to see what's going on there.
>>
>>
>>
>> Cheers,
>>
>> Till
>>
>>
>>
>> On Fri, Oct 25, 2019, 19:10 Chan, Regina  wrote:
>>
>> Till,
>>
>>
>>
>> We’re still seeing a large number of returned containers even with this
>> heart beat set to something higher. Do you have hints as to what’s going
>> on? It seems to be bursty in nature. The bursty requests cause the job to
>> fail with the cluster not having enough resources because it’s in the
>> process of releasing them.
>>
>> “org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>> Could not allocate enough slots to run the job. Please make sure that the
>> cluster has enough resources.” It causes the job to run very
>> inconsistently.
>>
>>
>>
>> Since legacy mode is now gone in 1.9, we don’t really see many options
>> here.
>>
>>
>>
>> *Run Profile*
>>
>> *Number of returned excess containers*
>>
>> 12G per TM, 2 slots
>> 

Re: No FileSystem for scheme "file" for S3A in and state processor api in 1.9

2019-10-30 Thread spoganshev
Actually, I forgot to mention that it happens when there's also a presto
library in plugins folder (we are using presto for checkpoints and hadoop
for file sinks in the job itself)





Re: How to stream intermediate data that is stored in external storage?

2019-10-30 Thread Piotr Nowojski
Hi Kant,

Checkpointing interval is configurable, but I wouldn’t count on it working well 
with even 10s intervals. 
 
I think what you are asking for is not supported by Flink generically. Maybe 
Queryable state I mentioned before? But I have never used it.

I think you would have to implement your own custom operator that would output 
changes to its internal state as a side output.

Piotrek
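
A minimal sketch of such an operator, done as a KeyedProcessFunction with a side output (not from the original reply; the String/Long types, the update logic, and the tag name are placeholder assumptions):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class StateChangePublisher
        extends KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>> {

    // Side output that carries every change to the keyed state.
    public static final OutputTag<Tuple2<String, Long>> STATE_CHANGES =
            new OutputTag<Tuple2<String, Long>>("state-changes") {};

    private transient ValueState<Long> current;

    @Override
    public void open(Configuration parameters) {
        current = getRuntimeContext().getState(
                new ValueStateDescriptor<>("current", Long.class));
    }

    @Override
    public void processElement(Tuple2<String, Long> value, Context ctx,
                               Collector<Tuple2<String, Long>> out) throws Exception {
        Long previous = current.value();
        Long updated = (previous == null ? 0L : previous) + value.f1; // example update logic
        current.update(updated);

        out.collect(Tuple2.of(value.f0, updated));               // regular result
        ctx.output(STATE_CHANGES, Tuple2.of(value.f0, updated));  // state change as a stream
    }
}

// Usage (hypothetical): result = keyedStream.process(new StateChangePublisher());
// result.getSideOutput(StateChangePublisher.STATE_CHANGES) gives the state-change stream.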

> On 30 Oct 2019, at 16:14, kant kodali  wrote:
> 
> Hi Piotr,
> 
> I am talking about the internal state. How often this state gets 
> checkpointed? if it is every few seconds then it may not meet our real-time 
> requirement(sub second).
> 
> The question really is can I read this internal state in a streaming fashion 
> in an update mode? The state processor API seems to expose DataSet but not 
> DataStream so I am not sure how to read internal state in streaming fashion 
> in an update mode?
> 
> Thanks!
> 
> On Wed, Oct 30, 2019 at 7:25 AM Piotr Nowojski  > wrote:
> Hi,
> 
> I’m not sure what are you trying to achieve. What do you mean by “state of 
> full outer join”? The result of it? Or it’s internal state? Also keep in 
> mind, that internal state of the operators in Flink is already 
> snapshoted/written down to an external storage during checkpointing mechanism.
> 
> The result should be simple, just write it to some Sink.
> 
> For the internal state, it sounds like you are doing something not the way it 
> was intended… having said that, you can try one of the following options:
> a) Implement your own outer join operator (might not be as easy if you are 
> using Table API/SQL) and just create a side output for the state changes.
> b) Use state processor API to read the content of a savepoint/checkpoint 
> [1][2]
> c) Use queryable state [3] (I’m not sure about this, I have never used 
> queryable state)
> 
> Piotrek
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>  
> 
> [2] https://flink.apache.org/feature/2019/09/13/state-processor-api.html 
> 
> [3] 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html
>  
> 
> 
>> On 29 Oct 2019, at 16:42, kant kodali > > wrote:
>> 
>> Hi All,
>> 
>> I want to do a full outer join on two streaming data sources and store the 
>> state of full outer join in some external storage like rocksdb or something 
>> else. And then want to use this intermediate state as a streaming source 
>> again, do some transformation and write it to some external store. is that 
>> possible with Flink 1.9?
>> 
>> Also what storage systems support push mechanism for the intermediate data? 
>> For example, In the use case above does rocksdb support push/emit events in 
>> a streaming fashion?
>> 
>> Thanks!
> 



Re: Flink 1.5+ performance in a Java standalone environment

2019-10-30 Thread Jakub Danilewicz
Hi,

I can confirm that the performance drop is directly related to FLIP-6 changes. 
Applying this modification to the code posted above restores the previous graph 
processing speed under Flink 1.5.6:

---

org.apache.flink.configuration.Configuration customConfig = new org.apache.flink.configuration.Configuration();
customConfig.setString("mode", "legacy");
final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(customConfig);
env.setParallelism(parallelism);

---

Disabling the "taskmanager.network.credit-model" parameter in Configuration 
provides only a very slight improvement in the performance under Flink 1.5.6.

Now the big question: what about newer versions where the legacy mode is not 
supported anymore? I checked Flink 1.8.2 and it does not work.

Is there any way to make the new mode as performant as the "legacy" one in the 
standalone scenarios? Alternatively may we expect improvements in this area in 
the upcoming releases?

Best,

Jakub

On 2019/10/30 14:11:19, Piotr Nowojski  wrote: 
> Hi,
> 
> In Flink 1.5 there were three big changes, that could affect performance. 
> 1. FLIP-6 changes (As previously Yang and Fabian mentioned)
> 2. Credit base flow control (especially if you are using SSL)
> 3. Low latency network changes
> 
> I would suspect them in that order. First and second you can disable via 
> configuration switches [1] and [2] respectively.
> 
> [1] “mode:legacy" 
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/config.html#core
>  
> 
> [2] "taskmanager.network.credit-model:false”
> 
> Could you try disabling them out?
> 
> Piotrek
> 
> > On 28 Oct 2019, at 14:10, Jakub Danilewicz  
> > wrote:
> > 
> > Thanks for your replies.
> > 
> > We use Flink from within a standalone Java 8 application (no Hadoop, no 
> > clustering), so it basically boils down to running simple code like 
> > this:
> > 
> > import java.util.*;
> > import org.apache.flink.api.java.ExecutionEnvironment;
> > import org.apache.flink.graph.*;
> > import org.apache.flink.graph.library.CommunityDetection;
> > 
> > public class FlinkTester {
> >final Random random = new Random(1);
> >final float density = 3.0F;
> > 
> >public static void main(String[] args) throws Exception {
> >new FlinkTester().execute(100, 4);
> >}
> > 
> >private void execute(int numEdges, int parallelism) throws Exception {
> >final ExecutionEnvironment env = 
> > ExecutionEnvironment.createLocalEnvironment(parallelism);
> >final Graph<Long, Long, Double> graph = createGraph(numEdges, env);
> > 
> >final long start = System.currentTimeMillis();
> >List<Vertex<Long, Long>> vertices = graph.run(new 
> > CommunityDetection<Long>(10, 0.5)).getVertices().collect();
> >System.out.println(vertices.size() + " vertices processed in " + 
> > (System.currentTimeMillis()-start)/1000 + " s");
> >}
> > 
> >private Graph<Long, Long, Double> createGraph(int numEdges, 
> > ExecutionEnvironment env) {
> >System.out.println("Creating new graph of " + numEdges + " 
> > edges...");
> > 
> >final int maxNumVertices = (int)(numEdges/density);
> >final Map<Long, Vertex<Long, Long>> vertexMap = new 
> > HashMap<>(maxNumVertices);
> >final Map<String, Edge<Long, Double>> edgeMap = new 
> > HashMap<>(numEdges);
> > 
> >while (edgeMap.size() < numEdges) {
> >long sourceId = random.nextInt(maxNumVertices) + 1;
> >long targetId = sourceId;
> >while (targetId == sourceId)
> >targetId = random.nextInt(maxNumVertices) + 1;
> > 
> >final String edgeKey = sourceId + "#" + targetId;
> >if (!edgeMap.containsKey(edgeKey)) {
> >edgeMap.put(edgeKey, new Edge<>(sourceId, targetId, 1D));
> >if (!vertexMap.containsKey(sourceId))
> >vertexMap.put(sourceId, new Vertex<>(sourceId, 
> > sourceId));
> >if (!vertexMap.containsKey(targetId))
> >vertexMap.put(targetId, new Vertex<>(targetId, 
> > targetId));
> >}
> >}
> > 
> >System.out.println(edgeMap.size() + " edges created between " + 
> > vertexMap.size() + " vertices.");
> >return Graph.fromCollection(vertexMap.values(), edgeMap.values(), 
> > env);
> >}
> > }
> > 
> > No matter what graph algorithm you pick for benchmarking (above it's 
> > CommunityDetection) the bigger the graph the wider performance gap (and 
> > higher CPU/memory consumption) you observe when comparing the execution 
> > times between the old engine (<= Flink 1.4.2) and the new one (checked on 
> > 1.5.6, 1.8.2 and 1.9.1).
> > 
> > Just run the code yourselves (you may play with the number of edges and 
> > parallel threads).
> > 
> > Best,
> > 
> > Jakub
> > 
> 
> 


Re: How to stream intermediate data that is stored in external storage?

2019-10-30 Thread kant kodali
Hi Piotr,

I am talking about the internal state. How often this state gets
checkpointed? if it is every few seconds then it may not meet our real-time
requirement(sub second).

The question really is can I read this internal state in a streaming
fashion in an update mode? The state processor API seems to expose DataSet
but not DataStream so I am not sure how to read internal state in
streaming fashion in an update made?

Thanks!

On Wed, Oct 30, 2019 at 7:25 AM Piotr Nowojski  wrote:

> Hi,
>
> I’m not sure what are you trying to achieve. What do you mean by “state of
> full outer join”? The result of it? Or it’s internal state? Also keep in
> mind, that internal state of the operators in Flink is already
> snapshoted/written down to an external storage during checkpointing
> mechanism.
>
> The result should be simple, just write it to some Sink.
>
> For the internal state, it sounds like you are doing something not the way
> it was intended… having said that, you can try one of the following options:
> a) Implement your own outer join operator (might not be as easy if you are
> using Table API/SQL) and just create a side output for the state changes.
> b) Use state processor API to read the content of a savepoint/checkpoint
> [1][2]
> c) Use queryable state [3] (I’m not sure about this, I have never used
> queryable state)
>
> Piotrek
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
> [2] https://flink.apache.org/feature/2019/09/13/state-processor-api.html
> [3]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html
>
> On 29 Oct 2019, at 16:42, kant kodali  wrote:
>
> Hi All,
>
> I want to do a full outer join on two streaming data sources and store the
> state of full outer join in some external storage like rocksdb or something
> else. And then want to use this intermediate state as a streaming source
> again, do some transformation and write it to some external store. is
> that possible with Flink 1.9?
>
> Also what storage systems support push mechanism for the intermediate
> data? For example, In the use case above does rocksdb support push/emit
> events in a streaming fashion?
>
> Thanks!
>
>
>


RE: [DISCUSS] Semantic and implementation of per-job mode

2019-10-30 Thread Newport, Billy
We execute multiple job graphs routinely because we cannot submit a single 
graph without it blowing up. I believe Regina spoke to this in Berlin during 
her talk. Instead, if we are processing a database ingestion with 200 tables 
in it, we do a job graph per table rather than a single job graph that handles 
all tables. A single job graph can be in the tens of thousands of nodes in 
our largest cases, and we have found Flink (as of 1.3/1.6.4) cannot handle 
graphs of that size. We’re currently testing 1.9.1 but have not retested the 
large graph scenario.

Billy


From: Paul Lam [mailto:paullin3...@gmail.com]
Sent: Wednesday, October 30, 2019 8:41 AM
To: SHI Xiaogang
Cc: tison; dev; user
Subject: Re: [DISCUSS] Semantic and implementation of per-job mode

Hi,

Thanks for starting the discussion.

WRT the per-job semantic, it looks natural to me that per-job means 
per-job-graph,
because in my understanding JobGraph is the representation of a job. Could you
share some use case in which a user program should contain multiple job graphs?

WRT the per-program mode, I’m also in favor of a unified cluster-side execution
for user program, so +1 from my side.

But I think there may be some value in the current per-job mode: we now have
some common resources available on the client machine that would be read by main
methods in user programs. If migrated to per-program mode, we must explicitly
set the specific resources for each user program and ship them to the cluster,
which would be a bit inconvenient. Also, as the job graph is compiled at the
client, we can recognize the errors caused by user code before starting the
cluster and easily get access to the logs.

Best,
Paul Lam


On 30 Oct 2019, at 16:22, SHI Xiaogang 
mailto:shixiaoga...@gmail.com>> wrote:

Hi

Thanks for bringing this.

The design looks very nice to me in that
1. In the new per-job mode, we don't need to compile user programs in the 
client and can directly run user programs with user jars. That way, it's easier 
for resource isolation in multi-tenant platforms and is much safer.
2. The execution of user programs can be unified in session and per-job modes. 
In session mode, user jobs are submitted via a remote ClusterClient while in 
per-job mode user jobs are submitted via a local ClusterClient.

Regards,
Xiaogang

tison mailto:wander4...@gmail.com>> wrote on Wed, Oct 30, 2019 at 3:30 PM:
(CC user list because I think users may have ideas on how per-job mode should 
look like)

Hi all,

In the discussion about Flink on k8s[1] we encounter a problem that opinions
diverge in how so-called per-job mode works. This thread is aimed at stating
a dedicated discussion about per-job semantic and how to implement it.

**The AS IS per-job mode**

* in standalone deployment, we bundle user jar with Flink jar, retrieve
JobGraph which is the very first JobGraph from user program in classpath,
and then start a Dispatcher with this JobGraph preconfigured, which
launches it as "recovered" job.

* in YARN deployment, we accept submission via CliFrontend, extract JobGraph
which is the very first JobGraph from user program submitted, serialize
the JobGraph and upload it to YARN as resource, and then when AM starts,
retrieve the JobGraph as resource and start Dispatcher with this JobGraph
preconfigured, follows are the same.

Specifically, in order to support multi-part jobs, if the YARN deployment is
configured as "attached", it starts a SessionCluster, drives the progress,
and shuts down the cluster when the job finishes.

**Motivation**

The implementation mentioned above, however, suffers from problems. The two
major ones are: 1. it only respects the very first JobGraph from the user
program, and 2. the job is compiled on the client side.

1. Only respect the very first JobGraph from user program

There is already an issue about this topic [2]. As we extract the JobGraph from
the user program by hijacking Environment#execute, we actually abort any
execution after the first call to #execute. Besides, it often surprises users
that logic they write later in the program is possibly never executed; the
problem here is the semantics of "job" from Flink's perspective. I'd say that
in the current implementation "per-job" is actually "per-job-graph". However,
in practice, since we support jar submission, it is the "per-program" semantics
that users want.

2. Compile job in client side

Well, standalone deployment is not in this case. But in YARN deployment, we
compile the job and get the JobGraph on the client side, and then upload it to
YARN. This approach, however, somewhat breaks isolation. We have observed user
programs containing exception-handling logic that calls System.exit in the main
method, which makes the compilation of the job exit the whole client at once.
This is a critical problem if we manage multiple Flink jobs on a single
platform; in that case, it shuts down the whole service.

Besides, I have been asked many times why per-job mode doesn't run
"just like" session mode but with a dedicated cluster. It might imply that
the current 

Re: low performance in running queries

2019-10-30 Thread Piotr Nowojski
Hi,

I would also suggest just attaching a code profiler to the process during those 
2 hours and gathering some results. It might answer the question of what is 
taking so long.

Piotrek

> On 30 Oct 2019, at 15:11, Chris Miller  wrote:
> 
> I haven't run any benchmarks with Flink or even used it enough to directly 
> help with your question, however I suspect that the following article might 
> be relevant:
> 
> http://dsrg.pdos.csail.mit.edu/2016/06/26/scalability-cost/ 
> 
> 
> Given the computation you're performing is trivial, it's possible that the 
> additional overhead of serialisation, interprocess communication, state 
> management etc that distributed systems like Flink require are dominating the 
> runtime here. 2 hours (or even 25 minutes) still seems too long to me 
> however, so hopefully it really is just a configuration issue of some sort. 
> Either way, if you do figure this out or anyone with good knowledge of the 
> article above in relation to Flink is able to give their thoughts, I'd be 
> very interested in hearing more.
> 
> Regards,
> Chris
> 
> 
> -- Original Message --
> From: "Habib Mostafaei"  >
> To: "Zhenghua Gao" mailto:doc...@gmail.com>>
> Cc: "user" mailto:user@flink.apache.org>>; "Georgios 
> Smaragdakis"  >; "Niklas Semmler" 
> mailto:nik...@inet.tu-berlin.de>>
> Sent: 30/10/2019 12:25:28
> Subject: Re: low performance in running queries
> 
>> Thanks Gao for the reply. I used the parallelism parameter with different 
>> values like 6 and 8 but still the execution time is not comparable with a 
>> single threaded python script. What would be the reasonable value for the 
>> parallelism?
>> 
>> Best,
>> 
>> Habib
>> 
>> On 10/30/2019 1:17 PM, Zhenghua Gao wrote:
>>> The reason might be the parallelism of your task is only 1, that's too low.
>>> See [1] to specify proper parallelism  for your job, and the execution time 
>>> should be reduced significantly.
>>> 
>>> [1] 
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html 
>>> 
>>> 
>>> Best Regards,
>>> Zhenghua Gao
>>> 
>>> 
>>> On Tue, Oct 29, 2019 at 9:27 PM Habib Mostafaei >> > wrote:
>>> Hi all,
>>> 
>>> I am running Flink on a standalone cluster and getting very long 
>>> execution time for the streaming queries like WordCount for a fixed text 
>>> file. My VM runs on a Debian 10 with 16 cpu cores and 32GB of RAM. I 
>>> have a text file with size of 2GB. When I run the Flink on a standalone 
>>> cluster, i.e., one JobManager and one taskManager with 25GB of heapsize, 
>>> it took around two hours to finish counting this file while a simple 
>>> python script can do it in around 7 minutes. Just wondering what is 
>>> wrong with my setup. I ran the experiments on a cluster with six 
>>> taskManagers, but I still get very long execution time like 25 minutes 
>>> or so. I tried to increase the JVM heap size to have lower execution 
>>> time but it did not help. I attached the log file and the Flink 
>>> configuration file to this email.
>>> 
>>> Best,
>>> 
>>> Habib
>>> 
>> -- 
>> Habib Mostafaei, Ph.D.
>> Postdoctoral researcher
>> TU Berlin,
>> FG INET, MAR 4.003
>> Marchstraße 23, 10587 Berlin



Re: How to stream intermediate data that is stored in external storage?

2019-10-30 Thread Piotr Nowojski
Hi,

I’m not sure what are you trying to achieve. What do you mean by “state of full 
outer join”? The result of it? Or it’s internal state? Also keep in mind, that 
internal state of the operators in Flink is already snapshoted/written down to 
an external storage during checkpointing mechanism.

The result should be simple, just write it to some Sink.

For the internal state, it sounds like you are doing something not the way it 
was intended… having said that, you can try one of the following options:
a) Implement your own outer join operator (might not be as easy if you are 
using Table API/SQL) and just create a side output for the state changes.
b) Use state processor API to read the content of a savepoint/checkpoint [1][2]
c) Use queryable state [3] (I’m not sure about this, I have never used 
queryable state)

Piotrek

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
 

[2] https://flink.apache.org/feature/2019/09/13/state-processor-api.html 

[3] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html
 


> On 29 Oct 2019, at 16:42, kant kodali  wrote:
> 
> Hi All,
> 
> I want to do a full outer join on two streaming data sources and store the 
> state of full outer join in some external storage like rocksdb or something 
> else. And then want to use this intermediate state as a streaming source 
> again, do some transformation and write it to some external store. is that 
> possible with Flink 1.9?
> 
> Also what storage systems support push mechanism for the intermediate data? 
> For example, In the use case above does rocksdb support push/emit events in a 
> streaming fashion?
> 
> Thanks!



Re: Flink 1.5+ performance in a Java standalone environment

2019-10-30 Thread Piotr Nowojski
Hi,

In Flink 1.5 there were three big changes, that could affect performance. 
1. FLIP-6 changes (As previously Yang and Fabian mentioned)
2. Credit base flow control (especially if you are using SSL)
3. Low latency network changes

I would suspect them in that order. First and second you can disable via 
configuration switches [1] and [2] respectively.

[1] “mode:legacy" 
https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/config.html#core
 

[2] "taskmanager.network.credit-model:false”

Could you try disabling them?

Piotrek

> On 28 Oct 2019, at 14:10, Jakub Danilewicz  
> wrote:
> 
> Thanks for your replies.
> 
> We use Flink from within a standalone Java 8 application (no Hadoop, no 
> clustering), so it basically boils down to running simple code like this:
> 
> import java.util.*;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.graph.*;
> import org.apache.flink.graph.library.CommunityDetection;
> 
> public class FlinkTester {
>final Random random = new Random(1);
>final float density = 3.0F;
> 
>public static void main(String[] args) throws Exception {
>new FlinkTester().execute(100, 4);
>}
> 
>private void execute(int numEdges, int parallelism) throws Exception {
>final ExecutionEnvironment env = 
> ExecutionEnvironment.createLocalEnvironment(parallelism);
>final Graph<Long, Long, Double> graph = createGraph(numEdges, env);
> 
>final long start = System.currentTimeMillis();
>List<Vertex<Long, Long>> vertices = graph.run(new 
> CommunityDetection<Long>(10, 0.5)).getVertices().collect();
>System.out.println(vertices.size() + " vertices processed in " + 
> (System.currentTimeMillis()-start)/1000 + " s");
>}
> 
>private Graph<Long, Long, Double> createGraph(int numEdges, 
> ExecutionEnvironment env) {
>System.out.println("Creating new graph of " + numEdges + " edges...");
> 
>final int maxNumVertices = (int)(numEdges/density);
>final Map<Long, Vertex<Long, Long>> vertexMap = new 
> HashMap<>(maxNumVertices);
>final Map<String, Edge<Long, Double>> edgeMap = new 
> HashMap<>(numEdges);
> 
>while (edgeMap.size() < numEdges) {
>long sourceId = random.nextInt(maxNumVertices) + 1;
>long targetId = sourceId;
>while (targetId == sourceId)
>targetId = random.nextInt(maxNumVertices) + 1;
> 
>final String edgeKey = sourceId + "#" + targetId;
>if (!edgeMap.containsKey(edgeKey)) {
>edgeMap.put(edgeKey, new Edge<>(sourceId, targetId, 1D));
>if (!vertexMap.containsKey(sourceId))
>vertexMap.put(sourceId, new Vertex<>(sourceId, sourceId));
>if (!vertexMap.containsKey(targetId))
>vertexMap.put(targetId, new Vertex<>(targetId, targetId));
>}
>}
> 
>System.out.println(edgeMap.size() + " edges created between " + 
> vertexMap.size() + " vertices.");
>return Graph.fromCollection(vertexMap.values(), edgeMap.values(), env);
>}
> }
> 
> No matter what graph algorithm you pick for benchmarking (above it's 
> CommunityDetection) the bigger the graph the wider performance gap (and 
> higher CPU/memory consumption) you observe when comparing the execution times 
> between the old engine (<= Flink 1.4.2) and the new one (checked on 1.5.6, 
> 1.8.2 and 1.9.1).
> 
> Just run the code yourselves (you may play with the number of edges and 
> parallel threads).
> 
> Best,
> 
> Jakub
> 



Re: low performance in running queries

2019-10-30 Thread Chris Miller
I haven't run any benchmarks with Flink or even used it enough to 
directly help with your question, however I suspect that the following 
article might be relevant:


http://dsrg.pdos.csail.mit.edu/2016/06/26/scalability-cost/

Given the computation you're performing is trivial, it's possible that 
the additional overhead of serialisation, interprocess communication, 
state management etc that distributed systems like Flink require are 
dominating the runtime here. 2 hours (or even 25 minutes) still seems 
too long to me however, so hopefully it really is just a configuration 
issue of some sort. Either way, if you do figure this out or anyone with 
good knowledge of the article above in relation to Flink is able to give 
their thoughts, I'd be very interested in hearing more.


Regards,
Chris


-- Original Message --
From: "Habib Mostafaei" 
To: "Zhenghua Gao" 
Cc: "user" ; "Georgios Smaragdakis" 
; "Niklas Semmler" 


Sent: 30/10/2019 12:25:28
Subject: Re: low performance in running queries

Thanks Gao for the reply. I used the parallelism parameter with 
different values like 6 and 8 but still the execution time is not 
comparable with a single threaded python script. What would be the 
reasonable value for the parallelism?


Best,

Habib

On 10/30/2019 1:17 PM, Zhenghua Gao wrote:
The reason might be the parallelism of your task is only 1, that's too 
low.
See [1] to specify proper parallelism  for your job, and the execution 
time should be reduced significantly.


[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html


Best Regards,
Zhenghua Gao


On Tue, Oct 29, 2019 at 9:27 PM Habib Mostafaei 
 wrote:

Hi all,

I am running Flink on a standalone cluster and getting very long
execution time for the streaming queries like WordCount for a fixed 
text

file. My VM runs on a Debian 10 with 16 cpu cores and 32GB of RAM. I
have a text file with size of 2GB. When I run the Flink on a 
standalone
cluster, i.e., one JobManager and one taskManager with 25GB of 
heapsize,

it took around two hours to finish counting this file while a simple
python script can do it in around 7 minutes. Just wondering what is
wrong with my setup. I ran the experiments on a cluster with six
taskManagers, but I still get very long execution time like 25 
minutes

or so. I tried to increase the JVM heap size to have lower execution
time but it did not help. I attached the log file and the Flink
configuration file to this email.

Best,

Habib


--
Habib Mostafaei, Ph.D.
Postdoctoral researcher
TU Berlin,
FG INET, MAR 4.003
Marchstraße 23, 10587 Berlin

Re: PreAggregate operator with timeout trigger

2019-10-30 Thread Piotr Nowojski
Hi,

If you want to register a processing/event time trigger in your custom 
operator, you can take a look at how other operators do it, by calling
AbstractStreamOperator#getInternalTimerService [1]. You can look around the 
Flink code base for usages of this method; there are at least a couple of them 
(like CepOperator or IntervalJoinOperator).

Hope that helps,
Piotrek

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.html#getInternalTimerService-java.lang.String-org.apache.flink.api.common.typeutils.TypeSerializer-org.apache.flink.streaming.api.operators.Triggerable-
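
For reference, a rough sketch of a count-or-timeout pre-aggregation operator built on getInternalTimerService (not from the original reply; the key/value types, thresholds, and the on-heap buffering are placeholder simplifications):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.util.HashMap;
import java.util.Map;

public class CountOrTimeoutPreAggregator
        extends AbstractStreamOperator<Tuple2<String, Long>>
        implements OneInputStreamOperator<Tuple2<String, Long>, Tuple2<String, Long>>,
                   Triggerable<String, VoidNamespace> {

    private final int maxCount;
    private final long timeoutMillis;

    private transient InternalTimerService<VoidNamespace> timerService;
    // Kept on the heap for brevity; a real operator should use keyed state so the
    // buffered values survive checkpoints.
    private transient Map<String, Long> sums;
    private transient Map<String, Integer> counts;

    public CountOrTimeoutPreAggregator(int maxCount, long timeoutMillis) {
        this.maxCount = maxCount;
        this.timeoutMillis = timeoutMillis;
    }

    @Override
    public void open() throws Exception {
        super.open();
        // Requires the operator to run on a keyed stream (key type String here).
        timerService = getInternalTimerService(
                "pre-agg-timeout", VoidNamespaceSerializer.INSTANCE, this);
        sums = new HashMap<>();
        counts = new HashMap<>();
    }

    @Override
    public void processElement(StreamRecord<Tuple2<String, Long>> element) throws Exception {
        String key = element.getValue().f0;
        sums.merge(key, element.getValue().f1, Long::sum);
        int count = counts.merge(key, 1, Integer::sum);

        if (count >= maxCount) {
            flush(key);
        } else if (count == 1) {
            // First element for this key: arm the timeout under the current key.
            timerService.registerProcessingTimeTimer(
                    VoidNamespace.INSTANCE, timerService.currentProcessingTime() + timeoutMillis);
        }
    }

    @Override
    public void onProcessingTime(InternalTimer<String, VoidNamespace> timer) throws Exception {
        flush(timer.getKey()); // the timer carries the key it was registered under
    }

    @Override
    public void onEventTime(InternalTimer<String, VoidNamespace> timer) throws Exception {
        // not used in this sketch
    }

    private void flush(String key) {
        Long sum = sums.remove(key);
        counts.remove(key);
        if (sum != null) {
            output.collect(new StreamRecord<>(Tuple2.of(key, sum)));
        }
    }
}

// Usage (hypothetical): input.keyBy(t -> t.f0)
//         .transform("pre-agg", outTypeInfo, new CountOrTimeoutPreAggregator(1000, 5_000));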
 


> On 28 Oct 2019, at 10:09, Felipe Gutierrez  
> wrote:
> 
> Hi all,
> 
> I have my own stream operator which triggers an aggregation based on the 
> number of items received 
> (OneInputStreamOperator#processElement(StreamRecord)). However, it is 
> possible that my aggregation is never triggered if my operator does not receive the 
> max number of items that has been set. So, I need a timeout trigger.
> 
> I am confused if I need to extend Trigger on 
> MyPreAggregate-AbstractUdfStreamOperator or if I have to put a window as a 
> parameter of the operator class MyPreAggregate-AbstractUdfStreamOperator V, IN, OUT, W extends Window>. what is the best approach?
> 
> Thanks,
> Felipe
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com 
> 


Re: No FileSystem for scheme "file" for S3A in and state processor api in 1.9

2019-10-30 Thread Piotr Nowojski
Hi,

Thanks for reporting the issue, I’ve created the JIRA ticket for that [1]. We 
will investigate it and try to address it somehow. 

Could you check whether the same issue happens when you use flink-s3-fs-presto [2]?

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-14574
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/s3.html#shaded-hadooppresto-s3-file-systems
 


> On 26 Oct 2019, at 23:11, spoganshev  wrote:
> 
> We've added flink-s3-fs-hadoop library to plugins folder and trying to
> bootstrap state to S3 using S3A protocol. The following exception happens
> (unless hadoop library is put to lib folder instead of plugins). Looks like
> S3A filesystem is trying to use "local" filesystem for temporary files and
> fails:
> 
> 
> java.lang.Exception: Could not write timer service of MapPartition
> (d2976134f80849779b7a94b7e6218476) (4/4) to checkpoint state stream.
>   at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:466)
>   at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
>   at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
>   at
> org.apache.flink.state.api.output.SnapshotUtils.snapshot(SnapshotUtils.java:59)
>   at
> org.apache.flink.state.api.output.operators.KeyedStateBootstrapOperator.endInput(KeyedStateBootstrapOperator.java:84)
>   at
> org.apache.flink.state.api.output.BoundedStreamTask.performDefaultAction(BoundedStreamTask.java:85)
>   at
> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>   at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>   at
> org.apache.flink.state.api.output.BoundedOneInputStreamTaskRunner.mapPartition(BoundedOneInputStreamTaskRunner.java:76)
>   at
> org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:504)
>   at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Could not open output stream for state
> backend
>   at
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:367)
>   at
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
>   at
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:209)
>   at
> org.apache.flink.runtime.state.NonClosingCheckpointOutputStream.write(NonClosingCheckpointOutputStream.java:61)
>   at java.io.DataOutputStream.write(DataOutputStream.java:107)
>   at java.io.DataOutputStream.writeUTF(DataOutputStream.java:401)
>   at java.io.DataOutputStream.writeUTF(DataOutputStream.java:323)
>   at
> org.apache.flink.util.LinkedOptionalMapSerializer.lambda$writeOptionalMap$0(LinkedOptionalMapSerializer.java:58)
>   at
> org.apache.flink.util.LinkedOptionalMap.forEach(LinkedOptionalMap.java:163)
>   at
> org.apache.flink.util.LinkedOptionalMapSerializer.writeOptionalMap(LinkedOptionalMapSerializer.java:57)
>   at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshotData.writeKryoRegistrations(KryoSerializerSnapshotData.java:141)
>   at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshotData.writeSnapshotData(KryoSerializerSnapshotData.java:128)
>   at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot.writeSnapshot(KryoSerializerSnapshot.java:72)
>   at
> org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.writeVersionedSnapshot(TypeSerializerSnapshot.java:153)
>   at
> org.apache.flink.streaming.api.operators.InternalTimersSnapshotReaderWriters$InternalTimersSnapshotWriterV2.writeKeyAndNamespaceSerializers(InternalTimersSnapshotReaderWriters.java:199)
>   at
> org.apache.flink.streaming.api.operators.InternalTimersSnapshotReaderWriters$AbstractInternalTimersSnapshotWriter.writeTimersSnapshot(InternalTimersSnapshotReaderWriters.java:117)
>   at
> org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy.write(InternalTimerServiceSerializationProxy.java:101)
>   at
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.snapshotStateForKeyGroup(InternalTimeServiceManager.java:139)
>   at
> 

Re: [DISCUSS] Semantic and implementation of per-job mode

2019-10-30 Thread Paul Lam
Hi,

Thanks for starting the discussion.

WRT the per-job semantic, it looks natural to me that per-job means 
per-job-graph,
because in my understanding JobGraph is the representation of a job. Could you 
share some use case in which a user program should contain multiple job graphs?

WRT the per-program mode, I'm also in favor of a unified cluster-side execution 
for user programs, so +1 from my side. 

But I think there is still some value in the current per-job mode: we now have 
some common resources available on the client machine that are read by the main 
methods of user programs. If we migrated to per-program mode, we would have to 
explicitly set the specific resources for each user program and ship them to the 
cluster, which would be a bit inconvenient. Also, as the job graph is compiled at 
the client, we can spot errors caused by user code before starting the cluster 
and easily get access to the logs. 

Best,
Paul Lam

> 在 2019年10月30日,16:22,SHI Xiaogang  写道:
> 
> Hi
> 
> Thanks for bringing this. 
> 
> The design looks very nice to me in that 
> 1. In the new per-job mode, we don't need to compile user programs in the 
> client and can directly run user programs with user jars. That way, it's 
> easier for resource isolation in multi-tenant platforms and is much safer.
> 2. The execution of user programs can be unified in session and per-job 
> modes. In session mode, user jobs are submitted via a remote ClusterClient 
> while in per-job mode user jobs are submitted via a local ClusterClient.
> 
> Regards,
> Xiaogang
> 
> tison mailto:wander4...@gmail.com>> 于2019年10月30日周三 
> 下午3:30写道:
> (CC user list because I think users may have ideas on how per-job mode should 
> look like)
> 
> Hi all,
> 
> In the discussion about Flink on k8s[1] we encounter a problem that opinions
> diverge in how so-called per-job mode works. This thread is aimed at stating
> a dedicated discussion about per-job semantic and how to implement it.
> 
> **The AS IS per-job mode**
> 
> * in standalone deployment, we bundle user jar with Flink jar, retrieve
> JobGraph which is the very first JobGraph from user program in classpath,
> and then start a Dispatcher with this JobGraph preconfigured, which
> launches it as "recovered" job.
> 
> * in YARN deployment, we accept submission via CliFrontend, extract JobGraph
> which is the very first JobGraph from user program submitted, serialize
> the JobGraph and upload it to YARN as resource, and then when AM starts,
> retrieve the JobGraph as resource and start Dispatcher with this JobGraph
> preconfigured, follows are the same.
> 
> Specifically, in order to support multiple parts job, if YARN deployment
> configured as "attached", it starts a SessionCluster, proceeds the progress
> and shutdown the cluster on job finished.
> 
> **Motivation**
> 
> The implementation mentioned above, however, suffers from problems. The major
> two of them are 1. only respect the very first JobGraph from user program 2.
> compile job in client side
> 
> 1. Only respect the very first JobGraph from user program
> 
> There is already issue about this topic[2]. As we extract JobGraph from user
> program by hijacking Environment#execute we actually abort any execution
> after the first call to #execute. Besides it surprises users many times that
> any logic they write in the program is possibly never executed, here the
> problem is that the semantic of "job" from Flink perspective. I'd like to say
> in current implementation "per-job" is actually "per-job-graph". However,
> in practices since we support jar submission it is "per-program" semantic
> wanted.
> 
> 2. Compile job in client side
> 
> Well, standalone deployment is not in the case. But in YARN deployment, we
> compile job and get JobGraph in client side, and then upload it to YARN.
> This approach, however, somehow breaks isolation. We have observed that user
> program contains exception handling logic which call System.exit in main
> method, which causes a compilation of the job exit the whole client at once.
> It is a critical problem if we manage multiple Flink job in a unique platform.
> In this case, it shut down the whole service.
> 
> Besides there are many times I was asked why per-job mode doesn't run
> "just like" session mode but with a dedicated cluster. It might imply that
> current implementation mismatches users' demand.
> 
> **Proposal**
> 
> In order to provide a "per-program" semantic mode which acts "just like" 
> session
> mode but with a dedicated cluster, I propose a workflow as below. It acts like
> starting a drive on cluster but is not a general driver solution as proposed
> here[3], the main purpose of the workflow below is for providing a 
> "per-program"
> semantic mode.
> 
> *From CliFrontend*
> 
> 1. CliFrontend receives submission, gathers all configuration and starts a
> corresponding ClusterDescriptor.
> 
> 2. ClusterDescriptor deploys a cluster with main class 
> ProgramClusterEntrypoint
> while shipping 

Re: low performance in running queries

2019-10-30 Thread Zhenghua Gao
The reason might be that the parallelism of your task is only 1, which is too low.
See [1] for how to specify a proper parallelism for your job; the execution time
should be reduced significantly.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html
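
For example, a rough sketch of a streaming word count with an explicit
parallelism (the input path and the value 8 are placeholders; the parallelism
can also be set per operator or on submission with "flink run -p 8"):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class ParallelWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8); // default parallelism for all operators of this job

        env.readTextFile("file:///path/to/input.txt")
           .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
               for (String word : line.toLowerCase().split("\\W+")) {
                   if (!word.isEmpty()) {
                       out.collect(Tuple2.of(word, 1));
                   }
               }
           })
           .returns(Types.TUPLE(Types.STRING, Types.INT)) // type hint needed for the lambda
           .keyBy(0)
           .sum(1)
           .print();

        env.execute("Parallel WordCount");
    }
}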

*Best Regards,*
*Zhenghua Gao*


On Tue, Oct 29, 2019 at 9:27 PM Habib Mostafaei 
wrote:

> Hi all,
>
> I am running Flink on a standalone cluster and getting very long
> execution time for the streaming queries like WordCount for a fixed text
> file. My VM runs on a Debian 10 with 16 cpu cores and 32GB of RAM. I
> have a text file with size of 2GB. When I run the Flink on a standalone
> cluster, i.e., one JobManager and one taskManager with 25GB of heapsize,
> it took around two hours to finish counting this file while a simple
> python script can do it in around 7 minutes. Just wondering what is
> wrong with my setup. I ran the experiments on a cluster with six
> taskManagers, but I still get very long execution time like 25 minutes
> or so. I tried to increase the JVM heap size to have lower execution
> time but it did not help. I attached the log file and the Flink
> configuration file to this email.
>
> Best,
>
> Habib
>
>


Re: low performance in running queries

2019-10-30 Thread Habib Mostafaei
Thanks Gao for the reply. I tried the parallelism parameter with 
different values like 6 and 8, but the execution time is still not 
comparable to a single-threaded Python script. What would be a 
reasonable value for the parallelism?


Best,

Habib

On 10/30/2019 1:17 PM, Zhenghua Gao wrote:
The reason might be the parallelism of your task is only 1, that's too 
low.
See [1] to specify proper parallelism  for your job, and the execution 
time should be reduced significantly.


[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html


*Best Regards,*
*Zhenghua Gao*


On Tue, Oct 29, 2019 at 9:27 PM Habib Mostafaei 
mailto:ha...@inet.tu-berlin.de>> wrote:


Hi all,

I am running Flink on a standalone cluster and getting very long
execution time for the streaming queries like WordCount for a
fixed text
file. My VM runs on a Debian 10 with 16 cpu cores and 32GB of RAM. I
have a text file with size of 2GB. When I run the Flink on a
standalone
cluster, i.e., one JobManager and one taskManager with 25GB of
heapsize,
it took around two hours to finish counting this file while a simple
python script can do it in around 7 minutes. Just wondering what is
wrong with my setup. I ran the experiments on a cluster with six
taskManagers, but I still get very long execution time like 25
minutes
or so. I tried to increase the JVM heap size to have lower execution
time but it did not help. I attached the log file and the Flink
configuration file to this email.

Best,

Habib


--
Habib Mostafaei, Ph.D.
Postdoctoral researcher
TU Berlin,
FG INET, MAR 4.003
Marchstraße 23, 10587 Berlin



Re: Sending custom statsd tags

2019-10-30 Thread Chesnay Schepler
Not possible, you'll have to extend the StatsDReporter yourself to add 
arbitrary tags.
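
As a rough, untested sketch of that direction (it assumes Telegraf's statsd
input is configured to parse influx-style tags appended to the metric name,
and the "tags" config key is made up; you may also need to adjust
filterCharacters so that ',' and '=' survive unchanged):

import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.statsd.StatsDReporter;

// Sketch: append static tags like "team=foo,env=prod" to every metric name.
public class TaggedStatsDReporter extends StatsDReporter {

    private String tagSuffix = "";

    @Override
    public void open(MetricConfig config) {
        super.open(config);
        String tags = config.getString("tags", ""); // e.g. "team=foo,env=prod" (made-up key)
        if (!tags.isEmpty()) {
            tagSuffix = "," + tags;
        }
    }

    @Override
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
        super.notifyOfAddedMetric(metric, metricName + tagSuffix, group);
    }
}

You would then point metrics.reporter.<name>.class at this class instead of the
stock StatsDReporter.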


On 30/10/2019 12:52, Prakhar Mathur wrote:

Hi,

We are running Flink 1.6.2. We are using flink-metrics-statsd jar in 
order to send metrics to telegraf. In order to send custom metrics, we 
are using MetricGroups. Currently, we are trying to send a few 
custom tags but unable to find any examples or documentation regarding 
the same.


Regards
Prakhar Mathur





Sending custom statsd tags

2019-10-30 Thread Prakhar Mathur
Hi,

We are running Flink 1.6.2. We are using flink-metrics-statsd jar in order
to send metrics to telegraf. In order to send custom metrics, we are
using MetricGroups.
Currently, we are trying to send a few custom tags but unable to find
any examples or documentation regarding the same.

Regards
Prakhar Mathur


clear method of Trigger not be called after purge

2019-10-30 Thread Utopia
Hi guys,

I set allowedLateness on a session window. Then I return FIRE_AND_PURGE in the 
onEventTime method of my Trigger, but the clear method is not called. However, 
clear is called if I return FIRE_AND_PURGE in the onElement method.

Best regards.
Utopia


如何过滤异常的timestamp?

2019-10-30 Thread 邢瑞斌
Hi:

When collecting log events from clients, we always see some abnormal local timestamps; some are many days beyond the correct date. Such timestamps affect the watermark. How do you handle logs like this?

My current idea is:

Compare the log's timestamp with Flink's time and filter the record out if the difference exceeds a threshold. But this seems to make the processing result non-deterministic. My planned improvement is to compare the log's timestamp against the IngestionTime instead. However, I am not sure whether IngestionTime and event time can be mixed.

Any advice is appreciated, thanks everyone!
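
Roughly what I have in mind, as a sketch (the Tuple2 layout with the parsed
event timestamp in f1 and the 7-day threshold are just placeholders):

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// Drop records whose parsed event timestamp (f1) is too far from the local
// machine time, before timestamps/watermarks are assigned.
public class TimestampSanityFilter implements FilterFunction<Tuple2<String, Long>> {

    private static final long MAX_DRIFT_MS = 7L * 24 * 60 * 60 * 1000; // placeholder: 7 days

    @Override
    public boolean filter(Tuple2<String, Long> record) {
        long now = System.currentTimeMillis();
        return Math.abs(now - record.f1) <= MAX_DRIFT_MS;
    }
}

It would be applied with stream.filter(new TimestampSanityFilter()) before
assignTimestampsAndWatermarks.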


Re: flink run jar throw java.lang.ClassNotFoundException: com.mysql.jdbc.Driver

2019-10-30 Thread Alex Wang
Hello Yang, Frank
Thank you for your reply.

Frank, I have created a fat jar called flink-sql-submit.jar; the file size
is 8.2M.

You can create a fat jar (also called Uber jar) that includes all
> dependencies in your application jar.
>
> I would avoid to put things in the Flink lib directory as it can make
> maintenance difficult. Eg deployment is challenging, upgrade of flink,
> providing it on new nodes etc.
>

I agree with your point of view: I want to keep the dependencies in the
project itself rather than put them in the Flink lib directory.
I will continue working on this problem and will give you feedback once I
have an answer.

Thanks again, Yang and Frank


vino yang  于2019年10月30日周三 下午4:05写道:

> Hi Franke,
>
> From the information provided by Alex:
>
> >> mvn build jar include com.mysql.jdbc.Driver.
>
> it seems he has packaged a fat jar?
>
> Best,
> Vino
>
> Jörn Franke  于2019年10月30日周三 下午2:47写道:
>
>>
>>
>> You can create a fat jar (also called Uber jar) that includes all
>> dependencies in your application jar.
>>
>> I would avoid to put things in the Flink lib directory as it can make
>> maintenance difficult. Eg deployment is challenging, upgrade of flink,
>> providing it on new nodes etc.
>>
>>
>> Am 30.10.2019 um 04:46 schrieb Alex Wang :
>>
>> 
>> Hello everyone, I am a newbie.
>> I am learning the flink-sql-submit project. From @Jark Wu  :
>> https://github.com/wuchong/flink-sql-submit
>>
>> My local environment is:
>> 1. flink1.9.0 standalone
>> 2. kafka_2.11-2.2.0 single
>>
>> I configured Flink Connectors and Formats jars to $FLINK_HOME/lib .
>> Reference:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#connectors
>>
>> Then I run flink-sql-submit , sh run.sh q1
>> Throw  java.lang.ClassNotFoundException: com.mysql.jdbc.Driver.
>>
>> My question is:
>> I configured mysql-connector-java in the pom.xml file, mvn build jar
>> include com.mysql.jdbc.Driver.
>> Why is this error still reported? I put the jar package in
>> $FLINK_HOME/lib and the problem can be solved.
>> Do you need to put these jars in $FLINK_HOME/lib when the project relies
>> on too many jar packages?
>> If I don't put mysql-connector-java.jar in $FLINK_HOME/lib, how can I
>> solve this problem?
>>
>> Can @Jark Wu  give me some advice? Or can someone give me some advice?
>> Thank you.
>>
>> 1. pom.xml
>>
>> 
>> <dependency>
>>     <groupId>mysql</groupId>
>>     <artifactId>mysql-connector-java</artifactId>
>>     <version>5.1.38</version>
>> </dependency>
>>
>> 2. mvn clean; mvn package
>>
>> $ ll -rth target
>>>
>>>  [±master ●]
>>> total 32312
>>> drwxr-xr-x  3 alex  staff96B Oct 30 11:32 generated-sources
>>> drwxr-xr-x  5 alex  staff   160B Oct 30 11:32 classes
>>> drwxr-xr-x  3 alex  staff96B Oct 30 11:32 maven-archiver
>>> -rw-r--r--  1 alex  staff   7.2M Oct 30 11:32
>>> flink-sql-submit-1.0-SNAPSHOT.jar
>>> -rw-r--r--  1 alex  staff   8.2M Oct 30 11:32 flink-sql-submit.jar
>>>
>>
>> 3. flink-sql-submit.jar include java.sql.Driver
>>
>> " zip.vim version v28
>>> " Browsing zipfile
>>> /Users/alex/IdeaProjects/alex/flink_learn/flink-sql-submit/target/flink-sql-submit.jar
>>> " Select a file with cursor and press ENTER
>>>
>>> META-INF/MANIFEST.MF
>>> META-INF/
>>> q1.sql
>>> user_behavior.log
>>> com/
>>> com/github/
>>> com/github/wuchong/
>>> com/github/wuchong/sqlsubmit/
>>> com/github/wuchong/sqlsubmit/SqlSubmit$1.class
>>> com/github/wuchong/sqlsubmit/SqlSubmit.class
>>> com/github/wuchong/sqlsubmit/SourceGenerator.class
>>> com/github/wuchong/sqlsubmit/cli/
>>> com/github/wuchong/sqlsubmit/cli/SqlCommandParser$SqlCommandCall.class
>>> com/github/wuchong/sqlsubmit/cli/SqlCommandParser.class
>>> com/github/wuchong/sqlsubmit/cli/SqlCommandParser$SqlCommand.class
>>> com/github/wuchong/sqlsubmit/cli/CliOptions.class
>>> com/github/wuchong/sqlsubmit/cli/CliOptionsParser.class
>>> META-INF/maven/
>>> META-INF/maven/com.github.wuchong/
>>> META-INF/maven/com.github.wuchong/flink-sql-submit/
>>> META-INF/maven/com.github.wuchong/flink-sql-submit/pom.xml
>>> META-INF/maven/com.github.wuchong/flink-sql-submit/pom.properties
>>> META-INF/services/
>>> META-INF/services/java.sql.Driver
>>> com/mysql/
>>> com/mysql/fabric/
>>> com/mysql/fabric/FabricCommunicationException.class
>>> com/mysql/fabric/FabricConnection.class
>>> com/mysql/fabric/FabricStateResponse.class
>>> com/mysql/fabric/HashShardMapping$ReverseShardIndexSorter.class
>>> com/mysql/fabric/HashShardMapping.class
>>> com/mysql/fabric/RangeShardMapping$RangeShardIndexSorter.class
>>> com/mysql/fabric/RangeShardMapping.class
>>> com/mysql/fabric/Response.class
>>> com/mysql/fabric/Server.class
>>> com/mysql/fabric/ServerGroup.class
>>> com/mysql/fabric/ServerMode.class
>>> com/mysql/fabric/ServerRole.class
>>> etc ...
>>>
>>
>>
>>
>> $FLINK_DIR/bin/flink run -d -p 3 target/flink-sql-submit.jar -w
>> "${PROJECT_DIR}"/src/main/resources/ -f "$1".sql
>> Eerror:
>> 2019-10-30 10:27:35
>> java.lang.IllegalArgumentException: JDBC driver class not found.
>> At
>> 


Re: [DISCUSS] Semantic and implementation of per-job mode

2019-10-30 Thread SHI Xiaogang
Hi

Thanks for bringing this.

The design looks very nice to me in that
1. In the new per-job mode, we don't need to compile user programs in the
client and can directly run user programs with user jars. That way, it's
easier for resource isolation in multi-tenant platforms and is much safer.
2. The execution of user programs can be unified in session and per-job
modes. In session mode, user jobs are submitted via a remote ClusterClient
while in per-job mode user jobs are submitted via a local ClusterClient.

Regards,
Xiaogang

tison  于2019年10月30日周三 下午3:30写道:

> (CC user list because I think users may have ideas on how per-job mode
> should look like)
>
> Hi all,
>
> In the discussion about Flink on k8s[1] we encounter a problem that
> opinions
> diverge in how so-called per-job mode works. This thread is aimed at
> stating
> a dedicated discussion about per-job semantic and how to implement it.
>
> **The AS IS per-job mode**
>
> * in standalone deployment, we bundle user jar with Flink jar, retrieve
> JobGraph which is the very first JobGraph from user program in classpath,
> and then start a Dispatcher with this JobGraph preconfigured, which
> launches it as "recovered" job.
>
> * in YARN deployment, we accept submission via CliFrontend, extract
> JobGraph
> which is the very first JobGraph from user program submitted, serialize
> the JobGraph and upload it to YARN as resource, and then when AM starts,
> retrieve the JobGraph as resource and start Dispatcher with this JobGraph
> preconfigured, follows are the same.
>
> Specifically, in order to support multiple parts job, if YARN deployment
> configured as "attached", it starts a SessionCluster, proceeds the progress
> and shutdown the cluster on job finished.
>
> **Motivation**
>
> The implementation mentioned above, however, suffers from problems. The
> major
> two of them are 1. only respect the very first JobGraph from user program
> 2.
> compile job in client side
>
> 1. Only respect the very first JobGraph from user program
>
> There is already issue about this topic[2]. As we extract JobGraph from
> user
> program by hijacking Environment#execute we actually abort any execution
> after the first call to #execute. Besides it surprises users many times
> that
> any logic they write in the program is possibly never executed, here the
> problem is that the semantic of "job" from Flink perspective. I'd like to
> say
> in current implementation "per-job" is actually "per-job-graph". However,
> in practices since we support jar submission it is "per-program" semantic
> wanted.
>
> 2. Compile job in client side
>
> Well, standalone deployment is not in the case. But in YARN deployment, we
> compile job and get JobGraph in client side, and then upload it to YARN.
> This approach, however, somehow breaks isolation. We have observed that
> user
> program contains exception handling logic which call System.exit in main
> method, which causes a compilation of the job exit the whole client at
> once.
> It is a critical problem if we manage multiple Flink job in a unique
> platform.
> In this case, it shut down the whole service.
>
> Besides there are many times I was asked why per-job mode doesn't run
> "just like" session mode but with a dedicated cluster. It might imply that
> current implementation mismatches users' demand.
>
> **Proposal**
>
> In order to provide a "per-program" semantic mode which acts "just like"
> session
> mode but with a dedicated cluster, I propose a workflow as below. It acts
> like
> starting a drive on cluster but is not a general driver solution as
> proposed
> here[3], the main purpose of the workflow below is for providing a
> "per-program"
> semantic mode.
>
> *From CliFrontend*
>
> 1. CliFrontend receives submission, gathers all configuration and starts a
> corresponding ClusterDescriptor.
>
> 2. ClusterDescriptor deploys a cluster with main class
> ProgramClusterEntrypoint
> while shipping resources including user program.
>
> 3. ProgramClusterEntrypoint#main contains logic starting components
> including
> Standalone Dispatcher, configuring user program to start a
> RpcClusterClient,
> and then invoking main method of user program.
>
> 4. RpcClusterClient acts like MiniClusterClient which is able to submit the
> JobGraph after leader elected so that we don't fallback to round-robin or
> fail submission due to no leader.
>
> 5. Whether or not deliver job result depends on user program logic, since
> we
> can already get a JobClient from execute. ProgramClusterEntrypoint exits on
> user program exits and all jobs submitted globally terminate.
>
> This way fits in the direction of FLIP-73 because strategy starting a
> RpcClusterClient can be regarded as a special Executor. After
> ProgramClusterEntrypoint#main starts a Cluster, it generates and passes
> configuration to
> user program so that when Executor generated, it knows to use a
> RpcClusterClient
> for submission and the address of Dispatcher.
>
> **Compatibility**
>
> 

Re: flink run jar throw java.lang.ClassNotFoundException: com.mysql.jdbc.Driver

2019-10-30 Thread vino yang
Hi Franke,

From the information provided by Alex:

>> mvn build jar include com.mysql.jdbc.Driver.

it seems he has packaged a fat jar?

Best,
Vino

Jörn Franke  于2019年10月30日周三 下午2:47写道:

>
>
> You can create a fat jar (also called Uber jar) that includes all
> dependencies in your application jar.
>
> I would avoid to put things in the Flink lib directory as it can make
> maintenance difficult. Eg deployment is challenging, upgrade of flink,
> providing it on new nodes etc.
>
>
> Am 30.10.2019 um 04:46 schrieb Alex Wang :
>
> 
> Hello everyone, I am a newbie.
> I am learning the flink-sql-submit project. From @Jark Wu  :
> https://github.com/wuchong/flink-sql-submit
>
> My local environment is:
> 1. flink1.9.0 standalone
> 2. kafka_2.11-2.2.0 single
>
> I configured Flink Connectors and Formats jars to $FLINK_HOME/lib .
> Reference:
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#connectors
>
> Then I run flink-sql-submit , sh run.sh q1
> Throw  java.lang.ClassNotFoundException: com.mysql.jdbc.Driver.
>
> My question is:
> I configured mysql-connector-java in the pom.xml file, mvn build jar
> include com.mysql.jdbc.Driver.
> Why is this error still reported? I put the jar package in $FLINK_HOME/lib
> and the problem can be solved.
> Do you need to put these jars in $FLINK_HOME/lib when the project relies
> on too many jar packages?
> If I don't put mysql-connector-java.jar in $FLINK_HOME/lib, how can I
> solve this problem?
>
> Can @Jark Wu  give me some advice? Or can someone give me some advice?
> Thank you.
>
> 1. pom.xml
>
> 
>> <dependency>
>>     <groupId>mysql</groupId>
>>     <artifactId>mysql-connector-java</artifactId>
>>     <version>5.1.38</version>
>> </dependency>
>
> 2. mvn clean; mvn package
>
> $ ll -rth target
>>
>>  [±master ●]
>> total 32312
>> drwxr-xr-x  3 alex  staff96B Oct 30 11:32 generated-sources
>> drwxr-xr-x  5 alex  staff   160B Oct 30 11:32 classes
>> drwxr-xr-x  3 alex  staff96B Oct 30 11:32 maven-archiver
>> -rw-r--r--  1 alex  staff   7.2M Oct 30 11:32
>> flink-sql-submit-1.0-SNAPSHOT.jar
>> -rw-r--r--  1 alex  staff   8.2M Oct 30 11:32 flink-sql-submit.jar
>>
>
> 3. flink-sql-submit.jar include java.sql.Driver
>
> " zip.vim version v28
>> " Browsing zipfile
>> /Users/alex/IdeaProjects/alex/flink_learn/flink-sql-submit/target/flink-sql-submit.jar
>> " Select a file with cursor and press ENTER
>>
>> META-INF/MANIFEST.MF
>> META-INF/
>> q1.sql
>> user_behavior.log
>> com/
>> com/github/
>> com/github/wuchong/
>> com/github/wuchong/sqlsubmit/
>> com/github/wuchong/sqlsubmit/SqlSubmit$1.class
>> com/github/wuchong/sqlsubmit/SqlSubmit.class
>> com/github/wuchong/sqlsubmit/SourceGenerator.class
>> com/github/wuchong/sqlsubmit/cli/
>> com/github/wuchong/sqlsubmit/cli/SqlCommandParser$SqlCommandCall.class
>> com/github/wuchong/sqlsubmit/cli/SqlCommandParser.class
>> com/github/wuchong/sqlsubmit/cli/SqlCommandParser$SqlCommand.class
>> com/github/wuchong/sqlsubmit/cli/CliOptions.class
>> com/github/wuchong/sqlsubmit/cli/CliOptionsParser.class
>> META-INF/maven/
>> META-INF/maven/com.github.wuchong/
>> META-INF/maven/com.github.wuchong/flink-sql-submit/
>> META-INF/maven/com.github.wuchong/flink-sql-submit/pom.xml
>> META-INF/maven/com.github.wuchong/flink-sql-submit/pom.properties
>> META-INF/services/
>> META-INF/services/java.sql.Driver
>> com/mysql/
>> com/mysql/fabric/
>> com/mysql/fabric/FabricCommunicationException.class
>> com/mysql/fabric/FabricConnection.class
>> com/mysql/fabric/FabricStateResponse.class
>> com/mysql/fabric/HashShardMapping$ReverseShardIndexSorter.class
>> com/mysql/fabric/HashShardMapping.class
>> com/mysql/fabric/RangeShardMapping$RangeShardIndexSorter.class
>> com/mysql/fabric/RangeShardMapping.class
>> com/mysql/fabric/Response.class
>> com/mysql/fabric/Server.class
>> com/mysql/fabric/ServerGroup.class
>> com/mysql/fabric/ServerMode.class
>> com/mysql/fabric/ServerRole.class
>> etc ...
>>
>
>
>
> $FLINK_DIR/bin/flink run -d -p 3 target/flink-sql-submit.jar -w
> "${PROJECT_DIR}"/src/main/resources/ -f "$1".sql
> Eerror:
> 2019-10-30 10:27:35
> java.lang.IllegalArgumentException: JDBC driver class not found.
> At
> org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.open(JDBCUpsertOutputFormat.java:112)
> At
> org.apache.flink.api.java.io.jdbc.JDBCUpsertSinkFunction.open(JDBCUpsertSinkFunction.java:42)
> At
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> At
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> At
> org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
> At
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:532)
> At
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:396)
> At org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> At org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> At java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassNotFoundException: 


[DISCUSS] Semantic and implementation of per-job mode

2019-10-30 Thread tison
(CC user list because I think users may have ideas on how per-job mode
should look like)

Hi all,

In the discussion about Flink on k8s[1] we encountered a problem: opinions
diverge on how the so-called per-job mode should work. This thread is aimed at starting
a dedicated discussion about the per-job semantics and how to implement them.

**The AS IS per-job mode**

* in standalone deployment, we bundle user jar with Flink jar, retrieve
JobGraph which is the very first JobGraph from user program in classpath,
and then start a Dispatcher with this JobGraph preconfigured, which
launches it as "recovered" job.

* in YARN deployment, we accept submission via CliFrontend, extract JobGraph
which is the very first JobGraph from user program submitted, serialize
the JobGraph and upload it to YARN as resource, and then when AM starts,
retrieve the JobGraph as resource and start Dispatcher with this JobGraph
preconfigured, follows are the same.

Specifically, in order to support multi-part jobs, if the YARN deployment is
configured as "attached", it starts a SessionCluster, drives the execution,
and shuts down the cluster when the job finishes.

**Motivation**

The implementation mentioned above, however, suffers from problems. The two
major ones are: 1. it only respects the very first JobGraph from the user
program; 2. it compiles the job on the client side.

1. Only respect the very first JobGraph from user program

There is already an issue about this topic[2]. As we extract the JobGraph from the
user program by hijacking Environment#execute, we actually abort any execution
after the first call to #execute. Besides, it has surprised users many times that
logic they write in the program is possibly never executed; the underlying
problem is the semantics of "job" from Flink's perspective. I'd like to say that
in the current implementation "per-job" is actually "per-job-graph". However,
in practice, since we support jar submission, a "per-program" semantic is what
users want.

2. Compile job in client side

Well, standalone deployment is not affected. But in YARN deployment, we
compile the job and get the JobGraph on the client side, and then upload it to YARN.
This approach, however, somewhat breaks isolation. We have observed user
programs that contain exception handling logic which calls System.exit in the main
method, so that compiling the job can exit the whole client at once.
This is a critical problem if we manage multiple Flink jobs on a unified
platform: in that case, it shuts down the whole service.

Besides, I have been asked many times why per-job mode doesn't run
"just like" session mode but with a dedicated cluster. This might imply that
the current implementation mismatches users' demands.

**Proposal**

In order to provide a "per-program" semantic mode which acts "just like" session
mode but with a dedicated cluster, I propose the workflow below. It is like
starting a driver on the cluster, but it is not a general driver solution as proposed
here[3]; the main purpose of the workflow below is to provide a "per-program"
semantic mode.

*From CliFrontend*

1. CliFrontend receives submission, gathers all configuration and starts a
corresponding ClusterDescriptor.

2. ClusterDescriptor deploys a cluster with main class
ProgramClusterEntrypoint
while shipping resources including user program.

3. ProgramClusterEntrypoint#main contains logic starting components
including
Standalone Dispatcher, configuring user program to start a RpcClusterClient,
and then invoking main method of user program.

4. RpcClusterClient acts like MiniClusterClient which is able to submit the
JobGraph after leader elected so that we don't fallback to round-robin or
fail submission due to no leader.

5. Whether or not deliver job result depends on user program logic, since we
can already get a JobClient from execute. ProgramClusterEntrypoint exits on
user program exits and all jobs submitted globally terminate.

This way fits in the direction of FLIP-73 because strategy starting a
RpcClusterClient can be regarded as a special Executor. After
ProgramClusterEntrypoint#main starts a Cluster, it generates and passes
configuration to
user program so that when Executor generated, it knows to use a
RpcClusterClient
for submission and the address of Dispatcher.

**Compatibility**

In my opinion this mode can be purely an add-on to the current codebase. We
don't actually have to replace the current per-job mode with the so-called
"per-program" mode. It may well turn out that the current per-job mode becomes
useless once we have such a "per-program" mode, so we could possibly deprecate
it in favor of the other.

I'm glad to discuss more details if you're interested, but let's say
we'd better first reach a consensus on the overall design :-)

Looking forward to your reply!

Best,
tison.

[1] https://issues.apache.org/jira/browse/FLINK-9953
[2] https://issues.apache.org/jira/browse/FLINK-10879
[3]
https://docs.google.com/document/d/1dJnDOgRae9FzoBzXcsGoutGB1YjTi1iqG6d1Ht61EnY/edit#


Re: flink run jar throw java.lang.ClassNotFoundException: com.mysql.jdbc.Driver

2019-10-30 Thread Jörn Franke


You can create a fat jar (also called Uber jar) that includes all dependencies 
in your application jar.

I would avoid putting things in the Flink lib directory as it can make 
maintenance difficult, e.g. deployment becomes challenging, as do upgrading Flink 
and providing it on new nodes.


> Am 30.10.2019 um 04:46 schrieb Alex Wang :
> 
> 
> Hello everyone, I am a newbie.
> I am learning the flink-sql-submit project. From @Jark Wu  
> :https://github.com/wuchong/flink-sql-submit
> 
> My local environment is:
> 1. flink1.9.0 standalone
> 2. kafka_2.11-2.2.0 single
> 
> I configured Flink Connectors and Formats jars to $FLINK_HOME/lib .
> Reference: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#connectors
> 
> Then I run flink-sql-submit , sh run.sh q1
> Throw  java.lang.ClassNotFoundException: com.mysql.jdbc.Driver.
> 
> My question is: 
> I configured mysql-connector-java in the pom.xml file, mvn build jar include 
> com.mysql.jdbc.Driver. 
> Why is this error still reported? I put the jar package in $FLINK_HOME/lib 
> and the problem can be solved.
> Do you need to put these jars in $FLINK_HOME/lib when the project relies on 
> too many jar packages?
> If I don't put mysql-connector-java.jar in $FLINK_HOME/lib, how can I solve 
> this problem?
> 
> Can @Jark Wu  give me some advice? Or can someone give me some advice? Thank 
> you.
> 
> 1. pom.xml
> 
>> <dependency>
>>     <groupId>mysql</groupId>
>>     <artifactId>mysql-connector-java</artifactId>
>>     <version>5.1.38</version>
>> </dependency>
> 2. mvn clean; mvn package
> 
>> $ ll -rth target 
>>[±master 
>> ●]
>> total 32312
>> drwxr-xr-x  3 alex  staff96B Oct 30 11:32 generated-sources
>> drwxr-xr-x  5 alex  staff   160B Oct 30 11:32 classes
>> drwxr-xr-x  3 alex  staff96B Oct 30 11:32 maven-archiver
>> -rw-r--r--  1 alex  staff   7.2M Oct 30 11:32 
>> flink-sql-submit-1.0-SNAPSHOT.jar
>> -rw-r--r--  1 alex  staff   8.2M Oct 30 11:32 flink-sql-submit.jar
> 
> 3. flink-sql-submit.jar include java.sql.Driver  
> 
>> " zip.vim version v28
>> " Browsing zipfile 
>> /Users/alex/IdeaProjects/alex/flink_learn/flink-sql-submit/target/flink-sql-submit.jar
>> " Select a file with cursor and press ENTER
>> 
>> META-INF/MANIFEST.MF
>> META-INF/
>> q1.sql
>> user_behavior.log
>> com/
>> com/github/
>> com/github/wuchong/
>> com/github/wuchong/sqlsubmit/
>> com/github/wuchong/sqlsubmit/SqlSubmit$1.class
>> com/github/wuchong/sqlsubmit/SqlSubmit.class
>> com/github/wuchong/sqlsubmit/SourceGenerator.class
>> com/github/wuchong/sqlsubmit/cli/
>> com/github/wuchong/sqlsubmit/cli/SqlCommandParser$SqlCommandCall.class
>> com/github/wuchong/sqlsubmit/cli/SqlCommandParser.class
>> com/github/wuchong/sqlsubmit/cli/SqlCommandParser$SqlCommand.class
>> com/github/wuchong/sqlsubmit/cli/CliOptions.class
>> com/github/wuchong/sqlsubmit/cli/CliOptionsParser.class
>> META-INF/maven/
>> META-INF/maven/com.github.wuchong/
>> META-INF/maven/com.github.wuchong/flink-sql-submit/
>> META-INF/maven/com.github.wuchong/flink-sql-submit/pom.xml
>> META-INF/maven/com.github.wuchong/flink-sql-submit/pom.properties
>> META-INF/services/
>> META-INF/services/java.sql.Driver
>> com/mysql/
>> com/mysql/fabric/
>> com/mysql/fabric/FabricCommunicationException.class
>> com/mysql/fabric/FabricConnection.class
>> com/mysql/fabric/FabricStateResponse.class
>> com/mysql/fabric/HashShardMapping$ReverseShardIndexSorter.class
>> com/mysql/fabric/HashShardMapping.class
>> com/mysql/fabric/RangeShardMapping$RangeShardIndexSorter.class
>> com/mysql/fabric/RangeShardMapping.class
>> com/mysql/fabric/Response.class
>> com/mysql/fabric/Server.class
>> com/mysql/fabric/ServerGroup.class
>> com/mysql/fabric/ServerMode.class
>> com/mysql/fabric/ServerRole.class
>> etc ...
> 
>  
> 
> $FLINK_DIR/bin/flink run -d -p 3 target/flink-sql-submit.jar -w 
> "${PROJECT_DIR}"/src/main/resources/ -f "$1".sql  
> Eerror:
> 2019-10-30 10:27:35
> java.lang.IllegalArgumentException: JDBC driver class not found.
> At 
> org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.open(JDBCUpsertOutputFormat.java:112)
> At 
> org.apache.flink.api.java.io.jdbc.JDBCUpsertSinkFunction.open(JDBCUpsertSinkFunction.java:42)
> At 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> At 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> At 
> org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
> At 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:532)
> At 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:396)
> At org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> At org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> At java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
> At 

Re: flink1.9.1 on yarn sql 部署问题

2019-10-30 Thread Dian Fu
Question 1: You only need Flink on the machine that submits the job.
Question 2: Both approaches work.
Question 3: Yes. The jars under the lib directory are submitted to YARN together with the user jar.

> On 2019-10-30, at 10:30 AM, hb <343122...@163.com> wrote:
> 
> hello:
> 
> 
> Environment: flink1.9.1, on YARN, hadoop2.6
> Flink is only installed on the single machine used for submission.
> 
> 
> The lib directory contains:
> flink-dist_2.11-1.9.1.jar
> flink-json-1.9.0-sql-jar.jar
> flink-shaded-hadoop-2-uber-2.6.5-7.0.jar
> flink-sql-connector-kafka_2.11-1.9.0.jar
> flink-table_2.11-1.9.1.jar
> flink-table-blink_2.11-1.9.1.jar
> log4j-1.2.17.jar
> slf4j-log4j12-1.7.15.jar
> 
> 
> //
> flink-shaded-hadoop-2-uber-2.6.5-7.0.jar
> flink-sql-connector-kafka_2.11-1.9.0.jar
> flink-json-1.9.0-sql-jar.jar
> These 3 jars were copied in after the installation.
> 
> 
> Question 1: In on-yarn mode, do I need the Flink installation directory on every machine, or is it enough to have it only on the submitting machine?
> 
> 
> Question 2: I need to use the blink planner to connect to an external Kafka (version 1.1, JSON format) to create SQL tables. Do I need to add
> flink-sql-connector-kafka_2.11-1.9.0.jar
> flink-json-1.9.0-sql-jar.jar
> to the lib directory, or should I declare the dependencies in the pom file and build a fat jar:
>    <dependency>
>        <groupId>org.apache.flink</groupId>
>        <artifactId>flink-connector-kafka_2.11</artifactId>
>        <version>${flink.version}</version>
>    </dependency>
>    <dependency>
>        <groupId>org.apache.flink</groupId>
>        <artifactId>flink-json</artifactId>
>        <version>${flink.version}</version>
>    </dependency>
> 
> Question 3: When running flink run on YARN, will the jars under the lib directory be attached in addition to the user jar and submitted to YARN as well?
> 
> 
> 
> 
> 


