Re: Waiting on streaming job with StatementSet.execute() when using Web Submission

2021-12-21 Thread Yuval Itzchakov
The job construction itself is a bit complex, but it can either be a
StatementSet that's being filled, or there is some kind of conversion Table
-> DataStream and then we put the transformations on the DataStream itself.
Invocation looks like this:

  executionEffect =
if (...)
  FlinkTask.lockedEffect(flink.execute(jobName))
else FlinkTask.lockedEffect(statementSet.execute())

If I don't infinitely block on this, it terminates right after starting the
execution:

2021-12-22 09:25:25,522 INFO o.a.f.a.j.t.TypeExtractor class ... does not
contain a setter for field partitionKey
2021-12-22 09:25:25,522 INFO o.a.f.a.j.t.TypeExtractor Class class ...
cannot be used as a POJO type because not all fields are valid POJO fields,
and must be processed as GenericType. Please read the Flink documentation
on "Data Types & Serialization" for details of the effect on performance.
2021-12-22 09:25:25,533 INFO o.a.f.a.j.t.TypeExtractor class ... does not
contain a setter for field stage
2021-12-22 09:25:25,533 INFO o.a.f.a.j.t.TypeExtractor Class class ...
cannot be used as a POJO type because not all fields are valid POJO fields,
and must be processed as GenericType. Please read the Flink documentation
on "Data Types & Serialization" for details of the effect on performance.
2021-12-22 09:25:27,678 WARN o.a.f.c.Configuration Config uses deprecated
configuration key 'akka.client.timeout' instead of proper key
'client.timeout'
2021-12-22 09:25:27,841 INFO o.a.f.c.d.a.e.EmbeddedExecutor Job
492c9f07d8b3458a52595ab49f636205 is submitted.
2021-12-22 09:25:27,842 INFO o.a.f.c.d.a.e.EmbeddedExecutor Submitting Job
with JobId=492c9f07d8b3458a52595ab49f636205.
2021-12-22 09:25:28,491 INFO o.a.f.r.d.Dispatcher Received JobGraph
submission '' (492c9f07d8b3458a52595ab49f636205).
2021-12-22 09:25:28,491 INFO o.a.f.r.d.Dispatcher Submitting job ''
(492c9f07d8b3458a52595ab49f636205).
2021-12-22 09:25:28,519 INFO o.a.f.r.r.a.AkkaRpcService Starting RPC
endpoint for org.apache.flink.runtime.jobmaster.JobMaster at
akka://flink/user/rpc/jobmanager_2 .
2021-12-22 09:25:28,528 INFO o.a.f.r.j.JobMaster Initializing job '...'
(492c9f07d8b3458a52595ab49f636205).
2021-12-22 09:25:28,554 INFO o.a.f.r.s.DefaultSchedulerFactory Using
restart back off time strategy
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=1,
backoffTimeMS=1) for ... (492c9f07d8b3458a52595ab49f636205).
2021-12-22 09:25:28,599 INFO o.a.f.r.e.DefaultExecutionGraphBuilder Running
initialization on master for job ... (492c9f07d8b3458a52595ab49f636205).
2021-12-22 09:25:28,600 INFO o.a.f.r.e.DefaultExecutionGraphBuilder
Successfully ran initialization on master in 0 ms.
2021-12-22 09:25:28,621 INFO o.a.f.r.s.a.DefaultExecutionTopology Built 1
pipelined regions in 0 ms
2021-12-22 09:25:28,679 INFO o.a.f.r.s.StateBackendLoader No state backend
has been configured, using default (HashMap)
org.apache.flink.runtime.state.hashmap.HashMapStateBackend@4c81fc2e
2021-12-22 09:25:28,680 INFO o.a.f.r.s.StateBackendLoader State backend
loader loads the state backend as HashMapStateBackend
2021-12-22 09:25:28,681 INFO o.a.f.r.s.CheckpointStorageLoader Checkpoint
storage is set to 'jobmanager'
2021-12-22 09:25:28,701 INFO o.a.f.r.c.CheckpointCoordinator No checkpoint
found during restore.
2021-12-22 09:25:28,702 INFO o.a.f.r.c.CheckpointCoordinator Starting job
492c9f07d8b3458a52595ab49f636205 from savepoint  (allowing non restored
state)
2021-12-22 09:25:28,727 INFO o.a.f.r.c.CheckpointCoordinator Reset the
checkpoint ID of job 492c9f07d8b3458a52595ab49f636205 to 8400.
2021-12-22 09:25:28,728 INFO o.a.f.r.c.CheckpointCoordinator Restoring job
492c9f07d8b3458a52595ab49f636205 from Savepoint 8399 @ 0 for
492c9f07d8b3458a52595ab49f636205 located at file:..
2021-12-22 09:25:28,745 INFO o.a.f.r.c.h.MasterHooks No master state to
restore
2021-12-22 09:25:28,750 INFO o.a.f.r.s.DefaultScheduler Using failover
strategy
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@3114dd24
for ... (492c9f07d8b3458a52595ab49f636205).
2021-12-22 09:25:28,764 INFO o.a.f.r.j.JobMaster Starting execution of job
'...' (492c9f07d8b3458a52595ab49f636205) under job master id
.
2021-12-22 09:25:28,765 INFO o.a.f.r.s.DefaultScheduler Starting scheduling
with scheduling strategy
[org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
2021-12-22 09:25:28,766 INFO o.a.f.r.e.DefaultExecutionGraph Job ...
(492c9f07d8b3458a52595ab49f636205) switched from state CREATED to RUNNING.
2021-12-22 09:25:28,772 INFO o.a.f.r.e.Execution Source: ... (1/1)
(3c3260f3f0c7d82452a46fc383ceb932) switched from CREATED to SCHEDULED.
2021-12-22 09:25:28,772 INFO o.a.f.r.e.Execution ... -> Map ->
Calc(select=[...]) -> Map (1/3) (3c05f0bd5ca1bd4903398bb39b5992fa) switched
from CREATED to SCHEDULED.
2021-12-22 09:25:28,773 INFO o.a.f.r.e.Execution ... -> Map ->
Calc(select=[...]) -> Map (2/3) 

Re: Waiting on streaming job with StatementSet.execute() when using Web Submission

2021-12-21 Thread Caizhi Weng
Hi!

You talked about status code so I guess you're speaking about the client
that submits the job, not the job itself. Flink jobs does not have "exit
codes", they only have status such as RUNNING and FINISHED.

When you run your user code locally, it is running in a testing
mini-cluster in JVM. So if your client code exits the JVM will also exit
and the job will not be finished properly. However if you submit your job
to the cluster it will be running in the remote cluster. In this case it
doesn't matter whether client has exited or not.

You can check Flink web UI to see if your job has been submitted and
running in the cluster.

Yuval Itzchakov  于2021年12月22日周三 14:48写道:

> I mean it finishes successful and exists with status code 0. Both when
> running locally and submitting to the cluster.
>
> On Wed, Dec 22, 2021, 08:36 Caizhi Weng  wrote:
>
>> Hi!
>>
>> By "the streaming job stops" do you mean the job ends with CANCELED state
>> instead of FINISHED state? Which kind of job are you running? Is it a
>> select job or an insert job? Insert jobs should run continuously once
>> they're submitted. Could you share your user code if possible?
>>
>> Yuval Itzchakov  于2021年12月22日周三 14:11写道:
>>
>>> Hi Caizhi,
>>>
>>> If I don't block on statementset.execute, the job finishes immediately
>>> with exit code 0 and the streaming job stops, and that's not what I want. I
>>> somehow need to block.
>>>
>>>
>>>
>>> On Wed, Dec 22, 2021, 03:43 Caizhi Weng  wrote:
>>>
 Hi!

 You can poll the status of that job with REST API [1]. You can tell
 that the job successfully finishes by the FINISHED state and that the job
 fails by the FAILED state.

 [1]
 https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobs-jobid

 Yuval Itzchakov  于2021年12月22日周三 02:36写道:

> Hi,
>
> Flink 1.14.2
> Scala 2.12
>
> I have a streaming job that executes and I want to infinitely wait for
> it's completion, or if an exception is thrown during initialization. When
> using *statementSet.execute().await()*, I get an error:
>
> Caused by: org.apache.flink.util.FlinkRuntimeException:* The Job
> Result cannot be fetched through the Job Client when in Web Submission.*
> at
> org.apache.flink.client.deployment.application.WebSubmissionJobClient.getJobExecutionResult(WebSubmissionJobClient.java:88)
> at
> org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:54)
> ... 7 more
>
> This is because the Web Submission via the REST API is using
> the WebSubmissionJobClient.
>
> How can I wait on my Flink SQL streaming job when submitting through
> the REST API?
> --
> Best Regards,
> Yuval Itzchakov
>



Re: Waiting on streaming job with StatementSet.execute() when using Web Submission

2021-12-21 Thread Yuval Itzchakov
I mean it finishes successful and exists with status code 0. Both when
running locally and submitting to the cluster.

On Wed, Dec 22, 2021, 08:36 Caizhi Weng  wrote:

> Hi!
>
> By "the streaming job stops" do you mean the job ends with CANCELED state
> instead of FINISHED state? Which kind of job are you running? Is it a
> select job or an insert job? Insert jobs should run continuously once
> they're submitted. Could you share your user code if possible?
>
> Yuval Itzchakov  于2021年12月22日周三 14:11写道:
>
>> Hi Caizhi,
>>
>> If I don't block on statementset.execute, the job finishes immediately
>> with exit code 0 and the streaming job stops, and that's not what I want. I
>> somehow need to block.
>>
>>
>>
>> On Wed, Dec 22, 2021, 03:43 Caizhi Weng  wrote:
>>
>>> Hi!
>>>
>>> You can poll the status of that job with REST API [1]. You can tell that
>>> the job successfully finishes by the FINISHED state and that the job fails
>>> by the FAILED state.
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobs-jobid
>>>
>>> Yuval Itzchakov  于2021年12月22日周三 02:36写道:
>>>
 Hi,

 Flink 1.14.2
 Scala 2.12

 I have a streaming job that executes and I want to infinitely wait for
 it's completion, or if an exception is thrown during initialization. When
 using *statementSet.execute().await()*, I get an error:

 Caused by: org.apache.flink.util.FlinkRuntimeException:* The Job
 Result cannot be fetched through the Job Client when in Web Submission.*
 at
 org.apache.flink.client.deployment.application.WebSubmissionJobClient.getJobExecutionResult(WebSubmissionJobClient.java:88)
 at
 org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:54)
 ... 7 more

 This is because the Web Submission via the REST API is using
 the WebSubmissionJobClient.

 How can I wait on my Flink SQL streaming job when submitting through
 the REST API?
 --
 Best Regards,
 Yuval Itzchakov

>>>


Re: Waiting on streaming job with StatementSet.execute() when using Web Submission

2021-12-21 Thread Caizhi Weng
Hi!

By "the streaming job stops" do you mean the job ends with CANCELED state
instead of FINISHED state? Which kind of job are you running? Is it a
select job or an insert job? Insert jobs should run continuously once
they're submitted. Could you share your user code if possible?

Yuval Itzchakov  于2021年12月22日周三 14:11写道:

> Hi Caizhi,
>
> If I don't block on statementset.execute, the job finishes immediately
> with exit code 0 and the streaming job stops, and that's not what I want. I
> somehow need to block.
>
>
>
> On Wed, Dec 22, 2021, 03:43 Caizhi Weng  wrote:
>
>> Hi!
>>
>> You can poll the status of that job with REST API [1]. You can tell that
>> the job successfully finishes by the FINISHED state and that the job fails
>> by the FAILED state.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobs-jobid
>>
>> Yuval Itzchakov  于2021年12月22日周三 02:36写道:
>>
>>> Hi,
>>>
>>> Flink 1.14.2
>>> Scala 2.12
>>>
>>> I have a streaming job that executes and I want to infinitely wait for
>>> it's completion, or if an exception is thrown during initialization. When
>>> using *statementSet.execute().await()*, I get an error:
>>>
>>> Caused by: org.apache.flink.util.FlinkRuntimeException:* The Job Result
>>> cannot be fetched through the Job Client when in Web Submission.*
>>> at
>>> org.apache.flink.client.deployment.application.WebSubmissionJobClient.getJobExecutionResult(WebSubmissionJobClient.java:88)
>>> at
>>> org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:54)
>>> ... 7 more
>>>
>>> This is because the Web Submission via the REST API is using
>>> the WebSubmissionJobClient.
>>>
>>> How can I wait on my Flink SQL streaming job when submitting through the
>>> REST API?
>>> --
>>> Best Regards,
>>> Yuval Itzchakov
>>>
>>


Re: Waiting on streaming job with StatementSet.execute() when using Web Submission

2021-12-21 Thread Yuval Itzchakov
Hi Caizhi,

If I don't block on statementset.execute, the job finishes immediately with
exit code 0 and the streaming job stops, and that's not what I want. I
somehow need to block.



On Wed, Dec 22, 2021, 03:43 Caizhi Weng  wrote:

> Hi!
>
> You can poll the status of that job with REST API [1]. You can tell that
> the job successfully finishes by the FINISHED state and that the job fails
> by the FAILED state.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobs-jobid
>
> Yuval Itzchakov  于2021年12月22日周三 02:36写道:
>
>> Hi,
>>
>> Flink 1.14.2
>> Scala 2.12
>>
>> I have a streaming job that executes and I want to infinitely wait for
>> it's completion, or if an exception is thrown during initialization. When
>> using *statementSet.execute().await()*, I get an error:
>>
>> Caused by: org.apache.flink.util.FlinkRuntimeException:* The Job Result
>> cannot be fetched through the Job Client when in Web Submission.*
>> at
>> org.apache.flink.client.deployment.application.WebSubmissionJobClient.getJobExecutionResult(WebSubmissionJobClient.java:88)
>> at
>> org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:54)
>> ... 7 more
>>
>> This is because the Web Submission via the REST API is using
>> the WebSubmissionJobClient.
>>
>> How can I wait on my Flink SQL streaming job when submitting through the
>> REST API?
>> --
>> Best Regards,
>> Yuval Itzchakov
>>
>


Re: CVE-2021-44228 - Log4j2 vulnerability

2021-12-21 Thread Debraj Manna
Any idea when can we expect
https://issues.apache.org/jira/browse/FLINK-25375 to be released?

On Mon, Dec 20, 2021 at 8:18 PM Martijn Visser 
wrote:

> Hi,
>
> The status and Flink ticket for upgrading to Log4j 2.17.0 can be tracked
> at https://issues.apache.org/jira/browse/FLINK-25375.
>
> Best regards,
>
> Martijn
>
> On Sat, 18 Dec 2021 at 16:50, V N, Suchithra (Nokia - IN/Bangalore) <
> suchithra@nokia.com> wrote:
>
>> Hi,
>>
>>
>>
>> It seems there is high severity vulnerability in log4j 2.16.0.(
>> CVE-2021-45105
>> )
>>
>> Refer : https://logging.apache.org/log4j/2.x/security.html
>>
>> Any update on this please?
>>
>>
>>
>> Regards,
>>
>> Suchithra
>>
>>
>>
>> *From:* Chesnay Schepler 
>> *Sent:* Thursday, December 16, 2021 4:35 PM
>> *To:* Parag Somani 
>> *Cc:* Michael Guterl ; V N, Suchithra (Nokia -
>> IN/Bangalore) ; Richard Deurwaarder <
>> rich...@xeli.eu>; user 
>> *Subject:* Re: CVE-2021-44228 - Log4j2 vulnerability
>>
>>
>>
>> We will announce the releases when the binaries are available.
>>
>>
>>
>> On 16/12/2021 05:37, Parag Somani wrote:
>>
>> Thank you Chesnay for expediting this fix...!
>>
>>
>>
>> Can you suggest, when can I get binaries for 1.14.2 flink version?
>>
>>
>>
>> On Thu, Dec 16, 2021 at 5:52 AM Chesnay Schepler 
>> wrote:
>>
>> We will push docker images for all new releases, yes.
>>
>>
>>
>> On 16/12/2021 01:16, Michael Guterl wrote:
>>
>> Will you all be pushing Docker images for the 1.11.6 release?
>>
>>
>>
>> On Wed, Dec 15, 2021 at 3:26 AM Chesnay Schepler 
>> wrote:
>>
>> The current ETA is 40h for an official announcement.
>>
>> We are validating the release today (concludes in 16h), publish it
>> tonight, then wait for mirrors to be sync (about a day), then we announce
>> it.
>>
>>
>>
>> On 15/12/2021 12:08, V N, Suchithra (Nokia - IN/Bangalore) wrote:
>>
>> Hello,
>>
>>
>>
>> Could you please tell when we can expect Flink 1.12.7 release? We are
>> waiting for the CVE fix.
>>
>>
>>
>> Regards,
>>
>> Suchithra
>>
>>
>>
>>
>>
>> *From:* Chesnay Schepler  
>> *Sent:* Wednesday, December 15, 2021 4:04 PM
>> *To:* Richard Deurwaarder  
>> *Cc:* user  
>> *Subject:* Re: CVE-2021-44228 - Log4j2 vulnerability
>>
>>
>>
>> We will also update the docker images.
>>
>>
>>
>> On 15/12/2021 11:29, Richard Deurwaarder wrote:
>>
>> Thanks for picking this up quickly!
>>
>>
>>
>> I saw you've made a second minor upgrade to upgrade to log4j2 2.16 which
>> is perfect.
>>
>>
>>
>> Just to clarify: Will you also push new docker images for these releases
>> as well? In particular flink 1.11.6 (Sorry we must upgrade soon! :()
>>
>>
>>
>> On Tue, Dec 14, 2021 at 2:33 AM narasimha  wrote:
>>
>> Thanks TImo, that was helpful.
>>
>>
>>
>> On Mon, Dec 13, 2021 at 7:19 PM Prasanna kumar <
>> prasannakumarram...@gmail.com> wrote:
>>
>> Chesnay Thank you for the clarification.
>>
>>
>>
>> On Mon, Dec 13, 2021 at 6:55 PM Chesnay Schepler 
>> wrote:
>>
>> The flink-shaded-zookeeper jars do not contain log4j.
>>
>>
>>
>> On 13/12/2021 14:11, Prasanna kumar wrote:
>>
>> Does Zookeeper have this vulnerability dependency ? I see references to
>> log4j in Shaded Zookeeper jar included as part of the flink distribution.
>>
>>
>>
>> On Mon, Dec 13, 2021 at 1:40 PM Timo Walther  wrote:
>>
>> While we are working to upgrade the affected dependencies of all
>> components, we recommend users follow the advisory of the Apache Log4j
>> Community. Also Ververica platform can be patched with a similar approach:
>>
>> To configure the JVMs used by Ververica Platform, you can pass custom
>> Java options via the JAVA_TOOL_OPTIONS environment variable. Add the
>> following to your platform values.yaml, or append to the existing value
>> of JAVA_TOOL_OPTIONS if you are using it already there, then redeploy
>> the platform with Helm:
>> env:
>>- name: JAVA_TOOL_OPTIONS
>>  value: -Dlog4j2.formatMsgNoLookups=true
>>
>>
>> For any questions, please contact us via our support portal.
>>
>> Regards,
>> Timo
>>
>> On 11.12.21 06:45, narasimha wrote:
>> > Folks, what about the veverica platform. Is there any
>> mitigation around it?
>> >
>> > On Fri, Dec 10, 2021 at 3:32 PM Chesnay Schepler > > > wrote:
>> >
>> > I would recommend to modify your log4j configurations to set
>> > log4j2.formatMsgNoLookups to true/./
>> > /
>> > /
>> > As far as I can tell this is equivalent to upgrading log4j, which
>> > just disabled this lookup by default.
>> > /
>> > /
>> > On 10/12/2021 10:21, Richard Deurwaarder wrote:
>> >> Hello,
>> >>
>> >> There has been a log4j2 vulnerability made public
>> >> https://www.randori.com/blog/cve-2021-44228/
>> >>  which is making
>> >> some waves :)
>> >> This post even explicitly mentions Apache Flink:
>> >>
>> 

Re: PyFlink Perfomance

2021-12-21 Thread Francis Conroy
Hi Dian, I'll build up something similar and post it, my current test code
contains proprietary information.

On Wed, 22 Dec 2021 at 14:49, Dian Fu  wrote:

> Hi Francis,
>
> Could you share the benchmark code you use?
>
> Regards,
> Dian
>
> On Wed, Dec 22, 2021 at 11:31 AM Francis Conroy <
> francis.con...@switchdin.com> wrote:
>
>> I've just run an analysis using a similar example which involves a single
>> python flatmap operator and we're getting 100x less through by using python
>> over java. I'm interested to know if you can do such a comparison. I'm
>> using Flink 14.0.
>>
>> Thanks,
>> Francis
>>
>> On Thu, 18 Nov 2021 at 02:20, Thomas Portugal 
>> wrote:
>>
>>> Hello community,
>>> My team is developing an application using Pyflink. We are using the
>>> Datastream API. Basically, we read from a kafka topic, do some maps, and
>>> write on another kafka topic. One restriction about it is the first map,
>>> that has to be serialized and with parallelism equals to one. This is
>>> causing a bottleneck on the throughput, and we are achieving approximately
>>> 2k msgs/sec. Monitoring the cpu usage and the number of records on each
>>> operator, it seems that the first operator is causing the issue.
>>> The first operator is like a buffer that groups the messages from kafka
>>> and sends them to the next operators. We are using a dequeue from python's
>>> collections. Since we are stuck on this issue, could you answer some
>>> questions about this matter?
>>>
>>> 1 - Using data structures from python can introduce some latency or
>>> increase the CPU usage?
>>> 2 - There are alternatives to this approach? We were thinking about
>>> Window structure, from Flink, but in our case it's not time based, and we
>>> didn't find an equivalent on python API.
>>> 3 - Using Table API to read from Kafka Topic and do the windowing can
>>> improve our performance?
>>>
>>> We already set some parameters like python.fn-execution.bundle.time and
>>> buffer.timeout to improve our performance.
>>>
>>> Thanks for your attention.
>>> Best Regards
>>>
>>
>> This email and any attachments are proprietary and confidential and are
>> intended solely for the use of the individual to whom it is addressed. Any
>> views or opinions expressed are solely those of the author and do not
>> necessarily reflect or represent those of SwitchDin Pty Ltd. If you have
>> received this email in error, please let us know immediately by reply email
>> and delete it from your system. You may not use, disseminate, distribute or
>> copy this message nor disclose its contents to anyone.
>> SwitchDin Pty Ltd (ABN 29 154893857) PO Box 1165, Newcastle NSW 2300
>> Australia
>>
>

-- 
This email and any attachments are proprietary and confidential and are 
intended solely for the use of the individual to whom it is addressed. Any 
views or opinions expressed are solely those of the author and do not 
necessarily reflect or represent those of SwitchDin Pty Ltd. If you have 
received this email in error, please let us know immediately by reply email 
and delete it from your system. You may not use, disseminate, distribute or 
copy this message nor disclose its contents to anyone. 
SwitchDin Pty Ltd 
(ABN 29 154893857) PO Box 1165, Newcastle NSW 2300 Australia


Re: 托管内存为什么不能够指定最小或者最大值?

2021-12-21 Thread Xintong Song
Network 和 JVM Overhead 之所以采用了 min-max,是因为这两项如果太小往往会导致
Failure,如果太大也并不会对性能有多少帮助属于资源浪费。

相反,Managed 内存有时候可以很小,甚至一些场景下可以为 0,且增大 Managed 内存通常是有助于提高性能的,所以设计上没有引入
min-max 的配置。

Thank you~

Xintong Song



On Wed, Dec 22, 2021 at 10:53 AM johnjlong  wrote:

> 大佬们,托管内存为什么不能够指定最小或者最大值?
> 还是说 taskmanager.memory.managed.fraction 计算出来的就是最大值?
>
>
> | |
> johnjlong
> |
> |
> johnjl...@163.com
> |
> 签名由网易邮箱大师定制


Re: PyFlink Perfomance

2021-12-21 Thread Dian Fu
Hi Francis,

Could you share the benchmark code you use?

Regards,
Dian

On Wed, Dec 22, 2021 at 11:31 AM Francis Conroy <
francis.con...@switchdin.com> wrote:

> I've just run an analysis using a similar example which involves a single
> python flatmap operator and we're getting 100x less through by using python
> over java. I'm interested to know if you can do such a comparison. I'm
> using Flink 14.0.
>
> Thanks,
> Francis
>
> On Thu, 18 Nov 2021 at 02:20, Thomas Portugal 
> wrote:
>
>> Hello community,
>> My team is developing an application using Pyflink. We are using the
>> Datastream API. Basically, we read from a kafka topic, do some maps, and
>> write on another kafka topic. One restriction about it is the first map,
>> that has to be serialized and with parallelism equals to one. This is
>> causing a bottleneck on the throughput, and we are achieving approximately
>> 2k msgs/sec. Monitoring the cpu usage and the number of records on each
>> operator, it seems that the first operator is causing the issue.
>> The first operator is like a buffer that groups the messages from kafka
>> and sends them to the next operators. We are using a dequeue from python's
>> collections. Since we are stuck on this issue, could you answer some
>> questions about this matter?
>>
>> 1 - Using data structures from python can introduce some latency or
>> increase the CPU usage?
>> 2 - There are alternatives to this approach? We were thinking about
>> Window structure, from Flink, but in our case it's not time based, and we
>> didn't find an equivalent on python API.
>> 3 - Using Table API to read from Kafka Topic and do the windowing can
>> improve our performance?
>>
>> We already set some parameters like python.fn-execution.bundle.time and
>> buffer.timeout to improve our performance.
>>
>> Thanks for your attention.
>> Best Regards
>>
>
> This email and any attachments are proprietary and confidential and are
> intended solely for the use of the individual to whom it is addressed. Any
> views or opinions expressed are solely those of the author and do not
> necessarily reflect or represent those of SwitchDin Pty Ltd. If you have
> received this email in error, please let us know immediately by reply email
> and delete it from your system. You may not use, disseminate, distribute or
> copy this message nor disclose its contents to anyone.
> SwitchDin Pty Ltd (ABN 29 154893857) PO Box 1165, Newcastle NSW 2300
> Australia
>


Re: FlinkKafkaProducer deprecated in 1.14 but pyflink binding still present?

2021-12-21 Thread Francis Conroy
Thanks for the response Dian,

I made the changes to a fork of flink and have been using them. The changes
aren't ready to be merged back though as a lot is missing, documentation
updates, testing, etc.
Thanks,
Francis

On Wed, 27 Oct 2021 at 13:40, Dian Fu  wrote:

> Hi Francis,
>
> Yes, you are right. It's still not updated in PyFlink as
> KafkaSource/KafkaSink are still not supported in PyFlink. Hopeful we could
> add that support in 1.15 and then we could deprecate/remove the legacy
> interfaces.
>
> Regards,
> Dian
>
> On Tue, Oct 26, 2021 at 12:53 PM Francis Conroy <
> francis.con...@switchdin.com> wrote:
>
>> Looks like this got deprecated in 1.14 in favour of KafkaSink/KafkaSource
>> but the python binding didn't get updated? Can someone confirm this?
>>
>> Francis Conroy
>>
>> This email and any attachments are proprietary and confidential and are
>> intended solely for the use of the individual to whom it is addressed. Any
>> views or opinions expressed are solely those of the author and do not
>> necessarily reflect or represent those of SwitchDin Pty Ltd. If you have
>> received this email in error, please let us know immediately by reply email
>> and delete it from your system. You may not use, disseminate, distribute or
>> copy this message nor disclose its contents to anyone.
>> SwitchDin Pty Ltd (ABN 29 154893857) PO Box 1165, Newcastle NSW 2300
>> Australia
>>
>

-- 
This email and any attachments are proprietary and confidential and are 
intended solely for the use of the individual to whom it is addressed. Any 
views or opinions expressed are solely those of the author and do not 
necessarily reflect or represent those of SwitchDin Pty Ltd. If you have 
received this email in error, please let us know immediately by reply email 
and delete it from your system. You may not use, disseminate, distribute or 
copy this message nor disclose its contents to anyone. 
SwitchDin Pty Ltd 
(ABN 29 154893857) PO Box 1165, Newcastle NSW 2300 Australia


Re: PyFlink Perfomance

2021-12-21 Thread Francis Conroy
I've just run an analysis using a similar example which involves a single
python flatmap operator and we're getting 100x less through by using python
over java. I'm interested to know if you can do such a comparison. I'm
using Flink 14.0.

Thanks,
Francis

On Thu, 18 Nov 2021 at 02:20, Thomas Portugal 
wrote:

> Hello community,
> My team is developing an application using Pyflink. We are using the
> Datastream API. Basically, we read from a kafka topic, do some maps, and
> write on another kafka topic. One restriction about it is the first map,
> that has to be serialized and with parallelism equals to one. This is
> causing a bottleneck on the throughput, and we are achieving approximately
> 2k msgs/sec. Monitoring the cpu usage and the number of records on each
> operator, it seems that the first operator is causing the issue.
> The first operator is like a buffer that groups the messages from kafka
> and sends them to the next operators. We are using a dequeue from python's
> collections. Since we are stuck on this issue, could you answer some
> questions about this matter?
>
> 1 - Using data structures from python can introduce some latency or
> increase the CPU usage?
> 2 - There are alternatives to this approach? We were thinking about Window
> structure, from Flink, but in our case it's not time based, and we didn't
> find an equivalent on python API.
> 3 - Using Table API to read from Kafka Topic and do the windowing can
> improve our performance?
>
> We already set some parameters like python.fn-execution.bundle.time and
> buffer.timeout to improve our performance.
>
> Thanks for your attention.
> Best Regards
>

-- 
This email and any attachments are proprietary and confidential and are 
intended solely for the use of the individual to whom it is addressed. Any 
views or opinions expressed are solely those of the author and do not 
necessarily reflect or represent those of SwitchDin Pty Ltd. If you have 
received this email in error, please let us know immediately by reply email 
and delete it from your system. You may not use, disseminate, distribute or 
copy this message nor disclose its contents to anyone. 
SwitchDin Pty Ltd 
(ABN 29 154893857) PO Box 1165, Newcastle NSW 2300 Australia


Re:Re: flink固定延迟重启策略没有延迟

2021-12-21 Thread 宋品如
Hi:


谢谢回复,我明白了。还有一个问题:
日志里显示:Recovery is suppressed by 
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2, 
backoffTimeMS=3)
这说明我的任务已经重启了2次依然失败了吗?
日志里我看不到重启的记录,这个应该怎么确认呢?
这里的报错是第一次重启还是最后一次重启导致的呢?


谢谢!














在 2021-12-22 10:47:24,"Caizhi Weng"  写道:
>Hi!
>
>log 里的这些信息是同一个 job 里不同的并发分别 fail(可以从 2/3 和 3/3 这两个不同的并发号看出来),并不是说这个 job
>fail 了两次。
>
>宋品如  于2021年12月22日周三 10:14写道:
>
>> 发件人: Song PinRu
>> 发送时间: 2021年12月21日 15:19
>> 收件人: user-zh@flink.apache.org
>> 主题: flink固定延迟重启策略没有延迟
>>
>>  Hi:
>> 昨天的邮件截图看不了,把日志贴上来重新发送一份
>> --
>>
>> 查看日志发现固定延迟重启策略似乎没有生效,我设置的是30s延迟重启2次,
>>
>> 但是日志显示的是在06:26:50这1秒内重启了2次都失败了,并最终导致任务失败,
>>
>> 我设置的延迟时间似乎完全没有生效,Flink版本是1.12.2。
>>
>> 有没有人能告诉我这是为什么?
>>
>>
>>
>>  设置重启策略的代码:
>> ```
>>
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> val backend = new
>> FsStateBackend(CommonConfig.FLINK_STATEBACKEND_CHECKPOINT)
>> env.setStateBackend(backend)
>> // 每 3ms 开始一次 checkpoint
>> env.enableCheckpointing(3)
>> // 设置模式为精确一次 (这是默认值)
>>
>> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>> // 确认 checkpoints 之间的时间会进行 500 ms
>> env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1)
>> // Checkpoint 必须在2分钟内完成,否则就会被抛弃
>> env.getCheckpointConfig.setCheckpointTimeout(12)
>> // 可容忍checkpoint失败次数
>> env.getCheckpointConfig.setTolerableCheckpointFailureNumber(3)
>> // 同一时间只允许一个 checkpoint 进行
>> env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>> //设置全局并行度
>> //  env.setParallelism(3)
>> //重启策略
>> //PS:默认策略会重启int最大值次,导致任务一直处于重启状态,checkpoint出现连续空文件夹,同时导致有效checkpoint无法使用
>>
>>
>> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2,
>> Time.seconds(30)))
>> ```
>>
>>
>>
>>
>>
>>
>> 日志:
>> ```
>>
>>  2021-12-21 06:26:50,850 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
>> source -> note -> Sink: sink (2/3) (85ad9ee0f52f04c5709430a8c793817a)
>> switched from RUNNING to FAILED on
>> container_e1595_1638345947522_0010_01_03 @ pbj-cdh-20-72.optaim.com
>> (dataPort=35530).
>>
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
>> send data to Kafka: This server is not the leader for that topic-partition.
>>
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1392)
>> ~[dws_module-1.0.4.jar:?]
>>
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:850)
>> ~[dws_module-1.0.4.jar:?]
>>
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
>> ~[dws_module-1.0.4.jar:?]
>>
>> at
>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:223)
>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>
>> at
>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>
>> at
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>
>> at
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>
>> at
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>
>> at
>> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>
>> at
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>
>> at
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>
>> at
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>
>> at
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>
>> at
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
>> 

托管内存为什么不能够指定最小或者最大值?

2021-12-21 Thread johnjlong
大佬们,托管内存为什么不能够指定最小或者最大值?
还是说 taskmanager.memory.managed.fraction 计算出来的就是最大值?


| |
johnjlong
|
|
johnjl...@163.com
|
签名由网易邮箱大师定制

Re: flink固定延迟重启策略没有延迟

2021-12-21 Thread Caizhi Weng
Hi!

log 里的这些信息是同一个 job 里不同的并发分别 fail(可以从 2/3 和 3/3 这两个不同的并发号看出来),并不是说这个 job
fail 了两次。

宋品如  于2021年12月22日周三 10:14写道:

> 发件人: Song PinRu
> 发送时间: 2021年12月21日 15:19
> 收件人: user-zh@flink.apache.org
> 主题: flink固定延迟重启策略没有延迟
>
>  Hi:
> 昨天的邮件截图看不了,把日志贴上来重新发送一份
> --
>
> 查看日志发现固定延迟重启策略似乎没有生效,我设置的是30s延迟重启2次,
>
> 但是日志显示的是在06:26:50这1秒内重启了2次都失败了,并最终导致任务失败,
>
> 我设置的延迟时间似乎完全没有生效,Flink版本是1.12.2。
>
> 有没有人能告诉我这是为什么?
>
>
>
>  设置重启策略的代码:
> ```
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val backend = new
> FsStateBackend(CommonConfig.FLINK_STATEBACKEND_CHECKPOINT)
> env.setStateBackend(backend)
> // 每 3ms 开始一次 checkpoint
> env.enableCheckpointing(3)
> // 设置模式为精确一次 (这是默认值)
>
> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
> // 确认 checkpoints 之间的时间会进行 500 ms
> env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1)
> // Checkpoint 必须在2分钟内完成,否则就会被抛弃
> env.getCheckpointConfig.setCheckpointTimeout(12)
> // 可容忍checkpoint失败次数
> env.getCheckpointConfig.setTolerableCheckpointFailureNumber(3)
> // 同一时间只允许一个 checkpoint 进行
> env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
> //设置全局并行度
> //  env.setParallelism(3)
> //重启策略
> //PS:默认策略会重启int最大值次,导致任务一直处于重启状态,checkpoint出现连续空文件夹,同时导致有效checkpoint无法使用
>
>
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2,
> Time.seconds(30)))
> ```
>
>
>
>
>
>
> 日志:
> ```
>
>  2021-12-21 06:26:50,850 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
> source -> note -> Sink: sink (2/3) (85ad9ee0f52f04c5709430a8c793817a)
> switched from RUNNING to FAILED on
> container_e1595_1638345947522_0010_01_03 @ pbj-cdh-20-72.optaim.com
> (dataPort=35530).
>
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
> send data to Kafka: This server is not the leader for that topic-partition.
>
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1392)
> ~[dws_module-1.0.4.jar:?]
>
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:850)
> ~[dws_module-1.0.4.jar:?]
>
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
> ~[dws_module-1.0.4.jar:?]
>
> at
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:223)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>
> at
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>
> at
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>
> at
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
> ~[dws_module-1.0.4.jar:?]
>
> at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
> ~[dws_module-1.0.4.jar:?]
>
> at
> 

Re: [DISCUSS] Changing the minimal supported version of Hadoop

2021-12-21 Thread Xintong Song
Sorry to join the discussion late.

+1 for dropping support for hadoop versions < 2.8 from my side.

TBH, warping the reflection based logic with safeguards sounds a bit
neither fish nor fowl to me. It weakens the major benefits that we look for
by dropping support for early versions.
- The codebase is simplified, but not significantly. We still have the
complexity of understanding which APIs may not exist in early versions.
- Without CI, we provide no guarantee that Flink will still work with early
hadoop versions. Or otherwise we fail to simplify the CI.

I'd suggest to say we no longer support hadoop versions < 2.8 at all. And
if that is not permitted by our users, we may consider to keep the codebase
as is and wait for a bit longer.

WDYT?

Thank you~

Xintong Song


[1]
https://hadoop.apache.org/docs/r2.8.5/hadoop-project-dist/hadoop-common/Compatibility.html#Wire_compatibility

On Wed, Dec 22, 2021 at 12:52 AM David Morávek  wrote:

> CC user@f.a.o
>
> Is anyone aware of something that blocks us from doing the upgrade?
>
> D.
>
> On Tue, Dec 21, 2021 at 5:50 PM David Morávek 
> wrote:
>
>> Hi Martijn,
>>
>> from person experience, most Hadoop users are lagging behind the release
>> lines by a lot, because upgrading a Hadoop cluster is not really a simply
>> task to achieve. I think for now, we can stay a bit conservative, nothing
>> blocks us for using 2.8.5 as we don't use any "newer" APIs in the code.
>>
>> As for Till's concern, we can still wrap the reflection based logic, to
>> be skipped in case of "NoClassDefFound" instead of "ClassNotFound" as we do
>> now.
>>
>> D.
>>
>>
>> On Tue, Dec 14, 2021 at 5:23 PM Martijn Visser 
>> wrote:
>>
>>> Hi David,
>>>
>>> Thanks for bringing this up for discussion! Given that Hadoop 2.8 is
>>> considered EOL, shouldn't we bump the version to Hadoop 2.10? [1]
>>>
>>> Best regards,
>>>
>>> Martijn
>>>
>>> [1]
>>>
>>> https://cwiki.apache.org/confluence/display/HADOOP/Hadoop+Active+Release+Lines
>>>
>>> On Tue, 14 Dec 2021 at 10:28, Till Rohrmann 
>>> wrote:
>>>
>>> > Hi David,
>>> >
>>> > I think we haven't updated our Hadoop dependencies in a long time.
>>> Hence,
>>> > it is probably time to do so. So +1 for upgrading to the latest patch
>>> > release.
>>> >
>>> > If newer 2.x Hadoop versions are compatible with 2.y with x >= y, then
>>> I
>>> > don't see a problem with dropping support for pre-bundled Hadoop
>>> versions <
>>> > 2.8. This could indeed help us decrease our build matrix a bit and,
>>> thus,
>>> > saving some build time.
>>> >
>>> > Concerning simplifying our code base to get rid of reflection logic
>>> etc. we
>>> > still might have to add a safeguard for features that are not
>>> supported by
>>> > earlier versions. According to the docs
>>> >
>>> > > YARN applications that attempt to use new APIs (including new fields
>>> in
>>> > data structures) that have not yet been deployed to the cluster can
>>> expect
>>> > link exceptions
>>> >
>>> > we can see link exceptions. We could get around this by saying that
>>> Flink
>>> > no longer supports Hadoop < 2.8. But this should be checked with our
>>> users
>>> > on the user ML at least.
>>> >
>>> > Cheers,
>>> > Till
>>> >
>>> > On Tue, Dec 14, 2021 at 9:25 AM David Morávek  wrote:
>>> >
>>> > > Hi,
>>> > >
>>> > > I'd like to start a discussion about upgrading a minimal Hadoop
>>> version
>>> > > that Flink supports.
>>> > >
>>> > > Even though the default value for `hadoop.version` property is set to
>>> > > 2.8.3, we're still ensuring both runtime and compile compatibility
>>> with
>>> > > Hadoop 2.4.x with the scheduled pipeline[1].
>>> > >
>>> > > Here is list of dates of the latest releases for each minor version
>>> up to
>>> > > 2.8.x
>>> > >
>>> > > - Hadoop 2.4.1: Last commit on 6/30/2014
>>> > > - Hadoop 2.5.2: Last commit on 11/15/2014
>>> > > - Hadoop 2.6.5: Last commit on 10/11/2016
>>> > > - Hadoop 2.7.7: Last commit on 7/18/2018
>>> > > - Hadoop 2.8.5: Last commit on 9/8/2018
>>> > >
>>> > > Since then there were two more minor releases in 2.x branch and four
>>> more
>>> > > minor releases in 3.x branch.
>>> > >
>>> > > Supporting the older version involves reflection-based "hacks" for
>>> > > supporting multiple versions.
>>> > >
>>> > > My proposal would be changing the minimum supported version *to
>>> 2.8.5*.
>>> > > This should simplify the hadoop related codebase and simplify the CI
>>> > build
>>> > > infrastructure as we won't have to test for the older versions.
>>> > >
>>> > > Please note that this only involves a minimal *client side*
>>> > compatibility.
>>> > > The wire protocol should remain compatible with earlier versions
>>> [2], so
>>> > we
>>> > > should be able to talk with any servers in 2.x major branch.
>>> > >
>>> > > One small note for the 2.8.x branch, some of the classes we need are
>>> only
>>> > > available in 2.8.4 version and above, but I'm not sure we should
>>> take an
>>> > > eventual need for upgrading a patch version into consideration here,

flink固定延迟重启策略没有延迟

2021-12-21 Thread 宋品如
发件人: Song PinRu
发送时间: 2021年12月21日 15:19
收件人: user-zh@flink.apache.org
主题: flink固定延迟重启策略没有延迟

 Hi:
昨天的邮件截图看不了,把日志贴上来重新发送一份
--

查看日志发现固定延迟重启策略似乎没有生效,我设置的是30s延迟重启2次,

但是日志显示的是在06:26:50这1秒内重启了2次都失败了,并最终导致任务失败,

我设置的延迟时间似乎完全没有生效,Flink版本是1.12.2。

有没有人能告诉我这是为什么?



 设置重启策略的代码:
```

val env = StreamExecutionEnvironment.getExecutionEnvironment
val backend = new FsStateBackend(CommonConfig.FLINK_STATEBACKEND_CHECKPOINT)
env.setStateBackend(backend)
// 每 3ms 开始一次 checkpoint
env.enableCheckpointing(3)
// 设置模式为精确一次 (这是默认值)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 确认 checkpoints 之间的时间会进行 500 ms
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1)
// Checkpoint 必须在2分钟内完成,否则就会被抛弃
env.getCheckpointConfig.setCheckpointTimeout(12)
// 可容忍checkpoint失败次数
env.getCheckpointConfig.setTolerableCheckpointFailureNumber(3)
// 同一时间只允许一个 checkpoint 进行
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
//设置全局并行度
//  env.setParallelism(3)
//重启策略
//PS:默认策略会重启int最大值次,导致任务一直处于重启状态,checkpoint出现连续空文件夹,同时导致有效checkpoint无法使用


env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.seconds(30)))
```






日志:
```

 2021-12-21 06:26:50,850 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source: 
source -> note -> Sink: sink (2/3) (85ad9ee0f52f04c5709430a8c793817a) switched 
from RUNNING to FAILED on container_e1595_1638345947522_0010_01_03 @ 
pbj-cdh-20-72.optaim.com (dataPort=35530).

org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send 
data to Kafka: This server is not the leader for that topic-partition.

at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1392)
 ~[dws_module-1.0.4.jar:?]

at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:850)
 ~[dws_module-1.0.4.jar:?]

at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
 ~[dws_module-1.0.4.jar:?]

at 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:223)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
 ~[dws_module-1.0.4.jar:?]

at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
 ~[dws_module-1.0.4.jar:?]

at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
 ~[dws_module-1.0.4.jar:?]

at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
 ~[dws_module-1.0.4.jar:?]

at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 

Re: Overwriting Flink Core InputStreamFactory

2021-12-21 Thread Caizhi Weng
Hi!

I see that you're submitting your job to Flink in a k8s environment. Could
you explain in detail how do you submit your job? For example did you put
your user jar under the lib directory and build a docker image from it?

Flink regards user classes as higher priority ones, so adding the class
with the same name and the same package in your project should do the trick.

AG  于2021年12月22日周三 03:38写道:

> I included the package org.apache.flink.api.common.io.compression in my
> intellij project and added the class GzipInflaterInputStreamFactory.
>
> The class just redefined the method `getCommonFileExtensions` to not
> recognize "gzip". I need this because of google cloud's transcoding
>  which I can't turn
> off. Essentially flink see's a .gz file in my cloud bucket and wants to
> read it as a gzip, but the google filesystem that gets called is reading it
> as inflated and I get the error "Not in Gzip Format".
>
> I tried shading the jar and submitting on k8s but Flink fails to recognize
> the new class. I'm wondering if there's a way to go about doing this that
> doesn't require rebuilding Flink from scratch?
>
> Warm regards,
>
> Aaron
>
>


Re: Flink Checkpoint Duration/Job Throughput

2021-12-21 Thread Caizhi Weng
Hi!

>From your description this is due to data skew. The common solution to data
skew is to add a random value to your partition keys so that data can be
distributed evenly into downstream operators.

Could you provide more information about your job (preferably user code or
SQL code), especially the operator that suffers from data skew?

Terry Heathcote  于2021年12月22日周三 02:53写道:

> Hi
>
> We are having trouble with record throughput that we believe to be a
> result of slow checkpoint durations. The job uses Kafka as both a source
> and sink as well as a Redis-backed service within the same cluster, used to
> enrich the data in a transformation, before writing records back to Kafka.
> Below is a description of the job:
>
>- Flink Version 12.5
>- Source topic = 24 partitions.
>- Multiple sink topics.
>- Parallelism set to 24.
>- Operators applied are a map function and process function to fetch
>the Redis data.
>- EXACTLY_ONCE processing is required.
>
> We have switched between aligned and unaligned checkpoints but with no
> improvement in performance. What we have witnessed is that on average the
> majority of operators and their respective subtasks acknowledge checkpoints
> within milliseconds but 1 or 2 subtasks wait 2 to 4 mins before
> acknowledging the checkpoint. Also, the subtask load seems skewed after
> applying transformations prior to the sinks (tried to rebalance and shuffle
> here but with no improvement). Checkpoint duration can vary between 5s and
> 7 minutes.
>
> We believe this is slowing our overall job throughput as Kafka transaction
> commits are delayed by slower checkpointing, creating upstream
> backpressure, and a buildup on the source Kafka topic offsets. We would
> ideally like to decrease the checkpoint interval once durations are low and
> stable.
>
> Any help on this would be greatly appreciated.
>
> Best regards
> Terry
> ᐧ
>


Re: Waiting on streaming job with StatementSet.execute() when using Web Submission

2021-12-21 Thread Caizhi Weng
Hi!

You can poll the status of that job with REST API [1]. You can tell that
the job successfully finishes by the FINISHED state and that the job fails
by the FAILED state.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobs-jobid

Yuval Itzchakov  于2021年12月22日周三 02:36写道:

> Hi,
>
> Flink 1.14.2
> Scala 2.12
>
> I have a streaming job that executes and I want to infinitely wait for
> it's completion, or if an exception is thrown during initialization. When
> using *statementSet.execute().await()*, I get an error:
>
> Caused by: org.apache.flink.util.FlinkRuntimeException:* The Job Result
> cannot be fetched through the Job Client when in Web Submission.*
> at
> org.apache.flink.client.deployment.application.WebSubmissionJobClient.getJobExecutionResult(WebSubmissionJobClient.java:88)
> at
> org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:54)
> ... 7 more
>
> This is because the Web Submission via the REST API is using
> the WebSubmissionJobClient.
>
> How can I wait on my Flink SQL streaming job when submitting through the
> REST API?
> --
> Best Regards,
> Yuval Itzchakov
>


Re: flink固定延迟重启策略没有延迟

2021-12-21 Thread Caizhi Weng
Hi!

截图无法在邮件中显示,可以使用外部图床上传,或直接把 log 的内容贴在邮件里。

Song PinRu  于2021年12月21日周二 15:54写道:

> 查看日志发现固定延迟重启策略似乎没有生效,我设置的是30s延迟重启2次,
>
> 但是日志显示的是在06:26:50这1秒内重启了2次都失败了,并最终导致任务失败,
>
> 我设置的延迟时间似乎完全没有生效,Flink版本是1.12.2。
>
> 有没有人能告诉我这是为什么?
>
>
>
> 日志的截图:
>
>
>
> [image: cid:image001.png@01D7F67C.D00DC560]
>
>
>
> [image: cid:image002.png@01D7F67C.D00DC560]
>
>
>
> 设置重启策略的代码:
>
> val env = StreamExecutionEnvironment.
> *getExecutionEnvironment *val backend = new FsStateBackend(CommonConfig.
> *FLINK_STATEBACKEND_CHECKPOINT*)
> env.setStateBackend(backend)
> // 每 3ms 开始一次 checkpoint
> env.enableCheckpointing(3)
> // 设置模式为精确一次 (这是默认值)
> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.
> *EXACTLY_ONCE*)
> // 确认 checkpoints 之间的时间会进行 500 ms
> env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1)
> // Checkpoint 必须在2分钟内完成,否则就会被抛弃
> env.getCheckpointConfig.setCheckpointTimeout(12)
> // 可容忍checkpoint失败次数
> env.getCheckpointConfig.setTolerableCheckpointFailureNumber(3)
> // 同一时间只允许一个 checkpoint 进行
> env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
> //设置全局并行度
> //  env.setParallelism(3)
> //重启策略
> //PS:默认策略会重启int最大值次,导致任务一直处于重启状态,checkpoint出现连续空文件夹,同时导致有效checkpoint无法使用
> env.setRestartStrategy(RestartStrategies.*fixedDelayRestart*(2, Time.
> *seconds*(30)))
>
>
>


Re:Re:Re: batch模式下任务plan的状态一直为CREATED

2021-12-21 Thread RS
跑了10几个小时终于跑完了,测试发现BATCH模式下,只有Source把所有数据消费完,后面的SortLimit plan才会创建,和流模式不太一样




在 2021-12-21 20:06:08,"RS"  写道:
>slot资源是绝对充足的,你提到的资源还涉及到其他资源吗?
>
>
>
>
>
>在 2021-12-21 17:57:21,"刘建刚"  写道:
>>固定资源的情况下,batch的调度会按照拓扑顺序执行算子。如果你的资源只够运行一个source,那么等source运行完毕后才能运行SortLimit。
>>
>>RS  于2021年12月21日周二 16:53写道:
>>
>>> hi,
>>>
>>> 版本:flink1.14
>>>
>>> 模式:batch
>>>
>>> 测试场景:消费hive大量数据,计算某个字段的 top 10
>>>
>>>
>>> 使用sql-client测试,创建任务之后,生成2个plan,一个Source,一个SortLimit。Source状态为RUNNING,SortLimit状态一直为CREATED。
>>>
>>> 请问下,SortLimit状态一直为CREATED是正常现象吗?
>>>
>>> 数据量比较大,全部消费完的话,估计得好几天时间,BATCH模式下,SortLimit的状态需要等所有数据全部消费完才改变吗?
>>>
>>>
>>>
>>>
>>> 测试SQL:
>>>
>>> SELECT price
>>>
>>> FROM hive.data.data1
>>>
>>> ORDER BY price DESC
>>>
>>> LIMIT 10;


Re: How to know if Job nodes are registered in cluster?

2021-12-21 Thread John Smith
Ok so only the leader will indicate it's the leader. The other just say
they are waiting for a lock...

On Tue., Dec. 21, 2021, 9:42 a.m. David Morávek,  wrote:

> Hi John,
>
> there is usually no need to run multiple JM, if you're able to start a new
> one quickly after failure (eg. when you're running on kubernetes). There is
> always only single active leader and other JMs effectively do nothing
> besides competing for the leadership. Zookeeper based HA uses the
> DefaultLeaderRetrievalService, which logs leadership information on DEBUG
> level.
>
> Best,
> D.
>
> On Sun, Dec 19, 2021 at 6:38 AM John Smith  wrote:
>
>> Hi running flink 1.10
>>
>> I have 3 zookeeper nodes and 3 job nodes.
>>
>> 1 nodes has specifically indicated that it was granted leadership with
>> token.
>> The other 2 job nodes. Indicate: Starting ZooKeeperLeaderElectionService
>> ZooKeeperLeaderElectionService{leaderPath='/leader/resource_manager_lock'}.
>> So is that enough to know. Usually isn't there some message printed on each
>> node indicating to each other who is leader and who is "present"?
>>
>


Re: org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat class missing from Flink 1.14 ?

2021-12-21 Thread Tuomas Pystynen
Timo,

Thanks, that was the problem. I was using flink-connector-jdbc_2.11-1.13.2.jar. 
Upgrading to flink-connector-jdbc_2.11-1.14.2.jar solved my problem.

Regards,
Tuomas 

> On 21. Dec 2021, at 11.14, Timo Walther  wrote:
> 
> Hi Tuomas,
> 
> are you sure that all dependencies have been upgraded to Flink 1.14. 
> Connector dependencies that still reference Flink 1.13 might cause issues.
> 
> JdbcBatchingOutputFormat has been refactored in this PR:
> 
> https://github.com/apache/flink/pull/16528
> 
> I hope this helps.
> 
> Regards,
> Timo
> 
> On 18.12.21 12:30, Tuomas Pystynen wrote:
>> I am just trying out Flink using images from Docker Hub. My simple insert 
>> using JDBC connector to Postgres database fails with this error
>> Flink SQL> insert into order_counts
>> > select customerid, count(*) order_count from orders group by customerid;
>> [INFO] Submitting SQL update statement to the cluster...
>> [INFO] SQL update statement has been successfully submitted to the cluster:
>> Job ID: 5d3b32bd8cc0f11dfe73cbf242793cc9
>> 2021-12-17 20:39:08,975 WARN  org.apache.flink.runtime.taskmanager.Task  
>>[] - GroupAggregate(groupBy=[customerid], select=[customerid, 
>> COUNT(*) AS order_count]) -> NotNullEnforcer(fields=[customerid]) -> Sink: 
>> Sink(table=[default_catalog.default_database.order_counts], 
>> fields=[customerid, order_count]) (1/1)#0 (12466e41fb377bcd45b3d22aab1cadfd) 
>> switched from INITIALIZING to FAILED with failure cause: 
>> java.lang.NoClassDefFoundError: 
>> org/apache/flink/runtime/util/ExecutorThreadFactory
>> at 
>> *org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.open*(JdbcBatchingOutputFormat.java:118)
>> at 
>> org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.open(GenericJdbcSinkFunction.java:49)
>> at 
>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
>> at 
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100)
>> at 
>> org.apache.flink.table.runtime.operators.sink.SinkOperator.open(SinkOperator.java:58)
>> at 
>> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
>> at 
>> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.ClassNotFoundException: 
>> org.apache.flink.runtime.util.ExecutorThreadFactory
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>> ... 15 more
>> After downgrading to Flink 1.13 the insert works fine. Has 
>> JdbcBatchingOutputFormat.java been removed by accident or purpose in 1.14 ?
>> Regards,
>> Tuomas Pystynen
> 



Overwriting Flink Core InputStreamFactory

2021-12-21 Thread AG
I included the package org.apache.flink.api.common.io.compression in my
intellij project and added the class GzipInflaterInputStreamFactory.

The class just redefined the method `getCommonFileExtensions` to not
recognize "gzip". I need this because of google cloud's transcoding
 which I can't turn off.
Essentially flink see's a .gz file in my cloud bucket and wants to read it
as a gzip, but the google filesystem that gets called is reading it as
inflated and I get the error "Not in Gzip Format".

I tried shading the jar and submitting on k8s but Flink fails to recognize
the new class. I'm wondering if there's a way to go about doing this that
doesn't require rebuilding Flink from scratch?

Warm regards,

Aaron


Flink Checkpoint Duration/Job Throughput

2021-12-21 Thread Terry Heathcote
Hi

We are having trouble with record throughput that we believe to be a result
of slow checkpoint durations. The job uses Kafka as both a source and sink
as well as a Redis-backed service within the same cluster, used to enrich
the data in a transformation, before writing records back to Kafka. Below
is a description of the job:

   - Flink Version 12.5
   - Source topic = 24 partitions.
   - Multiple sink topics.
   - Parallelism set to 24.
   - Operators applied are a map function and process function to fetch the
   Redis data.
   - EXACTLY_ONCE processing is required.

We have switched between aligned and unaligned checkpoints but with no
improvement in performance. What we have witnessed is that on average the
majority of operators and their respective subtasks acknowledge checkpoints
within milliseconds but 1 or 2 subtasks wait 2 to 4 mins before
acknowledging the checkpoint. Also, the subtask load seems skewed after
applying transformations prior to the sinks (tried to rebalance and shuffle
here but with no improvement). Checkpoint duration can vary between 5s and
7 minutes.

We believe this is slowing our overall job throughput as Kafka transaction
commits are delayed by slower checkpointing, creating upstream
backpressure, and a buildup on the source Kafka topic offsets. We would
ideally like to decrease the checkpoint interval once durations are low and
stable.

Any help on this would be greatly appreciated.

Best regards
Terry
ᐧ


Waiting on streaming job with StatementSet.execute() when using Web Submission

2021-12-21 Thread Yuval Itzchakov
Hi,

Flink 1.14.2
Scala 2.12

I have a streaming job that executes and I want to infinitely wait for it's
completion, or if an exception is thrown during initialization. When using
*statementSet.execute().await()*, I get an error:

Caused by: org.apache.flink.util.FlinkRuntimeException:* The Job Result
cannot be fetched through the Job Client when in Web Submission.*
at
org.apache.flink.client.deployment.application.WebSubmissionJobClient.getJobExecutionResult(WebSubmissionJobClient.java:88)
at
org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:54)
... 7 more

This is because the Web Submission via the REST API is using
the WebSubmissionJobClient.

How can I wait on my Flink SQL streaming job when submitting through the
REST API?
-- 
Best Regards,
Yuval Itzchakov


Re: Kryo EOFException: No more bytes left

2021-12-21 Thread Dan Hill
I was not able to reproduce it by re-running the same job with an updated
kryo library.  The join doesn't do anything special.

On Sun, Dec 19, 2021 at 4:58 PM Dan Hill  wrote:

> I'll retry the job to see if it's reproducible. The serialized state is
> bad so that run keeps failing.
>
> On Sun, Dec 19, 2021 at 4:28 PM Zhipeng Zhang 
> wrote:
>
>> Hi Dan,
>>
>> Could you provide the code snippet such that we can reproduce the bug
>> here?
>>
>> Dan Hill  于2021年12月20日周一 07:18写道:
>>
>>> Hi.
>>>
>>> I was curious if anyone else has hit this exception.  I'm using the
>>> IntervalJoinOperator to two streams of protos.  I registered the protos
>>> with a kryo serializer.  I started hitting this issue which looks like the
>>> operator is trying to deserialize a bad set of bytes that it serialized.
>>> I'm not doing anything weird or custom with the code.  It's a pretty simple
>>> interval join.
>>>
>>> Has anyone hit this before?  How have people solved this?  I skimmed the
>>> operator code and don't see an easy way to exclude the bad serialized
>>> bytes.  I could fork the interval join code and have a route that writes
>>> badly serialized
>>>
>>> A couple ideas:
>>> 1. I could fork the interval join code and have a route to handle bad
>>> serialization.
>>> 2. Maybe there's a weird case where the bytes become empty and this is
>>> an exception given for an empty array of bytes?
>>>
>>> Could this be a version issue?  My Flink version is v1.12.3 and
>>> Twitter/chill v0.9.4.
>>>
>>> Thoughts?
>>>
>>>
>>> java.lang.RuntimeException: Could not create class
>>> com.myexample.proto.MyJoinedOutput
>>>
>>> at
>>> com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:76)
>>> ~[blob_p-b0913791f2636dfd2781bd693704accd5ae305e8-26c95e4b9b0bf7dc8409a4d2bd4c57bf:?]
>>>
>>> at
>>> com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:40)
>>> ~[blob_p-b0913791f2636dfd2781bd693704accd5ae305e8-26c95e4b9b0bf7dc8409a4d2bd4c57bf:?]
>>>
>>> at
>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>> at
>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>> at
>>> org.apache.flink.streaming.api.operators.co.IntervalJoinOperator$BufferEntrySerializer.deserialize(IntervalJoinOperator.java:452)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>> at
>>> org.apache.flink.streaming.api.operators.co.IntervalJoinOperator$BufferEntrySerializer.deserialize(IntervalJoinOperator.java:401)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>> at
>>> org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:137)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>> at
>>> org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:42)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>> at
>>> org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserValue(RocksDBMapState.java:393)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>> at
>>> org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:127)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>> at
>>> org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>> at
>>> org.apache.flink.streaming.api.operators.co.IntervalJoinOperator.addToBuffer(IntervalJoinOperator.java:282)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>> at
>>> org.apache.flink.streaming.api.operators.co.IntervalJoinOperator.processElement(IntervalJoinOperator.java:234)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>> at
>>> org.apache.flink.streaming.api.operators.co.IntervalJoinOperator.processElement1(IntervalJoinOperator.java:194)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>> at
>>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:199)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>> at
>>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:164)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>> at
>>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:277)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>> at
>>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>> at
>>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>> at
>>> 

Re: Avoiding Dynamic Classloading for User Code

2021-12-21 Thread David Morávek
"+ *setting class-loading strategy to parent-first *could" ... otherwise
the classes will be always loaded from the jar provided via the REST API

On Tue, Dec 21, 2021 at 5:46 PM Lior Liviev  wrote:

> So again, after putting the jar in that folder I don’t need to configure
> anything else?
>
> Get Outlook for iOS 
> --
> *From:* David Morávek 
> *Sent:* Tuesday, December 21, 2021 6:39:10 PM
> *To:* Lior Liviev 
> *Cc:* user 
> *Subject:* Re: Avoiding Dynamic Classloading for User Code
>
>
> *CAUTION*: external source
> hmm, with this approach I can only think about not really nice
> solutions... I guess putting a jar into `/lib` folder + setting
> class-loading strategy to parent-first could do the trick (load everything
> in the "main" class loader), but then using this endpoint / deployment path
> for submission kind of seems to lack purpose.
>
> on the other hand, I don't really think we provide a nice way to achieve
> this with session cluster right now :(
>
> D.
>
> On Tue, Dec 21, 2021 at 5:27 PM Lior Liviev 
> wrote:
>
> Yes, I’m using "/jars/:jarid/run"
>
> Get Outlook for iOS
> 
> --
> *From:* David Morávek 
> *Sent:* Tuesday, December 21, 2021 6:08:51 PM
> *To:* Lior Liviev ; user 
> *Subject:* Re: Avoiding Dynamic Classloading for User Code
>
>
> *CAUTION*: external source
> Please always include the ML in the reply-list, so other can participate
> in the discussion / learn from the findings
>
> we are aware of multiple issues when web-submission can result in
> classloader / thread local leaks, which could potentially result in the
> behavior you're describing. We're working on addressing them.
>
> FLINK-25022 [1]: The most critical one leaking thread locals.
> FLINK-25027 [2]: Is only a memory improvement for a particular situation (a
> lot of small batch jobs) and could be fixed by accounting for when setting
> Metaspace size.
> FLINK-25023 [3]: Can leak the classloader of the first job submitted via
> rest API. (constant overhead for Metaspace)
>
> In general, web-submission is different from a normal submission in way,
> that the "main method" of the uploaded jar is executed on JobManager and
> it's really hard to isolate it's execution from possible side effects.
>
> Could you by any chance try to submit jobs with the Flink CLI instead?
> That should be more robust when it comes to the class loading issues.
>
> Which endpoint are you using for submitting the job? "/jars/:jarid/run"?
>
> [1] https://issues.apache.org/jira/browse/FLINK-25022
> 
> [2] https://issues.apache.org/jira/browse/FLINK-25027
> 
> [3] https://issues.apache.org/jira/browse/FLINK-25023
> 
>
> On Tue, Dec 21, 2021 at 4:49 PM Lior Liviev 
> wrote:
>
> Yes, I use the REST API. I'm running into OOM Metaspace, and I think it's
> a class-loading problem, so that's why I'm thinking of putting the jar in
> flink/lib
> --
> *From:* David Morávek 
> *Sent:* Tuesday, December 21, 2021 5:43 PM
> *To:* Lior Liviev 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Avoiding Dynamic Classloading for User Code
>
>
> *CAUTION*: external source
> Hi Lior,
>
> can you please provide details about the steps (I'm not sure what load jar
> / execute with the API means)? are you submitting the job using the REST
> API or Flink CLI? I assume you're using a session cluster.
>
> also what is the concern here? do 

Re: [DISCUSS] Changing the minimal supported version of Hadoop

2021-12-21 Thread David Morávek
CC user@f.a.o

Is anyone aware of something that blocks us from doing the upgrade?

D.

On Tue, Dec 21, 2021 at 5:50 PM David Morávek 
wrote:

> Hi Martijn,
>
> from person experience, most Hadoop users are lagging behind the release
> lines by a lot, because upgrading a Hadoop cluster is not really a simply
> task to achieve. I think for now, we can stay a bit conservative, nothing
> blocks us for using 2.8.5 as we don't use any "newer" APIs in the code.
>
> As for Till's concern, we can still wrap the reflection based logic, to be
> skipped in case of "NoClassDefFound" instead of "ClassNotFound" as we do
> now.
>
> D.
>
>
> On Tue, Dec 14, 2021 at 5:23 PM Martijn Visser 
> wrote:
>
>> Hi David,
>>
>> Thanks for bringing this up for discussion! Given that Hadoop 2.8 is
>> considered EOL, shouldn't we bump the version to Hadoop 2.10? [1]
>>
>> Best regards,
>>
>> Martijn
>>
>> [1]
>>
>> https://cwiki.apache.org/confluence/display/HADOOP/Hadoop+Active+Release+Lines
>>
>> On Tue, 14 Dec 2021 at 10:28, Till Rohrmann  wrote:
>>
>> > Hi David,
>> >
>> > I think we haven't updated our Hadoop dependencies in a long time.
>> Hence,
>> > it is probably time to do so. So +1 for upgrading to the latest patch
>> > release.
>> >
>> > If newer 2.x Hadoop versions are compatible with 2.y with x >= y, then I
>> > don't see a problem with dropping support for pre-bundled Hadoop
>> versions <
>> > 2.8. This could indeed help us decrease our build matrix a bit and,
>> thus,
>> > saving some build time.
>> >
>> > Concerning simplifying our code base to get rid of reflection logic
>> etc. we
>> > still might have to add a safeguard for features that are not supported
>> by
>> > earlier versions. According to the docs
>> >
>> > > YARN applications that attempt to use new APIs (including new fields
>> in
>> > data structures) that have not yet been deployed to the cluster can
>> expect
>> > link exceptions
>> >
>> > we can see link exceptions. We could get around this by saying that
>> Flink
>> > no longer supports Hadoop < 2.8. But this should be checked with our
>> users
>> > on the user ML at least.
>> >
>> > Cheers,
>> > Till
>> >
>> > On Tue, Dec 14, 2021 at 9:25 AM David Morávek  wrote:
>> >
>> > > Hi,
>> > >
>> > > I'd like to start a discussion about upgrading a minimal Hadoop
>> version
>> > > that Flink supports.
>> > >
>> > > Even though the default value for `hadoop.version` property is set to
>> > > 2.8.3, we're still ensuring both runtime and compile compatibility
>> with
>> > > Hadoop 2.4.x with the scheduled pipeline[1].
>> > >
>> > > Here is list of dates of the latest releases for each minor version
>> up to
>> > > 2.8.x
>> > >
>> > > - Hadoop 2.4.1: Last commit on 6/30/2014
>> > > - Hadoop 2.5.2: Last commit on 11/15/2014
>> > > - Hadoop 2.6.5: Last commit on 10/11/2016
>> > > - Hadoop 2.7.7: Last commit on 7/18/2018
>> > > - Hadoop 2.8.5: Last commit on 9/8/2018
>> > >
>> > > Since then there were two more minor releases in 2.x branch and four
>> more
>> > > minor releases in 3.x branch.
>> > >
>> > > Supporting the older version involves reflection-based "hacks" for
>> > > supporting multiple versions.
>> > >
>> > > My proposal would be changing the minimum supported version *to
>> 2.8.5*.
>> > > This should simplify the hadoop related codebase and simplify the CI
>> > build
>> > > infrastructure as we won't have to test for the older versions.
>> > >
>> > > Please note that this only involves a minimal *client side*
>> > compatibility.
>> > > The wire protocol should remain compatible with earlier versions [2],
>> so
>> > we
>> > > should be able to talk with any servers in 2.x major branch.
>> > >
>> > > One small note for the 2.8.x branch, some of the classes we need are
>> only
>> > > available in 2.8.4 version and above, but I'm not sure we should take
>> an
>> > > eventual need for upgrading a patch version into consideration here,
>> > > because both 2.8.4 and 2.8.5 are pretty old.
>> > >
>> > > WDYT, is it already time to upgrade? Looking forward for any thoughts
>> on
>> > > the topic!
>> > >
>> > > [1]
>> > >
>> > >
>> >
>> https://github.com/apache/flink/blob/release-1.14.0/tools/azure-pipelines/build-apache-repo.yml#L123
>> > > [2]
>> > >
>> > >
>> >
>> https://hadoop.apache.org/docs/r2.8.5/hadoop-project-dist/hadoop-common/Compatibility.html#Wire_compatibility
>> > >
>> > > Best,
>> > > D.
>> > >
>> >
>>
>


Re: Avoiding Dynamic Classloading for User Code

2021-12-21 Thread Lior Liviev
So again, after putting the jar in that folder I don’t need to configure 
anything else?

Get Outlook for iOS

From: David Morávek 
Sent: Tuesday, December 21, 2021 6:39:10 PM
To: Lior Liviev 
Cc: user 
Subject: Re: Avoiding Dynamic Classloading for User Code


CAUTION: external source

hmm, with this approach I can only think about not really nice solutions... I 
guess putting a jar into `/lib` folder + setting class-loading strategy to 
parent-first could do the trick (load everything in the "main" class loader), 
but then using this endpoint / deployment path for submission kind of seems to 
lack purpose.

on the other hand, I don't really think we provide a nice way to achieve this 
with session cluster right now :(

D.

On Tue, Dec 21, 2021 at 5:27 PM Lior Liviev 
mailto:lior.liv...@earnix.com>> wrote:
Yes, I’m using "/jars/:jarid/run"

Get Outlook for 
iOS

From: David Morávek mailto:d...@apache.org>>
Sent: Tuesday, December 21, 2021 6:08:51 PM
To: Lior Liviev mailto:lior.liv...@earnix.com>>; user 
mailto:user@flink.apache.org>>
Subject: Re: Avoiding Dynamic Classloading for User Code


CAUTION: external source

Please always include the ML in the reply-list, so other can participate in the 
discussion / learn from the findings

we are aware of multiple issues when web-submission can result in
classloader / thread local leaks, which could potentially result in the
behavior you're describing. We're working on addressing them.

FLINK-25022 [1]: The most critical one leaking thread locals.
FLINK-25027 [2]: Is only a memory improvement for a particular situation (a
lot of small batch jobs) and could be fixed by accounting for when setting
Metaspace size.
FLINK-25023 [3]: Can leak the classloader of the first job submitted via
rest API. (constant overhead for Metaspace)

In general, web-submission is different from a normal submission in way,
that the "main method" of the uploaded jar is executed on JobManager and
it's really hard to isolate it's execution from possible side effects.

Could you by any chance try to submit jobs with the Flink CLI instead? That 
should be more robust when it comes to the class loading issues.

Which endpoint are you using for submitting the job? "/jars/:jarid/run"?

[1] 
https://issues.apache.org/jira/browse/FLINK-25022
[2] 
https://issues.apache.org/jira/browse/FLINK-25027
[3] 
https://issues.apache.org/jira/browse/FLINK-25023

On Tue, Dec 21, 2021 at 4:49 PM Lior Liviev 
mailto:lior.liv...@earnix.com>> wrote:
Yes, I use the REST API. I'm running into OOM Metaspace, and I think it's a 
class-loading problem, so that's why I'm thinking of putting the jar in 
flink/lib

From: David Morávek mailto:d...@apache.org>>
Sent: Tuesday, December 21, 2021 5:43 PM
To: Lior Liviev mailto:lior.liv...@earnix.com>>
Cc: user@flink.apache.org 
mailto:user@flink.apache.org>>
Subject: Re: Avoiding Dynamic Classloading for User Code


CAUTION: external source

Hi Lior,

can you please provide details about the steps (I'm not sure what load jar / 
execute with the API means)? are you submitting the job using the REST API or 
Flink CLI? I assume you're using a session cluster.

also what is the concern here? do you run into any class-loading related issues?

D.

On Tue, Dec 21, 2021 at 3:48 PM Lior Liviev 

Re: Avoiding Dynamic Classloading for User Code

2021-12-21 Thread David Morávek
hmm, with this approach I can only think about not really nice solutions...
I guess putting a jar into `/lib` folder + setting class-loading strategy
to parent-first could do the trick (load everything in the "main" class
loader), but then using this endpoint / deployment path for submission kind
of seems to lack purpose.

on the other hand, I don't really think we provide a nice way to achieve
this with session cluster right now :(

D.

On Tue, Dec 21, 2021 at 5:27 PM Lior Liviev  wrote:

> Yes, I’m using "/jars/:jarid/run"
>
> Get Outlook for iOS 
> --
> *From:* David Morávek 
> *Sent:* Tuesday, December 21, 2021 6:08:51 PM
> *To:* Lior Liviev ; user 
> *Subject:* Re: Avoiding Dynamic Classloading for User Code
>
>
> *CAUTION*: external source
> Please always include the ML in the reply-list, so other can participate
> in the discussion / learn from the findings
>
> we are aware of multiple issues when web-submission can result in
> classloader / thread local leaks, which could potentially result in the
> behavior you're describing. We're working on addressing them.
>
> FLINK-25022 [1]: The most critical one leaking thread locals.
> FLINK-25027 [2]: Is only a memory improvement for a particular situation (a
> lot of small batch jobs) and could be fixed by accounting for when setting
> Metaspace size.
> FLINK-25023 [3]: Can leak the classloader of the first job submitted via
> rest API. (constant overhead for Metaspace)
>
> In general, web-submission is different from a normal submission in way,
> that the "main method" of the uploaded jar is executed on JobManager and
> it's really hard to isolate it's execution from possible side effects.
>
> Could you by any chance try to submit jobs with the Flink CLI instead?
> That should be more robust when it comes to the class loading issues.
>
> Which endpoint are you using for submitting the job? "/jars/:jarid/run"?
>
> [1] https://issues.apache.org/jira/browse/FLINK-25022
> 
> [2] https://issues.apache.org/jira/browse/FLINK-25027
> 
> [3] https://issues.apache.org/jira/browse/FLINK-25023
> 
>
> On Tue, Dec 21, 2021 at 4:49 PM Lior Liviev 
> wrote:
>
> Yes, I use the REST API. I'm running into OOM Metaspace, and I think it's
> a class-loading problem, so that's why I'm thinking of putting the jar in
> flink/lib
> --
> *From:* David Morávek 
> *Sent:* Tuesday, December 21, 2021 5:43 PM
> *To:* Lior Liviev 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Avoiding Dynamic Classloading for User Code
>
>
> *CAUTION*: external source
> Hi Lior,
>
> can you please provide details about the steps (I'm not sure what load jar
> / execute with the API means)? are you submitting the job using the REST
> API or Flink CLI? I assume you're using a session cluster.
>
> also what is the concern here? do you run into any class-loading related
> issues?
>
> D.
>
> On Tue, Dec 21, 2021 at 3:48 PM Lior Liviev 
> wrote:
>
> Hello, I have existing fixed cluster (*not* a new one with every job
> execution) and a single Jar +multiple executions with different params.
>
> Currently my procedure is: 1. Download Jar 2. Load Jar with API  3.
> Execute with API.
> I plan to avoid dynamic class loading by applying method described in:
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/debugging_classloading/#avoiding-dynamic-classloading-for-user-code
> 

Re: Avoiding Dynamic Classloading for User Code

2021-12-21 Thread Lior Liviev
Yes, I’m using "/jars/:jarid/run"

Get Outlook for iOS

From: David Morávek 
Sent: Tuesday, December 21, 2021 6:08:51 PM
To: Lior Liviev ; user 
Subject: Re: Avoiding Dynamic Classloading for User Code


CAUTION: external source

Please always include the ML in the reply-list, so other can participate in the 
discussion / learn from the findings

we are aware of multiple issues when web-submission can result in
classloader / thread local leaks, which could potentially result in the
behavior you're describing. We're working on addressing them.

FLINK-25022 [1]: The most critical one leaking thread locals.
FLINK-25027 [2]: Is only a memory improvement for a particular situation (a
lot of small batch jobs) and could be fixed by accounting for when setting
Metaspace size.
FLINK-25023 [3]: Can leak the classloader of the first job submitted via
rest API. (constant overhead for Metaspace)

In general, web-submission is different from a normal submission in way,
that the "main method" of the uploaded jar is executed on JobManager and
it's really hard to isolate it's execution from possible side effects.

Could you by any chance try to submit jobs with the Flink CLI instead? That 
should be more robust when it comes to the class loading issues.

Which endpoint are you using for submitting the job? "/jars/:jarid/run"?

[1] 
https://issues.apache.org/jira/browse/FLINK-25022
[2] 
https://issues.apache.org/jira/browse/FLINK-25027
[3] 
https://issues.apache.org/jira/browse/FLINK-25023

On Tue, Dec 21, 2021 at 4:49 PM Lior Liviev 
mailto:lior.liv...@earnix.com>> wrote:
Yes, I use the REST API. I'm running into OOM Metaspace, and I think it's a 
class-loading problem, so that's why I'm thinking of putting the jar in 
flink/lib

From: David Morávek mailto:d...@apache.org>>
Sent: Tuesday, December 21, 2021 5:43 PM
To: Lior Liviev mailto:lior.liv...@earnix.com>>
Cc: user@flink.apache.org 
mailto:user@flink.apache.org>>
Subject: Re: Avoiding Dynamic Classloading for User Code


CAUTION: external source

Hi Lior,

can you please provide details about the steps (I'm not sure what load jar / 
execute with the API means)? are you submitting the job using the REST API or 
Flink CLI? I assume you're using a session cluster.

also what is the concern here? do you run into any class-loading related issues?

D.

On Tue, Dec 21, 2021 at 3:48 PM Lior Liviev 
mailto:lior.liv...@earnix.com>> wrote:

Hello, I have existing fixed cluster (not a new one with every job execution) 
and a single Jar +multiple executions with different params.

Currently my procedure is: 1. Download Jar 2. Load Jar with API  3. Execute 
with API.

I plan to avoid dynamic class loading by applying method described in: 
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/debugging_classloading/#avoiding-dynamic-classloading-for-user-code
Debugging Classloading | Apache 

Re: Class loader

2021-12-21 Thread David Morávek
to answer that, I need a better understanding of how exactly you deploy the
jobs. Please keep the conversation focused in the second thread, that
discusses the problem

D.

On Tue, Dec 21, 2021 at 5:09 PM Lior Liviev  wrote:

> I don’t have the Jar in Flinks folder yet it’s just that I don’t know if
> it will resolve my oom metaspace problem
>
> Get Outlook for iOS 
> --
> *From:* David Morávek 
> *Sent:* Tuesday, December 21, 2021 5:59:05 PM
> *To:* Lior Liviev ; user 
> *Subject:* Re: Class loader
>
>
> *CAUTION*: external source
>
> if I have my user code Jar in Flink
>
>
> I assume this means that the user code is already on Flink's class path
> (eg. /lib folder).
>
> Then it depends on how the class-loader is set up [1] and on how you
> submit the job. For session cluster, each job is isolated in a separate
> classloader and by default Flink uses the child-first resolution strategy,
> that first tries to load classes from the submitted job JAR.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-resolve-order
> 
>
> On Tue, Dec 21, 2021 at 4:50 PM Lior Liviev 
> wrote:
>
> Actually no, it's a separate question
> --
> *From:* David Morávek 
> *Sent:* Tuesday, December 21, 2021 5:45 PM
> *To:* Lior Liviev 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Class loader
>
>
> *CAUTION*: external source
> I assume this is a duplicate of the previous thread [1]
>
> [1] https://lists.apache.org/thread/16kxytrqycolfwfmr5tv0g6bq9m2wvof
> 
>
> Best,
> D.
>
> On Tue, Dec 21, 2021 at 3:53 PM Lior Liviev 
> wrote:
>
> Hello, I wanted to know if I have my user code Jar in Flink, and I'm
> running it 3 times, will the class loader take the same classes at every
> execution?
>
> Do not click on links or open attachments unless you recognize the sender.
> Please use the report button if you believe this email is suspicious.
>
> Do not click on links or open attachments unless you recognize the sender.
> Please use the report button if you believe this email is suspicious.
>


Re: Avoiding Dynamic Classloading for User Code

2021-12-21 Thread David Morávek
Please always include the ML in the reply-list, so other can participate in
the discussion / learn from the findings

we are aware of multiple issues when web-submission can result in
classloader / thread local leaks, which could potentially result in the
behavior you're describing. We're working on addressing them.

FLINK-25022 [1]: The most critical one leaking thread locals.
FLINK-25027 [2]: Is only a memory improvement for a particular situation (a
lot of small batch jobs) and could be fixed by accounting for when setting
Metaspace size.
FLINK-25023 [3]: Can leak the classloader of the first job submitted via
rest API. (constant overhead for Metaspace)

In general, web-submission is different from a normal submission in way,
that the "main method" of the uploaded jar is executed on JobManager and
it's really hard to isolate it's execution from possible side effects.

Could you by any chance try to submit jobs with the Flink CLI instead? That
should be more robust when it comes to the class loading issues.

Which endpoint are you using for submitting the job? "/jars/:jarid/run"?

[1] https://issues.apache.org/jira/browse/FLINK-25022
[2] https://issues.apache.org/jira/browse/FLINK-25027
[3] https://issues.apache.org/jira/browse/FLINK-25023

On Tue, Dec 21, 2021 at 4:49 PM Lior Liviev  wrote:

> Yes, I use the REST API. I'm running into OOM Metaspace, and I think it's
> a class-loading problem, so that's why I'm thinking of putting the jar in
> flink/lib
> --
> *From:* David Morávek 
> *Sent:* Tuesday, December 21, 2021 5:43 PM
> *To:* Lior Liviev 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Avoiding Dynamic Classloading for User Code
>
>
> *CAUTION*: external source
> Hi Lior,
>
> can you please provide details about the steps (I'm not sure what load jar
> / execute with the API means)? are you submitting the job using the REST
> API or Flink CLI? I assume you're using a session cluster.
>
> also what is the concern here? do you run into any class-loading related
> issues?
>
> D.
>
> On Tue, Dec 21, 2021 at 3:48 PM Lior Liviev 
> wrote:
>
> Hello, I have existing fixed cluster (*not* a new one with every job
> execution) and a single Jar +multiple executions with different params.
>
> Currently my procedure is: 1. Download Jar 2. Load Jar with API  3.
> Execute with API.
> I plan to avoid dynamic class loading by applying method described in:
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/debugging_classloading/#avoiding-dynamic-classloading-for-user-code
> 
> Debugging Classloading | Apache Flink
> 
> Debugging Classloading # Overview of Classloading in Flink # When running
> Flink applications, the JVM will load various classes over time. These
> classes can be divided into three groups based on their origin: The Java
> Classpath: This is Java’s common classpath, and it includes the JDK
> libraries, and all code in Flink’s /lib folder (the classes of Apache Flink
> and some dependencies).
> nightlies.apache.org
> 
> My question is:
>
> After putting the Jar in $FLINK/lib, do I need to load Jar and execute it
> the old way, or what?
>
> Do not click on links or open attachments unless you recognize the sender.
> Please use the report button if you believe this email is suspicious.
>


Re: Class loader

2021-12-21 Thread David Morávek
>
> if I have my user code Jar in Flink
>

I assume this means that the user code is already on Flink's class path
(eg. /lib folder).

Then it depends on how the class-loader is set up [1] and on how you submit
the job. For session cluster, each job is isolated in a separate
classloader and by default Flink uses the child-first resolution strategy,
that first tries to load classes from the submitted job JAR.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-resolve-order

On Tue, Dec 21, 2021 at 4:50 PM Lior Liviev  wrote:

> Actually no, it's a separate question
> --
> *From:* David Morávek 
> *Sent:* Tuesday, December 21, 2021 5:45 PM
> *To:* Lior Liviev 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Class loader
>
>
> *CAUTION*: external source
> I assume this is a duplicate of the previous thread [1]
>
> [1] https://lists.apache.org/thread/16kxytrqycolfwfmr5tv0g6bq9m2wvof
> 
>
> Best,
> D.
>
> On Tue, Dec 21, 2021 at 3:53 PM Lior Liviev 
> wrote:
>
> Hello, I wanted to know if I have my user code Jar in Flink, and I'm
> running it 3 times, will the class loader take the same classes at every
> execution?
>
> Do not click on links or open attachments unless you recognize the sender.
> Please use the report button if you believe this email is suspicious.
>


Re: Class loader

2021-12-21 Thread David Morávek
I assume this is a duplicate of the previous thread [1]

[1] https://lists.apache.org/thread/16kxytrqycolfwfmr5tv0g6bq9m2wvof

Best,
D.

On Tue, Dec 21, 2021 at 3:53 PM Lior Liviev  wrote:

> Hello, I wanted to know if I have my user code Jar in Flink, and I'm
> running it 3 times, will the class loader take the same classes at every
> execution?
>


Re: Avoiding Dynamic Classloading for User Code

2021-12-21 Thread David Morávek
Hi Lior,

can you please provide details about the steps (I'm not sure what load jar
/ execute with the API means)? are you submitting the job using the REST
API or Flink CLI? I assume you're using a session cluster.

also what is the concern here? do you run into any class-loading related
issues?

D.

On Tue, Dec 21, 2021 at 3:48 PM Lior Liviev  wrote:

> Hello, I have existing fixed cluster (*not* a new one with every job
> execution) and a single Jar +multiple executions with different params.
>
> Currently my procedure is: 1. Download Jar 2. Load Jar with API  3.
> Execute with API.
> I plan to avoid dynamic class loading by applying method described in:
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/debugging_classloading/#avoiding-dynamic-classloading-for-user-code
> Debugging Classloading | Apache Flink
> 
> Debugging Classloading # Overview of Classloading in Flink # When running
> Flink applications, the JVM will load various classes over time. These
> classes can be divided into three groups based on their origin: The Java
> Classpath: This is Java’s common classpath, and it includes the JDK
> libraries, and all code in Flink’s /lib folder (the classes of Apache Flink
> and some dependencies).
> nightlies.apache.org
> My question is:
>
> After putting the Jar in $FLINK/lib, do I need to load Jar and execute it
> the old way, or what?
>


Class loader

2021-12-21 Thread Lior Liviev
Hello, I wanted to know if I have my user code Jar in Flink, and I'm running it 
3 times, will the class loader take the same classes at every execution?


Avoiding Dynamic Classloading for User Code

2021-12-21 Thread Lior Liviev
Hello, I have existing fixed cluster (not a new one with every job execution) 
and a single Jar +multiple executions with different params.

Currently my procedure is: 1. Download Jar 2. Load Jar with API  3. Execute 
with API.

I plan to avoid dynamic class loading by applying method described in: 
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/debugging_classloading/#avoiding-dynamic-classloading-for-user-code
Debugging Classloading | Apache 
Flink
Debugging Classloading # Overview of Classloading in Flink # When running Flink 
applications, the JVM will load various classes over time. These classes can be 
divided into three groups based on their origin: The Java Classpath: This is 
Java’s common classpath, and it includes the JDK libraries, and all code in 
Flink’s /lib folder (the classes of Apache Flink and some dependencies).
nightlies.apache.org
My question is:

After putting the Jar in $FLINK/lib, do I need to load Jar and execute it the 
old way, or what?


Re: How to know if Job nodes are registered in cluster?

2021-12-21 Thread David Morávek
Hi John,

there is usually no need to run multiple JM, if you're able to start a new
one quickly after failure (eg. when you're running on kubernetes). There is
always only single active leader and other JMs effectively do nothing
besides competing for the leadership. Zookeeper based HA uses the
DefaultLeaderRetrievalService, which logs leadership information on DEBUG
level.

Best,
D.

On Sun, Dec 19, 2021 at 6:38 AM John Smith  wrote:

> Hi running flink 1.10
>
> I have 3 zookeeper nodes and 3 job nodes.
>
> 1 nodes has specifically indicated that it was granted leadership with
> token.
> The other 2 job nodes. Indicate: Starting ZooKeeperLeaderElectionService
> ZooKeeperLeaderElectionService{leaderPath='/leader/resource_manager_lock'}.
> So is that enough to know. Usually isn't there some message printed on each
> node indicating to each other who is leader and who is "present"?
>


Re: Read parquet data from S3 with Flink 1.12

2021-12-21 Thread Seth Wiesman
Hi Alexandre,

You are correct, BatchTableEnvironment does not exist in 1.14 anymore. In
1.15 we will have the state processor API ported to DataStream for exactly
this reason, it is the last piece to begin officially marking DataSet as
deprecated. As you can understand, this has been a multi year process and
there have been some rough edges as components are migrated.

The easiest solution is for you to use 1.12 DataSet <-> Table interop. Any
savepoint you create using Flink 1.12 you should be able to restore on a
1.14 DataStream application.

I am unsure of the issue with the Hadoop plugin, but if using 1.14 is a
hard requirement, rewriting your input data into another format could also
be a viable stop-gap solution.

Seth

On Mon, Dec 20, 2021 at 8:57 PM Alexandre Montecucco <
alexandre.montecu...@grabtaxi.com> wrote:

> Hello,
>
> I also face the same issue as documented in a previous mail from the
> mailing list [1]
> Basically when using flink-parquet, I get:
>
>>  java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration
>
> I have no idea what I need to do to fix this and could not find anything
> from the doc. I tried importing various hadoop libraries, but it always
> causes yet another issue.
>
> I think this might be the root cause of my problem.
>
> Best,
> Alex
>
> [1] https://lists.apache.org/thread/796m8tww4gqykqm1szb3y5m7t6scgho2
>
> On Mon, Dec 20, 2021 at 4:23 PM Alexandre Montecucco <
> alexandre.montecu...@grabtaxi.com> wrote:
>
>> Hello Piotrek,
>> Thank you for the help.
>> Regarding the S3 issue I have followed the documentation for the plugins.
>> Many of our other apps are using S3 through the Hadoop Fs Flink plugin.
>> Also, in this case, just reading regular plain text file works, I only
>> have an issue when using Parquet.
>>
>> I tried switching to Flink 1.14, however I am stumbling upon other
>> blockers.
>> To give more context, I am trying to build a Flink savepoint for cold
>> start data. So I am using the Flink State Processor API. But:
>>  -  Flink State Processor API is using the DataSet api which is now
>> marked as deprecated (Legacy)
>>  - the doc you shared regarding reading from Parquet uses the DataStream
>> API
>>  - the Flink State Processor API doc [1] states there is interoperability
>> of DataSet and Table API
>> 
>>  (but the link is now erroneous), it was last correct in Flink 1.12 [2]
>>
>> Given that we can convert from DataStream to Table API, I was thinking I
>> could then convert from Table to DataSet API (though very cumbersome and
>> unsure if any performance / memory impact).
>> But for the Table to DataSet conversion, the doc is using a 
>> BatchTableEnvironment
>> class which does not seem to exist in Flink 1.14 anymore
>>
>> Any recommendations or anything I might have missed?
>>
>> Thank you.
>>
>> Best,
>> Alex
>>
>>
>> [1] 
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/libs/state_processor_api/#state-processor-api
>>
>> 
>>
>> [2]
>> https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/common.html#integration-with-datastream-and-dataset-api
>> [3]
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/formats/parquet/
>>
>>
>> On Fri, Dec 17, 2021 at 8:53 PM Piotr Nowojski 
>> wrote:
>>
>>> Hi,
>>>
>>> Reading in the DataStream API (that's what I'm using you are doing) from
>>> Parquet files is officially supported and documented only since 1.14 [1].
>>> Before that it was only supported for the Table API. As far as I can tell,
>>> the basic classes (`FileSource` and `ParquetColumnarRowInputFormat`) have
>>> already been in the code base since 1.12.x. I don't know how stable it was
>>> and how well it was working. I would suggest upgrading to Flink 1.14.1. As
>>> a last resort you can try using the very least the latest version of 1.12.x
>>> branch as documented by 1.14 version, but I can not guarantee that it will
>>> be working.
>>>
>>> Regarding the S3 issue, have you followed the documentation? [2][3]
>>>
>>> Best,
>>> Piotrek
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/formats/parquet/
>>> [2]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
>>> [3]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.12/deployment/filesystems/s3.html
>>>
>>>
>>> pt., 17 gru 2021 o 10:10 Alexandre Montecucco <
>>> alexandre.montecu...@grabtaxi.com> napisał(a):
>>>
 Hello everyone,
 I am struggling to read S3 parquet files from S3 with Flink Streaming
 1.12.2
 I had some difficulty simply reading from local parquet files. I
 finally managed that part, though the solution feels 

Re:实时读取hive参数不生效

2021-12-21 Thread RS
set table.dynamic-table-options.enabled=true;

sql-client的话这样配置,不要引号


在 2021-12-21 20:20:11,"Fei Han"  写道:
>@all:
>大家好!
>  我在实时读取hive的时候动态参数不生效,另外flink是否可以通过流读方式读取hive的普通表呢?
>版本如下:
>Flink版本1.13.3
>   Hive版本hive2.1.1-CDH6.2.0
>设置的参数是set 'table.dynamic-table-options.enabled'='true'
>报错如下:
>INSERT INTO qhc_catalog.qhc_hms.qhc_ods_assassin_dept
>select * from qhc_catalog.qhc_assassin_ods.assassin_dept /*+ 
>OPTIONS('streaming-source.enable'='true', 
>'streaming-source.partition-order'='create-time') */
>2021-12-21 19:56:45,198 ERROR com.flink.streaming.core.JobApplication  
>[] - 任务执行失败:
>org.apache.flink.table.api.ValidationException: The 'OPTIONS' hint is allowed 
>only when the config option 'table.dynamic-table-options.enabled' is set to 
>true.
>   at 
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createFinalCatalogTable(CatalogSourceTable.java:104)
>  ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
>   at 
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:79)
>  ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
>  ~[flink-table_2.12-1.13.3.jar:1.13.3]
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
>  ~[flink-table_2.12-1.13.3.jar:1.13.3]
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2140)
>  ~[flink-table_2.12-1.13.3.jar:1.13.3]
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
>  ~[flink-table_2.12-1.13.3.jar:1.13.3]
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
>  ~[flink-table_2.12-1.13.3.jar:1.13.3]
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
>  ~[flink-table_2.12-1.13.3.jar:1.13.3]
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
>  ~[flink-table_2.12-1.13.3.jar:1.13.3]
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
>  ~[flink-table_2.12-1.13.3.jar:1.13.3]
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:169)
>  ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:161)
>  ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:989)
>  ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:958)
>  ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:283)
>  ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:603)
>  ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:272)
>  ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
>   at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
>  ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
>   at 
> org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:51)
>  ~[flink-table_2.12-1.13.3.jar:1.13.3]
>   at 
> com.flink.streaming.core.execute.ExecuteSql.exeSql(ExecuteSql.java:38) 
> ~[flink-streaming-core-1.3.0.RELEASE.jar:1.3.0.RELEASE]
>   at com.flink.streaming.core.JobApplication.main(JobApplication.java:80) 
> ~[flink-streaming-core-1.3.0.RELEASE.jar:1.3.0.RELEASE]
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
> ~[?:1.8.0_211]
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
> ~[?:1.8.0_211]
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  ~[?:1.8.0_211]
>   at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_211]
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>  ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>  ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>   at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) 
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>   at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812) 
> 

回复:实时读取hive参数不生效

2021-12-21 Thread JasonLee
hi


看上面似乎这个参数没有生效,你是在哪里设置的呢?


Best
JasonLee


在2021年12月21日 20:20,Fei Han 写道:
@all:
大家好!
我在实时读取hive的时候动态参数不生效,另外flink是否可以通过流读方式读取hive的普通表呢?
版本如下:
Flink版本1.13.3
Hive版本hive2.1.1-CDH6.2.0
设置的参数是set 'table.dynamic-table-options.enabled'='true'
报错如下:
INSERT INTO qhc_catalog.qhc_hms.qhc_ods_assassin_dept
select * from qhc_catalog.qhc_assassin_ods.assassin_dept /*+ 
OPTIONS('streaming-source.enable'='true', 
'streaming-source.partition-order'='create-time') */
2021-12-21 19:56:45,198 ERROR com.flink.streaming.core.JobApplication   
   [] - 任务执行失败:
org.apache.flink.table.api.ValidationException: The 'OPTIONS' hint is allowed 
only when the config option 'table.dynamic-table-options.enabled' is set to 
true.
at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createFinalCatalogTable(CatalogSourceTable.java:104)
 ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:79)
 ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
at 
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585) 
~[flink-table_2.12-1.13.3.jar:1.13.3]
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
 ~[flink-table_2.12-1.13.3.jar:1.13.3]
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2140)
 ~[flink-table_2.12-1.13.3.jar:1.13.3]
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
 ~[flink-table_2.12-1.13.3.jar:1.13.3]
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
 ~[flink-table_2.12-1.13.3.jar:1.13.3]
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
 ~[flink-table_2.12-1.13.3.jar:1.13.3]
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
 ~[flink-table_2.12-1.13.3.jar:1.13.3]
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
 ~[flink-table_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:169)
 ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:161)
 ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:989)
 ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:958)
 ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:283)
 ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:603)
 ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:272)
 ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101) 
~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:51)
 ~[flink-table_2.12-1.13.3.jar:1.13.3]
at com.flink.streaming.core.execute.ExecuteSql.exeSql(ExecuteSql.java:38) 
~[flink-streaming-core-1.3.0.RELEASE.jar:1.3.0.RELEASE]
at com.flink.streaming.core.JobApplication.main(JobApplication.java:80) 
~[flink-streaming-core-1.3.0.RELEASE.jar:1.3.0.RELEASE]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_211]
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[?:1.8.0_211]
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_211]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_211]
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
 ~[flink-dist_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
 ~[flink-dist_2.12-1.13.3.jar:1.13.3]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) 
~[flink-dist_2.12-1.13.3.jar:1.13.3]
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812) 
~[flink-dist_2.12-1.13.3.jar:1.13.3]
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246) 
~[flink-dist_2.12-1.13.3.jar:1.13.3]
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054) 
~[flink-dist_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132) 
~[flink-dist_2.12-1.13.3.jar:1.13.3]
at 

实时读取hive参数不生效

2021-12-21 Thread Fei Han
@all:
大家好!
  我在实时读取hive的时候动态参数不生效,另外flink是否可以通过流读方式读取hive的普通表呢?
版本如下:
Flink版本1.13.3
   Hive版本hive2.1.1-CDH6.2.0
设置的参数是set 'table.dynamic-table-options.enabled'='true'
报错如下:
INSERT INTO qhc_catalog.qhc_hms.qhc_ods_assassin_dept
select * from qhc_catalog.qhc_assassin_ods.assassin_dept /*+ 
OPTIONS('streaming-source.enable'='true', 
'streaming-source.partition-order'='create-time') */
2021-12-21 19:56:45,198 ERROR com.flink.streaming.core.JobApplication   
   [] - 任务执行失败:
org.apache.flink.table.api.ValidationException: The 'OPTIONS' hint is allowed 
only when the config option 'table.dynamic-table-options.enabled' is set to 
true.
at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createFinalCatalogTable(CatalogSourceTable.java:104)
 ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:79)
 ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
at 
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585) 
~[flink-table_2.12-1.13.3.jar:1.13.3]
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
 ~[flink-table_2.12-1.13.3.jar:1.13.3]
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2140)
 ~[flink-table_2.12-1.13.3.jar:1.13.3]
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
 ~[flink-table_2.12-1.13.3.jar:1.13.3]
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
 ~[flink-table_2.12-1.13.3.jar:1.13.3]
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
 ~[flink-table_2.12-1.13.3.jar:1.13.3]
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
 ~[flink-table_2.12-1.13.3.jar:1.13.3]
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
 ~[flink-table_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:169)
 ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:161)
 ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:989)
 ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:958)
 ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:283)
 ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:603)
 ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:272)
 ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101) 
~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:51)
 ~[flink-table_2.12-1.13.3.jar:1.13.3]
at 
com.flink.streaming.core.execute.ExecuteSql.exeSql(ExecuteSql.java:38) 
~[flink-streaming-core-1.3.0.RELEASE.jar:1.3.0.RELEASE]
at com.flink.streaming.core.JobApplication.main(JobApplication.java:80) 
~[flink-streaming-core-1.3.0.RELEASE.jar:1.3.0.RELEASE]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
~[?:1.8.0_211]
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[?:1.8.0_211]
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_211]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_211]
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
 ~[flink-dist_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
 ~[flink-dist_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) 
~[flink-dist_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812) 
~[flink-dist_2.12-1.13.3.jar:1.13.3]
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246) 
~[flink-dist_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054) 

Re:实时读取hive参数不生效

2021-12-21 Thread Yuepeng Pan
Hi, Fei Han.


 图片挂掉了。可以尝试外链外部图床或者直接附上相关的hints原始信息。


Best,
Roc.




在 2021-12-21 20:12:33,"Fei Han"  写道:







@all:
大家好!
  我在实时读取hive的时候动态参数不生效,
Flink版本1.13.3
Hive版本hive2.1.1-CDH6.2.0


我的脚本如下:






报错如下:


请大佬们看下是什么原因造成的?



实时读取hive参数不生效

2021-12-21 Thread Fei Han




@all:
大家好!
  我在实时读取hive的时候动态参数不生效,
Flink版本1.13.3
Hive版本hive2.1.1-CDH6.2.0

我的脚本如下:





报错如下:


请大佬们看下是什么原因造成的?



Re:Re: batch模式下任务plan的状态一直为CREATED

2021-12-21 Thread RS
slot资源是绝对充足的,你提到的资源还涉及到其他资源吗?





在 2021-12-21 17:57:21,"刘建刚"  写道:
>固定资源的情况下,batch的调度会按照拓扑顺序执行算子。如果你的资源只够运行一个source,那么等source运行完毕后才能运行SortLimit。
>
>RS  于2021年12月21日周二 16:53写道:
>
>> hi,
>>
>> 版本:flink1.14
>>
>> 模式:batch
>>
>> 测试场景:消费hive大量数据,计算某个字段的 top 10
>>
>>
>> 使用sql-client测试,创建任务之后,生成2个plan,一个Source,一个SortLimit。Source状态为RUNNING,SortLimit状态一直为CREATED。
>>
>> 请问下,SortLimit状态一直为CREATED是正常现象吗?
>>
>> 数据量比较大,全部消费完的话,估计得好几天时间,BATCH模式下,SortLimit的状态需要等所有数据全部消费完才改变吗?
>>
>>
>>
>>
>> 测试SQL:
>>
>> SELECT price
>>
>> FROM hive.data.data1
>>
>> ORDER BY price DESC
>>
>> LIMIT 10;


Re: flink-playground docker/mvn clean install Unknown host repo.maven.apache.org: Name or service not known

2021-12-21 Thread David Morávek
I'd say that the playground was never meant to be run behind a corporate
proxy. If you feel like the documentation can be improved improved in this
regard, please open a new jira issue describing what's missing. Ideally if
you'd like to fix this for others with the similar setup that may run into
the issue, the contributions to the documentation are always appreciated ;)

Best,
D.

On Tue, Dec 21, 2021 at 12:15 PM HG  wrote:

> Hello David,
>
> Yes I understand the issues.
> But the problem is that the documentation on
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/try-flink/flink-operations-playground/
> says:
>
> Starting the Playground #
> 
>
> *The playground environment is set up in just a few steps*. We will walk
> you through the necessary commands and show how to validate that everything
> is running correctly.
>
> Which is in fact not exactly true
>
> The compose command failse.
>
> I think that instructions need to be added to fix this.
>
>
> Regards Hans-Peter
>
> Op di 21 dec. 2021 om 11:51 schreef David Morávek :
>
>> oh, I've misread your email... the proxies are generally tricky to reason
>> about, as each software needs to implement the proxy support on its own
>>
>> in general, most tools try to use the commonly "agreed upon" environment
>> variables (HTTP_PROXY & HTTPS_PROXY), which is also case for wget
>>
>> JVM uses java system properties instead [1], so this is most likely what
>> the maven proxy settings do set up. Also consider the fact, that even with
>> this system property, some custom java HTTP clients, may simply decide to
>> ignore this... :)
>>
>> does it explain your concerns?
>>
>> [1]
>> https://docs.oracle.com/javase/8/docs/technotes/guides/net/proxies.html
>>
>> D.
>>
>> On Tue, Dec 21, 2021 at 11:17 AM HG  wrote:
>>
>>> Hi David
>>> When I start a docker container based on the image that is created
>>> shortly before the mvn command fails, it is resolvable.
>>> I can do : apt update
>>> This basically means that the outside world is reachable
>>>
>>> When I install wget the following succeeds without any issue:
>>>
>>> wget
>>> https://repo.maven.apache.org/maven2/org/apache/flink/flink-table-api-java/1.13.1/flink-table-api-java-1.13.1.pom
>>> --2021-12-21 10:06:37--
>>> https://repo.maven.apache.org/maven2/org/apache/flink/flink-table-api-java/1.13.1/flink-table-api-java-1.13.1.pom
>>> Resolving www-proxy.. (www-proxy..)... 11.9.6.2
>>> Connecting to www-proxy.   ... connected.
>>> Proxy request sent, awaiting response... 200 OK
>>> Length: 5394 (5.3K) [text/xml]
>>> Saving to: ‘flink-table-api-java-1.13.1.pom’
>>>
>>> flink-table-api-java-1.13.1.pom
>>>  
>>> 100%[=>]
>>>   5.27K  --.-KB/sin 0.001s
>>>
>>> 2021-12-21 10:06:39 (4.42 MB/s) - ‘flink-table-api-java-1.13.1.pom’
>>> saved [5394/5394]
>>>
>>> Only setting /root/.m2/settings.xml correctly helps
>>> And that is what I don't understand as the proxy configuration will not
>>> be the same for everyone.
>>>
>>> Regards Hans-Peter
>>>
>>>
>>>
>>>
>>> Op di 21 dec. 2021 om 09:51 schreef David Morávek :
>>>
 Hello Hans,

 it's DNS ;) You need to make sure, that "repo.maven.apache.org" can be
 resolved from your docker container (you can use tools such as host, dig,
 nslookup to verify that). This is may be tricky to debug, unless you're
 familiar with networking. A good place to start might be checking the
 /etc/resolv.conf for details about the DNS server being used.

 Best,
 D.

 On Mon, Dec 20, 2021 at 3:39 PM HG  wrote:

> Hello
> I am trying to the flink-playground examples.
> The docker-compose build fails on the mvn clean install command .
>
> I am behind a proxy.
> To diagnose this I started a container based on the already created
> image
>   docker run -it  --name my-maven-project -v "$(pwd)":/usr/src/mymaven
> -w /usr/src/mymaven b92c6af9fde8 /bin/bash
> Adding packages with apt goes fine
> I added the proxy names with their  ip addresses to /etc/hosts, added
> the proxies to .m2/settings.xml
>
> After that it worked .
>
> But why does it not work without manual intervention?
>
> Regards Hans-Peter
>



Re: flink-playground docker/mvn clean install Unknown host repo.maven.apache.org: Name or service not known

2021-12-21 Thread HG
Hello David,

Yes I understand the issues.
But the problem is that the documentation on
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/try-flink/flink-operations-playground/
says:

Starting the Playground #


*The playground environment is set up in just a few steps*. We will walk
you through the necessary commands and show how to validate that everything
is running correctly.

Which is in fact not exactly true

The compose command failse.

I think that instructions need to be added to fix this.


Regards Hans-Peter

Op di 21 dec. 2021 om 11:51 schreef David Morávek :

> oh, I've misread your email... the proxies are generally tricky to reason
> about, as each software needs to implement the proxy support on its own
>
> in general, most tools try to use the commonly "agreed upon" environment
> variables (HTTP_PROXY & HTTPS_PROXY), which is also case for wget
>
> JVM uses java system properties instead [1], so this is most likely what
> the maven proxy settings do set up. Also consider the fact, that even with
> this system property, some custom java HTTP clients, may simply decide to
> ignore this... :)
>
> does it explain your concerns?
>
> [1]
> https://docs.oracle.com/javase/8/docs/technotes/guides/net/proxies.html
>
> D.
>
> On Tue, Dec 21, 2021 at 11:17 AM HG  wrote:
>
>> Hi David
>> When I start a docker container based on the image that is created
>> shortly before the mvn command fails, it is resolvable.
>> I can do : apt update
>> This basically means that the outside world is reachable
>>
>> When I install wget the following succeeds without any issue:
>>
>> wget
>> https://repo.maven.apache.org/maven2/org/apache/flink/flink-table-api-java/1.13.1/flink-table-api-java-1.13.1.pom
>> --2021-12-21 10:06:37--
>> https://repo.maven.apache.org/maven2/org/apache/flink/flink-table-api-java/1.13.1/flink-table-api-java-1.13.1.pom
>> Resolving www-proxy.. (www-proxy..)... 11.9.6.2
>> Connecting to www-proxy.   ... connected.
>> Proxy request sent, awaiting response... 200 OK
>> Length: 5394 (5.3K) [text/xml]
>> Saving to: ‘flink-table-api-java-1.13.1.pom’
>>
>> flink-table-api-java-1.13.1.pom
>>  
>> 100%[=>]
>>   5.27K  --.-KB/sin 0.001s
>>
>> 2021-12-21 10:06:39 (4.42 MB/s) - ‘flink-table-api-java-1.13.1.pom’
>> saved [5394/5394]
>>
>> Only setting /root/.m2/settings.xml correctly helps
>> And that is what I don't understand as the proxy configuration will not
>> be the same for everyone.
>>
>> Regards Hans-Peter
>>
>>
>>
>>
>> Op di 21 dec. 2021 om 09:51 schreef David Morávek :
>>
>>> Hello Hans,
>>>
>>> it's DNS ;) You need to make sure, that "repo.maven.apache.org" can be
>>> resolved from your docker container (you can use tools such as host, dig,
>>> nslookup to verify that). This is may be tricky to debug, unless you're
>>> familiar with networking. A good place to start might be checking the
>>> /etc/resolv.conf for details about the DNS server being used.
>>>
>>> Best,
>>> D.
>>>
>>> On Mon, Dec 20, 2021 at 3:39 PM HG  wrote:
>>>
 Hello
 I am trying to the flink-playground examples.
 The docker-compose build fails on the mvn clean install command .

 I am behind a proxy.
 To diagnose this I started a container based on the already created
 image
   docker run -it  --name my-maven-project -v "$(pwd)":/usr/src/mymaven
 -w /usr/src/mymaven b92c6af9fde8 /bin/bash
 Adding packages with apt goes fine
 I added the proxy names with their  ip addresses to /etc/hosts, added
 the proxies to .m2/settings.xml

 After that it worked .

 But why does it not work without manual intervention?

 Regards Hans-Peter

>>>


Re: flink-playground docker/mvn clean install Unknown host repo.maven.apache.org: Name or service not known

2021-12-21 Thread David Morávek
oh, I've misread your email... the proxies are generally tricky to reason
about, as each software needs to implement the proxy support on its own

in general, most tools try to use the commonly "agreed upon" environment
variables (HTTP_PROXY & HTTPS_PROXY), which is also case for wget

JVM uses java system properties instead [1], so this is most likely what
the maven proxy settings do set up. Also consider the fact, that even with
this system property, some custom java HTTP clients, may simply decide to
ignore this... :)

does it explain your concerns?

[1] https://docs.oracle.com/javase/8/docs/technotes/guides/net/proxies.html

D.

On Tue, Dec 21, 2021 at 11:17 AM HG  wrote:

> Hi David
> When I start a docker container based on the image that is created shortly
> before the mvn command fails, it is resolvable.
> I can do : apt update
> This basically means that the outside world is reachable
>
> When I install wget the following succeeds without any issue:
>
> wget
> https://repo.maven.apache.org/maven2/org/apache/flink/flink-table-api-java/1.13.1/flink-table-api-java-1.13.1.pom
> --2021-12-21 10:06:37--
> https://repo.maven.apache.org/maven2/org/apache/flink/flink-table-api-java/1.13.1/flink-table-api-java-1.13.1.pom
> Resolving www-proxy.. (www-proxy..)... 11.9.6.2
> Connecting to www-proxy.   ... connected.
> Proxy request sent, awaiting response... 200 OK
> Length: 5394 (5.3K) [text/xml]
> Saving to: ‘flink-table-api-java-1.13.1.pom’
>
> flink-table-api-java-1.13.1.pom
>  
> 100%[=>]
>   5.27K  --.-KB/sin 0.001s
>
> 2021-12-21 10:06:39 (4.42 MB/s) - ‘flink-table-api-java-1.13.1.pom’
> saved [5394/5394]
>
> Only setting /root/.m2/settings.xml correctly helps
> And that is what I don't understand as the proxy configuration will not be
> the same for everyone.
>
> Regards Hans-Peter
>
>
>
>
> Op di 21 dec. 2021 om 09:51 schreef David Morávek :
>
>> Hello Hans,
>>
>> it's DNS ;) You need to make sure, that "repo.maven.apache.org" can be
>> resolved from your docker container (you can use tools such as host, dig,
>> nslookup to verify that). This is may be tricky to debug, unless you're
>> familiar with networking. A good place to start might be checking the
>> /etc/resolv.conf for details about the DNS server being used.
>>
>> Best,
>> D.
>>
>> On Mon, Dec 20, 2021 at 3:39 PM HG  wrote:
>>
>>> Hello
>>> I am trying to the flink-playground examples.
>>> The docker-compose build fails on the mvn clean install command .
>>>
>>> I am behind a proxy.
>>> To diagnose this I started a container based on the already created image
>>>   docker run -it  --name my-maven-project -v "$(pwd)":/usr/src/mymaven
>>> -w /usr/src/mymaven b92c6af9fde8 /bin/bash
>>> Adding packages with apt goes fine
>>> I added the proxy names with their  ip addresses to /etc/hosts, added
>>> the proxies to .m2/settings.xml
>>>
>>> After that it worked .
>>>
>>> But why does it not work without manual intervention?
>>>
>>> Regards Hans-Peter
>>>
>>


Re: flink-playground docker/mvn clean install Unknown host repo.maven.apache.org: Name or service not known

2021-12-21 Thread HG
Hi David
When I start a docker container based on the image that is created shortly
before the mvn command fails, it is resolvable.
I can do : apt update
This basically means that the outside world is reachable

When I install wget the following succeeds without any issue:

wget
https://repo.maven.apache.org/maven2/org/apache/flink/flink-table-api-java/1.13.1/flink-table-api-java-1.13.1.pom
--2021-12-21 10:06:37--
https://repo.maven.apache.org/maven2/org/apache/flink/flink-table-api-java/1.13.1/flink-table-api-java-1.13.1.pom
Resolving www-proxy.. (www-proxy..)... 11.9.6.2
Connecting to www-proxy.   ... connected.
Proxy request sent, awaiting response... 200 OK
Length: 5394 (5.3K) [text/xml]
Saving to: ‘flink-table-api-java-1.13.1.pom’

flink-table-api-java-1.13.1.pom
 
100%[=>]
  5.27K  --.-KB/sin 0.001s

2021-12-21 10:06:39 (4.42 MB/s) - ‘flink-table-api-java-1.13.1.pom’
saved [5394/5394]

Only setting /root/.m2/settings.xml correctly helps
And that is what I don't understand as the proxy configuration will not be
the same for everyone.

Regards Hans-Peter




Op di 21 dec. 2021 om 09:51 schreef David Morávek :

> Hello Hans,
>
> it's DNS ;) You need to make sure, that "repo.maven.apache.org" can be
> resolved from your docker container (you can use tools such as host, dig,
> nslookup to verify that). This is may be tricky to debug, unless you're
> familiar with networking. A good place to start might be checking the
> /etc/resolv.conf for details about the DNS server being used.
>
> Best,
> D.
>
> On Mon, Dec 20, 2021 at 3:39 PM HG  wrote:
>
>> Hello
>> I am trying to the flink-playground examples.
>> The docker-compose build fails on the mvn clean install command .
>>
>> I am behind a proxy.
>> To diagnose this I started a container based on the already created image
>>   docker run -it  --name my-maven-project -v "$(pwd)":/usr/src/mymaven -w
>> /usr/src/mymaven b92c6af9fde8 /bin/bash
>> Adding packages with apt goes fine
>> I added the proxy names with their  ip addresses to /etc/hosts, added the
>> proxies to .m2/settings.xml
>>
>> After that it worked .
>>
>> But why does it not work without manual intervention?
>>
>> Regards Hans-Peter
>>
>


Re: batch模式下任务plan的状态一直为CREATED

2021-12-21 Thread 刘建刚
固定资源的情况下,batch的调度会按照拓扑顺序执行算子。如果你的资源只够运行一个source,那么等source运行完毕后才能运行SortLimit。

RS  于2021年12月21日周二 16:53写道:

> hi,
>
> 版本:flink1.14
>
> 模式:batch
>
> 测试场景:消费hive大量数据,计算某个字段的 top 10
>
>
> 使用sql-client测试,创建任务之后,生成2个plan,一个Source,一个SortLimit。Source状态为RUNNING,SortLimit状态一直为CREATED。
>
> 请问下,SortLimit状态一直为CREATED是正常现象吗?
>
> 数据量比较大,全部消费完的话,估计得好几天时间,BATCH模式下,SortLimit的状态需要等所有数据全部消费完才改变吗?
>
>
>
>
> 测试SQL:
>
> SELECT price
>
> FROM hive.data.data1
>
> ORDER BY price DESC
>
> LIMIT 10;


Re: flinksql的await

2021-12-21 Thread Jing Ge
陈卓宇 你好,

在默认情况下,所有提交后的DML都是异步执行的,详见TableEnvironment.executeSql(String
statement)的注释。使用.await()和不使用.await()的区别是使用await()后会等待异步查询返回第一行结果(题外话:请注意INSERT和SELECT的区别),详见TableResult.await()注解,具体代码见TableResultImpl.awaitInternal(long
timeout, TimeUnit unit), 由于此时入参timeout为-1,导致future.get()被调用,
强制等待resultProvider.isFirstRowReady()为true。

祝好

On Tue, Dec 21, 2021 at 10:00 AM 陈卓宇 <2572805...@qq.com.invalid> wrote:

> 社区您好:
>
> String initialValues =
> "INSERT INTO kafka\n"
> + "SELECT CAST(price AS DECIMAL(10, 2)), currency, "
> + " CAST(d AS DATE), CAST(t AS TIME(0)), CAST(ts AS
> TIMESTAMP(3))\n"
> + "FROM (VALUES (2.02,'Euro','2019-12-12', '00:00:01',
> '2019-12-12 00:00:01.001001'), \n"
> + "  (1.11,'US Dollar','2019-12-12', '00:00:02',
> '2019-12-12 00:00:02.002001'), \n"
> + "  (50,'Yen','2019-12-12', '00:00:03', '2019-12-12
> 00:00:03.004001'), \n"
> + "  (3.1,'Euro','2019-12-12', '00:00:04', '2019-12-12
> 00:00:04.005001'), \n"
> + "  (5.33,'US Dollar','2019-12-12', '00:00:05',
> '2019-12-12 00:00:05.006001'), \n"
> + "  (0,'DUMMY','2019-12-12', '00:00:10', '2019-12-12
> 00:00:10'))\n"
> + "  AS orders (price, currency, d, t, ts)";
> tEnv.executeSql(initialValues).await();
> 我看了await他的注解但是感觉还是没有理解他的作用,
> 使用.await()和不使用.await()的区别是什么?
> 陈卓宇
>
>
> 


Re: org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat class missing from Flink 1.14 ?

2021-12-21 Thread Timo Walther

Hi Tuomas,

are you sure that all dependencies have been upgraded to Flink 1.14. 
Connector dependencies that still reference Flink 1.13 might cause issues.


JdbcBatchingOutputFormat has been refactored in this PR:

https://github.com/apache/flink/pull/16528

I hope this helps.

Regards,
Timo

On 18.12.21 12:30, Tuomas Pystynen wrote:
I am just trying out Flink using images from Docker Hub. My simple 
insert using JDBC connector to Postgres database fails with this error


Flink SQL> insert into order_counts
 > select customerid, count(*) order_count from orders group by customerid;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 5d3b32bd8cc0f11dfe73cbf242793cc9


2021-12-17 20:39:08,975 WARN  org.apache.flink.runtime.taskmanager.Task 
                    [] - GroupAggregate(groupBy=[customerid], 
select=[customerid, COUNT(*) AS order_count]) -> 
NotNullEnforcer(fields=[customerid]) -> Sink: 
Sink(table=[default_catalog.default_database.order_counts], 
fields=[customerid, order_count]) (1/1)#0 
(12466e41fb377bcd45b3d22aab1cadfd) switched from INITIALIZING to FAILED 
with failure cause: java.lang.NoClassDefFoundError: 
org/apache/flink/runtime/util/ExecutorThreadFactory
at 
*org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.open*(JdbcBatchingOutputFormat.java:118)
at 
org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.open(GenericJdbcSinkFunction.java:49)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100)
at 
org.apache.flink.table.runtime.operators.sink.SinkOperator.open(SinkOperator.java:58)
at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)

at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.runtime.util.ExecutorThreadFactory

at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 15 more

After downgrading to Flink 1.13 the insert works fine. Has 
JdbcBatchingOutputFormat.java been removed by accident or purpose in 1.14 ?


Regards,
Tuomas Pystynen




batch模式下任务plan的状态一直为CREATED

2021-12-21 Thread RS
hi,

版本:flink1.14

模式:batch

测试场景:消费hive大量数据,计算某个字段的 top 10

使用sql-client测试,创建任务之后,生成2个plan,一个Source,一个SortLimit。Source状态为RUNNING,SortLimit状态一直为CREATED。

请问下,SortLimit状态一直为CREATED是正常现象吗? 

数据量比较大,全部消费完的话,估计得好几天时间,BATCH模式下,SortLimit的状态需要等所有数据全部消费完才改变吗?




测试SQL:

SELECT price

FROM hive.data.data1

ORDER BY price DESC

LIMIT 10;

flinksql??await

2021-12-21 Thread ??????
??

String initialValues =
"INSERT INTO kafka\n"
+ "SELECT CAST(price AS DECIMAL(10, 2)), currency, "
+ " CAST(d AS DATE), CAST(t AS TIME(0)), CAST(ts AS 
TIMESTAMP(3))\n"
+ "FROM (VALUES (2.02,'Euro','2019-12-12', '00:00:01', 
'2019-12-12 00:00:01.001001'), \n"
+ "  (1.11,'US Dollar','2019-12-12', '00:00:02', '2019-12-12 
00:00:02.002001'), \n"
+ "  (50,'Yen','2019-12-12', '00:00:03', '2019-12-12 
00:00:03.004001'), \n"
+ "  (3.1,'Euro','2019-12-12', '00:00:04', '2019-12-12 
00:00:04.005001'), \n"
+ "  (5.33,'US Dollar','2019-12-12', '00:00:05', '2019-12-12 
00:00:05.006001'), \n"
+ "  (0,'DUMMY','2019-12-12', '00:00:10', '2019-12-12 
00:00:10'))\n"
+ "  AS orders (price, currency, d, t, ts)";
tEnv.executeSql(initialValues).await();
??await??
.await().await()??
??




Re: flink-playground docker/mvn clean install Unknown host repo.maven.apache.org: Name or service not known

2021-12-21 Thread David Morávek
Hello Hans,

it's DNS ;) You need to make sure, that "repo.maven.apache.org" can be
resolved from your docker container (you can use tools such as host, dig,
nslookup to verify that). This is may be tricky to debug, unless you're
familiar with networking. A good place to start might be checking the
/etc/resolv.conf for details about the DNS server being used.

Best,
D.

On Mon, Dec 20, 2021 at 3:39 PM HG  wrote:

> Hello
> I am trying to the flink-playground examples.
> The docker-compose build fails on the mvn clean install command .
>
> I am behind a proxy.
> To diagnose this I started a container based on the already created image
>   docker run -it  --name my-maven-project -v "$(pwd)":/usr/src/mymaven -w
> /usr/src/mymaven b92c6af9fde8 /bin/bash
> Adding packages with apt goes fine
> I added the proxy names with their  ip addresses to /etc/hosts, added the
> proxies to .m2/settings.xml
>
> After that it worked .
>
> But why does it not work without manual intervention?
>
> Regards Hans-Peter
>