回复:flink自动重启出错

2022-08-20 文章 Michael Ran
改过任务吗?



| |
greemqq...@163.com
|
|
邮箱:greemqq...@163.com
|




 回复的原邮件 
| 发件人 | Jason_H |
| 日期 | 2022年08月19日 11:52 |
| 收件人 | flink中文邮件组 |
| 抄送至 | |
| 主题 | flink自动重启出错 |
cause by: java.lang.RuntimeException: Error while getting state
org.apache.flink.util.StateMigrationException: For heap backends, the new state 
serializer must not be incompatible with the old state serializer
大家好,我最近遇到一个很奇怪的问题,我的作业自动重启的时候报这个错,我上网查了一下,说是状态不兼容,但是我的作业都是新的作业启动的,没有根据之前的ck恢复,然后跑一段时间就报这个错,它是自动重启的时候,就报这个错了,这个有遇到过吗?有没有什么解决办法?
强调:作业是新的,没有基于之前的作业的ck进行重启。


| |
Jason_H
|
|
hyb_he...@163.com
|

Re:Re: flink jdbc source oom

2022-03-31 文章 Michael Ran
这个当初提过自定义SQL 数据集,但是社区否定了这种做法- -,但是从功能上来说,我们也是实现的自定义SQL结果集,进行join 
之类的操作,在大数据集,以及一些数据排序、剔除重复等场景有一定优势
在 2022-04-01 10:12:55,"Lincoln Lee"  写道:
>@Peihui  当前社区的 jdbc table source 实现了这些接口:
>ScanTableSource,
>LookupTableSource,
>SupportsProjectionPushDown,
>SupportsLimitPushDown
>
>其中 lookup table source 用于维表的 kv lookup 查询,  scan table source 支持了
>projection 和 limit 下推, 如果有需求做其他 pushdown.可以尝试自行扩展 connector 来实现比如
>filter/aggregate pushdown 满足前置过滤需求
>
>
>Best,
>Lincoln Lee
>
>
>r pp  于2022年3月31日周四 18:40写道:
>
>> hi,不是很清楚你的问题~ 你的数据量很大,是多久的一天,还是一秒,source怎么就无力了
>>


Re:回复:flink-sql,对于kafka 的一些额外参数如何配置

2022-03-22 文章 Michael Ran






因为  KafkaConnectorOptions  里面没有,所有WITH 参数里面不知道如何加入了








在 2022-03-22 18:22:44,"写虫师"  写道:
>--原始邮件--
>发件人:   
> "user-zh" 
>   
>发送时间:2022年3月22日(星期二) 晚上6:21
>收件人:"user-zh@flink.apache.org"
>主题:Re:flink-sql,对于kafka 的一些额外参数如何配置
>
>
>
>Hi,
>partition.discovery.interval.ms 这个是Flink connector 
>kafka里面加上的,KafkaSourceOptions里面定义的,
>看下你的kafka-client的版本,官方的是 2.4.1,如果版本一样,那只能先忽略了。
>
>
>
>
>
>在 2022-03-22 17:10:52,"Michael Ran" dear all :
> 
>目前用flink1.4 table api +kafka 的情况下,有各种警告,比如:
> 
>The configuration 'partition.discovery.interval.ms' was supplied but isn't a 
>known config.
> 
>这些额外的参数,在SQL WITH参数里面没定义,不知道各位时在哪个位置加入配置的?
> 
>有什么建议吗?
>感谢!


flink-sql,对于kafka 的一些额外参数如何配置

2022-03-22 文章 Michael Ran
dear all :
 目前用flink1.4   table api +kafka 的情况下,有各种警告,比如:
 The configuration 'partition.discovery.interval.ms' was supplied 
but isn't a known config.
 这些额外的参数,在SQL WITH参数里面没定义,不知道各位时在哪个位置加入配置的?
 有什么建议吗?
感谢!

Re:Re: 维度表变化,关联结果全部更新在Flink SQL该如何实现

2022-03-22 文章 Michael Ran
可以考虑存储层 局部更新
在 2022-03-21 17:00:31,"zns" <865094...@qq.com.INVALID> 写道:
>Cdc join
>
>> 2022年3月21日 14:01,JianWen Huang  写道:
>> 
>> 事实表流A需关联维度表B做数据打宽。需求是当纬度表B发生变化时,关联结果需全部发生变化更新到最新。
>> 例子:
>> 变化前:
>> A流:
>> name  gender
>> a male
>> b male
>> c female
>> 
>> 纬度表B:
>> nameage
>> a   16
>> b17
>> 
>> 结果:
>> name   gender   age
>> a   male  16
>> b   male 17
>> 
>> 发生变化后:
>> 纬度表B:
>> nameage
>> a   16->17
>> b17->18
>> 
>> 结果:
>> name   gender   age
>> a   male  17
>> b   male  18
>> 
>> 目前我想到一个做法是将维度表做成流然后关联事实表,最后根据更新时间取top1最新sink到存储里。请问大家有别的更好做法吗


Re:flink sql jdbc sink事务提交问题

2022-02-15 文章 Michael Ran
jdbc 连接 mysql 的driver  记得默认就是AutoCommit。phoenix不太清楚
在 2022-02-15 13:25:07,"casel.chen"  写道:
>最近在扩展flink sql jdbc 
>connector以支持phoenix数据库,测试debug的时候发现数据能够通过PhoenixStatement.executeBatch()写入,但因为没有提交事务,所以其他人看不到。
>源码中PhoenixPreparedStatement.execute()方法会调用executeMutation(statement)方法,继而判断connection.getAutoCommit()与否来执行connection.commit()方法。完了回到PhoenixStatement.executeBatch()执行flushIfNecessary()方法,里面根据connection.getAutoFlush()与否来执行connection.flush()操作。
>一开始我没有在phoenix jdbc 
>url上添加;autocommit=true参数,发现变化的数据并没有commit到数据库。后来添加了;autocommit=true参数后执行了connection.commit()方法才将数据提交成功。
>
>
>有几个疑问:
>1. 换成sink进mysql数据库就没有这个问题,难道不同数据库的jdbc sink行为会不一样么?
>2. connection autoflush参数在哪里设置?跟autocommit区别是什么?
>3. 
>buffer条数满了或interval周期达到又或者checkpoint时就会执行flush操作,里面执行的是JdbcBatchingOutputFormat.flush方法,这里我也没有找到connection.commit()操作,数据是如何提交到数据库的呢?不开启事务情况下,执行完statement.executeBatch()就会提交么?


Re:jdbc connector ??????????????????????????????????????????????????????????????

2022-02-15 文章 Michael Ran




?? 2022-02-14 15:40:11??"jianjianjianjianjianjianjianjian" 
<724125...@qq.com.INVALID> ??


  ??jdbc 
connector
  ??1.13 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat ??
   master ?? 
org.apache.flink.connector.jdbc.internal.JdbcOutputFormat  



1.13:


master ??







Re:数据库Table Schema 转换为 Flink Schema

2022-01-24 文章 Michael Ran
table api 里面有 catalogTable 的实现
在 2022-01-24 16:50:25,"WuKong"  写道:
>hi all:
>   大家有没有了解, 社区或者其他渠道可以提供 将 数据库的建表语句(比如 Mysql Create table ) 自动转换为 Flink 的 
> Schema 对象(org.apache.flink.table.api.Schema) ,求推荐
>
>
>
>---
>Best,
>WuKong


Re:Flink mysql cdc凌晨同步报错

2022-01-16 文章 Michael Ran
贴失败原因吧,这个看不出来
在 2022-01-13 09:37:59,"Fei Han"  写道:
>
>@all:
>Flink mysql cdc凌晨同步报错,流任务都失败了。报错如下:
>
>org.apache.flink.runtime.JobException: Recovery is suppressed by 
>FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, 
>backoffTimeMS=3)
>
>at 
>org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
>
>at 
>org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
>
>at 
>org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216)
>
>at 
>org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206)
>
>at 
>org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197)
>
>at 
>org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682)
>
>at 
>org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
>
>at 
>org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
>
>at sun.reflect.GeneratedMethodAccessor137.invoke(Unknown Source)
>
>at 
>sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
>at java.lang.reflect.Method.invoke(Method.java:498)
>
>at 
>org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
>
>at 
>org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
>
>at 
>org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>
>at 
>org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
>
>at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>
>at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>
>at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>
>at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>
>at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>
>at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>
>at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>
>at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>
>at akka.actor.Actor.aroundReceive(Actor.scala:517)
>
>at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>
>at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>
>at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>
>at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>
>at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>
>at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>
>at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>
>at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>
>at 
>akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>
>at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>
>at 
>akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>加载更多


Re:Flink Kafka e2e exactly once问题询问

2021-12-30 文章 Michael Ran
没测试过,如果kafka 确定自身会丢掉超时事务消息的前提下,比如10分钟超时丢消息。1.flink 
发送消息A,进入第一阶段。2.flink 等待kafka 消息一阶段 ack信息  3.flink 
收到ack消息,发送二阶段确认消息,并进行chk异常: 
这个时候flink第二阶段消息确认,发送失败(同时flink应用因为各种原因挂了,超过10分钟)3.1 10分钟后,kakfa 
丢弃事务超时的消息3.2 flink 重启,重新提交二阶段的事务id (但是由于kakfa 
消息已经丢了,提交无效)猜测:当二阶段commit失败的时候,是否根据重发消息来解决?同时引入幂等保证
在 2021-12-31 11:31:49,"zilong xiao"  写道:
>看官方文档中有介绍说当kafka事务超时时,可能会出现数据丢失的情况,那就是说,Flink没办法完全保证端到端exactly
>once是么?想请教下社区大佬,我这么理解是正确的吗?一直都听说Flink 写kafka是可以保证端到端exactly once的,看到文档描述有点懵
>
>文档地址:
>https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-producers-and-fault-tolerance
>
>关键字:"If the time between Flink application crash and completed restart is
>larger than Kafka’s transaction timeout there will be data loss (Kafka will
>automatically abort transactions that exceeded timeout time)."


Re:咨询个Flink SQL的问题,如何去除null的字段

2021-12-29 文章 Michael Ran
可以写两个insert 语句,后面用判断分开~。~
在 2021-12-29 16:40:39,"RS"  写道:
>Hi,
>使用Flink SQL消费Kafka写ES,有时候有的字段不存在,不存在的不想写入ES,这种情况怎么处理呢?
>
>
>比如:源数据有3个字段,a,b,c
>insert into table2
>select
>a,b,c
>from table1
>当b=null的时候,只希望写入a和c
>当c=null的时候,只希望写入a和b
>


flink 1.15 编译 dump files (if any exist) [date].dump, [date]-jvmRun[N].dump and [date].dumpstream.

2021-12-28 文章 Michael Ran
dear all :
有朋友遇到过编译flink 1.15 master  出现这个异常吗?




[ERROR] Failed to execute goal 
org.apache.maven.plugins:maven-surefire-plugin:3.0.0-M5:test (default-test) on 
project flink-runtime: There are test failures.
[ERROR] 
[ERROR] Please refer to 
/Users/qqr/work/git/fork/flink/flink-runtime/target/surefire-reports for the 
individual test results.
[ERROR] Please refer to dump files (if any exist) [date].dump, 
[date]-jvmRun[N].dump and [date].dumpstream.
[ERROR] -> [Help 1]
[ERROR] 
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e 
switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR] 
[ERROR] For more information about the errors and possible solutions, please 
read the following articles:
[ERROR] [Help 1] 
http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
[ERROR] 
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR]   mvn  -rf :flink-runtime



Re:kafka????????????????????

2021-12-24 文章 Michael Ran
clean install  ??
?? 2021-12-17 17:41:32??"??" <2572805...@qq.com.INVALID> ??
>??
>
>flinkKafka??
>
>??
>
>[ERROR] 
>org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReaderTest
> Time elapsed: 1.398 s <<< ERROR!
>java.lang.NoClassDefFoundError: 
>org/apache/flink/networking/NetworkFailuresProxy
> at 
>org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.createProxy(KafkaTestEnvironment.java:241)
> at 
>org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.getKafkaServer(KafkaTestEnvironmentImpl.java:434)
> at 
>org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.prepare(KafkaTestEnvironmentImpl.java:136)
> at 
>org.apache.flink.streaming.connectors.kafka.KafkaTestBase.startClusters(KafkaTestBase.java:165)
> at 
>org.apache.flink.streaming.connectors.kafka.KafkaTestBase.startClusters(KafkaTestBase.java:152)
> at 
>org.apache.flink.streaming.connectors.kafka.KafkaTestBase.prepare(KafkaTestBase.java:115)
> at 
>org.apache.flink.streaming.connectors.kafka.KafkaTestBase.prepare(KafkaTestBase.java:107)
> at 
>org.apache.flink.connector.kafka.source.testutils.KafkaSourceTestEnv.setup(KafkaSourceTestEnv.java:59)
> at 
>org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReaderTest.setup(KafkaPartitionSplitReaderTest.java:87)
> at 
>java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
>java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
>java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at 
>java.base/java.lang.reflect.Method.invoke(Method.java:566)
> at 
>org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
> at 
>org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
> at 
>org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
> at 
>org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
> at 
>org.junit.jupiter.engine.extension.TimeoutExtension.interceptLifecycleMethod(TimeoutExtension.java:126)
> at 
>org.junit.jupiter.engine.extension.TimeoutExtension.interceptBeforeAllMethod(TimeoutExtension.java:68)
> at 
>org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
> at 
>org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
> at 
>org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
> at 
>org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
> at 
>org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
> at 
>org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
> at 
>org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
> at 
>org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
> at 
>org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$invokeBeforeAllMethods$11(ClassBasedTestDescriptor.java:397)
> at 
>org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> at 
>org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.invokeBeforeAllMethods(ClassBasedTestDescriptor.java:395)
> at 
>org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.before(ClassBasedTestDescriptor.java:209)
> at 
>org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.before(ClassBasedTestDescriptor.java:80)
> at 
>org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:148)
> at 
>org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> at 
>org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
> at 
>org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
> at 
>org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
> at 
>org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> at 
>org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
> at 
>org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
> at 
>java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
> at 
>org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
> at 

Re:Flink SQL DECIMAL精度问题

2021-12-23 文章 Michael Ran
clickhouse  还提供 Decimal64 Decimal128 ,我也想知道38 这个是什么数据库的标准吗?
在 2021-12-23 19:58:24,"Ada Wong"  写道:
>最大精度为38,这个是有什么说法吗,为什么不是1000。如果我需要更高精度的DECIMAL我改怎么做?例如我需要DECIMAL(50, 18)


Re:Re:Re:sink jdbc超时问题(无心跳包)

2021-12-19 文章 Michael Ran
心跳没有,只有重试参数:sink.max-retries
在 2021-12-20 12:14:37,"Jeff"  写道:
>
>
>
>会有? 什么意思呢? 我现在用的是1.13.2没有呀,相关配置也没。
>
>
>
>
>
>
>
>
>
>
>
>在 2021-12-20 10:43:05,"Michael Ran"  写道:
>>会有判断连接是否有效,以及重试的操作
>>在 2021-12-20 11:39:23,"Jeff"  写道:
>>>sink jdbc长时间未有数据写入后,再写数据时会出现连接超时问题,请问flink jdbc connector是否有像连接池那样的心跳功能?


Re:sink jdbc超时问题(无心跳包)

2021-12-19 文章 Michael Ran
会有判断连接是否有效,以及重试的操作
在 2021-12-20 11:39:23,"Jeff"  写道:
>sink jdbc长时间未有数据写入后,再写数据时会出现连接超时问题,请问flink jdbc connector是否有像连接池那样的心跳功能?


Re:Flink1.14 table api & sql针对递增维度聚合 ttl是如何处理的

2021-12-12 文章 Michael Ran
你要的是 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/config/
 参数: table.exec.state.ttl这个吗?
在 2021-12-11 13:56:21,"guanyq"  写道:
>请大佬指导下:
>需求: 通过flink sql 统计每天各个省份的订单受理量,显然这种维度统计时递增,如何设置ttl,只想让维度存储1周的数据。
>维度递增很可能会导致内存溢出,请教下flink sql ttl 配置在官网哪里有说明么。
>
>
>
> 
>
>
>
>
>
> 


Re:Re: 关于异步 AsyncTableFunction CompletableFuture 的疑问?

2021-12-06 文章 Michael Ran
好的,谢谢,我这边尝试下异步保证顺序,我们这边有些场景
在 2021-12-07 14:17:51,"Caizhi Weng"  写道:
>Hi!
>
>1. 直接用 futrue.complate(rowdata) 传递数据,只是实现了发送是异步,异步join 得自己实现,这个理解对吗?
>
>
>正确。例如 HBaseRowDataAsyncLookupFunction 里就调用了 hbase 提供的 table#get 异步方法实现异步查询。
>
>2. 像join hbase 里面通过线程池实现了join异步,是无法保证顺序,并没有看到任何保证顺序的操作,还是有其他逻辑保证顺序吗?
>
>
>Async operator 就是为了提高无需保序的操作(例如很多 etl 查维表就不关心顺序)的效率才引入的,如果对顺序有强需求就不能用 async
>operator。
>
>Michael Ran  于2021年12月7日周二 10:33写道:
>
>> deal all:
>> 目前在看table api 中,自定义的异步 join 方法 AsyncTableFunction#eval
>> 方法时,发现接口提供的是:
>> public void eval(CompletableFuture>
>> future,Object... keys) {...}
>>
>>
>> 目前遇到两个问题:
>>
>>
>> 1. 直接用 futrue.complate(rowdata) 传递数据,只是实现了发送是异步,异步join 得自己实现,这个理解对吗?
>> 2. 像join hbase 里面通过线程池实现了join异步,是无法保证顺序,并没有看到任何保证顺序的操作,还是有其他逻辑保证顺序吗?
>>
>>
>> 有各位大佬方便介绍一下吗?或者更详细的文档说明之类的?
>> 非常感谢。
>>


关于异步 AsyncTableFunction CompletableFuture 的疑问?

2021-12-06 文章 Michael Ran
deal all:
目前在看table api 中,自定义的异步 join 方法 AsyncTableFunction#eval 方法时,发现接口提供的是:
public void eval(CompletableFuture> 
future,Object... keys) {...}


目前遇到两个问题:


1. 直接用 futrue.complate(rowdata) 传递数据,只是实现了发送是异步,异步join 得自己实现,这个理解对吗?
2. 像join hbase 里面通过线程池实现了join异步,是无法保证顺序,并没有看到任何保证顺序的操作,还是有其他逻辑保证顺序吗?


有各位大佬方便介绍一下吗?或者更详细的文档说明之类的?
非常感谢。
  

Re:flink????????????????????

2021-12-02 文章 Michael Ran
jdbc scan
?? 2021-12-02 14:40:06??""  ??
>


Re:Re: flink sql中如何使用异步io关联维表?

2021-11-30 文章 Michael Ran
Hello,咨询一下,目前connector-hbase 的异步join,是能保证顺序的吗?
在 2021-03-05 11:10:41,"Leonard Xu"  写道:
>目前Flink SQL 中的connector都没实现异步io关联维表,接口是上已经支持了的,如果是自己实现可以参考[1]
>另外,HBase connector 社区有人正在支持异步io关联维表,预计1.13可以使用[2]
>
>祝好
>
>[1]https://github.com/apache/flink/blob/73cdd3d0d9f6a807b3e47c09eef7983c9aa180c7/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java#L618
>[2] https://github.com/apache/flink/pull/14684#pullrequestreview-604148209 
>
>
>
>
>> 在 2021年3月4日,14:22,HunterXHunter <1356469...@qq.com> 写道:
>> 
>> 定义一个 sourcetable
>> 
>> 
>> 
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re:flink 以阿里云 oss 作为 checkpoint cpu 过高

2021-10-17 文章 Michael Ran
应该和OSS没关系吧,毕竟只是个存储。
我们CPU 你先看看消耗在哪个线程或者方法类呗



在 2021-10-08 16:34:47,"Lei Wang"  写道:



flink 程序以 RocksDB 作为 stateBackend,  aliyun OSS 作为 checkpoint 数据最终的物理位置。
我们的监控发现节点 cpu 间隔性地变高,这个间隔时间恰好就是程序的 checkpoint 时间间隔。





 
这个可能的原因是什么?会跟 OSS 有关吗? 


谢谢,
王磊

Re:flinksql有计划支持mysql catalog吗?

2021-10-17 文章 Michael Ran
https://github.com/apache/flink/pull/16962还没合并
在 2021-10-12 10:17:15,"赵旭晨"  写道:
>目前flink的jdbccatalog只支持PG,有计划支持mysql的吗?由于公司的元数据存储统一用mysql,不太可能再引进PG,或者反过来问,flink社区到目前为止不支持mysqlcatalog的原因是什么?有什么顾虑么?


Re:Flink-1.12 sql 同一个Job 下 如何控制 多个SQL的执行顺序

2021-10-15 文章 Michael Ran
插入 tidb  接收binlog,再次写入
在 2021-10-14 15:59:54,"WuKong"  写道:
>Hi:
>   目前遇到一个问题,我想在一个Job下 ,有两个SQL 分步都是 读取同一个Source Kafka 数据, 一个是插入Tidb 落数据, 
> 另一个是写入下游Kafka, 目前想控制 先插入DB 然后再写入下游Kafka, 请问有什么方案可以实现这种方式?
>
>
>
>---
>Best,
>WuKong


Re:Flinksql关联维表数据延迟咨询

2021-09-21 文章 Michael Ran
先看看瓶颈在哪儿? join的话,理论上你cache全表之后就会快起来。sink 瓶颈,可以扩大并发数(sink端压力不大的情况)
在 2021-09-16 14:40:17,"邓 雪昭"  写道:
>各位老师好,
>   
> 我目前使用Flinksql构建了一个应用,数据源是kafka,关联了一张23w数据的维表(存放在Tidb),该维表和流中的数据关联会有一些发散(业务逻辑),使用了lookup.cache.maxprows=25,ttl=3600s,目前输出到kafka,延迟很严重,处理时间会领先事件时间几十分钟并且还会持续扩大,请问有什么好的解决办法吗?
>
>
>从 Windows 版邮件发送
>


Re:Re: flink sql streaming情况如何解决数据倾斜问题

2021-09-14 文章 Michael Ran
按 key1  key2 分开统计,然后合并一下
在 2021-09-08 11:04:59,"yidan zhao"  写道:
>我们流量大概4w的qps,如何根据key1+key2进行pv统计(SQL任务比较简单)。
>
>但是key2的分布比较极端,有些可能90%集中的。
>
>Shuo Cheng  于2021年9月7日周二 下午7:30写道:
>
>> 最好具体描述下什么场景的倾斜, sql 上也有一些解倾斜的手段
>>
>> On 9/7/21, yidan zhao  wrote:
>> > 如题,目前非sql情况本身实现灵活,有很多方案。
>> > 但是SQL情况下,倾斜严重,同时无解。有没有小伙伴解决过类似问题。
>> >
>> > 注意:sql,流任务,数据倾斜。
>> >
>>


Re:Flink HIve 文件,上游Kafak数据很大的时候,无法完成checkpoint

2021-08-13 文章 Michael Ran
batch 和数量小点呗 ~。~



在 2021-08-12 10:09:21,"周瑞"  写道:

您好,Flink Hive 当上游的Kafka数据量特别大的时候,发现checkpoint一直无法完成,5分钟后报错了。请问这个问题要怎么解决

Re:Re:Re:Re: Over窗口聚合性能调优问题

2021-08-10 文章 Michael Ran
图看不到,建议你展开算子,看看背压在什么地方



在 2021-08-10 20:27:51,"Wanghui (HiCampus)"  写道:

单个来看,GC并非很频繁,但是背压是HIGH

 

-邮件原件-
发件人: Michael Ran [mailto:greemqq...@163.com]
发送时间: 2021年8月10日 20:24
收件人: user-zh@flink.apache.org
主题: Re:Re: Over窗口聚合性能调优问题

 

看看GC 情况呢,后端写入速度呢? 有背压吗?

在 2021-07-30 19:44:19,"Tianwang Li"  写道:

>(小的)tumbling window + (大的)over window

>这样会不会好一些。

> 

> 

>Wanghui (HiCampus)  于2021年7月30日周五下午3:17写道:

> 

>> Hi all:

>>我在测试Over窗口时,当窗口是5秒~15s级别时,处理速度能够达到2000/s。

>> 但是当我把窗口调整为10分钟以上时,处理速度从2000开始急速下降,几分钟后下降至230/s。

>> 请问下:

>>Over窗口的性能该如何优化,因为我后续会将窗口调整为24小时,按照目前的情况来看,性能会下降很快。

>>我的测试节点配置:8C + 16G

>>Flink配置: taskmanager process memory: 8G Best regards WangHui

>> 

>> 

> 

>--

>**

> tivanli

>**

Re:Re: Over窗口聚合性能调优问题

2021-08-10 文章 Michael Ran
看看GC 情况呢,后端写入速度呢? 有背压吗?
在 2021-07-30 19:44:19,"Tianwang Li"  写道:
>(小的)tumbling window + (大的)over window
>这样会不会好一些。
>
>
>Wanghui (HiCampus)  于2021年7月30日周五 下午3:17写道:
>
>> Hi all:
>>我在测试Over窗口时,当窗口是5秒~15s级别时,处理速度能够达到2000/s。
>> 但是当我把窗口调整为10分钟以上时,处理速度从2000开始急速下降,几分钟后下降至230/s。
>> 请问下:
>>Over窗口的性能该如何优化,因为我后续会将窗口调整为24小时,按照目前的情况来看,性能会下降很快。
>>我的测试节点配置:8C + 16G
>>Flink配置: taskmanager process memory: 8G
>> Best regards
>> WangHui
>>
>>
>
>-- 
>**
> tivanli
>**


Re:回复:回复:flink sql 依赖隔离

2021-07-26 文章 Michael Ran
额,混用多个UDF没法,本身就依赖冲突了,一般公共的UDF我们都统一的。 私人的和 公共的冲突了- -再单独和他们讨论
在 2021-07-26 11:43:49,"silence"  写道:
>就是单独引用的啊,但任务逻辑比较复杂时会同时混用多个udf这个是没法避免的啊
>
>
>----------
>发件人:Michael Ran 
>发送时间:2021年7月23日(星期五) 17:42
>收件人:user-zh ; silence 
>主 题:Re:回复:flink sql 依赖隔离
>
>建议上传的时候单独放,提交任务的时候  拉下来单独引用
>在 2021-07-23 11:01:59,"silence"  写道:
>>
>>这边目前主要还是yarn,目前痛点是同一个sql任务会用到多个udf,udf通过-C http://xxx.jar进行加载
>>udf和sql jar之间、udf和udf之间都可能会有依赖冲突,
>>目前初步想法是每个udf绑定一个jarpath,使用独立的classloader进行加载,避免和主jar以及其他udf之间的依赖冲突
>>--
>>发件人:Michael Ran 
>>发送时间:2021年7月22日(星期四) 20:07
>>收件人:user-zh ; silence 
>>主 题:Re:flink sql 依赖隔离
>>
>>通过任务进行隔离引用呗。你们美团已经是k8s了吧?
>>在 2021-07-05 14:06:53,"silence"  写道:
>>>请教大家目前flink sql有没有办法做到依赖隔离
>>>比如connector,format,udf(这个最重要)等,
>>>很多情况下不同用户定义的组件会经常混用,会有很多的依赖冲突问题难以解决。
>>>目前有没有办法做到udf的依赖隔离(比如不同的udf使用独立的jar和classloader),或者社区对此有没有什么规划
>>
>


Re:Re:请教各位大佬,flink读取hdfs文件时的疑问

2021-07-26 文章 Michael Ran
你尝试下 用java 读取文件过程中,改文件名 或者移动文件。会异常



在 2021-07-26 15:19:39,"wanggaoliang" <18838915...@163.com> 写道:
















在 2021-07-26 14:56:00,"wanggaoliang" <18838915...@163.com> 写道:

flink读取hdfs文件时,FileInputFormat.java类中,acceptFile()方法默认过滤掉了以"_"和"."开头的文件,那如果读入了in-progress文件和.pending文件,
在执行open()方法时,突然in-progress或.pending文件状态发生了改变而导致原来的文件路径消失,会不会出现什么问题?或者是有没有可能出现这种情况?





 






 

Re:回复:flink sql 依赖隔离

2021-07-23 文章 Michael Ran
建议上传的时候单独放,提交任务的时候  拉下来单独引用
在 2021-07-23 11:01:59,"silence"  写道:
>
>这边目前主要还是yarn,目前痛点是同一个sql任务会用到多个udf,udf通过-C http://xxx.jar进行加载
>udf和sql jar之间、udf和udf之间都可能会有依赖冲突,
>目前初步想法是每个udf绑定一个jarpath,使用独立的classloader进行加载,避免和主jar以及其他udf之间的依赖冲突
>----------
>发件人:Michael Ran 
>发送时间:2021年7月22日(星期四) 20:07
>收件人:user-zh ; silence 
>主 题:Re:flink sql 依赖隔离
>
>通过任务进行隔离引用呗。你们美团已经是k8s了吧?
>在 2021-07-05 14:06:53,"silence"  写道:
>>请教大家目前flink sql有没有办法做到依赖隔离
>>比如connector,format,udf(这个最重要)等,
>>很多情况下不同用户定义的组件会经常混用,会有很多的依赖冲突问题难以解决。
>>目前有没有办法做到udf的依赖隔离(比如不同的udf使用独立的jar和classloader),或者社区对此有没有什么规划
>


Re:flink时态表:两个Hbase左关联有报错情况

2021-07-22 文章 Michael Ran
java.util.concurrent.ExecutionException: java.lang.NoClassDefFoundError: 
org/apache/flink/hbase/shaded/org/apache/commons/io/IOUtils缺jar
在 2021-07-14 09:39:53,"xie_guo...@163.com"  写道:
>您好,有关flinkSQL时态表左关联时遇到了问题。
>具体场景:
>
> 两个Hbase表做时态表关联,左表定义了处理时间,右表原本不变(rowkey,cf),FLINK版本1.13.0,报错情况如下,麻烦问一下大家又遇到类似问题嘛,该怎么处理!
>
>2021-07-14 09:22:20.592 WARN  org.apache.flink.runtime.taskmanager.Task  --- 
>2021-07-14 09:22:20.596 WARN  org.apache.flink.runtime.taskmanager.Task  --- 
>LookupJoin(table=[default_catalog.default_database.hbase_source_pollution_dwb_enterprise_wodhz],
> joinType=[LeftOuterJoin], async=[true], lookup=[code=$f4], select=[code, 
>data1, data2, p, $f4, code0, data]) -> Calc(select=[code, 
>ROW(,,data.activ) -> NotNullEnforcer(fields=[EXPR$1, EXPR$2, p, EXPR$4]) 
>(3/3)#3 (4ada188e117c67ccd9bd6488ae95216a) switched from RUNNING to FAILED 
>with failure cause: java.util.concurrent.ExecutionException: 
>java.lang.NoClassDefFoundError: 
>org/apache/flink/hbase/shaded/org/apache/commons/io/IOUtils
>at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928
>at 
>org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:168)
>at 
>org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:131)
>at 
>org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:448)
>at 
>org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:671)
>at 
>org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:629)
>at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
>at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>at java.lang.Thread.run(Thread.java:748)
>Caused by: java.lang.NoClassDefFoundError: 
>org/apache/flink/hbase/shaded/org/apache/commons/io/IOUtils
>at 
>org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.AsyncConnectionImpl.close(AsyncConnectionImpl.java:193)
>at 
>org.apache.flink.connector.hbase2.source.HBaseRowDataAsyncLookupFunction.close(HBaseRowDataAsyncLookupFunction.java:251)
>at LookupFunction$3.close(Unknown Source
>
>ps:同样的代码,左表换成Kafka,能够正常运行。网上搜索了一下相关报错,好像时缺包,目前pom文件是有hbase-client、hbase-commen、flink-sql-connector-hbase。
>
>
>
>Sincerely,
>xie_guo...@163.com


Re:flink ??????????????

2021-07-22 文章 Michael Ran
??
?? 2021-07-13 17:31:19??"" <1510603...@qq.com.INVALID> ??
>Hi All??
>
>
>  ??Flink 
>checkpoint??2min??
>??2min??  ??
> 
>
>
>
>The program finished with the following exception:
>
>
>org.apache.flink.util.FlinkException: Triggering a savepoint for the job 
> failed.
>   at 
> org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:777)
>   at 
> org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:754)
>   at 
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1002)
>   at 
> org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:751)
>   at 
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1072)
>   at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
>   at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
>Caused by: java.util.concurrent.TimeoutException
>   at 
> org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1255)
>   at 
> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:217)
>   at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$15(FutureUtils.java:582)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)


Re:请教on yarn per job 作业采集日志进行监控方案

2021-07-22 文章 Michael Ran
简单的可以先监控任务状态,重启次数这种,消息延迟等这种能直接api拿到值的, 其他指标的比较麻烦,特别是task多了,算子多,还要合并
在 2021-07-21 11:32:31,"yihan xu"  写道:
>原本作业基本处于半裸奔的状态,最近线上出了一次小事故后,在考虑如何实时采集作业日志或者metric再配置告警。
>网上初步搜了一下,好像就是prometheus+grafana或者elk。
>
>请教各位大佬的项目目前都是用什么方式,我们小公司就我一个人搞flink,半路出家水平也有限,请大佬们推荐个易维护坑少点的方式?谢谢。
>
>发自我的iPhone
>
>
>发自我的iPhone


Re:Re: flink sql 依赖隔离

2021-07-22 文章 Michael Ran
我看阿里不是传到OSS,然后每个任务 image  拉取下来的时候顺便就把jar 拉进来了。完全隔离的,jar 也方便管理
在 2021-07-22 23:45:14,"Jeff Zhang"  写道:
>Zeppelin 支持依赖的动态加载
>https://www.yuque.com/jeffzhangjianfeng/gldg8w/rn6g1s
>https://www.yuque.com/jeffzhangjianfeng/gldg8w/dthfu2
>
>
>Michael Ran  于2021年7月22日周四 下午8:07写道:
>
>> 通过任务进行隔离引用呗。你们美团已经是k8s了吧?
>> 在 2021-07-05 14:06:53,"silence"  写道:
>> >请教大家目前flink sql有没有办法做到依赖隔离
>> >比如connector,format,udf(这个最重要)等,
>> >很多情况下不同用户定义的组件会经常混用,会有很多的依赖冲突问题难以解决。
>> >目前有没有办法做到udf的依赖隔离(比如不同的udf使用独立的jar和classloader),或者社区对此有没有什么规划
>>
>
>
>-- 
>Best Regards
>
>Jeff Zhang


Re:flink sql 依赖隔离

2021-07-22 文章 Michael Ran
通过任务进行隔离引用呗。你们美团已经是k8s了吧?
在 2021-07-05 14:06:53,"silence"  写道:
>请教大家目前flink sql有没有办法做到依赖隔离
>比如connector,format,udf(这个最重要)等,
>很多情况下不同用户定义的组件会经常混用,会有很多的依赖冲突问题难以解决。
>目前有没有办法做到udf的依赖隔离(比如不同的udf使用独立的jar和classloader),或者社区对此有没有什么规划


Re:Re: flink大窗口性能问题

2021-07-22 文章 Michael Ran
并行度改大,窗口时间小点呗
在 2021-07-15 11:52:12,"Wanghui (HiCampus)"  写道:
>并行度增大也可以吗?
>
>
>
>On 2021/07/15 02:45:18, "Michael Ran" mailto:g...@163.com>> 
>wrote:
>
>> 要么内存增大,或者并行增大,要么窗口改小,同时保留数据时间减少>
>
>> 在 2021-07-15 10:23:25,"Hui Wang" 
>> <46...@qq.com.INVALID<mailto:46...@qq.com.INVALID>> 写道:>
>
>> >flink大窗口缓存数据量过大导致jvm频烦full gc,并且处理速度极低,最终OOM,该如何调优>
>
>>
>
>>


Re:flink大窗口性能问题

2021-07-14 文章 Michael Ran
要么内存增大,或者并行增大,要么窗口改小,同时保留数据时间减少
在 2021-07-15 10:23:25,"Hui Wang" <463329...@qq.com.INVALID> 写道:
>flink大窗口缓存数据量过大导致jvm频烦full gc,并且处理速度极低,最终OOM,该如何调优


Re:?????? flink sql??????????????????

2021-06-21 文章 Michael Ran
k8s ??
?? 2021-06-16 18:22:29??"??" <809097...@qq.com> ??
>FlinkSql WebIDE?? 
>FlinkSQLSQL??SqlCli??
> https://github.com/DataLinkDC/dlink
>
>
>
>
>----
>??: "todd": 2021??6??16??(??) 5:48
>??: "user-zh": Re: flink sql??
>
>
>
>Flink 
>apihttps://github.com/todd5167/flink-spark-submiter??
>
>spi
>
>??
> - ??Flink 
>client
>??
> - ??
> - ??
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re:Re: 如何使用kafka的KafkaDynamicTableFactory ,替换他的json format

2021-06-17 文章 Michael Ran
本来想从DeserializationFormat 拿到的,如果不能。后续SQL 能拿到也行
在 2021-06-17 14:41:55,"Jingsong Li"  写道:
>不能,除非你自己创建一个新的kafka connector。
>
>不过,
>kafka的offset、partition等信息是可以通过metadata的方式拿到的。
>
>你是需要在DeserializationFormat里面拿到offset、partition?还是说后续的SQL拿到就行了?
>
>Best,
>Jingsong
>
>On Thu, Jun 17, 2021 at 2:35 PM Michael Ran  wrote:
>
>> dear all :
>> 目前有个小需求,由于binlog数据不同,不方便直接使用 format="json",想自定义format进行处理。
>> 但是根据 “implements DeserializationFormatFactory,
>> SerializationFormatFactory”
>> 这样自定义format之后,只能处理binlog的原始数据,我还想拿kafka的offset、partition等信息。我看kafka原生的DynamicKafkaDeserializationSchema
>>   有方法
>> deserialize(ConsumerRecord record,
>> Collector collector) 。
>> 包装了offset 的对象:ConsumerRecord
>>  ,我也想使用这个东西,或者用自定义的format替换他这部分,能做到吗?还是要connector 进行从定义?
>>
>
>
>
>-- 
>Best, Jingsong Lee


如何使用kafka的KafkaDynamicTableFactory ,替换他的json format

2021-06-17 文章 Michael Ran
dear all :
目前有个小需求,由于binlog数据不同,不方便直接使用 format="json",想自定义format进行处理。
但是根据 “implements DeserializationFormatFactory, 
SerializationFormatFactory” 
这样自定义format之后,只能处理binlog的原始数据,我还想拿kafka的offset、partition等信息。我看kafka原生的DynamicKafkaDeserializationSchema
有方法 
deserialize(ConsumerRecord record, Collector 
collector) 。
包装了offset 的对象:ConsumerRecord   
,我也想使用这个东西,或者用自定义的format替换他这部分,能做到吗?还是要connector 进行从定义?
  

Re:?????? Flink ????????join

2021-06-07 文章 Michael Ran
join
?? 2021-06-07 16:35:10??"Jason Lee"  ??
>
>
>??Flink
> SQL Join
>
>
>??
>| |
>JasonLee
>|
>|
>jasonlee1...@163.com
>|
>??
>
>
>??2021??02??25?? 14:40??Suhan ??
>benchao??joinrocketmqflink??kafka
> + rocket mq
>??flink??
>
>
>
>
>
>----
>??:
>"user-zh"  
>  
>:2021??2??25??(??) 12:00
>??:"user-zh"
>:Re: Flink join
>
>
>
>Hi,
>
> Benchao topic
>??
>
>Benchao Li 
> Hi??
>
> 
>Joinjoin??
>
> 
>join??topicjoin??
>
> ?? 
>  ??
> 
> 
> 
>??JOINjoin??10match??match6match
>  flink 
>SQL??UDFtimer
> 
>  ThanksRegards
>
>
>
> --
>
> Best,
> Benchao Li
>


Re:关于 flinksql 维表的问题

2021-05-24 文章 Michael Ran
可以设置ttl,但是做不到固定时间 更新
在 2021-05-22 16:08:58,"WeiXubin" <18925434...@163.com> 写道:
>我想实现将MySQL中的 A 表数据预先查询出来进行缓存,用于给流表 B 进行 join关联。接下来定时查询并更新 A 表内的缓存数据,请问目前
>FlinkSQL 可以实现吗?
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re:Re:Re:flink sql写mysql中文乱码问题

2021-05-19 文章 Michael Ran



数据库的字段字符编码














在 2021-05-18 18:19:31,"casel.chen"  写道:
>我的URL连接串已经使用了  useUnicode=truecharacterEncoding=UTF-8 结果还是会有乱码
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2021-05-18 17:21:12,"王炳焱" <15307491...@163.com> 写道:
>>你在flinkSQL连接mysql表的时候配置url=jdbc:mysql://127.0.0.1:3306/database?useUnicode=truecharacterEncoding=UTF-8,像这样CREATE
>> TABLE jdbc_sink(id INT  COMMENT '订单id',goods_name 
>>VARCHAR(128) COMMENT '商品名称',price DECIMAL(32,2) COMMENT '商品价格', 
>>   user_name VARCHAR(64) COMMENT '用户名称') WITH (   'connector' = 
>>'jdbc',   'url' = 
>>'jdbc:mysql://127.0.0.1:3306/database?useUnicode=truecharacterEncoding=UTF-8',
>>   'username' = 'mysqluser',   'password' = 'mysqluser',   
>>'table-name' = 'jdbc_sink')
>>在 2021-05-18 11:55:46,"casel.chen"  写道:
>>>我的flink sql作业如下
>>>
>>>
>>>SELECT
>>>product_name,
>>>window_start,
>>>window_end,
>>>CAST(SUM(trans_amt)ASDECIMAL(24,2)) trans_amt,
>>>CAST(COUNT(order_no)ASBIGINT) trans_cnt,
>>>-- LOCALTIMESTAMP AS insert_time,
>>>'微支付事业部'AS bus_name
>>>FROM(
>>>
>>>
>>>mysql sink表的定义如下
>>>CREATE TABLE XXX (
>>>) Engine=InnoDB AUTO_INCREMENT=31 DEFAULT CHARSET=utf8mb4;
>>>
>>>
>>>运行起来后写入mysql表的数据带有中文乱码 ??
>>>
>>>
>>>
>>>查看作业运行日志后发现其使用了 UTF-16LE 字符集,有什么办法可以让其使用 utf8mb4 字符集么?
>>>2021-05-17 18:02:25,010 INFO 
>>>org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task 
>>>GroupAggregate(groupBy=[product_name, window_start, window_end], 
>>>select=[product_name, window_start, window_end, SUM_RETRACT(trans_amt) AS 
>>>$f3, COUNT_RETRACT(order_no) AS $f4]) -> Calc(select=[CAST(product_name) AS 
>>>product_name, (CAST(window_start) DATE_FORMAT _UTF-16LE'-MM-dd 
>>>HH:mm:ss') AS window_start, (CAST(window_end) DATE_FORMAT 
>>>_UTF-16LE'-MM-dd HH:mm:ss') AS window_end, CAST($f3) AS trans_amt, 
>>>CAST($f4) AS trans_cnt, CAST(()) AS insert_time, 
>>>_UTF-16LE'??':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
>>>AS bus_name]) -> Sink: 
>>>Sink(table=[default_catalog.default_database.all_trans_5m_new], 
>>>fields=[product_name, window_start, window_end, trans_amt, trans_cnt, 
>>>insert_time, bus_name]) (1/1)#0 (1b5f26dcd9a6071f36753b93a0ea9bea), deploy 
>>>into slot with allocation id 9f4c7d45bdf429f89158e2f8451663e0.
>>>2021-05-17 18:02:25,013 INFO org.apache.flink.runtime.taskmanager.Task [] - 
>>>GroupAggregate(groupBy=[product_name, window_start, window_end, id, 
>>>data_type, mer_cust_id, order_no, trans_date], select=[product_name, 
>>>window_start, window_end, id, data_type, mer_cust_id, order_no, trans_date, 
>>>MAX_RETRACT(trans_amt) AS trans_amt]) -> Calc(select=[product_name, 
>>>window_start, window_end, trans_amt, order_no]) (1/1)#0 
>>>(ef6b0a94e75cc1665e4ce3d40e74ab0c) switched from CREATED to DEPLOYING.


Re:Re: Table-api sql 预检查

2021-04-30 文章 Michael Ran
org.apache.flink.table.api.TableException: Unsupported operation: 
org.apache.flink.table.operations.ddl.CreateTableOperation
At 2021-04-30 10:19:49, "HunterXHunter" <1356469...@qq.com> wrote:
>tableEnv.explainSql("");
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re:Re: Table-api sql 预检查

2021-04-29 文章 Michael Ran
从代码逻辑里面肯定能抓出来,就是觉得这个预检查的功能可以作为API 开放出来
在 2021-04-29 12:29:26,"Shengkai Fang"  写道:
>Hi.
>
>可以看看 TableEnvironment#execute逻辑,如果能够将sql 转化成 `Transformation`,那么语法应该没有问题。
>
>Best,
>Shengkai
>
>Michael Ran  于2021年4月29日周四 上午11:57写道:
>
>> dear all :
>> 用table api 提交多个SQL 的时候,有什么API 能提前检查SQL 的错误吗? 每次都是提交执行的时候才报错。
>> 理论上里面在SQL解析部分就能发现,有现成的API吗?还是说需要自己去抓那部分解析代码,然后封装出API。
>> 如果没有,希望能提供这个功能,blink 应该是有的。
>>
>>
>>
>>
>> Thanks !
>>


Table-api sql 预检查

2021-04-28 文章 Michael Ran
dear all :
用table api 提交多个SQL 的时候,有什么API 能提前检查SQL 的错误吗? 每次都是提交执行的时候才报错。
理论上里面在SQL解析部分就能发现,有现成的API吗?还是说需要自己去抓那部分解析代码,然后封装出API。
如果没有,希望能提供这个功能,blink 应该是有的。




Thanks !


Re:flink 背压问题

2021-04-28 文章 Michael Ran
不至于吧,中间有错误吧。。
在 2021-04-29 11:45:17,"Bruce Zhang"  写道:
>我的数据源每一秒发送一条数据,下游算子每六秒才能处理完成入库,我测试时使用的是一个并行度,数据发送完毕后,在库里只有前三条发送和后两条发送的数据,中间的数据全部丢失了。应该是背压机制的问题,这是什么原因呢


Re:?????? flink sql????kafka join??????????????????????

2021-04-22 文章 Michael Ran
??
?? 2021-04-22 11:21:55??""  ??
>Tidb??Tidb??TiDBstructured-streaming??
>??
>
>
>
>
>----
>??:
>"user-zh"  
>  
>:2021??4??22??(??) 10:50
>??:"user-zh"
>:Re: flink sqlkafka join??
>
>
>
>??SQLparse json??join
>SQL??join70s=3.8k3
>
>??JOIN??
>TiDB??
>useUnicode=truecharacterEncoding=UTF-8serverTimezone=Asia/ShanghairewriteBatchedStatements=true
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re:flink mysql cdc????

2021-04-22 文章 Michael Ran
CDCbinlog
?? 2021-04-22 14:22:18??"" <1353637...@qq.com> ??
>??flink mysql cdc
>1.flink mysql 
>cdc??mysql??binlog??mysql
> 


Re:flink connector用户密码

2021-04-08 文章 Michael Ran
好像并不能- -,想支持的话,你在密码那里加个参数,支持一个加密解密的参数就行。重新打包
在 2021-04-02 16:58:30,"guoyb" <861277...@qq.com> 写道:
>比如jdbc connector MySQL
>
>
>create table xxx
>()
>with(
>" user name"=" root"
>," password"="123456"
>)
>;
>用户密码可以怎么配置,可以不用明文这种方式。


Re:flink 使用yarn部署,报错:Maximum Memory: 8192 Requested: 10240MB. Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values

2021-03-22 文章 Michael Ran
超过了 yarn 容器 配置吧
At 2021-03-20 10:57:23, "william" <712677...@qq.com> wrote:
>org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't
>deploy Yarn session cluster
>at
>org.apache.flink.yarn.YarnClusterDescriptor.deploySessionCluster(YarnClusterDescriptor.java:425)
>at
>org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:606)
>at
>org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$4(FlinkYarnSessionCli.java:860)
>at java.security.AccessController.doPrivileged(Native Method)
>at javax.security.auth.Subject.doAs(Subject.java:422)
>at
>org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>at
>org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>at
>org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:860)
>Caused by:
>org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The
>cluster does not have the requested resources for the TaskManagers
>available!
>Maximum Memory: 8192 Requested: 10240MB. Please check the
>'yarn.scheduler.maximum-allocation-mb' and the
>'yarn.nodemanager.resource.memory-mb' configuration values
>
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re:提交两个SQL任务,其中一个不生效。

2021-03-10 文章 Michael Ran
StatementSet 用用
在 2021-03-11 09:37:27,"eriendeng"  写道:
>提交两个SQL,后面的SQL不生效,这是为什么呢?后面的看起来会变成前一个的HA。
>Job1:Kafka Topic1 -> Kafka Topic2
>Job2:Kafka Topic2 -> Postgre
>
>不是很明白这个原因,是不是有哪里没配置好呢?
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re:Re: flink sql 这种实时计算结果如何与离线计算的结果做数据比对?

2021-03-09 文章 Michael Ran
1.两套逻辑结果,只能定时任务做check2.同一套逻辑,就要具体分析了,只要不是一个人、一套代码逻辑出来的,都有可能出问题
在 2021-03-09 12:51:50,"Smile"  写道:
>对,离线和实时的计算语义本来就是不一样的,所以这个地方也没有特别完美的解决方案,一般都是 case by case 看一下。
>有一些显而易见的问题比如 Join 是否关联成功这种还是比较容易查,其他的确实不太好判断。
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re:Re: 频繁发生 'ResourceManager leader changed to new address null' 异常导致任务重启

2021-03-09 文章 Michael Ran
看看当时的负载呢?有没有过高的情况,是什么原因。然后监控下网络和磁盘
在 2021-03-09 14:57:43,"yidan zhao"  写道:
>而且大家推荐怎么设置呢,我可能默认就G1了。不清楚G1是否也需要精调。
>我目前设置的内存还是比较大的。(50G的,100G的TaskManager都有),这么大heap,是否需要特别设置啥呢?
>
>或者是否有必要拆小,比如设置10Gheap,然后把taskmanager数量提上去。
>
>yidan zhao  于2021年3月9日周二 下午2:56写道:
>
>> 好的,我会看下。
>> 然后我今天发现我好多个集群GC collector不一样。
>> 目前发现3种,默认的是G1。flink conf中配置了env.java.opts:
>> "-XX:-OmitStackTraceInFastThrow"的情况出现了2种,一种是Parallel GC with 83
>> threads,还有一种是Mark Sweep Compact GC。
>> 大佬们,Flink是根据内存大小有什么动态调整吗。
>>
>>
>> 不使用G1我大概理解了,可能设置了java.opts这个是覆盖,不是追加。本身我只是希望设置下-XX:-OmitStackTraceInFastThrow而已。
>>
>>
>> 杨杰 <471419...@qq.com> 于2021年3月8日周一 下午3:09写道:
>>
>>> Hi,
>>>
>>>   可以排查下 GC 情况,频繁 FGC 也会导致这些情况。
>>>
>>> Best,
>>> jjiey
>>>
>>> > 2021年3月8日 14:37,yidan zhao  写道:
>>> >
>>> >
>>> 如题,我有个任务频繁发生该异常然后重启。今天任务启动1h后,看了下WEB-UI的检查点也没,restored达到了8已经。然后Exception页面显示该错误,估计大多数都是因为该错误导致的restore。
>>> > 除此外,就是 ‘Job leader for job id eb5d2893c4c6f4034995b9c8e180f01e lost
>>> > leadership’ 错导致任务重启。
>>> >
>>> > 下面给出刚刚的一个错误日志(环境flink1.12,standalone集群,5JM+5TM,JM和TM混部在相同机器):
>>> > 2021-03-08 14:31:40
>>> > org.apache.flink.runtime.io
>>> .network.netty.exception.RemoteTransportException:
>>> > Error at remote task manager '10.35.185.38/10.35.185.38:2016'.
>>> >at org.apache.flink.runtime.io.network.netty.
>>> > CreditBasedPartitionRequestClientHandler.decodeMsg(
>>> > CreditBasedPartitionRequestClientHandler.java:294)
>>> >at org.apache.flink.runtime.io.network.netty.
>>> > CreditBasedPartitionRequestClientHandler.channelRead(
>>> > CreditBasedPartitionRequestClientHandler.java:183)
>>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> > AbstractChannelHandlerContext.invokeChannelRead(
>>> > AbstractChannelHandlerContext.java:379)
>>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> > AbstractChannelHandlerContext.invokeChannelRead(
>>> > AbstractChannelHandlerContext.java:365)
>>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> >
>>> AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext
>>> > .java:357)
>>> >at org.apache.flink.runtime.io.network.netty.
>>> > NettyMessageClientDecoderDelegate.channelRead(
>>> > NettyMessageClientDecoderDelegate.java:115)
>>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> > AbstractChannelHandlerContext.invokeChannelRead(
>>> > AbstractChannelHandlerContext.java:379)
>>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> > AbstractChannelHandlerContext.invokeChannelRead(
>>> > AbstractChannelHandlerContext.java:365)
>>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> >
>>> AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext
>>> > .java:357)
>>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> >
>>> DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:
>>> > 1410)
>>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> > AbstractChannelHandlerContext.invokeChannelRead(
>>> > AbstractChannelHandlerContext.java:379)
>>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> > AbstractChannelHandlerContext.invokeChannelRead(
>>> > AbstractChannelHandlerContext.java:365)
>>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> > DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
>>> >at org.apache.flink.shaded.netty4.io.netty.channel.epoll.
>>> > AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(
>>> > AbstractEpollStreamChannel.java:792)
>>> >at
>>> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop
>>> > .processReady(EpollEventLoop.java:475)
>>> >at
>>> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop
>>> > .run(EpollEventLoop.java:378)
>>> >at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
>>> > SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>>> >at org.apache.flink.shaded.netty4.io.netty.util.internal.
>>> > ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>>> >at java.lang.Thread.run(Thread.java:748)
>>> > Caused by: org.apache.flink.runtime.io.network.partition.
>>> > ProducerFailedException: org.apache.flink.util.FlinkException:
>>> JobManager
>>> > responsible for eb5d2893c4c6f4034995b9c8e180f01e lost the leadership.
>>> >at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue
>>> > .writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:221)
>>> >at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue
>>> > .enqueueAvailableReader(PartitionRequestQueue.java:108)
>>> >at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue
>>> > .userEventTriggered(PartitionRequestQueue.java:170)
>>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> > AbstractChannelHandlerContext.invokeUserEventTriggered(
>>> > AbstractChannelHandlerContext.java:346)
>>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> > 

Re:回复:消息积压如何优化

2021-03-04 文章 Michael Ran



一般a->b->c->d->e->f 算子。如果f 跟不上,会导致abcde 出现被压,所以看被压前一个洛。还有就是看了解算子,根据经验














在 2021-03-05 14:39:40,"allanqinjy"  写道:
>你好,
>
> 消费速度跟不上写入速度,通过调试并发可以提高消费,但是不能一直这样靠着修改并发度来做优化。你说的看哪个算子问题,能具体点吗?怎么就算有问题了?一旦被压了,基本你看算子,基本的被压肯定都是正常的只有source那里才是high,被压会一级一级的反到source。你看算子的metrics也能只能看进入的量,出去的量,sink的出去的量是第三方flink
>  ui上应该也看不到,这样如何排查是哪个算子的具体问题?你们是有什么好的方法吗?
>
>
>| |
>allanqinjy
>|
>|
>allanqi...@163.com
>|
>签名由网易邮箱大师定制
>
>
>在2021年03月5日 14:22,Michael Ran 写道:
>看看哪个算子问题,增加那个算子并发。 或者优化那个算子执行。 先找找原因
>在 2021-03-05 11:05:14,"allanqinjy"  写道:
>
>
>hi,
>由于消息队列量特别大,在flink接入以后被压严重。除了修改并行度以外还有没有其他的优化方案!!!
>| |
>allanqinjy
>|
>|
>allanqi...@163.com
>|
>签名由网易邮箱大师定制
>


Re:消息积压如何优化

2021-03-04 文章 Michael Ran
看看哪个算子问题,增加那个算子并发。 或者优化那个算子执行。 先找找原因
在 2021-03-05 11:05:14,"allanqinjy"  写道:
>
>
>hi,
>由于消息队列量特别大,在flink接入以后被压严重。除了修改并行度以外还有没有其他的优化方案!!!
>| |
>allanqinjy
>|
>|
>allanqi...@163.com
>|
>签名由网易邮箱大师定制
>


Re:flink TableEnvironment.sqlUpdate不支持update 多表关联更新吗

2021-03-03 文章 Michael Ran






SQL 也不能这样吧- -











At 2021-03-03 16:43:49, "JackJia"  wrote:
>Hi 诸位同仁:
>诸同仁好,flink TableEnvironment.sqlUpdate是不是不支持update 多表关联更新?
>
>
>如下代码:
>bbTableEnv.sqlUpdate("update order_tb a, min_max_usertime_tb b set a.mark=-2 " 
>+
>" where a.mac=b.mac and extract(epoch from a.usertime)/7200 = b.user_2hour " +
>" and a.usertime > b.min_usertime and a.usertime < b.max_usertime");
>报错如下:
>Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL 
>parse failed. Encountered "," at line 1, column 17.
>Was expecting:
>"SET" ...
>
>at 
>org.apache.flink.table.planner.calcite.FlinkPlannerImpl.parse(FlinkPlannerImpl.scala:96)
>at 
>org.apache.flink.table.planner.delegation.PlannerBase.parse(PlannerBase.scala:127)
>at 
>org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:335)
>at com.sir.idle.IdleAnalysis.runBlinkBatch(IdleAnalysis.java:101)
>at com.sir.BatchMain.main(BatchMain.java:17)
>Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "," at 
>line 1, column 17.
>Was expecting:
>"SET" ...
>
>at 
>org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:368)
>at 
>org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:167)
>at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:147)
>at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:162)
>at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:187)
>at 
>org.apache.flink.table.planner.calcite.FlinkPlannerImpl.parse(FlinkPlannerImpl.scala:92)
>... 4 more
>Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered "," at 
>line 1, column 17.
>Was expecting:
>"SET" ...
>
>at 
>org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:33107)
>at 
>org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:32921)
>at 
>org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlUpdate(FlinkSqlParserImpl.java:8227)
>at 
>org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3646)
>at 
>org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3669)
>at 
>org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:214)
>at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:160)
>... 6 more


Re:回复:编译Flink1.11的flink-runtime-web失败

2021-03-02 文章 Michael Ran



flink-test-utils-junit 单独编译下。 缺什么编译什么就行














在 2021-03-03 10:57:27,"Natasha"  写道:
>hi Michael,
>我拉取flink 1.11 realse分支后,可以看到flink-runtime-web中的版本就是1.11-SNAPSHOT。
>
>
>
>
>
>--原始邮件--
>发件人:   
> "user-zh" 
>   
>发送时间:2021年3月3日(星期三) 上午10:50
>收件人:"user-zh"
>主题:Re:编译Flink1.11的flink-runtime-web失败
>
>
>
>为什么还会依赖 -SNAPSHOT 的jar。不是release 的 的版本吗?
>
>
>
>在 2021-03-03 10:34:23,"Natasha" 
>hi,all
>我在编译Flink1.11,由于每次到flink-runtime-web都失败,于是我cd flink-runtime-web进行单独编译,发现
>Cannot resolve org.apache.flink:flink-test-utils-junit:1.11-SNAPSHOT,
>Cannot resolve org.apache.flink:flink-test-utils_2.11:1.11-SNAPSHOT
>依赖一直无法下载下来。请问有好的解决方法吗?
> 感谢你们在百忙之中看到我的邮件!


Re:Re: flink 1.12.2-rc2 被挖矿

2021-03-02 文章 Michael Ran
网络层面 不会直接到公网才对,是开了什么吧?
在 2021-03-02 13:04:41,"macdoor"  写道:
>我不是安全专家,不知道如何才能确认是 flink 的问题,但从现象看跟之前 flink 1.10
>遇到的问题非常类似,建议你们能有这方面的测试用例,也能把测试结果提供出来
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re:????Flink1.11??flink-runtime-web????

2021-03-02 文章 Michael Ran
?? -SNAPSHOT ??jar??release ?? ??



?? 2021-03-03 10:34:23??"Natasha"  ??

hi??all
Flink1.11flink-runtime-web??cd 
flink-runtime-web??
Cannot resolve org.apache.flink:flink-test-utils-junit:1.11-SNAPSHOT??
Cannot resolve org.apache.flink:flink-test-utils_2.11:1.11-SNAPSHOT



Re:flink on k8s日志时间戳时区问题

2021-02-19 文章 Michael Ran
k8s  设置的
在 2021-02-19 09:37:28,"casel.chen"  写道:
>目前是UTC时区的,怎样才能设置成当地的东8区呢?谢谢!
>
>
>2021-02-19 01:34:21,259 INFO  akka.event.slf4j.Slf4jLogger 
>[] - Slf4jLogger started
>2021-02-19 01:34:22,155 INFO  akka.remote.Remoting 
>[] - Starting remoting
>2021-02-19 01:34:21,259 INFO akka.event.slf4j.Slf4jLogger [] - Slf4jLogger 
>started
>2021-02-19 01:34:22,155 INFO akka.remote.Remoting [] - Starting remoting


Re:对于 Flink 可撤回流做聚合,当上游数据撤回时,最终聚合值变小了,怎么解决这种问题。

2021-01-28 文章 Michael Ran
以前做过,自定义sink,更新值小于 存储值的时候不更新
在 2021-01-25 16:00:28,"LakeShen"  写道:
>Hi 社区,
>
>之前遇到一个问题,在 Flink 里面,对于一个数据流(能够撤回的)进行聚合时,当这个数据流有大量撤回时,最终聚合值会变小。这种情况之前有看到说加一个
>mini batch 来减轻这种情况的影响,还有别的方法能够解决这个问题吗?
>
>Best,
>LakeShen


Re:flink sql是否有json_to_map函数,或者实现方法是怎么样的?

2021-01-21 文章 Michael Ran
特定的的map也是需要类型的,如果你在乎类型建议里面统一以字符串的udtf实现,后续再进行转换
在 2021-01-21 18:35:18,"Jeff"  写道:
>hi all,
>
>
>有没有什么办法可以将json转成map呢?类似于str_to_map函数。
>
>
>版本:flink 1.11 
>planner: blink sql
>
>
>需求背景: UDF函数通过http请求获得了json字符串,希望能够直接使用json内字段, 
>UDTF可以满足一部分要求,但不灵活,因为定义UDTF时需要指定输出字段及类型,限制很大。


Re:Re: Re: Flink写hdfs提交任务就报错。Recoverable writers on Hadoop are only supported for HDFS

2021-01-21 文章 Michael Ran
很抱歉,我已经很久没用过这个了。但是可以根据异常信息以及API源码执行进行分析,确定是否能直接写入。如果你要写入自定义的文件系统,那么只能实现自己的sink方式。或者你的文件系统的写入方式兼容hdfs的上层API可以参考各个sink端的写法
在 2021-01-21 18:45:06,"张锴"  写道:
>import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink,
>DateTimeBucketer}
>
>sink.setBucketer sink.setWriter用这种方式试试
>
>
>
>赵一旦  于2021年1月21日周四 下午6:37写道:
>
>> @Michael Ran
>> 然后有什么解决方案吗,我这个是使用flink的streamFileSink方式写hdfs的时候出现的异常。
>>
>> Michael Ran  于2021年1月21日周四 下午5:23写道:
>>
>> > 这里应该是用了hdfs 的特定API吧,文件系统没兼容public
>> > HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) {...}
>> > 在 2021-01-21 17:18:23,"赵一旦"  写道:
>> > >具体报错信息如下:
>> > >
>> > >java.lang.UnsupportedOperationException: Recoverable writers on Hadoop
>> are
>> > >only supported for HDFS
>> > >at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.(
>> > >HadoopRecoverableWriter.java:61)
>> > >at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem
>> > >.createRecoverableWriter(HadoopFileSystem.java:210)
>> > >at org.apache.flink.core.fs.SafetyNetWrapperFileSystem
>> > >.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
>> > >at org.apache.flink.streaming.api.functions.sink.filesystem.
>> > >StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink
>> > >.java:260)
>> > >at org.apache.flink.streaming.api.functions.sink.filesystem.
>> >
>> >
>> >StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:270)
>> > >at org.apache.flink.streaming.api.functions.sink.filesystem.
>> > >StreamingFileSink.initializeState(StreamingFileSink.java:412)
>> > >at org.apache.flink.streaming.util.functions.StreamingFunctionUtils
>> > >.tryRestoreFunction(StreamingFunctionUtils.java:185)
>> > >at org.apache.flink.streaming.util.functions.StreamingFunctionUtils
>> > >.restoreFunctionState(StreamingFunctionUtils.java:167)
>> > >at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator
>> > >.initializeState(AbstractUdfStreamOperator.java:96)
>> > >at
>> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler
>> > >.initializeOperatorState(StreamOperatorStateHandler.java:107)
>> > >at org.apache.flink.streaming.api.operators.AbstractStreamOperator
>> > >.initializeState(AbstractStreamOperator.java:264)
>> > >at org.apache.flink.streaming.runtime.tasks.OperatorChain
>> > >.initializeStateAndOpenOperators(OperatorChain.java:400)
>> > >at org.apache.flink.streaming.runtime.tasks.StreamTask
>> > >.lambda$beforeInvoke$2(StreamTask.java:507)
>> > >at
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1
>> > >.runThrowing(StreamTaskActionExecutor.java:47)
>> > >at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(
>> > >StreamTask.java:501)
>> > >at
>> > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
>> > >.java:531)
>> > >at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>> > >at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>> > >at java.lang.Thread.run(Thread.java:748)
>> > >
>> > >
>> > >赵一旦  于2021年1月21日周四 下午5:17写道:
>> > >
>> > >> Recoverable writers on Hadoop are only supported for HDFS
>> > >>
>> > >> 如上,我们用的hadoop协议的,但底层不是hdfs,是公司自研的分布式文件系统。
>> > >>
>> > >> 使用spark写,spark-sql读等都没问题。但是flink写和读当前都没尝试成功。
>> > >>
>> > >>
>> > >>
>> >
>>


Re:Re: Flink写hdfs提交任务就报错。Recoverable writers on Hadoop are only supported for HDFS

2021-01-21 文章 Michael Ran
这里应该是用了hdfs 的特定API吧,文件系统没兼容public 
HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) {...}
在 2021-01-21 17:18:23,"赵一旦"  写道:
>具体报错信息如下:
>
>java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are
>only supported for HDFS
>at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.(
>HadoopRecoverableWriter.java:61)
>at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem
>.createRecoverableWriter(HadoopFileSystem.java:210)
>at org.apache.flink.core.fs.SafetyNetWrapperFileSystem
>.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
>at org.apache.flink.streaming.api.functions.sink.filesystem.
>StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink
>.java:260)
>at org.apache.flink.streaming.api.functions.sink.filesystem.
>StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:270)
>at org.apache.flink.streaming.api.functions.sink.filesystem.
>StreamingFileSink.initializeState(StreamingFileSink.java:412)
>at org.apache.flink.streaming.util.functions.StreamingFunctionUtils
>.tryRestoreFunction(StreamingFunctionUtils.java:185)
>at org.apache.flink.streaming.util.functions.StreamingFunctionUtils
>.restoreFunctionState(StreamingFunctionUtils.java:167)
>at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator
>.initializeState(AbstractUdfStreamOperator.java:96)
>at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler
>.initializeOperatorState(StreamOperatorStateHandler.java:107)
>at org.apache.flink.streaming.api.operators.AbstractStreamOperator
>.initializeState(AbstractStreamOperator.java:264)
>at org.apache.flink.streaming.runtime.tasks.OperatorChain
>.initializeStateAndOpenOperators(OperatorChain.java:400)
>at org.apache.flink.streaming.runtime.tasks.StreamTask
>.lambda$beforeInvoke$2(StreamTask.java:507)
>at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1
>.runThrowing(StreamTaskActionExecutor.java:47)
>at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(
>StreamTask.java:501)
>at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
>.java:531)
>at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>at java.lang.Thread.run(Thread.java:748)
>
>
>赵一旦  于2021年1月21日周四 下午5:17写道:
>
>> Recoverable writers on Hadoop are only supported for HDFS
>>
>> 如上,我们用的hadoop协议的,但底层不是hdfs,是公司自研的分布式文件系统。
>>
>> 使用spark写,spark-sql读等都没问题。但是flink写和读当前都没尝试成功。
>>
>>
>>


Re:答复: Re:Flink Jdbc sink写入多表如何实现

2021-01-19 文章 Michael Ran
通过key 分成不同 view ,然后不同的view 写不同的表可以吧
在 2021-01-19 17:59:54,"范超"  写道:
>我这边自己的做法是,根据不同的行为时间,将source拆流,然后一个事件对应一个sink
>Sink的逻辑处理都是一致的,只不过配置的表不同。
>
>-邮件原件-
>发件人: hailongwang [mailto:18868816...@163.com] 
>发送时间: 2020年11月3日 星期二 0:21
>收件人: user-zh@flink.apache.org
>主题: Re:Flink Jdbc sink写入多表如何实现
>
>Hi,
>目前JDBC sink 不支持分库分表,只能自己实现一个 Sink。具体实现的话,即使 insert Statement 需要在 
> writeRecord 阶段根据你的数据的 key 进行生成。
>其中还需要将 key 和 statement 的对应关系缓存起来。
>之前我内部的版本也支持了这个需求,因为后来在 DB 层面支持分库分表,所以在升级版本时候去掉了。(个人觉得这个是不是 DB 层面应该支持的?)
>
>
>Best,
>Hailong  Wang
>
>在 2020-10-30 15:10:33,"张健"  写道:
>>hi,
>>
>>
>>我现在有个需求是一个source进来,然后根据数据里的字段 分发写入 
>>clickhouse多张表中(比如用户行为事件,上报进来有多种,不同的事件写入不同的表中),我想用Flink的Jdbc 
>>sink来做,但看接口貌似不能直接实现这样的功能。
>>
>>
>>这个是要自定义一个sink来做嘛?有没有什么简单的实现方式?
>>
>>
>>
>>
>>
>>
>>
>>--
>>
>>张健


Re:Re:Re:flinksql 消费kafka offset问题

2021-01-14 文章 Michael Ran
额,不用checkpoint 会比较麻烦。 以前自定义sink 的时候,会把消息信息到sink 位置进行提交。 上游source 
也得改造,拉取位置也得统一,比如走redis 数据库等等
在 2021-01-15 12:41:25,"air23"  写道:
>我的意思 是不使用checkpoint。
>使用'scan.startup.mode' = 'group-offsets' 去维护offset
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2021-01-15 11:35:16,"Michael Ran"  写道:
>>下游sink还没有完成, offset 不是在checkpoint 里面的吗?下次启动会从你ck的位置恢复才对。除非你sink 
>>是异步操作,告诉上游你sink 完成了,实际你sink失败了
>>在 2021-01-15 10:29:15,"air23"  写道:
>>>flink消费kafka 只能使用checkpoint去维护offset吗
>>>
>>>我这边使用'scan.startup.mode' = 'group-offsets'
>>>
>>>如果中间报错了 或者停止任务,但是我下游sink还没有完成,
>>>下次启动直接跳过这个报错的数据,会丢数据,谢谢回复


Re:flinksql 消费kafka offset问题

2021-01-14 文章 Michael Ran
下游sink还没有完成, offset 不是在checkpoint 里面的吗?下次启动会从你ck的位置恢复才对。除非你sink 
是异步操作,告诉上游你sink 完成了,实际你sink失败了
在 2021-01-15 10:29:15,"air23"  写道:
>flink消费kafka 只能使用checkpoint去维护offset吗
>
>我这边使用'scan.startup.mode' = 'group-offsets'
>
>如果中间报错了 或者停止任务,但是我下游sink还没有完成,
>下次启动直接跳过这个报错的数据,会丢数据,谢谢回复


Re:Re: 请问大神们flink-sql可以指定时间清除内存中的全部State吗?

2021-01-13 文章 Michael Ran
group by [时间字段]
我们也有类似场景,每天数据的时间是不一样的,这样不会导致今天的数据累加才对啊?

















在 2021-01-14 15:06:26,"Jark Wu"  写道:
>为啥不用天级别的tumble window? 自动就帮你清楚 state 了
>
>On Wed, 6 Jan 2021 at 13:53, 徐州州 <25977...@qq.com> wrote:
>
>> 最近在做一个flink实时数仓项目,有个日报的场景,使用flink-on-yarn模式,如果job不停止,第二天的结果就在第一天的基础上累加了,我计算的读取规则(数据TimeStampcurrent_date)就认为是今天的数据,可是如果集群不停止第二天的计算结果就会在第一天累加,代码中只设置了env.setStateBackend(new
>> MemoryStateBackend()),目前我是每天重启一下job才可以释放内存中的State避免在昨天的基础上累计。我数据源是connector的upsert-kafka,然后基于dwd层编写sql。下面是我执行的具体sql,其中所用的表都来自dwd层的upsert-kafka数据源。
>> |  select
>> |   TO_DATE(cast(doi.DeliveryTime as
>> String),'-MM-dd') as  days,
>> |   doi.UserId,
>> |   count(doi.Code) as   SendTime,
>> |   sum(doi.PayAmount / 100) as SendCashcharge,
>> |   sum(doi.PayAmount / 100 - ChargeAmount / 100 +
>> UseBalance / 100) as  SendCashuse,
>> |   sum(doi.CashMoney / 100)as  SendCash
>> |from dwd_order_info doi
>> |where doi.DeliveryTime cast(current_date AS
>> TIMESTAMP) and doi.OrderType = 29 and doi.Status = 50 and doi.Status
>> < 60
>> |group by TO_DATE(cast(doi.DeliveryTime as
>> String),'-MM-dd'), doi.UserId


Re:Re: flink sql 更新mysql字段

2020-10-19 文章 Michael Ran
我们是自定义SQL。  但是不同SQL 更新部分字段,会有锁冲突,是能单条减少冲突量。 批量会死锁
在 2020-09-28 21:36:11,"Leonard Xu"  写道:
>Hi
>
>Insert 到指定字段是个通用的需求,社区已经有一个issue[1] 在跟踪了,你可以关注下
>
>
>祝好
>Leonard
>[1] https://issues.apache.org/jira/browse/FLINK-18726 
> 
>
>> 在 2020年9月28日,17:46,lemon  写道:
>> 
>> hi各位:
>> 请问一下,如果mysql表中有20个字段,现在有多个insert into的语句分别更新指定字段,即同一条记录可能有多个insert语句去分别更新不同字段
>> 现在遇到的问题是,因为在insert into语句中需要将mysql中所有字段都带上,所以更新会覆盖其他字段的值。
>> 例如insert into mysql select a,b c from 
>> kafka,但是我只要更新a,b字段,c字段想保持原来的值,请问这种情况需要怎么操作?
>> flink1.10.1版本 blink
>
>


Re:Re: flink on yarn容器异常退出

2020-10-19 文章 Michael Ran
看看是不是任务出错,或者内存超了
在 2020-10-13 14:12:08,"Dream-底限"  写道:
>hi
>先前我是一个container申请四个core,经常遇到分配完applicationid然后任务处于standby后就挂掉了,现在申请四个container,每个container一个core后正常启动任务了
>
>Congxian Qiu  于2020年10月13日周二 下午1:12写道:
>
>> Hi
>> 容易异常退出是指 container 退出吗?可以看下 JM/TM log 是否有相应信息,如果没有,可以尝试从 yarn 侧看下日志为什么
>> container 退出了
>> Best,
>> Congxian
>>
>>
>> caozhen  于2020年10月12日周一 下午6:08写道:
>>
>> >
>> > 可以发下 "分配完applicationid后,容器经常异常退出"  产生的错误日志吗?
>> >
>> > 或者排查下flink客户端中的错误日志,以及yarn-historyserver里的日志。
>> >
>> > 
>> >
>> > Dream-底限 wrote
>> > > hi
>> > > 我正在使用flink1.11.1 on
>> > >
>> >
>> yarn以分离模式运行任务,但在任务提交的时候,任务在分配完applicationid后,容器经常异常退出,先前以为是yarn环境问题,但是在两个集群测都有遇到这种情况,请问这是一个已知的bug吗
>> >
>> >
>> >
>> >
>> >
>> > --
>> > Sent from: http://apache-flink.147419.n8.nabble.com/
>>


Re:??????flink sql count distinct??????????

2020-10-19 文章 Michael Ran
 
?? 2020-10-19 08:03:46??"??"  ??
>??user_id
>
>
>
>| |
>??
>|
>|
>??xiongyun...@163.com
>|
>
>??  
>
>??2020??10??17?? 16:24??867127831 ??
>??flink sqldaugroupbycount distinct 
>user_id??table.optimizer.distinct-agg.split.enabled=true
>job??mysql
>
>
>  
> 
>2020-10-10 19:00:00 100
>2020-10-10 19:00:02 98
>2020-10-10 19:00:04 102
>2020-10-10 19:00:06 108
>2020-10-10 19:00:08 106
>2020-10-10 19:00:10 110
>
>
>sql??
>create table jdbc_sink(
>  date_str varchar ,
>  dau bigint,
>  PRIMARY KEY (date_str) NOT ENFORCED
>) with (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:mysql://xxx',
> 'table-name' = 'xxx',
> 'driver' = 'com.mysql.jdbc.Driver',
> 'username' = 'xxx',
> 'password' = 'xxx'
>);
>
>
>CREATE TABLE action_log_source (
> user_id varchar,
> event_time TIMESTAMP(3),
> WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
>) with (
> ...
>);
>
>
>INSERT INTO
> jdbc_sink
>SELECT
> day_str as date_str,
> COUNT(DISTINCT user_id) AS dau
>FROM (
> select
>   user_id as user_id,
>   date_format(event_time, '-MM-dd') as day_str
> from action_log_source
>)
>GROUP BY day_str


Re:Re: checkpoint rocksdb hdfs 如何协调,保证数据不丢失

2020-10-09 文章 Michael Ran
好的,非常感谢。
在 2020-09-29 14:23:10,"王冶"  写道:
>Hi~ 按你的问题顺序回答如下:
> 1. Flink中的RocksDB是支持保存到hdfs的,且支持的非常好,将rocksdb的存储路径设置为hdfs路径即可。
> 2.
>in-flight的数据是保存在本地磁盘的,仅当checkpoint的时候,才会将本地的状态拷贝到hdfs。而且checkpoint本身不会因为远程拷贝影响计算速度。
> 3.
>多久备份一次,取决于你配置的checkpoint的间隔。每次checkpoint都会备份&远程拷贝。但请注意,默认配置下checkpoint会在作业停止后删除,这时候你需要手动触发savepoint,你当然也可以在作业运行过程中出发保存savepoint,savepoint的好处是不会随作业停止而删除,且可以让新作业基于savepoint启动,从而实现exactly-once或at-least的语义。
> 4. Flink提供多种状态后端,需要根据你的实际场景选择。但对于大状态和高可用场景,推荐rocksdb。具体的推荐还是多读下文档。
>
>文档:
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/state_backends.html#rocksdb-state-backend-details
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/checkpoints.html
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/large_state_tuning.html
>
>祝好,
>By Soda
>
>
>On Tue, 29 Sep 2020 at 11:06, Michael Ran  wrote:
>
>> dear all :
>> 我们checkpoint 信息默认保存在rocksdb上,但是rocksdb
>> 是一个单机系统,性能OK,要做到不丢失还是要备份到hdfs分布式文件系统上。
>>
>>
>>问题是:
>>1. 如果仅保存在rocksdb,那么单机物理损坏,数据是会丢失的。
>>2. 如果仅保存hdfs,那么性能可能跟不上
>>3.如果先保存到rocksdb,再定时备份到hdfs,那么是多久备份一次?中间出现物理损坏,还是会出现一端时间的丢失。
>>4. 这块的详细设计,和具体流程、场景有合适的文档推荐吗?怎么再性能和数据完整性上做考虑的


Re:Re: flink1.11流式写入hive速度慢的问题

2020-10-09 文章 Michael Ran
不知道现在flink 能否直接获取hive 文件写入。以前直接用jdbc 写hive 速度本来就快不起来,每次都要生成文件。  
如果先写文件,文件写好了再进行一次load  就会快很多
在 2020-10-09 15:55:15,"Jingsong Li"  写道:
>Hi,
>是Hive表吧?
>https://issues.apache.org/jira/browse/FLINK-19121 在1.11.3中才fix,这个是影响性能的
>
>可以下载最新的1.11分支的Hive依赖来试下:
>https://repository.apache.org/snapshots/org/apache/flink/
>(比如你用hive-1.2.2依赖,你可以下载
>https://repository.apache.org/content/groups/snapshots/org/apache/flink/flink-sql-connector-hive-1.2.2_2.11/1.11-SNAPSHOT/flink-sql-connector-hive-1.2.2_2.11-1.11-20201008.202441-190.jar
>)
>
>Best,
>Jingsong
>
>On Fri, Oct 9, 2020 at 3:50 PM me  wrote:
>
>> dataStream读取速度在5000条每秒,没有其他的处理逻辑,仅仅是将datastream 转换为table
>>
>>
>>  原始邮件
>> 发件人: me
>> 收件人: user-zh
>> 发送时间: 2020年10月9日(周五) 15:34
>> 主题: flink1.11流式写入hive速度慢的问题
>>
>>
>> flink1.11 将datastream转为table之后,再流式的写入hive表中,发现写入速度只有几十条每秒
>> val chaitin_test = tableEnv.fromDataStream(dataStream,'test)
>> chaitin_test.printSchema()
>> tableEnv.executeSql("insert into chaitin_test select test from " +
>> chaitin_test)
>
>
>
>-- 
>Best, Jingsong Lee


Re:Re: Re: Re: 了解下大家生产中都用什么任务调度系统呢

2020-09-29 文章 Michael Ran
~.~ 不是有几百个star 嘛。海豚 这个到apache 社区会强大些
在 2020-09-29 16:45:30,"赵一旦"  写道:
>看了下,hera算了,虽然文档看起来还行,但是5个star,不敢用。
>海豚这个看起来还不错,可以试试看。
>
>Michael Ran  于2020年9月29日周二 上午10:43写道:
>
>> ~。~ hera、海豚都行
>> 在 2020-09-29 09:58:45,"chengyanan1...@foxmail.com" <
>> chengyanan1...@foxmail.com> 写道:
>> >
>> >Apache DolphinScheduler 你值得拥有
>> >
>> >https://dolphinscheduler.apache.org/zh-cn/
>> >
>> >
>> >
>> >发件人: 赵一旦
>> >发送时间: 2020-09-28 20:47
>> >收件人: user-zh
>> >主题: Re: 了解下大家生产中都用什么任务调度系统呢
>> >感觉ooize成熟但不想用,xml写起来难受。
>> >azkaban也需要单独上传。
>> >
>> >我现在用的公司的其实挺好,就是界面操作有点小bug,以及经常挂。就是全流程通过界面操作,创建任务,然后任务和任务之间拖一拖就可以创建依赖。
>> >
>> >孟小鹏 <602012...@qq.com> 于2020年9月28日周一 下午8:22写道:
>> >
>> >> Airflow  oozie
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> 发自我的iPhone
>> >>
>> >>
>> >> -- 原始邮件 --
>> >> 发件人: 赵一旦 > >> 发送时间: 2020年9月28日 19:41
>> >> 收件人: user-zh > >> 主题: 回复:了解下大家生产中都用什么任务调度系统呢
>> >>
>> >>
>> >>
>> >> 主要是指开源的调度系统。
>> >>
>> >> 公司有个系统感觉经常挂,想换个开源的自己搭建。
>> >> (1)最好是支持WEB UI配置任务流的,不想搞个任务还需要单独写xml等文件然后打包上传那种。
>> >> (2)在生产中长期应用,稳定,能满足大多数需求的。
>> >>
>> >> 希望大家推荐下。
>>


checkpoint rocksdb hdfs 如何协调,保证数据不丢失

2020-09-28 文章 Michael Ran
dear all :
我们checkpoint 信息默认保存在rocksdb上,但是rocksdb 
是一个单机系统,性能OK,要做到不丢失还是要备份到hdfs分布式文件系统上。


   问题是:
   1. 如果仅保存在rocksdb,那么单机物理损坏,数据是会丢失的。
   2. 如果仅保存hdfs,那么性能可能跟不上
   3.如果先保存到rocksdb,再定时备份到hdfs,那么是多久备份一次?中间出现物理损坏,还是会出现一端时间的丢失。
   4. 这块的详细设计,和具体流程、场景有合适的文档推荐吗?怎么再性能和数据完整性上做考虑的

Re:Re: Re: 了解下大家生产中都用什么任务调度系统呢

2020-09-28 文章 Michael Ran
~。~ hera、海豚都行
在 2020-09-29 09:58:45,"chengyanan1...@foxmail.com"  
写道:
>
>Apache DolphinScheduler 你值得拥有
>
>https://dolphinscheduler.apache.org/zh-cn/
>
>
> 
>发件人: 赵一旦
>发送时间: 2020-09-28 20:47
>收件人: user-zh
>主题: Re: 了解下大家生产中都用什么任务调度系统呢
>感觉ooize成熟但不想用,xml写起来难受。
>azkaban也需要单独上传。
> 
>我现在用的公司的其实挺好,就是界面操作有点小bug,以及经常挂。就是全流程通过界面操作,创建任务,然后任务和任务之间拖一拖就可以创建依赖。
> 
>孟小鹏 <602012...@qq.com> 于2020年9月28日周一 下午8:22写道:
> 
>> Airflow  oozie
>>
>>
>>
>>
>>
>> 发自我的iPhone
>>
>>
>> -- 原始邮件 --
>> 发件人: 赵一旦 > 发送时间: 2020年9月28日 19:41
>> 收件人: user-zh > 主题: 回复:了解下大家生产中都用什么任务调度系统呢
>>
>>
>>
>> 主要是指开源的调度系统。
>>
>> 公司有个系统感觉经常挂,想换个开源的自己搭建。
>> (1)最好是支持WEB UI配置任务流的,不想搞个任务还需要单独写xml等文件然后打包上传那种。
>> (2)在生产中长期应用,稳定,能满足大多数需求的。
>>
>> 希望大家推荐下。


Re:Re: Flink 1.11.1 JDBC Sink 使用旧版配置 数据丢失

2020-09-28 文章 Michael Ran
有主键吗?  有的话不会触发delete 才对
在 2020-09-28 15:54:49,"Leonard Xu"  写道:
>
>
>> 在 2020年9月15日,16:52,LittleFall <1578166...@qq.com> 写道:
>> 
>> 谢谢,请问有相关的 issue 链接吗
>> 
>> 
>> 
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>
>To @LItteFall :
>
>没有对应的issue,因为是在修复changlog 
>issue[1]时在这个issue里一起修复的,代码可以看下TableBufferReducedStatementExecutor里reduceBuffer保证是对同key上得不同操作顺序执行的。
>
>To @Michael Ran:
>
>update 怎么触发的 delete 哦?
>
>LItteFall 是在数据库的表中触发了update操作,然后数据库的binlog通过 CDC工具 canal 以 canal-json 
>格式写入到kafka的表中,一个update 会对应UPDATE_BEFORE,UPDATE_AFTER两条数据, JDBC connector 
>对应的处理会生成两条sql, 一条delete和一条insert. 
> 
>
>祝好
>Leonard
>[1]https://issues.apache.org/jira/projects/FLINK/issues/FLINK-18461?filter=allissues
> 
><https://issues.apache.org/jira/projects/FLINK/issues/FLINK-18461?filter=allissues>
> 


Re:Re:HistoryServer完成任务丢失的问题

2020-09-27 文章 Michael Ran
你的意思是,日志彻底消失了?完全找不到? 不会是你任务有问题,压根就没成功,没产生吧。 理论上日志不可能平白无故消失的
在 2020-09-27 17:03:45,"xiao cai"  写道:
>是的,默认是10s一次,但是这个是去jobmanager的归档目录里拉取的间隔。
>问题是cancel的那次job,并没有上传日志信息到归档目录里。
>
>
> 原始邮件 
>发件人: Michael Ran
>收件人: user-zh
>发送时间: 2020年9月27日(周日) 16:45
>主题: Re:HistoryServer完成任务丢失的问题
>
>
>history 记得是定时拉取的,有延迟过去 在 2020-09-27 16:40:27,"xiao cai"  写道: 
>>Hi: >flink 1.11.0 
>>我启动了一个任务,当这个任务进行checkpoint的时候,我在webui中点击了cancel,任务在yarn中的状态时killed,但是再到history
> server中却找不到这个任务。同时我尝试了再yarn中kill 
>application的方式,也是无法发现。这个问题有点随机,不是每次都会出现,有时候执行cancel,在history 
>server又能看到。希望了解history serve相关原理的同学给予帮助。 >非常感谢。 > > > > >best, >xiao.


Re:??????flink sql count????

2020-09-27 文章 Michael Ran
??count     ??  having xxx  ,filter
?? 2020-09-27 17:01:06??"zya"  ??
>??sum??mysql??
>
>
>
>
>
>----
>??:
>"user-zh"  
>  
>:2020??9??27??(??) 4:59
>??:"user-zh"
>:Re:flink sql count
>
>
>
>??null 0?? ?? sum(if(name like '%',1 , 0))
>?? 2020-09-27 16:53:56??"zya" ??
>sqlcountcountcount
>??hive??count(if(name like '%',1 , null))??flink 
>sql??count??null
>flink1.10.1 blink
>nbsp;


Re:flink sql count????

2020-09-27 文章 Michael Ran
??null 0??  ??   sum(if(name like '%',1 , 0))
?? 2020-09-27 16:53:56??"zya"  ??
>??
>sqlcountcountcount
>??hive??count(if(name like '%',1 , null))??flink 
>sql??count??null
>flink1.10.1 blink
>


Re:HistoryServer完成任务丢失的问题

2020-09-27 文章 Michael Ran
history  记得是定时拉取的,有延迟过去
在 2020-09-27 16:40:27,"xiao cai"  写道:
>Hi:
>flink 1.11.0
>我启动了一个任务,当这个任务进行checkpoint的时候,我在webui中点击了cancel,任务在yarn中的状态时killed,但是再到history 
>server中却找不到这个任务。同时我尝试了再yarn中kill 
>application的方式,也是无法发现。这个问题有点随机,不是每次都会出现,有时候执行cancel,在history 
>server又能看到。希望了解history serve相关原理的同学给予帮助。
>非常感谢。
>
>
>
>
>best,
>xiao.


Re:Flink SQL如何设置checkpoint的TTL

2020-09-27 文章 Michael Ran
table api 
有类似:tableEnv.getConfig().setIdleStateRetentionTime(Time.days(1), 
Time.days(2));
在 2020-09-27 15:47:11,"xiao cai"  写道:
>Hi:
>目前想了解下载Flink SQL下该如何设置checkpoint的TTL。
>非常感谢指教
>
>
>Best,
>xiao.


Re:Re: flink多流关联

2020-09-27 文章 Michael Ran
一般有个等待时间,比如join不上,你设置等待1小时没来,就xxx...
在 2020-09-27 15:30:58,"Dream-底限"  写道:
>hi、
>我想问一下就是多个流关联输出的时间点是所有维度全部关联上了才输出吗,比如abcd四个流,abc关联上了但这时d的数据还没有到,这个是不触发输出的吧
>
>Michael Ran  于2020年9月27日周日 下午2:38写道:
>
>> 会有相互等待,或者先后不一致的情况,要业务自己衡量等待时间
>> 在 2020-09-27 12:09:23,"Dream-底限"  写道:
>> >hi
>>
>> >我们这面想用flink在实时数仓上面做些事情,进行调研后发现数仓可能有多张表进行join、现flink有两种实现方案:第一种是流表lookup时态表,但时态表更新可能会延迟导致查询结果不准确,并且io可能过大;第二种是双流关联,但是如果说有五张表进行join的话,除了状态太大还有其他问题吗,或者说有多流相互等待的问题吗
>>


Re:Re:Re: Re: Flink SQL撤回流问题

2020-09-27 文章 Michael Ran
感觉这不是flink的问题,我们也有类似场景,dt 按天其实并不多,直接就没要id了,如果你非要id,而且数量变化巨大,那么用integer 
,当然还是有可能超。 auto  一般适用数据量不大的单表场景。分布式大数据量场景,都是自己设计id,或者不要id
在 2020-09-27 14:56:06,"xiao cai"  写道:
>Hi Ran:
>非常感谢,我试了insert into ON DUPLICATE KEY UPDATE 
>dt=“dt"的方式,确实是会出现update的始终是id=1,但是auto_increment 
>却一直增加的情况。感觉这样不是很合理,因为随着数据量的增加,迟早会出现数值越界的情形。
>
>
> 原始邮件 
>发件人: Michael Ran
>收件人: user-zh
>发送时间: 2020年9月27日(周日) 14:37
>主题: Re:Re: Re: Flink SQL撤回流问题
>
>
>没有传入id,始终是1 ? 那就是第一次insert update 之后,生成的1.后面都是insert into 
>table(dt,num) values(dt,新数量) ON DUPLICATE KEY UPDATE 
>dt=values(dt)你模拟下这个语句呢,看看id成为1 之后,是不是就不变了 在 2020-09-27 
>14:32:57,"xiao cai"  写道: >Hi lec ssmi: > 
>insert的时候没有指定id,只指定了dt和cnt,因为id在mysql的sink表里是自增的主键,所以flink sql里并没有指定。 > 原始邮件 
>>发件人: lec ssmi >收件人: 
>flink-user-cn >发送时间: 2020年9月27日(周日) 14:25 >主题: Re: 
>Re: Flink SQL撤回流问题 > > >你那个主键=1,应该是传进去的,不是mysql自动生成的吧。所以auto_increment才会一直增长啊。 
>kandy.wang  于2020年9月27日周日 下午2:01写道: > > > > > > > hi > 
>你建mysql要指定主键,另外创建flink表时也要指定一下主键 > > PRIMARY KEY (id) NOT 
>ENFORCED,这样就会根据主键upsert了 > > > > > > > > > 在 2020-09-27 13:36:25,"xiao cai" 
> 写道: > > 
>>如果是先delete再insert,为何主键id一直都是1呢,如果delete在insert时指定了id值,那么auto_increment是不会变的。 
>> >这是我很困惑的地方。 > > > > > > 原始邮件 > >发件人: lec ssmi > 
>>收件人: flink-user-cn > >发送时间: 2020年9月27日(周日) 13:06 > 
>>主题: Re: Flink SQL撤回流问题 > > > > > >是不是底层的sink在处理retract的时候,使用的是先delte再insert , 
>而不是直接update呢。 xiao cai < > flin...@163.com> 于2020年9月27日周日 下午12:08写道: > 场景如下: > 
>source table: kafka > > sink table: mysql schem(id, dt, cnt) > > > insert : > 
>insert into sink > > select dt,count(distinct id) from source group by dt; > > 
>> > 这时mysql对应sink表中有一条数据(1, 2020-09-25, 100),随着事件消费,cnt字段会不断变化,id,dt保持不变 > 
>show > create table sink可以发现auto_increment在不断的变大。 > 当超过id的取值范围,就会报错了。 > > > 
>原始邮件 > > 发件人: Michael Ran > 收件人: user-zh< > 
>user-zh@flink.apache.org> > 发送时间: 2020年9月27日(周日) 11:51 > 主题: Re:Flink > 
>SQL撤回流问题 > > > 详细场景描述下呢 在 2020-09-27 11:48:36,"xiao cai"  > 
>写道: >Hi: > >使用Flink SQL撤回流写入MySQL,表的auto_increment > > 
>越来越大是为什么,我的输出结果只有一条,mysql表里也只有一条数据,自增主键id的值一直为1,但是auto_increment却越来越大。求解答。 >


Re:Re: FlinkSQL是否支持设置窗口trigger实现continuious trigger呢

2020-09-27 文章 Michael Ran
额,不是5分钟窗口,10秒一个步长往前滑动吗? 我以为滚动是5分钟窗口 5分钟一输出呢。。
在 2020-09-27 14:43:57,"赵一旦"  写道:
>不是滑动窗口哈。是滚动窗口,每10秒触发一次输出。滑动窗口的化逻辑就变了。
>
>Michael Ran  于2020年9月27日周日 下午2:39写道:
>
>> 滑动窗口
>> 在 2020-09-27 13:25:37,"赵一旦"  写道:
>> >如题,不使用DatastreamAPI,使用FlinkSQL能否实现五分钟窗口,每10秒输出一次呢?
>>


Re:FlinkSQL是否支持设置窗口trigger实现continuious trigger呢

2020-09-27 文章 Michael Ran
滑动窗口
在 2020-09-27 13:25:37,"赵一旦"  写道:
>如题,不使用DatastreamAPI,使用FlinkSQL能否实现五分钟窗口,每10秒输出一次呢?


Re:flink多流关联

2020-09-27 文章 Michael Ran
会有相互等待,或者先后不一致的情况,要业务自己衡量等待时间
在 2020-09-27 12:09:23,"Dream-底限"  写道:
>hi
>我们这面想用flink在实时数仓上面做些事情,进行调研后发现数仓可能有多张表进行join、现flink有两种实现方案:第一种是流表lookup时态表,但时态表更新可能会延迟导致查询结果不准确,并且io可能过大;第二种是双流关联,但是如果说有五张表进行join的话,除了状态太大还有其他问题吗,或者说有多流相互等待的问题吗


Re:Re: Re: Flink SQL撤回流问题

2020-09-27 文章 Michael Ran
没有传入id,始终是1 ? 那就是第一次insert  update 之后,生成的1.后面都是insert into 
table(dt,num) values(dt,新数量)  ON DUPLICATE KEY UPDATE 
dt=values(dt)你模拟下这个语句呢,看看id成为1 之后,是不是就不变了
在 2020-09-27 14:32:57,"xiao cai"  写道:
>Hi lec ssmi:
>  insert的时候没有指定id,只指定了dt和cnt,因为id在mysql的sink表里是自增的主键,所以flink sql里并没有指定。
> 原始邮件 
>发件人: lec ssmi
>收件人: flink-user-cn
>发送时间: 2020年9月27日(周日) 14:25
>主题: Re: Re: Flink SQL撤回流问题
>
>
>你那个主键=1,应该是传进去的,不是mysql自动生成的吧。所以auto_increment才会一直增长啊。 kandy.wang 
> 于2020年9月27日周日 下午2:01写道: > > > > > > > hi > 
>你建mysql要指定主键,另外创建flink表时也要指定一下主键 > > PRIMARY KEY (id) NOT 
>ENFORCED,这样就会根据主键upsert了 > > > > > > > > > 在 2020-09-27 13:36:25,"xiao cai" 
> 写道: > > 
>>如果是先delete再insert,为何主键id一直都是1呢,如果delete在insert时指定了id值,那么auto_increment是不会变的。 
>> >这是我很困惑的地方。 > > > > > > 原始邮件 > >发件人: lec ssmi > 
>>收件人: flink-user-cn > >发送时间: 2020年9月27日(周日) 13:06 > 
>>主题: Re: Flink SQL撤回流问题 > > > > > >是不是底层的sink在处理retract的时候,使用的是先delte再insert , 
>而不是直接update呢。 xiao cai < > flin...@163.com> 于2020年9月27日周日 下午12:08写道: > 场景如下: > 
>source table: kafka > > sink table: mysql schem(id, dt, cnt) > > > insert : > 
>insert into sink > > select dt,count(distinct id) from source group by dt; > > 
>> > 这时mysql对应sink表中有一条数据(1, 2020-09-25, 100),随着事件消费,cnt字段会不断变化,id,dt保持不变 > 
>show > create table sink可以发现auto_increment在不断的变大。 > 当超过id的取值范围,就会报错了。 > > > 
>原始邮件 > > 发件人: Michael Ran > 收件人: user-zh< > 
>user-zh@flink.apache.org> > 发送时间: 2020年9月27日(周日) 11:51 > 主题: Re:Flink > 
>SQL撤回流问题 > > > 详细场景描述下呢 在 2020-09-27 11:48:36,"xiao cai"  > 
>写道: >Hi: > >使用Flink SQL撤回流写入MySQL,表的auto_increment > > 
>越来越大是为什么,我的输出结果只有一条,mysql表里也只有一条数据,自增主键id的值一直为1,但是auto_increment却越来越大。求解答。 >


Re:Re:Re: Flink SQL撤回流问题

2020-09-27 文章 Michael Ran
其实你可以把id 
字段干掉,如果自增。如果dt是你的主键,那么第一条数据插入是(1,dt,数量)。第二条内存统计的时候是(1,dt,新数量)那么生成的upsert
 语句是 insert into table(id,dt,num) values(1,dt,新数量)  ON DUPLICATE 
KEY UPDATE dt=values(dt) 这样发现1是你写死的。dt 
不变。但是数量发生了改变。mysql逻辑处理的时候,(猜测是删除后插入replace等,或者能触发auto 的动作), 
也就是id,dt 不变,数量变的逻辑
在 2020-09-27 14:13:21,"xiao cai"  写道:
>Hi kandy.wang:
>忘记说明,我指定了dt为primary 
>key,按理说会按照dt做update,但是为何auto_increment会不断的变大呢,而id也没有变化,id字段值始终为1。还望解惑。
>
>
> 原始邮件 
>发件人: kandy.wang
>收件人: user-zh
>发送时间: 2020年9月27日(周日) 14:01
>主题: Re:Re: Flink SQL撤回流问题
>
>
>hi 你建mysql要指定主键,另外创建flink表时也要指定一下主键 PRIMARY KEY (id) NOT 
>ENFORCED,这样就会根据主键upsert了 在 2020-09-27 13:36:25,"xiao cai"  
>写道: 
>>如果是先delete再insert,为何主键id一直都是1呢,如果delete在insert时指定了id值,那么auto_increment是不会变的。 
>>这是我很困惑的地方。 > > > 原始邮件 >发件人: lec ssmi >收件人: 
>flink-user-cn >发送时间: 2020年9月27日(周日) 13:06 >主题: Re: 
>Flink SQL撤回流问题 > > >是不是底层的sink在处理retract的时候,使用的是先delte再insert , 而不是直接update呢。 
>xiao cai  于2020年9月27日周日 下午12:08写道: > 场景如下: > source table: 
>kafka > sink table: mysql schem(id, dt, cnt) > > > insert : > insert into sink 
>> select dt,count(distinct id) from source group by dt; > > > 
>这时mysql对应sink表中有一条数据(1, 2020-09-25, 100),随着事件消费,cnt字段会不断变化,id,dt保持不变 > show 
>create table sink可以发现auto_increment在不断的变大。 > 当超过id的取值范围,就会报错了。 > > > 原始邮件 > 
>发件人: Michael Ran > 收件人: user-zh 
>> 发送时间: 2020年9月27日(周日) 11:51 > 主题: Re:Flink SQL撤回流问题 > > > 详细场景描述下呢 在 
>2020-09-27 11:48:36,"xiao cai"  写道: >Hi: > >使用Flink 
>SQL撤回流写入MySQL,表的auto_increment > 
>越来越大是为什么,我的输出结果只有一条,mysql表里也只有一条数据,自增主键id的值一直为1,但是auto_increment却越来越大。求解答。


Re:Flink SQL撤回流问题

2020-09-26 文章 Michael Ran
详细场景描述下呢
在 2020-09-27 11:48:36,"xiao cai"  写道:
>Hi:
>使用Flink SQL撤回流写入MySQL,表的auto_increment 
>越来越大是为什么,我的输出结果只有一条,mysql表里也只有一条数据,自增主键id的值一直为1,但是auto_increment却越来越大。求解答。


Re:Flink 1.11.1 JDBC Sink 使用旧版配置 数据丢失

2020-09-26 文章 Michael Ran
update 怎么触发的 delete 哦?
在 2020-09-14 11:37:07,"LittleFall" <1578166...@qq.com> 写道:
>Flink 版本:
>flink:1.11.1-scala_2.12
>连接器
>mysql-connector-java-8.0.21
>flink-sql-connector-kafka_2.12-1.11.1
>flink-connector-jdbc_2.12-1.11.1
>
>Flink SQL:
>
>CREATE TABLE source_user_name (
>loan_no int,
>name varchar,
>PRIMARY KEY (loan_no) NOT ENFORCED
>) WITH (
>'connector' = 'kafka',   
>'topic' = 'test.username',
>'properties.bootstrap.servers' = 'kafka:9092',
>'properties.group.id' = 'test_flink_name_group',
>'format'='canal-json',
>'scan.startup.mode' = 'group-offsets'
>);
>
>CREATE TABLE test_flink_name_sink (
>loan_no int,
>name varchar,
>PRIMARY KEY (loan_no) NOT ENFORCED
>) WITH (
>'connector.type' = 'jdbc',
>'connector.url' =
>'jdbc:mysql://host.docker.internal:3306/test?=true',
>'connector.table' = 'username',
>'connector.driver' = 'com.mysql.cj.jdbc.Driver',
>'connector.username' = 'root',
>'connector.password' = '',
>'connector.write.flush.max-rows' = '5000',
>'connector.write.flush.interval' = '1s'
>);
>
>insert into test_flink_name_sink (loan_no,name)
>select loan_no,name from source_user_name;
>
>
>外部 sql:
>
>CREATE TABLE username (
>loan_no int PRIMARY KEY,
>name varchar(10)
>);
>
>insert into username values (1,'a');
>
>架构是 mysql-canal-kafka-flink-mysql
>
>同时执行(一次输入两行)
>
>UPDATE `username` SET `name` = 'b' WHERE `loan_no` = 1;
>UPDATE `username` SET `name` = 'a' WHERE `loan_no` = 1;
>
>发现目标数据库中结果丢失,结果稳定复现。
>
>分析原因:
>
>```
>上游一个update下游会落地两个sql
>1.insert into after value
>2.delete before value
>而且insert和delete是分开两个statement batch提交的。先insert batch再delete batch
>
>如果上游同时有两个update,update逻辑id:1,name:a更新为id:1,name:b再更新为id:1,name:a
>这个时候就会触发问题
>insert batch结束之后数据变成了id:1,name:a
>再执行delete batch的第一条before delete:delete id=1 name=a会直接把最终正确的数据删除了
>第二条的before delete:delete id=1 name=b删除不到数据。因为数据已经被删除了
>```
>
>换成新版 JDBC 配置之后没有这个问题。
>
>请问这是已经发现的问题吗?有没有 issue 号
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re:Flink 1.11.1 JDBC Sink 使用旧版配置 数据丢失

2020-09-26 文章 Michael Ran
update 怎么触发的 delete 哦?
在 2020-09-14 11:37:07,"LittleFall" <1578166...@qq.com> 写道:
>Flink 版本:
>flink:1.11.1-scala_2.12
>连接器
>mysql-connector-java-8.0.21
>flink-sql-connector-kafka_2.12-1.11.1
>flink-connector-jdbc_2.12-1.11.1
>
>Flink SQL:
>
>CREATE TABLE source_user_name (
>loan_no int,
>name varchar,
>PRIMARY KEY (loan_no) NOT ENFORCED
>) WITH (
>'connector' = 'kafka',   
>'topic' = 'test.username',
>'properties.bootstrap.servers' = 'kafka:9092',
>'properties.group.id' = 'test_flink_name_group',
>'format'='canal-json',
>'scan.startup.mode' = 'group-offsets'
>);
>
>CREATE TABLE test_flink_name_sink (
>loan_no int,
>name varchar,
>PRIMARY KEY (loan_no) NOT ENFORCED
>) WITH (
>'connector.type' = 'jdbc',
>'connector.url' =
>'jdbc:mysql://host.docker.internal:3306/test?=true',
>'connector.table' = 'username',
>'connector.driver' = 'com.mysql.cj.jdbc.Driver',
>'connector.username' = 'root',
>'connector.password' = '',
>'connector.write.flush.max-rows' = '5000',
>'connector.write.flush.interval' = '1s'
>);
>
>insert into test_flink_name_sink (loan_no,name)
>select loan_no,name from source_user_name;
>
>
>外部 sql:
>
>CREATE TABLE username (
>loan_no int PRIMARY KEY,
>name varchar(10)
>);
>
>insert into username values (1,'a');
>
>架构是 mysql-canal-kafka-flink-mysql
>
>同时执行(一次输入两行)
>
>UPDATE `username` SET `name` = 'b' WHERE `loan_no` = 1;
>UPDATE `username` SET `name` = 'a' WHERE `loan_no` = 1;
>
>发现目标数据库中结果丢失,结果稳定复现。
>
>分析原因:
>
>```
>上游一个update下游会落地两个sql
>1.insert into after value
>2.delete before value
>而且insert和delete是分开两个statement batch提交的。先insert batch再delete batch
>
>如果上游同时有两个update,update逻辑id:1,name:a更新为id:1,name:b再更新为id:1,name:a
>这个时候就会触发问题
>insert batch结束之后数据变成了id:1,name:a
>再执行delete batch的第一条before delete:delete id=1 name=a会直接把最终正确的数据删除了
>第二条的before delete:delete id=1 name=b删除不到数据。因为数据已经被删除了
>```
>
>换成新版 JDBC 配置之后没有这个问题。
>
>请问这是已经发现的问题吗?有没有 issue 号
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re:Re: flink canal-json 如何获取每条数据是updata insert delete

2020-09-26 文章 Michael Ran
我们也想用,能暴露一个出来不- -
在 2020-09-25 10:39:12,"Jark Wu"  写道:
>如果要抽取对应的 type,需要用 format = json, 把 json 的完整数据结构在 DDL 中声明出来(包括 type)。
>
>目前 canal-json 是自动将 changelog 转成了 Flink 的 insert/update/delete,而这个 change
>flag 目前是不对用户暴露的。
>
>Best,
>Jark
>
>On Fri, 25 Sep 2020 at 09:39, air23  wrote:
>
>> 你好
>> flink canal-json 如何获取每条数据是updata insert delete   ,我ddl插件kafka表 用对应的type取
>> 都是为null
>> 这个操作类型 有办法取到吗?谢谢
>>
>>


Re:flink1.10.0任务失败但是application仍然在yarn运行不会自动kill

2020-08-12 文章 Michael Ran
Connection timed out: 
nb-bdh-hadoop.slave14/10.10.5.14:21913是不是连接超时,请求没发送过去?
在 2020-08-12 13:57:52,"魏烽"  写道:
>各位好:
>
>Flink1.10.0版本发现偶发任务失败后但是web ui仍然挂着该任务并没有停止,日志如下:
>
>请问有遇到一样的情况的嘛
>
>[INFO] 2020-07-28 16:34:00.938  - [taskAppId=TASK-38-97-193]:[106] -  -> 
>2020-07-28 16:33:52,863 INFO  org.apache.flink.yarn.YarnClusterDescriptor  
> - Submitting application master application_1571540269403_52656
>
>2020-07-28 16:33:52,895 INFO  
>org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted 
>application application_1571540269403_52656
>
>2020-07-28 16:33:52,895 INFO  org.apache.flink.yarn.YarnClusterDescriptor  
> - Waiting for the cluster to be allocated
>
>2020-07-28 16:33:52,897 INFO  org.apache.flink.yarn.YarnClusterDescriptor  
> - Deploying cluster, current state ACCEPTED
>
>2020-07-28 16:34:00,938 INFO  org.apache.flink.yarn.YarnClusterDescriptor  
> - YARN application has been deployed successfully.
>
>[INFO] 2020-07-28 16:36:04.622  - [taskAppId=TASK-38-97-193]:[106] -  -> 
>2020-07-28 16:34:00,941 INFO  org.apache.flink.yarn.YarnClusterDescriptor  
> - Found Web Interface nb-bdh-hadoop.slave14:21913 of application 
>'application_1571540269403_52656'.
>
>Job has been submitted with JobID 54ac0f3db08f29022d9f3d51d797a724
>
>[INFO] 2020-07-28 16:36:04.623  - [taskAppId=TASK-38-97-193]:[106] -  -> 
>
>
>The program finished with the following exception:
>
>org.apache.flink.client.program.ProgramInvocationException: The main method 
>caused an error: 
>org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not 
>complete the operation. Number of retries has been exhausted.
>
>at 
>org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>
>at 
>org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>
>at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>
>at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>
>at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>
>at 
>org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>
>at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>
>at java.security.AccessController.doPrivileged(Native Method)
>
>at javax.security.auth.Subject.doAs(Subject.java:422)
>
>at 
>org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>
>at 
>org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>
>at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>
>Caused by: java.util.concurrent.ExecutionException: 
>org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not 
>complete the operation. Number of retries has been exhausted.
>
>at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>
>at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>
>at 
>org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)
>
>at 
>org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>
>at 
>org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:678)
>
>at com.nequal.bdh.cdp.IDMappingLauncher$.main(IDMappingLauncher.scala:140)
>
>at com.nequal.bdh.cdp.IDMappingLauncher.main(IDMappingLauncher.scala)
>
>at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
>at 
>sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
>at 
>sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
>at java.lang.reflect.Method.invoke(Method.java:498)
>
>at 
>org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>
>... 11 more
>
>Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: 
>Could not complete the operation. Number of retries has been exhausted.
>
>at 
>org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:284)
>
>at 
>java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>
>at 
>java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>
>at 
>java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>
>at 
>java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
>
>at 
>org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:342)
>
>at 
>org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:500)
>
>at 

Re:flink????kafka??????????

2020-07-30 文章 Michael Ran
checikpoint state complate ??
?? 2020-07-30 10:56:52??"??" <201782...@qq.com> ??
>flink1.11kafkagroup 
>offset??kafka tooloffset??flink


Re:Re: flink sqlUpdate,如何获取里面的字段,字段类型,with 等3个属性

2020-07-22 文章 Michael Ran
1.tableEvn.from(xx).getSchema() 我确实通过这个拿到了schema,2.with properties属性很重要 
,关系我自定义的一些参数设定。3.关于  catalog 这个东西,是不是只有1.11 版本才能从catalog  获取  with 
properties 哦? 1.10 you  有支持吗
在 2020-07-22 18:22:22,"godfrey he"  写道:
>tableEnv 中 可以通过
>tableEvn.from(xx).getSchema() 拿到该表的schema信息,但是没法拿到对应的properties。
>如果要拿到properties,可以通过catalog的接口得到 [1]。
>如果要自定义实现source/sink,可以参考 [2]
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/catalogs.html
>[2]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sourceSinks.html
>
>Best,
>Godfrey
>
>
>
>
>
>Michael Ran  于2020年7月22日周三 下午4:10写道:
>
>> dear all:
>>  我用flink 注册一张表:
>>   CREATE TABLE dim_mysql (
>> id int,  --
>> type varchar --
>> ) WITH (
>> 'connector' = 'jdbc',
>> 'url' = 'jdbc:mysql://localhost:3390/test',
>> 'table-name' = 'flink_test',
>> 'driver' = 'com.mysql.cj.jdbc.Driver',
>> 'username' = '',
>> 'password' = '',
>> 'lookup.cache.max-rows' = '5000',
>> 'lookup.cache.ttl' = '1s',
>> 'lookup.max-retries' = '3'
>> )
>> 有没有通过 tableEnv 去获取,字段[id,type]  类型[INTEGER,VARCHAR]
>> 以及属性,map 这种。
>> 我看阿里官方有blink 支持自定义sink:
>> publicabstractclassCustomSinkBaseimplementsSerializable{
>> protectedMap userParamsMap;// 您在sql with语句中定义的键值对,但所有的键均为小写
>> protectedSet primaryKeys;// 您定义的主键字段名
>> protectedList headerFields;// 标记为header的字段列表
>> protectedRowTypeInfo rowTypeInfo;// 字段类型和名称
>> 核心需求是:获取定义的表的所有属性,自己实现自己的功能,包括 join sink 等各种逻辑


Re:flink stream如何为每条数据生成自增主键

2020-07-22 文章 Michael Ran
id 生成器吧
在 2020-07-22 15:51:44,"tiantingting5...@163.com"  写道:
>
>flink stream如何为每条数据生成自增主键??时间戳貌似不行,同一时间戳可能会产生多条数据,无法区分数据的现后顺序。
>
>
>tiantingting5...@163.com


  1   2   >