Flink 1.12.8 release

2022-02-15 Thread Joey L
Hi,

Is there a planned release date for 1.12.8, or a scheduled release cycle
for minor versions?

Regards,
J


Unsubscribe (退订)

2022-02-15 Thread 天分
Unsubscribe (退订)
Sent from my iPhone

Job manager slots are in bad state.

2022-02-15 Thread Josson Paul
We are using Flink version 1.11.2.
At times, when task managers are restarted for some reason, the job manager
throws the exception that I attached here. It is an IllegalStateException.
We never had this issue with Flink 1.8; it started happening after
upgrading to Flink 1.11.2.

Why are the slots not released if they are in a bad state? The issue doesn't
get resolved even if I restart all the task managers; it only gets resolved
if I restart the job manager.

java.util.concurrent.CompletionException: java.util.concurrent.
CompletionException: java.lang.IllegalStateException
at org.apache.flink.runtime.jobmaster.slotpool.
SlotSharingManager$MultiTaskSlot.lambda$new$0(SlotSharingManager.java:433)
at java.base/java.util.concurrent.CompletableFuture.uniHandle(
CompletableFuture.java:930)
at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(
CompletableFuture.java:907)
at java.base/java.util.concurrent.CompletableFuture.postComplete(
CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture
.completeExceptionally(CompletableFuture.java:2088)
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$forwardTo$21(
FutureUtils.java:1132)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(
CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture
.uniWhenCompleteStage(CompletableFuture.java:883)
at java.base/java.util.concurrent.CompletableFuture.whenComplete(
CompletableFuture.java:2251)
at org.apache.flink.runtime.concurrent.FutureUtils.forward(FutureUtils
.java:1100)
at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager
.createRootSlot(SlotSharingManager.java:155)
at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
.allocateMultiTaskSlot(SchedulerImpl.java:477)
at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
.allocateSharedSlot(SchedulerImpl.java:311)
at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
.internalAllocateSlot(SchedulerImpl.java:160)
at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
.allocateSlotInternal(SchedulerImpl.java:143)
at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
.allocateSlot(SchedulerImpl.java:113)
at org.apache.flink.runtime.executiongraph.
SlotProviderStrategy$NormalSlotProviderStrategy.allocateSlot(
SlotProviderStrategy.java:115)
at org.apache.flink.runtime.scheduler.DefaultExecutionSlotAllocator
.lambda$allocateSlotsFor$0(DefaultExecutionSlotAllocator.java:104)
at java.base/java.util.concurrent.CompletableFuture.uniComposeStage(
CompletableFuture.java:1106)
at java.base/java.util.concurrent.CompletableFuture.thenCompose(
CompletableFuture.java:2235)
at org.apache.flink.runtime.scheduler.DefaultExecutionSlotAllocator
.allocateSlotsFor(DefaultExecutionSlotAllocator.java:102)
at org.apache.flink.runtime.scheduler.DefaultScheduler.allocateSlots(
DefaultScheduler.java:339)
at org.apache.flink.runtime.scheduler.DefaultScheduler
.allocateSlotsAndDeploy(DefaultScheduler.java:312)
at org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy
.allocateSlotsAndDeploy(EagerSchedulingStrategy.java:76)
at org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy
.restartTasks(EagerSchedulingStrategy.java:57)
at org.apache.flink.runtime.scheduler.DefaultScheduler
.lambda$restartTasks$2(DefaultScheduler.java:265)
at java.base/java.util.concurrent.CompletableFuture$UniRun.tryFire(
CompletableFuture.java:783)
at java.base/java.util.concurrent.CompletableFuture$Completion.run(
CompletableFuture.java:478)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(
AkkaRpcActor.java:402)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(
AkkaRpcActor.java:195)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor
.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(
AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at 

Re: Joining Flink tables with different watermark delay

2022-02-15 Thread Meghajit Mazumdar
Hi Francesco,

Thank you so much for your reply. This was really helpful. In reply to your
tips:

*> As described here, we have deprecated the syntax `GROUP BY WINDOW`, you
should use windowing TVF instead*
Yes, we are trying to move towards windowing TVFs as well. Some of our
existing jobs still use Group Window Aggregation and hence we are still
using it.

*> You can directly use Window joins as well for your query, as they're
meant exactly to cover your use case*
Thanks. Looks like it is used along with Windowing TVFs though. But I will
try to explore this.

*> Any particular reason you're creating the input tables from DataStream
instead of creating them directly from Table API using either CREATE TABLE
or TableDescriptor?*
We are creating a File Source which can read parquet files from a remote
GCS (Google Cloud Storage) bucket. We had evaluated that approach to create
a table, but we faced the following challenges:

   - We plan to use this parquet source to create a Hybrid Source later,
   so we had to use a File Source.
   - A call to GCS returns files in lexicographic order. We wanted a
   deterministic, high-level order in which files are picked for reading,
   so we resorted to a File Source with a custom Split Assigner to assign
   the files to the source readers in a particular order.
   - Creating the table requires specifying the column names and data
   types. However, in our case we use the Protobuf schema to read the
   schema for a parquet file. Also, some values in the parquet file need
   custom type conversion (int64 -> timestamp, for example).


I had a question with regard to this point you mentioned:
*> In other words, it won't drop the content of kafkaTable immediately, but
after both streams are at "the same point in time" (defined by the
watermarks of both streams).*
Does it mean that the output of the join will be flushed to the sink at the
period defined by the minimum watermark, that is, 60 minutes in the above
case?
Also, I read here that Flink will remove old data from its state in the case
of interval joins. Does this mean that data present in both tables will be
removed after the minimum watermark delay (60 minutes in this case)?

Regards,
Meghajit

On Mon, Feb 14, 2022 at 8:13 PM Francesco Guardiani wrote:

> Hi,
>
> So my understanding of your query is that you want to do a join first, and
> then group by a 60 minutes distance and aggregate them. Please correct me
> if I'm wrong.
>
> First of all, the query you've posted is incorrect and should fail, as its
> plan is invalid because it's using a regular join. Regular joins cannot be
> concatenated with other "time operations" like a group by window, as they
> don't produce any watermark.
>
> My suggestion for your query is to use an interval join first, and then a
> group window. For example:
>
> SELECT TUMBLE_START(file_time, INTERVAL '60' MINUTE) AS event_time,
> MAX(TIMESTAMPDIFF(MINUTE, file_time, kafka_time))
> FROM (
>   SELECT fileTable.id AS id, fileTable.event_time AS file_time,
> kafkaTable.event_time AS kafka_time
>   FROM fileTable, kafkaTable
>   WHERE fileTable.id = kafkaTable.id AND 
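The query above is cut off in the archive. Purely as an illustration of how
such an interval-join condition is typically completed (the 60-minute bound
below is an assumption, not taken from the original message):

  WHERE fileTable.id = kafkaTable.id
    AND fileTable.event_time BETWEEN kafkaTable.event_time - INTERVAL '60' MINUTE
                                 AND kafkaTable.event_time

Bounding both event times like this is what lets the planner treat the join
as an interval join rather than a regular join.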

Unsubscribe (退订)

2022-02-15 Thread Y Luo
Unsubscribe (退订)


Re: Failed to serialize the result for RPC call : requestMultipleJobDetails after Upgrading to Flink 1.14.3

2022-02-15 Thread Chirag Dewan
Ah, I should have looked better. I think
https://issues.apache.org/jira/browse/FLINK-25732 causes this.
Are there any side effects of this? How can I avoid this problem so that it 
doesn't affect my processing?
Thanks
On Wednesday, 16 February 2022, 10:19:12 am IST, Chirag Dewan wrote:
 
Hi,
We are running a Flink cluster with 2 JMs in HA and 2 TMs on a standalone
Kubernetes cluster. After migrating to 1.14.3, we started to see some
exceptions in the JM logs:
2022-02-15 11:30:00,100 ERROR 
org.apache.flink.runtime.rest.handler.job.JobIdsHandler      [] POD_NAME: 
eric-bss-em-sm-streamserver-jobmanager-868fd68b5d-zs9pv - Unhandled 
exception.org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException: Failed 
to serialize the result for RPC call : requestMultipleJobDetails.        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:417)
 ~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$sendAsyncResponse$2(AkkaRpcActor.java:373)
 ~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836) 
~[?:1.8.0_321]        at 
java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:848)
 ~[?:1.8.0_321]        at 
java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2168) 
~[?:1.8.0_321]        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.sendAsyncResponse(AkkaRpcActor.java:365)
 ~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:332)
 ~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
 ~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
 ~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
 ~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) 
~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) 
~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
scala.PartialFunction.applyOrElse(PartialFunction.scala:123) 
~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) 
~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) 
~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
akka.actor.Actor.aroundReceive(Actor.scala:537) 
~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
akka.actor.Actor.aroundReceive$(Actor.scala:535) 
~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) 
~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) 
[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
akka.actor.ActorCell.invoke(ActorCell.scala:548) 
[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) 
[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
akka.dispatch.Mailbox.run(Mailbox.scala:231) 
[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
akka.dispatch.Mailbox.exec(Mailbox.scala:243) 
[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_321]   
     at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067) 
[?:1.8.0_321]        at 
java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703) 
[?:1.8.0_321]        at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172) 
[?:1.8.0_321]Caused by: java.io.NotSerializableException: 
java.util.HashMap$Values        at 

Failed to serialize the result for RPC call : requestMultipleJobDetails after Upgrading to Flink 1.14.3

2022-02-15 Thread Chirag Dewan
Hi,
We are running a Flink cluster with 2 JMs in HA and 2 TMs on a standalone
Kubernetes cluster. After migrating to 1.14.3, we started to see some
exceptions in the JM logs:
2022-02-15 11:30:00,100 ERROR 
org.apache.flink.runtime.rest.handler.job.JobIdsHandler      [] POD_NAME: 
eric-bss-em-sm-streamserver-jobmanager-868fd68b5d-zs9pv - Unhandled 
exception.org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException: Failed 
to serialize the result for RPC call : requestMultipleJobDetails.        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:417)
 ~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$sendAsyncResponse$2(AkkaRpcActor.java:373)
 ~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836) 
~[?:1.8.0_321]        at 
java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:848)
 ~[?:1.8.0_321]        at 
java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2168) 
~[?:1.8.0_321]        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.sendAsyncResponse(AkkaRpcActor.java:365)
 ~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:332)
 ~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
 ~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
 ~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
 ~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) 
~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) 
~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
scala.PartialFunction.applyOrElse(PartialFunction.scala:123) 
~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) 
~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) 
~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
akka.actor.Actor.aroundReceive(Actor.scala:537) 
~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
akka.actor.Actor.aroundReceive$(Actor.scala:535) 
~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) 
~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) 
[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
akka.actor.ActorCell.invoke(ActorCell.scala:548) 
[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) 
[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
akka.dispatch.Mailbox.run(Mailbox.scala:231) 
[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
akka.dispatch.Mailbox.exec(Mailbox.scala:243) 
[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]        at 
java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_321]   
     at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067) 
[?:1.8.0_321]        at 
java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703) 
[?:1.8.0_321]        at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172) 
[?:1.8.0_321]Caused by: java.io.NotSerializableException: 
java.util.HashMap$Values        at 
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) 
~[?:1.8.0_321]        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
~[?:1.8.0_321]        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
~[?:1.8.0_321]        at 


Re: TM OOMKilled

2022-02-15 Thread Xintong Song
Thanks Alexey,

In my experience, common causes for TM OOMKills are:
1. RocksDB uses more memory than expected. Unfortunately, a hard memory
limit is not supported by RocksDB. Flink conservatively estimates RocksDB's
memory footprint and tunes its parameters accordingly, but this is not 100%
safe.
2. The job (connectors, UDFs, and their dependencies) may need direct and
native memory. When native memory is needed, increasing the task off-heap
memory may not be as helpful as increasing the JVM overhead.
3. There could also be a memory leak, leading to a continuously increasing
memory footprint. Given your description that the OOM happens about every
2 days, this is a strong suspect.

For 1 & 2, increasing the JVM overhead should help. For 3, you may need to
investigate a heap/thread dump to find out where the leak comes from.

I'd suggest first increasing the JVM overhead and seeing if that fixes the
problem. If the problem is not fixed but the job runs longer before the OOM
happens, then it's likely the 3rd case. You can also monitor the pod's
memory footprint over time if such metrics are available.
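For reference, a minimal sketch of how the JVM overhead could be raised in
flink-conf.yaml (the concrete values are placeholders, not a recommendation
from this thread):

  taskmanager.memory.jvm-overhead.min: 2g
  taskmanager.memory.jvm-overhead.max: 3g
  taskmanager.memory.jvm-overhead.fraction: 0.2

The overhead is derived from the fraction and then clamped between min and
max, so the three options usually need to be adjusted together.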

Thank you~

Xintong Song



On Tue, Feb 15, 2022 at 11:56 PM Alexey Trenikhun  wrote:

> Hi Xintong,
> I've checked - `state.backend.rocksdb.memory.managed` is not explicitly
> configured, so as you wrote it should be true by default.
>
> Regarding task off-heap, I believe KafkaConsumer needed off-heap memory
> some time ago
>
> --
> *From:* Xintong Song 
> *Sent:* Monday, February 14, 2022 10:06 PM
> *To:* Alexey Trenikhun 
> *Cc:* Flink User Mail List 
> *Subject:* Re: TM OOMKilled
>
> Hi Alexey,
>
> You may want to double check if `state.backend.rocksdb.memory.managed` is
> configured to `true`. (This should be `true` by default.)
>
> Another question that may or may not be related. I noticed that you have
> configured 128MB task off-heap memory, which IIRC the default should be 0.
> Could you share what that is for?
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Feb 15, 2022 at 12:10 PM Alexey Trenikhun  wrote:
>
> Hello,
> We use RocksDB, but there is no problem with the Java heap, which is
> limited to 3.523gb; the problem is with total container memory. The pod is
> killed not due to an OutOfMemoryError, but because total container memory
> exceeds 10gb.
>
> Thanks,
> Alexey
> --
> *From:* Caizhi Weng 
> *Sent:* Monday, February 14, 2022 6:42:05 PM
> *To:* Alexey Trenikhun 
> *Cc:* Flink User Mail List 
> *Subject:* Re: TM OOMKilled
>
> Hi!
>
> Heap memory usage depends heavily on your job and your state backend.
> Which state backend are you using and if possible could you share your user
> code or explain what operations your job is doing?
>
> Alexey Trenikhun wrote on Tue, Feb 15, 2022 at 05:17:
>
> Hello,
> We run a Flink 1.13.5 job in app mode in Kubernetes, 1 JM and 1 TM. We
> also have a Kubernetes cron job which takes a savepoint every 2 hours
> (14 */2 * * *). Once in a while (~1 per 2 days) the TM is OOMKilled;
> suspiciously, it happens on even hours, ~4 minutes after the savepoint
> starts (e.g. 12:18, 4:18), but I don't see failed savepoints, so I assume
> the OOM happens right after the savepoint is taken. However, the OOMKill
> doesn't happen on every savepoint, so maybe this is a random correlation.
> I've reserved 2G for JVM overhead, but somehow it is not enough? Any known
> issues with memory and savepoints? Any suggestions on how to troubleshoot
> this?
>
>  Final TaskExecutor Memory configuration:
>Total Process Memory:  10.000gb (10737418240 bytes)
>  Total Flink Memory:  7.547gb (8103395328 bytes)
>Total JVM Heap Memory: 3.523gb (3783262149 bytes)
>  Framework:   128.000mb (134217728 bytes)
>  Task:3.398gb (3649044421 bytes)
>Total Off-heap Memory: 4.023gb (4320133179 bytes)
>  Managed: 3.019gb (3241358179 bytes)
>  Total JVM Direct Memory: 1.005gb (1078775000 bytes)
>Framework: 128.000mb (134217728 bytes)
>Task:  128.000mb (134217728 bytes)
>Network:   772.800mb (810339544 bytes)
>  JVM Metaspace:   256.000mb (268435456 bytes)
>  JVM Overhead:2.203gb (2365587456 bytes)
>
> Thanks,
> Alexey
>
>


Re: Flink 1.12.x DataSet --> Flink 1.14.x DataStream

2022-02-15 Thread saravana...@gmail.com
Thanks Zhipeng.  Working as expected.  Thanks once again.

Saravanan

On Tue, Feb 15, 2022 at 3:23 AM Zhipeng Zhang 
wrote:

> Hi Saravanan,
>
> One solution could be using a streamOperator to implement `BoundedOneInput`
> interface.
> An example code could be found here [1].
>
> [1]
> https://github.com/apache/flink-ml/blob/56b441d85c3356c0ffedeef9c27969aee5b3ecfc/flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/DataStreamUtils.java#L75
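A minimal, hedged sketch of that suggestion: a one-input stream operator
implementing BoundedOneInput, which can emit conditionally per record and
then emit a final result when the bounded input ends (the class name, String
types, and the "every 100 records" condition are placeholders; state is not
checkpointed in this sketch):

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class MapPartitionLikeOperator
        extends AbstractStreamOperator<String>
        implements OneInputStreamOperator<String, String>, BoundedOneInput {

    private transient long count; // per-subtask running count

    @Override
    public void processElement(StreamRecord<String> element) throws Exception {
        count++;                        // process each record
        if (count % 100 == 0) {         // conditional emit, like (1) in the quoted code below
            output.collect(new StreamRecord<>("seen " + count));
        }
    }

    @Override
    public void endInput() throws Exception {
        // invoked once all input of the bounded stream has been consumed,
        // like the end-of-partition emit (2) in the quoted code below
        output.collect(new StreamRecord<>("total " + count));
    }
}

It could then be wired in with something like
stream.transform("mapPartitionLike", Types.STRING, new MapPartitionLikeOperator()).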
>
> saravana...@gmail.com wrote on Tue, Feb 15, 2022 at 02:44:
>
>> Hi Niklas,
>>
>> Thanks for your reply. Approach [1] works only if operators are chained
>> (in other words, executed within the same task). Since the mapPartition
>> operator's parallelism differs from the previous operator's parallelism,
>> it doesn't fall under the same task (i.e., it is not chained).
>>
>>
>>
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/flink-architecture/#tasks-and-operator-chains
>> https://issues.apache.org/jira/browse/FLINK-14709
>>
>> Saravanan
>>
>> On Mon, Feb 14, 2022 at 9:01 AM Niklas Semmler 
>> wrote:
>>
>>> Hi Saravanan,
>>>
>>> AFAIK the last record is not treated differently.
>>>
>>> Does the approach in [1] not work?
>>>
>>> Best regards,
>>> Niklas
>>>
>>>
>>> https://github.com/dmvk/flink/blob/2f1b573cd57e95ecac13c8c57c0356fb281fd753/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java#L279
>>>
>>>
>>> > On 9. Feb 2022, at 20:31, saravana...@gmail.com 
>>> wrote:
>>> >
>>> > Is there any way to identify the last message inside RichFunction in
>>> BATCH mode ?
>>> >
>>> >
>>> >
>>> > On Wed, Feb 9, 2022 at 8:56 AM saravana...@gmail.com <
>>> saravana...@gmail.com> wrote:
>>> > I am trying to migrate from Flink 1.12.x DataSet api to Flink 1.14.x
>>> DataStream api. mapPartition is not available in Flink DataStream.
>>> >
>>> > Current Code using Flink 1.12.x DataSet :
>>> >
>>> > dataset
>>> > .
>>> > .mapPartition(new SomeMapParitionFn())
>>> > .
>>> >
>>> > public static class SomeMapPartitionFn extends
>>> RichMapPartitionFunction {
>>> >
>>> > @Override
>>> > public void mapPartition(Iterable records,
>>> Collector out) throws Exception {
>>> > for (InputModel record : records) {
>>> > /*
>>> > do some operation
>>> >  */
>>> > if (/* some condition based on processing *MULTIPLE*
>>> records */) {
>>> >
>>> > out.collect(...); // Conditional collect
>>>   ---> (1)
>>> > }
>>> > }
>>> >
>>> > // At the end of the data, collect
>>> >
>>> > out.collect(...);   // Collect processed data
>>>  ---> (2)
>>> > }
>>> > }
>>> >
>>> >   • (1) - Collector.collect invoked based on some condition after
>>> processing few records
>>> >   • (2) - Collector.collect invoked at the end of data
>>> >
>>> > Initially we thought of using flatMap instead of mapPartition, but the
>>> collector is not available in close function.
>>> >
>>> > https://issues.apache.org/jira/browse/FLINK-14709 - Only available in
>>> case of chained drivers
>>> > How to implement this in Flink 1.14.x DataStream? Please advise...
>>> >
>>> > Note: Our application works with only finite set of data (Batch Mode)
>>> >
>>>
>>>
>
> --
> best,
> Zhipeng
>
>


Re: Exception Help

2022-02-15 Thread Jonathan Weaver
I've narrowed it down to a TableSource that is returning a MAP type as a
column. It only errors when the column is referenced, and not on the first
row, but somewhere in the stream of rows.

On 1.15 master branch (I need the new JSON features in 1.15 for this
project so riding the daily snapshot during development)

In catalog column is defined as
.column("vc", DataTypes.MAP(DataTypes.STRING(), DataTypes.ARRAY(DataTypes.STRING())))

My TableFunction is returning the following for the column

  return new GenericMapData(
  fields.toJavaMap(
  v ->
  new Tuple2(
  StringData.fromString(v.getKey()),
  new GenericArrayData(
  v.getValue().isArray()
  ? List.ofAll(() -> ((ArrayNode)
v.getValue()).elements())
  .map(vv ->
StringData.fromString(vv.asText()))

.toJavaArray(StringData[]::new)
  :
List.of(StringData.fromString(v.getValue().asText()))

.toJavaArray(StringData[]::new);
});

Where it's basically looping over a jackson JsonNode parsed from a DB table
and returning as a MAP (the keys and values are sparse amongst hundreds of
possibilities). The values in the Json are either a single text value, or
an array of text values so I'm just turning all values into an array.

There are around ~190 key-values in the map on average.

The SQL that references the column is just

COALESCE(ELEMENT(vc [ 'ARCHIVE_TASKING' ]), product_type) type,

So it looks up a specific key and uses it if it exists, otherwise it
coalesces to a generic string.

And I keep getting this exception during the processing on a random row.

Caused by: java.lang.IndexOutOfBoundsException: offset=0, targetOffset=24,
numBytes=8, address=16, targetAddress=16
at
org.apache.flink.core.memory.MemorySegment.copyTo(MemorySegment.java:1441)
at
org.apache.flink.table.data.writer.AbstractBinaryWriter.writeSegmentsToVarLenPart(AbstractBinaryWriter.java:249)
at
org.apache.flink.table.data.writer.AbstractBinaryWriter.writeString(AbstractBinaryWriter.java:93)
at
org.apache.flink.table.data.writer.BinaryArrayWriter.writeString(BinaryArrayWriter.java:30)
at
org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:140)
at
org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.toBinaryArray(ArrayDataSerializer.java:210)
at
org.apache.flink.table.data.writer.AbstractBinaryWriter.writeArray(AbstractBinaryWriter.java:109)
at
org.apache.flink.table.data.writer.BinaryArrayWriter.writeArray(BinaryArrayWriter.java:30)
at
org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:147)
at
org.apache.flink.table.runtime.typeutils.MapDataSerializer.toBinaryMap(MapDataSerializer.java:175)
at
org.apache.flink.table.runtime.typeutils.MapDataSerializer.copy(MapDataSerializer.java:109)
at TableCalcMapFunction$130.flatMap_split26(Unknown Source)
at TableCalcMapFunction$130.flatMap(Unknown Source)
at
org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner$TemporalTableCalcResultFuture.complete(AsyncLookupJoinWithCalcRunner.java:119)

Is that enough context or is there something else I can give you all?

Thanks!




On Tue, Feb 15, 2022 at 1:24 PM Sid Kal  wrote:

> Hi Jonathan,
>
> It would be better if you describe your scenario along with the code. It
> would be easier for the community to help.
>
> On Tue, 15 Feb 2022, 23:33 Jonathan Weaver, 
> wrote:
>
>> I'm getting the following exception running locally from my IDE
>> (IntelliJ) but seems to not occur
>> when running on a cluster. I'm assuming it may be related to memory
>> settings on the runtime (machine has 64GB of ram avail) but not sure what
>> setting to try and change.
>>
>> Caused by: java.lang.IndexOutOfBoundsException: offset=0,
>> targetOffset=3568, numBytes=40, address=16, targetAddress=16
>> at
>> org.apache.flink.core.memory.MemorySegment.copyTo(MemorySegment.java:1441)
>> at
>> org.apache.flink.table.data.writer.AbstractBinaryWriter.writeSegmentsToVarLenPart(AbstractBinaryWriter.java:249)
>> at
>> org.apache.flink.table.data.writer.AbstractBinaryWriter.writeArray(AbstractBinaryWriter.java:110)
>> at
>> org.apache.flink.table.data.writer.BinaryArrayWriter.writeArray(BinaryArrayWriter.java:30)
>> at
>> org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:147)
>> at
>> org.apache.flink.table.runtime.typeutils.MapDataSerializer.toBinaryMap(MapDataSerializer.java:175)
>> at
>> org.apache.flink.table.runtime.typeutils.MapDataSerializer.copy(MapDataSerializer.java:109)
>> at TableCalcMapFunction$148.flatMap_split18(Unknown Source)
>> at TableCalcMapFunction$148.flatMap(Unknown Source)
>> at
>> 


Unsubscribe (退订)

2022-02-15 Thread 但宝平
Unsubscribe (退订)

Re: Flink SQL JDBC sink transaction commit question

2022-02-15 Thread Michael Ran
As I recall, the MySQL JDBC driver defaults to AutoCommit. I'm not sure
about Phoenix.
On 2022-02-15 13:25:07, "casel.chen" wrote:
>I've recently been extending the Flink SQL JDBC connector to support the
>Phoenix database. While debugging, I found that data can be written via
>PhoenixStatement.executeBatch(), but because no transaction is committed,
>other sessions cannot see it.
>In the source code, PhoenixPreparedStatement.execute() calls
>executeMutation(statement), which checks connection.getAutoCommit() to
>decide whether to call connection.commit(). Control then returns to
>PhoenixStatement.executeBatch(), which calls flushIfNecessary(), which in
>turn checks connection.getAutoFlush() to decide whether to call
>connection.flush().
>At first I did not add the ;autocommit=true parameter to the Phoenix JDBC
>URL, and the changed data was not committed to the database. After adding
>;autocommit=true, connection.commit() was executed and the data was
>committed successfully.
>
>
>A few questions:
>1. Sinking into MySQL does not have this problem. Can the JDBC sink behave
>differently for different databases?
>2. Where is the connection autoflush parameter set, and how does it differ
>from autocommit?
>3. A flush is triggered when the buffer is full, the flush interval
>elapses, or a checkpoint happens; it calls JdbcBatchingOutputFormat.flush(),
>but I don't see a connection.commit() there either. How does the data get
>committed to the database? Without an explicit transaction, is the data
>committed as soon as statement.executeBatch() returns?
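For reference, a minimal plain-JDBC sketch of the commit behaviour under
discussion (generic java.sql code, not Flink- or Phoenix-specific; the URL,
credentials, and table are placeholders):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class CommitSketch {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:...", "user", "pass")) {
            // with autoCommit=true the driver commits after each statement/batch itself
            conn.setAutoCommit(false);
            try (PreparedStatement ps = conn.prepareStatement("UPSERT INTO t VALUES (?, ?)")) {
                ps.setLong(1, 1L);
                ps.setString(2, "a");
                ps.addBatch();
                ps.executeBatch(); // sends the batch; visibility to other sessions still depends on the commit
            }
            conn.commit(); // without this (or autoCommit=true), other sessions won't see the rows
        }
    }
}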


Re: jdbc connector (original subject garbled in the archive)

2022-02-15 Thread Michael Ran
[Reply text lost to a character-encoding problem in the archive.]

On 2022-02-14 15:40:11, "jianjianjianjianjianjianjianjian"
<724125...@qq.com.INVALID> wrote:

[Question text largely lost to encoding. The recoverable part asks about the
JDBC connector internals: in 1.13 the class is
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat, while on
the master branch it is
org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.]




Change column names Pyflink Table/Datastream API

2022-02-15 Thread Francis Conroy
Hi all,

I'm hoping to be able to change the column names when creating a table from
a datastream; the flatmap function generating the stream returns a Tuple4.

It's currently working as follows:

inputmetrics = table_env.from_data_stream(ds, Schema.new_builder()
  .column("f0", "BIGINT")
  .column("f1", "STRING")
  .column("f2", "STRING")
  .column("f3", "DOUBLE")
  .build())

I'm trying to rename the columns f0, f1, f2, f3 to proper names e.g.
timestamp, device, name, value. So far I've tried using from_fields, and

column_by_expression("timestamp", "f0")

I'd prefer not to change the output type of my previous flatMapFunction (to
say a named Row) for performance purposes.
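A minimal, untested sketch of one possible way to get named columns without
changing the flatMap output type, assuming PyFlink's col(...).alias(...)
expressions are available (the target column names below are placeholders;
a name like "timestamp" may also clash with a SQL keyword):

from pyflink.table.expressions import col

inputmetrics = table_env.from_data_stream(ds, Schema.new_builder()
        .column("f0", "BIGINT")
        .column("f1", "STRING")
        .column("f2", "STRING")
        .column("f3", "DOUBLE")
        .build()) \
    .select(col("f0").alias("event_ts"),
            col("f1").alias("device"),
            col("f2").alias("name"),
            col("f3").alias("value"))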

Thanks,
Francis

-- 
This email and any attachments are proprietary and confidential and are 
intended solely for the use of the individual to whom it is addressed. Any 
views or opinions expressed are solely those of the author and do not 
necessarily reflect or represent those of SwitchDin Pty Ltd. If you have 
received this email in error, please let us know immediately by reply email 
and delete it from your system. You may not use, disseminate, distribute or 
copy this message nor disclose its contents to anyone. 
SwitchDin Pty Ltd 
(ABN 29 154893857) PO Box 1165, Newcastle NSW 2300 Australia


How to get memory specific metrics for tasknodes

2022-02-15 Thread Diwakar Jha
Hello,

I'm running Flink 1.11 on AWS EMR using the YARN application. I'm trying to
access memory metrics (Heap.Max, Heap.Used) per task node in CloudWatch. I
have 50 task nodes, and this creates millions of metrics (including
per-operator ones), though I need only a few metrics per task node
(Heap.Max, Heap.Used). That is far beyond my current CloudWatch limit, and I
don't need so many metrics anyway.
Could someone please help me get only the task-node memory-specific
metrics?
I'm referring to this doc:
https://nightlies.apache.org/flink/flink-docs-release-1.7/monitoring/metrics.html#memory

I used the following approach to enable Flink metrics.
1. Enable Flink Metrics
copy /opt/flink-metrics-statsd-x.x.jar into the /lib folder of your Flink
distribution
2.  Add StatsD metric reporter in Flink-conf to send to CloudWatch Agent's
StatsD interface
metrics.reporters: stsd
metrics.reporter.stsd.factory.class:
org.apache.flink.metrics.statsd.StatsDReporterFactory
metrics.reporter.stsd.host: localhost
metrics.reporter.stsd.port: 8125
3. Setup tasknode scope
metrics.scope.tm: taskmanager
4. setup Cloudwatch agent to publish the metrics
"metrics":{
  "namespace": "CustomeNamespace/FlinkMemoryMetrics",
  "metrics_collected":{
 "statsd":{
"service_address":":8125",
"metrics_collection_interval":60,
"metrics_aggregation_interval":300
 }
  }
  },

Thanks!


Re: Exception Help

2022-02-15 Thread Sid Kal
Hi Jonathan,

It would be better if you describe your scenario along with the code. It
would be easier for the community to help.

On Tue, 15 Feb 2022, 23:33 Jonathan Weaver,  wrote:

> I'm getting the following exception running locally from my IDE (IntelliJ)
> but seems to not occur
> when running on a cluster. I'm assuming it may be related to memory
> settings on the runtime (machine has 64GB of ram avail) but not sure what
> setting to try and change.
>
> Caused by: java.lang.IndexOutOfBoundsException: offset=0,
> targetOffset=3568, numBytes=40, address=16, targetAddress=16
> at
> org.apache.flink.core.memory.MemorySegment.copyTo(MemorySegment.java:1441)
> at
> org.apache.flink.table.data.writer.AbstractBinaryWriter.writeSegmentsToVarLenPart(AbstractBinaryWriter.java:249)
> at
> org.apache.flink.table.data.writer.AbstractBinaryWriter.writeArray(AbstractBinaryWriter.java:110)
> at
> org.apache.flink.table.data.writer.BinaryArrayWriter.writeArray(BinaryArrayWriter.java:30)
> at
> org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:147)
> at
> org.apache.flink.table.runtime.typeutils.MapDataSerializer.toBinaryMap(MapDataSerializer.java:175)
> at
> org.apache.flink.table.runtime.typeutils.MapDataSerializer.copy(MapDataSerializer.java:109)
> at TableCalcMapFunction$148.flatMap_split18(Unknown Source)
> at TableCalcMapFunction$148.flatMap(Unknown Source)
> at
> org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner$TemporalTableCalcResultFuture.complete(AsyncLookupJoinWithCalcRunner.java:119)
>
> Was wondering if anyone had any insights or pointers on what could be
> causing that?
>
> Thanks!
> Jonathan
>
>


Exception Help

2022-02-15 Thread Jonathan Weaver
I'm getting the following exception when running locally from my IDE
(IntelliJ), but it does not seem to occur when running on a cluster. I'm
assuming it may be related to memory settings on the runtime (the machine
has 64GB of RAM available) but am not sure which setting to try changing.

Caused by: java.lang.IndexOutOfBoundsException: offset=0,
targetOffset=3568, numBytes=40, address=16, targetAddress=16
at
org.apache.flink.core.memory.MemorySegment.copyTo(MemorySegment.java:1441)
at
org.apache.flink.table.data.writer.AbstractBinaryWriter.writeSegmentsToVarLenPart(AbstractBinaryWriter.java:249)
at
org.apache.flink.table.data.writer.AbstractBinaryWriter.writeArray(AbstractBinaryWriter.java:110)
at
org.apache.flink.table.data.writer.BinaryArrayWriter.writeArray(BinaryArrayWriter.java:30)
at
org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:147)
at
org.apache.flink.table.runtime.typeutils.MapDataSerializer.toBinaryMap(MapDataSerializer.java:175)
at
org.apache.flink.table.runtime.typeutils.MapDataSerializer.copy(MapDataSerializer.java:109)
at TableCalcMapFunction$148.flatMap_split18(Unknown Source)
at TableCalcMapFunction$148.flatMap(Unknown Source)
at
org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner$TemporalTableCalcResultFuture.complete(AsyncLookupJoinWithCalcRunner.java:119)

Was wondering if anyone had any insights or pointers on what could be
causing that?

Thanks!
Jonathan


Re: Log4j2 configuration

2022-02-15 Thread jonas eyob
1. Ok, thanks!
2. We are using application mode. No changes to the distribution other than
updating the log4j-console.properties file.

content of /lib/:

* flink-csv-1.14.3.jar
* flink-json-1.14.3.jar
* flink-table_2.12-1.14.3.jar
* log4j-api-2.17.1.jar
* log4j-slf4j-impl-2.17.1.jar
* flink-dist_2.12-1.14.3.jar
* flink-shaded-zookeeper-3.4.14.jar
* log4j-1.2-api-2.17.1.jar
* log4j-core-2.17.1.jar

Den tis 15 feb. 2022 kl 16:30 skrev Chesnay Schepler :

> 1) You either need to modify the log4j-console.properties file, or
> explicitly set the log4j.configurationFile property to point to your .xml
> file.
> 2)
> Have you made modifications to the distribution (e.g., removing other
> logging jars from the lib directory)?
> Are you using application mode, or session clusters?
>
> On 15/02/2022 16:41, jonas eyob wrote:
>
> Hey,
>
> We are deploying our Flink Cluster on a standalone Kubernetes with the
> longrunning job written in scala.
>
> We recently upgraded our Flink cluster from 1.12 to 1.14.3 - after which
> we started seeing a few problems related to logging which I have been
> struggling to fix for the past days).
> Related is also an attempt to add, we are also attempting to add a Sentry
> integration for our error logs.
>
> PROBLEM 1 - Error logs not being sent to Sentry.
> We are bundling our code and dependencies into a FAT jar, which includes a
> log4j2.xml specifying the Sentry Appender. But if I understand the
> documentation
> 
> correctly our log4j2.xml won't be picked up by Flink as it already defines
> a set of default logging configurations files (e.g. log4j and logback).
>
> Q: How does Flink resolve logging configurations to use?
>
> I can see the following JVM override params provided when running in our
> dockerized version locally.
>
> -Dlog.file=/opt/flink/log/flink--taskexecutor-0-thoros-taskmanager-6
> b9785d4df-c28n4.log
> 2022-02-15 10:01:59,826 INFO org.apache.flink.runtime.taskexecutor.
> TaskManagerRunner [] - -Dlog4j.configuration=
> file:/opt/flink/conf/log4j-console.properties
> 2022-02-15 10:01:59,827 INFO org.apache.flink.runtime.taskexecutor.
> TaskManagerRunner [] - -Dlog4j.configurationFile=
> file:/opt/flink/conf/log4j-console.properties
> 2022-02-15 10:01:59,830 INFO org.apache.flink.runtime.taskexecutor.
> TaskManagerRunner [] - -Dlogback.configurationFile=
> file:/opt/flink/conf/logback-console.xml
>
> Content of the log4j2.xml (path: src/main/resources):
>
> [XML stripped by the archive; the file configures packages
> "org.apache.logging.log4j.core,io.sentry.log4j2", a Console appender with
> pattern "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n", and a Sentry
> appender with minimumEventLevel="ERROR".]
>
>
> For our kubernetes deployment we have followed the reference example here
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/#common-cluster-resource-definitions
> .
> My assumption is that I would need to also provide the Sentry-related
> configuration to the "log4-console.properties" for it to be picked up by
> the Taskmanager and JobManager?
>
> PROBLEM 2:
> ERROR StatusLogger Log4j2 could not find a logging implementation.
> Please add log4j-core to the classpath. Using SimpleLogger to log to the
> console
>
> I am not sure what's going on here. Following dependencies are bundled
> with the FAT jar
>
> "com.typesafe.scala-logging" %% "scala-logging" % 
> scalaLoggingVersion,"org.slf4j" % "slf4j-api" % 
> "1.7.33","org.apache.logging.log4j" % "log4j-slf4j-impl" % 
> "2.17.0","org.apache.logging.log4j" % "log4j-core" % 
> "2.17.0","org.apache.logging.log4j" %% "log4j-api-scala" % "12.0","io.sentry" 
> % "sentry-log4j2" % "5.6.0",
>
> Confused about what is going on here, possible this might not be Flink
> related matter but I am not sure..any tips on how to best debug this would
> be much appreciated.
> --
> *Thanks,*
> *Jonas*
>
>
>

-- 
*Med Vänliga Hälsningar*
*Jonas Eyob*


Re: method select(org.apache.flink.table.api.ApiExpression cannot find symbol .select($("handlingTime"),

2022-02-15 Thread HG

it worked

Table t = tupled3DsTable
.window(Session.withGap(lit(5).minutes()).on($("handlingTime")).as("w"))
.groupBy($("transactionId"))
.select($("handlingTime"), $("transactionId"),
$("originalEvent"), $("handlingTime").sum().over($("w")));


Op di 15 feb. 2022 om 15:50 schreef Chesnay Schepler :

> Aren't you missing a groupBy() between window() and select()?
>
> On 15/02/2022 15:45, HG wrote:
>
> Hi all,
>
> When I execute the code :
>
> Table tupled3DsTable = tableEnv.fromDataStream(tuple3ds, Schema.newBuilder()
> .column("f0","TIMESTAMP_LTZ(3)") // Only  
> TIMESTAMP_LTZ(0) to TIMESTAMP_LTZ(3) allowed
> .column("f1","STRING")
> .column("f2","STRING")
> .watermark("f0", "SOURCE_WATERMARK()")
> .build()).as("handlingTime", "transactionId", 
> "originalEvent");
>
> tupled3DsTable.printSchema();
>
> Table t = tupled3DsTable
>   
> .window(Session.withGap(lit(5).minutes()).on($("handlingTime")).as("w"))
>   .select($("handlingTime"), $("transactionId"), $("originalEvent"), 
> $("handlingTime").sum().over($("w")));
>
> I get:
>
> error: cannot find symbol
> .select($("handlingTime"), $("transactionId"),
> $("originalEvent"), $("handlingTime").sum().over($("w")));
> ^
>   symbol:   method
> select(org.apache.flink.table.api.ApiExpression,org.apache.flink.table.api.ApiExpression,org.apache.flink.table.api.ApiExpression,org.apache.flink.table.api.ApiExpression)
>   location: interface org.apache.flink.table.api.GroupWindowedTable
>
> The printSchema gives:
>
> (
>   `handlingTime` TIMESTAMP_LTZ(3) *ROWTIME*,
>   `transactionId` STRING,
>   `originalEvent` STRING
>
> )
>
> Regards Hans
>
>
>


Re: Log4j2 configuration

2022-02-15 Thread Chesnay Schepler
1) You either need to modify the log4j-console.properties file, or 
explicitly set the log4j.configurationFile property to point to your 
.xml file.
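As a hedged illustration of the second option (the path is a placeholder),
the property can be passed through the JVM options in flink-conf.yaml:

  env.java.opts: "-Dlog4j.configurationFile=file:///opt/flink/usrlib/log4j2.xml"

Flink passes env.java.opts to the JM/TM JVMs; whether it takes precedence
over the -Dlog4j.configurationFile that the launch scripts already set may
depend on the deployment, so modifying log4j-console.properties (option one)
is the more direct route.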

2)
Have you made modifications to the distribution (e.g., removing other 
logging jars from the lib directory)?

Are you using application mode, or session clusters?

On 15/02/2022 16:41, jonas eyob wrote:

Hey,

We are deploying our Flink Cluster on a standalone Kubernetes with the 
longrunning job written in scala.


We recently upgraded our Flink cluster from 1.12 to 1.14.3 - after 
which we started seeing a few problems related to logging which I have 
been struggling to fix for the past days).
Related is also an attempt to add, we are also attempting to add a 
Sentry integration for our error logs.


PROBLEM 1 - Error logs not being sent to Sentry.
We are bundling our code and dependencies into a FAT jar, which 
includes a log4j2.xml specifying the Sentry Appender. But if I 
understand the documentation 
 
correctly our log4j2.xml won't be picked up by Flink as it already 
defines a set of default logging configurations files (e.g. log4j and 
logback).


Q: How does Flink resolve logging configurations to use?

I can see the following JVM override params provided when running in 
our dockerized version locally.


-Dlog.file=/opt/flink/log/flink--taskexecutor-0-thoros-taskmanager-6b9785d4df-c28n4.log
2022-02-15 10:01:59,826 INFO 
org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - 
-Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
2022-02-15 10:01:59,827 INFO 
org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - 
-Dlog4j.configurationFile=file:/opt/flink/conf/log4j-console.properties
2022-02-15 10:01:59,830 INFO 
org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - 
-Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml


Content of the log4j2.xml (path: src/main/resources):
[XML stripped by the archive; the file configures packages
"org.apache.logging.log4j.core,io.sentry.log4j2", a Console appender with
pattern "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n", and a Sentry
appender referenced alongside the Console appender.]
For our kubernetes deployment we have followed the reference example 
here 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/#common-cluster-resource-definitions. 

My assumption is that I would need to also provide the Sentry-related 
configuration to the "log4-console.properties" for it to be picked up 
by the Taskmanager and JobManager?


PROBLEM 2:
ERROR StatusLogger Log4j2 could not find a logging implementation.
Please add log4j-core to the classpath. Using SimpleLogger to log to 
the console


I am not sure what's going on here. Following dependencies are bundled 
with the FAT jar

"com.typesafe.scala-logging" %%"scala-logging" % scalaLoggingVersion, "org.slf4j" %"slf4j-api" %"1.7.33", "org.apache.logging.log4j" %"log4j-slf4j-impl" %"2.17.0", 
"org.apache.logging.log4j" %"log4j-core" %"2.17.0", "org.apache.logging.log4j" %%"log4j-api-scala" %"12.0", "io.sentry" %"sentry-log4j2" %"5.6.0",
Confused about what is going on here, possible this might not be Flink 
related matter but I am not sure..any tips on how to best debug this 
would be much appreciated.

--
*Thanks,*
*Jonas*




Re: TM OOMKilled

2022-02-15 Thread Alexey Trenikhun
Hi Xintong,
I've checked - `state.backend.rocksdb.memory.managed` is not explicitly 
configured, so as you wrote it should be true by default.

Regarding task off-heap, I believe KafkaConsumer needed off-heap memory some 
time ago


From: Xintong Song 
Sent: Monday, February 14, 2022 10:06 PM
To: Alexey Trenikhun 
Cc: Flink User Mail List 
Subject: Re: TM OOMKilled

Hi Alexey,

You may want to double check if `state.backend.rocksdb.memory.managed` is 
configured to `true`. (This should be `true` by default.)

Another question that may or may not be related. I noticed that you have 
configured 128MB task off-heap memory, which IIRC the default should be 0. 
Could you share what that is for?


Thank you~

Xintong Song


On Tue, Feb 15, 2022 at 12:10 PM Alexey Trenikhun <yen...@msn.com> wrote:
Hello,
We use RocksDB, but there is no problem with the Java heap, which is limited
to 3.523gb; the problem is with total container memory. The pod is killed not
due to an OutOfMemoryError, but because total container memory exceeds 10gb.

Thanks,
Alexey

From: Caizhi Weng <tsreape...@gmail.com>
Sent: Monday, February 14, 2022 6:42:05 PM
To: Alexey Trenikhun <yen...@msn.com>
Cc: Flink User Mail List <user@flink.apache.org>
Subject: Re: TM OOMKilled

Hi!

Heap memory usage depends heavily on your job and your state backend. Which 
state backend are you using and if possible could you share your user code or 
explain what operations your job is doing?

Alexey Trenikhun <yen...@msn.com> wrote on Tue, Feb 15, 2022 at 05:17:
Hello,
We run a Flink 1.13.5 job in app mode in Kubernetes, 1 JM and 1 TM. We also
have a Kubernetes cron job which takes a savepoint every 2 hours (14 */2 * * *).
Once in a while (~1 per 2 days) the TM is OOMKilled; suspiciously, it happens
on even hours, ~4 minutes after the savepoint starts (e.g. 12:18, 4:18), but I
don't see failed savepoints, so I assume the OOM happens right after the
savepoint is taken. However, the OOMKill doesn't happen on every savepoint, so
maybe this is a random correlation.
I've reserved 2G for JVM overhead, but somehow it is not enough? Any known
issues with memory and savepoints? Any suggestions on how to troubleshoot this?

 Final TaskExecutor Memory configuration:
   Total Process Memory:  10.000gb (10737418240 bytes)
 Total Flink Memory:  7.547gb (8103395328 bytes)
   Total JVM Heap Memory: 3.523gb (3783262149 bytes)
 Framework:   128.000mb (134217728 bytes)
 Task:3.398gb (3649044421 bytes)
   Total Off-heap Memory: 4.023gb (4320133179 bytes)
 Managed: 3.019gb (3241358179 bytes)
 Total JVM Direct Memory: 1.005gb (1078775000 bytes)
   Framework: 128.000mb (134217728 bytes)
   Task:  128.000mb (134217728 bytes)
   Network:   772.800mb (810339544 bytes)
 JVM Metaspace:   256.000mb (268435456 bytes)
 JVM Overhead:2.203gb (2365587456 bytes)

Thanks,
Alexey


Log4j2 configuration

2022-02-15 Thread jonas eyob
Hey,

We are deploying our Flink Cluster on a standalone Kubernetes with the
longrunning job written in scala.

We recently upgraded our Flink cluster from 1.12 to 1.14.3, after which we
started seeing a few problems related to logging that I have been struggling
to fix for the past few days.
Related to this, we are also attempting to add a Sentry integration for our
error logs.

PROBLEM 1 - Error logs not being sent to Sentry.
We are bundling our code and dependencies into a FAT jar, which includes a
log4j2.xml specifying the Sentry Appender. But if I understand the
documentation correctly, our log4j2.xml won't be picked up by Flink, as
Flink already defines a set of default logging configuration files (e.g.
log4j and logback).

Q: How does Flink resolve logging configurations to use?

I can see the following JVM override params provided when running in our
dockerized version locally.

-Dlog.file=/opt/flink/log/flink--taskexecutor-0-thoros-taskmanager-6
b9785d4df-c28n4.log
2022-02-15 10:01:59,826 INFO org.apache.flink.runtime.taskexecutor.
TaskManagerRunner [] - -Dlog4j
.configuration=file:/opt/flink/conf/log4j-console.properties
2022-02-15 10:01:59,827 INFO org.apache.flink.runtime.taskexecutor.
TaskManagerRunner [] - -Dlog4j
.configurationFile=file:/opt/flink/conf/log4j-console.properties
2022-02-15 10:01:59,830 INFO org.apache.flink.runtime.taskexecutor.
TaskManagerRunner [] - -Dlogback
.configurationFile=file:/opt/flink/conf/logback-console.xml

Content of the log4j2.xml (path: src/main/resources):

[XML content stripped by the archive.]

For our kubernetes deployment we have followed the reference example here
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/#common-cluster-resource-definitions
.
My assumption is that I would need to also provide the Sentry-related
configuration in "log4j-console.properties" for it to be picked up by
the TaskManager and JobManager?


PROBLEM 2:
ERROR StatusLogger Log4j2 could not find a logging implementation.
Please add log4j-core to the classpath. Using SimpleLogger to log to the
console

I am not sure what's going on here. Following dependencies are bundled with
the FAT jar

"com.typesafe.scala-logging" %% "scala-logging" % scalaLoggingVersion,
"org.slf4j" % "slf4j-api" % "1.7.33",
"org.apache.logging.log4j" % "log4j-slf4j-impl" % "2.17.0",
"org.apache.logging.log4j" % "log4j-core" % "2.17.0",
"org.apache.logging.log4j" %% "log4j-api-scala" % "12.0",
"io.sentry" % "sentry-log4j2" % "5.6.0",

I'm confused about what is going on here; possibly this is not a
Flink-related matter, but I am not sure. Any tips on how to best debug this
would be much appreciated.
-- 
*Thanks,*
*Jonas*


Re: Performance Issues in Source Operator while migrating to Flink-1.14 from 1.9

2022-02-15 Thread Martijn Visser
Hi Arujit,

I'm also looping in some contributors from the connector and runtime
perspective in this thread. Did you also test the upgrade first by only
upgrading to Flink 1.14 and keeping the FlinkKafkaConsumer? That would
offer a better way to determine if a regression is caused by the upgrade of
Flink or because of the change in connector.

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82


On Tue, 15 Feb 2022 at 13:07, Arujit Pradhan 
wrote:

> Hey team,
>
> We are migrating our Flink codebase and a bunch of jobs from Flink-1.9 to
> Flink-1.14. To ensure uniformity in performance we ran a bunch of jobs for
> a week both in 1.9 and 1.14 simultaneously with the same resources and
> configurations and monitored them.
>
> Though most of the jobs are running fine, we have significant performance
> degradation in some of the high throughput jobs during peak hours. As a
> result, we can see high lag and data drops while processing messages from
> Kafka in some of the jobs in 1.14 while in 1.9 they are working just fine.
> Now we are debugging and trying to understand the potential reason for it.
>
> One of the hypotheses that we can think of is the change in the sequence
> of processing in the source-operator. To explain this, adding screenshots
> for the problematic tasks below.
> The first one is for 1.14 and the second is for 1.9. Upon inspection, it
> can be seen the sequence of processing 1.14 is -
>
> data_streams_0 -> Timestamps/Watermarks -> Filter -> Select.
>
> While in 1.9 it was,
>
> data_streams_0 -> Filter -> Timestamps/Watermarks -> Select.
>
> In 1.14 we are using the KafkaSource API, while in the older version it was
> the FlinkKafkaConsumer API. We wanted to understand whether this can cause a
> potential performance decline, as all other configurations/resources for
> both jobs are identical, and if so, how to avoid it. Also, we cannot see any
> unusual CPU/memory behaviour while monitoring the affected jobs.
>
> Source Operator in 1.14 :
> [image: image.png]
> Source Operator in 1.9 :
> [image: image.png]
> Thanks in advance,
> //arujit
>
>
>
>
>
>
>


Re: method select(org.apache.flink.table.api.ApiExpression cannot find symbol .select($("handlingTime"),

2022-02-15 Thread HG
Will try.
The order of clauses is a little bit obscure to me.

I would expect the groupBy to come first.
In the docs it only does the window and select.
Indeed, I need the groupBy.

Will share the complete final class when I have finished this project so
that others can benefit.

On Tue, Feb 15, 2022, 15:50 Chesnay Schepler  wrote:

> Aren't you missing a groupBy() between window() and select()?
>
> On 15/02/2022 15:45, HG wrote:
>
> Hi all,
>
> When I execute the code :
>
> Table tupled3DsTable = tableEnv.fromDataStream(tuple3ds, Schema.newBuilder()
> .column("f0","TIMESTAMP_LTZ(3)") // Only  
> TIMESTAMP_LTZ(0) to TIMESTAMP_LTZ(3) allowed
> .column("f1","STRING")
> .column("f2","STRING")
> .watermark("f0", "SOURCE_WATERMARK()")
> .build()).as("handlingTime", "transactionId", 
> "originalEvent");
>
> tupled3DsTable.printSchema();
>
> Table t = tupled3DsTable
>   
> .window(Session.withGap(lit(5).minutes()).on($("handlingTime")).as("w"))
>   .select($("handlingTime"), $("transactionId"), $("originalEvent"), 
> $("handlingTime").sum().over($("w")));
>
> I get:
>
> error: cannot find symbol
> .select($("handlingTime"), $("transactionId"),
> $("originalEvent"), $("handlingTime").sum().over($("w")));
> ^
>   symbol:   method
> select(org.apache.flink.table.api.ApiExpression,org.apache.flink.table.api.ApiExpression,org.apache.flink.table.api.ApiExpression,org.apache.flink.table.api.ApiExpression)
>   location: interface org.apache.flink.table.api.GroupWindowedTable
>
>  The printSchema gives:
>
> (
>   `handlingTime` TIMESTAMP_LTZ(3) *ROWTIME*,
>   `transactionId` STRING,
>   `originalEvent` STRING
>
> )
>
>  Regards Hans
>
>
>


Re: method select(org.apache.flink.table.api.ApiExpression cannot find symbol .select($("handlingTime"),

2022-02-15 Thread Chesnay Schepler

Aren't you missing a groupBy() between window() and select()?

On 15/02/2022 15:45, HG wrote:

Hi all,

When I execute the code :
Table tupled3DsTable = tableEnv.fromDataStream(tuple3ds, Schema.newBuilder()
        .column("f0", "TIMESTAMP_LTZ(3)") // Only TIMESTAMP_LTZ(0) to TIMESTAMP_LTZ(3) allowed
        .column("f1", "STRING")
        .column("f2", "STRING")
        .watermark("f0", "SOURCE_WATERMARK()")
        .build()).as("handlingTime", "transactionId", "originalEvent");

tupled3DsTable.printSchema();

Table t = tupled3DsTable
        .window(Session.withGap(lit(5).minutes()).on($("handlingTime")).as("w"))
        .select($("handlingTime"), $("transactionId"), $("originalEvent"),
                $("handlingTime").sum().over($("w")));
I get:

error: cannot find symbol
                .select($("handlingTime"), $("transactionId"), 
$("originalEvent"), $("handlingTime").sum().over($("w")));

                ^
  symbol:   method 
select(org.apache.flink.table.api.ApiExpression,org.apache.flink.table.api.ApiExpression,org.apache.flink.table.api.ApiExpression,org.apache.flink.table.api.ApiExpression)

  location: interface org.apache.flink.table.api.GroupWindowedTable
The printSchema gives:
(
  `handlingTime` TIMESTAMP_LTZ(3) *ROWTIME*,
  `transactionId` STRING,
  `originalEvent` STRING
)
Regards Hans




method select(org.apache.flink.table.api.ApiExpression cannot find symbol .select($("handlingTime"),

2022-02-15 Thread HG
Hi all,

When I execute the code :

Table tupled3DsTable = tableEnv.fromDataStream(tuple3ds, Schema.newBuilder()
.column("f0","TIMESTAMP_LTZ(3)") // Only
TIMESTAMP_LTZ(0) to TIMESTAMP_LTZ(3) allowed
.column("f1","STRING")
.column("f2","STRING")
.watermark("f0", "SOURCE_WATERMARK()")
.build()).as("handlingTime", "transactionId",
"originalEvent");

tupled3DsTable.printSchema();

Table t = tupled3DsTable
  
.window(Session.withGap(lit(5).minutes()).on($("handlingTime")).as("w"))
  .select($("handlingTime"), $("transactionId"),
$("originalEvent"), $("handlingTime").sum().over($("w")));

I get:

error: cannot find symbol
.select($("handlingTime"), $("transactionId"),
$("originalEvent"), $("handlingTime").sum().over($("w")));
^
  symbol:   method
select(org.apache.flink.table.api.ApiExpression,org.apache.flink.table.api.ApiExpression,org.apache.flink.table.api.ApiExpression,org.apache.flink.table.api.ApiExpression)
  location: interface org.apache.flink.table.api.GroupWindowedTable


The printSchema gives:

(
  `handlingTime` TIMESTAMP_LTZ(3) *ROWTIME*,
  `transactionId` STRING,
  `originalEvent` STRING

)


Regards Hans


Re: Unit test harness for Sources

2022-02-15 Thread James Sandys-Lumsdaine
Thanks for the reply. If I upgrade my legacy Sources to use the new split
Sources, is there a better unit test harness for that?

Thanks,

James.

Sent from my iPhone

On 15 Feb 2022, at 13:24, Chesnay Schepler  wrote:


I don't think there is anything of the sort for the legacy sources. I would 
suggest following the example at 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/testing/#testing-flink-jobs
 and using a job that only contains the source (plus something to either extract 
the results or verify them within the job).

On 14/02/2022 18:06, James Sandys-Lumsdaine wrote:
Hi all,

I've been using the test harness classes to unit test my stateful 1 and 2 
stream functions. But I also have some stateful legacy Source classes I would 
like to unit test and can't find any documentation or example for that - is 
this possible?

Thanks,

James.



Re: Unit test harness for Sources

2022-02-15 Thread Chesnay Schepler
I don't think there is anything of the sort for the legacy sources. I 
would suggest following the example at 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/testing/#testing-flink-jobs 
and using a job that only contains the source (plus something to either 
extract the results or verify them within the job).
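
A minimal sketch of that approach, using executeAndCollect() in place of a custom
collecting sink (MyLegacySource and the expected records are placeholders for your
own SourceFunction and data):

import static org.junit.Assert.assertEquals;

import java.util.Arrays;
import java.util.List;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.Test;

public class SourceJobTest {

    @Test
    public void sourceEmitsExpectedRecords() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // a job that contains only the source; take the first three records and stop
        List<String> results = env
                .addSource(new MyLegacySource())   // placeholder SourceFunction<String>
                .executeAndCollect(3);

        assertEquals(Arrays.asList("a", "b", "c"), results);
    }
}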


On 14/02/2022 18:06, James Sandys-Lumsdaine wrote:

Hi all,

I've been using the test harness classes to unit test my stateful 1 
and 2 stream functions. But I also have some stateful legacy Source 
classes I would like to unit test and can't find any documentation or 
example for that - is this possible?


Thanks,

James.




Re: Save app-global cache used by RichAsyncFunction to Flink State?

2022-02-15 Thread Chesnay Schepler
I'm not sure if this would work, but you could try implementing the 
CheckpointedFunction interface and getting access to state that way.
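
A rough sketch of that idea, with the same caveat that it is untested (the String
key/value types and the cache layout are assumptions):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class CachedLookupFunction extends RichAsyncFunction<String, String>
        implements CheckpointedFunction {

    private transient Map<String, String> cache;                          // per-subtask cache
    private transient ListState<Tuple2<String, String>> checkpointedCache;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedCache = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("lookup-cache",
                        TypeInformation.of(new TypeHint<Tuple2<String, String>>() {})));
        cache = new ConcurrentHashMap<>();
        if (context.isRestored()) {
            // rebuild the in-memory cache from the restored operator state
            for (Tuple2<String, String> entry : checkpointedCache.get()) {
                cache.put(entry.f0, entry.f1);
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // copy the in-memory cache into operator state on every checkpoint
        checkpointedCache.clear();
        for (Map.Entry<String, String> e : cache.entrySet()) {
            checkpointedCache.add(Tuple2.of(e.getKey(), e.getValue()));
        }
    }

    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) throws Exception {
        // check the cache first, otherwise query the external database asynchronously
        // and complete resultFuture with the answer
    }
}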


On 14/02/2022 16:25, Clayton Wohl wrote:
Is there any way to save a custom application-global cache into Flink 
state so that it is used with checkpoints + savepoints? This cache is 
used by a RichAsyncFunction that queries an external database, and 
RichAsyncFunction doesn't support the Flink state functionality directly.


I asked this last week but got no answers. I wanted to ask a second 
time. Thank you :)





Performance Issues in Source Operator while migrating to Flink-1.14 from 1.9

2022-02-15 Thread Arujit Pradhan
Hey team,

We are migrating our Flink codebase and a bunch of jobs from Flink-1.9 to
Flink-1.14. To ensure uniformity in performance we ran a bunch of jobs for
a week both in 1.9 and 1.14 simultaneously with the same resources and
configurations and monitored them.

Though most of the jobs are running fine, we have significant performance
degradation in some of the high throughput jobs during peak hours. As a
result, we can see high lag and data drops while processing messages from
Kafka in some of the jobs in 1.14 while in 1.9 they are working just fine.
Now we are debugging and trying to understand the potential reason for it.

One of the hypotheses that we can think of is the change in the sequence of
processing in the source-operator. To explain this, adding screenshots for
the problematic tasks below.
The first one is for 1.14 and the second is for 1.9. Upon inspection, it
can be seen that the sequence of processing in 1.14 is -

data_streams_0 -> Timestamps/Watermarks -> Filter -> Select.

While in 1.9 it was,

data_streams_0 -> Filter -> Timestamps/Watermarks -> Select.

In 1.14 we are using the KafkaSource API while in the older version it was
the FlinkKafkaConsumer API. We wanted to understand whether this can cause a
performance decline, since all other configurations/resources for both jobs
are identical, and if so, how to avoid it. Also, we cannot see any unusual
CPU/memory behaviour while monitoring the affected jobs.
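
For reference, a schematic sketch of the two set-ups being compared (Event, the
deserializers, the watermark assigner and the surrounding variables are placeholders,
not the actual job code):

// 1.9 style: FlinkKafkaConsumer is a plain source, and timestamps/watermarks are
// assigned wherever the call is placed in the chain, here after the filter
DataStream<Event> legacy = env
        .addSource(new FlinkKafkaConsumer<>("data_streams_0", new EventSchema(), kafkaProps))
        .filter(Event::isValid)
        .assignTimestampsAndWatermarks(new EventTimestampAssigner());   // placeholder assigner

// 1.14 style: the WatermarkStrategy is part of fromSource(), so timestamps and
// watermarks are generated at the source, before any downstream filter
KafkaSource<Event> source = KafkaSource.<Event>builder()
        .setBootstrapServers(brokers)
        .setTopics("data_streams_0")
        .setGroupId("my-group")
        .setValueOnlyDeserializer(new EventDeserializer())
        .build();

DataStream<Event> current = env
        .fromSource(source, WatermarkStrategy.<Event>forMonotonousTimestamps(), "data_streams_0")
        .filter(Event::isValid);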

Source Operator in 1.14 :
[image: image.png]
Source Operator in 1.9 :
[image: image.png]
Thanks in advance,
//arujit


Re: How to cogroup multiple streams?

2022-02-15 Thread Chesnay Schepler
You could first transform each stream to a common format (in the worst 
case, an ugly Either-like capturing all possible types), union those 
streams, and then do a keyBy + window function.


This is how coGroup is implemented internally.
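
A rough sketch of that approach for three streams (Tagged, Result, the key accessor
and the returns() hints are illustrative):

// wrap each stream in a common "Tagged" type that remembers which input it came from
DataStream<Tagged> a = streamA.map(Tagged::fromA).returns(Tagged.class);
DataStream<Tagged> b = streamB.map(Tagged::fromB).returns(Tagged.class);
DataStream<Tagged> c = streamC.map(Tagged::fromC).returns(Tagged.class);

a.union(b, c)
        .keyBy(Tagged::getKey)
        .window(TumblingEventTimeWindows.of(Time.minutes(5)))
        .process(new ProcessWindowFunction<Tagged, Result, String, TimeWindow>() {
            @Override
            public void process(String key, Context ctx, Iterable<Tagged> elements,
                                Collector<Result> out) {
                // split the elements back into their original groups and combine them,
                // the same way a CoGroupFunction would
            }
        });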

On 14/02/2022 16:08, Will Lauer wrote:
OK, here's what I hope is a stupid question: what's the most efficient 
way to co-group more than 2 DataStreams together? I'm looking at 
porting a pipeline from pig to flink, and in a couple of places I use 
Pig's COGROUP functionality to simultaneously group 3 or 4 and 
sometimes even more datasets on the same key simultaneously. Looking 
at the Datastream API, I see how to group 2 datastreams, but I don't 
see anything obvious for processing more than two simultaneously. 
Obviously I could cogroup two, then cogroup the result with the next 
one, etc adding each stream serially to the result, but that seems 
inefficient. Is there a better way?


Will



Will Lauer


Senior Principal Architect, Audience & Advertising Reporting

Data Platforms & Systems Engineering


M 508 561 6427

Champaign Office

1908 S. First St

Champaign, IL 61822



Re: Flink 1.12.x DataSet --> Flink 1.14.x DataStream

2022-02-15 Thread Zhipeng Zhang
Hi Saravanan,

One solution could be using a streamOperator to implement `BoundedOneInput`
interface.
An example code could be found here [1].

[1]
https://github.com/apache/flink-ml/blob/56b441d85c3356c0ffedeef9c27969aee5b3ecfc/flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/DataStreamUtils.java#L75
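
A rough sketch of that suggestion (InputModel/OutputModel and the buffering logic are
stand-ins for your own types and processing):

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// An operator that sees every record and is told, via endInput(), when the bounded
// input has ended, which is where the final "collect at end of data" can happen.
public class MapPartitionLikeOperator extends AbstractStreamOperator<OutputModel>
        implements OneInputStreamOperator<InputModel, OutputModel>, BoundedOneInput {

    private transient List<InputModel> buffer;

    @Override
    public void open() throws Exception {
        super.open();
        buffer = new ArrayList<>();
    }

    @Override
    public void processElement(StreamRecord<InputModel> element) throws Exception {
        buffer.add(element.getValue());
        // conditional emits based on several records would go here      ---> (1)
        // output.collect(new StreamRecord<>(partialResult));
    }

    @Override
    public void endInput() throws Exception {
        // called once after the last record of the bounded input        ---> (2)
        output.collect(new StreamRecord<>(combine(buffer)));
    }

    private OutputModel combine(List<InputModel> records) {
        return new OutputModel(records);   // stand-in for the real aggregation
    }
}

// applied with:
// stream.transform("mapPartition-like", TypeInformation.of(OutputModel.class),
//         new MapPartitionLikeOperator());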

saravana...@gmail.com  于2022年2月15日周二 02:44写道:

> Hi Niklas,
>
> Thanks for your reply.  Approach [1] works only if operators are chained
> (in other words, operators executed within the same task). Since
> mapPartition operator parallelism is different from previous operator
> parallelism, it doesn't fall under the same task(or not chained) .
>
>
>
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/flink-architecture/#tasks-and-operator-chains
> https://issues.apache.org/jira/browse/FLINK-14709
>
> Saravanan
>
> On Mon, Feb 14, 2022 at 9:01 AM Niklas Semmler 
> wrote:
>
>> Hi Saravanan,
>>
>> AFAIK the last record is not treated differently.
>>
>> Does the approach in [1] not work?
>>
>> Best regards,
>> Niklas
>>
>>
>> https://github.com/dmvk/flink/blob/2f1b573cd57e95ecac13c8c57c0356fb281fd753/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java#L279
>>
>>
>> > On 9. Feb 2022, at 20:31, saravana...@gmail.com 
>> wrote:
>> >
>> > Is there any way to identify the last message inside RichFunction in
>> BATCH mode ?
>> >
>> >
>> >
>> > On Wed, Feb 9, 2022 at 8:56 AM saravana...@gmail.com <
>> saravana...@gmail.com> wrote:
>> > I am trying to migrate from Flink 1.12.x DataSet api to Flink 1.14.x
>> DataStream api. mapPartition is not available in Flink DataStream.
>> >
>> > Current Code using Flink 1.12.x DataSet :
>> >
>> > dataset
>> > .
>> > .mapPartition(new SomeMapParitionFn())
>> > .
>> >
>> > public static class SomeMapPartitionFn extends
>> RichMapPartitionFunction {
>> >
>> > @Override
>> > public void mapPartition(Iterable records,
>> Collector out) throws Exception {
>> > for (InputModel record : records) {
>> > /*
>> > do some operation
>> >  */
>> > if (/* some condition based on processing *MULTIPLE*
>> records */) {
>> >
>> > out.collect(...); // Conditional collect
>> ---> (1)
>> > }
>> > }
>> >
>> > // At the end of the data, collect
>> >
>> > out.collect(...);   // Collect processed data
>>  ---> (2)
>> > }
>> > }
>> >
>> >   • (1) - Collector.collect invoked based on some condition after
>> processing few records
>> >   • (2) - Collector.collect invoked at the end of data
>> >
>> > Initially we thought of using flatMap instead of mapPartition, but the
>> collector is not available in close function.
>> >
>> > https://issues.apache.org/jira/browse/FLINK-14709 - Only available in
>> case of chained drivers
>> > How to implement this in Flink 1.14.x DataStream? Please advise...
>> >
>> > Note: Our application works with only finite set of data (Batch Mode)
>> >
>>
>>

-- 
best,
Zhipeng


Re: Removing unused flink-avro causes savepoint to fail loading

2022-02-15 Thread Chesnay Schepler
Indeed, when flink-avro is on the classpath we automatically register 1 
serializer with Kryo.


There is no switch to ignore this error or to exclude the Avro 
serializer somehow.
As such you'll need to rewrite the savepoint, either with the 
state-processor-api or by creating a slightly modified version of Flink 
(there are multiple options on how to implement that, but it would always mean 
restoring once on the custom version with Avro on the classpath, taking 
a savepoint, and then going back to the original version without Avro).


On 11/02/2022 10:01, David Causse wrote:

Hi,

While developing a job we mistakenly imported flink-avro as a 
dependency and then we did some cleanups. Sadly it seems that 
flink-avro has registered some kryo serializers that are now required 
to load the savepoints even though we do not use the functionalities 
offered by this module.

The error is (this is using flink 1.12.1):

java.lang.Exception: Exception while creating StreamOperatorStateContext.
        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254)
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272)
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)

        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.flink.util.FlinkException: Could not restore 
operator state backend for 
StreamSource_b1a2a2523a4642215643a6a4e58f0d05_(1/1) from any of the 1 
provided restore options.
        at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:285)
        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:173)

        ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: 
Failed when trying to restore operator state backend
        at 
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
        at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:607)
        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:276)
        at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
        at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)

        ... 11 more
Caused by: java.lang.IllegalStateException: Missing value for the key 
'org.apache.avro.generic.GenericData$Array'
        at 
org.apache.flink.util.LinkedOptionalMap.unwrapOptionals(LinkedOptionalMap.java:186)
        at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot.restoreSerializer(KryoSerializerSnapshot.java:90)
        at 
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
        at 
java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
        at 
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
        at 
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
        at 
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:550)
        at 
java.base/java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
        at 
java.base/java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:517)
        at 
org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.snapshotsToRestoreSerializers(NestedSerializersSnapshotDelegate.java:221)
        at 
org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.getRestoredNestedSerializers(NestedSerializersSnapshotDelegate.java:80)
        at 

Re: flink-netty-shuffle files fill up the node disks

2022-02-15 Thread Yingjie Cao
What is the exact error reported when the disk fills up? Is it running out of inodes or out of disk space? As I understand it, flink-netty-shuffle-b71f58a6-0fdb-437f-a6b8-6b0bbedd3dfa is a directory: is the problem that too many of these directories are left behind without being cleaned up, exhausting the inodes, or that the files inside them are not cleaned up, filling the disk? If the job is stopped, does the space come back (in other words, does the job itself really need that much disk, or is there a leak where the data remains even after the job stops)?
Also, which version of Flink is the job using? I would suggest enabling data compression: if the job genuinely needs a lot of disk space, compression should help reduce it. In addition, the default is the hash shuffle implementation, which creates many files, uses up many inodes, and easily triggers "insufficient disk space" errors. There is a document you can refer to:
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/ops/batch/blocking_shuffle/
Its content has not been updated to the latest yet (that will probably happen tomorrow). If it is urgent, you can also refer directly to the description in the source code:
https://github.com/apache/flink/blob/master/docs/content.zh/docs/ops/batch/blocking_shuffle.md
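
The settings referred to above would look roughly like this in flink-conf.yaml (key
names as documented for Flink 1.13/1.14; verify them against the version actually in
use):

# compress the data written for blocking (batch) shuffles
taskmanager.network.blocking-shuffle.compression.enabled: true

# use sort shuffle for every result partition; it writes far fewer files
# (and therefore uses far fewer inodes) than the default hash shuffle
taskmanager.network.sort-shuffle.min-parallelism: 1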

Bai XiangHui (Intelligent Platform)  wrote on Mon, Feb 14, 2022 at 17:49:

> Hi all,
> Running the code below fills up the disks on every node; when debugging
> locally it also fills up the C: drive.
> File name: flink-netty-shuffle-b71f58a6-0fdb-437f-a6b8-6b0bbedd3dfa
> Notes:
>
> 1. Batch execution mode.
> 2. In local testing the input directories oneDay and long are about 1 GB in
> size. After starting the program, the remaining tens of GB on the C: drive
> (C:\Users\xxx\AppData\Local\Temp) get filled up; after deploying to the
> cluster, the disks on each node gradually fill up as well.
>
> 3. The broadcast stream blackListStream has roughly ten thousand records.
> Commenting out the code in process() that reads the broadcast state, as well
> as the processBroadcastElement method, did not help.
>
>
>
> String oneDayLogFile = "C:\\Users\\xianghuibai\\Desktop\\oneDay";
> String historyFileName = "C:\\Users\\xianghuibai\\Desktop\\long";
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>
> DataStream<String> blackListStream =
>     env.fromCollection(RedisPool20484Utils.getCustomJedisCluster().smembers("user_blacklist_cid_test"));
>
> MapStateDescriptor<String, Boolean> type =
>     new MapStateDescriptor<>("blackList_type", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO);
> BroadcastStream<String> blackList_b = blackListStream.broadcast(type);
>
> DataStream<Tuple5<String, String, String, String, String>> oneDayLog = env.readTextFile(oneDayLogFile)
>     .map(new MapFunction<String, Tuple5<String, String, String, String, String>>() {
>         @Override
>         public Tuple5<String, String, String, String, String> map(String line) throws Exception {
>             String[] arrs = line.split("\t");
>             return new Tuple5<>(arrs[0], arrs[1], arrs[2], arrs[3], arrs[4]);
>         }
>     });
>
> SingleOutputStreamOperator<Tuple5<String, String, String, String, String>> dayOutput = env.readTextFile(historyFileName)
>     .flatMap(new FlatParseLong())
>     .union(oneDayLog)
>     .connect(blackList_b)
>     .process(new BroadcastProcessFunction<Tuple5<String, String, String, String, String>, String, Tuple5<String, String, String, String, String>>() {
>         private transient ReadOnlyBroadcastState<String, Boolean> broadcastState;
>
>         @Override
>         public void processElement(Tuple5<String, String, String, String, String> value, ReadOnlyContext ctx,
>                 Collector<Tuple5<String, String, String, String, String>> out) throws Exception {
>             if (broadcastState == null) {
>                 broadcastState = ctx.getBroadcastState(type);
>             }
>             if (value != null && !broadcastState.contains(value.f0)) {
>                 out.collect(value);
>             }
>         }
>
>         @Override
>         public void processBroadcastElement(String value, Context ctx,
>                 Collector<Tuple5<String, String, String, String, String>> out) throws Exception {
>             if (StringUtils.isNotEmpty(value)) {
>                 BroadcastState<String, Boolean> broadcastState = ctx.getBroadcastState(type);
>                 broadcastState.put(value, true);
>             }
>         }
>     });
>
>


Re: "No operators defined in streaming topology" error when Flink app still starts successfully

2022-02-15 Thread Chesnay Schepler
When you call env.execute() the StreamExecutionEnvironment is being 
reset, clearing all sources/transformations from it.
That's why env.getExecutionPlan() complains; there aren't any operations 
so a plan cannot be created.


You need to create the execution plan before calling execute().

String executionPlan = env.getExecutionPlan();
env.execute();
logger.info("Started job; executionPlan={}", executionPlan);

On 14/02/2022 17:40, Shane Bishop wrote:

Hi all,

My team has started seeing the error "java.lang.IllegalStateException: 
No operators defined in streaming topology. Cannot execute." However, 
even with this error, the Flink application starts and runs fine, and 
the Flink job renders fine in the Flink Dashboard.


Attached is the full stacktrace.

This error comes from when we call 
StreamExecutionEnvironment#getExecutionPlan(). In the code snippet 
below, we call this method on the last line of the snippet.


From poking around online, I found 
https://stackoverflow.com/questions/54977290/flink-no-operators-defined-in-streaming-topology-cannot-execute, 
which suggests the problem could be that we don't set a sink, but in 
the code below you will see we do set a sink (just maybe not in a way 
that getExecutionPlan() expects).


Can this be safely ignored? Is there something we can do so that 
getExecutionPlan() will work properly, or otherwise fix/suppress this 
error?


Below is the code (some portions have been redacted):

final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();


DataStream source =
    env.addSource(new RedactedType())
        .uid()
        .name()
        .shuffle();

DataStream> stream =
    AsyncDataStream.unorderedWait(source, new RedactedType(), 1, 
TimeUnit.MILLISECONDS)

        .uid()
        .name();

stream
    .flatMap(new RedactedType())
    .uid()
    .name()
    .flatMap(new RedactedType())
    .uid()
    .name()
    .shuffle()
    .addSink(new RedactedType()) // Set sink
    .uid()
    .name();

env.execute("");
logger.info("Started job; executionPlan={}", env.getExecutionPlan()); 
// line 66


Thanks,
Shane