Re: Flink checkpoint HDFS configuration issue

2019-12-24 Post by 唐军亮
Don't write the NameNode address; write the nameservice instead.
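
For HDFS HA, a minimal sketch of what this looks like, assuming a nameservice named "mycluster" (the name is hypothetical; it must match dfs.nameservices in the hdfs-site.xml that Flink sees via HADOOP_CONF_DIR):

# flink-conf.yaml: address the nameservice, not a concrete NameNode
state.checkpoints.dir: hdfs://mycluster/flink/flink-checkpoints/

The HDFS client then resolves the active NameNode from dfs.ha.namenodes.mycluster and the dfs.namenode.rpc-address.mycluster.* entries, so an nn1 -> nn2 failover is transparent to the running job.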

-- Original message --
From: "LJY "

Flink checkpoint HDFS configuration issue

2019-12-24 Post by LJY
Hi all,
HDFS is currently configured for high availability, with NameNodes nn1 (active) and nn2 (standby).
The configuration in flink-conf.yaml is:
state.checkpoints.dir: hdfs://<nn1 ip>:8020/flink/flink-checkpoints/
I then deploy a job, wait for a checkpoint to be created, and kill the nn1 NameNode.
nn2 automatically switches to active, and nn1 becomes unreachable.
The job then restarts endlessly.
   The logs show: failed on connection exception: java.net.ConnectException: Connection
refused; it cannot connect to nn1.
   How should checkpointing be configured for HDFS high availability, and what should state.checkpoints.dir be set to?
   




Problems implementing a two-phase-commit ETL from Kafka to MySQL

2019-12-24 Post by 卢伟楠

Project summary: an ETL job that reads from Kafka in 10-second batches and sinks into MySQL.


Environment:
Flink run mode: local
MySQL global variable wait_timeout = 28800
MySQL client mysql-connector-java version 5.1.42



The error:
org.apache.flink.streaming.runtime.tasks.TimerException: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
at 
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
 ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
~[na:1.8.0_191]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
~[na:1.8.0_191]
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 ~[na:1.8.0_191]
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 ~[na:1.8.0_191]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[na:1.8.0_191]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[na:1.8.0_191]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_191]
Caused by: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
 ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
 ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
 ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
 ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
 ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
 ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at com.feiyu.help.AWF.apply(AWF.java:23) ~[classes/:na]
at com.feiyu.help.AWF.apply(AWF.java:14) ~[classes/:na]
at 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction.process(InternalIterableAllWindowFunction.java:44)
 ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction.process(InternalIterableAllWindowFunction.java:32)
 ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
 ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503)
 ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at 
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
 ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at 
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
 ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
... 7 common frames omitted
Caused by: java.sql.SQLException: Could not retrieve transaction read-only 
status from server
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:964) 
~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:897) 
~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:886) 
~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:860) 
~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:877) 
~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:873) 
~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3536) 
~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3505) 
~[mysql-connector-java-5.1.42.jar:5.1.42]
at 
com.mysql.jdbc.PreparedStatement.executeBatchInternal(PreparedStatement.java:1230)
 ~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.StatementImpl.executeBatch(StatementImpl.java:970) 
~[mysql-connector-java-5.1.42.jar:5.1.42]
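
For reference: "Could not retrieve transaction read-only status from server" is typically what mysql-connector-j reports when a statement runs on a connection the server has already dropped, e.g. one left idle past wait_timeout. A minimal sketch of guarding a long-lived sink connection before each batch (connection, JDBC_URL, USER, and PASS are hypothetical names, not the poster's code):

// Revalidate before use: MySQL closes connections idle past wait_timeout,
// and executing on a dead connection surfaces exactly this SQLException.
if (connection == null || !connection.isValid(5)) {   // 5-second validation timeout
    connection = DriverManager.getConnection(JDBC_URL, USER, PASS);
    connection.setAutoCommit(false);                  // keep transactional semantics
}

Note that under exactly-once two-phase commit, a transaction that was already pre-committed on a connection that later dies generally cannot be recovered by reconnecting; the job has to fall back to the last checkpoint.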
   

Re: using thin jar to replace fat jar on yarn cluster mode

2019-12-24 文章 Jingsong Li
Hi zjfplayer,

First, your user fat jar should not contain any Flink dependencies; they
should be marked "provided".
If your goal is to reduce the deployment time of the job, and you are
currently using session mode, you can consider putting the jars directly
under the lib directory.

Best,
Jingsong Lee

On Mon, Dec 23, 2019 at 3:24 PM Rui Li  wrote:

> Hi,
>
> I think you can try specifying dependent jars with the -C option[1] when
> you submit the job, and see if that meets your needs.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html#usage
>
> On Mon, Dec 23, 2019 at 10:09 AM zjfpla...@hotmail.com <
> zjfpla...@hotmail.com> wrote:
>
>> Hi,
>> Does flink on yarn support using thin jar to replace fat jar?
>> I don't want each Flink task's jar to be hundreds of MB. I
>> want to put all the dependent packages in a single directory, and then
>> the size of each Flink task jar will be only tens of KB.
>>
>> --
>> zjfpla...@hotmail.com
>>
>
>
> --
> Best regards!
> Rui Li
>


-- 
Best, Jingsong Lee
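
For reference, a sketch of the -C approach Rui mentions (paths, jar names, and the entry class are hypothetical; each -C URL must be accessible from all nodes of the cluster):

flink run -m yarn-cluster \
  -C file:///opt/flink-deps/mysql-connector-java-5.1.42.jar \
  -C file:///opt/flink-deps/my-utils.jar \
  -c com.example.MyJob my-thin-job.jar

With the shared dependencies pre-distributed to the same path on every node, the submitted job jar itself stays small.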


Re: Data loss with Window ProcessFunction

2019-12-24 Post by Dian Fu
The window operator drops late data; check whether that is what you are seeing. If so, adjust the watermark generation strategy to add a bit more delay; take a look at BoundedOutOfOrdernessTimestampExtractor.
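
For reference, a minimal sketch of that suggestion, assuming a DataStream<Row> named stream whose event timestamp sits in field 0 as epoch milliseconds (both are assumptions, not details from the thread):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.types.Row;

// Tolerate events arriving up to 60 seconds out of order before they count as late.
DataStream<Row> withTimestamps = stream.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Row>(Time.seconds(60)) {
            @Override
            public long extractTimestamp(Row element) {
                return (long) element.getField(0); // hypothetical timestamp field
            }
        });

A larger bound trades extra latency for fewer dropped late records.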

> On 2019-12-25, at 10:39 AM, 1530130567 <1530130...@qq.com> wrote:
> 
> Hi experts,
> 
> I have recently been processing data with a window plus a processfunction, with a watermark delay added.
> 
> I found that once the input rate reaches a certain peak, data starts getting lost. Is this because the processfunction cannot keep up?
> The details are shown in the image below (if the image does not display, open this link):
> https://www.imageoss.com/image/sTn2U
> 
> Looking at the metrics, recordsIn > recordsOut indeed.
> The code just uses a window with a processfunction; there is no filter at all.
> The code is as follows:
> .window(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.minutes(1)))
> .process(new ProcessWindowFunction<Row, Row, Integer, TimeWindow>() {
>     @Override
>     public void process(Integer integer, Context context, Iterable<Row> elements, Collector<Row> out) {
>         for (Row element : elements) {
>             out.collect(element);
>         }
>     }
> })



Re: Effect of -yn and -ys on Flink 1.9 batch jobs

2019-12-24 Post by Xintong Song
Hi faaron,
In Flink 1.9 the -yn parameter has no effect, and it has been removed entirely in later versions.
Given your parameters, with each TM's memory unchanged at 30 GB, the number of slots
per TM (-ys) goes from 5 to 10, which means the average memory per slot is cut in half.
Flink 1.9's SQL batch operators have fixed requirements on Flink managed memory; most likely this
change leaves a single slot's managed memory unable to satisfy the operators' needs.
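
Concretely, ignoring the shares taken by framework and network buffers:

  -ytm 30g, -ys 5   =>  30 GB / 5 slots  = ~6 GB per slot
  -ytm 30g, -ys 10  =>  30 GB / 10 slots = ~3 GB per slot

Total parallelism is 100 either way, so the hash join runs with half the per-slot managed memory in the second setup.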

Thank you~

Xintong Song



On Wed, Dec 25, 2019 at 11:09 AM faaron zheng  wrote:

> Running TPC-DS query1: flink run -m yarn-cluster -d -p 100 -yn 20 -ys 5 -yjm 60g
> -ytm 30g executes fine, but flink run -m yarn-cluster -d -p 100 -yn 10 -ys 10 -yjm
> 60g -ytm 30g fails during the hash join with: No pooled slot available and request to
> ResourceManager for new slot failed. I don't see how these are related; please advise.
> faaron zheng  Email: faaronzh...@gmail.com


Effect of -yn and -ys on Flink 1.9 batch jobs

2019-12-24 Post by faaron zheng
Running TPC-DS query1: flink run -m yarn-cluster -d -p 100 -yn 20 -ys 5 -yjm 60g -ytm
30g executes fine, but flink run -m yarn-cluster -d -p 100 -yn 10 -ys 10 -yjm 60g -ytm
30g fails during the hash join with: No pooled slot available and request to
ResourceManager for new slot failed. I don't see how these are related; please advise.
faaron zheng  Email: faaronzh...@gmail.com

Re: Are Flink windows being closed correctly?

2019-12-24 Post by Jary Zhen
Processing data with event-time-based watermarks, you usually run into these two situations:
1. Data that arrives severely late due to disorder gets dropped; see the Side Output API [1] and the sketch after this list.
2. Data whose event time is ahead of the current system time, which I call "super-natural data": for example, the current system
time is 10:37:55 but the event time of the data is 10:38:55. Once such data arrives, the window fires early, so normal data is
treated as late and dropped. The way to handle this is to filter such records out before assignWaterMark.
3. Suggestion: for simple ETL, try not to process data with EventTime at all.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/side_output.html
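
For point 1, a minimal sketch of capturing late records instead of silently dropping them (the element type Row, the keyedStream, and myProcessWindowFunction are assumptions standing in for the poster's code):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.types.Row;
import org.apache.flink.util.OutputTag;

// Anonymous subclass so Flink can capture the Row type information.
final OutputTag<Row> lateTag = new OutputTag<Row>("late-data") {};

SingleOutputStreamOperator<Row> result = keyedStream
        .window(TumblingEventTimeWindows.of(Time.minutes(1)))
        .sideOutputLateData(lateTag)        // late records go here instead of being dropped
        .process(myProcessWindowFunction);

// Compare this stream's volume with the main output to confirm late data explains the gap.
DataStream<Row> lateStream = result.getSideOutput(lateTag);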

On Wed, 25 Dec 2019 at 09:57, 1530130567 <1530130...@qq.com> wrote:

> Hi,
> I checked the metrics yesterday, and indeed recordsIn > recordsOut.
> The code just uses a window with a processfunction; there is no filter at all.
> The code is as follows:
>
> .window(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.minutes(1)))
> .process(new ProcessWindowFunction<Row, Row, Integer, TimeWindow>() {
>     @Override
>     public void process(Integer integer, Context context, Iterable<Row> elements, Collector<Row> out) {
>         for (Row element : elements) {
>             out.collect(element);
>         }
>     }
> })
>
>
>
>
> -- Original message --
> From: "jingjing bai"  Sent: Tuesday, 2019-12-24, 9:18 PM
> To: "user-zh"
> Subject: Re: Are Flink windows being closed correctly?
>
>
>
> Windows do not close early; please check the metrics for dropped data.
>
>
> 1530130567 <1530130...@qq.com> wrote on Tue, 2019-12-24 at 8:46 PM:
>
> > Hi experts,
> >   I have recently been using the Flink stream API for data processing; the logic is a very simple ETL job.
> > I defined a 1-minute tumbling window with a 10s watermark; during peak traffic hours, data goes missing downstream.
> >   For example: my upstream topic gets 5000/s, but the downstream topic receiving the data only gets 4000/s.
> >
> > At off-peak times there is no such problem, and it also disappears when I remove the window. Is the window
> > being closed early, before my downstream processfunction has finished?
> >   PS: increasing the parallelism did not help.


Data loss with Window ProcessFunction

2019-12-24 Post by 1530130567
Hi experts,

I have recently been processing data with a window plus a processfunction, with a watermark delay added.

I found that once the input rate reaches a certain peak, data starts getting lost. Is this because the processfunction cannot keep up?
The details are shown in the image below:
https://www.imageoss.com/image/sTn2U

Looking at the metrics, recordsIn > recordsOut indeed.
The code just uses a window with a processfunction; there is no filter at all.
The code is as follows:
.window(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.minutes(1)))
.process(new ProcessWindowFunction<Row, Row, Integer, TimeWindow>() {
    @Override
    public void process(Integer integer, Context context, Iterable<Row> elements, Collector<Row> out) {
        for (Row element : elements) {
            out.collect(element);
        }
    }
})

Re: The assigned slot bae00218c818157649eb9e3c533b86af_11 was removed

2019-12-24 Post by Xintong Song
This is probably not the root cause. "Slot was removed" is usually the result of a TM going down; you need to find the corresponding TM log and look into why it died.

Thank you~

Xintong Song



On Tue, Dec 24, 2019 at 10:06 PM hiliuxg <736742...@qq.com> wrote:

> Occasionally an allocated slot suddenly gets removed, causing the job to restart, and I cannot tell what caused it. There are no CPU spikes or full GCs. The exception is:
>
> org.apache.flink.util.FlinkException: The assigned slot
> bae00218c818157649eb9e3c533b86af_11 was removed.
>         at
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:893)
>         at
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:863)
>         at
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.internalUnregisterTaskManager(SlotManager.java:1058)
>         at
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.unregisterTaskManager(SlotManager.java:385)
>         at
> org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection(ResourceManager.java:847)
>         at
> org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener$1.run(ResourceManager.java:1161)
>         at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392)
>         at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185)
>         at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>         at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
>         at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>         at
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>         at
> akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>         at
> akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>         at
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>         at
> akka.actor.ActorCell.invoke(ActorCell.scala:495)
>         at
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>         at
> akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>         at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Re: Are Flink windows being closed correctly?

2019-12-24 Post by 1530130567
Hi,
I checked the metrics yesterday, and indeed recordsIn > recordsOut.
The code just uses a window with a processfunction; there is no filter at all.
The code is as follows:
.window(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.minutes(1)))
.process(new ProcessWindowFunction<Row, Row, Integer, TimeWindow>() {
    @Override
    public void process(Integer integer, Context context, Iterable<Row> elements, Collector<Row> out) {
        for (Row element : elements) {
            out.collect(element);
        }
    }
})

The assigned slot bae00218c818157649eb9e3c533b86af_11 was removed

2019-12-24 Post by hiliuxg
Occasionally an allocated slot suddenly gets removed, causing the job to restart, and I cannot tell what caused it. There are no CPU spikes or full GCs. The exception is:

org.apache.flink.util.FlinkException: The assigned slot 
bae00218c818157649eb9e3c533b86af_11 was removed.
        at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:893)
        at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:863)
        at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.internalUnregisterTaskManager(SlotManager.java:1058)
        at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.unregisterTaskManager(SlotManager.java:385)
        at 
org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection(ResourceManager.java:847)
        at 
org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener$1.run(ResourceManager.java:1161)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185)
        at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
        at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
        at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
        at 
akka.actor.Actor$class.aroundReceive(Actor.scala:502)
        at 
akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
        at 
akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
        at akka.actor.ActorCell.invoke(ActorCell.scala:495)
        at 
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at 
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Re: Are Flink windows being closed correctly?

2019-12-24 Post by jingjing bai
Windows do not close early; please check the metrics for dropped data.


1530130567 <1530130...@qq.com> wrote on Tue, 2019-12-24 at 8:46 PM:

> Hi experts,
>   I have recently been using the Flink stream API for data processing; the logic is a very simple ETL job.
>   I defined a 1-minute tumbling window with a 10s watermark; during peak traffic hours, data goes missing downstream.
>   For example: my upstream topic gets 5000/s, but the downstream topic receiving the data only gets 4000/s.
>
> At off-peak times there is no such problem, and it also disappears when I remove the window. Is the window being closed early, before my downstream processfunction has finished?
>   PS: increasing the parallelism did not help.


Re: CEP matching of out-of-order data

2019-12-24 Post by jingjing bai
In CEP SQL, with ORDER BY, out-of-order data will not cause matches to fail.
I have not used this from the API; check whether a corresponding API exists.

qishang zhong  wrote on Mon, 2019-12-23 at 9:37 PM:

> Hi everyone,
>
> A question about the flink-training-exercises project, specifically
> com.ververica.flinktraining.solutions.datastream_java.cep.LongRidesSolution
>
> Pattern<TaxiRide, TaxiRide> completedRides =
>         Pattern.<TaxiRide>begin("start")
>                 .where(new SimpleCondition<TaxiRide>() {
>                     @Override
>                     public boolean filter(TaxiRide ride) throws Exception {
>                         return ride.isStart;
>                     }
>                 })
>                 .next("end")
>                 .where(new SimpleCondition<TaxiRide>() {
>                     @Override
>                     public boolean filter(TaxiRide ride) throws Exception {
>                         return !ride.isStart;
>                     }
>                 });
>
> I now have a similar monitoring scenario that also needs to emit unmatched data after a timeout, but the stream may contain out-of-order data.
> Does that mean data cannot match the Pattern in this example?
> If I want out-of-order data to still be matched rather than emitted as timed out, is there a corresponding solution?
>
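
A note on the API side: in event time, the CEP operator buffers incoming elements and sorts them by timestamp before matching, so data that is out of order within the watermark bound is still matched correctly. For the timeout part, a minimal sketch with the DataStream CEP API, reusing completedRides from the quoted code (the rides stream, the two-hour bound, and the choice of what to emit are placeholders, not the poster's requirements):

import java.util.List;
import java.util.Map;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternFlatTimeoutFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

OutputTag<TaxiRide> timedOutTag = new OutputTag<TaxiRide>("timed-out-rides") {};

PatternStream<TaxiRide> patternStream =
        CEP.pattern(rides.keyBy("rideId"), completedRides.within(Time.hours(2)));

SingleOutputStreamOperator<TaxiRide> matches = patternStream.flatSelect(
        timedOutTag,
        new PatternFlatTimeoutFunction<TaxiRide, TaxiRide>() {
            @Override
            public void timeout(Map<String, List<TaxiRide>> pattern, long timeoutTimestamp,
                                Collector<TaxiRide> out) {
                out.collect(pattern.get("start").get(0)); // START whose END missed the bound
            }
        },
        new PatternFlatSelectFunction<TaxiRide, TaxiRide>() {
            @Override
            public void flatSelect(Map<String, List<TaxiRide>> pattern, Collector<TaxiRide> out) {
                // Ride completed in time; nothing to report in this monitoring scenario.
            }
        });

// Only genuinely unmatched rides end up here; a late END that still falls inside
// the watermark bound completes the match instead of producing a timeout.
DataStream<TaxiRide> timedOut = matches.getSideOutput(timedOutTag);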


Are Flink windows being closed correctly?

2019-12-24 Post by 1530130567
Hi experts,
  I have recently been using the Flink stream API for data processing; the logic is a very simple ETL job.
  I defined a 1-minute tumbling window with a 10s watermark; during peak traffic hours, data goes missing downstream.
  For example: my upstream topic gets 5000/s, but the downstream topic receiving the data only gets 4000/s.
  At off-peak times there is no such problem, and it also disappears when I remove the window. Is the window
being closed early, before my downstream processfunction has finished?
  PS: increasing the parallelism did not help.

Re: Flink 1.9.1: how can the Table API read Kafka 0.8 JSON data?

2019-12-24 Post by Chennet Steven
A Kafka value looks like this:
{
  "BillID": "230c95c6-346c-4070-9b49-b3bbbf6691db",
  "BillCode": "201912230300118165",
  "UptTimeMs": 157709148,
  "SOC": 0.86,
  "HighestVoltage": 4.01980926514
}

Here "UptTimeMs": 157709148 is a milliseconds-since-1970 value.

The modified code:
private static void FunA() throws Exception {
    Configuration localConfig = new Configuration();
    localConfig.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(localConfig);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    StreamTableEnvironment ste = StreamTableEnvironment.create(env);

    Kafka kafka08 = new Kafka()
            .version("0.8")
            .topic("BDP-ChargingMinuteMetric")
            .startFromEarliest()
            .property("zookeeper.connect", "def:2182/ ")
            .property("bootstrap.servers", "abc:9095")
            .property("group.id", "abc");

    Schema schema = new Schema()
            .field("rowtime", Types.SQL_TIMESTAMP)
            .rowtime(new Rowtime().timestampsFromField("UptTimeMs").watermarksPeriodicBounded(1000))
            .field("BillID", Types.STRING)
            .field("SOC", Types.DOUBLE)
            .field("HighestVoltage", Types.DOUBLE);

    TypeInformation[] types = new TypeInformation[]{Types.SQL_TIMESTAMP,
            Types.STRING, Types.DOUBLE, Types.DOUBLE};
    String[] fieldNames = new String[]{"UptTimeMs", "BillId", "SOC", "HighestVoltage"};
    TypeInformation typeInformation = new RowTypeInfo(types, fieldNames);
    FormatDescriptor formatDescriptor =
            new Json().failOnMissingField(true).schema(typeInformation).deriveSchema();

    ste.connect(kafka08).withFormat(formatDescriptor).withSchema(schema)
            .inAppendMode().registerTableSource("SourceTable");
    Table table = ste.sqlQuery("select * from SourceTable");

    DataStream<Row> rowDataStream = ste.toAppendStream(table, Row.class);
    rowDataStream.print();

    ste.execute("ABC");
}

The error:
Exception in thread "main" 
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
 at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
 at 
org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:626)
 at 
org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:117)
 at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
 at 
org.apache.flink.table.executor.StreamExecutor.execute(StreamExecutor.java:50)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:410)
 at com.teld.demo.Kafka08App.FunA(Kafka08App.java:75)
 at com.teld.demo.Kafka08App.main(Kafka08App.java:23)
Caused by: java.lang.Exception: java.io.IOException: Failed to deserialize JSON 
object.
 at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:217)
 at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.processInput(SourceStreamTask.java:133)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
 at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Failed to deserialize JSON object.
 at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:129)
 at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:72)
 at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
 at 
org.apache.flink.streaming.connectors.kafka.internals.SimpleConsumerThread.run(SimpleConsumerThread.java:374)
Caused by: java.time.format.DateTimeParseException: Text '157708584' could 
not be parsed at index 0
 at 
java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
 at 
java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
 at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$createTimestampConverter$1dee6515$1(JsonRowDeserializationSchema.java:334)
 at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232)
 at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:403)
 at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$
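
A hedged guess at the failure (untested): the RowTypeInfo above declares UptTimeMs as Types.SQL_TIMESTAMP, so JsonRowDeserializationSchema tries to parse the raw number '157708584' as an ISO-formatted timestamp string, which is exactly the DateTimeParseException in the trace. One sketch of a fix, assuming the 1.9 rowtime extractor (ExistingField) also accepts Long fields, is to declare the physical JSON type of UptTimeMs as a long so the epoch-millis value deserializes cleanly:

// Format side: UptTimeMs is a plain epoch-millis long in the JSON, not a timestamp string.
TypeInformation[] types = new TypeInformation[]{Types.LONG,
        Types.STRING, Types.DOUBLE, Types.DOUBLE};

The Schema side, including rowtime(new Rowtime().timestampsFromField("UptTimeMs")...), stays as it is; only the format-level type changes.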