Re: FlinkKafkaProducer - Avro - Schema Registry

2022-04-07 Thread Dan Serb
Hello Qingsheng,

Removing KafkaAvroSerializer from the producer properties worked, indeed.
I validated this with a FlinkKafkaConsumer using 
ConfluentRegistryAvroDeserializationSchema, and it works properly.
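
For reference, the consumer I used for validation looks roughly like this (a 
minimal sketch; the group id is just a placeholder, topic and registry URL are 
from my setup below):

Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "avro-validation");

// Deserializer that fetches the writer schema from the Schema Registry.
DeserializationSchema<AvroObject> deserSchema =
    ConfluentRegistryAvroDeserializationSchema.forSpecific(
        AvroObject.class, "http://schemaregistry:38081");

FlinkKafkaConsumer<AvroObject> consumer =
    new FlinkKafkaConsumer<>("sink_topic", deserSchema, consumerProps);
env.addSource(consumer).print();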

The problem I'm still having is that I will have to use the schema registry to 
register multiple types of schemas for messages that arrive on the same kafka 
topic.
That means I will need the behaviour that KafkaAvroSerializer.class provides, 
namely going to the schema registry and fetching the schema back by 
subject.

By only using ConfluentRegistryAvroSerializationSchema.forSpecific() in my 
FlinkKafkaProducer, I don't see how I can get access to that functionality; I 
debugged internally, and it seems it's not going through the path I would like 
it to take.

So, in conclusion, I think I still somehow need to keep KafkaAvroSerializer in 
the producer properties, to force the serialization 
to go through the Schema Registry.
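
For illustration, one direction I'm considering is to keep KafkaAvroSerializer 
but invoke it from a KafkaSerializationSchema, so the subject lookup still goes 
through the registry. A rough, untested sketch (class name and config map are 
placeholders of mine):

public class RegistryAwareAvroSerializationSchema
        implements KafkaSerializationSchema<AvroObject> {

    private final String topic;
    private final Map<String, Object> registryConfig; // must contain schema.registry.url
    private transient KafkaAvroSerializer valueSerializer;

    public RegistryAwareAvroSerializationSchema(String topic, Map<String, Object> registryConfig) {
        this.topic = topic;
        this.registryConfig = registryConfig;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(AvroObject element, Long timestamp) {
        if (valueSerializer == null) {
            // Created lazily on the task manager, since KafkaAvroSerializer is not serializable.
            valueSerializer = new KafkaAvroSerializer();
            valueSerializer.configure(registryConfig, false); // false = value serializer
        }
        // KafkaAvroSerializer resolves the schema by subject in the Schema Registry.
        return new ProducerRecord<>(topic, valueSerializer.serialize(topic, element));
    }
}

I would then pass it to the FlinkKafkaProducer constructor that accepts a 
KafkaSerializationSchema instead of a SerializationSchema.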

Regards,
Dan Serb

On 08.04.2022, 05:23, "Qingsheng Ren"  wrote:

Hi Dan,

In FlinkKafkaProducer, records are serialized by the SerializationSchema 
specified in the constructor, which is the “schema” 
(ConfluentRegistryAvroSerializationSchema.forSpecific(AvroObject.class)) in 
your case, instead of the serializer specified in producer properties. The 
default serializer used by FlinkKafkaProducer is ByteArraySerializer, so the 
flow of serialization would be:

[AvroObject] -> SerializationSchema -> [Bytes] -> ByteArraySerializer -> 
[Bytes]
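
For example, a minimal setup matching this flow could look like the sketch 
below (the explicit subject argument is an assumption on my side, following the 
usual topic-name strategy):

Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
// No value.serializer here: FlinkKafkaProducer falls back to ByteArraySerializer.

SerializationSchema<AvroObject> serSchema =
    ConfluentRegistryAvroSerializationSchema.forSpecific(
        AvroObject.class,
        "sink_topic-value",                 // subject (assumed)
        "http://schemaregistry:38081");

FlinkKafkaProducer<AvroObject> producer =
    new FlinkKafkaProducer<>("sink_topic", serSchema, producerProps);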

So I think removing KafkaAvroSerializer from the producer config and using 
AvroSerializationSchema is the right way. As you mentioned that messages could 
not be consumed back successfully, could you provide more information about how 
you consume messages from Kafka (like using KafkaSource by Flink or just a 
KafkaConsumer, maybe also the configuration you are using)?

Best regards,

Qingsheng


> On Apr 5, 2022, at 16:54, Dan Serb  wrote:
> 
> Hi guys,
>  
> I’m working on a solution where I ingest Kafka Records and I need to sink 
them to another topic using Avro and Schema Registry.
> The problem I’m facing, is that I can’t find a suitable configuration 
that actually works for me.
>  
> I’m going to explain.
>  
>   • I have a KafkaSource that consumes basically the initial stream of 
data.
>   • I have an Operator that maps the kafka records to Avro Objects (Java 
POJOs generated using mvn avro plugin, based on .avsc files)
>   • I register the schemas in Schema Registry using the mvn 
schema-registry:register plugin/goal (registering the schema type as AVRO).
>   • I have a FlinkKafkaProducer where I provide a 
serialization schema of type ConfluentRegistrySerializationSchema.
>  
> My Kafka Properties for the Producer:
>  
> kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
> kafkaProps.put(
> KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, 
"http://schemaregistry:38081;);
> kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
KafkaAvroSerializer.class);
> kafkaProps.put("auto.register.schemas", false);
> kafkaProps.put("use.latest.version", true);
>  
> As I learned from other tutorials/articles, I need to basically use 
KafkaAvroSerializer.class over ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG.
> This will bring me eventually in the place from KafkaAvroSerializer, 
where based on how the record actually looks, it will get me the schema, it 
will go to the schema registry and bring the schema for the needed record, and 
serialize it before it gets sent.
> The problem I’m having, is that, in the FlinkKafkaProducer class, in 
invoke() method, the keyedSchema is null in my case, but kafkaSchema is not 
null, and it basically does a ‘pre-serialization’ that is transforming my 
Record into a byte[]. This has an effect when it ends up in the 
KafkaAvroSerializer, as the Record is already a byte[] and it basically returns 
back a schema of type “bytes” instead of returning the schema I have for that 
SpecificRecord. And when it brings the proper schema from the schema registry, 
it basically fails for not being compatible. Schema {} is not compatible with 
schema of type “bytes”.
>  
> For more context, this is how my Processor looks at this moment.
>  
> DataStream kafkaRecords =
> env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), 
"kafka");
> 
> SingleOutputStreamOperator producedRecords =
> kafkaRecords
> .map(
> value -> {
>   String kafkaKey = value.get(KEY).asText();
>   String kafkaRecordJson = 
MAPPER.writeValueAsString(value.get(VALUE));
>   return Converter.convert(kafkaKey, kafkaRecordJson);
> })
> 

Unsubscribe

2022-04-07 Thread co_zjw
Unsubscribe

Re: k8s session cluster flink1.13.6: what is the address printed after creation used for?

2022-04-07 Thread yidan zhao
It seems the official docs have two entry points for Flink on k8s:
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/kubernetes/#session-mode
and
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/.

They correspond to Resource Providers/Standalone/Kubernetes and Resource
Providers/Native Kubernetes. Does anyone know the difference? From the docs, it
looks like the former gives concrete service/deployment yml descriptions so you
create the cluster yourself, while the latter creates it with a single script.
But if that is the only difference, why distinguish between "standalone/kubernetes"
and "native kubernetes" at all?

>
> The cluster is built on 3 physical machines, not minikube.
> I'm not sure whether it is related to the NIC; there were already network problems during the init setup, since k8s decides the listening address from the IP of the default-route NIC.
> But I don't think it should behave like this here: since it is a clusterIp, the message printed after creation should point to the clusterIp, so why does it use my machine's NIC IP instead?
>
> yidan zhao wrote on Fri, Apr 8, 2022 at 10:38:
> >
> > Below is the output of describe svc my-first-flink-cluster-rest:
> > Name: my-first-flink-cluster-rest
> > Namespace:default
> > Labels:   app=my-first-flink-cluster
> >   type=flink-native-kubernetes
> > Annotations:  
> > Selector:
> > app=my-first-flink-cluster,component=jobmanager,type=flink-native-kubernetes
> > Type: LoadBalancer
> > IP Family Policy: SingleStack
> > IP Families:  IPv4
> > IP:   192.168.127.57
> > IPs:  192.168.127.57
> > Port: rest  8081/TCP
> > TargetPort:   8081/TCP
> > NodePort: rest  31419/TCP
> > Endpoints:192.168.130.11:8081
> > Session Affinity: None
> > External Traffic Policy:  Cluster
> > Events:   
> >
> > As shown above, the IP is 192.168.127.57; this is the ClusterIp and it is reachable. What I don't understand is why the address printed after creation is not this one, and why the address resolved via
> > -Dkubernetes.cluster-id=my-first-flink-cluster is not the 192.x one either, which makes job submission and so on fail.
> >
> > yu'an huang wrote on Fri, Apr 8, 2022 at 02:11:
> > >
> > > In theory a cluster IP cannot be reached from outside the cluster. How did you set up your Kubernetes environment? Minikube?
> > >
> > > If convenient, could you share the result of running this command?
> > > kubectl describe svc  my-first-flink-cluster-rest
> > >
> > >
> > >
> > > > On 7 Apr 2022, at 4:44 PM, Zhanghao Chen  
> > > > wrote:
> > > >
> > > > What have you set the kubernetes.rest-service.exposed.type option to?
> > > >
> > > > Best,
> > > > Zhanghao Chen
> > > > 
> > > > From: yidan zhao 
> > > > Sent: Thursday, April 7, 2022 11:41
> > > > To: user-zh 
> > > > Subject: k8s session cluster flink1.13.6: what is the address printed after creation used for?
> > > >
> > > > Following 
> > > > https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#starting-a-flink-session-on-kubernetes
> > > >
> > > > I created a k8s flink session cluster (flink1.13.6) with: ./bin/kubernetes-session.sh
> > > > -Dkubernetes.cluster-id=my-first-flink-cluster. It succeeded and ended with the message "Create
> > > > flink session cluster my-first-flink-cluster successfully, JobManager
> > > > Web Interface: http://10.227.137.154:8081". But this address is not reachable.
> > > >
> > > > Submitting a job with the following command behaves the same: it resolves the address above and the submission fails.
> > > > ./bin/flink run \
> > > >--target kubernetes-session \
> > > >-Dkubernetes.cluster-id=my-first-flink-cluster \
> > > >./examples/streaming/TopSpeedWindowing.jar
> > > >
> > > > --- The following approach works, though; I'm not sure what the problem is.
> > > > 1. Use kubectl get svc to get the clusterIp:port of my-first-flink-cluster-rest,
> > > > which is 192.168.127.57:8081.
> > > > 2. List jobs:
> > > > flink list  -m 192.168.127.57:8081
> > > > 3. Submit a job:
> > > > flink run  -m 192.168.127.57:8081
> > > > /home/work/flink/examples/streaming/TopSpeedWindowing.jar
> > > >
> > > > --- The difference: 192.x is the virtual clusterIp, while 10.x is my machine's IP.
> > >


Re: k8s session cluster flink1.13.6: what is the address printed after creation used for?

2022-04-07 Thread yidan zhao
The cluster is built on 3 physical machines, not minikube.
I'm not sure whether it is related to the NIC; there were already network problems during the init setup, since k8s decides the listening address from the IP of the default-route NIC.
But I don't think it should behave like this here: since it is a clusterIp, the message printed after creation should point to the clusterIp, so why does it use my machine's NIC IP instead?

yidan zhao wrote on Fri, Apr 8, 2022 at 10:38:
>
> Below is the output of describe svc my-first-flink-cluster-rest:
> Name: my-first-flink-cluster-rest
> Namespace:default
> Labels:   app=my-first-flink-cluster
>   type=flink-native-kubernetes
> Annotations:  
> Selector:
> app=my-first-flink-cluster,component=jobmanager,type=flink-native-kubernetes
> Type: LoadBalancer
> IP Family Policy: SingleStack
> IP Families:  IPv4
> IP:   192.168.127.57
> IPs:  192.168.127.57
> Port: rest  8081/TCP
> TargetPort:   8081/TCP
> NodePort: rest  31419/TCP
> Endpoints:192.168.130.11:8081
> Session Affinity: None
> External Traffic Policy:  Cluster
> Events:   
>
> As shown above, the IP is 192.168.127.57; this is the ClusterIp and it is reachable. What I don't understand is why the address printed after creation is not this one, and why the address resolved via
> -Dkubernetes.cluster-id=my-first-flink-cluster is not the 192.x one either, which makes job submission and so on fail.
>
> yu'an huang wrote on Fri, Apr 8, 2022 at 02:11:
> >
> > In theory a cluster IP cannot be reached from outside the cluster. How did you set up your Kubernetes environment? Minikube?
> >
> > If convenient, could you share the result of running this command?
> > kubectl describe svc  my-first-flink-cluster-rest
> >
> >
> >
> > > On 7 Apr 2022, at 4:44 PM, Zhanghao Chen  
> > > wrote:
> > >
> > > What have you set the kubernetes.rest-service.exposed.type option to?
> > >
> > > Best,
> > > Zhanghao Chen
> > > 
> > > From: yidan zhao 
> > > Sent: Thursday, April 7, 2022 11:41
> > > To: user-zh 
> > > Subject: k8s session cluster flink1.13.6: what is the address printed after creation used for?
> > >
> > > Following 
> > > https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#starting-a-flink-session-on-kubernetes
> > >
> > > I created a k8s flink session cluster (flink1.13.6) with: ./bin/kubernetes-session.sh
> > > -Dkubernetes.cluster-id=my-first-flink-cluster. It succeeded and ended with the message "Create
> > > flink session cluster my-first-flink-cluster successfully, JobManager
> > > Web Interface: http://10.227.137.154:8081". But this address is not reachable.
> > >
> > > Submitting a job with the following command behaves the same: it resolves the address above and the submission fails.
> > > ./bin/flink run \
> > >--target kubernetes-session \
> > >-Dkubernetes.cluster-id=my-first-flink-cluster \
> > >./examples/streaming/TopSpeedWindowing.jar
> > >
> > > --- The following approach works, though; I'm not sure what the problem is.
> > > 1. Use kubectl get svc to get the clusterIp:port of my-first-flink-cluster-rest,
> > > which is 192.168.127.57:8081.
> > > 2. List jobs:
> > > flink list  -m 192.168.127.57:8081
> > > 3. Submit a job:
> > > flink run  -m 192.168.127.57:8081
> > > /home/work/flink/examples/streaming/TopSpeedWindowing.jar
> > >
> > > --- The difference: 192.x is the virtual clusterIp, while 10.x is my machine's IP.
> >


Re: k8s session cluster flink1.13.6: what is the address printed after creation used for?

2022-04-07 Thread yidan zhao
Below is the output of describe svc my-first-flink-cluster-rest:
Name: my-first-flink-cluster-rest
Namespace:default
Labels:   app=my-first-flink-cluster
  type=flink-native-kubernetes
Annotations:  
Selector:
app=my-first-flink-cluster,component=jobmanager,type=flink-native-kubernetes
Type: LoadBalancer
IP Family Policy: SingleStack
IP Families:  IPv4
IP:   192.168.127.57
IPs:  192.168.127.57
Port: rest  8081/TCP
TargetPort:   8081/TCP
NodePort: rest  31419/TCP
Endpoints:192.168.130.11:8081
Session Affinity: None
External Traffic Policy:  Cluster
Events:   

As shown above, the IP is 192.168.127.57; this is the ClusterIp and it is reachable. What I don't understand is why the address printed after creation is not this one, and why the address resolved via
-Dkubernetes.cluster-id=my-first-flink-cluster is not the 192.x one either, which makes job submission and so on fail.

yu'an huang wrote on Fri, Apr 8, 2022 at 02:11:
>
> In theory a cluster IP cannot be reached from outside the cluster. How did you set up your Kubernetes environment? Minikube?
>
> If convenient, could you share the result of running this command?
> kubectl describe svc  my-first-flink-cluster-rest
>
>
>
> > On 7 Apr 2022, at 4:44 PM, Zhanghao Chen  wrote:
> >
> > What have you set the kubernetes.rest-service.exposed.type option to?
> >
> > Best,
> > Zhanghao Chen
> > 
> > From: yidan zhao 
> > Sent: Thursday, April 7, 2022 11:41
> > To: user-zh 
> > Subject: k8s session cluster flink1.13.6: what is the address printed after creation used for?
> >
> > Following 
> > https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#starting-a-flink-session-on-kubernetes
> >
> > I created a k8s flink session cluster (flink1.13.6) with: ./bin/kubernetes-session.sh
> > -Dkubernetes.cluster-id=my-first-flink-cluster. It succeeded and ended with the message "Create
> > flink session cluster my-first-flink-cluster successfully, JobManager
> > Web Interface: http://10.227.137.154:8081". But this address is not reachable.
> >
> > Submitting a job with the following command behaves the same: it resolves the address above and the submission fails.
> > ./bin/flink run \
> >--target kubernetes-session \
> >-Dkubernetes.cluster-id=my-first-flink-cluster \
> >./examples/streaming/TopSpeedWindowing.jar
> >
> > --- The following approach works, though; I'm not sure what the problem is.
> > 1. Use kubectl get svc to get the clusterIp:port of my-first-flink-cluster-rest,
> > which is 192.168.127.57:8081.
> > 2. List jobs:
> > flink list  -m 192.168.127.57:8081
> > 3. Submit a job:
> > flink run  -m 192.168.127.57:8081
> > /home/work/flink/examples/streaming/TopSpeedWindowing.jar
> >
> > --- The difference: 192.x is the virtual clusterIp, while 10.x is my machine's IP.
>


Re: Official Flink operator additional class paths

2022-04-07 Thread Francis Conroy
Hi Yang,

thanks a lot for your help. It ended up being the case that my command in
the initContainer was specified incorrectly.


On Thu, 7 Apr 2022 at 18:41, Yang Wang  wrote:

> It seems that you have a typo when specifying the pipeline classpath.
> "file:///flink-jar/flink-connector-rabbitmq_2.12-1.14.4.jar" ->
> "file:///flink-jars/flink-connector-rabbitmq_2.12-1.14.4.jar"
>
> If this is not the root cause, maybe you could have a try with downloading
> the connector jars to /opt/flink/usrlib. The usrlib will be loaded to the
> user classloader automatically without any configuration.
>
> BTW, I am not aware of any other bugs which will cause pipeline classpath
> not take effect except FLINK-21289[1].
>
> [1]. https://issues.apache.org/jira/browse/FLINK-21289
>
> Best,
> Yang
>
> Francis Conroy  于2022年4月7日周四 15:14写道:
>
>> Hi all,
>> thanks in advance for any tips.
>>
>> I've been trying to specify some additional classpaths in my kubernetes
>> yaml file when using the official flink operator and nothing seems to work.
>>
>> I know the technique for getting my job jar works fine since it's finding
>> the class ok, but I cannot get the RabbitMQ connector jar to load.
>>
>> apiVersion: flink.apache.org/v1alpha1
>> kind: FlinkDeployment
>> metadata:
>>   namespace: default
>>   name: http-over-mqtt
>> spec:
>>   image: flink:1.14.4-scala_2.12-java11
>>   flinkVersion: v1_14
>>   flinkConfiguration:
>> taskmanager.numberOfTaskSlots: "2"
>> pipeline.classpaths: 
>> "file:///flink-jar/flink-connector-rabbitmq_2.12-1.14.4.jar"
>>   serviceAccount: flink
>>   jobManager:
>> replicas: 1
>> resource:
>>   memory: "1024m"
>>   cpu: 1
>>   taskManager:
>> resource:
>>   memory: "1024m"
>>   cpu: 1
>>   podTemplate:
>> spec:
>>   serviceAccount: flink
>>   containers:
>> - name: flink-main-container
>>   volumeMounts:
>> - mountPath: /flink-job
>>   name: flink-jobs
>> - mountPath: /flink-jars
>>   name: flink-jars
>>   initContainers:
>> - name: grab-mqtt-over-http-jar
>>   image: busybox
>>   command: [ '/bin/sh', '-c',
>>  'cd /tmp/job; wget 
>> https://jenkins/job/platform_flink/job/master/39/artifact/src-java/switchdin-topologies/target/switchdin-topologies-1.0-SNAPSHOT.jar
>>  --no-check-certificate;',
>>  'cd /tmp/jar; wget 
>> https://repo1.maven.org/maven2/org/apache/flink/flink-connector-rabbitmq_2.12/1.14.4/flink-connector-rabbitmq_2.12-1.14.4.jar'
>>  ]
>>   volumeMounts:
>> - name: flink-jobs
>>   mountPath: /tmp/job
>> - name: flink-jars
>>   mountPath: /tmp/jar
>>   volumes:
>> - name: flink-jobs
>>   emptyDir: { }
>> - name: flink-jars
>>   emptyDir: { }
>>   job:
>> jarURI: local:///flink-job/switchdin-topologies-1.0-SNAPSHOT.jar
>> entryClass: org.switchdin.HTTPOverMQTT
>> parallelism: 1
>> upgradeMode: stateless
>> state: running
>>
>> Any ideas? I've looked at the ConfigMaps that result and they also look
>> fine.
>> apiVersion: v1
>> data:
>>   flink-conf.yaml: "blob.server.port: 6124\nkubernetes.jobmanager.replicas:
>> 1\njobmanager.rpc.address:
>> http-over-mqtt.default\nkubernetes.taskmanager.cpu: 1.0\n
>> kubernetes.service-account:
>> flink\nkubernetes.cluster-id: http-over-mqtt\n
>> $internal.application.program-args:
>> \nkubernetes.container.image: flink:1.14.4-scala_2.12-java11\n
>> parallelism.default:
>> 1\nkubernetes.namespace: default\ntaskmanager.numberOfTaskSlots: 2\n
>> kubernetes.rest-service.exposed.type:
>> ClusterIP\n$internal.application.main: org.switchdin.HTTPOverMQTT\n
>> taskmanager.memory.process.size:
>> 1024m\nkubernetes.internal.jobmanager.entrypoint.class:
>> org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint
>> \nkubernetes.pod-template-file:
>> /tmp/podTemplate_11292791104169595925.yaml\n
>> kubernetes.pod-template-file.taskmanager:
>> /tmp/podTemplate_17362225267763549900.yaml\nexecution.target:
>> kubernetes-application\njobmanager.memory.process.size:
>> 1024m\njobmanager.rpc.port: 6123\ntaskmanager.rpc.port: 6122\n
>> internal.cluster.execution-mode:
>> NORMAL\nqueryable-state.proxy.ports: 6125\npipeline.jars:
>> local:///flink-job/switchdin-topologies-1.0-SNAPSHOT.jar\n
>> kubernetes.jobmanager.cpu:
>> 1.0\npipeline.classpath:
>> file:///flink-jars/flink-connector-rabbitmq_2.12-1.14.4.jar\n
>> kubernetes.pod-template-file.jobmanager:
>> /tmp/podTemplate_17029501154997462433.yaml\n"
>>
>>
>>
>> This email and any attachments are proprietary and confidential and are
>> intended solely for the use of the individual to whom it is addressed. Any
>> views or opinions expressed are solely those of the author and do not
>> necessarily reflect or represent those of SwitchDin Pty Ltd. If you have
>> received 

Re: FlinkKafkaProducer - Avro - Schema Registry

2022-04-07 Thread Qingsheng Ren
Hi Dan,

In FlinkKafkaProducer, records are serialized by the SerializationSchema 
specified in the constructor, which is the “schema” 
(ConfluentRegistryAvroSerializationSchema.forSpecific(AvroObject.class)) in 
your case, instead of the serializer specified in producer properties. The 
default serializer used by FlinkKafkaProducer is ByteArraySerializer, so the 
flow of serialization would be:

[AvroObject] -> SerializationSchema -> [Bytes] -> ByteArraySerializer -> [Bytes]

So I think removing KafkaAvroSerializer from the producer config and using 
AvroSerializationSchema is the right way. As you mentioned that messages could 
not be consumed back successfully, could you provide more information about how 
you consume messages from Kafka (like using KafkaSource by Flink or just a 
KafkaConsumer, maybe also the configuration you are using)?

Best regards,

Qingsheng


> On Apr 5, 2022, at 16:54, Dan Serb  wrote:
> 
> Hi guys,
>  
> I’m working on a solution where I ingest Kafka Records and I need to sink 
> them to another topic using Avro and Schema Registry.
> The problem I’m facing, is that I can’t find a suitable configuration that 
> actually works for me.
>  
> I’m going to explain.
>  
>   • I have a KafkaSource that consumes basically the initial stream of 
> data.
>   • I have an Operator that maps the kafka records to Avro Objects (Java 
> POJOs generated using mvn avro plugin, based on .avsc files)
>   • I register the schemas in Schema Registry using the mvn 
> schema-registry:register plugin/goal (registering the schema type as AVRO).
>   • I have a FlinkKafkaProducer where I provide a 
> serialization schema of type ConfluentRegistrySerializationSchema.
>  
> My Kafka Properties for the Producer:
>  
> kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
> kafkaProps.put(
> KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, 
> "http://schemaregistry:38081;);
> kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> KafkaAvroSerializer.class);
> kafkaProps.put("auto.register.schemas", false);
> kafkaProps.put("use.latest.version", true);
>  
> As I learned from other tutorials/articles, I need to basically use 
> KafkaAvroSerializer.class over ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG.
> This will bring me eventually in the place from KafkaAvroSerializer, where 
> based on how the record actually looks, it will get me the schema, it will go 
> to the schema registry and bring the schema for the needed record, and 
> serialize it before it gets sent.
> The problem I’m having, is that, in the FlinkKafkaProducer class, in invoke() 
> method, the keyedSchema is null in my case, but kafkaSchema is not null, and 
> it basically does a ‘pre-serialization’ that is transforming my Record into a 
> byte[]. This has an effect when it ends up in the KafkaAvroSerializer, as the 
> Record is already a byte[] and it basically returns back a schema of type 
> “bytes” instead of returning the schema I have for that SpecificRecord. And 
> when it brings the proper schema from the schema registry, it basically 
> fails for not being compatible. Schema {} is not compatible with schema of 
> type “bytes”.
>  
> For more context, this is how my Processor looks at this moment.
>  
> DataStream kafkaRecords =
> env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka");
> 
> SingleOutputStreamOperator producedRecords =
> kafkaRecords
> .map(
> value -> {
>   String kafkaKey = value.get(KEY).asText();
>   String kafkaRecordJson = 
> MAPPER.writeValueAsString(value.get(VALUE));
>   return Converter.convert(kafkaKey, kafkaRecordJson);
> })
> .returns(TypeInformation.of(AvroObject.class));
> 
> AvroSerializationSchema schema =
> 
> ConfluentRegistryAvroSerializationSchema.forSpecific(AvroObject.class);
> 
> FlinkKafkaProducer< AvroObject > kafkaProducer =
> new FlinkKafkaProducer<>("sink_topic", schema, kafkaProps);
> 
> producedRecords.addSink(kafkaProducer);
> 
> env.execute();
>  
> Exception:
> Caused by: java.io.IOException: Incompatible schema { avro schema here} }with 
> refs [] of type AVRO for schema "bytes".
>  
> PS: If I remove the KafkaAvroSerializer from the producer properties, it 
> works fine, but when I consume the messages, the first message gets consumed 
> but the values from the record are default ones. And the second message 
> throws an EOFException – I could not yet debug the exact cause. It 
> seems like, when I don’t have the KafkaAvroSerializer, is not actually going 
> to use the schema registry to get the schema back and use that as a 
> serializer, so I definitely need to have that there, but I still think I need 
> to do some more config changes maybe in other places, because it’s definitely 
> not working as expected.
>  
> Thanks a lot!
> I would appreciate at least some points where I could investigate more and if 
> there 

Unsubscribe

2022-04-07 Thread 朱福生
Unsubscribe

Sent from my iPhone

Error when submitting via the yarn api

2022-04-07 Thread 周涛






hi,
I ran into some problems while testing Flink job submission via the Java API and would like some advice:
Flink version: 1.14.4
Hadoop version: 3.0.0-cdh6.2.1
Application mode; submitting with the command line works fine, but submitting via the API fails.
The submission fails with the following YARN log:
   LogType:jobmanager.err
   LogLastModifiedTime:Fri Apr 08 09:24:01 +0800 2022
   LogLength:107
   LogContents:
   Error: Could not find or load main class 
org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint
   End of LogType:jobmanager.err
The code is as follows:


 System.setProperty("HADOOP_USER_NAME", "hdfs");
// local Flink configuration directory, used to load the Flink configuration
String configurationDirectory = 
"E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\flink114\\";

// directory holding the Flink cluster jars
String flinkLibs = "hdfs://nameservice1/flink/jar/libs/lib/";
// user jar
String userJarPath = 
"hdfs://nameservice1/flink/jar/userTask/flink-streaming-test-1.0-SNAPSHOT.jar";
String flinkDistJar = 
"hdfs://nameservice1/flink/jar/libs/flink-dist.jar";

YarnClientService yarnClientService = new YarnClientService();
// create the YarnClient
YarnClient yarnClient = yarnClientService.getYarnClient();
yarnClient.start();

// configure logging; without this no logs are visible
YarnClusterInformationRetriever clusterInformationRetriever = 
YarnClientYarnClusterInformationRetriever
.create(yarnClient);

// load the Flink configuration
Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(
configurationDirectory);


flinkConfiguration.setString(ConfigOptions.key("fs.hdfs.hadoopconf").stringType().noDefaultValue(),
"E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\yarn\\");

flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, 
true);

flinkConfiguration.set(PipelineOptions.JARS, 
Collections.singletonList(userJarPath));

Path remoteLib = new Path(flinkLibs);
flinkConfiguration.set(
YarnConfigOptions.PROVIDED_LIB_DIRS,
Collections.singletonList(remoteLib.toString()));

flinkConfiguration.set(
YarnConfigOptions.FLINK_DIST_JAR,
flinkDistJar);

// set application mode
flinkConfiguration.set(
DeploymentOptions.TARGET,
YarnDeploymentTarget.APPLICATION.getName());

// yarn application name
flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, "flink-application");

YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration, 
configurationDirectory);

// set the user jar arguments and the main class
ApplicationConfiguration appConfig = new ApplicationConfiguration(new 
String[]{}, "com.zt.FlinkTest1");


final int jobManagerMemoryMB =

JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
flinkConfiguration, 
JobManagerOptions.TOTAL_PROCESS_MEMORY)
.getTotalProcessMemorySize()
.getMebiBytes();
final int taskManagerMemoryMB =
TaskExecutorProcessUtils.processSpecFromConfig(
TaskExecutorProcessUtils

.getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption(
flinkConfiguration,

TaskManagerOptions.TOTAL_PROCESS_MEMORY))
.getTotalProcessMemorySize()
.getMebiBytes();
ClusterSpecification clusterSpecification = new 
ClusterSpecification.ClusterSpecificationBuilder()
.setMasterMemoryMB(jobManagerMemoryMB)
.setTaskManagerMemoryMB(taskManagerMemoryMB)

.setSlotsPerTaskManager(flinkConfiguration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS))
.createClusterSpecification();
YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
flinkConfiguration,
(YarnConfiguration) yarnClient.getConfig(),
yarnClient,
clusterInformationRetriever,
true);

try {
ClusterClientProvider clusterClientProvider = 
yarnClusterDescriptor.deployApplicationCluster(
clusterSpecification,
appConfig);

ClusterClient clusterClient = 
clusterClientProvider.getClusterClient();

ApplicationId applicationId = clusterClient.getClusterId();
String webInterfaceURL = clusterClient.getWebInterfaceURL();
log.error("applicationId is {}", applicationId);
log.error("webInterfaceURL is {}", webInterfaceURL);

// shut down
// yarnClusterDescriptor.killCluster(applicationId);
} catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
//yarnClient.close();
}

Part of the submission log:
 09:24:01.288 [IPC Parameter Sending Thread #0] DEBUG 
org.apache.hadoop.ipc.Client - IPC Client (1948810915) connection to 
bigdata-beta2/192.168.15.185:8032 from hdfs sending #31 

Re: Production : Flink 1.14.4 : Kafka reader threads blocked

2022-04-07 Thread yu'an huang
Hi Vignesh,

I think you can check the following things:
1. Check the CPU usage of the workers. Is it close to zero or almost full? 
2. Did any back pressure happen in downstream tasks?
3. Is full GC activity significant?

Best,
Yuan



> On 7 Apr 2022, at 12:33 PM, Vignesh Ramesh  wrote:
> 
> Hi Team,
> 
> We are using flink 1.14.4 and facing issues in production where our threads 
> are blocked waiting for 
> LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:346).
> 
> "Legacy Source Thread - Source: ReindexBatchProcess Source (61/64)#0" Id=376 
> WAITING on java.util.concurrent.CompletableFuture$Signaller@6ddd0af
>   at sun.misc.Unsafe.park(Native Method)
>   -  waiting on java.util.concurrent.CompletableFuture$Signaller@6ddd0af
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
>   at 
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>   at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>   at org.apache.flink.runtime.io 
> .network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:346)
>   at org.apache.flink.runtime.io 
> .network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:318)
> 
> Attaching the thread dump with this mail. We also don't see any issues in 
> downstream processing . The network buffer memory and heap memory is also 
> used very less please refer below pic.
> 
> 
> 
> Kindly let us know if we can do anything to solve this issue.. We are also 
> open for commercial support.
> 



Re: k8s session cluster flink1.13.6: what is the address printed after creation used for?

2022-04-07 Thread yu'an huang
In theory a cluster IP cannot be reached from outside the cluster. How did you set up your Kubernetes environment? Minikube?

If convenient, could you share the result of running this command?
kubectl describe svc  my-first-flink-cluster-rest



> On 7 Apr 2022, at 4:44 PM, Zhanghao Chen  wrote:
> 
> What have you set the kubernetes.rest-service.exposed.type option to?
> 
> Best,
> Zhanghao Chen
> 
> From: yidan zhao 
> Sent: Thursday, April 7, 2022 11:41
> To: user-zh 
> Subject: k8s session cluster flink1.13.6: what is the address printed after creation used for?
> 
> Following 
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#starting-a-flink-session-on-kubernetes
> 
> I created a k8s flink session cluster (flink1.13.6) with: ./bin/kubernetes-session.sh
> -Dkubernetes.cluster-id=my-first-flink-cluster. It succeeded and ended with the message "Create
> flink session cluster my-first-flink-cluster successfully, JobManager
> Web Interface: http://10.227.137.154:8081". But this address is not reachable.
> 
> Submitting a job with the following command behaves the same: it resolves the address above and the submission fails.
> ./bin/flink run \
>--target kubernetes-session \
>-Dkubernetes.cluster-id=my-first-flink-cluster \
>./examples/streaming/TopSpeedWindowing.jar
> 
> --- The following approach works, though; I'm not sure what the problem is.
> 1. Use kubectl get svc to get the clusterIp:port of my-first-flink-cluster-rest,
> which is 192.168.127.57:8081.
> 2. List jobs:
> flink list  -m 192.168.127.57:8081
> 3. Submit a job:
> flink run  -m 192.168.127.57:8081
> /home/work/flink/examples/streaming/TopSpeedWindowing.jar
> 
> --- The difference: 192.x is the virtual clusterIp, while 10.x is my machine's IP.



Re: flink pipeline handles very small amount of messages in a second (only 500)

2022-04-07 Thread yu'an huang
Hi Sigalit,


In your setup, I guess each job will only have one slot (parallelism of one). Could 
the input simply be too much for jobs running with a parallelism of one? One easy way to 
confirm is to double your slots and job parallelism and then see 
whether the QPS increases.
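
For example (just a sketch; it assumes the source topic has at least two
partitions and that enough free task slots are available):

// Double the job parallelism and compare the measured QPS.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2); // previously 1 (one slot per TaskManager)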

Hope this would help you.

Yuan

> On 7 Apr 2022, at 7:12 PM, Sigalit Eliazov  wrote:
> 
> hi all
> 
> I would appreciate some help to understand the pipeline behaviour... 
> 
> We deployed a standalone flink cluster. The pipelines are deployed via the jm 
> rest api.
> We have 5 task managers with 1 slot each.
> 
> In total i am deploying 5 pipelines which mainly read from kafka, a simple 
> object conversion and either write back to kafka or GCP pub/sub or save in 
> the DB. 
> 
> These jobs run "forever" and basically each task manager runs a specific job 
> (this is how flink handled it).
> 
>  We have a test that sends to kafka 10k messages per second. but according to 
> the metrics exposed by flink i see that the relevant job handles only 500 
> messages per second.
> I would expect all the 10K to be handled. I guess the setup is not correct.
> 
> The messages are in avro format 
> Currently we are not using checkpoints at all.
> Any suggestions are welcome.
> 
> Thanks alot
> Sigalit
> 
> 
> 



Re: flink pipeline handles very small amount of messages in a second (only 500)

2022-04-07 Thread Yun Tang
Hi Sigalit,

First of all, did you make sure the different source operators use different 
consumer (group) ids for the Kafka source? Does each Flink job share the same data, or 
does each consume the data independently?

Moreover, is your job back-pressured? You might need to break the 
operator chain to see whether the sink back-pressures the source and limits 
the throughput of the source.
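
For example, a quick way to check this (sketch only):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Disable chaining so source, conversion and sink run as separate tasks and
// the back pressure between them becomes visible per task in the web UI.
env.disableOperatorChaining();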

Last but not least, is your source already at 100% CPU usage, which would mean 
your source operator has already reached its highest throughput?

Best
Yun Tang

From: Sigalit Eliazov 
Sent: Thursday, April 7, 2022 19:12
To: user 
Subject: flink pipeline handles very small amount of messages in a second (only 
500)

hi all

I would appreciate some help to understand the pipeline behaviour...

We deployed a standalone flink cluster. The pipelines are deployed via the jm 
rest api.
We have 5 task managers with 1 slot each.

In total i am deploying 5 pipelines which mainly read from kafka, a simple 
object conversion and either write back to kafka or GCP pub/sub or save in the 
DB.

These jobs run "forever" and basically each task manager runs a specific job 
(this is how flink handled it).

 We have a test that sends to kafka 10k messages per second. but according to 
the metrics exposed by flink i see that the relevant job handles only 500 
messages per second.
I would expect all the 10K to be handled. I guess the setup is not correct.

The messages are in avro format
Currently we are not using checkpoints at all.
Any suggestions are welcome.

Thanks alot
Sigalit





flink pipeline handles very small amount of messages in a second (only 500)

2022-04-07 Thread Sigalit Eliazov
hi all

I would appreciate some help to understand the pipeline behaviour...

We deployed a standalone flink cluster. The pipelines are deployed via
the jm rest api.
We have 5 task managers with 1 slot each.

In total i am deploying 5 pipelines which mainly read from kafka, a simple
object conversion and either write back to kafka or GCP pub/sub or save in
the DB.

These jobs run "forever" and basically each task manager runs a specific
job (this is how flink handled it).

 We have a test that sends to kafka 10k messages per second. but according
to the metrics exposed by flink i see that the relevant job handles only
500 messages per second.
I would expect all the 10K to be handled. I guess the setup is not correct.

The messages are in avro format
Currently we are not using checkpoints at all.
Any suggestions are welcome.

Thanks alot
Sigalit


Re: Missing metrics in Flink v 1.15.0 rc-0

2022-04-07 Thread Jing Ge
Hi,

Flink 1.15 introduces a new feature to support different sink pre- and
post-topologies [1]. New metrics, e.g. numRecordsSend, have been added to
measure records sent to the external system. Metrics like "Bytes Sent" and
"Records Sent" measure records sent to the next task, so in your case they
are expected to be 0.

There are some further improvements that need to be done on the WebUI.
Task[2] has been created.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction
[2]https://issues.apache.org/jira/browse/FLINK-27112

Best regards
Jing

On Thu, Apr 7, 2022 at 4:03 AM Xintong Song  wrote:

> Hi Peter,
>
> Have you compared the DAG topologies in 1.15 / 1.14?
>
> I think it's expected that "Records Received", "Bytes Sent" and "Records
> Sent" are 0. These metrics trace the internal data exchanges between Flink
> tasks. External data exchanges, i.e., sources reading / sinks writing data from
> / to external systems, are not counted. In your case, there's only 1
> vertex in the DAG, thus no internal data exchanges.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Apr 6, 2022 at 11:21 PM Peter Schrott 
> wrote:
>
>> Hi there,
>>
>> I just successfully upgraded our Flink cluster to 1.15.0 rc0 - also the
>> corresponding job is running on this version. Looks great so far!
>>
>> In the Web UI I noticed some metrics are missing, especially "Records
>> Received", "Bytes Sent" and "Records Sent". Those were shown in v 1.14.4.
>> See attached screenshot.
>>
>> Other than that I noticed, when using
>> org.apache.flink.metrics.prometheus.PrometheusReporter , the taskmanager
>> on which the job is running does not report the metrics on the configured
>> port. Rather it returns:
>>
>> ➜  ~ curl http://flink-taskmanager-xx:/
>> curl: (52) Empty reply from server
>>
>> The other taskmanager reports metrics.
>>
>> The exporter is configured as followed:
>>
>> # Prometheus metrics
>> metrics.reporters: prom
>> metrics.reporter.prom.class: 
>> org.apache.flink.metrics.prometheus.PrometheusReporter
>> metrics.reporter.prom.port: xx
>>
>> Is this a known issue with flink 1.15 rc0?
>>
>> Best, Peter
>>
>> [image: missingmetricsinui.png]
>>
>


RE: python table api

2022-04-07 Thread ivan.ros...@agilent.com
Hello Dian,

Indeed.  Thank you very much.  Now getting

+I[2022-01-01T10:00:20, 2022-01-01T10:00:25, 2]
+I[2022-01-01T10:00:25, 2022-01-01T10:00:30, 5]
+I[2022-01-01T10:00:30, 2022-01-01T10:00:35, 1]



from pyflink.table import EnvironmentSettings, TableEnvironment
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().get_configuration().set_string("parallelism.default", "1")

t_env.execute_sql("""
create table source (
ts TIMESTAMP(3),
data STRING,
WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
) with (
'connector' = 'filesystem',
'format' = 'csv',
'path' = '{}'
)
""".format("source.csv"))

t_env.execute_sql("""
CREATE TABLE print (
window_start TIMESTAMP(3),
window_end TIMESTAMP(3),
how_any BIGINT
) WITH (
'connector' = 'print'
)
""")

t_env.execute_sql("""
INSERT INTO print (
SELECT window_start, window_end, COUNT(*) as how_any
FROM TABLE(
TUMBLE(TABLE source, DESCRIPTOR(ts), INTERVAL '5' SECONDS))
GROUP BY window_start, window_end
)
""").wait()


From: Dian Fu 
Sent: Thursday, April 7, 2022 3:08 AM
To: ROSERO,IVAN (Agilent CHE) 
Cc: user 
Subject: Re: python table api




You have not configured the tumbling window at all. Please refer to [1] for 
more details.

Regards,
Dian

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-agg/#group-window-aggregation

On Wed, Apr 6, 2022 at 10:46 PM ivan.ros...@agilent.com wrote:
Hello,

I'm trying to understand tumbling windows at the level of the python table api. 
   For this short example:

Input csv:
2022-01-01 10:00:23.0, "data line 3"
2022-01-01 10:00:24.0, "data line 4"
2022-01-01 10:00:18.0, "data line 1"
2022-01-01 10:00:25.0, "data line 5"
2022-01-01 10:00:26.0, "data line 6"
2022-01-01 10:00:27.0, "data line 7"
2022-01-01 10:00:22.0, "data line 2"
2022-01-01 10:00:28.0, "data line 8"
2022-01-01 10:00:29.0, "data line 9"
2022-01-01 10:00:30.0, "data line 10"

Print output:
+I[2022-01-01T10:00:23,  "data line 3"]
+I[2022-01-01T10:00:24,  "data line 4"]
+I[2022-01-01T10:00:18,  "data line 1"]
+I[2022-01-01T10:00:25,  "data line 5"]
+I[2022-01-01T10:00:26,  "data line 6"]
+I[2022-01-01T10:00:27,  "data line 7"]
+I[2022-01-01T10:00:28,  "data line 8"]
+I[2022-01-01T10:00:29,  "data line 9"]
*+I[2022-01-01T10:00:22,  "data line 2"]*
+I[2022-01-01T10:00:30,  "data line 10"]

Below, I'm trying to process this data in 5 second windows.  So I would at 
least expect not to see the bold line above, in print output.

Am I not really configuring tumbling windows in the source table?

from pyflink.table import EnvironmentSettings, TableEnvironment
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().get_configuration().set_string("parallelism.default", "1")

t_env.execute_sql("""
create table source (
ts TIMESTAMP(3),
data STRING,
WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
) with (
'connector' = 'filesystem',
'format' = 'csv',
'path' = '{}'
)
""".format("source.csv"))

t_env.execute_sql("""
CREATE TABLE print (
ts TIMESTAMP(3),
data STRING
) WITH (
'connector' = 'print'
)
""")

t_env.execute_sql("INSERT INTO print SELECT * FROM source").wait()


Thank you,

Ivan


Re: Unbalanced distribution of keyed stream to downstream parallel operators

2022-04-07 Thread Isidoros Ioannou
Hello Arvid ,
thank you for your reply.

Actually, using a window to aggregate the events for a time period is not
applicable to my case, since I need the records to be processed immediately.
Even if I could, I still cannot understand how I could forward
the aggregated events to, let's say, 2 parallel operators. The slot assignment
of the key group is done by Flink. Do you mean keying by a different
property again, so that the previously aggregated events get reassigned to
operators? I apologize if my question is naive, but I got a little confused.




On Mon, Apr 4, 2022 at 10:38 AM, Arvid Heise 
wrote:

> You should create a histogram over the keys of the records. If you see a
> skew, one way to go about it is to refine the key or split aggregations.
>
> For example, consider you want to count events per users and 2 users are
> actually bots spamming lots of events accounting for 50% of all events.
> Then, you will always collect all events of each bot on one machine which
> limits scalability. You can, however, first aggregate all events per user
> per day (or any other way to subdivide). Then, the same bot can be
> processed in parallel and you then do an overall aggregation.
>
> If that's not possible, then your problem itself limits the scalability
> and you can only try to not get both bot users on the same machine (which
> can happen in 2). Then you can simply try to shift the key by adding
> constants to it and check if the distribution looks better. Have a look at
> KeyGroupRangeAssignment [1] to test that out without running Flink itself.
>
> [1]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
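>
> For example, a quick offline check could look like this (sketch; observedAccountIds
> is a placeholder for a sample of your keys):
>
> int parallelism = 14;
> int maxParallelism = parallelism * parallelism;
> Map<Integer, Integer> perSubtask = new HashMap<>();
> for (Long accountId : observedAccountIds) {
>     int subtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(
>             accountId, maxParallelism, parallelism);
>     perSubtask.merge(subtask, 1, Integer::sum);
> }
> System.out.println(perSubtask); // records per subtask index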
>
> On Mon, Apr 4, 2022 at 9:25 AM Isidoros Ioannou 
> wrote:
>
>> Hello Qingsheng,
>>
>> thank you a lot for your answer.
>>
>> I will try to modify the key as you mentioned in your first assumption.
>> In case the second assumption is valid also, what would you propose to
>> remedy the situation? Try to experiment with different values of max
>> parallelism?
>>
>>
>> On Sat, Apr 2, 2022 at 6:55 AM, Qingsheng Ren 
>> wrote:
>>
>>> Hi Isidoros,
>>>
>>> Two assumptions in my mind:
>>>
>>> 1. Records are not evenly distributed across different keys, e.g. some
>>> accountId just has more events than others. If the record distribution is
>>> predicable, you can try to combine other fields or include more information
>>> into the key field to help balancing the distribution.
>>>
>>> 2. Keys themselves are not distributed evenly. In short the subtask ID
>>> that a key belongs to is calculated by murmurHash(key.hashCode()) %
>>> maxParallelism, so if the distribution of keys is quite strange, it’s
>>> possible that most keys drop into the same subtask with the algorithm
>>> above. AFAIK there isn't such kind of metric for monitoring number of keys
>>> in a subtask, but I think you can simply investigate it with a map function
>>> after keyBy.
>>>
>>> Hope this would be helpful!
>>>
>>> Qingsheng
>>>
>>> > On Apr 1, 2022, at 17:37, Isidoros Ioannou  wrote:
>>> >
>>> > Hello,
>>> >
>>> > we ran a flink application version 1.13.2 that consists of a kafka
>>> source with one partition so far
>>> > then we filter the data based on some conditions, mapped to POJOS and
>>> we transform to a KeyedStream based on an accountId long property from the
>>> POJO. The downstream operators are 10 CEP operators that run with
>>> parallelism of 14 and the maxParallelism is set to the (operatorParallelism
>>> * operatorParallelism).
>>> > As you see in the image attached the events are distributed unevenly
>>> so some subtasks are busy and others are idle.
>>> > Is there any way to distribute evenly the load to the subtasks? Thank
>>> you in advance.
>>> > 
>>> >
>>>
>>>


Re: BIGDECIMAL data handling

2022-04-07 Thread Francesco Guardiani
Is there any reason for not using DECIMAL provided by Flink SQL?

On Tue, Apr 5, 2022 at 4:06 PM Anitha Thankappan <
anitha.thankap...@quantiphi.com> wrote:

> Hi Martijn,
>
> I am using flink version 1.11.0.
> The flink application code snippet is like:
>
> [image: image.png]
>
> The Error I am receiving while providing BIGDECIMAL as datatype is :
> [image: image.png]
>
> Can I use unregistered structured custom data types in DDLs like Create
> Table?
>
> Thanks and Regards,
> Anitha Thankappan
>
>
> On Tue, Apr 5, 2022 at 7:21 PM Martijn Visser 
> wrote:
>
>> Hi Anitha,
>>
>> Looking at Bigquery's documentation, they're aliasing it as a BIGDECIMAL
>> [1]. According to Flink's documentation, you can create an unregistered
>> structured type as an user-defined data type [2]. You're mentioning that
>> you've failed to implement this, but what is the issue that you're running
>> into? Bigdecimal is mentioned in Flink's documentation.
>>
>> Best regards,
>>
>> Martijn Visser
>> https://twitter.com/MartijnVisser82
>> https://github.com/MartijnVisser
>>
>> [1]
>> https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
>> [2]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/types/#user-defined-data-types
>>
>>
>> On Tue, 5 Apr 2022 at 15:41, Anitha Thankappan <
>> anitha.thankap...@quantiphi.com> wrote:
>>
>>> Hi,
>>>
>>> I have developed a Flink connector for GCP BigQuery.
>>> When reading BIGNUMERIC data from a BigQuery table, we
>>> didn't find any matching data type in Flink. We also failed to implement a
>>> user-defined data type for BIGNUMERIC.
>>>
>>> Please let me know if there is any way to handle this.
>>>
>>> Thanks in Advance,
>>> Anitha Thankappan
>>>
>>> *This message contains information that may be privileged or
>>> confidential and is the property of the Quantiphi Inc and/or its 
>>> affiliates**.
>>> It is intended only for the person to whom it is addressed. **If you
>>> are not the intended recipient, any review, dissemination, distribution,
>>> copying, storage or other use of all or any portion of this message is
>>> strictly prohibited. If you received this message in error, please
>>> immediately notify the sender by reply e-mail and delete this message in
>>> its **entirety*
>>>
>>
> *This message contains information that may be privileged or confidential
> and is the property of the Quantiphi Inc and/or its affiliates**. It is
> intended only for the person to whom it is addressed. **If you are not
> the intended recipient, any review, dissemination, distribution, copying,
> storage or other use of all or any portion of this message is strictly
> prohibited. If you received this message in error, please immediately
> notify the sender by reply e-mail and delete this message in its *
> *entirety*
>


Re: HOP_PROCTIME is returning null

2022-04-07 Thread Francesco Guardiani
Maybe the reason is that HOP_PROCTIME gets the name of the column?
Can you share the query and the plan?

On Mon, Apr 4, 2022 at 3:41 PM Surendra Lalwani 
wrote:

> Hi Team,
>
> HOP_PROCTIME in flink version 1.13.6 is returning null while in previous
> versions it used to output a time attribute, any idea why this behaviour is
> observed?
>
> If anybody has any alternative, it will be highly appreciable.
>
> Example: HOP_PROCTIME(PROCTIME() , INTERVAL '30' SECOND, INTERVAL '30'
> SECOND)
>
> Thanks and Regards ,
> Surendra Lalwani
>
>
> --
> IMPORTANT NOTICE: This e-mail, including any attachments, may contain
> confidential information and is intended only for the addressee(s) named
> above. If you are not the intended recipient(s), you should not
> disseminate, distribute, or copy this e-mail. Please notify the sender by
> reply e-mail immediately if you have received this e-mail in error and
> permanently delete all copies of the original message from your system.
> E-mail transmission cannot be guaranteed to be secure as it could be
> intercepted, corrupted, lost, destroyed, arrive late or incomplete, or
> contain viruses. Company accepts no liability for any damage or loss of
> confidential information caused by this email or due to any virus
> transmitted by this email or otherwise.


Re: python table api

2022-04-07 Thread Francesco Guardiani
As Dian said, your INSERT INTO query is just selecting records from source
to print, passing them through without any computation whatsoever.

Please check out [1] and [2] to learn how to develop queries that perform
aggregations over windows. Note that the second method (window tvf) is
preferred and recommended over the first.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-agg/#group-window-aggregation
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-tvf/

On Thu, Apr 7, 2022 at 3:09 AM Dian Fu  wrote:

> You have not configured the tumbling window at all. Please refer to [1]
> for more details.
>
> Regards,
> Dian
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-agg/#group-window-aggregation
>
> On Wed, Apr 6, 2022 at 10:46 PM ivan.ros...@agilent.com <
> ivan.ros...@agilent.com> wrote:
>
>> Hello,
>>
>>
>>
>> I’m trying to understand tumbling windows at the level of the python
>> table api.For this short example:
>>
>>
>>
>> Input csv
>>
>> Print output
>>
>> 2022-01-01 10:00:23.0, "data line 3"
>>
>> 2022-01-01 10:00:24.0, "data line 4"
>>
>> 2022-01-01 10:00:18.0, "data line 1"
>>
>> 2022-01-01 10:00:25.0, "data line 5"
>>
>> 2022-01-01 10:00:26.0, "data line 6"
>>
>> 2022-01-01 10:00:27.0, "data line 7"
>>
>> 2022-01-01 10:00:22.0, "data line 2"
>>
>> 2022-01-01 10:00:28.0, "data line 8"
>>
>> 2022-01-01 10:00:29.0, "data line 9"
>>
>> 2022-01-01 10:00:30.0, "data line 10"
>>
>> +I[2022-01-01T10:00:23,  "data line 3"]
>>
>> +I[2022-01-01T10:00:24,  "data line 4"]
>>
>> +I[2022-01-01T10:00:18,  "data line 1"]
>>
>> +I[2022-01-01T10:00:25,  "data line 5"]
>>
>> +I[2022-01-01T10:00:26,  "data line 6"]
>>
>> +I[2022-01-01T10:00:27,  "data line 7"]
>>
>> +I[2022-01-01T10:00:28,  "data line 8"]
>>
>> +I[2022-01-01T10:00:29,  "data line 9"]
>>
>> *+I[2022-01-01T10:00:22,  "data line 2"]*
>>
>> +I[2022-01-01T10:00:30,  "data line 10"]
>>
>>
>>
>> Below, I’m trying to process this data in 5 second windows.  So I would
>> at least expect not to see the bold line above, in print output.
>>
>>
>>
>> Am I not really configuring tumbling windows in the source table?
>>
>>
>>
>> from pyflink.table import EnvironmentSettings, TableEnvironment
>>
>> t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
>>
>> t_env.get_config().get_configuration().set_string("parallelism.default",
>> "1")
>>
>>
>>
>> t_env.execute_sql("""
>>
>> create table source (
>>
>> ts TIMESTAMP(3),
>>
>> data STRING,
>>
>> WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
>>
>> ) with (
>>
>> 'connector' = 'filesystem',
>>
>> 'format' = 'csv',
>>
>> 'path' = '{}'
>>
>> )
>>
>> """.format("source.csv"))
>>
>>
>>
>> t_env.execute_sql("""
>>
>> CREATE TABLE print (
>>
>> ts TIMESTAMP(3),
>>
>> data STRING
>>
>> ) WITH (
>>
>> 'connector' = 'print'
>>
>> )
>>
>> """)
>>
>>
>>
>> t_env.execute_sql("INSERT INTO print SELECT * FROM source").wait()
>>
>>
>>
>>
>>
>> Thank you,
>>
>>
>>
>> Ivan
>>
>


Re: k8s session cluster flink1.13.6: what is the address printed after creation used for?

2022-04-07 Thread Zhanghao Chen
What have you set the kubernetes.rest-service.exposed.type option to?

Best,
Zhanghao Chen

From: yidan zhao 
Sent: Thursday, April 7, 2022 11:41
To: user-zh 
Subject: k8s session cluster flink1.13.6: what is the address printed after creation used for?

Following 
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#starting-a-flink-session-on-kubernetes

I created a k8s flink session cluster (flink1.13.6) with: ./bin/kubernetes-session.sh
-Dkubernetes.cluster-id=my-first-flink-cluster. It succeeded and ended with the message "Create
flink session cluster my-first-flink-cluster successfully, JobManager
Web Interface: http://10.227.137.154:8081". But this address is not reachable.

Submitting a job with the following command behaves the same: it resolves the address above and the submission fails.
./bin/flink run \
--target kubernetes-session \
-Dkubernetes.cluster-id=my-first-flink-cluster \
./examples/streaming/TopSpeedWindowing.jar

--- The following approach works, though; I'm not sure what the problem is.
1. Use kubectl get svc to get the clusterIp:port of my-first-flink-cluster-rest,
which is 192.168.127.57:8081.
2. List jobs:
flink list  -m 192.168.127.57:8081
3. Submit a job:
flink run  -m 192.168.127.57:8081
/home/work/flink/examples/streaming/TopSpeedWindowing.jar

--- The difference: 192.x is the virtual clusterIp, while 10.x is my machine's IP.


Re: Official Flink operator additional class paths

2022-04-07 Thread Yang Wang
It seems that you have a typo when specifying the pipeline classpath.
"file:///flink-jar/flink-connector-rabbitmq_2.12-1.14.4.jar" ->
"file:///flink-jars/flink-connector-rabbitmq_2.12-1.14.4.jar"

If this is not the root cause, maybe you could have a try with downloading
the connector jars to /opt/flink/usrlib. The usrlib will be loaded to the
user classloader automatically without any configuration.

BTW, I am not aware of any other bugs which will cause pipeline classpath
not take effect except FLINK-21289[1].

[1]. https://issues.apache.org/jira/browse/FLINK-21289

Best,
Yang

Francis Conroy  于2022年4月7日周四 15:14写道:

> Hi all,
> thanks in advance for any tips.
>
> I've been trying to specify some additional classpaths in my kubernetes
> yaml file when using the official flink operator and nothing seems to work.
>
> I know the technique for getting my job jar works fine since it's finding
> the class ok, but I cannot get the RabbitMQ connector jar to load.
>
> apiVersion: flink.apache.org/v1alpha1
> kind: FlinkDeployment
> metadata:
>   namespace: default
>   name: http-over-mqtt
> spec:
>   image: flink:1.14.4-scala_2.12-java11
>   flinkVersion: v1_14
>   flinkConfiguration:
> taskmanager.numberOfTaskSlots: "2"
> pipeline.classpaths: 
> "file:///flink-jar/flink-connector-rabbitmq_2.12-1.14.4.jar"
>   serviceAccount: flink
>   jobManager:
> replicas: 1
> resource:
>   memory: "1024m"
>   cpu: 1
>   taskManager:
> resource:
>   memory: "1024m"
>   cpu: 1
>   podTemplate:
> spec:
>   serviceAccount: flink
>   containers:
> - name: flink-main-container
>   volumeMounts:
> - mountPath: /flink-job
>   name: flink-jobs
> - mountPath: /flink-jars
>   name: flink-jars
>   initContainers:
> - name: grab-mqtt-over-http-jar
>   image: busybox
>   command: [ '/bin/sh', '-c',
>  'cd /tmp/job; wget 
> https://jenkins/job/platform_flink/job/master/39/artifact/src-java/switchdin-topologies/target/switchdin-topologies-1.0-SNAPSHOT.jar
>  --no-check-certificate;',
>  'cd /tmp/jar; wget 
> https://repo1.maven.org/maven2/org/apache/flink/flink-connector-rabbitmq_2.12/1.14.4/flink-connector-rabbitmq_2.12-1.14.4.jar'
>  ]
>   volumeMounts:
> - name: flink-jobs
>   mountPath: /tmp/job
> - name: flink-jars
>   mountPath: /tmp/jar
>   volumes:
> - name: flink-jobs
>   emptyDir: { }
> - name: flink-jars
>   emptyDir: { }
>   job:
> jarURI: local:///flink-job/switchdin-topologies-1.0-SNAPSHOT.jar
> entryClass: org.switchdin.HTTPOverMQTT
> parallelism: 1
> upgradeMode: stateless
> state: running
>
> Any ideas? I've looked at the ConfigMaps that result and they also look
> fine.
> apiVersion: v1
> data:
>   flink-conf.yaml: "blob.server.port: 6124\nkubernetes.jobmanager.replicas:
> 1\njobmanager.rpc.address:
> http-over-mqtt.default\nkubernetes.taskmanager.cpu: 1.0\n
> kubernetes.service-account:
> flink\nkubernetes.cluster-id: http-over-mqtt\n
> $internal.application.program-args:
> \nkubernetes.container.image: flink:1.14.4-scala_2.12-java11\n
> parallelism.default:
> 1\nkubernetes.namespace: default\ntaskmanager.numberOfTaskSlots: 2\n
> kubernetes.rest-service.exposed.type:
> ClusterIP\n$internal.application.main: org.switchdin.HTTPOverMQTT\n
> taskmanager.memory.process.size:
> 1024m\nkubernetes.internal.jobmanager.entrypoint.class:
> org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint
> \nkubernetes.pod-template-file:
> /tmp/podTemplate_11292791104169595925.yaml\n
> kubernetes.pod-template-file.taskmanager:
> /tmp/podTemplate_17362225267763549900.yaml\nexecution.target:
> kubernetes-application\njobmanager.memory.process.size:
> 1024m\njobmanager.rpc.port: 6123\ntaskmanager.rpc.port: 6122\n
> internal.cluster.execution-mode:
> NORMAL\nqueryable-state.proxy.ports: 6125\npipeline.jars:
> local:///flink-job/switchdin-topologies-1.0-SNAPSHOT.jar\n
> kubernetes.jobmanager.cpu:
> 1.0\npipeline.classpath:
> file:///flink-jars/flink-connector-rabbitmq_2.12-1.14.4.jar\n
> kubernetes.pod-template-file.jobmanager:
> /tmp/podTemplate_17029501154997462433.yaml\n"
>
>
>
> This email and any attachments are proprietary and confidential and are
> intended solely for the use of the individual to whom it is addressed. Any
> views or opinions expressed are solely those of the author and do not
> necessarily reflect or represent those of SwitchDin Pty Ltd. If you have
> received this email in error, please let us know immediately by reply email
> and delete it from your system. You may not use, disseminate, distribute or
> copy this message nor disclose its contents to anyone.
> SwitchDin Pty Ltd (ABN 29 154893857) PO Box 1165, Newcastle NSW 2300
> Australia
>


The JM blob server in a flink sql job keeps reporting java.io.exception: unknown operation 71 in the early morning

2022-04-07 Thread su wenwen
Hi all, I'd like to ask whether anyone has run into this problem, on Flink 1.12.
A Flink SQL job running in production keeps reporting the following error in the early hours of the morning:
[inline screenshot of the exception omitted from the archive]

As I understand it, the blob server transfers binary jars from HDFS to the local working directory. I haven't found problems in any other part of the job, and the job's data has not been affected.


Is there any way to get the ExecutionConfigurations in Dynamic factory class

2022-04-07 Thread Anitha Thankappan
Hi

I have developed a BigQuery Flink connector by
implementing DynamicTableSourceFactory.
I have a requirement to either:
   get the configured parallelism value of the
StreamExecutionEnvironment in the factory class, or
   set the parallelism at the factory class or table source
class level.
Please help me with this.


Thanks and Regards,
Anitha Thankappan
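
(Not from the original thread, just a sketch of one option.) DynamicTableFactory.Context gives the factory read-only access to the session configuration, so parallelism.default can at least be inspected when the source is created. The class below is a minimal, hypothetical factory; the name BigQueryTableSourceFactory and the "bigquery" identifier are made up, and whether the value really reflects the StreamExecutionEnvironment's parallelism depends on how that parallelism was configured (it is visible here when set via flink-conf.yaml or the table config, not necessarily when set only in code).

import java.util.Collections;
import java.util.Set;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;

public class BigQueryTableSourceFactory implements DynamicTableSourceFactory {

    @Override
    public String factoryIdentifier() {
        return "bigquery"; // hypothetical connector identifier
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.emptySet();
    }

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        // The factory context exposes the session configuration;
        // "parallelism.default" is the key behind CoreOptions.DEFAULT_PARALLELISM.
        ReadableConfig sessionConfig = context.getConfiguration();
        int configuredParallelism =
                sessionConfig.getOptional(CoreOptions.DEFAULT_PARALLELISM).orElse(-1);

        // Hand the value to your own table source here; the exception only
        // keeps this sketch self-contained.
        throw new UnsupportedOperationException(
                "Plug in the BigQuery table source here; configured parallelism = "
                        + configuredParallelism);
    }
}

Reading the value in the factory is the part this sketch covers; pushing a parallelism value down into the source itself is a separate question that the factory interface alone does not answer.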



Re: flink table store

2022-04-07 Thread Paul Lam
@tison 
https://nightlies.apache.org/flink/flink-table-store-docs-release-0.1/docs/try-table-store/quick-start/
 


Best,
Paul Lam

> On Apr 7, 2022, at 15:05, tison wrote:
> 
> I'm a bit curious where on the official website you saw this. Could you share a link?
> 
> Best,
> tison.
> 
> 
> Leonard Xu wrote on Thu, Apr 7, 2022 at 14:47:
> 
>> 
>> The project is open source [1], and its first release is coming soon; worth keeping an eye on.
>> 
>> Best,
>> Leonard
>> [1] https://github.com/apache/flink-table-store
>> 
>> 
>> 
>>> On Apr 7, 2022, at 9:54 AM, Xianxun Ye wrote:
>>> 
>>> Here is the design document for flink table store, in case you want to take a look.
>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage
>>> 
>>> 
>>> Best regards,
>>> 
>>> 
>>> Xianxun
>>> 
>>> 
>>> On 04/6/2022 16:56, LuNing Wang wrote:
>>> Hi,
>>> 
>>> Table store is storage; it should be similar to a data lake.
>>> 
>>> Best,
>>> LuNing Wang
>>> 
>>> yidan zhao wrote on Wed, Apr 6, 2022 at 16:55:
>>> 
>>> I saw flink table store show up on the official website. How is it different from the flink table / flink sql stack?
>>> 
>> 
>> 



Official Flink operator additional class paths

2022-04-07 Thread Francis Conroy
Hi all,
thanks in advance for any tips.

I've been trying to specify some additional classpaths in my Kubernetes
YAML file when using the official Flink operator, and nothing seems to work.

I know the technique for getting my job jar in place works fine, since the
entry class is found OK, but I cannot get the RabbitMQ connector jar to load.

apiVersion: flink.apache.org/v1alpha1
kind: FlinkDeployment
metadata:
  namespace: default
  name: http-over-mqtt
spec:
  image: flink:1.14.4-scala_2.12-java11
  flinkVersion: v1_14
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    pipeline.classpaths: "file:///flink-jar/flink-connector-rabbitmq_2.12-1.14.4.jar"
  serviceAccount: flink
  jobManager:
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 1
  taskManager:
    resource:
      memory: "1024m"
      cpu: 1
  podTemplate:
    spec:
      serviceAccount: flink
      containers:
        - name: flink-main-container
          volumeMounts:
            - mountPath: /flink-job
              name: flink-jobs
            - mountPath: /flink-jars
              name: flink-jars
      initContainers:
        - name: grab-mqtt-over-http-jar
          image: busybox
          command: [ '/bin/sh', '-c',
                     'cd /tmp/job; wget https://jenkins/job/platform_flink/job/master/39/artifact/src-java/switchdin-topologies/target/switchdin-topologies-1.0-SNAPSHOT.jar --no-check-certificate;',
                     'cd /tmp/jar; wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-rabbitmq_2.12/1.14.4/flink-connector-rabbitmq_2.12-1.14.4.jar' ]
          volumeMounts:
            - name: flink-jobs
              mountPath: /tmp/job
            - name: flink-jars
              mountPath: /tmp/jar
      volumes:
        - name: flink-jobs
          emptyDir: { }
        - name: flink-jars
          emptyDir: { }
  job:
    jarURI: local:///flink-job/switchdin-topologies-1.0-SNAPSHOT.jar
    entryClass: org.switchdin.HTTPOverMQTT
    parallelism: 1
    upgradeMode: stateless
    state: running

Any ideas? I've looked at the ConfigMaps that result and they also look
fine.
apiVersion: v1
data:
  flink-conf.yaml: "blob.server.port: 6124\nkubernetes.jobmanager.replicas:
1\njobmanager.rpc.address:
http-over-mqtt.default\nkubernetes.taskmanager.cpu: 1.0\n
kubernetes.service-account:
flink\nkubernetes.cluster-id: http-over-mqtt\n
$internal.application.program-args:
\nkubernetes.container.image: flink:1.14.4-scala_2.12-java11\n
parallelism.default:
1\nkubernetes.namespace: default\ntaskmanager.numberOfTaskSlots: 2\n
kubernetes.rest-service.exposed.type:
ClusterIP\n$internal.application.main: org.switchdin.HTTPOverMQTT\n
taskmanager.memory.process.size:
1024m\nkubernetes.internal.jobmanager.entrypoint.class:
org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint
\nkubernetes.pod-template-file:
/tmp/podTemplate_11292791104169595925.yaml\n
kubernetes.pod-template-file.taskmanager:
/tmp/podTemplate_17362225267763549900.yaml\nexecution.target:
kubernetes-application\njobmanager.memory.process.size:
1024m\njobmanager.rpc.port: 6123\ntaskmanager.rpc.port: 6122\n
internal.cluster.execution-mode:
NORMAL\nqueryable-state.proxy.ports: 6125\npipeline.jars:
local:///flink-job/switchdin-topologies-1.0-SNAPSHOT.jar\n
kubernetes.jobmanager.cpu:
1.0\npipeline.classpath:
file:///flink-jars/flink-connector-rabbitmq_2.12-1.14.4.jar\n
kubernetes.pod-template-file.jobmanager:
/tmp/podTemplate_17029501154997462433.yaml\n"
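
Two small things stand out when comparing the spec and the rendered ConfigMap above: the ConfigMap line reads pipeline.classpath (singular) while the key Flink reads is pipeline.classpaths, and the spec's classpath entry points at file:///flink-jar/... while the volume is mounted at /flink-jars. A quick way to rule out a key or path mismatch is to check what the effective configuration resolves to from inside the container. The sketch below is only a diagnostic, assuming the default config directory of the official image (/opt/flink/conf); the class name is made up.

import java.io.File;
import java.net.URI;
import java.util.Collections;
import java.util.List;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.PipelineOptions;

// Minimal diagnostic: load the effective flink-conf.yaml and check that every
// entry of "pipeline.classpaths" (PipelineOptions.CLASSPATHS) points at a file
// that actually exists inside the container.
public class ClasspathCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = GlobalConfiguration.loadConfiguration("/opt/flink/conf");
        List<String> classpaths =
                conf.getOptional(PipelineOptions.CLASSPATHS).orElse(Collections.emptyList());
        if (classpaths.isEmpty()) {
            System.out.println("pipeline.classpaths is not set; check the key spelling");
        }
        for (String url : classpaths) {
            File jar = new File(new URI(url)); // file:// URL -> local path
            System.out.println(url + " exists=" + jar.exists());
        }
    }
}

One more hedged observation: if the init container's command array really has four entries as pasted, /bin/sh -c only executes the first string after -c and binds the next one to $0, so the second wget (the RabbitMQ connector download) may never run; merging both downloads into a single -c string would rule that out.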



Re: flink table store

2022-04-07 Thread tison
I'm a bit curious where on the official website you saw this. Could you share a link?

Best,
tison.


Leonard Xu wrote on Thu, Apr 7, 2022 at 14:47:

>
> The project is open source [1], and its first release is coming soon; worth keeping an eye on.
>
> Best,
> Leonard
> [1] https://github.com/apache/flink-table-store
>
>
>
> > On Apr 7, 2022, at 9:54 AM, Xianxun Ye wrote:
> >
> > Here is the design document for flink table store, in case you want to take a look.
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage
> >
> >
> > Best regards,
> >
> >
> > Xianxun
> >
> >
> > On 04/6/2022 16:56, LuNing Wang wrote:
> > Hi,
> >
> > Table store is storage; it should be similar to a data lake.
> >
> > Best,
> > LuNing Wang
> >
> > yidan zhao wrote on Wed, Apr 6, 2022 at 16:55:
> >
> > I saw flink table store show up on the official website. How is it different from the flink table / flink sql stack?
> >
>
>


Re: flink table store

2022-04-07 Thread Leonard Xu

The project is open source [1], and its first release is coming soon; worth keeping an eye on.

Best,
Leonard
[1] https://github.com/apache/flink-table-store 




> On Apr 7, 2022, at 9:54 AM, Xianxun Ye wrote:
> 
> Here is the design document for flink table store, in case you want to take a look.
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage
> 
> 
> Best regards,
> 
> 
> Xianxun
> 
> 
> On 04/6/2022 16:56, LuNing Wang wrote:
> Hi,
> 
> Table store is storage; it should be similar to a data lake.
> 
> Best,
> LuNing Wang
> 
> yidan zhao wrote on Wed, Apr 6, 2022 at 16:55:
> 
> I saw flink table store show up on the official website. How is it different from the flink table / flink sql stack?
>