Unsubscribe

2024-03-13 Thread 一飞
Unsubscribe




Unsubscribe

2024-01-18 Thread
Unsubscribe

Unsubscribe

2024-01-18 Thread
Unsubscribe

Socket timeout when reporting metrics to PushGateway

2023-12-12 Thread
Hello,
  We use Flink to report metrics to a Prometheus PushGateway. After the program
has been running for a while, with a large amount of data reported to the
pushgateway, the pushgateway responds with a socket timeout exception and much
of the metrics data fails to be reported. The exception follows:


 2023-12-12 04:13:07,812 WARN
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter [] - Failed
to push metrics to PushGateway with jobName
00034937_20231211200917_54ede15602bb8704c3a98ec481bea96, groupingKey {}.
java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method) ~[?:1.8.0_281]
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) ~[?:1.8.0_281]
at java.net.SocketInputStream.read(SocketInputStream.java:171) ~[?:1.8.0_281]
at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[?:1.8.0_281]
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) ~[?:1.8.0_281]
at java.io.BufferedInputStream.read1(BufferedInputStream.java:286) ~[?:1.8.0_281]
at java.io.BufferedInputStream.read(BufferedInputStream.java:345) ~[?:1.8.0_281]
at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:735) ~[?:1.8.0_281]
at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:678) ~[?:1.8.0_281]
at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1593) ~[?:1.8.0_281]
at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1498) ~[?:1.8.0_281]
at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480) ~[?:1.8.0_281]
at io.prometheus.client.exporter.PushGateway.doRequest(PushGateway.java:315) ~[flink-metrics-prometheus-1.13.5.jar:1.13.5]
at io.prometheus.client.exporter.PushGateway.push(PushGateway.java:138) ~[flink-metrics-prometheus-1.13.5.jar:1.13.5]
at org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter.report(PrometheusPushGatewayReporter.java:63) [flink-metrics-prometheus-1.13.5.jar:1.13.5]
at org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:494) [flink-dist_2.11-1.13.5.jar:1.13.5]

After testing, we confirmed it was caused by the amount of data reported to the
pushgateway: after we restarted the pushgateway server the exception
disappeared, but it re-emerged several hours later.

So I would like to know how to configure Flink or the pushgateway to avoid this
exception.
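
(One mitigation that could be tried, given as a hedged sketch rather than a verified fix:
report less frequently and clean up stale series on shutdown. The option keys below are the
documented PushGateway reporter options for Flink 1.13; the host value and the 60-second
interval are assumptions for illustration only.)

metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: <pushgateway-host>
metrics.reporter.promgateway.port: 9091
# push less often so each request is less likely to pile up behind a slow PushGateway
metrics.reporter.promgateway.interval: 60 SECONDS
# avoid accumulating stale metric groups on the PushGateway
metrics.reporter.promgateway.deleteOnShutdown: true
metrics.reporter.promgateway.randomJobNameSuffix: true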

best regards.
leilinee 

Re:Re: Unsubscribe

2023-11-22 Thread 国辉

Unsubscribe




--
Sent from my NetEase Mail for mobile



- Original Message -
From: "Junrui Lee" 
To: user-zh@flink.apache.org
Sent: Wed, 22 Nov 2023 10:19:32 +0800
Subject: Re: Unsubscribe

Hi,

Please send an email with any content to user-zh-unsubscr...@flink.apache.org to unsubscribe from this mailing list.

Best,
Junrui

万建国 <1097498...@qq.com.invalid> wrote on Wed, Nov 22, 2023 at 10:10:

> Unsubscribe


Re: Doesn't Flink SQL support SHOW CREATE CATALOG?

2023-10-20 Thread 宇彬
Hi Feng


I created a JIRA for this earlier (https://issues.apache.org/jira/browse/FLINK-24939).
Now that CatalogStore has been introduced, the time seems ripe to implement this feature.
Our business scenario uses a lot of catalogs and managing them is cumbersome, so this
feature would help a great deal.
| |
 Original message 
| From | Feng Jin |
| Date | Oct 20, 2023 13:18 |
| To |  |
| Subject | Re: Doesn't Flink SQL support SHOW CREATE CATALOG? |
hi casel


Starting from 1.18, the CatalogStore has been introduced to persist catalog
configuration, so supporting SHOW CREATE CATALOG is indeed feasible now.
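
(For illustration only, a hypothetical usage sketch; whether and in which release the
statement actually becomes available depends on the outcome of FLINK-24939, so treat the
syntax as an assumption mirroring SHOW CREATE TABLE:)

CREATE CATALOG my_catalog WITH ('type' = 'generic_in_memory');
SHOW CREATE CATALOG my_catalog;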


Best,
Feng

On Fri, Oct 20, 2023 at 11:55 AM casel.chen  wrote:

I previously created a catalog in Flink SQL. Now I want to view the original CREATE CATALOG
statement so I can copy it, modify it, and save it as a new catalog, but I found that Flink
SQL does not support SHOW CREATE CATALOG.
As far as I know, Doris supports the SHOW CREATE CATALOG statement. Could Flink SQL support
it as well?


How to release connection pool resources when Flink async I/O times out

2023-07-20 Thread 一飞
The code is as follows:
package com.didichuxing.iov.func

import com.didichuxing.iov.util.JedisTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}
import org.slf4j.LoggerFactory
import redis.clients.jedis.Jedis

import java.util.concurrent.{CompletableFuture, ExecutorService, Executors}
import java.util.function.{Consumer, Supplier}
import scala.collection.JavaConverters.asScalaBufferConverter
import scala.collection.mutable

abstract class RedisMgetAsyncFunc[IN, OUT] extends RichAsyncFunction[IN, Option[OUT]] with RedisMgetAsyncBase[IN, OUT] {
  private val logger = LoggerFactory.getLogger(this.getClass)
  var service: ExecutorService = _
  var start_ts: Long = 0L
  var connect_complete_ts: Long = 0L
  var research_complete_ts: Long = 0L
  var timeout_ts: Long = 0L
  var end_ts: Long = 0L

  override def open(parameters: Configuration): Unit = {
    service = Executors.newFixedThreadPool(20)
  }

  override def close(): Unit = {
    service.shutdown()
  }

  override def asyncInvoke(input: IN, resultFuture: ResultFuture[Option[OUT]]): Unit = {
    try {
      CompletableFuture
        // send the query request asynchronously
        .supplyAsync(new Supplier[mutable.Buffer[String]] {
          override def get(): mutable.Buffer[String] = {
            start_ts = System.currentTimeMillis()
            var jedis: Jedis = null
            try {
              jedis = JedisTool.getJedis()
              connect_complete_ts = System.currentTimeMillis()
              val gpsCellKeys = getKey(input)
              jedis.mget(gpsCellKeys: _*).asScala
            } catch {
              case exception: Exception =>
                logger.error("mget async query: failed to get a Redis connection: " + exception.fillInStackTrace())
                logger.error("mget async query: start=" + start_ts + ", connection acquired=" + connect_complete_ts +
                  ", query done / result filtering started=" + research_complete_ts + ", callback finished=" + end_ts +
                  ", timeout at=" + timeout_ts + ", input=" + input)
                mutable.Buffer()
            } finally {
              try {
                jedis.close()
              } catch {
                case exception: Exception =>
                  logger.error("mget async query: failed to close the Redis connection: " + exception.fillInStackTrace())
                  logger.error("mget async query: start=" + start_ts + ", connection acquired=" + connect_complete_ts +
                    ", query done / result filtering started=" + research_complete_ts + ", callback finished=" + end_ts +
                    ", timeout at=" + timeout_ts + ", input=" + input)
              }
            }
          }
        }, service)
        // callback on the query result
        .thenAccept(new Consumer[mutable.Buffer[String]] {
          override def accept(gpsCellResults: mutable.Buffer[String]): Unit = {
            research_complete_ts = System.currentTimeMillis()
            process(input, gpsCellResults, resultFuture)
            end_ts = System.currentTimeMillis()
          }
        })
    } catch {
      case exception: Exception =>
        logger.error("mget async query failed: " + exception.fillInStackTrace())
        logger.error("mget async query: start=" + start_ts + ", connection acquired=" + connect_complete_ts +
          ", query done / result filtering started=" + research_complete_ts + ", callback finished=" + end_ts +
          ", timeout at=" + timeout_ts + ", input=" + input)
        resultFuture.complete(Iterable(None))
    }
  }

  override def timeout(input: IN, resultFuture: ResultFuture[Option[OUT]]): Unit = {
    timeout_ts = System.currentTimeMillis()
    logger.error("mget async query timed out: start=" + start_ts + ", connection acquired=" + connect_complete_ts +
      ", query done / result filtering started=" + research_complete_ts + ", callback finished=" + end_ts +
      ", timeout at=" + timeout_ts + ", input=" + input)
    resultFuture.complete(Iterable(None))
    //super.timeout(input, resultFuture)
  }
}
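
(A minimal Java sketch, not the code above, of one way to release a pooled connection when
an async lookup times out: keep a handle to the pending work so timeout() can cancel it,
and let try-with-resources return the connection to the pool in every case. The pool
host/port, key/value types and class name are illustrative assumptions.)

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class RedisAsyncLookup extends RichAsyncFunction<String, String> {
    private transient ExecutorService executor;
    private transient JedisPool pool;
    private transient Map<String, Future<?>> pending;  // pending work keyed by input

    @Override
    public void open(Configuration parameters) {
        executor = Executors.newFixedThreadPool(20);
        pool = new JedisPool("redis-host", 6379);       // placeholder host/port
        pending = new ConcurrentHashMap<>();
    }

    @Override
    public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
        Future<?> work = executor.submit(() -> {
            // try-with-resources returns the connection to the pool on success,
            // on failure and on interruption, so a cancelled task cannot leak it
            try (Jedis jedis = pool.getResource()) {
                String value = jedis.get(input);
                resultFuture.complete(value == null ? Collections.emptyList() : Collections.singleton(value));
            } catch (Exception e) {
                resultFuture.complete(Collections.emptyList());
            } finally {
                pending.remove(input);
            }
        });
        pending.put(input, work);   // the small submit/put race is ignored in this sketch
    }

    @Override
    public void timeout(String input, ResultFuture<String> resultFuture) {
        Future<?> work = pending.remove(input);
        if (work != null) {
            work.cancel(true);      // interrupt the worker; the connection is still closed above
        }
        resultFuture.complete(Collections.emptyList());
    }

    @Override
    public void close() {
        executor.shutdownNow();
        if (pool != null) {
            pool.close();
        }
    }
}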




ASF jira account

2023-07-20 Thread 天龙
Hello!
I wanted to register a Flink JIRA account, but my request was rejected because the reason I
provided was insufficient. When I tried to apply again, I was told the email address already
has a pending request:


There is already a pending Jira account request associated with this email 
address. Please wait for it to be processed


How can I resolve this and successfully apply for an account?






--
Sent from my NetEase Mail for tablet

How to set hdfs configuration in flink kubernetes operator

2023-06-23 Thread

Hi all,

Recently, I have been testing the Flink Kubernetes Operator. In the official 
example, the checkpoint/savepoint path is configured with a file system:


state.savepoints.dir: file:///flink-data/savepoints
state.checkpoints.dir: file:///flink-data/checkpoints
high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: file:///flink-data/ha

However, in our production environment, we use HDFS to store checkpoint data. 
I'm wondering if it's possible to store checkpoint data in the Flink Kubernetes 
Operator as well. If so, could you please guide me on how to set up HDFS 
configuration in the Flink Kubernetes Operator?

I would greatly appreciate any assistance you can provide. Thank you!


Re: How to set hdfs configuration in flink kubernetes operator?

2023-06-22 Thread
Hi Dongwoo,

Thank you very much for your response. It has been very helpful to me.

Your email mentioned the configuration of keytab and krb.file, as well as how 
to configure and write them into HDFS security.

However, if the pod doesn't know the location of the HDFS NameNode, the hdfs-core.xml file 
needs to be loaded into the Flink environment so that Flink knows which NameNode to write 
its data to.

In Flink on YARN mode, we can set the "export HADOOP_CONF_DIR" environment 
variable, and the hdfs-core.xml file can be saved in the HADOOP_CONF_DIR. Flink 
can automatically detect the namenode. My main question is how to load the 
hdfs-core.xml file in the Flink Kubernetes operator. If you know how to do 
that, please let me know.

I hope to receive your response via email. Thank you!
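
(For what it's worth, one approach that could be sketched here, with illustrative names,
is to ship the Hadoop client configs in a ConfigMap and point Flink's native Kubernetes
integration at it via the kubernetes.hadoop.conf.config-map.name option, which mounts the
files on the JobManager and TaskManager pods; whether this fits your setup should be
verified against the Flink Kubernetes documentation for your version:)

# kubectl create configmap hadoop-conf --from-file=core-site.xml --from-file=hdfs-site.xml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
spec:
  flinkConfiguration:
    kubernetes.hadoop.conf.config-map.name: "hadoop-conf"
    state.checkpoints.dir: "hdfs:///flink-checkpoints"   # illustrative path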


From: Dongwoo Kim 
Sent: Wednesday, June 21, 2023 7:56:52 PM
To: 李 琳 
Cc: user@flink.apache.org 
Subject: Re: How to set hdfs configuration in flink kubernetes operator?

Hi leilinee,

I'm not sure whether this is the best practice but I would like to share our 
experience about configuring HDFS as checkpoint storage while using flink 
kubernetes operator.
There are two steps.

Step 1) Mount krb5-conf & keytab file to flink kubernetes operator pod

You have to create a ConfigMap and a Secret for krb5.conf and the keytab respectively, 
and apply the configs below to the flink kubernetes operator's values.yaml

operatorVolumeMounts:
  create: true
  data:
- mountPath: /opt/flink/krb5.conf
  name: krb5-conf
  subPath: krb5.conf
- mountPath: /opt/flink/{keytab_file}
  name: custom-keytab
  subPath: {keytab_file}
operatorVolumes:
  create: true
  data:
- configMap:
name: krb5-configmap
  name: krb5-conf
- name: custom-keytab
  secret:
secretName: custom-keytab

Step 2) Configure FlinkDeployment like below in your application

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
spec:
  flinkConfiguration:
state.checkpoint-storage: "filesystem"
state.checkpoints.dir: "hdfs:{path_for_checkpoint}"
security.kerberos.login.keytab: "/opt/flink/{keytab_file}"   # Absolute 
path in flink k8s operator pod
security.kerberos.login.principal: "{principal_name}"
security.kerberos.relogin.period: "5m"
security.kerberos.krb5-conf.path: "/opt/flink/krb5.conf" # Absolute 
path in flink k8s operator pod

I hope this could help your work.

Best regards
dongwoo



On Wed, Jun 21, 2023 at 7:36 PM, 李 琳 <leili...@outlook.com> wrote:
Hi all,

Recently, I have been testing the Flink Kubernetes Operator. In the official 
example, the checkpoint/savepoint path is configured with a file system:


state.savepoints.dir: file:///flink-data/savepoints
state.checkpoints.dir: file:///flink-data/checkpoints
high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: file:///flink-data/ha

However, in our production environment, we use HDFS to store checkpoint data. 
I'm wondering if it's possible to store checkpoint data in the Flink Kubernetes 
Operator as well. If so, could you please guide me on how to set up HDFS 
configuration in the Flink Kubernetes Operator?

I would greatly appreciate any assistance you can provide. Thank you!


How to set hdfs configuration in flink kubernetes operator?

2023-06-21 Thread
Hi all,

Recently, I have been testing the Flink Kubernetes Operator. In the official 
example, the checkpoint/savepoint path is configured with a file system:


state.savepoints.dir: file:///flink-data/savepoints
state.checkpoints.dir: file:///flink-data/checkpoints
high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: file:///flink-data/ha

However, in our production environment, we use HDFS to store checkpoint data. 
I'm wondering if it's possible to store checkpoint data in the Flink Kubernetes 
Operator as well. If so, could you please guide me on how to set up HDFS 
configuration in the Flink Kubernetes Operator?

I would greatly appreciate any assistance you can provide. Thank you!


Unsubscribe

2023-05-05 Thread



(No subject)

2023-03-22 Thread


Re:Re: How to get Accumulators with Flink on K8s (operator)

2023-03-07 Thread 银苗
Unsubscribe

Questions about Flink restart strategies

2022-12-08 Thread
Hello, our team is evaluating Flink and we are a bit confused about the failure restart strategy.
(Referenced doc: Task Failure Recovery | Apache Flink)

1. What mechanism actually triggers a restart after a failure? I have searched a lot of material, but it only covers how the restart strategy is configured, not who triggers it. Surely a crashed process cannot restart itself?
2. Does failure recovery restart at the Task level, or does it act on the TaskManager service?

Thanks to the Flink developers for their work!
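
(For reference: restarts are triggered by the job's JobManager/scheduler according to the
configured restart strategy, and they restart the affected tasks of the job rather than the
TaskManager process itself. A minimal flink-conf.yaml sketch follows; the attempt and delay
values are illustrative:)

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s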


How does the JobManager bring up TaskManagers in Application Mode or Session Mode?

2022-11-28 Thread
Hi,

   How does the JobManager bring up TaskManagers in Application Mode or Session Mode? I
can't figure it out even after reading the source code of the Flink operator.

Any help will be appreciated. Thank you.

 Mark




Re:Re: Flink Operator in an off-line k8s environment

2022-11-22 Thread
Hi Geng Biao,
It works for me, thank you.
















At 2022-11-22 23:16:41, "Geng Biao"  wrote:

Hi Mark,

 

I guess you have to create your own local image registry service that your k8s cluster can
connect to, and upload the flink k8s operator image to that registry. After that, you can
run something like `helm install flink-kubernetes-operator
flink-operator-repo/flink-kubernetes-operator --set image.repository= ` to tell Kubernetes
to use your local image.
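
(A sketch of that workflow; the registry address is a placeholder, and `image.tag` is
assumed to be exposed by the chart alongside `image.repository`, so verify it against your
chart's values.yaml. The chart itself must also already be available to the offline Helm
client:)

# on a machine with internet access
docker pull ghcr.io/apache/flink-kubernetes-operator:95128bf
docker tag ghcr.io/apache/flink-kubernetes-operator:95128bf my-registry.local:5000/flink-kubernetes-operator:95128bf
docker push my-registry.local:5000/flink-kubernetes-operator:95128bf

# against the offline cluster
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator \
  --set image.repository=my-registry.local:5000/flink-kubernetes-operator \
  --set image.tag=95128bf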

 

Best,
Biao Geng

 

From: Mark Lee 
Date: Tuesday, November 22, 2022 at 9:01 PM
To: user@flink.apache.org 
Subject: Flink Operator in an off-line k8s environment

Hi all,

I installed flink operator following 
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/try-flink-kubernetes-operator/quick-start/.

helm repo add flink-operator-repo 
https://downloads.apache.org/flink/flink-kubernetes-operator-1.2.0/

helm install flink-kubernetes-operator 
flink-operator-repo/flink-kubernetes-operator

 

I executed the above commands from a Helm client (which can reach the internet) against a 
k8s environment that cannot connect to the internet.

 

  The flink operator chart installed correctly, but I got the errors below because my k8s 
cluster cannot connect to the internet.

What steps can I do to run flink operator correctly in my off-line k8s cluster?

Should I run a local registry and replace the image 
“ghcr.io/apache/flink-kubernetes-operator:95128bf” with a local image?

 

   Thank you.

 

[root@localhost ~]# kubectl  get pods

NAMEREADY   STATUS RESTARTS 
  AGE

flink-kubernetes-operator-7797c7bd7-tpbqf   0/1 ImagePullBackOff   0
  124m

 

[root@localhost ~]# kubectl  describe pod 
flink-kubernetes-operator-7797c7bd7-tpbqf | grep Image -C 5

Normal   AddedInterface  124mmultus Add eth0 
[10.128.6.212/14] from kube-ovn

  Warning  Failed  119m (x4 over 123m) kubeletError: 
ErrImagePull

  Warning  Failed  118m (x7 over 123m) kubeletError: 
ImagePullBackOff

  Normal   Pulling 34m (x19 over 124m) kubeletPulling 
image "ghcr.io/apache/flink-kubernetes-operator:95128bf"

  Warning  Failed  8m53s (x23 over 123m)   kubeletFailed to 
pull image "ghcr.io/apache/flink-kubernetes-operator:95128bf": rpc error: code 
= Unknown desc = pinging container registry ghcr.io: Get "https://ghcr.io/v2/": 
dial tcp 20.205.243.164:443: i/o timeout

  Normal   BackOff 4m20s (x424 over 123m)  kubeletBack-off 
pulling image "ghcr.io/apache/flink-kubernetes-operator:95128bf"

 

Unsubscribe

2022-11-21 Thread


Re: Re: Does Table Store conflict with the connector-kafka package?

2022-10-11 Thread 宇彬
Could you paste the SQL? I could not reproduce this problem with Flink 1.15 + FTS 0.3.0-SNAPSHOT.


On Oct 11, 2022 at 09:19, RS wrote:
Hi,


After removing it, everything runs fine. Could a note about this kind of lib conflict be added to the website, so that others don't run into the same problem?


Thanks

On 2022-10-10 12:50:33, "yanfei lei" wrote:
Hi, judging from the Table Store POM, the Table Store dist jar shades a copy of connector-kafka.
https://repo1.maven.org/maven2/org/apache/flink/flink-table-store-dist/0.2.0/flink-table-store-dist-0.2.0.pom
Could you remove flink-connector-kafka-1.15.1.jar and try again?


RS wrote on Sat, Oct 8, 2022 at 17:19:

Hi,
The error is as follows:


[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Multiple factories for
identifier 'kafka' that implement
'org.apache.flink.table.factories.DynamicTableFactory' found in the
classpath.


Ambiguous factory classes are:


org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory
org.apache.flink.table.store.kafka.KafkaLogStoreFactory

org.apache.flink.table.store.shaded.streaming.connectors.kafka.table.KafkaDynamicTableFactory


Thanks





On 2022-10-08 13:38:20, "Shammon FY" wrote:
Hi RS,
Could you provide the specific error stack for the conflict?

On Sat, Oct 8, 2022 at 8:54 AM RS  wrote:

Hi,


Version: flink-1.15.1
To use Table Store, flink-table-store-dist-0.2.0.jar needs to be placed under lib. The
cluster's lib directory already contained flink-connector-kafka-1.15.1.jar, and when
defining a Kafka source table in sql-client, I found that the connectors conflict.


If flink-table-store-dist-0.2.0.jar is under lib, does that mean flink-connector-kafka-1.15.1.jar cannot be there as well?


Thanks



partition

2022-09-11 Thread
Recently, while using Flink 1.12 SQL, I found that ROW_NUMBER deduplication does not produce
the expected result. Specifically, when a new optime appears on the next day, the emitted
time is still the optime from the previous day's logs. The official docs say the
deduplication function only supports ordering by a single time attribute (ascending/descending),
so I wonder whether ordering by two fields in my SQL is the cause. The SQL is below (a
reworked sketch follows it); please take a look:




SELECT
pk_id, 
optime_timestamp,
process_time
FROM (
SELECT
pk_id,
process_time,
CAST(optime / 1000 AS BIGINT) AS optime_timestamp,
ROW_NUMBER() OVER(PARTITION BY pk_id ORDER BY 
FROM_UNIXTIME(CAST(optime / 1000 AS BIGINT), 'MMdd') DESC, optime) AS rn
FROM 
tabke_a
) t
WHERE  

  rn = 1
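
(A hedged rework for comparison: it keeps a single ordering column per partition by moving
the day into the PARTITION BY clause. Note that this runs as a regular Top-N rather than the
optimized deduplication, and whether it matches the intended "latest record per pk_id per
day" semantics is an assumption; the 'yyyyMMdd' day format is written out here for
illustration, and the table name is reused from above:)

SELECT pk_id, optime_timestamp, process_time
FROM (
    SELECT
        pk_id,
        process_time,
        CAST(optime / 1000 AS BIGINT) AS optime_timestamp,
        ROW_NUMBER() OVER (
            PARTITION BY pk_id, FROM_UNIXTIME(CAST(optime / 1000 AS BIGINT), 'yyyyMMdd')
            ORDER BY optime DESC
        ) AS rn
    FROM tabke_a
) t
WHERE rn = 1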











Re: Re: On Flink compatibility with NeoKylin (中标麒麟), Kylin (银河麒麟) and UnionTech OS (统信)

2022-07-23 Thread 世钰
It can currently be deployed directly on NeoKylin v7 and Kylin v10 systems; it has been running in our production environment for more than a year.

--

Sent from a mobile device


-- Original message --
From: "PxNew" <15701181132mr@gmail.com>
Sent: Saturday, July 23, 2022, 6:47 PM
To: "user-zh"

Does the ES connector support lookup?

2022-07-06 Thread 宇彬
Hi all,
I see that the existing Elasticsearch connector only supports sink. Is there a plan to support lookup, or is there already a JIRA for it?

Job cancel fails; individual tasks stay in CANCELING forever

2022-06-27 Thread
Help needed, as the subject says. Flink version 1.13.2, job deployed on k8s.

1. Overview:


2. Logs of the hung TaskManager; there are no further logs after this point, and no exceptions:



3. jstack analysis found no threads in BLOCKED state.


taskmanager_mipha-69-taskmanager-1-18_thread_dump.json
Description: application/json




(No subject)

2022-06-12 Thread 国辉

Unsubscribe

Re:Re: flink sql api, exception when setting "table.exec.state.ttl"

2022-05-25 Thread 诗君



I have figured this out.
It was because I had put a flink-connector-tidb-cdc.jar into my Flink lib folder earlier;
it is built against Scala 2.11, while my Flink distribution uses Scala 2.12.
Somehow, when I submit a job with a GroupAggregate operator, it needs to load keyed RocksDB
state, and that is where the conflict arises.
I will look into it and propose a solution.










At 2022-05-23 20:55:39, "Chesnay Schepler"  wrote:

You're probably mixing Flink versions.


From the stack trace we can see that Flink classes are being loaded from 2 
different jars (rocketmq-flink-1.0.0-SNAPSHOT.jar/flink-dist_2.12-1.13.5.jar); 
I'd suggest to resolve that first and see if the error persists.



On 23/05/2022 14:32, 李诗君 wrote:

flink version: 1.13.5


java code:


StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
env.enableCheckpointing(6);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(6);
env.getCheckpointConfig().setCheckpointTimeout(6);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(

CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
//
env.getCheckpointConfig().setCheckpointStorage("hdfs://test-wh-hadoop-1:9000/flink-checkpoints");

env.setStateBackend(new 
RocksDBStateBackend("hdfs://test-wh-hadoop-1:9000/flink-checkpoints",
true));

tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
Configuration configuration = tableEnv.getConfig().getConfiguration();
//
configuration.setString("table.exec.resource.default-parallelism","16");
configuration.setString("table.exec.state.ttl","720");


and when I submit this job , I got this:


Sink: Sink(table=[default_catalog.default_database.rts_board_trans_compute], 
fields=[mchnt_id, time_hour, channel, trans_count, trans_amount, 
average_amount]) (1/1) (f8649f8434775cbda10bcedce96c9ae3) switched 
from INITIALIZING to FAILED on container_1647420330066_0473_01_02 @ 
test-wh-hadoop-1 (dataPort=38604).
java.lang.UnsatisfiedLinkError: 
org.rocksdb.FlinkCompactionFilter.createNewFlinkCompactionFilterConfigHolder()J
at 
org.rocksdb.FlinkCompactionFilter.createNewFlinkCompactionFilterConfigHolder(NativeMethod)
 ~[flink-dist_2.12-1.13.5.jar:2.2.0]
at 
org.rocksdb.FlinkCompactionFilter.access$000(FlinkCompactionFilter.java:13) 
~[flink-dist_2.12-1.13.5.jar:2.2.0]
at 
org.rocksdb.FlinkCompactionFilter$ConfigHolder.(FlinkCompactionFilter.java:107)
 ~[flink-dist_2.12-1.13.5.jar:2.2.0]
at 
org.rocksdb.FlinkCompactionFilter$FlinkCompactionFilterFactory.(FlinkCompactionFilter.java:133)
 ~[flink-dist_2.12-1.13.5.jar:2.2.0]
at 
org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.createAndSetCompactFilterFactory(RocksDbTtlCompactFiltersManager.java:84)
 ~[flink-dist_2.12-1.13.5.jar:1.13.5]
at 
org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(RocksDbTtlCompactFiltersManager.java:74)
 ~[flink-dist_2.12-1.13.5.jar:1.13.5]
at 
org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createColumnFamilyDescriptor(RocksDBOperationUtils.java:167)
 ~[flink-dist_2.12-1.13.5.jar:1.13.5]
at 
org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createStateInfo(RocksDBOperationUtils.java:144)
 ~[flink-dist_2.12-1.13.5.jar:1.13.5]
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:643)
 ~[flink-dist_2.12-1.13.5.jar:1.13.5]
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
 ~[flink-dist_2.12-1.13.5.jar:1.13.5]
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:208)
 ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:143)
 ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:130)
 ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:70)
 ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
at 
org.apache.flink.runtime.state.Ab

flink sql api, exception when setting "table.exec.state.ttl"

2022-05-23 Thread 诗君
flink version: 1.13.5


java code:


StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

env.enableCheckpointing(6);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(6);
env.getCheckpointConfig().setCheckpointTimeout(6);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
//
env.getCheckpointConfig().setCheckpointStorage("hdfs://test-wh-hadoop-1:9000/flink-checkpoints");

env.setStateBackend(new 
RocksDBStateBackend("hdfs://test-wh-hadoop-1:9000/flink-checkpoints",
true));

tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
Configuration configuration = tableEnv.getConfig().getConfiguration();
//
configuration.setString("table.exec.resource.default-parallelism","16");
configuration.setString("table.exec.state.ttl","720");


and when I submit this job , I got this:


Sink: Sink(table=[default_catalog.default_database.rts_board_trans_compute], 
fields=[mchnt_id, time_hour, channel, trans_count, trans_amount, 
average_amount]) (1/1) (f8649f8434775cbda10bcedce96c9ae3) switched 
from INITIALIZING to FAILED on container_1647420330066_0473_01_02 @ 
test-wh-hadoop-1 (dataPort=38604).
java.lang.UnsatisfiedLinkError: 
org.rocksdb.FlinkCompactionFilter.createNewFlinkCompactionFilterConfigHolder()J
at 
org.rocksdb.FlinkCompactionFilter.createNewFlinkCompactionFilterConfigHolder(NativeMethod)
 ~[flink-dist_2.12-1.13.5.jar:2.2.0]
at 
org.rocksdb.FlinkCompactionFilter.access$000(FlinkCompactionFilter.java:13) 
~[flink-dist_2.12-1.13.5.jar:2.2.0]
at 
org.rocksdb.FlinkCompactionFilter$ConfigHolder.(FlinkCompactionFilter.java:107)
 ~[flink-dist_2.12-1.13.5.jar:2.2.0]
at 
org.rocksdb.FlinkCompactionFilter$FlinkCompactionFilterFactory.(FlinkCompactionFilter.java:133)
 ~[flink-dist_2.12-1.13.5.jar:2.2.0]
at 
org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.createAndSetCompactFilterFactory(RocksDbTtlCompactFiltersManager.java:84)
 ~[flink-dist_2.12-1.13.5.jar:1.13.5]
at 
org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(RocksDbTtlCompactFiltersManager.java:74)
 ~[flink-dist_2.12-1.13.5.jar:1.13.5]
at 
org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createColumnFamilyDescriptor(RocksDBOperationUtils.java:167)
 ~[flink-dist_2.12-1.13.5.jar:1.13.5]
at 
org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createStateInfo(RocksDBOperationUtils.java:144)
 ~[flink-dist_2.12-1.13.5.jar:1.13.5]
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:643)
 ~[flink-dist_2.12-1.13.5.jar:1.13.5]
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
 ~[flink-dist_2.12-1.13.5.jar:1.13.5]
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:208)
 ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:143)
 ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:130)
 ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:70)
 ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:301)
 ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:352)
 ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
 ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
 ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
at 
org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:203)
 ~[flink-dist_2.12-1.13.5.jar:1.13.5]
at 
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.open(GroupAggFunction.java:119)
 

Question about Flink(Standalone)

2022-03-21 Thread 杰进13922030138


Hi~  


I am Jiejin Li, a Flink user from China. I failed to submit a Flink task using 
Flink standalone mode, with the following error:


Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could 
not find a file system implementation for scheme  The scheme is 
not directly supported by Flink and no Hadoop file system to support this 
scheme could be loaded. For a full list of supported file systems, please see 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.


According to the error prompt, I downloaded the “ 
flink-shaded-hadoop-2-uber-2.7.5-10.0.jar” and imported it into the 
${FLINK_HOME}/lib directory, and also configured the Flink-conf.yaml file with 
“env.hadoop.conf.dir:/opt/modules/hadoop-2.7.3/etc/hadoop”, but the error could 
not be solved. Please advise how to deal with this situation.
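
(In case it helps, a common alternative setup for Hadoop filesystem access on standalone
clusters, given as a sketch: export the Hadoop classpath on every node instead of relying
on the shaded uber jar. The Hadoop path is taken from the message above, and the exact
missing scheme in the error is unknown here:)

# run on every node before starting the standalone cluster / submitting the job
export HADOOP_CLASSPATH=$(hadoop classpath)
export HADOOP_CONF_DIR=/opt/modules/hadoop-2.7.3/etc/hadoop
./bin/start-cluster.sh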


Thank you !


 


Sent from Mail for Windows


 


 




Setting an end position for Kafka consumption

2022-03-20 Thread 航飞
I want to consume Kafka via the Table API
and need to set both a start position (time) and an end position (time) for the consumption.
Is there a corresponding property, like the setUnbounded setting in the DataStream API?
Is there any other way to achieve something similar?

Setting an end position for Kafka consumption

2022-03-20 Thread 航飞
I want to consume Kafka via the Table API
and need to set both a start position (time) and an end position (time) for the consumption.
Is there a corresponding property, like the setUnbounded setting in the DataStream API?
Is there any other way to achieve something similar?

Table API Kafka connector: bounded consumption

2022-03-18 Thread 航飞
The requirement is to create a job that consumes only a slice of a Kafka topic, i.e. with a fixed start position and end position.
I see the DataStream Kafka connector has a setBounded (setUnbounded) setting, which would satisfy this.
Questions:
How can I accomplish the above with the Table API?
Does the Table API have a corresponding property?
Is there another way to meet this requirement? (See the DDL sketch below.)
Either stream processing or batch processing would be fine.
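
(A hedged DDL sketch: the scan.startup.* options are long-standing Kafka SQL connector
options, while the scan.bounded.* options only exist in newer connector releases, so verify
both against the connector version actually in use; the topic, broker, schema and timestamps
are placeholders:)

CREATE TABLE kafka_slice (
  msg STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'my_topic',
  'properties.bootstrap.servers' = 'broker:9092',
  'format' = 'json',
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1647705600000',
  'scan.bounded.mode' = 'timestamp',
  'scan.bounded.timestamp-millis' = '1647792000000'
);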

Unsubscribe

2021-12-28 Thread 航飞
Unsubscribe

Re: stateSerializer(1.14.0) not compatible with previous stateSerializer(1.13.1)

2021-12-15 Thread 诗君
Hi, I'm using Flink SQL, so maybe it is the default Kryo serializer.

> On Dec 10, 2021, at 4:15 PM, Roman Khachatryan wrote:
> 
> Hi,
> 
> Compatibility might depend on specific serializers,
> could you please share which serializers you use to access the state?
> 
> Regards,
> Roman
> 
> On Fri, Dec 10, 2021 at 3:41 AM 李诗君  wrote:
>> 
>> I am trying to upgrade my flink cluster version from 1.13.1 to 1.14.0 , I 
>> did like below steps:
>> 
>> 1. savepoint running tasks in version1.13.1
>> 2. stop tasks and upgrade cluster version to 1.14.0
>> 3. recover tasks with savepoints
>> 
>> and this happened:
>> 
>> 
>> java.lang.RuntimeException: Error while getting state
>> at 
>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
>>  ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>> at 
>> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:203)
>>  ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>> at 
>> org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.open(GroupAggFunction.java:119)
>>  ~[flink-table_2.12-1.14.0.jar:1.14.0]
>> at 
>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
>>  ~[flink-fsp-connector-rksc-1.0-SNAPSHOT.jar:?]
>> at 
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100)
>>  ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>> at 
>> org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:55)
>>  ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>> at 
>> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110)
>>  ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711)
>>  ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>>  ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687)
>>  ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
>>  ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>> at 
>> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>>  ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) 
>> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) 
>> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) 
>> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_282]
>> Caused by: org.apache.flink.util.StateMigrationException: The new state 
>> serializer 
>> (org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializer@a508b39e) 
>> must not be incompatible with the old state serializer 
>> (org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializer@a508b39e).
>> at 
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:704)
>>  ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>> at 
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624)
>>  ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>> at 
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
>>  ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>> at 
>> org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:225)
>>  ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>> at 
>> org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:148)
>>  ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>> at 
>> org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:132)
>>  ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>> at 
>> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
>>  ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>> at 
>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:302)
>>  ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>> at 
>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:353)
>>  ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>> at 
>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
>>  ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>> at 
>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
>>  ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>> ... 15 more



stateSerializer(1.14.0) not compatible with previous stateSerializer(1.13.1)

2021-12-09 Thread 诗君
I am trying to upgrade my flink cluster version from 1.13.1 to 1.14.0 , I did 
like below steps:

1. savepoint running tasks in version1.13.1
2. stop tasks and upgrade cluster version to 1.14.0
3. recover tasks with savepoints

and this happened:


java.lang.RuntimeException: Error while getting state
at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at 
org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:203)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at 
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.open(GroupAggFunction.java:119)
 ~[flink-table_2.12-1.14.0.jar:1.14.0]
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
 ~[flink-fsp-connector-rksc-1.0-SNAPSHOT.jar:?]
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:55)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) 
~[flink-dist_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) 
~[flink-dist_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) 
~[flink-dist_2.12-1.14.0.jar:1.14.0]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_282]
Caused by: org.apache.flink.util.StateMigrationException: The new state 
serializer 
(org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializer@a508b39e) 
must not be incompatible with the old state serializer 
(org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializer@a508b39e).
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:704)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:225)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:148)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:132)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:302)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:353)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
... 15 more

Tumbling windows: what is the smallest supported window size?

2021-11-04 Thread 航飞
How small can a tumbling window be: 100 ms?
Is there any performance impact?

Unsubscribe

2021-11-02 Thread 芳奎
Unsubscribe

felix 

felix_...@163.com

Re: Async I/O operator cannot complete checkpoints

2021-10-11 Thread 一飞
The image has been uploaded as an attachment.
















On 2021-10-12 10:33:12, "李一飞" wrote:

The async I/O operator cannot complete its checkpoints; could someone help figure out why?




 

Async I/O operator cannot complete checkpoints

2021-10-11 Thread 一飞
The async I/O operator cannot complete its checkpoints; could someone help figure out why?

Re:Re: table.exec.state.ttl

2021-08-26 Thread 航飞
Hi:

How should I now set the mini-batch options and the state value TTL on the execution environment?

Like this: conf.setString("exec.state.ttl","15 s");
or like this: conf.setString("stream.exec.state.ttl","15 s");











On 2021-08-26 19:05:07, "Caizhi Weng" wrote:
>Hi!
>
>Table-layer options must be set on the table environment's TableConfig; setting them on the
>execution environment has no effect.
>
>李航飞 wrote on Thu, Aug 26, 2021 at 7:02 PM:
>
>> Configuration conf = new Configuration();
>> conf.setString("table.exec.mini-batch.enabled","true");
>> conf.setString("table.exec.mini-batch.allow-latency","15s");
>> conf.setString("table.exec.mini-batch.size","50");
>> conf.setString("table.exec.state.ttl","15 s");StreamExecutionEnvironment
>> execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
>> execEnv.configure(conf,this.getClass().getClassLoader());
>> EnvironmentSetting setting = ...
>> StreamTableEnvironment.create(execEnv,setting); Based on Flink 1.13.2.
>> The mini-batch configuration is as above, with Kafka stream data; I feel these settings are not taking effect at all.
>> Never mind the allow-latency one from last time; this time table.exec.state.ttl
>> was set to 15 seconds, I waited 1 minute, fed in data again, and the result still accumulated instead of starting over from 0.
>> The program is executed via StatementSet.execute().
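
(A minimal sketch of where these options do take effect, per the reply above: on the
TableEnvironment's TableConfig rather than on the StreamExecutionEnvironment. The option
keys are the documented table.exec.* keys and the values are illustrative:)

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TableTtlConfigExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // table-layer options go on the TableConfig, not on the execution environment
        Configuration conf = tableEnv.getConfig().getConfiguration();
        conf.setString("table.exec.mini-batch.enabled", "true");
        conf.setString("table.exec.mini-batch.allow-latency", "15 s");
        conf.setString("table.exec.mini-batch.size", "50");
        conf.setString("table.exec.state.ttl", "15 s");
    }
}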


table.exec.state.ttl

2021-08-26 Thread 航飞
Configuration conf = new Configuration();
conf.setString("table.exec.mini-batch.enabled","true");
conf.setString("table.exec.mini-batch.allow-latency","15s");
conf.setString("table.exec.mini-batch.size","50");
conf.setString("table.exec.state.ttl","15 s");
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
execEnv.configure(conf,this.getClass().getClassLoader());
EnvironmentSetting setting = ...
StreamTableEnvironment.create(execEnv,setting);
Based on Flink 1.13.2.
The mini-batch configuration is as above, with Kafka stream data; I feel these settings are not taking effect at all.
Never mind the allow-latency one from last time; this time table.exec.state.ttl
was set to 15 seconds, I waited 1 minute, fed in data again, and the result still accumulated instead of starting over from 0.
The program is executed via StatementSet.execute().

mini-batch configuration not taking effect

2021-08-25 Thread 航飞


Configuration conf = new Configuration();
conf.setString("table.exec.mini-batch.enabled","true");
conf.setString("table.exec.mini-batch.allow-latency","15s");
conf.setString("table.exec.mini-batch.size","50");


StreamExecutionEnvironment execEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
execEnv.configure(conf,this.getClass().getClassLoader());
EnvironmentSetting setting = ...
StreamTableEnvironment.create(execEnv,setting);


Based on Flink 1.13.2.
The mini-batch configuration is as above, with Kafka stream data; in testing, the computation is not delayed by 15 s.

Re:Re: mini-batch settings have no effect

2021-08-24 Thread 航飞
I set the latency to 15 seconds, but in actual testing the data is processed as soon as it arrives, without the 15-second delay.








On 2021-08-25 11:12:19, "Caizhi Weng" wrote:
>Hi!
>
>What exactly do you mean by "no effect"? Please describe the scenario and the problem in detail.
>
>李航飞 wrote on Wed, Aug 25, 2021 at 11:04 AM:
>
>> Set via Configuration:
>> table.exec.mini-batch.enabled= true;
>> table.exec.mini-batch.allow-latency = 15s;
>> table.exec.mini-batch.size = 50;
>> I tested the configuration above and it had no effect; the test below
>>  table.exec.mini-batch.allow-latency = 15000
>> also had no effect. What could be the reason?
>>
>>


mini-batch settings have no effect

2021-08-24 Thread 航飞
Set via Configuration:
table.exec.mini-batch.enabled= true;
table.exec.mini-batch.allow-latency = 15s;
table.exec.mini-batch.size = 50;
I tested the configuration above and it had no effect; the test below
 table.exec.mini-batch.allow-latency = 15000
also had no effect. What could be the reason?



Re:Re: Question about CUMULATE window output

2021-08-20 Thread 航飞



Hi:
My use case is to compute statistics over a full day in real time, emitting promptly at each small step; latency cannot be too high. The cumulate window meets this requirement, while the tumble window's latency is too high.
On 2021-08-20 16:01:57, "Caizhi Weng" wrote:
>Hi!
>
>What you probably want is a tumble window rather than a cumulate window.
>
>李航飞 wrote on Fri, Aug 20, 2021 at 3:26 PM:
>
>> Is there a setting so that each minute only the result computed in the current small window is emitted, and earlier results are not emitted again?
>>
>>
>> In my current tests, the more data is fed in, the more data is emitted at the next output;
>> the results of different windows are all emitted again in the next window.
>>
>>


Question about CUMULATE window output

2021-08-20 Thread 航飞
Is there a setting so that each minute only the result computed in the current small window is emitted, and earlier results are not emitted again?


In my current tests, the more data is fed in, the more data is emitted at the next output;
the results of different windows are all emitted again in the next window.



Re:Re: Re:Re: Upsert mode inserts with the flink-connector-redis connector

2021-08-19 Thread 航飞
Hi:

SELECT window_start,window_end,SUM(price),item 
 FROM TABLE(
CUMULATE(TABLE Bid,DESCRIPTOR(bidtime),INTERVAL '1' MINUTES,INTERVAL'10' HOUR))

GROUP BY window_start,window_end,item
The statement is fine and emits once per minute as expected; the expiration-time code has been commented out.
public ChangelogMode  getChanglogMode(ChangelogMode arg0){
   return ChangelogMode.upsert();
}
The RedisMapper implementation that writes to Redis has a print statement: it writes once every minute, and I am certain the data is identical each time.
That does not seem right for upsert.
On 2021-08-20 11:15:17, "Caizhi Weng" wrote:
>Hi!
>
>I had not noticed before that this is a cumulate window. What exactly does the GROUP BY statement look like? The description says "the small window fires once per minute", so the sink
>should indeed receive one message per minute.
>
>A sink can receive upsert messages in the streaming scenario, as long as getChangelogMode returns upsert.
>
>李航飞 wrote on Fri, Aug 20, 2021 at 10:03 AM:
>
>> Hi:
>> I read that a sink in the streaming scenario can only accept insert-only; is that correct?
>>
>>
>>
>> On 2021-08-20 09:10:44, "李航飞" wrote:
>> >Hi:
>> >I did set an expiration time. I commented out the expiration-time code, but it still writes once per minute.
>> >
>> >
>> >I implemented the open method of the RichMapFunction interface
>> >and configured a StateTtlConfig there;
>> >I also set an expiration time via RedisConmmand.SETEX.
>> >Both are commented out now, but upsert() still has no effect.
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >On 2021-08-19 17:44:02, "Caizhi Weng" wrote:
>> >>Hi!
>> >>
>> >>You can add a System.out.println in the method that receives sink data to check whether data really arrives once per minute.
>> >>
>> >>If so, it is probably because a state TTL is configured. To prevent state from expiring, the group agg operator still sends data downstream even if it receives the same key
>> >>and value each time (see GroupAggFunction#processElement), so this is expected behavior.
>> >>
>> >>李航飞 wrote on Thu, Aug 19, 2021 at 5:03 PM:
>> >>
>> >>> Version: flink 1.13.2
>> >>> Scenario:
>> >>> A custom flink-connector-redis connector. When implementing the DynamicTableSink interface,
>> >>> getChangelogMode is overridden to return ChangelogMode.upsert(), so data is written in update fashion.
>> >>> The window used is the cumulate function with a 1-day window and a 1-minute step.
>> >>>
>> >>>
>> >>> Problem:
>> >>> Testing shows that output is produced every minute and the data written is identical each time.
>> >>> Under upsert() mode, identical data should normally not trigger a Redis write. What is going on?
>> >>>
>> >>>
>>


Re:Re: Re: Error when combining the CUMULATE function with comparison functions

2021-08-19 Thread 航飞
Hi:
The concrete scenario is filtering the data before the aggregation; I now do the filtering via CREATE VIEW.
Now I want to write to Redis in upsert fashion via a DynamicTable.




On 2021-08-20 10:31:18, "Caizhi Weng" wrote:
>Hi!
>
>What exactly does the condition look like? In theory, a statement equivalent to the original group by ... having ... should be supported.
>
>李航飞 wrote on Wed, Aug 18, 2021 at 4:34 PM:
>
>> Ah, I see, thank you. Comparison functions cannot be combined here, though some simple conditions (where a = '2') work. Will this be adjusted later on?
>> On 2021-08-18 16:21:20, "Caizhi Weng" wrote:
>> >Hi!
>> >
>> >Currently window TVFs can only be used in two scenarios: window agg and window top-n. If the WHERE condition filters the window
>> >agg result, you can use HAVING instead of WHERE; if it filters data before it enters the window, you can CREATE VIEW.
>> >
>> >李航飞 wrote on Wed, Aug 18, 2021 at 3:55 PM:
>> >
>> >> Building a data-processing pipeline with Flink SQL:
>> >> SELECT window_start,window_end,SUM(price)
>> >>
>> >> FROM TABLE(
>> >>
>> >> CUMULATE(TABLE Bid,DESCRIPTOR(bidtime),INTERVAL '2' MINUTES,INTERVAL
>> '10'
>> >> MINUTES))
>> >>
>> >> GROUP BY window_start,window_end;
>> >>
>> >> Roughly the statement above; it runs successfully via the StreamTableEnvironment's env.sqlQuery(sql), no problem there.
>> >> The key step, the StatementSet's sta.execute(), fails with:
>> >> java.lang.UnsupportedOperationException:
>> >> Currently Flink doesn't support individual window table-valued function
>> >> CUMULATE(time_col=[ts], max_size=[10 min], step=[2 min]).
>> >>  Please use window table-valued function with aggregate together using
>> >> window_start and window_end as group keys.
>> >> The environment is flink 1.13.1. Removing the WHERE condition makes it run fine; adding it back breaks it.
>> >>
>> >>
>>


Re:Re:Re: Upsert mode inserts with the flink-connector-redis connector

2021-08-19 Thread 航飞
Hi:
I read that a sink in the streaming scenario can only accept insert-only; is that correct?



On 2021-08-20 09:10:44, "李航飞" wrote:
>Hi:
>I did set an expiration time. I commented out the expiration-time code, but it still writes once per minute.
>
>
>I implemented the open method of the RichMapFunction interface
>and configured a StateTtlConfig there;
>I also set an expiration time via RedisConmmand.SETEX.
>Both are commented out now, but upsert() still has no effect.
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>On 2021-08-19 17:44:02, "Caizhi Weng" wrote:
>>Hi!
>>
>>You can add a System.out.println in the method that receives sink data to check whether data really arrives once per minute.
>>
>>If so, it is probably because a state TTL is configured. To prevent state from expiring, the group agg operator still sends data downstream even if it receives the same key
>>and value each time (see GroupAggFunction#processElement), so this is expected behavior.
>>
>>李航飞 wrote on Thu, Aug 19, 2021 at 5:03 PM:
>>
>>> Version: flink 1.13.2
>>> Scenario:
>>> A custom flink-connector-redis connector. When implementing the DynamicTableSink interface,
>>> getChangelogMode is overridden to return ChangelogMode.upsert(), so data is written in update fashion.
>>> The window used is the cumulate function with a 1-day window and a 1-minute step.
>>>
>>>
>>> Problem:
>>> Testing shows that output is produced every minute and the data written is identical each time.
>>> Under upsert() mode, identical data should normally not trigger a Redis write. What is going on?
>>>
>>>


Re:Re: Upsert mode inserts with the flink-connector-redis connector

2021-08-19 Thread 航飞
Hi:
I did set an expiration time. I commented out the expiration-time code, but it still writes once per minute.


I implemented the open method of the RichMapFunction interface
and configured a StateTtlConfig there;
I also set an expiration time via RedisConmmand.SETEX.
Both are commented out now, but upsert() still has no effect.














On 2021-08-19 17:44:02, "Caizhi Weng" wrote:
>Hi!
>
>You can add a System.out.println in the method that receives sink data to check whether data really arrives once per minute.
>
>If so, it is probably because a state TTL is configured. To prevent state from expiring, the group agg operator still sends data downstream even if it receives the same key
>and value each time (see GroupAggFunction#processElement), so this is expected behavior.
>
>李航飞 wrote on Thu, Aug 19, 2021 at 5:03 PM:
>
>> Version: flink 1.13.2
>> Scenario:
>> A custom flink-connector-redis connector. When implementing the DynamicTableSink interface,
>> getChangelogMode is overridden to return ChangelogMode.upsert(), so data is written in update fashion.
>> The window used is the cumulate function with a 1-day window and a 1-minute step.
>>
>>
>> Problem:
>> Testing shows that output is produced every minute and the data written is identical each time.
>> Under upsert() mode, identical data should normally not trigger a Redis write. What is going on?
>>
>>


Upsert mode inserts with the flink-connector-redis connector

2021-08-19 Thread 航飞
Version: flink 1.13.2
Scenario:
A custom flink-connector-redis connector. When implementing the DynamicTableSink interface,
getChangelogMode is overridden to return ChangelogMode.upsert(), so data is written in update fashion.
The window used is the cumulate function with a 1-day window and a 1-minute step.


Problem:
Testing shows that output is produced every minute and the data written is identical each time.
Under upsert() mode, identical data should normally not trigger a Redis write. What is going on?



Re:Re: Error when combining the CUMULATE function with comparison functions

2021-08-18 Thread 航飞
Ah, I see, thank you. Comparison functions cannot be combined here, though some simple conditions (where a = '2') work. Will this be adjusted later on?
On 2021-08-18 16:21:20, "Caizhi Weng" wrote:
>Hi!
>
>Currently window TVFs can only be used in two scenarios: window agg and window top-n. If the WHERE condition filters the window
>agg result, you can use HAVING instead of WHERE; if it filters data before it enters the window, you can CREATE VIEW.
>
>李航飞 wrote on Wed, Aug 18, 2021 at 3:55 PM:
>
>> Building a data-processing pipeline with Flink SQL:
>> SELECT window_start,window_end,SUM(price)
>>
>> FROM TABLE(
>>
>> CUMULATE(TABLE Bid,DESCRIPTOR(bidtime),INTERVAL '2' MINUTES,INTERVAL '10'
>> MINUTES))
>>
>> GROUP BY window_start,window_end;
>>
>> Roughly the statement above; it runs successfully via the StreamTableEnvironment's env.sqlQuery(sql), no problem there.
>> The key step, the StatementSet's sta.execute(), fails with:
>> java.lang.UnsupportedOperationException:
>> Currently Flink doesn't support individual window table-valued function
>> CUMULATE(time_col=[ts], max_size=[10 min], step=[2 min]).
>>  Please use window table-valued function with aggregate together using
>> window_start and window_end as group keys.
>> The environment is flink 1.13.1. Removing the WHERE condition makes it run fine; adding it back breaks it.
>>
>>


Error when combining the CUMULATE function with comparison functions

2021-08-18 Thread 航飞
Building a data-processing pipeline with Flink SQL:
SELECT window_start,window_end,SUM(price)

FROM TABLE(

CUMULATE(TABLE Bid,DESCRIPTOR(bidtime),INTERVAL '2' MINUTES,INTERVAL '10' 
MINUTES))

GROUP BY window_start,window_end;

Roughly the statement above; it runs successfully via the StreamTableEnvironment's env.sqlQuery(sql), no problem there.
The key step, the StatementSet's sta.execute(), fails with:
java.lang.UnsupportedOperationException:
Currently Flink doesn't support individual window table-valued function 
CUMULATE(time_col=[ts], max_size=[10 min], step=[2 min]).
 Please use window table-valued function with aggregate together using 
window_start and window_end as group keys.
The environment is flink 1.13.1. Removing the WHERE condition makes it run fine; adding it back breaks it. (A CREATE VIEW sketch follows below.)
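
(Following the advice in the replies above, a sketch of pre-filtering through a view so that
the window TVF is only combined with the aggregate; since the WHERE clause is not shown in
this thread, the filter column and value below are illustrative assumptions:)

CREATE VIEW filtered_bid AS
SELECT * FROM Bid WHERE clicknum <> '-99';

SELECT window_start, window_end, SUM(price)
FROM TABLE(
  CUMULATE(TABLE filtered_bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;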



Flink 1.13.2: incorrect data when using a windowing TVF

2021-08-13 Thread 占阳
Hi all:
  When using Flink 1.13.2 with a windowing TVF for aggregation, I found that the tumbling aggregation results do not match the per-day totals computed offline from the detail data. Below is my SQL:
String message = " CREATE TABLE test(\n" +
"gid VARCHAR COMMENT 'uuid 唯一标识',\n" +
"ip VARCHAR COMMENT 'ip 地址',\n" +
"business_no VARCHAR COMMENT '商户号',\n" +
"rtime  BIGINT ,\n" +
"event_time as TO_TIMESTAMP_LTZ(rtime,3)  ,\n" +
"WATERMARK FOR event_time AS event_time - INTERVAL '2' MINUTE  
, \n"+
"ts AS PROCTIME () , \n"+
"   `record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'  \n"+
" ) \n" +
"WITH (\n" +
"'connector' = 'kafka',\n" +
"'topic' = 'test',\n" +
"'properties.group.id' = 'consumer-02',\n" +
"'properties.bootstrap.servers' = 'XXX:9092',\n" +
"'properties.security.protocol' = 'SASL_PLAINTEXT',\n" +
"'properties.sasl.mechanism' = 'GSSAPI',\n" +
"'properties.sasl.kerberos.service.name' = 'kafka',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
"'format' = 'json'\n" +
")";
//" 




String message_cnts="SELECT " +
"ip ," +
"business_no as business_no ," +
" min(record_time) as record_time," +
" count(distinct gid) as total_call_num \n" +
",window_start, window_end" +
"  FROM TABLE(\n" +
"TUMBLE(TABLE test, DESCRIPTOR(event_time), INTERVAL '10' MINUTES))\n" +
"  GROUP BY window_start, window_end, GROUPING SETS ((business_no ,ip)) ";





Re:Re:Re: CUMULATE cannot be combined with comparison functions

2021-08-11 Thread 航飞
Sorry, the SQL was typed by hand and I didn't notice that; I'm sure it has nothing to do with the descriptor. It runs fine once I remove the WHERE condition, and
I get the same error when testing the IN and NOT IN functions.
At 2021-08-11 13:51:16, "李航飞"  wrote:
>org.apache.flink.client.deployment.application.DetachedApplicationRunner [] - 
>Could not execute application: 
>org.apache.flink.client.program.ProgramInvocationException: The main 
>method caused an error: Currently Flink doesn't support individual window 
>table-valued function CUMULATE(time_col=[ts], max_size=[10 min], step=[1 
>min]). Please use window table-valued function with aggregate together 
>using window_start and window_end as group keys.at 
>org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
> ~[flink-clients_2.12-1.13.1.jar:1.13.1]at 
>org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> ~[flink-clients_2.12-1.13.1.jar:1.13.1] at 
>org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) 
>~[flink-clients_2.12-1.13.1.jar:1.13.1]at 
>org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
> ~
>On 2021-08-11 12:44:38, "Caizhi Weng" wrote:
>>Hi!
>>
>>Isn't "descriptor" misspelled? I could not reproduce this problem locally. Which Flink version is this?
>>
>>李航飞 wrote on Wed, Aug 11, 2021 at 11:41 AM:
>>
>>> The SQL statement is as follows:
>>> select count(clicknum) as num
>>>
>>> from table(
>>>
>>> cumulate(table testTable, desctiptor(crtTime),interval '1'minutes,
>>> interval '10' minutes))
>>>
>>> where clicknum <>'-99'
>>>
>>> group by window_start,window_end
>>>
>>>
>>> Error message:
>>> Flink doesn't support individual window table-valued function
>>> cumulate(time_col=[app_date],max_size=[10 min],step=[1 min]...
>>>
>>>
>>> How can this be resolved? Thanks.


Re:Re: cumulate 不能和比较函数连用

2021-08-10 Thread 航飞
org.apache.flink.client.deployment.application.DetachedApplicationRunner [] - 
Could not execute application: 
org.apache.flink.client.program.ProgramInvocationException: The main 
method caused an error: Currently Flink doesn't support individual window 
table-valued function CUMULATE(time_col=[ts], max_size=[10 min], step=[1 
min]). Please use window table-valued function with aggregate together 
using window_start and window_end as group keys. at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
 ~[flink-clients_2.12-1.13.1.jar:1.13.1]at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
 ~[flink-clients_2.12-1.13.1.jar:1.13.1] at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) 
~[flink-clients_2.12-1.13.1.jar:1.13.1]at 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
 ~
在 2021-08-11 12:44:38,"Caizhi Weng"  写道:
>Hi!
>
>descriptor 拼错了吧。我在本地没有复现这个问题,Flink 版本是多少呢?
>
>李航飞  于2021年8月11日周三 上午11:41写道:
>
>> sql语句如下:
>> select count(clicknum) as num
>>
>> from table(
>>
>> cumulate(table testTable, desctiptor(crtTime),interval '1'minutes,
>> interval '10' minutes))
>>
>> where clicknum <>'-99'
>>
>> group by window_start,window_end
>>
>>
>> 报错 信息:
>> Flink doesn't support individual window table-valued function
>> cumulate(time_col=[app_date],max_size=[10 min],step=[1 min]...
>>
>>
>> 请问如何解决,谢谢


CUMULATE cannot be combined with comparison functions

2021-08-10 Thread 航飞
The SQL statement is as follows:
select count(clicknum) as num 

from table(

cumulate(table testTable, desctiptor(crtTime),interval '1'minutes, interval 
'10' minutes))

where clicknum <>'-99'

group by window_start,window_end


Error message:
Flink doesn't support individual window table-valued function 
cumulate(time_col=[app_date],max_size=[10 min],step=[1 min]...


How can this be resolved? Thanks.

Unsubscribe

2021-06-03 Thread 朋辉
Unsubscribe


| |
李朋辉
|
|
Email: lipengh...@126.com
|

Signature customized by NetEase Mail Master

Unsubscribe

2021-06-03 Thread 朋辉




| |
李朋辉
|
|
Email: lipengh...@126.com
|

Signature customized by NetEase Mail Master

- Forwarded message -

From: Fighting
Date: Jun 2, 2021 11:00
To: user-zh
Cc:
Subject: Unsubscribe
Unsubscribe

How to achieve exactly-once when writing from Flink to ClickHouse

2021-05-07 Thread 一飞
How can exactly-once semantics be achieved when writing from Flink to ClickHouse? Is there a good approach?

Re: When using processElement() and onTimer() in a ProcessFunction, do we need to consider concurrency?

2021-04-22 Thread 一飞
These two methods are invoked synchronously; only one of them can execute at a time.
On 2021-04-22 15:35:07, "x2009438" wrote:
>As the subject says. Thanks, everyone.
>
>
>Sent from my iPhone


Re:Re: Kafka fetch rate in Flink streaming/batch scenarios: how many records per fetch? Is it dynamic or configurable?

2021-04-22 Thread 一飞
Got it, thanks!
On 2021-04-21 19:58:23, "Peihui He" wrote:
>fetch.min.bytes
>fetch.wait.max.ms
>You can also use these two parameters to control it.
>
>熊云昆 wrote on Wed, Apr 21, 2021 at 7:10 PM:
>
>> There is the parameter max.poll.records, which is the maximum number of records fetched in one poll; the default is 500.
>>
>>
>> | |
>> 熊云昆
>> |
>> |
>> Email: xiongyun...@163.com
>> |
>>
>> Signature customized by NetEase Mail Master
>>
>> On Apr 20, 2021 at 18:19, 李一飞 wrote:
>> Kafka fetch rate in Flink streaming/batch scenarios: how many records are fetched per batch? Is it dynamic or configurable?
>> Ideally, please answer for both streaming and batch. Thanks!


Re:Re: Kafka fetch rate in Flink streaming/batch scenarios: how many records per fetch? Is it dynamic or configurable?

2021-04-22 Thread 一飞
Thanks
On 2021-04-21 19:10:17, "熊云昆" wrote:
>There is the parameter max.poll.records, which is the maximum number of records fetched in one poll; the default is 500.
>
>
>| |
>熊云昆
>|
>|
>Email: xiongyun...@163.com
>|
>
>Signature customized by NetEase Mail Master
>
>On Apr 20, 2021 at 18:19, 李一飞 wrote:
>Kafka fetch rate in Flink streaming/batch scenarios: how many records are fetched per batch? Is it dynamic or configurable?
>Ideally, please answer for both streaming and batch. Thanks!


Kafka fetch rate in Flink streaming/batch scenarios: how many records per fetch? Is it dynamic or configurable?

2021-04-20 Thread 一飞
Kafka fetch rate in Flink streaming/batch scenarios: how many records are fetched per batch? Is it dynamic or configurable?
Ideally, please answer for both streaming and batch. Thanks!

Unsubscribe from the mailing list

2021-03-20 Thread 朋辉
Hello, I would like to unsubscribe from the mailing list and receive mail at a new address.

Unsubscribe

2021-02-26 Thread 延念


Unsubscribe

Are groupBy and keyBy used differently?

2021-02-25 Thread
case class Student(name: String, age: Int,teacher:Teacher)
case class Teacher(name:String,room:(Int,Int,Int),salary:Int)

def main(args: Array[String]): Unit = {
  val teacher = Teacher("teacher-w",(1,2,3),99)
  val students = List(Student("a",11,teacher),Student("b",22,teacher))
  val benv = ExecutionEnvironment.getExecutionEnvironment
  benv.fromElements(students:_*).groupBy("name").sum("teacher.salary").print()
}

The code above throws the following error:

Fields 'teacher.salary' are not valid for
'com.lx.list.List1$Student(name: String, age: Integer, teacher:
com.lx.list.List1$Teacher(name: String, room: scala.Tuple3(_1:
Integer, _2: Integer, _3: Integer), salary: Integer))'.


If I replace the groupBy above with the Streaming API's keyBy, it runs fine.
Why is that? From what I can tell, it does not even seem to be a groupBy problem, but rather that the sum method does not accept nested field types.


Don't Flink operators have something like Spark's cache?

2021-01-07 Thread
Hi, when an operator's result is used multiple times, how can it be cached, similar to Spark's cache operation?

val env = getBatchEnv
val ds = env.fromElements("a","b","c")

val ds2 = ds.map(x=>{
  println("map op")
  x.charAt(0).toInt+1
})

// this prints "map op" three times
ds2.print()

// this prints "map op" another three times
ds2.filter(_>100).print()


Re: Monitoring Flink jobs with Grafana: which metric should be used to raise an alert?

2021-01-06 Thread
You could try periodically running yarn application -list to look for your job and determine whether it has died.

bradyMk wrote on Wed, Jan 6, 2021 at 4:35 PM:

> Hi, I have a question for everyone:
>
> I currently monitor Flink jobs with Grafana and want to raise an alert as soon as a job dies. My initial idea was to monitor the checkpoint
> size metric and treat the job as dead once this metric becomes 0. In practice, however, I ran into two problems:
>
> 1. If the job is killed, all Flink metrics in Grafana keep their last received values forever;
> 2. If the job is cancelled, all Flink metrics in Grafana simply stop abruptly;
>
> So the idea above will never trigger an alert, because the checkpoint size metric never drops to 0 when the job dies.
>
>
> I also tried subtracting the current job_uptime from its value one minute earlier, but that alert is not elegant either: it produces false alarms right after the job starts, because there is no data from one minute before.
>
> So I am quite puzzled. For those of you monitoring Flink jobs with Grafana, which metrics and rules do you use to alert gracefully?
>
>
>
> -
> Best Wishes
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: Configuring rolling for taskmanager.out

2020-12-22 Thread
We modified Flink on YARN so that taskmanager.out, taskmanager.err and
jobmanager.err support file rolling.

zilong xiao wrote on Wed, Dec 23, 2020 at 2:05 PM:

> Why does 1.11 support it?
>
> hdxg1101300123 wrote on Wed, Dec 23, 2020 at 1:51 PM:
>
> > 1.11 supports it
> >
> >
> >
> > Sent from my vivo smartphone
> > > I proposed a redirect-based approach in the community before, but others had some concerns; see:
> > >
> > > https://github.com/apache/flink/pull/11839#pullrequestreview-399769862
> > >
> > > zilong xiao wrote on Tue, Dec 22, 2020 at 4:13 PM:
> > >
> > > > Right, this scenario does exist; it looks like it can be achieved through redirection. Will follow the issue~
> > > >
> > > > 李杰 wrote on Tue, Dec 22, 2020 at 3:58 PM:
> > > >
> > > > > Hi,
> > > > > We implemented this feature before; take a look here.
> > > > > https://issues.apache.org/jira/browse/FLINK-20713
> > > > >
> > > > > zilong xiao wrote on Thu, Dec 3, 2020 at 7:50 PM:
> > > > >
> > > > > > I would like to ask the community experts: can the standard output file taskmanager.out be configured to roll?
> > > > > >
> > > > >
> > > >
> >
>


Re: Configuring rolling for taskmanager.out

2020-12-21 Thread
Hi,
We implemented this feature before; take a look here.
https://issues.apache.org/jira/browse/FLINK-20713

zilong xiao wrote on Thu, Dec 3, 2020 at 7:50 PM:

> I would like to ask the community experts: can the standard output file taskmanager.out be configured to roll?
>


FLINK-19677 problem occurs with Flink 1.12.0/1.11.0

2020-12-17 Thread 延延
Hello, while testing version 1.12 I deployed one jobmanager and three taskmanagers on virtual machines.
Environment:
1. CentOS 7 virtual machines with dual NICs (one on the .4 subnet, one on the .18 subnet)
2. 1 jobmanager node and 3 taskmanager nodes, high availability not enabled


In the configuration I found that the hostname resolves to the IP of one of the NICs (the .18 subnet), while Flink's masters/workers files are all configured with .4-subnet IPs, and most importantly jobmanager.rpc.address is also configured with a .4-subnet IP. After starting the cluster and submitting the WordCount example, the submission fails with the same error as FLINK-19677.
Redeploying Flink 1.10.0 on the same nodes with the same IP configuration, the job can be submitted and runs successfully.


For Flink 1.12.0 the eventual fix was to switch the IPs to the .18 subnet, or to use hostnames instead; either solves the problem. So this issue was probably introduced starting with Flink 1.11.0.





Flink 1.11 DataStream Elasticsearch sink: writing to ES requires username/password authentication; is this currently supported?

2020-12-15 Thread 世钰
Flink 1.11 DataStream Elasticsearch sink: writing to ES requires username/password authentication. Is this operation currently supported?
elasticsearch7.0






--
李世钰
Mail: m...@lishiyu.cn
TEL: 18801236165
Motto: Make the people around you happy, and your world will be full of happiness!
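
There is no reply to this question in the archive; for reference, one commonly used approach with the flink-connector-elasticsearch7 DataStream sink is to supply a RestClientFactory that installs basic-auth credentials on the underlying REST client. A minimal sketch, with host and credentials as placeholders and the element handling left empty:

    import java.util.Collections;
    import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
    import org.apache.http.HttpHost;
    import org.apache.http.auth.AuthScope;
    import org.apache.http.auth.UsernamePasswordCredentials;
    import org.apache.http.impl.client.BasicCredentialsProvider;

    public class EsSinkWithAuth {
        public static ElasticsearchSink.Builder<String> builder() {
            ElasticsearchSink.Builder<String> builder = new ElasticsearchSink.Builder<>(
                    Collections.singletonList(new HttpHost("localhost", 9200, "http")), // placeholder host
                    (element, ctx, indexer) -> { /* create and add IndexRequests here */ });

            // Pass basic-auth credentials to the Elasticsearch RestClient used by the sink.
            builder.setRestClientFactory(restClientBuilder -> {
                BasicCredentialsProvider credentials = new BasicCredentialsProvider();
                credentials.setCredentials(AuthScope.ANY,
                        new UsernamePasswordCredentials("es_user", "es_password")); // placeholder credentials
                restClientBuilder.setHttpClientConfigCallback(
                        httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentials));
            });
            return builder;
        }
    }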


Re: Timestamp issue when writing data to Postgres with Flink SQL

2020-12-09 Thread
Thanks.

Sent from my iPhone

> On Dec 10, 2020, at 10:49, Jark Wu wrote:
> 
> Judging from the error message, you did not actually INSERT INTO Postgres; you only ran a SELECT query, which displays the result in the SQL client
> UI instead of inserting it into Postgres.
> 
> Your query contains TIMESTAMP WITH LOCAL TIME ZONE, and displaying query results in the SQL client
> does not support this type yet.
> 
> You can follow this issue for the fix: https://issues.apache.org/jira/browse/FLINK-17948
> 
> Best,
> Jark
> 
>> On Tue, 8 Dec 2020 at 19:32, 李轲  wrote:
>> 
>> Error message:
>> Exception in thread "main"
>> org.apache.flink.table.client.SqlClientException: Unexpected exception.
>> This is a bug. Please consider filing an issue.
>> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
>> Caused by: org.apache.flink.table.api.TableException: Unsupported
>> conversion from data type 'TIMESTAMP(6) WITH LOCAL TIME ZONE NOT NULL'
>> (conversion class: java.time.Instant) to type information. Only data types
>> that originated from type information fully support a reverse conversion.
>> at
>> org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(LegacyTypeInfoDataTypeConverter.java:259)
>> at
>> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>> at
>> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>> at
>> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
>> at
>> java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
>> at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
>> at
>> org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.convertToRowTypeInfo(LegacyTypeInfoDataTypeConverter.java:329)
>> at
>> org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(LegacyTypeInfoDataTypeConverter.java:237)
>> at
>> org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo(TypeConversions.java:49)
>> at org.apache.flink.table.api.TableSchema.toRowType(TableSchema.java:271)
>> at
>> org.apache.flink.table.client.gateway.local.result.CollectStreamResult.(CollectStreamResult.java:71)
>> at
>> org.apache.flink.table.client.gateway.local.result.MaterializedCollectStreamResult.(MaterializedCollectStreamResult.java:101)
>> at
>> org.apache.flink.table.client.gateway.local.result.MaterializedCollectStreamResult.(MaterializedCollectStreamResult.java:129)
>> at
>> org.apache.flink.table.client.gateway.local.ResultStore.createResult(ResultStore.java:83)
>> at
>> org.apache.flink.table.client.gateway.local.LocalExecutor.executeQueryInternal(LocalExecutor.java:608)
>> at
>> org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:465)
>> at
>> org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:555)
>> at
>> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:311)
>> at java.util.Optional.ifPresent(Optional.java:159)
>> at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:212)
>> at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:142)
>> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:114)
>> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>>> At 2020-12-08 19:24:43, "李轲" wrote:
>>> Our project needs to insert data into Postgres. After using a catalog, the inserted data apparently has to match the database table definition exactly, and I have not found a way to insert only some of the columns.
>>> An error occurred when converting a time value to TIMESTAMP(6) WITH LOCAL TIME ZONE, which is the timestamp type defined in Postgres:
>>> select cast(localtimestamp as TIMESTAMP(6) WITH LOCAL TIME ZONE);
>>> Is there a conversion method, or a way to insert only part of the columns?
>> 
> 



Re: Timestamp issue when writing data to Postgres with Flink SQL

2020-12-08 Thread
Error message:
Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
Unexpected exception. This is a bug. Please consider filing an issue.
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
Caused by: org.apache.flink.table.api.TableException: Unsupported conversion 
from data type 'TIMESTAMP(6) WITH LOCAL TIME ZONE NOT NULL' (conversion class: 
java.time.Instant) to type information. Only data types that originated from 
type information fully support a reverse conversion.
at 
org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(LegacyTypeInfoDataTypeConverter.java:259)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
at 
java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
at 
org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.convertToRowTypeInfo(LegacyTypeInfoDataTypeConverter.java:329)
at 
org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(LegacyTypeInfoDataTypeConverter.java:237)
at 
org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo(TypeConversions.java:49)
at org.apache.flink.table.api.TableSchema.toRowType(TableSchema.java:271)
at 
org.apache.flink.table.client.gateway.local.result.CollectStreamResult.(CollectStreamResult.java:71)
at 
org.apache.flink.table.client.gateway.local.result.MaterializedCollectStreamResult.(MaterializedCollectStreamResult.java:101)
at 
org.apache.flink.table.client.gateway.local.result.MaterializedCollectStreamResult.(MaterializedCollectStreamResult.java:129)
at 
org.apache.flink.table.client.gateway.local.ResultStore.createResult(ResultStore.java:83)
at 
org.apache.flink.table.client.gateway.local.LocalExecutor.executeQueryInternal(LocalExecutor.java:608)
at 
org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:465)
at org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:555)
at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:311)
at java.util.Optional.ifPresent(Optional.java:159)
at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:212)
at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:142)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:114)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)

















At 2020-12-08 19:24:43, "李轲" wrote:
>Our project needs to insert data into Postgres. After using a catalog, the inserted data apparently has to match the database table definition exactly, and I have not found a way to insert only some of the columns.
>An error occurred when converting a time value to TIMESTAMP(6) WITH LOCAL TIME ZONE, which is the timestamp type defined in Postgres:
>select cast(localtimestamp as TIMESTAMP(6) WITH LOCAL TIME ZONE);
>Is there a conversion method, or a way to insert only part of the columns?


Timestamp issue when writing data to Postgres with Flink SQL

2020-12-08 Thread
Our project needs to insert data into Postgres. After using a catalog, the inserted data apparently has to match the database table definition exactly, and I have not found a way to insert only some of the columns.
An error occurred when converting a time value to TIMESTAMP(6) WITH LOCAL TIME ZONE, which is the timestamp type defined in Postgres:
select cast(localtimestamp as TIMESTAMP(6) WITH LOCAL TIME ZONE);
Is there a conversion method, or a way to insert only part of the columns?

Re: How to define the field type for JsonObject data in Flink SQL

2020-12-07 Thread
You can try ROW

Sent from my iPhone

> On Dec 7, 2020, at 15:43, xiao cai wrote:
> 
> String does not work; the extracted value is null.
> 
> 
> Original Message 
> Sender: silence
> Recipient: user-zh
> Date: Monday, Dec 7, 2020 14:26
> Subject: Re: How to define the field type for JsonObject data in Flink SQL
> 
> 
> You can use STRING -- Sent from: http://apache-flink.147419.n8.nabble.com/
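
A minimal sketch of the ROW suggestion above, reusing the user_log/user_info field names that appear in the next thread in this archive: the nested JSON object is declared as ROW<...> and its fields are read with dot notation. The connector options are placeholders, not taken from this thread.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class JsonObjectAsRow {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

            // The nested JSON object is declared as a ROW type.
            tEnv.executeSql(
                "CREATE TABLE user_log (" +
                "  id STRING," +
                "  user_info ROW<user_id STRING, name STRING>" +
                ") WITH (" +
                "  'connector' = 'kafka'," +                              // placeholder options
                "  'topic' = 'user_log'," +
                "  'properties.bootstrap.servers' = 'localhost:9092'," +
                "  'format' = 'json'" +
                ")");

            // Individual fields are addressed with dot notation.
            tEnv.executeSql("SELECT id, user_info.user_id, user_info.name FROM user_log").print();
        }
    }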



Re: Parsing complex JSON in SQL

2020-12-04 Thread
If I want to do custom parsing and mapping in 1.11, is a UDF the only way?

Sent from my iPhone

> On Dec 4, 2020, at 16:52, Wei Zhong wrote:
> 
> Hi,
> 
> It depends on the Flink version you are using. In 1.11 the format is derived automatically from the table schema, while in 1.10, if the table schema and the JSON
> schema are not exactly the same, you need to write format.json-schema manually:
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/json.html#data-type-mapping
>  
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#json-format
>  
> 
> 
> 
>> On Dec 4, 2020, at 16:39, guaishushu1...@163.com wrote:
>> 
>> May I ask: since I have already defined the structure on the fields, do I still need to write format.json-schema? CREATE TABLE user_log(
>>   id VARCHAR,
>>   timestam VARCHAR,
>>   user_info ROW(user_id string, name string ),
>>   jsonArray ARRAY
>> ) WITH (
>>   'connector.type' = 'kafka',
>>   'connector.version' = 'universal',
>>   'connector.topic' = 'complex_string',
>>   'connector.properties.zookeeper.connect' = 'venn:2181',
>>   'connector.properties.bootstrap.servers' = 'venn:9092',
>>   'connector.startup-mode' = 'earliest-offset',
>>   'format.type' = 'json',
>>   'format.json-schema' = '{
>>   "type": "object",
>>   "properties": {
>>  "id": {type: "string"},
>>  "timestam": {type: "string"},
>>  "user_info":{type: "object",
>>  "properties" : {
>>  "user_id" : {type:"string"},
>>  "name":{type:"string"}
>>  }
>>},
>>   "jsonArray":{"type": "array",
>>"items": {
>> "type": "object",
>> "properties" : {
>> "user_id222" : {type:"string"},
>> "name222" : {type:"string"}
>>}
>> }
>>}
>>   }
>>   }'
>> );
>> 
>> 
>> 
>> 
>> guaishushu1...@163.com
> 
> 



Re: [Flink SQL] Cannot start with env.yaml

2020-12-01 Thread
Thanks, understood.

Sent from my iPhone

> On Dec 1, 2020, at 23:31, Leonard Xu wrote:
> 
> Hi, 李轲
> 
> This is because the yml file only supports the old connectors from before 1.10, written as connector.type='filesystem',
> while the new connectors since 1.11 are written as connector='filesystem'. Besides the simplified syntax, the factory methods of the two also differ,
> so new connectors defined through the yml file cannot be discovered by the old factory SPI.
> Defining tables in the yml file has also been discouraged since 1.11, because tables can now be defined in pure SQL via DDL.
> 
> I recommend starting the sql-client and then creating the table with DDL.
> 
> Best regards
> Leonard
> 
> 
> 
>> On Dec 1, 2020, at 21:43, 李轲 wrote:
>> 
>> While trying the sql-client on a server, the startup command is as follows:
>> 
>> ./sql-client.sh embedded -l /root/flink-sql-client/libs/ -d 
>> /data_gas/flink/flink-1.11.2/conf/sql-client-defaults.yaml  -e 
>> /root/flink-sql-client/sql-client-demo.yml
>> 
>> The configuration is as follows:
>> 
>> # Define tables
>> tables:
>>  - name: SourceTable
>>type: source-table
>>update-mode: append
>>connector:
>>  type: datagen
>>  rows-per-second: 5
>>  fields:
>>f_sequence:
>>  kind: sequence
>>  start: 1
>>  end: 1000
>>f_random:
>>  min: 1
>>  max: 1000
>>f_random_str:
>>  length: 10
>>schema:
>>  - name: f_sequence
>>data-type: INT
>>  - name: f_random
>>data-type: INT
>>  - name: f_random_str
>>data-type: STRING
>> 
>> The following error occurred:
>> 
>> Reading default environment from: 
>> file:/data_gas/flink/flink-1.11.2/conf/sql-client-defaults.yaml
>> Reading session environment from: 
>> file:/root/flink-sql-client/sql-client-demo.yml
>> 
>> 
>> Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
>> Unexpected exception. This is a bug. Please consider filing an issue.
>>at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
>> Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: 
>> Could not create execution context.
>>at 
>> org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:870)
>>at 
>> org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:227)
>>at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
>>at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
>> not find a suitable table factory for 
>> 'org.apache.flink.table.factories.TableSourceFactory' in
>> the classpath.
>> 
>> Reason: Required context properties mismatch.
>> 
>> The matching candidates:
>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>> Missing properties:
>> format.type=csv
>> Mismatched properties:
>> 'connector.type' expects 'filesystem', but is 'datagen'
>> 
>> The following properties are requested:
>> connector.fields.f_random.max=1000
>> connector.fields.f_random.min=1
>> connector.fields.f_random_str.length=10
>> connector.fields.f_sequence.end=1000
>> connector.fields.f_sequence.kind=sequence
>> connector.fields.f_sequence.start=1
>> connector.rows-per-second=5
>> connector.type=datagen
>> schema.0.data-type=INT
>> schema.0.name=f_sequence
>> schema.1.data-type=INT
>> schema.1.name=f_random
>> schema.2.data-type=STRING
>> schema.2.name=f_random_str
>> update-mode=append
>> 
>> The following factories have been considered:
>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>> org.apache.flink.connector.jdbc.table.JdbcTableSourceSinkFactory
>> org.apache.flink.table.sources.CsvBatchTableSourceFactory
>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>> org.apache.flink.table.filesystem.FileSystemTableFactory
>>at 
>> org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
>>at 
>> org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
>>at 
>> org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
>>at 
>> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
>>at 
>> org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:384)
>>at 
>> org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$7(ExecutionContext.java:638)
>>at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
>>at 
>> org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:636)
>>at 
>> org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:523)
>>at 
>> org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:183)
>>at 
>> org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:136)
>>at 
>> org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:859)
>>... 3 more
>> 
>> From the description it looks like some jar cannot be found, but the official docs say the JSON parsing jar is included in the sql-client. Do I need to
>> import jars myself even when just trying out the sql-client? Where can I find more detailed documentation? Any pointers appreciated, thanks.
>> 
>> 
>> 
>> 
>> 
>> 
> 
> 
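
To make Leonard's suggestion concrete: a minimal sketch of defining the same datagen table with DDL instead of the yml file, assuming Flink 1.11's datagen connector. The CREATE TABLE string can equally be typed directly into the SQL client.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class DatagenByDdl {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

            // New-style option 'connector' = 'datagen' (not connector.type),
            // equivalent to the datagen table defined in the yml above.
            tEnv.executeSql(
                "CREATE TABLE SourceTable (" +
                "  f_sequence INT," +
                "  f_random INT," +
                "  f_random_str STRING" +
                ") WITH (" +
                "  'connector' = 'datagen'," +
                "  'rows-per-second' = '5'," +
                "  'fields.f_sequence.kind' = 'sequence'," +
                "  'fields.f_sequence.start' = '1'," +
                "  'fields.f_sequence.end' = '1000'," +
                "  'fields.f_random.min' = '1'," +
                "  'fields.f_random.max' = '1000'," +
                "  'fields.f_random_str.length' = '10'" +
                ")");

            tEnv.executeSql("SELECT * FROM SourceTable").print();
        }
    }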



[Flink SQL] Cannot start with env.yaml

2020-12-01 Thread
While trying the sql-client on a server, the startup command is as follows:



./sql-client.sh embedded -l /root/flink-sql-client/libs/ -d 
/data_gas/flink/flink-1.11.2/conf/sql-client-defaults.yaml  -e 
/root/flink-sql-client/sql-client-demo.yml


The configuration is as follows:


# Define tables
tables:
  - name: SourceTable
type: source-table
update-mode: append
connector:
  type: datagen
  rows-per-second: 5
  fields:
f_sequence:
  kind: sequence
  start: 1
  end: 1000
f_random:
  min: 1
  max: 1000
f_random_str:
  length: 10
schema:
  - name: f_sequence
data-type: INT
  - name: f_random
data-type: INT
  - name: f_random_str
data-type: STRING


The following error occurred:


Reading default environment from: 
file:/data_gas/flink/flink-1.11.2/conf/sql-client-defaults.yaml
Reading session environment from: 
file:/root/flink-sql-client/sql-client-demo.yml




Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
Unexpected exception. This is a bug. Please consider filing an issue.
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could 
not create execution context.
at 
org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:870)
at 
org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:227)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
not find a suitable table factory for 
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.


Reason: Required context properties mismatch.


The matching candidates:
org.apache.flink.table.sources.CsvAppendTableSourceFactory
Missing properties:
format.type=csv
Mismatched properties:
'connector.type' expects 'filesystem', but is 'datagen'


The following properties are requested:
connector.fields.f_random.max=1000
connector.fields.f_random.min=1
connector.fields.f_random_str.length=10
connector.fields.f_sequence.end=1000
connector.fields.f_sequence.kind=sequence
connector.fields.f_sequence.start=1
connector.rows-per-second=5
connector.type=datagen
schema.0.data-type=INT
schema.0.name=f_sequence
schema.1.data-type=INT
schema.1.name=f_random
schema.2.data-type=STRING
schema.2.name=f_random_str
update-mode=append


The following factories have been considered:
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.connector.jdbc.table.JdbcTableSourceSinkFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.filesystem.FileSystemTableFactory
at 
org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
at 
org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
at 
org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
at 
org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:384)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$7(ExecutionContext.java:638)
at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:636)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:523)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:183)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:136)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:859)
... 3 more


From the description it looks like some jar cannot be found, but the official docs say the JSON parsing jar is included in the sql-client. Do I need to
import jars myself even when just trying out the sql-client? Where can I find more detailed documentation? Any pointers appreciated, thanks.



A small problem encountered when using the sql-client

2020-12-01 Thread
While trying the sql-client on a server, the startup command is as follows:


./sql-client.sh embedded -d /root/flink-sql-client/sql-client-demo.yml


The configuration is attached, and the file being read is also in the attachment.


The following error occurred:


Reading default environment from: 
file:/root/flink-sql-client/sql-client-demo.yml
No session environment specified.




Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
Unexpected exception. This is a bug. Please consider filing an issue.
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could 
not create execution context.
at 
org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:870)
at 
org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:227)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
not find a suitable table factory for 
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.


Reason: Required context properties mismatch.


The matching candidates:
org.apache.flink.table.sources.CsvAppendTableSourceFactory
Mismatched properties:
'format.type' expects 'csv', but is 'json'


The following properties are requested:
connector.path=/root/flink-sql-client
connector.type=filesystem
format.schema=ROW(name STRING, count INT)
format.type=json
schema.0.data-type=STRING
schema.0.name=name
schema.1.data-type=INT
schema.1.name=count
update-mode=append


The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.filesystem.FileSystemTableFactory
at 
org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
at 
org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
at 
org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
at 
org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:384)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$7(ExecutionContext.java:638)
at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:636)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:523)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:183)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:136)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:859)
... 3 more


From the description it looks like some jar cannot be found, but the official docs say the JSON parsing jar is included in the sql-client. Do I need to import jars myself even when just trying out the sql-client? Where can I find more detailed documentation? Any pointers appreciated, thanks.

{"name":"Mr.J","count":3}
{"name":"Mr.L","count":2}
{"name":"Mr.J","count":2}
{"name":"Mr.L","count":1}
{"name":"Mr.J","count":1}

Unsubscribe

2020-11-23 Thread



Flink 1.11 SQL DDL reading from Kafka: event-time windows do not fire, while the same SQL with processing time fires fine

2020-11-14 Thread 世钰
Hello, I have a question.
With Flink 1.11 SQL DDL connected to Kafka, windows do not fire when using event time, but fire normally when using process_time (system time).
create table kafka_table (
`log_id` string,
event_date timestamp(3),
process_time as PROCTIME(),
ts as event_date,
watermark for ts as ts - interval '1' second
) with (
'connector' = 'kafka',
'topic' = 'kafka_table',
'properties.bootstrap.servers' = '10.2.12.3:9092',
'properties.group.id' = 'tmp-log-consumer003',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
)
The SQL being executed is:
select TUMBLE_START(kafka_table.event_date, INTERVAL '10' 
SECOND),TUMBLE_END(kafka_table.event_date, INTERVAL '10' 
SECOND),src_ip,count(dest_ip) from kafka_table group by 
TUMBLE(kafka_table.event_date, INTERVAL '10' SECOND),kafka_table.src_ip




The schema returned by `select log_id,process_time,ts from kafka_table` is as follows.
Table schema:
root
|-- log_id: STRING
|-- process_time: TIMESTAMP(3) NOT NULL *PROCTIME*
|-- ts: TIMESTAMP(3) *ROWTIME*


The input data is:
log_id,process_time,ts
13547876357,2020-11-14T08:22:08.699,2020-11-07T08:23:09.806
13547876358,2020-11-14T08:22:08.857,2020-11-07T08:23:09.806
13547876359,2020-11-14T08:22:09.061,2020-11-07T08:23:09.806
13547876360,2020-11-14T08:22:09.310,2020-11-07T08:23:09.806
13547876361,2020-11-14T08:22:09.526,2020-11-07T08:23:09.806
13552070656,2020-11-14T08:22:09.772,2020-11-07T08:23:09.806

Flink SQL DDL connected to Kafka: event-time windows do not fire properly

2020-11-13 Thread 世钰
Flink version: 1.11


Flink SQL connected to Kafka:
create table kafka_table (
log_id string,
event_time bigint,
process_time as PROCTIME(),
ts as TO_TIMESTAMP(FROM_UNIXTIME(event_time)),
watermark for ts as ts - interval '1' second
) with (
'connector' = 'kafka',
'topic' = 'kafka_table',
'properties.bootstrap.servers' = '10.2.12.3:9092',
'properties.group.id' = 'tmp-log-consumer003',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
)





The code using window aggregation:
val tmp = tableEnv.sqlQuery("select HOP_START(kafka_table.ts, INTERVAL '10' 
SECOND, INTERVAL '5' SECOND),HOP_END(kafka_table.ts, INTERVAL '10' SECOND, 
INTERVAL '5' SECOND),src_ip,count(dest_ip) from kafka_table group by 
HOP(kafka_table.ts, INTERVAL '10' SECOND, INTERVAL '5' 
SECOND),kafka_table.src_ip")


The same SQL fires successfully with process_time (system time), but does not fire with event time.
The system time is the current time (Nov 14), while the event time is Nov 6; I am reading historical data and writing it into Kafka for testing.
What is the reason the windows do not fire, or is there something wrong with my usage?

Error when calling the Flink SQL function FIRST_VALUE

2020-11-12 Thread 世钰
1. Flink version: 1.11




2. Using useBlinkPlanner




3. The SQL being executed:

SELECT FIRST_VALUE(kafka_table.src_ip) AS 
kafka_table_src_ip_0,FIRST_VALUE(kafka_table.dest_ip) AS 
kafka_table_dest_ip_0,HOP_START(kafka_table.process_time, INTERVAL '30' 
SECOND(2), INTERVAL '2' MINUTE(1)) AS 
__window_start,HOP_END(kafka_table.process_time, INTERVAL '30' SECOND(2), 
INTERVAL '2' MINUTE(1)) AS __window_end,CONCAT_AGG(kafka_table.log_id, ',') AS 
__origin_log_id FROM kafka_table WHERE kafka_table.dest_ip = '25.25.205.14' 
GROUP BY HOP(kafka_table.process_time, INTERVAL '30' SECOND(2), INTERVAL '2' 
MINUTE(1)),kafka_table.src_ip




4. Error message:

Exception in thread "main" org.apache.flink.table.api.ValidationException: 
Function class 
'org.apache.flink.table.planner.functions.aggfunctions.FirstValueAggFunction.StringFirstValueAggFunction'
 does not implement at least one method named 'merge' which is public, not 
abstract and (in case of table functions) not static.

at 
org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.checkAndExtractMethods(UserDefinedFunctionUtils.scala:439)

at 
org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.getUserDefinedMethod(UserDefinedFunctionUtils.scala:318)

at 
org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.getUserDefinedMethod(UserDefinedFunctionUtils.scala:273)

at 
org.apache.flink.table.planner.codegen.agg.ImperativeAggCodeGen.checkNeededMethods(ImperativeAggCodeGen.scala:474)

at 
org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$checkNeededMethods$1.apply(AggsHandlerCodeGenerator.scala:1116)

at 
org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$checkNeededMethods$1.apply(AggsHandlerCodeGenerator.scala:1116)

at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)

at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)

at 
org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.checkNeededMethods(AggsHandlerCodeGenerator.scala:1115)

at 
org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.genMerge(AggsHandlerCodeGenerator.scala:929)

at 
org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.generateNamespaceAggsHandler(AggsHandlerCodeGenerator.scala:578)

at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.createAggsHandler(StreamExecGroupWindowAggregateBase.scala:244)

at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:157)

at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:54)

at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)

at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlan(StreamExecGroupWindowAggregateBase.scala:54)

at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)

at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)

at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)

at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)

at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158)

at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:127)

at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)

at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)

at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)

at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)

at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)

at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

at scala.collection.Iterator$class.foreach(Iterator.scala:891)

at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)


Unsubscribe

2020-11-05 Thread
Unsubscribe

Unsubscribe

2020-10-28 Thread 国鹏
Unsubscribe

About the "No operators defined in streaming topology. Cannot generate StreamGraph." problem when using dataStreamEnv.execute and tableEnv.execute on the new Flink 1.11

2020-10-25 Thread 世钰
About the "No operators defined in streaming topology. Cannot generate StreamGraph."
problem when using dataStreamEnv.execute and tableEnv.execute on the new Flink 1.11




- The program uses the Table API and also has scenarios where a Table is converted to a DataStream.

- The program uses Flink SQL to read and write Kafka, and therefore calls sqlUpdate.




I tried the new API and used only tableEnv.executeSql, without calling dataStreamEnv.execute
or tableEnv.execute, but then the program finishes right after it starts, without any exception.


My question: are the old and new APIs compatible? When a Table and a DataStream coexist, should dataStreamEnv.execute
or tableEnv.execute be used?

In Flink batch mode: "The rpc invocation size 113602196 exceeds the maximum akka framesize"

2020-09-28 Thread 加燕
A Flink batch job consumes files on HDFS and performs a word count, but the task keeps running. Checking the taskmanager log, I found the following exception:
java.lang.reflect.UndeclaredThrowableException: null
at com.sun.proxy.$Proxy35.updateTaskExecutionState(Unknown Source) ~[?:?]
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.updateTaskExecutionState(TaskExecutor.java:1558)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.unregisterTaskAndNotifyFinalState(TaskExecutor.java:1588)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2400(TaskExecutor.java:173)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor$TaskManagerActionsImpl.lambda$updateTaskExecutionState$1(TaskExecutor.java:1921)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at akka.actor.Actor.aroundReceive(Actor.scala:517) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at akka.actor.Actor.aroundReceive$(Actor.scala:515) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
Caused by: java.io.IOException: The rpc invocation size 113602196 exceeds the 
maximum akka framesize.
at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:276)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:205)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:134)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:79)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
... 28 more
I tried setting the akka framesize to 30M in flink-conf.yaml, but that still did not solve the problem above.
Please help.

Unsubscribe

2020-09-15 Thread 国鹏
Unsubscribe

Flink Hive batch job throws FileNotFoundException

2020-09-14 Thread 佳宸
Hi everyone~ when I run a batch Table job that writes to Hive, a FileNotFoundException occurs: the .staging file cannot be found.
The version is 1.11.1.
Caused by: java.io.FileNotFoundException: File
hdfs://gykjcluster/user/hive/warehouse/etl_et_flink_sink.db/ods_et_es_financialestimate/.staging_1600070419144
does not exist.
at
org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:1053)
~[hadoop-client-api-3.1.3.jar:?]
at
org.apache.hadoop.hdfs.DistributedFileSystem.access$1000(DistributedFileSystem.java:131)
~[hadoop-client-api-3.1.3.jar:?]
at
org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1113)
~[hadoop-client-api-3.1.3.jar:?]
at
org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1110)
~[hadoop-client-api-3.1.3.jar:?]
at
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
~[hadoop-client-api-3.1.3.jar:?]
at
org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:1120)
~[hadoop-client-api-3.1.3.jar:?]
at
org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:157)
~[flink-sql-connector-hive-3.1.2_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.table.filesystem.PartitionTempFileManager.headCheckpoints(PartitionTempFileManager.java:140)
~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.table.filesystem.FileSystemCommitter.commitUpToCheckpoint(FileSystemCommitter.java:98)
~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.table.filesystem.FileSystemOutputFormat.finalizeGlobal(FileSystemOutputFormat.java:95)
~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.finalizeOnMaster(InputOutputFormatVertex.java:132)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1286)
~[flink-dist_2.11-1.11.1.jar:1.11.1]

This problem does not occur in standalone mode; in per-job mode on YARN, some jobs run into it.


Mail from 李国鹏

2020-08-31 Thread 国鹏
Unsubscribe

Mail from 李国鹏

2020-08-31 Thread 国鹏
Unsubscribe

Unsubscribe

2020-08-13 Thread
Unsubscribe

In Flink 1.11, can a UDTF only define the number and types of its output fields via annotations?

2020-08-12 Thread
Hello everyone, I have a question:
In Flink 1.11,
can a UDTF only define the number and types of its output fields via annotations? It seems no longer possible to override getResultType as in Flink 1.10 to define the output field types; using getResultType in 1.11 produces the following error:
Caused by: org.apache.flink.table.api.ValidationException: Error in extracting 
a signature to output mapping.
at 
org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:313)
at 
org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:115)
at 
org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:170)
at 
org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:158)
... 22 more
Caused by: org.apache.flink.table.api.ValidationException: Unable to extract a 
type inference from method:
public void 
com.skyon.main.TestFunction.eval(java.lang.String,java.lang.String,long)
at 
org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:313)
at 
org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:172)
at 
org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:110)
... 24 more
Caused by: org.apache.flink.table.api.ValidationException: Could not extract a 
data type from 'class org.apache.flink.types.Row' in generic class 
'org.apache.flink.table.functions.TableFunction' in class 
com.skyon.main.TestFunction. Please pass the required data type manually or 
allow RAW types.
at 
org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:313)
at 
org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:246)
at 
org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRaw(DataTypeExtractor.java:224)
at 
org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeWithClassContext(DataTypeExtractor.java:198)
at 
org.apache.flink.table.types.extraction.DataTypeExtractor.extractFromGeneric(DataTypeExtractor.java:119)
at 
org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$createGenericResultExtraction$13(FunctionMappingExtractor.java:443)
at 
org.apache.flink.table.types.extraction.FunctionMappingExtractor.putExtractedResultMappings(FunctionMappingExtractor.java:309)
at 
org.apache.flink.table.types.extraction.FunctionMappingExtractor.collectMethodMappings(FunctionMappingExtractor.java:252)
at 
org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:164)
... 25 more
Caused by: org.apache.flink.table.api.ValidationException: Cannot extract a 
data type from a pure 'org.apache.flink.types.Row' class. Please use 
annotations to define field names and field types.
at 
org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:313)
at 
org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:305)
at 
org.apache.flink.table.types.extraction.DataTypeExtractor.checkForCommonErrors(DataTypeExtractor.java:343)
at 
org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:277)
at 
org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:238)
... 32 more

 Is there any way to define the number and types of a UDTF's output fields dynamically? Thanks!
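
For reference, the 1.11 way to declare a UDTF's output row via annotations is @FunctionHint/@DataTypeHint; a minimal sketch with illustrative field names follows. For truly dynamic output types one would instead have to override getTypeInference(), which is a more involved route not shown here.

    import org.apache.flink.table.annotation.DataTypeHint;
    import org.apache.flink.table.annotation.FunctionHint;
    import org.apache.flink.table.functions.TableFunction;
    import org.apache.flink.types.Row;

    // Declares the output row type via annotations, replacing the 1.10-style getResultType().
    @FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
    public class SplitFunction extends TableFunction<Row> {

        public void eval(String str, String separator) {
            for (String s : str.split(separator)) {
                collect(Row.of(s, s.length()));
            }
        }
    }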

Flink 1.11 UDTF: defining the return type dynamically

2020-08-12 Thread
Can a Flink 1.11 UDTF customize its return type the way 1.10 could? I would like to do something like this, as in Flink 1.10:

@Override
  public TypeInformation

Re: Dimension table implementation cannot load configuration files

2020-08-04 Thread
You can use a static class to load the resource and return a Properties object.

> On Aug 4, 2020, at 4:55 PM, "guaishushu1...@163.com" wrote:
> 
> The dimension table function extends TableFunction, so there is no way to load a configuration file. Does anyone have a good approach???
> 
> 
> 
> guaishushu1...@163.com
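
A minimal sketch of the static-loader idea suggested above; the class and file name are hypothetical, and the TableFunction would call DimTableConfig.get() from its open() method.

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Properties;

    // Hypothetical helper: loads a properties file from the classpath once, in a static
    // initializer, so a TableFunction can read settings without doing its own file handling.
    public final class DimTableConfig {

        private static final Properties PROPS = new Properties();

        static {
            try (InputStream in = DimTableConfig.class.getClassLoader()
                    .getResourceAsStream("dim-table.properties")) { // assumed file name
                if (in != null) {
                    PROPS.load(in);
                }
            } catch (IOException e) {
                throw new ExceptionInInitializerError(e);
            }
        }

        private DimTableConfig() {}

        public static Properties get() {
            return PROPS;
        }
    }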


Re: Flink SQL escape character issue

2020-07-31 Thread
Just add a backslash: \; . Although a semicolon should not really be a special character.

> On Jul 31, 2020, at 8:13 PM, zilong xiao wrote:
> 
> SPLIT_INDEX(${xxx}, ';',
> 0) — I want to split a string on semicolons, but the semicolon seems to be treated as a special character and the syntax check never passes. Online sources say it can be escaped, but I have not figured out how. Has anyone run into a similar problem and can give some pointers?~~


Re: Flink SQL Kafka-to-MySQL: INSERT syntax problem and unique primary key problem

2020-07-31 Thread
Switch to upsert mode; you can also change the unique primary key to a natural key.

> On Jul 31, 2020, at 4:13 PM, chenxuying wrote:
> 
> hi
> I am using Flink 1.11.0.
> The code is as follows:
> StreamExecutionEnvironment streamEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> TableEnvironment tableEnvironment = StreamTableEnvironment.create(streamEnv);
> tableEnvironment.executeSql(" " +
> " CREATE TABLE mySource ( " +
> "  a bigint, " +
> "  b bigint " +
> " ) WITH ( " +
> "  'connector.type' = 'kafka', " +
> "  'connector.version' = 'universal', " +
> "  'connector.topic' = 'mytesttopic', " +
> "  'connector.properties.zookeeper.connect' = '172.17.0.2:2181', " +
> "  'connector.properties.bootstrap.servers' = '172.17.0.2:9092', " +
> "  'connector.properties.group.id' = 'flink-test-cxy', " +
> "  'connector.startup-mode' = 'latest-offset', " +
> "  'format.type' = 'json' " +
> " ) ");
> tableEnvironment.executeSql("CREATE TABLE mysqlsink ( " +
> " id bigint, " +
> "  game_id varchar, " +
> "  PRIMARY KEY (id) NOT ENFORCED  " +
> " )  " +
> " with ( " +
> "  'connector.type' = 'jdbc',   " +
> "  'connector.url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false' , 
> " +
> "  'connector.username' = 'root' , " +
> "  'connector.password' = 'root',  " +
> "  'connector.table' = 'mysqlsink' , " +
> "  'connector.driver' = 'com.mysql.cj.jdbc.Driver' , " +
> "  'connector.write.flush.interval' = '2s',  " +
> "  'connector.write.flush.max-rows' = '300'  " +
> " )");
> tableEnvironment.executeSql("insert into mysqlsink (`id`,`game_id`) values 
> (select a,cast(b as varchar) b from mySource)");
> 
> 
> Question 1: the insert statement above produces the following error:
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot 
> apply '$SCALAR_QUERY' to arguments of type '$SCALAR_QUERY( A, VARCHAR(2147483647) B)>)'. Supported form(s): 
> '$SCALAR_QUERY()'
> 
> 
> Question 2: if the insert is changed to tableEnvironment.executeSql("insert into mysqlsink select 
> a,cast(b as varchar) b from mySource"); it runs, but an error occurs when a duplicate unique primary key appears:
> Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry 
> '1' for key 'PRIMARY'
> 
> 
> 
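
To make the upsert suggestion concrete: a minimal sketch using the new 1.11 JDBC connector options ('connector' = 'jdbc', 'table-name', ...). With a declared primary key the sink writes in upsert mode (for MySQL, INSERT ... ON DUPLICATE KEY UPDATE), so duplicate keys update the existing row instead of failing the job. The URL and credentials are placeholders, and the mySource table from the quoted mail is assumed to be registered already.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class JdbcUpsertSink {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

            // New 1.11 JDBC connector: primary key declared => upsert semantics.
            tEnv.executeSql(
                "CREATE TABLE mysqlsink (" +
                "  id BIGINT," +
                "  game_id STRING," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:mysql://localhost:3306/flinksql?useSSL=false'," + // placeholder URL
                "  'table-name' = 'mysqlsink'," +
                "  'username' = 'root'," +
                "  'password' = 'root'," +
                "  'sink.buffer-flush.interval' = '2s'," +
                "  'sink.buffer-flush.max-rows' = '300'" +
                ")");

            // A regular INSERT ... SELECT, no VALUES(subquery) construct needed.
            tEnv.executeSql("INSERT INTO mysqlsink SELECT a, CAST(b AS STRING) FROM mySource");
        }
    }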



Re: Flink 1.11 Hive streaming write issue

2020-07-16 Thread 佳宸
OK, thanks~~~

JasonLee <17610775...@163.com> wrote on Thu, Jul 16, 2020 at 8:22 PM:

> hi
> You need to enable checkpointing.
>
>
> JasonLee
> Email: 17610775...@163.com
>
> Signature is customized by Netease Mail Master
>
> On 2020-07-16 18:03, 李佳宸 wrote:
> I would like to ask what configuration is needed for hive streaming write. I am not sure why my job runs but no data is written to Hive.
> Batch writes to Hive and reads in the streaming environment work fine.
>
> The code is attached; it is quite short:
>
> public class KafkaToHiveStreaming {
>public static void main(String[] arg) throws Exception{
>StreamExecutionEnvironment bsEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
>EnvironmentSettings bsSettings =
>
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>StreamTableEnvironment bsTableEnv =
> StreamTableEnvironment.create(bsEnv, bsSettings);
>String name= "myhive";
>String defaultDatabase = "default";
>String hiveConfDir =
> "/Users/uzi/Downloads/Hadoop/apache-hive-3.1.2-bin/conf/"; // a local
> path
>String version = "3.1.2";
>
>HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
> hiveConfDir, version);
>bsTableEnv.registerCatalog("myhive", hive);
>bsTableEnv.useCatalog("myhive");
>bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
>bsTableEnv.executeSql("CREATE TABLE topic_products (" +
>"  id BIGINT ," +
>"  order_id STRING," +
>"  amount DECIMAL(10, 2)," +
>"  create_time TIMESTAMP " +
>") WITH (" +
>" 'connector' = 'kafka'," +
>" 'topic' = 'order.test'," +
>" 'properties.bootstrap.servers' = 'localhost:9092'," +
>" 'properties.group.id' = 'testGroup'," +
>" 'scan.startup.mode' = 'earliest-offset', " +
>" 'format' = 'json'  " +
>")");
>bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
>
>bsTableEnv.executeSql("CREATE TABLE hive_sink_table_streaming (" +
>"  id BIGINT ," +
>"  order_id STRING," +
>"  amount DECIMAL(10, 2)" +
>"  )");
>bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
>bsTableEnv.executeSql("CREATE TABLE print_table WITH
> ('connector' = 'print')" +
>"LIKE INSERT INTO hive_sink_table_streaming (EXCLUDING
> ALL)");
>
>bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
>bsTableEnv.executeSql("INSERT INTO hive_sink_table_streaming SELECT
> " +
>"id, " +
>"order_id, " +
>"amount " +
>"FROM topic_products");
>
>Table table1 = bsTableEnv.from("hive_sink_table_streaming");
>table1.executeInsert("print_table");
>}
> }
>
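
A minimal sketch of JasonLee's advice: the streaming file/Hive sink only commits (and thus makes visible) the written files when a checkpoint completes, so checkpointing has to be enabled on the environment before building the table program. The interval below is an arbitrary example value.

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class HiveStreamingWriteCheckpointing {
        public static void main(String[] args) {
            StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
            // Without checkpoints the in-progress files are never committed to Hive.
            bsEnv.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);

            StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(
                    bsEnv,
                    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
            // ... register the HiveCatalog and run the INSERT INTO exactly as in the quoted code above.
        }
    }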


Unsubscribe

2020-07-16 Thread 国鹏
Unsubscribe

Flink 1.11 Hive streaming write issue

2020-07-16 Thread 佳宸
I would like to ask what configuration is needed for hive streaming write. I am not sure why my job runs but no data is written to Hive.
Batch writes to Hive and reads in the streaming environment work fine.

The code is attached; it is quite short:

public class KafkaToHiveStreaming {
public static void main(String[] arg) throws Exception{
StreamExecutionEnvironment bsEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv =
StreamTableEnvironment.create(bsEnv, bsSettings);
String name= "myhive";
String defaultDatabase = "default";
String hiveConfDir =
"/Users/uzi/Downloads/Hadoop/apache-hive-3.1.2-bin/conf/"; // a local
path
String version = "3.1.2";

HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
hiveConfDir, version);
bsTableEnv.registerCatalog("myhive", hive);
bsTableEnv.useCatalog("myhive");
bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
bsTableEnv.executeSql("CREATE TABLE topic_products (" +
"  id BIGINT ," +
"  order_id STRING," +
"  amount DECIMAL(10, 2)," +
"  create_time TIMESTAMP " +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'order.test'," +
" 'properties.bootstrap.servers' = 'localhost:9092'," +
" 'properties.group.id' = 'testGroup'," +
" 'scan.startup.mode' = 'earliest-offset', " +
" 'format' = 'json'  " +
")");
bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

bsTableEnv.executeSql("CREATE TABLE hive_sink_table_streaming (" +
"  id BIGINT ," +
"  order_id STRING," +
"  amount DECIMAL(10, 2)" +
"  )");
bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
bsTableEnv.executeSql("CREATE TABLE print_table WITH
('connector' = 'print')" +
"LIKE INSERT INTO hive_sink_table_streaming (EXCLUDING ALL)");

bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
bsTableEnv.executeSql("INSERT INTO hive_sink_table_streaming SELECT " +
"id, " +
"order_id, " +
"amount " +
"FROM topic_products");

Table table1 = bsTableEnv.from("hive_sink_table_streaming");
table1.executeInsert("print_table");
}
}

