Re: Re: flink s3 checkpoint stays IN_PROGRESS (100%) until it fails

2022-03-08 Thread Yun Tang
Hi

It is usually stuck on the last step, writing the checkpoint metadata from the JM. I suggest using a tool such as jstack to inspect the JM's CPU stack and see where the problem is.


Best regards,
Yun Tang

From: Sun.Zhu <17626017...@163.com>
Sent: Tuesday, March 8, 2022 14:12
To: user-zh@flink.apache.org 
Subject: Re: flink s3 checkpoint stays IN_PROGRESS (100%) until it fails

The image didn't come through.

https://postimg.cc/Z9XdxwSk

On 2022-03-08 14:05:39, "Sun.Zhu" <17626017...@163.com> wrote:

hi all,
flink 1.13.2: checkpoints written to S3 never succeed; they stay IN_PROGRESS until they time out and fail. Has anyone run into this?


Re: Move savepoint to another s3 bucket

2022-03-08 Thread Dawid Wysakowicz

Hi Lukas,

I am afraid you're hitting this bug: 
https://issues.apache.org/jira/browse/FLINK-25952


Best,

Dawid

On 08/03/2022 16:37, Lukáš Drbal wrote:

Hello everyone,

I'm trying to move a savepoint to another s3 account, but the restore always 
fails with a weird 404 error.


We are using lyft k8s operator [1] and flink 1.13.6 (in stacktrace you 
can see version 1.13.6-396a8d44-szn which is just internal build from 
flink commit b2ca390d478aa855eb0f2028d0ed965803a98af1)


What I'm trying to do:

 1. create savepoint for pipeline via ./bin/flink savepoint 
 2. copy data under path configured in state.savepoints.dir from
source s3 to new s3
 3. change all configuration and restore pipeline

Are these steps correct, or am I doing something wrong or unsupported?

All options related to s3 have valid values for the new s3 account, but 
the restore fails with the exception below. The error message includes the original 
path (s3://flink/savepoints/activity-searched-query), which doesn't 
exist on the new account, so that 404 is expected. But I still don't 
understand why flink tries that path, because the related config options 
contain the new bucket info.
    high-availability.storageDir: 
's3:///ha/pipelines-runner-activity-searched-query'


jobmanager.archive.fs.dir: 's3:///history' 


state.checkpoints.dir:
's3:///checkpoints/activity-searched-query' 


state.savepoints.dir:
's3:///savepoints/activity-searched-query' 


+ valid values for s3.access-key and s3.secret-key

I found the original path in the _metadata file in the savepoint data, but changing 
that (search) leads to some weird OOM. I hope this should not 
be needed and those values should be ignored.


state.backend is hashmap if it is important.

Restoring back from the source bucket works as expected.

Thanks a lot!

Regards,
L.

Stacktrace:

2022-03-08 15:39:25,838 [flink-akka.actor.default-dispatcher-4]
INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph -
CombineToSearchedQuery -> (LateElementsCounter,
TransformToStreamElement -> Sink: SearchedQueryKafkaSink) (1/2)
(0c0f108c393b9a5b58f861c1032671d0) switched from INITIALIZING to
FAILED on 10.67.158.155:45521-d8d19d @ 10.67.158.155 (dataPort=36341).
org.apache.flink.util.SerializedThrowable: Exception while
creating StreamOperatorStateContext.
at

org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254)
~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
at

org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272)
~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
at

org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:441)
~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
at

org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585)
~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
at

org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
at

org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565)
~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
at

org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
at

org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
at java.lang.Thread.run(Thread.java:832) ~[?:?]
Caused by: org.apache.flink.util.SerializedThrowable: Could not
restore keyed state backend for
WindowOperator_bd2a73c53230733509ca171c6476fcc5_(1/2) from any of
the 1 provided restore options.
at

org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
at

org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
at

org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
... 10 more
Caused by: org.apache.flink.util.SerializedThrowable: 

Problem about adding custom kryo serializer

2022-03-08 Thread guoliubin85
Hi,

 

I have an entity class built with Google FlatBuffers. To improve performance, I
have tried writing a serializer class.

 

import java.nio.ByteBuffer;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import org.ff.dto.flatbuff.Transaction;

public class TransactionSerializer extends Serializer<Transaction> {

    @Override
    public void write(Kryo kryo, Output output, Transaction transaction) {
        // Copy the remaining bytes of the FlatBuffers-backed ByteBuffer.
        ByteBuffer byteBuffer = transaction.getByteBuffer();
        byte[] generated = new byte[byteBuffer.remaining()];
        byteBuffer.get(generated);
        output.writeInt(generated.length, true);
        output.writeBytes(generated);
    }

    @Override
    public Transaction read(Kryo kryo, Input input, Class<Transaction> aClass) {
        // Read the length-prefixed byte array back and wrap it again.
        int size = input.readInt(true);
        byte[] barr = new byte[size];
        input.readBytes(barr);
        ByteBuffer buf = ByteBuffer.wrap(barr);
        return Transaction.getRootAsTransaction(buf);
    }
}

 

And register it to the runtime env before calling env.execute.

 

env.registerTypeWithKryoSerializer(Transaction.class, TransactionSerializer.class);

env.getConfig().addDefaultKryoSerializer(Transaction.class, TransactionSerializer.class);

 

 

After that, I executed my job, however, I can see the log like this.

 

2021-11-18 10:35:07,624 INFO
org.apache.flink.api.java.typeutils.TypeExtractor[] - class
org.ff.dto.flatbuff.Transaction does not contain a setter for field bb_pos

2021-11-18 10:35:07,624 INFO
org.apache.flink.api.java.typeutils.TypeExtractor[] - Class
class org.ff.dto.flatbuff.Transaction cannot be used as a POJO type because
not all fields are valid POJO fields, and must be processed as GenericType.
Please read the Flink documentation on "Data Types & Serialization" for
details of the effect on performance.

 

It looks like the serializer is not working at all. So what's the problem
here? Did I register the serializer in the wrong way? Or do I need to move
the class somewhere else to make the flink classloader recognize it?

 

Thanks in advance.



Re: Question about Flink's computation mechanism

2022-03-08 Thread hjw
Is this about the streaming API or the SQL API? What about the
streaming API?


------------------ Original message ------------------
From: "user-zh"

https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/state/

hjw <1010445...@qq.com.invalid> wrote on 2022-03-09 01:32:

For a query like: SELECT color, sum(id) FROM T GROUP BY
color, does Flink keep the whole stream T in state and re-run the aggregation over the full stream for every incoming record? Or does the state only hold the aggregation result, adding to or subtracting from the existing per-group-
by-key (color) result when a new record arrives? Is there documentation, or a rule of thumb, describing how Flink actually runs this? Thanks.

Re: Flatmap operator in an Asynchronous call

2022-03-08 Thread Diwakar Jha
Thanks Gen, I will look into customized Source and SplitEnumerator.

On Mon, Mar 7, 2022 at 10:20 PM Gen Luo  wrote:

> Hi Diwakar,
>
> An asynchronous flatmap function without the support of the framework can
> be problematic. You should not call collector.collect outside the main
> thread of the task, i.e. outside the flatMap method.
>
> I'd suggest using a customized Source instead to process the files, which
> uses a SplitEnumerator to discover the files and SourceReaders to read the
> files. In this way checkpoints can be triggered between two calls of
> pollNext, so you don't have to implement it asynchronously. It would be
> better if the readers read the lines and the records are enriched in a map
> function following.
>
>
>
> On Tue, Mar 8, 2022 at 5:17 AM Diwakar Jha  wrote:
>
>> Hello Everyone,
>>
>> I'm running a streaming application using Flink 1.11 and EMR 6.01. My use
>> case is reading files from an s3 bucket, filtering the file contents (say, records),
>> enriching each record, then filtering records and outputting to a sink.
>> I'm reading 6k files per 15 minutes and the total number of records is 3
>> billion per 15 minutes. I'm using a flat map operator to convert the files into
>> records and enrich the records in a synchronous call.
>>
>> *Problem*: My application fails to run (checkpoint timeout) if I add
>> more filter criteria (operators). I suspect the system is not able to scale
>> (CPU util is still at 20%) because of the synchronous call. I want to convert
>> this flat map to an asynchronous call using AsyncFunction. I was looking
>> for something like an AsyncCollector.collect
>> 
>>
>> https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.html
>> to complement my current synchronous implementation using flatmap but it
>> seems like this is not available in Flink 1.11.
>>
>> *Question* :
>> Could someone please help me with converting this flatmap operation to an
>> asynchronous call?
>>
>> Please let me know if you have any questions.
>>
>> Best,
>>
>
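
(For reference, the AsyncCollector from the 1.3 docs became ResultFuture in later
releases, and the async I/O API is available in Flink 1.11 via AsyncDataStream.
Below is a minimal, illustrative sketch of asynchronous enrichment; the class,
method and parameter values are made up for the example and are not code from
this thread.)

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class EnrichAsync extends RichAsyncFunction<String, String> {

    @Override
    public void asyncInvoke(String record, ResultFuture<String> resultFuture) {
        // Run the blocking enrichment on a separate thread and complete the
        // ResultFuture from its callback, instead of calling collector.collect
        // from a user thread inside a flatMap.
        CompletableFuture
                .supplyAsync(() -> enrich(record))
                .thenAccept(enriched -> resultFuture.complete(Collections.singleton(enriched)));
    }

    private String enrich(String record) {
        return record; // placeholder for the real lookup
    }

    public static DataStream<String> apply(DataStream<String> records) {
        // At most 100 in-flight requests, 30 s timeout per request.
        return AsyncDataStream.unorderedWait(records, new EnrichAsync(), 30, TimeUnit.SECONDS, 100);
    }
}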


Re: Exception when stopping a flink on yarn job

2022-03-08 Thread Jiangang Liu
The exception message is quite clear: during the savepoint some tasks were not in RUNNING state. You can check whether your job experienced a failover.

QiZhu Chan  wrote on Tue, Mar 8, 2022 at 17:37:

> Hi,
>
> Could anyone in the community help me figure out what causes the error below? Normally the client log should return a savepoint path, but instead the exception log below appeared. Meanwhile the job has been stopped, and checking HDFS I did find the savepoint files produced by the current job.
>
>
>
>
>


Re: Question about Flink's computation mechanism

2022-03-08 Thread Jiangang Liu
Only the computed result is stored in state; each incoming record updates the state once and the updated result is emitted downstream. For details, see the state documentation:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/state/

hjw <1010445...@qq.com.invalid> wrote on Wed, Mar 9, 2022 at 01:32:

> For a query like: SELECT color, sum(id) FROM T GROUP BY
> color, will Flink store the entire stream T in state and trigger a full recomputation over the stream for each incoming record? Or does the state only hold the computed result, with a new record simply added to or subtracted from the existing per-group-
> by key (color) result? Is there documentation, or a rule of thumb, describing this concrete runtime behaviour of Flink? Thanks.
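
(To illustrate the answer above with the DataStream API: a minimal sketch in
which only the running sum per key is kept in state, and every incoming record
updates it and emits the new result. The class and state names are illustrative.)

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class RunningSumPerColor
        extends KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>> {

    private transient ValueState<Long> sum;

    @Override
    public void open(Configuration parameters) {
        // One Long per key (color) is all that is stored, not the stream itself.
        sum = getRuntimeContext().getState(new ValueStateDescriptor<>("sum", Long.class));
    }

    @Override
    public void processElement(Tuple2<String, Long> record,
                               Context ctx,
                               Collector<Tuple2<String, Long>> out) throws Exception {
        long current = sum.value() == null ? 0L : sum.value();
        current += record.f1;          // add the incoming id to the running sum
        sum.update(current);           // the state only holds the aggregate
        out.collect(Tuple2.of(record.f0, current));
    }
}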


Re: Left join query not clearing state after migrating from 1.9.0 to 1.14.3

2022-03-08 Thread Prakhar Mathur
Hi Roman,

Thanks for the reply, here is the screenshot of the latest failed
checkpoint.

[image: Screenshot 2022-03-09 at 11.44.46 AM.png]

I couldn't find the details for the last successful one as we only store
the last 10 checkpoints' details. Also, can you give us an idea of exactly
what details you are looking for?

For the operators, the source operators for both input streams look
fine; the interval join operator seems to be having the issue of not
clearing the state it is holding.

IntervalJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true,
leftLowerBound=-540, leftUpperBound=0, leftTimeIndex=8,
rightTimeIndex=5], where=[((price_calculation_id = $f92) AND (rowtime0 >=
rowtime) AND (rowtime0 <= (rowtime + 540:INTERVAL MINUTE)))],

We currently suspect the way we are generating watermarks for the new
Kafka source; they might be causing a problem, as the output of
CURRENT_WATERMARK(rowtime)
is coming back as null from the SQL query.
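
(For context, a minimal sketch, assuming the stream is built with the DataStream
KafkaSource API and then registered as a table; for a pure SQL connector the
equivalent is the WATERMARK clause in the CREATE TABLE DDL. Broker, topic and
names are illustrative. If the strategy never emits a watermark,
CURRENT_WATERMARK(rowtime) stays null and interval-join state is not cleaned up.)

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSourceWithWatermarks {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("table1-events")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Bounded out-of-orderness plus idleness, so the watermark keeps
        // advancing even if one partition stops receiving records.
        env.fromSource(
                source,
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofMinutes(1))
                        .withIdleness(Duration.ofMinutes(1)),
                "table1-source")
           .print();

        env.execute();
    }
}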

Thanks
Prakhar

On Tue, Mar 8, 2022 at 5:49 PM Roman Khachatryan  wrote:

> Hi Prakhar,
>
> Could you please share the statistics about the last successful and
> failed checkpoints, e.g. from the UI.
> Ideally, with detailed breakdown for the operators that seems problematic.
>
> Regards,
> Roman
>
> On Fri, Mar 4, 2022 at 8:48 AM Prakhar Mathur  wrote:
> >
> > Hi,
> >
> > Can someone kindly help and take a look at this? It's a major blocker
> for us.
> >
> > Thanks,
> > Prakhar
> >
> > On Wed, Mar 2, 2022 at 2:11 PM Prakhar Mathur 
> wrote:
> >>
> >> Hello,
> >>
> >> We recently did a migration of our Flink jobs from version 1.9.0 to
> 1.14.3. These jobs consume from Kafka and produce to respective sinks. We
> are using MemoryStateBackend for our checkpointing and GCS as our remote
> fs. After migration, we found a few jobs that had left join in the SQL
> query started failing where their checkpoint size kept increasing. We
> haven't changed the SQL Query. Following is one of the queries that have
> started failing with the issue mentioned.
> >>
> >> SELECT
> >> table1.field1,
> >> table2.field2,
> >> table2.field3,
> >> table1.rowtime as estimate_timestamp,
> >> table2.creation_time as created_timestamp,
> >> CAST(table2.rowtime AS TIMESTAMP)
> >> FROM
> >> table1
> >> LEFT JOIN table2 ON table1.field1 = coalesce(
> >> nullif(table2.field4, ''),
> >> table2.field5
> >> )
> >> AND table2.rowtime BETWEEN table1.rowtime
> >> AND table1.rowtime + INTERVAL '90' MINUTE
> >> WHERE
> >> table2.field6 IS NOT TRUE
> >>
> >> Few data points:
> >>
> >> On version 1.9.0 it was running on parallelism of 20, now it is not
> even able to run on 40.
> >> On version 1.9.0 the max checkpoint size was going up to 3.5 GB during
> peak hours. Now on 1.14.3, it just keeps on increasing and goes up to 30 GB
> and eventually fails due to lack of resources.
> >> Earlier in version 1.9.0, we were using
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer and now in
> 1.14.3 we have moved to the new Kafka Source.
> >>
> >> Any help will be highly appreciated as these are production jobs.
> >>
> >> Thanks
> >> Prakhar Mathur
>


Re: Controlling group partitioning with DataStream

2022-03-08 Thread Guowei Ma
Hi, Ken

If you are talking about the Batch scene, there may be another idea that
the engine automatically and evenly distributes the amount of data to be
processed by each Stage to each worker node. This also means that, in some
cases, the user does not need to manually define a Partitioner.

At present, Flink has a FLIP-187 [1], which is working in this direction,
but to achieve the above goals may also require the follow up work of
FLIP-186 [2]. After the release of 1.15, we will carry out the
"Auto-rebalancing of workloads" related work as soon as possible, you can
pay attention to the progress of this FLIP.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler#FLIP187:AdaptiveBatchJobScheduler-Futureimprovements

Best,
Guowei


On Wed, Mar 9, 2022 at 8:44 AM Ken Krugler 
wrote:

> Hi Dario,
>
> Just to close the loop on this, I answered my own question on SO.
>
> Unfortunately it seems like the recommended solution is to do the same
> hack I did a while ago, which is to generate (via trial-and-error) a key
> that gets assigned to the target slot.
>
> I was hoping for something a bit more elegant :)
>
> I think it’s likely I could make it work by implementing my own version
> of KeyGroupStreamPartitioner, but as I’d noted in my SO question, that
> would involve use of some internal-only classes, so maybe not a win.
>
> — Ken
>
>
> On Mar 4, 2022, at 3:14 PM, Dario Heinisch 
> wrote:
>
> Hi,
>
> I think you are looking for this answer from David:
> https://stackoverflow.com/questions/69799181/flink-streaming-do-the-events-get-distributed-to-each-task-slots-separately-acc
>
> I think then you could technically create your partitioner - though a little
> bit cumbersome - by mapping your existing keys to new keys that will
> then be routed to the desired
> group & slot.
>
> Hope this may help,
>
> Dario
> On 04.03.22 23:54, Ken Krugler wrote:
>
> Hi all,
>
> I need to be able to control which slot a keyBy group goes to, in order to
> compensate for a badly skewed dataset.
>
> Any recommended approach to use here?
>
> Previously (with a DataSet) I used groupBy followed by a withPartitioner,
> and provided my own custom partitioner.
>
> I posted this same question to
> https://stackoverflow.com/questions/71357833/equivalent-of-dataset-groupby-withpartitioner-for-datastream
>
> Thanks,
>
> — Ken
>
>
> --
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
>
>
>
>


Re: Re: k8s native session question

2022-03-08 Thread Yang Wang
Try a newer version; it looks like this has already been fixed:

https://issues.apache.org/jira/browse/FLINK-19212

Best,
Yang

崔深圳  wrote on Wed, Mar 9, 2022 at 10:31:

>
>
>
> The web UI reports an error: requests to /jobs/overview intermittently fail with: Exception on server
> side:\norg.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException:
> Failed to serialize the result for RPC call :
> requestMultipleJobDetails.\n\tat
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:417)\n\tat
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$sendAsyncResponse$2(AkkaRpcActor.java:373)\n\tat
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)\n\tat
> java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:848)\n\tat
> java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2168)\n\tat
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.sendAsyncResponse(AkkaRpcActor.java:365)\n\tat
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:332)\n\tat
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)\n\tat
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)\n\tat
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)\n\tat
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)\n\tat
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)\n\tat
> scala.PartialFunction.applyOrElse(PartialFunction.scala:123)\n\tat
> scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)\n\tat
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)\n\tat
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)\n\tat
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)\n\tat
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)\n\tat
> akka.actor.Actor.aroundReceive(Actor.scala:537)\n\tat
> akka.actor.Actor.aroundReceive$(Actor.scala:535)\n\tat
> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)\n\tat
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)\n\tat
> akka.actor.ActorCell.invoke(ActorCell.scala:548)\n\tat
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)\n\tat
> akka.dispatch.Mailbox.run(Mailbox.scala:231)\n\tat
> akka.dispatch.Mailbox.exec(Mailbox.scala:243)\n\tat
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)\n\tat
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)\n\tat
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)\n\tat
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)\nCaused
> by: java.io.NotSerializableException: java.util.HashMap$Values\n\tat
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)\n\tat
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)\n\tat
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)\n\tat
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)\n\tat
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)\n\tat
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)\n\tat
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)\n\tat
> org.apache.flink.runtime.rpc.akka.AkkaRpcSerializedValue.valueOf(AkkaRpcSerializedValue.java:66)\n\tat
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:400)\n\t...
> 29 more\n\nEnd of exception on server side"
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2022-03-09 09:56:21, "yu'an huang" wrote:
> >Hi, is there any problem with being routed to a non-master node? When handling a request, a non-master node should find the active Job manager via the HA service,
> then fetch the result from the active Job manager and return it to the client.
> >
> >> On 7 Mar 2022, at 7:46 PM, 崔深圳  wrote:
> >>
> >> In k8s native session mode, with HA configured and 3 job managers, the web ui accessed through the rest
> service is always routed to a non-master node. Is there any way to make this stable?
> >
>


Re: Re: k8s native session question

2022-03-08 Thread 崔深圳



The web UI reports an error: requests to /jobs/overview intermittently fail with: Exception on server 
side:\norg.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException: Failed to 
serialize the result for RPC call : requestMultipleJobDetails.\n\tat 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:417)\n\tat
 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$sendAsyncResponse$2(AkkaRpcActor.java:373)\n\tat
 
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)\n\tat
 
java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:848)\n\tat
 
java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2168)\n\tat
 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.sendAsyncResponse(AkkaRpcActor.java:365)\n\tat
 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:332)\n\tat
 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)\n\tat
 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)\n\tat
 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)\n\tat
 akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)\n\tat 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)\n\tat 
scala.PartialFunction.applyOrElse(PartialFunction.scala:123)\n\tat 
scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)\n\tat 
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)\n\tat 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)\n\tat 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)\n\tat 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)\n\tat 
akka.actor.Actor.aroundReceive(Actor.scala:537)\n\tat 
akka.actor.Actor.aroundReceive$(Actor.scala:535)\n\tat 
akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)\n\tat 
akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)\n\tat 
akka.actor.ActorCell.invoke(ActorCell.scala:548)\n\tat 
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)\n\tat 
akka.dispatch.Mailbox.run(Mailbox.scala:231)\n\tat 
akka.dispatch.Mailbox.exec(Mailbox.scala:243)\n\tat 
java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)\n\tat 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)\n\tat
 java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)\n\tat 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)\nCaused
 by: java.io.NotSerializableException: java.util.HashMap$Values\n\tat 
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)\n\tat 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)\n\tat
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)\n\tat 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)\n\tat
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)\n\tat 
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)\n\tat 
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)\n\tat
 
org.apache.flink.runtime.rpc.akka.AkkaRpcSerializedValue.valueOf(AkkaRpcSerializedValue.java:66)\n\tat
 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:400)\n\t...
 29 more\n\nEnd of exception on server side"


On 2022-03-09 09:56:21, "yu'an huang" wrote:
>Hi, is there any problem with being routed to a non-master node? When handling a request, a non-master node should find the active Job manager via the HA service, then fetch the result from the active 
>Job manager and return it to the client.
>
>> On 7 Mar 2022, at 7:46 PM, 崔深圳  wrote:
>> 
>> In k8s native session mode, with HA configured and 3 job managers, the web ui accessed through the rest 
>> service is always routed to a non-master node. Is there any way to make this stable?
>



Re: k8s native session question

2022-03-08 Thread yu'an huang
Hi, is there any problem with being routed to a non-master node? When handling a request, a non-master node should find the active Job manager via the HA service, then fetch the result from the active 
Job manager and return it to the client.

> On 7 Mar 2022, at 7:46 PM, 崔深圳  wrote:
> 
> In k8s native session mode, with HA configured and 3 job managers, the web ui accessed through the rest 
> service is always routed to a non-master node. Is there any way to make this stable?



Re: Controlling group partitioning with DataStream

2022-03-08 Thread Ken Krugler
Hi Dario,

Just to close the loop on this, I answered my own question on SO.

Unfortunately it seems like the recommended solution is to do the same hack I 
did a while ago, which is to generate (via trial-and-error) a key that gets 
assigned to the target slot.

I was hoping for something a bit more elegant :)

I think it’s likely I could make it work by implementing my own version of 
KeyGroupStreamPartitioner, but as I’d noted in my SO question, that would 
involve use of some internal-only classes, so maybe not a win.

— Ken
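
(As an aside, a minimal sketch of the trial-and-error approach described above:
probing Flink's key-group assignment to find a synthetic key that lands on a
given subtask. KeyGroupRangeAssignment lives in flink-runtime and is internal
API; the key format and numbers are illustrative.)

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyToSlotProbe {

    // Brute-force search for a key that is routed to the desired subtask index.
    public static String findKeyForSubtask(int targetSubtask, int maxParallelism, int parallelism) {
        for (int i = 0; ; i++) {
            String candidate = "key-" + i;
            int subtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(
                    candidate, maxParallelism, parallelism);
            if (subtask == targetSubtask) {
                return candidate;
            }
        }
    }

    public static void main(String[] args) {
        // e.g. a key routed to subtask 3 of an operator with parallelism 8
        // and the default max parallelism of 128.
        System.out.println(findKeyForSubtask(3, 128, 8));
    }
}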


> On Mar 4, 2022, at 3:14 PM, Dario Heinisch  wrote:
> 
> Hi, 
> 
> I think you are looking for this answer from David: 
> https://stackoverflow.com/questions/69799181/flink-streaming-do-the-events-get-distributed-to-each-task-slots-separately-acc
>  
> 
> I think then you could technically create your partitioner - though a little 
> bit cumbersome - by mapping your existing keys to new keys that will then 
> be routed to the desired
> group & slot. 
> 
> Hope this may help, 
> 
> Dario
> 
> On 04.03.22 23:54, Ken Krugler wrote:
>> Hi all,
>> 
>> I need to be able to control which slot a keyBy group goes to, in order to 
>> compensate for a badly skewed dataset.
>> 
>> Any recommended approach to use here?
>> 
>> Previously (with a DataSet) I used groupBy followed by a withPartitioner, 
>> and provided my own custom partitioner.
>> 
>> I posted this same question to 
>> https://stackoverflow.com/questions/71357833/equivalent-of-dataset-groupby-withpartitioner-for-datastream
>>  
>> 
>> 
>> Thanks,
>> 
>> — Ken

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





replay kinesis events

2022-03-08 Thread Guoqin Zheng
Hi Flink experts,

Wondering if there is a built-in way to replay already-processed events in
an event queue. For example, suppose I have a flink app processing an event stream
from Kinesis. If I find a bug in the flink app and make a fix, I
would like to re-process events that were already processed in the kinesis
stream. Is there a simple mechanism that allows me to do this?

Thanks,
-Guoqin


Re: Question about Flink counters

2022-03-08 Thread Shane Bishop
Hi,

My issue has been resolved through discussion with AWS support.

It turns out that Kinesis Data Analytics reports to CloudWatch in a way I did 
not expect. The way to view the accurate values for Flink counters is with 
Average in CloudWatch metrics.

Below is the response from AWS support, for anyone who might benefit from this 
in the future:

After discussing this issue with our Kinesis Data Analytics team, I was 
informed that currently metrics are reported every 15 seconds, hence every 
minute there would be 4 records, which might be the explanation to why you are 
seeing this number of records. However, please understand that this frequency 
may change in the future so please do not rely on this frequency as the source 
of truth. We would recommend to use base alarms on the AVG, P99 or other 
similar metric statistics rather than looking and examining individual values 
or verifying the numbers of values are as expected.

In addition, depending on how these metrics are calculated, displaying them 
with different statistic would show different graphs and scenarios. Please 
kindly refer to the "CloudWatch statistics definitions" [1] documentation for 
more details on how metric data are aggregated over specified periods of time 
with different statistic choices for metrics. Below are some examples of the 
most commonly used statistic options:
-"SampleCount" is the number of data points during the period.
-"Sum" is the sum of the values of the all data points collected during the 
period.
-"Average" is the value of Sum/SampleCount during the specified period.
-"Percentile (p)" indicates the relative standing of a value in a dataset. For 
example, p99 is the 99th percentile and means that 99 percent of the data 
within the period is lower than this value and 1 percent of the data is higher 
than this value.
-”Minimum“ is the lowest value observed during the specified period.
-“Maximum" is the highest value observed during the specified period.

In answer to your question regarding how custom metrics are pushed, in the 
documentation you have provided [2], it says that "Custom metrics in Kinesis 
Data Analytics use the Apache Flink metric system", Kinesis Data Analytics only 
report these metrics to CloudWatch as you define these custom metrics.

[1] 
https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/Statistics-definitions.html
[2] 
https://docs.aws.amazon.com/kinesisanalytics/latest/java/monitoring-metrics-custom.html

Regards,
Shane


Re: PyFlink : submission via rest

2022-03-08 Thread aryan m
Thanks Dian! That worked !

On Sun, Mar 6, 2022 at 10:47 PM Dian Fu  wrote:

> The dedicated REST API is still not supported. However, you could try to
> use PythonDriver just like you said and just submit it like a Java Flink
> job.
>
> Regards,
> Dian
>
> On Sun, Mar 6, 2022 at 3:38 AM aryan m  wrote:
>
>> Thanks Zhilong for taking a look!
>>
>> Primarily I am looking for ways to start it through a REST api [1]   .
>> For Java, I pass along entry-class pointing to a main class in the jar
>> which constructs the job graph and triggers the execute(). How do we
>> accomplish this for pyflink jobs?  The closest I encountered is
>> PythonDriver [2]
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jars-jarid-run
>> [2]
>> https://github.com/apache/flink/blob/release-1.13/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java
>>
>>
>> On Sat, Mar 5, 2022 at 10:37 AM Zhilong Hong 
>> wrote:
>>
>>> Hi, Aryan:
>>>
>>> You could refer to the official docs [1] for how to submit PyFlink jobs.
>>>
>>> $ ./bin/flink run \
>>>   --target yarn-per-job
>>>   --python examples/python/table/word_count.py
>>>
>>> With this command you can submit a per-job application to YARN. The docs
>>> [2] and [3] describe how to submit jobs to the YARN session and the
>>> Kubernetes session.
>>>
>>> $ ./bin/flink run -t yarn-session \
>>>   -Dyarn.application.id=application__YY \
>>>   --python examples/python/table/word_count.py
>>>
>>> $ ./bin/flink run -t kubernetes-session \
>>>   -Dkubernetes.cluster-id=my-first-flink-cluster \
>>>   --python examples/python/table/word_count.py
>>>
>>> Best,
>>> Zhilong
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/cli/#submitting-pyflink-jobs
>>> [2]
>>> https://nightlies.apache.org/flink/flink-docs-master/zh/docs/deployment/resource-providers/yarn/#session-mode
>>> [3]
>>> https://nightlies.apache.org/flink/flink-docs-master/zh/docs/deployment/resource-providers/native_kubernetes/#session-mode
>>>
>>> On Sun, Mar 6, 2022 at 2:08 AM aryan m  wrote:
>>>
 Hi !
In a session cluster, what is the recommended way to submit a
 pyFlink job via REST ? I am on Flink 1.13 and my job code is available at
 web.upload.dir.

   Appreciate the help!





Question about Flink's computation mechanism

2022-03-08 Thread hjw
For a query like: SELECT color, sum(id) FROM T GROUP BY 
color, does Flink keep the whole stream T in state and re-run the aggregation over the full stream for every incoming record? Or does the state only hold the aggregation result, adding to or subtracting from the existing per-group
 by key (color) result when a new record arrives? Is there documentation, or a rule of thumb, describing how Flink actually runs this? Thanks.

Re: [statefun] hadoop dependencies and StatefulFunctionsConfigValidator

2022-03-08 Thread Filip Karnicki
Hi Roman, Igal (@ below)

Thank you for your answer. I don't think I'll have access to flink's lib
folder given it's a shared Cloudera cluster. The only thing I could think
of is to not include com.google.protobuf in the
classloader.parent-first-patterns.additional setting, and to
include protobuf-java 3.7.1 in the uber jar.

I created a jira for this just now + a discuss thread on the dev group
https://issues.apache.org/jira/browse/FLINK-26537

Hi @Igal Shilman  , is the plugin solution outlined
by Roman something that fits in better with Statefun than having the
creators of uber .jars be responsible for using a statefun-compatible
protobuf-java?

Kind regards
Fil

On Tue, 8 Mar 2022 at 14:02, Roman Khachatryan  wrote:

> Hi Filip,
>
> Have you tried putting protobuf-java 3.7.1 into the Flink's lib/ folder?
> Or maybe re-writing the dependencies you mentioned to be loaded as
> plugins? [1]
>
> I don't see any other ways to solve this problem.
> Probably Chesnay or Seth will suggest a better solution.
>
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/plugins/
>
>
> Regards,
> Roman
>
> On Fri, Mar 4, 2022 at 9:54 AM Filip Karnicki 
> wrote:
> >
> > Hi All!
> >
> > We're running a statefun uber jar on a shared cloudera flink cluster,
> the latter of which launches with some ancient protobuf dependencies
> because of reasons[1].
> >
> > Setting the following flink-config settings on the entire cluster
> >
> > classloader.parent-first-patterns.additional:
> org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
> >
> > causes these old protobuf dependencies to get loaded over statefun's
> protobuf-java 3.7.1, and NoSuchMethod exceptions occur.
> >
> > We hacked together a version of statefun that doesn't perform the check
> whether the classloader settings contain the three patterns from above, and
> as long as our job uses protobouf-java 3.7.1 and the com.google.protobuf
> pattern is not present in the classloader.parent-first-patterns.additional
> setting, then all is well.
> >
> > Aside from removing old hadoop from the classpath, which may not be
> possible given that it's a shared cluster, is there anything we can do
> other than adding a configurable override not to perform the config check
> in StatefulFunctionsConfigValidator to an upcoming statefun core release?
> >
> > Many thanks
> > Fil
> >
> >
> > [1] We're still trying to find out if it's absolutely necessary to have
> these on the classpath.
>


Move savepoint to another s3 bucket

2022-03-08 Thread Lukáš Drbal
Hello everyone,

I'm trying to move a savepoint to another s3 account, but the restore always
fails with a weird 404 error.

We are using lyft k8s operator [1] and flink 1.13.6 (in stacktrace you can
see version 1.13.6-396a8d44-szn which is just internal build from flink
commit b2ca390d478aa855eb0f2028d0ed965803a98af1)

What I'm trying to do:

   1. create savepoint for pipeline via ./bin/flink savepoint 
   2. copy data under path configured in state.savepoints.dir from source
   s3 to new s3
   3. change all configuration and restore pipeline

Are these steps correct, or am I doing something wrong or unsupported?

All options related to s3 have valid values for the new s3 account, but the restore
fails with the exception below. The error message includes the original path
(s3://flink/savepoints/activity-searched-query), which doesn't exist on the new
account, so that 404 is expected. But I still don't understand why flink
tries that path, because the related config options contain the new bucket info.
high-availability.storageDir:
's3:///ha/pipelines-runner-activity-searched-query'

jobmanager.archive.fs.dir: 's3:///history'

state.checkpoints.dir:
> 's3:///checkpoints/activity-searched-query'

state.savepoints.dir:
> 's3:///savepoints/activity-searched-query'


+ valid values for s3.access-key and s3.secret-key

I found the original path in the _metadata file in the savepoint data, but changing that
(search) leads to some weird OOM. I hope this should not be needed
and those values should be ignored.

state.backend is hashmap if it is important.

Restoring back from the source bucket works as expected.

Thanks a lot!

Regards,
L.

Stacktrace:

2022-03-08 15:39:25,838 [flink-akka.actor.default-dispatcher-4] INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph -
> CombineToSearchedQuery -> (LateElementsCounter, TransformToStreamElement ->
> Sink: SearchedQueryKafkaSink) (1/2) (0c0f108c393b9a5b58f861c1032671d0)
> switched from INITIALIZING to FAILED on 10.67.158.155:45521-d8d19d @
> 10.67.158.155 (dataPort=36341).
> org.apache.flink.util.SerializedThrowable: Exception while creating
> StreamOperatorStateContext.
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254)
> ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272)
> ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:441)
> ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585)
> ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565)
> ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
> ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
> ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
> ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
> ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
> at java.lang.Thread.run(Thread.java:832) ~[?:?]
> Caused by: org.apache.flink.util.SerializedThrowable: Could not restore
> keyed state backend for
> WindowOperator_bd2a73c53230733509ca171c6476fcc5_(1/2) from any of the 1
> provided restore options.
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
> ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
> ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
> ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
> ... 10 more
> Caused by: org.apache.flink.util.SerializedThrowable: Failed when trying
> to restore heap backend
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:177)
> ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
> at
> 

Re: MapState.entries()

2022-03-08 Thread Alexey Trenikhun
Thank you !

From: Schwalbe Matthias 
Sent: Monday, March 7, 2022 11:36:22 PM
To: Alexey Trenikhun ; Flink User Mail List 

Subject: RE: MapState.entries()


Hi Alexey,



To the best of my knowledge it's lazy with RocksDBStateBackend; using the Java 
iterator you could even modify the map (e.g. remove()).



Cheers



Thias





From: Alexey Trenikhun 
Sent: Dienstag, 8. März 2022 06:11
To: Flink User Mail List 
Subject: MapState.entries()



Hello,

We are using RocksDBStateBackend. Is the MapState.entries() call in this case 
"lazy", deserializing a single entry on each next(), or does MapState.entries() return 
a collection that is fully loaded into memory?



Thanks,

Alexey



RE: Incremental checkpointing & RocksDB Serialization

2022-03-08 Thread Schwalbe Matthias
Hi Vidya,

As to the choice of serializer:

  *   Flink provides two implementations that support state migration, AVRO 
serializer, and Pojo serializer
  *   Pojo serializer happens to be one of the fastest available serializers 
(faster than AVRO)
  *   If your record sticks to Pojo coding rules it is probably a good choice, 
no extra serializer coding needed
  *   See here [1]
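
(To make the Pojo rules concrete, a minimal sketch of a record class that Flink's
PojoSerializer accepts: public class, public no-argument constructor, and fields
that are either public or exposed via getters/setters. The field names are
illustrative.)

public class SensorReading {

    public String sensorId;
    public long timestamp;
    public double value;

    // Required: public no-argument constructor.
    public SensorReading() {}

    public SensorReading(String sensorId, long timestamp, double value) {
        this.sensorId = sensorId;
        this.timestamp = timestamp;
        this.value = value;
    }
}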

As to the extra big incremental checkpoints at the end of a time window:

  *   This is quite plausible,
  *   windowing uses the ‘namespace’ subkey of keyed state
  *   ideally incremental checkpoints only store changes made since the last 
checkpoint, and
  *   on a window change many window instances (i.e. one per key and time 
interval) disappear and are eventually recreated for the next time interval, 
hence the bigger checkpoint
  *   serialization efforts depend on the choice of state backend:
 *   RocksDBStateBackend dominantly uses serializers when reading and 
writing state but to a lesser extend for checkpoints
 *   FsStateBackend does not use serializers when reading and writing state 
but dominantly during checkpoints


In order to improve your situation you need to take a closer look into

  *   The numbers (how many keys, how many active window instances 
(globally/per key), how many events are collected per window instance)
  *   The specific implementation of the rollup/aggregation function
 *   There are setups that store all events and iterate whenever a window 
result is needed (triggered)
 *   Other setups pre-aggregate incoming events and summarize only when a 
window result is needed (triggered)
 *   This choice makes a big difference when it comes to state size

Hope this helps … feel free to get back with further questions 


Thias



[1] 
https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#pojoserializer

From: Vidya Sagar Mula 
Sent: Dienstag, 8. März 2022 02:44
To: Yun Tang 
Cc: user 
Subject: Re: Incremental checkpointing & RocksDB Serialization

Hi Yun,

Thank you for the response.


1.  You could tune your job to avoid backpressure. Maybe you can upgrade 
your flink engine to at least flink-1.13 to know how to monitor the back 
pressure status [1].
[VIDYA] - In the view of my organization, it's a very big activity to upgrade 
the Flink version from our current one (1.11). I need to continue my dev 
activity with 1.11 only.
2.  You can refer to [2] to know how to customize your serializer.
[VIDYA] - Thanks for providing the link references for the custom 
serializer. I am wondering how the serialization part of incremental 
checkpointing is different from full checkpointing. My pipeline logic is the same 
for both the full checkpoint and the incremental checkpoint, except for the checkpoint.type 
variable and some other env variables; the code pipeline logic 
should be the same for both types of checkpoints.

- A full checkpoint of the pipeline does not take a considerably long time when 
compared to incremental checkpointing at the end of the window. I see that 
backpressure and CPU utilization are high with incremental 
checkpointing, and a thread dump shows a stack related to serialization. How is the 
serialization part different between full and incremental 
checkpointing? I know the RocksDB library has some serializers for incremental checkpoints.

- While I am not writing a custom serializer for my pipeline in the case of full 
checkpointing, is it the general pattern to implement a custom serializer in the case 
of incremental checkpointing?

- With respect to serializers for full vs incremental checkpointing, what is 
the general usage pattern across the Flink community? If I write a custom 
serializer for incremental checkpointing, how does it go with full checkpointing?

Please clarify.

Thanks,
Vidya.




[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/ops/monitoring/back_pressure/
[2] 
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/custom_serialization/

On Sun, Mar 6, 2022 at 12:11 AM Yun Tang <myas...@live.com> wrote:
Hi Vidya,


  1.  You could tune your job to avoid backpressure. Maybe you can upgrade your 
flink engine to at least flink-1.13 to know how to monitor the back pressure 
status [1]
  2.  You can refer to [2] to know how to customize your serializer.


[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/ops/monitoring/back_pressure/
[2] 
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/custom_serialization/

Best,
Yun Tang

From: Vidya Sagar Mula <mulasa...@gmail.com>
Sent: Sunday, March 6, 2022 4:16
To: Yun Tang <myas...@live.com>
Cc: user <user@flink.apache.org>
Subject: Re: Incremental checkpointing & RocksDB Serialization

Hi Yun Tang,
Thank you for the reply. I have follow up questions and need some more details. 
Can you please clarify my inline questions?

> Why is 

Re: Flink Checkpoint Timeout

2022-03-08 Thread Mahantesh Patil
I see that with every consecutive checkpoint timeout failure, the number of tasks
that completed checkpointing keeps decreasing. Why would that happen? Does
flink try to process data beyond the old checkpoint barrier that failed to
complete due to the timeout?


On Tue, Mar 8, 2022 at 12:48 AM yidan zhao  wrote:

> If the checkpoint timeout leads to the job's fail, then the job will be
> recovered and data will be reprocessed from the last completed checkpoint.
> If the job doesn't fail, then not.
>
> Mahantesh Patil  于2022年3月8日周二 14:47写道:
>
>> Hello Team,
>>
>> What happens after checkpoint timeout?
>>
>> Does Flink reprocess data from the previous checkpoint for all tasks?
>>
>> I have one compute intensive operator with parallelism of 20 and only one
>> of the parallel tasks seems to get stuck because of data skew. On
>> checkpoint timeout , will data be reprocessed or continue processing new
>> data? If not, will increasing checkpoint timeout help.
>>
>> Checkpoint Configuration:
>>
>> CheckpointingMode.EXACTLY_ONCE;
>>
>> CheckPointTimeOut 10 min;
>>
>> MinPauseBetweenCheckpoints 30 sec;
>>
>> CheckPointingInterval 30 sec;
>>
>>
>> Thanks,
>> Mahantesh
>>
>>
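
(For reference, a minimal sketch of the checkpoint configuration listed above,
plus setTolerableCheckpointFailureNumber, which controls whether a timed-out
checkpoint fails the job; with the default of 0 an expired checkpoint fails the
job, which is then restored from the last completed checkpoint. The value 3
below is illustrative.)

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 30 s interval, exactly-once mode, as in the configuration above.
        env.enableCheckpointing(30_000, CheckpointingMode.EXACTLY_ONCE);

        CheckpointConfig config = env.getCheckpointConfig();
        config.setCheckpointTimeout(10 * 60 * 1000);   // 10 min timeout
        config.setMinPauseBetweenCheckpoints(30_000);  // 30 s pause between checkpoints

        // Let a few checkpoints expire without failing (and restarting) the job.
        config.setTolerableCheckpointFailureNumber(3);

        // ... build the pipeline, then env.execute();
    }
}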


Re: [statefun] hadoop dependencies and StatefulFunctionsConfigValidator

2022-03-08 Thread Roman Khachatryan
Hi Filip,

Have you tried putting protobuf-java 3.7.1 into the Flink's lib/ folder?
Or maybe re-writing the dependencies you mentioned to be loaded as plugins? [1]

I don't see any other ways to solve this problem.
Probably Chesnay or Seth will suggest a better solution.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/plugins/


Regards,
Roman

On Fri, Mar 4, 2022 at 9:54 AM Filip Karnicki  wrote:
>
> Hi All!
>
> We're running a statefun uber jar on a shared cloudera flink cluster, the 
> latter of which launches with some ancient protobuf dependencies because of 
> reasons[1].
>
> Setting the following flink-config settings on the entire cluster
>
> classloader.parent-first-patterns.additional: 
> org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
>
> causes these old protobuf dependencies to get loaded over statefun's 
> protobuf-java 3.7.1, and NoSuchMethod exceptions occur.
>
> We hacked together a version of statefun that doesn't perform the check 
> whether the classloader settings contain the three patterns from above, and 
> as long as our job uses protobouf-java 3.7.1 and the com.google.protobuf 
> pattern is not present in the classloader.parent-first-patterns.additional 
> setting, then all is well.
>
> Aside from removing old hadoop from the classpath, which may not be possible 
> given that it's a shared cluster, is there anything we can do other than 
> adding a configurable override not to perform the config check in 
> StatefulFunctionsConfigValidator to an upcoming statefun core release?
>
> Many thanks
> Fil
>
>
> [1] We're still trying to find out if it's absolutely necessary to have these 
> on the classpath.


Savepoint API challenged with large savepoints

2022-03-08 Thread Schwalbe Matthias
Dear Flink Team,

In the last weeks I was faced with a large savepoint (around 40GiB) that 
contained lots of obsolete data points and overwhelmed our infrastructure (i.e. 
failed to load/restart).
We could not afford to lose the state, hence I spent the time to transcode the 
savepoint into something smaller (ended up with 2.5 GiB).
During my efforts I encountered a couple of points that make the savepoint API 
unwieldy with larger savepoints, and found simple solutions ...

I would like to contribute my findings and 'fixes', however on my corporate 
infrastructure I cannot fork/build Flink locally nor PR the changes later on.

Before creating Jira tickets I wanted to quickly discuss the matter.

Findings:


  *   (We are currently on Flink 1.13 (RocksDB state backend) but all findings 
apply as well to the latest version)
  *   WritableSavepoint.write(...) falls back to JobManagerCheckpointStorage 
which restricts savepoint size to 5MiB
 *   See relevant exception stack here [1]
 *   This is because SavepointTaskManagerRuntimeInfo.getConfiguration() 
always returns empty Configuration, hence
 *   Neither "state.checkpoint-storage" nor "state.checkpoints.dir" are/can 
be configured
 *   'fix': provide SavepointTaskManagerRuntimeInfo.getConfiguration() with 
a meaningful implementation and set configuration in 
SavepointEnvironment.getTaskManagerInfo()
  *   When loading a state, MultiStateKeyIterator loads and buffers the whole 
state in memory before it even processes a single data point
 *   This is absolutely no problem for small state (hence the unit tests 
work fine)
 *   MultiStateKeyIterator ctor sets up a java Stream that iterates all 
state descriptors and flattens all datapoints contained within
 *   The java.util.stream.Stream#flatMap function causes the buffering of 
the whole data set when enumerated later on
 *   See call stack [2]
*   In our case this is 150e6 data points (> 1GiB just for the pointers 
to the data, let alone the data itself ~30GiB)
 *   I'm not aware of any instrumentation of Stream that would avoid the 
problem, hence
 *   I coded an alternative implementation of MultiStateKeyIterator that 
avoids using java Stream,
 *   I can contribute our implementation (MultiStateKeyIteratorNoStreams)
  *   I found out that, at least when using LocalFileSystem on a windows 
system, read I/O to load a savepoint is unbuffered,
 *   See example stack [3]
 *   i.e. in order to load only a long in a serializer, it needs to go into 
kernel mode 8 times and load the 8 bytes one by one
 *   I coded a BufferedFSDataInputStreamWrapper that allows to opt-in 
buffered reads on any FileSystem implementation
 *   In our setting savepoint load is now 30 times faster
 *   I once saw a Jira ticket about improving savepoint load time in 
general (lost the link unfortunately); maybe this approach can help with it
 *   not sure if HDFS has got the same problem
 *   I can contribute my implementation
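
(For illustration only, and not the contributed implementation: a minimal sketch
of an opt-in buffered wrapper around FSDataInputStream that tracks its own
position, so small serializer reads are served from an in-memory buffer instead
of hitting the file system byte by byte. Buffer handling is deliberately naive.)

import java.io.IOException;

import org.apache.flink.core.fs.FSDataInputStream;

public class BufferedFSDataInputStreamWrapper extends FSDataInputStream {

    private final FSDataInputStream delegate;
    private final byte[] buffer;
    private long bufferStart = 0; // file position of buffer[0]
    private int bufferLen = 0;    // valid bytes currently in the buffer
    private int bufferPos = 0;    // next byte to serve from the buffer

    public BufferedFSDataInputStreamWrapper(FSDataInputStream delegate, int bufferSize) {
        this.delegate = delegate;
        this.buffer = new byte[bufferSize];
    }

    @Override
    public void seek(long desired) throws IOException {
        if (desired >= bufferStart && desired < bufferStart + bufferLen) {
            bufferPos = (int) (desired - bufferStart); // still inside the buffer
        } else {
            delegate.seek(desired);
            bufferStart = desired;
            bufferLen = 0;
            bufferPos = 0;
        }
    }

    @Override
    public long getPos() throws IOException {
        return bufferStart + bufferPos;
    }

    // read(byte[], int, int) is inherited from InputStream and loops over
    // read(); a production version would override it as well.
    @Override
    public int read() throws IOException {
        if (bufferPos >= bufferLen) {
            bufferStart += bufferLen;              // new buffer starts where the old one ended
            bufferLen = delegate.read(buffer);
            bufferPos = 0;
            if (bufferLen <= 0) {
                bufferLen = 0;
                return -1;                         // end of stream
            }
        }
        return buffer[bufferPos++] & 0xFF;
    }

    @Override
    public void close() throws IOException {
        delegate.close();
    }
}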

Looking forward to your comments


Matthias (Thias) Schwalbe


[1] exception stack:
8215140 [MapPartition (bb312595cb5ccc27fd3b2c729bbb9136) (2/4)#0] ERROR 
BatchTask - Error in task code:  MapPartition 
(bb312595cb5ccc27fd3b2c729bbb9136) (2/4)
java.util.concurrent.ExecutionException: java.io.IOException: Size of the state 
is larger than the maximum permitted memory-backed state. Size=180075318 , 
maxSize=5242880 . Consider using a different state backend, like the File 
System State backend.
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:636)
at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:54)
at 
org.apache.flink.state.api.output.SnapshotUtils.snapshot(SnapshotUtils.java:67)
at 
org.apache.flink.state.api.output.operators.KeyedStateBootstrapOperator.endInput(KeyedStateBootstrapOperator.java:90)
at 
org.apache.flink.state.api.output.BoundedStreamTask.processInput(BoundedStreamTask.java:107)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:661)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
at 
org.apache.flink.state.api.output.BoundedOneInputStreamTaskRunner.mapPartition(BoundedOneInputStreamTaskRunner.java:80)
at 
org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:107)
at 
org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:514)
at 
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:357)
at 

Re: Left join query not clearing state after migrating from 1.9.0 to 1.14.3

2022-03-08 Thread Roman Khachatryan
Hi Prakhar,

Could you please share the statistics about the last successful and
failed checkpoints, e.g. from the UI.
Ideally, with a detailed breakdown for the operators that seem problematic.

Regards,
Roman

On Fri, Mar 4, 2022 at 8:48 AM Prakhar Mathur  wrote:
>
> Hi,
>
> Can someone kindly help and take a look at this? It's a major blocker for us.
>
> Thanks,
> Prakhar
>
> On Wed, Mar 2, 2022 at 2:11 PM Prakhar Mathur  wrote:
>>
>> Hello,
>>
>> We recently migrated our Flink jobs from version 1.9.0 to 1.14.3. These jobs 
>> consume from Kafka and produce to their respective sinks. We are using 
>> MemoryStateBackend for our checkpointing and GCS as our remote fs. After the 
>> migration, we found that a few jobs with a left join in their SQL query 
>> started failing: their checkpoint size kept increasing. We haven't changed 
>> the SQL query. Following is one of the queries that has started failing with 
>> the issue mentioned.
>>
>> SELECT
>> table1.field1,
>> table2.field2,
>> table2.field3,
>> table1.rowtime as estimate_timestamp,
>> table2.creation_time as created_timestamp,
>> CAST(table2.rowtime AS TIMESTAMP)
>> FROM
>> table1
>> LEFT JOIN table2 ON table1.field1 = coalesce(
>> nullif(table2.field4, ''),
>> table2.field5
>> )
>> AND table2.rowtime BETWEEN table1.rowtime
>> AND table1.rowtime + INTERVAL '90' MINUTE
>> WHERE
>> table2.field6 IS NOT TRUE
>>
>> Few data points:
>>
>> On version 1.9.0 it was running with a parallelism of 20; now it is not even 
>> able to run with 40.
>> On version 1.9.0 the max checkpoint size went up to 3.5 GB during peak 
>> hours. Now on 1.14.3, it just keeps increasing, goes up to 30 GB, and 
>> eventually fails due to lack of resources.
>> Earlier in version 1.9.0, we were using 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer and now in 
>> 1.14.3 we have moved to the new Kafka Source.
>>
>> Any help will be highly appreciated as these are production jobs.
>>
>> Thanks
>> Prakhar Mathur
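
For reference, the "new Kafka Source" mentioned in the quoted mail is typically 
wired up roughly as below (a hedged sketch for Flink 1.14; broker, topic, group 
and job names are placeholders, and the actual job's deserializer and watermark 
strategy will differ):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Replacement for the legacy FlinkKafkaConsumer (all names are placeholders)
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("table2-topic")
                .setGroupId("left-join-job")
                .setStartingOffsets(OffsetsInitializer.committedOffsets())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // The real job would use an event-time watermark strategy for the rowtime column
        DataStream<String> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "table2");

        stream.print();
        env.execute("kafka-source-sketch");
    }
}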


Re: [DISCUSS] Flink's supported APIs and Hive query syntax

2022-03-08 Thread Jingsong Li
Thanks all for your discussions.

I'll share my opinion here:

1. Hive SQL and Hive-like SQL are the absolute mainstay of current
batch ETL in China. Hive + Spark (Hive-SQL-like) + Databricks also occupy
a large market worldwide.

- Unlike OLAP SQL (such as Presto, which is ANSI SQL rather than Hive
SQL), batch ETL is run periodically, which means that a large number
of batch pipelines have already been built; if they need to be
migrated to a new system, migrating the SQLs will be extremely costly.

2. Our current Hive dialect is immature and we need to put more effort
into decoupling it from the Flink planner.

Best,
Jingsong

On Tue, Mar 8, 2022 at 4:27 PM Zou Dan  wrote:
>
> Hi Martijn,
> Thanks for bringing this up.
> Hive SQL (used in Hive & Spark) plays an important role in batch processing; 
> it has almost become the de facto standard there. In our company, 
> there are hundreds of thousands of Spark jobs each day.
> IMO, if we want to promote Flink batch, Hive syntax compatibility is a 
> crucial point.
> Thanks to this feature, we have migrated 800+ Spark jobs to Flink smoothly.
>
> So, I quite agree with putting more effort into Hive syntax compatibility.
>
> Best,
> Dan Zou
>
> On 7 Mar 2022, at 19:23, Martijn Visser  wrote:
>
> query
>
>


Re: Flink Checkpoint Timeout

2022-03-08 Thread yidan zhao
If the checkpoint timeout causes the job to fail, then the job will be
recovered and data will be reprocessed from the last completed checkpoint.
If the job doesn't fail, then it won't be.

Mahantesh Patil  wrote on Tue, Mar 8, 2022 at 14:47:

> Hello Team,
>
> What happens after checkpoint timeout?
>
> Does Flink reprocess data from the previous checkpoint for all tasks?
>
> I have one compute-intensive operator with a parallelism of 20, and only one
> of the parallel tasks seems to get stuck because of data skew. On
> checkpoint timeout, will data be reprocessed, or will the job continue
> processing new data? If not, will increasing the checkpoint timeout help?
>
> Checkpoint Configuration:
>
> CheckpointingMode.EXACTLY_ONCE;
>
> CheckPointTimeOut 10 min;
>
> MinPauseBetweenCheckpoints 30 sec;
>
> CheckPointingInterval 30 sec;
>
>
> Thanks,
> Mahantesh
>
>
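
For reference, the settings quoted above map roughly onto the following 
configuration (a sketch; the tolerable-failure line is an assumption about how 
one might keep an expired checkpoint from failing the job, not part of the 
original setup):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // CheckPointingInterval 30 sec, EXACTLY_ONCE
        env.enableCheckpointing(30_000L, CheckpointingMode.EXACTLY_ONCE);

        CheckpointConfig conf = env.getCheckpointConfig();
        conf.setCheckpointTimeout(10 * 60 * 1000L);   // CheckPointTimeOut 10 min
        conf.setMinPauseBetweenCheckpoints(30_000L);  // MinPauseBetweenCheckpoints 30 sec

        // Assumption: with the default of 0, a single expired/declined checkpoint
        // fails the job; raising this lets the job keep processing while
        // checkpoints time out.
        conf.setTolerableCheckpointFailureNumber(3);
    }
}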


Re: [DISCUSS] Flink's supported APIs and Hive query syntax

2022-03-08 Thread Zou Dan
Hi Martijn,
Thanks for bringing this up.
Hive SQL (used in Hive & Spark) plays an important role in batch processing; 
it has almost become the de facto standard there. In our company, 
there are hundreds of thousands of Spark jobs each day.
IMO, if we want to promote Flink batch, Hive syntax compatibility is a crucial 
point.
Thanks to this feature, we have migrated 800+ Spark jobs to Flink smoothly.

So, I quite agree with putting more effort into Hive syntax compatibility.

Best,
Dan Zou

> On 7 Mar 2022, at 19:23, Martijn Visser  wrote:
> 
> query



Re: Re: [DISCUSS] Flink's supported APIs and Hive query syntax

2022-03-08 Thread Jark Wu
Hi Martijn,

Thanks for starting this discussion. I think it's great
for the community to reach a consensus on the roadmap
of Hive query syntax.

I agree that the Hive project is not actively developed nowadays.
However, Hive still occupies the majority of the batch market
and the Hive ecosystem is even more active now. For example,
Apache Kyuubi [1] is a new project providing a JDBC server
that is compatible with HiveServer2, and Apache Iceberg [2]
and Apache Hudi [3] mainly use the Hive Metastore as their
table catalog. Spark SQL is 99% compatible with Hive SQL. We have to admit
that Hive is the open-source de facto standard for batch processing.

As far as I can see, almost all the companies (including ByteDance,
Kuaishou, NetEase, etc.) in China are using Hive SQL for batch
processing, even when the underlying engine is Spark.
I don't know how batch users could migrate to Flink if Flink
didn't provide Hive compatibility. IMO, in the short term,
Hive syntax compatibility is the ticket for us to have a seat
in batch processing. In the long term, we can drop it and
focus on Flink SQL itself, both for batch and stream processing.

Regarding the maintenance concern you raised, I think that's a good
point, and these items are in the plan. The Hive dialect is already
a plugin and an option, and the implementation is located in the
hive-connector module. We still need some work to make the Hive
dialect rely purely on public APIs, and the Hive connector should be
decoupled from the table planner. At that point, we can move the whole
Hive connector into a separate repository (I guess this is also part
of the plan to externalize connectors).
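
For context, switching to the pluggable Hive dialect looks roughly like the 
following (a minimal sketch assuming flink-connector-hive and the Hive 
dependencies are on the classpath; the catalog name, database and conf dir 
are placeholders):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveDialectSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Register a HiveCatalog so Hive tables / the metastore are visible
        HiveCatalog hive = new HiveCatalog("myhive", "default", "/opt/hive-conf");
        tEnv.registerCatalog("myhive", hive);
        tEnv.useCatalog("myhive");

        // Switch the parser to the Hive dialect; equivalent to
        // SET table.sql-dialect = hive; in the SQL client
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

        tEnv.executeSql("SHOW TABLES").print();
    }
}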

What do you think?

Best,
Jark

[1]:
https://kyuubi.apache.org/docs/latest/overview/kyuubi_vs_thriftserver.html
[2]: https://iceberg.apache.org/docs/latest/spark-configuration/
[3]: https://hudi.apache.org/docs/next/syncing_metastore/

On Tue, 8 Mar 2022 at 11:46, Mang Zhang  wrote:

> Hi Martijn,
>
> Thanks for driving this discussion.
>
> +1 on efforts on more Hive/Spark syntax compatibility. The Hive/Spark
> syntax is the most popular in batch computing. Within our company, many
> users want to use Flink to unify streaming and batch processing,
> and some have been running it in production for months. We have also
> integrated Flink with our internal remote shuffle service; Flink saves
> users a lot of development and maintenance costs, and user feedback is very
> good. Enriching Flink's ecosystem gives users more choices, so I think
> pluggable support for Hive/Spark dialects is very necessary. We need better
> designs for future multi-source fusion.
>
>
>
>
>
>
>
> Best regards,
>
> Mang Zhang
>
>
>
>
>
> At 2022-03-07 20:52:42, "Jing Zhang"  wrote:
> >Hi Martijn,
> >
> >Thanks for driving this discussion.
> >
> >+1 on efforts on more hive syntax compatibility.
> >
> >With the efforts on batch processing in recent versions (1.10~1.15), many
> >users have run batch processing jobs based on Flink.
> >In our team, we are trying to migrate most of the existing online batch
> >jobs from Hive/Spark to Flink. We hope this migration does not require
> >users to modify their SQL.
> >Although Hive is not as popular as it used to be, Hive SQL is still alive
> >because many users still use Hive SQL to run Spark jobs.
> >Therefore, compatibility with more Hive syntax is critical to this
> >migration work.
> >
> >Best,
> >Jing Zhang
> >
> >
> >
> >Martijn Visser  wrote on Mon, Mar 7, 2022 at 19:23:
> >
> >> Hi everyone,
> >>
> >> Flink currently has 4 APIs with multiple language support which can be
> >> used to develop applications:
> >>
> >> * DataStream API, both Java and Scala
> >> * Table API, both Java and Scala
> >> * Flink SQL, both in Flink query syntax and Hive query syntax (partially)
> >> * Python API
> >>
> >> Since FLIP-152 [1] the Flink SQL support has been extended to also
> >> support the Hive query syntax. There is now a follow-up FLINK-26360 [2]
> >> to address more syntax compatibility issues.
> >>
> >> I would like to open a discussion on Flink directly supporting the Hive
> >> query syntax. I have some concerns about whether 100% Hive query syntax
> >> support is indeed something that we should aim for in Flink.
> >>
> >> I can understand that having Hive query syntax support in Flink could
> >> help users with interoperability and migration. However:
> >>
> >> - Adding full Hive query syntax support will mean that we go from 6
> >> fully supported API/language combinations to 7. I think we are currently
> >> already struggling to maintain the existing combinations, let alone one
> >> more.
> >> - Apache Hive is/appears to be a project that's not that actively
> >> developed anymore. The last release was made in January 2021. Its
> >> popularity is rapidly declining in Europe and the United States, also
> >> due to Hadoop becoming less popular.
> >> - Related to the previous topic, other software like Snowflake,
> >>