Re: Re: flink kryo exception

2021-02-07 Thread
Yes, but I use stop, not cancel, which also takes the savepoint and stops the job together.
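
For reference, the stop-with-savepoint form of the command (Flink 1.12 CLI) is:

bin/flink stop -p [:targetDirectory] :jobId

which takes the savepoint and then finishes the job in one step.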

Yun Gao  于2021年2月8日周一 上午11:59写道:

> Hi yidan,
>
> One more thing to confirm: are you creating the savepoint and stopping the
> job all together with
>
>  bin/flink cancel -s [:targetDirectory] :jobId
>
> command ?
>
> Best,
>  Yun
>
>
> --Original Mail --
> *Sender:*赵一旦 
> *Send Date:*Sun Feb 7 16:13:57 2021
> *Recipients:*Till Rohrmann 
> *CC:*Robert Metzger , user 
> *Subject:*Re: flink kryo exception
>
>> It may also have something to do with my job's first tasks. The second
>> task has two inputs: one is the Kafka source stream (A), the other is a
>> self-defined MySQL source used as a broadcast stream (B).
>> In A: I have a 'WatermarkReAssigner', a self-defined operator which adds
>> an offset to its input watermark and then forwards it downstream.
>> In B: The parallelism is 30, but in my rich function's implementation,
>> only subtask 0 queries MySQL and sends out records; the other subtasks
>> do nothing. All subtasks send max_watermark - 86400_000 as the
>> watermark.
>> Since both of the first tasks involve a self-defined source or
>> implementation, I do not know whether the problem is related to that.
>>
>> 赵一旦  于2021年2月7日周日 下午4:05写道:
>>
>>> The first problem is critical, since the savepoint do not work.
>>> The second problem, in which I changed the solution, removed the 'Map'
>>> based implementation before the data are transformed to the second task,
>>> and this case savepoint works.  The only problem is that, I should stop the
>>> job and remember the savepoint path, then restart job with the savepoint
>>> path. And now it is : I stop the job, then the job failed and restart
>>> automatically with the generated savepoint.  So I do not need to restart
>>> the job anymore, since what it does automatically is what I want to do.
>>>
>>> I have some idea that maybe it is also related to the data? So I am not
>>> sure that I can provide an example to reproduces the problem.
>>>
>>> Till Rohrmann  于2021年2月6日周六 上午12:13写道:
>>>
>>>> Could you provide us with a minimal working example which reproduces
>>>> the problem for you? This would be super helpful in figuring out the
>>>> problem you are experiencing. Thanks a lot for your help.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Fri, Feb 5, 2021 at 1:03 PM 赵一旦  wrote:
>>>>
>>>>> Yeah, and if it is different, why my job runs normally.  The problem
>>>>> only occurres when I stop it.
>>>>>
>>>>> Robert Metzger  于2021年2月5日周五 下午7:08写道:
>>>>>
>>>>>> Are you 100% sure that the jar files in the classpath (/lib folder)
>>>>>> are exactly the same on all machines? (It can happen quite easily in a
>>>>>> distributed standalone setup that some files are different)
>>>>>>
>>>>>>
>>>>>> On Fri, Feb 5, 2021 at 12:00 PM 赵一旦  wrote:
>>>>>>
>>>>>>> Flink1.12.0; only using aligned checkpoint; Standalone Cluster;
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Robert Metzger  于2021年2月5日周五 下午6:52写道:
>>>>>>>
>>>>>>>> Are you using unaligned checkpoints? (there's a known bug in 1.12.0
>>>>>>>> which can lead to corrupted data when using UC)
>>>>>>>> Can you tell us a little bit about your environment? (How are you
>>>>>>>> deploying Flink, which state backend are you using, what kind of job (I
>>>>>>>> guess DataStream API))
>>>>>>>>
>>>>>>>> Somehow the process receiving the data is unable to deserialize it,
>>>>>>>> most likely because they are configured differently (different 
>>>>>>>> classpath,
>>>>>>>> dependency versions etc.)
>>>>>>>>
>>>>>>>> On Fri, Feb 5, 2021 at 10:36 AM 赵一旦  wrote:
>>>>>>>>
>>>>>>>>> I do not think this is some code related problem anymore, maybe it
>>>>>>>>> is some bug?
>>>>>>>>>
>>>>>>>>> 赵一旦  于2021年2月5日周五 下午4:30写道:
>>>>>>>>>
>>>>>>>>>

Re: Question: in processElement every record is put into the map, but the map is empty when the next record arrives

2021-02-07 Thread
Is it a keyedStream? It may be that the keys are different.
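
For illustration, a minimal sketch of that point (the String key type and the state name are assumptions; the Mics* types are from the code below): in a KeyedProcessFunction, MapState is scoped to the current key, so a record whose key differs from the previous record's sees its own, initially empty map rather than the entries written for other keys.

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class TrainPracticalFunction
        extends KeyedProcessFunction<String, MicsTrainPracticalDetail, MicsTrainPractical> {

    private transient MapState<String, MicsTrainPractical> map;

    @Override
    public void open(Configuration parameters) {
        // the state handle is created from the runtime context, typically in open()
        map = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("practical", String.class, MicsTrainPractical.class));
    }

    @Override
    public void processElement(MicsTrainPracticalDetail value, Context ctx,
                               Collector<MicsTrainPractical> out) throws Exception {
        // this lookup only sees entries written while processing records with the SAME key
        // as ctx.getCurrentKey(); if the stream is keyed by something other than the train
        // number, the earlier put() may simply live under a different key
        MicsTrainPractical current = map.get(value.getTrainNumber());
        if (current == null) {
            current = new MicsTrainPractical();  // first record seen for this train under this key
        }
        // ... fill/update 'current' from 'value' as in the code below ...
        map.put(value.getTrainNumber(), current);
        out.collect(current);
    }
}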

谌祖安  于2021年2月7日周日 下午6:00写道:

> Hello!
>
>
> I override the processElement method. Every time a stream record is processed, I put data into the map; for each subsequent record I first check whether the map already has the same key: if not, I create a new entry and put it into the map, and if so, I transform it and put it back into the map.
>  What I see in the actual code is that right after each put the map has data, but when the next record is processed and I first read from the map, the map is already empty.
> Did I write something wrong somewhere?   And how is this different from state.update(current); on the Flink website?
>
> The code is as follows:
>  private MapState map;  // declare the map state
>   @Override
> public void processElement(MicsTrainPracticalDetail value, Context ctx,
> Collector out) throws Exception {
>
> MicsTrainPractical current = map.get(value.getTrainNumber());
>
>  System.out.println(map.isEmpty());  // every incoming record finds the map already empty; the data put for earlier records is not kept
> Long departTime =
> DateTimeUtils.convertToTimestampSecond(value.getDepartTime());
> Long arrivalTime =
> DateTimeUtils.convertToTimestampSecond(value.getArrivalTime());
> if (current == null) {   // no entry with the same string key was found in the map: create a new record and put it in
> MicsTrainPractical actual = new MicsTrainPractical();
> actual.setTrainId(value.getTrainNumber());
> actual.setCarId(value.getCarId());
> actual.setStartStationId(value.getStationId());
> actual.setStartPracticalArrivalTime(arrivalTime);
> actual.setEndPracticalArrivalTime(arrivalTime);
> actual.setStartPracticalDepartTime(departTime);
> actual.setEndPracticalDepartTime(departTime);
> actual.setEndStationId(value.getStationId());
> actual.setStopTime(0L);
> actual.setDs(value.getDs());
> actual.setIsInsert(true);
> actual.setTargetStationId(value.getTargetStationId());
> out.collect(actual);
> map.put(value.getTrainNumber(), actual);  // write the data into the map
> return;
> } else {   // an entry with the same string key was found: transform the data and write it back into the map
>
> MicsTrainPractical actual = new MicsTrainPractical();
> actual.setTrainId(value.getTrainNumber());
> actual.setCarId(value.getCarId());
> actual.setStartStationId(current.getStartStationId());
> actual.setEndStationId(value.getStationId());
>
> actual.setStartPracticalArrivalTime(current.getStartPracticalArrivalTime());
>
> actual.setStartPracticalDepartTime(current.getStartPracticalDepartTime());
> actual.setEndPracticalArrivalTime(arrivalTime);
> actual.setEndPracticalDepartTime(departTime);
> actual.setStopTime(current.getStopTime() + Math.abs(departTime
> - arrivalTime));
> actual.setDs(value.getDs());
> actual.setIsInsert(false);
> actual.setTargetStationId(value.getTargetStationId());
> current.setEndStationId(actual.getEndStationId());
>
> current.setEndPracticalDepartTime(actual.getEndPracticalDepartTime());
>
> current.setEndPracticalArrivalTime(actual.getEndPracticalArrivalTime());
> current.setStopTime(actual.getStopTime());
> current.setIsInsert(actual.getIsInsert());
>
> MicsTrainSectionInfo trainSectionInfo = new
> MicsTrainSectionInfo();
> trainSectionInfo.setTrainId(actual.getTrainId());
> trainSectionInfo.setCarId(actual.getCarId());
> trainSectionInfo.setStartStationId(current.getEndStationId());
> trainSectionInfo.setEndStationId(actual.getEndStationId());
>
> trainSectionInfo.setDepartTime(current.getEndPracticalDepartTime());
>
> trainSectionInfo.setArrivalTime(actual.getEndPracticalArrivalTime());
>
> map.put(value.getTrainNumber(), actual);
> out.collect(actual);
> ctx.output(outputTagSectionFlow, trainSectionInfo);
> }
> }
>
>
>
>
> 谌祖安
> 智能轨道交通业务群 / 产品事业部 / 开发经理
> Intelligent RailTransportation BG /Development Manager
> 广东省广州市天河区新岑四路2号佳都智慧大厦
> PCI Intelligence Building, No. 2 Xincen Fourth Road, Tianhe District,
> Guangzhou, Guangdong
> E shenz...@pcitech.com
> M 86-18680458868
> www.pcitech.com
>


Re: flink kryo exception

2021-02-07 Thread
It may also have something to do with my job's first tasks. The second
task has two inputs: one is the Kafka source stream (A), the other is a
self-defined MySQL source used as a broadcast stream (B).
In A: I have a 'WatermarkReAssigner', a self-defined operator which adds an
offset to its input watermark and then forwards it downstream.
In B: The parallelism is 30, but in my rich function's implementation, only
subtask 0 queries MySQL and sends out records; the other subtasks do
nothing. All subtasks send max_watermark - 86400_000 as the watermark.
Since both of the first tasks involve a self-defined source or implementation,
I do not know whether the problem is related to that.
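
For context, a minimal sketch of what such a watermark-shifting operator can look like (this is not the actual WatermarkReAssigner; the offset value and the transform() usage line are assumptions):

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class WatermarkOffsetOperator<T> extends AbstractStreamOperator<T>
        implements OneInputStreamOperator<T, T> {

    private final long offsetMs;   // e.g. -86_400_000L to hold watermarks back by one day

    public WatermarkOffsetOperator(long offsetMs) {
        this.offsetMs = offsetMs;
    }

    @Override
    public void processElement(StreamRecord<T> element) {
        output.collect(element);   // records pass through unchanged
    }

    @Override
    public void processWatermark(Watermark mark) {
        // shift the incoming watermark before forwarding it downstream
        output.emitWatermark(new Watermark(mark.getTimestamp() + offsetMs));
    }
}

// usage (assumed): stream.transform("WatermarkReAssigner", stream.getType(), new WatermarkOffsetOperator<>(offsetMs))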

赵一旦  于2021年2月7日周日 下午4:05写道:

> The first problem is critical, since the savepoint do not work.
> The second problem, in which I changed the solution, removed the 'Map'
> based implementation before the data are transformed to the second task,
> and this case savepoint works.  The only problem is that, I should stop the
> job and remember the savepoint path, then restart job with the savepoint
> path. And now it is : I stop the job, then the job failed and restart
> automatically with the generated savepoint.  So I do not need to restart
> the job anymore, since what it does automatically is what I want to do.
>
> I have some idea that maybe it is also related to the data? So I am not
> sure that I can provide an example to reproduces the problem.
>
> Till Rohrmann  于2021年2月6日周六 上午12:13写道:
>
>> Could you provide us with a minimal working example which reproduces the
>> problem for you? This would be super helpful in figuring out the problem
>> you are experiencing. Thanks a lot for your help.
>>
>> Cheers,
>> Till
>>
>> On Fri, Feb 5, 2021 at 1:03 PM 赵一旦  wrote:
>>
>>> Yeah, and if it is different, why my job runs normally.  The problem
>>> only occurres when I stop it.
>>>
>>> Robert Metzger  于2021年2月5日周五 下午7:08写道:
>>>
>>>> Are you 100% sure that the jar files in the classpath (/lib folder) are
>>>> exactly the same on all machines? (It can happen quite easily in a
>>>> distributed standalone setup that some files are different)
>>>>
>>>>
>>>> On Fri, Feb 5, 2021 at 12:00 PM 赵一旦  wrote:
>>>>
>>>>> Flink1.12.0; only using aligned checkpoint; Standalone Cluster;
>>>>>
>>>>>
>>>>>
>>>>> Robert Metzger  于2021年2月5日周五 下午6:52写道:
>>>>>
>>>>>> Are you using unaligned checkpoints? (there's a known bug in 1.12.0
>>>>>> which can lead to corrupted data when using UC)
>>>>>> Can you tell us a little bit about your environment? (How are you
>>>>>> deploying Flink, which state backend are you using, what kind of job (I
>>>>>> guess DataStream API))
>>>>>>
>>>>>> Somehow the process receiving the data is unable to deserialize it,
>>>>>> most likely because they are configured differently (different classpath,
>>>>>> dependency versions etc.)
>>>>>>
>>>>>> On Fri, Feb 5, 2021 at 10:36 AM 赵一旦  wrote:
>>>>>>
>>>>>>> I do not think this is some code related problem anymore, maybe it
>>>>>>> is some bug?
>>>>>>>
>>>>>>> 赵一旦  于2021年2月5日周五 下午4:30写道:
>>>>>>>
>>>>>>>> Hi all, I find that the failure always occurred in the second task,
>>>>>>>> after the source task. So I do something in the first chaining task, I
>>>>>>>> transform the 'Map' based class object to another normal class object, 
>>>>>>>> and
>>>>>>>> the problem disappeared.
>>>>>>>>
>>>>>>>> Based on the new solution, I also tried to stop and restore job
>>>>>>>> with savepoint (all successful).
>>>>>>>>
>>>>>>>> But, I also met another problem. Also this problem occurs while I
>>>>>>>> stop the job, and also occurs in the second task after the source 
>>>>>>>> task. The
>>>>>>>> log is below:
>>>>>>>> 2021-02-05 16:21:26
>>>>>>>> java.io.EOFException
>>>>>>>> at org.apache.flink.core.memory.DataInputDeserializer
>>>>>>>> .readUnsignedByte(DataInputDeserializer.java:321)
>>>>>>>> at org.apache.flink.types.StringV

Re: About the long INITIALIZING phase newly added in 1.12

2021-02-07 Thread
It may also have something to do with my job's first tasks. The second
task has two inputs: one is the Kafka source stream (A), the other is a
self-defined MySQL source used as a broadcast stream (B).
In A: I have a 'WatermarkReAssigner', a self-defined operator which adds an
offset to its input watermark and then forwards it downstream.
In B: The parallelism is 30, but in my rich function's implementation, only
subtask 0 queries MySQL and sends out records; the other subtasks do
nothing. All subtasks send max_watermark - 86400_000 as the watermark.
Since both of the first tasks involve a self-defined source or implementation,
I do not know whether the problem is related to that.

赵一旦  于2021年2月7日周日 下午4:00写道:

> A screenshot cannot capture the dynamic behaviour anyway.
>
> Currently it is a standalone cluster of 10 machines with roughly 5 GB of state. I submit the job through the flink client; the web UI then just keeps spinning when refreshed, and after a while (a few tens of seconds) it is OK. The moment it becomes OK there are many jobs in the INITIALIZING state, which then slowly (within about 10 s) disappear.
>
> On the flink client side, sometimes the submission completes normally and sometimes it reports an error (something about a duplicate job).
>
>
> zilong xiao  于2021年2月7日周日 下午3:25写道:
>
>> Do you have a screenshot?
>>
>> 赵一旦  于2021年2月7日周日 下午3:13写道:
>>
>> > There is another symptom now: when I submit a job, the web
>> > UI seems to hang; after a while the jobs show up on refresh, there are 4-5 jobs in the INITIALIZING state, and within a few seconds they disappear one after another, leaving 1.
>> >
>> > My current suspicion is that some retry mechanism leads to N duplicate submissions, and then perhaps some deduplication mechanism makes several of them stop automatically one after another?
>> >
>> > 赵一旦  于2021年1月26日周二 上午10:51写道:
>> >
>> > >
>> As above: jobs that used to go from typing the command to RUNNING quickly (within 10-30 s) now sometimes spend 1-2 min just in the INITIALIZING phase. Not sure what is going on.
>> > >
>> >
>>
>


Re: flink kryo exception

2021-02-07 Thread
The first problem is critical, since the savepoint does not work.
For the second problem I changed the solution: I removed the 'Map'-based
implementation before the data is handed to the second task, and in this case
the savepoint works.  The only catch is that I should stop the job, remember
the savepoint path, and then restart the job from that path. What happens now
is: I stop the job, the job fails and restarts automatically from the
generated savepoint.  So I do not need to restart the job myself anymore,
since what it does automatically is what I wanted to do.

I suspect it may also be related to the data, so I am not sure that I can
provide an example that reproduces the problem.

Till Rohrmann  于2021年2月6日周六 上午12:13写道:

> Could you provide us with a minimal working example which reproduces the
> problem for you? This would be super helpful in figuring out the problem
> you are experiencing. Thanks a lot for your help.
>
> Cheers,
> Till
>
> On Fri, Feb 5, 2021 at 1:03 PM 赵一旦  wrote:
>
>> Yeah, and if it is different, why my job runs normally.  The problem only
>> occurres when I stop it.
>>
>> Robert Metzger  于2021年2月5日周五 下午7:08写道:
>>
>>> Are you 100% sure that the jar files in the classpath (/lib folder) are
>>> exactly the same on all machines? (It can happen quite easily in a
>>> distributed standalone setup that some files are different)
>>>
>>>
>>> On Fri, Feb 5, 2021 at 12:00 PM 赵一旦  wrote:
>>>
>>>> Flink1.12.0; only using aligned checkpoint; Standalone Cluster;
>>>>
>>>>
>>>>
>>>> Robert Metzger  于2021年2月5日周五 下午6:52写道:
>>>>
>>>>> Are you using unaligned checkpoints? (there's a known bug in 1.12.0
>>>>> which can lead to corrupted data when using UC)
>>>>> Can you tell us a little bit about your environment? (How are you
>>>>> deploying Flink, which state backend are you using, what kind of job (I
>>>>> guess DataStream API))
>>>>>
>>>>> Somehow the process receiving the data is unable to deserialize it,
>>>>> most likely because they are configured differently (different classpath,
>>>>> dependency versions etc.)
>>>>>
>>>>> On Fri, Feb 5, 2021 at 10:36 AM 赵一旦  wrote:
>>>>>
>>>>>> I do not think this is some code related problem anymore, maybe it is
>>>>>> some bug?
>>>>>>
>>>>>> 赵一旦  于2021年2月5日周五 下午4:30写道:
>>>>>>
>>>>>>> Hi all, I find that the failure always occurred in the second task,
>>>>>>> after the source task. So I do something in the first chaining task, I
>>>>>>> transform the 'Map' based class object to another normal class object, 
>>>>>>> and
>>>>>>> the problem disappeared.
>>>>>>>
>>>>>>> Based on the new solution, I also tried to stop and restore job with
>>>>>>> savepoint (all successful).
>>>>>>>
>>>>>>> But, I also met another problem. Also this problem occurs while I
>>>>>>> stop the job, and also occurs in the second task after the source task. 
>>>>>>> The
>>>>>>> log is below:
>>>>>>> 2021-02-05 16:21:26
>>>>>>> java.io.EOFException
>>>>>>> at org.apache.flink.core.memory.DataInputDeserializer
>>>>>>> .readUnsignedByte(DataInputDeserializer.java:321)
>>>>>>> at org.apache.flink.types.StringValue.readString(StringValue
>>>>>>> .java:783)
>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer
>>>>>>> .deserialize(StringSerializer.java:75)
>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer
>>>>>>> .deserialize(StringSerializer.java:33)
>>>>>>> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
>>>>>>> .deserialize(PojoSerializer.java:411)
>>>>>>> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
>>>>>>> .deserialize(PojoSerializer.java:411)
>>>>>>> at org.apache.flink.streaming.runtime.streamrecord.
>>>>>>> StreamElementSerializer.deserialize(StreamElementSerializer.java:202
>>>>>>> )
>>>>>>> at org.apache.flink.streaming.runtime.streamrecord.
>>>>>>> Str

Re: About the long INITIALIZING phase newly added in 1.12

2021-02-07 Thread
A screenshot cannot capture the dynamic behaviour anyway.
Currently it is a standalone cluster of 10 machines with roughly 5 GB of state. I submit the job through the flink client; the web UI then just keeps spinning when refreshed, and after a while (a few tens of seconds) it is OK. The moment it becomes OK there are many jobs in the INITIALIZING state, which then slowly (within about 10 s) disappear.

On the flink client side, sometimes the submission completes normally and sometimes it reports an error (something about a duplicate job).


zilong xiao  于2021年2月7日周日 下午3:25写道:

> Do you have a screenshot?
>
> 赵一旦  于2021年2月7日周日 下午3:13写道:
>
> > There is another symptom now: when I submit a job, the web
> > UI seems to hang; after a while the jobs show up on refresh, there are 4-5 jobs in the INITIALIZING state, and within a few seconds they disappear one after another, leaving 1.
> >
> > My current suspicion is that some retry mechanism leads to N duplicate submissions, and then perhaps some deduplication mechanism makes several of them stop automatically one after another?
> >
> > 赵一旦  于2021年1月26日周二 上午10:51写道:
> >
> > > As above: jobs that used to go from typing the command to RUNNING quickly (within 10-30 s) now sometimes spend 1-2 min just in the INITIALIZING phase. Not sure what is going on.
> > >
> >
>


Re: About the long INITIALIZING phase newly added in 1.12

2021-02-06 Thread
There is another symptom now: when I submit a job, the web
UI seems to hang; after a while the jobs show up on refresh, there are 4-5 jobs in the INITIALIZING state, and within a few seconds they disappear one after another, leaving 1.

My current suspicion is that some retry mechanism leads to N duplicate submissions, and then perhaps some deduplication mechanism makes several of them stop automatically one after another?

赵一旦  于2021年1月26日周二 上午10:51写道:

> As above: jobs that used to go from typing the command to RUNNING quickly (within 10-30 s) now sometimes spend 1-2 min just in the INITIALIZING phase. Not sure what is going on.
>


Re: flink kryo exception

2021-02-05 Thread
Yeah, and if it were different, why would my job run normally?  The problem
only occurs when I stop it.

Robert Metzger  于2021年2月5日周五 下午7:08写道:

> Are you 100% sure that the jar files in the classpath (/lib folder) are
> exactly the same on all machines? (It can happen quite easily in a
> distributed standalone setup that some files are different)
>
>
> On Fri, Feb 5, 2021 at 12:00 PM 赵一旦  wrote:
>
>> Flink1.12.0; only using aligned checkpoint; Standalone Cluster;
>>
>>
>>
>> Robert Metzger  于2021年2月5日周五 下午6:52写道:
>>
>>> Are you using unaligned checkpoints? (there's a known bug in 1.12.0
>>> which can lead to corrupted data when using UC)
>>> Can you tell us a little bit about your environment? (How are you
>>> deploying Flink, which state backend are you using, what kind of job (I
>>> guess DataStream API))
>>>
>>> Somehow the process receiving the data is unable to deserialize it, most
>>> likely because they are configured differently (different classpath,
>>> dependency versions etc.)
>>>
>>> On Fri, Feb 5, 2021 at 10:36 AM 赵一旦  wrote:
>>>
>>>> I do not think this is some code related problem anymore, maybe it is
>>>> some bug?
>>>>
>>>> 赵一旦  于2021年2月5日周五 下午4:30写道:
>>>>
>>>>> Hi all, I find that the failure always occurred in the second task,
>>>>> after the source task. So I do something in the first chaining task, I
>>>>> transform the 'Map' based class object to another normal class object, and
>>>>> the problem disappeared.
>>>>>
>>>>> Based on the new solution, I also tried to stop and restore job with
>>>>> savepoint (all successful).
>>>>>
>>>>> But, I also met another problem. Also this problem occurs while I stop
>>>>> the job, and also occurs in the second task after the source task. The log
>>>>> is below:
>>>>> 2021-02-05 16:21:26
>>>>> java.io.EOFException
>>>>> at org.apache.flink.core.memory.DataInputDeserializer
>>>>> .readUnsignedByte(DataInputDeserializer.java:321)
>>>>> at org.apache.flink.types.StringValue.readString(StringValue.java:
>>>>> 783)
>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer
>>>>> .deserialize(StringSerializer.java:75)
>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer
>>>>> .deserialize(StringSerializer.java:33)
>>>>> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
>>>>> .deserialize(PojoSerializer.java:411)
>>>>> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
>>>>> .deserialize(PojoSerializer.java:411)
>>>>> at org.apache.flink.streaming.runtime.streamrecord.
>>>>> StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>>>>> at org.apache.flink.streaming.runtime.streamrecord.
>>>>> StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>>>>> at org.apache.flink.runtime.plugable.
>>>>> NonReusingDeserializationDelegate.read(
>>>>> NonReusingDeserializationDelegate.java:55)
>>>>> at org.apache.flink.runtime.io.network.api.serialization.
>>>>> SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(
>>>>> SpillingAdaptiveSpanningRecordDeserializer.java:92)
>>>>> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
>>>>> .emitNext(StreamTaskNetworkInput.java:145)
>>>>> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
>>>>> .processInput(StreamOneInputProcessor.java:67)
>>>>> at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor
>>>>> .processInput(StreamTwoInputProcessor.java:92)
>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask
>>>>> .processInput(StreamTask.java:372)
>>>>> at org.apache.flink.streaming.runtime.tasks.mailbox.
>>>>> MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask
>>>>> .runMailboxLoop(StreamTask.java:575)
>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>>>>> StreamTask.java:539)
>>>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>>>> at org.apache.fl

Re: flink kryo exception

2021-02-05 Thread
Flink1.12.0; only using aligned checkpoint; Standalone Cluster;



Robert Metzger  于2021年2月5日周五 下午6:52写道:

> Are you using unaligned checkpoints? (there's a known bug in 1.12.0 which
> can lead to corrupted data when using UC)
> Can you tell us a little bit about your environment? (How are you
> deploying Flink, which state backend are you using, what kind of job (I
> guess DataStream API))
>
> Somehow the process receiving the data is unable to deserialize it, most
> likely because they are configured differently (different classpath,
> dependency versions etc.)
>
> On Fri, Feb 5, 2021 at 10:36 AM 赵一旦  wrote:
>
>> I do not think this is some code related problem anymore, maybe it is
>> some bug?
>>
>> 赵一旦  于2021年2月5日周五 下午4:30写道:
>>
>>> Hi all, I find that the failure always occurred in the second task,
>>> after the source task. So I do something in the first chaining task, I
>>> transform the 'Map' based class object to another normal class object, and
>>> the problem disappeared.
>>>
>>> Based on the new solution, I also tried to stop and restore job with
>>> savepoint (all successful).
>>>
>>> But, I also met another problem. Also this problem occurs while I stop
>>> the job, and also occurs in the second task after the source task. The log
>>> is below:
>>> 2021-02-05 16:21:26
>>> java.io.EOFException
>>> at org.apache.flink.core.memory.DataInputDeserializer
>>> .readUnsignedByte(DataInputDeserializer.java:321)
>>> at org.apache.flink.types.StringValue.readString(StringValue.java:
>>> 783)
>>> at org.apache.flink.api.common.typeutils.base.StringSerializer
>>> .deserialize(StringSerializer.java:75)
>>> at org.apache.flink.api.common.typeutils.base.StringSerializer
>>> .deserialize(StringSerializer.java:33)
>>> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
>>> .deserialize(PojoSerializer.java:411)
>>> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
>>> .deserialize(PojoSerializer.java:411)
>>> at org.apache.flink.streaming.runtime.streamrecord.
>>> StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>>> at org.apache.flink.streaming.runtime.streamrecord.
>>> StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>>> at org.apache.flink.runtime.plugable.
>>> NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate
>>> .java:55)
>>> at org.apache.flink.runtime.io.network.api.serialization.
>>> SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(
>>> SpillingAdaptiveSpanningRecordDeserializer.java:92)
>>> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
>>> .emitNext(StreamTaskNetworkInput.java:145)
>>> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
>>> .processInput(StreamOneInputProcessor.java:67)
>>> at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor
>>> .processInput(StreamTwoInputProcessor.java:92)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
>>> StreamTask.java:372)
>>> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
>>> .runMailboxLoop(MailboxProcessor.java:186)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask
>>> .runMailboxLoop(StreamTask.java:575)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>>> StreamTask.java:539)
>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>> at java.lang.Thread.run(Thread.java:748)
>>>
>>> It is also about serialize and deserialize, but not related to kryo this
>>> time.
>>>
>>>
>>> Till Rohrmann  于2021年2月3日周三 下午9:22写道:
>>>
>>>> From these snippets it is hard to tell what's going wrong. Could you
>>>> maybe give us a minimal example with which to reproduce the problem?
>>>> Alternatively, have you read through Flink's serializer documentation [1]?
>>>> Have you tried to use a simple POJO instead of inheriting from a HashMap?
>>>>
>>>> The stack trace looks as if the job fails deserializing some key of
>>>> your MapRecord map.
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/types_serialization.html#most-frequent-issues
>>>>
>>>

Re: flink kryo exception

2021-02-05 Thread
Hi all, I find that the failure always occurs in the second task, right after
the source task. So I did something in the first chained task: I transform
the 'Map'-based class object into another, normal class object, and the
problem disappeared.
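
For reference, a sketch of the kind of conversion meant here: a map() chained right after the source copies the needed entries out of the HashMap-backed MapRecord into a plain POJO, so only the POJO crosses the network to the second task. FlatUserAccessLog, its fields, and the getD()/get("uid") accessors are made-up placeholders.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;

// hypothetical flat POJO: public class, public no-arg constructor, getters/setters
public class FlatUserAccessLog {
    private Long timestamp;
    private String uid;

    public FlatUserAccessLog() {}

    public Long getTimestamp() { return timestamp; }
    public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }
    public String getUid() { return uid; }
    public void setUid(String uid) { this.uid = uid; }
}

// chained right after the source (assuming 'source' is the DataStream<UserAccessLog>):
DataStream<FlatUserAccessLog> flat = source.map(new MapFunction<UserAccessLog, FlatUserAccessLog>() {
    @Override
    public FlatUserAccessLog map(UserAccessLog log) {
        FlatUserAccessLog out = new FlatUserAccessLog();
        out.setTimestamp(log.getD().getTimestamp());   // getTimestamp() exists on MapRecord
        out.setUid((String) log.getD().get("uid"));    // "uid" and getD() are assumptions
        return out;
    }
});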

Based on the new solution, I also tried to stop and restore the job with a
savepoint (all successful).

But I also ran into another problem. It also occurs while I stop the
job, and also in the second task after the source task. The log is
below:
2021-02-05 16:21:26
java.io.EOFException
at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(
DataInputDeserializer.java:321)
at org.apache.flink.types.StringValue.readString(StringValue.java:783)
at org.apache.flink.api.common.typeutils.base.StringSerializer
.deserialize(StringSerializer.java:75)
at org.apache.flink.api.common.typeutils.base.StringSerializer
.deserialize(StringSerializer.java:33)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
.deserialize(PojoSerializer.java:411)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
.deserialize(PojoSerializer.java:411)
at org.apache.flink.streaming.runtime.streamrecord.
StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
at org.apache.flink.streaming.runtime.streamrecord.
StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate
.read(NonReusingDeserializationDelegate.java:55)
at org.apache.flink.runtime.io.network.api.serialization.
SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(
SpillingAdaptiveSpanningRecordDeserializer.java:92)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
.emitNext(StreamTaskNetworkInput.java:145)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
.processInput(StreamOneInputProcessor.java:67)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor
.processInput(StreamTwoInputProcessor.java:92)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
StreamTask.java:372)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
.runMailboxLoop(MailboxProcessor.java:186)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
StreamTask.java:575)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
.java:539)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.lang.Thread.run(Thread.java:748)

It is also about serialization and deserialization, but not related to Kryo
this time.


Till Rohrmann  于2021年2月3日周三 下午9:22写道:

> From these snippets it is hard to tell what's going wrong. Could you maybe
> give us a minimal example with which to reproduce the problem?
> Alternatively, have you read through Flink's serializer documentation [1]?
> Have you tried to use a simple POJO instead of inheriting from a HashMap?
>
> The stack trace looks as if the job fails deserializing some key of your
> MapRecord map.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/types_serialization.html#most-frequent-issues
>
> Cheers,
> Till
>
> On Wed, Feb 3, 2021 at 11:49 AM 赵一旦  wrote:
>
>> Some facts are possibly related with these, since another job do not meet
>> these expectations.
>> The problem job use a class which contains a field of Class MapRecord,
>> and MapRecord is defined to extend HashMap so as to accept variable json
>> data.
>>
>> Class MapRecord:
>>
>> @NoArgsConstructor
>> @Slf4j
>> public class MapRecord extends HashMap implements 
>> Serializable {
>> @Override
>> public void setTimestamp(Long timestamp) {
>> put("timestamp", timestamp);
>> put("server_time", timestamp);
>> }
>>
>> @Override
>> public Long getTimestamp() {
>> try {
>> Object ts = getOrDefault("timestamp", 
>> getOrDefault("server_time", 0L));
>> return ((Number) Optional.ofNullable(ts).orElse(0L)).longValue();
>> } catch (Exception e) {
>> log.error("Error, MapRecord's timestamp invalid.", e);
>> return 0L;
>> }
>> }
>> }
>>
>> Class UserAccessLog:
>>
>> public class UserAccessLog extends AbstractRecord {
>> private MapRecord d;  // I think this is related to the problem...
>>
>> ... ...
>>
>> }
>>
>>
>> 赵一旦  于2021年2月3日周三 下午6:43写道:
>>
>>> Actually the exception is different every time I stop the job.
>>> Such as:
>

Re: flink kryo exception

2021-02-05 Thread
I no longer think this is a code-related problem; maybe it is a
bug?

赵一旦  于2021年2月5日周五 下午4:30写道:

> Hi all, I find that the failure always occurred in the second task, after
> the source task. So I do something in the first chaining task, I transform
> the 'Map' based class object to another normal class object, and the
> problem disappeared.
>
> Based on the new solution, I also tried to stop and restore job with
> savepoint (all successful).
>
> But, I also met another problem. Also this problem occurs while I stop the
> job, and also occurs in the second task after the source task. The log is
> below:
> 2021-02-05 16:21:26
> java.io.EOFException
> at org.apache.flink.core.memory.DataInputDeserializer
> .readUnsignedByte(DataInputDeserializer.java:321)
> at org.apache.flink.types.StringValue.readString(StringValue.java:783)
> at org.apache.flink.api.common.typeutils.base.StringSerializer
> .deserialize(StringSerializer.java:75)
> at org.apache.flink.api.common.typeutils.base.StringSerializer
> .deserialize(StringSerializer.java:33)
> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
> .deserialize(PojoSerializer.java:411)
> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
> .deserialize(PojoSerializer.java:411)
> at org.apache.flink.streaming.runtime.streamrecord.
> StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
> at org.apache.flink.streaming.runtime.streamrecord.
> StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
> at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate
> .read(NonReusingDeserializationDelegate.java:55)
> at org.apache.flink.runtime.io.network.api.serialization.
> SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(
> SpillingAdaptiveSpanningRecordDeserializer.java:92)
> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
> .emitNext(StreamTaskNetworkInput.java:145)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
> .processInput(StreamOneInputProcessor.java:67)
> at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor
> .processInput(StreamTwoInputProcessor.java:92)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
> StreamTask.java:372)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
> .runMailboxLoop(MailboxProcessor.java:186)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
> StreamTask.java:575)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:539)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> at java.lang.Thread.run(Thread.java:748)
>
> It is also about serialize and deserialize, but not related to kryo this
> time.
>
>
> Till Rohrmann  于2021年2月3日周三 下午9:22写道:
>
>> From these snippets it is hard to tell what's going wrong. Could you
>> maybe give us a minimal example with which to reproduce the problem?
>> Alternatively, have you read through Flink's serializer documentation [1]?
>> Have you tried to use a simple POJO instead of inheriting from a HashMap?
>>
>> The stack trace looks as if the job fails deserializing some key of your
>> MapRecord map.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/types_serialization.html#most-frequent-issues
>>
>> Cheers,
>> Till
>>
>> On Wed, Feb 3, 2021 at 11:49 AM 赵一旦  wrote:
>>
>>> Some facts are possibly related with these, since another job do not
>>> meet these expectations.
>>> The problem job use a class which contains a field of Class MapRecord,
>>> and MapRecord is defined to extend HashMap so as to accept variable json
>>> data.
>>>
>>> Class MapRecord:
>>>
>>> @NoArgsConstructor
>>> @Slf4j
>>> public class MapRecord extends HashMap implements 
>>> Serializable {
>>> @Override
>>> public void setTimestamp(Long timestamp) {
>>> put("timestamp", timestamp);
>>> put("server_time", timestamp);
>>> }
>>>
>>> @Override
>>> public Long getTimestamp() {
>>> try {
>>> Object ts = getOrDefault("timestamp", 
>>> getOrDefault("server_time", 0L));
>>> return ((Number) 
>>> Optional.ofNullable(ts).orElse(0L)).longValue();
>>> } catch (Exception e) {
>>> log.

Re: flink kryo exception

2021-02-03 Thread
Some facts are possibly related to this, since another job does not show
these symptoms.
The problematic job uses a class which contains a field of class MapRecord, and
MapRecord is defined to extend HashMap so as to accept variable JSON data.

Class MapRecord:

@NoArgsConstructor
@Slf4j
public class MapRecord extends HashMap implements Serializable {
@Override
public void setTimestamp(Long timestamp) {
put("timestamp", timestamp);
put("server_time", timestamp);
}

@Override
public Long getTimestamp() {
try {
Object ts = getOrDefault("timestamp",
getOrDefault("server_time", 0L));
return ((Number) Optional.ofNullable(ts).orElse(0L)).longValue();
} catch (Exception e) {
log.error("Error, MapRecord's timestamp invalid.", e);
return 0L;
}
}
}

Class UserAccessLog:

public class UserAccessLog extends AbstractRecord {
private MapRecord d;  // I think this is related to the problem...

... ...

}
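
For anyone following along, one way to check which serializer Flink picks for these classes, and to fail fast at job build time instead of at run time (a sketch; 'env' is the StreamExecutionEnvironment):

import org.apache.flink.api.common.typeinfo.TypeInformation;

public class SerializerCheck {
    public static void main(String[] args) {
        // prints a PojoType<...> when the POJO rules are met, and a GenericType<...>
        // (which means Kryo will be used) otherwise
        System.out.println(TypeInformation.of(UserAccessLog.class));
        System.out.println(TypeInformation.of(MapRecord.class));
    }
}

// in the job itself, env.getConfig().disableGenericTypes() makes construction fail
// immediately for any type that would fall back to Kryo, instead of failing at runtime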


赵一旦  于2021年2月3日周三 下午6:43写道:

> Actually the exception is different every time I stop the job.
> Such as:
> (1) com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
> The stack as I given above.
>
> (2) java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
> 2021-02-03 18:37:24
> java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
> at java.util.ArrayList.rangeCheck(ArrayList.java:657)
> at java.util.ArrayList.get(ArrayList.java:433)
> at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(
> MapReferenceResolver.java:42)
> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
> at com.esotericsoftware.kryo.serializers.MapSerializer.read(
> MapSerializer.java:135)
> at com.esotericsoftware.kryo.serializers.MapSerializer.read(
> MapSerializer.java:21)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
> .deserialize(KryoSerializer.java:346)
> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
> .deserialize(PojoSerializer.java:411)
> at org.apache.flink.streaming.runtime.streamrecord.
> StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
> at org.apache.flink.streaming.runtime.streamrecord.
> StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
> at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate
> .read(NonReusingDeserializationDelegate.java:55)
> at org.apache.flink.runtime.io.network.api.serialization.
> SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(
> SpillingAdaptiveSpanningRecordDeserializer.java:92)
> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
> .emitNext(StreamTaskNetworkInput.java:145)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
> .processInput(StreamOneInputProcessor.java:67)
> at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor
> .processInput(StreamTwoInputProcessor.java:92)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
> StreamTask.java:372)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
> .runMailboxLoop(MailboxProcessor.java:186)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
> StreamTask.java:575)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:539)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> at java.lang.Thread.run(Thread.java:748)
>
> (3)  com.esotericsoftware.kryo.KryoException: Encountered unregistered
> class ID: 96
> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID:
> 96
> at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(
> DefaultClassResolver.java:119)
> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
> at com.esotericsoftware.kryo.serializers.MapSerializer.read(
> MapSerializer.java:135)
> at com.esotericsoftware.kryo.serializers.MapSerializer.read(
> MapSerializer.java:21)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
> .deserialize(KryoSerializer.java:346)
> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
> .deserialize(PojoSerializer.java:411)
> at org.apache.flink.streaming.runtime.streamrecord.
> StreamElementSerializer.de

Re: flink kryo exception

2021-02-03 Thread
 you show us the job you are trying to resume? Is it a SQL job or a
> DataStream job, for example?
>
> From the stack trace, it looks as if the class g^XT is not on the class
> path.
>
> Cheers,
> Till
>
> On Wed, Feb 3, 2021 at 10:30 AM 赵一旦  wrote:
>
>> I have a job, the checkpoint and savepoint all right.
>> But, if I stop the job using 'stop -p', after the savepoint generated,
>> then the job goes to fail. Here is the log:
>>
>> 2021-02-03 16:53:55,179 WARN  org.apache.flink.runtime.taskmanager.Task
>>  [] - ual_ft_uid_subid_SidIncludeFilter ->
>> ual_ft_uid_subid_Default
>> PassThroughFilter[null, null) -> ual_ft_uid_subid_UalUidFtExtractor ->
>> ual_ft_uid_subid_EmptyUidFilter (17/30)#0
>> (46abce5d1148b56094726d442df2fd9c) switched
>> from RUNNING to FAILED.
>>
>> com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
>> at
>> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>> at
>> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>> at
>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>> at
>> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>> at
>> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>> at
>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>> at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>> at
>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>> at
>> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>> at
>> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>> at
>> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>> at
>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>> at
>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>> at
>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>> at
>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>> at
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>> [flink-dist_2.11-1.12.0.jar:1.12.0]
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>> [flink-dist_2.11-1.12.0.jar:1.12.0]
>> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
>> Caused by: java.lang.ClassNotFoundException: g^XT
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>> ~[?:1.8.0_251]
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>> ~[?:1.8.0_251]
>>

flink kryo exception

2021-02-03 Thread
I have a job, and its checkpoints and savepoints are all fine.
But if I stop the job using 'stop -p', then after the savepoint is generated
the job goes to FAILED. Here is the log:

2021-02-03 16:53:55,179 WARN  org.apache.flink.runtime.taskmanager.Task
   [] - ual_ft_uid_subid_SidIncludeFilter ->
ual_ft_uid_subid_Default
PassThroughFilter[null, null) -> ual_ft_uid_subid_UalUidFtExtractor ->
ual_ft_uid_subid_EmptyUidFilter (17/30)#0
(46abce5d1148b56094726d442df2fd9c) switched
from RUNNING to FAILED.

com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
at
com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
[flink-dist_2.11-1.12.0.jar:1.12.0]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
Caused by: java.lang.ClassNotFoundException: g^XT
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
~[?:1.8.0_251]
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
~[?:1.8.0_251]
at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:72)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
~[?:1.8.0_251]
at
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at java.lang.Class.forName0(Native Method) ~[?:1.8.0_251]
at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_251]
at
com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
... 22 more


Re: For a type that cannot be treated as a POJO, how can it be changed so it is not treated as a GenericType?

2021-02-02 Thread
As I read it, Flink requires the class to be public and every field to be either public or have a getter/setter. I suspect the types of nested fields are checked recursively as well.
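
For illustration, the wrapping idea from the mail below as a rough sketch (names are made up; note that a Map-typed field may itself still be handled as a generic type even though the wrapper now satisfies the POJO rules):

import java.util.LinkedHashMap;

public class DynamicRecord {                          // public class
    private LinkedHashMap<String, String> attributes = new LinkedHashMap<>();

    public DynamicRecord() {}                         // public no-argument constructor

    // every field is either public or reachable through a getter/setter
    public LinkedHashMap<String, String> getAttributes() { return attributes; }
    public void setAttributes(LinkedHashMap<String, String> attributes) { this.attributes = attributes; }
}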

℡小新的蜡笔不见嘞、 <1515827...@qq.com> 于2021年2月3日周三 下午1:52写道:

> Hi, could we wrap the LinkedHashMap class instead to achieve the same functionality, if you need the PojoSerializer to serialize the data?
>
>
>
>
> -- Original message --
> From: "赵一旦" Sent: Wednesday, Feb 3, 2021, 1:24 PM
> To: "user-zh" Subject: For a type that cannot be treated as a POJO, how can it be changed so it is not treated as a GenericType?
>
>
>
> As in the subject: Flink's definition of a POJO feels rather strict.
>
> I have a class that extends LinkedHashMap, and it is treated as a GenericType. In this situation I cannot modify the LinkedHashMap implementation, and it is hard not to inherit from it either, because the entity is dynamically extensible: I don't know how many properties it has, and JSON has to be deserialized onto a Map type.


For a type that cannot be treated as a POJO, how can it be changed so it is not treated as a GenericType?

2021-02-02 Thread
As in the subject: Flink's definition of a POJO feels rather strict.
I have a class that extends LinkedHashMap, and it is treated as a GenericType. In this situation I cannot modify the LinkedHashMap implementation, and it is hard not to inherit from it either, because the entity is dynamically extensible: I don't know how many properties it has, and JSON has to be deserialized onto a Map type.


Re: Checkpoint succeeds, but restoring from the checkpoint fails. Guava's BloomFilter is used; can anyone help analyze this?

2021-02-02 Thread
@Kezhu Wang  Hi.
I have run into many serialization-related problems recently. The one above is because LongAdder is not public; simply overriding that class does solve it.

But I have also hit many Kryo serialization problems. For example, when I stop a job (stop -p), it fails the moment the savepoint succeeds and then goes into the RESTARTING state.
The error reported is a Kryo error:
2021-02-03 11:00:54
com.esotericsoftware.kryo.KryoException: Unable to find class: eU
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(
DefaultClassResolver.java:138)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(
DefaultClassResolver.java:115)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(
MapSerializer.java:135)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(
MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
.deserialize(KryoSerializer.java:346)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
.deserialize(PojoSerializer.java:411)
at org.apache.flink.streaming.runtime.streamrecord.
StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
at org.apache.flink.streaming.runtime.streamrecord.
StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate
.read(NonReusingDeserializationDelegate.java:55)
at org.apache.flink.runtime.io.network.api.serialization.
SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(
SpillingAdaptiveSpanningRecordDeserializer.java:92)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
.emitNext(StreamTaskNetworkInput.java:145)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
.processInput(StreamOneInputProcessor.java:67)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor
.processInput(StreamTwoInputProcessor.java:92)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
StreamTask.java:372)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
.runMailboxLoop(MailboxProcessor.java:186)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
StreamTask.java:575)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
.java:539)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: eU
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at org.apache.flink.util.FlinkUserCodeClassLoader
.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63)
at org.apache.flink.util.ChildFirstClassLoader
.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:72)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(
FlinkUserCodeClassLoader.java:49)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at org.apache.flink.runtime.execution.librarycache.
FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(
FlinkUserCodeClassLoaders.java:168)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(
DefaultClassResolver.java:136)
... 22 more

So far this looks fairly complicated, so I'd like to ask about the custom type-serializer implementations you mentioned. I'm not clear which of the
SimpleVersionedSerializer/avro/protobuf
based approaches can act as a general drop-in replacement for the existing Flink implementation (i.e. something schema-less like JSON, where serialization and deserialization work without manually specifying any schema), and which are meant for handling particular objects case by case (i.e. requiring hand-written serialization and deserialization).

Also, is there a simple example?
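
(Not an authoritative answer, but as one concrete shape of the "special handling per type" route: a Kryo serializer that delegates to Guava's own BloomFilter.writeTo/readFrom can be registered for the type, so Kryo never reflects over LongAdder. This assumes a Guava version that has writeTo/readFrom, and the Funnel below is an assumption that must match the one the filter was built with.)

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class BloomFilterKryoSerializer extends Serializer<BloomFilter<CharSequence>> implements Serializable {

    @Override
    public void write(Kryo kryo, Output output, BloomFilter<CharSequence> filter) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            filter.writeTo(bos);                       // Guava's own binary format
            byte[] bytes = bos.toByteArray();
            output.writeInt(bytes.length);
            output.writeBytes(bytes);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public BloomFilter<CharSequence> read(Kryo kryo, Input input, Class<BloomFilter<CharSequence>> type) {
        try {
            byte[] bytes = input.readBytes(input.readInt());
            // the Funnel must be the same one used when the filter was created (assumption)
            return BloomFilter.readFrom(new ByteArrayInputStream(bytes),
                    Funnels.stringFunnel(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

// registration (assumed): env.getConfig().registerTypeWithKryoSerializer(BloomFilter.class, BloomFilterKryoSerializer.class);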

Kezhu Wang  于2021年1月31日周日 下午3:09写道:

> For custom state type-serializers you could try SimpleVersionedSerializer/avro/protobuf,
> etc.
>
> For complex state you should avoid reflection-based serializers as much as possible; the same goes for java Serializable. Neither makes state upgrades easy.
>
> On January 31, 2021 at 11:29:25, 赵一旦 (hinobl...@gmail.com) wrote:
>
> Does anyone know about this problem?
> I have found the cause: the LongAdder class is not public. But is there any solution other than overriding that class? Does Flink offer anything for this situation?
>
> After all, the classes involved are not necessarily user-defined, so there is no guarantee they follow certain rules. Take the non-public LongAdder in Guava here: overriding the class does solve it for me, but I'm not sure whether there is another way.
>
>
> 赵一旦  于2021年1月28日周四 下午6:03写道:
>
> > As below, I use Guava's BloomFilter, which apparently gets serialized via Kryo. The checkpoint succeeds, but restoring the job from the checkpoint fails.
> > The stack trace is below; the key error seems to be about a member that is not accessible because it is not public?
> >
> > java.lang.Exception: Exception while creating StreamOperatorStateContext.
> > at org.apache.flink.streaming.api.operators.
> > StreamTaskStateInitializerImpl.streamOperatorStateContext(
> > StreamTaskStateInitializerImpl.java:235)
> > at org.apache.flink.streaming.api.operators.AbstractStreamOperator
> > .initializeState(AbstractStreamOperator.java:248)
> > at org.apache.flink.streaming.runtime.tasks.OperatorChain
> > .initializeStateAndOpenOperators(OperatorChain.java:400)
> > at org.apache.flink.streaming.runtime.tasks.StreamTask
> > .lambda$beforeInvoke$2(StreamTask.java:507)

How can I get the type of the elements input to a RichFunction?

2021-02-02 Thread
As in the subject: how can I get the input element type (TypeInformation) inside a RichFunction?
At the moment this information is kept in the transformation; at the function level there seems to be nothing.
I need it inside the function; if it could be obtained there, one constructor parameter could be dropped.


stop -p to stop the job and produce a savepoint reports a Kryo error

2021-02-02 Thread
As in the subject; the error is below.
2021-02-02 20:44:19
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID:
95
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(
DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(
MapSerializer.java:135)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(
MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
.deserialize(KryoSerializer.java:346)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
.deserialize(PojoSerializer.java:411)
at org.apache.flink.streaming.runtime.streamrecord.
StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
at org.apache.flink.streaming.runtime.streamrecord.
StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate
.read(NonReusingDeserializationDelegate.java:55)
at org.apache.flink.runtime.io.network.api.serialization.
SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(
SpillingAdaptiveSpanningRecordDeserializer.java:92)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
.emitNext(StreamTaskNetworkInput.java:145)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
.processInput(StreamOneInputProcessor.java:67)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor
.processInput(StreamTwoInputProcessor.java:92)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
StreamTask.java:372)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
.runMailboxLoop(MailboxProcessor.java:186)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
StreamTask.java:575)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
.java:539)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.lang.Thread.run(Thread.java:748)
I have run into quite a few serialization-related errors recently.

I'd like to ask: (1) what causes the error above? (2) Regarding custom serializers, is there one that, like the POJO serializer, does not require the user to specify a schema,
i.e. where only an implementation class needs to be registered and no schema has to be defined?


Re: On implementing the initialization of the configuration join

2021-01-31 Thread
I didn't see a CDC connector for the DataStream API on the website; how did you do it?


javenjiangfsof  于2021年2月1日周一 下午1:40写道:

> DataStream API, like this:
> ```
> val list = ...   //i use jdbc to get the init data
> val dimensionInitStream = env.fromCollection(list)
> val dimension =
> dimensionStream.union(dimensionInitStream).broadcast(descriptor)
> mainStream.connect(dimensionStream)
> ...
> ```
> Note: in case I did not describe the problem clearly: the issue is that when the main stream is joined with the broadcast stream, the broadcast stream does not necessarily arrive before the main stream, so the join may not find the configuration.
>
> 在 2021年2月1日 13:30,赵一旦 写道:
>
>
> FlinkSQL ?
>
> javenjiangfsof  于2021年2月1日周一 上午11:40写道:
>
> > Hi everyone
> >
> > I have just started with Flink. My requirement is: the main stream comes from Kafka and the configuration it needs to join against comes from MySQL. Since the configuration gets inserted, updated and deleted, I use CDC + broadcast state for the config join. But that leaves the problem of initializing the broadcast stream. Looking at other replies about broadcast state online and in this community, there are roughly two solutions:
> > 1. Fetch the initial data via JDBC, turn it into a stream with fromCollection, and union it with the config stream coming from CDC. Testing shows this does work, but I wonder: is the in-memory data from fromCollection guaranteed to arrive earlier than the Kafka/CDC data? Why...
> > 2. Use ListState (I have seen solutions for keyed streams, but I do not need keyBy here; after keyBy the data is very uneven, creates many partitions, and adds shuffle overhead). But this raises the questions of when to trigger the join for the buffered data and when to clean up the state?? (An example would be ideal...)
> > 3. A better solution???
> > Right now I lean towards option 1; it is simple to implement, but I cannot be sure about that ordering question. Or maybe I should consider union + ListState, with a marker record appended after the JDBC data to trigger the join. Would that be overkill?
> > Looking forward to your replies, thanks


Re: How to implement the initial load for a config-enrichment (broadcast) join

2021-01-31 Thread
FlinkSQL ?

javenjiangfsof  于2021年2月1日周一 上午11:40写道:

> Hi everyone in the community,
>
> I only recently started working with Flink. My requirement: the main stream comes from Kafka, and the config it needs to join against lives in MySQL. Since config rows are inserted, updated and deleted, I use CDC +
> broadcast state for the enrichment. That leaves the problem of initializing the broadcast stream. From other answers about broadcast state I see roughly two options:
>   1. Load the initial data via JDBC, turn it into a stream with fromCollection, and union it with the config stream coming from CDC. Tests show this works, but I'm unsure: is the in-memory fromCollection data guaranteed to arrive faster than the Kafka/CDC data? Why...
>
>   2. Use ListState (I've seen keyed-stream solutions, but I don't need keyBy here — after keyBy the data is very skewed, many partitions are produced, and there is shuffle overhead). But then when should the buffered data be joined, and when should the state be cleaned up?? (An example would be great...)
>   3. A better option???
>
>
>   Right now I lean towards option 1 — it is simple to implement — but I can't settle that ordering question. Or should I consider union +
> ListState, appending a flag record at the end of the JDBC data to trigger the join? Would that be over-engineering?
>   Looking forward to your replies, thanks.
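
For illustration, a minimal sketch of the buffering idea from option 2 above (the element and config types are hypothetical Strings; the buffer is a plain field for brevity, so a production version should also snapshot it, e.g. via operator state / CheckpointedFunction):

```
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BufferUntilConfigReady extends BroadcastProcessFunction<String, String, String> {

    private final MapStateDescriptor<String, String> configDescriptor =
            new MapStateDescriptor<>("config", Types.STRING, Types.STRING);

    // non-checkpointed buffer for main-stream records that arrive before any config
    private transient List<String> buffer;
    private transient boolean configSeen;

    @Override
    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
        if (!configSeen) {
            if (buffer == null) {
                buffer = new ArrayList<>();
            }
            buffer.add(value);   // no config yet -> hold the record back
            return;
        }
        out.collect(enrich(value, ctx.getBroadcastState(configDescriptor).immutableEntries()));
    }

    @Override
    public void processBroadcastElement(String config, Context ctx, Collector<String> out) throws Exception {
        // placeholder: real code would split the config record into a key and a value
        ctx.getBroadcastState(configDescriptor).put(config, config);
        configSeen = true;
        if (buffer != null) {
            for (String held : buffer) {
                out.collect(enrich(held, ctx.getBroadcastState(configDescriptor).immutableEntries()));
            }
            buffer = null;
        }
    }

    private String enrich(String record, Iterable<Map.Entry<String, String>> config) {
        return record;   // real enrichment logic goes here
    }
}
```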


Re: Checkpoint succeeds but restoring from it fails; Guava BloomFilter involved — can anyone help analyze this?

2021-01-30 Thread
Does anyone know about this?
I have found the cause: the LongAdder class is not public. But apart from overriding/shadowing that class, is there any other solution? Does Flink offer anything for this kind of situation?
After all, the classes involved are not necessarily user-defined, so there is no guarantee they follow such rules — here it is Guava's non-public LongAdder. Overriding the class does solve it for me; I just don't know whether there is another way.
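
One possible alternative to shading the class, sketched under assumptions: register a custom Kryo serializer that delegates to Guava's own writeTo/readFrom instead of Kryo's field-based serialization (which is what trips over the non-public LongAdder). The Funnel below (Funnels.stringFunnel) is a placeholder — it must match the Funnel the filter was actually built with — and state already written with the old serializer would not be readable this way.

```
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class BloomFilterKryoSerializer extends Serializer<BloomFilter<CharSequence>> {

    @Override
    public void write(Kryo kryo, Output output, BloomFilter<CharSequence> filter) {
        try {
            filter.writeTo(output);          // Kryo's Output is an OutputStream
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public BloomFilter<CharSequence> read(Kryo kryo, Input input, Class<BloomFilter<CharSequence>> type) {
        try {
            // assumption: the filter was built with a UTF-8 string funnel
            return BloomFilter.readFrom(input, Funnels.stringFunnel(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

// Registration:
// env.getConfig().addDefaultKryoSerializer(BloomFilter.class, BloomFilterKryoSerializer.class);
```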

赵一旦  于2021年1月28日周四 下午6:03写道:

> As below: I use Guava's BloomFilter, which apparently gets serialized via Kryo. The checkpoint succeeds, but restoring the job from the checkpoint fails.
> The stack trace is below — the key error seems to be about a member that cannot be accessed because of its access modifier?
>
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
> at org.apache.flink.streaming.api.operators.
> StreamTaskStateInitializerImpl.streamOperatorStateContext(
> StreamTaskStateInitializerImpl.java:235)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator
> .initializeState(AbstractStreamOperator.java:248)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain
> .initializeStateAndOpenOperators(OperatorChain.java:400)
> at org.apache.flink.streaming.runtime.tasks.StreamTask
> .lambda$beforeInvoke$2(StreamTask.java:507)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1
> .runThrowing(StreamTaskActionExecutor.java:47)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(
> StreamTask.java:501)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:531)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
> state backend for KeyedProcessOperator_efc74365c4197a3ac1978016263fc7e7_(5
> /30) from any of the 1 provided restore options.
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
> .createAndRestore(BackendRestorerProcedure.java:135)
> at org.apache.flink.streaming.api.operators.
> StreamTaskStateInitializerImpl.keyedStatedBackend(
> StreamTaskStateInitializerImpl.java:316)
> at org.apache.flink.streaming.api.operators.
> StreamTaskStateInitializerImpl.streamOperatorStateContext(
> StreamTaskStateInitializerImpl.java:155)
> ... 9 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed
> when trying to restore heap backend
> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder
> .build(HeapKeyedStateBackendBuilder.java:116)
> at org.apache.flink.runtime.state.filesystem.FsStateBackend
> .createKeyedStateBackend(FsStateBackend.java:540)
> at org.apache.flink.runtime.state.filesystem.FsStateBackend
> .createKeyedStateBackend(FsStateBackend.java:100)
> at org.apache.flink.runtime.state.StateBackend
> .createKeyedStateBackend(StateBackend.java:178)
> at org.apache.flink.streaming.api.operators.
> StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(
> StreamTaskStateInitializerImpl.java:299)
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
> .attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
> .createAndRestore(BackendRestorerProcedure.java:121)
> ... 11 more
> Caused by: com.esotericsoftware.kryo.KryoException: Error constructing
> instance of class: com.google.common.hash.LongAdder
> Serialization trace:
> bitCount (com.google.common.hash.BloomFilterStrategies$LockFreeBitArray)
> bits (com.google.common.hash.BloomFilter)
> at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:
> 136)
> at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.create(
> FieldSerializer.java:547)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
> FieldSerializer.java:523)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
> at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField
> .java:106)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
> FieldSerializer.java:528)
> at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
> at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField
> .java:113)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
> FieldSerializer.java:528)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at com.esotericsoftware.kryo.serializers.MapSerializer.read(
> MapSerializer.java:143)
> at com.esotericsoftware.kryo.serializers.MapSerializer.read(
> MapSerializer.java:21)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
> .deserialize(KryoSerializer.java:346)
> at org.apache.flink.runtime.state.he

Re: Watermark cannot advance when all Kafka partitions have no data

2021-01-28 Thread
If all partitions have no data, why do you expect the watermark to advance at all? What is the goal? There seems to be nothing left to compute.

LakeShen  于2021年1月28日周四 下午7:42写道:

> For window-style aggregations you can try a custom window Trigger
>
> Best,
> LakeShen
>
> 林影  于2021年1月28日周四 下午5:46写道:
>
> > Hi, Jessica.J.Wang
> > Open-source Flink doesn't seem to have this feature; I went through the docs and couldn't find it
> >
> > Jessica.J.Wang  于2021年1月28日周四 下午5:25写道:
> >
> > > Which window are you using — Tumble or Hop? If there is no data but you want to emit results early, you can use emit
> > >
> > >
> >
> https://help.aliyun.com/document_detail/98951.html?spm=5176.11065259.1996646101.searchclickresult.513d1037M5VADa
> > >
> > >
> > >
> > > --
> > > Sent from: http://apache-flink.147419.n8.nabble.com/
> > >
> >
>
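
A minimal sketch of the custom-Trigger idea suggested above (this is not a built-in Flink trigger): keep the normal event-time firing, but also register a processing-time timer so a window can still fire after a timeout while the watermark is stuck because the source is idle. Leftover processing-time timers are not cleaned up here, and the window may fire more than once — both are ignored for brevity.

```
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class EventTimeOrTimeoutTrigger extends Trigger<Object, TimeWindow> {

    private final long timeoutMillis;

    public EventTimeOrTimeoutTrigger(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx)
            throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            return TriggerResult.FIRE;                       // watermark already passed this window
        }
        ctx.registerEventTimeTimer(window.maxTimestamp());   // normal event-time firing
        ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + timeoutMillis); // safety net
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE;                           // timeout hit while the watermark is stuck
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}

// usage: .window(TumblingEventTimeWindows.of(Time.minutes(5))).trigger(new EventTimeOrTimeoutTrigger(60_000))
```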


Re: Question about the trigger used with a reduce function

2021-01-28 Thread
You can think of the trigger here as controlling when results are emitted; your reduce computation is still incremental.

xiaolail...@163.com  于2021年1月29日周五 上午11:27写道:

> Hi! I've just started learning Flink and have a question about triggers:
>
> Given the following reduce operation:
> env.socketTextStream("localhost", )
> .flatMap(new Splitter())
> .keyBy(value -> value.f0)
> .window(TumblingEventTimeWindows.of(Time.seconds(15)))
> .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
> @Override
> public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
> Tuple2<String, Integer> value2) throws Exception {
> return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
> }
> });
>
> The trigger used is:
> @Override
> public Trigger<Object, TimeWindow>
> getDefaultTrigger(StreamExecutionEnvironment env) {
> return EventTimeTrigger.create();
> }
>
>
> The EventTimeTrigger implementation only fires once the watermark passes the current window. My question: isn't the reduce function supposed to aggregate incrementally? If the computation is only triggered after the watermark passes the window, doesn't that mean all the records have to be buffered?
> Thanks for any guidance!
>
>
>
>
>
> xiaolail...@163.com
>


Re: Why doesn't the window reduce function support RichFunction?

2021-01-28 Thread
No — Flink does provide a rich reduce function; it just isn't supported for window-based reduce.
Window-based reduce only supports a plain reduce. If you need custom state computations, the code comments tell you to use a WindowFunction or ProcessWindowFunction instead (a sketch follows below).

It has always been this way; 1.12 is no different.
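
A minimal sketch of that combination, assuming WordCount-style Tuple2<String, Integer> records: the ReduceFunction pre-aggregates incrementally for every arriving record, and the ProcessWindowFunction (a rich function with context/state access) only sees the single pre-aggregated value when the window fires.

```
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class ReducePlusProcessWindowSketch {

    public static DataStream<Tuple2<String, Integer>> wordCounts(DataStream<Tuple2<String, Integer>> input) {
        return input
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) {
                        return value.f0;
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(15)))
                .reduce(
                        // incremental part: runs for every arriving record
                        new ReduceFunction<Tuple2<String, Integer>>() {
                            @Override
                            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> a,
                                                                  Tuple2<String, Integer> b) {
                                return Tuple2.of(a.f0, a.f1 + b.f1);
                            }
                        },
                        // "rich" part: runs once when the window fires, with context/state access
                        new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
                            @Override
                            public void process(String key,
                                                Context ctx,
                                                Iterable<Tuple2<String, Integer>> preAggregated,
                                                Collector<Tuple2<String, Integer>> out) {
                                // only the single pre-aggregated element shows up here
                                out.collect(preAggregated.iterator().next());
                            }
                        });
    }
}
```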

Kezhu Wang  于2021年1月29日周五 上午11:40写道:

> The reduceFunction works together with the ReducingStateDescriptor; it is not really
> "the window's" function. The function of the "WindowOperator" is an "InternalWindowFunction", and that one
> can be a "RichFunction".
>
> State in Flink can be bound not only to a key but also to a namespace; the namespace just
> isn't exposed to users directly. If you need it, you can write your own WindowOperator and expose your own WindowFunction.
>
> Interface WindowFunction {
> // You could do incremental aggregation here.
> void processElement(Context context, Window window, Element element);
>
> void fireWindow(Context context, Window window);
> }
>
> interface WindowedRuntimeContext {
>  State getWindowedState(StateDescriptor descriptor).
> }
>
> Leave all the window concepts to the WindowOperator, and put the concrete business logic in the WindowFunction.
>
> On January 28, 2021 at 20:26:47, 赵一旦 (hinobl...@gmail.com) wrote:
>
> As in the subject. Of course the current Flink API can still achieve the effect of a RichReduceFunction, via a WindowFunction or ProcessWindowFunction.
>
> But the biggest difference between the window function and the reduce function is that the window function runs when the window fires, while reduce is an incremental, per-record operation.
> If my window computation needs a very expensive operation for every incoming record, I would rather do it in the reduce than in the window function, because otherwise the data of all keys in the window is processed at almost the same moment when the window fires, which drives the load up.
>
>
>


Why doesn't the window reduce function support RichFunction?

2021-01-28 Thread
As in the subject. Of course the current Flink API can still achieve the effect of a RichReduceFunction, via a WindowFunction or ProcessWindowFunction.
But the biggest difference between the window function and the reduce function is that the window function runs when the window fires, while reduce is an incremental, per-record operation.
If my window computation needs a very expensive operation for every incoming record, I would rather do it in the reduce than in the window function, because otherwise the data of all keys in the window is processed at almost the same moment when the window fires, which drives the load up.


A question about the flink-shaded-xxx packages

2021-01-28 Thread
As the subject says: Flink shades a number of libraries, e.g. jackson and guava.
Is the purpose (1) that Flink itself uses them and shades them to avoid conflicts, or (2) that Flink recommends users use these shaded packages directly?

In other words, I'd like to know whether users are "recommended" to use the Flink-shaded packages directly, or whether we should depend on our own copies —
I currently use jackson as well as guava (I went straight to the latest 30-jre version).
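
For what it's worth, a hedged illustration of the practical difference — the relocation prefix (guava18) is an assumption, so check the flink-shaded-guava version actually on your classpath, and the relocated classes are generally treated as Flink-internal:

```
// Using the Guava classes relocated inside Flink (internal packages, may change between releases):
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;

// Using your own Guava dependency (e.g. 30.0-jre), bundled with the job jar:
import com.google.common.collect.ImmutableList;

public class ShadedVsOwnGuava {
    public static void main(String[] args) {
        System.out.println(Lists.newArrayList(1, 2, 3));      // shaded copy shipped with Flink
        System.out.println(ImmutableList.of("a", "b"));       // your own dependency
    }
}
```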


Checkpoint succeeds but restoring from it fails; Guava BloomFilter involved — can anyone help analyze this?

2021-01-28 Thread
As below: I use Guava's BloomFilter, which apparently gets serialized via Kryo. The checkpoint succeeds, but restoring the job from the checkpoint fails.
The stack trace is below — the key error seems to be about a member that cannot be accessed because of its access modifier?

java.lang.Exception: Exception while creating StreamOperatorStateContext.
at org.apache.flink.streaming.api.operators.
StreamTaskStateInitializerImpl.streamOperatorStateContext(
StreamTaskStateInitializerImpl.java:235)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator
.initializeState(AbstractStreamOperator.java:248)
at org.apache.flink.streaming.runtime.tasks.OperatorChain
.initializeStateAndOpenOperators(OperatorChain.java:400)
at org.apache.flink.streaming.runtime.tasks.StreamTask
.lambda$beforeInvoke$2(StreamTask.java:507)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1
.runThrowing(StreamTaskActionExecutor.java:47)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(
StreamTask.java:501)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
.java:531)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
state backend for KeyedProcessOperator_efc74365c4197a3ac1978016263fc7e7_(5/
30) from any of the 1 provided restore options.
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
.createAndRestore(BackendRestorerProcedure.java:135)
at org.apache.flink.streaming.api.operators.
StreamTaskStateInitializerImpl.keyedStatedBackend(
StreamTaskStateInitializerImpl.java:316)
at org.apache.flink.streaming.api.operators.
StreamTaskStateInitializerImpl.streamOperatorStateContext(
StreamTaskStateInitializerImpl.java:155)
... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed
when trying to restore heap backend
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder
.build(HeapKeyedStateBackendBuilder.java:116)
at org.apache.flink.runtime.state.filesystem.FsStateBackend
.createKeyedStateBackend(FsStateBackend.java:540)
at org.apache.flink.runtime.state.filesystem.FsStateBackend
.createKeyedStateBackend(FsStateBackend.java:100)
at org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(
StateBackend.java:178)
at org.apache.flink.streaming.api.operators.
StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(
StreamTaskStateInitializerImpl.java:299)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
.createAndRestore(BackendRestorerProcedure.java:121)
... 11 more
Caused by: com.esotericsoftware.kryo.KryoException: Error constructing
instance of class: com.google.common.hash.LongAdder
Serialization trace:
bitCount (com.google.common.hash.BloomFilterStrategies$LockFreeBitArray)
bits (com.google.common.hash.BloomFilter)
at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:
136)
at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
at com.esotericsoftware.kryo.serializers.FieldSerializer.create(
FieldSerializer.java:547)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
FieldSerializer.java:523)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField
.java:106)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField
.java:113)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(
MapSerializer.java:143)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(
MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
.deserialize(KryoSerializer.java:346)
at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders
.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:77)
at org.apache.flink.runtime.state.
KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(
KeyGroupPartitioner.java:297)
at org.apache.flink.runtime.state.heap.HeapRestoreOperation
.readKeyGroupStateData(HeapRestoreOperation.java:299)
at org.apache.flink.runtime.state.heap.HeapRestoreOperation
.readStateHandleStateData(HeapRestoreOperation.java:260)
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(
HeapRestoreOperation.java:160)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder

Re: Which does Flink actually use, log4j 1 or log4j 2?

2021-01-25 Thread
Got it, thanks.

zilong xiao  于2021年1月26日周二 下午2:13写道:

> Hi
>
>
> Flink supports log4j, logback and log4j2 since 1.11; versions before 1.11 only supported the first two. log4j2 can also be configured with a .properties file, and the default configuration shipped in 1.12 is log4j2.
>
> Best~
>
> 赵一旦  于2021年1月26日周二 下午1:27写道:
>
> >
> >
> Many people online say log4j2 is configured with .xml files, but Flink's conf directory only contains .properties files, while the official docs say log4j2 is the default. I'm confused — which one is actually in use?
> >
>


Which does Flink actually use, log4j 1 or log4j 2?

2021-01-25 Thread
Many people online say log4j2 is configured with .xml files, but Flink's conf directory only contains .properties files, while the official docs say log4j2 is the default. I'm confused — which one is actually in use?


About the long INITIALIZING phase newly observed in 1.12

2021-01-25 Thread
As in the subject: jobs that used to go from the submit command to RUNNING within 10-30 s now sometimes spend 1-2 minutes just in the INITIALIZING phase. I'm not sure what is going on.


Re: FlinkSQL 1.12 query on a Hive table finishes almost immediately; No more splits available

2021-01-24 Thread
This is actually still rather messy. Looking again, hive-storage-api is not necessarily unused either.
When I write Hive ORC files with the DataStream API FileSink, I pull in the flink-orc module, which depends on hive-storage-api. I just tried swapping it for hive-exec etc., and that does not work because some classes are missing, for example
MapColumnVector.
Writing worked fine in earlier tests, so for DataStream writes to Hive I do need flink-orc, which indirectly brings in hive-storage-api — that part is required.

But the jars passed to flink-sql-client with -l must not include the flink-sql-orc jar, presumably because it conflicts with classes inside flink-connector-hive?

So the usage differs from place to place.

赵一旦  于2021年1月25日周一 下午1:44写道:

> Building on this answer: in another (English) thread someone also suggested my hive-common and hive-exec versions don't match.
>
> My analysis: I don't depend on hive-common or hive-exec myself. The only possible source is flink-sql-connector-hive-1.2.2_2.11_1.12.0; I checked its pom, and its orc dependency excludes hive. I had also added a separate flink-sql-orc jar. Looking again, that flink-sql-orc jar only exists to pull in the orc dependencies, and it does not exclude orc's transitive hive dependency. I tried a change and it suddenly works — I'm posting it so others can confirm the root cause; either way the problem looks solved for me.
>
>
> The fix: simply stop depending on the flink-sql-orc jar. I had already added the flink-sql-connector-hive jar as the official docs describe, and that jar already shades orc, so a separate flink-sql-orc jar is unnecessary. A first quick test works; I haven't run more experiments yet.
>
>
> 赵一旦  于2021年1月25日周一 下午12:59写道:
>
>> My Hive version should be 1.2.1 — the jars Spark depends on are 1.2.1.
>> Also, about the Hive configuration: does the Flink cluster itself need Hive dependencies? My Flink cluster has no Hive dependency added at all.
>>
>> Only when starting Flink's sql-client do I pass some jars via the -l option, namely the flink-sql-connector-hive-1.2.2 jar from the official docs.
>>
>> In addition, hive-site.xml is on the classpath of the Flink client (in its conf), and it only contains the most basic metastore connection settings.
>>
>> Rui Li  于2021年1月25日周一 上午11:32写道:
>>
>>> Hi,
>>>
>>> About the filter on partition fields: Flink's implicit type conversion rules differ from Hive's, so it is best to write the WHERE condition with constants that match the partition field's type.
>>> About the ORC read exception: which Hive version are you on? And does your Hive configuration set the hive.txn.valid.txns parameter anywhere?
>>>
>>> On Sun, Jan 24, 2021 at 6:45 AM 赵一旦  wrote:
>>>
>>> > To add: (1) for FlinkSQL queries, string-typed partition fields apparently must be quoted with '', otherwise nothing is found? As above, hour=02 directly leads to "no more splits".
>>> > (2) With that fixed, splits do show up, but an ORC-related error is thrown, and submitting the SQL kills the JM directly. JM log below:
>>> >
>>> > 2021-01-24 04:41:24,952 ERROR
>>> > org.apache.flink.runtime.util.FatalExitExceptionHandler  [] -
>>> > FATAL: Thread 'flink-akka.actor.default-dispatcher-2' produced an
>>> > uncaught exception. Stopping the process...
>>> >
>>> > java.util.concurrent.CompletionException:
>>> > org.apache.flink.util.FlinkRuntimeException: Failed to start the
>>> > operator coordinators
>>> > at
>>> >
>>> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
>>> > ~[?:1.8.0_251]
>>> > at
>>> >
>>> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
>>> > ~[?:1.8.0_251]
>>> > at
>>> >
>>> java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:722)
>>> > ~[?:1.8.0_251]
>>> > at
>>> >
>>> java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:731)
>>> > ~[?:1.8.0_251]
>>> > at
>>> >
>>> java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2023)
>>> > ~[?:1.8.0_251]
>>> > at
>>> >
>>> org.apache.flink.runtime.jobmaster.JobMaster.resetAndStartScheduler(JobMaster.java:935)
>>> > ~[flink-dists-extended_2.11-1.12.0.jar:?]
>>> > at
>>> >
>>> org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:801)
>>> > ~[flink-dists-extended_2.11-1.12.0.jar:?]
>>> > at
>>> >
>>> org.apache.flink.runtime.jobmaster.JobMaster.lambda$start$1(JobMaster.java:357)
>>> > ~[flink-dists-extended_2.11-1.12.0.jar:?]
>>> > at
>>> >
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleCallAsync(AkkaRpcActor.java:383)
>>> > ~[flink-dists-extended_2.11-1.12.0.jar:?]
>>> > at
>>> >
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
>>> > ~[flink-dists-extended_2.11-1.12.0.jar:?]
>>> > at
>>> >
>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:88)
>>> > ~[flink-dists-extended_2.11-1.12.0.jar:?]
>>> > at
>>> >
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>>> > ~[flink-dists-extended_2.11-1.12.0.jar:?]
>>> > at akka.japi.pf
>>> .UnitCaseStatement.apply(CaseStatements.scala:26)
>>> > [flink-dists-extended_2.11-1.12.0.jar:?]
>>> > at akka.japi.pf
>>> .UnitCaseStatement.apply(CaseStatements.scala:21)
>>> > [flink-dists-extended_2.11-1.12.0.jar:?]
>>> > at
>>> > scala.PartialFunction$class.applyOrElse(PartialFunction.scal

Re: FlinkSQL 1.12 query on a Hive table finishes almost immediately; No more splits available

2021-01-24 Thread
Building on this answer: in another (English) thread someone also suggested my hive-common and hive-exec versions don't match.
My analysis: I don't depend on hive-common or hive-exec myself. The only possible source is flink-sql-connector-hive-1.2.2_2.11_1.12.0; I checked its pom, and its orc dependency excludes hive. I had also added a separate flink-sql-orc jar. Looking again, that flink-sql-orc jar only exists to pull in the orc dependencies, and it does not exclude orc's transitive hive dependency. I tried a change and it suddenly works — I'm posting it so others can confirm the root cause; either way the problem looks solved for me.

The fix: simply stop depending on the flink-sql-orc jar. I had already added the flink-sql-connector-hive jar as the official docs describe, and that jar already shades orc, so a separate flink-sql-orc jar is unnecessary. A first quick test works; I haven't run more experiments yet.


赵一旦  于2021年1月25日周一 下午12:59写道:

> My Hive version should be 1.2.1 — the jars Spark depends on are 1.2.1.
> Also, about the Hive configuration: does the Flink cluster itself need Hive dependencies? My Flink cluster has no Hive dependency added at all.
>
> Only when starting Flink's sql-client do I pass some jars via the -l option, namely the flink-sql-connector-hive-1.2.2 jar from the official docs.
>
> In addition, hive-site.xml is on the classpath of the Flink client (in its conf), and it only contains the most basic metastore connection settings.
>
> Rui Li  于2021年1月25日周一 上午11:32写道:
>
>> Hi,
>>
>> About the filter on partition fields: Flink's implicit type conversion rules differ from Hive's, so it is best to write the WHERE condition with constants that match the partition field's type.
>> About the ORC read exception: which Hive version are you on? And does your Hive configuration set the hive.txn.valid.txns parameter anywhere?
>>
>> On Sun, Jan 24, 2021 at 6:45 AM 赵一旦  wrote:
>>
>> > To add: (1) for FlinkSQL queries, string-typed partition fields apparently must be quoted with '', otherwise nothing is found? As above, hour=02 directly leads to "no more splits".
>> > (2) With that fixed, splits do show up, but an ORC-related error is thrown, and submitting the SQL kills the JM directly. JM log below:
>> >
>> > 2021-01-24 04:41:24,952 ERROR
>> > org.apache.flink.runtime.util.FatalExitExceptionHandler  [] -
>> > FATAL: Thread 'flink-akka.actor.default-dispatcher-2' produced an
>> > uncaught exception. Stopping the process...
>> >
>> > java.util.concurrent.CompletionException:
>> > org.apache.flink.util.FlinkRuntimeException: Failed to start the
>> > operator coordinators
>> > at
>> >
>> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
>> > ~[?:1.8.0_251]
>> > at
>> >
>> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
>> > ~[?:1.8.0_251]
>> > at
>> >
>> java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:722)
>> > ~[?:1.8.0_251]
>> > at
>> >
>> java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:731)
>> > ~[?:1.8.0_251]
>> > at
>> >
>> java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2023)
>> > ~[?:1.8.0_251]
>> > at
>> >
>> org.apache.flink.runtime.jobmaster.JobMaster.resetAndStartScheduler(JobMaster.java:935)
>> > ~[flink-dists-extended_2.11-1.12.0.jar:?]
>> > at
>> >
>> org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:801)
>> > ~[flink-dists-extended_2.11-1.12.0.jar:?]
>> > at
>> >
>> org.apache.flink.runtime.jobmaster.JobMaster.lambda$start$1(JobMaster.java:357)
>> > ~[flink-dists-extended_2.11-1.12.0.jar:?]
>> > at
>> >
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleCallAsync(AkkaRpcActor.java:383)
>> > ~[flink-dists-extended_2.11-1.12.0.jar:?]
>> > at
>> >
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
>> > ~[flink-dists-extended_2.11-1.12.0.jar:?]
>> > at
>> >
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:88)
>> > ~[flink-dists-extended_2.11-1.12.0.jar:?]
>> > at
>> >
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>> > ~[flink-dists-extended_2.11-1.12.0.jar:?]
>> > at akka.japi.pf
>> .UnitCaseStatement.apply(CaseStatements.scala:26)
>> > [flink-dists-extended_2.11-1.12.0.jar:?]
>> > at akka.japi.pf
>> .UnitCaseStatement.apply(CaseStatements.scala:21)
>> > [flink-dists-extended_2.11-1.12.0.jar:?]
>> > at
>> > scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>> > [flink-dists-extended_2.11-1.12.0.jar:?]
>> > at akka.japi.pf
>> > .UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>> > [flink-dists-extended_2.11-1.12.0.jar:?]
>> > at
>> > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>> > [flink-dists-extended_2.11-1.12.0.jar:?]
>> > at
>> > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> > [flink-dists-extended_2.11-1.12.0.jar:?]
>> > at
>> > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> > [flink-dists-extended_2.11-1.12.0.jar:?]
>> > at akka.actor.Actor$class.aroundRec

Re: FlinkSQL 1.12 query on a Hive table finishes almost immediately; No more splits available

2021-01-24 Thread
My Hive version should be 1.2.1 — the jars Spark depends on are 1.2.1.
Also, about the Hive configuration: does the Flink cluster itself need Hive dependencies? My Flink cluster has no Hive dependency added at all.
Only when starting Flink's sql-client do I pass some jars via the -l option, namely the flink-sql-connector-hive-1.2.2 jar from the official docs.
In addition, hive-site.xml is on the classpath of the Flink client (in its conf), and it only contains the most basic metastore connection settings.

Rui Li  于2021年1月25日周一 上午11:32写道:

> Hi,
>
> About the filter on partition fields: Flink's implicit type conversion rules differ from Hive's, so it is best to write the WHERE condition with constants that match the partition field's type.
> About the ORC read exception: which Hive version are you on? And does your Hive configuration set the hive.txn.valid.txns parameter anywhere?
>
> On Sun, Jan 24, 2021 at 6:45 AM 赵一旦  wrote:
>
> > To add: (1) for FlinkSQL queries, string-typed partition fields apparently must be quoted with '', otherwise nothing is found? As above, hour=02 directly leads to "no more splits".
> > (2) With that fixed, splits do show up, but an ORC-related error is thrown, and submitting the SQL kills the JM directly. JM log below:
> >
> > 2021-01-24 04:41:24,952 ERROR
> > org.apache.flink.runtime.util.FatalExitExceptionHandler  [] -
> > FATAL: Thread 'flink-akka.actor.default-dispatcher-2' produced an
> > uncaught exception. Stopping the process...
> >
> > java.util.concurrent.CompletionException:
> > org.apache.flink.util.FlinkRuntimeException: Failed to start the
> > operator coordinators
> > at
> >
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
> > ~[?:1.8.0_251]
> > at
> >
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
> > ~[?:1.8.0_251]
> > at
> > java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:722)
> > ~[?:1.8.0_251]
> > at
> >
> java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:731)
> > ~[?:1.8.0_251]
> > at
> >
> java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2023)
> > ~[?:1.8.0_251]
> > at
> >
> org.apache.flink.runtime.jobmaster.JobMaster.resetAndStartScheduler(JobMaster.java:935)
> > ~[flink-dists-extended_2.11-1.12.0.jar:?]
> > at
> >
> org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:801)
> > ~[flink-dists-extended_2.11-1.12.0.jar:?]
> > at
> >
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$start$1(JobMaster.java:357)
> > ~[flink-dists-extended_2.11-1.12.0.jar:?]
> > at
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleCallAsync(AkkaRpcActor.java:383)
> > ~[flink-dists-extended_2.11-1.12.0.jar:?]
> > at
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
> > ~[flink-dists-extended_2.11-1.12.0.jar:?]
> > at
> >
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:88)
> > ~[flink-dists-extended_2.11-1.12.0.jar:?]
> > at
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
> > ~[flink-dists-extended_2.11-1.12.0.jar:?]
> > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> > [flink-dists-extended_2.11-1.12.0.jar:?]
> > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> > [flink-dists-extended_2.11-1.12.0.jar:?]
> > at
> > scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> > [flink-dists-extended_2.11-1.12.0.jar:?]
> > at akka.japi.pf
> > .UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> > [flink-dists-extended_2.11-1.12.0.jar:?]
> > at
> > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> > [flink-dists-extended_2.11-1.12.0.jar:?]
> > at
> > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> > [flink-dists-extended_2.11-1.12.0.jar:?]
> > at
> > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> > [flink-dists-extended_2.11-1.12.0.jar:?]
> > at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> > [flink-dists-extended_2.11-1.12.0.jar:?]
> > at
> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> > [flink-dists-extended_2.11-1.12.0.jar:?]
> > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> > [flink-dists-extended_2.11-1.12.0.jar:?]
> > at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> > [flink-dists-extended_2.11-1.12.0.jar:?]
> > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> > [flink-dists-extended_2.11-1.12.0.jar:?]
> > at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> > [flink-dists-extended_2.11-1.12.0.jar:?]
> > at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> > [flink-dists-extended_2.11-1.12.0.jar:?]
> > at
> > akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoin

FlinkSQL submit query and then the jobmanager failed.

2021-01-24 Thread
As in the title: my query SQL is very simple — it just selects all columns from
a Hive table (version 1.2.1; ORC format). A few seconds after the SQL is submitted,
the JobManager fails. Here is the JobManager's log.
Can anyone help with this problem?

2021-01-24 04:41:24,952 ERROR
org.apache.flink.runtime.util.FatalExitExceptionHandler  [] -
FATAL: Thread 'flink-akka.actor.default-dispatcher-2' produced an
uncaught exception. Stopping the process...

java.util.concurrent.CompletionException:
org.apache.flink.util.FlinkRuntimeException: Failed to start the
operator coordinators
at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
~[?:1.8.0_251]
at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
~[?:1.8.0_251]
at 
java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:722)
~[?:1.8.0_251]
at 
java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:731)
~[?:1.8.0_251]
at 
java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2023)
~[?:1.8.0_251]
at 
org.apache.flink.runtime.jobmaster.JobMaster.resetAndStartScheduler(JobMaster.java:935)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:801)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
org.apache.flink.runtime.jobmaster.JobMaster.lambda$start$1(JobMaster.java:357)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleCallAsync(AkkaRpcActor.java:383)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:88)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
[flink-dists-extended_2.11-1.12.0.jar:?]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
[flink-dists-extended_2.11-1.12.0.jar:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
[flink-dists-extended_2.11-1.12.0.jar:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
[flink-dists-extended_2.11-1.12.0.jar:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
[flink-dists-extended_2.11-1.12.0.jar:?]
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[flink-dists-extended_2.11-1.12.0.jar:?]
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[flink-dists-extended_2.11-1.12.0.jar:?]
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to
start the operator coordinators
at 
org.apache.flink.runtime.scheduler.SchedulerBase.startAllOperatorCoordinators(SchedulerBase.java:1100)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:567)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:944)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719)
~[?:1.8.0_251]
... 27 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
at 
org.apache.hadoop.hive.common.ValidReadTxnList.readFromString(ValidReadTxnList.java:142)
~[?:?]
   

Re: Flink 1.11 checkpoint compatibility issue

2021-01-24 Thread
I think you also need to provide the parallelism information, e.g. for the
operator 'Id: b0936afefebc629e050a0f423f44e6ba, maxparallsim: 4096'.
What parallelism is that operator running with? The max parallelism may be
derived from the parallelism you have set.
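
A small sketch (operator names hypothetical) of making these values explicit per operator — a stable uid as the quoted reply below suggests, plus a pinned max parallelism instead of the derived default:

```
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;

public class UidSketch {

    /** Hypothetical parse step; the uid()/setMaxParallelism() calls are the point here. */
    public static DataStream<String> attach(DataStream<String> source) {
        return source
                .map((MapFunction<String, String>) String::trim)
                .uid("parse-map")            // stable operator id, used to match savepoint state
                .setMaxParallelism(900);     // explicit max parallelism instead of a derived default
    }
}
```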

Arvid Heise  于2021年1月22日周五 下午11:03写道:

> Hi Lu,
>
> if you are using data stream API make sure to set manual uids for each
> operator. Only then migrating of savepoints to other major versions of
> Flink is supported. [1]
>
> Best,
>
> Arvid
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html
>
> On Fri, Jan 22, 2021 at 3:45 PM Matthias Pohl 
> wrote:
>
>> Hi Lu,
>> thanks for reaching out to the community, Lu. Interesting observation.
>> There's no change between 1.9.1 and 1.11 that could explain this behavior
>> as far as I can tell. Have you had a chance to debug the code? Can you
>> provide the code so that we could look into it more closely?
>> Another thing: Are you using the TableAPI in your job? There might be
>> some problems with setting the maxParallelism in the TableAPI.
>>
>> Keep in mind that you could use the State Processor API [1] to adjust the
>> maxParallelism per Operator in a Savepoint.
>>
>> Best,
>> Matthias
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html#modifying-savepoints
>>
>> On Fri, Jan 22, 2021 at 12:49 AM Lu Niu  wrote:
>>
>>> Hi,
>>>
>>> We recently migrated from 1.9.1 to flink 1.11 and notice the new job
>>> cannot consume from savepoint taken in 1.9.1. Here is the list of operator
>>> id and max parallelism of savepoints taken in both versions. The only code
>>> change is version upgrade.
>>>
>>> savepoint 1.9.1:
>>> ```
>>> Id: 8a74550ce6afad759d5f1d6212f43f4a, maxparallsim: 1024
>>> Id: 21753033b264736cab2e32934441d610, maxparallsim: 4096
>>> Id: e03cdfcd66012e06dc52531958e54e8d, maxparallsim: 1024
>>> Id: d003b5c018424b83b771743563711891, maxparallsim: 900
>>> Id: bb0026f9180b3842f4d781c5f7a4a88f, maxparallsim: 4096
>>> Id: b0936afefebc629e050a0f423f44e6ba, maxparallsim: 4096
>>> Id: 5b3f7c70ad9b86408e6af3715f928ad1, maxparallsim: 1024
>>> Id: 278b3965ca58e95e78ab40884c5ddceb, maxparallsim: 900
>>> Id: 6d0402ca998f4658c7632930a69811ac, maxparallsim: 1024
>>> Id: 594970a50fc65ebd163a055fb972541e, maxparallsim: 900
>>> Id: fba56b0a0ee00414d9913103a7c19ff7, maxparallsim: 4096
>>> ```
>>>
>>> savepoint 1.11:
>>> ```
>>> Id: 8a74550ce6afad759d5f1d6212f43f4a, maxparallsim: 900
>>> Id: 21753033b264736cab2e32934441d610, maxparallsim: 900
>>> Id: e03cdfcd66012e06dc52531958e54e8d, maxparallsim: 900
>>> Id: d1bc8d10e5b8e98e55b2b6c5444f83c7, maxparallsim: 900
>>> Id: d003b5c018424b83b771743563711891, maxparallsim: 900
>>> Id: bb0026f9180b3842f4d781c5f7a4a88f, maxparallsim: 900
>>> Id: b0936afefebc629e050a0f423f44e6ba, maxparallsim: 900
>>> Id: 5b3f7c70ad9b86408e6af3715f928ad1, maxparallsim: 900
>>> Id: 278b3965ca58e95e78ab40884c5ddceb, maxparallsim: 900
>>> Id: 6d0402ca998f4658c7632930a69811ac, maxparallsim: 900
>>> Id: 594970a50fc65ebd163a055fb972541e, maxparallsim: 900
>>> Id: fba56b0a0ee00414d9913103a7c19ff7, maxparallsim: 900
>>> ```
>>>
>>> In the code we use env.setMaxParallsim(900). it is strange that
>>> savepoint 1.9.1 has different max parallelism for different operators and
>>> we don't know where 1024 and 4096 come from. Here I want to ask the
>>> community is it possible these are set by flink itself?
>>>
>>> Best
>>> Lu
>>>
>>


FlinkSQL reading an ORC table: submitting the SQL makes the JobManager process fail and exit

2021-01-23 Thread
Could someone help analyze this? Judging from the logs and my experiments, it is related to ORC.

The SQL is trivial — any SELECT reproduces it.
The exception log is as follows:

2021-01-24 04:41:24,952 ERROR
org.apache.flink.runtime.util.FatalExitExceptionHandler  [] -
FATAL: Thread 'flink-akka.actor.default-dispatcher-2' produced an
uncaught exception. Stopping the process...

java.util.concurrent.CompletionException:
org.apache.flink.util.FlinkRuntimeException: Failed to start the
operator coordinators
at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
~[?:1.8.0_251]
at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
~[?:1.8.0_251]
at 
java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:722)
~[?:1.8.0_251]
at 
java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:731)
~[?:1.8.0_251]
at 
java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2023)
~[?:1.8.0_251]
at 
org.apache.flink.runtime.jobmaster.JobMaster.resetAndStartScheduler(JobMaster.java:935)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:801)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
org.apache.flink.runtime.jobmaster.JobMaster.lambda$start$1(JobMaster.java:357)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleCallAsync(AkkaRpcActor.java:383)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:88)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
[flink-dists-extended_2.11-1.12.0.jar:?]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
[flink-dists-extended_2.11-1.12.0.jar:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
[flink-dists-extended_2.11-1.12.0.jar:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
[flink-dists-extended_2.11-1.12.0.jar:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
[flink-dists-extended_2.11-1.12.0.jar:?]
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[flink-dists-extended_2.11-1.12.0.jar:?]
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[flink-dists-extended_2.11-1.12.0.jar:?]
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to
start the operator coordinators
at 
org.apache.flink.runtime.scheduler.SchedulerBase.startAllOperatorCoordinators(SchedulerBase.java:1100)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:567)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:944)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719)
~[?:1.8.0_251]
... 27 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
at 
org.apache.hadoop.hive.common.ValidReadTxnList.readFromString(ValidReadTxnList.java:142)
~[?:?]
at 
org.apache.hadoop.hive.common.ValidReadTxnList.<init>(ValidReadTxnList.java:57)
~[?:?]
at 
org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$Context.<init>(OrcInputFormat.java:421)
~[?:?]

Re: FlinkSQL 1.12 query on a Hive table finishes almost immediately; No more splits available

2021-01-23 Thread
Also, I found that the Parquet format works. While looking into it, I checked the FileSink ParquetBulkFormat among the Flink streaming connectors.
The docs then mention ParquetAvroWriters — how should the Hive table be created for files written that way? A plain
`stored as parquet` table doesn't carry any Avro information, does it?
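
A small sketch of the ParquetAvroWriters path mentioned above (MyPojo is a hypothetical POJO). As far as I understand, the files produced this way are ordinary Parquet files, so a plain `STORED AS PARQUET` Hive table with matching column names should be able to read them — treat that as an assumption to verify.

```
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class ParquetSinkSketch {

    /** Hypothetical record type, written via Avro reflection. */
    public static class MyPojo {
        public String name;
        public long ts;
    }

    public static StreamingFileSink<MyPojo> buildSink(String outputPath) {
        return StreamingFileSink
                .forBulkFormat(new Path(outputPath), ParquetAvroWriters.forReflectRecord(MyPojo.class))
                .build();
    }
}
```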

赵一旦  于2021年1月24日周日 上午6:45写道:

> To add: (1) for FlinkSQL queries, string-typed partition fields apparently must be quoted with '', otherwise nothing is found? As above, hour=02 directly leads to "no more splits".
> (2) With that fixed, splits do show up, but an ORC-related error is thrown, and submitting the SQL kills the JM directly. JM log below:
>
> 2021-01-24 04:41:24,952 ERROR 
> org.apache.flink.runtime.util.FatalExitExceptionHandler  [] - FATAL: 
> Thread 'flink-akka.actor.default-dispatcher-2' produced an uncaught 
> exception. Stopping the process...
>
> java.util.concurrent.CompletionException: 
> org.apache.flink.util.FlinkRuntimeException: Failed to start the operator 
> coordinators
> at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
>  ~[?:1.8.0_251]
> at 
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
>  ~[?:1.8.0_251]
> at 
> java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:722) 
> ~[?:1.8.0_251]
> at 
> java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:731)
>  ~[?:1.8.0_251]
> at 
> java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2023) 
> ~[?:1.8.0_251]
> at 
> org.apache.flink.runtime.jobmaster.JobMaster.resetAndStartScheduler(JobMaster.java:935)
>  ~[flink-dists-extended_2.11-1.12.0.jar:?]
> at 
> org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:801)
>  ~[flink-dists-extended_2.11-1.12.0.jar:?]
> at 
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$start$1(JobMaster.java:357)
>  ~[flink-dists-extended_2.11-1.12.0.jar:?]
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleCallAsync(AkkaRpcActor.java:383)
>  ~[flink-dists-extended_2.11-1.12.0.jar:?]
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
>  ~[flink-dists-extended_2.11-1.12.0.jar:?]
> at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:88)
>  ~[flink-dists-extended_2.11-1.12.0.jar:?]
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>  ~[flink-dists-extended_2.11-1.12.0.jar:?]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
> [flink-dists-extended_2.11-1.12.0.jar:?]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
> [flink-dists-extended_2.11-1.12.0.jar:?]
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) 
> [flink-dists-extended_2.11-1.12.0.jar:?]
> at 
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
> [flink-dists-extended_2.11-1.12.0.jar:?]
> at 
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) 
> [flink-dists-extended_2.11-1.12.0.jar:?]
> at 
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
> [flink-dists-extended_2.11-1.12.0.jar:?]
> at 
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
> [flink-dists-extended_2.11-1.12.0.jar:?]
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
> [flink-dists-extended_2.11-1.12.0.jar:?]
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
> [flink-dists-extended_2.11-1.12.0.jar:?]
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
> [flink-dists-extended_2.11-1.12.0.jar:?]
> at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
> [flink-dists-extended_2.11-1.12.0.jar:?]
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
> [flink-dists-extended_2.11-1.12.0.jar:?]
> at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
> [flink-dists-extended_2.11-1.12.0.jar:?]
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
> [flink-dists-extended_2.11-1.12.0.jar:?]
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
> [flink-dists-extended_2.11-1.12.0.jar:?]
> at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
> [flink-dists-extended_2.11-1.12.0.jar:?]
> at 
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
> [flink-dists-extended_2.11-1.12.0.jar:?]
> at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>  [flink-dists-extended_2.11-1.12.0.jar:?]
> Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to start the 
> operator coordinators
> at 
> org.apache.flink.runtime.scheduler.SchedulerBase.startAllOperatorCoordinators(SchedulerBase.java:1100)
>  ~[flink-dists-extend

Re: FlinkSQL 1.12 query on a Hive table finishes almost immediately; No more splits available

2021-01-23 Thread
To add: (1) for FlinkSQL queries, string-typed partition fields apparently must be quoted with '', otherwise nothing is found? As above, hour=02 directly leads to "no more splits".
(2) With that fixed, splits do show up, but an ORC-related error is thrown, and submitting the SQL kills the JM directly. JM log below:

2021-01-24 04:41:24,952 ERROR
org.apache.flink.runtime.util.FatalExitExceptionHandler  [] -
FATAL: Thread 'flink-akka.actor.default-dispatcher-2' produced an
uncaught exception. Stopping the process...

java.util.concurrent.CompletionException:
org.apache.flink.util.FlinkRuntimeException: Failed to start the
operator coordinators
at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
~[?:1.8.0_251]
at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
~[?:1.8.0_251]
at 
java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:722)
~[?:1.8.0_251]
at 
java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:731)
~[?:1.8.0_251]
at 
java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2023)
~[?:1.8.0_251]
at 
org.apache.flink.runtime.jobmaster.JobMaster.resetAndStartScheduler(JobMaster.java:935)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:801)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
org.apache.flink.runtime.jobmaster.JobMaster.lambda$start$1(JobMaster.java:357)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleCallAsync(AkkaRpcActor.java:383)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:88)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
[flink-dists-extended_2.11-1.12.0.jar:?]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
[flink-dists-extended_2.11-1.12.0.jar:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
[flink-dists-extended_2.11-1.12.0.jar:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
[flink-dists-extended_2.11-1.12.0.jar:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
[flink-dists-extended_2.11-1.12.0.jar:?]
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[flink-dists-extended_2.11-1.12.0.jar:?]
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[flink-dists-extended_2.11-1.12.0.jar:?]
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to
start the operator coordinators
at 
org.apache.flink.runtime.scheduler.SchedulerBase.startAllOperatorCoordinators(SchedulerBase.java:1100)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:567)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:944)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719)
~[?:1.8.0_251]
... 27 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
at 
org.apache.hadoop.hive.common.ValidReadTxnList.readFromString(ValidReadTxnList.java:142)
~[?:?]
at 
org.apache.hadoop.hive.common.ValidReadTxnList.<init>(ValidReadTxnList.java:57)
~[?:?]
at 

FlinkSQL 1.12 query on a Hive table finishes almost immediately; No more splits available

2021-01-23 Thread
The SQL is very simple:

select * from test2
where `dt`=20210124 and `hour`=02 and `supply_id`=2027
limit 1000;

The job finishes very quickly after submission and returns no data.

But the data does exist: the same statement returns rows through spark-sql.

The JM and TM logs contain "No more splits available".

At the moment it looks like not a single split is produced. This should be the new source API in 1.12.

I'm not sure whether this is a bug or whether there is something specific to watch out for when using it.


Re: Re: Flink parallelism question

2021-01-23 Thread
On YARN it's a different story: YARN has its own way of accounting for resources, namely vcores.
If you set your job to parallelism 25, YARN by default just maps one parallel instance to one vcore — that's all it means. There are certainly parameters to tune; mapping one parallel instance to ten vcores is perfectly possible too.
It is only an accounting convention.

赵一旦  于2021年1月23日周六 下午8:20写道:

> With that reasoning, never mind servers — your own laptop shouldn't work either. It has only a few cores, yet right after booting the number of processes already exceeds the number of CPUs; do those processes not run?
>
> Multithreading doesn't mean an N-core machine can only have N threads.
>
> In standalone mode you can configure as many slots per machine as you like; it has nothing to do with the machine's CPU count — no relation at all.
>
> Jacob <17691150...@163.com> 于2021年1月23日周六 下午4:35写道:
>
>> Thanks for the reply~
>>
>> In my understanding, the parallelism should not exceed the number of CPUs.
>>
>>
>>
>> -
>> Thanks!
>> Jacob
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>
>


Re: Re: Flink parallelism question

2021-01-23 Thread
With that reasoning, never mind servers — your own laptop shouldn't work either. It has only a few cores, yet right after booting the number of processes already exceeds the number of CPUs; do those processes not run?

Multithreading doesn't mean an N-core machine can only have N threads.

In standalone mode you can configure as many slots per machine as you like; it has nothing to do with the machine's CPU count — no relation at all.

Jacob <17691150...@163.com> 于2021年1月23日周六 下午4:35写道:

> Thanks for the reply~
>
> In my understanding, the parallelism should not exceed the number of CPUs.
>
>
>
> -
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Looking for a simple example of writing ORC files from Flink with a Map-typed column

2021-01-23 Thread
Thanks a lot — that wrapper is really nice; the whole class can be reused directly.

Kezhu Wang  于2021年1月23日周六 下午6:00写道:

>
> https://github.com/apache/iceberg/blob/master/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java#L259
>
>
> On January 23, 2021 at 13:49:23, 赵一旦 (hinobl...@gmail.com) wrote:
>
> Currently I use a custom OrcBulkWriterFactory, get hold of the individual ColumnVectors, and set their values.
>
> For simple types the API is clear enough — for an INT field such as serverTime, setting vector[rowId] is all it takes. But I can't figure out how to write a Map type: how exactly do I put multiple key-value pairs into a MapColumnVector?
>
>
> serverTimeColumnVector.vector[rowId] = ele.getTimestamp();
>
> MapColumnVector dColumnVector = (MapColumnVector) batch.cols[2];
>


Re: Choosing the right Flink state for the business requirement

2021-01-23 Thread
r[Long]("min-state",classOf[Long]))
> maxTsState =getRuntimeContext.getState(new
> ValueStateDescriptor[Long]("max-state",classOf[Long]))
>   }
>
>   override def process(key: Tuple, context: Context, elements:
> Iterable[MinMaxTemp], out: Collector[CloudliveWatcher]): Unit = {
> val minTs: Long = minTsState.value() // read the previous minimum timestamp
> val maxTs: Long = maxTsState.value() // read the previous maximum timestamp
>
> val device_type = 0
> val net_opretor = ""
> val net_type = ""
> val area = ""
> val plat_form = ""
> val network_operator = ""
> val role = 0
> val useragent = ""
> val currentDate = DateUtil.currentDate
> val created_time = currentDate
> val modified_time = currentDate
> var id =0L
> var courseId =0L
> var partnerId =0L
> var ip =""
> var customerId =0L
> var courseNumber =0L
> var nickName =""
> var liveType =0
> var uid =""
> var eventTime =0L
> var min =0L
> var max =0L
> var join_time =""
> var leave_time =""
> var duration =0L
> var duration_time =""
> val iterator: Iterator[MinMaxTemp] = elements.iterator
> if (iterator.hasNext) {
>   val value: MinMaxTemp = iterator.next()
>   id = value.id
>   courseId= value.courseId
>   partnerId = value.partnerId
>   ip = value.ip
>   customerId = value.customerId
>   courseNumber = value.courseNumber
>   nickName = value.nickName
>   liveType = value.liveType
>   uid = value.uid
>   minTsState.update(value.mineventTime) // update the minimum timestamp
>   maxTsState.update(value.maxeventTime)  // update the maximum timestamp
> }
> join_time = DateUtil.convertTimeStamp2DateStr(minTs,
> DateUtil.SECOND_DATE_FORMAT)
> leave_time = DateUtil.convertTimeStamp2DateStr(maxTs,
> DateUtil.SECOND_DATE_FORMAT)
> duration = (maxTs - minTs) / 1000  // dwell time in seconds
> duration_time = DateUtil.secondsToFormat(duration)  // dwell time formatted as hh:mm:ss
> minTsState.clear()
> maxTsState.clear()
>
> out.collect(CloudliveWatcher(id, partnerId, courseId, customerId,
> courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
> join_time, leave_time, created_time, modified_time
>   , liveType, plat_form, duration, duration_time,
> network_operator, role, useragent, uid, eventTime))
>
> CloudliveWatcher(id, partnerId, courseId, customerId,
> courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
> join_time, leave_time, created_time, modified_time
>   , liveType, plat_form, duration, duration_time,
> network_operator, role, useragent, uid, eventTime)
>
>   }
> }
>
>
>
>
>
> 赵一旦  于2021年1月21日周四 下午8:38写道:
>
> > What I meant is to split the data with a session window: that automatically separates, for you, the stretches of data with no gap longer than 1 minute, and the difference between the maximum and minimum timestamp within such a stretch is, in theory, exactly the result you want.
> >
> > To implement it, use two states holding the minimum and maximum timestamp; for every incoming record, compare and update them.
> >
> >
> >
Then emit the result when the window fires. Use reduce + ProcessWindowFunction together: the reduce only tracks the min/max, and only when the window fires do you compute the duration from the two states and emit it.
> >
> > 赵一旦  于2021年1月21日周四 下午8:28写道:
> >
> > > I actually don't follow your logic — what does this have to do with the window's maximum/minimum timestamps?
> > >
> > > 张锴  于2021年1月21日周四 下午6:25写道:
> > >
> > >>
> 你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和
> > >>
> > >>
> >
> context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。
> > >> 下面是我的部分代码逻辑:
> > >>
> > >> val ds = dataStream
> > >>   .filter(_.liveType == 1)
> > >>   .keyBy(1, 2)
> > >>   .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
> > >>   .process(new myProcessWindow()).uid("process-id")
> > >>
> > >> class myProcessWindow() extends
> > >> ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple,
> > >> TimeWindow] {
> > >>
> > >>   override def process(key: Tuple, context: Context, elements:
> > >> Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit
> > >> = {
> > >> var startTime = context.window.getStart // meant to be the time the first element enters the window
> > >> var endTime = context.window.getEnd // meant to be the time the last element enters the window
> > >>
> > >> val currentDate = DateUtil.currentDate
> > >> val created_time = currentDate
> > >> val modified_time = currentDate
> > >>  。。。
> > >>
> > >> val join_time: String =
> > >> DateUtil.convertTimeStamp2DateStr(startTime,
> > >> DateUtil.SECOND_D

Looking for a simple example of writing ORC files from Flink with a Map-typed column

2021-01-22 Thread
Currently I use a custom OrcBulkWriterFactory, get hold of the individual ColumnVectors, and set their values.
For simple types the API is clear enough — for an INT field such as serverTime, setting vector[rowId] is all it takes. But I can't figure out how to write a Map type: how exactly do I put multiple key-value pairs into a MapColumnVector?

serverTimeColumnVector.vector[rowId] = ele.getTimestamp();

MapColumnVector dColumnVector = (MapColumnVector) batch.cols[2];
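
A hedged sketch of one way to fill a MapColumnVector (the general storage-api pattern, not the solution from the Iceberg code linked in the reply): the column index and the MAP<STRING, BIGINT> element types are assumptions. Child entries of all rows are appended to the shared keys/values vectors, and each row only records its slice via offsets[row]/lengths[row].

```
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

public final class OrcMapColumnSketch {

    /** Append one row's map entries to the shared child vectors and record the row's slice. */
    static void writeMapColumn(VectorizedRowBatch batch, int rowId, Map<String, Long> value) {
        MapColumnVector mapVector = (MapColumnVector) batch.cols[2];
        BytesColumnVector keyVector = (BytesColumnVector) mapVector.keys;
        LongColumnVector valueVector = (LongColumnVector) mapVector.values;

        int offset = mapVector.childCount;            // this row's entries start here
        mapVector.offsets[rowId] = offset;
        mapVector.lengths[rowId] = value.size();
        mapVector.childCount = offset + value.size();

        // grow the child vectors if necessary, preserving entries written for earlier rows
        keyVector.ensureSize(mapVector.childCount, true);
        valueVector.ensureSize(mapVector.childCount, true);

        int i = offset;
        for (Map.Entry<String, Long> entry : value.entrySet()) {
            byte[] key = entry.getKey().getBytes(StandardCharsets.UTF_8);
            keyVector.setRef(i, key, 0, key.length);  // store a reference to the key bytes
            valueVector.vector[i] = entry.getValue();
            i++;
        }
    }
}
```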


Re: Re: Re: Writing to HDFS from Flink fails at job submission: Recoverable writers on Hadoop are only supported for HDFS

2021-01-22 Thread
Also, if anyone knows how to write ORC files with a Map-typed column, please share an example.
As below: once I have the MapColumnVector, how do I write into it? The simple non-Map fields are clear — just set xxxColumnVector.vector[rowId] — but the MapColumnVector API is rather confusing and I can't see how to use it.

MapColumnVector dColumnVector = (MapColumnVector) batch.cols[2];


赵一旦  于2021年1月23日周六 下午1:42写道:

> Solved. I overrode that part of the Flink source code to drop the HDFS-only scheme check.
>
> 张锴  于2021年1月21日周四 下午7:35写道:
>
>> @赵一旦
>> Also, last time I asked you another question; I tried the idea you suggested, but something seems off — could you take a look?
>>
>> 张锴  于2021年1月21日周四 下午7:13写道:
>>
>> > I'm on Flink 1.10; the file sink there is BucketingSink, which is what I use to write to HDFS.
>> >
>> > 赵一旦  于2021年1月21日周四 下午7:05写道:
>> >
>> >> @Michael Ran: understood, no worries.
>> >>
>> >> @张锴 Which Flink version and connector are you referring to, stream or sql? I searched and mine doesn't have it. I'm on 1.12, DataStream.
>> >>
>> >>
>> Looking at the docs there is StreamingFileSink and also FileSink, and they seem to be used in much the same way. I plan to try FileSink, but I'm not sure what the difference between FileSink and StreamingFileSink is, and whether both can write to Hadoop-style file systems — the concern is atomic writes, since distributed file systems don't support appending or editing files, etc.
>> >>
>> >> Michael Ran  于2021年1月21日周四 下午7:01写道:
>> >>
>> >> >
>> >> >
>> >>
>> I'm sorry, I haven't used this in a long time. But you can analyze the exception and the API source to determine whether a direct write is possible. If you want to write to a custom file system, you will have to implement your own sink; or, if your file system's write path is compatible with the HDFS high-level API, you can look at how the various existing sinks are written.
>> >> > 在 2021-01-21 18:45:06,"张锴"  写道:
>> >> > >import
>> >> org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink,
>> >> > >DateTimeBucketer}
>> >> > >
>> >> > >sink.setBucketer sink.setWriter用这种方式试试
>> >> > >
>> >> > >
>> >> > >
>> >> > >赵一旦  于2021年1月21日周四 下午6:37写道:
>> >> > >
>> >> > >> @Michael Ran
>> >> > >> 然后有什么解决方案吗,我这个是使用flink的streamFileSink方式写hdfs的时候出现的异常。
>> >> > >>
>> >> > >> Michael Ran  于2021年1月21日周四 下午5:23写道:
>> >> > >>
>> >> > >> > 这里应该是用了hdfs 的特定API吧,文件系统没兼容public
>> >> > >> > HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs)
>> {...}
>> >> > >> > 在 2021-01-21 17:18:23,"赵一旦"  写道:
>> >> > >> > >具体报错信息如下:
>> >> > >> > >
>> >> > >> > >java.lang.UnsupportedOperationException: Recoverable writers on
>> >> > Hadoop
>> >> > >> are
>> >> > >> > >only supported for HDFS
>> >> > >> > >at
>> >> > org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.(
>> >> > >> > >HadoopRecoverableWriter.java:61)
>> >> > >> > >at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem
>> >> > >> > >.createRecoverableWriter(HadoopFileSystem.java:210)
>> >> > >> > >at org.apache.flink.core.fs.SafetyNetWrapperFileSystem
>> >> > >> > >.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
>> >> > >> > >at
>> org.apache.flink.streaming.api.functions.sink.filesystem.
>> >> > >> >
>> >> >
>> >StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink
>> >> > >> > >.java:260)
>> >> > >> > >at
>> org.apache.flink.streaming.api.functions.sink.filesystem.
>> >> > >> >
>> >> > >> >
>> >> > >>
>> >> >
>> >>
>> >StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:270)
>> >> > >> > >at
>> org.apache.flink.streaming.api.functions.sink.filesystem.
>> >> > >> > >StreamingFileSink.initializeState(StreamingFileSink.java:412)
>> >> > >> > >at
>> >> > org.apache.flink.streaming.util.functions.StreamingFunctionUtils
>> >> > >> > >.tryRestoreFunction(StreamingFunctionUtils.java:185)
>> >> > >> > >at
>> >> > org.apache.flink.streaming.util.functions.StreamingFunctionUtils
>> >> > >> > >.restoreFunctionState(StreamingFunctionUtils.java:167)
>> >> > >> > >at
>> >> > >> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator
>> >> > >> > >.initializeState(AbstractUdfStreamOperator.java:96)
>> >> > >> > >at
>> >> > >>
>> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler
>> >> > >> > >.initializeOperatorState(StreamOperatorStateHandler.java:107)
>> >> > >> > >at
>> >> > org.apache.flink.streaming.api.operators.AbstractStreamOperator
>> >> > >> > >.initializeState(AbstractStreamOperator.java:264)
>> >> > >> > >at org.apache.flink.streaming.runtime.tasks.OperatorChain
>> >> > >> > >.initializeStateAndOpenOperators(OperatorChain.java:400)
>> >> > >> > >at org.apache.flink.streaming.runtime.tasks.StreamTask
>> >> > >> > >.lambda$beforeInvoke$2(StreamTask.java:507)
>> >> > >> > >at
>> >> > >>
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1
>> >> > >> > >.runThrowing(StreamTaskActionExecutor.java:47)
>> >> > >> > >at
>> >> > org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(
>> >> > >> > >StreamTask.java:501)
>> >> > >> > >at
>> >> > >> >
>> >> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
>> >> > >> > >.java:531)
>> >> > >> > >at
>> >> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>> >> > >> > >at
>> >> org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>> >> > >> > >at java.lang.Thread.run(Thread.java:748)
>> >> > >> > >
>> >> > >> > >
>> >> > >> > >赵一旦  于2021年1月21日周四 下午5:17写道:
>> >> > >> > >
>> >> > >> > >> Recoverable writers on Hadoop are only supported for HDFS
>> >> > >> > >>
>> >> > >> > >> 如上,我们用的hadoop协议的,但底层不是hdfs,是公司自研的分布式文件系统。
>> >> > >> > >>
>> >> > >> > >> 使用spark写,spark-sql读等都没问题。但是flink写和读当前都没尝试成功。
>> >> > >> > >>
>> >> > >> > >>
>> >> > >> > >>
>> >> > >> >
>> >> > >>
>> >> >
>> >>
>> >
>>
>


Re: Re: Re: Flink job fails right at submission when writing to HDFS: Recoverable writers on Hadoop are only supported for HDFS

2021-01-22 Thread
Solved. I overrode this part of the Flink source code and removed the restriction on non-HDFS schemes.
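
For anyone hitting the same wall: the check lives in the HadoopRecoverableWriter constructor, and the kind of patch described above boils down to relaxing the scheme check so that a Hadoop-compatible file system is accepted. The sketch below only illustrates that idea; it is not the exact patch the poster applied, the Flink 1.12 code it mirrors is written from memory and should be verified against the real source, and it assumes your file system really provides HDFS-like truncate/rename semantics:

import org.apache.flink.runtime.util.HadoopUtils;
import org.apache.hadoop.fs.FileSystem;

/**
 * Sketch of a relaxed constructor check. In practice one copies
 * org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter into one's own jar so the
 * patched class shadows Flink's original on the classpath.
 */
public final class RecoverableWriterGuard {

    static void checkSupported(FileSystem fs) {
        // Flink 1.12's original check (approximately) also requires the scheme to be "hdfs":
        //   if (!"hdfs".equalsIgnoreCase(fs.getScheme()) || !HadoopUtils.isMinHadoopVersion(2, 7)) {
        //       throw new UnsupportedOperationException(
        //               "Recoverable writers on Hadoop are only supported for HDFS");
        //   }
        // Relaxed version: keep only the Hadoop version requirement, since the recoverable
        // writer relies on truncate(), which needs Hadoop >= 2.7.
        if (!HadoopUtils.isMinHadoopVersion(2, 7)) {
            throw new UnsupportedOperationException(
                    "Recoverable writers require Hadoop 2.7 or newer");
        }
    }
}
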

张锴  于2021年1月21日周四 下午7:35写道:

> @赵一旦
> 另外,上次我还提了一个问题请教你,我试了你说的那个想法,但是好像有点问题,你可以看一下
>
> 张锴  于2021年1月21日周四 下午7:13写道:
>
> > 我用的flink 1.10版,FlieSink就是BucketingSink,我是用这个写hdfs的
> >
> > 赵一旦  于2021年1月21日周四 下午7:05写道:
> >
> >> @Michael Ran; 嗯嗯,没关系。
> >>
> >> @张锴 你说的是flink哪个版本的connector,stream or sql。我搜了下我的没有。我是1.12,stream。
> >>
> >>
> 目前看文档有streamFileSink,还有FileSink,从文档内容来看使用方式差不多。我计划试一下FileSink,但不清楚FileSink和StreamFileSink啥区别,是否都能写hadoop类文件系统,因为涉及是否原子写,比较分布式文件系统不支持追加和编辑等。
> >>
> >> Michael Ran  于2021年1月21日周四 下午7:01写道:
> >>
> >> >
> >> >
> >>
> 很抱歉,我已经很久没用过这个了。但是可以根据异常信息以及API源码执行进行分析,确定是否能直接写入。如果你要写入自定义的文件系统,那么只能实现自己的sink方式。或者你的文件系统的写入方式兼容hdfs的上层API可以参考各个sink端的写法
> >> > 在 2021-01-21 18:45:06,"张锴"  写道:
> >> > >import
> >> org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink,
> >> > >DateTimeBucketer}
> >> > >
> >> > >sink.setBucketer sink.setWriter用这种方式试试
> >> > >
> >> > >
> >> > >
> >> > >赵一旦  于2021年1月21日周四 下午6:37写道:
> >> > >
> >> > >> @Michael Ran
> >> > >> 然后有什么解决方案吗,我这个是使用flink的streamFileSink方式写hdfs的时候出现的异常。
> >> > >>
> >> > >> Michael Ran  于2021年1月21日周四 下午5:23写道:
> >> > >>
> >> > >> > 这里应该是用了hdfs 的特定API吧,文件系统没兼容public
> >> > >> > HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) {...}
> >> > >> > 在 2021-01-21 17:18:23,"赵一旦"  写道:
> >> > >> > >具体报错信息如下:
> >> > >> > >
> >> > >> > >java.lang.UnsupportedOperationException: Recoverable writers on
> >> > Hadoop
> >> > >> are
> >> > >> > >only supported for HDFS
> >> > >> > >at
> >> > org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.(
> >> > >> > >HadoopRecoverableWriter.java:61)
> >> > >> > >at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem
> >> > >> > >.createRecoverableWriter(HadoopFileSystem.java:210)
> >> > >> > >at org.apache.flink.core.fs.SafetyNetWrapperFileSystem
> >> > >> > >.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
> >> > >> > >at org.apache.flink.streaming.api.functions.sink.filesystem.
> >> > >> >
> >> >
> >StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink
> >> > >> > >.java:260)
> >> > >> > >at org.apache.flink.streaming.api.functions.sink.filesystem.
> >> > >> >
> >> > >> >
> >> > >>
> >> >
> >>
> >StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:270)
> >> > >> > >at org.apache.flink.streaming.api.functions.sink.filesystem.
> >> > >> > >StreamingFileSink.initializeState(StreamingFileSink.java:412)
> >> > >> > >at
> >> > org.apache.flink.streaming.util.functions.StreamingFunctionUtils
> >> > >> > >.tryRestoreFunction(StreamingFunctionUtils.java:185)
> >> > >> > >at
> >> > org.apache.flink.streaming.util.functions.StreamingFunctionUtils
> >> > >> > >.restoreFunctionState(StreamingFunctionUtils.java:167)
> >> > >> > >at
> >> > >> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator
> >> > >> > >.initializeState(AbstractUdfStreamOperator.java:96)
> >> > >> > >at
> >> > >> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler
> >> > >> > >.initializeOperatorState(StreamOperatorStateHandler.java:107)
> >> > >> > >at
> >> > org.apache.flink.streaming.api.operators.AbstractStreamOperator
> >> > >> > >.initializeState(AbstractStreamOperator.java:264)
> >> > >> > >at org.apache.flink.streaming.runtime.tasks.OperatorChain
> >> > >> > >.initializeStateAndOpenOperators(OperatorChain.java:400)
> >> > >> > >at org.apache.flink.streaming.runtime.tasks.StreamTask
> >> > >> > >.lambda$beforeInvoke$2(StreamTask.java:507)
> >> > >> > >at
> >> > >> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1
> >> > >> > >.runThrowing(StreamTaskActionExecutor.java:47)
> >> > >> > >at
> >> > org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(
> >> > >> > >StreamTask.java:501)
> >> > >> > >at
> >> > >> >
> >> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
> >> > >> > >.java:531)
> >> > >> > >at
> >> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
> >> > >> > >at
> >> org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> >> > >> > >at java.lang.Thread.run(Thread.java:748)
> >> > >> > >
> >> > >> > >
> >> > >> > >赵一旦  于2021年1月21日周四 下午5:17写道:
> >> > >> > >
> >> > >> > >> Recoverable writers on Hadoop are only supported for HDFS
> >> > >> > >>
> >> > >> > >> 如上,我们用的hadoop协议的,但底层不是hdfs,是公司自研的分布式文件系统。
> >> > >> > >>
> >> > >> > >> 使用spark写,spark-sql读等都没问题。但是flink写和读当前都没尝试成功。
> >> > >> > >>
> >> > >> > >>
> >> > >> > >>
> >> > >> >
> >> > >>
> >> >
> >>
> >
>


Re: In Flink's StreamingFileSink and the FileSink introduced in 1.12, BucketsBuilder's createBucketWriter only supports a recoverable writer.

2021-01-22 Thread
Solved. Rewriting the relevant Flink source code to override this restriction was enough.

赵一旦  于2021年1月22日周五 上午10:17写道:

> 如题,为什么仅支持recoverableWriter,如果我使用的文件系统不支持怎么办呢,必须自定义sink吗?
>
>
> 我这边用的是一个公司自研的大型分布式文件系统,支持hadoop协议(但不清楚是否所有特性都支持),目前使用streamFileSink和FileSink貌似都无法正常写。
>
> 不清楚是否有其他问题,至少当前是卡住在这个recoverable上了。
>
> 报错是只有hdfs才支持recoverableWriter。
>
> 有人知道如何解吗?
>


Re: A question about Flink parallelism

2021-01-22 Thread
I'm not sure why you need to worry about this; you can set the cluster parallelism however you like. Rules of thumb based on the number of CPU cores only describe the ideal degree of concurrency.
For example, with at most 10 cores, 20 threads cannot run "in parallel", but they can still run "concurrently". Conversely, if each thread does very little work, 10 concurrent threads will not even saturate 10 cores. So there is no reason to cap your parallelism at the number of CPU cores. A small sketch follows below.
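
A tiny, hedged illustration of the point above (the parallelism value and job are made up; the only real knobs are taskmanager.numberOfTaskSlots in flink-conf.yaml and the job parallelism):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Parallelism is the number of parallel subtasks, i.e. the number of slots the job uses.
        // Slots are threads inside the TaskManager JVM, not pinned CPUs, so this value is free
        // to exceed the number of physical CPUs or cores of the machine.
        env.setParallelism(24);

        env.fromElements(1, 2, 3).rebalance().print();

        env.execute("parallelism-sketch");
    }
}
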

Jacob <17691150...@163.com> 于2021年1月22日周五 下午4:18写道:

> > Ever since I started using Flink, one question has been bugging me.
> >
> >
> > When Flink is set to parallelism n, does that mean it occupies n CPUs rather than n CPU cores?
> >
> > For example, when Flink consumes a Kafka topic, the recommended parallelism is usually the
> > number of topic partitions, so that each parallel instance consumes one partition for best
> > performance. That would mean one parallel instance corresponds to one consumer thread and
> > also to one slot; and since (as I understand it) one unit of parallelism represents one CPU
> > in Flink, can I conclude that one CPU equals one thread?
> >
> > If Flink is deployed in standalone mode on a single machine with 4 CPUs, each with 6 cores,
> > is the maximum parallelism of that cluster therefore 4?
> >
> > In my understanding one core corresponds to one thread, but by the reasoning above it seems
> > that one CPU equals one thread. Isn't that a contradiction?
>
>
>
>
> -
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


In Flink's StreamingFileSink and the FileSink introduced in 1.12, BucketsBuilder's createBucketWriter only supports a recoverable writer.

2021-01-21 Thread
As the subject says: why is only a recoverable writer supported? If the file system I use does not support it, what can I do — do I have to write a custom sink?

I am using a large in-house distributed file system that speaks the Hadoop protocol (though I am not sure every feature is supported). At the moment neither StreamingFileSink nor FileSink seems able to write to it.

There may be other issues as well, but right now I am stuck on this recoverable-writer requirement.

The error says that recoverable writers are only supported for HDFS.

Does anyone know a way around this?


Re: Choosing the right Flink state for a business requirement

2021-01-21 Thread
My understanding of the final MySQL result table you want is:
room ID; user ID; join time; leave time; duration = (leave time - join time);

If user1 shows up in room 1 ten times within one day, that produces ten records, each with its own duration.


As described above, my approach can produce exactly that.

xuhaiLong  于2021年1月22日周五 上午10:03写道:

> 可以试试这样,mysql 中 设置唯一键为窗口的 startTime 和
> userId,然后对用户的每个窗口做停留时间的计算,最终会同一个用户在一天会产生多条记录,不过窗口的 startTime 不同,取值的时候sum 试试?
>
>
> 在2021年1月21日 18:24,张锴 写道:
> 你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和
>
> context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。
> 下面是我的部分代码逻辑:
>
> val ds = dataStream
> .filter(_.liveType == 1)
> .keyBy(1, 2)
> .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
> .process(new myProcessWindow()).uid("process-id")
>
> class myProcessWindow() extends
> ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple,
> TimeWindow] {
>
> override def process(key: Tuple, context: Context, elements:
> Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit
> = {
> var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间
> var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间
>
> val currentDate = DateUtil.currentDate
> val created_time = currentDate
> val modified_time = currentDate
> 。。。
>
> val join_time: String =
> DateUtil.convertTimeStamp2DateStr(startTime,
> DateUtil.SECOND_DATE_FORMAT)
> val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime,
> DateUtil.SECOND_DATE_FORMAT)
> val duration = (endTime - startTime) / 1000  //停留多少秒
> val duration_time = DateUtil.secondsToFormat(duration)  //停留时分秒
> out.collect(CloudliveWatcher(id, partnerId, courseId, customerId,
> courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
> join_time, leave_time, created_time, modified_time
> , liveType, plat_form, duration, duration_time,
> network_operator, role, useragent, uid, eventTime))
>
> CloudliveWatcher(id, partnerId, courseId, customerId,
> courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
> join_time, leave_time, created_time, modified_time
> , liveType, plat_form, duration, duration_time,
> network_operator, role, useragent, uid, eventTime)
>
> }
>
>
> 这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中?
>
>
>
>
> 赵一旦  于2020年12月28日周一 下午7:12写道:
>
> 按直播间ID和用户ID分组,使用session window,使用1min作为gap,统计key+window内的count即可,即sum(1)。
>
> 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。
>
> session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。
>
>
> 张锴  于2020年12月28日周一 下午5:35写道:
>
> 能描述一下用session window的考虑吗
>
> Akisaya  于2020年12月28日周一 下午5:00写道:
>
> 这个可以用 session window 吧
>
>
>
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
>
> news_...@163.com  于2020年12月28日周一 下午2:15写道:
>
> 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。
>
>
>
> news_...@163.com
>
> 发件人: 张锴
> 发送时间: 2020-12-28 13:35
> 收件人: user-zh
> 主题: 根据业务需求选择合适的flink state
> 各位大佬帮我分析下如下需求应该怎么写
>
> 需求说明:
>
> 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A
>
>
>
>
>
> 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。
>
> 我的想法:
> 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数
> 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。
>
> 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。
>
> flink 版本1.10.1
>
>
>
>
>


Re: Choosing the right Flink state for a business requirement

2021-01-21 Thread
The approach I was describing is to split the data into session windows: any gap of 1 minute without data starts a new session, and within each session the difference between the maximum and the minimum timestamp is, in theory, exactly the result you want.

To implement it, keep two pieces of state holding the maximum and minimum timestamps; for every incoming record, compare and update them.

When the window fires, emit the result. Combine reduce with a ProcessWindowFunction: the reduce step only tracks the min and max, and only when the window fires do you compute the duration from those two values and emit it. A minimal sketch follows below.
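
A minimal, hedged sketch of this reduce + ProcessWindowFunction pattern (Java DataStream API; the Tuple-based record layout — a "roomId|userId" key plus min/max event time — and the class name are assumptions made for illustration, and watermark assignment is presumed to happen upstream):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class SessionDurationSketch {

    /**
     * Input: one element per raw record, ("roomId|userId", eventTime, eventTime).
     * Output: (key, joinTime, leaveTime, durationSeconds) per session.
     */
    public static DataStream<Tuple4<String, Long, Long, Long>> sessionDurations(
            DataStream<Tuple3<String, Long, Long>> records) {

        return records
                .keyBy(r -> r.f0, Types.STRING)
                .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
                // The reduce step only tracks the min and max event time seen in the session.
                .reduce(
                        (a, b) -> Tuple3.of(a.f0, Math.min(a.f1, b.f1), Math.max(a.f2, b.f2)),
                        new ProcessWindowFunction<
                                Tuple3<String, Long, Long>,
                                Tuple4<String, Long, Long, Long>,
                                String,
                                TimeWindow>() {
                            @Override
                            public void process(
                                    String key,
                                    Context ctx,
                                    Iterable<Tuple3<String, Long, Long>> elements,
                                    Collector<Tuple4<String, Long, Long, Long>> out) {
                                // Exactly one pre-aggregated element reaches this point.
                                Tuple3<String, Long, Long> agg = elements.iterator().next();
                                long durationSeconds = (agg.f2 - agg.f1) / 1000;
                                out.collect(Tuple4.of(key, agg.f1, agg.f2, durationSeconds));
                            }
                        });
    }
}
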

赵一旦  于2021年1月21日周四 下午8:28写道:

> 我其实没看懂你逻辑。这个和窗口的最大最小时间戳啥关系。
>
> 张锴  于2021年1月21日周四 下午6:25写道:
>
>> 你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和
>>
>> context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。
>> 下面是我的部分代码逻辑:
>>
>> val ds = dataStream
>>   .filter(_.liveType == 1)
>>   .keyBy(1, 2)
>>   .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
>>   .process(new myProcessWindow()).uid("process-id")
>>
>> class myProcessWindow() extends
>> ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple,
>> TimeWindow] {
>>
>>   override def process(key: Tuple, context: Context, elements:
>> Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit
>> = {
>> var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间
>> var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间
>>
>> val currentDate = DateUtil.currentDate
>> val created_time = currentDate
>> val modified_time = currentDate
>>  。。。
>>
>> val join_time: String =
>> DateUtil.convertTimeStamp2DateStr(startTime,
>> DateUtil.SECOND_DATE_FORMAT)
>> val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime,
>> DateUtil.SECOND_DATE_FORMAT)
>> val duration = (endTime - startTime) / 1000  //停留多少秒
>> val duration_time = DateUtil.secondsToFormat(duration)  //停留时分秒
>> out.collect(CloudliveWatcher(id, partnerId, courseId, customerId,
>> courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
>> join_time, leave_time, created_time, modified_time
>>   , liveType, plat_form, duration, duration_time,
>> network_operator, role, useragent, uid, eventTime))
>>
>> CloudliveWatcher(id, partnerId, courseId, customerId,
>> courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
>> join_time, leave_time, created_time, modified_time
>>   , liveType, plat_form, duration, duration_time,
>> network_operator, role, useragent, uid, eventTime)
>>
>> }
>>
>>
>> 这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中?
>>
>>
>>
>>
>> 赵一旦  于2020年12月28日周一 下午7:12写道:
>>
>> > 按直播间ID和用户ID分组,使用session
>> window,使用1min作为gap,统计key+window内的count即可,即sum(1)。
>> >
>> > 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。
>> >
>> > session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。
>> >
>> >
>> > 张锴  于2020年12月28日周一 下午5:35写道:
>> >
>> > > 能描述一下用session window的考虑吗
>> > >
>> > > Akisaya  于2020年12月28日周一 下午5:00写道:
>> > >
>> > > > 这个可以用 session window 吧
>> > > >
>> > > >
>> > >
>> >
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
>> > > >
>> > > > news_...@163.com  于2020年12月28日周一 下午2:15写道:
>> > > >
>> > > > > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。
>> > > > >
>> > > > >
>> > > > >
>> > > > > news_...@163.com
>> > > > >
>> > > > > 发件人: 张锴
>> > > > > 发送时间: 2020-12-28 13:35
>> > > > > 收件人: user-zh
>> > > > > 主题: 根据业务需求选择合适的flink state
>> > > > > 各位大佬帮我分析下如下需求应该怎么写
>> > > > >
>> > > > > 需求说明:
>> > > > >
>> > >
>> 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A
>> > > > >
>> > > > >
>> > > >
>> > >
>> >
>> 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。
>> > > > >
>> > > > > 我的想法:
>> > > > > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event
>> Time中的分钟数
>> > > > > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。
>> > > > >
>> > > > > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。
>> > > > >
>> > > > > flink 版本1.10.1
>> > > > >
>> > > >
>> > >
>> >
>>
>


Re: Choosing the right Flink state for a business requirement

2021-01-21 Thread
Actually I don't follow your logic. What does this have to do with the window's maximum and minimum timestamps?

张锴  于2021年1月21日周四 下午6:25写道:

> 你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和
>
> context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。
> 下面是我的部分代码逻辑:
>
> val ds = dataStream
>   .filter(_.liveType == 1)
>   .keyBy(1, 2)
>   .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
>   .process(new myProcessWindow()).uid("process-id")
>
> class myProcessWindow() extends
> ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple,
> TimeWindow] {
>
>   override def process(key: Tuple, context: Context, elements:
> Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit
> = {
> var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间
> var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间
>
> val currentDate = DateUtil.currentDate
> val created_time = currentDate
> val modified_time = currentDate
>  。。。
>
> val join_time: String =
> DateUtil.convertTimeStamp2DateStr(startTime,
> DateUtil.SECOND_DATE_FORMAT)
> val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime,
> DateUtil.SECOND_DATE_FORMAT)
> val duration = (endTime - startTime) / 1000  //停留多少秒
> val duration_time = DateUtil.secondsToFormat(duration)  //停留时分秒
> out.collect(CloudliveWatcher(id, partnerId, courseId, customerId,
> courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
> join_time, leave_time, created_time, modified_time
>   , liveType, plat_form, duration, duration_time,
> network_operator, role, useragent, uid, eventTime))
>
> CloudliveWatcher(id, partnerId, courseId, customerId,
> courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
> join_time, leave_time, created_time, modified_time
>   , liveType, plat_form, duration, duration_time,
> network_operator, role, useragent, uid, eventTime)
>
> }
>
>
> 这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中?
>
>
>
>
> 赵一旦  于2020年12月28日周一 下午7:12写道:
>
> > 按直播间ID和用户ID分组,使用session window,使用1min作为gap,统计key+window内的count即可,即sum(1)。
> >
> > 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。
> >
> > session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。
> >
> >
> > 张锴  于2020年12月28日周一 下午5:35写道:
> >
> > > 能描述一下用session window的考虑吗
> > >
> > > Akisaya  于2020年12月28日周一 下午5:00写道:
> > >
> > > > 这个可以用 session window 吧
> > > >
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
> > > >
> > > > news_...@163.com  于2020年12月28日周一 下午2:15写道:
> > > >
> > > > > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。
> > > > >
> > > > >
> > > > >
> > > > > news_...@163.com
> > > > >
> > > > > 发件人: 张锴
> > > > > 发送时间: 2020-12-28 13:35
> > > > > 收件人: user-zh
> > > > > 主题: 根据业务需求选择合适的flink state
> > > > > 各位大佬帮我分析下如下需求应该怎么写
> > > > >
> > > > > 需求说明:
> > > > >
> > >
> 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A
> > > > >
> > > > >
> > > >
> > >
> >
> 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。
> > > > >
> > > > > 我的想法:
> > > > > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event
> Time中的分钟数
> > > > > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。
> > > > >
> > > > > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。
> > > > >
> > > > > flink 版本1.10.1
> > > > >
> > > >
> > >
> >
>


Re: Re: Re: Flink job fails right at submission when writing to HDFS: Recoverable writers on Hadoop are only supported for HDFS

2021-01-21 Thread
@Michael Ran: no worries, thanks anyway.

@张锴 Which Flink version and which connector (DataStream or SQL) are you referring to? I searched and mine doesn't have it. I'm on 1.12, DataStream.
From the docs there is both StreamingFileSink and FileSink, and they look almost identical to use. I plan to try FileSink, but I'm not clear on the difference between FileSink and StreamingFileSink, or whether both can write to Hadoop-compatible file systems. This matters because of atomic writes: many distributed file systems do not support append or in-place edits. A short sketch of the FileSink API follows below.
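
For reference, a minimal, hedged sketch of the 1.12 FileSink row-format API mentioned above (the DataStream<String> and the output path are placeholders; it still requires the target file system to provide Flink's RecoverableWriter, which is exactly the restriction discussed in this thread):

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;

public class FileSinkSketch {
    public static void attach(DataStream<String> lines) {
        // Row format: each element is encoded and appended to the in-progress part file.
        FileSink<String> sink = FileSink
                .forRowFormat(new Path("hdfs:///tmp/flink-out"),
                        new SimpleStringEncoder<String>("UTF-8"))
                .build();
        lines.sinkTo(sink);
    }
}
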

Michael Ran  于2021年1月21日周四 下午7:01写道:

>
> 很抱歉,我已经很久没用过这个了。但是可以根据异常信息以及API源码执行进行分析,确定是否能直接写入。如果你要写入自定义的文件系统,那么只能实现自己的sink方式。或者你的文件系统的写入方式兼容hdfs的上层API可以参考各个sink端的写法
> 在 2021-01-21 18:45:06,"张锴"  写道:
> >import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink,
> >DateTimeBucketer}
> >
> >sink.setBucketer sink.setWriter用这种方式试试
> >
> >
> >
> >赵一旦  于2021年1月21日周四 下午6:37写道:
> >
> >> @Michael Ran
> >> 然后有什么解决方案吗,我这个是使用flink的streamFileSink方式写hdfs的时候出现的异常。
> >>
> >> Michael Ran  于2021年1月21日周四 下午5:23写道:
> >>
> >> > 这里应该是用了hdfs 的特定API吧,文件系统没兼容public
> >> > HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) {...}
> >> > 在 2021-01-21 17:18:23,"赵一旦"  写道:
> >> > >具体报错信息如下:
> >> > >
> >> > >java.lang.UnsupportedOperationException: Recoverable writers on
> Hadoop
> >> are
> >> > >only supported for HDFS
> >> > >at
> org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.(
> >> > >HadoopRecoverableWriter.java:61)
> >> > >at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem
> >> > >.createRecoverableWriter(HadoopFileSystem.java:210)
> >> > >at org.apache.flink.core.fs.SafetyNetWrapperFileSystem
> >> > >.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
> >> > >at org.apache.flink.streaming.api.functions.sink.filesystem.
> >> >
> >StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink
> >> > >.java:260)
> >> > >at org.apache.flink.streaming.api.functions.sink.filesystem.
> >> >
> >> >
> >>
> >StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:270)
> >> > >at org.apache.flink.streaming.api.functions.sink.filesystem.
> >> > >StreamingFileSink.initializeState(StreamingFileSink.java:412)
> >> > >at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils
> >> > >.tryRestoreFunction(StreamingFunctionUtils.java:185)
> >> > >at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils
> >> > >.restoreFunctionState(StreamingFunctionUtils.java:167)
> >> > >at
> >> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator
> >> > >.initializeState(AbstractUdfStreamOperator.java:96)
> >> > >at
> >> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler
> >> > >.initializeOperatorState(StreamOperatorStateHandler.java:107)
> >> > >at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator
> >> > >.initializeState(AbstractStreamOperator.java:264)
> >> > >at org.apache.flink.streaming.runtime.tasks.OperatorChain
> >> > >.initializeStateAndOpenOperators(OperatorChain.java:400)
> >> > >at org.apache.flink.streaming.runtime.tasks.StreamTask
> >> > >.lambda$beforeInvoke$2(StreamTask.java:507)
> >> > >at
> >> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1
> >> > >.runThrowing(StreamTaskActionExecutor.java:47)
> >> > >at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(
> >> > >StreamTask.java:501)
> >> > >at
> >> > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
> >> > >.java:531)
> >> > >at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
> >> > >at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> >> > >at java.lang.Thread.run(Thread.java:748)
> >> > >
> >> > >
> >> > >赵一旦  于2021年1月21日周四 下午5:17写道:
> >> > >
> >> > >> Recoverable writers on Hadoop are only supported for HDFS
> >> > >>
> >> > >> 如上,我们用的hadoop协议的,但底层不是hdfs,是公司自研的分布式文件系统。
> >> > >>
> >> > >> 使用spark写,spark-sql读等都没问题。但是flink写和读当前都没尝试成功。
> >> > >>
> >> > >>
> >> > >>
> >> >
> >>
>


Re: Re: Flink job fails right at submission when writing to HDFS: Recoverable writers on Hadoop are only supported for HDFS

2021-01-21 Thread
@Michael Ran
So is there any workaround? In my case the exception occurs when writing to HDFS with Flink's StreamingFileSink.

Michael Ran  于2021年1月21日周四 下午5:23写道:

> 这里应该是用了hdfs 的特定API吧,文件系统没兼容public
> HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) {...}
> 在 2021-01-21 17:18:23,"赵一旦"  写道:
> >具体报错信息如下:
> >
> >java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are
> >only supported for HDFS
> >at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.(
> >HadoopRecoverableWriter.java:61)
> >at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem
> >.createRecoverableWriter(HadoopFileSystem.java:210)
> >at org.apache.flink.core.fs.SafetyNetWrapperFileSystem
> >.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
> >at org.apache.flink.streaming.api.functions.sink.filesystem.
> >StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink
> >.java:260)
> >at org.apache.flink.streaming.api.functions.sink.filesystem.
>
> >StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:270)
> >at org.apache.flink.streaming.api.functions.sink.filesystem.
> >StreamingFileSink.initializeState(StreamingFileSink.java:412)
> >at org.apache.flink.streaming.util.functions.StreamingFunctionUtils
> >.tryRestoreFunction(StreamingFunctionUtils.java:185)
> >at org.apache.flink.streaming.util.functions.StreamingFunctionUtils
> >.restoreFunctionState(StreamingFunctionUtils.java:167)
> >at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator
> >.initializeState(AbstractUdfStreamOperator.java:96)
> >at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler
> >.initializeOperatorState(StreamOperatorStateHandler.java:107)
> >at org.apache.flink.streaming.api.operators.AbstractStreamOperator
> >.initializeState(AbstractStreamOperator.java:264)
> >at org.apache.flink.streaming.runtime.tasks.OperatorChain
> >.initializeStateAndOpenOperators(OperatorChain.java:400)
> >at org.apache.flink.streaming.runtime.tasks.StreamTask
> >.lambda$beforeInvoke$2(StreamTask.java:507)
> >at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1
> >.runThrowing(StreamTaskActionExecutor.java:47)
> >at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(
> >StreamTask.java:501)
> >at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
> >.java:531)
> >at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
> >at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> >at java.lang.Thread.run(Thread.java:748)
> >
> >
> >赵一旦  于2021年1月21日周四 下午5:17写道:
> >
> >> Recoverable writers on Hadoop are only supported for HDFS
> >>
> >> 如上,我们用的hadoop协议的,但底层不是hdfs,是公司自研的分布式文件系统。
> >>
> >> 使用spark写,spark-sql读等都没问题。但是flink写和读当前都没尝试成功。
> >>
> >>
> >>
>


Re: Re: Flink monitoring

2021-01-21 Thread
For each node, i.e. each process, just monitor the process-level CPU and memory; there is no finer granularity.
For communication, look at the process's disk and network I/O. In addition, Flink's REST API exposes everything the Flink web UI can show, for example the number of records already sent between nodes.


penguin.  于2021年1月18日周一 上午10:55写道:

>
> So how can I monitor and collect, in real time, each node's CPU and memory utilization and the traffic between nodes?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2021-01-18 10:15:22,"赵一旦"  写道:
> >slot好像只是逻辑概念,监控意义不大,没有资源隔离。
> >
> >penguin.  于2021年1月15日周五 下午5:06写道:
> >
> >> Hi,
> >> flink集群中,能对TaskManager的每个TaskSlot进行监控吗?比如每个slot的cpu和内存使用率之类的指标。
> >>
> >>
> >> penguin
>


Re: Flink job fails right at submission when writing to HDFS: Recoverable writers on Hadoop are only supported for HDFS

2021-01-21 Thread
On top of that, reading our existing Hive data warehouse with Flink SQL also fails. The Hive catalog is configured fine — the table metadata all shows up — but SELECT queries just fail. A sketch of the equivalent Table API catalog setup follows below.
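
For completeness, a minimal, hedged sketch of registering a Hive catalog through the Table API (catalog name, default database, hive-conf directory and the query below are placeholders; this mirrors what the sql-client yaml does and does not by itself address the SELECT failure):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveCatalogSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Placeholder values: catalog name, default database, directory containing hive-site.xml.
        HiveCatalog hive = new HiveCatalog("myhive", "default", "/opt/flink/conf");
        tableEnv.registerCatalog("myhive", hive);
        tableEnv.useCatalog("myhive");

        tableEnv.executeSql("SELECT * FROM xx_table WHERE dt = '2021-01-20'").print();
    }
}
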

赵一旦  于2021年1月21日周四 下午5:18写道:

> 具体报错信息如下:
>
> java.lang.UnsupportedOperationException: Recoverable writers on Hadoop
> are only supported for HDFS
> at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.(
> HadoopRecoverableWriter.java:61)
> at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem
> .createRecoverableWriter(HadoopFileSystem.java:210)
> at org.apache.flink.core.fs.SafetyNetWrapperFileSystem
> .createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
> at org.apache.flink.streaming.api.functions.sink.filesystem.
> StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink
> .java:260)
> at org.apache.flink.streaming.api.functions.sink.filesystem.
> StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:
> 270)
> at org.apache.flink.streaming.api.functions.sink.filesystem.
> StreamingFileSink.initializeState(StreamingFileSink.java:412)
> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils
> .tryRestoreFunction(StreamingFunctionUtils.java:185)
> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils
> .restoreFunctionState(StreamingFunctionUtils.java:167)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator
> .initializeState(AbstractUdfStreamOperator.java:96)
> at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler
> .initializeOperatorState(StreamOperatorStateHandler.java:107)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator
> .initializeState(AbstractStreamOperator.java:264)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain
> .initializeStateAndOpenOperators(OperatorChain.java:400)
> at org.apache.flink.streaming.runtime.tasks.StreamTask
> .lambda$beforeInvoke$2(StreamTask.java:507)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1
> .runThrowing(StreamTaskActionExecutor.java:47)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(
> StreamTask.java:501)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:531)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> at java.lang.Thread.run(Thread.java:748)
>
>
> 赵一旦  于2021年1月21日周四 下午5:17写道:
>
>> Recoverable writers on Hadoop are only supported for HDFS
>>
>> 如上,我们用的hadoop协议的,但底层不是hdfs,是公司自研的分布式文件系统。
>>
>> 使用spark写,spark-sql读等都没问题。但是flink写和读当前都没尝试成功。
>>
>>
>>


Re: Flink job fails right at submission when writing to HDFS: Recoverable writers on Hadoop are only supported for HDFS

2021-01-21 Thread
The full error is as follows:

java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are
only supported for HDFS
at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.(
HadoopRecoverableWriter.java:61)
at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem
.createRecoverableWriter(HadoopFileSystem.java:210)
at org.apache.flink.core.fs.SafetyNetWrapperFileSystem
.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
at org.apache.flink.streaming.api.functions.sink.filesystem.
StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink
.java:260)
at org.apache.flink.streaming.api.functions.sink.filesystem.
StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:270)
at org.apache.flink.streaming.api.functions.sink.filesystem.
StreamingFileSink.initializeState(StreamingFileSink.java:412)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils
.tryRestoreFunction(StreamingFunctionUtils.java:185)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils
.restoreFunctionState(StreamingFunctionUtils.java:167)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator
.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler
.initializeOperatorState(StreamOperatorStateHandler.java:107)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator
.initializeState(AbstractStreamOperator.java:264)
at org.apache.flink.streaming.runtime.tasks.OperatorChain
.initializeStateAndOpenOperators(OperatorChain.java:400)
at org.apache.flink.streaming.runtime.tasks.StreamTask
.lambda$beforeInvoke$2(StreamTask.java:507)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1
.runThrowing(StreamTaskActionExecutor.java:47)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(
StreamTask.java:501)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
.java:531)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.lang.Thread.run(Thread.java:748)


赵一旦  于2021年1月21日周四 下午5:17写道:

> Recoverable writers on Hadoop are only supported for HDFS
>
> 如上,我们用的hadoop协议的,但底层不是hdfs,是公司自研的分布式文件系统。
>
> 使用spark写,spark-sql读等都没问题。但是flink写和读当前都没尝试成功。
>
>
>


Flink job fails right at submission when writing to HDFS: Recoverable writers on Hadoop are only supported for HDFS

2021-01-21 Thread
Recoverable writers on Hadoop are only supported for HDFS

As above: we use the Hadoop protocol, but the underlying storage is not HDFS — it is an in-house distributed file system.

Writing with Spark and reading with Spark SQL both work fine, but so far we have not managed to either write or read with Flink.


Problems integrating Flink with Hive

2021-01-20 Thread
I copied the production hive-site.xml into Flink's conf directory and put all the required jars in place.
I started the sql-client with the -l option pointing at those jars.

Catalogs, databases, tables and so on can all be queried now.

But select * from xxTable where dt=''; runs into trouble.
Looking at the Flink cluster logs, this error directly causes the Flink standalonesession process to fail.
The error is as follows:


2021-01-21 13:43:42,818 INFO  org.apache.hadoop.fs.bos.BaiduBosFileSystem
   [] - re-open at specific locaition: 0
...skipping...
at
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
[flink-dist_2.11-1.12.0.jar:1.12.0]
at
akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
[flink-dist_2.11-1.12.0.jar:1.12.0]
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
[flink-dist_2.11-1.12.0.jar:1.12.0]
at
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[flink-dist_2.11-1.12.0.jar:1.12.0]
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[flink-dist_2.11-1.12.0.jar:1.12.0]
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to start the
operator coordinators
at
org.apache.flink.runtime.scheduler.SchedulerBase.startAllOperatorCoordinators(SchedulerBase.java:1100)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:567)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:944)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719)
~[?:1.8.0_251]
... 27 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
at
org.apache.hadoop.hive.common.ValidReadTxnList.readFromString(ValidReadTxnList.java:142)
~[?:?]
at
org.apache.hadoop.hive.common.ValidReadTxnList.(ValidReadTxnList.java:57)
~[?:?]
at
org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$Context.(OrcInputFormat.java:421)
~[?:?]
at
org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:983)
~[?:?]
at
org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getSplits(OrcInputFormat.java:1048)
~[?:?]
at
org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createInputSplits(HiveSourceFileEnumerator.java:86)
~[?:?]
at
org.apache.flink.connectors.hive.HiveSourceFileEnumerator.enumerateSplits(HiveSourceFileEnumerator.java:57)
~[?:?]
at
org.apache.flink.connector.file.src.AbstractFileSource.createEnumerator(AbstractFileSource.java:140)
~[flink-table_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.connectors.hive.HiveSource.createEnumerator(HiveSource.java:115)
~[?:?]
at
org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:119)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.applyCall(RecreateOnResetOperatorCoordinator.java:308)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.start(RecreateOnResetOperatorCoordinator.java:72)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:182)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.scheduler.SchedulerBase.startAllOperatorCoordinators(SchedulerBase.java:1094)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:567)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:944)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719)
~[?:1.8.0_251]
... 27 more


Re: Restoring from a checkpoint or savepoint on a different Kafka consumer group

2021-01-17 Thread
If you change the consumer group in your new job, the group id will be the
new one you set.
The job will still continue consuming the topics from the offsets in the savepoint/checkpoint
you specified, regardless of whether the group id is the original one. A short sketch follows below.
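
A small, hedged sketch of the point above (topic name, group id and bootstrap servers are placeholders; the consumer is the pre-1.14 FlinkKafkaConsumer that matches this thread's time frame):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class ResumeWithNewGroupId {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");
        // A different group id than the original job used; offsets stored in Flink state still win.
        props.setProperty("group.id", "new-consumer-group");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
        // Only relevant for partitions that have NO offset in the restored state.
        consumer.setStartFromGroupOffsets();

        env.addSource(consumer).print();

        // Submitted with: flink run -s <savepoint-or-checkpoint-path> ...
        // Partitions whose offsets are in the restored state resume from those offsets.
        env.execute("resume-with-new-group-id");
    }
}
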

Rex Fenley  于2021年1月18日周一 下午12:53写道:

> Hello,
>
> When using the Kafka consumer connector, if we restore a from a checkpoint
> or savepoint using a differently named consumer group than the one we
> originally ran a job with will it still pick up exactly where it left off
> or are you locked into using the same consumer group as before?
>
> Thanks!
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
>


Re: Setting the MapStateDescriptor for a broadcastStream in Flink

2021-01-17 Thread
Both the key and the value types are entirely up to you, depending on what you need; nothing is mandatory here.
Whatever key and value types your map state needs for your concrete business scenario, that is the TypeInformation you pass in at that spot. A small sketch follows below.
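
A minimal, hedged sketch (the Rule class and the descriptor name "rules" are made-up placeholders; the three-argument MapStateDescriptor constructor takes the name, the key TypeInformation and the value TypeInformation):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;

public class BroadcastStateSketch {

    /** Hypothetical broadcast element type. */
    public static class Rule {
        public String name;
        public String expression;
    }

    public static BroadcastStream<Rule> broadcastRules(DataStream<Rule> ruleStream) {
        // Key type: String (e.g. rule name); value type: Rule. Pick whatever your lookup needs.
        MapStateDescriptor<String, Rule> descriptor = new MapStateDescriptor<>(
                "rules",
                BasicTypeInfo.STRING_TYPE_INFO,
                TypeInformation.of(Rule.class));

        return ruleStream.broadcast(descriptor);
    }
}
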

smq <374060...@qq.com> 于2021年1月18日周一 下午12:18写道:

> 发自我的iPhone
>
>
> -- 原始邮件 --
> 发件人: 明启 孙 <374060...@qq.com
> 发送时间: 2021年1月18日 11:30
> 收件人: user-zh  主题: 转发:flink 设置broadcastStream 的MapStateDescriptor
>
>
>
>
> 大家好:
>
>  MapStateDescriptor (String name,
> TypeInformation valueTypeInfo)中的第二个参数key值得是什么,其中value指的是状态值,在网上看到key大部分用的是String类型,不知道这个key是根据什么指定的。
>
> 
>
> smq
>
> 


Re: A question about the output of Flink sliding windows

2021-01-17 Thread
I see you also wrote "trigger a computation every minute" — did you add a custom trigger or something like that? That could be what breaks the logic.

The default behaviour already gives you the effect you want; don't use a custom trigger here.

赵一旦  于2021年1月18日周一 下午3:52写道:

> 补充下,是针对每个key,每1min输出一个结果。采用1h窗口,1min滑动窗口。
>
>
>
> 赵一旦  于2021年1月18日周一 下午3:51写道:
>
>> 你貌似没懂我的意思。我的意思是你的需求和sliding window的效果应该就是一样的。
>> 不是很清楚你表达的最早什么的是什么含义。
>>
>> 基于event time做sliding window,只要你没设置其他东西,就应该是1min输出一个结果而已哈。
>>
>> eriendeng  于2021年1月18日周一 上午11:42写道:
>>
>>> 只要sliding出来才满足你每分钟执行的滑动要求吧?至于你只要最早/新的某一条,你可以从window groupby得出的流再次group
>>> by然后再用window时间筛选你要的数据。
>>>
>>>
>>>
>>> --
>>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>
>>


Re: A question about the output of Flink sliding windows

2021-01-17 Thread
To add: for each key, one result is emitted every minute, using a 1-hour window with a 1-minute slide.



赵一旦  于2021年1月18日周一 下午3:51写道:

> 你貌似没懂我的意思。我的意思是你的需求和sliding window的效果应该就是一样的。
> 不是很清楚你表达的最早什么的是什么含义。
>
> 基于event time做sliding window,只要你没设置其他东西,就应该是1min输出一个结果而已哈。
>
> eriendeng  于2021年1月18日周一 上午11:42写道:
>
>> 只要sliding出来才满足你每分钟执行的滑动要求吧?至于你只要最早/新的某一条,你可以从window groupby得出的流再次group
>> by然后再用window时间筛选你要的数据。
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>
>


Re: A question about the output of Flink sliding windows

2021-01-17 Thread
I don't think you got my point. What I mean is that your requirement and the behaviour of a sliding window should be exactly the same.
I'm also not sure what you mean by "the earliest/latest" record.

With an event-time sliding window, as long as you haven't configured anything else, you should simply get one result per minute.

eriendeng  于2021年1月18日周一 上午11:42写道:

> 只要sliding出来才满足你每分钟执行的滑动要求吧?至于你只要最早/新的某一条,你可以从window groupby得出的流再次group
> by然后再用window时间筛选你要的数据。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: A question about the output of Flink sliding windows

2021-01-17 Thread
From your description, what you are asking for sounds exactly like a sliding window:
9:00-10:00, 9:01-10:01, ... A short sketch follows below.
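
A minimal, hedged sketch of that setup (the keyed Tuple2 stream and the sum aggregation are placeholders; the point is the 1-hour window sliding every minute, which fires once per minute per key):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class SlidingWindowSketch {

    public static DataStream<Tuple2<String, Long>> hourlyCountsPerMinute(
            DataStream<Tuple2<String, Long>> keyedCounts) {
        return keyedCounts
                .keyBy(t -> t.f0, Types.STRING)
                // One-hour window evaluated every minute: at 10:00 it covers 9:00-10:00,
                // at 10:01 it covers 9:01-10:01, and so on.
                .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(1)))
                .sum(1);
    }
}
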

marble.zh...@coinflex.com.INVALID 
于2021年1月15日周五 下午5:45写道:

> 大家好, 我现在的场景需求是,窗口size是1个小时,每分钟触发统计一次结果。比如现在是10点钟,则统计9点到10点的结果。
> 下一分钟则在10:01分时触发统计9:01到10:01的结果。
>
> 如果用Sliding window, 比如.timeWindow(Time.hours(1L), Time.minutes(1)),
> 则会输出60/1=60个结果集,这不是我想要的结果,我只想要当前时间往前一个小时的结果。
>
> 除了在window function api做逻辑过虑外,还有什么方法可以实现这种场景?
>
> 滚动窗口的话不适合,它每次是输出整点的,比如从9点到10点,然后就跳到10点到11点,也不符合我的业务要求。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink monitoring

2021-01-17 Thread
Slots seem to be just a logical concept without resource isolation, so there is little point in monitoring them individually.

penguin.  于2021年1月15日周五 下午5:06写道:

> Hi,
> flink集群中,能对TaskManager的每个TaskSlot进行监控吗?比如每个slot的cpu和内存使用率之类的指标。
>
>
> penguin


Re: Re: sql-client fails to connect to Hive with TTransportException

2021-01-15 Thread
Hi, the problem is solved. I'm not sure why you need to start HiveServer2 at all — it doesn't seem necessary. Flink only needs the Hive Metastore (HMS).

RS  于2020年10月30日周五 上午9:57写道:

> Hi,
>
> 谢谢,应该是HMS的问题, 原来是需要配置remote的HMS,之前都是local模式
> 我执行了一下流程:
> 1. 清理了旧的数据库和数据目录
> 2. 重新初始化 schematool -dbType mysql -initSchema
> 3. 启动hive --service metastore, 成功监听端口9083端口
> 4. 启动hiveserver2, hiveserver2一直在重试,没有监听1端口
>
>
> 然后hiveserver2启动失败, hive版本3.1.2, 请问下这个问题如何解决呢?
>
>
> 2020-10-29T18:53:35,602  WARN [main] server.HiveServer2: Error starting
> HiveServer2 on attempt 1, will retry in 6ms
> java.lang.RuntimeException: Error initializing notification event poll
> at
> org.apache.hive.service.server.HiveServer2.init(HiveServer2.java:275)
> ~[hive-service-3.1.2.jar:3.1.2]
> at
> org.apache.hive.service.server.HiveServer2.startHiveServer2(HiveServer2.java:1036)
> [hive-service-3.1.
> 2.jar:3.1.2]
> at
> org.apache.hive.service.server.HiveServer2.access$1600(HiveServer2.java:140)
> [hive-service-3.1.2.jar:
> 3.1.2]
> at
> org.apache.hive.service.server.HiveServer2$StartOptionExecutor.execute(HiveServer2.java:1305)
> [hive-s
> ervice-3.1.2.jar:3.1.2]
> at
> org.apache.hive.service.server.HiveServer2.main(HiveServer2.java:1149)
> [hive-service-3.1.2.jar:3.1.2]
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> ~[?:1.8.0_261]at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> ~[?:1.8.0_261]
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[?:1.8.0_261]at java.lang.reflect.Method.invoke(Method.java:498)
> ~[?:1.8.0_261]
> at org.apache.hadoop.util.RunJar.run(RunJar.java:323)
> [hadoop-common-3.3.0.jar:?]at
> org.apache.hadoop.util.RunJar.main(RunJar.java:236)
> [hadoop-common-3.3.0.jar:?]
> Caused by: java.io.IOException: org.apache.thrift.TApplicationException:
> Internal error processing get_current_notificationEventId
> at
> org.apache.hadoop.hive.metastore.messaging.EventUtils$MSClientNotificationFetcher.getCurrentNotificat
> ionEventId(EventUtils.java:75) ~[hive-exec-3.1.2.jar:3.1.2]
> at
> org.apache.hadoop.hive.ql.metadata.events.NotificationEventPoll.(NotificationEventPoll.java:103
> ) ~[hive-exec-3.1.2.jar:3.1.2]
> at
> org.apache.hadoop.hive.ql.metadata.events.NotificationEventPoll.initialize(NotificationEventPoll.java
> :59) ~[hive-exec-3.1.2.jar:3.1.2]
> at
> org.apache.hive.service.server.HiveServer2.init(HiveServer2.java:273)
> ~[hive-service-3.1.2.jar:3.1.2]
> ... 10 more
> Caused by: org.apache.thrift.TApplicationException: Internal error
> processing get_current_notificationEventIdat
> org.apache.thrift.TApplicationException.read(TApplicationException.java:111)
> ~[hive-exec-3.1.2.jar:3.
> 1.2]at
> org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:79)
> ~[hive-exec-3.1.2.jar:3.1.2]
> at
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_current_notificationEventId(
> ThriftHiveMetastore.java:5575) ~[hive-exec-3.1.2.jar:3.1.2]
> at
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_current_notificationEventId(Thrif
> tHiveMetastore.java:5563) ~[hive-exec-3.1.2.jar:3.1.2]
> at
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getCurrentNotificationEventId(HiveMetaStoreClient.java:2723)
> ~[hive-exec-3.1.2.jar:3.1.2]
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> ~[?:1.8.0_261]
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> ~[?:1.8.0_261]
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[?:1.8.0_261]
> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_261]
> at
> org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:212)
> ~[h
>
>
>
>
>
>
>
>
> 在 2020-10-27 19:58:32,"Rui Li"  写道:
> >你好,我看log里连接的是1端口,这个是HS2的端口吧?Flink的HiveCatalog需要连接的是HMS,可以启动一个HMS再试试哈。
> >
> >On Tue, Oct 27, 2020 at 9:57 AM RS  wrote:
> >
> >> Hi, 请教下
> >> 我尝试使用sql-client连接hive,  hive正常, 使用beeline -u jdbc:hive2://x.x.x.x:1
> >> 可以正常连接
> >>
> >>
> >> sql-client-defaults.yaml配置内容:
> >> tables: []
> >> functions: []
> >> catalogs:
> >> - name: myhive
> >>   type: hive
> >>   hive-conf-dir: /home/hive/flink-1.11.1/conf
> >>   default-database: default
> >> execution:
> >>   planner: blink
> >>   type: streaming
> >>   time-characteristic: event-time
> >>   periodic-watermarks-interval: 200
> >>   result-mode: table
> >>   max-table-result-rows: 100
> >>   parallelism: 1
> >>   max-parallelism: 128
> >>   min-idle-state-retention: 0
> >>   max-idle-state-retention: 0
> >>   restart-strategy:
> >> type: fallback
> >> deployment:
> >>   response-timeout: 5000
> >>   gateway-address: ""
> >>   gateway-port: 0
> >>
> >>
> >> 然后启动sql-client报错
> >> $./bin/sql-client.sh embedded
> >>
> >>
> >> 最后的报错信息:
> >> Exception in 

Re: Re: A question about Flink checkpoints

2021-01-14 Thread
What Evan mentions is indeed a setting, but it only affects the cancel command; stop still deletes the checkpoint. This part is honestly not well designed — I don't know why, and when I asked about it before, nobody responded...
So, from my experience: if you need to stop and restart from a savepoint, you are fine. If you plan to restart from a checkpoint, be sure to back up the checkpoint first, then stop the job, then copy the backup back.
Or alternatively, just use cancel instead of stop.

Evan  于2021年1月14日周四 下午6:49写道:

> 是的,应该是机制问题,链接[1]打开有这样一句解释:
>
>  If you choose to retain externalized checkpoints on cancellation you have
> to handle checkpoint clean up manually when you cancel the job as well
> (terminating with job status JobStatus#CANCELED).
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/config.html#execution-checkpointing-externalized-checkpoint-retention
>
> 如回答有误,请指正。
>
>
>
>
>
>
> 发件人: yinghua...@163.com
> 发送时间: 2021-01-14 18:02
> 收件人: user-zh
> 主题: 回复: 回复: 请教个Flink checkpoint的问题
> 代码如下:
> streamEnv.enableCheckpointing(5 * 60 * 1000);
> CheckpointConfig checkPointConfig = streamEnv.getCheckpointConfig();
> checkPointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> checkPointConfig.setCheckpointTimeout(1 * 60 * 1000);
> checkPointConfig.setMinPauseBetweenCheckpoints((1 * 30 * 1000));
> checkPointConfig.setMaxConcurrentCheckpoints(1);
> checkPointConfig.setTolerableCheckpointFailureNumber(3);
> checkPointConfig
>
> .enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>
> String checkpointPath = this.execOptions.get(CHECKPOINT_PATH);
> try {
>   StateBackend rocksDBStateBackend = new
> RocksDBStateBackend(checkpointPath);
>   streamEnv.setStateBackend(rocksDBStateBackend);
>
>
>
> yinghua...@163.com
>
> 发件人: Evan
> 发送时间: 2021-01-14 17:55
> 收件人: user-zh
> 主题: 回复: 请教个Flink checkpoint的问题
> 代码图挂掉了,看不到代码
>
>
>
> 发件人: yinghua...@163.com
> 发送时间: 2021-01-14 17:26
> 收件人: user-zh
> 主题: 请教个Flink checkpoint的问题
>
> 我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateBackend保存在HDFS上且任务取消后不删除checkpoint,代码如下
>
> 现在我发现,当我停止任务时使用stopWithSavepoint发现任务停止后把最近一次的checkpoint信息给删除了?目前机制是这样的吗还是我使用有问题?是不是调用cancelWithSavepoint停止任务时就不会删除最近一次的checkpoint信息?
>
>
> yinghua...@163.com
>


Re: Re: A question about Flink checkpoints

2021-01-14 Thread
That is simply how the mechanism works. Below are tests I ran earlier; in each case the job first ran through several checkpoints, then I performed the operation and checked whether the checkpoint on the file system was retained:

- Cancel the job from the WEB UI: retained. (Reasonable, since RETAIN_ON_CANCELLATION is set.)
- Take a savepoint via CLI, flink savepoint ${jobId} ${savepointDir}: retained. (OK)
- Cancel via CLI, flink cancel ${jobId}: retained. (OK)
- Cancel with a savepoint via CLI, flink cancel -s ${savepointDir} ${jobId}: retained. (OK)
- Stop via CLI (default savepoint directory), flink stop ${jobId}: NOT retained. (Watch out for this trap.)
- Stop with a savepoint via CLI, flink stop -p ${savepointDir} ${jobId}: NOT retained. (Watch out for this trap.)

yinghua...@163.com  于2021年1月14日周四 下午6:23写道:

> [root@sdp-10-88-100-147 flink-1.11.3]# hdfs dfs -ls
> hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6
> Java HotSpot(TM) 64-Bit Server VM warning: ignoring option
> MaxPermSize=512m; support was removed in 8.0
> log4j:WARN No such property [datePattern] in
> org.apache.log4j.RollingFileAppender.
> 21/01/14 17:05:50 INFO util.NativeCodeLoader: Loaded the native-hadoop
> library
> Found 1 items
> -rw-rw-r--   3 yarn hdfs   5388 2021-01-14 17:03
> hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6/_metadata
>  // 这个是通过JobManger看到已经checkpoing完成后去查询出来的记录,的确是生成了,里面已经包含了_metadata文件
> [root@sdp-10-88-100-147 flink-1.11.3]# hdfs dfs -ls
> hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6
> //我停止任务后再去查询时,这个目录已经删除了,出错如下
> Java HotSpot(TM) 64-Bit Server VM warning: ignoring option
> MaxPermSize=512m; support was removed in 8.0
> log4j:WARN No such property [datePattern] in
> org.apache.log4j.RollingFileAppender.
> 21/01/14 17:06:17 INFO util.NativeCodeLoader: Loaded the native-hadoop
> library
> ls:
> `hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6':
> No such file or directory
> //出错信息,checkpoint信息被删除了,这个目录我是专门用来存放checkpoint信息,排除其他主动删除该文件的可能
>
>
>
>
>
> yinghua...@163.com
>
> 发件人: tison
> 发送时间: 2021-01-14 18:04
> 收件人: user-zh
> 主题: Re: 请教个Flink checkpoint的问题
> 没明白你说的最近一次 checkpoint 被删除啥意思,你可以列一下 checkpoint 目录的内容,你觉得应该是啥,结果是啥。
>
> Best,
> tison.
>
>
> Evan  于2021年1月14日周四 下午5:56写道:
>
> > 代码图挂掉了,看不到代码
> >
> >
> >
> >
> > 发件人: yinghua...@163.com
> > 发送时间: 2021-01-14 17:26
> > 收件人: user-zh
> > 主题: 请教个Flink checkpoint的问题
> >
> >
> 我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateBackend保存在HDFS上且任务取消后不删除checkpoint,代码如下
> >
> >
> 现在我发现,当我停止任务时使用stopWithSavepoint发现任务停止后把最近一次的checkpoint信息给删除了?目前机制是这样的吗还是我使用有问题?是不是调用cancelWithSavepoint停止任务时就不会删除最近一次的checkpoint信息?
> >
> >
> > yinghua...@163.com
> >
>


Re: Flink web UI returns error 500 when querying job information

2021-01-13 Thread
OK, I found the parameter. But do you know what it actually controls? 10 MB does not look like a small number to me.

Evan  于2021年1月14日周四 下午1:54写道:

> 有这样一个参数“akka.framesize” ,可以在你启动flink的时候加上 或者 在conf/flink-conf.yaml 配置上:
>
> akka.framesize (String, default "10485760b"): Maximum size of messages which are sent
> between the JobManager and the TaskManagers. If Flink fails because messages exceed
> this limit, then you should increase it. The message size requires a size-unit specifier.
>
> 参考:
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html
>
>
>
>
>
> 发件人: 赵一旦
> 发送时间: 2021-01-14 11:38
> 收件人: user-zh
> 主题: Flink webui 查询任务信息报错500
> 报错500,开发者工具展示的异常信息如下。
>  side:↵org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException:
> The method requestJob's result size 19811407 exceeds the maximum size
> 10485760 .↵ at
>
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:363)↵
> at
>
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$sendAsyncResponse$0(AkkaRpcActor.java:337)↵
> at
>
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)↵
> at
>
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)↵
> at
>
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)↵
> at
>
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)↵
> at
>
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)↵
> at
>
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)↵
> at
>
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)↵
> at
>
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)↵
> at
>
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)↵
> at
>
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:996)↵
> at akka.dispatch.OnComplete.internal(Future.scala:264)↵ at
> akka.dispatch.OnComplete.internal(Future.scala:261)↵ at
> akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)↵ at
> akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)↵ at
> scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)↵ at
>
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)↵
> at
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)↵
> at
>
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)↵
> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)↵ at
>
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)↵
> at
>
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)↵
> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)↵ at
> scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)↵ at
> scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)↵ at
>
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)↵
> at
>
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)↵
> at
>
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)↵
> at
>
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)↵
> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)↵
> at
>
> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)↵
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)↵ at
>
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)↵
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)↵ at
>
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)↵
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)↵
> at
>
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)↵↵End
> of exception on server side>"
>
> 想知道这个是什么情况,以及需要调整什么参数呢?
>


Flink web UI returns error 500 when querying job information

2021-01-13 Thread
The request fails with a 500 error; the exception shown in the browser's developer tools is as follows.
"

I would like to know what is going on here and which parameter needs to be adjusted.


Re: flink 1.11.1: how to make multiple log4j configuration files take effect

2021-01-13 Thread
My personal take:
This probably isn't possible. The job you submit is ultimately packaged and executed on the TaskManagers, which use the TaskManager's logging configuration, not the one bundled with your jar.
Your own configuration file only takes effect when you run or debug locally.

nicygan  于2021年1月13日周三 上午9:55写道:

> dear all:
>  My Flink job is submitted to run on YARN.
>  By default, the logging configuration that takes effect is the log4j.properties in flink/conf.
>  But my application jar also contains a log4j2.xml, which configures a KafkaAppender to ship logs to Kafka.
>  How should I set things up so that both configuration files take effect?
>  Has anyone got experience configuring this?
>
>
>
> thanks
> by nicygan
>


Re: Why does this print the previous watermark while apparently using the current one? Why isn't the current one printed?

2021-01-12 Thread
The description is still not clear.
Watermarks are generated periodically, so at the moment you read one it may not have been updated yet. See the sketch below.
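
A minimal, hedged sketch of why the printed watermark can lag: with a periodic bounded-out-of-orderness strategy the watermark is only emitted every autoWatermarkInterval, and it always trails the maximum seen timestamp by the configured 3 seconds (the Tuple2 event type and its timestamp field are placeholders):

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WatermarkSketch {
    public static DataStream<Tuple2<String, Long>> withWatermarks(
            StreamExecutionEnvironment env, DataStream<Tuple2<String, Long>> events) {

        // Watermarks are emitted periodically (every 200 ms by default); between emissions an
        // operator still sees the previous watermark even though newer records have arrived.
        env.getConfig().setAutoWatermarkInterval(200L);

        return events.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        // f1 is the event-time timestamp in milliseconds (placeholder layout).
                        .withTimestampAssigner((event, prevTs) -> event.f1));
    }
}
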

何宗谨  于2021年1月13日周三 上午10:20写道:

>
> 允许的时间间隔是3秒,每次打印的都是上一个时间戳的watermark,但是使用的好像还是这次的
>
> -- 原始邮件 --
> *发件人:* "user-zh" ;
> *发送时间:* 2021年1月13日(星期三) 上午10:02
> *收件人:* "user-zh";
> *主题:* Re: 为什么我这个打印的是上次的watermark,而是用的是这次的watermark呢,为什么打印的不是这次的呢
>
> 图挂了。
>
> 何宗谨  于2021年1月13日周三 上午9:20写道:
>
> >
> >
> >
> >
> >
>


Re: Why does this print the previous watermark while apparently using the current one? Why isn't the current one printed?

2021-01-12 Thread
The image did not come through.

何宗谨  于2021年1月13日周三 上午9:20写道:

>
>
>
>
>


Re: Flink build errors

2021-01-12 Thread
Configuring a mirror fixed it for me.

Yun Tang  于2021年1月12日周二 下午5:37写道:

> Hi,
>
> 国内网络环境不太好,其实问题是node.js 安装有问题,可以考虑单独安装一下node
> js和npm,如果还是不行,在不需要webui的前提下,可以编译时候加上profile “-Pskip-webui-build”
> 来skip掉该部分的编译。
>
> 祝好
> 唐云
> 
> From: Ruguo Yu 
> Sent: Tuesday, January 12, 2021 14:00
> To: user-zh@flink.apache.org 
> Subject: Re: flink编译报错
>
> 试下这个命令
> mvn clean install -DskipTests -Dfast -Drat.skip=true -Dhadoop.version=2.7.6
> -Dinclude-hadoop -Dscala-2.11 -T2C
> where -Dhadoop.version is the Hadoop version number
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Re: Re: Re: Monitoring Flink jobs with Grafana: which metrics to alert on

2021-01-08 Thread
Why not just use Flink's REST API? I run a standalone cluster and wrote a small Python script with a list of expected_jobs; if any of those jobs is missing from the cluster, it raises an alert. A sketch of the same idea follows below.
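
A minimal, hedged sketch of that idea, written in Java for consistency with the other snippets in this digest (the poster's actual script is Python; the /jobs/overview endpoint, the expected job names and the log-line "alert" below are assumptions used purely for illustration):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

public class ExpectedJobsCheck {

    public static void main(String[] args) throws Exception {
        List<String> expectedJobs = Arrays.asList("etl-clicks", "session-durations");

        // JobManager REST endpoint; /jobs/overview lists all jobs with name and state.
        URL url = new URL("http://localhost:8081/jobs/overview");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setConnectTimeout(5_000);
        conn.setReadTimeout(5_000);

        StringBuilder body = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                body.append(line);
            }
        }

        // Crude check: alert if an expected job name does not appear in the response at all.
        // A real script would parse the JSON and also verify the job state is RUNNING.
        for (String job : expectedJobs) {
            if (!body.toString().contains("\"" + job + "\"")) {
                System.err.println("ALERT: expected job not found on cluster: " + job);
            }
        }
    }
}
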

Yun Tang  于2021年1月8日周五 上午10:53写道:

> 因为numRestarts 是一个累计值,所以你得区分当前值和之前的数值是否发生了增加,来区分是否发生了failover。
>
>
> 另外,不建议使用YARN的application状态来判断Flink作业状态,因为如果Flink作业配置了重试策略,即使作业不断进行failover,整个YARN的application状态仍然是RUNNING,并不能发现问题。
>
> 祝好
> 唐云
> 
> From: bradyMk 
> Sent: Thursday, January 7, 2021 16:38
> To: user-zh@flink.apache.org 
> Subject: Re: Re:Re: Re:flink作业通过grafana监控,若想发出报警该如何选择指标的问题
>
> 好的,我研究一下,谢谢指导~
>
>
>
> -
> Best Wishes
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Checkpoints report no errors, but errors show up in the logs — what is going on?

2021-01-08 Thread
The checkpoints themselves complete without errors, but the SDK of the backend that the checkpoints are configured against logs errors, so I am not sure whether this error actually matters. The stack trace is below. Could someone help analyze it: is this part of writing checkpoint data? If so, what does the 404 mean? Not found — not found what, exactly?

com.baidubce.BceServiceException: Not Found (Status Code: 404; Error Code:
null; Request ID: 624d3468-8d7b-46f7-be5d-750c9039893d)
at
com.baidubce.http.handler.BceErrorResponseHandler.handle(BceErrorResponseHandler.java:59)
~[bce-java-sdk-0.10.82.jar:?]
at com.baidubce.http.BceHttpClient.execute(BceHttpClient.java:243)
~[bce-java-sdk-0.10.82.jar:?]
at
com.baidubce.AbstractBceClient.invokeHttpClient(AbstractBceClient.java:189)
~[bce-java-sdk-0.10.82.jar:?]
at
com.baidubce.services.bos.BosClient.getObjectMetadata(BosClient.java:1189)
~[bce-java-sdk-0.10.82.jar:?]
at
com.baidubce.services.bos.BosClient.getObjectMetadata(BosClient.java:1171)
~[bce-java-sdk-0.10.82.jar:?]
at
org.apache.hadoop.fs.bos.BosNativeFileSystemStore.retrieveMetadata(BosNativeFileSystemStore.java:531)
~[bos-hdfs-sdk-1.0.1-SNAPSHOT-0.jar:?]
at sun.reflect.GeneratedMethodAccessor92.invoke(Unknown Source)
~[?:?]
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_251]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_251]
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
~[hadoop-common-2.7.5.jar:?]
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
~[hadoop-common-2.7.5.jar:?]
at org.apache.hadoop.fs.bos.$Proxy22.retrieveMetadata(Unknown
Source) ~[?:?]
at
org.apache.hadoop.fs.bos.BaiduBosFileSystem.getFileStatus(BaiduBosFileSystem.java:252)
~[bos-hdfs-sdk-1.0.1-SNAPSHOT-0.jar:?]
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1425)
~[hadoop-common-2.7.5.jar:?]
at
org.apache.hadoop.fs.bos.BaiduBosFileSystem.create(BaiduBosFileSystem.java:163)
~[bos-hdfs-sdk-1.0.1-SNAPSHOT-0.jar:?]
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
~[hadoop-common-2.7.5.jar:?]
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:891)
~[hadoop-common-2.7.5.jar:?]
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:788)
~[hadoop-common-2.7.5.jar:?]
at
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:149)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:126)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.core.fs.EntropyInjector.createEntropyAware(EntropyInjector.java:61)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:389)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flushToFile(FsCheckpointStreamFactory.java:254)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:230)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at java.io.DataOutputStream.write(DataOutputStream.java:107)
~[?:1.8.0_251]
at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
~[?:1.8.0_251]
at
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.getBytes(NetworkBuffer.java:397)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractUnpooledSlicedByteBuf.getBytes(AbstractUnpooledSlicedByteBuf.java:392)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.shaded.netty4.io.netty.buffer.SlicedByteBuf.getBytes(SlicedByteBuf.java:26)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.shaded.netty4.io.netty.buffer.ReadOnlyByteBuf.getBytes(ReadOnlyByteBuf.java:264)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.checkpoint.channel.ChannelStateSerializerImpl.writeData(ChannelStateSerializer.java:164)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.lambda$write$2(ChannelStateCheckpointWriter.java:136)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.runWithChecks(ChannelStateCheckpointWriter.java:226)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.write(ChannelStateCheckpointWriter.java:133)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.writeOutput(ChannelStateCheckpointWriter.java:125)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at

Flink 1.12 fails when triggering a savepoint

2021-01-07 Thread
The error is as follows:
java.lang.IllegalArgumentException: Can not set long field com.xxx.buzz.pojo.AbstractDrRecord.timestamp to null value
at sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:167)
at sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:171)
at sun.reflect.UnsafeLongFieldAccessorImpl.set(UnsafeLongFieldAccessorImpl.java:80)
at java.lang.reflect.Field.set(Field.java:764)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:409)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.lang.Thread.run(Thread.java:748)


Following the stack trace, the code at the error location is:

try {
    for (int i = 0; i < numFields; i++) {
        boolean isNull = source.readBoolean();

        if (fields[i] != null) {
            if (isNull) {
                fields[i].set(target, null); // fails here: setting null, but the field is a primitive long, not the boxed type
            } else {
                Object field = fieldSerializers[i].deserialize(source);
                fields[i].set(target, field);
            }
        } else if (!isNull) {
            // read and dump a pre-existing field value
            fieldSerializers[i].deserialize(source);
        }
    }
} catch (IllegalAccessException e) {
    throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields before.", e);
}
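
The root cause is that the savepoint data marks the field as null while the POJO declares a primitive long, which cannot hold null. A sketch of one possible workaround, assuming the POJO can be changed (class and field names follow the error message; the accessors are illustrative):

    public abstract class AbstractDrRecord {
        // was: private long timestamp;  -- a primitive cannot represent the null stored in the savepoint
        private Long timestamp;          // the boxed type lets the PojoSerializer restore a null value

        public Long getTimestamp() { return timestamp; }
        public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }
    }

Alternatively, make sure the field is never null on the writing side; either way the POJO declaration has to agree with the data recorded in the savepoint.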


Re: Re: Re: Re: Re: Modifying Flink's task scheduling

2021-01-06 Thread
No, there isn't.

penguin.  wrote on Thu, Jan 7, 2021 at 1:04 PM:

> 赵一旦:
> So is there currently any way, after submitting a job, to schedule the job's subtasks onto a specific slot on a specific machine?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2021-01-07 12:57:35, "赵一旦"  wrote:
> >They are not the same thing; they are not even on the same level.
> >A scheduling platform automatically submits a job for you at a specified time, for example submitting some job on a daily schedule.
> >
> >The latter is Flink's internal mechanism: after a job is submitted, it decides on which machine and in which slot each subtask of the job runs.
> >
> >penguin.  wrote on Thu, Jan 7, 2021 at 12:50 PM:
> >
> >> 赵一旦:
> >> By "task scheduling platform", do you mean using such a platform to fully control which node each Flink task is scheduled onto?
> >> What I have in mind is Flink's own internal task-to-node scheduling, for example by modifying the scheduling code inside Flink.
> >> Can both approaches be used to decide, according to our own requirements, which node a task is actually scheduled onto?
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> On 2021-01-07 12:24:42, "赵一旦"  wrote:
> >> >"Task scheduling" here has two possible meanings: a job scheduling platform (very common), or Flink's own task scheduling, which is very complex.
> >> >
> >> >penguin.  wrote on Thu, Jan 7, 2021 at 10:32 AM:
> >> >
> >> >>
> >> >>
> >> >>
> >> >> I saw a paper on CNKI where the author worked on Flink task scheduling, but I emailed them long ago and never got a reply.
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> On 2021-01-07 10:21:27, "赵一旦"  wrote:
> >> >> >Yes. Someone replied to me before saying that Flink currently exposes too little scheduling information to achieve really ideal scheduling.
> >> >> >
> >> >> >penguin.  wrote on Thu, Jan 7, 2021 at 10:11 AM:
> >> >> >
> >> >> >>
> >> >> >>
> >> >>
> >>
> I saw on JIRA that someone seems to be working on this, but I could not get more information and do not know how they are doing it. The main task should be locating the code that performs the task scheduling, but the source has few comments, so it is difficult.
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >> On 2021-01-06 13:06:20, "赵一旦"  wrote:
> >> >> >> >I'm not sure, but it must be very hard, otherwise the community would have changed it long ago. Jobs frequently cause unbalanced resource usage across machines; this problem is very common.
> >> >> >> >
> >> >> >> >penguin.  wrote on Wed, Jan 6, 2021 at 11:15 AM:
> >> >> >> >
> >> >> >> >> Hi, does anyone know how to change Flink's default task scheduling?
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >>
> >>
>


Re: Re: Re: Re: Modifying Flink's task scheduling

2021-01-06 Thread
They are not the same thing; they are not even on the same level.
A scheduling platform automatically submits a job for you at a specified time, for example submitting some job on a daily schedule.

The latter is Flink's internal mechanism: after a job is submitted, it decides on which machine and in which slot each subtask of the job runs.

penguin.  wrote on Thu, Jan 7, 2021 at 12:50 PM:

> 赵一旦:
> By "task scheduling platform", do you mean using such a platform to fully control which node each Flink task is scheduled onto?
> What I have in mind is Flink's own internal task-to-node scheduling, for example by modifying the scheduling code inside Flink.
> Can both approaches be used to decide, according to our own requirements, which node a task is actually scheduled onto?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2021-01-07 12:24:42, "赵一旦"  wrote:
> >"Task scheduling" here has two possible meanings: a job scheduling platform (very common), or Flink's own task scheduling, which is very complex.
> >
> >penguin.  wrote on Thu, Jan 7, 2021 at 10:32 AM:
> >
> >>
> >>
> >>
> >> I saw a paper on CNKI where the author worked on Flink task scheduling, but I emailed them long ago and never got a reply.
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> On 2021-01-07 10:21:27, "赵一旦"  wrote:
> >> >Yes. Someone replied to me before saying that Flink currently exposes too little scheduling information to achieve really ideal scheduling.
> >> >
> >> >penguin.  wrote on Thu, Jan 7, 2021 at 10:11 AM:
> >> >
> >> >>
> >> >>
> >>
> I saw on JIRA that someone seems to be working on this, but I could not get more information and do not know how they are doing it. The main task should be locating the code that performs the task scheduling, but the source has few comments, so it is difficult.
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> On 2021-01-06 13:06:20, "赵一旦"  wrote:
> >> >> >I'm not sure, but it must be very hard, otherwise the community would have changed it long ago. Jobs frequently cause unbalanced resource usage across machines; this problem is very common.
> >> >> >
> >> >> >penguin.  wrote on Wed, Jan 6, 2021 at 11:15 AM:
> >> >> >
> >> >> >> Hi, does anyone know how to change Flink's default task scheduling?
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >>
>


Re: Re: Re: Modifying Flink's task scheduling

2021-01-06 Thread
"Task scheduling" here has two possible meanings: a job scheduling platform (very common), or Flink's own task scheduling, which is very complex.

penguin.  wrote on Thu, Jan 7, 2021 at 10:32 AM:

>
>
>
> I saw a paper on CNKI where the author worked on Flink task scheduling, but I emailed them long ago and never got a reply.
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2021-01-07 10:21:27, "赵一旦"  wrote:
> >Yes. Someone replied to me before saying that Flink currently exposes too little scheduling information to achieve really ideal scheduling.
> >
> >penguin.  wrote on Thu, Jan 7, 2021 at 10:11 AM:
> >
> >>
> >>
> I saw on JIRA that someone seems to be working on this, but I could not get more information and do not know how they are doing it. The main task should be locating the code that performs the task scheduling, but the source has few comments, so it is difficult.
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> On 2021-01-06 13:06:20, "赵一旦"  wrote:
> >> >I'm not sure, but it must be very hard, otherwise the community would have changed it long ago. Jobs frequently cause unbalanced resource usage across machines; this problem is very common.
> >> >
> >> >penguin.  wrote on Wed, Jan 6, 2021 at 11:15 AM:
> >> >
> >> >> Hi, does anyone know how to change Flink's default task scheduling?
> >>
> >>
> >>
> >>
> >>
>


Re: Re: Modifying Flink's task scheduling

2021-01-06 Thread
Yes. Someone replied to me before saying that Flink currently exposes too little scheduling information to achieve really ideal scheduling.

penguin.  wrote on Thu, Jan 7, 2021 at 10:11 AM:

>
> I saw on JIRA that someone seems to be working on this, but I could not get more information and do not know how they are doing it. The main task should be locating the code that performs the task scheduling, but the source has few comments, so it is difficult.
>
>
>
>
>
>
>
>
>
> On 2021-01-06 13:06:20, "赵一旦"  wrote:
> >I'm not sure, but it must be very hard, otherwise the community would have changed it long ago. Jobs frequently cause unbalanced resource usage across machines; this problem is very common.
> >
> >penguin.  wrote on Wed, Jan 6, 2021 at 11:15 AM:
> >
> >> Hi, does anyone know how to change Flink's default task scheduling?
>
>
>
>
>


reason for endless backpressure

2021-01-06 Thread
I've hit this problem many times: the job suddenly goes into sustained
backpressure, and the backpressured node stops sending any records until the
job is restarted. I can confirm that it is not caused by high load: during the
backpressure period the machines' CPU utilization drops rather than rises.

At the moment my initial suspicion is that it has something to do with the
network. Does anyone know the underlying mechanism? Can network jitter cause
this phenomenon?


Re: Are there causes of backpressure other than compute load, e.g. the "network"?

2021-01-06 Thread
I lean towards a network cause, but Flink's logs currently cannot clearly show or confirm it. I'm hoping someone can look at it from the perspective of Flink's backpressure mechanism: is there a case where network "jitter", e.g. a broken long-lived connection, causes backpressure? And can such a situation recover automatically? In my experience so far it never recovers unless I restart...

赵一旦  wrote on Wed, Jan 6, 2021 at 11:43 PM:

> As the subject says: what can cause backpressure, leaving aside high compute load, unreasonable parallelism and so on?
> For example, could the network also be involved?
>
> Consider the following case. The topology is A->B->C. The A (source) node shows 100% backpressure and stops sending data entirely, yet neither B nor C is backpressured, and B and C are both extremely simple (a performance problem is essentially impossible). What other explanation is there?
>
> For example, could the network between A and B have a problem?
>
> Also, judging from the machine CPU monitoring, once backpressure appears the CPU idle goes up, i.e. CPU utilization drops as soon as backpressure hits, and there is no sign of a CPU spike around that time. So it cannot be a momentary load spike that caused the backpressure.
> I currently suspect the network; does anyone know how to confirm this? And can such a case recover automatically?
>
> I seem to have hit several similar cases recently: backpressure to the point where no data is sent at all and the whole job stalls completely. The eventual fix: 1.
> stop the job (and every time I stop it, one task stays in CANCELING for a long time and eventually brings the TM down); 2. once it is stopped and the TM restarted, restart the job. The job then runs normally again.
>
> From the above, this further shows it is not a load problem; otherwise why would everything be fine after a restart? Without a restart it stays backpressured and stalled "forever".
>
>
>
>
>
>


Are there causes of backpressure other than compute load, e.g. the "network"?

2021-01-06 Thread
As the subject says: what can cause backpressure, leaving aside high compute load, unreasonable parallelism and so on?
For example, could the network also be involved?
Consider the following case. The topology is A->B->C. The A (source) node shows 100% backpressure and stops sending data entirely, yet neither B nor C is backpressured, and B and C are both extremely simple (a performance problem is essentially impossible). What other explanation is there?

For example, could the network between A and B have a problem?

Also, judging from the machine CPU monitoring, once backpressure appears the CPU idle goes up, i.e. CPU utilization drops as soon as backpressure hits, and there is no sign of a CPU spike around that time. So it cannot be a momentary load spike that caused the backpressure.
I currently suspect the network; does anyone know how to confirm this? And can such a case recover automatically?

I seem to have hit several similar cases recently: backpressure to the point where no data is sent at all and the whole job stalls completely. The eventual fix: 1.
stop the job (and every time I stop it, one task stays in CANCELING for a long time and eventually brings the TM down); 2. once it is stopped and the TM restarted, restart the job. The job then runs normally again.

From the above, this further shows it is not a load problem; otherwise why would everything be fine after a restart? Without a restart it stays backpressured and stalled "forever".


Re: flink timestamp parsing issue

2021-01-05 Thread
Take a look at the documentation; you can configure the format to ignore parse errors.
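
For the canal-json format used below, the option is most likely 'canal-json.ignore-parse-errors' (available around Flink 1.12; please verify the exact key against the docs for your version). A sketch of the change, with the column list left unchanged:

    CREATE TABLE source1 (
        ...
    ) WITH (
        'connector' = 'kafka',
        'format' = 'canal-json',
        -- rows whose fields (e.g. the '-00-00 00:00:00' timestamp) cannot be parsed are skipped
        'canal-json.ignore-parse-errors' = 'true'
    );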

air23  wrote on Wed, Jan 6, 2021 at 10:41 AM:

> Hi, I'm running into the following problem with Flink SQL:
>
>
>
>
>
>
> CREATE TABLE source1 (
> id BIGINT   ,
> username STRING ,
> password STRING  ,
> AddTime TIMESTAMP  ,
> origin_table STRING METADATA FROM 'value.table' VIRTUAL
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'plink_canal',
> 'properties.bootstrap.servers' = '',
> 'properties.group.id' = 'canal1',
> 'scan.startup.mode' = 'group-offsets',
> 'canal-json.table.include' = 'test.*',
> 'format' = 'canal-json'
> );
>
>
>
> When the binlog data contains an invalid date, the following error occurs:
> Caused by: java.time.format.DateTimeParseException: Text '-00-00
> 00:00:00' could not be parsed: Invalid value for MonthOfYear (valid values
> 1 - 12): 0
> Is there a way to handle this? The data in our upstream business database does contain such invalid dates.
> Thanks for any answers.
>
>
>
>


Re: How does flink sql consume from Kafka with authentication enabled? Before authentication was added, Flink SQL consumed Kafka fine

2021-01-05 Thread
For that, have a look at the Kafka connector options; fairly recent versions support ignoring parse errors via configuration.
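
For reference, with the legacy descriptor-based options used in this thread (Flink 1.10, 'format.type' = 'csv') the switch should be 'format.ignore-parse-errors'; with the newer factory-style options it is 'csv.ignore-parse-errors' / 'json.ignore-parse-errors'. Treat the exact keys as something to verify for your version. A sketch:

    create table test_hello (
        name string
    ) with (
        ...
        'format.type' = 'csv',
        'format.ignore-parse-errors' = 'true'
    );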

Carmen Free  wrote on Wed, Jan 6, 2021 at 10:58 AM:

> Thanks for helping solve the problem. The class path was indeed wrong; switching to that class fixed it.
>
> Right after that I ran into a new exception:
>
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException:
> No content to map due to end-of-input at [Source:UNKONWN; line: -1, column:
> -1;]
>
> The cause is mainly that the Kafka message is empty; as long as the message is not empty, consumption works fine.
>
> But when an empty Kafka message does show up, can't that case be handled?
>
> 赵一旦  于2021年1月5日周二 下午9:18写道:
>
> > I still think it's a jar problem. Try the following (I haven't bothered to verify it myself):
> > replace org.apache.kafka.common.security.plain.PlainLoginModule with
> > org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule
> >
> > because you're using sql-connector-kafka, and that jar shades the kafka-clients package.
> >
> Carmen Free  wrote on Tue, Jan 5, 2021 at 5:09 PM:
> >
> > > How does flink sql consume from Kafka with authentication enabled? Before authentication was added, Flink SQL consumed Kafka fine.
> > >
> > > 1. Versions
> > > Flink version: 1.10.2
> > > Kafka version: 1.1.0
> > >
> > > 2. Kafka authentication setup
> > > Only SASL authentication is used.
> > > On the Kafka client side, kafka_server-jass.conf,
> > > server.properties, producer.properties and consumer.properties are configured.
> > >
> > > 3. Main configuration parameters
> > > sasl.mechanism=PLAIN
> > > security.protocol=SASL_PLAINTEXT
> > > sasl.jaas.config=org.apache.kafka.comon.security.plain.PlainLoginModule
> > > required username="xx" password="xx-secret";
> > > With this configuration, a plain Java program can consume from the authenticated Kafka cluster without problems.
> > >
> > > 4. Jars used for the Flink SQL connection
> > > flink-sql-connector-kafka_2.11-1.10.2.jar
> > > flink-jdbc_2.11-1.10.2.jar
> > > flink-csv-1.10.2-sql-jar.jar
> > >
> > >
> > > 5. My approach
> > > By analogy with adding the Kafka authentication parameters in a Java program (the parameters from section 3 are all that is needed), I specified the
> > > parameters from section 3 directly when creating the Kafka table in Flink SQL. But consuming Kafka via the Flink SQL client then fails with an error.
> > >
> > > 6. Starting the client
> > > ./bin/sql-client.sh embedded -l sql_lib/
> > > where the sql_lib folder contains the flink-connector jars mentioned in section 4.
> > >
> > >
> > > 7. The DDL:
> > > create table test_hello (
> > > name string
> > > ) with (
> > > ...
> > > ...
> > > 'connector.properties.sasl.mechanism' = 'PLAIN',
> > > 'connector.properties.security.protocol' = 'SASL_PLAINTEXT',
> > > 'connector.properties.sasl.jaas.config' =
> > > 'org.apache.kafka.comon.security.plain.PlainLoginModule required
> > > username="xx" password="xx-secret";',
> > > 'format.type' = 'csv'
> > > );
> > >
> > > Creating the table works without issues.
> > >
> > > Querying the table fails: select * from test_hello;
> > > The error is:
> > > could not execute sql statement. Reason:
> > > javax.security.auth.login.loginException: unable to find loginModule
> > > class:
> > > org.apache.kafka.common.security.plain.PlainLoginModule
> > > But I can find this class in flink-sql-connector-kafka_2.11-1.10.2.jar, so I don't understand why.
> > >
> > > After authentication was added to Kafka, is the way I'm using Flink SQL wrong? I hope someone can help; thanks.
> > >
> >
>


Re: Modifying Flink's task scheduling

2021-01-05 Thread
I'm not sure, but it must be very hard, otherwise the community would have changed it long ago. Jobs frequently cause unbalanced resource usage across machines; this problem is very common.

penguin.  wrote on Wed, Jan 6, 2021 at 11:15 AM:

> Hi, does anyone know how to change Flink's default task scheduling?


Re: How to troubleshoot checkpoint failures

2021-01-05 Thread
Then why are there no logs? Go to the machines and look at the logs there.

 wrote on Wed, Jan 6, 2021 at 10:11 AM:

> It's probably large state; the timeout is set to 10 minutes and hasn't been reached yet. I can't find any relevant logs anywhere.
>
> Sent from my iPhone
>
> > On Jan 6, 2021, at 10:03, 赵一旦  wrote:
> >
> > How do you troubleshoot without logs? At least there should be a reason for the failure. If it's a timeout, it's probably high job load, large state, and so on.
> >
> >  wrote on Wed, Jan 6, 2021 at 9:53 AM:
> >
> >> On Flink 1.11.2, joining data from three Kafka topics, checkpoints fail and there are no logs. How should I troubleshoot this?
> >>
> >> Sent from my iPhone
>


Re: A question about windows in Flink

2021-01-05 Thread
Your approach works. The second window gathering everything back onto one node is exactly the original problem; your approach only mitigates it, and the second level is unavoidable.
What you need to do is tune the parameters so that, even though the second level is skewed, its data volume is low enough.
Also watch out for key explosion: if you have, say, 10k keys today, a random suffix of 10 turns that into 100k keys and a suffix of 100 into 1M. That blow-up is significant as well, so the best approach is to split only the high-volume keys.
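
To make the two-level idea concrete, here is a minimal sketch (names are illustrative; it simplifies the problem to a per-key count with tumbling processing-time windows, and assumes the input is a DataStream<MyEvent> called events with a String getKey()):

    // Level 1: salt the key so records of one hot key spread over up to 10 window instances,
    // and pre-aggregate a partial count per (originalKey + "#" + salt).
    DataStream<Tuple2<String, Long>> partialCounts = events
            .map(e -> Tuple2.of(e.getKey() + "#" + ThreadLocalRandom.current().nextInt(10), 1L))
            .returns(Types.TUPLE(Types.STRING, Types.LONG))
            .keyBy(t -> t.f0)
            .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
            .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));

    // Level 2: strip the salt and merge the partial counts per original key.
    // Only the small pre-aggregated records reach this stage, so the remaining skew is cheap.
    DataStream<Tuple2<String, Long>> totals = partialCounts
            .map(t -> Tuple2.of(t.f0.substring(0, t.f0.lastIndexOf('#')), t.f1))
            .returns(Types.TUPLE(Types.STRING, Types.LONG))
            .keyBy(t -> t.f0)
            .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
            .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));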

syumialiu  wrote on Tue, Jan 5, 2021 at 11:53 PM:

>
> In one job I have some very large data (few distinct keys, but a huge number of records per key). What I basically need is: when a sliding time window closes and the count for some key exceeds a fixed threshold, emit all the original records under that key. My current approach is to add a suffix to the key and then keyBy into windows, but afterwards I still need another keyBy to merge the data back, and that step again pulls the full data onto a single node. Are there other ways to solve this?
>
>
> | |
> syumialiu
> |
> |
> syumia...@163.com
> |
> Signature customized by NetEase Mail Master


Re: How to troubleshoot checkpoint failures

2021-01-05 Thread
How do you troubleshoot without logs? At least there should be a reason for the failure. If it's a timeout, it's probably high job load, large state, and so on.

 wrote on Wed, Jan 6, 2021 at 9:53 AM:

> On Flink 1.11.2, joining data from three Kafka topics, checkpoints fail and there are no logs. How should I troubleshoot this?
>
> Sent from my iPhone


Re: Chaining 2 flink jobs through a KAFKA topic with checkpoint enabled

2021-01-05 Thread
I think what you need is
http://kafka.apache.org/documentation/#consumerconfigs_isolation.level .

The isolation.level setting's default value is read_uncommitted, so perhaps
you are not using the default setting?
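
For reference, a minimal sketch of where this setting goes on the Flink consumer side (topic name and bootstrap servers are placeholders):

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "broker1:9092");
    props.setProperty("group.id", "my-group");
    // read_committed makes the consumer wait for the upstream job's checkpoint-bound commits;
    // read_uncommitted (the default) also sees records that are not yet committed
    props.setProperty("isolation.level", "read_committed");

    FlinkKafkaConsumer<String> consumer =
            new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);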

赵一旦  于2021年1月5日周二 下午9:10写道:

> I do not have this problem, so I guess it is related to the config of
> your kafka producer and consumer, and maybe to the kafka topic properties or
> kafka server properties as well.
>
> Arvid Heise  wrote on Tue, Jan 5, 2021 at 6:47 PM:
>
>> Hi Daniel,
>>
>> Flink commits transactions on checkpoints while Kafka Streams/connect
>> usually commits on record. This is the typical tradeoff between Throughput
>> and Latency. By decreasing the checkpoint interval in Flink, you can reach
>> comparable latency to Kafka Streams.
>>
>> If you have two exactly once jobs, the second job may only read data that
>> has been committed (not dirty as Chesnay said). If the second job were to
>> consume data that is uncommitted, it will result in duplicates, in case the
>> first job fails and rolls back.
>>
>> You can configure the read behavior with isolation.level. If you want to
>> implement exactly once behavior, you also need to set that level in your
>> other Kafka consumers [1]. Also compare what Kafka Streams is setting if
>> you want to go exactly once [2].
>>
>> If you really want low latency, please also double-check if you really
>> need exactly once.
>>
>> [1]
>> https://kafka.apache.org/documentation/#consumerconfigs_isolation.level
>> [2]
>> https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#processing-guarantee
>>
>> On Mon, Dec 28, 2020 at 12:22 PM Chesnay Schepler 
>> wrote:
>>
>>> I don't know our Kafka connector particularly well, but it sounds like a
>>> matter of whether a given consumer does dirty reads.
>>> Flink does not, whereas the other tools you're using do.
>>>
>>> On 12/28/2020 7:57 AM, Daniel Peled wrote:
>>>
>>> Hello,
>>>
>>> We have 2 flink jobs that communicate with each other through a KAFKA
>>> topic.
>>> Both jobs use checkpoints with EXACTLY ONCE semantic.
>>>
>>> We have seen the following behaviour and we want to make sure and ask if
>>> this is the expected behaviour or maybe it is a bug.
>>>
>>> When the first job produces a message to KAFKA, the message is consumed
>>>  by the second job with a latency that depends on the *first* job 
>>> *checkpoint
>>> interval*.
>>>
>>> We are able to read the message using the kafka tool or using another
>>> kafka consumer, but NOT with a flink kafka consumer that again depends on
>>> the checkpoint interval of the first job.
>>>
>>> How come the consumer of the second job depends on the producer
>>> transaction commit time of the first job ?
>>>
>>> BR,
>>> Danny
>>>
>>>
>>>
>>
>> --
>>
>> Arvid Heise | Senior Java Developer
>>
>> <https://www.ververica.com/>
>>
>> Follow us @VervericaData
>>
>> --
>>
>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> (Toni) Cheng
>>
>


Re: How does flink sql consume from Kafka with authentication enabled? Before authentication was added, Flink SQL consumed Kafka fine

2021-01-05 Thread
I still think it's a jar problem. Try the following (I haven't bothered to verify it myself):
replace org.apache.kafka.common.security.plain.PlainLoginModule with
org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule

because you're using sql-connector-kafka, and that jar shades the kafka-clients package.
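
In DDL form the suggestion would look roughly like this (untested sketch; everything except the login module class name is copied from the table definition quoted below):

    create table test_hello (
        name string
    ) with (
        ...
        'connector.properties.sasl.mechanism' = 'PLAIN',
        'connector.properties.security.protocol' = 'SASL_PLAINTEXT',
        'connector.properties.sasl.jaas.config' =
            'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="xx" password="xx-secret";',
        'format.type' = 'csv'
    );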

Carmen Free  wrote on Tue, Jan 5, 2021 at 5:09 PM:

> How does flink sql consume from Kafka with authentication enabled? Before authentication was added, Flink SQL consumed Kafka fine.
>
> 1. Versions
> Flink version: 1.10.2
> Kafka version: 1.1.0
>
> 2. Kafka authentication setup
> Only SASL authentication is used.
> On the Kafka client side, kafka_server-jass.conf,
> server.properties, producer.properties and consumer.properties are configured.
>
> 3. Main configuration parameters
> sasl.mechanism=PLAIN
> security.protocol=SASL_PLAINTEXT
> sasl.jaas.config=org.apache.kafka.comon.security.plain.PlainLoginModule
> required username="xx" password="xx-secret";
> With this configuration, a plain Java program can consume from the authenticated Kafka cluster without problems.
>
> 4. Jars used for the Flink SQL connection
> flink-sql-connector-kafka_2.11-1.10.2.jar
> flink-jdbc_2.11-1.10.2.jar
> flink-csv-1.10.2-sql-jar.jar
>
>
> 5. My approach
> By analogy with adding the Kafka authentication parameters in a Java program (the parameters from section 3 are all that is needed), I specified the
> parameters from section 3 directly when creating the Kafka table in Flink SQL. But consuming Kafka via the Flink SQL client then fails with an error.
>
> 6. Starting the client
> ./bin/sql-client.sh embedded -l sql_lib/
> where the sql_lib folder contains the flink-connector jars mentioned in section 4.
>
>
> 7. The DDL:
> create table test_hello (
> name string
> ) with (
> ...
> ...
> 'connector.properties.sasl.mechanism' = 'PLAIN',
> 'connector.properties.security.protocol' = 'SASL_PLAINTEXT',
> 'connector.properties.sasl.jaas.config' =
> 'org.apache.kafka.comon.security.plain.PlainLoginModule required
> username="xx" password="xx-secret";',
> 'format.type' = 'csv'
> );
>
> Creating the table works without issues.
>
> Querying the table fails: select * from test_hello;
> The error is:
> could not execute sql statement. Reason:
> javax.security.auth.login.loginException: unable to find loginModule
> class:
> org.apache.kafka.common.security.plain.PlainLoginModule
> But I can find this class in flink-sql-connector-kafka_2.11-1.10.2.jar, so I don't understand why.
>
> After authentication was added to Kafka, is the way I'm using Flink SQL wrong? I hope someone can help; thanks.
>


Re: flink CPU utilization

2021-01-05 Thread
Check whether there is backpressure. Backpressure means the parallelism is still not enough; if there is none, check whether the processing rate meets your expectations. If it does, there is nothing to tune: the job is simply not complex and that amount of CPU is all it needs. If it doesn't meet expectations and there is no backpressure anywhere, then the source is consuming too slowly.



housezhang  wrote on Tue, Jan 5, 2021 at 5:44 PM:

> It might be that the CPU can keep up but the network I/O cannot; check the network usage.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Chaining 2 flink jobs through a KAFKA topic with checkpoint enabled

2021-01-05 Thread
I do not have this problem, so I guess it is related to the config of
your kafka producer and consumer, and maybe to the kafka topic properties or
kafka server properties as well.

Arvid Heise  wrote on Tue, Jan 5, 2021 at 6:47 PM:

> Hi Daniel,
>
> Flink commits transactions on checkpoints while Kafka Streams/connect
> usually commits on record. This is the typical tradeoff between Throughput
> and Latency. By decreasing the checkpoint interval in Flink, you can reach
> comparable latency to Kafka Streams.
>
> If you have two exactly once jobs, the second job may only read data that
> has been committed (not dirty as Chesnay said). If the second job were to
> consume data that is uncommitted, it will result in duplicates, in case the
> first job fails and rolls back.
>
> You can configure the read behavior with isolation.level. If you want to
> implement exactly once behavior, you also need to set that level in your
> other Kafka consumers [1]. Also compare what Kafka Streams is setting if
> you want to go exactly once [2].
>
> If you really want low latency, please also double-check if you really
> need exactly once.
>
> [1]
> https://kafka.apache.org/documentation/#consumerconfigs_isolation.level
> [2]
> https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#processing-guarantee
>
> On Mon, Dec 28, 2020 at 12:22 PM Chesnay Schepler 
> wrote:
>
>> I don't know our Kafka connector particularly well, but it sounds like a
>> matter of whether a given consumer does dirty reads.
>> Flink does not, whereas the other tools you're using do.
>>
>> On 12/28/2020 7:57 AM, Daniel Peled wrote:
>>
>> Hello,
>>
>> We have 2 flink jobs that communicate with each other through a KAFKA
>> topic.
>> Both jobs use checkpoints with EXACTLY ONCE semantic.
>>
>> We have seen the following behaviour and we want to make sure and ask if
>> this is the expected behaviour or maybe it is a bug.
>>
>> When the first job produces a message to KAFKA, the message is consumed
>>  by the second job with a latency that depends on the *first* job *checkpoint
>> interval*.
>>
>> We are able to read the message using the kafka tool or using another
>> kafka consumer, but NOT with a flink kafka consumer that again depends on
>> the checkpoint interval of the first job.
>>
>> How come the consumer of the second job depends on the producer
>> transaction commit time of the first job ?
>>
>> BR,
>> Danny
>>
>>
>>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


Re: flink NullPointerException warning

2021-01-05 Thread
This problem, hmm... comes from the following place:

KeyedStream<ShareRealTimeData, String> keyByStream =
        signoutTimeAndWM.keyBy(new KeySelector<ShareRealTimeData, String>() {
            @Override
            public String getKey(ShareRealTimeData value) throws Exception {
                // the problem is here: you must not key by the current wall-clock time via new Date()
                return DateUtilMinutes.timeStampToDate(new Date().getTime());
            }
        });

The fix: if you really need this effect, first use a flatMap that calls new Date for each element
and stores that date into a field of the ShareRealTimeData class (say, a field called key).
Then do keyBy(e -> e.getKey()) on that field; the effect is the same, but this problem will not occur.

The underlying reason is fairly involved and relates to Flink's key distribution mechanism: written your way, the key of a given element is not stable, because it is effectively a <random> key.
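
A minimal sketch of the suggested change (it assumes ShareRealTimeData gains a String field key with a getter and setter; the names are illustrative):

    DataStream<ShareRealTimeData> withKey = signoutTimeAndWM
            .flatMap(new FlatMapFunction<ShareRealTimeData, ShareRealTimeData>() {
                @Override
                public void flatMap(ShareRealTimeData value, Collector<ShareRealTimeData> out) {
                    // compute the time-bucket key once per element and store it on the element,
                    // so the key extracted later in keyBy() is stable for that element
                    value.setKey(DateUtilMinutes.timeStampToDate(new Date().getTime()));
                    out.collect(value);
                }
            });

    KeyedStream<ShareRealTimeData, String> keyByStream = withKey.keyBy(e -> e.getKey());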

lp <973182...@qq.com> wrote on Tue, Jan 5, 2021 at 8:11 PM:

> The operator: the code of the ProcessWindowFunction is as follows:
>
> class MyProcessWindowFuncation extends
>         ProcessWindowFunction<ShareRealTimeData, TreeMap<Double, Tuple2<String, String>>, String, TimeWindow> {
>     private transient MapState<String, Tuple2<String, Double>> eveShareNoMaxPrice;
>     private transient ValueState<TreeMap<Double, Tuple2<String, String>>> shareAndMaxPrice;
>
>     @Override
>     public void process(String s, Context context,
>                         Iterable<ShareRealTimeData> elements,
>                         Collector<TreeMap<Double, Tuple2<String, String>>> out) throws Exception {
>         Iterator<ShareRealTimeData> iterator = elements.iterator();
>
>         // find the maximum price per shareNo within each trigger period
>         while (iterator.hasNext()) {
>             ShareRealTimeData next = iterator.next();
>             Tuple2<String, Double> t2 = eveShareNoMaxPrice.get(next.getShareNo());
>             if (t2 == null || t2.f1 < next.getCurrentPrice()) {
>                 eveShareNoMaxPrice.put(next.getShareNo(),
>                         Tuple2.of(next.getShareName(), next.getCurrentPrice()));
>             }
>         }
>
>         TreeMap<Double, Tuple2<String, String>> shareAndMaxPriceV = shareAndMaxPrice.value();
>         if (shareAndMaxPriceV == null) {
>             shareAndMaxPriceV = new TreeMap<>(new Comparator<Double>() {
>                 @Override
>                 public int compare(Double o1, Double o2) {
>                     return Double.compare(o2, o1);
>                 }
>             });
>         }
>         Iterator<Map.Entry<String, Tuple2<String, Double>>> keysAndMaxPrice =
>                 eveShareNoMaxPrice.entries().iterator();
>         while (keysAndMaxPrice.hasNext()) {
>             Map.Entry<String, Tuple2<String, Double>> next = keysAndMaxPrice.next();
>
>             shareAndMaxPriceV.put(next.getValue().f1,
>                     Tuple2.of(next.getKey(), next.getValue().f0));
>             if (shareAndMaxPriceV.size() > 20) {
>                 shareAndMaxPriceV.pollLastEntry();
>             }
>         }
>
>         eveShareNoMaxPrice.clear();
>         shareAndMaxPrice.clear();
>
>         out.collect(shareAndMaxPriceV);
>     }
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>         eveShareNoMaxPrice = getRuntimeContext().getMapState(
>                 new MapStateDescriptor<>("eveShareNoMaxPrice",
>                         TypeInformation.of(new TypeHint<String>() {
>                         }),
>                         TypeInformation.of(new TypeHint<Tuple2<String, Double>>() {
>                         })));
>         shareAndMaxPrice = getRuntimeContext().getState(
>                 new ValueStateDescriptor<>("shareAndMaxPrice",
>                         TypeInformation.of(new TypeHint<TreeMap<Double, Tuple2<String, String>>>() {
>                         })));
>     }
> }
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Re: flink CPU utilization

2021-01-05 Thread
Don't get hung up on the number of cores. If the job doesn't have many nodes, you can raise the parallelism a bit further; as long as there are enough network buffers it's fine.
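
For reference, the network buffer budget is controlled by settings like the following (illustrative values, Flink 1.10+ memory model):

    # flink-conf.yaml
    taskmanager.memory.network.fraction: 0.1
    taskmanager.memory.network.min: 64mb
    taskmanager.memory.network.max: 1gb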

爱吃鱼  wrote on Tue, Jan 5, 2021 at 4:39 PM:

> It's a 24-core machine and I've already raised the parallelism to 24, but the CPU utilization of the 24 parallel subtasks still adds up to only about 140%.
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2021-01-05 16:24:30, "赵一旦"  wrote:
> >Increase the parallelism.
> >
> >爱吃鱼  wrote on Tue, Jan 5, 2021 at 4:18 PM:
> >
> >> How can I raise Flink's CPU utilization?
> >> Scenario: Flink batch, reading a file with roughly 200 million records and running it through a flatMap.
> >> The flatMap output is converted to a table, and SQL computes max(time), min(time), count and so on over it.
> >> CPU utilization is only around 140% while the machine still has plenty of spare resources. How can I raise it?
>
>
>
>
>
>


Re: flink CPU utilization

2021-01-05 Thread
Increase the parallelism.

爱吃鱼  wrote on Tue, Jan 5, 2021 at 4:18 PM:

> How can I raise Flink's CPU utilization?
> Scenario: Flink batch, reading a file with roughly 200 million records and running it through a flatMap.
> The flatMap output is converted to a table, and SQL computes max(time), min(time), count and so on over it.
> CPU utilization is only around 140% while the machine still has plenty of spare resources. How can I raise it?


A question about Flink's REST API metrics endpoints

2021-01-04 Thread
For vertex-level monitoring there is an endpoint that lists all metric ids, and an endpoint that returns metric values for a comma-separated list passed as a GET parameter.

The problem is that when my collection script fetches the metric values, the GET URL becomes too long, so I fetch them five at a time. But then each 30-second collection round issues more than a hundred requests and takes tens of seconds.

Could a POST endpoint be added, or could the metrics endpoint that lists all metric ids simply return all the values as well?
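
For reference, the GET form being described looks roughly like this (job id, vertex id and metric names are placeholders):

    # list the metric ids available for a vertex
    curl "http://<jobmanager-host>:8081/jobs/<job-id>/vertices/<vertex-id>/metrics"

    # fetch selected values via the comma-separated ?get= parameter
    curl "http://<jobmanager-host>:8081/jobs/<job-id>/vertices/<vertex-id>/metrics?get=<id1>,<id2>,<id3>"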

