Unsubscribe

2021-03-29 Thread 于林之下

Unsubscribe

于林之下
Email: sishengqikuo...@163.com

Signature customized by NetEase Mail Master

Re: Reply: Exception when reading data from MySQL in Flink

2021-03-29 Thread air23
I want to read the data in batch mode here, not streaming.
The exception I see is: Only insert statement is supported now

On 2021-03-30 10:31:51, "guoyb" <861277...@qq.com> wrote:
>Reading from MySQL is supported; there is also the built-in flink cdc.
>For SELECT you need to use the query method; check whether you used execute by mistake.
>
>
>
>---Original message---
>From: "air23"    Sent: Tuesday, March 30, 2021, 10:25 AM
>To: "user-zh"    Subject: Exception when reading data from MySQL in Flink
>
>
>Hello. I was following the official docs:
>https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/jdbc.html
>When reading MySQL data via JDBC I get this error: Exception in thread "main"
>org.apache.flink.table.api.TableException: Only insert statement is supported
>now.
>
>
>String a = "-- register a MySQL table 'users' in Flink SQL\n" +
>"CREATE TABLE MyUserTable (\n" +
>" id BIGINT\n" +
>") WITH (\n" +
>" 'connector' = 'jdbc',\n" +
>" 'url' = 'jdbc:mysql://***:3306/monitor',\n" +
>" 'table-name' = 't1',\n" +
>" 'username' = 'root',\n" +
>" 'password' = '***'\n" +
>") ";
>
>String b ="-- scan data from the JDBC table\n" +
>"SELECT id FROM MyUserTable\n";
>
>tEnv.executeSql(a);
>
>
>
>Is it not possible to read data from MySQL?
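
A minimal sketch of what guoyb is suggesting, i.e. reading the SELECT result instead of submitting it as an insert-only statement. This assumes the Flink 1.12 Table API and reuses the MyUserTable registration from the snippet above:

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

// register the JDBC table as before
tEnv.executeSql(a);

// option 1: execute the query and iterate over the result
TableResult result = tEnv.executeSql("SELECT id FROM MyUserTable");
try (CloseableIterator<Row> it = result.collect()) {
    while (it.hasNext()) {
        System.out.println(it.next());
    }
}

// option 2: use sqlQuery to obtain a Table and print it
Table table = tEnv.sqlQuery("SELECT id FROM MyUserTable");
table.execute().print();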


Re: PyFlink Table API: Interpret datetime field from Kafka as event time

2021-03-29 Thread Sumeet Malhotra
Thanks. Yes, that's a possibility. I'd still prefer something that can be
done within the Table API. If it's not possible, then there's no other
option but to use the DataStream API to read from Kafka, do the time
conversion and create a table from it.

..Sumeet

On Mon, Mar 29, 2021 at 10:41 PM Piotr Nowojski 
wrote:

> Hi,
>
> I hope someone else might have a better answer, but one thing that would
> most likely work is to convert this field and define event time during
> DataStream to table conversion [1]. You could always pre-process this field
> in the DataStream API.
>
> Piotrek
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/time_attributes.html#during-datastream-to-table-conversion
>
> On Mon, 29 Mar 2021 at 18:07, Sumeet Malhotra 
> wrote:
>
>> Hi,
>>
>> Might be a simple, stupid question, but I'm not able to find how to
>> convert/interpret a UTC datetime string like
>> *2021-03-23T07:37:00.613910Z* as event-time using a DDL/Table API. I'm
>> ingesting data from Kafka and can read this field as a string, but would
>> like to mark it as event-time by defining a watermark.
>>
>> I'm able to achieve this using the DataStream API, by defining my own
>> TimestampAssigner that converts the datetime string to milliseconds since
>> epoch. How can I do this using a SQL DDL or Table API?
>>
>> I tried to directly interpret the string as TIMESTAMP(3) but it fails
>> with the following exception:
>>
>> java.time.format.DateTimeParseException: Text
>> '2021-03-23T07:37:00.613910Z' could not be parsed...
>>
>> Any pointers?
>>
>> Thanks!
>> Sumeet
>>
>>
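
A minimal Java sketch of the approach Piotr describes: parse the ISO-8601 string in the DataStream API, assign timestamps and watermarks there, and then expose the timestamp as a rowtime attribute when converting the DataStream to a Table. The KafkaRecord type, its getTs() accessor and the surrounding setup are assumptions for illustration only:

import java.time.Duration;
import java.time.Instant;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import static org.apache.flink.table.api.Expressions.$;

// rawStream is a DataStream<KafkaRecord> read from Kafka, where KafkaRecord
// has a String field ts holding values like "2021-03-23T07:37:00.613910Z"
DataStream<KafkaRecord> withTimestamps = rawStream.assignTimestampsAndWatermarks(
        WatermarkStrategy.<KafkaRecord>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                // Instant.parse handles the ISO-8601 format, including the trailing 'Z'
                .withTimestampAssigner((rec, previous) -> Instant.parse(rec.getTs()).toEpochMilli()));

// tableEnv is a StreamTableEnvironment; the appended field becomes the event-time attribute
Table table = tableEnv.fromDataStream(withTimestamps, $("ts"), $("rowtime").rowtime());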


Fw:A question about flink watermark illustration in official documents

2021-03-29 Thread 罗昊
Recently I read the official Flink documentation about watermarks:
url: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/event_time.html
There are two pictures illustrating the Flink watermark mechanism, which puzzle me
much:
The first picture is easy to understand, but in the second I wonder how we
get w(11) and w(17).
As we know, we can define how watermarks are generated in a Flink job; in other
words, watermarks are generated by certain rules. So what are the rules by which
the watermarks in the second picture are generated?


I looked through almost all the official documents for the different Flink versions, and
they all use the same pictures.
It puzzles me a lot. Is there any explanation?
Waiting for your answers, thanks!
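
For reference, the "rule" is simply whatever watermark generator the job configures; the pictures themselves do not fix one. As an illustration only, here is a minimal sketch (using the newer Flink 1.12-style interface rather than the 1.9 API the linked page describes) of a bounded-out-of-orderness generator, one common rule that can produce watermarks such as w(11) or w(17) from the observed event timestamps:

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

// MyEvent is a hypothetical event type; timestamps are extracted elsewhere.
public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<MyEvent> {

    private final long maxOutOfOrderness = 3; // tolerated lateness, in the same unit as the timestamps
    private long currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness + 1;

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        // track the largest timestamp seen so far
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // emit "largest seen timestamp minus the allowed lateness" as the watermark;
        // e.g. after seeing timestamps up to 14 this emits a watermark of 11 when the delay is 3
        output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness));
    }
}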

Re: Need help with executing Flink CLI for native Kubernetes deployment

2021-03-29 Thread Yang Wang
Hi Fuyao,

Thanks for trying the native Kubernetes integration.

Just like you know, the Flink rest service can be exposed in the following
three types, configured via "kubernetes.rest-service.exposed.type".

* ClusterIP, which means you can only access the Flink rest endpoint
inside the K8s cluster. Simply put, users can start a Flink client in the
K8s cluster via the following yaml file and use "kubectl exec" to tunnel
into the pod to create a Flink session/application cluster. The
"flink list/cancel" commands also work well.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-client
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink-client
  template:
    metadata:
      labels:
        app: flink-client
    spec:
      containers:
      - name: client
        image: flink:1.12.2
        imagePullPolicy: Always
        args: ["sleep", "86400"]

* NodePort
Currently, we have a limitation that only the Kubernetes master nodes can
be used to build the exposed Flink rest endpoint. So if your
APIServer node does not have the kube proxy, then the printed URL in the
Flink client logs cannot be used. We already have a ticket [1] to
support one of the slave nodes for accessing the rest endpoint, but I have
not yet managed to get it done.

* LoadBalancer
Is the resolved rest endpoint "http://144.25.13.78:8081/" accessible on
your Flink client side? If yes, then I think the Flink client
should be able to contact the JobManager rest server to list/cancel the
jobs. I have verified this on Alibaba container service, and it works well.


[1]. https://issues.apache.org/jira/browse/FLINK-16601


Best,
Yang

Fuyao Li wrote on Sat, Mar 27, 2021 at 5:59 AM:

> Hi Community, Yang,
>
>
>
> I am new to Flink on native Kubernetes and I am trying to do a POC for
> native Kubernetes application mode on Oracle Cloud Infrastructure. I was
> following the documentation here step by step: [1]
>
>
>
> I am using Flink 1.12.1, Scala 2.11, java 11.
>
> I was able to create a native Kubernetes Deployment, but I am not able to
> use any further commands like list / cancel etc.. I always run into timeout
> error. I think the issue could be the JobManager Web Interface IP address
> printed after job deployment is not accessible. This issue is causing me
> not able to shut down the deployment with a savepoint. It could be
> Kubernetes configuration issue. I have exposed all related ports traffic
> and validated the security list, but still couldn’t make it work. Any help
> is appreciated.
>
>
>
>
>
> The relevant Flink source code is CliFrontend.java class [2]
>
> The ./bin/flink list and cancel commands are trying to send traffic to the
> Flink dashboard UI IP address and they time out. I tried both the
> LoadBalancer and NodePort options for the
> -Dkubernetes.rest-service.exposed.type configuration. Neither of them
> works.
>
>
>
> # List running job on the cluster (I can’t execute this command
> successfully due to timeout, logs shared below)
>
> $ ./bin/flink list --target kubernetes-application
> -Dkubernetes.cluster-id=my-first-application-cluster
>
> # Cancel running job (I can’t execute this command successfully)
>
> $ ./bin/flink cancel --target kubernetes-application
> -Dkubernetes.cluster-id=my-first-application-cluster 
>
>
>
> I think those commands need to communicate with the endpoint that is shown
> after the job submission command.
>
>
>
>1. Use case 1(deploy with NodePort)
>
>
>
> # fuyli @ fuyli-mac in ~/Development/flink-1.12.1 [17:59:00] C:127
>
> $ ./bin/flink run-application \
>
> --target kubernetes-application \
>
> -Dkubernetes.cluster-id=my-first-application-cluster \
>
> -Dkubernetes.container.image=us-phoenix-1.ocir.io/idxglh0bz964/flink-demo:21.3.1 \
>
> -Dkubernetes.container.image.pull-policy=IfNotPresent \
>
> -Dkubernetes.container.image.pull-secrets=ocirsecret \
>
> -Dkubernetes.rest-service.exposed.type=NodePort \
>
> -Dkubernetes.service-account=flink-service-account \
>
> local:///opt/flink/usrlib/quickstart-0.1.jar
>
>
>
>
>
> When the expose type is NodePort, the printed message says the Flink
> JobManager Web Interface is at http://192.29.104.156:30996.
> 192.29.104.156 is my Kubernetes apiserver address. 30996 is the port that
> exposes the service. However, the Flink dashboard is not reachable at this
> address.
>
> I can only get access to dashboard UI on each node IP address(There are
> three nodes in my K8S cluster)
>
> 100.104.154.73:30996
>
> 100.104.154.74:30996
>
> 100.104.154.75:30996
>
>   I got the following errors when trying to run the list command for such a
> native Kubernetes deployment. See [4]. *According to the documentation
> here [3], this shouldn’t happen, since the Kubernetes API server address should
> also serve the Flink Web UI… Did I miss any configuration in Kubernetes to
> make the Web UI available at the Kubernetes apiserver address?*
>
>
>
>
>
>1. Use case 2 (deploy with LoadBalancer)
>
> # fuyli @ fuyli-mac in 

????

2021-03-29 Thread Tent


Unsubscribe

2021-03-29 Thread 徐永健
Unsubscribe

Re: flink Container exited with a non-zero exit code 2. Error file: prelaunch.err. Last 4096 bytes of prelaunch.err

2021-03-29 Thread Yang Wang
An error like this usually means the Flink JM/TM process was never successfully launched by Yarn. Check the Yarn NodeManager logs
and search for the corresponding container; there should be some clues there.

Best,
Yang

flink2021 wrote on Tue, Mar 30, 2021 at 9:40 AM:

> After the streaming job has been running for a while, it fails with:
> Container exited with a non-zero exit code 2. Error file: prelaunch.err.
> Last 4096 bytes of prelaunch.err
> What is the exact cause? Could anyone help take a look? This is the only error message in the log; there is nothing else.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink-1.12.2 TM cannot access the configmap with a custom serviceAccount

2021-03-29 Thread Yang Wang
I can confirm this has been fixed in 1.12.1 and 1.12.2. If it still does not work, please share the startup command and the corresponding TM error log.

Best,
Yang

1120344670 <1120344...@qq.com> wrote on Mon, Mar 29, 2021 at 5:09 PM:

> Hello:
>    I previously filed an issue about this; the link is:
> http://apache-flink.147419.n8.nabble.com/flink1-12-k8s-session-TM-td10153.html
>    It does not look like the corresponding issue has been fixed yet.
>
>    The error is as follows:
>
>
> The issue on JIRA appears to be closed already; please confirm whether it has been fixed.
>


flink-sql with execution.target: yarn-per-job

2021-03-29 Thread ????
1. In yarn-session mode: …
2. In yarn-per-job mode, the job fails with the following error:
Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager with 
id container_e66_1616483562588_2358_01_02() timed out.
at 
org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1378)
 ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:111)
 ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
~[?:1.8.0_151]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
~[?:1.8.0_151]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:442)
 ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:209)
 ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
 ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
 ~[flink-dist_2.11-1.12.1.jar:1.12.1]

3. The yarn session is started with: nohup bin/yarn-session.sh -jm 1024 -tm 1024 -s 2 -qu root.CBT -d
4. In yarn-per-job mode, the job is a FLINK-CDC SQL job submitted per-job.

Re: [Question] Why does RocksDBStateBackend use the separately packaged frocksdbjni instead of the official RocksJava provided by RocksDB?

2021-03-29 Thread Yun Tang
Hi Feifan,

The main reason is to support the compaction filter for TTL state
[1]. Because this is a customization, the RocksDB community cannot merge it into the main branch. We are also considering a plugin-based approach later, so that we can reuse the community's native RocksJava. A further tricky issue right now is that RocksJava has a performance regression after 5.18,
which prevents us from upgrading directly. You can follow FLINK-14482 [2] to track further progress.


[1] 
https://github.com/ververica/frocksdb/commit/01dca02244522e405c9258000903fee81496f72c
[2] https://issues.apache.org/jira/browse/FLINK-14482

Best
Yun Tang

From: zoltar9264 
Sent: Wednesday, March 24, 2021 13:48
To: user-zh 
Subject: [Question] Why does RocksDBStateBackend use the separately packaged frocksdbjni instead of the official RocksJava provided by RocksDB?

Hi all,
I saw in the RocksDBStateBackend pom that frocksdbjni is used, and this package appears to be dataArtisans' own. RocksDB does provide an official Java
SDK called RocksJava. Why doesn't RocksDBStateBackend use RocksJava directly?


Feifan Wang
zoltar9...@163.com

Signature customized by NetEase Mail Master



(No subject)

2021-03-29 Thread 于林之下

TD

于林之下
Email: sishengqikuo...@163.com

Signature customized by NetEase Mail Master

Exception when reading data from MySQL in Flink

2021-03-29 Thread air23
Hello. I was following the official docs:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/jdbc.html
When reading MySQL data via JDBC I get this error: Exception in thread "main" 
org.apache.flink.table.api.TableException: Only insert statement is supported 
now.


String  a = "-- register a MySQL table 'users' in Flink SQL\n" +
"CREATE TABLE MyUserTable (\n" +
"  id BIGINT\n" +
") WITH (\n" +
"   'connector' = 'jdbc',\n" +
"   'url' = 'jdbc:mysql://***:3306/monitor',\n" +
"   'table-name' = 't1',\n" +
"   'username' = 'root',\n" +
"   'password' = '***'\n" +
") ";

String b ="-- scan data from the JDBC table\n" +
"SELECT id FROM MyUserTable\n";

tEnv.executeSql(a);



Is it not possible to read data from MySQL?


Reply: Exception when reading data from MySQL in Flink

2021-03-29 Thread guoyb
Reading from MySQL is supported; there is also the built-in flink cdc.
For SELECT you need to use the query method; check whether you used execute by mistake.



---Original message---
From: "air23"
Hello. I was following the official docs:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/jdbc.html
When reading MySQL data via JDBC I get this error: Exception in thread "main" 
org.apache.flink.table.api.TableException: Only insert statement is supported 
now.


String a = "-- register a MySQL table 'users' in Flink SQL\n" +
"CREATE TABLE MyUserTable (\n" +
" id BIGINT\n" +
") WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = 'jdbc:mysql://***:3306/monitor',\n" +
" 'table-name' = 't1',\n" +
" 'username' = 'root',\n" +
" 'password' = '***'\n" +
") ";

String b ="-- scan data from the JDBC table\n" +
"SELECT id FROM MyUserTable\n";

tEnv.executeSql(a);



Is it not possible to read data from MySQL?


Reply: Flink window operator shows Bytes/Records Sent as 0 in the Web UI

2021-03-29 Thread allanqinjy
Your picture does not come through. My guess is that after the window you have an operator chain that ends in a sink, so Bytes
Sent is 0; what comes after the sink is no longer within Flink's scope, so Sent stays 0. You can call disableOperatorChaining(), and then add a map operator after the window; then you can see whether anything is sent after the window.


allanqinjy
allanqi...@163.com

Signature customized by NetEase Mail Master


On 2021-03-30 09:37, 王 浩成 wrote:

Hello, I have a dataflow graph like the one in the picture below. A CSV file is used as the source to generate the data stream; the stream spans roughly 15 minutes and the time window is set to 1 minute.

After submitting the Flink job on a single node, I found in the Web UI that from the window operator onwards the Bytes
Sent of all downstream operators is 0 B, even though the window operator and the operators after it actually performed their operations and produced the corresponding results.

Why does this happen, and how can I make it display the amount of data sent as I would expect? Thanks!
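
A minimal, self-contained sketch of allanqinjy's suggestion: disable operator chaining and put an identity map after the window so that the window -> map edge (and its Bytes/Records Sent) becomes visible in the Web UI. The source, key and window below are placeholders, not the original job:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.disableOperatorChaining();   // show every operator separately in the Web UI

env.fromElements(1, 2, 3, 4, 5)
   .keyBy(v -> v % 2)
   .countWindow(2)               // stand-in for the 1-minute time window in the original job
   .sum(0)
   .map(v -> v)                  // identity map after the window, only there so the outgoing edge and its metrics show up
   .returns(Types.INT)
   .print();

env.execute("window-metrics-demo");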

flink Container exited with a non-zero exit code 2. Error file: prelaunch.err. Last 4096 bytes of prelaunch.err

2021-03-29 Thread flink2021
After the streaming job has been running for a while, it fails with:
Container exited with a non-zero exit code 2. Error file: prelaunch.err.
Last 4096 bytes of prelaunch.err
What is the exact cause? Could anyone help take a look? This is the only error message in the log; there is nothing else.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink window operator shows Bytes/Records Sent as 0 in the Web UI

2021-03-29 Thread 王 浩成
Hello, I have a dataflow graph like the one in the picture below. A CSV file is used as the source to generate the data stream; the stream spans roughly 15 minutes and the time window is set to 1 minute.
After submitting the Flink job on a single node, I found in the Web UI that from the window operator onwards the Bytes
Sent of all downstream operators is 0 B, even though the window operator and the operators after it actually performed their operations and produced the corresponding results.
Why does this happen, and how can I make it display the amount of data sent as I would expect? Thanks!
[Table: description automatically generated]


Re: SP with Drain and Cancel hangs after take a SP

2021-03-29 Thread Vishal Santoshi
I'm more interested in whether a StreamingFileSink without a drain
negatively affects its exactly-once semantics, given that the state in the SP
would have the offsets from Kafka plus the valid lengths of the part files at
the SP. To be honest, I'm not sure whether the flushed buffers on the sink are included
in the length, or whether this is not an issue with StreamingFileSink. If it is the
former, then I would assume it should be documented, and then we have to look at
why this hang happens.

On Mon, Mar 29, 2021 at 4:08 PM Vishal Santoshi 
wrote:

> Is this a known issue. We do a stop + savepoint with drain. I see no back
> pressure on our operators. It essentially takes a SP and then the SInk (
> StreamingFileSink to S3 ) just stays in the RUNNING state.
>
> Without drain, stop + savepoint works fine. I would imagine drain is
> important (flush the buffers etc.), but why this hang (I did it 3 times
> and waited 15 minutes each time)?
>
> Regards.
>


Re: Do data streams partitioned by same keyBy but from different operators converge to the same sub-task?

2021-03-29 Thread David Anderson
Yes, since the two streams have the same type, you can union the two
streams, key the resulting stream, and then apply something like a
RichFlatMapFunction. Or you can connect the two streams (again, they'll
need to be keyed so you can use state), and apply a RichCoFlatMapFunction.
You can use whichever of these approaches is simpler for your use case.

On Mon, Mar 29, 2021 at 7:56 AM vishalovercome  wrote:

> I've gone through the example as well as the documentation and I still
> couldn't understand whether my use case requires joining. 1. What would
> happen if I didn't join? 2. As the 2 incoming data streams have the same
> type, if joining is absolutely necessary then just a union
> (oneStream.union(anotherStream)) followed by a keyBy should be good enough
> right? I am asking this because I would prefer to use the simple
> RichMapFunction or RichFlatMapFunction as opposed to the
> RichCoFlatMapFunction. Thanks a lot!
> --
> Sent from the Apache Flink User Mailing List archive. mailing list archive
>  at
> Nabble.com.
>
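
A minimal sketch of the union-then-keyBy option described above, assuming both streams carry the same hypothetical Event type with a getKey() accessor; the keyed state is just an example of per-key logic:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;

public static DataStream<String> mergeAndCount(DataStream<Event> oneStream, DataStream<Event> anotherStream) {
    return oneStream
            .union(anotherStream)              // both streams have the same type, so union is enough
            .keyBy(Event::getKey)              // same key => same subtask, regardless of which stream the record came from
            .flatMap(new RichFlatMapFunction<Event, String>() {
                private transient ValueState<Long> count;

                @Override
                public void open(Configuration parameters) {
                    count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Types.LONG));
                }

                @Override
                public void flatMap(Event event, Collector<String> out) throws Exception {
                    long seen = count.value() == null ? 0L : count.value();
                    count.update(seen + 1);
                    out.collect(event.getKey() + " seen " + (seen + 1) + " times");
                }
            });
}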


Re: Install/Run Streaming Anomaly Detection R package in Flink

2021-03-29 Thread Robert Cullen
Wei,

Thank you for pointing to those examples. Here is a code sample of how it's
configured for me:

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.add_python_archive("/Users/admin/pyflink/venv.zip")
env.set_python_executable("venv.zip/venv/bin/python")
...

But when I run the virtual environment on my cluster I’m getting this error:

2021-03-29 15:42:35
org.apache.flink.runtime.JobException: Recovery is suppressed by
NoRestartBackoffTimeStrategy
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: Failed to execute the command:
venv.zip/venv/bin/python -c import pyflink;import
os;print(os.path.join(os.path.abspath(os.path.dirname(pyflink.__file__)),
'bin'))
output: venv.zip/venv/bin/python: 1: venv.zip/venv/bin/python: Syntax
error: "(" unexpected

at 
org.apache.flink.python.util.PythonEnvironmentManagerUtils.execute(PythonEnvironmentManagerUtils.java:198)
at 
org.apache.flink.python.util.PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(PythonEnvironmentManagerUtils.java:141)
at 
org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.createEnvironment(ProcessPythonEnvironmentManager.java:181)
at 
org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createPythonExecutionEnvironment(BeamPythonFunctionRunner.java:340)
at 
org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.lambda$open$0(BeamPythonFunctionRunner.java:259)
at 
org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$5(MemoryManager.java:512)
at 
org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:130)
at 
org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
at 
org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:522)
at 

Re: Question about checkpoints and savepoints

2021-03-29 Thread Robert Metzger
Mh, did you also check the TaskManager logs?
I'm not aware of any known issues in the past in that direction; the
codepaths for checkpoint / savepoint are fairly similar when it comes to
storing the data.

You could also try to run Flink on DEBUG log level, maybe that reveals
something?!


On Fri, Mar 26, 2021 at 1:37 PM Robert Cullen  wrote:

> Here’s a snippet from the logs, there are no errors in the logs
>
> 2021-03-23 13:11:52,247 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - 
> 
> 2021-03-23 13:11:52,249 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  
> Preconfiguration:
> 2021-03-23 13:11:52,249 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
>
> JM_RESOURCE_PARAMS extraction logs:
> jvm_params: -Xmx2097152000 -Xms2097152000 -XX:MaxMetaspaceSize=268435456
> logs: INFO  [] - Loading configuration property: jobmanager.rpc.address, 
> flink-jobmanager
> INFO  [] - Loading configuration property: taskmanager.numberOfTaskSlots, 4
> INFO  [] - Loading configuration property: blob.server.port, 6124
> INFO  [] - Loading configuration property: jobmanager.rpc.port, 6123
> INFO  [] - Loading configuration property: taskmanager.rpc.port, 6122
> INFO  [] - Loading configuration property: queryable-state.proxy.ports, 6125
> INFO  [] - Loading configuration property: jobmanager.memory.heap.size, 2000m
> INFO  [] - Loading configuration property: taskmanager.memory.task.heap.size, 
> 2000m
> INFO  [] - Loading configuration property: taskmanager.memory.managed.size, 
> 3000m
> INFO  [] - Loading configuration property: parallelism.default, 2
> INFO  [] - Loading configuration property: state.backend, filesystem
> INFO  [] - Loading configuration property: state.checkpoints.dir, 
> s3://flink/checkpoints
> INFO  [] - Loading configuration property: state.savepoints.dir, 
> s3://flink/savepoints
> INFO  [] - Loading configuration property: s3.endpoint, 
> http://cmdaa-minio:9000
> INFO  [] - Loading configuration property: s3.path-style-access, true
> INFO  [] - Loading configuration property: s3.path.style.access, true
> INFO  [] - Loading configuration property: s3.access-key, cmdaa123
> INFO  [] - Loading configuration property: s3.secret-key, **
> INFO  [] - Final Master Memory configuration:
> INFO  [] -   Total Process Memory: 2.587gb (2777561320 bytes)
> INFO  [] - Total Flink Memory: 2.078gb (2231369728 bytes)
> INFO  [] -   JVM Heap: 1.953gb (2097152000 bytes)
> INFO  [] -   Off-heap: 128.000mb (134217728 bytes)
> INFO  [] - JVM Metaspace:  256.000mb (268435456 bytes)
> INFO  [] - JVM Overhead:   264.889mb (277756136 bytes)
>
>
> On Fri, Mar 26, 2021 at 4:03 AM Robert Metzger 
> wrote:
>
>> Hi,
>>
>> has the "state.savepoints.dir" configuration key the same value as
>> "state.checkpoints.dir"?
>> If not, can you post your configuration keys, and the invocation how you
>> trigger a savepoint?
>> Have you checked the logs? Maybe there's an error message?
>>
>> On Thu, Mar 25, 2021 at 7:17 PM Robert Cullen 
>> wrote:
>>
>>> When I run a job on my Kubernetes session cluster only the checkpoint
>>> directories are created but not the savepoints. (Filesystem configured to
>>> S3 Minio)  Any ideas?
>>>
>>> --
>>> Robert Cullen
>>> 240-475-4490
>>>
>>
>
> --
> Robert Cullen
> 240-475-4490
>


DataStream from kafka topic

2021-03-29 Thread Maminspapin
Hi everyone.

How can I get records in GenericRecord format from a kafka topic using
the Schema Registry?
I read this:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
but I can't get it to build in my code...

Are there any tutorials or examples for deserialising data using
schema.registry.url?

Thanks



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
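
A minimal sketch of one way to do this, assuming the flink-connector-kafka and flink-avro-confluent-registry dependencies are on the classpath; the topic, bootstrap servers, registry URL and reader schema below are placeholders:

import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

String schemaRegistryUrl = "http://localhost:8081";               // placeholder
Schema schema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\","
        + "\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}"); // placeholder reader schema

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");         // placeholder
props.setProperty("group.id", "my-group");

FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
        "my-topic",
        ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, schemaRegistryUrl),
        props);

DataStream<GenericRecord> stream = env.addSource(consumer);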


Re: Glob support on file access

2021-03-29 Thread Arvid Heise
Hi Etienne,

In general, any small PR on this subject is very welcome. I don't think
that the community as a whole will invest much into FileInputFormat as the
whole DataSet API is phasing out.

Afaik SQL and Table API are only using InputFormat for the legacy
compatibility layer (e.g. when it comes to translating into DataSet). All
the new batchy stuff is based on BulkFormat and unified source/sink
interface. I'm CC'ing Timo who can correct me if I'm wrong.

So if you just want to add glob support on FileInputFormat /only/ for SQL
and Table API, I don't think it's worth the effort. It would be more
interesting to see if the new FileSource does support it properly and
rather add it there.

On Mon, Mar 29, 2021 at 4:57 PM Etienne Chauchot 
wrote:

> But still this workaround would only work when you have access to the
> underlying FileInputFormat. For SQL and Table APIs, you don't, so
> you'll be unable to apply this workaround. So what we could do is make a
> PR to support globs at the FileInputFormat level so that all APIs benefit.
>
> I'm gonna do it if everyone agrees.
>
> Best
>
> Etienne Chauchot
>
> On 25/03/2021 13:12, Etienne Chauchot wrote:
> >
> > Hi all,
> >
> > In case it is useful to some of you:
> >
> > I have a big batch that needs to use globs (*.parquet for example) to
> > read input files. It seems that globs do not work out of the box (see
> > https://issues.apache.org/jira/browse/FLINK-6417)
> >
> > But there is a workaround:
> >
> >
> > final FileInputFormat inputFormat = new FileInputFormat(new Path(extractDir(filePath))); /* or any subclass of FileInputFormat */ /* extract parent dir */
> > inputFormat.setFilesFilter(new GlobFilePathFilter(Collections.singletonList(filePath), Collections.emptyList())); /* filePath contains the glob; the whole path needs to be provided to GlobFilePathFilter */
> > inputFormat.setNestedFileEnumeration(true);
> >
> > Hope, it helps some people
> >
> > Etienne Chauchot
> >
> >
>


Re: Flink failing to restore from checkpoint

2021-03-29 Thread Piotr Nowojski
Hi,

What Flink version are you using and what is the scenario that's happening?
It can be a number of things; most likely the issue is that your files mounted
under:
>
/mnt/checkpoints/5dde50b6e70608c63708cbf979bce4aa/shared/47993871-c7eb-4fec-ae23-207d307c384a
disappeared or stopped being accessible. For example something like this
[1] (this is not a Flink bug).

Have you tried looking for this path manually? Does it exist? Have you
looked in the JobManager/TaskManager logs for all entries that are
referring to this path?

To help you, we would need more information. If it has happened after
taking a savepoint this could be a recently fixed issue [2]. If you are
using SQL (Blink planner) it could be for example this [3].

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-16470
[2] https://issues.apache.org/jira/browse/FLINK-21351
[3] https://issues.apache.org/jira/browse/FLINK-20665


pon., 29 mar 2021 o 14:58 Claude M  napisał(a):

> Hello,
>
> I executed a flink job in a Kubernetes Application cluster w/ four
> taskmanagers.  The job was running fine for several hours but then crashed
> w/ the following exception which seems to be when restoring from a
> checkpoint. The UI shows the following for the checkpoint counts:
>
> Triggered: 68In Progress: 0Completed: 67Failed: 1Restored: 292
>
>
> Any ideas about this failure?
>
>
> Thanks
>
>
>


Re: PyFlink Table API: Interpret datetime field from Kafka as event time

2021-03-29 Thread Piotr Nowojski
Hi,

I hope someone else might have a better answer, but one thing that would
most likely work is to convert this field and define event time during
DataStream to table conversion [1]. You could always pre-process this field
in the DataStream API.

Piotrek

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/time_attributes.html#during-datastream-to-table-conversion

On Mon, 29 Mar 2021 at 18:07, Sumeet Malhotra 
wrote:

> Hi,
>
> Might be a simple, stupid question, but I'm not able to find how to
> convert/interpret a UTC datetime string like *2021-03-23T07:37:00.613910Z*
> as event-time using a DDL/Table API. I'm ingesting data from Kafka and can
> read this field as a string, but would like to mark it as event-time by
> defining a watermark.
>
> I'm able to achieve this using the DataStream API, by defining my own
> TimestampAssigner that converts the datetime string to milliseconds since
> epoch. How can I do this using a SQL DDL or Table API?
>
> I tried to directly interpret the string as TIMESTAMP(3) but it fails with
> the following exception:
>
> java.time.format.DateTimeParseException: Text
> '2021-03-23T07:37:00.613910Z' could not be parsed...
>
> Any pointers?
>
> Thanks!
> Sumeet
>
>


Re: Restore from Checkpoint from local Standalone Job

2021-03-29 Thread Piotr Nowojski
Hi Sandeep,

I think it should work fine with `StandaloneCompletedCheckpointStore`.

Have you checked if your directory /Users/test/savepoint  is being
populated in the first place? And if so, if the restarted job is not
throwing some exceptions like it can not access those files?

Also note, that cancel with savepoint is deprecated and you should be using
stop with savepoint [1]

Piotrek

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#terminating-a-job

pt., 26 mar 2021 o 18:55 Sandeep khanzode 
napisał(a):

> Hello
>
>
> I was reading this:
> https://stackoverflow.com/questions/61010970/flink-resume-from-externalised-checkpoint-question
>
>
> I am trying to run a standalone job on my local with a single job manager
> and task manager.
>
>
>
> I have enabled checkpointing as below:
>
> env.setStateBackend(new RocksDBStateBackend(“file:///Users/test/checkpoint", 
> true));
>
> env.enableCheckpointing(30 * 1000);
>
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60 * 1000);
>
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>
>
>
> After I stop my job (I also tried to cancel the job using bin/flink cancel
> -s /Users/test/savepoint ), I tried to start the same job using…
>
> ./standalone-job.sh start-foreground test.jar --job-id 
> --job-classname com.test.MyClass --fromSavepoint /Users/test/savepoint
>
>
> But it never restores the state, and always starts afresh.
>
>
> In Flink, I see this:
>
> StandaloneCompletedCheckpointStore
>
> * {@link CompletedCheckpointStore} for JobManagers running in {@link 
> HighAvailabilityMode#NONE}.
>
> public void recover() throws Exception {
>
> // Nothing to do
> }
>
>
> Does this have something to do with not being able to restore state?
>
> Does this need Zookeeper or K8S HA for functioning?
>
>
> Thanks,
> Sandeep
>
>


PyFlink Table API: Interpret datetime field from Kafka as event time

2021-03-29 Thread Sumeet Malhotra
Hi,

Might be a simple, stupid question, but I'm not able to find how to
convert/interpret a UTC datetime string like *2021-03-23T07:37:00.613910Z*
as event-time using a DDL/Table API. I'm ingesting data from Kafka and can
read this field as a string, but would like to mark it as event-time by
defining a watermark.

I'm able to achieve this using the DataStream API, by defining my own
TimestampAssigner that converts the datetime string to milliseconds since
epoch. How can I do this using a SQL DDL or Table API?

I tried to directly interpret the string as TIMESTAMP(3) but it fails with
the following exception:

java.time.format.DateTimeParseException: Text '2021-03-23T07:37:00.613910Z'
could not be parsed...

Any pointers?

Thanks!
Sumeet


Re: Glob support on file access

2021-03-29 Thread Etienne Chauchot
But still this workaround would only work when you have access to the
underlying FileInputFormat. For SQL and Table APIs, you don't, so
you'll be unable to apply this workaround. So what we could do is make a
PR to support globs at the FileInputFormat level so that all APIs benefit.


I'm gonna do it if everyone agrees.

Best

Etienne Chauchot

On 25/03/2021 13:12, Etienne Chauchot wrote:


Hi all,

In case it is useful to some of you:

I have a big batch that needs to use globs (*.parquet for example) to 
read input files. It seems that globs do not work out of the box (see 
https://issues.apache.org/jira/browse/FLINK-6417)


But there is a workaround:


final FileInputFormat inputFormat = new FileInputFormat(new Path(extractDir(filePath))); /* or any subclass of FileInputFormat */ /* extract parent dir */
inputFormat.setFilesFilter(new GlobFilePathFilter(Collections.singletonList(filePath), Collections.emptyList())); /* filePath contains the glob; the whole path needs to be provided to GlobFilePathFilter */
inputFormat.setNestedFileEnumeration(true);

Hope, it helps some people

Etienne Chauchot




Flink State Query Server threads stuck in infinite loop with high GC activity on CopyOnWriteStateMap get

2021-03-29 Thread Aashutosh Swarnakar
Hi Folks,



I've recently started using Flink for a pilot project where I need to
aggregate event counts on per minute window basis. The state has been made
queryable so that external services can query the state via Flink State
Query API. I am using memory state backend with a keyed process function
and map state.



I've a simple job running on a 6 node flink standalone cluster. 1 job
manager and 5 task managers. External services can query the 5 task manager
nodes for flink state.



The job operates fine whenever external clients are not querying flink
state, but once the external clients start querying the flink state via the flink
queryable client, I observe that flink query server threads and the
aggregate task thread gets stuck into an infinite loop in
CopyOnWriteStateMap.get() method. Also the GC activity peaks to 100% along
with 100% CPU usage. The task manager nodes are unable to recover from this
situation and I have to restart the cluster. Let me know if anybody has
faced this issue before.



Any information with regards to below queries will be very helpful.



1. Is this a thread synchronisation issue ?

2. Is CopyOnWriteStateMap class thread safe ?

3. Is there a possibility for any race conditions when incremental
rehashing is done for CopyOnWriteStateMap ?

4. Can this be an issue with state usage in my job implementation (I am
doing a get and put on map state for processing each element in the stream)
?





I have added the thread dump below along with the code snippet where the
threads go into infinite loop.



Task thread:



"aggregates-stream -> Map -> Sink: Cassandra Sink (2/10)#0"
- Thread t@76

   java.lang.Thread.State: RUNNABLE

at
org.apache.flink.runtime.state.heap.CopyOnWriteStateMap.get(CopyOnWriteStateMap.java:275)

at
org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:262)

at
org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:136)

at
org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:86)

at
org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)

at
com.cybersource.risk.operator.ProcessAggregatesFunction.processElement(ProcessAggregatesFunction.java:44)

at
com.cybersource.risk.operator.ProcessAggregatesFunction.processElement(ProcessAggregatesFunction.java:20)

at
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)

at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:187)

at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)

at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)

at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)

at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)

at
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$202/2001022910.runDefaultAction(Unknown
Source)

at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)

at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)

at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)

at
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)

at
org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)

at java.lang.Thread.run(Thread.java:748)



   Locked ownable synchronizers:

- None



Flink State Query Server Threads:



"Flink Queryable State Server Thread 3" - Thread t@136

   java.lang.Thread.State: RUNNABLE

at
org.apache.flink.runtime.state.heap.CopyOnWriteStateMap.incrementalRehash(CopyOnWriteStateMap.java:680)

at
org.apache.flink.runtime.state.heap.CopyOnWriteStateMap.computeHashForOperationAndDoIncrementalRehash(CopyOnWriteStateMap.java:645)

at
org.apache.flink.runtime.state.heap.CopyOnWriteStateMap.get(CopyOnWriteStateMap.java:270)

at
org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:262)

at
org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:222)

at

Flink failing to restore from checkpoint

2021-03-29 Thread Claude M
Hello,

I executed a flink job in a Kubernetes Application cluster w/ four
taskmanagers.  The job was running fine for several hours but then crashed
w/ the following exception which seems to be when restoring from a
checkpoint. The UI shows the following for the checkpoint counts:

Triggered: 68In Progress: 0Completed: 67Failed: 1Restored: 292


Any ideas about this failure?


Thanks
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:427)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:543)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:533)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state 
backend for KeyedProcessOperator_233fe9791f870db6076db489fea576c1_(31/32) from 
any of the 1 provided restore options.
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught 
unexpected exception.
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:362)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:587)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:93)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
... 11 more
Caused by: java.io.FileNotFoundException: 
/mnt/checkpoints/5dde50b6e70608c63708cbf979bce4aa/shared/47993871-c7eb-4fec-ae23-207d307c384a
 (No such file or directory)
at java.io.FileInputStream.open0(Native Method)
at java.io.FileInputStream.open(FileInputStream.java:195)
at java.io.FileInputStream.<init>(FileInputStream.java:138)
at 
org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
at 
org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:134)
at 
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:87)
at 
org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:127)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:110)
at 
org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49)
at 
java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
at 
org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:217)
at 
java.util.concurrent.CompletableFuture.asyncRunStage(CompletableFuture.java:1654)
at 
java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:1871)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:82)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.transferAllStateDataToDirectory(RocksDBStateDownloader.java:63)
at 

Re: Kubernetes Application Cluster Not Working

2021-03-29 Thread Claude M
This issue was resolved by adding the following environment variable to
both the jobmanager and taskmanager:
- name: JOB_MANAGER_RPC_ADDRESS
  value: jobmanager



On Wed, Mar 24, 2021 at 1:33 AM Yang Wang  wrote:

> Are you sure that the JobManager akka address is binded to
> "flink-jobmanager"?
> You could set "jobmanager.rpc.address" to flink-jobmanager in the
> ConfigMap.
>
> Best,
> Yang
>
> Guowei Ma  于2021年3月24日周三 上午10:22写道:
>
>> Hi, M
>> Could you give the full stack? This might not be the root cause.
>> Best,
>> Guowei
>>
>>
>> On Wed, Mar 24, 2021 at 2:46 AM Claude M  wrote:
>>
>>> Hello,
>>>
>>> I'm trying to setup Flink in Kubernetes using the Application Mode as
>>> described here:
>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes
>>>
>>> The doc mentions that there needs to be a service exposing the
>>> JobManager’s REST and UI ports.  It then points to a link w/ the resource
>>> definitions:
>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/#application-cluster-resource-definitions
>>> and I defined the following service along w/ the jobmanager, taskmanager,
>>> and flink-conf.
>>>
>>> apiVersion: v1
>>> kind: Service
>>> metadata:
>>>   name: flink-jobmanager
>>> spec:
>>>   type: ClusterIP
>>>   ports:
>>>   - name: rpc
>>> port: 6123
>>>   - name: blob-server
>>> port: 6124
>>>   - name: webui
>>> port: 8081
>>>   selector:
>>> app: flink
>>> component: jobmanager
>>>
>>>
>>> I am able to access the jobmanager UI but the taskmanagers are failing
>>> w/ the following error:
>>> Could not resolve ResourceManager address
>>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*
>>>
>>> Any ideas about this?
>>>
>>>
>>> Thanks
>>>
>>


Writes to Kafka in a Flink job time out and cause the job to time out

2021-03-29 Thread Frost Wong
Hi all,

My Flink job writes to Kafka. Recently I noticed that when the overall QPS of the Kafka cluster being written to is very high, the response time also becomes very long. My core logic extends RichFlatMapFunction, calls KafkaProducer.produce() in the flatMap method, and calls KafkaProducer.flush() at the end of the method.

I know that calling KafkaProducer.flush() can be very time-consuming, but my business cannot tolerate data loss, so I am considering moving KafkaProducer.flush() into the close() method of the RichFlatMapFunction. I am not sure when this close() method is called.

My questions are:

  1.  Is close() called when the job is cancelled manually? If KafkaProducer.flush() gets stuck at that point, the job cannot be cancelled and the data will eventually be lost.
  2.  Is close() called when the job is restarted because of other exceptions (such as network timeouts)?

Thanks everyone!
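
A minimal sketch of the flush-in-close() idea under discussion; whether close() is reached on every cancellation or failure path is exactly the open question here, so this only illustrates the structure. The producer configuration and topic name are placeholder assumptions:

import java.util.Properties;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaWritingFlatMap extends RichFlatMapFunction<String, String> {

    private transient KafkaProducer<String, String> producer;

    @Override
    public void open(Configuration parameters) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
    }

    @Override
    public void flatMap(String value, Collector<String> out) {
        // send asynchronously; no per-element flush(), which is what makes flatMap slow under high QPS
        producer.send(new ProducerRecord<>("my-topic", value)); // placeholder topic
        out.collect(value);
    }

    @Override
    public void close() {
        // flush pending records once, when the task shuts down; whether this point is reached on
        // cancellation or failure is the question raised in this thread
        if (producer != null) {
            producer.flush();
            producer.close();
        }
    }
}

For end-to-end guarantees, the built-in FlinkKafkaProducer, which flushes pending records as part of checkpointing, is usually a safer choice than a hand-rolled producer.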


union operation cannot support data type : map on flink 1.12

2021-03-29 Thread 张颖
this is the error message:


Flink SQL> (SELECT number_feature FROM map_string_string1) UNION (SELECT 
number_feature FROM map_string_string2);


2021-03-29 16:57:15,479 INFO  
org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Trying to 
connect to metastore with URI thrift://localhost:9083
2021-03-29 16:57:15,483 INFO  
org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Opened a 
connection to metastore, current connections: 2
2021-03-29 16:57:15,530 INFO  
org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Connected to 
metastore.
2021-03-29 16:57:15,530 INFO  
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient [] - 
RetryingMetaStoreClient proxy=class 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient ugi=zhangying480 
(auth:SIMPLE) retries=1 delay=1 lifetime=0
2021-03-29 16:57:15,580 INFO  
org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Closed a 
connection to metastore, current connections: 1
2021-03-29 16:57:15,651 INFO  
org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Trying to 
connect to metastore with URI thrift://localhost:9083
2021-03-29 16:57:15,652 INFO  
org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Opened a 
connection to metastore, current connections: 2
2021-03-29 16:57:15,652 INFO  
org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Connected to 
metastore.
2021-03-29 16:57:15,652 INFO  
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient [] - 
RetryingMetaStoreClient proxy=class 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient ugi=zhangying480 
(auth:SIMPLE) retries=1 delay=1 lifetime=0
2021-03-29 16:57:15,698 INFO  
org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Closed a 
connection to metastore, current connections: 1




[ERROR] Could not execute SQL statement. Reason:
java.lang.UnsupportedOperationException: Unsupported type: MAP






I am running into a problem like this: my Flink version is 1.12.0, and after changing the
version to 1.12.3 the problem still exists.


I created two tables that have a column of map data type. I want to union the two tables into
one, but when I run this in the sql-client I hit the problem above (number_feature is a
map column).


But when I run this SQL it gives the correct result: "(SELECT query FROM
map_string_string1) UNION (SELECT query FROM map_string_string2)"
Is the fact that UNION cannot support map types a known problem?


this is my create table statements:
CREATE TABLE `map_string_string1`(
  `query` string,
  `wid` string,
  `index` int,
  `page` string,
  `hc_cid1` string,
  `hc_cid2` string,
  `hc_cid3` string,
  `cid1` string,
  `cid2` string,
  `cid3` string,
  `ts` bigint,
  `number_feature` map<string,string>)
PARTITIONED BY (
  `dt` string)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t'
  LINES TERMINATED BY '\n'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'


CREATE TABLE `map_string_string2`(
  `query` string,
  `wid` string,
  `index` int,
  `page` string,
  `hc_cid1` string,
  `hc_cid2` string,
  `hc_cid3` string,
  `cid1` string,
  `cid2` string,
  `cid3` string,
  `ts` bigint,
  `number_feature` map<string,string>)
PARTITIONED BY (
  `dt` string)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t'
  LINES TERMINATED BY '\n'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'

Re:image

2021-03-29 Thread 张颖
These are the contents of the image:


Flink SQL> (SELECT number_feature FROM map_string_string1) UNION (SELECT 
number_feature FROM map_string_string2);


2021-03-29 16:57:15,479 INFO  
org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Trying to 
connect to metastore with URI thrift://localhost:9083
2021-03-29 16:57:15,483 INFO  
org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Opened a 
connection to metastore, current connections: 2
2021-03-29 16:57:15,530 INFO  
org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Connected to 
metastore.
2021-03-29 16:57:15,530 INFO  
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient [] - 
RetryingMetaStoreClient proxy=class 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient ugi=zhangying480 
(auth:SIMPLE) retries=1 delay=1 lifetime=0
2021-03-29 16:57:15,580 INFO  
org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Closed a 
connection to metastore, current connections: 1
2021-03-29 16:57:15,651 INFO  
org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Trying to 
connect to metastore with URI thrift://localhost:9083
2021-03-29 16:57:15,652 INFO  
org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Opened a 
connection to metastore, current connections: 2
2021-03-29 16:57:15,652 INFO  
org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Connected to 
metastore.
2021-03-29 16:57:15,652 INFO  
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient [] - 
RetryingMetaStoreClient proxy=class 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient ugi=zhangying480 
(auth:SIMPLE) retries=1 delay=1 lifetime=0
2021-03-29 16:57:15,698 INFO  
org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Closed a 
connection to metastore, current connections: 1




[ERROR] Could not execute SQL statement. Reason:
java.lang.UnsupportedOperationException: Unsupported type: MAP

At 2021-03-29 17:42:05, "张颖"  wrote:

hello, I meet a problem just like this: My flink version is 1.12.0,and I change 
the version to 1.12.3, the problem still exist.


I create two tables which have data type map, I want to union two tables to 
one, but when i run on sql-client, I met such a problem:(number_feature is a 
map data)






but when I run this sql,it has correct result:"(SELECT query FROM 
map_string_string1) UNION (SELECT query FROM map_string_string2)"
The reason why it cannot support map union is a problem?


this is my create table statements:
CREATE TABLE `map_string_string1`(
  `query` string,
  `wid` string,
  `index` int,
  `page` string,
  `hc_cid1` string,
  `hc_cid2` string,
  `hc_cid3` string,
  `cid1` string,
  `cid2` string,
  `cid3` string,
  `ts` bigint,
  `number_feature` map)
PARTITIONED BY (
  `dt` string)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t'
  LINES TERMINATED BY '\n'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'


CREATE TABLE `map_string_string2`(
  `query` string,
  `wid` string,
  `index` int,
  `page` string,
  `hc_cid1` string,
  `hc_cid2` string,
  `hc_cid3` string,
  `cid1` string,
  `cid2` string,
  `cid3` string,
  `ts` bigint,
  `number_feature` map)
PARTITIONED BY (
  `dt` string)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t'
  LINES TERMINATED BY '\n'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'


Re:image

2021-03-29 Thread 张颖
this is the image:

At 2021-03-29 17:42:05, "张颖"  wrote:

hello, I meet a problem just like this: My flink version is 1.12.0,and I change 
the version to 1.12.3, the problem still exist.


I create two tables which have data type map, I want to union two tables to 
one, but when i run on sql-client, I met such a problem:(number_feature is a 
map data)






but when I run this sql,it has correct result:"(SELECT query FROM 
map_string_string1) UNION (SELECT query FROM map_string_string2)"
The reason why it cannot support map union is a problem?


this is my create table statements:
CREATE TABLE `map_string_string1`(
  `query` string,
  `wid` string,
  `index` int,
  `page` string,
  `hc_cid1` string,
  `hc_cid2` string,
  `hc_cid3` string,
  `cid1` string,
  `cid2` string,
  `cid3` string,
  `ts` bigint,
  `number_feature` map)
PARTITIONED BY (
  `dt` string)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t'
  LINES TERMINATED BY '\n'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'


CREATE TABLE `map_string_string2`(
  `query` string,
  `wid` string,
  `index` int,
  `page` string,
  `hc_cid1` string,
  `hc_cid2` string,
  `hc_cid3` string,
  `cid1` string,
  `cid2` string,
  `cid3` string,
  `ts` bigint,
  `number_feature` map)
PARTITIONED BY (
  `dt` string)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t'
  LINES TERMINATED BY '\n'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'


Re:image

2021-03-29 Thread 张颖
At 2021-03-29 17:42:05, "张颖"  wrote:

hello, I meet a problem just like this: My flink version is 1.12.0,and I change 
the version to 1.12.3, the problem still exist.


I create two tables which have data type map, I want to union two tables to 
one, but when i run on sql-client, I met such a problem:(number_feature is a 
map data)






but when I run this sql,it has correct result:"(SELECT query FROM 
map_string_string1) UNION (SELECT query FROM map_string_string2)"
The reason why it cannot support map union is a problem?


this is my create table statements:
CREATE TABLE `map_string_string1`(
  `query` string,
  `wid` string,
  `index` int,
  `page` string,
  `hc_cid1` string,
  `hc_cid2` string,
  `hc_cid3` string,
  `cid1` string,
  `cid2` string,
  `cid3` string,
  `ts` bigint,
  `number_feature` map)
PARTITIONED BY (
  `dt` string)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t'
  LINES TERMINATED BY '\n'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'


CREATE TABLE `map_string_string2`(
  `query` string,
  `wid` string,
  `index` int,
  `page` string,
  `hc_cid1` string,
  `hc_cid2` string,
  `hc_cid3` string,
  `cid1` string,
  `cid2` string,
  `cid3` string,
  `ts` bigint,
  `number_feature` map)
PARTITIONED BY (
  `dt` string)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t'
  LINES TERMINATED BY '\n'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'






 

Re: Unsubscribe

2021-03-29 Thread 张颖
What do you mean? Do you just want to unsubscribe?

















On 2021-03-29 17:44:22, "纪军伟"  wrote:
>Unsubscribe




Re: Do data streams partitioned by same keyBy but from different operators converge to the same sub-task?

2021-03-29 Thread vishalovercome
I've gone through the example as well as the documentation and I still
couldn't understand whether my use case requires joining.

1. What would happen if I didn't join?
2. As the two incoming data streams have the same type, if joining is absolutely
necessary, then just a union (oneStream.union(anotherStream)) followed by a keyBy
should be good enough, right?

I am asking this because I would prefer to use the simple RichMapFunction or
RichFlatMapFunction as opposed to the RichCoFlatMapFunction. Thanks a lot!
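
For reference, a minimal runnable sketch of the union-then-keyBy approach mentioned
above (the Tuple2 elements and the String key are only stand-ins for the real event
type):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class UnionThenKeyBySketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Two sources of the same type, standing in for the two upstream operators.
        DataStream<Tuple2<String, Integer>> oneStream =
                env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2));
        DataStream<Tuple2<String, Integer>> anotherStream =
                env.fromElements(Tuple2.of("a", 10), Tuple2.of("c", 3));

        // union produces a single logical stream, so a plain RichFlatMapFunction is
        // enough; records with the same key go to the same keyed sub-task.
        oneStream.union(anotherStream)
                .keyBy(t -> t.f0)
                .flatMap(new RichFlatMapFunction<Tuple2<String, Integer>, String>() {
                    @Override
                    public void flatMap(Tuple2<String, Integer> value, Collector<String> out) {
                        out.collect(value.f0 + " -> " + value.f1);
                    }
                })
                .print();

        env.execute("union-then-keyBy sketch");
    }
}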



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Unsubscribe

2021-03-29 Thread 纪军伟
Unsubscribe

Re: flink 1.11 checkpoint size keeps growing

2021-03-29 Thread Shuai Xia
Hi, have you looked at the actual checkpoint file sizes on HDFS? You can refer to this thread:
http://apache-flink.147419.n8.nabble.com/Flink-sql-checkpoint-td10176.html#a10200
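For example, running hdfs dfs -du -h <checkpoint-directory> (the directory configured
via state.checkpoints.dir) lists the on-disk size of each retained checkpoint, which
can then be compared with the sizes reported in the web UI.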


--
From: liangji 
Sent: Friday, March 26, 2021 16:39
To: user-zh 
Subject: flink 1.11 checkpoint size keeps growing

The job reads data from Kafka and writes it to MySQL.
1. Part of the code is as follows:

2. The checkpoint chart shows the checkpoint size growing steadily; over 7 days it
went from about 400 MB to almost 2 GB.

Below is the detailed data of one checkpoint; the checkpoint size at the two window
operators is very large.

3. The Kafka message volume over the last 7 days is as follows:

Could anyone tell me why the checkpoint size keeps growing, and how I could
troubleshoot it?




--
Sent from: http://apache-flink.147419.n8.nabble.com/

flink-1.12.2 TM does not use the serviceAccount when accessing the configmap

2021-03-29 Thread 1120344670
Hi:
The related issue is here: http://apache-flink.147419.n8.nabble.com/flink1-12-k8s-session-TM-td10153.html
Has this issue been fixed?


Also:



Is there a corresponding JIRA issue?

Re: Re: flink 1.11.1: when using MySQL as a dimension table in a temporal join, a timeout exception is occasionally thrown and the data update fails; asking for a solution

2021-03-29 Thread suisuimu
Hi, I'm hitting this problem with 1.12 as well. How did you solve it?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: [BULK]Re: [SURVEY] Remove Mesos support

2021-03-29 Thread Robert Metzger
+1



On Mon, Mar 29, 2021 at 5:44 AM Yangze Guo  wrote:

> +1
>
> Best,
> Yangze Guo
>
> On Mon, Mar 29, 2021 at 11:31 AM Xintong Song 
> wrote:
> >
> > +1
> > It's already a matter of fact for a while that we no longer port new
> features to the Mesos deployment.
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Fri, Mar 26, 2021 at 10:37 PM Till Rohrmann 
> wrote:
> >>
> >> +1 for officially deprecating this component for the 1.13 release.
> >>
> >> Cheers,
> >> Till
> >>
> >> On Thu, Mar 25, 2021 at 1:49 PM Konstantin Knauf 
> wrote:
> >>>
> >>> Hi Matthias,
> >>>
> >>> Thank you for following up on this. +1 to officially deprecate Mesos
> in the code and documentation, too. It will be confusing for users if this
> diverges from the roadmap.
> >>>
> >>> Cheers,
> >>>
> >>> Konstantin
> >>>
> >>> On Thu, Mar 25, 2021 at 12:23 PM Matthias Pohl 
> wrote:
> 
>  Hi everyone,
>  considering the upcoming release of Flink 1.13, I wanted to revive the
>  discussion about the Mesos support ones more. Mesos is also already
> listed
>  as deprecated in Flink's overall roadmap [1]. Maybe, it's time to
> align the
>  documentation accordingly to make it more explicit?
> 
>  What do you think?
> 
>  Best,
>  Matthias
> 
>  [1] https://flink.apache.org/roadmap.html#feature-radar
> 
>  On Wed, Oct 28, 2020 at 9:40 AM Till Rohrmann 
> wrote:
> 
>  > Hi Oleksandr,
>  >
>  > yes you are right. The biggest problem is at the moment the lack of
> test
>  > coverage and thereby confidence to make changes. We have some e2e
> tests
>  > which you can find here [1]. These tests are, however, quite coarse
> grained
>  > and are missing a lot of cases. One idea would be to add a Mesos
> e2e test
>  > based on Flink's end-to-end test framework [2]. I think what needs
> to be
>  > done there is to add a Mesos resource and a way to submit jobs to a
> Mesos
>  > cluster to write e2e tests.
>  >
>  > [1] https://github.com/apache/flink/tree/master/flink-jepsen
>  > [2]
>  >
> https://github.com/apache/flink/tree/master/flink-end-to-end-tests/flink-end-to-end-tests-common
>  >
>  > Cheers,
>  > Till
>  >
>  > On Tue, Oct 27, 2020 at 12:29 PM Oleksandr Nitavskyi <
>  > o.nitavs...@criteo.com> wrote:
>  >
>  >> Hello Xintong,
>  >>
>  >> Thanks for the insights and support.
>  >>
>  >> Browsing the Mesos backlog and didn't identify anything critical,
> which
>  >> is left there.
>  >>
>  >> I see that there are were quite a lot of contributions to the
> Flink Mesos
>  >> in the recent version:
>  >> https://github.com/apache/flink/commits/master/flink-mesos.
>  >> We plan to validate the current Flink master (or release 1.12
> branch) our
>  >> Mesos setup. In case of any issues, we will try to propose changes.
>  >> My feeling is that our test results shouldn't affect the Flink 1.12
>  >> release cycle. And if any potential commits will land into the
> 1.12.1 it
>  >> should be totally fine.
>  >>
>  >> In the future, we would be glad to help you guys with any
>  >> maintenance-related questions. One of the highest priorities
> around this
>  >> component seems to be the development of the full e2e test.
>  >>
>  >> Kind Regards
>  >> Oleksandr Nitavskyi
>  >> 
>  >> From: Xintong Song 
>  >> Sent: Tuesday, October 27, 2020 7:14 AM
>  >> To: dev ; user 
>  >> Cc: Piyush Narang 
>  >> Subject: [BULK]Re: [SURVEY] Remove Mesos support
>  >>
>  >> Hi Piyush,
>  >>
>  >> Thanks a lot for sharing the information. It would be a great
> relief that
>  >> you are good with Flink on Mesos as is.
>  >>
>  >> As for the jira issues, I believe the most essential ones should
> have
>  >> already been resolved. You may find some remaining open issues
> here [1],
>  >> but not all of them are necessary if we decide to keep Flink on
> Mesos as is.
>  >>
>  >> At the moment and in the short future, I think helps are mostly
> needed on
>  >> testing the upcoming release 1.12 with Mesos use cases. The
> community is
>  >> currently actively preparing the new release, and hopefully we
> could come
>  >> up with a release candidate early next month. It would be greatly
>  >> appreciated if you fork as experienced Flink on Mesos users can
> help with
>  >> verifying the release candidates.
>  >>
>  >>
>  >> Thank you~
>  >>
>  >> Xintong Song
>  >>
>  >> [1]
>  >>
> https://issues.apache.org/jira/browse/FLINK-17402?jql=project%20%3D%20FLINK%20AND%20component%20%3D%20%22Deployment%20%2F%20Mesos%22%20AND%20status%20%3D%20Open
>  >> <
>  >>
> 

????

2021-03-29 Thread ????????


How to specify key serializer

2021-03-29 Thread 陳昌倬
Hi,

Currently we use sbt-avrohugger [0] to generate the key class for keyed
state.  The key class generated by sbt-avrohugger is both a case class
and an Avro specific record. However, in the following scenarios, Flink
uses different serializers:


* In the streaming application, Flink uses CaseClassSerializer for the
  key class.
* In the state processor API application, Flink uses AvroSerializer for
  the key class.


Since they use different serializers for the key, they are not compatible.
Is there any way to specify the key serializer so that both applications
use the same serializer?
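
One possible approach (an unverified sketch) is to pass an explicit TypeInformation
when keying the stream, so that the keyed state also uses the Avro serializer on the
streaming side. MyEvent and MyKey below are placeholders for the real event type and
the sbt-avrohugger generated key class; the example is in Java, but the Scala keyBy
has a TypeInformation context bound, so an implicit TypeInformation[MyKey] in scope
should achieve the same thing.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;

public class ExplicitKeySerializerSketch {

    // MyKey is assumed to be the generated Avro SpecificRecord key class.
    static KeyedStream<MyEvent, MyKey> keyWithAvro(DataStream<MyEvent> events) {
        // The explicit key TypeInformation overrides the case-class type extraction,
        // so the key serializer for keyed state becomes the AvroSerializer.
        TypeInformation<MyKey> keyType = new AvroTypeInfo<>(MyKey.class);
        return events.keyBy(MyEvent::getKey, keyType);
    }
}

The state processor API application would then have to be given the same
TypeInformation when reading the keyed state, so that both sides agree on the
key serializer.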



[0] https://github.com/julianpeeters/sbt-avrohugger

-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Fwd: Getting the sink table's column names in FlinkKafkaProducer

2021-03-29 Thread Jimmy Zhang





|
Best,
Jimmy
|

Signature is customized by Netease Mail Master

- Forwarded message -

From: Jimmy Zhang
Date: 2021-03-29 14:05
To: Qishang
Cc:
Subject: Re: Getting the sink table's column names in FlinkKafkaProducer
Hi Qishang, thanks a lot for your reply. I looked at the code you mentioned and it should be able to meet my needs, but I haven't studied it in detail yet, since this requirement doesn't really touch the Dynamic(Table) part much.
By the way, I have already solved the problem with the following approach:
1. I found that FlinkKafkaProducer is constructed in KafkaTableSink.createKafkaProducer.
2. KafkaTableSink extends KafkaTableSinkBase, which holds a TableSchema as a member field. TableSchema has a getFieldNames method that returns the sink table's field names; KafkaTableSinkBase wraps this method under the same name and returns a String[], which is exactly what I need.
3. In KafkaTableSink.createKafkaProducer I use super.getFieldNames to get the String[], add a new FlinkKafkaProducer constructor, and pass the array in, which achieves my goal (a rough sketch of the idea follows below).
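
For illustration only (the real change described above was made directly inside the
Flink Kafka connector classes; the subclass below just shows the shape of passing the
result of TableSchema#getFieldNames through an extra constructor argument):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class FieldAwareKafkaProducer<T> extends FlinkKafkaProducer<T> {

    private final String[] fieldNames;

    public FieldAwareKafkaProducer(
            String topic,
            SerializationSchema<T> serializationSchema,
            Properties producerConfig,
            String[] fieldNames) {          // e.g. tableSchema.getFieldNames()
        super(topic, serializationSchema, producerConfig);
        this.fieldNames = fieldNames;
    }

    public String[] getFieldNames() {
        return fieldNames;                  // now available inside the producer
    }
}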




|
Best,
Jimmy
|

Signature is customized by Netease Mail Master

On 2021-03-29 11:26, Qishang wrote:
Hi Jimmy.
FlinkKafkaProducer itself does not have it. You can try passing it from KafkaDynamicSink
into FlinkKafkaProducer; it can be obtained from
org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink#physicalDataType.
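
For reference, a small sketch of how the field names could be read from such a
physical row DataType (names are illustrative; it assumes the DataType is a ROW
type, which is the case for a table sink):

import java.util.List;

import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;

public class PhysicalFieldNamesSketch {

    static List<String> fieldNamesOf(DataType physicalDataType) {
        // For a table sink the physical data type is a ROW(...), so the cast is expected to hold.
        RowType rowType = (RowType) physicalDataType.getLogicalType();
        return rowType.getFieldNames();
    }
}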

Jimmy Zhang <13669299...@163.com> wrote on Thursday, March 18, 2021, at 10:40 AM:

> Hi everyone!
> I have a requirement: I need to get all the columns defined in the Kafka sink
> table's CREATE TABLE, and I need to work with them inside FlinkKafkaProducer. I looked at the source code but did not find an interface that exposes this information. Does anyone know how to get it? Thanks a lot!
> For example: CREATE TABLE kafkaTable (
>
>  user_id BIGINT,
>  item_id BIGINT,
>  category_id BIGINT,
>  behavior STRING,
>  ts TIMESTAMP(3)
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'user_behavior',
>  'properties.bootstrap.servers' = 'localhost:9092',
>  'properties.group.id' = 'testGroup',
>  'format' = 'csv',
>  'scan.startup.mode' = 'earliest-offset'
> )
> I want to get these four fields: user_id, item_id, category_id, behavior.
>
>
> | |
> Jimmy Zhang
> |
> |
> 13669299...@163.com
> |
> Signature customized by Netease Mail Master