Re: Flink job repeated restart failure

2021-03-25 Thread vinaya
Hi Arvid,

Thank you for the suggestion.

Indeed, the specified setting was commented out in the Flink configuration
(flink-conf.yaml).

  # io.tmp.dirs: /tmp

Is there a fallback (e.g. /tmp) if io.tmp.dirs and
System.getProperty("java.io.tmpdir") are both not set?

Will configure this setting to a valid value as suggested.
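For example, something like the following in flink-conf.yaml (the path is only an
example; it must be a directory that exists on every task manager):

  io.tmp.dirs: /opt/flink/tmp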

Kind regards,
Vinaya





Re: How to configure standby JobManagers for high availability in Flink on Native K8S mode?

2021-03-25 Thread Yang Wang
There is already a ticket tracking this: https://issues.apache.org/jira/browse/FLINK-17707

It should be supported in 1.13.
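For reference, a sketch of what a submission with multiple JobManagers could look
like once that lands (the kubernetes.jobmanager.replicas option name is my assumption
based on the ticket; the HA options are the usual Kubernetes HA settings):

./bin/flink run-application --target kubernetes-application \
  -Dkubernetes.cluster-id=my-ha-cluster \
  -Dkubernetes.jobmanager.replicas=2 \
  -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
  -Dhigh-availability.storageDir=s3://flink/flink-ha \
  local:///opt/flink/usrlib/job.jar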

Best,
Yang

casel.chen wrote on Fri, Mar 26, 2021 at 8:23 AM:

> In Flink on K8S Standalone mode you can start multiple JobManagers via yaml, but how do you do that in Native K8S mode? Is there any documentation about it? Thanks!


Unsubscribe

2021-03-25 Thread aegean0...@163.com


Unsubscribe

aegean0933
Email: aegean0...@163.com

Re: Native kubernetes execution and History server

2021-03-25 Thread Yang Wang
Thanks Guowei for the comments and Lukáš Drbal for sharing the feedback.

I think this applies not only to Kubernetes application mode, but also to Yarn
application and standalone application mode: the job id will be set to ZERO if
not configured explicitly in HA mode.

For standalone application mode, we could use "--job-id" to specify a
user-defined job id.

However, for Yarn and Kubernetes applications, we do not have a public config
option for this. Considering that we might support multiple jobs in a single
Flink application with HA enabled in the future, introducing such a public
config option may be inopportune.
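For reference, the workaround from this thread (generating a random job id per run)
spelled out as a full submit command, as a sketch with most cluster options omitted:

/opt/flink/bin/flink run-application --target kubernetes-application \
  -Dkubernetes.cluster-id=batch-job-cluster \
  -Dkubernetes.container.image=localhost:5000/batch-flink-app-v3:latest \
  -D\$internal.pipeline.job-id=$(cat /proc/sys/kernel/random/uuid|tr -d "-") \
  local:///opt/flink/usrlib/job.jar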


Best,
Yang

Lukáš Drbal wrote on Thu, Mar 25, 2021 at 7:09 PM:

> Hello Guowei,
>
> I just checked it and it works!
>
> Thanks a lot!
>
> Here is workaround which use UUID as jobId:
> -D\$internal.pipeline.job-id=$(cat /proc/sys/kernel/random/uuid|tr -d "-")
>
>
> L.
>
> On Thu, Mar 25, 2021 at 11:01 AM Guowei Ma  wrote:
>
>> Hi,
>> Thanks for providing the logs. From the logs this is a known bug.[1]
>> Maybe you could use `$internal.pipeline.job-id` to set your own
>> job-id.(Thanks to Wang Yang)
>> But keep in mind this is only for internal use and may be changed in
>> some release. So you should keep an eye on [1] for the correct solution.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-19358
>>
>> Best,
>> Guowei
>>
>>
>> On Thu, Mar 25, 2021 at 5:31 PM Lukáš Drbal 
>> wrote:
>>
>>> Hello,
>>>
>>> sure. Here is log from first run which succeed -
>>> https://pastebin.com/tV75ZS5S
>>> and here is from second run (it's same for all next) -
>>> https://pastebin.com/pwTFyGvE
>>>
>>> My Docker file is pretty simple, just take wordcount + S3
>>>
>>> FROM flink:1.12.2
>>>
>>> RUN mkdir -p $FLINK_HOME/usrlib
>>> COPY flink-examples-batch_2.12-1.12.2-WordCount.jar
>>>  $FLINK_HOME/usrlib/wordcount.jar
>>>
>>> RUN mkdir -p ${FLINK_HOME}/plugins/s3-fs-presto
>>> COPY flink-s3-fs-presto-1.12.2.jar $FLINK_HOME/plugins/s3-fs-presto/
>>>
>>> Thanks!
>>>
>>> On Thu, Mar 25, 2021 at 9:24 AM Guowei Ma  wrote:
>>>
 Hi,
 After some discussion with Wang Yang offline, it seems that there might
 be a jobmanager failover. So would you like to share full jobmanager log?
 Best,
 Guowei


 On Wed, Mar 24, 2021 at 10:04 PM Lukáš Drbal 
 wrote:

> Hi,
>
> I would like to use native kubernetes execution [1] for one batch job
> and let scheduling on kubernetes. Flink version: 1.12.2.
>
> Kubernetes job:
> apiVersion: batch/v1beta1
> kind: CronJob
> metadata:
>   name: scheduled-job
> spec:
>   schedule: "*/1 * * * *"
>   jobTemplate:
> spec:
>   template:
> metadata:
>   labels:
> app: super-flink-batch-job
> spec:
>   containers:
>   - name: runner
> image: localhost:5000/batch-flink-app-v3:latest
> imagePullPolicy: Always
> command:
>   - /bin/sh
>   - -c
>   - /opt/flink/bin/flink run-application --target
> kubernetes-application -Dkubernetes.service-account=flink-service-account
> -Dkubernetes.rest-service.exposed.type=NodePort
> -Dkubernetes.cluster-id=batch-job-cluster
> -Dkubernetes.container.image=localhost:5000/batch-flink-app-v3:latest
> -Ds3.endpoint=http://minio-1616518256:9000 -Ds3.access-key=ACCESSKEY
> -Ds3.secret-key=SECRETKEY
> -Djobmanager.archive.fs.dir=s3://flink/completed-jobs/
> -Ds3.path-style-access=true -Ds3.ssl.enabled=false
> -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> -Dhigh-availability.storageDir=s3://flink/flink-ha
> local:///opt/flink/usrlib/job.jar
>   restartPolicy: OnFailure
>
>
> This works well for me but I would like to write the result to the
> archive path and show it in the History server (running as separate
> deployment in k8)
>
> Anytime it creates JobId= which
> obviously leads to
>
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has
> already been submitted.
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> ~[?:1.8.0_282]
> at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> ~[?:1.8.0_282]
> at
> org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1056)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:129)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:70)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> 

Re: [EXTERNAL] Re: PyFlink DataStream Example Kafka/Kinesis?

2021-03-25 Thread Shuiqiang Chen
Hi Kevin, Xinbin,


Hi Shuiqiang,
>
> Thanks for the quick response on creating the ticket for Kinesis
> Connector. Do you mind giving me the chance to try to implement the
> connector over the weekend?
>
> I am interested in contributing to Flink, and I think this can be a good
> starting point to me
>
> Best
> Bin
>

Sorry for the confusion. Xinbin sent a personal message, which could not be
displayed in this email thread, saying that he is interested in solving
https://issues.apache.org/jira/browse/FLINK-21966. So both of you can
participate in this contribution as you like.

Best,
Shuiqiang

Bohinski, Kevin wrote on Thu, Mar 25, 2021 at 12:00 PM:

> Hi Shuiqiang,
>
>
>
> Thanks for letting me know. Feel free to send any beginner level
> contributions for this effort my way  .
>
>
>
> Best,
>
> kevin
>
>
>
> *From: *Shuiqiang Chen 
> *Date: *Wednesday, March 24, 2021 at 10:31 PM
> *To: *"Bohinski, Kevin" 
> *Cc: *user 
> *Subject: *[EXTERNAL] Re: PyFlink DataStream Example Kafka/Kinesis?
>
>
>
> Hi Kevin,
>
>
>
> Kinesis connector is not supported yet in Python DataStream API. We will
> add it in the future.
>
>
>
> Best,
>
> Shuiqiang
>
>
>
> Bohinski, Kevin wrote on Thu, Mar 25, 2021 at 5:03 AM:
>
> Is there a kinesis example?
>
>
>
> *From: *"Bohinski, Kevin" 
> *Date: *Wednesday, March 24, 2021 at 4:40 PM
> *To: *"Bohinski, Kevin" 
> *Subject: *Re: PyFlink DataStream Example Kafka/Kinesis?
>
>
>
> Nevermind, found this for anyone else looking:
> https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
> 
>
>
>
> *From: *"Bohinski, Kevin" 
> *Date: *Wednesday, March 24, 2021 at 4:38 PM
> *To: *user 
> *Subject: *PyFlink DataStream Example Kafka/Kinesis?
>
>
>
> Hi,
>
>
>
> Is there an example kafka/kinesis source or sink for the PyFlink
> DataStream API?
>
>
>
> Best,
>
> kevin
>
>


Re: flink sql count distinct optimization

2021-03-25 Thread guomuhua
Jark wrote
> I see that your job uses a window agg; currently window agg does not support automatic splitting yet. The
> window-TVF-based window agg in 1.13 supports this parameter. Something to look forward to.
> 
> Best,
> Jark
> 
> On Wed, 24 Mar 2021 at 19:29, Robin Zhang wrote:
> 
>> Hi guomuhua,
>>   With local aggregation enabled, you do not need to split the key yourself for a two-phase aggregation. I suggest taking a look at the official documentation.
>>
>> Best,
>> Robin
>>
>>
>> guomuhua wrote
>> > In SQL, if the local-global parameter is enabled: set
>> > table.optimizer.agg-phase-strategy=TWO_PHASE;
>> > or the Partial-Final parameters are enabled: set
>> > table.optimizer.distinct-agg.split.enabled=true;
>> >  set
>> > table.optimizer.distinct-agg.split.bucket-num=1024;
>> > do I still need to rewrite the SQL into the two-phase form myself?
>> > For example:
>> > Original SQL:
>> > SELECT day, COUNT(DISTINCT buy_id) as cnt FROM T GROUP BY day,
>> >
>> > SQL after manually splitting the DISTINCT field buy_id by MOD 1024:
>> > SELECT day, SUM(cnt) total
>> > FROM (
>> > SELECT day, MOD(buy_id, 1024), COUNT(DISTINCT buy_id) as cnt
>> > FROM T GROUP BY day, MOD(buy_id, 1024))
>> > GROUP BY day
>> >
>> > Or will Flink rewrite the SQL for me automatically so that I don't need to care?
>> >
>> > Also, if I only enable the parameters above without rewriting the SQL, it doesn't seem to get optimized, and I don't see the two-phase operators in the Flink web UI either
>> > 
>> http://apache-flink.147419.n8.nabble.com/file/t1346/%E7%AE%97%E5%AD%90.png;
>>
>> >
>> >
>> >
>> >
>> >
>> > --
>> > Sent from: http://apache-flink.147419.n8.nabble.com/
>>
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>

Thanks. So if it is not a window agg, Flink will split the key automatically once the parameters are enabled, right? As for window agg
not supporting automatic splitting, can I find that described in the documentation? Where exactly? Or do I need to look into the source code? Please advise. Thanks again.





Re: flink sql count distinct optimization

2021-03-25 Thread Jark Wu
I see that your job uses a window agg; currently window agg does not support automatic splitting yet. The
window-TVF-based window agg in 1.13 supports this parameter. Something to look forward to.
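(For reference, a rough sketch of what the 1.13 window TVF aggregation looks like;
the exact syntax may still change, and it assumes T has an event-time attribute,
here called rowtime:)

SELECT window_start, window_end, COUNT(DISTINCT buy_id) AS cnt
FROM TABLE(
    TUMBLE(TABLE T, DESCRIPTOR(rowtime), INTERVAL '1' DAY))
GROUP BY window_start, window_end;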

Best,
Jark

On Wed, 24 Mar 2021 at 19:29, Robin Zhang 
wrote:

> Hi guomuhua,
>   With local aggregation enabled, you do not need to split the key yourself for a two-phase aggregation. I suggest taking a look at the official documentation.
>
> Best,
> Robin
>
>
> guomuhua wrote
> > In SQL, if the local-global parameter is enabled: set
> > table.optimizer.agg-phase-strategy=TWO_PHASE;
> > or the Partial-Final parameters are enabled: set
> > table.optimizer.distinct-agg.split.enabled=true;
> >  set
> > table.optimizer.distinct-agg.split.bucket-num=1024;
> > do I still need to rewrite the SQL into the two-phase form myself?
> > For example:
> > Original SQL:
> > SELECT day, COUNT(DISTINCT buy_id) as cnt FROM T GROUP BY day,
> >
> > SQL after manually splitting the DISTINCT field buy_id by MOD 1024:
> > SELECT day, SUM(cnt) total
> > FROM (
> > SELECT day, MOD(buy_id, 1024), COUNT(DISTINCT buy_id) as cnt
> > FROM T GROUP BY day, MOD(buy_id, 1024))
> > GROUP BY day
> >
> > Or will Flink rewrite the SQL for me automatically so that I don't need to care?
> >
> > Also, if I only enable the parameters above without rewriting the SQL, it doesn't seem to get optimized, and I don't see the two-phase operators in the Flink web UI either
> > 
> http://apache-flink.147419.n8.nabble.com/file/t1346/%E7%AE%97%E5%AD%90.png;
>
> >
> >
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


[Asking for advice on a data processing flow]

2021-03-25 Thread guoyb
My requirement is to write detail records, aggregate based on the detail data, and then store the aggregated results in the database.
How should such a data processing flow best be designed?

Re: The Role of TimerService in ProcessFunction

2021-03-25 Thread Chirag Dewan
Thanks for the clarification Dawid. Resolves my confusion.
Sent from Yahoo Mail on Android 
 
  On Fri, 19 Mar 2021 at 2:41 pm, Dawid Wysakowicz 
wrote:
Hi Chirag,
 
I agree it might be a little bit confusing.
 
Let me try to explain the reasoning. To do that I'll first try to rephrase the 
reasoning from FLINK-8560 for introducing the KeyedProcessFunction. It was 
introduced so that users have a typed access to the current key via Context and 
OnTimerContext. This is actually the only difference between the two functions.
 
Somewhat as a consequence of the above, the KeyedProcessFunction can be used
solely on a keyed stream, whereas ProcessFunction can be used on both keyed and non-keyed streams. That was
actually the only way to use a ProcessFunction on a keyed stream prior to 
introducing the KeyedProcessFunction. If you don't need access to the current 
key you should be fine with using the ProcessFunction on a keyed stream and 
there you can use the TimerService. It is advised to use a KeyedProcessFunction 
on a keyed stream, however for backwards compatibility the old behaviour has 
been kept.
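
To make the difference concrete, a minimal sketch of a KeyedProcessFunction on a keyed
stream (1.12-era API; the types, the 60s timeout and the output strings are just examples,
and it assumes event-time timestamps are assigned):

public class TimeoutFunction extends KeyedProcessFunction<String, String, String> {

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        // the TimerService is safe to use here because the stream is keyed
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 60_000L);
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        // typed access to the current key is what KeyedProcessFunction adds (FLINK-8560)
        out.collect("timer fired for key " + ctx.getCurrentKey());
    }
}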
 
 
Hope that it clarifies the things a bit.
 
Best,
 
Dawid
 
 On 17/03/2021 07:47, Chirag Dewan wrote:

  Hi,

  Currently, both ProcessFunction and KeyedProcessFunction (and their CoProcess
counterparts) expose the Context and TimerService in the processElement()
method. However, if we use the TimerService in a non-keyed context, it gives a
runtime error.

  I am a bit confused about these APIs. Is there any specific reason for
exposing the TimerService in a non-keyed context, especially if it can't be used
without a keyed stream?

  Any leads are much appreciated.

  Thanks, Chirag


About hadoop#configuration

2021-03-25 Thread ????
hi all

Three questions about how Flink resolves the Hadoop configuration when running on YARN:

1. flink-conf and the Hadoop/YARN configuration on YARN:
https://issues.apache.org/jira/browse/FLINK-21981

2. hadoop#configuration and the YARN configuration:
https://issues.apache.org/jira/browse/FLINK-21982

3. When hdfs-site/core-site files are shipped with -yt, how Flink's hadoop#configuration and classloader.getResource resolve the shipped hdfs/core-site conf.

Could someone also review and merge FLINK-21640?

thanks all

Unsubscribe

2021-03-25 Thread 袁刚
Unsubscribe

How to configure standby JobManagers for high availability in Flink on Native K8S mode?

2021-03-25 Thread casel.chen
In Flink on K8S Standalone mode you can start multiple JobManagers via yaml, but how do you do that in Native K8S mode? Is there any documentation about it? Thanks!

Unsubscribe

2021-03-25 Thread 天琦
Unsubscribe



Sent from my iPhone

Re: Time Temporal Join

2021-03-25 Thread Satyam Shekhar
Hi Timo,

Apologies for the late response. I somehow seem to have missed your reply.

I do want the join to be "time-based" since I need to perform a tumble
grouping operation on top of the join.

I tried setting the watermark strategy to `R` - INTERVAL '0.001' SECONDS,
that didn't help either.

Note that we have a custom connector to an internal storage engine. The
connector implements ScanTableSource interface with
SupportsWatermarkPushDown ability. Would the watermark strategy in the
table schema matter in that case? I changed the query to the following to
simplify further -

select F.C0, F.C1, F.R, D.C0, D.C1, D.R from F JOIN D FOR SYSTEM_TIME AS OF
F.R ON F.C1 = D.C1

I still do not see any output from the pipeline. The overall logs I see
from the connecter is the following -

Emit D.D row=+I(0,0,1970-01-01T00:00)@time=0  -->
ctx.collectWithTimestamp(row_,
rowtime);
Emit D.F row=+I(0,0,1970-01-01T00:00)@time=0
Emit D.D row=+I(1,1,1970-01-01T00:00)@time=0
Emit D.F row=+I(1,1,1970-01-01T00:00:01)@time=1000
Emit D.D row=+I(2,2,1970-01-01T00:00)@time=0
Emit D.F row=+I(2,2,1970-01-01T00:00:02)@time=2000
Emit D.F row=+I(3,3,1970-01-01T00:00:03)@time=3000
Emit D.D row=+I(3,3,1970-01-01T00:00)@time=0
Emit D.F row=+I(4,4,1970-01-01T00:00:04)@time=4000
Emit D.F wm=4000  --->  ctx.emitWatermark(new Watermark(wm));
Emit D.D wm=0

Now, if I change the rowtime of table D to 1s instead of 0, I get one row
as output.

Emit D.D row=+I(0,0,1970-01-01T00:00:01)@time=1000
Emit D.F row=+I(0,0,1970-01-01T00:00)@time=0
Emit D.F row=+I(1,1,1970-01-01T00:00:01)@time=1000
Emit D.D row=+I(1,1,1970-01-01T00:00:01)@time=1000
Emit D.D row=+I(2,2,1970-01-01T00:00:01)@time=1000
Emit D.D row=+I(3,3,1970-01-01T00:00:01)@time=1000
Emit D.F wm=1000
Emit D.D wm=1000

reply: (1, "1", 1000, 1, "1", 1000)

The next row streamed from F which should join with a row emitted from D
does not emit any output -

Emit D.F row=+I(2,2,1970-01-01T00:00:02)@time=2000
Emit D.F wm=2000
NO REPLY

My understanding of temporal joins is that the latest row from D should be
picked for joining rows from F. Is my expectation wrong that the (2, 2, 2s) row in F
should join with the (2, 2, 1s) row in D?

Regards,
Satyam


On Tue, Mar 16, 2021 at 5:54 AM Timo Walther  wrote:

> Hi Satyam,
>
> first of all your initial join query can also work, you just need to
> make sure that no time attribute is in the SELECT clause. As the
> exception indicates, you need to cast all time attributes to TIMESTAMP.
> The reason for this is some major design issue that is also explained
> here where a time attribute must not be in the output of a regular join:
>
> https://stackoverflow.com/a/64500296/806430
>
> However, since you would like to perform the join "time-based" either
> interval join or temporal join might solve your use cases.
>
> In your case I guess the watermark strategy of D is the problem. Are you
> sure the result is:
>
>  > Emit D row=+I(0,"0",1970-01-01T00:00)@time=0
>  > Emit D row=+I(1,"1",1970-01-01T00:00)@time=0
>  > Emit D row=+I(2,"2",1970-01-01T00:00)@time=0
>  > Emit D watermark=0
>
> and not:
>
>  > Emit D row=+I(0,"0",1970-01-01T00:00)@time=0
>  > Emit D watermark=0
>  > Emit D row=+I(1,"1",1970-01-01T00:00)@time=0
>  > Emit D row=+I(2,"2",1970-01-01T00:00)@time=0
>
> Or maybe the watermark is even dropped. Could you try to use a watermark
> strategy with
>
> `R` - INTERVAL '0.001' SECONDS
>
> I hope this helps.
>
> Regards,
> Timo
>
>
>
> On 16.03.21 04:37, Satyam Shekhar wrote:
> > Hello folks,
> >
> > I would love to hear back your feedback on this.
> >
> > Regards,
> > Satyam
> >
> > On Wed, Mar 10, 2021 at 6:53 PM Satyam Shekhar  > > wrote:
> >
> > Hello folks,
> >
> > I am looking to enrich rows from an unbounded streaming table by
> > joining it with a bounded static table while preserving rowtime for
> > the streaming table. For example, let's consider table two tables F
> > and D, where F is unbounded and D is bounded. The schema for the two
> > tables is the following -
> >
> > F:
> >   |-- C0: BIGINT
> >   |-- C1: STRING
> >   |-- R: TIMESTAMP(3) **rowtime**
> >   |-- WATERMARK FOR R: TIMESTAMP(3) AS `R` - INTERVAL '0' SECONDS
> >
> > D:
> >   |-- C0: BIGINT
> >   |-- C1: STRING NOT NULL
> >
> > I'd like to run the following query on this schema -
> >
> > select sum(F.C0), D.C1, tumble_start(F.R, interval '1' second)
> >  from F join D ON F.C1 = D.C1
> >  group by D.C1, tumble(F.R, interval '1' second)
> >
> > However, I run into the following error while running the above
> query -
> >
> > "Rowtime attributes must not be in the input rows of a regular join.
> > As a workaround you can cast the time attributes of input tables to
> > TIMESTAMP before."
> >
> > My understanding reading the docs is that Time Temporal Join is
> > meant to solve this problem. So I model table D as the following 

Re: General guidance

2021-03-25 Thread Kenneth Knowles
This is a Beam issue indeed, though it is an issue with the FlinkRunner. So
I think I will BCC the Flink list.

You may be in one of the following situations:
 - These timers should not be viewed as distinct by the runner, but
deduped, per
https://issues.apache.org/jira/browse/BEAM-8212#comment-16946013
 - There is a different problem if you have an unbounded key space with
windows that never expire, since then there are unbounded numbers of truly
distinct (but irrelevant) timers. That is also the responsibility of the
runner to simply not set timers that can never fire.

Kenn

On Thu, Mar 25, 2021 at 11:57 AM Almeida, Julius 
wrote:

> Hi Team,
>
>
>
> My streaming pipeline is based on beam & running using flink runner with
> rocksdb as state backend.
>
>
>
> Over time I am  seeing memory spike & after giving a look at heap dump, I
> am seeing records in  ‘__StatefulParDoGcTimerId’ which seems to be never
> cleaned.
>
>
>
> Found this jira https://issues.apache.org/jira/browse/BEAM-8212
> describing the issue I believe I am facing.
>
>
>
> Any pointers would be helpful in identifying possible solution.
>
>
>
> Thanks,
>
> Julius
>


General guidance

2021-03-25 Thread Almeida, Julius
Hi Team,

My streaming pipeline is based on beam & running using flink runner with 
rocksdb as state backend.

Over time I am  seeing memory spike & after giving a look at heap dump, I am 
seeing records in  ‘__StatefulParDoGcTimerId’ which seems to be never cleaned.

Found this jira https://issues.apache.org/jira/browse/BEAM-8212 describing the 
issue I believe I am facing.

Any pointers would be helpful in identifying possible solution.

Thanks,
Julius


FlinkKafkaConsumer - Broadcast - Initial Load

2021-03-25 Thread Sandeep khanzode
Hi,

I have a master/reference data that needs to come in through a 
FlinkKafkaConsumer to be broadcast to all nodes and subsequently joined with 
the actual stream for enriching content.

The Kafka consumer gets CDC-type records from database changes. All this works 
well.
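
For context, the wiring is roughly the standard broadcast state pattern, sketched below
(ReferenceData/Order/EnrichedOrder and their accessors are placeholders, not my actual classes):

MapStateDescriptor<String, ReferenceData> refDescriptor =
    new MapStateDescriptor<>("reference-data", Types.STRING, Types.POJO(ReferenceData.class));

BroadcastStream<ReferenceData> refBroadcast = referenceStream.broadcast(refDescriptor);

mainStream
    .keyBy(Order::getRefKey)
    .connect(refBroadcast)
    .process(new KeyedBroadcastProcessFunction<String, Order, ReferenceData, EnrichedOrder>() {
        @Override
        public void processBroadcastElement(ReferenceData ref, Context ctx, Collector<EnrichedOrder> out) throws Exception {
            // every CDC record updates the broadcast state on all parallel instances
            ctx.getBroadcastState(refDescriptor).put(ref.getKey(), ref);
        }

        @Override
        public void processElement(Order order, ReadOnlyContext ctx, Collector<EnrichedOrder> out) throws Exception {
            ReferenceData ref = ctx.getBroadcastState(refDescriptor).get(order.getRefKey());
            // ref may still be null until the initial load / first CDC records arrive
            out.collect(new EnrichedOrder(order, ref));
        }
    });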


My question is how do I initialise the pipeline for the first set of records in 
the database? i.e. those that are not CDC? 

When I deploy for the first time, I would need all the DB records to be sent to 
the FlinkKafkaConsumer before any CDC updates happen.

Is there a hook that allows for the first time initial load of the records in 
the Kafka topic to be broadcast?



Also, about the broadcast state, since we are persisting the state in RocksDB 
backend, I am assuming that the state backend would have the latest records and 
even if the task manager crashes and restarts, we will have the correct Kafka 
consumer group topic offsets recorded so that the next time, we do not 
“startFromEarliest”? Is that right?

Will the state always maintain the updates to the records as well as the Kafka 
topic offsets?


Thanks,
Sandeep 

Re: Flink on Minikube

2021-03-25 Thread Sandeep khanzode
Hi Arvid,

Thanks, will set the scope to Provided and try. 
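
For my own reference, my understanding of what that should look like in the pom
(a sketch; artifact ids and versions only as examples):

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.12</artifactId>
  <version>1.12.1</version>
  <scope>provided</scope> <!-- already in flink/lib inside the image -->
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.12</artifactId>
  <version>1.12.1</version>
  <!-- connectors are not in flink/lib, so they stay bundled in the fat jar -->
</dependency>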

Are there public examples in GitHub that demonstrate a sample app in Minikube? 

Sandeep

> On 23-Mar-2021, at 3:17 PM, Arvid Heise  wrote:
> 
> Hi Sandeep,
> 
> please have a look at [1], you should add most Flink dependencies as provided 
> - exceptions are connectors (or in general stuff that is not in flink/lib/ or 
> flink/plugins).
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/project-configuration.html#setting-up-a-project-basic-dependencies
>  
> 
> On Tue, Mar 23, 2021 at 5:28 AM Sandeep khanzode  > wrote:
> Hi Arvid,
> 
> I copy the JAR to the usrlib folder. This works in the Cloud EKS cluster. I 
> wanted to set this up for my testing purposes.
> 
> Below is the Dockerfile:
> FROM apache/flink:1.12.1-java11
> RUN mv /opt/flink/opt/flink-queryable-state-runtime_2.12-1.12.1.jar 
> /opt/flink/lib/flink-queryable-state-runtime_2.12-1.12.1.jar
> ADD myJar.jar /opt/flink/usrlib/myJar.jar
> 
> … But, in my process, this is a Fat JAR created by the Maven Shade Plugin. 
> Are you saying that all Flink classes should not be part of the user JAR? How 
> does that work? Do we set the scope of the dependencies to compile (or, not 
> runtime) for Flink Jars? Do we have any samples/examples that shows this? 
> Would be really helpful.
> 
> 
>> On 22-Mar-2021, at 8:00 PM, Arvid Heise > > wrote:
>> 
>> Hi Sandeep,
>> 
>> The first error definitively indicates a classloading issue, which may also 
>> be the cause for the second error.
>> 
>> Can you describe where you put your jar inside the docker image and which 
>> execution mode you are using? As a general rule, the jar is not supposed to 
>> go into flink/lib.
>> Also make sure to never shade non-connector classes of Flink into your jar. 
>> A typical user jar should be ~1MB.
>> 
>> On Fri, Mar 19, 2021 at 8:58 PM Sandeep khanzode > > wrote:
>> Hello,
>> 
>> I have a fat JAR compiled using the Maven Shade plugin and everything works 
>> correctly when I deploy it on a standalone local cluster i.e. one job and 
>> one task manager node.
>> 
>> But I installed Minikube and the same JAR file packaged into a docker image 
>> fails with weird serialization  errors:
>> 
>> Caused by: java.lang.ClassCastException: cannot assign instance of 
>> java.lang.invoke.SerializedLambda to field 
>> org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.keySelector
>>  of type org.apache.flink.api.java.functions.KeySelector in instance of 
>> org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner
>> 
>> 
>> … or in certain cases, if I comment out everything except the Kafka Source, 
>> then ...
>> Caused by: java.lang.NoClassDefFoundError: Could not initialize class 
>> org.apache.kafka.common.requests.MetadataRequest$Builder
>> 
>> 
>> Is there anything I am missing with the Minikube setup? I initially tried 
>> with the steps for the Job Application cluster on the website, but I was 
>> unable to get the /usrlib mounted from the hostpath. 
>> 
>> 
>> So, I created a simple docker image from ...
>> apache/flink:1.12.1-java11
>> 
>> But I have not had any success getting the same job to run here. Please let 
>> me know if there are well-known steps or issues that I can check.
>> 
>> Thanks,
>> Sandeep
> 



Question about checkpoints and savepoints

2021-03-25 Thread Robert Cullen
When I run a job on my Kubernetes session cluster only the checkpoint
directories are created but not the savepoints. (Filesystem configured to
S3 Minio)  Any ideas?

-- 
Robert Cullen
240-475-4490


Re: Hadoop is not in the classpath/dependencies

2021-03-25 Thread Maminspapin
I downloaded the library (latest version) from here:
https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-7.0/

and put it in the flink_home/lib directory.

It helped.
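
For anyone else hitting this: the alternative I have seen recommended, instead of
copying the shaded Hadoop jar into lib, is to make the Hadoop classpath visible to
Flink on every node before starting it, e.g.:

export HADOOP_CLASSPATH=`hadoop classpath`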





Re: Hadoop is not in the classpath/dependencies

2021-03-25 Thread Maminspapin
I have the same problem  ...





Re: Flink job repeated restart failure

2021-03-25 Thread Arvid Heise
Hi Vinaya,

SpillingAdaptiveSpanningRecordDeserializer tries to create a directory in
the temp directory, which you can configure by setting io.tmp.dirs. By
default, it's set to System.getProperty("java.io.tmpdir"), which seems to
be invalid in your case. (Note that the directory has to exist on the task
managers)

Best,

Arvid

On Thu, Mar 25, 2021 at 7:27 AM VINAYA KUMAR BENDI 
wrote:

> Dear all,
>
>
>
> One of the Flink jobs gave below exception and failed. Several attempts to
> restart the job resulted in the same exception and the job failed each
> time. The job started successfully only after changing the file name.
>
>
>
> *Flink Version*: 1.11.2
>
>
>
> *Exception*
>
> 2021-03-24 20:13:09,288 INFO
> org.apache.kafka.clients.producer.KafkaProducer  [] - [Producer
> clientId=producer-2] Closing the Kafka producer with timeoutMillis = 0 ms.
>
> 2021-03-24 20:13:09,288 INFO
> org.apache.kafka.clients.producer.KafkaProducer  [] - [Producer
> clientId=producer-2] Proceeding to force close the producer since pending
> requests could not be completed within timeout 0 ms.
>
> 2021-03-24 20:13:09,304 WARN
> org.apache.flink.runtime.taskmanager.Task[] - Flat Map
> -> async wait operator -> Process -> Sink: Unnamed (1/1)
> (8905142514cb25adbd42980680562d31) switched from RUNNING to FAILED.
>
> java.io.IOException: No such file or directory
>
> at java.io.UnixFileSystem.createFileExclusively(Native Method)
> ~[?:1.8.0_252]
>
> at java.io.File.createNewFile(File.java:1012) ~[?:1.8.0_252]
>
> at
> org.apache.flink.runtime.io.network.api.serialization.SpanningWrapper.createSpillingChannel(SpanningWrapper.java:291)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.io.network.api.serialization.SpanningWrapper.updateLength(SpanningWrapper.java:178)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.io.network.api.serialization.SpanningWrapper.transferFrom(SpanningWrapper.java:111)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:110)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:146)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> [flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> [flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
>
> 2021-03-24 20:13:09,305 INFO
> org.apache.flink.runtime.taskmanager.Task[] - Freeing
> task resources for Flat Map -> async wait operator -> Process -> Sink:
> Unnamed (1/1) (8905142514cb25adbd42980680562d31).
>
> 2021-03-24 20:13:09,311 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor   [] -
> Un-registering task and sending final execution state FAILED to JobManager
> for task Flat Map -> async wait operator -> Process -> Sink: Unnamed (1/1)
> 8905142514cb25adbd42980680562d31.
>
>
>
> *File*:
> https://github.com/apache/flink/blob/release-1.11.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
>
>
>
> *Related Jira ID*: https://issues.apache.org/jira/browse/FLINK-18811
>
>
>
> Similar exception mentioned in FLINK-18811 which has a fix in 1.12.0.
> Though in our case, we didn’t notice any disk failure. Is there any other
> reason(s) for the above mentioned IOException?
>
>
>
> While we are planning to upgrade to the latest Flink version, are there
> any other workaround(s) instead of deploying the 

reading from jdbc connection

2021-03-25 Thread Arran Duff
Hi,

I'm quite new to Flink and I'm trying to create an application which reads
IDs from a Kinesis stream and then uses these to read from a MySQL database. I
expect that I would just be doing a join of the IDs onto the table.

I'm struggling to understand from the documentation how to actually connect to
JDBC from Flink using Java. For example, the code shown there doesn't give any
information about what to provide as arguments in the connect method. Reading
the javadoc I can see that it needs to be a ConnectorDescriptor object, and I
can see that the known subclasses include HBase and Kafka connectors. But I
don't see one for JDBC. Should I just be using the CustomConnectorDescriptor
and adding JDBC connection options? Will this work out of the box or am I
going down a rabbit hole?

I also note that all of the examples that I see for the jdbc connector are 
written in SQL, or DDL or yaml - for example here 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connect.html#jdbc-connector
I'm not quite sure how I would go about getting that working in java. Any help 
would be greatly appreciated
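
For reference, the closest I have gotten is creating the table with DDL from Java
through a TableEnvironment and joining against it; a sketch (I am not sure this is
the intended approach, and the database/table/column names are made up):

EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

tableEnv.executeSql(
    "CREATE TABLE customers (" +
    "  id BIGINT," +
    "  name STRING," +
    "  PRIMARY KEY (id) NOT ENFORCED" +
    ") WITH (" +
    "  'connector' = 'jdbc'," +
    "  'url' = 'jdbc:mysql://localhost:3306/mydb'," +
    "  'table-name' = 'customers'," +
    "  'username' = 'user'," +
    "  'password' = 'pass'" +
    ")");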

Thanks,
Arran


[HEADS UP] Flink Community Survey closes Tue, March 30

2021-03-25 Thread Ana Vasiliuk
Hi all,

Thanks to everyone who has already left feedback on the community
experience in the Community Survey!

The survey is open until *Tuesday, March 30th*, so if you haven't done so
yet, please take 2 minutes (maybe less!) to fill it out below. Your opinion
is very helpful for us to better understand your satisfaction with the
current community activities and learn where we can improve.

Thanks a lot!

Ana
[1] https://form.typeform.com/to/M5JyHILk


Re: How to enable Queryable State in Flink Native Kubernetes deployment mode?

2021-03-25 Thread Yang Wang
Same as with Standalone: you can create a taskmanager-query-state-service yourself and just adjust the selector.
Native mode automatically adds the following labels, which you can use to filter out the TaskManagers that belong to one Flink cluster:

app: 
component: taskmanager
type: flink-native-kubernetes
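
A sketch of such a Service (it assumes the default queryable state proxy port 6125 and a
cluster-id of my-first-flink-cluster; adjust the app label in the selector to your own cluster-id):

apiVersion: v1
kind: Service
metadata:
  name: flink-taskmanager-query-state
spec:
  type: NodePort
  ports:
  - name: query-state
    port: 6125
    targetPort: 6125
  selector:
    app: my-first-flink-cluster
    component: taskmanager
    type: flink-native-kubernetes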


Best,
Yang

tian lin wrote on Thu, Mar 25, 2021 at 4:43 PM:

> Hi all:
> How do I enable Queryable State with Flink 1.12.1 in Flink Native Kubernetes deployment mode? The website provides instructions
> for enabling it on Standalone K8S (
> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/standalone/kubernetes.html#enabling-queryable-state), but in Native
> K8S deployment mode, whether Session or Application mode, the Flink-related k8s
> specs are basically generated by the Flink code; in particular, there is no convenient way to automatically generate and deploy the Queryable State port and the related K8S Service.
>
> Sent from Mail for Windows 10
>
>


Re: flink sql jmh failure

2021-03-25 Thread jie mei
Hi Guowei,

Yeah, I think so too. There is currently no way to trigger a checkpoint and wait for the
checkpoint to finish, so I will do the benchmark with a lower-level API.


Guowei Ma wrote on Thu, Mar 25, 2021 at 4:59 PM:

> Hi,
> I am not an expert of JMH but it seems that it is not an error. From the
> log it looks like that the job is not finished.
> The data source continues to read data when JMH finishes.
>
> Thread[Legacy Source Thread - Source:
> TableSourceScan(table=[[default_catalog, default_database,
> CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint, second_bigint,
> first_int, second_int, first_float, second_float, first_double,
> second_double, first_string, second_string]) ->
> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0, first_bigint, second_bigint,
> first_int, second_int, first_float, second_float, first_double,
> second_double, first_string, second_string]) -> Sink:
> Sink(table=[default_catalog.default_database.CLICKHOUSE_SINK_BENCHMARK],
> fields=[dt, first_bigint, second_bigint, first_int, second_int,
> first_float, second_float, first_double, second_double, first_string,
> second_string]) (3/6),5,Flink Task Threads]
>   at
> org.apache.flink.table.data.binary.BinaryStringData.fromString(BinaryStringData.java:82)
>   at org.apache.flink.table.data.StringData.fromString(StringData.java:52)
>   at
> org.apache.flink.table.factories.DataGenTableSourceFactory$1.next(DataGenTableSourceFactory.java:171)
>   at
> org.apache.flink.table.factories.DataGenTableSourceFactory$1.next(DataGenTableSourceFactory.java:168)
>   at
> org.apache.flink.table.factories.DataGenTableSourceFactory$RowGenerator.next(DataGenTableSourceFactory.java:320)
>   at
> org.apache.flink.table.factories.DataGenTableSourceFactory$RowGenerator.next(DataGenTableSourceFactory.java:277)
>   at
> org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource.run(DataGeneratorSource.java:82)
>   at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>   at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>   at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
>
> Best,
> Guowei
>
>
> On Wed, Mar 24, 2021 at 9:56 PM jie mei  wrote:
>
>> Hi, Yik San
>>
>> I use a library wroten by myself and trying to verify the performance.
>>
>>
>>> Yik San Chan wrote on Wed, Mar 24, 2021 at 9:07 PM:
>>
>>> Hi Jie,
>>>
>>> I am curious what library do you use to get the ClickHouseTableBuilder
>>>
>>> On Wed, Mar 24, 2021 at 8:41 PM jie mei  wrote:
>>>
 Hi, Community

 I run a JMH benchmark task and get the error below. It uses Flink SQL to
 consume data from the data-gen connector (10_000_000) and write the data to
 ClickHouse. Below is part of the log; you can see the complete log in the attached
 file.

 *My jmh benchmark code as blew:*

 @Benchmark
 @Threads(1)
 @Fork(1)
 public void sinkBenchmark() throws IOException {

   StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment
   .getExecutionEnvironment();
   streamEnv.enableCheckpointing(6);

   EnvironmentSettings settings = EnvironmentSettings.newInstance()
   .useBlinkPlanner()
   .inStreamingMode().build();
   TableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, 
 settings);

   // create clickhouse table
   new ClickHouseTableBuilder(tableEnv,
   parseSchema("clickhouse_sink_table.sql"))
   .database("benchmark")
   .table("bilophus_sink_benchmark")
   .address("jdbc:clickhouse://localhost:8123")
   .build();

   // create mock data table
   tableEnv.executeSql(
   parseSchema("clickhouse_source_table.sql") +
   "WITH (" +
   "'connector' = 'datagen'," +
   "'number-of-rows' = '1000')");

   tableEnv.executeSql(
   "INSERT INTO CLICKHOUSE_SINK_BENCHMARK SELECT '2020-12-12', * FROM 
 CLICKHOUSE_SOURCE_BENCHMARK");

 }

 *running command:*

 mvn clean package -DskipTests

 
   org.codehaus.mojo
   exec-maven-plugin
   1.6.0
   
 
   test-benchmarks
   test
   
 exec
   
 
   
   
 false
 test
 java
 
   -Xmx6g
   -classpath
   
   org.openjdk.jmh.Main
   
   -foe
   true
   
   -f
   1
   -i
   1
   -wi
   0
   -rf
   csv
   .*
 
   
 


 Non-finished threads:

 Thread[Source: TableSourceScan(table=[[default_catalog,
 default_database, CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint,
 second_bigint, first_int, second_int, first_float, second_float,
 first_double, second_double, first_string, s
 econd_string]) 

Re: [BULK]Re: [SURVEY] Remove Mesos support

2021-03-25 Thread Konstantin Knauf
Hi Matthias,

Thank you for following up on this. +1 to officially deprecate Mesos in the
code and documentation, too. It will be confusing for users if this
diverges from the roadmap.

Cheers,

Konstantin

On Thu, Mar 25, 2021 at 12:23 PM Matthias Pohl 
wrote:

> Hi everyone,
> considering the upcoming release of Flink 1.13, I wanted to revive the
> discussion about the Mesos support once more. Mesos is also already listed
> as deprecated in Flink's overall roadmap [1]. Maybe, it's time to align the
> documentation accordingly to make it more explicit?
>
> What do you think?
>
> Best,
> Matthias
>
> [1] https://flink.apache.org/roadmap.html#feature-radar
>
> On Wed, Oct 28, 2020 at 9:40 AM Till Rohrmann 
> wrote:
>
> > Hi Oleksandr,
> >
> > yes you are right. The biggest problem is at the moment the lack of test
> > coverage and thereby confidence to make changes. We have some e2e tests
> > which you can find here [1]. These tests are, however, quite coarse
> grained
> > and are missing a lot of cases. One idea would be to add a Mesos e2e test
> > based on Flink's end-to-end test framework [2]. I think what needs to be
> > done there is to add a Mesos resource and a way to submit jobs to a Mesos
> > cluster to write e2e tests.
> >
> > [1] https://github.com/apache/flink/tree/master/flink-jepsen
> > [2]
> >
> https://github.com/apache/flink/tree/master/flink-end-to-end-tests/flink-end-to-end-tests-common
> >
> > Cheers,
> > Till
> >
> > On Tue, Oct 27, 2020 at 12:29 PM Oleksandr Nitavskyi <
> > o.nitavs...@criteo.com> wrote:
> >
> >> Hello Xintong,
> >>
> >> Thanks for the insights and support.
> >>
> >> Browsing the Mesos backlog and didn't identify anything critical, which
> >> is left there.
> >>
> >> I see that there were quite a lot of contributions to the Flink
> Mesos
> >> in the recent version:
> >> https://github.com/apache/flink/commits/master/flink-mesos.
> >> We plan to validate the current Flink master (or release 1.12 branch)
> our
> >> Mesos setup. In case of any issues, we will try to propose changes.
> >> My feeling is that our test results shouldn't affect the Flink 1.12
> >> release cycle. And if any potential commits will land into the 1.12.1 it
> >> should be totally fine.
> >>
> >> In the future, we would be glad to help you guys with any
> >> maintenance-related questions. One of the highest priorities around this
> >> component seems to be the development of the full e2e test.
> >>
> >> Kind Regards
> >> Oleksandr Nitavskyi
> >> 
> >> From: Xintong Song 
> >> Sent: Tuesday, October 27, 2020 7:14 AM
> >> To: dev ; user 
> >> Cc: Piyush Narang 
> >> Subject: [BULK]Re: [SURVEY] Remove Mesos support
> >>
> >> Hi Piyush,
> >>
> >> Thanks a lot for sharing the information. It would be a great relief
> that
> >> you are good with Flink on Mesos as is.
> >>
> >> As for the jira issues, I believe the most essential ones should have
> >> already been resolved. You may find some remaining open issues here [1],
> >> but not all of them are necessary if we decide to keep Flink on Mesos
> as is.
> >>
> >> At the moment and in the short future, I think helps are mostly needed
> on
> >> testing the upcoming release 1.12 with Mesos use cases. The community is
> >> currently actively preparing the new release, and hopefully we could
> come
> >> up with a release candidate early next month. It would be greatly
> >> appreciated if you folks as experienced Flink on Mesos users can help
> with
> >> verifying the release candidates.
> >>
> >>
> >> Thank you~
> >>
> >> Xintong Song
> >>
> >> [1]
> >>
> https://issues.apache.org/jira/browse/FLINK-17402?jql=project%20%3D%20FLINK%20AND%20component%20%3D%20%22Deployment%20%2F%20Mesos%22%20AND%20status%20%3D%20Open
> >>
> >> On Tue, Oct 27, 2020 at 2:58 AM Piyush Narang  >> > wrote:
> >>
> >> Hi Xintong,
> >>
> >>
> >>
> >> Do you have any jiras that cover any of the items on 1 or 2? I can reach
> >> out to folks internally and see if I can get some folks to commit to
> >> helping out.
> >>
> >>
> >>
> >> To cover the other qs:
> >>
> >>   *   Yes, we’ve not got a plan at the moment to get off Mesos. We use
> >> Yarn for some our Flink workloads when we can. Mesos is only used when
> we
> >> need streaming capabilities in our WW dcs (as our Yarn is centralized in
> >> one DC)
> >>   *   We’re currently on Flink 1.9 (old planner). We have a 

Glob support on file access

2021-03-25 Thread Etienne Chauchot

Hi all,

In case it is useful to some of you:

I have a big batch that needs to use globs (*.parquet for example) to 
read input files. It seems that globs do not work out of the box (see 
https://issues.apache.org/jira/browse/FLINK-6417)


But there is a workaround:


final FileInputFormat inputFormat = new FileInputFormat(new Path(extractDir(filePath))); /* or any subclass of FileInputFormat; extractDir extracts the parent dir */
inputFormat.setFilesFilter(new GlobFilePathFilter(Collections.singletonList(filePath), Collections.emptyList())); /* filePath contains the glob; the whole path needs to be provided to GlobFilePathFilter */
inputFormat.setNestedFileEnumeration(true);

Hope it helps some people.

Etienne Chauchot




Re: [BULK]Re: [SURVEY] Remove Mesos support

2021-03-25 Thread Matthias Pohl
Hi everyone,
considering the upcoming release of Flink 1.13, I wanted to revive the
discussion about the Mesos support once more. Mesos is also already listed
as deprecated in Flink's overall roadmap [1]. Maybe, it's time to align the
documentation accordingly to make it more explicit?

What do you think?

Best,
Matthias

[1] https://flink.apache.org/roadmap.html#feature-radar

On Wed, Oct 28, 2020 at 9:40 AM Till Rohrmann  wrote:

> Hi Oleksandr,
>
> yes you are right. The biggest problem is at the moment the lack of test
> coverage and thereby confidence to make changes. We have some e2e tests
> which you can find here [1]. These tests are, however, quite coarse grained
> and are missing a lot of cases. One idea would be to add a Mesos e2e test
> based on Flink's end-to-end test framework [2]. I think what needs to be
> done there is to add a Mesos resource and a way to submit jobs to a Mesos
> cluster to write e2e tests.
>
> [1] https://github.com/apache/flink/tree/master/flink-jepsen
> [2]
> https://github.com/apache/flink/tree/master/flink-end-to-end-tests/flink-end-to-end-tests-common
>
> Cheers,
> Till
>
> On Tue, Oct 27, 2020 at 12:29 PM Oleksandr Nitavskyi <
> o.nitavs...@criteo.com> wrote:
>
>> Hello Xintong,
>>
>> Thanks for the insights and support.
>>
>> Browsing the Mesos backlog and didn't identify anything critical, which
>> is left there.
>>
>> I see that there were quite a lot of contributions to the Flink Mesos
>> in the recent version:
>> https://github.com/apache/flink/commits/master/flink-mesos.
>> We plan to validate the current Flink master (or release 1.12 branch) our
>> Mesos setup. In case of any issues, we will try to propose changes.
>> My feeling is that our test results shouldn't affect the Flink 1.12
>> release cycle. And if any potential commits will land into the 1.12.1 it
>> should be totally fine.
>>
>> In the future, we would be glad to help you guys with any
>> maintenance-related questions. One of the highest priorities around this
>> component seems to be the development of the full e2e test.
>>
>> Kind Regards
>> Oleksandr Nitavskyi
>> 
>> From: Xintong Song 
>> Sent: Tuesday, October 27, 2020 7:14 AM
>> To: dev ; user 
>> Cc: Piyush Narang 
>> Subject: [BULK]Re: [SURVEY] Remove Mesos support
>>
>> Hi Piyush,
>>
>> Thanks a lot for sharing the information. It would be a great relief that
>> you are good with Flink on Mesos as is.
>>
>> As for the jira issues, I believe the most essential ones should have
>> already been resolved. You may find some remaining open issues here [1],
>> but not all of them are necessary if we decide to keep Flink on Mesos as is.
>>
>> At the moment and in the short future, I think helps are mostly needed on
>> testing the upcoming release 1.12 with Mesos use cases. The community is
>> currently actively preparing the new release, and hopefully we could come
>> up with a release candidate early next month. It would be greatly
>> appreciated if you folks as experienced Flink on Mesos users can help with
>> verifying the release candidates.
>>
>>
>> Thank you~
>>
>> Xintong Song
>>
>> [1]
>> https://issues.apache.org/jira/browse/FLINK-17402?jql=project%20%3D%20FLINK%20AND%20component%20%3D%20%22Deployment%20%2F%20Mesos%22%20AND%20status%20%3D%20Open
>>
>> On Tue, Oct 27, 2020 at 2:58 AM Piyush Narang > > wrote:
>>
>> Hi Xintong,
>>
>>
>>
>> Do you have any jiras that cover any of the items on 1 or 2? I can reach
>> out to folks internally and see if I can get some folks to commit to
>> helping out.
>>
>>
>>
>> To cover the other qs:
>>
>>   *   Yes, we’ve not got a plan at the moment to get off Mesos. We use
>> Yarn for some our Flink workloads when we can. Mesos is only used when we
>> need streaming capabilities in our WW dcs (as our Yarn is centralized in
>> one DC)
>>   *   We’re currently on Flink 1.9 (old planner). We have a plan to bump
>> to 1.11 / 1.12 this quarter.
>>   *   We typically upgrade once every 6 months to a year (not every
>> release). We’d like to speed up the cadence but we’re not there yet.
>>   *   We’d largely be good with keeping Flink on Mesos as-is and
>> functional while missing out on some of the newer features. We understand
>> the pain on the communities side and we can take on the work if we see some
>> fancy improvement in Flink on Yarn / K8s that we want in Mesos to put in
>> the request to 

Re: Native kubernetes execution and History server

2021-03-25 Thread Lukáš Drbal
Hello Guowei,

I just checked it and it works!

Thanks a lot!

Here is a workaround which uses a UUID as the jobId:
-D\$internal.pipeline.job-id=$(cat /proc/sys/kernel/random/uuid|tr -d "-")


L.

On Thu, Mar 25, 2021 at 11:01 AM Guowei Ma  wrote:

> Hi,
> Thanks for providing the logs. From the logs this is a known bug.[1]
> Maybe you could use `$internal.pipeline.job-id` to set your own
> job-id.(Thanks to Wang Yang)
> But keep in mind this is only for internal use and may be changed in
> some release. So you should keep an eye on [1] for the correct solution.
>
> [1] https://issues.apache.org/jira/browse/FLINK-19358
>
> Best,
> Guowei
>
>
> On Thu, Mar 25, 2021 at 5:31 PM Lukáš Drbal  wrote:
>
>> Hello,
>>
>> sure. Here is log from first run which succeed -
>> https://pastebin.com/tV75ZS5S
>> and here is from second run (it's same for all next) -
>> https://pastebin.com/pwTFyGvE
>>
>> My Docker file is pretty simple, just take wordcount + S3
>>
>> FROM flink:1.12.2
>>
>> RUN mkdir -p $FLINK_HOME/usrlib
>> COPY flink-examples-batch_2.12-1.12.2-WordCount.jar
>>  $FLINK_HOME/usrlib/wordcount.jar
>>
>> RUN mkdir -p ${FLINK_HOME}/plugins/s3-fs-presto
>> COPY flink-s3-fs-presto-1.12.2.jar $FLINK_HOME/plugins/s3-fs-presto/
>>
>> Thanks!
>>
>> On Thu, Mar 25, 2021 at 9:24 AM Guowei Ma  wrote:
>>
>>> Hi,
>>> After some discussion with Wang Yang offline, it seems that there might
>>> be a jobmanager failover. So would you like to share full jobmanager log?
>>> Best,
>>> Guowei
>>>
>>>
>>> On Wed, Mar 24, 2021 at 10:04 PM Lukáš Drbal 
>>> wrote:
>>>
 Hi,

 I would like to use native kubernetes execution [1] for one batch job
 and let scheduling on kubernetes. Flink version: 1.12.2.

 Kubernetes job:
 apiVersion: batch/v1beta1
 kind: CronJob
 metadata:
   name: scheduled-job
 spec:
   schedule: "*/1 * * * *"
   jobTemplate:
 spec:
   template:
 metadata:
   labels:
 app: super-flink-batch-job
 spec:
   containers:
   - name: runner
 image: localhost:5000/batch-flink-app-v3:latest
 imagePullPolicy: Always
 command:
   - /bin/sh
   - -c
   - /opt/flink/bin/flink run-application --target
 kubernetes-application -Dkubernetes.service-account=flink-service-account
 -Dkubernetes.rest-service.exposed.type=NodePort
 -Dkubernetes.cluster-id=batch-job-cluster
 -Dkubernetes.container.image=localhost:5000/batch-flink-app-v3:latest
 -Ds3.endpoint=http://minio-1616518256:9000 -Ds3.access-key=ACCESSKEY
 -Ds3.secret-key=SECRETKEY
 -Djobmanager.archive.fs.dir=s3://flink/completed-jobs/
 -Ds3.path-style-access=true -Ds3.ssl.enabled=false
 -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
 -Dhigh-availability.storageDir=s3://flink/flink-ha
 local:///opt/flink/usrlib/job.jar
   restartPolicy: OnFailure


 This works well for me but I would like to write the result to the
 archive path and show it in the History server (running as separate
 deployment in k8)

 Anytime it creates JobId= which
 obviously leads to

 Caused by: java.util.concurrent.ExecutionException:
 org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has
 already been submitted.
 at
 java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
 ~[?:1.8.0_282]
 at
 java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
 ~[?:1.8.0_282]
 at
 org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1056)
 ~[flink-dist_2.12-1.12.2.jar:1.12.2]
 at
 org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:129)
 ~[flink-dist_2.12-1.12.2.jar:1.12.2]
 at
 org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:70)
 ~[flink-dist_2.12-1.12.2.jar:1.12.2]
 at
 org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:942)
 ~[flink-dist_2.12-1.12.2.jar:1.12.2]
 at org.apache.flink.api.java.DataSet.collect(DataSet.java:417)
 ~[flink-dist_2.12-1.12.2.jar:1.12.2]
 at org.apache.flink.api.java.DataSet.print(DataSet.java:1748)
 ~[flink-dist_2.12-1.12.2.jar:1.12.2]
 at
 org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:96)
 ~[?:?]
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 ~[?:1.8.0_282]
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 ~[?:1.8.0_282]
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_282]
 at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_282]

Hadoop is not in the classpath/dependencies

2021-03-25 Thread Matthias Seiler
Hello everybody,

I set up a a Flink (1.12.1) and Hadoop (3.2.1) cluster on two machines.
The job should store the checkpoints on HDFS like so:
```java
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(15000, CheckpointingMode.EXACTLY_ONCE);
env.setStateBackend(new FsStateBackend("hdfs://node-1:9000/flink"));
```

Unfortunately, the JobManager throws
```
org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
find a file system implementation for scheme 'hdfs'. The scheme is not
directly supported by Flink and no Hadoop file system to support this
scheme could be loaded. For a full list of supported file systems,
please see
https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
// ...
Caused by:
org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is
not in the classpath/dependencies.
```
and I don't understand why.

`echo $HADOOP_CLASSPATH` returns the path of Hadoop libraries with
wildcards. Flink's JobManager prints the classpath, which includes
specific packages from these Hadoop libraries. Besides that, Flink
creates the state directories on HDFS, but no content.

Thank you for any advice,
Matthias



Re: Native kubernetes execution and History server

2021-03-25 Thread Guowei Ma
Hi,
Thanks for providing the logs. From the logs this is a known bug.[1]
Maybe you could use `$internal.pipeline.job-id` to set your own
job-id.(Thanks to Wang Yang)
But keep in mind this is only for internal use and may be changed in
some release. So you should keep an eye on [1] for the correct solution.

[1] https://issues.apache.org/jira/browse/FLINK-19358

Best,
Guowei


On Thu, Mar 25, 2021 at 5:31 PM Lukáš Drbal  wrote:

> Hello,
>
> sure. Here is log from first run which succeed -
> https://pastebin.com/tV75ZS5S
> and here is from second run (it's same for all next) -
> https://pastebin.com/pwTFyGvE
>
> My Docker file is pretty simple, just take wordcount + S3
>
> FROM flink:1.12.2
>
> RUN mkdir -p $FLINK_HOME/usrlib
> COPY flink-examples-batch_2.12-1.12.2-WordCount.jar
>  $FLINK_HOME/usrlib/wordcount.jar
>
> RUN mkdir -p ${FLINK_HOME}/plugins/s3-fs-presto
> COPY flink-s3-fs-presto-1.12.2.jar $FLINK_HOME/plugins/s3-fs-presto/
>
> Thanks!
>
> On Thu, Mar 25, 2021 at 9:24 AM Guowei Ma  wrote:
>
>> Hi,
>> After some discussion with Wang Yang offline, it seems that there might
>> be a jobmanager failover. So would you like to share full jobmanager log?
>> Best,
>> Guowei
>>
>>
>> On Wed, Mar 24, 2021 at 10:04 PM Lukáš Drbal 
>> wrote:
>>
>>> Hi,
>>>
>>> I would like to use native kubernetes execution [1] for one batch job
>>> and let scheduling on kubernetes. Flink version: 1.12.2.
>>>
>>> Kubernetes job:
>>> apiVersion: batch/v1beta1
>>> kind: CronJob
>>> metadata:
>>>   name: scheduled-job
>>> spec:
>>>   schedule: "*/1 * * * *"
>>>   jobTemplate:
>>> spec:
>>>   template:
>>> metadata:
>>>   labels:
>>> app: super-flink-batch-job
>>> spec:
>>>   containers:
>>>   - name: runner
>>> image: localhost:5000/batch-flink-app-v3:latest
>>> imagePullPolicy: Always
>>> command:
>>>   - /bin/sh
>>>   - -c
>>>   - /opt/flink/bin/flink run-application --target
>>> kubernetes-application -Dkubernetes.service-account=flink-service-account
>>> -Dkubernetes.rest-service.exposed.type=NodePort
>>> -Dkubernetes.cluster-id=batch-job-cluster
>>> -Dkubernetes.container.image=localhost:5000/batch-flink-app-v3:latest
>>> -Ds3.endpoint=http://minio-1616518256:9000 -Ds3.access-key=ACCESSKEY
>>> -Ds3.secret-key=SECRETKEY
>>> -Djobmanager.archive.fs.dir=s3://flink/completed-jobs/
>>> -Ds3.path-style-access=true -Ds3.ssl.enabled=false
>>> -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>> -Dhigh-availability.storageDir=s3://flink/flink-ha
>>> local:///opt/flink/usrlib/job.jar
>>>   restartPolicy: OnFailure
>>>
>>>
>>> This works well for me but I would like to write the result to the
>>> archive path and show it in the History server (running as separate
>>> deployment in k8)
>>>
>>> Anytime it creates JobId= which
>>> obviously leads to
>>>
>>> Caused by: java.util.concurrent.ExecutionException:
>>> org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has
>>> already been submitted.
>>> at
>>> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>>> ~[?:1.8.0_282]
>>> at
>>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>>> ~[?:1.8.0_282]
>>> at
>>> org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1056)
>>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>> at
>>> org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:129)
>>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>> at
>>> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:70)
>>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>> at
>>> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:942)
>>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>> at org.apache.flink.api.java.DataSet.collect(DataSet.java:417)
>>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>> at org.apache.flink.api.java.DataSet.print(DataSet.java:1748)
>>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>> at
>>> org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:96)
>>> ~[?:?]
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> ~[?:1.8.0_282]
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> ~[?:1.8.0_282]
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> ~[?:1.8.0_282]
>>> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_282]
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
>>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>> at
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
>>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>> at
>>> 

[MySQL CDC data cannot be aggregated]

2021-03-25 Thread guoyb
A question: for MySQL CDC data that needs to be aggregated downstream, should it be written to upsert-kafka first, or is there some other way?
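
One possible approach, sketched here only for illustration (it assumes the
flink-cdc-connectors 'mysql-cdc' connector, and the table and column names are
made up), is to aggregate the CDC table directly in Flink SQL, since the
planner understands the changelog (insert/update/delete) rows produced by the
source; writing to upsert-kafka first is mainly useful when the intermediate
result has to be shared with other jobs:

-- Hypothetical CDC source table; connection options are placeholders.
CREATE TABLE orders (
  order_id BIGINT,
  user_id  BIGINT,
  amount   DECIMAL(10, 2),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'mysql-host',
  'port' = '3306',
  'username' = 'flink',
  'password' = 'flink-pw',
  'database-name' = 'shop',
  'table-name' = 'orders'
);

-- The aggregation runs directly on the changelog stream; the result is an
-- updating table that can be written to an upsert-capable sink.
SELECT user_id, SUM(amount) AS total_amount
FROM orders
GROUP BY user_id;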

Re: Native kubernetes execution and History server

2021-03-25 Thread Lukáš Drbal
Hello,

sure. Here is log from first run which succeed -
https://pastebin.com/tV75ZS5S
and here is from second run (it's same for all next) -
https://pastebin.com/pwTFyGvE

My Docker file is pretty simple, just take wordcount + S3

FROM flink:1.12.2

RUN mkdir -p $FLINK_HOME/usrlib
COPY flink-examples-batch_2.12-1.12.2-WordCount.jar
 $FLINK_HOME/usrlib/wordcount.jar

RUN mkdir -p ${FLINK_HOME}/plugins/s3-fs-presto
COPY flink-s3-fs-presto-1.12.2.jar $FLINK_HOME/plugins/s3-fs-presto/

Thanks!

On Thu, Mar 25, 2021 at 9:24 AM Guowei Ma  wrote:

> Hi,
> After some discussion with Wang Yang offline, it seems that there might be
> a jobmanager failover. So would you like to share full jobmanager log?
> Best,
> Guowei
>
>
> On Wed, Mar 24, 2021 at 10:04 PM Lukáš Drbal 
> wrote:
>
>> Hi,
>>
>> I would like to use native kubernetes execution [1] for one batch job and
>> let scheduling on kubernetes. Flink version: 1.12.2.
>>
>> Kubernetes job:
>> apiVersion: batch/v1beta1
>> kind: CronJob
>> metadata:
>>   name: scheduled-job
>> spec:
>>   schedule: "*/1 * * * *"
>>   jobTemplate:
>> spec:
>>   template:
>> metadata:
>>   labels:
>> app: super-flink-batch-job
>> spec:
>>   containers:
>>   - name: runner
>> image: localhost:5000/batch-flink-app-v3:latest
>> imagePullPolicy: Always
>> command:
>>   - /bin/sh
>>   - -c
>>   - /opt/flink/bin/flink run-application --target
>> kubernetes-application -Dkubernetes.service-account=flink-service-account
>> -Dkubernetes.rest-service.exposed.type=NodePort
>> -Dkubernetes.cluster-id=batch-job-cluster
>> -Dkubernetes.container.image=localhost:5000/batch-flink-app-v3:latest
>> -Ds3.endpoint=http://minio-1616518256:9000 -Ds3.access-key=ACCESSKEY
>> -Ds3.secret-key=SECRETKEY
>> -Djobmanager.archive.fs.dir=s3://flink/completed-jobs/
>> -Ds3.path-style-access=true -Ds3.ssl.enabled=false
>> -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>> -Dhigh-availability.storageDir=s3://flink/flink-ha
>> local:///opt/flink/usrlib/job.jar
>>   restartPolicy: OnFailure
>>
>>
>> This works well for me but I would like to write the result to the
>> archive path and show it in the History server (running as separate
>> deployment in k8)
>>
>> Anytime it creates JobId= which obviously
>> leads to
>>
>> Caused by: java.util.concurrent.ExecutionException:
>> org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has
>> already been submitted.
>> at
>> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>> ~[?:1.8.0_282]
>> at
>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>> ~[?:1.8.0_282]
>> at
>> org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1056)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> at
>> org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:129)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> at
>> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:70)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> at
>> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:942)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> at org.apache.flink.api.java.DataSet.collect(DataSet.java:417)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> at org.apache.flink.api.java.DataSet.print(DataSet.java:1748)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> at
>> org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:96)
>> ~[?:?]
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> ~[?:1.8.0_282]
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> ~[?:1.8.0_282]
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> ~[?:1.8.0_282]
>> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_282]
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> at
>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> at
>> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> ... 10 more
>>
>> I assume it is because it will spawn a completely new cluster for each
>> run.
>>
>> Can I somehow set jobId or I'm trying to do something unsupported/bad?
>>
>> Thanks for advice.
>>
>> L.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/native_kubernetes.html
>>

Re: flink sql jmh failure

2021-03-25 Thread Guowei Ma
Hi,
I am not an expert on JMH, but this does not seem to be an error. From the
log it looks like the job has not finished yet: the data source is still
reading data when JMH finishes.

Thread[Legacy Source Thread - Source:
TableSourceScan(table=[[default_catalog, default_database,
CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint, second_bigint,
first_int, second_int, first_float, second_float, first_double,
second_double, first_string, second_string]) ->
Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0, first_bigint, second_bigint,
first_int, second_int, first_float, second_float, first_double,
second_double, first_string, second_string]) -> Sink:
Sink(table=[default_catalog.default_database.CLICKHOUSE_SINK_BENCHMARK],
fields=[dt, first_bigint, second_bigint, first_int, second_int,
first_float, second_float, first_double, second_double, first_string,
second_string]) (3/6),5,Flink Task Threads]
  at
org.apache.flink.table.data.binary.BinaryStringData.fromString(BinaryStringData.java:82)
  at org.apache.flink.table.data.StringData.fromString(StringData.java:52)
  at
org.apache.flink.table.factories.DataGenTableSourceFactory$1.next(DataGenTableSourceFactory.java:171)
  at
org.apache.flink.table.factories.DataGenTableSourceFactory$1.next(DataGenTableSourceFactory.java:168)
  at
org.apache.flink.table.factories.DataGenTableSourceFactory$RowGenerator.next(DataGenTableSourceFactory.java:320)
  at
org.apache.flink.table.factories.DataGenTableSourceFactory$RowGenerator.next(DataGenTableSourceFactory.java:277)
  at
org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource.run(DataGeneratorSource.java:82)
  at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
  at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
  at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
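
If the intent is for the benchmark to block until the job is done, one
possible fix (a minimal sketch, assuming the benchmark runs on Flink 1.12's
Table API, where executeSql() returns a TableResult) is to wait on the
submitted INSERT job before the benchmark method returns:

import org.apache.flink.table.api.TableResult;

// Inside sinkBenchmark(): submit the INSERT and block until the job reaches
// a terminal state, so JMH does not end the iteration while the datagen
// source is still emitting rows. Note that await() throws
// InterruptedException and ExecutionException, so the benchmark method's
// throws clause has to be extended accordingly.
TableResult result = tableEnv.executeSql(
    "INSERT INTO CLICKHOUSE_SINK_BENCHMARK SELECT '2020-12-12', * FROM CLICKHOUSE_SOURCE_BENCHMARK");
result.await();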

Best,
Guowei


On Wed, Mar 24, 2021 at 9:56 PM jie mei  wrote:

> Hi, Yik San
>
> I use a library written by myself and am trying to verify its performance.
>
>
> Yik San Chan  于2021年3月24日周三 下午9:07写道:
>
>> Hi Jie,
>>
>> I am curious which library you use to get the ClickHouseTableBuilder
>>
>> On Wed, Mar 24, 2021 at 8:41 PM jie mei  wrote:
>>
>>> Hi, Community
>>>
>>> I run a JMH benchmark task and get the below error. It uses Flink SQL to
>>> consume data from the data-gen connector (10_000_000 rows) and write the
>>> data to ClickHouse. Below is part of the log; the complete log is in the
>>> attached file.
>>>
>>> *My JMH benchmark code is as below:*
>>>
>>> @Benchmark
>>> @Threads(1)
>>> @Fork(1)
>>> public void sinkBenchmark() throws IOException {
>>>
>>>   StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment
>>>   .getExecutionEnvironment();
>>>   streamEnv.enableCheckpointing(6);
>>>
>>>   EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>>   .useBlinkPlanner()
>>>   .inStreamingMode().build();
>>>   TableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, 
>>> settings);
>>>
>>>   // create clickhouse table
>>>   new ClickHouseTableBuilder(tableEnv,
>>>   parseSchema("clickhouse_sink_table.sql"))
>>>   .database("benchmark")
>>>   .table("bilophus_sink_benchmark")
>>>   .address("jdbc:clickhouse://localhost:8123")
>>>   .build();
>>>
>>>   // create mock data table
>>>   tableEnv.executeSql(
>>>   parseSchema("clickhouse_source_table.sql") +
>>>   "WITH (" +
>>>   "'connector' = 'datagen'," +
>>>   "'number-of-rows' = '1000')");
>>>
>>>   tableEnv.executeSql(
>>>   "INSERT INTO CLICKHOUSE_SINK_BENCHMARK SELECT '2020-12-12', * FROM 
>>> CLICKHOUSE_SOURCE_BENCHMARK");
>>>
>>> }
>>>
>>> *running command:*
>>>
>>> mvn clean package -DskipTests
>>>
>>> <plugin>
>>>   <groupId>org.codehaus.mojo</groupId>
>>>   <artifactId>exec-maven-plugin</artifactId>
>>>   <version>1.6.0</version>
>>>   <executions>
>>>     <execution>
>>>       <id>test-benchmarks</id>
>>>       <phase>test</phase>
>>>       <goals>
>>>         <goal>exec</goal>
>>>       </goals>
>>>     </execution>
>>>   </executions>
>>>   <configuration>
>>>     <skip>false</skip>
>>>     <classpathScope>test</classpathScope>
>>>     <executable>java</executable>
>>>     <arguments>
>>>       <argument>-Xmx6g</argument>
>>>       <argument>-classpath</argument>
>>>       <classpath/>
>>>       <argument>org.openjdk.jmh.Main</argument>
>>>       <argument>-foe</argument>
>>>       <argument>true</argument>
>>>       <argument>-f</argument>
>>>       <argument>1</argument>
>>>       <argument>-i</argument>
>>>       <argument>1</argument>
>>>       <argument>-wi</argument>
>>>       <argument>0</argument>
>>>       <argument>-rf</argument>
>>>       <argument>csv</argument>
>>>       <argument>.*</argument>
>>>     </arguments>
>>>   </configuration>
>>> </plugin>
>>>
>>>
>>> Non-finished threads:
>>>
>>> Thread[Source: TableSourceScan(table=[[default_catalog,
>>> default_database, CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint,
>>> second_bigint, first_int, second_int, first_float, second_float,
>>> first_double, second_double, first_string, s
>>> econd_string]) -> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0,
>>> first_bigint, second_bigint, first_int, second_int, first_float,
>>> second_float, first_double, second_double, first_string, second_string]) ->
>>> Sink: Sink(table=[default_catal
>>> og.default_database.CLICKHOUSE_SINK_BENCHMARK], fields=[dt,
>>> first_bigint, second_bigint, first_int, second_int, first_float,
>>> second_float, 

How to enable Queryable State in Flink Native Kubernetes deployment mode?

2021-03-25 Thread tian lin
Hi all,
How can Queryable State be enabled with Flink 1.12.1 in the Flink Native
Kubernetes deployment mode? The official documentation describes how to enable
it for standalone Kubernetes
(https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/standalone/kubernetes.html#enabling-queryable-state),
but in native Kubernetes mode, whether Session or Application, the
Flink-related Kubernetes specs are mostly generated by Flink's code, and there
is no convenient way to automatically generate and deploy the Queryable State
ports and the corresponding Kubernetes Service.
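
A manual workaround that might work, sketched here purely for illustration (it
assumes the labels that native Kubernetes mode attaches to TaskManager pods,
namely app=<cluster-id>, component=taskmanager and type=flink-native-kubernetes,
the default proxy port 6069, and that the queryable state runtime jar is on the
classpath), is to enable queryable state via configuration and create a Service
for the proxy by hand:

# In flink-conf.yaml or via -D options when submitting:
#   queryable-state.enable: true
#   queryable-state.proxy.ports: 6069

apiVersion: v1
kind: Service
metadata:
  name: my-flink-cluster-queryable-state   # hypothetical name
spec:
  type: NodePort
  selector:
    app: my-flink-cluster                  # must match kubernetes.cluster-id
    component: taskmanager
    type: flink-native-kubernetes
  ports:
  - name: queryable-state
    port: 6069
    targetPort: 6069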

Sent from Mail for Windows 10



Re: Native kubernetes execution and History server

2021-03-25 Thread Guowei Ma
Hi,
After some discussion with Wang Yang offline, it seems that there might be
a jobmanager failover. So would you like to share the full jobmanager log?
Best,
Guowei


On Wed, Mar 24, 2021 at 10:04 PM Lukáš Drbal  wrote:

> Hi,
>
> I would like to use native kubernetes execution [1] for one batch job and
> let scheduling on kubernetes. Flink version: 1.12.2.
>
> Kubernetes job:
> apiVersion: batch/v1beta1
> kind: CronJob
> metadata:
>   name: scheduled-job
> spec:
>   schedule: "*/1 * * * *"
>   jobTemplate:
> spec:
>   template:
> metadata:
>   labels:
> app: super-flink-batch-job
> spec:
>   containers:
>   - name: runner
> image: localhost:5000/batch-flink-app-v3:latest
> imagePullPolicy: Always
> command:
>   - /bin/sh
>   - -c
>   - /opt/flink/bin/flink run-application --target
> kubernetes-application -Dkubernetes.service-account=flink-service-account
> -Dkubernetes.rest-service.exposed.type=NodePort
> -Dkubernetes.cluster-id=batch-job-cluster
> -Dkubernetes.container.image=localhost:5000/batch-flink-app-v3:latest
> -Ds3.endpoint=http://minio-1616518256:9000 -Ds3.access-key=ACCESSKEY
> -Ds3.secret-key=SECRETKEY
> -Djobmanager.archive.fs.dir=s3://flink/completed-jobs/
> -Ds3.path-style-access=true -Ds3.ssl.enabled=false
> -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> -Dhigh-availability.storageDir=s3://flink/flink-ha
> local:///opt/flink/usrlib/job.jar
>   restartPolicy: OnFailure
>
>
> This works well for me but I would like to write the result to the archive
> path and show it in the History server (running as separate deployment in
> k8)
>
> Anytime it creates JobId= which obviously
> leads to
>
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has
> already been submitted.
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> ~[?:1.8.0_282]
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> ~[?:1.8.0_282]
> at
> org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1056)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:129)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:70)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:942)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at org.apache.flink.api.java.DataSet.collect(DataSet.java:417)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at org.apache.flink.api.java.DataSet.print(DataSet.java:1748)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:96)
> ~[?:?]
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> ~[?:1.8.0_282]
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> ~[?:1.8.0_282]
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[?:1.8.0_282]
> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_282]
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> ... 10 more
>
> I assume it is because it will spawn a completely new cluster for each run.
>
> Can I somehow set jobId or I'm trying to do something unsupported/bad?
>
> Thanks for advice.
>
> L.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/native_kubernetes.html
>


Unsubscribe

2021-03-25 Thread aegean0...@163.com
Unsubscribe

Flink job repeated restart failure

2021-03-25 Thread VINAYA KUMAR BENDI
Dear all,

One of our Flink jobs failed with the exception below. Several attempts to 
restart the job resulted in the same exception, and the job failed each time. 
The job started successfully only after changing the file name.

Flink Version: 1.11.2

Exception
2021-03-24 20:13:09,288 INFO  org.apache.kafka.clients.producer.KafkaProducer   
   [] - [Producer clientId=producer-2] Closing the Kafka producer with 
timeoutMillis = 0 ms.
2021-03-24 20:13:09,288 INFO  org.apache.kafka.clients.producer.KafkaProducer   
   [] - [Producer clientId=producer-2] Proceeding to force close the 
producer since pending requests could not be completed within timeout 0 ms.
2021-03-24 20:13:09,304 WARN  org.apache.flink.runtime.taskmanager.Task 
   [] - Flat Map -> async wait operator -> Process -> Sink: Unnamed 
(1/1) (8905142514cb25adbd42980680562d31) switched from RUNNING to FAILED.
java.io.IOException: No such file or directory
at java.io.UnixFileSystem.createFileExclusively(Native Method) 
~[?:1.8.0_252]
at java.io.File.createNewFile(File.java:1012) ~[?:1.8.0_252]
at 
org.apache.flink.runtime.io.network.api.serialization.SpanningWrapper.createSpillingChannel(SpanningWrapper.java:291)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.io.network.api.serialization.SpanningWrapper.updateLength(SpanningWrapper.java:178)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.io.network.api.serialization.SpanningWrapper.transferFrom(SpanningWrapper.java:111)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:110)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:146)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536) 
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) 
[flink-dist_2.12-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) 
[flink-dist_2.12-1.11.2.jar:1.11.2]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
2021-03-24 20:13:09,305 INFO  org.apache.flink.runtime.taskmanager.Task 
   [] - Freeing task resources for Flat Map -> async wait operator -> 
Process -> Sink: Unnamed (1/1) (8905142514cb25adbd42980680562d31).
2021-03-24 20:13:09,311 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - 
Un-registering task and sending final execution state FAILED to JobManager for 
task Flat Map -> async wait operator -> Process -> Sink: Unnamed (1/1) 
8905142514cb25adbd42980680562d31.

File: 
https://github.com/apache/flink/blob/release-1.11.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java

Related Jira ID: https://issues.apache.org/jira/browse/FLINK-18811

A similar exception is mentioned in FLINK-18811, which has a fix in 1.12.0, though 
in our case we did not notice any disk failure. Is there any other reason for 
the above-mentioned IOException?

While we are planning to upgrade to the latest Flink version, is there any other 
workaround besides deploying the job again with a different file name?

Kind regards,
Vinaya