Re: flink kryo exception

2021-02-07 Thread 赵一旦
The first problem is critical, since the savepoint does not work.
For the second problem, I changed the solution and removed the 'Map'-based
implementation before the data are passed to the second task, and in that case
the savepoint works.  The only remaining issue is that I would normally have to
stop the job, remember the savepoint path, and then restart the job with that
savepoint path. What happens now is: I stop the job, the job fails and restarts
automatically from the generated savepoint.  So I do not need to restart the
job myself anymore, since what it does automatically is what I want to do.

I suspect the problem may also be related to the data, so I am not sure that I
can provide an example that reproduces it.

Till Rohrmann wrote on Sat, Feb 6, 2021 at 12:13 AM:

> Could you provide us with a minimal working example which reproduces the
> problem for you? This would be super helpful in figuring out the problem
> you are experiencing. Thanks a lot for your help.
>
> Cheers,
> Till
>
> On Fri, Feb 5, 2021 at 1:03 PM 赵一旦  wrote:
>
>> Yeah, and if it is different, why my job runs normally.  The problem only
>> occurres when I stop it.
>>
>> Robert Metzger wrote on Fri, Feb 5, 2021 at 7:08 PM:
>>
>>> Are you 100% sure that the jar files in the classpath (/lib folder) are
>>> exactly the same on all machines? (It can happen quite easily in a
>>> distributed standalone setup that some files are different)
>>>
>>>
>>> On Fri, Feb 5, 2021 at 12:00 PM 赵一旦  wrote:
>>>
 Flink1.12.0; only using aligned checkpoint; Standalone Cluster;



 Robert Metzger wrote on Fri, Feb 5, 2021 at 6:52 PM:

> Are you using unaligned checkpoints? (there's a known bug in 1.12.0
> which can lead to corrupted data when using UC)
> Can you tell us a little bit about your environment? (How are you
> deploying Flink, which state backend are you using, what kind of job (I
> guess DataStream API))
>
> Somehow the process receiving the data is unable to deserialize it,
> most likely because they are configured differently (different classpath,
> dependency versions etc.)
>
> On Fri, Feb 5, 2021 at 10:36 AM 赵一旦  wrote:
>
>> I do not think this is some code related problem anymore, maybe it is
>> some bug?
>>
>> 赵一旦 wrote on Fri, Feb 5, 2021 at 4:30 PM:
>>
>>> Hi all, I find that the failure always occurred in the second task,
>>> after the source task. So I do something in the first chaining task, I
>>> transform the 'Map' based class object to another normal class object, 
>>> and
>>> the problem disappeared.
>>>
>>> Based on the new solution, I also tried to stop and restore job with
>>> savepoint (all successful).
>>>
>>> But, I also met another problem. Also this problem occurs while I
>>> stop the job, and also occurs in the second task after the source task. 
>>> The
>>> log is below:
>>> 2021-02-05 16:21:26
>>> java.io.EOFException
>>> at org.apache.flink.core.memory.DataInputDeserializer
>>> .readUnsignedByte(DataInputDeserializer.java:321)
>>> at org.apache.flink.types.StringValue.readString(StringValue
>>> .java:783)
>>> at org.apache.flink.api.common.typeutils.base.StringSerializer
>>> .deserialize(StringSerializer.java:75)
>>> at org.apache.flink.api.common.typeutils.base.StringSerializer
>>> .deserialize(StringSerializer.java:33)
>>> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
>>> .deserialize(PojoSerializer.java:411)
>>> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
>>> .deserialize(PojoSerializer.java:411)
>>> at org.apache.flink.streaming.runtime.streamrecord.
>>> StreamElementSerializer.deserialize(StreamElementSerializer.java:202
>>> )
>>> at org.apache.flink.streaming.runtime.streamrecord.
>>> StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>>> at org.apache.flink.runtime.plugable.
>>> NonReusingDeserializationDelegate.read(
>>> NonReusingDeserializationDelegate.java:55)
>>> at org.apache.flink.runtime.io.network.api.serialization.
>>> SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(
>>> SpillingAdaptiveSpanningRecordDeserializer.java:92)
>>> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
>>> .emitNext(StreamTaskNetworkInput.java:145)
>>> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
>>> .processInput(StreamOneInputProcessor.java:67)
>>> at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor
>>> .processInput(StreamTwoInputProcessor.java:92)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask
>>> .processInput(StreamTask.java:372)
>>> at org.apache.flink.streaming.runtime.tasks.mailbox.
>>> MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask
>>> .runMailboxLoop(Stre

Re: flink kryo exception

2021-02-07 Thread 赵一旦
It may also have something to do with my job's first tasks. The second task
has two inputs: one is the Kafka source stream (A), the other is a
self-defined MySQL source used as a broadcast stream (B).
In A: I have a 'WatermarkReAssigner', a self-defined operator which adds an
offset to its input watermark and then forwards it downstream.
In B: The parallelism is 30, but in my rich function's implementation only
subtask-0 queries MySQL and sends out records; the other subtasks do nothing.
All subtasks send max_watermark - 86400_000 as the watermark.
Since both of the first tasks involve some self-defined source or
implementation, I do not know whether the problem is related to that.
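
(Roughly, the WatermarkReAssigner looks like the simplified sketch below; this
is not my exact code, and the offset handling and the pass-through of records
are simplifications.)

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class WatermarkReAssigner<T> extends AbstractStreamOperator<T>
        implements OneInputStreamOperator<T, T> {

    private final long offsetMillis;

    public WatermarkReAssigner(long offsetMillis) {
        this.offsetMillis = offsetMillis;
    }

    @Override
    public void processElement(StreamRecord<T> element) {
        // records pass through unchanged
        output.collect(element);
    }

    @Override
    public void processWatermark(Watermark mark) {
        // shift the incoming watermark by a fixed offset and forward it;
        // this operator uses no timers, so super.processWatermark() is skipped
        output.emitWatermark(new Watermark(mark.getTimestamp() + offsetMillis));
    }
}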

赵一旦 wrote on Sun, Feb 7, 2021 at 4:05 PM:

> The first problem is critical, since the savepoint do not work.
> The second problem, in which I changed the solution, removed the 'Map'
> based implementation before the data are transformed to the second task,
> and this case savepoint works.  The only problem is that, I should stop the
> job and remember the savepoint path, then restart job with the savepoint
> path. And now it is : I stop the job, then the job failed and restart
> automatically with the generated savepoint.  So I do not need to restart
> the job anymore, since what it does automatically is what I want to do.
>
> I have some idea that maybe it is also related to the data? So I am not
> sure that I can provide an example to reproduces the problem.
>
> Till Rohrmann wrote on Sat, Feb 6, 2021 at 12:13 AM:
>
>> Could you provide us with a minimal working example which reproduces the
>> problem for you? This would be super helpful in figuring out the problem
>> you are experiencing. Thanks a lot for your help.
>>
>> Cheers,
>> Till
>>
>> On Fri, Feb 5, 2021 at 1:03 PM 赵一旦  wrote:
>>
>>> Yeah, and if it is different, why my job runs normally.  The problem
>>> only occurres when I stop it.
>>>
>>> Robert Metzger wrote on Fri, Feb 5, 2021 at 7:08 PM:
>>>
 Are you 100% sure that the jar files in the classpath (/lib folder) are
 exactly the same on all machines? (It can happen quite easily in a
 distributed standalone setup that some files are different)


 On Fri, Feb 5, 2021 at 12:00 PM 赵一旦  wrote:

> Flink1.12.0; only using aligned checkpoint; Standalone Cluster;
>
>
>
> Robert Metzger wrote on Fri, Feb 5, 2021 at 6:52 PM:
>
>> Are you using unaligned checkpoints? (there's a known bug in 1.12.0
>> which can lead to corrupted data when using UC)
>> Can you tell us a little bit about your environment? (How are you
>> deploying Flink, which state backend are you using, what kind of job (I
>> guess DataStream API))
>>
>> Somehow the process receiving the data is unable to deserialize it,
>> most likely because they are configured differently (different classpath,
>> dependency versions etc.)
>>
>> On Fri, Feb 5, 2021 at 10:36 AM 赵一旦  wrote:
>>
>>> I do not think this is some code related problem anymore, maybe it
>>> is some bug?
>>>
>>> 赵一旦 wrote on Fri, Feb 5, 2021 at 4:30 PM:
>>>
 Hi all, I find that the failure always occurred in the second task,
 after the source task. So I do something in the first chaining task, I
 transform the 'Map' based class object to another normal class object, 
 and
 the problem disappeared.

 Based on the new solution, I also tried to stop and restore job
 with savepoint (all successful).

 But, I also met another problem. Also this problem occurs while I
 stop the job, and also occurs in the second task after the source 
 task. The
 log is below:
 2021-02-05 16:21:26
 java.io.EOFException
 at org.apache.flink.core.memory.DataInputDeserializer
 .readUnsignedByte(DataInputDeserializer.java:321)
 at org.apache.flink.types.StringValue.readString(StringValue
 .java:783)
 at org.apache.flink.api.common.typeutils.base.StringSerializer
 .deserialize(StringSerializer.java:75)
 at org.apache.flink.api.common.typeutils.base.StringSerializer
 .deserialize(StringSerializer.java:33)
 at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
 .deserialize(PojoSerializer.java:411)
 at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
 .deserialize(PojoSerializer.java:411)
 at org.apache.flink.streaming.runtime.streamrecord.
 StreamElementSerializer.deserialize(StreamElementSerializer.java:
 202)
 at org.apache.flink.streaming.runtime.streamrecord.
 StreamElementSerializer.deserialize(StreamElementSerializer.java:46
 )
 at org.apache.flink.runtime.plugable.
 NonReusingDeserializationDelegate.read(
 NonReusingDeserializationDelegate.java:55)
 at org.apache.flink.runtime.io.network.api.serialization.
 Spilling

question on ValueState

2021-02-07 Thread Colletta, Edward
Using FsStateBackend.

I was under the impression that ValueState.value() will serialize an object
that is stored in the local state backend, copy the serialized object, and
deserialize it. Likewise, update() would do the same steps, copying the object
back to the local state backend. As a consequence, storing collections in
ValueState would be much less efficient than using ListState or MapState where
possible.

However, I am looking at some code I wrote a while ago which assumed that the
value() method just returns a reference to the object. The code only calls
update() when creating the object, if value() returns null. Yet the code
works: all changes to the object stored in state are visible the next time
value() is called. I have some sample code below.

Can someone clarify what really happens when value() is called?


    public void processElement(M in, Context ctx, Collector out) throws Exception {
        MyWindow myWindow = windowState.value();
        if (myWindow == null) {
            ctx.timerService().registerProcessingTimeTimer(
                    ((ctx.timerService().currentProcessingTime() + interval) / interval) * interval);
            myWindow = new MyWindow(0L, slide, windowSize);
            windowState.update(myWindow);
            myWindow.eq.add(0L);
        }
        myWindow.eq.getTail().setAccumulator(
                myWindow.eq.getTail().getAccumulator() + in.value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {
        ctx.timerService().registerProcessingTimeTimer(
                ((ctx.timerService().currentProcessingTime() + interval) / interval) * interval);
        MyWindow myWindow = windowState.value();
        myWindow.slide(0L);
        out.collect(myWindow.globalAccum);
    }




Re: Cannot connect to queryable state proxy

2021-02-07 Thread 陳昌倬
On Thu, Feb 04, 2021 at 04:26:42PM +0800, ChangZhuo Chen (陳昌倬) wrote:
> Hi,
> 
> We have problem connecting to queryable state client proxy as described
> in [0]. Any help is appreciated.
> 
> * The port 6125 is opened in taskmanager pod.
> 
>   ```
>   root@-654b94754d-2vknh:/tmp# ss -tlp
>   State    Recv-Q   Send-Q   Local Address:Port    Peer Address:Port   Process
>   LISTEN   0        1024     0.0.0.0:46561         0.0.0.0:*
>   LISTEN   0        3        0.0.0.0:9249          0.0.0.0:*
>   LISTEN   0        1024     0.0.0.0:6122          0.0.0.0:*
>   LISTEN   0        1024     10.200.11.3:9067      0.0.0.0:*
>   LISTEN   0        1024     10.200.11.3:6125      0.0.0.0:*
>   LISTEN   0        1024     0.0.0.0:38607         0.0.0.0:*
>   ```

The problem is that Flink only listens on 10.200.11.3:6125 for the queryable
state client proxy, so we need to use the correct network to connect to it.
Is there any way we can make Flink listen on 0.0.0.0 for the queryable state
client proxy?
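
(For context, a rough sketch of how a client-side lookup against that proxy
would look; the job id, state name, key and key type below are placeholders,
and it needs the flink-queryable-state-client-java dependency.)

import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.queryablestate.client.QueryableStateClient;

public class QueryStateSketch {
    public static void main(String[] args) throws Exception {
        // must use the address the proxy actually binds to, i.e. 10.200.11.3 here
        QueryableStateClient client = new QueryableStateClient("10.200.11.3", 6125);

        JobID jobId = JobID.fromHexString("00000000000000000000000000000000"); // placeholder
        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>("my-state", Types.LONG);

        CompletableFuture<ValueState<Long>> future =
                client.getKvState(jobId, "my-state", "some-key", Types.STRING, descriptor);

        System.out.println(future.get().value());
    }
}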


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debconf,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


UUID in part files

2021-02-07 Thread Dan Hill
Hi.

*Context*
I'm migrating my Flink SQL job to DataStream.  When switching to
StreamingFileSink, I noticed that the part files now do not have a uuid in
them.  "part-0-0" vs "part-{uuid string}-0-0".  This is easy to add with
OutputFileConfig.

*Question*
Is there a reason why the base OutputFileConfig doesn't add the uuid
automatically?  Is this just a legacy issue?  Or do most people not have
the uuid in the file outputs?


Re: question on ValueState

2021-02-07 Thread Yun Tang
Hi,

MemoryStateBackend and FsStateBackend both hold keyed state in a
HeapKeyedStateBackend [1], and the main structure used to store the data is
the StateTable [2], which holds the objects in their POJO form on the heap.
That is to say, the object is not serialized when update() is called, and
value() hands back a reference to the stored object.
On the other hand, the RocksDB state backend stores values as serialized
bytes, so there value() returns a deserialized copy and changes are only
persisted by calling update(). A small sketch of the practical consequence
follows.
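
(Illustrative sketch only, not code from this thread; the class and state
names are made up, and the function must run on a keyed stream.)

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class CountPerKey extends RichFlatMapFunction<String, Long> {

    private transient ValueState<Counter> state;

    @Override
    public void open(Configuration parameters) {
        state = getRuntimeContext().getState(
                new ValueStateDescriptor<>("counter", Counter.class));
    }

    @Override
    public void flatMap(String value, Collector<Long> out) throws Exception {
        Counter c = state.value();
        if (c == null) {
            c = new Counter();
            state.update(c);   // first write creates the state entry
        }
        c.count++;             // heap backends (Memory/Fs): this mutates the stored object,
                               // so it is visible on the next value() even without update()
        state.update(c);       // needed for RocksDB, where value() returned a deserialized copy
        out.collect(c.count);
    }

    public static class Counter {
        public long count;
    }
}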


[1] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
[2] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java

Best
Yun Tang


From: Colletta, Edward 
Sent: Sunday, February 7, 2021 19:53
To: user@flink.apache.org 
Subject: question on ValueState


Using FsStateBackend.



I was under the impression that ValueState.value will serialize an object which 
is stored in the local state backend, copy the serialized object and 
deserializes it.  Likewise update() would do the same steps copying the object 
back to local state backend.And as a consequence, storing collections in 
ValueState is much less efficient than using ListState or MapState if possible.



However, I am looking at some code I wrote a while ago which made the 
assumption that the value() method just returned a reference to the object.  
The code only calls update() when creating the object if value() returns null.  
  Yet the code works, all changes to the object stored in state are visible the 
next time value() is called.   I have some sample code below.



Can someone clarify what really happens when value() is called?





    public void processElement(M in, Context ctx, Collector out) throws Exception {
        MyWindow myWindow = windowState.value();
        if (myWindow == null) {
            ctx.timerService().registerProcessingTimeTimer(
                    ((ctx.timerService().currentProcessingTime() + interval) / interval) * interval);
            myWindow = new MyWindow(0L, slide, windowSize);
            windowState.update(myWindow);
            myWindow.eq.add(0L);
        }
        myWindow.eq.getTail().setAccumulator(
                myWindow.eq.getTail().getAccumulator() + in.value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {
        ctx.timerService().registerProcessingTimeTimer(
                ((ctx.timerService().currentProcessingTime() + interval) / interval) * interval);
        MyWindow myWindow = windowState.value();
        myWindow.slide(0L);
        out.collect(myWindow.globalAccum);
    }






Table Cache Problem

2021-02-07 Thread Yongsong He
Hi experts,
I want to cache a temporary table in order to reuse it.

Flink version 1.10.1

The table is consumed from Kafka, with a structure like:
create table a (
field1 string,
field2 string,
field3 string,
field4 string
)

the sample code looks like:

val settings =
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
val tableEnv = StreamTableEnvironment.create(env, settings)

val temptable = tableEnv.sqlQuery(s"select * from a where ${condition}")

temptable.where(condition1) ... then do something
temptable.where(condition2) ... then do otherthing


I want to reuse temptable for higher performance. Which operators are needed
for that, or is it already cached in the Flink SQL plan?
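
(For clarity, what I am trying conceptually, sketched in Java here although my
actual code is Scala; table 'a' is assumed to be registered already, and I am
not sure whether the planner shares the scan of 'a' between the two queries.)

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class TableReuseSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        Table temptable = tableEnv.sqlQuery("SELECT * FROM a WHERE field1 = 'x'");

        // register the filtered table under a name so several queries can refer to it
        tableEnv.createTemporaryView("filtered_a", temptable);

        Table r1 = tableEnv.sqlQuery("SELECT field2, COUNT(*) FROM filtered_a GROUP BY field2");
        Table r2 = tableEnv.sqlQuery("SELECT field3, COUNT(*) FROM filtered_a GROUP BY field3");
        // ... emit r1 / r2 to sinks; whether the scan of 'a' runs once or twice
        // is decided by the planner, not by this registration
    }
}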

Any help would be appreciated :)


"upsert-kafka" connector not working with Avro confluent schema registry

2021-02-07 Thread Shamit
Hello Team,

We have two Kafka connectors, "upsert-kafka" and "kafka".

I am facing an issue with "upsert-kafka" while reading Avro messages
serialized using "io.confluent.kafka.serializers.KafkaAvroDeserializer".
Please note that the "kafka" connector works while reading Avro messages
serialized using "io.confluent.kafka.serializers.KafkaAvroDeserializer".

Below are the definitions for both Kafka connectors:

*Table definition with the "kafka" connector, which works fine.*

CREATE TABLE proposalLine (PROPOSAL_LINE_ID bigint, LAST_MODIFIED_BY STRING)
WITH ('connector' = 'kafka', 'properties.bootstrap.servers' = 'localhost:9092',
'properties.auto.offset.reset' = 'earliest', 'topic' = 'lndcdcadsprpslproposalline',
'format' = 'avro-confluent',
'avro-confluent.schema-registry.url' = 'http://localhost:8081',
'avro-confluent.schema-registry.subject' = 'lndcdcadsprpslproposalline-value')

*Table definition and error with the "upsert-kafka" connector, which is not
working.*

CREATE TABLE proposalLine (PROPOSAL_LINE_ID bigint, LAST_MODIFIED_BY STRING,
PRIMARY KEY (PROPOSAL_LINE_ID) NOT ENFORCED)
WITH ('connector' = 'upsert-kafka', 'properties.bootstrap.servers' = 'localhost:9092',
'properties.auto.offset.reset' = 'earliest', 'topic' = 'lndcdcadsprpslproposalline',
'key.format' = 'avro', 'value.format' = 'avro', 'properties.group.id' = 'dd',
'properties.schema.registry.url' = 'http://localhost:8081',
'properties.key.deserializer' = 'org.apache.kafka.common.serialization.LongDeserializer',
'properties.value.deserializer' = 'io.confluent.kafka.serializers.KafkaAvroDeserializer')

ERROR:
 Caused by: java.io.IOException: Failed to deserialize Avro record.
at
org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:101)
at
org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:44)
at
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
at
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:130)
at
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
at
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
at
org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
at
org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
at
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
at
org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
at
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
at
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at
org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:139)
at
org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:98)
... 9 more


Please help.

Regards,
Shamit



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: hybrid state backends

2021-02-07 Thread Yun Gao

Hi Marco,

Sorry, the state backend is currently a global configuration and cannot be
configured differently for different operators.

One possible alternative for this requirement might be to set RocksDB as the
default state backend and, for those operators that want to keep their state
in memory, implement a custom operator that extends AbstractStreamOperator,
overrides the snapshotState() method, and uses raw state to snapshot the
in-memory data. However, this option touches some non-user-level APIs of
Flink. A very rough sketch of the idea follows.
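
(An untested sketch of that idea, for illustration only; the class name and
the stored map are made up, and the restore path in initializeState() as well
as state redistribution on rescaling are omitted.)

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class InMemoryStateOperator extends AbstractStreamOperator<String>
        implements OneInputStreamOperator<String, String> {

    // working state kept purely on the heap
    private final Map<String, Long> counts = new HashMap<>();

    @Override
    public void processElement(StreamRecord<String> element) {
        counts.merge(element.getValue(), 1L, Long::sum);
        output.collect(element);
    }

    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        // write the in-memory map into the raw operator state stream
        OperatorStateCheckpointOutputStream rawState = context.getRawOperatorStateOutput();
        rawState.startNewPartition();
        DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(rawState);
        out.writeInt(counts.size());
        for (Map.Entry<String, Long> e : counts.entrySet()) {
            out.writeUTF(e.getKey());
            out.writeLong(e.getValue());
        }
    }
}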

Best,
 Yun
--
Sender:Marco Villalobos
Date:2021/02/05 19:09:37
Recipient:user
Theme:hybrid state backends

Is it possible to use different statebackends for different operators? There 
are certain situations where I want the state to reside completely in memory, 
and other situations where I want it stored in rocksdb. 


Re: UUID in part files

2021-02-07 Thread Yun Gao
Hi Dan

SQL adds the uuid by default for the case where users want to execute multiple
bounded SQL statements that append to the same directory (e.g. a Hive table);
the uuid is attached so that a later run does not override the previous output.

The DataStream API can be viewed as the lower-level API and thus does not add
the uuid automatically. As you have pointed out, users can implement the same
behaviour with OutputFileConfig, for example roughly as sketched below.
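
(A minimal sketch added for illustration; the prefix/suffix values and the
row-format sink are assumptions, not part of the original mails.)

import java.util.UUID;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class UuidPartFilesSketch {
    public static StreamingFileSink<String> buildSink(String outputPath) {
        // prefix the part files with a random uuid, similar to what the SQL sink does
        OutputFileConfig fileConfig = OutputFileConfig.builder()
                .withPartPrefix("part-" + UUID.randomUUID())
                .withPartSuffix(".txt")
                .build();

        return StreamingFileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                .withOutputFileConfig(fileConfig)
                .build();
    }
}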

Best,
 Yun


 --Original Mail --
Sender:Dan Hill 
Send Date:Mon Feb 8 07:40:36 2021
Recipients:user 
Subject:UUID in part files

Hi.

Context
I'm migrating my Flink SQL job to DataStream.  When switching to 
StreamingFileSink, I noticed that the part files now do not have a uuid in 
them.  "part-0-0" vs "part-{uuid string}-0-0".  This is easy to add with 
OutputFileConfig.

Question
Is there a reason why the base OutputFileConfig doesn't add the uuid 
automatically?  Is this just a legacy issue?  Or do most people not have the 
uuid in the file outputs?


Re: Re: flink kryo exception

2021-02-07 Thread Yun Gao
Hi yidan,

One more thing to confirm: are you creating the savepoint and stopping the job
all together with the

 bin/flink cancel -s [:targetDirectory] :jobId

command?

Best,
 Yun



 --Original Mail --
Sender:赵一旦 
Send Date:Sun Feb 7 16:13:57 2021
Recipients:Till Rohrmann 
CC:Robert Metzger , user 
Subject:Re: flink kryo exception

It also maybe have something to do with my job's first tasks. The second task 
have two input, one is the kafka source stream(A), another is self-defined 
mysql source as broadcast stream.(B) 
In A: I have a 'WatermarkReAssigner', a self-defined operator which add an 
offset to its input watermark and then forward to downstream.
In B: The parallelism is 30, but in my rich function's implementation, only the 
subtask-0 will do mysql query and send out records, other subtasks do nothing. 
All subtasks will send max_watermark - 86400_000 as the watermark.
Since both the first task have some self-defined source or implementation, I do 
not know whether the problem have something to do with it.
赵一旦 wrote on Sun, Feb 7, 2021 at 4:05 PM:

The first problem is critical, since the savepoint do not work.
The second problem, in which I changed the solution, removed the 'Map' based 
implementation before the data are transformed to the second task, and this 
case savepoint works.  The only problem is that, I should stop the job and 
remember the savepoint path, then restart job with the savepoint path. And now 
it is : I stop the job, then the job failed and restart automatically with the 
generated savepoint.  So I do not need to restart the job anymore, since what 
it does automatically is what I want to do.

I have some idea that maybe it is also related to the data? So I am not sure 
that I can provide an example to reproduces the problem.  
Till Rohrmann wrote on Sat, Feb 6, 2021 at 12:13 AM:

Could you provide us with a minimal working example which reproduces the 
problem for you? This would be super helpful in figuring out the problem you 
are experiencing. Thanks a lot for your help.

Cheers,
Till
On Fri, Feb 5, 2021 at 1:03 PM 赵一旦  wrote:

Yeah, and if it is different, why my job runs normally.  The problem only 
occurres when I stop it. 
Robert Metzger wrote on Fri, Feb 5, 2021 at 7:08 PM:

Are you 100% sure that the jar files in the classpath (/lib folder) are exactly 
the same on all machines? (It can happen quite easily in a distributed 
standalone setup that some files are different)


On Fri, Feb 5, 2021 at 12:00 PM 赵一旦  wrote:

Flink1.12.0; only using aligned checkpoint; Standalone Cluster; 



Robert Metzger wrote on Fri, Feb 5, 2021 at 6:52 PM:

Are you using unaligned checkpoints? (there's a known bug in 1.12.0 which can 
lead to corrupted data when using UC)
Can you tell us a little bit about your environment? (How are you deploying 
Flink, which state backend are you using, what kind of job (I guess DataStream 
API))

Somehow the process receiving the data is unable to deserialize it, most likely 
because they are configured differently (different classpath, dependency 
versions etc.)
On Fri, Feb 5, 2021 at 10:36 AM 赵一旦  wrote:

I do not think this is some code related problem anymore, maybe it is some bug?
赵一旦 wrote on Fri, Feb 5, 2021 at 4:30 PM:

Hi all, I find that the failure always occurred in the second task, after the 
source task. So I do something in the first chaining task, I transform the 
'Map' based class object to another normal class object, and the problem 
disappeared.

Based on the new solution, I also tried to stop and restore job with savepoint 
(all successful).

But, I also met another problem. Also this problem occurs while I stop the job, 
and also occurs in the second task after the source task. The log is below:
2021-02-05 16:21:26
java.io.EOFException
at 
org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:321)
at org.apache.flink.types.StringValue.readString(StringValue.java:783)
at 
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)
at 
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:33)
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
at 
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:1

Re: Sliding Window Count: Tricky Edge Case / Count Zero Problem

2021-02-07 Thread Yun Gao
Hi Jan,

From my point of view, a Window in Flink should be seen as a "high-level"
operation for some kind of aggregation, and if it cannot satisfy the
requirements, we can always fall back to the "low-level" API, namely
KeyedProcessFunction [1].

In this case, we could use a ValueState to store the current count for each
key and increment it on each element. We could also register a timer for each
key when the first element for that key arrives; in the onTimer callback we
would then emit the current state value, reset it to 0, and register another
timer for this key 30s later. A rough sketch of this pattern follows.
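
(Sketch added for illustration; the class name, the String key/input types,
the 30s constant and the use of processing time are assumptions, not code from
this thread.)

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class PeriodicCounter extends KeyedProcessFunction<String, String, Long> {

    private static final long INTERVAL_MS = 30_000L;

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Types.LONG));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<Long> out) throws Exception {
        Long current = count.value();
        if (current == null) {
            // first element for this key: start the periodic timer
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + INTERVAL_MS);
            current = 0L;
        }
        count.update(current + 1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
        Long current = count.value();
        out.collect(current == null ? 0L : current);   // emits 0 when nothing arrived
        count.update(0L);
        ctx.timerService().registerProcessingTimeTimer(timestamp + INTERVAL_MS);
    }
}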

Best,
 Yun



[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#the-keyedprocessfunction


 --Original Mail --
Sender:Jan Brusch 
Send Date:Sat Feb 6 23:44:00 2021
Recipients:user 
Subject:Sliding Window Count: Tricky Edge Case / Count Zero Problem
Hi,
I was recently working on a problem where we wanted to implement a 
simple count on a sliding window, e.g. "how many messages of a certain 
type were emitted by a certain type of sensor in the last n minutes". 
Which sounds simple enough in theory:

messageStream
 .keyBy(//EmitterType + MessageType)
 .assignWindow(SlidingProcessingTimeWindows.of(Time.minutes(n), 
Time.seconds(30)))
 .map(_ => 1)
 .reduce((x,y) => x + y)
 .addSink(...)

But there is a tricky edge case: The downstream systems will never know 
when the count for a certain key goes back to 0, which is important for 
our use case. The technical reason being that flink doesn't open a 
window if there are no entries, i.e. a window with count 0 doesn't exist 
in flink.

We came up with the following solution for the time being:

messageStream
 .keyBy(//EmitterType + MessageType)
 .window(GlobalWindows.create())
 .trigger(ContinuousEventTimeTrigger.of(Time.seconds(30)))
 .evictor(// CustomEvictor: Evict all messages older than n minutes 
BEFORE processing the window)
 .process(// CustomCounter: Count all Messages in Window State);
 .addSink(...)

In the case of zero messages in the last n minutes, all messages will be 
evicted from the window and the process-function will get triggered one 
last time on the now empty window, so we can produce a count of 0.

I have two problems, though, with this solution:
1) It is computationally inefficient for a simple count, as custom 
process functions will always keep all messages in state. And, on every 
trigger all elements will have to be touched twice: To compare the 
timestamp and to count.
2) It does seem like a very roundabout solution to a simple problem.

So, I was wondering if there was a more efficient or "flink-like" 
approach to this. Sorry for the long writeup, but I would love to hear 
your takes.


Best regards
Jan

-- 
neuland  – Büro für Informatik GmbH
Konsul-Smidt-Str. 8g, 28217 Bremen

Telefon (0421) 380107 57
Fax (0421) 380107 99
https://www.neuland-bfi.de

https://twitter.com/neuland
https://facebook.com/neulandbfi
https://xing.com/company/neulandbfi


Geschäftsführer: Thomas Gebauer, Jan Zander
Registergericht: Amtsgericht Bremen, HRB 23395 HB
USt-ID. DE 246585501

Re: Re: flink kryo exception

2021-02-07 Thread 赵一旦
Yes, but I use stop, not cancel, which also takes a savepoint and cancels the
job together.
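
For reference, the two commands I am distinguishing (flags as I understand
them; please double-check against the 1.12 CLI documentation):

 bin/flink stop -p [:targetDirectory] :jobId    # stop with savepoint, then finish the job
 bin/flink cancel -s [:targetDirectory] :jobId  # older cancel-with-savepoint variant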

Yun Gao wrote on Mon, Feb 8, 2021 at 11:59 AM:

> Hi yidan,
>
> One more thing to confirm: are you create the savepoint and stop the job
> all together with
>
>  bin/flink cancel -s [:targetDirectory] :jobId
>
> command ?
>
> Best,
>  Yun
>
>
> --Original Mail --
> *Sender:*赵一旦 
> *Send Date:*Sun Feb 7 16:13:57 2021
> *Recipients:*Till Rohrmann 
> *CC:*Robert Metzger , user 
> *Subject:*Re: flink kryo exception
>
>> It also maybe have something to do with my job's first tasks. The second
>> task have two input, one is the kafka source stream(A), another is
>> self-defined mysql source as broadcast stream.(B)
>> In A: I have a 'WatermarkReAssigner', a self-defined operator which add
>> an offset to its input watermark and then forward to downstream.
>> In B: The parallelism is 30, but in my rich function's implementation,
>> only the subtask-0 will do mysql query and send out records, other subtasks
>> do nothing. All subtasks will send max_watermark - 86400_000 as the
>> watermark.
>> Since both the first task have some self-defined source or
>> implementation, I do not know whether the problem have something to do with
>> it.
>>
>> 赵一旦 wrote on Sun, Feb 7, 2021 at 4:05 PM:
>>
>>> The first problem is critical, since the savepoint do not work.
>>> The second problem, in which I changed the solution, removed the 'Map'
>>> based implementation before the data are transformed to the second task,
>>> and this case savepoint works.  The only problem is that, I should stop the
>>> job and remember the savepoint path, then restart job with the savepoint
>>> path. And now it is : I stop the job, then the job failed and restart
>>> automatically with the generated savepoint.  So I do not need to restart
>>> the job anymore, since what it does automatically is what I want to do.
>>>
>>> I have some idea that maybe it is also related to the data? So I am not
>>> sure that I can provide an example to reproduces the problem.
>>>
>>> Till Rohrmann wrote on Sat, Feb 6, 2021 at 12:13 AM:
>>>
 Could you provide us with a minimal working example which reproduces
 the problem for you? This would be super helpful in figuring out the
 problem you are experiencing. Thanks a lot for your help.

 Cheers,
 Till

 On Fri, Feb 5, 2021 at 1:03 PM 赵一旦  wrote:

> Yeah, and if it is different, why my job runs normally.  The problem
> only occurres when I stop it.
>
> Robert Metzger wrote on Fri, Feb 5, 2021 at 7:08 PM:
>
>> Are you 100% sure that the jar files in the classpath (/lib folder)
>> are exactly the same on all machines? (It can happen quite easily in a
>> distributed standalone setup that some files are different)
>>
>>
>> On Fri, Feb 5, 2021 at 12:00 PM 赵一旦  wrote:
>>
>>> Flink1.12.0; only using aligned checkpoint; Standalone Cluster;
>>>
>>>
>>>
>>> Robert Metzger wrote on Fri, Feb 5, 2021 at 6:52 PM:
>>>
 Are you using unaligned checkpoints? (there's a known bug in 1.12.0
 which can lead to corrupted data when using UC)
 Can you tell us a little bit about your environment? (How are you
 deploying Flink, which state backend are you using, what kind of job (I
 guess DataStream API))

 Somehow the process receiving the data is unable to deserialize it,
 most likely because they are configured differently (different 
 classpath,
 dependency versions etc.)

 On Fri, Feb 5, 2021 at 10:36 AM 赵一旦  wrote:

> I do not think this is some code related problem anymore, maybe it
> is some bug?
>
> 赵一旦 wrote on Fri, Feb 5, 2021 at 4:30 PM:
>
>> Hi all, I find that the failure always occurred in the second
>> task, after the source task. So I do something in the first chaining 
>> task,
>> I transform the 'Map' based class object to another normal class 
>> object,
>> and the problem disappeared.
>>
>> Based on the new solution, I also tried to stop and restore job
>> with savepoint (all successful).
>>
>> But, I also met another problem. Also this problem occurs while I
>> stop the job, and also occurs in the second task after the source 
>> task. The
>> log is below:
>> 2021-02-05 16:21:26
>> java.io.EOFException
>> at org.apache.flink.core.memory.DataInputDeserializer
>> .readUnsignedByte(DataInputDeserializer.java:321)
>> at org.apache.flink.types.StringValue.readString(StringValue
>> .java:783)
>> at org.apache.flink.api.common.typeutils.base.
>> StringSerializer.deserialize(StringSerializer.java:75)
>> at org.apache.flink.api.common.typeutils.base.
>> StringSerializer.deserialize(StringSerializer.java:33)
>> at org.apache.flink.api.java.typeuti

Jobmanager stopped because uncaught exception

2021-02-07 Thread Lei Wang
Flink standalone HA.   Flink version 1.12.1

2021-02-08 13:57:50,550 ERROR
org.apache.flink.runtime.util.FatalExitExceptionHandler  [] - FATAL:
Thread 'cluster-io-thread-30' produced an uncaught exception. Stopping the
process...
java.util.concurrent.RejectedExecutionException: Task
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@3a4ab3cb
rejected from 
java.util.concurrent.ScheduledThreadPoolExecutor@6222948[Terminated,
pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 455]
at
java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
~[?:1.8.0_275]
at
java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
~[?:1.8.0_275]
at
java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
~[?:1.8.0_275]
at
java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
~[?:1.8.0_275]
at
java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
~[?:1.8.0_275]
at
java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
~[?:1.8.0_275]
at
org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter.execute(ScheduledExecutorServiceAdapter.java:64)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.scheduleTriggerRequest(CheckpointCoordinator.java:1290)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.runtime.checkpoint.CheckpointsCleaner.lambda$cleanCheckpoint$0(CheckpointsCleaner.java:66)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
~[?:1.8.0_275]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
~[?:1.8.0_275]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]

Using Aliyun OSS as the state backend storage.
Before the ERROR, there are a lot of info messages like this:

2021-02-08 13:57:50,452 INFO
 org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss  [] -
[Server]Unable to execute HTTP request: Not Found
[ErrorCode]: NoSuchKey
[RequestId]: 6020D2DEA1E11430349E8323


Any insight on this?

Thanks,
Lei


Re: Jobmanager stopped because uncaught exception

2021-02-07 Thread Yang Wang
Maybe it is a known issue [1] that has already been resolved in 1.12.2 (which
will be released soon).
BTW, I think it is unrelated to the Aliyun OSS info logs.

[1]. https://issues.apache.org/jira/browse/FLINK-20992


Best,
Yang

Lei Wang wrote on Mon, Feb 8, 2021 at 2:22 PM:

> Flink standalone HA.   Flink version 1.12.1
>
> 2021-02-08 13:57:50,550 ERROR
> org.apache.flink.runtime.util.FatalExitExceptionHandler  [] - FATAL:
> Thread 'cluster-io-thread-30' produced an uncaught exception. Stopping the
> process...
> java.util.concurrent.RejectedExecutionException: Task
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@3a4ab3cb
> rejected from 
> java.util.concurrent.ScheduledThreadPoolExecutor@6222948[Terminated,
> pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 455]
> at
> java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
> ~[?:1.8.0_275]
> at
> java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
> ~[?:1.8.0_275]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
> ~[?:1.8.0_275]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
> ~[?:1.8.0_275]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
> ~[?:1.8.0_275]
> at
> java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
> ~[?:1.8.0_275]
> at
> org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter.execute(ScheduledExecutorServiceAdapter.java:64)
> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.scheduleTriggerRequest(CheckpointCoordinator.java:1290)
> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.checkpoint.CheckpointsCleaner.lambda$cleanCheckpoint$0(CheckpointsCleaner.java:66)
> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> ~[?:1.8.0_275]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> ~[?:1.8.0_275]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
>
> Using aliyun oss as statebackend storage.
> Before the ERROR, there's a lot of  info message like this:
>
> 2021-02-08 13:57:50,452 INFO
>  org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss  [] -
> [Server]Unable to execute HT
> TP request: Not Found
> [ErrorCode]: NoSuchKey
> [RequestId]: 6020D2DEA1E11430349E8323
>
>
> Any insight on this?
>
> Thanks,
> Lei
>