这应该是时区差异导致的,flink的FROM_UNIXTIME用的是UTC时区,hive的FROM_UNIXTIME用的是系统时区。
On Thu, Sep 24, 2020 at 4:16 PM nashcen <2415370...@qq.com> wrote:
> Kafka 表 定义如下:
> CREATE TABLE `dc_ods`.`ods_dcpoints_prod_kafka_source` (
> `assetSpecId` STRING,
> `dcnum` STRING,
> `monitorType` STRING,
> `tagNo`
在datastream中针对事件时间而迟到的数据可以通过sideOutPut收集出来做二次处理补偿结果
但是针对flink sql 计算过程中因为延迟而丢弃的数据,应该如何获取呢 ?
特别是一些金融交易额的计算,希望数据延迟了也不会影响最终结果
谢谢
gygz...@163.com
环境:
tag release-1.11.2
commit fe361357
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Java version: 1.8.0_251, vendor: Oracle Corporation, runtime:
/Library/Java/JavaVirtualMachines/jdk1.8.0_251.jdk/Contents/Home/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name:
谢谢 tison 和 zilong xiao,将项目的scala版本改为2.11之后可以编译成功!
因为我一开始拉Flink版本是release-0.4,该版本使用的scala是2.10。后切到分支release-1.4后编译成功,确实和scala版本有关。
thx
在2020年9月23日 19:25,zilong xiao 写道:
Hi Natasha,
在mvn命令中加上这两个参数试试看 -Dscala-2.12 -Pscala-2.12
Natasha <13631230...@163.com> 于2020年9月23日周三 下午4:00写道:
Hi All,
preCommit??commit??
Hi
>看debug日志99%是CalcMergeRule , 我看blink用的是FlinkCalcMergeRule ,
> 在matches方法里加了些对none-deterministic表达式的过滤,,
> 于是我将CalcMergeRule替换成FlinkCalcMergeRule, 并在FlinkRuleSets里做了更新 ,
> 重跑后debug日志是99%是更新过的FlinkCalcMergeRule
虽然debug日志看是CalcMergeRule一直在触发,但替换CalcMergeRule后也没有改变,
推测是其他rule引起的。 有特别的需要要使用old
Hi
> 通过sql查hbase时,有时查到数据,有时候查不到数据。是不是group操作,会有下游算子 发送撤回消息,导致在delete
> hbase的某条rowkey数据,导致客户端查不到数据?
> 我理解 hbase sink 应该是upsert数据吧。会不会先delete 再put 导致这样的现象 ?
是的,group by 算子会像下游 hbase sink发retract消息,hbase
sink处理retract消息的实现就是先delete再insert,所以去查hbase的时候就会碰到你说的有时查不到的情况。
祝好
Leonard
而且按照string无法接受"a":a,bigint在 "t":"as"情况下会为null。
这么来看,bigint反而比string还通用了,可以将非法数据通过null录入进来。
string方式反而丢失部分信息了还。
赵一旦 于2020年9月25日周五 上午10:57写道:
> 今天做个测试,发现一些类型的特点,想确认下。
>
> 目前来看,kafka数据的2个配置,(1)不存在字段设置null(2)解析错误忽略。
>
>
> 发现如下几个特征
> (1)顶层字段字符串情况,实际数据为 "a": "a" 合法,"a":12不合法。
> (2)非顶层字段,比如d map,d中的字段
今天做个测试,发现一些类型的特点,想确认下。
目前来看,kafka数据的2个配置,(1)不存在字段设置null(2)解析错误忽略。
发现如下几个特征
(1)顶层字段字符串情况,实际数据为 "a": "a" 合法,"a":12不合法。
(2)非顶层字段,比如d map,d中的字段 "b": 12则是合法的。
(3)t字段为bigint类型,并且由此衍生了eventtime。
如果数据为 t: abc 则数据直接非法被忽略。
如果数据为t: "abc",则t被转为null?
如果要抽取对应的 type,需要用 format = json, 把 json 的完整数据结构在 DDL 中声明出来(包括 type)。
目前 canal-json 是自动将 changelog 转成了 Flink 的 insert/update/delete,而这个 change
flag 目前是不对用户暴露的。
Best,
Jark
On Fri, 25 Sep 2020 at 09:39, air23 wrote:
> 你好
> flink canal-json 如何获取每条数据是updata insert delete ,我ddl插件kafka表 用对应的type取
我觉得是个挺好的需求,有点类似于 Kafka 的 multi topic 功能,可以先建个 issue 收集下大家的需求。
Best,
Jark
On Thu, 24 Sep 2020 at 17:26, Peihui He wrote:
> Hi, all
>
> 测试发现flink sql jdbc mysql 的table-name 不能通过正则来读取多个表,这些表按月份划分的。
> 后续会支持不?
>
> Best Wishes.
>
Hi,
connect API 目前有很多问题。建议使用 DDL 的方式注册表。
Best,
Jark
On Thu, 24 Sep 2020 at 16:39, marble.zh...@coinflex.com.INVALID
wrote:
> hi,
>
> 在用sql api时遇到下面这个exception,
> Processing time attribute 'proctime' is not of type SQL_TIMESTAMP.
>
> 我是用这个方式map出来的,
>
你好
flink canal-json 如何获取每条数据是updata insert delete ,我ddl插件kafka表 用对应的type取 都是为null
这个操作类型 有办法取到吗?谢谢
这个水印比正常北京时间多八小时的问题,我是在1.11刚发布的时候测试发现的,我看了看源码,我的理解是把从sql里获取的东八区的时间,当成了utc时间来处理,所以会比北京时间多八小时。
BestJun
-- 原始邮件 --
发件人: Joker
这个情况现在是支持的,可以用类似于这种写法:
```SQL
CREATE TABLE MyTable (
a11 INT,
a12 VARCHAR,
a13 ROW
) WITH (...)
```
Roc Marshal 于2020年9月24日周四 下午7:54写道:
> 请教个问题,flink sql 流模式链接kafka的时候,message格式是多层的json,怎么对某个深度大于1的字段进行映射呢?
> {
> "a11":1,
> "a12":"1",
> "a13":{
> "a21":1,
>
请教个问题,flink sql 流模式链接kafka的时候,message格式是多层的json,怎么对某个深度大于1的字段进行映射呢?
{
"a11":1,
"a12":"1",
"a13":{
"a21":1,
"a22":1,
"a23":"1"}
}
比如像这样的格式,怎么将a2开头的字段进行映射呢?如果现有版本不支持这个特性的话,是否可以考虑对此功能进行支持?
谢谢
是的,如果手动减去八小时,使用eventtime落地的时候,就成了utc时区的值,但是你可以在SQL逻辑中纠正到东八区。CONVERT_TZ(DATE_FORMAT(event_time,'-MM-dd
HH:mm:ss'),'UTC','GMT+8:00’)。 现在 引入另一个问题,这种情况下watermark为何不需要纠正就自动加了8小时?
在2020年09月24日 19:34,Jun Zhang<825875...@qq.com> 写道:
Hi jack
Hi jack
如果我手动减去八小时,那么是不是使用eventtime落地的时候,就成了utc时区的值,比如现在是北京时间10点,那么我落地的时间将会是两点,对于使用东八区的人来说,会产生误解。
BestJun
-- 原始邮件 --
发件人: Jark Wu
代码在IDEA运行成功,打成jar包,提交到yarn运行报错。一开始以为是少包,后来把所有依赖包都打了进来,全局dependency.scope设为compile,依然报错。
启动命令:
nohup \
$FLINK_HOME/bin/flink run \
--class
com.athub.dcpoints.scala.connector.table.hive.OdsDcpointsProdKafkaFlinkHiveApp
\
--target yarn-per-job \
--jobmanager yarn-cluster \
--yarnjobManagerMemory
您好,请问pyflink如何提交作业到CDP集群中运行,有没有示例演示,感谢!
您好,请问pyflink如何提交作业到集群运行,有没有示例演示,感谢!
Hi, all
测试发现flink sql jdbc mysql 的table-name 不能通过正则来读取多个表,这些表按月份划分的。
后续会支持不?
Best Wishes.
磊哥,我想再多问一个问题。
若topic只有一个分区的情况下。。
我这边压了一下,网卡流量大概是30Mbit/s,不知道如何提高这个消费速度才好,压测程序是个很简单的source,并丢弃的处理。
-邮件原件-
发件人: 范超
发送时间: 2020年9月24日 星期四 10:49
收件人: user-zh@flink.apache.org
主题: 答复: 回复:FlinkKafkaConsumer on Yarn 模式下 设置并行度无法提高kafka的消费速度,但是提交两个应用却可以
感谢磊哥,后来发现确实是这个问题导致。
??utc??
----
??:
"user-zh"
hi,
在用sql api时遇到下面这个exception,
Processing time attribute 'proctime' is not of type SQL_TIMESTAMP.
我是用这个方式map出来的,
tEnv.connect(getPulsarDescriptor(inTopic))
.withSchema( new Schema()
.field("marketId", DataTypes.BIGINT())
+1 同样遇到这样的问题,Hive的小时分区和事件时间对不上,差了8个小时
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Kafka 表 定义如下:
CREATE TABLE `dc_ods`.`ods_dcpoints_prod_kafka_source` (
`assetSpecId` STRING,
`dcnum` STRING,
`monitorType` STRING,
`tagNo` STRING,
`value` STRING,
* `updateTime` BIGINT,
`eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(`updateTime` / 1000,'-MM-dd
HH:mm:ss')),*
WATERMARK
我们一般提升作业吞吐能力的步骤就是看作业的反压情况,
- 如果作业完全没有反压,说明此时处理能力大于上游数据产生速度
- 如果作业有反压,就具体看下反压的是哪个算子,存在什么瓶颈。比如网络IO、磁盘IO、CPU;
当然,有时候内存问题也会表现为CPU现象,比如GC比较严重
范超 于2020年9月24日周四 上午10:48写道:
> 谢谢Benchao哥回复。
>
> 这几天一直忙着压测这个问题。
> 经多轮压测(先灌满kafka数据),再去消费。
> 发现确实是您说的问题中的第三个情况
> 由于kafka的topic只开了一个partition
>
>
FROM_UNIXTIME 接收的第一个参数是 unix time 值, 可以理解为UTC时区的值。
所以如果你的 long 值是北京时间下得到的,那么需要自己手动减一个8时区。
Best,
Jark
On Thu, 24 Sep 2020 at 13:54, Joker wrote:
> 不好意思,插入个问题。ts AS TO_TIMESTAMP(FROM_UNIXTIME(create_time / 1000,
> '-MM-dd HH:mm:ss')) ,我按此方式生成事件时间列,发现watermark一直比北京时间多8小时,比如create_time
>
这个目前还不能,但是在1.12是可以的,已经在这个issue[1] 中添加了这个功能
[1] https://issues.apache.org/jira/browse/FLINK-18555
ang <806040...@qq.com> 于2020年9月24日周四 上午11:19写道:
> 感谢benchao,请问下这部分只能通过config来设置吗,有没有可以直接在sql中设置的配置项
>
>
>
>
>
>
>
> --原始邮件--
> 发件人:
>
hi,
在用table window时报下面的exception, 需要在groupBy里增加什么吗? window W已经指定的proctime字段pt了。
报Exception, Caused by: org.apache.flink.table.api.ValidationException: A
group window expects a time attribute for grouping in a stream environment.
Table outTable = tEnv.from(tableName)
没有搜索路径,需要用绝对路径
赵一旦 于2020年9月24日周四 下午3:22写道:
> 看了你文章,有jars。想继续问下,jars必须完全路径嘛,有没有什么默认的搜索路径,我简单写jar名字的。不想写死路径。
>
> 赵一旦 于2020年9月24日周四 下午3:17写道:
>
> > 这就有点麻烦了,公司机器一般不允许连接外部网络的。
> >
> > Jeff Zhang 于2020年9月24日周四 下午3:15写道:
> >
> >> flink.execution.packages 下的包会从maven仓库下载,看这里关于如何在zeppelin里管理第三方依赖的方法
>
两个办法:
1. 用私有的maven 仓库
2. 自己打jar包,用 flink.exection.jars 来指定
https://www.yuque.com/jeffzhangjianfeng/gldg8w/rn6g1s#3BNYl
赵一旦 于2020年9月24日周四 下午3:17写道:
> 这就有点麻烦了,公司机器一般不允许连接外部网络的。
>
> Jeff Zhang 于2020年9月24日周四 下午3:15写道:
>
> > flink.execution.packages 下的包会从maven仓库下载,看这里关于如何在zeppelin里管理第三方依赖的方法
> >
看了你文章,有jars。想继续问下,jars必须完全路径嘛,有没有什么默认的搜索路径,我简单写jar名字的。不想写死路径。
赵一旦 于2020年9月24日周四 下午3:17写道:
> 这就有点麻烦了,公司机器一般不允许连接外部网络的。
>
> Jeff Zhang 于2020年9月24日周四 下午3:15写道:
>
>> flink.execution.packages 下的包会从maven仓库下载,看这里关于如何在zeppelin里管理第三方依赖的方法
>> https://www.yuque.com/jeffzhangjianfeng/gldg8w/rn6g1s
>>
>>
这就有点麻烦了,公司机器一般不允许连接外部网络的。
Jeff Zhang 于2020年9月24日周四 下午3:15写道:
> flink.execution.packages 下的包会从maven仓库下载,看这里关于如何在zeppelin里管理第三方依赖的方法
> https://www.yuque.com/jeffzhangjianfeng/gldg8w/rn6g1s
>
>
> 赵一旦 于2020年9月24日周四 下午3:09写道:
>
> > 通过zeppelin写sql,之前sql-client可行的sql总是报错。
> >
> >
flink.execution.packages 下的包会从maven仓库下载,看这里关于如何在zeppelin里管理第三方依赖的方法
https://www.yuque.com/jeffzhangjianfeng/gldg8w/rn6g1s
赵一旦 于2020年9月24日周四 下午3:09写道:
> 通过zeppelin写sql,之前sql-client可行的sql总是报错。
>
> zeppelin配置的FLINK_HOME的目录中lib下是有相关包的,但远程remote集群没有。
> 之前sql-client方式是基于-l方式指定的。
>
>
而且,官网connector中的kafka的flink-sql-connector-kafka_2.11-1.11.2.jar是干嘛用的。
我发现我自己sql-client方式创建kafka流失表,以及查询等都用不到这个依赖。
用的是:flink-connector-kafka_2.12-1.10.0.jar
flink-connector-kafka-base_2.12-1.10.0.jar .
赵一旦 于2020年9月24日周四 下午3:08写道:
> 通过zeppelin写sql,之前sql-client可行的sql总是报错。
>
>
通过zeppelin写sql,之前sql-client可行的sql总是报错。
zeppelin配置的FLINK_HOME的目录中lib下是有相关包的,但远程remote集群没有。
之前sql-client方式是基于-l方式指定的。
zeppelin情况下,貌似有个flink.execution.packages,但是并没说明这个指定的包去哪找的?是zeppelin配置的FLINK_HOME中的lib嘛?我lib中有包,但还是报错。
+1
nashcen <2415370...@qq.com> 于2020年9月24日周四 下午1:09写道:
> +1
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
--
Best,
Benchao Li
应该和 flink 的 JM/TM/rocksdb 没有直接关系。不排除反复起停任务给 NM 造成了一定的压力。建议你去 hadoop
社区的邮件列表问问看。
Thank you~
Xintong Song
On Thu, Sep 24, 2020 at 11:52 AM superainbower
wrote:
> Hi, 大家好
> 我有个flink任务在yarn上跑,statebackend是rocksdb,由于是测试,所以一段时间内我反复起停了任务,后来我发现在Yarn集群的NodeManger出现GC时间超出阈值报警(没有其他错误日志),此时我查看对应节点的
>
40 matches
Mail list logo