Re: Regarding state access in UDF

2021-06-30 Thread Ingo Bürk
Hi Kai,

CheckpointedFunction is not an interface meant to be used with UDFs (in the
Table API / SQL sense[1]), but is rather an interface for DataStream
API[2]; the term "user-defined function" has a different meaning there. Did
you actually try it to see if it works? I'd be surprised if it did.
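For contrast, here is a minimal sketch (adapted from the buffering-sink pattern in
the DataStream docs under [2]) of what a stateful function looks like in the
DataStream-API sense; the element type and state name are illustrative only:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

// A DataStream-API function with checkpointed operator state: elements are
// buffered in memory and snapshotted into ListState on every checkpoint.
public class BufferingSink implements SinkFunction<String>, CheckpointedFunction {

    private transient ListState<String> checkpointedState;
    private final List<String> buffer = new ArrayList<>();

    @Override
    public void invoke(String value, Context context) {
        buffer.add(value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.clear();
        checkpointedState.addAll(buffer);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedState = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("buffered", String.class));
        if (context.isRestored()) {
            for (String element : checkpointedState.get()) {
                buffer.add(element);
            }
        }
    }
}
```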

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/functions/udfs/
[2]
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/user_defined_functions/


Ingo

On Thu, Jul 1, 2021 at 5:17 AM Kai Fu  wrote:

> Hi Ingo,
>
> Thank you for the reply. We actually need more fine-grained control over the
> states in the UDF. Per our investigation, we found that states can simply be
> created/accessed by implementing the `CheckpointedFunction` interface; please
> advise if there are any side effects to doing that.
>
> On Wed, Jun 30, 2021 at 10:33 PM Ingo Bürk  wrote:
>
>> Hi Kai,
>>
>> AggregateFunction and TableAggregateFunction are both stateful UDF
>> interfaces. This should cover most scenarios given where they would be
>> used. If you need more fine-grained control you can also always drop down
>> into the DataStream API (using #toDataStream) and work there. Table API /
>> SQL in general are higher-level abstractions where you cannot directly
>> interact with operators.
>>
>> If this doesn't answer your question it would also be great if you could
>> explain your use case more so we can understand it. Thanks!
>>
>>
>> Best
>> Ingo
>>
>> On Wed, Jun 30, 2021 at 3:37 PM Kai Fu  wrote:
>>
>>> Hi team,
>>>
>>> We have a use case that needs to create/access state in a UDF, but per the
>>> documentation and the UDF interface, there is no way to do that. We want to
>>> know if this is by design and whether there is any other approach.
>>>
>>> --
>>> *Best wishes,*
>>> *- Kai*
>>>
>>
>
> --
> *Best wishes,*
> *- Kai*
>


Re: Converting Table API query to Datastream API

2021-06-30 Thread Le Xu
Excellent! I'll give it a try.

Thanks!

Le

On Wed, Jun 30, 2021 at 10:14 PM JING ZHANG  wrote:

> Hi Le,
> AFAIK, there are the following ways to look deeper into the job; hope it helps.
> Before executing the job,
> 1. Use `explain` statement to explain the logical and optimized plans of a
> query[1].
> 2. Use `ExecutionEnvironment#getExecutionPlan` to print the
> execution plan, And use `visualizer` to visualize the execution plan [2].
> After executing the job,
> 1. Find jobGraph in Flink web UI.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/explain/
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/execution/execution_plans/
>
> Best regards,
> JING ZHANG
>
> Le Xu wrote on Thu, Jul 1, 2021 at 5:51 AM:
>
>> Thanks -- Is there a way to quickly visualize the Stream operator DAG
>> generated by the TableAPI/SQL queries?
>>
>> Le
>>
>> On Tue, Jun 29, 2021 at 9:34 PM JING ZHANG  wrote:
>>
>>> Hi Le,
>>> That link is a bit outdated. Since Flink 1.9, Table API & SQL is no longer
>>> translated to the DataStream API. Table API & SQL and DataStream are at the
>>> same level, and both are translated into a StreamOperator DAG.
>>>
>>> Best regards,
>>> JING ZHANG
>>>
>>> Le Xu wrote on Wed, Jun 30, 2021 at 1:11 AM:
>>>
 Hello! I have a basic question about the concept of using Flink Table
 API.
 Based on the link
 
 here it seems like if I implement stream query with Table API the program
 is translated to datastream API eventually during execution. But is there a
 way to visualize what the datastream program looks like?

 Thanks,

 Le

>>>


Re: Regarding state access in UDF

2021-06-30 Thread Kai Fu
Hi Ingo,

Thank you for the reply. We actually need more fine-grained control over the
states in the UDF. Per our investigation, we found that states can simply be
created/accessed by implementing the `CheckpointedFunction` interface; please
advise if there are any side effects to doing that.

On Wed, Jun 30, 2021 at 10:33 PM Ingo Bürk  wrote:

> Hi Kai,
>
> AggregateFunction and TableAggregateFunction are both stateful UDF
> interfaces. This should cover most scenarios given where they would be
> used. If you need more fine-grained control you can also always drop down
> into the DataStream API (using #toDataStream) and work there. Table API /
> SQL in general are higher-level abstractions where you cannot directly
> interact with operators.
>
> If this doesn't answer your question it would also be great if you could
> explain your use case more so we can understand it. Thanks!
>
>
> Best
> Ingo
>
> On Wed, Jun 30, 2021 at 3:37 PM Kai Fu  wrote:
>
>> Hi team,
>>
>> We have a use case that needs to create/access state in a UDF, but per the
>> documentation and the UDF interface, there is no way to do that. We want to
>> know if this is by design and whether there is any other approach.
>>
>> --
>> *Best wishes,*
>> *- Kai*
>>
>

-- 
*Best wishes,*
*- Kai*


Re: Jobmanagers are in a crash loop after upgrade from 1.12.2 to 1.13.1

2021-06-30 Thread Zhu Zhu
Hi Shilpa,

JobType was introduced in 1.13, so I guess the cause is that the client
which creates and submits the job is still on 1.12.2. The client generates
an outdated job graph which does not have its JobType set, resulting in
this NPE.

Thanks,
Zhu

Austin Cawley-Edwards wrote on Thu, Jul 1, 2021 at 1:54 AM:

> Hi Shilpa,
>
> Thanks for reaching out to the mailing list and providing those logs! The
> NullPointerException looks odd to me, but in order to better guess what's
> happening, can you tell me a little bit more about what your setup looks
> like? How are you deploying, i.e., standalone with your own manifests, the
> Kubernetes integration of the Flink CLI, some open-source operator, etc.?
>
> Also, are you using a High Availability setup for the JobManager?
>
> Best,
> Austin
>
>
> On Wed, Jun 30, 2021 at 12:31 PM Shilpa Shankar 
> wrote:
>
>> Hello,
>>
>> We have a flink session cluster in kubernetes running on 1.12.2. We
>> attempted an upgrade to v 1.13.1, but the jobmanager pods are continuously
>> restarting and are in a crash loop.
>>
>> Logs are attached for reference.
>>
>> How do we recover from this state?
>>
>> Thanks,
>> Shilpa
>>
>


Re: Converting Table API query to Datastream API

2021-06-30 Thread JING ZHANG
Hi Le,
AFAIK, there are the following ways to look deeper into the job; hope it helps.
Before executing the job,
1. Use `explain` statement to explain the logical and optimized plans of a
query[1].
2. Use `ExecutionEnvironment#getExecutionPlan` to print the execution plan,
And use `visualizer` to visualize the execution plan [2].
After executing the job,
1. Find jobGraph in Flink web UI.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/explain/
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/execution/execution_plans/
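
To make this concrete, a small sketch combining both pre-execution options; the
`orders` table and the datagen connector are only placeholders:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class PlanInspection {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        tEnv.executeSql(
                "CREATE TABLE orders (id BIGINT, amount DOUBLE) WITH ('connector' = 'datagen')");

        // 1. Logical and optimized plans of a query, as in the EXPLAIN statement [1].
        System.out.println(tEnv.explainSql("SELECT id, SUM(amount) FROM orders GROUP BY id"));

        // 2. JSON execution plan for the plan visualizer [2]; available once the
        //    query is bridged into the DataStream API.
        Table result = tEnv.sqlQuery("SELECT id, SUM(amount) FROM orders GROUP BY id");
        tEnv.toChangelogStream(result).print();
        System.out.println(env.getExecutionPlan());
    }
}
```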

Best regards,
JING ZHANG

Le Xu wrote on Thu, Jul 1, 2021 at 5:51 AM:

> Thanks -- Is there a way to quickly visualize the Stream operator DAG
> generated by the TableAPI/SQL queries?
>
> Le
>
> On Tue, Jun 29, 2021 at 9:34 PM JING ZHANG  wrote:
>
>> Hi Le,
>> That link is a bit outdated. Since Flink 1.9, Table API & SQL is no longer
>> translated to the DataStream API. Table API & SQL and DataStream are at the
>> same level, and both are translated into a StreamOperator DAG.
>>
>> Best regards,
>> JING ZHANG
>>
>> Le Xu wrote on Wed, Jun 30, 2021 at 1:11 AM:
>>
>>> Hello! I have a basic question about the concept of using Flink Table
>>> API.
>>> Based on the link
>>> 
>>> here it seems like if I implement stream query with Table API the program
>>> is translated to datastream API eventually during execution. But is there a
>>> way to visualize what the datastream program looks like?
>>>
>>> Thanks,
>>>
>>> Le
>>>
>>


OutOfMemory Failure on Savepoint

2021-06-30 Thread Abhishek SP
Hello,

I am observing a failure whenever I trigger a savepoint on my Flink
Application which otherwise runs without issues

The app is deployed via AWS KDA (Kubernetes) with 256 KPU (6 task managers
with 43 slots each; 1 KPU = 1 vCPU, 4 GB memory, and 50 GB disk space). It
uses the RocksDB backend.

The savepoint completes successfully with a larger cluster (512 KPU). The
savepoint size is about 150 GB, which should fit easily within the 256 KPU
app as well.

I suspect that there is a resource leak somewhere, but the number of threads
and heap memory usage look normal (under 50%).

How should I go about debugging the issue and what other metrics should I
be looking at?
Note that the failure occurs only when a savepoint is triggered.

For Job Graph and full exception:
Ref:
https://stackoverflow.com/questions/68077200/flink-application-failure-on-savepoint


Thank you

Best,
Abhishek


Re: Converting Table API query to Datastream API

2021-06-30 Thread Le Xu
Thanks -- Is there a way to quickly visualize the Stream operator DAG
generated by the TableAPI/SQL queries?

Le

On Tue, Jun 29, 2021 at 9:34 PM JING ZHANG  wrote:

> Hi Le,
> That link is a bit outdated. Since Flink 1.9, Table API & SQL is no longer
> translated to the DataStream API. Table API & SQL and DataStream are at the
> same level, and both are translated into a StreamOperator DAG.
>
> Best regards,
> JING ZHANG
>
> Le Xu wrote on Wed, Jun 30, 2021 at 1:11 AM:
>
>> Hello! I have a basic question about the concept of using Flink Table
>> API.
>> Based on the link
>> 
>> here it seems like if I implement stream query with Table API the program
>> is translated to datastream API eventually during execution. But is there a
>> way to visualize what the datastream program looks like?
>>
>> Thanks,
>>
>> Le
>>
>


Re: RocksDB MapState debugging key serialization

2021-06-30 Thread Thomas Breloff
Thanks Yuval.  Indeed it was a serialization issue.  I followed the 
instructions in the docs to set up a local test environment with RocksDB that I 
was able to set a breakpoint in and step through.

I discovered that my key was not properly registered with the Kryo serializer 
and the default FieldSerializer was not producing byte-wise equal 
serializations.
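
For reference, registering the key type with an explicit Kryo serializer looks
roughly like this (a sketch only; `MyKey` and `MyKeySerializer` are hypothetical
stand-ins for the actual key class and a deterministic serializer):

```java
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KryoRegistrationExample {

    // Hypothetical key type used in MapState.
    public static class MyKey {
        public String id;
        public long version;
    }

    // Hypothetical serializer that writes the fields in a fixed, deterministic order.
    public static class MyKeySerializer extends Serializer<MyKey> {
        @Override
        public void write(Kryo kryo, Output output, MyKey key) {
            output.writeString(key.id);
            output.writeLong(key.version);
        }

        @Override
        public MyKey read(Kryo kryo, Input input, Class<MyKey> type) {
            MyKey key = new MyKey();
            key.id = input.readString();
            key.version = input.readLong();
            return key;
        }
    }

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Register the type with an explicit Kryo serializer so RocksDB's
        // byte-wise key comparison sees identical bytes for equal keys.
        env.getConfig().registerTypeWithKryoSerializer(MyKey.class, MyKeySerializer.class);
    }
}
```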

Thanks for the prompt response!
Tom

From: Yuval Itzchakov 
Date: Wednesday, June 30, 2021 at 12:56 PM
To: Thomas Breloff 
Cc: user@flink.apache.org 
Subject: Re: RocksDB MapState debugging key serialization
Here is what the documentation on RocksDBStateBackend says:

The EmbeddedRocksDBStateBackend holds in-flight data in a RocksDB database that 
is (per default) stored in the TaskManager local data directories.
Unlike storing java objects in HashMapStateBackend, data is stored as 
serialized byte arrays, which are mainly defined by the type serializer, 
resulting in key comparisons being byte-wise instead of using Java’s hashCode() 
and equals() methods.

This means that if your keys are not byte-wise equivalent, they won't be 
matched.

On Wed, Jun 30, 2021 at 7:37 PM Thomas Breloff wrote:
Hello,
I am having trouble with a Flink job which is configured using a RocksDB state 
backend.

Tl;dr: How can I debug the key serialization for RocksDB MapState for a 
deployed Flink job?

Details:

When I “put” a key/value pair into a MapState, and then later try to “get” 
using a key which has the same hashCode/equals as what I put in, I get back 
“null”.

Some things I have verified:


  *   Looping over the “keys()” or “entries()” of the MapState contains the 
expected key (which matches both hashCode and equals)
  *   If I “put” the same key that I’m attempting to “get” with, and then look 
at the “entries”, then both of the keys appear in the map.

I think I understand that the RocksDB version of MapState will use the 
serialized keys, however I have tested what I think is the serializer and it 
returns the same serialization for both objects.

How can I find out the serialized values that are being used for key 
comparison? Can you recommend any possible solutions or debugging strategies 
that would help?

Thank you,
Tom


--
Best Regards,
Yuval Itzchakov.


Re: Jobmanagers are in a crash loop after upgrade from 1.12.2 to 1.13.1

2021-06-30 Thread Austin Cawley-Edwards
Hi Shilpa,

Thanks for reaching out to the mailing list and providing those logs! The
NullPointerException looks odd to me, but in order to better guess what's
happening, can you tell me a little bit more about what your setup looks
like? How are you deploying, i.e., standalone with your own manifests, the
Kubernetes integration of the Flink CLI, some open-source operator, etc.?

Also, are you using a High Availability setup for the JobManager?

Best,
Austin


On Wed, Jun 30, 2021 at 12:31 PM Shilpa Shankar 
wrote:

> Hello,
>
> We have a flink session cluster in kubernetes running on 1.12.2. We
> attempted an upgrade to v 1.13.1, but the jobmanager pods are continuously
> restarting and are in a crash loop.
>
> Logs are attached for reference.
>
> How do we recover from this state?
>
> Thanks,
> Shilpa
>


Re: RocksDB MapState debugging key serialization

2021-06-30 Thread Yuval Itzchakov
Here is what the documentation on RocksDBStateBackend

says:

The EmbeddedRocksDBStateBackend holds in-flight data in a RocksDB database
that is (per default) stored in the TaskManager local data directories.
Unlike storing java objects in HashMapStateBackend, data is stored as
serialized byte arrays, which are mainly defined by the type
serializer, *resulting
in key comparisons being byte-wise instead of using Java’s hashCode() and
equals() methods.*

This means that if your keys are not byte-wise equivalent, they won't be
matched.
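
A quick way to check this is to serialize two "equal" keys with the serializer
Flink derives for the key type and compare the raw bytes; a rough sketch (the
String key type and values are just placeholders):

```java
import java.util.Arrays;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;

public class KeyBytesCheck {

    // Serializes a value with the serializer Flink would derive for its type,
    // i.e. the bytes the RocksDB state backend uses for key lookups.
    static <T> byte[] serialize(T value, TypeInformation<T> typeInfo) throws Exception {
        TypeSerializer<T> serializer = typeInfo.createSerializer(new ExecutionConfig());
        DataOutputSerializer out = new DataOutputSerializer(64);
        serializer.serialize(value, out);
        return out.getCopyOfBuffer();
    }

    public static void main(String[] args) throws Exception {
        // Replace with the actual key type and two keys that are equal
        // according to hashCode()/equals().
        TypeInformation<String> keyType = TypeInformation.of(String.class);
        byte[] a = serialize("some-key", keyType);
        byte[] b = serialize("some-key", keyType);

        // If this prints false for keys that should match, MapState#get will
        // return null even though keys() shows an "equal" key.
        System.out.println(Arrays.equals(a, b));
    }
}
```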

On Wed, Jun 30, 2021 at 7:37 PM Thomas Breloff  wrote:

> Hello,
>
> I am having trouble with a Flink job which is configured using a RocksDB
> state backend.
>
>
>
> Tl;dr: How can I debug the key serialization for RocksDB MapState for a
> deployed Flink job?
>
>
>
> Details:
>
>
>
> When I “put” a key/value pair into a MapState, and then later try to “get”
> using a key which has the same hashCode/equals as what I put in, I get back
> “null”.
>
>
>
> Some things I have verified:
>
>
>
>- Looping over the “keys()” or “entries()” of the MapState contains
>the expected key (which matches both hashCode and equals)
>- If I “put” the same key that I’m attempting to “get” with, and then
>look at the “entries”, then both of the keys appear in the map.
>
>
>
> I think I understand that the RocksDB version of MapState will use the
> serialized keys, however I have tested what I think is the serializer and
> it returns the same serialization for both objects.
>
>
>
> How can I find out the serialized values that are being used for key
> comparison? Can you recommend any possible solutions or debugging
> strategies that would help?
>
>
>
> Thank you,
>
> Tom
>


-- 
Best Regards,
Yuval Itzchakov.


RocksDB MapState debugging key serialization

2021-06-30 Thread Thomas Breloff
Hello,
I am having trouble with a Flink job which is configured using a RocksDB state 
backend.

Tl;dr: How can I debug the key serialization for RocksDB MapState for a 
deployed Flink job?

Details:

When I “put” a key/value pair into a MapState, and then later try to “get” 
using a key which has the same hashCode/equals as what I put in, I get back 
“null”.

Some things I have verified:


  *   Looping over the “keys()” or “entries()” of the MapState contains the 
expected key (which matches both hashCode and equals)
  *   If I “put” the same key that I’m attempting to “get” with, and then look 
at the “entries”, then both of the keys appear in the map.

I think I understand that the RocksDB version of MapState will use the 
serialized keys, however I have tested what I think is the serializer and it 
returns the same serialization for both objects.

How can I find out the serialized values that are being used for key 
comparison? Can you recommend any possible solutions or debugging strategies 
that would help?

Thank you,
Tom


Re: Job Recovery Time on TM Lost

2021-06-30 Thread Lu Niu
Thanks Gen! cc flink-dev to collect more inputs.

Best
Lu

On Wed, Jun 30, 2021 at 12:55 AM Gen Luo  wrote:

> I'm also wondering here.
>
> In my opinion, it's because the JM can not confirm whether the TM is lost
> or it's a temporary network trouble and will recover soon, since I can see
> in the log that akka has got a Connection refused but JM still sends a
> heartbeat request to the lost TM until it reaches heartbeat timeout. But
> I'm not sure if it's indeed designed like this.
>
> I would really appreciate it if anyone who knows more details could
> answer. Thanks.
>


Jobmanagers are in a crash loop after upgrade from 1.12.2 to 1.13.1

2021-06-30 Thread Shilpa Shankar
Hello,

We have a flink session cluster in kubernetes running on 1.12.2. We
attempted an upgrade to v 1.13.1, but the jobmanager pods are continuously
restarting and are in a crash loop.

Logs are attached for reference.

How do we recover from this state?

Thanks,
Shilpa
2021-06-30 16:03:25,965 ERROR 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Fatal error 
occurred in the cluster entrypoint.
org.apache.flink.util.FlinkException: JobMaster for job 
a1fa9416058026ed7dffeafaf7c21c81 failed.
at 
org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:873)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.dispatcher.Dispatcher.jobManagerRunnerFailed(Dispatcher.java:459)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.dispatcher.Dispatcher.handleJobManagerRunnerResult(Dispatcher.java:436)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$3(Dispatcher.java:415)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at java.util.concurrent.CompletableFuture.uniHandle(Unknown Source) 
~[?:?]
at java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown 
Source) ~[?:?]
at java.util.concurrent.CompletableFuture$Completion.run(Unknown 
Source) ~[?:?]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
[flink-dist_2.12-1.13.1.jar:1.13.1]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) 
[flink-dist_2.12-1.13.1.jar:1.13.1]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) 
[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
[flink-dist_2.12-1.13.1.jar:1.13.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
[flink-dist_2.12-1.13.1.jar:1.13.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
[flink-dist_2.12-1.13.1.jar:1.13.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.actor.Actor.aroundReceive(Actor.scala:517) 
[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.actor.Actor.aroundReceive$(Actor.scala:515) 
[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
[flink-dist_2.12-1.13.1.jar:1.13.1]
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could 
not start the JobMaster.
at 
org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown 
Source) ~[?:?]
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) 
~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) 
~[?:?]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown 
Source) ~[?:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) 
~[?:?]
at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
 Source) ~[?:?]
at 

Re: Exception in snapshotState suppresses subsequent checkpoints

2021-06-30 Thread tao xiao
Hi team,

Does anyone have a clue?

On Mon, Jun 28, 2021 at 3:27 PM tao xiao  wrote:

> My job is very simple, as you can see from the code I pasted. I simply
> print the number to stdout. If you look at the log, the number continued
> to be printed after checkpoint 1, which indicates no backpressure was
> happening. It is very easy to reproduce this if you run the code I
> provided in an IDE.
>
>
> LOG
>
> [2021-06-26 16:08:52,352] INFO Triggering checkpoint 1 (type=CHECKPOINT) @
> 1624694932345 for job afde4a82f41e8284cb0bfff20497a5cc.
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator)
> [2021-06-26 16:08:52,372] INFO Could not complete snapshot 1 for operator
> Source: Custom Source -> Sink: Print to Std. Out (1/1)#0. Failure reason:
> Checkpoint was declined.
> (org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl)
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
> complete snapshot 1 for operator Source: Custom Source -> Sink: Print to
> Std. Out (1/1)#0. Failure reason: Checkpoint was declined.
> at
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:241)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:685)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:606)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:571)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1003)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:993)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:912)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$8(StreamTask.java:880)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
> [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
> [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
> [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
> [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
> [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> [flink-runtime_2.11-1.12.1.jar:1.12.1]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> [flink-runtime_2.11-1.12.1.jar:1.12.1]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_261]
> Caused by: org.apache.flink.util.SerializedThrowable: npe
> at
> com.smartnews.dp.kafka.sample.flink.FromElementsFunctionT.snapshotState(FromElementsFunctionT.java:111)
> ~[classes/:?]
> at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> 

Re: Protobuf + Confluent Schema Registry support

2021-06-30 Thread Austin Cawley-Edwards
Hi Vishal,

I don't believe there is another way to solve the problem currently besides
rolling your own serializer.

For the Avro + Schema Registry format, is this Table API format[1] what
you're referring to? It doesn't look there have been discussions around
adding a similar format for Protobuf yet, but perhaps you could start one
based on the avro one[2]?

Best,
Austin

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/avro-confluent/
[2]:
https://issues.apache.org/jira/browse/FLINK-11160?jql=project%20%3D%20FLINK%20AND%20text%20~%20%22avro%20schema%22


On Wed, Jun 30, 2021 at 4:50 AM Vishal Surana  wrote:

> Using the vanilla Kafka producer, I can write protobuf messages to Kafka
> while leveraging schema registry support as well. A Flink Kafka producer
> requires us to explicitly provide a serializer which converts the message to
> a ProducerRecord containing the serialized bytes of the message. We can't
> make use of the KafkaProtobufSerializer[T] provided by Confluent. Thus the
> only way I could think of would be to create an instance of
> KafkaProtobufSerializer inside a Flink SerializationSchema class and use it
> to serialize my messages. The problem with that is that I would have
> to implement registration of the schema and other tasks done by
> KafkaProtobufSerializer.
>
> Is there any other way to solve this problem?
> Is there a plan to support protobuf serialization along with schema
> registry support?
> I noticed you've recently added Avro + Schema Registry support to your
> codebase but haven't documented it. Is it ready for use?
>


Re: Regarding state access in UDF

2021-06-30 Thread Ingo Bürk
Hi Kai,

AggregateFunction and TableAggregateFunction are both stateful UDF
interfaces. This should cover most scenarios given where they would be
used. If you need more fine-grained control you can also always drop down
into the DataStream API (using #toDataStream) and work there. Table API /
SQL in general are higher-level abstractions where you cannot directly
interact with operators.
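
For illustration, a minimal sketch of a stateful Table API / SQL UDF, where the
accumulator is the state that Flink manages and checkpoints for you (names and
types are only examples):

```java
import org.apache.flink.table.functions.AggregateFunction;

// The accumulator class is the state Flink manages for this UDF.
public class WeightedAvg extends AggregateFunction<Double, WeightedAvg.Acc> {

    public static class Acc {
        public double sum = 0;
        public long count = 0;
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    // Called for every input row; updates the managed accumulator state.
    public void accumulate(Acc acc, Double value, Long weight) {
        acc.sum += value * weight;
        acc.count += weight;
    }

    @Override
    public Double getValue(Acc acc) {
        return acc.count == 0 ? null : acc.sum / acc.count;
    }
}
```

It can then be registered, e.g. via TableEnvironment#createTemporarySystemFunction,
and used from SQL like a built-in aggregate.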

If this doesn't answer your question it would also be great if you could
explain your use case more so we can understand it. Thanks!


Best
Ingo

On Wed, Jun 30, 2021 at 3:37 PM Kai Fu  wrote:

> Hi team,
>
> We have a use case that needs to create/access state in a UDF, but per the
> documentation and the UDF interface, there is no way to do that. We want to
> know if this is by design and whether there is any other approach.
>
> --
> *Best wishes,*
> *- Kai*
>


Regarding state access in UDF

2021-06-30 Thread Kai Fu
Hi team,

We have a use case that needs to create/access state in a UDF, but per the
documentation and the UDF interface, there is no way to do that. We want to
know if this is by design and whether there is any other approach.

-- 
*Best wishes,*
*- Kai*


Protobuf + Confluent Schema Registry support

2021-06-30 Thread Vishal Surana
Using the vanilla Kafka producer, I can write protobuf messages to Kafka while
leveraging schema registry support as well. A Flink Kafka producer requires us
to explicitly provide a serializer which converts the message to a
ProducerRecord containing the serialized bytes of the message. We can't make
use of the KafkaProtobufSerializer[T] provided by Confluent. Thus the only way
I could think of would be to create an instance of KafkaProtobufSerializer
inside a Flink SerializationSchema class and use it to serialize my messages.
The problem with that is that I would have to implement registration of
the schema and other tasks done by KafkaProtobufSerializer.

Is there any other way to solve this problem? 
Is there a plan to support protobuf serialization along with schema registry 
support?
I noticed you've recently added Avro + Schema Registry support to your codebase 
but haven't documented it. Is it ready for use? 
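
For what it's worth, a rough sketch of the wrapping approach described above; it
assumes Confluent's KafkaProtobufSerializer can be created and configured lazily
on the task managers, and the config key and class names should be checked
against the Confluent client version in use:

```java
import java.util.HashMap;
import java.util.Map;

import com.google.protobuf.Message;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

// Wraps Confluent's KafkaProtobufSerializer in a Flink KafkaSerializationSchema so
// that schema registration stays with the Confluent serializer.
public class ProtobufRegistrySerializationSchema<T extends Message>
        implements KafkaSerializationSchema<T> {

    private final String topic;
    // Assumed to contain at least "schema.registry.url"; HashMap keeps it serializable.
    private final HashMap<String, String> registryConfig;
    private transient KafkaProtobufSerializer<T> inner;

    public ProtobufRegistrySerializationSchema(String topic, Map<String, String> registryConfig) {
        this.topic = topic;
        this.registryConfig = new HashMap<>(registryConfig);
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(T element, Long timestamp) {
        if (inner == null) {
            // Created lazily on the task manager; the Confluent serializer is not serializable.
            inner = new KafkaProtobufSerializer<>();
            inner.configure(registryConfig, false);
        }
        return new ProducerRecord<>(topic, inner.serialize(topic, element));
    }
}
```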


Re: Recommended way to submit a SQL job via code without getting tied to a Flink version?

2021-06-30 Thread Stephan Ewen
Hi Sonam!

To answer this, let me quickly provide some background on the two ways
flink deployments / job submissions work.
See also here for some background:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/overview/#deployment-modes

What is common in all setups is that the query compilation / the dataflow
assembly happens where the entry-point program runs.

If you are programmatically setting up the application with a
StreamExecutionEnvironment / TableEnvironment, then the query
compilation (and JobGraph generation) happens where the program's
main()-method is. If you are submitting via the SQL Client, then the SQL
Client is the entrypoint program, and the query compilation happens where
the SQLClient runs.

Now, where is that entry-point program executed if you deploy a job? That
depends on your deployment mode.

(1) Session Mode:

Here you have a running cluster with a Dispatcher that has the REST
endpoint that accepts job submissions. Jobs are submitted via HTTP
transporting a serialized JobGraph.
The entry-point can run anywhere and uses a HTTP client to send the
generated JobGraph to the Dispatcher.

==> Here you need to worry about matching the versions of the client (the
entry-point process, like SQL Client) and the deployed session cluster.

(2) Application Mode:

The entry-point (SQL Client or application program) spawns the JobManager
when the program is executed. The JobGraph is passed as a Java object
directly to the spawned JM component. There is an HTTP endpoint, but it is
not for submitting jobs, only for the Web UI and for commands like
cancelling execution.

This mode should allow you to package a Flink application (a SQL query) in a
completely self-contained way, with no need to sync versions between clients
and clusters.
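
As an illustration, a self-contained entry point for application mode could look
roughly like this (a sketch only; the statement splitting is naive and the SQL
file path comes from the program arguments):

```java
import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Hypothetical entry point: reads a SQL script path from the program arguments and
// executes each statement, so query compilation happens wherever this main() runs
// (inside the spawned JobManager when deployed in application mode).
public class SqlRunner {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        String script = new String(Files.readAllBytes(Paths.get(args[0])));
        // Naive statement splitting, for illustration only.
        for (String statement : script.split(";")) {
            if (!statement.trim().isEmpty()) {
                tEnv.executeSql(statement);
            }
        }
    }
}
```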



The internal abstraction for all ways to execute the programs are the
PipelineExecutors.
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/PipelineExecutor.java

If you look at the subclasses of "PipelineExecutor" you basically see all
built-in deployment modes. To create a customized version of the
Application deployment mode, (or maybe the Job deployment mode) you can dig
for example through the EmbeddedExecutor and
the ApplicationDispatcherBootstrap.

Hope that helps...

Best,
Stephan



On Tue, Jun 29, 2021 at 5:01 PM Sonam Mandal  wrote:

> Hi Matthias,
>
> Thanks for getting back to me. We are trying to build a system where users
> can focus on writing Flink SQL applications and we handle the full
> lifecycle of their Flink cluster and job. We would like to let users focus
> on just their SQL and UDF logic. In such an environment, we cannot enforce
> that all users must use a single Flink version. We intend to have this
> setup in kubernetes where within the same kubernetes cluster we can create
> multiple Flink clusters to which jobs are submitted.
>
> Due to this, using an interactive shell will not be an option, nor do we
> want to directly expose this to users except for testing purposes.
>
> I see the latest 1.13 release now has an option to pass a SQL file as
> input to the SQL client and it’ll take care of running the job. We will
> explore this option as well. I believe this is a new feature which wasn’t
> available in 1.12, right? Does the planning happen in the SQL client or on
> the job manager? We ran into issues with job graph incompatibility if our
> code directly submitted the SQL to the remote environment or if we used
> /bin/flink to run this jar that does the SQL conversion.
>
> We currently have a POC idea which takes the SQL as a file and we wrote a
> simple job runner which reads this SQL and executes it. We are using Flink
> REST APIs to upload this jar and submit the job so that the job graph
> generation happens on the job manager. We no longer see the job graph
> incompatibility issues.
>
> Is there any reason not to use the above approach? We noticed that the
> Flink client (/bin/flink) does job graph generation  itself and not via the
> REST API, any reason why it doesn’t leverage the REST API?
>
> Nice thing about using REST is that we can now run multiple Flink cluster
> versions and our job submission code doesn’t need to know which flink
> client version to use.
>
> We definitely saw this job graph incompatibility with /bin/flink. We still
> need to test out the sql client with the -f option to assess whether we
> will require keeping multiple versions around should we decide to use this
> option. So we were wondering what the recommendation is within the Flink
> community on how to handle such cases. Hope this clarifies our use case
> better.
>
> Also, as for the state incompatibility between major Flink versions, I see
> the thread mentions using a tool to rewrite the savepoints. Is this the
> only recommended way to handle this? Is this safe and does it work in all
> scenarios?
>
> Thanks,
> Sonam
>
>
> --
> *From:* 

Re: Joining a regular table with a versioned table: how to get an append-only table

2021-06-30 Thread silence
Currently, interval joins and temporal joins against dimension (lookup) tables do not
produce retractions; other join scenarios can produce retract records.
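
For illustration, a sketch of the lookup-join form that stays append-only, reusing the
table and column names from the query in the original message below; it assumes
case2_TOPIC_A declares a processing-time attribute (here called proctime) and that the
right-hand tables are backed by lookup-capable connectors:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LookupJoinExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // DDL for case2_TOPIC_A (with a proctime column), card_data,
        // view_new_card_info and sink_2 is assumed to exist already.
        tableEnv.executeSql(
                "INSERT INTO sink_2 "
                        + "SELECT a.`time`, c.cust, b.mobile "
                        + "FROM case2_TOPIC_A AS a "
                        + "LEFT JOIN card_data FOR SYSTEM_TIME AS OF a.proctime AS b "
                        + "  ON a.card = b.card "
                        + "LEFT JOIN view_new_card_info FOR SYSTEM_TIME AS OF a.proctime AS c "
                        + "  ON a.card = c.card");
    }
}
```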


--
From: 杨光跃 
Sent: Wednesday, June 30, 2021 17:47
To: user-zh@flink.apache.org 
Subject: Joining a regular table with a versioned table: how to get an append-only table

Hi all, I have a question:
insert into sink_2 
select a.`time`,c.cust,b.mobile
from case2_TOPIC_A a
left join card_data b on a.card = b.card
left join view_new_card_info c on a.card = c.card;




case2_TOPIC_A is a regular table and view_new_card_info is a dimension table; the
sink_2 table I am inserting into should really be an append-only table.
Why does submission require:
please declare primary key for sink table when query contains update/delete
record.


I only need append semantics here, right? How should I handle this?


杨光跃
yangguangyuem...@163.com



Joining a regular table with a versioned table: how to get an append-only table

2021-06-30 Thread 杨光跃
Hi all, I have a question:
insert into sink_2 
select a.`time`,c.cust,b.mobile
from case2_TOPIC_A a
left join card_data b on a.card = b.card
left join view_new_card_info c on a.card = c.card;




case2_TOPIC_A is a regular table and view_new_card_info is a dimension table; the
sink_2 table I am inserting into should really be an append-only table.
Why does submission require:
please declare primary key for sink table when query contains update/delete
record.


I only need append semantics here, right? How should I handle this?


杨光跃
yangguangyuem...@163.com

Re: How to configure an idle source scenario in Flink SQL

2021-06-30 Thread 杨光跃
Hi, one more question:
insert into sink_2 
select a.`time`,c.cust,b.mobile
from case2_TOPIC_A a
left join card_data b on a.card = b.card
left join view_new_card_info c on a.card = c.card;




case2_TOPIC_A is a regular table and view_new_card_info is a dimension table; the
sink_2 table I am inserting into should really be an append-only table.
Why does submission require:
please declare primary key for sink table when query contains update/delete
record.


I only need append semantics here, right? How should I handle this?


杨光跃
yangguangyuem...@163.com
On June 30, 2021 at 15:44, 杨光跃 wrote:


Got it, thanks.
In the sql-client, run:  set table.exec.source.idle-timeout = 10s;
杨光跃
yangguangyuem...@163.com
On June 30, 2021 at 15:36, silence wrote:
You can refer to
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/config/#table-exec-source-idle-timeout


--
From: 杨光跃 
Sent: Wednesday, June 30, 2021 10:54
To: user-zh@flink.apache.org 
Subject: How to configure an idle source scenario in Flink SQL

In code, you can use .withIdleness(Duration.ofMinutes(1)); to have an idle source still advance the watermark. How can this be expressed in SQL?


杨光跃
yangguangyuem...@163.com

Re: NoSuchMethodError - getColumnIndexTruncateLength after upgrading Flink from 1.11.2 to 1.12.1

2021-06-30 Thread Matthias Pohl
Depending on the build system used, you could check the dependency tree,
e.g. for Maven it would be `mvn dependency:tree
-Dincludes=org.apache.parquet`

Matthias

On Wed, Jun 30, 2021 at 8:40 AM Thomas Wang  wrote:

> Thanks Matthias. Could you advise how I can confirm this in my environment?
>
> Thomas
>
> On Tue, Jun 29, 2021 at 1:41 AM Matthias Pohl 
> wrote:
>
>> Hi Rommel, Hi Thomas,
>> Apache Parquet was bumped from 1.10.0 to 1.11.1 for Flink 1.12 in
>> FLINK-19137 [1]. The error you're seeing looks like some dependency issue
>> where you have a version other than 1.11.1
>> of org.apache.parquet:parquet-column:jar on your classpath?
>>
>> Matthias
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-19137
>>
>> On Wed, Jun 23, 2021 at 1:50 AM Rommel Holmes 
>> wrote:
>>
>>> To give more information
>>>
>>> parquet-avro version 1.10.0 with Flink 1.11.2 and it was running fine.
>>>
>>> now Flink 1.12.1, the error msg shows up.
>>>
>>> Thank you for help.
>>>
>>> Rommel
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Jun 22, 2021 at 2:41 PM Thomas Wang  wrote:
>>>
 Hi,

 We recently upgraded our Flink version from 1.11.2 to 1.12.1 and one of
 our jobs that used to run ok, now sees the following error. This error
 doesn't seem to be related to any user code. Can someone help me take a
 look?

 Thanks.

 Thomas

 java.lang.NoSuchMethodError:
 org.apache.parquet.column.ParquetProperties.getColumnIndexTruncateLength()I
 at
 org.apache.parquet.hadoop.ParquetWriter.(ParquetWriter.java:282)
 ~[?:?]
 at
 org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:564)
 ~[?:?]
 at
 org.apache.flink.formats.parquet.avro.ParquetAvroWriters.createAvroParquetWriter(ParquetAvroWriters.java:90)
 ~[?:?]
 at
 org.apache.flink.formats.parquet.avro.ParquetAvroWriters.lambda$forGenericRecord$abd75386$1(ParquetAvroWriters.java:65)
 ~[?:?]
 at
 org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:56)
 ~[?:?]
 at
 org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNew(BulkBucketWriter.java:75)
 ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
 at
 org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedBucketWriter.openNewInProgressFile(OutputStreamBasedPartFileWriter.java:90)
 ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
 at
 org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNewInProgressFile(BulkBucketWriter.java:36)
 ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
 at
 org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:243)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
 at
 org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:220)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
 at
 org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:305)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
 at
 org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:103)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
 at
 org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:492)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
 at
 org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
 at
 org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
 at
 org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
 at
 org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
 at
 org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:75)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
 at
 org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
 at
 org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
 at
 org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
 at
 org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
 at
 org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
 

Re: Job Recovery Time on TM Lost

2021-06-30 Thread Gen Luo
I'm also wondering here.

In my opinion, it's because the JM cannot confirm whether the TM is lost
or whether it's a temporary network problem that will recover soon: I can see
in the log that akka got a Connection refused, but the JM still sends
heartbeat requests to the lost TM until it reaches the heartbeat timeout. But
I'm not sure if it's indeed designed like this.

I would really appreciate it if anyone who knows more details could answer.
Thanks.


Re: How to configure an idle source scenario in Flink SQL

2021-06-30 Thread 杨光跃


Got it, thanks.
In the sql-client, run:  set table.exec.source.idle-timeout = 10s;
杨光跃
yangguangyuem...@163.com
On June 30, 2021 at 15:36, silence wrote:
You can refer to
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/config/#table-exec-source-idle-timeout


--
From: 杨光跃 
Sent: Wednesday, June 30, 2021 10:54
To: user-zh@flink.apache.org 
Subject: How to configure an idle source scenario in Flink SQL

In code, you can use .withIdleness(Duration.ofMinutes(1)); to have an idle source still advance the watermark. How can this be expressed in SQL?


杨光跃
yangguangyuem...@163.com

Re: How to configure an idle source scenario in Flink SQL

2021-06-30 Thread silence
You can refer to
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/config/#table-exec-source-idle-timeout
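
For completeness, a sketch of setting the same option programmatically via
TableConfig (the 10s value is just an example):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IdleTimeoutExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Equivalent of `SET table.exec.source.idle-timeout = 10s;` in the SQL client.
        tEnv.getConfig().getConfiguration()
                .setString("table.exec.source.idle-timeout", "10s");
    }
}
```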


--
From: 杨光跃 
Sent: Wednesday, June 30, 2021 10:54
To: user-zh@flink.apache.org 
Subject: How to configure an idle source scenario in Flink SQL

In code, you can use .withIdleness(Duration.ofMinutes(1)); to have an idle source still advance the watermark. How can this be expressed in SQL?


杨光跃
yangguangyuem...@163.com

Re: How can I tell if a record in a bounded job is the last record?

2021-06-30 Thread Yik San Chan
Hi Paul,

Thanks for the suggestion, this sounds like a nice solution. I will give it
a shot.

Best,
Yik San

On Wed, Jun 30, 2021 at 2:26 PM Paul Lam  wrote:

> Hi Yik San,
>
> Maybe you could use watermark to trigger the last flush. Source operations
> will emit MAX_WATERMARK to trigger all the timers when it terminates (see
> [1]).
>
> [1]
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
>
> Best,
> Paul Lam
>
> On Jun 30, 2021, at 10:38, Yik San Chan wrote:
>
> Hi community,
>
> I have a batch job that consumes records from a bounded source (e.g.,
> Hive), walk them through a BufferingSink as described in [docs](
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction).
> In the BufferingSink, I want to flush out records to the sink in
> 1000-record batches.
>
> Given the source is bounded, I will need to flush out all records when it
> comes to the end, otherwise records buffered (variable bufferedElements)
> will be lost.
>
> An obvious way of doing so is to flush out all records in the `close`
> method. That should work fine.
>
> However, I wonder if it's possible to tell if a record is the last record
> in the `invoke` method? In other words, how to implement the `isLastRecord`
> method below?
>
> ```java
> @Override
> public void invoke(Tuple2 value, Context context) throws Exception {
>     bufferedElements.add(value);
>     if (bufferedElements.size() == threshold || isLastRecord()) {
>         for (Tuple2 element : bufferedElements) {
>             // send it to the sink
>         }
>         bufferedElements.clear();
>     }
> }
> ```
>
> Thanks!
>
> Best,
> Yik San
>
>
>


Re: Session cluster configmap removal

2021-06-30 Thread Yang Wang
Hi Sweta,

After FLINK-20695 [1] was resolved, once a job in the session cluster reaches
a globally terminal state (FAILED, CANCELED),
the HA-related ConfigMaps will be deleted.

So could you please have a try with Flink version 1.13.1, 1.12.5?

However, we still have some residual ConfigMaps (e.g. rest-server,
resourcemanager, dispatcher) for shared components in the session cluster.
They will be retained even after the session cluster is shut down. This
will be fixed in FLINK-20219 [2].

[1]. https://issues.apache.org/jira/browse/FLINK-20695
[2]. https://issues.apache.org/jira/browse/FLINK-20219

Best,
Yang



Sweta Kalakuntla wrote on Wed, Jun 30, 2021 at 3:14 AM:

> Hi,
>
> We have flink session clusters in kubernetes and several long running
> flink jobs deployed in them with HA enabled. After we have enabled HA, we
> are seeing configmaps created for every new job. Whenever we stop/cancel
> any existing jobs, these configmaps do not get deleted. Is it right that
> these configmaps will not be removed unless we shut down the cluster?
>
> Thanks,
> Sweta
>
>
>


Re: Yarn Application Crashed?

2021-06-30 Thread Piotr Nowojski
You are welcome :)

Piotrek

On Wed, Jun 30, 2021 at 08:34 Thomas Wang wrote:

> Thanks Piotr. This is helpful.
>
> Thomas
>
> On Mon, Jun 28, 2021 at 8:29 AM Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> You should still be able to get the Flink logs via:
>>
>> > yarn logs -applicationId application_1623861596410_0010
>>
>> And it should give you more answers about what has happened.
>>
>> About the Flink and YARN behaviour, have you seen the documentation? [1]
>> Especially this part:
>>
>> > Failed containers (including the JobManager) are replaced by YARN. The
>> maximum number of JobManager container restarts is configured via
>> yarn.application-attempts (default 1). The YARN Application will fail once
>> all attempts are exhausted.
>>
>> ?
>>
>> Best,
>> Piotrek
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/yarn/#flink-on-yarn-reference
>>
>> On Mon, Jun 28, 2021 at 02:26 Thomas Wang wrote:
>>
>>> Just found some additional info. It looks like one of the EC2 instances
>>> got terminated at the time the crash happened and this job had 7 Task
>>> Managers running on that EC2 instance. Now I suspect it's possible
>>> that when Yarn tried to migrate the Task Managers, there were no idle
>>> containers as this job was using like 99% of the entire cluster. However in
>>> that case shouldn't Yarn wait for containers to become available? I'm not
>>> quite sure how Flink would behave in this case. Could someone provide some
>>> insights here? Thanks.
>>>
>>> Thomas
>>>
>>> On Sun, Jun 27, 2021 at 4:24 PM Thomas Wang  wrote:
>>>
 Hi,

 I recently experienced a job crash due to the underlying Yarn
 application failing for some reason. Here is the only error message I saw.
 It seems I can no longer see any of the Flink job logs.

 Application application_1623861596410_0010 failed 1 times (global limit
 =2; local limit is =1) due to ApplicationMaster for attempt
 appattempt_1623861596410_0010_01 timed out. Failing the application.

 I was running the Flink job using the Yarn session mode with the
 following command.

 export HADOOP_CLASSPATH=`hadoop classpath` &&
 /usr/lib/flink/bin/yarn-session.sh -jm 7g -tm 7g -s 4 --detached

 I didn't have HA setup, but I believe the underlying Yarn application
 caused the crash because if, for some reason, the Flink job failed, the
 Yarn application should still survive. Please correct me if this is not the
 right assumption.

 My question is how I should find the root cause in this case and what's
 the recommended way to avoid this going forward?

 Thanks.

 Thomas

>>>


Re: NoSuchMethodError - getColumnIndexTruncateLength after upgrading Flink from 1.11.2 to 1.12.1

2021-06-30 Thread Thomas Wang
Thanks Matthias. Could you advise how I can confirm this in my environment?

Thomas

On Tue, Jun 29, 2021 at 1:41 AM Matthias Pohl 
wrote:

> Hi Rommel, Hi Thomas,
> Apache Parquet was bumped from 1.10.0 to 1.11.1 for Flink 1.12 in
> FLINK-19137 [1]. The error you're seeing looks like some dependency issue
> where you have a version other than 1.11.1
> of org.apache.parquet:parquet-column:jar on your classpath?
>
> Matthias
>
> [1] https://issues.apache.org/jira/browse/FLINK-19137
>
> On Wed, Jun 23, 2021 at 1:50 AM Rommel Holmes 
> wrote:
>
>> To give more information
>>
>> parquet-avro version 1.10.0 with Flink 1.11.2 and it was running fine.
>>
>> now Flink 1.12.1, the error msg shows up.
>>
>> Thank you for help.
>>
>> Rommel
>>
>>
>>
>>
>>
>> On Tue, Jun 22, 2021 at 2:41 PM Thomas Wang  wrote:
>>
>>> Hi,
>>>
>>> We recently upgraded our Flink version from 1.11.2 to 1.12.1 and one of
>>> our jobs that used to run ok, now sees the following error. This error
>>> doesn't seem to be related to any user code. Can someone help me take a
>>> look?
>>>
>>> Thanks.
>>>
>>> Thomas
>>>
>>> java.lang.NoSuchMethodError:
>>> org.apache.parquet.column.ParquetProperties.getColumnIndexTruncateLength()I
>>> at
>>> org.apache.parquet.hadoop.ParquetWriter.(ParquetWriter.java:282)
>>> ~[?:?]
>>> at
>>> org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:564)
>>> ~[?:?]
>>> at
>>> org.apache.flink.formats.parquet.avro.ParquetAvroWriters.createAvroParquetWriter(ParquetAvroWriters.java:90)
>>> ~[?:?]
>>> at
>>> org.apache.flink.formats.parquet.avro.ParquetAvroWriters.lambda$forGenericRecord$abd75386$1(ParquetAvroWriters.java:65)
>>> ~[?:?]
>>> at
>>> org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:56)
>>> ~[?:?]
>>> at
>>> org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNew(BulkBucketWriter.java:75)
>>> ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
>>> at
>>> org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedBucketWriter.openNewInProgressFile(OutputStreamBasedPartFileWriter.java:90)
>>> ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
>>> at
>>> org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNewInProgressFile(BulkBucketWriter.java:36)
>>> ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
>>> at
>>> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:243)
>>> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>>> at
>>> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:220)
>>> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>>> at
>>> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:305)
>>> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>>> at
>>> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:103)
>>> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>>> at
>>> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:492)
>>> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>>> at
>>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
>>> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>>> at
>>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
>>> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>>> at
>>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>>> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>>> at
>>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>>> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>>> at
>>> org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:75)
>>> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>>> at
>>> org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)
>>> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>>> at
>>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>>> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>>> at
>>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>>> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>>> at
>>> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
>>> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>>> at
>>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
>>> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>>> at
>>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>>> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>>> at
>>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>>> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>>> at
>>> 

Re: Yarn Application Crashed?

2021-06-30 Thread Thomas Wang
Thanks Piotr. This is helpful.

Thomas

On Mon, Jun 28, 2021 at 8:29 AM Piotr Nowojski  wrote:

> Hi,
>
> You should still be able to get the Flink logs via:
>
> > yarn logs -applicationId application_1623861596410_0010
>
> And it should give you more answers about what has happened.
>
> About the Flink and YARN behaviour, have you seen the documentation? [1]
> Especially this part:
>
> > Failed containers (including the JobManager) are replaced by YARN. The
> maximum number of JobManager container restarts is configured via
> yarn.application-attempts (default 1). The YARN Application will fail once
> all attempts are exhausted.
>
> ?
>
> Best,
> Piotrek
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/yarn/#flink-on-yarn-reference
>
> On Mon, Jun 28, 2021 at 02:26 Thomas Wang wrote:
>
>> Just found some additional info. It looks like one of the EC2 instances
>> got terminated at the time the crash happened and this job had 7 Task
>> Managers running on that EC2 instance. Now I suspect it's possible
>> that when Yarn tried to migrate the Task Managers, there were no idle
>> containers as this job was using like 99% of the entire cluster. However in
>> that case shouldn't Yarn wait for containers to become available? I'm not
>> quite sure how Flink would behave in this case. Could someone provide some
>> insights here? Thanks.
>>
>> Thomas
>>
>> On Sun, Jun 27, 2021 at 4:24 PM Thomas Wang  wrote:
>>
>>> Hi,
>>>
>>> I recently experienced a job crash due to the underlying Yarn
>>> application failing for some reason. Here is the only error message I saw.
>>> It seems I can no longer see any of the Flink job logs.
>>>
>>> Application application_1623861596410_0010 failed 1 times (global limit
>>> =2; local limit is =1) due to ApplicationMaster for attempt
>>> appattempt_1623861596410_0010_01 timed out. Failing the application.
>>>
>>> I was running the Flink job using the Yarn session mode with the
>>> following command.
>>>
>>> export HADOOP_CLASSPATH=`hadoop classpath` &&
>>> /usr/lib/flink/bin/yarn-session.sh -jm 7g -tm 7g -s 4 --detached
>>>
>>> I didn't have HA setup, but I believe the underlying Yarn application
>>> caused the crash because if, for some reason, the Flink job failed, the
>>> Yarn application should still survive. Please correct me if this is not the
>>> right assumption.
>>>
>>> My question is how I should find the root cause in this case and what's
>>> the recommended way to avoid this going forward?
>>>
>>> Thanks.
>>>
>>> Thomas
>>>
>>


Re: How can I tell if a record in a bounded job is the last record?

2021-06-30 Thread Paul Lam
Hi Yik San,

Maybe you could use watermark to trigger the last flush. Source operations will 
emit MAX_WATERMARK to trigger all the timers when it terminates (see [1]).

[1] 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
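
A minimal sketch of that idea, using a KeyedProcessFunction in front of the sink and
a timer registered at Long.MAX_VALUE so it fires when MAX_WATERMARK arrives; the key
and element types are illustrative, and state handling is omitted for brevity:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Buffers records and emits them in batches of `threshold`; the timer registered at
// Long.MAX_VALUE fires once the bounded source terminates and emits MAX_WATERMARK,
// so the remaining buffered records are flushed as well.
public class BufferingFlushFunction
        extends KeyedProcessFunction<String, Tuple2<String, Integer>, List<Tuple2<String, Integer>>> {

    private final int threshold = 1000;
    private transient List<Tuple2<String, Integer>> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = new ArrayList<>();
    }

    @Override
    public void processElement(
            Tuple2<String, Integer> value,
            Context ctx,
            Collector<List<Tuple2<String, Integer>>> out) {
        if (buffer.isEmpty()) {
            // Fires only after the source has finished and MAX_WATERMARK has been emitted.
            ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
        }
        buffer.add(value);
        if (buffer.size() >= threshold) {
            out.collect(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    @Override
    public void onTimer(
            long timestamp, OnTimerContext ctx, Collector<List<Tuple2<String, Integer>>> out) {
        if (!buffer.isEmpty()) {
            out.collect(new ArrayList<>(buffer));
            buffer.clear();
        }
    }
}
```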

Best,
Paul Lam

> On Jun 30, 2021, at 10:38, Yik San Chan wrote:
> 
> Hi community,
> 
> I have a batch job that consumes records from a bounded source (e.g., Hive), 
> walk them through a BufferingSink as described in 
> [docs](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction
>  
> ).
>  In the BufferingSink, I want to flush out records to the sink in 1000-record 
> batches.
> 
> Given the source is bounded, I will need to flush out all records when it 
> comes to the end, otherwise records buffered (variable bufferedElements) will 
> be lost.
> 
> An obvious way of doing so is to flush out all records in the `close` method. 
> That should work fine.
> 
> However, I wonder if it's possible to tell if a record is the last record in 
> the `invoke` method? In other words, how to implement the `isLastRecord` 
> method below?
> 
> ```java
> @Override
> public void invoke(Tuple2 value, Context context) throws Exception {
>     bufferedElements.add(value);
>     if (bufferedElements.size() == threshold || isLastRecord()) {
>         for (Tuple2 element : bufferedElements) {
>             // send it to the sink
>         }
>         bufferedElements.clear();
>     }
> }
> ```
> 
> Thanks!
> 
> Best,
> Yik San



RE: FW: Hadoop3 with Flink

2021-06-30 Thread V N, Suchithra (Nokia - IN/Bangalore)
Hi Yangze Guo,

Thanks for the reply. I am using Flink in a Kubernetes environment. Hence, can you
please suggest how to use Hadoop 3 with Flink in Kubernetes?

Regards,
Suchithra

-Original Message-
From: Yangze Guo  
Sent: Monday, June 28, 2021 3:16 PM
To: V N, Suchithra (Nokia - IN/Bangalore) 
Cc: user@flink.apache.org
Subject: Re: FW: Hadoop3 with Flink

Sorry for the belated reply. In 1.12, you just need to make sure that the 
HADOOP_CLASSPATH environment variable is set up. For more details, please refer 
to [1].

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/yarn/

Best,
Yangze Guo

On Mon, Jun 28, 2021 at 5:11 PM V N, Suchithra (Nokia - IN/Bangalore) 
 wrote:
>
> Hi,
>
> Can anyone please share inputs on this?
>
>
>
> Regards,
>
> Suchithra
>
>
>
> From: V N, Suchithra (Nokia - IN/Bangalore)
> Sent: Thursday, June 24, 2021 2:35 PM
> To: user@flink.apache.org
> Subject: Hadoop3 with Flink
>
>
>
> Hello,
>
>
>
> We are using Apache flink 1.12.3 and planning to use Hadoop 3 version. Could 
> you please suggest how to use Hadoop 3 with flink distribution.
>
>
>
> Regards,
>
> Suchithra
>
>
>
>