Flink 1.11.1 - job manager exits with exit code 0

2020-07-24 Thread Alexey Trenikhun
Hello,

I have a Flink 1.11.1 session cluster running via docker compose. I upload the job jar, 
and when I submit the job, the jobmanager exits without any errors in the log:

...
{"@timestamp":"2020-07-25T04:32:54.007Z","@version":"1","message":"Starting execution of job katana-fsp (64ff3943fdc5024c5beef1612518c627) under job master id .","logger_name":"org.apache.flink.runtime.jobmaster.JobMaster","thread_name":"flink-akka.actor.default-dispatcher-18","level":"INFO","level_value":2}
{"@timestamp":"2020-07-25T04:32:54.011Z","@version":"1","message":"Stopped BLOB server at 0.0.0.0:6124","logger_name":"org.apache.flink.runtime.blob.BlobServer","thread_name":"BlobServer shutdown hook","level":"INFO","level_value":2}
{"@timestamp":"2020-07-25T04:32:54.015Z","@version":"1","message":"Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy]","logger_name":"org.apache.flink.runtime.jobmaster.JobMaster","thread_name":"flink-akka.actor.default-dispatcher-18","level":"INFO","level_value":2}
{"@timestamp":"2020-07-25T04:32:54.016Z","@version":"1","message":"Job katana-fsp (64ff3943fdc5024c5beef1612518c627) switched from state CREATED to RUNNING.","logger_name":"org.apache.flink.runtime.executiongraph.ExecutionGraph","thread_name":"flink-akka.actor.default-dispatcher-18","level":"INFO","level_value":2}

Any ideas how to diagnose it?

Thanks,
Alexey


Re: Re: flink 1.11: very few query results written into MySQL per second

2020-07-24 Thread RS
Check how long the INSERT SQL takes to execute and see whether the bottleneck is on the MySQL side, for example large rows being written, slow index maintenance, or other issues.

Or run the SQL by hand to write some data and compare the speed?

On 2020-07-25 10:20:35, "小学生" <201782...@qq.com> wrote:
> Hello, thank you for your answer. After adding it the way you suggested, the rate only went up to about 8 rows per second into MySQL, which is still not the improvement we need.


Re: flink 1.11: very few query results written into MySQL per second

2020-07-24 Thread ??????
mysql8??

Re: flink 1.11: very few query results written into MySQL per second

2020-07-24 Thread WeiXubin
Hi,
You can try rewriting the URL to add rewriteBatchedStatements=true, like this:
jdbc:mysql://198.2.2.71:3306/bda?useSSL=false&rewriteBatchedStatements=true

By default, the MySQL JDBC driver ignores executeBatch() and sends the statements of a batch
one by one to the MySQL server, which directly results in poor performance. Only when the
rewriteBatchedStatements parameter is set to true will the driver actually execute the SQL in batches.
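
For reference, a minimal Flink 1.11 sketch of a JDBC sink DDL that carries this URL parameter; the
table name, columns, credentials and flush settings below are made up, and the buffer-flush options
are the connector's own batching knobs on top of the driver-level rewriteBatchedStatements:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MysqlBatchedSinkSketch {
  public static void main(String[] args) {
    // Blink planner, streaming mode, as in the original question.
    TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

    // The JDBC URL carries rewriteBatchedStatements=true so the driver really batches.
    tEnv.executeSql(
        "CREATE TABLE mysql_sink (" +
        "  id BIGINT," +
        "  msg STRING" +
        ") WITH (" +
        "  'connector' = 'jdbc'," +
        "  'url' = 'jdbc:mysql://198.2.2.71:3306/bda?useSSL=false&rewriteBatchedStatements=true'," +
        "  'table-name' = 'mysql_sink'," +
        "  'username' = '...'," +
        "  'password' = '...'," +
        "  'sink.buffer-flush.max-rows' = '500'," +   // flush a JDBC batch every 500 rows
        "  'sink.buffer-flush.interval' = '1s'" +     // or at least once per second
        ")");
  }
}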

Best regards,
weixubin



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink 1.11 job stop with save point timeout error

2020-07-24 Thread Ivan Yang
Hi Robert,
Below is the job manager log after issuing the “flink stop” command


2020-07-24 19:24:12,388 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering 
checkpoint 1 (type=CHECKPOINT) @ 1595618652138 for job 
853c59916ac33dfbf46503b33289929e.
2020-07-24 19:24:13,914 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed 
checkpoint 1 for job 853c59916ac33dfbf46503b33289929e (7146 bytes in 1774 ms).
2020-07-24 19:27:59,299 INFO  org.apache.flink.runtime.jobmaster.JobMaster  
   [] - Triggering stop-with-savepoint for job 
853c59916ac33dfbf46503b33289929e.
2020-07-24 19:27:59,655 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering 
checkpoint 2 (type=SYNC_SAVEPOINT) @ 1595618879302 for job 
853c59916ac33dfbf46503b33289929e.
2020-07-24 19:28:00,962 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed 
checkpoint 2 for job 853c59916ac33dfbf46503b33289929e (7147 bytes in 1240 ms).
==

It looks normal to me. 

In the Kubernetes deployment cluster, we set up a metric reporter; it has these 
keys in the flink-config.yaml:

# Metrics Reporter Configuration
metrics.reporters: wavefront
metrics.reporter.wavefront.interval: 60 SECONDS
metrics.reporter.wavefront.env: prod
metrics.reporter.wavefront.class: com.x.flink.monitor.WavefrontReporter
metrics.reporter.wavefront.host: xx
metrics.reporter.wavefront.token: xx
metrics.scope.tm: flink.taskmanager

Could this reporter interval interfere with the job manager? I tested the same job in 
a standalone 
Flink 1.11.0 without the reporter: flink stop worked, with no hanging or 
timeout. The same reporter is also used in version 1.9.1, where we didn't have 
any issue with "flink stop".

Thanks 
Ivan


> On Jul 24, 2020, at 5:15 AM, Robert Metzger  wrote:
> 
> Hi Ivan,
> thanks a lot for your message. Can you post the JobManager log here as well? 
> It might contain additional information on the reason for the timeout.
> 
> On Fri, Jul 24, 2020 at 4:03 AM Ivan Yang  > wrote:
> Hello everyone,
> 
> We recently upgraded Flink from 1.9.1 to 1.11.0 and found one strange behavior: 
> when we stop a job with a savepoint, we get the following timeout error.
> I checked the Flink web console; the savepoint is created in S3 in 1 second. The 
> job is fairly simple, so 1 second for savepoint generation is expected. We 
> use a Kubernetes deployment. I clocked it: it takes about 60 seconds before it 
> returns this error. Afterwards, the job is hanging (it still says running, 
> but is actually not doing anything), and I need to run another command to cancel it. 
> Does anyone have an idea what's going on here? BTW, "flink stop" worked in 1.9.1 for 
> us before.
> 
> 
> 
> flink@flink-jobmanager-fcf5d84c5-sz4wk:~$ flink stop 
> 88d9b46f59d131428e2a18c9c7b3aa3f
> Suspending job "88d9b46f59d131428e2a18c9c7b3aa3f" with a savepoint.
> 
> 
>  The program finished with the following exception:
> 
> org.apache.flink.util.FlinkException: Could not stop with a savepoint job 
> "88d9b46f59d131428e2a18c9c7b3aa3f".
>   at 
> org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:495)
>   at 
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:864)
>   at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:487)
>   at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:931)
>   at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)
>   at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> Caused by: java.util.concurrent.TimeoutException
>   at 
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
>   at 
> org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:493)
>   ... 9 more
> flink@flink-jobmanager-fcf5d84c5-sz4wk:~$ 
> 
> 
> Thanks in advance,
> Ivan



Re: REST API randomly returns Not Found for an existing job

2020-07-24 Thread Kostas Kloudas
Thanks a lot for the update Tomasz and keep us posted if it happens again.

Kostas

On Fri, Jul 24, 2020 at 6:37 PM Tomasz Dudziak  wrote:
>
> Yes, the job was running and the REST server as well. No JobMaster failures 
> noticed.
> I used a test cluster deployed on a bunch of VM's and bare metal boxes.
> I am afraid, I can no longer reproduce this issue. It occurred a couple days 
> ago and lasted for an entire day with jobs being quite often erratically 
> reported as Not Found. As I said, I noticed that another query immediately 
> after the one that returned Not Found consistently returned a correct result.
> It had never occurred before and I am afraid now I could no longer observe it 
> again. I appreciate it does not give too much information so I will come back 
> with more info on this thread if it happens again.
>
> -Original Message-
> From: Kostas Kloudas 
> Sent: 24 July 2020 15:46
> To: Tomasz Dudziak 
> Cc: user@flink.apache.org; Chesnay Schepler 
> Subject: Re: REST API randomly returns Not Found for an existing job
>
> Hi Tomasz,
>
> Thanks a lot for reporting this issue. If you have verified that the job is 
> running AND that the REST server is also up and running (e.g.
> check the overview page) then I think that this should not be happening. I am 
> cc'ing Chesnay who may have an additional opinion on this.
>
> Cheers,
> Kostas
>
> On Thu, Jul 23, 2020 at 12:59 PM Tomasz Dudziak  wrote:
> >
> > Hi,
> >
> >
> >
> > I have come across an issue related to GET /job/:jobId endpoint from 
> > monitoring REST API in Flink 1.9.0. A few seconds after successfully 
> > starting a job and confirming its status as RUNNING, that endpoint would 
> > return 404 (Not Found). Interestingly, querying immediately again 
> > (literally a millisecond later) would return a valid result. I later 
> > noticed a similar behaviour in regard to a finished job as well. At certain 
> > points in time that endpoint would arbitrarily return 404, but similarly 
> > querying again would succeed. I saw this strange behaviour only recently 
> > and it used to work fine before.
> >
> >
> >
> > Do you know what could be the root cause of this? At the moment, as a
> > workaround I just query a job a couple of times in a row to ensure
> > whether it definitely does not exist or it is just being misreported
> > as non-existent, but this feels a bit like cottage industry…
> >
> >
> >
> > Kind regards,
> >
> > Tomasz
> >
> >
> >
> > Tomasz Dudziak | Marshall Wace LLP, George House, 131 Sloane Street,
> > London, SW1X 9AT | E-mail: t.dudz...@mwam.com | Tel: +44 207 024 7061
> >
> >
> >
> >
> >


Fwd: Flink Session TM Logs

2020-07-24 Thread Richard Moorhead
-- Forwarded message -
From: Robert Metzger 
Date: Fri, Jul 24, 2020 at 1:09 PM
Subject: Re: Flink Session TM Logs
To: Richard Moorhead 


I accidentally replied to you directly, not to the user@ mailing list. Is
it okay for you to publish the thread on the list again?



On Fri, Jul 24, 2020 at 8:01 PM Richard Moorhead 
wrote:

> It is enabled. The issue is that for a long-running Flink session (which
> may execute many jobs), the task managers are gone after a job is completed,
> and their logs aren't available.
>
> What I have noticed is that when the session is terminated I am able to
> find the logs in the job history server under the associated yarn
> application id.
>
> On Fri, Jul 24, 2020 at 12:51 PM Robert Metzger 
> wrote:
>
>> Hi Richard,
>>
>> you need to enable YARN log aggregation to access logs of finished YARN
>> applications.
>>
>> On Fri, Jul 24, 2020 at 5:58 PM Richard Moorhead <
>> richard.moorh...@gmail.com> wrote:
>>
>>> When running a flink session on YARN, task manager logs for a job are
>>> not available after completion. How do we locate these logs?
>>>
>>>


Re: SourceReaderBase not part of flink-core 1.11.1

2020-07-24 Thread Robert Metzger
SourceReaderBase seems to be in flink-connector-base.

On Fri, Jul 24, 2020 at 5:59 PM Yuval Itzchakov  wrote:

> Hi,
> I'm implementing a custom SourceReader and want to base it on
> SourceReaderBase. However, it seems like while SourceReader and Source are
> part of `flink-core:1.11.1`, SourceReaderBase is not?
>
> Do I need an external package for it?
>
> --
> Best Regards,
> Yuval Itzchakov.
>


RE: REST API randomly returns Not Found for an existing job

2020-07-24 Thread Tomasz Dudziak
Yes, the job was running and the REST server as well. No JobMaster failures 
were noticed.
I used a test cluster deployed on a bunch of VMs and bare-metal boxes.
I am afraid I can no longer reproduce this issue. It occurred a couple of days 
ago and lasted for an entire day, with jobs quite often being erratically 
reported as Not Found. As I said, I noticed that another query immediately 
after the one that returned Not Found consistently returned a correct result.
It had never occurred before and I cannot observe it now. I appreciate that this 
does not give you much information, so I will come back 
with more info on this thread if it happens again.

-Original Message-
From: Kostas Kloudas  
Sent: 24 July 2020 15:46
To: Tomasz Dudziak 
Cc: user@flink.apache.org; Chesnay Schepler 
Subject: Re: REST API randomly returns Not Found for an existing job

Hi Tomasz,

Thanks a lot for reporting this issue. If you have verified that the job is 
running AND that the REST server is also up and running (e.g.
check the overview page) then I think that this should not be happening. I am 
cc'ing Chesnay who may have an additional opinion on this.

Cheers,
Kostas

On Thu, Jul 23, 2020 at 12:59 PM Tomasz Dudziak  wrote:
>
> Hi,
>
>
>
> I have come across an issue related to GET /job/:jobId endpoint from 
> monitoring REST API in Flink 1.9.0. A few seconds after successfully starting 
> a job and confirming its status as RUNNING, that endpoint would return 404 
> (Not Found). Interestingly, querying immediately again (literally a 
> millisecond later) would return a valid result. I later noticed a similar 
> behaviour in regard to a finished job as well. At certain points in time that 
> endpoint would arbitrarily return 404, but similarly querying again would 
> succeed. I saw this strange behaviour only recently and it used to work fine 
> before.
>
>
>
> Do you know what could be the root cause of this? At the moment, as a 
> workaround I just query a job a couple of times in a row to ensure 
> whether it definitely does not exist or it is just being misreported 
> as non-existent, but this feels a bit like cottage industry…
>
>
>
> Kind regards,
>
> Tomasz
>
>
>
> Tomasz Dudziak | Marshall Wace LLP, George House, 131 Sloane Street, 
> London, SW1X 9AT | E-mail: t.dudz...@mwam.com | Tel: +44 207 024 7061
>
>
>
>
>


Re: Flink state reconciliation

2020-07-24 Thread Kostas Kloudas
Hi Alex,

Maybe Seth (cc'ed) may have an opinion on this.

Cheers,
Kostas

On Thu, Jul 23, 2020 at 12:08 PM Александр Сергеенко
 wrote:
>
> Hi,
>
> We use so-called "control stream" pattern to deliver settings to the Flink 
> job using Apache Kafka topics. The settings are in fact an unlimited stream 
> of events originating from the master DBMS, which acts as a single point of 
> truth concerning the rules list.
>
> It may seem odd, since Flink guarantees "exactly once" delivery semantics, 
> the service that publishes the rules to Kafka is written using Akka Streams 
> and guarantees "at least once" semantics, and the rule handling inside the 
> Flink job is implemented in an idempotent manner, but we still have to handle 
> cases where we need to reconcile the current rules stored in the master DBMS 
> with the existing Flink state.
>
> We've looked at Flink's tooling and found that the State Processor API can 
> possibly solve our problem, so we basically have to implement a periodic 
> process which unloads the state to some external file (.csv) and then 
> compares that set with the information in the master system.
> Basically this looks like the lambda architecture approach, while Flink is 
> supposed to implement the kappa architecture, so in that sense our 
> reconciliation problem looks a bit far-fetched.
>
> Are there any best practices or some patterns addressing such scenarios in 
> Flink?
>
> Great thanks for any possible assistance and ideas.
>
> -
> Alex Sergeenko
>
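
For what it's worth, a rough sketch of the periodic export step with the State Processor API on
Flink 1.11; the savepoint path, operator uid ("rules-operator-uid"), state name ("rule") and the
String key/value types are assumptions about the job, not taken from it:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class RulesStateExport {

  public static void main(String[] args) throws Exception {
    ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();

    // Load a savepoint taken from the running job (path and state backend are assumptions).
    ExistingSavepoint savepoint =
        Savepoint.load(bEnv, "s3://bucket/savepoints/savepoint-xyz", new MemoryStateBackend());

    // Read the keyed "rules" state of the operator identified by its uid.
    DataSet<Tuple2<String, String>> rules =
        savepoint.readKeyedState("rules-operator-uid", new RuleReader());

    // Dump to CSV so it can be compared offline with the master DBMS.
    rules.writeAsCsv("s3://bucket/reconciliation/rules.csv");
    bEnv.execute("export rules state");
  }

  static class RuleReader extends KeyedStateReaderFunction<String, Tuple2<String, String>> {

    private transient ValueState<String> rule;

    @Override
    public void open(Configuration parameters) {
      // The descriptor name and type must match the one used by the job itself.
      rule = getRuntimeContext().getState(new ValueStateDescriptor<>("rule", String.class));
    }

    @Override
    public void readKey(String key, Context ctx, Collector<Tuple2<String, String>> out) throws Exception {
      out.collect(Tuple2.of(key, rule.value()));
    }
  }
}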


SourceReaderBase not part of flink-core 1.11.1

2020-07-24 Thread Yuval Itzchakov
Hi,
I'm implementing a custom SourceReader and want to base it on
SourceReaderBase. However, it seems like while SourceReader and Source are
part of `flink-core:1.11.1`, SourceReaderBase is not?

Do I need an external package for it?

-- 
Best Regards,
Yuval Itzchakov.


Flink Session TM Logs

2020-07-24 Thread Richard Moorhead
When running a flink session on YARN, task manager logs for a job are not
available after completion. How do we locate these logs?


Re: Kafka connector with PyFlink

2020-07-24 Thread Xingbo Huang
Hi Wojciech,
In many cases, you can make sure that your code can run correctly in local
mode, and then submit the job to the cluster for testing. For how to add
jar packages in local mode, you can refer to the doc[1].
Besides, you'd better use blink planner which is the default planner. For
how to use blink planner, you can refer to the doc[2]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/common_questions.html#adding-jar-files
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#create-a-tableenvironment

Best,
Xingbo

On Fri, Jul 24, 2020 at 9:40 PM Wojciech Korczyński wrote:

> Hi,
>
> I've done like you recommended:
>
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.dataset import ExecutionEnvironment
> from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, StreamTableEnvironment, ScalarFunction
> from pyflink.table.descriptors import Schema, OldCsv, FileSystem, Kafka, Json, Csv
> from pyflink.table.udf import udf
>
> exec_env = StreamExecutionEnvironment.get_execution_environment()
> t_config = TableConfig()
> t_env = StreamTableEnvironment.create(exec_env, t_config)
>
> INPUT_TABLE = "my_topic"
> INPUT_TOPIC = "my-topic"
> LOCAL_KAFKA = 'my-cluster-kafka-bootstrap:9092'
> OUTPUT_TABLE = "my_topic_output"
> OUTPUT_TOPIC = "my-topic-output"
>
> ddl_source = f"""
>CREATE TABLE {INPUT_TABLE} (
>message STRING
>) WITH (
>'connector' = 'kafka',
>'topic' = '{INPUT_TOPIC}',
>'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
>'format' = 'json'
>)
>"""
>
> ddl_sink = f"""
>CREATE TABLE {OUTPUT_TABLE} (
>message STRING
>) WITH (
>'connector' = 'kafka',
>'topic' = '{OUTPUT_TOPIC}',
>'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
>'format' = 'json'
>)
>"""
>
> t_env.execute_sql(ddl_source)
> t_env.execute_sql(ddl_sink)
>
> result = t_env.execute_sql(f"""
> INSERT INTO {OUTPUT_TABLE}
> SELECT message
> FROM {INPUT_TABLE}
> """)
>
> result.get_job_client().get_job_execution_result().result()
>
> I think it is correctly written.
>
> However after deploying that job I'm getting an error:
>
> wojtek@wojtek-B365M-D3H:~/PycharmProjects/k8s_kafka_demo$ 
> /home/wojtek/flink/build-target/bin/flink run -m $(minikube ip):30081 -py 
> kafka2flink.py -j jar/flink-sql-connector-kafka_2.11-1.11.0.jar
> WARNING: An illegal reflective access operation has occurred
> WARNING: Illegal reflective access by 
> org.apache.flink.api.java.ClosureCleaner 
> (file:/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/lib/flink-dist_2.11-1.11-SNAPSHOT.jar)
>  to field java.util.Properties.serialVersionUID
> WARNING: Please consider reporting this to the maintainers of 
> org.apache.flink.api.java.ClosureCleaner
> WARNING: Use --illegal-access=warn to enable warnings of further illegal 
> reflective access operations
> WARNING: All illegal access operations will be denied in a future release
> Job has been submitted with JobID d23fe59415e9c9d79d15a1cf7e5409aa
> Traceback (most recent call last):
>   File "kafka2flink.py", line 62, in 
> result.get_job_client().get_job_execution_result().result()
>   File 
> "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/common/completable_future.py",
>  line 78, in result
>   File 
> "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
>  line 1286, in __call__
>   File 
> "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/util/exceptions.py",
>  line 147, in deco
>   File 
> "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py",
>  line 328, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o52.get.
> : java.util.concurrent.ExecutionException: 
> org.apache.flink.client.program.ProgramInvocationException: Job failed 
> (JobID: d23fe59415e9c9d79d15a1cf7e5409aa)
>   at 
> java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
>   at 
> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>   at 
> 

Re: Flink 1.11 Simple pipeline (data stream -> table with egg -> data stream) failed

2020-07-24 Thread Dmytro Dragan
Hi Timo,
Thank you for response.

Well, it was working.
We have a number of pipelines in production which reuse DataStream and Table 
API parts on Flink 1.10, both for stream and batch.
And the same simple case without aggregation does work in Flink 1.11.

But let's assume there are some incompatible changes and such an approach 
no longer works.

In the case of TableEnvironment there is no way to create/retract a stream.
I would assume that it is possible to wrap a stream in a bounded 
StreamTableSource/StreamTableSink 
and use the deprecated TableEnvironment methods to register them, but I'm wondering if 
there is a better way to do it.

It sounds quite strange that, with the Blink planner optimizing 
pipelines for both stream and batch jobs, 
it is still necessary to write the same things with both the DataStream and DataSet APIs.


On 24/07/2020, 15:36, "Timo Walther"  wrote:

Hi Dmytro,

`StreamTableEnvironment` does not support batch mode currently. Only 
`TableEnvironment` supports the unified story. I saw that you disabled 
the check in the `create()` method. This check exists for a reason.

For batch execution, the planner sets specific properties on the stream 
graph that the StreamExecutionEnvironment cannot handle (e.g. blocking 
inputs). My guess would be that this is the reason for your exception.

Have you tried to use the unified `TableEnvironment`?

Regards,
Timo




On 23.07.20 15:14, Dmytro Dragan wrote:
> Hi All,
> 
> We are working on migrating existing pipelines from Flink 1.10 to Flink 
> 1.11.
> 
> We are using the Blink planner and have unified pipelines which can be used 
> in stream and batch mode.
> 
> Stream pipelines work as expected, but batch ones fail on Flink 1.11 if 
> they have any table aggregation transformation.
> 
> Simple example of failed pipeline:
> 
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
> 
> TableConfig tableConfig = new TableConfig();
> tableConfig.setIdleStateRetentionTime(
>     org.apache.flink.api.common.time.Time.minutes(10),
>     org.apache.flink.api.common.time.Time.minutes(30)
> );
> EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
> 
> // is created using a workaround that ignores the settings.isStreamingMode() check
> StreamTableEnvironment tEnv = create(env, settings, tableConfig);
> 
> DataStreamSource streamSource = env.fromCollection(asList(new A("1"), new A("2")));
> 
> Table table = tEnv.fromDataStream(streamSource);
> tEnv.createTemporaryView("A", table);
> 
> String sql = "select s from A group by s";
> 
> tEnv
>     .toRetractStream(tEnv.sqlQuery(sql), Row.class)
>     .flatMap(new RetractFlatMap())
>     .map(Row::toString)
>     .addSink(new TestSinkFunction<>());
> 
> env.execute("");
> 
> values.forEach(System.out::println);
> 
> Exception:
> 
> Caused by: java.lang.IllegalStateException: Trying to consume an input 
> partition whose producer is not ready (result type: BLOCKING, partition 
> consumable: false, producer state: DEPLOYING, partition id: 
> 9eb6904501e90d90797a264aeb95a7c2#0@9c8833afe58af5854324c882252c267b).
> 
>  at 
> 
org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.handleConsumedPartitionShuffleDescriptorErrors(TaskDeploymentDescriptorFactory.java:242)
> 
>  …
> 
> Adding StreamTableEnvironment execute does not help.
> 
> Could you please advise what I`m missing?
> 






Re: Printing effective config for flint 1.11 cli

2020-07-24 Thread Kostas Kloudas
Hi Senthil,

You can see the configuration from the WebUI or you can get from the
REST API[1].
In addition, if you enable debug logging, you will have a line
starting with "Effective executor configuration:" in your client logs
(although I am not 100% sure if this will contain all the
configuration parameters).

Cheers,
Kostas

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html

On Fri, Jul 24, 2020 at 4:28 PM Senthil Kumar  wrote:
>
> Hello,
>
>
>
> My understanding is that flink consumes the config from the config file as 
> well as those specified via the -D  option.
>
> I assume that the -D will override the values from the config file?
>
>
>
> Is there a way to somehow see what the effective config is?
>
> i.e. print all of the config values that flink is going to be invoked with?
>
>
>
> We ran into an issue with the flink stop  command.
>
> It was exiting (after about a minute) with 
> java.util.concurrent.TimeoutException exception
>
> at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:493)
>
>
>
> Following the docs, I tried to issue the command with -D “client.timeout=10 
> min”, but it seemed to have made no difference.
>
> That made me wonder just what config values were actually being used.
>
>
>
> Cheers
>
> Kumar


Re: Changing watermark in the middle of a flow

2020-07-24 Thread Kostas Kloudas
Hi Lorenzo,

If you want to learn how Flink uses watermarks, it is worth checking [1].

Now in a nutshell, what a watermark will do in a pipeline is that it
may fire timers that you may have registered, or windows that you may
have accumulated.
If you have no time-sensitive operations between the first and the
second watermark generators, then I do not think you have to worry
(although it would help if you could share a bit more about your
pipeline in order to have a more educated estimation). If you have
windows, then your windows will fire and the emitted elements will
have the timestamp of the end of the window.

After the second watermark assigner, the watermarks coming from the
previous one are discarded and they are not flowing in the pipeline
anymore. You will only have the new watermarks.

A problem may arise if, for example, the second watermark generator
emits watermarks with smaller values than the first but the timestamps
of the elements are assigned based on the progress of the first
generator (e.g. windows fired) and now all your elements are
considered "late".

I hope that the above paints the big picture of what is happening in
your pipeline.

Again, I may be missing something so feel free to send more details
about your pipeline so that we can help a bit more.

Cheers,
Kostas

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/timely-stream-processing.html
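
To make the mechanics concrete, here is a minimal sketch, assuming the Flink 1.11 WatermarkStrategy
API and a made-up Event type with two timestamp fields; downstream of the second assigner, both the
element timestamps and the watermarks come from the business timestamp, and the earlier watermarks
are discarded:

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MidPipelineWatermarks {

  // Hypothetical event carrying two different timestamps.
  public static class Event {
    public long ingestTs;
    public long businessTs;
    public Event() {}
    public Event(long ingestTs, long businessTs) { this.ingestTs = ingestTs; this.businessTs = businessTs; }
  }

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    DataStream<Event> source = env.fromElements(new Event(1_000L, 5_000L), new Event(2_000L, 6_000L));

    // First assigner, right after the source: event time = ingest timestamp.
    DataStream<Event> byIngestTime = source.assignTimestampsAndWatermarks(
        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((event, previousTs) -> event.ingestTs));

    // ... intermediate operators and sinks for intermediate results would go here ...

    // Second assigner, in the middle of the flow: event time = business timestamp.
    // Watermarks produced by the first assigner do not flow past this point.
    DataStream<Event> byBusinessTime = byIngestTime.assignTimestampsAndWatermarks(
        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((event, previousTs) -> event.businessTs));

    byBusinessTime.print();
    env.execute("mid-pipeline watermark reassignment");
  }
}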

On Wed, Jul 22, 2020 at 9:35 AM Lorenzo Nicora  wrote:
>
> Hi
>
> I have a linear streaming flow with a single source and multiple sinks to 
> publish intermediate results.
> The time characteristic is Event Time and I am adding one 
> AssignerWithPeriodicWatermarks immediately after the source.
> I need to add a different assigner, in the middle of the flow, to change the 
> event time - i.e. extracting a different field from the record as event time.
>
> I am not sure I completely understand the implications of changing event time 
> and watermark in the middle of a flow.
>
> Can anybody give me a hint or direct me to any relevant documentation?
>
> Lorenzo


Re: Question on Pattern Matching

2020-07-24 Thread Kostas Kloudas
Hi Basanth,

If I understand your usecase correctly:

1) you want to find all A->B->C->D
2) for all of them you want to know how long it took to complete
3) if one completed within X it is considered ok, if not, it is
considered problematic
4) you want to count each one of them

One way to go is through CEP as Chesnay suggested and count the
resulting matches. This will give you all the requirements apart from
the one about knowing how long it took for the problematic ones to
complete.

One solution to solve this, is to specify a within() clause that is,
for example, 2x or 5x the SLA and then, for all successful (within 5x)
matches, you do the categorization manually (e,g. using a
ProcessFunction to split them into <= SLA and > SLA). After all, I
would assume that if you do not want to keep infinite state your job
has to have a cut off (in the above example 5x), after which a pattern
is considered as failed and you stop tracking it.

Another solution is to go with a ProcessFunction[1] since the
beginning and implement your logic, but if your elements arrive
"out-of-order" e.g. if B may arrive before A, then your code may need
to be pretty complicated. If out-of-orderness is not an issue then the
example in [1] can help, but still the solution will be more
complicated I guess than simply using CEP.

Hope this helps,
Kostas

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#example
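
As a rough illustration of the CEP route described above (the Tuple2<type, epochMillis> event shape,
the 10-minute SLA and the 5x cut-off are all made-up assumptions, not from the original question):

import java.util.List;
import java.util.Map;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SlaPatternSketch {

  private static final long SLA_MILLIS = 10 * 60 * 1000L;

  public static final OutputTag<String> TIMED_OUT =
      new OutputTag<String>("timed-out-fulfilments") {};

  public static SingleOutputStreamOperator<String> apply(DataStream<Tuple2<String, Long>> events) {
    Pattern<Tuple2<String, Long>, ?> pattern =
        Pattern.<Tuple2<String, Long>>begin("A").where(typeIs("A"))
            .followedBy("B").where(typeIs("B"))
            .followedBy("C").where(typeIs("C"))
            .followedBy("D").where(typeIs("D"))
            .within(Time.milliseconds(5 * SLA_MILLIS)); // hard cut-off at 5x the SLA

    PatternStream<Tuple2<String, Long>> patternStream = CEP.pattern(events, pattern);
    return patternStream.process(new SlaClassifier());
  }

  private static SimpleCondition<Tuple2<String, Long>> typeIs(final String type) {
    return new SimpleCondition<Tuple2<String, Long>>() {
      @Override
      public boolean filter(Tuple2<String, Long> event) {
        return type.equals(event.f0);
      }
    };
  }

  public static class SlaClassifier extends PatternProcessFunction<Tuple2<String, Long>, String>
      implements TimedOutPartialMatchHandler<Tuple2<String, Long>> {

    @Override
    public void processMatch(Map<String, List<Tuple2<String, Long>>> match, Context ctx, Collector<String> out) {
      // Completed patterns are categorized against the SLA here.
      long duration = match.get("D").get(0).f1 - match.get("A").get(0).f1;
      out.collect((duration <= SLA_MILLIS ? "WITHIN_SLA " : "LATE ") + duration + " ms");
    }

    @Override
    public void processTimedOutMatch(Map<String, List<Tuple2<String, Long>>> match, Context ctx) {
      // Partial matches that never completed within 5x SLA end up here.
      ctx.output(TIMED_OUT, "gave up after 5x SLA, started at " + match.get("A").get(0).f1);
    }
  }
}

The timed-out partial matches can then be read with result.getSideOutput(SlaPatternSketch.TIMED_OUT)
and fed into whatever alerting is needed.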

On Thu, Jul 23, 2020 at 1:07 PM Basanth Gowda  wrote:
>
> Yes - I am able to process matched out patterns.
>
> Let's suppose I have an order fulfillment process.
>
> I want to know how many fulfillments have not met SLA and further how late 
> they are and track until they are fulfilled.
>
> From what I tried with samples, once the pattern timeout, it is discarded and 
> events that come after that are ignored (not applied to the pattern).
>
> Is there a better way to do it using Table API? Where I am able to emit an 
> event (alert) when the timeout happens, and it continues to alert me - hey 
> you fulfillment is delayed by 6 hours, 12 hours and so on and also know when 
> it is finally completed.
>
> On Thu, Jul 16, 2020 at 3:08 PM Chesnay Schepler  wrote:
>>
>> Have you read this part of the documentation?
>>
>> From what I understand, it provides you hooks for processing matched/timed 
>> out patterns.
>>
>>
>> On 16/07/2020 20:23, Basanth Gowda wrote:
>>
>> Hello,
>>
>> We have a use case where we want to know when a certain pattern doesn't 
>> complete within a given time frame.
>>
>> For Example A -> B -> C -> D (needs to complete in 10 minutes)
>>
>> Now with Flink if event D doesn't happen in 10 minutes, the pattern is 
>> discarded and we can get notified. We also want to track how many of them 
>> completed (even if they meet SLA). How do we achieve this with FLINK CEP or 
>> other mechanisms?
>>
>> thank you,
>> Basanth
>>
>>


Re: AllwindowStream and RichReduceFunction

2020-07-24 Thread Flavio Pompermaier
In my reduce function I want to compute some aggregation on the sub-results
of a map-partition (that I tried to migrate from DataSet to DataStream
without success).
The original code was something like:

 input.mapPartition(new RowToStringSketches(sketchMapSize)) //
.reduce(new SketchesStringReducer()) //
.map(new SketchesStringToStatsPojo(colIndex, topK));

I asked about simulating the mapPartition function in the streaming
env in another thread on the mailing list [1] because I was not able to
test it: it seems that the program was exiting before being able to process
anything.
So I gave up on replacing the DataSet API with the DataStream API for the moment; it
seems that there are too many things still to migrate.
Btw, this is the reduce function:

public class SketchesStringReducer extends
    RichReduceFunction<Tuple2<byte[], byte[]>> {
  private static final long serialVersionUID = 1L;

  private transient ArrayOfItemsSerDe<String> serDe;

  @Override
  public void open(Configuration parameters) throws Exception {
    this.serDe = new ArrayOfStringsSerDe();
  }

  @Override
  public Tuple2<byte[], byte[]> reduce(Tuple2<byte[], byte[]> t1,
      Tuple2<byte[], byte[]> t2)
      throws Exception {
    // merge HLL
    final HllSketch hll1 = HllSketch.heapify(Memory.wrap(t1.f0));
    final HllSketch hll2 = HllSketch.heapify(Memory.wrap(t2.f0));
    final Union union = new Union(hll1.getLgConfigK());
    union.update(hll1);
    union.update(hll2);
    final byte[] hllSketchBytes = union.getResult().toCompactByteArray();

    // merge Item
    final ItemsSketch<String> s1 =
        ItemsSketch.getInstance(Memory.wrap(t1.f1), serDe);
    final ItemsSketch<String> s2 =
        ItemsSketch.getInstance(Memory.wrap(t2.f1), serDe);
    final byte[] itemSketchBytes = s1.merge(s2).toByteArray(serDe);
    return new Tuple2<>(hllSketchBytes, itemSketchBytes);
  }
}

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Is-there-a-way-to-use-stream-API-with-this-program-td36715.html#a36767
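
For completeness, a tiny self-contained example (not the sketches code above, it just sums string
lengths per count window) of the aggregate(AggregateFunction) route that Aljoscha mentions below;
the accumulator carries the intermediate result, so no RichFunction open() is needed:

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AggregateInsteadOfRichReduce {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Long> lengths = env
        .fromElements("a", "bb", "ccc", "dddd")
        .countWindowAll(2)
        .aggregate(new AggregateFunction<String, Long, Long>() {
          @Override public Long createAccumulator() { return 0L; }
          @Override public Long add(String value, Long acc) { return acc + value.length(); }
          @Override public Long getResult(Long acc) { return acc; }
          @Override public Long merge(Long a, Long b) { return a + b; }
        });

    lengths.print();
    env.execute("aggregate instead of rich reduce");
  }
}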

On Mon, Jul 20, 2020 at 6:32 PM Aljoscha Krettek 
wrote:

> What are you trying to do in the ReduceFunction? Without knowing the
> code, maybe an aggregate(AggregateFunction) is the solution.
>
> Best,
> Aljoscha
>
> On 20.07.20 18:03, Flavio Pompermaier wrote:
> > Thanks Aljosha for the reply. So what can I do in my reduce function that
> > contains transient variables (i.e. not serializable)?
> >
> > On Mon, Jul 20, 2020 at 4:38 PM Aljoscha Krettek 
> > wrote:
> >
> >> Hi Flavio,
> >>
> >> the reason is that under the covers the ReduceFunction will be used as
> >> the ReduceFunction of a ReducingState. And those cannot be rich
> >> functions because we cannot provide all the required context "inside"
> >> the state backend.
> >>
> >> You can see how the ReduceFunction is used to create a
> >> ReducingStateDescriptor here:
> >>
> >>
> https://github.com/apache/flink/blob/0c43649882c831c1ec88f4e33d8a59b1cbf5f2fe/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java#L300
> >>
> >> Best,
> >> Aljoscha
> >>
> >> On 16.07.20 16:28, Flavio Pompermaier wrote:
> >>> Hi to all,
> >>> I'm trying to apply a rich reduce function after a countWindowAll but
> >> Flink
> >>> says
> >>> "ReduceFunction of reduce can not be a RichFunction. Please use
> >>> reduce(ReduceFunction, WindowFunction) instead."
> >>>
> >>> Is there any good reason for this? Or am I doing something wrong?
> >>>
> >>> Best,
> >>> Flavio
> >>>
> >>
> >
>


Re: REST API randomly returns Not Found for an existing job

2020-07-24 Thread Kostas Kloudas
Hi Tomasz,

Thanks a lot for reporting this issue. If you have verified that the
job is running AND that the REST server is also up and running (e.g.
check the overview page) then I think that this should not be
happening. I am cc'ing Chesnay who may have an additional opinion on
this.

Cheers,
Kostas

On Thu, Jul 23, 2020 at 12:59 PM Tomasz Dudziak  wrote:
>
> Hi,
>
>
>
> I have come across an issue related to GET /job/:jobId endpoint from 
> monitoring REST API in Flink 1.9.0. A few seconds after successfully starting 
> a job and confirming its status as RUNNING, that endpoint would return 404 
> (Not Found). Interestingly, querying immediately again (literally a 
> millisecond later) would return a valid result. I later noticed a similar 
> behaviour in regard to a finished job as well. At certain points in time that 
> endpoint would arbitrarily return 404, but similarly querying again would 
> succeed. I saw this strange behaviour only recently and it used to work fine 
> before.
>
>
>
> Do you know what could be the root cause of this? At the moment, as a 
> workaround I just query a job a couple of times in a row to ensure whether it 
> definitely does not exist or it is just being misreported as non-existent, 
> but this feels a bit like cottage industry…
>
>
>
> Kind regards,
>
> Tomasz
>
>
>
> Tomasz Dudziak | Marshall Wace LLP, George House, 131 Sloane Street, London, 
> SW1X 9AT | E-mail: t.dudz...@mwam.com | Tel: +44 207 024 7061
>
>
>
>
>


Printing effective config for flint 1.11 cli

2020-07-24 Thread Senthil Kumar
Hello,

My understanding is that flink consumes the config from the config file as well 
as those specified via the -D  option.
I assume that the -D will override the values from the config file?

Is there a way to somehow see what the effective config is?
i.e. print all of the config values that flink is going to be invoked with?

We ran into an issue with the flink stop  command.
It was exiting (after about a minute) with 
java.util.concurrent.TimeoutException exception
at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:493)

Following the docs, I tried to issue the command with -D “client.timeout=10 
min”, but it seemed to have made no difference.
That made me wonder just what config values were actually being used.

Cheers
Kumar


Re: Kafka connector with PyFlink

2020-07-24 Thread Wojciech Korczyński
Hi,

I've done like you recommended:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, StreamTableEnvironment, ScalarFunction
from pyflink.table.descriptors import Schema, OldCsv, FileSystem, Kafka, Json, Csv
from pyflink.table.udf import udf

exec_env = StreamExecutionEnvironment.get_execution_environment()
t_config = TableConfig()
t_env = StreamTableEnvironment.create(exec_env, t_config)

INPUT_TABLE = "my_topic"
INPUT_TOPIC = "my-topic"
LOCAL_KAFKA = 'my-cluster-kafka-bootstrap:9092'
OUTPUT_TABLE = "my_topic_output"
OUTPUT_TOPIC = "my-topic-output"

ddl_source = f"""
   CREATE TABLE {INPUT_TABLE} (
   message STRING
   ) WITH (
   'connector' = 'kafka',
   'topic' = '{INPUT_TOPIC}',
   'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
   'format' = 'json'
   )
   """

ddl_sink = f"""
   CREATE TABLE {OUTPUT_TABLE} (
   message STRING
   ) WITH (
   'connector' = 'kafka',
   'topic' = '{OUTPUT_TOPIC}',
   'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
   'format' = 'json'
   )
   """

t_env.execute_sql(ddl_source)
t_env.execute_sql(ddl_sink)

result = t_env.execute_sql(f"""
INSERT INTO {OUTPUT_TABLE}
SELECT message
FROM {INPUT_TABLE}
""")

result.get_job_client().get_job_execution_result().result()

I think it is correctly written.

However after deploying that job I'm getting an error:

wojtek@wojtek-B365M-D3H:~/PycharmProjects/k8s_kafka_demo$
/home/wojtek/flink/build-target/bin/flink run -m $(minikube ip):30081
-py kafka2flink.py -j jar/flink-sql-connector-kafka_2.11-1.11.0.jar
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by
org.apache.flink.api.java.ClosureCleaner
(file:/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/lib/flink-dist_2.11-1.11-SNAPSHOT.jar)
to field java.util.Properties.serialVersionUID
WARNING: Please consider reporting this to the maintainers of
org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further
illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Job has been submitted with JobID d23fe59415e9c9d79d15a1cf7e5409aa
Traceback (most recent call last):
  File "kafka2flink.py", line 62, in 
result.get_job_client().get_job_execution_result().result()
  File 
"/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/common/completable_future.py",
line 78, in result
  File 
"/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
line 1286, in __call__
  File 
"/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/util/exceptions.py",
line 147, in deco
  File 
"/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py",
line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o52.get.
: java.util.concurrent.ExecutionException:
org.apache.flink.client.program.ProgramInvocationException: Job failed
(JobID: d23fe59415e9c9d79d15a1cf7e5409aa)
at 
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at 
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.flink.client.program.ProgramInvocationException:
Job failed (JobID: d23fe59415e9c9d79d15a1cf7e5409aa)
at 
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$getJobExecutionResult$6(ClusterClientJobClientAdapter.java:116)

Re: Recommended pattern for implementing a DLQ with Flink+Kafka

2020-07-24 Thread Tom Fennelly
Thanks Robert.

On Fri, Jul 24, 2020 at 2:32 PM Robert Metzger  wrote:

> Hey Tom,
>
> I'm not aware of any patterns for this problem, sorry. Intuitively, I
> would send dead letters to a separate Kafka topic.
>
> Best,
> Robert
>
>
> On Wed, Jul 22, 2020 at 7:22 PM Tom Fennelly 
> wrote:
>
>> Thanks Chen.
>>
>> I'm thinking about errors that occur while processing a record/message
>> that shouldn't be retried until after some "action" has been taken Vs
>> flooding the system with pointless retries e.g.
>>
>>- A side output step might involve an API call to an external system
>>and that system is down atm so there's no point retrying until further
>>notice. For this we want to be able to send something to a DLQ.
>>- We have some bad code that is resulting in an uncaught exception in
>>very specific cases. We want these to go to a DLQ and only be retried 
>> after
>>the appropriate fix has been made.
>>
>> The possible scenarios for this are numerous so I think my main question
>> would be ... are there established general Flink patterns or best practices
>> that can be applied for this, or is it something we'd need to hand-role on
>> a case by case basis with a side output type solution such as in your
>> example? We can do that but I just wanted to make sure I wasn't missing
>> anything before heading down that road.
>>
>> Regards,
>>
>> Tom.
>>
>>
>> On Wed, Jul 22, 2020 at 5:46 PM Chen Qin  wrote:
>>
>>> Could you more specific on what “failed message” means here?
>>>
>>> In general side output can do something like were
>>>
>>>
>>>
>>> def process(ele) {
>>>
>>>try{
>>>
>>> biz
>>>
>>> } catch {
>>>
>>>Sideout( ele + exception context)
>>>
>>> }
>>>
>>> }
>>>
>>>
>>>
>>> process(func).sideoutput(tag).addSink(kafkasink)
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Chen
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> *From: *Eleanore Jin 
>>> *Sent: *Wednesday, July 22, 2020 9:25 AM
>>> *To: *Tom Fennelly 
>>> *Cc: *user 
>>> *Subject: *Re: Recommended pattern for implementing a DLQ with
>>> Flink+Kafka
>>>
>>>
>>>
>>> +1 we have a similar use case for message schema validation.
>>>
>>>
>>>
>>> Eleanore
>>>
>>>
>>>
>>> On Wed, Jul 22, 2020 at 4:12 AM Tom Fennelly 
>>> wrote:
>>>
>>> Hi.
>>>
>>>
>>>
>>> I've been searching blogs etc trying to see if there are
>>> established patterns/mechanisms for reprocessing of failed messages via
>>> something like a DLQ. I've read about using checkpointing and restarting
>>> tasks (not what we want because we want to keep processing forward) and
>>> then also how some use side outputs to filter "bad" data to a DLQ style
>>> topic. Kafka has dead letter topic configs too but it seems that can't
>>> really be used from inside Flink (from what I can see).
>>>
>>>
>>>
>>> We're running Flink+Kafka. I'm new to Flink+Kafka so not sure if there
>>> just isn't a defined pattern for it, or if I'm just not asking the right
>>> questions in my searches. I searched the archives here and don't see
>>> anything either, which obviously makes me think that I'm not thinking about
>>> this in the "Flink way" :-|
>>>
>>>
>>>
>>> Regards,
>>>
>>>
>>>
>>> Tom.
>>>
>>>
>>>
>>


Re: Flink 1.11 Simple pipeline (data stream -> table with egg -> data stream) failed

2020-07-24 Thread Timo Walther

Hi Dmytro,

`StreamTableEnvironment` does not support batch mode currently. Only 
`TableEnvironment` supports the unified story. I saw that you disabled 
the check in the `create()` method. This check exists for a reason.


For batch execution, the planner sets specific properties on the stream 
graph that the StreamExecutionEnvironment cannot handle (e.g. blocking 
inputs). My guess would be that this is the reason for your exception.


Have you tried to use the unified `TableEnvironment`?

Regards,
Timo
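
A minimal sketch of the unified TableEnvironment in batch mode, for comparison (the inline values
table stands in for the DataStream source, since fromDataStream is not available there):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class UnifiedBatchTableEnvSketch {
  public static void main(String[] args) {
    // Unified TableEnvironment: no StreamExecutionEnvironment involved.
    EnvironmentSettings settings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
    TableEnvironment tEnv = TableEnvironment.create(settings);

    // Made-up inline table instead of fromDataStream.
    Table input = tEnv.fromValues("1", "2", "1").as("s");
    tEnv.createTemporaryView("A", input);

    // The kind of aggregation that failed in batch mode with StreamTableEnvironment.
    tEnv.executeSql("SELECT s, COUNT(*) AS cnt FROM A GROUP BY s").print();
  }
}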




On 23.07.20 15:14, Dmytro Dragan wrote:

Hi All,

We are working on migrating existing pipelines from Flink 1.10 to Flink 
1.11.

We are using the Blink planner and have unified pipelines which can be used 
in stream and batch mode.

Stream pipelines work as expected, but batch ones fail on Flink 1.11 if 
they have any table aggregation transformation.


Simple example of failed pipeline:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

TableConfig tableConfig = new TableConfig();
tableConfig.setIdleStateRetentionTime(
    org.apache.flink.api.common.time.Time.minutes(10),
    org.apache.flink.api.common.time.Time.minutes(30)
);
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();

// is created using a workaround that ignores the settings.isStreamingMode() check
StreamTableEnvironment tEnv = create(env, settings, tableConfig);

DataStreamSource streamSource = env.fromCollection(asList(new A("1"), new A("2")));

Table table = tEnv.fromDataStream(streamSource);
tEnv.createTemporaryView("A", table);

String sql = "select s from A group by s";

tEnv
    .toRetractStream(tEnv.sqlQuery(sql), Row.class)
    .flatMap(new RetractFlatMap())
    .map(Row::toString)
    .addSink(new TestSinkFunction<>());

env.execute("");

values.forEach(System.out::println);

Exception:

Caused by: java.lang.IllegalStateException: Trying to consume an input 
partition whose producer is not ready (result type: BLOCKING, partition 
consumable: false, producer state: DEPLOYING, partition id: 
9eb6904501e90d90797a264aeb95a7c2#0@9c8833afe58af5854324c882252c267b).


     at 
org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.handleConsumedPartitionShuffleDescriptorErrors(TaskDeploymentDescriptorFactory.java:242)


     …

Adding StreamTableEnvironment execute does not help.

Could you please advise what I`m missing?





Re: Recommended pattern for implementing a DLQ with Flink+Kafka

2020-07-24 Thread Robert Metzger
Hey Tom,

I'm not aware of any patterns for this problem, sorry. Intuitively, I would
send dead letters to a separate Kafka topic.

Best,
Robert
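
A hand-rolled sketch of the side-output-to-DLQ-topic idea discussed in this thread; the topic name,
broker address and the "biz" logic are placeholders, and the record type is plain String for brevity:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class DlqSideOutputSketch {

  static final OutputTag<String> DLQ = new OutputTag<String>("dead-letters") {};

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    SingleOutputStreamOperator<String> processed = env
        .fromElements("ok-1", "bad-2", "ok-3")
        .process(new ProcessFunction<String, String>() {
          @Override
          public void processElement(String value, Context ctx, Collector<String> out) {
            try {
              if (value.startsWith("bad")) {            // placeholder for the real "biz" logic
                throw new IllegalStateException("cannot process " + value);
              }
              out.collect(value);
            } catch (Exception e) {
              // element plus exception context goes to the dead letter side output
              ctx.output(DLQ, value + " | " + e.getMessage());
            }
          }
        });

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");

    // Dead letters go to a dedicated Kafka topic, as suggested above.
    processed.getSideOutput(DLQ)
        .addSink(new FlinkKafkaProducer<>("dead-letter-topic", new SimpleStringSchema(), props));

    processed.print(); // normal results continue downstream

    env.execute("dlq side output sketch");
  }
}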


On Wed, Jul 22, 2020 at 7:22 PM Tom Fennelly 
wrote:

> Thanks Chen.
>
> I'm thinking about errors that occur while processing a record/message
> that shouldn't be retried until after some "action" has been taken Vs
> flooding the system with pointless retries e.g.
>
>- A side output step might involve an API call to an external system
>and that system is down atm so there's no point retrying until further
>notice. For this we want to be able to send something to a DLQ.
>- We have some bad code that is resulting in an uncaught exception in
>very specific cases. We want these to go to a DLQ and only be retried after
>the appropriate fix has been made.
>
> The possible scenarios for this are numerous so I think my main question
> would be ... are there established general Flink patterns or best practices
> that can be applied for this, or is it something we'd need to hand-role on
> a case by case basis with a side output type solution such as in your
> example? We can do that but I just wanted to make sure I wasn't missing
> anything before heading down that road.
>
> Regards,
>
> Tom.
>
>
> On Wed, Jul 22, 2020 at 5:46 PM Chen Qin  wrote:
>
>> Could you more specific on what “failed message” means here?
>>
>> In general side output can do something like were
>>
>>
>>
>> def process(ele) {
>>
>>try{
>>
>> biz
>>
>> } catch {
>>
>>Sideout( ele + exception context)
>>
>> }
>>
>> }
>>
>>
>>
>> process(func).sideoutput(tag).addSink(kafkasink)
>>
>>
>>
>> Thanks,
>>
>> Chen
>>
>>
>>
>>
>>
>>
>>
>> *From: *Eleanore Jin 
>> *Sent: *Wednesday, July 22, 2020 9:25 AM
>> *To: *Tom Fennelly 
>> *Cc: *user 
>> *Subject: *Re: Recommended pattern for implementing a DLQ with
>> Flink+Kafka
>>
>>
>>
>> +1 we have a similar use case for message schema validation.
>>
>>
>>
>> Eleanore
>>
>>
>>
>> On Wed, Jul 22, 2020 at 4:12 AM Tom Fennelly 
>> wrote:
>>
>> Hi.
>>
>>
>>
>> I've been searching blogs etc trying to see if there are
>> established patterns/mechanisms for reprocessing of failed messages via
>> something like a DLQ. I've read about using checkpointing and restarting
>> tasks (not what we want because we want to keep processing forward) and
>> then also how some use side outputs to filter "bad" data to a DLQ style
>> topic. Kafka has dead letter topic configs too but it seems that can't
>> really be used from inside Flink (from what I can see).
>>
>>
>>
>> We're running Flink+Kafka. I'm new to Flink+Kafka so not sure if there
>> just isn't a defined pattern for it, or if I'm just not asking the right
>> questions in my searches. I searched the archives here and don't see
>> anything either, which obviously makes me think that I'm not thinking about
>> this in the "Flink way" :-|
>>
>>
>>
>> Regards,
>>
>>
>>
>> Tom.
>>
>>
>>
>


Re: Flink app cannot restart

2020-07-24 Thread Robert Metzger
Hi Rainie,

I believe we need the full JobManager log to understand what's going on
with your job. The logs you've provided so far only tell us that a
TaskManager has died (which is expected when a node goes down). What is
interesting is what happens next: do we have enough resources
to restart the job? Is there some issue restarting it?

If you feel uncomfortable sharing the full logs on a public mailing list,
feel free to send the logs just to Yang Wang and/or me.

Best,
Robert


On Thu, Jul 23, 2020 at 9:18 AM Rainie Li  wrote:

> Thank you Yang, I checked "yarn.application-attempts" is already set to 10.
> Here is the exception part from job manager log. Full log file is too big,
> I also reflected it to remove some company specific info.
> Any suggestion to this exception would be appreciated!
>
> 2020-07-15 20:04:52,265 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 490 @ 1223
>  for job 3a5aca9433cad1b6caa1b11227b9aa4a.
> 2020-07-15 20:04:55,987 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 490 for job 39393993 (3886147 bytes in 3736 ms).
> 2020-07-15 20:09:41,317 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph- (137/240)
> (39393993) switched from RUNNING to FAILED on container_e01_id @ cluster
> name (dataPort=43743).
> java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id
> container_e01_id timed out.
> at
> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1149)
> at
> org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2020-07-15 20:09:41,324 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Job name
> (job id) switched from state RUNNING to FAILING.
> java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id
> container_e01_id timed out.
> at
> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1149)
> at
> org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at 

Re:Re: Could not find any factory for identifier 'kafka'

2020-07-24 Thread RS
hi,
Thanks for the reply. After trying several more times, I found it's probably not a dependency problem.


I added a new directory to my project: resources/META-INF/services
Then I copied two files from the Flink source code:
org.apache.flink.table.factories.TableFactory and org.apache.flink.table.factories.Factory
After that the build no longer reports the error. I don't fully understand why, but it did fix the problem.
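
(For context, the files under resources/META-INF/services are plain-text lists of factory implementations. A plausible org.apache.flink.table.factories.Factory entry for a job like this would list at least the Kafka and JSON factories; the class names below are assumptions based on Flink 1.11's connector modules, not copied from this thread:)

org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory
org.apache.flink.formats.json.JsonFormatFactory

The cleaner fix is usually to have the build merge these service files from all dependency jars rather than maintaining them by hand.
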


在 2020-07-24 20:16:18,"JasonLee" <17610775...@163.com> 写道:
>hi
>You only need the two packages: the -sql one and the -json one.
>
>
>| |
>JasonLee
>|
>|
>邮箱:17610775...@163.com
>|
>
>Signature is customized by Netease Mail Master
>
>On 07/24/2020 17:02, RS wrote:
>hi,
>Flink-1.11.1 尝试运行SQL DDL 读取kafka的数据,执行create 语句的时候报错了
>编译的jar包是jar-with-dependencies的
>
>
>代码片段:
>   public String ddlSql = String.format("CREATE TABLE %s (\n" +
>   "  number BIGINT,\n" +
>   "  msg STRING,\n" +
>   "  username STRING,\n" +
>   "  update_time TIMESTAMP(3)\n" +
>   ") WITH (\n" +
>   " 'connector' = 'kafka',\n" +
>   " 'topic' = '%s',\n" +
>   " 'properties.bootstrap.servers' = '%s',\n" +
>   " 'properties.group.id' = '%s',\n" +
>   " 'format' = 'json',\n" +
>   " 'json.fail-on-missing-field' = 'false',\n" +
>   " 'json.ignore-parse-errors' = 'true'\n" +
>   ")\n", tableName, topic, servers, group);
>
>
>   StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>   StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>   tableEnv.executeSql(ddlSql);
>
>
>报错信息:
>Caused by: org.apache.flink.table.api.ValidationException: Could not find any 
>factory for identifier 'kafka' that implements 
>'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.
>Available factory identifiers are:
>datagen
>at 
>org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
>at 
>org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
>... 33 more
>
>
>参考了这个 
>http://apache-flink.147419.n8.nabble.com/flink-1-11-executeSql-DDL-td4890.html#a4893
>补充了flink-connector-kafka_2.12,flink-sql-connector-kafka_2.12, 还是会报一样的错
>
>
>附上pom依赖:
>
>   
>   org.apache.flink
>   flink-java
>   ${flink.version}
>   
>   
>   org.apache.flink
>   flink-table-api-java-bridge_2.12
>   ${flink.version}
>   
>   
>   org.apache.flink
>   flink-table-api-java
>   ${flink.version}
>   
>   
>   org.apache.flink
>   flink-connector-kafka_2.12
>   ${flink.version}
>   
>   
>   org.apache.flink
>   flink-sql-connector-kafka_2.12
>   ${flink.version}
>   
>   
>   org.apache.flink
>   flink-json
>   ${flink.version}
>   
>   
>
>
>感谢各位~


Re: How to start flink standalone session on windows ?

2020-07-24 Thread Chesnay Schepler
Flink no longer runs natively on Windows; you will have to use some 
unix-like environment like WSL or cygwin.
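
For example, from a WSL shell a standalone session cluster can be started the usual way (the directory name is illustrative):

cd flink-1.11.1
./bin/start-cluster.sh
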



On 24/07/2020 04:27, wangl...@geekplus.com.cn wrote:


There are no start-cluster.bat and flink.bat files in the bin directory.

So how can I start Flink on Windows OS?

Thanks,
Lei

wangl...@geekplus.com.cn 





Re: Could not find any factory for identifier 'kafka'

2020-07-24 Thread JasonLee
hi
You only need the two packages: the -sql one and the -json one.


| |
JasonLee
|
|
邮箱:17610775...@163.com
|

Signature is customized by Netease Mail Master

On 07/24/2020 17:02, RS wrote:
hi,
Flink-1.11.1 尝试运行SQL DDL 读取kafka的数据,执行create 语句的时候报错了
编译的jar包是jar-with-dependencies的


代码片段:
   public String ddlSql = String.format("CREATE TABLE %s (\n" +
   "  number BIGINT,\n" +
   "  msg STRING,\n" +
   "  username STRING,\n" +
   "  update_time TIMESTAMP(3)\n" +
   ") WITH (\n" +
   " 'connector' = 'kafka',\n" +
   " 'topic' = '%s',\n" +
   " 'properties.bootstrap.servers' = '%s',\n" +
   " 'properties.group.id' = '%s',\n" +
   " 'format' = 'json',\n" +
   " 'json.fail-on-missing-field' = 'false',\n" +
   " 'json.ignore-parse-errors' = 'true'\n" +
   ")\n", tableName, topic, servers, group);


   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
   StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
   tableEnv.executeSql(ddlSql);


报错信息:
Caused by: org.apache.flink.table.api.ValidationException: Could not find any 
factory for identifier 'kafka' that implements 
'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.
Available factory identifiers are:
datagen
at 
org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
at 
org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
... 33 more


参考了这个 
http://apache-flink.147419.n8.nabble.com/flink-1-11-executeSql-DDL-td4890.html#a4893
补充了flink-connector-kafka_2.12,flink-sql-connector-kafka_2.12, 还是会报一样的错


附上pom依赖:

   
   org.apache.flink
   flink-java
   ${flink.version}
   
   
   org.apache.flink
   flink-table-api-java-bridge_2.12
   ${flink.version}
   
   
   org.apache.flink
   flink-table-api-java
   ${flink.version}
   
   
   org.apache.flink
   flink-connector-kafka_2.12
   ${flink.version}
   
   
   org.apache.flink
   flink-sql-connector-kafka_2.12
   ${flink.version}
   
   
   org.apache.flink
   flink-json
   ${flink.version}
   
   


感谢各位~

Re: Flink DataStream[String] kafkacosumer avro streaming file sink

2020-07-24 Thread Robert Metzger
Thank you for your question. I responded on StackOverflow.
Let's finish the discussion there.

On Fri, Jul 24, 2020 at 5:07 AM Vijayendra Yadav 
wrote:

> Hi Flink Team,
>
> *FLINK Streaming:* I have DataStream[String] from kafkaconsumer
>
> DataStream stream = env
> .addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), 
> properties));
>
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
>
> I have to sink this string stream using StreamingFileSink, which needs
> DataStream[GenericRecord]
>
> val schema: Schema = ...
> val input: DataStream[GenericRecord] = ...
> val sink: StreamingFileSink[GenericRecord] = StreamingFileSink
> .forBulkFormat(outputBasePath, AvroWriters.forGenericRecord(schema))
> .build()
> input.addSink(sink)
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/streamfile_sink.html
>
> *Question:* How to convert DataStream[String] to
> DataStream[GenericRecord] before Sinking so that I can write AVRO files ?
>
>
> https://stackoverflow.com/questions/63065945/flink-datastreamstring-kafkacosumer-convert-to-avro-for-sink
>
> Regards,
> Vijay
>
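
(One possible shape for that conversion in the Java API, as a sketch only: it assumes the Kafka payload is flat JSON whose field names match the Avro schema, and it uses Jackson for parsing. The class names and the string-only field handling are illustrative assumptions.)

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroWriters;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class JsonToAvroSketch {

    // Schema is not serializable, so pass the schema string and re-parse it in open().
    public static class JsonToGenericRecord extends RichMapFunction<String, GenericRecord> {
        private final String schemaString;
        private transient Schema schema;
        private transient ObjectMapper mapper;

        public JsonToGenericRecord(String schemaString) {
            this.schemaString = schemaString;
        }

        @Override
        public void open(Configuration parameters) {
            schema = new Schema.Parser().parse(schemaString);
            mapper = new ObjectMapper();
        }

        @Override
        public GenericRecord map(String json) throws Exception {
            JsonNode node = mapper.readTree(json);
            GenericRecord record = new GenericData.Record(schema);
            // naive copy: assumes a flat schema with string-typed fields of the same names
            for (Schema.Field field : schema.getFields()) {
                JsonNode value = node.get(field.name());
                record.put(field.name(), value == null ? null : value.asText());
            }
            return record;
        }
    }

    public static void wire(DataStream<String> stream, String schemaString, String outputBasePath) {
        Schema schema = new Schema.Parser().parse(schemaString);
        DataStream<GenericRecord> records = stream
                .map(new JsonToGenericRecord(schemaString))
                // use Avro type information for GenericRecord instead of falling back to Kryo
                .returns(new GenericRecordAvroTypeInfo(schema));

        StreamingFileSink<GenericRecord> sink = StreamingFileSink
                .forBulkFormat(new Path(outputBasePath), AvroWriters.forGenericRecord(schema))
                .build();
        records.addSink(sink);
    }
}
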


Re: Flink 1.11 job stop with save point timeout error

2020-07-24 Thread Robert Metzger
Hi Ivan,
thanks a lot for your message. Can you post the JobManager log here as
well? It might contain additional information on the reason for the timeout.

On Fri, Jul 24, 2020 at 4:03 AM Ivan Yang  wrote:

> Hello everyone,
>
> We recently upgraded Flink from 1.9.1 to 1.11.0 and found one strange behavior:
> when we stop a job with a savepoint we get the following timeout error.
> I checked Flink web console, the save point is created in s3 in 1
> second.The job is fairly simple, so 1 second for savepoint generation is
> expected. We use kubernetes deployment. I clocked it, it’s about 60 seconds
> when it returns this error. So afterwards, the job is hanging (it still
> says running, but actually not doing anything). I need run another command
> to cancel it. Anyone have an idea what’s going on here? BTW, “flink stop” worked
> in 1.9.1 for us before
>
>
>
> flink@flink-jobmanager-fcf5d84c5-sz4wk:~$ flink stop
> 88d9b46f59d131428e2a18c9c7b3aa3f
> Suspending job "88d9b46f59d131428e2a18c9c7b3aa3f" with a savepoint.
>
> 
>  The program finished with the following exception:
>
> org.apache.flink.util.FlinkException: Could not stop with a savepoint job
> "88d9b46f59d131428e2a18c9c7b3aa3f".
> at
> org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:495)
> at
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:864)
> at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:487)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:931)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)
> at
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> Caused by: java.util.concurrent.TimeoutException
> at
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:493)
> ... 9 more
> flink@flink-jobmanager-fcf5d84c5-sz4wk:~$
>
>
> Thanks in advance,
> Ivan
>
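
(If the savepoint itself completes quickly but the CLI gives up after roughly a minute, one thing that may be worth checking, purely as an assumption and not a confirmed diagnosis, is the client-side timeout option, which defaults to 60 s in Flink 1.11's configuration and can be raised in flink-conf.yaml:)

client.timeout: 600s
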


Re: REST API randomly returns Not Found for an existing job

2020-07-24 Thread Chesnay Schepler

How reproducible is this problem / how often does it occur?
How is the cluster deployed?
Is anything else happening to the cluster around that that time (like a 
JobMaster failure)?


On 24/07/2020 13:28, Tomasz Dudziak wrote:


Hi,

I have come across an issue related to GET /job/:jobId endpoint from 
monitoring REST API in Flink 1.9.0. A few seconds after successfully 
starting a job and confirming its status as RUNNING, that endpoint 
would return 404 (Not Found). Interestingly, querying immediately 
again (literally a millisecond later) would return a valid result. I 
later noticed a similar behaviour in regard to a finished job as well. 
At certain points in time that endpoint would arbitrarily return 404, 
but similarly querying again would succeed. I saw this strange 
behaviour only recently and it used to work fine before.


Do you know what could be the root cause of this? At the moment, as a 
workaround I just query a job a couple of times in a row to ensure 
whether it definitely does not exist or it is just being misreported 
as non-existent, but this feels a bit like cottage industry…


Kind regards,

Tomasz

*Tomasz Dudziak *| Marshall Wace LLP, George House, 131 Sloane Street, 
London, SW1X 9AT |**E-mail: t.dudz...@mwam.com 
| Tel: +44 207 024 7061








Re: Could not find any factory for identifier 'kafka'

2020-07-24 Thread admin
  
   org.apache.flink
   flink-connector-kafka_2.12
   ${flink.version}
   
   
   org.apache.flink
   flink-sql-connector-kafka_2.12
   ${flink.version}
   

These two conflict with each other; drop the one above (flink-connector-kafka_2.12).

> 2020年7月24日 下午5:02,RS  写道:
> 
>   
>org.apache.flink
>flink-connector-kafka_2.12
>${flink.version}
>
>
>org.apache.flink
>flink-sql-connector-kafka_2.12
>${flink.version}
>



REST API randomly returns Not Found for an existing job

2020-07-24 Thread Tomasz Dudziak
Hi,

I have come across an issue related to GET /job/:jobId endpoint from monitoring 
REST API in Flink 1.9.0. A few seconds after successfully starting a job and 
confirming its status as RUNNING, that endpoint would return 404 (Not Found). 
Interestingly, querying immediately again (literally a millisecond later) would 
return a valid result. I later noticed a similar behaviour in regard to a 
finished job as well. At certain points in time that endpoint would arbitrarily 
return 404, but similarly querying again would succeed. I saw this strange 
behaviour only recently and it used to work fine before.

Do you know what could be the root cause of this? At the moment, as a 
workaround I just query a job a couple of times in a row to ensure whether it 
definitely does not exist or it is just being misreported as non-existent, but 
this feels a bit like cottage industry...
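
(The retry workaround, roughly, as a sketch using java.net.http on Java 11+; the base URL, retry count, and back-off are illustrative:)

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class JobLookup {

    // Polls GET /jobs/:jobid a few times before concluding the job really does not exist.
    public static boolean jobExists(String restBase, String jobId) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(restBase + "/jobs/" + jobId)).GET().build();
        for (int attempt = 0; attempt < 3; attempt++) {
            HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
            if (response.statusCode() == 200) {
                return true;            // job is known to the cluster
            }
            Thread.sleep(100);          // got a 404: wait briefly and ask again
        }
        return false;
    }
}
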

Kind regards,
Tomasz


Tomasz Dudziak | Marshall Wace LLP, George House, 131 Sloane Street, London, 
SW1X 9AT | E-mail: t.dudz...@mwam.com | Tel: +44 207 
024 7061


This e-mail and any attachments are confidential to the addressee(s) and may 
contain information that is legally privileged and/or confidential. Please 
refer to http://www.mwam.com/email-disclaimer-uk for important disclosures 
regarding this email. If we collect and use your personal data we will use it 
in accordance with our privacy policy, which can be reviewed at 
https://www.mwam.com/privacy-policy .

Marshall Wace LLP is authorised and regulated by the Financial Conduct 
Authority. Marshall Wace LLP is a limited liability partnership registered in 
England and Wales with registered number OC302228 and registered office at 
George House, 131 Sloane Street, London, SW1X 9AT. If you are receiving this 
e-mail as a client, or an investor in an investment vehicle, managed or advised 
by Marshall Wace North America L.P., the sender of this e-mail is communicating 
with you in the sender's capacity as an associated or related person of 
Marshall Wace North America L.P., which is registered with the US Securities 
and Exchange Commission as an investment adviser.


Re: flink sql 读取mysql

2020-07-24 Thread admin
 'connector.properties.zookeeper.connect' = '',  -- zk 地址
   'connector.properties.bootstrap.servers' = '',  -- broker 地址

'connector.username' = '',
   'connector.password' = ‘',
These lines look problematic.

> 2020年7月24日 下午4:20,liunaihua521  写道:
> 
>  'connector.properties.zookeeper.connect' = '',  -- zk 地址
>'connector.properties.bootstrap.servers' = '',  -- broker 地址



flink1.11查询结果每秒入库到mysql数量很少

2020-07-24 Thread 小学生
I'm using Flink 1.11 to consume Kafka data and write it into MySQL. Kafka produces about 300 records per second, but only about 6 records per second get written into MySQL.


from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, 
CheckpointingMode
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
source_Kafka = """
CREATE TABLE kafka_source (
id VARCHAR,
alarm_id VARCHAR,
trck_id VARCHAR
) WITH (
'connector' = 'kafka',
'topic' = 'test',
'properties.bootstrap.servers' = '*',
'properties.group.id' = 'flink_grouper',
'scan.startup.mode' = 'earliest-offset', 
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
)
"""
source_W_detail_ddl = """
CREATE TABLE source_W_detail (
id VARCHAR, 
alarm_id VARCHAR,  
trck_id VARCHAR 
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://198.2.2.71:3306/bda?useSSL=false',
'driver' = 'com.mysql.cj.jdbc.Driver',
'table-name' = 'detail',
'username' = 'root',
'password' = 'root',
'sink.buffer-flush.max-rows' = '1000',
'sink.buffer-flush.interval' = '2s'
)
"""
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)
env.set_parallelism(1)
env_settings = 
EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)
t_env.execute_sql(source_Kafka)
t_env.execute_sql(source_W_detail_ddl)
table_result1=t_env.execute_sql('''insert into source_W_detail select 
id,alarm_id,trck_id from kafka_source''')
table_result1.get_job_client().get_job_execution_result().result()

Re: Reading/writing Kafka message keys using DDL pyFlink

2020-07-24 Thread Manas Kale
Okay, thanks for the info!

On Fri, Jul 24, 2020 at 2:11 PM Leonard Xu  wrote:

> Hi, Kale
>
> Unfortunately Flink SQL does not support read/write Kafka message keys
> yet, there is a FLIP[1] to discuss this feature.
>
> Best
> Leonard Xu
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records#FLIP107:Readingtablecolumnsfromdifferentpartsofsourcerecords-Readingandwritingfromkey,value,timestamp
> 
>
> 在 2020年7月24日,15:01,Manas Kale  写道:
>
> Hi,
> How do I read/write Kafka message keys using DDL? I have not been able to
> see any documentation for the same.
>
> Thanks!
>
>
>


回复: Re: flink 1.11 cdc相关问题

2020-07-24 Thread amen...@163.com
Many thanks! I'm following it now~


Best


amen...@163.com
 
发件人: Leonard Xu
发送时间: 2020-07-24 16:20
收件人: user-zh
主题: Re: flink 1.11 cdc相关问题
Hi amenhub
 
针对这个问题,我建了个issue来跟踪这个问题[1],
另外你可以在你的PG 里面把表的IDENTITY设置为FULL,这样 debezium 同步的UPDATE数据就会有完整的信息,
DB命令是:ALTER TABLE yourTable REPLICA IDENTITY FULL, 可以参考debezium官网文档[2]
 
Best
Leonard Xu
[1] https://issues.apache.org/jira/browse/FLINK-18700 

[2] https://debezium.io/documentation/reference/1.2/connectors/postgresql.html 

 
> 在 2020年7月23日,09:14,amen...@163.com 写道:
>
> 感谢二位大佬@Leonard, @Jark的解答!
>
>
>
> amen...@163.com
>
> 发件人: Jark Wu
> 发送时间: 2020-07-22 23:56
> 收件人: user-zh
> 主题: Re: flink 1.11 cdc相关问题
> Hi,
>
> 这是个已知问题,目前 debezium 同步不同数据库并没有保证一模一样地数据格式,比如同步 PG 的UPDATE消息时候,before 和
> after 字段就不是全的。
> 这个问题会在后面地版本中解决。
>
> Best,
> Jark
>
> On Wed, 22 Jul 2020 at 21:07, Leonard Xu  wrote:
>
>> Hello,
>>
>> 代码在为before这条数据设置rowKind时抛了一个NPE,before正常应该是不为null的。
>> 看起来是你的数据问题,一条 update 的changelog, before 为null,
>> 这是不合理的,没有before的数据,是无法处理after的数据的。
>> 如果确认是脏数据,可以开启ignore-parse-errors跳过[1]
>>
>> 祝好
>> Leonard
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors
>> <
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors
>>>
>>
>> {
>>"payload": {
>>"before": null,
>>"after": {
>>"id": 2,
>>"name": "liushimin",
>>"age": "24",
>>"sex": "man",
>>"phone": "1"
>>},
>>"source": {
>>"version": "1.2.0.Final",
>>"connector": "postgresql",
>>"name": "postgres",
>>"ts_ms": 1595409754151,
>>"snapshot": "false",
>>"db": "postgres",
>>"schema": "public",
>>"table": "person",
>>"txId": 569,
>>"lsn": 23632344,
>>"xmin": null
>>},
>>"op": "u",
>>"ts_ms": 1595409754270,
>>"transaction": null
>>}
>> }
>>
>>> 在 2020年7月22日,17:34,amen...@163.com 写道:
>>>
>>>
>> {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"postgres.public.person.Envelope"},"payload":{"before":null,"after":{"id":2,"name":"liushimin","age":"24","sex":"man","phone":"1"},"source":{"version":"1.2.0.Final","connector":"postgresql","name":"postgres","ts_ms":1595409754151,"snapshot":"false","db":"postgres","schema":"public","table":"person","txId":569,"lsn":23632344,"xmin":null},"op":"u","ts_ms":1595409754270,"transaction":null}}
>>
>>
 

Re: 关于 sql-client

2020-07-24 Thread Harold.Miao
What about this one: https://github.com/ververica/flink-sql-gateway

杨荣  于2020年7月24日周五 下午3:19写道:

> 你们可以 pr 到官方啊。我觉得这个功能很刚需啊,并且 basic 版的 1.5 就 release 了,不知道为什么相关 gate way 或者
> submit with sql file 的 feature 到现在都还没实现呢。
>
> Harold.Miao  于2020年7月24日周五 上午11:42写道:
>
> > 1 应该是可以的   主要是你要在flink-conf.yaml里面配置正确的 jobmanager.rpc.address
> > 源码里面有加载主配置文件的逻辑
> >
> > public LocalExecutor(URL defaultEnv, List<URL> jars, List<URL>
> libraries) {
> >// discover configuration
> >final String flinkConfigDir;
> >try {
> >   // find the configuration directory
> >   flinkConfigDir = CliFrontend.getConfigurationDirectoryFromEnv();
> >
> >   // load the global configuration
> >   this.flinkConfig =
> > GlobalConfiguration.loadConfiguration(flinkConfigDir);
> >
> >   // initialize default file system
> >   FileSystem.initialize(flinkConfig,
> > PluginUtils.createPluginManagerFromRootFolder(flinkConfig));
> >
> >   // load command lines for deployment
> >   this.commandLines =
> > CliFrontend.loadCustomCommandLines(flinkConfig, flinkConfigDir);
> >   this.commandLineOptions = collectCommandLineOptions(commandLines);
> >} catch (Exception e) {
> >   throw new SqlClientException("Could not load Flink configuration.",
> > e);
> >}
> >
> >
> > 2  因为等不及官方的  我们自己wrapper实现了一个
> >
> >
> >
> >
> > 杨荣  于2020年7月24日周五 上午10:53写道:
> >
> > > Hi all,
> > >
> > > 请问:
> > > 1. 在 Embedded mode 下,支持 ClusterClient 进行 job
> > > 提交作业,进行分布式计算吗?在文档中没看到,跟着文档走,只启起了 Local 在本地作业,无法运用到生产环境。
> > >
> > > 2. GateWay mode 预计在那个版本 release?
> > >
> >
> >
> > --
> >
> > Best Regards,
> > Harold Miao
> >
>


-- 

Best Regards,
Harold Miao


Re:Re:Re: Could not find any factory for identifier 'kafka'

2020-07-24 Thread RS
The formatting of my previous mail was broken, so let me reply again.


I build the jar and run it directly on the server; I have never run it from IDEA.

> flink run xxx

I'm not using the shade plugin.

The maven build configuration:

<properties>
    <jdk.version>1.8</jdk.version>
    <flink.version>1.11.1</flink.version>
</properties>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>${jdk.version}</source>
                <target>${jdk.version}</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
        </plugin>
    </plugins>
</build>






Re:Re: Could not find any factory for identifier 'kafka'

2020-07-24 Thread RS
I build the jar and run it directly on the server (bin/flink run xxx); I have never run it from IDEA. The maven build does not configure the shade plugin. The maven build settings are as follows:

<properties>
    <jdk.version>1.8</jdk.version>
    <flink.version>1.11.1</flink.version>
</properties>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>${jdk.version}</source>
                <target>${jdk.version}</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
        </plugin>
    </plugins>
</build>

thx
在 2020-07-24 17:36:46,"Benchao Li"  写道:
>可能跟你的打包方式有关系。你这个程序如果直接在idea里面运行是可以运行的么?
>
>如果可以在idea运行,但是打出来的jar包不能提交运行的话,很有可能跟SPI文件有关系。
>如果你用的是shade plugin,需要看下这个transformer[1]
>
>[1]
>https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#AppendingTransformer
>
>RS  于2020年7月24日周五 下午5:02写道:
>
>> hi,
>> Flink-1.11.1 尝试运行SQL DDL 读取kafka的数据,执行create 语句的时候报错了
>> 编译的jar包是jar-with-dependencies的
>>
>>
>> 代码片段:
>> public String ddlSql = String.format("CREATE TABLE %s (\n" +
>> "  number BIGINT,\n" +
>> "  msg STRING,\n" +
>> "  username STRING,\n" +
>> "  update_time TIMESTAMP(3)\n" +
>> ") WITH (\n" +
>> " 'connector' = 'kafka',\n" +
>> " 'topic' = '%s',\n" +
>> " 'properties.bootstrap.servers' = '%s',\n" +
>> " 'properties.group.id' = '%s',\n" +
>> " 'format' = 'json',\n" +
>> " 'json.fail-on-missing-field' = 'false',\n" +
>> " 'json.ignore-parse-errors' = 'true'\n" +
>> ")\n", tableName, topic, servers, group);
>>
>>
>> StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> StreamTableEnvironment tableEnv =
>> StreamTableEnvironment.create(env);
>> tableEnv.executeSql(ddlSql);
>>
>>
>> 报错信息:
>> Caused by: org.apache.flink.table.api.ValidationException: Could not find
>> any factory for identifier 'kafka' that implements
>> 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the
>> classpath.
>> Available factory identifiers are:
>> datagen
>> at
>> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
>> at
>> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
>> ... 33 more
>>
>>
>> 参考了这个
>> http://apache-flink.147419.n8.nabble.com/flink-1-11-executeSql-DDL-td4890.html#a4893
>> 补充了flink-connector-kafka_2.12,flink-sql-connector-kafka_2.12, 还是会报一样的错
>>
>>
>> 附上pom依赖:
>> 
>> 
>> org.apache.flink
>> flink-java
>> ${flink.version}
>> 
>> 
>> org.apache.flink
>> flink-table-api-java-bridge_2.12
>> ${flink.version}
>> 
>> 
>> org.apache.flink
>> flink-table-api-java
>> ${flink.version}
>> 
>> 
>> org.apache.flink
>> flink-connector-kafka_2.12
>> ${flink.version}
>> 
>> 
>> org.apache.flink
>> flink-sql-connector-kafka_2.12
>> ${flink.version}
>> 
>> 
>> org.apache.flink
>> flink-json
>> ${flink.version}
>> 
>> 
>>
>>
>> 感谢各位~
>
>
>
>-- 
>
>Best,
>Benchao Li


Re: Could not find any factory for identifier 'kafka'

2020-07-24 Thread Benchao Li
It may be related to how you package the jar. Does this program run if you launch it directly from IDEA?

If it runs in IDEA but the packaged jar fails when submitted, it is very likely an SPI (META-INF/services) file issue.
If you are using the shade plugin, take a look at this transformer[1].

[1]
https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#AppendingTransformer
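
(The relevant shade-plugin fragment looks roughly like this; only the transformer element matters here, the rest of the plugin configuration is omitted:)

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <configuration>
        <transformers>
            <!-- merges the META-INF/services entries from all connector and format jars -->
            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
    </configuration>
</plugin>
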

RS  于2020年7月24日周五 下午5:02写道:

> hi,
> Flink-1.11.1 尝试运行SQL DDL 读取kafka的数据,执行create 语句的时候报错了
> 编译的jar包是jar-with-dependencies的
>
>
> 代码片段:
> public String ddlSql = String.format("CREATE TABLE %s (\n" +
> "  number BIGINT,\n" +
> "  msg STRING,\n" +
> "  username STRING,\n" +
> "  update_time TIMESTAMP(3)\n" +
> ") WITH (\n" +
> " 'connector' = 'kafka',\n" +
> " 'topic' = '%s',\n" +
> " 'properties.bootstrap.servers' = '%s',\n" +
> " 'properties.group.id' = '%s',\n" +
> " 'format' = 'json',\n" +
> " 'json.fail-on-missing-field' = 'false',\n" +
> " 'json.ignore-parse-errors' = 'true'\n" +
> ")\n", tableName, topic, servers, group);
>
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tableEnv =
> StreamTableEnvironment.create(env);
> tableEnv.executeSql(ddlSql);
>
>
> 报错信息:
> Caused by: org.apache.flink.table.api.ValidationException: Could not find
> any factory for identifier 'kafka' that implements
> 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the
> classpath.
> Available factory identifiers are:
> datagen
> at
> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
> at
> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
> ... 33 more
>
>
> 参考了这个
> http://apache-flink.147419.n8.nabble.com/flink-1-11-executeSql-DDL-td4890.html#a4893
> 补充了flink-connector-kafka_2.12,flink-sql-connector-kafka_2.12, 还是会报一样的错
>
>
> 附上pom依赖:
> 
> 
> org.apache.flink
> flink-java
> ${flink.version}
> 
> 
> org.apache.flink
> flink-table-api-java-bridge_2.12
> ${flink.version}
> 
> 
> org.apache.flink
> flink-table-api-java
> ${flink.version}
> 
> 
> org.apache.flink
> flink-connector-kafka_2.12
> ${flink.version}
> 
> 
> org.apache.flink
> flink-sql-connector-kafka_2.12
> ${flink.version}
> 
> 
> org.apache.flink
> flink-json
> ${flink.version}
> 
> 
>
>
> 感谢各位~



-- 

Best,
Benchao Li


Re: Kafka connector with PyFlink

2020-07-24 Thread Xingbo Huang
Hi Wojtek,
The following ways of using PyFlink are my personal recommendations:

1. Use DDL[1] to create your source and sink instead of the descriptor way,
because as of flink 1.11, there are some bugs in the descriptor way.

2. Use `execute_sql` for single statement, use `create_statement_set` for
multiple DML statements.[2]

3. Use `execute_insert` for single sink, use
`TableTableEnvironment#create_statement_set` for multiple sinks

4. Use `from_path` method instead of `scan` method

5. Call the method `get_job_client().get_job_execution_result().result()`
of TableResult  which is the returned type of execute_insert or execute_sql
after calling the method `execute_*`


All PyFlink related common questions you can refer to the doc[3]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#translate-and-execute-a-query
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/common_questions.html
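
(As an illustration of point 1, the descriptor-based Kafka source in the quoted program below can be replaced by a DDL string passed to t_env.execute_sql(...). This is only a sketch; it reuses the topic and bootstrap servers from that program, while the group id and the csv format choice are assumptions:)

CREATE TABLE mySource (
  `value` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'my-topic',
  'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
  'properties.group.id' = 'my-group',
  'format' = 'csv'
)
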

Best,
Xingbo

Wojciech Korczyński  于2020年7月24日周五
下午4:44写道:

> Hi,
> thank you for your answer, it is very helpful.
>
> Currently my python program looks like:
>
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.dataset import ExecutionEnvironment
> from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, 
> StreamTableEnvironment
> from pyflink.table.descriptors import Schema, OldCsv, FileSystem, Kafka, 
> Json, Csv
>
> exec_env = StreamExecutionEnvironment.get_execution_environment()
> t_config = TableConfig()
> t_env = StreamTableEnvironment.create(exec_env, t_config)
>
> t_env.connect(Kafka()
>   .version("universal")
>   .topic("my-topic")
>   .property("bootstrap.servers", 
> 'my-cluster-kafka-bootstrap:9092')
>   ) \
> .in_append_mode() \
> .with_format(Csv()
>  .line_delimiter("\r\n") \
>  .derive_schema()) \
> .with_schema(Schema()
>  .field("value", DataTypes.STRING())) \
> .create_temporary_table('mySource')
>
> t_env.connect(Kafka()
>   .version("universal")
>   .topic("my-topic-out")
>   .property("bootstrap.servers", 
> 'my-cluster-kafka-bootstrap:9092')
>   ) \
> .with_format(Csv()
>  .line_delimiter("\r\n") \
>  .derive_schema()) \
> .with_schema(Schema()
>  .field("value", DataTypes.STRING())) \
> .in_append_mode() \
> .create_temporary_table('mySink')
>
>
> t_env.scan('mySource') \
> .select('"flink_job_" + value') \
> .insert_into('mySink')
>
> t_env.execute("tutorial_job")
>
> I have installed PyFlink 1.11 so the IDE is pointing me out the commands
> connect, scan, insert_into, *execute *are deprectade. What is the correct
> way the program should be different following 1.11 version of PyFlink?
>
> Kind regards,
> Wojtek
>
>
> pt., 24 lip 2020 o 04:21 Xingbo Huang  napisał(a):
>
>> Hi Wojtek,
>> In flink 1.11, the methods register_table_source and register_table_sink
>> of ConnectTableDescriptor have been removed. You need to use
>> createTemporaryTable instead of these two methods.Besides, it seems that
>> the version of your pyflink is 1.10, but the corresponding flink is 1.11.
>>
>> Best,
>> Xingbo
>>
>> Wojciech Korczyński  于2020年7月23日周四
>> 下午9:01写道:
>>
>>> Thank you for your answer.
>>>
>>> I have replaced that .jar with Kafka version universal - the links to
>>> other versions are extinct.
>>>
>>> After the attempt of deploying:
>>> bin/flink run -py
>>> /home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py --jarfile
>>> /home/wojtek/Downloads/flink-sql-connector-kafka_2.11-1.11.0.jar
>>>
>>> there another error occurs:
>>> Traceback (most recent call last):
>>>   File "/home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py",
>>> line 20, in 
>>> .field("tbd", DataTypes.INT())) \
>>> AttributeError: 'StreamTableDescriptor' object has no attribute
>>> 'register_table_source'
>>> org.apache.flink.client.program.ProgramAbortException
>>> at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
>>> at
>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>>> Method)
>>> at
>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at
>>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>>> at

Could not find any factory for identifier 'kafka'

2020-07-24 Thread RS
hi,
With Flink 1.11.1, I'm trying to run a SQL DDL job that reads data from Kafka, and executing the CREATE statement fails.
The jar is built as a jar-with-dependencies.


Code snippet:
public String ddlSql = String.format("CREATE TABLE %s (\n" +
"  number BIGINT,\n" +
"  msg STRING,\n" +
"  username STRING,\n" +
"  update_time TIMESTAMP(3)\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = '%s',\n" +
" 'properties.bootstrap.servers' = '%s',\n" +
" 'properties.group.id' = '%s',\n" +
" 'format' = 'json',\n" +
" 'json.fail-on-missing-field' = 'false',\n" +
" 'json.ignore-parse-errors' = 'true'\n" +
")\n", tableName, topic, servers, group);


StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql(ddlSql);


Error message:
Caused by: org.apache.flink.table.api.ValidationException: Could not find any 
factory for identifier 'kafka' that implements 
'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.
Available factory identifiers are:
datagen
at 
org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
at 
org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
... 33 more


I referred to this thread
http://apache-flink.147419.n8.nabble.com/flink-1-11-executeSql-DDL-td4890.html#a4893
and added flink-connector-kafka_2.12 and flink-sql-connector-kafka_2.12, but I still get the same error.


The pom dependencies are attached below:


org.apache.flink
flink-java
${flink.version}


org.apache.flink
flink-table-api-java-bridge_2.12
${flink.version}


org.apache.flink
flink-table-api-java
${flink.version}


org.apache.flink
flink-connector-kafka_2.12
${flink.version}


org.apache.flink
flink-sql-connector-kafka_2.12
${flink.version}


org.apache.flink
flink-json
${flink.version}




Thanks, everyone~

Re: GenericData cannot be cast to type scala.Product

2020-07-24 Thread Aljoscha Krettek
For anyone following this: the discussion is happening on the Jira 
issue: https://issues.apache.org/jira/browse/FLINK-18478


Best,
Aljoscha

On 23.07.20 15:32, Georg Heiler wrote:

Hi,

as a follow up to https://issues.apache.org/jira/browse/FLINK-18478 I now
face a class cast exception.
The reproducible example is available at
https://gist.github.com/geoHeil/5a5a4ae0ca2a8049617afa91acf40f89

I do not understand (yet) why such a simple example of reading Avro from a
Schema Registry and Kafka (in the scala API) is still causing problems.

java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record
cannot be cast to scala.Product

ava.lang.ClassCastException: org.apache.avro.generic.GenericData$Record
cannot be cast to scala.Product
 at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(
CaseClassSerializer.scala:32) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
 at org.apache.flink.streaming.runtime.tasks.
OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
 at org.apache.flink.streaming.runtime.tasks.
OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
 at org.apache.flink.streaming.runtime.tasks.
OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
 at org.apache.flink.streaming.api.operators.CountingOutput.collect(
CountingOutput.java:52) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
 at org.apache.flink.streaming.api.operators.CountingOutput.collect(
CountingOutput.java:30) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
 at org.apache.flink.streaming.api.operators.
StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:
104) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
 at org.apache.flink.streaming.api.operators.
StreamSourceContexts$NonTimestampContext.collectWithTimestamp(
StreamSourceContexts.java:111) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
 at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher
.emitRecordsWithTimestamps(AbstractFetcher.java:352)
~[flink-connector-kafka-base_2.11-1.11.1.jar:1.11.1]
 at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher
.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
~[flink-connector-kafka_2.11-1.11.1.jar:1.11.1]
 at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher
.runFetchLoop(KafkaFetcher.java:141) ~[flink-connector-kafka_2.11-1.11.1
.jar:1.11.1]
 at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase
.run(FlinkKafkaConsumerBase.java:755) ~[flink-connector-kafka-base_2.11-1.11
.1.jar:1.11.1]
 at org.apache.flink.streaming.api.operators.StreamSource.run(
StreamSource.java:100) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
 at org.apache.flink.streaming.api.operators.StreamSource.run(
StreamSource.java:63) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
 at org.apache.flink.streaming.runtime.tasks.
SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
~[flink-dist_2.11-1.11.1.jar:1.11.1]

Best,
Georg
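
(For readers of the archive: a Java-API sketch of consuming GenericRecord with the Confluent registry deserializer, which keeps Avro's own type information and involves no Scala case-class serializer. The topic, registry URL, and wiring are assumptions, and this is not necessarily the resolution reached on FLINK-18478:)

import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class GenericRecordSource {

    public static DataStream<GenericRecord> build(StreamExecutionEnvironment env,
                                                  Schema readerSchema,
                                                  Properties kafkaProps) {
        // the deserializer fetches the writer schema from the Confluent Schema Registry
        FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
                "my-avro-topic",
                ConfluentRegistryAvroDeserializationSchema.forGeneric(readerSchema, "http://schema-registry:8081"),
                kafkaProps);
        // records stay GenericRecord end to end, so no conversion to a case class is attempted
        return env.addSource(consumer);
    }
}
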





Re: Kafka connector with PyFlink

2020-07-24 Thread Wojciech Korczyński
Hi,
thank you for your answer, it is very helpful.

Currently my python program looks like:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes,
BatchTableEnvironment, StreamTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem,
Kafka, Json, Csv

exec_env = StreamExecutionEnvironment.get_execution_environment()
t_config = TableConfig()
t_env = StreamTableEnvironment.create(exec_env, t_config)

t_env.connect(Kafka()
  .version("universal")
  .topic("my-topic")
  .property("bootstrap.servers", 'my-cluster-kafka-bootstrap:9092')
  ) \
.in_append_mode() \
.with_format(Csv()
 .line_delimiter("\r\n") \
 .derive_schema()) \
.with_schema(Schema()
 .field("value", DataTypes.STRING())) \
.create_temporary_table('mySource')

t_env.connect(Kafka()
  .version("universal")
  .topic("my-topic-out")
  .property("bootstrap.servers", 'my-cluster-kafka-bootstrap:9092')
  ) \
.with_format(Csv()
 .line_delimiter("\r\n") \
 .derive_schema()) \
.with_schema(Schema()
 .field("value", DataTypes.STRING())) \
.in_append_mode() \
.create_temporary_table('mySink')


t_env.scan('mySource') \
.select('"flink_job_" + value') \
.insert_into('mySink')

t_env.execute("tutorial_job")

I have installed PyFlink 1.11, so the IDE points out that the commands
connect, scan, insert_into and *execute* are deprecated. What is the correct
way to write this program for the 1.11 version of PyFlink?

Kind regards,
Wojtek


pt., 24 lip 2020 o 04:21 Xingbo Huang  napisał(a):

> Hi Wojtek,
> In flink 1.11, the methods register_table_source and register_table_sink
> of ConnectTableDescriptor have been removed. You need to use
> createTemporaryTable instead of these two methods.Besides, it seems that
> the version of your pyflink is 1.10, but the corresponding flink is 1.11.
>
> Best,
> Xingbo
>
> Wojciech Korczyński  于2020年7月23日周四
> 下午9:01写道:
>
>> Thank you for your answer.
>>
>> I have replaced that .jar with Kafka version universal - the links to
>> other versions are extinct.
>>
>> After the attempt of deploying:
>> bin/flink run -py
>> /home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py --jarfile
>> /home/wojtek/Downloads/flink-sql-connector-kafka_2.11-1.11.0.jar
>>
>> there another error occurs:
>> Traceback (most recent call last):
>>   File "/home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py", line
>> 20, in 
>> .field("tbd", DataTypes.INT())) \
>> AttributeError: 'StreamTableDescriptor' object has no attribute
>> 'register_table_source'
>> org.apache.flink.client.program.ProgramAbortException
>> at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>> Method)
>> at
>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>> at
>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>> at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
>> at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>> at
>> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
>>
>> Maybe the way the python program is written is incorrect. Can it be
>> deprecated taking into account that the installed flink version is 1.11?
>>
>> Best regards,
>> Wojtek
>>
>> czw., 23 lip 2020 o 12:01 Xingbo Huang  napisał(a):
>>
>>> Hi Wojtek,
>>> you need to use the fat jar 'flink-sql-connector-kafka_2.11-1.11.0.jar'
>>> which you can download in the doc[1]
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
>>>
>>> Best,
>>> Xingbo
>>>
>>> Wojciech Korczyński  于2020年7月23日周四
>>> 下午4:57写道:
>>>
 Hello,

 I am trying to deploy a Python job with Kafka connector:

 from pyflink.datastream import StreamExecutionEnvironment
 from pyflink.dataset import ExecutionEnvironment
 from pyflink.table import TableConfig, DataTypes,
 BatchTableEnvironment, 

Re: Reading/writing Kafka message keys using DDL pyFlink

2020-07-24 Thread Leonard Xu
Hi, Kale

Unfortunately Flink SQL does not support read/write Kafka message keys yet, 
there is a FLIP[1] to discuss this feature. 

Best
Leonard Xu
[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records#FLIP107:Readingtablecolumnsfromdifferentpartsofsourcerecords-Readingandwritingfromkey,value,timestamp
 


> 在 2020年7月24日,15:01,Manas Kale  写道:
> 
> Hi,
> How do I read/write Kafka message keys using DDL? I have not been able to see 
> any documentation for the same.
> 
> Thanks!
> 



Re: Re: 关于 sql-client

2020-07-24 Thread chengyanan1...@foxmail.com
Zeppelin can submit all kinds of jobs from a web page; it's also quite good.
As for submit-with-SQL-file, you can refer to https://github.com/wuchong/flink-sql-submit,
and on top of that I simplified it a bit myself: https://github.com/Chengyanan1008/flink-sql-submit-client
Run ./sql-submit.sh -f <sql file> directly on the server to execute a SQL file.
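
(What such a wrapper boils down to, as a minimal Java sketch: it assumes Flink 1.11, a file of semicolon-separated statements, and the blink planner defaults; it is not the actual implementation of either repository.)

import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SqlFileSubmit {

    public static void main(String[] args) throws Exception {
        // read the whole SQL script passed as the first argument
        String script = new String(Files.readAllBytes(Paths.get(args[0])));

        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // naive split: one executeSql call per semicolon-terminated statement
        for (String statement : script.split(";")) {
            if (!statement.trim().isEmpty()) {
                tEnv.executeSql(statement.trim());
            }
        }
    }
}
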




chengyanan1...@foxmail.com
 
发件人: Jeff Zhang
发送时间: 2020-07-24 15:46
收件人: user-zh
主题: Re: 关于 sql-client
可以用zeppelin来提交flink sql作业,可以加入钉钉群讨论:32803524
 
杨荣  于2020年7月24日周五 下午3:19写道:
 
> 你们可以 pr 到官方啊。我觉得这个功能很刚需啊,并且 basic 版的 1.5 就 release 了,不知道为什么相关 gate way 或者
> submit with sql file 的 feature 到现在都还没实现呢。
>
> Harold.Miao  于2020年7月24日周五 上午11:42写道:
>
> > 1 应该是可以的   主要是你要在flink-conf.yaml里面配置正确的 jobmanager.rpc.address
> > 源码里面有加载主配置文件的逻辑
> >
> > public LocalExecutor(URL defaultEnv, List jars, List
> libraries) {
> >// discover configuration
> >final String flinkConfigDir;
> >try {
> >   // find the configuration directory
> >   flinkConfigDir = CliFrontend.getConfigurationDirectoryFromEnv();
> >
> >   // load the global configuration
> >   this.flinkConfig =
> > GlobalConfiguration.loadConfiguration(flinkConfigDir);
> >
> >   // initialize default file system
> >   FileSystem.initialize(flinkConfig,
> > PluginUtils.createPluginManagerFromRootFolder(flinkConfig));
> >
> >   // load command lines for deployment
> >   this.commandLines =
> > CliFrontend.loadCustomCommandLines(flinkConfig, flinkConfigDir);
> >   this.commandLineOptions = collectCommandLineOptions(commandLines);
> >} catch (Exception e) {
> >   throw new SqlClientException("Could not load Flink configuration.",
> > e);
> >}
> >
> >
> > 2  因为等不及官方的  我们自己wrapper实现了一个
> >
> >
> >
> >
> > 杨荣  于2020年7月24日周五 上午10:53写道:
> >
> > > Hi all,
> > >
> > > 请问:
> > > 1. 在 Embedded mode 下,支持 ClusterClient 进行 job
> > > 提交作业,进行分布式计算吗?在文档中没看到,跟着文档走,只启起了 Local 在本地作业,无法运用到生产环境。
> > >
> > > 2. GateWay mode 预计在那个版本 release?
> > >
> >
> >
> > --
> >
> > Best Regards,
> > Harold Miao
> >
>
 
 
-- 
Best Regards
 
Jeff Zhang


Re: flink sql 读取mysql

2020-07-24 Thread Leonard Xu
Hello

This error is usually caused by malformed SQL, for example full-width (Chinese) commas instead of half-width ones. Please double-check your SQL statement.

Best
Leonard Xu
> 在 2020年7月24日,16:20,liunaihua521  写道:
> 
> org.apache.flink.table.api.SqlParserExcption:Sql parse failed.Encountered 
> "timestamp,"at line
> Was expecting one of:
> "CURSOR"...



Re: flink 1.11 cdc相关问题

2020-07-24 Thread Leonard Xu
Hi amenhub

I've opened an issue to track this problem[1].
Also, you can set the table's REPLICA IDENTITY to FULL in your PostgreSQL database, so that the UPDATE changelog synchronized by Debezium carries complete before/after information.
The DB command is: ALTER TABLE yourTable REPLICA IDENTITY FULL; see the Debezium documentation[2].

Best
Leonard Xu
[1] https://issues.apache.org/jira/browse/FLINK-18700 

[2] https://debezium.io/documentation/reference/1.2/connectors/postgresql.html 


> 在 2020年7月23日,09:14,amen...@163.com 写道:
> 
> 感谢二位大佬@Leonard, @Jark的解答!
> 
> 
> 
> amen...@163.com
> 
> 发件人: Jark Wu
> 发送时间: 2020-07-22 23:56
> 收件人: user-zh
> 主题: Re: flink 1.11 cdc相关问题
> Hi,
> 
> 这是个已知问题,目前 debezium 同步不同数据库并没有保证一模一样地数据格式,比如同步 PG 的UPDATE消息时候,before 和
> after 字段就不是全的。
> 这个问题会在后面地版本中解决。
> 
> Best,
> Jark
> 
> On Wed, 22 Jul 2020 at 21:07, Leonard Xu  wrote:
> 
>> Hello,
>> 
>> 代码在为before这条数据设置rowKind时抛了一个NPE,before正常应该是不为null的。
>> 看起来是你的数据问题,一条 update 的changelog, before 为null,
>> 这是不合理的,没有before的数据,是无法处理after的数据的。
>> 如果确认是脏数据,可以开启ignore-parse-errors跳过[1]
>> 
>> 祝好
>> Leonard
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors
>> <
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors
>>> 
>> 
>> {
>>"payload": {
>>"before": null,
>>"after": {
>>"id": 2,
>>"name": "liushimin",
>>"age": "24",
>>"sex": "man",
>>"phone": "1"
>>},
>>"source": {
>>"version": "1.2.0.Final",
>>"connector": "postgresql",
>>"name": "postgres",
>>"ts_ms": 1595409754151,
>>"snapshot": "false",
>>"db": "postgres",
>>"schema": "public",
>>"table": "person",
>>"txId": 569,
>>"lsn": 23632344,
>>"xmin": null
>>},
>>"op": "u",
>>"ts_ms": 1595409754270,
>>"transaction": null
>>}
>> }
>> 
>>> 在 2020年7月22日,17:34,amen...@163.com 写道:
>>> 
>>> 
>> {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"postgres.public.person.Envelope"},"payload":{"before":null,"after":{"id":2,"name":"liushimin","age":"24","sex":"man","phone":"1"},"source":{"version":"1.2.0.Final","connector":"postgresql","name":"postgres","ts_ms":1595409754151,"snapshot":"false","db":"postgres","schema":"public","table":"person","txId":569,"lsn":23632344,"xmin":null},"op":"u","ts_ms":1595409754270,"transaction":null}}
>> 
>> 



回复: flink sql 读取mysql

2020-07-24 Thread liunaihua521
hi!
Hello, I understand what you mean now. After reading some material online, here is the revised version:


DDL:
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3),
proctime as PROCTIME()
) WITH (
'connector.type' = 'kafka',  -- kafka connector
'connector.version' = 'universal',  -- universal 支持 0.11 以上的版本
'connector.topic' = 'user_behavior',  -- kafka topic
'connector.startup-mode' = 'earliest-offset',  -- 从起始 offset 开始读取
'connector.properties.zookeeper.connect' = '',  -- zk 地址
'connector.properties.bootstrap.servers' = '',  -- broker 地址
'format.type' = 'json'  -- 数据源格式为 json
);




CREATE TABLE category_info (
parent_id BIGINT, -- 商品大类
category_id BIGINT  -- 商品详细类目
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://:3306/flinkdemo',
'connector.table' = 'category_info',
'connector.driver' = 'com.mysql.jdbc.Driver',
'connector.username' = '',
'connector.password' = '',
'connector.lookup.cache.max-rows' = '5000',
'connector.lookup.cache.ttl' = '10min'
);


SQL:


SELECT U.user_id, U.item_id, U.behavior, C.parent_id, C.category_id
FROM user_behavior AS U LEFT JOIN category_info FOR SYSTEM_TIME AS OF 
U.proctime AS C
ON U.category_id = C.category_id;


But executing the SQL fails (I can't paste from my work environment, so I typed the following part by hand):
org.apache.flink.table.api.SqlParserExcption:Sql parse failed.Encountered 
"timestamp,"at line
Was expecting one of:
"CURSOR"...
"EXISTS"...
"NOT"...
"ROW"...
"("...


I haven't been able to get it working; any guidance would be appreciated.




在2020年7月24日 14:25,Leonard Xu 写道:
Hello
图挂了,可以贴下DDL吗?另外你没有使用维表join语法 FOR SYSTEM_TIME AS OF[1]

祝好
Leonard Xu
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html#joins
 


在 2020年7月24日,14:14,liunaihua521  写道:

hi!
版本:flink  1.10
mysql 5.7.24

需求场景是:
使用flink SQL,读取kafka数据,和mysql的维表数据,之后进行join操作,如何配置mysql 
connector,才能使维表数据修改后,flink能读取到更新后的数据进行join操作?

现在本地测试时,维表的DDL是:

但是去mysql修改了数据后,join操作还是旧数据.

望大神们指点方向,提前谢谢了.






Flink CPU利用率低

2020-07-24 Thread guaishushu1...@163.com


I'd like to ask: is it normal for Flink's CPU utilization to be this low (0.012)?


guaishushu1...@163.com


Re: 关于 sql-client

2020-07-24 Thread Jeff Zhang
You can use Zeppelin to submit Flink SQL jobs. Feel free to join the DingTalk group to discuss: 32803524

杨荣  于2020年7月24日周五 下午3:19写道:

> 你们可以 pr 到官方啊。我觉得这个功能很刚需啊,并且 basic 版的 1.5 就 release 了,不知道为什么相关 gate way 或者
> submit with sql file 的 feature 到现在都还没实现呢。
>
> Harold.Miao  于2020年7月24日周五 上午11:42写道:
>
> > 1 应该是可以的   主要是你要在flink-conf.yaml里面配置正确的 jobmanager.rpc.address
> > 源码里面有加载主配置文件的逻辑
> >
> > public LocalExecutor(URL defaultEnv, List jars, List
> libraries) {
> >// discover configuration
> >final String flinkConfigDir;
> >try {
> >   // find the configuration directory
> >   flinkConfigDir = CliFrontend.getConfigurationDirectoryFromEnv();
> >
> >   // load the global configuration
> >   this.flinkConfig =
> > GlobalConfiguration.loadConfiguration(flinkConfigDir);
> >
> >   // initialize default file system
> >   FileSystem.initialize(flinkConfig,
> > PluginUtils.createPluginManagerFromRootFolder(flinkConfig));
> >
> >   // load command lines for deployment
> >   this.commandLines =
> > CliFrontend.loadCustomCommandLines(flinkConfig, flinkConfigDir);
> >   this.commandLineOptions = collectCommandLineOptions(commandLines);
> >} catch (Exception e) {
> >   throw new SqlClientException("Could not load Flink configuration.",
> > e);
> >}
> >
> >
> > 2  因为等不及官方的  我们自己wrapper实现了一个
> >
> >
> >
> >
> > 杨荣  于2020年7月24日周五 上午10:53写道:
> >
> > > Hi all,
> > >
> > > 请问:
> > > 1. 在 Embedded mode 下,支持 ClusterClient 进行 job
> > > 提交作业,进行分布式计算吗?在文档中没看到,跟着文档走,只启起了 Local 在本地作业,无法运用到生产环境。
> > >
> > > 2. GateWay mode 预计在那个版本 release?
> > >
> >
> >
> > --
> >
> > Best Regards,
> > Harold Miao
> >
>


-- 
Best Regards

Jeff Zhang


Re: 关于 sql-client

2020-07-24 Thread 杨荣
You could contribute it upstream as a PR. I think this feature is a real must-have, and the basic version was released back in 1.5; I don't understand why the gateway or
submit-with-sql-file features still haven't been implemented.

Harold.Miao  于2020年7月24日周五 上午11:42写道:

> 1 应该是可以的   主要是你要在flink-conf.yaml里面配置正确的 jobmanager.rpc.address
> 源码里面有加载主配置文件的逻辑
>
> public LocalExecutor(URL defaultEnv, List<URL> jars, List<URL> libraries) {
>// discover configuration
>final String flinkConfigDir;
>try {
>   // find the configuration directory
>   flinkConfigDir = CliFrontend.getConfigurationDirectoryFromEnv();
>
>   // load the global configuration
>   this.flinkConfig =
> GlobalConfiguration.loadConfiguration(flinkConfigDir);
>
>   // initialize default file system
>   FileSystem.initialize(flinkConfig,
> PluginUtils.createPluginManagerFromRootFolder(flinkConfig));
>
>   // load command lines for deployment
>   this.commandLines =
> CliFrontend.loadCustomCommandLines(flinkConfig, flinkConfigDir);
>   this.commandLineOptions = collectCommandLineOptions(commandLines);
>} catch (Exception e) {
>   throw new SqlClientException("Could not load Flink configuration.",
> e);
>}
>
>
> 2  因为等不及官方的  我们自己wrapper实现了一个
>
>
>
>
> 杨荣  于2020年7月24日周五 上午10:53写道:
>
> > Hi all,
> >
> > 请问:
> > 1. 在 Embedded mode 下,支持 ClusterClient 进行 job
> > 提交作业,进行分布式计算吗?在文档中没看到,跟着文档走,只启起了 Local 在本地作业,无法运用到生产环境。
> >
> > 2. GateWay mode 预计在那个版本 release?
> >
>
>
> --
>
> Best Regards,
> Harold Miao
>


Reading/writing Kafka message keys using DDL pyFlink

2020-07-24 Thread Manas Kale
Hi,
How do I read/write Kafka message keys using DDL? I have not been able to
see any documentation for the same.

Thanks!


Re: flink sql 读取mysql

2020-07-24 Thread Leonard Xu

Also, for Chinese-language community discussion, just send mail to user-zh@flink.apache.org;
there is no need to send to the user-zh-...@flink.apache.org
address.


> 在 2020年7月24日,14:25,Leonard Xu  写道:
> 
> Hello
> 图挂了,可以贴下DDL吗?另外你没有使用维表join语法 FOR SYSTEM_TIME AS OF[1] 
> 
> 祝好
> Leonard Xu
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html#joins
>  
> 
>  
> 
>> 在 2020年7月24日,14:14,liunaihua521 > > 写道:
>> 
>> hi!
>> 版本:flink  1.10
>> mysql 5.7.24
>> 
>> 需求场景是:
>> 使用flink SQL,读取kafka数据,和mysql的维表数据,之后进行join操作,如何配置mysql 
>> connector,才能使维表数据修改后,flink能读取到更新后的数据进行join操作?
>> 
>> 现在本地测试时,维表的DDL是:
>> 
>> 但是去mysql修改了数据后,join操作还是旧数据.
>> 
>> 望大神们指点方向,提前谢谢了.
>> 
>> 
>> 
> 



Re: flink sql 读取mysql

2020-07-24 Thread Leonard Xu
Hello
The image didn't come through. Could you paste the DDL as text? Also, you are not using the temporal table join syntax FOR SYSTEM_TIME AS OF[1].

Best
Leonard Xu
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html#joins
 

 

> 在 2020年7月24日,14:14,liunaihua521  写道:
> 
> hi!
> 版本:flink  1.10
> mysql 5.7.24
> 
> 需求场景是:
> 使用flink SQL,读取kafka数据,和mysql的维表数据,之后进行join操作,如何配置mysql 
> connector,才能使维表数据修改后,flink能读取到更新后的数据进行join操作?
> 
> 现在本地测试时,维表的DDL是:
> 
> 但是去mysql修改了数据后,join操作还是旧数据.
> 
> 望大神们指点方向,提前谢谢了.
> 
> 
> 



flink sql 读取mysql

2020-07-24 Thread liunaihua521
hi!
Versions: Flink 1.10
MySQL 5.7.24


The use case:
Using Flink SQL, I read data from Kafka and join it with a dimension table in MySQL. How should the MySQL
connector be configured so that, after the dimension table is modified, Flink picks up the updated data for the join?


In my current local test, the dimension table DDL is:

But after modifying the data in MySQL, the join still returns the old data.


Hoping someone can point me in the right direction. Thanks in advance.