Job hanging taking savepoint on legacy Flink

2023-03-21 Thread Le Xu
Hello!

I would like to run a legacy Flink project on an older Flink version
(1.4.1), and I'm getting an error when trying to cancel a job with a
savepoint. Specifically, it reports the error below, stuck in requestBuffer:

My understanding is that the savepoint operation probably requires all
outstanding messages to be processed, which somehow needs more buffer space
(I'm not entirely sure about this). However, the job has no problem
processing regular messages as long as I'm not cancelling it with a
savepoint. I have also reduced "web.backpressure.refresh-interval" to 100 to
force it to check backpressure more frequently, but it still leads to this
error.

I am aware that I'd probably get more configuration knobs by running a
newer version of Flink, but this particular version contains some modified
functionality I want to try. Any suggestions?


2023-03-21 23:04:59,718 WARN org.apache.flink.runtime.taskmanager.Task -
Task 'Source: bid-source -> Filter -> Flat Map -> flatmap-timestamp (10/32)'
did not react to cancelling signal, but is stuck in method:
java.lang.Object.wait(Native Method)
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:222)
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:191)
org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:146)
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:84)
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:106)
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:88)
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:43)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
org.apache.flink.streaming.runtime.operators.TimestampsAndPunctuatedWatermarksOperator.processElement(TimestampsAndPunctuatedWatermarksOperator.java:52)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:552)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:527)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:507)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
ch.ethz.systems.strymon.ds2.flink.nexmark.queries.KeyedHighestBidCount$2.flatMap(KeyedHighestBidCount.java:245)
ch.ethz.systems.strymon.ds2.flink.nexmark.queries.KeyedHighestBidCount$2.flatMap(KeyedHighestBidCount.java:173)
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:552)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:527)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:507)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:552)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:527)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:507)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
source.NexmarkDynamicBatchSourceFunction.run(NexmarkDynamicBatchSourceFunction.java:403)
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
org.apache.flink.streami

org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException

2023-03-21 Thread Ajinkya Pathrudkar
We recently upgraded our Flink version from 1.14 to 1.15, along with a move
from Java 8 to Java 11. Since the upgrade, we are seeing the exception below.

We would appreciate any insights you may have into this issue, and any
suggestions on how to proceed.

org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException

org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException:
Could not send message
[LocalRpcInvocation(ResourceManagerGateway.registerJobManager(JobMasterId,
ResourceID, String, JobID, Time))] from sender [unknown] to recipient
[akka.tcp://fl...@xx.xxx.7.6:46017/user/rpc/resourcemanager_2], because the
recipient is unreachable. This can either mean that the recipient has been
terminated or that the remote RpcService is currently not reachable. Could
not send message
[LocalRpcInvocation(ResourceManagerGateway.registerJobManager(JobMasterId,
ResourceID, String, JobID, Time))] from sender [unknown] to recipient
[akka.tcp://fl...@xx.xxx.7.6:46017/user/rpc/resourcemanager_2], because the
recipient is unreachable. This can either mean that the recipient has been
terminated or that the remote RpcService is currently not reachable.
Recipient [Actor[akka://flink/user/rpc/resourcemanager_2#1878283075]] had
already been terminated. Message of type
[org.apache.flink.runtime.rpc.messages.LocalFencedMessage].


Thanks,
Ajinkya


Re: Handling JSON Serialization without Kryo

2023-03-21 Thread Ken Krugler
Hi Rion,

I’m using Gson to deserialize to a Map.

1-2 records/second sounds way too slow, unless each record is enormous.

— Ken
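
For reference, a minimal sketch of that approach (not the exact code from this
thread; class and field names are made up), parsing each JSON string into a
Map with Gson inside a Flink map function:

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

import java.lang.reflect.Type;
import java.util.Map;

public class JsonToMap extends RichMapFunction<String, Map<String, Object>> {

    private static final Type MAP_TYPE =
            new TypeToken<Map<String, Object>>() {}.getType();

    private transient Gson gson;

    @Override
    public void open(Configuration parameters) {
        gson = new Gson(); // create once per task, not once per record
    }

    @Override
    public Map<String, Object> map(String json) {
        return gson.fromJson(json, MAP_TYPE);
    }
}

Note that a Map<String, Object> crossing an operator boundary will generally
still be treated as a generic type by Flink, so this helps most when the
parsing and the downstream processing are chained together.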

> On Mar 21, 2023, at 6:18 AM, Rion Williams  wrote:
> 
> Hi Ken,
> 
> Thanks for the response. I hadn't tried exploring the use of the Record 
> class, which I'm assuming refers to flink.types.Record, to read the JSON 
> into. Did you handle this by using a mapper (e.g. Gson, Jackson) to read the 
> properties in as fields, or take a different approach? Additionally, how has 
> your experience been with performance? Kryo with the existing job leveraging 
> JsonObjects (via Gson) is horrific (~1-2 records/second) and can't keep up 
> with the speed of the producers, which is the impetus behind reevaluating 
> the serialization.
> 
> I'll explore this a bit more.
> 
> Thanks,
> 
> Rion
> 
> On Mon, Mar 20, 2023 at 10:28 PM Ken Krugler wrote:
> Hi Rion,
> 
> For my similar use case, I was able to make a simplifying assumption that my 
> top-level JSON object was a record.
> 
> I then registered a custom Kryo serde that knew how to handle the handful of 
> JsonPrimitive types for the record entries.
> 
> I recently looked at extending that to support arrays and nested records, but 
> haven’t had to do that.
> 
> — Ken
> 
> 
>> On Mar 20, 2023, at 6:56 PM, Rion Williams wrote:
>> 
>> Hi Shammon,
>> 
>> Unfortunately it’s a DataStream job. I’ve been exploring a few options but 
>> haven’t settled on anything yet. I’m currently looking at whether I can 
>> leverage some type of partial serialization to bind to the properties that 
>> I know the job will use and retain the rest as a JSON blob. I’ve also 
>> considered trying to store the fields as a large map of string-object 
>> pairs and translating that into a string prior to writing to the sinks.
>> 
>> Still accepting any/all ideas that I come across to see if I can handle this 
>> in an efficient, reasonable way.
>> 
>> Thanks,
>> 
>> Rion
>> 
>>> On Mar 20, 2023, at 8:40 PM, Shammon FY wrote:
>>> 
>>> 
>>> Hi Rion
>>> 
>>> Is your job a DataStream or Table/SQL job? If it is a Table/SQL job and you 
>>> can define all the JSON fields you need, then you can directly use the json 
>>> format [1] to parse the data.
>>> 
>>> You can also write custom UDF functions to parse JSON data into structured 
>>> data such as MAP, ROW, and other types supported by Flink.
>>> 
>>> 
>>> [1] 
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/
>>>  
>>> 
>>> 
>>> Best,
>>> Shammon FY
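
As an illustration of the UDF route mentioned above, a minimal sketch with
made-up names (it flattens the top-level JSON fields into a MAP<STRING, STRING>
using Jackson; this is not code from the thread):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;

import java.util.HashMap;
import java.util.Map;

public class JsonToMapFunction extends ScalarFunction {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Flatten the top-level JSON fields into a MAP<STRING, STRING>.
    @DataTypeHint("MAP<STRING, STRING>")
    public Map<String, String> eval(String json) {
        if (json == null) {
            return null;
        }
        try {
            JsonNode node = MAPPER.readTree(json);
            Map<String, String> result = new HashMap<>();
            node.fields().forEachRemaining(
                    field -> result.put(field.getKey(), field.getValue().toString()));
            return result;
        } catch (Exception e) {
            throw new RuntimeException("Failed to parse JSON: " + json, e);
        }
    }
}

// Registration and use (hypothetical table/column names):
// tEnv.createTemporarySystemFunction("JSON_TO_MAP", JsonToMapFunction.class);
// tEnv.executeSql("SELECT JSON_TO_MAP(payload)['user_id'] FROM raw_events");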
>>> 
>>> 
>>> On Sun, Mar 19, 2023 at 7:44 AM Rion Williams wrote:
>>> Hi all,
>>> 
>>> I’m reaching out today for some suggestions (and hopefully a solution) for 
>>> a Flink job that I’m working on. The job reads JSON strings from a Kafka 
>>> topic and parses them into JsonObjects (currently via Gson), which are then 
>>> operated on before ultimately being written out to Kafka again.
>>> 
>>> The problem here is that the shape of the data can vary wildly and 
>>> dynamically. Some records may have properties unique to only that record, 
>>> which makes defining a POJO difficult. In addition to this, the JsonObjects 
>>> fall back to Kryo serialization, which leads to atrocious throughput.
>>> 
>>> I basically need to read in JSON strings, enrich properties on these 
>>> objects, and ultimately write them to various sinks. Is there some type of 
>>> JSON-based class, library, or approach I could use to accomplish this in an 
>>> efficient manner? Or possibly a way to partially write a POJO that would 
>>> allow me to interact with sections/properties of the JSON while retaining 
>>> other properties that might be dynamically present or unique to the message?
>>> 
>>> Any advice or suggestions would be welcome! I’ll also be happy to provide 
>>> any additional context if it would help!
>>> 
>>> Thanks,
>>> 
>>> Rion
>>> 
>>> (cross-posted to users+dev for reach)
> 
> --
> Ken Krugler
> http://www.scaleunlimited.com 
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
> 
> 
> 

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch
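
On Rion's idea of binding only the known properties while retaining the rest:
Jackson supports this pattern directly via @JsonAnySetter / @JsonAnyGetter
(Gson has no direct equivalent, as far as I know). A minimal sketch, not taken
from this thread, with made-up field names:

import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonAnySetter;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.HashMap;
import java.util.Map;

public class PartialEvent {

    // Known properties the job actually operates on.
    public String id;
    public long timestamp;

    // Everything else lands here and is written back out unchanged.
    private final Map<String, Object> other = new HashMap<>();

    @JsonAnySetter
    public void setOther(String key, Object value) {
        other.put(key, value);
    }

    @JsonAnyGetter
    public Map<String, Object> getOther() {
        return other;
    }

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        PartialEvent event = mapper.readValue(
                "{\"id\":\"a1\",\"timestamp\":1,\"extra\":{\"nested\":true}}",
                PartialEvent.class);
        event.id = event.id.toUpperCase();                     // enrich a known field
        System.out.println(mapper.writeValueAsString(event));  // unknown fields survive
    }
}

Whether such a class still qualifies as a Flink POJO for serialization purposes
is a separate question (the Map field will likely still fall back to a generic
serializer), but it avoids hand-rolling the bind-and-retain logic.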





Re: [External] Re: Way to add columns with defaults to the existing table and recover from the savepoint

2023-03-21 Thread Ashish Khatkar via user
Hi Shammon,

Schema evolution works with Avro-typed state, but the Flink Table API uses
RowData, and its serializer (RowDataSerializer) doesn't allow changes to the
column structure. Regarding the State Processor API: we are not creating any
state in our service; we simply use Flink SQL as a black box and let it
handle the state. We create SQL tables from the Avro schema in a
StreamTableEnvironment and run queries on those tables by building a
StatementSet and calling execute() on it (a rough sketch of this pattern
follows below).
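
A rough sketch of that pattern, with made-up table names, schemas and query
(in our real setup the DDL is generated from the Avro schema):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SqlPipeline {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Source table derived from the Avro schema; adding a nullable column
        // here is what changes the RowData layout the state was written with.
        tEnv.executeSql(
                "CREATE TABLE input_events (id STRING, amount BIGINT) WITH ("
                        + " 'connector' = 'kafka', 'topic' = 'in',"
                        + " 'properties.bootstrap.servers' = 'localhost:9092',"
                        + " 'scan.startup.mode' = 'earliest-offset',"
                        + " 'format' = 'avro')");

        // Upsert sink for the aggregated (stateful) result.
        tEnv.executeSql(
                "CREATE TABLE output_totals (id STRING, total BIGINT,"
                        + " PRIMARY KEY (id) NOT ENFORCED) WITH ("
                        + " 'connector' = 'upsert-kafka', 'topic' = 'out',"
                        + " 'properties.bootstrap.servers' = 'localhost:9092',"
                        + " 'key.format' = 'avro', 'value.format' = 'avro')");

        StatementSet statements = tEnv.createStatementSet();
        statements.addInsertSql(
                "INSERT INTO output_totals SELECT id, SUM(amount) FROM input_events GROUP BY id");
        statements.execute();
    }
}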

I found the Flink doc on stateful upgrades and evolution [1], and according
to the doc it is not possible to achieve this.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/concepts/overview/#stateful-upgrades-and-evolution

Cheers
Ashish

On Tue, Mar 21, 2023 at 1:51 AM Shammon FY  wrote:

> Hi Ashish
>
> State compatibility is a complex issue, and you can review the state
> evolution [1] and state processor [2] docs to see if there's a solution for
> your problem.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/libs/state_processor_api/
>
> Best,
> Shammon FY
>
>
> On Fri, Mar 17, 2023 at 8:48 PM Ashish Khatkar via user <
> user@flink.apache.org> wrote:
>
>> Hi all,
>>
>> I need help in understanding whether we can add columns with defaults
>> (let's say NULL) to an existing table and recover the job from the savepoint.
>>
>> We are using the flink-1.16.0 Table API with RocksDB as the backend to
>> provide a service for our users to run SQL queries. The tables are created
>> from the Avro schema, and when the schema is changed in a compatible manner,
>> i.e. adding a field with a default, we are unable to recover the job from
>> the savepoint. This is the error we get after the schema is upgraded.
>>
>> Caused by: org.apache.flink.util.StateMigrationException: The new state 
>> serializer 
>> (org.apache.flink.table.runtime.typeutils.RowDataSerializer@aad5b03a) must 
>> not be incompatible with the old state serializer 
>> (org.apache.flink.table.runtime.typeutils.RowDataSerializer@9d089984).
>>
>> We tried to debug the issue, and the error originates from
>> org.apache.flink.table.runtime.typeutils.RowDataSerializer.RowDataSerializerSnapshot#resolveSchemaCompatibility
>> (lines 343-345), which checks the length of the type array and also the
>> logicalType of each element (i.e. each column).
>>
>> Is there a way to restore and evolve a table using the Table API when the
>> Avro schema evolves in a compatible manner? If not, is there any plan to
>> support upgrades and evolution with the Table API?
>>
>> Cheers,
>> Ashish Khatkar
>>
>


Kubernetes skip sidecar failure

2023-03-21 Thread Evgeniy Lyutikov
Hello everybody!
We're using Flink 1.14 and Kubernetes operator 1.2.0. The pod template
configures an haproxy sidecar container for load balancing of checkpoint
persistence to S3 storage.
Sometimes this haproxy sidecar exits, and Flink completely restarts the
TaskManager pod and the running job.

2023-03-20 04:59:59,526 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker my-job-taskmanager-5-15 is terminated. Diagnostics: Pod terminated, container termination statuses: [haproxy(exitCode=139, reason=Error, message=null)]
2023-03-20 04:59:59,526 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Closing TaskExecutor connection my-job-taskmanager-5-15 because: Pod terminated, container termination statuses: [haproxy(exitCode=139, reason=Error, message=null)]
2023-03-20 04:59:59,527 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Requesting new worker with resource spec WorkerResourceSpec {cpuCores=10.0, taskHeapSize=5.400gb (5798205768 bytes), taskOffHeapSize=1024.000mb (1073741824 bytes), networkMemSize=1024.000mb (1073741824 bytes), managedMemSize=5.100gb (5476083384 bytes), numSlots=3}, current pending count: 1.
2023-03-20 04:59:59,527 INFO  org.apache.flink.runtime.externalresource.ExternalResourceUtils [] - Enabled external resources: []
2023-03-20 04:59:59,529 INFO  org.apache.flink.configuration.Configuration [] - Config uses fallback configuration key 'kubernetes.service-account' instead of key 'kubernetes.taskmanager.service-account'
2023-03-20 04:59:59,529 INFO  org.apache.flink.configuration.Configuration [] - Config uses fallback configuration key 'kubernetes.service-account' instead of key 'kubernetes.taskmanager.service-account'
2023-03-20 04:59:59,529 INFO  org.apache.flink.kubernetes.utils.KubernetesUtils [] - The service account configured in pod template will be overwritten to 'flink' because of explicitly configured options.
2023-03-20 04:59:59,531 INFO  org.apache.flink.kubernetes.KubernetesResourceManagerDriver [] - Creating new TaskManager pod with name my-job-taskmanager-5-45 and resource <14336,10.0>.
2023-03-20 04:59:59,560 WARN  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Discard registration from TaskExecutor my-job-taskmanager-5-15 at (akka.tcp://flink@10.68.15.205:6122/user/rpc/taskmanager_0) because the framework did not recognize it
2023-03-20 04:59:59,607 INFO  org.apache.flink.kubernetes.KubernetesResourceManagerDriver [] - Pod my-job-taskmanager-5-45 is created.
2023-03-20 04:59:59,617 INFO  org.apache.flink.kubernetes.KubernetesResourceManagerDriver [] - Received new TaskManager pod: my-job-taskmanager-5-45
2023-03-20 04:59:59,617 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Requested worker my-job-taskmanager-5-45 with resource spec WorkerResourceSpec {cpuCores=10.0, taskHeapSize=5.400gb (5798205768 bytes), taskOffHeapSize=1024.000mb (1073741824 bytes), networkMemSize=1024.000mb (1073741824 bytes), managedMemSize=5.100gb (5476083384 bytes), numSlots=3}.

Is there some way to specify that only the flink-main-container status should
be monitored, so that Flink does not react to sidecar crashes?


"This message contains confidential information/commercial secret. If you are 
not the intended addressee of this message you may not copy, save, print or 
forward it to any third party and you are kindly requested to destroy this 
message and notify the sender thereof by email.
Данное сообщение содержит конфиденциальную информацию/информацию, являющуюся 
коммерческой тайной. Если Вы не являетесь надлежащим адресатом данного 
сообщения, Вы не вправе копировать, сохранять, печатать или пересылать его 
каким либо иным лицам. Просьба уничтожить данное сообщение и уведомить об этом 
отправителя электронным письмом."


Re: Unsubscribe

2023-03-21 Thread Hang Ruan
Hi, please send an email to user-unsubscr...@flink.apache.org to unsubscribe.

Best,
Hang

laxmi narayan wrote on Tue, Mar 21, 2023, at 15:26:

> Unsubscribe --
> Hi ,
>
>
>
> Thank you.
>


Unsubscribe

2023-03-21 Thread laxmi narayan
Unsubscribe --
Hi ,



Thank you.